Commit 1ef63bc2 authored by Ian Barber's avatar Ian Barber

Merge pull request #436 from hintjens/master

Renamed ZMQ_ROUTER_BEHAVIOR to ZMQ_ROUTER_MANDATORY
parents db690e3d 983ee761
...@@ -41,7 +41,7 @@ tests/test_ts_context ...@@ -41,7 +41,7 @@ tests/test_ts_context
tests/test_connect_resolve tests/test_connect_resolve
tests/test_connect_delay tests/test_connect_delay
tests/test_term_endpoint tests/test_term_endpoint
tests/test_router_behavior tests/test_router_mandatory
src/platform.hpp* src/platform.hpp*
src/stamp-h1 src/stamp-h1
perf/local_lat perf/local_lat
......
...@@ -13,8 +13,8 @@ SYNOPSIS ...@@ -13,8 +13,8 @@ SYNOPSIS
*int zmq_setsockopt (void '*socket', int 'option_name', const void '*option_value', size_t 'option_len');* *int zmq_setsockopt (void '*socket', int 'option_name', const void '*option_value', size_t 'option_len');*
Caution: All options, with the exception of ZMQ_SUBSCRIBE, ZMQ_UNSUBSCRIBE, Caution: All options, with the exception of ZMQ_SUBSCRIBE, ZMQ_UNSUBSCRIBE,
ZMQ_LINGER and ZMQ_ROUTER_BEHAVIOR only take effect for subsequent socket ZMQ_LINGER, ZMQ_ROUTER_MANDATORY and ZMQ_XPUB_VERBOSE only take effect for
bind/connects. subsequent socket bind/connects.
DESCRIPTION DESCRIPTION
----------- -----------
...@@ -352,8 +352,8 @@ Default value:: 1 (true) ...@@ -352,8 +352,8 @@ Default value:: 1 (true)
Applicable socket types:: all, when using TCP transports. Applicable socket types:: all, when using TCP transports.
ZMQ_DELAY_ATTACH_ON_CONNECT ZMQ_DELAY_ATTACH_ON_CONNECT: Accept messages only when connections are made
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
If set to `1`, will delay the attachment of a pipe on connect until the underlying If set to `1`, will delay the attachment of a pipe on connect until the underlying
connection has completed. This will cause the socket to block if there are no other connection has completed. This will cause the socket to block if there are no other
...@@ -363,21 +363,20 @@ connections, but will prevent queues from filling on pipes awaiting connection. ...@@ -363,21 +363,20 @@ connections, but will prevent queues from filling on pipes awaiting connection.
Option value type:: int Option value type:: int
Option value unit:: boolean Option value unit:: boolean
Default value:: 0 (false) Default value:: 0 (false)
Applicable socket types:: all, primarily when using TCP/IPC transports. Applicable socket types:: all, only for connection-oriented transports.
ZMQ_ROUTER_BEHAVIOR: Set the ROUTER socket behavior ZMQ_ROUTER_MANDATORY: accept only routable messages on ROUTER sockets
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Sets the 'ROUTER' socket behavior when an unroutable message is encountered. A value Sets the 'ROUTER' socket behavior when an unroutable message is encountered. A
of `0` is the default when the message is silently discarded, while a value of `1` value of `0` is the default and discards the message silently when it cannot be
forces the sending to fail with an 'EAGAIN' error code, effectively enabling sending routed. A value of `1` returns an 'EAGAIN' error code if the message cannot be
messages in a blocking fashion. routed.
Note: Setting this socket option may have unpredictable effects on reactor-type Note: Setting this socket option may have unpredictable effects on reactor-type
libraries that assume EAGAIN will only be sent in HWM-type situations. libraries that assume EAGAIN will only be sent in HWM-type situations.
[horizontal] [horizontal]
Option value type:: int Option value type:: int
Option value unit:: 0, 1 Option value unit:: 0, 1
...@@ -385,12 +384,12 @@ Default value:: 0 ...@@ -385,12 +384,12 @@ Default value:: 0
Applicable socket types:: ZMQ_ROUTER Applicable socket types:: ZMQ_ROUTER
ZMQ_XPUB_VERBOSE: Set the XPUB socket behavior ZMQ_XPUB_VERBOSE: provide all subscription messages on XPUB sockets
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Sets the 'XPUB' socket behavior on new subscriptions. A value of '0' is the default Sets the 'XPUB' socket behavior on new subscriptions and unsubscriptions.
and passes only new subscription messages to upstream. A value of '1' passes all A value of '0' is the default and passes only new subscription messages to
subscription messages upstream. upstream. A value of '1' passes all subscription messages upstream.
[horizontal] [horizontal]
Option value type:: int Option value type:: int
......
...@@ -242,7 +242,7 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval); ...@@ -242,7 +242,7 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval);
#define ZMQ_SNDTIMEO 28 #define ZMQ_SNDTIMEO 28
#define ZMQ_IPV4ONLY 31 #define ZMQ_IPV4ONLY 31
#define ZMQ_LAST_ENDPOINT 32 #define ZMQ_LAST_ENDPOINT 32
#define ZMQ_ROUTER_BEHAVIOR 33 #define ZMQ_ROUTER_MANDATORY 33
#define ZMQ_TCP_KEEPALIVE 34 #define ZMQ_TCP_KEEPALIVE 34
#define ZMQ_TCP_KEEPALIVE_CNT 35 #define ZMQ_TCP_KEEPALIVE_CNT 35
#define ZMQ_TCP_KEEPALIVE_IDLE 36 #define ZMQ_TCP_KEEPALIVE_IDLE 36
...@@ -251,14 +251,17 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval); ...@@ -251,14 +251,17 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval);
#define ZMQ_DELAY_ATTACH_ON_CONNECT 39 #define ZMQ_DELAY_ATTACH_ON_CONNECT 39
#define ZMQ_XPUB_VERBOSE 40 #define ZMQ_XPUB_VERBOSE 40
/* Message options */ /* Message options */
#define ZMQ_MORE 1 #define ZMQ_MORE 1
/* Send/recv options. */ /* Send/recv options. */
#define ZMQ_DONTWAIT 1 #define ZMQ_DONTWAIT 1
#define ZMQ_SNDMORE 2 #define ZMQ_SNDMORE 2
/* Deprecated aliases */ /* Deprecated aliases */
#define ZMQ_NOBLOCK ZMQ_DONTWAIT #define ZMQ_NOBLOCK ZMQ_DONTWAIT
#define ZMQ_ROUTER_BEHAVIOR ZMQ_ROUTER_MANDATORY
/******************************************************************************/ /******************************************************************************/
/* 0MQ socket events and monitoring */ /* 0MQ socket events and monitoring */
......
...@@ -35,7 +35,7 @@ zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_, int sid_) : ...@@ -35,7 +35,7 @@ zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
current_out (NULL), current_out (NULL),
more_out (false), more_out (false),
next_peer_id (generate_random ()), next_peer_id (generate_random ()),
report_unroutable(false) mandatory(false)
{ {
options.type = ZMQ_ROUTER; options.type = ZMQ_ROUTER;
...@@ -76,7 +76,7 @@ void zmq::router_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) ...@@ -76,7 +76,7 @@ void zmq::router_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
int zmq::router_t::xsetsockopt (int option_, const void *optval_, int zmq::router_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_) size_t optvallen_)
{ {
if (option_ != ZMQ_ROUTER_BEHAVIOR) { if (option_ != ZMQ_ROUTER_MANDATORY) {
errno = EINVAL; errno = EINVAL;
return -1; return -1;
} }
...@@ -84,7 +84,7 @@ int zmq::router_t::xsetsockopt (int option_, const void *optval_, ...@@ -84,7 +84,7 @@ int zmq::router_t::xsetsockopt (int option_, const void *optval_,
errno = EINVAL; errno = EINVAL;
return -1; return -1;
} }
report_unroutable = *static_cast <const int*> (optval_); mandatory = *static_cast <const int*> (optval_);
return 0; return 0;
} }
...@@ -158,7 +158,9 @@ int zmq::router_t::xsend (msg_t *msg_, int flags_) ...@@ -158,7 +158,9 @@ int zmq::router_t::xsend (msg_t *msg_, int flags_)
it->second.active = false; it->second.active = false;
current_out = NULL; current_out = NULL;
} }
} else if (report_unroutable) { }
else
if (mandatory) {
more_out = false; more_out = false;
errno = EAGAIN; errno = EAGAIN;
return -1; return -1;
......
...@@ -112,7 +112,7 @@ namespace zmq ...@@ -112,7 +112,7 @@ namespace zmq
// If true, report EAGAIN to the caller instead of silently dropping // If true, report EAGAIN to the caller instead of silently dropping
// the message targeting an unknown peer. // the message targeting an unknown peer.
bool report_unroutable; bool mandatory;
router_t (const router_t&); router_t (const router_t&);
const router_t &operator = (const router_t&); const router_t &operator = (const router_t&);
......
...@@ -17,7 +17,7 @@ noinst_PROGRAMS = test_pair_inproc \ ...@@ -17,7 +17,7 @@ noinst_PROGRAMS = test_pair_inproc \
test_last_endpoint \ test_last_endpoint \
test_term_endpoint \ test_term_endpoint \
test_monitor \ test_monitor \
test_router_behavior test_router_mandatory
if !ON_MINGW if !ON_MINGW
noinst_PROGRAMS += test_shutdown_stress \ noinst_PROGRAMS += test_shutdown_stress \
...@@ -40,7 +40,7 @@ test_connect_delay_SOURCES = test_connect_delay.cpp ...@@ -40,7 +40,7 @@ test_connect_delay_SOURCES = test_connect_delay.cpp
test_last_endpoint_SOURCES = test_last_endpoint.cpp test_last_endpoint_SOURCES = test_last_endpoint.cpp
test_term_endpoint_SOURCES = test_term_endpoint.cpp test_term_endpoint_SOURCES = test_term_endpoint.cpp
test_monitor_SOURCES = test_monitor.cpp test_monitor_SOURCES = test_monitor.cpp
test_router_behavior_SOURCES = test_router_behavior.cpp test_router_mandatory_SOURCES = test_router_mandatory.cpp
if !ON_MINGW if !ON_MINGW
test_shutdown_stress_SOURCES = test_shutdown_stress.cpp test_shutdown_stress_SOURCES = test_shutdown_stress.cpp
......
...@@ -24,7 +24,7 @@ ...@@ -24,7 +24,7 @@
int main (void) int main (void)
{ {
fprintf (stderr, "test_router_behavior running...\n"); fprintf (stderr, "test_router_mandatory running...\n");
void *ctx = zmq_init (1); void *ctx = zmq_init (1);
assert (ctx); assert (ctx);
...@@ -36,19 +36,19 @@ int main (void) ...@@ -36,19 +36,19 @@ int main (void)
int rc = zmq_bind (sa, "tcp://127.0.0.1:15560"); int rc = zmq_bind (sa, "tcp://127.0.0.1:15560");
assert (rc == 0); assert (rc == 0);
// Sending a message to an unknown peer with the default behavior. // Sending a message to an unknown peer with the default setting
rc = zmq_send (sa, "UNKNOWN", 7, ZMQ_SNDMORE); rc = zmq_send (sa, "UNKNOWN", 7, ZMQ_SNDMORE);
assert (rc == 7); assert (rc == 7);
rc = zmq_send (sa, "DATA", 4, 0); rc = zmq_send (sa, "DATA", 4, 0);
assert (rc == 4); assert (rc == 4);
int behavior = 1; int mandatory = 1;
// Setting the socket behavior to a new mode. // Set mandatory routing on socket
rc = zmq_setsockopt (sa, ZMQ_ROUTER_BEHAVIOR, &behavior, sizeof (behavior)); rc = zmq_setsockopt (sa, ZMQ_ROUTER_MANDATORY, &mandatory, sizeof (mandatory));
assert (rc == 0); assert (rc == 0);
// Sending a message to an unknown peer with verbose behavior. // Send a message and check that it fails
rc = zmq_send (sa, "UNKNOWN", 7, ZMQ_SNDMORE | ZMQ_DONTWAIT); rc = zmq_send (sa, "UNKNOWN", 7, ZMQ_SNDMORE | ZMQ_DONTWAIT);
assert (rc == -1 && errno == EAGAIN); assert (rc == -1 && errno == EAGAIN);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment