Commit 8a16fef3 authored by Michael Vilim's avatar Michael Vilim

Problem: ZMQ_CONNECT_ROUTING_ID can be assigned to incoming socket connection (Issue #3191)

Solution: Add an identifier parameter for local attach to zmq::socket_base_t::attach_pipe
parent cc4d03fa
......@@ -43,9 +43,12 @@ zmq::client_t::~client_t ()
{
}
void zmq::client_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
void zmq::client_t::xattach_pipe (pipe_t *pipe_,
bool subscribe_to_all_,
bool locally_initiated_)
{
LIBZMQ_UNUSED (subscribe_to_all_);
LIBZMQ_UNUSED (locally_initiated_);
zmq_assert (pipe_);
......
......@@ -49,7 +49,9 @@ class client_t : public socket_base_t
protected:
// Overrides of functions from socket_base_t.
void xattach_pipe (zmq::pipe_t *pipe_, bool subscribe_to_all_);
void xattach_pipe (zmq::pipe_t *pipe_,
bool subscribe_to_all_,
bool locally_initiated_);
int xsend (zmq::msg_t *msg_);
int xrecv (zmq::msg_t *msg_);
bool xhas_in ();
......
......@@ -44,9 +44,12 @@ zmq::dealer_t::~dealer_t ()
{
}
void zmq::dealer_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
void zmq::dealer_t::xattach_pipe (pipe_t *pipe_,
bool subscribe_to_all_,
bool locally_initated_)
{
LIBZMQ_UNUSED (subscribe_to_all_);
LIBZMQ_UNUSED (locally_initated_);
zmq_assert (pipe_);
......
......@@ -51,7 +51,9 @@ class dealer_t : public socket_base_t
protected:
// Overrides of functions from socket_base_t.
void xattach_pipe (zmq::pipe_t *pipe_, bool subscribe_to_all_);
void xattach_pipe (zmq::pipe_t *pipe_,
bool subscribe_to_all_,
bool locally_initiated_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
int xsend (zmq::msg_t *msg_);
int xrecv (zmq::msg_t *msg_);
......
......@@ -51,9 +51,12 @@ zmq::dgram_t::~dgram_t ()
zmq_assert (!_pipe);
}
void zmq::dgram_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
void zmq::dgram_t::xattach_pipe (pipe_t *pipe_,
bool subscribe_to_all_,
bool locally_initiated_)
{
LIBZMQ_UNUSED (subscribe_to_all_);
LIBZMQ_UNUSED (locally_initiated_);
zmq_assert (pipe_);
......
......@@ -48,7 +48,9 @@ class dgram_t : public socket_base_t
~dgram_t ();
// Overrides of functions from socket_base_t.
void xattach_pipe (zmq::pipe_t *pipe_, bool subscribe_to_all_);
void xattach_pipe (zmq::pipe_t *pipe_,
bool subscribe_to_all_,
bool locally_initiated_);
int xsend (zmq::msg_t *msg_);
int xrecv (zmq::msg_t *msg_);
bool xhas_in ();
......
......@@ -54,9 +54,12 @@ zmq::dish_t::~dish_t ()
errno_assert (rc == 0);
}
void zmq::dish_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
void zmq::dish_t::xattach_pipe (pipe_t *pipe_,
bool subscribe_to_all_,
bool locally_initiated_)
{
LIBZMQ_UNUSED (subscribe_to_all_);
LIBZMQ_UNUSED (locally_initiated_);
zmq_assert (pipe_);
_fq.attach (pipe_);
......
......@@ -52,7 +52,9 @@ class dish_t : public socket_base_t
protected:
// Overrides of functions from socket_base_t.
void xattach_pipe (zmq::pipe_t *pipe_, bool subscribe_to_all_);
void xattach_pipe (zmq::pipe_t *pipe_,
bool subscribe_to_all_,
bool locally_initiated_);
int xsend (zmq::msg_t *msg_);
bool xhas_out ();
int xrecv (zmq::msg_t *msg_);
......
......@@ -44,9 +44,12 @@ zmq::gather_t::~gather_t ()
{
}
void zmq::gather_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
void zmq::gather_t::xattach_pipe (pipe_t *pipe_,
bool subscribe_to_all_,
bool locally_initiated_)
{
LIBZMQ_UNUSED (subscribe_to_all_);
LIBZMQ_UNUSED (locally_initiated_);
zmq_assert (pipe_);
_fq.attach (pipe_);
......
......@@ -47,7 +47,9 @@ class gather_t : public socket_base_t
protected:
// Overrides of functions from socket_base_t.
void xattach_pipe (zmq::pipe_t *pipe_, bool subscribe_to_all_);
void xattach_pipe (zmq::pipe_t *pipe_,
bool subscribe_to_all_,
bool locally_initiated_);
int xrecv (zmq::msg_t *msg_);
bool xhas_in ();
const blob_t &get_credential () const;
......
......@@ -47,9 +47,12 @@ zmq::pair_t::~pair_t ()
zmq_assert (!_pipe);
}
void zmq::pair_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
void zmq::pair_t::xattach_pipe (pipe_t *pipe_,
bool subscribe_to_all_,
bool locally_initiated_)
{
LIBZMQ_UNUSED (subscribe_to_all_);
LIBZMQ_UNUSED (locally_initiated_);
zmq_assert (pipe_ != NULL);
......
......@@ -48,7 +48,9 @@ class pair_t : public socket_base_t
~pair_t ();
// Overrides of functions from socket_base_t.
void xattach_pipe (zmq::pipe_t *pipe_, bool subscribe_to_all_);
void xattach_pipe (zmq::pipe_t *pipe_,
bool subscribe_to_all_,
bool locally_initiated_);
int xsend (zmq::msg_t *msg_);
int xrecv (zmq::msg_t *msg_);
bool xhas_in ();
......
......@@ -43,7 +43,9 @@ zmq::pub_t::~pub_t ()
{
}
void zmq::pub_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
void zmq::pub_t::xattach_pipe (pipe_t *pipe_,
bool subscribe_to_all_,
bool locally_initiated_)
{
zmq_assert (pipe_);
......@@ -51,7 +53,7 @@ void zmq::pub_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
// to receive the delimiter.
pipe_->set_nodelay ();
xpub_t::xattach_pipe (pipe_, subscribe_to_all_);
xpub_t::xattach_pipe (pipe_, subscribe_to_all_, locally_initiated_);
}
int zmq::pub_t::xrecv (class msg_t *)
......
......@@ -46,7 +46,9 @@ class pub_t : public xpub_t
~pub_t ();
// Implementations of virtual functions from socket_base_t.
void xattach_pipe (zmq::pipe_t *pipe_, bool subscribe_to_all_ = false);
void xattach_pipe (zmq::pipe_t *pipe_,
bool subscribe_to_all_ = false,
bool locally_initiated_ = false);
int xrecv (zmq::msg_t *msg_);
bool xhas_in ();
......
......@@ -44,9 +44,12 @@ zmq::pull_t::~pull_t ()
{
}
void zmq::pull_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
void zmq::pull_t::xattach_pipe (pipe_t *pipe_,
bool subscribe_to_all_,
bool locally_initiated_)
{
LIBZMQ_UNUSED (subscribe_to_all_);
LIBZMQ_UNUSED (locally_initiated_);
zmq_assert (pipe_);
_fq.attach (pipe_);
......
......@@ -49,7 +49,9 @@ class pull_t : public socket_base_t
protected:
// Overrides of functions from socket_base_t.
void xattach_pipe (zmq::pipe_t *pipe_, bool subscribe_to_all_);
void xattach_pipe (zmq::pipe_t *pipe_,
bool subscribe_to_all_,
bool locally_initiated_);
int xrecv (zmq::msg_t *msg_);
bool xhas_in ();
const blob_t &get_credential () const;
......
......@@ -44,9 +44,12 @@ zmq::push_t::~push_t ()
{
}
void zmq::push_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
void zmq::push_t::xattach_pipe (pipe_t *pipe_,
bool subscribe_to_all_,
bool locally_initiated_)
{
LIBZMQ_UNUSED (subscribe_to_all_);
LIBZMQ_UNUSED (locally_initiated_);
// Don't delay pipe termination as there is no one
// to receive the delimiter.
......
......@@ -49,7 +49,9 @@ class push_t : public socket_base_t
protected:
// Overrides of functions from socket_base_t.
void xattach_pipe (zmq::pipe_t *pipe_, bool subscribe_to_all_);
void xattach_pipe (zmq::pipe_t *pipe_,
bool subscribe_to_all_,
bool locally_initiated_);
int xsend (zmq::msg_t *msg_);
bool xhas_out ();
void xwrite_activated (zmq::pipe_t *pipe_);
......
......@@ -47,9 +47,12 @@ zmq::radio_t::~radio_t ()
{
}
void zmq::radio_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
void zmq::radio_t::xattach_pipe (pipe_t *pipe_,
bool subscribe_to_all_,
bool locally_initiated_)
{
LIBZMQ_UNUSED (subscribe_to_all_);
LIBZMQ_UNUSED (locally_initiated_);
zmq_assert (pipe_);
......
......@@ -52,7 +52,9 @@ class radio_t : public socket_base_t
~radio_t ();
// Implementations of virtual functions from socket_base_t.
void xattach_pipe (zmq::pipe_t *pipe_, bool subscribe_to_all_ = false);
void xattach_pipe (zmq::pipe_t *pipe_,
bool subscribe_to_all_ = false,
bool locally_initiated_ = false);
int xsend (zmq::msg_t *msg_);
bool xhas_out ();
int xrecv (zmq::msg_t *msg_);
......
......@@ -67,7 +67,9 @@ zmq::router_t::~router_t ()
_prefetched_msg.close ();
}
void zmq::router_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
void zmq::router_t::xattach_pipe (pipe_t *pipe_,
bool subscribe_to_all_,
bool locally_initiated_)
{
LIBZMQ_UNUSED (subscribe_to_all_);
......@@ -88,7 +90,7 @@ void zmq::router_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
errno_assert (rc == 0);
}
bool routing_id_ok = identify_peer (pipe_);
bool routing_id_ok = identify_peer (pipe_, locally_initiated_);
if (routing_id_ok)
_fq.attach (pipe_);
else
......@@ -166,7 +168,7 @@ void zmq::router_t::xread_activated (pipe_t *pipe_)
if (it == _anonymous_pipes.end ())
_fq.activated (pipe_);
else {
bool routing_id_ok = identify_peer (pipe_);
bool routing_id_ok = identify_peer (pipe_, false);
if (routing_id_ok) {
_anonymous_pipes.erase (it);
_fq.attach (pipe_);
......@@ -440,13 +442,13 @@ int zmq::router_t::get_peer_state (const void *routing_id_,
return res;
}
bool zmq::router_t::identify_peer (pipe_t *pipe_)
bool zmq::router_t::identify_peer (pipe_t *pipe_, bool locally_initiated_)
{
msg_t msg;
blob_t routing_id;
if (locally_initiated_ && connect_routing_id_is_set ()) {
const std::string connect_routing_id = extract_connect_routing_id ();
if (!connect_routing_id.empty ()) {
routing_id.set (
reinterpret_cast<const unsigned char *> (connect_routing_id.c_str ()),
connect_routing_id.length ());
......
......@@ -52,7 +52,9 @@ class router_t : public routing_socket_base_t
~router_t ();
// Overrides of functions from socket_base_t.
void xattach_pipe (zmq::pipe_t *pipe_, bool subscribe_to_all_);
void xattach_pipe (zmq::pipe_t *pipe_,
bool subscribe_to_all_,
bool locally_initiated_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
int xsend (zmq::msg_t *msg_);
int xrecv (zmq::msg_t *msg_);
......@@ -69,7 +71,7 @@ class router_t : public routing_socket_base_t
private:
// Receive peer id and update lookup map
bool identify_peer (pipe_t *pipe_);
bool identify_peer (pipe_t *pipe_, bool locally_initiated);
// Fair queueing object for inbound pipes.
fq_t _fq;
......
......@@ -44,9 +44,12 @@ zmq::scatter_t::~scatter_t ()
{
}
void zmq::scatter_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
void zmq::scatter_t::xattach_pipe (pipe_t *pipe_,
bool subscribe_to_all_,
bool locally_initiated_)
{
LIBZMQ_UNUSED (subscribe_to_all_);
LIBZMQ_UNUSED (locally_initiated_);
// Don't delay pipe termination as there is no one
// to receive the delimiter.
......
......@@ -49,7 +49,9 @@ class scatter_t : public socket_base_t
protected:
// Overrides of functions from socket_base_t.
void xattach_pipe (zmq::pipe_t *pipe_, bool subscribe_to_all_);
void xattach_pipe (zmq::pipe_t *pipe_,
bool subscribe_to_all_,
bool locally_initiated_);
int xsend (zmq::msg_t *msg_);
bool xhas_out ();
void xwrite_activated (zmq::pipe_t *pipe_);
......
......@@ -48,9 +48,12 @@ zmq::server_t::~server_t ()
zmq_assert (_out_pipes.empty ());
}
void zmq::server_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
void zmq::server_t::xattach_pipe (pipe_t *pipe_,
bool subscribe_to_all_,
bool locally_initiated_)
{
LIBZMQ_UNUSED (subscribe_to_all_);
LIBZMQ_UNUSED (locally_initiated_);
zmq_assert (pipe_);
......
......@@ -52,7 +52,9 @@ class server_t : public socket_base_t
~server_t ();
// Overrides of functions from socket_base_t.
void xattach_pipe (zmq::pipe_t *pipe_, bool subscribe_to_all_);
void xattach_pipe (zmq::pipe_t *pipe_,
bool subscribe_to_all_,
bool locally_initiated_);
int xsend (zmq::msg_t *msg_);
int xrecv (zmq::msg_t *msg_);
bool xhas_in ();
......
......@@ -341,14 +341,16 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_)
return 0;
}
void zmq::socket_base_t::attach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
void zmq::socket_base_t::attach_pipe (pipe_t *pipe_,
bool subscribe_to_all_,
bool locally_initiated_)
{
// First, register the pipe so that we can terminate it later on.
pipe_->set_event_sink (this);
_pipes.push_back (pipe_);
// Let the derived socket type know about new pipe.
xattach_pipe (pipe_, subscribe_to_all_);
xattach_pipe (pipe_, subscribe_to_all_, locally_initiated_);
// If the socket is already being closed, ask any new pipes to terminate
// straight away.
......@@ -553,7 +555,7 @@ int zmq::socket_base_t::bind (const char *addr_)
errno_assert (rc == 0);
// Attach local end of the pipe to the socket object.
attach_pipe (new_pipes[0], true);
attach_pipe (new_pipes[0], true, true);
newpipe = new_pipes[0];
// Attach remote end of the pipe to the session object later on.
......@@ -773,7 +775,7 @@ int zmq::socket_base_t::connect (const char *addr_)
}
// Attach local end of the pipe to this socket object.
attach_pipe (new_pipes[0]);
attach_pipe (new_pipes[0], false, true);
// Save last endpoint URI
_last_endpoint.assign (addr_);
......@@ -959,7 +961,7 @@ int zmq::socket_base_t::connect (const char *addr_)
errno_assert (rc == 0);
// Attach local end of the pipe to the socket object.
attach_pipe (new_pipes[0], subscribe_to_all);
attach_pipe (new_pipes[0], subscribe_to_all, true);
newpipe = new_pipes[0];
// Attach remote end of the pipe to the session object later on.
......@@ -1809,6 +1811,11 @@ std::string zmq::routing_socket_base_t::extract_connect_routing_id ()
return res;
}
bool zmq::routing_socket_base_t::connect_routing_id_is_set ()
{
return !_connect_routing_id.empty ();
}
void zmq::routing_socket_base_t::add_out_pipe (blob_t routing_id_,
pipe_t *pipe_)
{
......
......@@ -150,7 +150,8 @@ class socket_base_t : public own_t,
// Concrete algorithms for the x- methods are to be defined by
// individual socket types.
virtual void xattach_pipe (zmq::pipe_t *pipe_,
bool subscribe_to_all_ = false) = 0;
bool subscribe_to_all_ = false,
bool locally_initiated_ = false) = 0;
// The default implementation assumes there are no specific socket
// options for the particular socket type. If not so, override this
......@@ -234,7 +235,9 @@ class socket_base_t : public own_t,
int check_protocol (const std::string &protocol_);
// Register the pipe with this socket.
void attach_pipe (zmq::pipe_t *pipe_, bool subscribe_to_all_ = false);
void attach_pipe (zmq::pipe_t *pipe_,
bool subscribe_to_all_ = false,
bool locally_initiated_ = false);
// Processes commands sent to this socket (if any). If timeout is -1,
// returns only after at least one command was processed.
......@@ -311,6 +314,7 @@ class routing_socket_base_t : public socket_base_t
// own methods
std::string extract_connect_routing_id ();
bool connect_routing_id_is_set ();
struct out_pipe_t
{
......
......@@ -57,13 +57,15 @@ zmq::stream_t::~stream_t ()
_prefetched_msg.close ();
}
void zmq::stream_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
void zmq::stream_t::xattach_pipe (pipe_t *pipe_,
bool subscribe_to_all_,
bool locally_initiated_)
{
LIBZMQ_UNUSED (subscribe_to_all_);
zmq_assert (pipe_);
identify_peer (pipe_);
identify_peer (pipe_, locally_initiated_);
_fq.attach (pipe_);
}
......@@ -264,14 +266,14 @@ bool zmq::stream_t::xhas_out ()
return true;
}
void zmq::stream_t::identify_peer (pipe_t *pipe_)
void zmq::stream_t::identify_peer (pipe_t *pipe_, bool locally_initiated_)
{
// Always assign routing id for raw-socket
unsigned char buffer[5];
buffer[0] = 0;
blob_t routing_id;
if (locally_initiated_ && connect_routing_id_is_set ()) {
const std::string connect_routing_id = extract_connect_routing_id ();
if (!connect_routing_id.empty ()) {
routing_id.set (
reinterpret_cast<const unsigned char *> (connect_routing_id.c_str ()),
connect_routing_id.length ());
......
......@@ -46,7 +46,9 @@ class stream_t : public routing_socket_base_t
~stream_t ();
// Overrides of functions from socket_base_t.
void xattach_pipe (zmq::pipe_t *pipe_, bool subscribe_to_all_);
void xattach_pipe (zmq::pipe_t *pipe_,
bool subscribe_to_all_,
bool locally_initiated_);
int xsend (zmq::msg_t *msg_);
int xrecv (zmq::msg_t *msg_);
bool xhas_in ();
......@@ -57,7 +59,7 @@ class stream_t : public routing_socket_base_t
private:
// Generate peer's id and update lookup map
void identify_peer (pipe_t *pipe_);
void identify_peer (pipe_t *pipe_, bool locally_initiated_);
// Fair queueing object for inbound pipes.
fq_t _fq;
......
......@@ -57,8 +57,12 @@ zmq::xpub_t::~xpub_t ()
_welcome_msg.close ();
}
void zmq::xpub_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
void zmq::xpub_t::xattach_pipe (pipe_t *pipe_,
bool subscribe_to_all_,
bool locally_initiated_)
{
LIBZMQ_UNUSED (locally_initiated_);
zmq_assert (pipe_);
_dist.attach (pipe_);
......
......@@ -51,7 +51,9 @@ class xpub_t : public socket_base_t
~xpub_t ();
// Implementations of virtual functions from socket_base_t.
void xattach_pipe (zmq::pipe_t *pipe_, bool subscribe_to_all_ = false);
void xattach_pipe (zmq::pipe_t *pipe_,
bool subscribe_to_all_ = false,
bool locally_initiated_ = false);
int xsend (zmq::msg_t *msg_);
bool xhas_out ();
int xrecv (zmq::msg_t *msg_);
......
......@@ -55,9 +55,12 @@ zmq::xsub_t::~xsub_t ()
errno_assert (rc == 0);
}
void zmq::xsub_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
void zmq::xsub_t::xattach_pipe (pipe_t *pipe_,
bool subscribe_to_all_,
bool locally_initiated_)
{
LIBZMQ_UNUSED (subscribe_to_all_);
LIBZMQ_UNUSED (locally_initiated_);
zmq_assert (pipe_);
_fq.attach (pipe_);
......
......@@ -50,7 +50,9 @@ class xsub_t : public socket_base_t
protected:
// Overrides of functions from socket_base_t.
void xattach_pipe (zmq::pipe_t *pipe_, bool subscribe_to_all_);
void xattach_pipe (zmq::pipe_t *pipe_,
bool subscribe_to_all_,
bool locally_initiated_);
int xsend (zmq::msg_t *msg_);
bool xhas_out ();
int xrecv (zmq::msg_t *msg_);
......
......@@ -189,6 +189,106 @@ void test_router_2_router (bool named_)
zmq_ctx_destroy (ctx);
}
void test_router_2_router_while_receiving ()
{
void *xbind, *zbind, *yconn;
int ret;
char buff[256];
char msg[] = "hi 1";
const char *wildcard_bind = "tcp://127.0.0.1:*";
int zero = 0;
size_t len = MAX_SOCKET_STRING;
char x_endpoint[MAX_SOCKET_STRING];
char z_endpoint[MAX_SOCKET_STRING];
void *ctx = zmq_ctx_new ();
// Create xbind socket.
xbind = zmq_socket (ctx, ZMQ_ROUTER);
assert (xbind);
ret = zmq_setsockopt (xbind, ZMQ_LINGER, &zero, sizeof (zero));
assert (0 == ret);
ret = zmq_bind (xbind, wildcard_bind);
assert (0 == ret);
ret = zmq_getsockopt (xbind, ZMQ_LAST_ENDPOINT, x_endpoint, &len);
assert (0 == ret);
// Create zbind socket.
zbind = zmq_socket (ctx, ZMQ_ROUTER);
assert (zbind);
ret = zmq_setsockopt (zbind, ZMQ_LINGER, &zero, sizeof (zero));
assert (0 == ret);
ret = zmq_bind (zbind, wildcard_bind);
assert (0 == ret);
ret = zmq_getsockopt (zbind, ZMQ_LAST_ENDPOINT, z_endpoint, &len);
assert (0 == ret);
// Create connection socket.
yconn = zmq_socket (ctx, ZMQ_ROUTER);
assert (yconn);
ret = zmq_setsockopt (yconn, ZMQ_LINGER, &zero, sizeof (zero));
assert (0 == ret);
// set identites for each socket
ret = zmq_setsockopt (xbind, ZMQ_ROUTING_ID, "X", 2);
ret = zmq_setsockopt (yconn, ZMQ_ROUTING_ID, "Y", 2);
ret = zmq_setsockopt (zbind, ZMQ_ROUTING_ID, "Z", 2);
// Connect Y to X using a routing id
ret = zmq_setsockopt (yconn, ZMQ_CONNECT_ROUTING_ID, "X", 2);
assert (0 == ret);
ret = zmq_connect (yconn, x_endpoint);
assert (0 == ret);
// Send some data from Y to X.
ret = zmq_send (yconn, "X", 2, ZMQ_SNDMORE);
assert (2 == ret);
ret = zmq_send (yconn, msg, 5, 0);
assert (5 == ret);
// wait for the Y->X message to be received
msleep (SETTLE_TIME);
// Now X tries to connect to Z and send a message
ret = zmq_setsockopt (xbind, ZMQ_CONNECT_ROUTING_ID, "Z", 2);
assert (0 == ret);
ret = zmq_connect (xbind, z_endpoint);
assert (0 == ret);
// Try to send some data from X to Z.
ret = zmq_send (xbind, "Z", 2, ZMQ_SNDMORE);
assert (2 == ret);
ret = zmq_send (xbind, msg, 5, 0);
assert (5 == ret);
// wait for the X->Z message to be received (so that our non-blocking check will actually
// fail if the message is routed to Y)
msleep (SETTLE_TIME);
// nothing should have been received on the Y socket
ret = zmq_recv (yconn, buff, 256, ZMQ_DONTWAIT);
assert (ret == -1);
assert (zmq_errno () == EAGAIN);
// the message should have been received on the Z socket
ret = zmq_recv (zbind, buff, 256, 0);
assert (ret && 'X' == buff[0]);
ret = zmq_recv (zbind, buff + 128, 128, 0);
assert (5 == ret && 'h' == buff[128]);
ret = zmq_unbind (xbind, x_endpoint);
assert (0 == ret);
ret = zmq_unbind (zbind, z_endpoint);
assert (0 == ret);
ret = zmq_close (yconn);
assert (0 == ret);
ret = zmq_close (xbind);
assert (0 == ret);
ret = zmq_close (zbind);
assert (0 == ret);
zmq_ctx_destroy (ctx);
}
int main (void)
{
setup_test_environment ();
......@@ -196,6 +296,7 @@ int main (void)
test_stream_2_stream ();
test_router_2_router (false);
test_router_2_router (true);
test_router_2_router_while_receiving ();
return 0;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment