Commit cb5eebd8 authored by Constantin Rack's avatar Constantin Rack

Merge pull request #1258 from hintjens/master

Problem: linger values other than -1 or 0 are unsafe
parents f448af94 b6e61d72
...@@ -40,6 +40,12 @@ ZMQ_IPV6: Set IPv6 option ...@@ -40,6 +40,12 @@ ZMQ_IPV6: Set IPv6 option
~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_IPV6' argument returns the IPv6 option for the context. The 'ZMQ_IPV6' argument returns the IPv6 option for the context.
ZMQ_BLOCKY: Get blocky setting
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_BLOCKY' argument returns 1 if the context will block on terminate,
zero if the "block forever on context termination" gambit was disabled by
setting ZMQ_BLOCKY to false on all new contexts.
RETURN VALUE RETURN VALUE
------------ ------------
...@@ -63,6 +69,10 @@ zmq_ctx_set (context, ZMQ_MAX_SOCKETS, 256); ...@@ -63,6 +69,10 @@ zmq_ctx_set (context, ZMQ_MAX_SOCKETS, 256);
int max_sockets = zmq_ctx_get (context, ZMQ_MAX_SOCKETS); int max_sockets = zmq_ctx_get (context, ZMQ_MAX_SOCKETS);
assert (max_sockets == 256); assert (max_sockets == 256);
---- ----
.Switching off the context deadlock gambit
----
zmq_ctx_set (ctx, ZMQ_BLOCKY, false);
----
SEE ALSO SEE ALSO
......
...@@ -21,6 +21,21 @@ The _zmq_ctx_set()_ function shall set the option specified by the ...@@ -21,6 +21,21 @@ The _zmq_ctx_set()_ function shall set the option specified by the
The _zmq_ctx_set()_ function accepts the following options: The _zmq_ctx_set()_ function accepts the following options:
ZMQ_BLOCKY: Fix blocky behavior
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
By default the context will block, forever, on a zmq_ctx_term call. The
assumption behind this behavior is that abrupt termination will cause
message loss. Most real applications use some form of handshaking to ensure
applications receive termination messages, and then terminate the context
with 'ZMQ_LINGER' set to zero on all sockets. This setting is an easier way
to get the same result. When 'ZMQ_BLOCKY' is set to false, all new sockets
are given a linger timeout of zero. You must still close all sockets before
calling zmq_term.
[horizontal]
Default value:: false (old behavior)
ZMQ_IO_THREADS: Set number of I/O threads ZMQ_IO_THREADS: Set number of I/O threads
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_IO_THREADS' argument specifies the size of the 0MQ thread pool to The 'ZMQ_IO_THREADS' argument specifies the size of the 0MQ thread pool to
......
...@@ -304,6 +304,7 @@ ZMQ_EXPORT const char *zmq_msg_gets (zmq_msg_t *msg, const char *property); ...@@ -304,6 +304,7 @@ ZMQ_EXPORT const char *zmq_msg_gets (zmq_msg_t *msg, const char *property);
#define ZMQ_IDENTITY_FD 67 #define ZMQ_IDENTITY_FD 67
#define ZMQ_SOCKS_PROXY 68 #define ZMQ_SOCKS_PROXY 68
#define ZMQ_XPUB_NODROP 69 #define ZMQ_XPUB_NODROP 69
#define ZMQ_BLOCKY 70
/* Message options */ /* Message options */
#define ZMQ_MORE 1 #define ZMQ_MORE 1
......
...@@ -65,6 +65,7 @@ zmq::ctx_t::ctx_t () : ...@@ -65,6 +65,7 @@ zmq::ctx_t::ctx_t () :
slots (NULL), slots (NULL),
max_sockets (clipped_maxsocket (ZMQ_MAX_SOCKETS_DFLT)), max_sockets (clipped_maxsocket (ZMQ_MAX_SOCKETS_DFLT)),
io_thread_count (ZMQ_IO_THREADS_DFLT), io_thread_count (ZMQ_IO_THREADS_DFLT),
blocky (true),
ipv6 (false), ipv6 (false),
thread_priority (ZMQ_THREAD_PRIORITY_DFLT), thread_priority (ZMQ_THREAD_PRIORITY_DFLT),
thread_sched_policy (ZMQ_THREAD_SCHED_POLICY_DFLT) thread_sched_policy (ZMQ_THREAD_SCHED_POLICY_DFLT)
...@@ -222,6 +223,12 @@ int zmq::ctx_t::set (int option_, int optval_) ...@@ -222,6 +223,12 @@ int zmq::ctx_t::set (int option_, int optval_)
thread_sched_policy = optval_; thread_sched_policy = optval_;
opt_sync.unlock(); opt_sync.unlock();
} }
else
if (option_ == ZMQ_BLOCKY && optval_ >= 0) {
opt_sync.lock ();
blocky = (optval_ != 0);
opt_sync.unlock ();
}
else { else {
errno = EINVAL; errno = EINVAL;
rc = -1; rc = -1;
...@@ -243,6 +250,9 @@ int zmq::ctx_t::get (int option_) ...@@ -243,6 +250,9 @@ int zmq::ctx_t::get (int option_)
else else
if (option_ == ZMQ_IPV6) if (option_ == ZMQ_IPV6)
rc = ipv6; rc = ipv6;
else
if (option_ == ZMQ_BLOCKY)
rc = blocky;
else { else {
errno = EINVAL; errno = EINVAL;
rc = -1; rc = -1;
......
...@@ -187,6 +187,9 @@ namespace zmq ...@@ -187,6 +187,9 @@ namespace zmq
// Number of I/O threads to launch. // Number of I/O threads to launch.
int io_thread_count; int io_thread_count;
// Does context wait (possibly forever) on termination?
bool blocky;
// Is IPv6 enabled on this context? // Is IPv6 enabled on this context?
bool ipv6; bool ipv6;
......
...@@ -35,7 +35,7 @@ zmq::options_t::options_t () : ...@@ -35,7 +35,7 @@ zmq::options_t::options_t () :
rcvbuf (0), rcvbuf (0),
tos (0), tos (0),
type (-1), type (-1),
linger (30000), linger (-1),
reconnect_ivl (100), reconnect_ivl (100),
reconnect_ivl_max (0), reconnect_ivl_max (0),
backlog (100), backlog (100),
......
...@@ -143,6 +143,7 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_, int sid_) : ...@@ -143,6 +143,7 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_, int sid_) :
{ {
options.socket_id = sid_; options.socket_id = sid_;
options.ipv6 = (parent_->get (ZMQ_IPV6) != 0); options.ipv6 = (parent_->get (ZMQ_IPV6) != 0);
options.linger = parent_->get (ZMQ_BLOCKY)? -1: 0;
} }
zmq::socket_base_t::~socket_base_t () zmq::socket_base_t::~socket_base_t ()
......
...@@ -43,15 +43,26 @@ int main (void) ...@@ -43,15 +43,26 @@ int main (void)
assert (zmq_ctx_get (ctx, ZMQ_IPV6) == 1); assert (zmq_ctx_get (ctx, ZMQ_IPV6) == 1);
void *router = zmq_socket (ctx, ZMQ_ROUTER); void *router = zmq_socket (ctx, ZMQ_ROUTER);
int ipv6; int value;
size_t optsize = sizeof (int); size_t optsize = sizeof (int);
rc = zmq_getsockopt (router, ZMQ_IPV6, &ipv6, &optsize); rc = zmq_getsockopt (router, ZMQ_IPV6, &value, &optsize);
assert (rc == 0); assert (rc == 0);
assert (ipv6); assert (value == 1);
rc = zmq_getsockopt (router, ZMQ_LINGER, &value, &optsize);
assert (rc == 0);
assert (value == -1);
rc = zmq_close (router); rc = zmq_close (router);
assert (rc == 0); assert (rc == 0);
rc = zmq_ctx_set (ctx, ZMQ_BLOCKY, false);
assert (zmq_ctx_get (ctx, ZMQ_BLOCKY) == 0);
router = zmq_socket (ctx, ZMQ_ROUTER);
rc = zmq_getsockopt (router, ZMQ_LINGER, &value, &optsize);
assert (rc == 0);
assert (value == 0);
rc = zmq_close (router);
assert (rc == 0);
rc = zmq_ctx_term (ctx); rc = zmq_ctx_term (ctx);
assert (rc == 0); assert (rc == 0);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment