Unverified Commit bbcdb961 authored by Luca Boccassi's avatar Luca Boccassi Committed by GitHub

Merge pull request #3193 from mvilim/local_attach

Race condition with received message causes ZMQ_CONNECT_ROUTING_ID to be assigned to wrong socket
parents 0aa222d0 8a16fef3
...@@ -43,9 +43,12 @@ zmq::client_t::~client_t () ...@@ -43,9 +43,12 @@ zmq::client_t::~client_t ()
{ {
} }
void zmq::client_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_) void zmq::client_t::xattach_pipe (pipe_t *pipe_,
bool subscribe_to_all_,
bool locally_initiated_)
{ {
LIBZMQ_UNUSED (subscribe_to_all_); LIBZMQ_UNUSED (subscribe_to_all_);
LIBZMQ_UNUSED (locally_initiated_);
zmq_assert (pipe_); zmq_assert (pipe_);
......
...@@ -49,7 +49,9 @@ class client_t : public socket_base_t ...@@ -49,7 +49,9 @@ class client_t : public socket_base_t
protected: protected:
// Overrides of functions from socket_base_t. // Overrides of functions from socket_base_t.
void xattach_pipe (zmq::pipe_t *pipe_, bool subscribe_to_all_); void xattach_pipe (zmq::pipe_t *pipe_,
bool subscribe_to_all_,
bool locally_initiated_);
int xsend (zmq::msg_t *msg_); int xsend (zmq::msg_t *msg_);
int xrecv (zmq::msg_t *msg_); int xrecv (zmq::msg_t *msg_);
bool xhas_in (); bool xhas_in ();
......
...@@ -44,9 +44,12 @@ zmq::dealer_t::~dealer_t () ...@@ -44,9 +44,12 @@ zmq::dealer_t::~dealer_t ()
{ {
} }
void zmq::dealer_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_) void zmq::dealer_t::xattach_pipe (pipe_t *pipe_,
bool subscribe_to_all_,
bool locally_initated_)
{ {
LIBZMQ_UNUSED (subscribe_to_all_); LIBZMQ_UNUSED (subscribe_to_all_);
LIBZMQ_UNUSED (locally_initated_);
zmq_assert (pipe_); zmq_assert (pipe_);
......
...@@ -51,7 +51,9 @@ class dealer_t : public socket_base_t ...@@ -51,7 +51,9 @@ class dealer_t : public socket_base_t
protected: protected:
// Overrides of functions from socket_base_t. // Overrides of functions from socket_base_t.
void xattach_pipe (zmq::pipe_t *pipe_, bool subscribe_to_all_); void xattach_pipe (zmq::pipe_t *pipe_,
bool subscribe_to_all_,
bool locally_initiated_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_); int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
int xsend (zmq::msg_t *msg_); int xsend (zmq::msg_t *msg_);
int xrecv (zmq::msg_t *msg_); int xrecv (zmq::msg_t *msg_);
......
...@@ -51,9 +51,12 @@ zmq::dgram_t::~dgram_t () ...@@ -51,9 +51,12 @@ zmq::dgram_t::~dgram_t ()
zmq_assert (!_pipe); zmq_assert (!_pipe);
} }
void zmq::dgram_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_) void zmq::dgram_t::xattach_pipe (pipe_t *pipe_,
bool subscribe_to_all_,
bool locally_initiated_)
{ {
LIBZMQ_UNUSED (subscribe_to_all_); LIBZMQ_UNUSED (subscribe_to_all_);
LIBZMQ_UNUSED (locally_initiated_);
zmq_assert (pipe_); zmq_assert (pipe_);
......
...@@ -48,7 +48,9 @@ class dgram_t : public socket_base_t ...@@ -48,7 +48,9 @@ class dgram_t : public socket_base_t
~dgram_t (); ~dgram_t ();
// Overrides of functions from socket_base_t. // Overrides of functions from socket_base_t.
void xattach_pipe (zmq::pipe_t *pipe_, bool subscribe_to_all_); void xattach_pipe (zmq::pipe_t *pipe_,
bool subscribe_to_all_,
bool locally_initiated_);
int xsend (zmq::msg_t *msg_); int xsend (zmq::msg_t *msg_);
int xrecv (zmq::msg_t *msg_); int xrecv (zmq::msg_t *msg_);
bool xhas_in (); bool xhas_in ();
......
...@@ -54,9 +54,12 @@ zmq::dish_t::~dish_t () ...@@ -54,9 +54,12 @@ zmq::dish_t::~dish_t ()
errno_assert (rc == 0); errno_assert (rc == 0);
} }
void zmq::dish_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_) void zmq::dish_t::xattach_pipe (pipe_t *pipe_,
bool subscribe_to_all_,
bool locally_initiated_)
{ {
LIBZMQ_UNUSED (subscribe_to_all_); LIBZMQ_UNUSED (subscribe_to_all_);
LIBZMQ_UNUSED (locally_initiated_);
zmq_assert (pipe_); zmq_assert (pipe_);
_fq.attach (pipe_); _fq.attach (pipe_);
......
...@@ -52,7 +52,9 @@ class dish_t : public socket_base_t ...@@ -52,7 +52,9 @@ class dish_t : public socket_base_t
protected: protected:
// Overrides of functions from socket_base_t. // Overrides of functions from socket_base_t.
void xattach_pipe (zmq::pipe_t *pipe_, bool subscribe_to_all_); void xattach_pipe (zmq::pipe_t *pipe_,
bool subscribe_to_all_,
bool locally_initiated_);
int xsend (zmq::msg_t *msg_); int xsend (zmq::msg_t *msg_);
bool xhas_out (); bool xhas_out ();
int xrecv (zmq::msg_t *msg_); int xrecv (zmq::msg_t *msg_);
......
...@@ -44,9 +44,12 @@ zmq::gather_t::~gather_t () ...@@ -44,9 +44,12 @@ zmq::gather_t::~gather_t ()
{ {
} }
void zmq::gather_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_) void zmq::gather_t::xattach_pipe (pipe_t *pipe_,
bool subscribe_to_all_,
bool locally_initiated_)
{ {
LIBZMQ_UNUSED (subscribe_to_all_); LIBZMQ_UNUSED (subscribe_to_all_);
LIBZMQ_UNUSED (locally_initiated_);
zmq_assert (pipe_); zmq_assert (pipe_);
_fq.attach (pipe_); _fq.attach (pipe_);
......
...@@ -47,7 +47,9 @@ class gather_t : public socket_base_t ...@@ -47,7 +47,9 @@ class gather_t : public socket_base_t
protected: protected:
// Overrides of functions from socket_base_t. // Overrides of functions from socket_base_t.
void xattach_pipe (zmq::pipe_t *pipe_, bool subscribe_to_all_); void xattach_pipe (zmq::pipe_t *pipe_,
bool subscribe_to_all_,
bool locally_initiated_);
int xrecv (zmq::msg_t *msg_); int xrecv (zmq::msg_t *msg_);
bool xhas_in (); bool xhas_in ();
const blob_t &get_credential () const; const blob_t &get_credential () const;
......
...@@ -47,9 +47,12 @@ zmq::pair_t::~pair_t () ...@@ -47,9 +47,12 @@ zmq::pair_t::~pair_t ()
zmq_assert (!_pipe); zmq_assert (!_pipe);
} }
void zmq::pair_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_) void zmq::pair_t::xattach_pipe (pipe_t *pipe_,
bool subscribe_to_all_,
bool locally_initiated_)
{ {
LIBZMQ_UNUSED (subscribe_to_all_); LIBZMQ_UNUSED (subscribe_to_all_);
LIBZMQ_UNUSED (locally_initiated_);
zmq_assert (pipe_ != NULL); zmq_assert (pipe_ != NULL);
......
...@@ -48,7 +48,9 @@ class pair_t : public socket_base_t ...@@ -48,7 +48,9 @@ class pair_t : public socket_base_t
~pair_t (); ~pair_t ();
// Overrides of functions from socket_base_t. // Overrides of functions from socket_base_t.
void xattach_pipe (zmq::pipe_t *pipe_, bool subscribe_to_all_); void xattach_pipe (zmq::pipe_t *pipe_,
bool subscribe_to_all_,
bool locally_initiated_);
int xsend (zmq::msg_t *msg_); int xsend (zmq::msg_t *msg_);
int xrecv (zmq::msg_t *msg_); int xrecv (zmq::msg_t *msg_);
bool xhas_in (); bool xhas_in ();
......
...@@ -43,7 +43,9 @@ zmq::pub_t::~pub_t () ...@@ -43,7 +43,9 @@ zmq::pub_t::~pub_t ()
{ {
} }
void zmq::pub_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_) void zmq::pub_t::xattach_pipe (pipe_t *pipe_,
bool subscribe_to_all_,
bool locally_initiated_)
{ {
zmq_assert (pipe_); zmq_assert (pipe_);
...@@ -51,7 +53,7 @@ void zmq::pub_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_) ...@@ -51,7 +53,7 @@ void zmq::pub_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
// to receive the delimiter. // to receive the delimiter.
pipe_->set_nodelay (); pipe_->set_nodelay ();
xpub_t::xattach_pipe (pipe_, subscribe_to_all_); xpub_t::xattach_pipe (pipe_, subscribe_to_all_, locally_initiated_);
} }
int zmq::pub_t::xrecv (class msg_t *) int zmq::pub_t::xrecv (class msg_t *)
......
...@@ -46,7 +46,9 @@ class pub_t : public xpub_t ...@@ -46,7 +46,9 @@ class pub_t : public xpub_t
~pub_t (); ~pub_t ();
// Implementations of virtual functions from socket_base_t. // Implementations of virtual functions from socket_base_t.
void xattach_pipe (zmq::pipe_t *pipe_, bool subscribe_to_all_ = false); void xattach_pipe (zmq::pipe_t *pipe_,
bool subscribe_to_all_ = false,
bool locally_initiated_ = false);
int xrecv (zmq::msg_t *msg_); int xrecv (zmq::msg_t *msg_);
bool xhas_in (); bool xhas_in ();
......
...@@ -44,9 +44,12 @@ zmq::pull_t::~pull_t () ...@@ -44,9 +44,12 @@ zmq::pull_t::~pull_t ()
{ {
} }
void zmq::pull_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_) void zmq::pull_t::xattach_pipe (pipe_t *pipe_,
bool subscribe_to_all_,
bool locally_initiated_)
{ {
LIBZMQ_UNUSED (subscribe_to_all_); LIBZMQ_UNUSED (subscribe_to_all_);
LIBZMQ_UNUSED (locally_initiated_);
zmq_assert (pipe_); zmq_assert (pipe_);
_fq.attach (pipe_); _fq.attach (pipe_);
......
...@@ -49,7 +49,9 @@ class pull_t : public socket_base_t ...@@ -49,7 +49,9 @@ class pull_t : public socket_base_t
protected: protected:
// Overrides of functions from socket_base_t. // Overrides of functions from socket_base_t.
void xattach_pipe (zmq::pipe_t *pipe_, bool subscribe_to_all_); void xattach_pipe (zmq::pipe_t *pipe_,
bool subscribe_to_all_,
bool locally_initiated_);
int xrecv (zmq::msg_t *msg_); int xrecv (zmq::msg_t *msg_);
bool xhas_in (); bool xhas_in ();
const blob_t &get_credential () const; const blob_t &get_credential () const;
......
...@@ -44,9 +44,12 @@ zmq::push_t::~push_t () ...@@ -44,9 +44,12 @@ zmq::push_t::~push_t ()
{ {
} }
void zmq::push_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_) void zmq::push_t::xattach_pipe (pipe_t *pipe_,
bool subscribe_to_all_,
bool locally_initiated_)
{ {
LIBZMQ_UNUSED (subscribe_to_all_); LIBZMQ_UNUSED (subscribe_to_all_);
LIBZMQ_UNUSED (locally_initiated_);
// Don't delay pipe termination as there is no one // Don't delay pipe termination as there is no one
// to receive the delimiter. // to receive the delimiter.
......
...@@ -49,7 +49,9 @@ class push_t : public socket_base_t ...@@ -49,7 +49,9 @@ class push_t : public socket_base_t
protected: protected:
// Overrides of functions from socket_base_t. // Overrides of functions from socket_base_t.
void xattach_pipe (zmq::pipe_t *pipe_, bool subscribe_to_all_); void xattach_pipe (zmq::pipe_t *pipe_,
bool subscribe_to_all_,
bool locally_initiated_);
int xsend (zmq::msg_t *msg_); int xsend (zmq::msg_t *msg_);
bool xhas_out (); bool xhas_out ();
void xwrite_activated (zmq::pipe_t *pipe_); void xwrite_activated (zmq::pipe_t *pipe_);
......
...@@ -47,9 +47,12 @@ zmq::radio_t::~radio_t () ...@@ -47,9 +47,12 @@ zmq::radio_t::~radio_t ()
{ {
} }
void zmq::radio_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_) void zmq::radio_t::xattach_pipe (pipe_t *pipe_,
bool subscribe_to_all_,
bool locally_initiated_)
{ {
LIBZMQ_UNUSED (subscribe_to_all_); LIBZMQ_UNUSED (subscribe_to_all_);
LIBZMQ_UNUSED (locally_initiated_);
zmq_assert (pipe_); zmq_assert (pipe_);
......
...@@ -52,7 +52,9 @@ class radio_t : public socket_base_t ...@@ -52,7 +52,9 @@ class radio_t : public socket_base_t
~radio_t (); ~radio_t ();
// Implementations of virtual functions from socket_base_t. // Implementations of virtual functions from socket_base_t.
void xattach_pipe (zmq::pipe_t *pipe_, bool subscribe_to_all_ = false); void xattach_pipe (zmq::pipe_t *pipe_,
bool subscribe_to_all_ = false,
bool locally_initiated_ = false);
int xsend (zmq::msg_t *msg_); int xsend (zmq::msg_t *msg_);
bool xhas_out (); bool xhas_out ();
int xrecv (zmq::msg_t *msg_); int xrecv (zmq::msg_t *msg_);
......
...@@ -67,7 +67,9 @@ zmq::router_t::~router_t () ...@@ -67,7 +67,9 @@ zmq::router_t::~router_t ()
_prefetched_msg.close (); _prefetched_msg.close ();
} }
void zmq::router_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_) void zmq::router_t::xattach_pipe (pipe_t *pipe_,
bool subscribe_to_all_,
bool locally_initiated_)
{ {
LIBZMQ_UNUSED (subscribe_to_all_); LIBZMQ_UNUSED (subscribe_to_all_);
...@@ -88,7 +90,7 @@ void zmq::router_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_) ...@@ -88,7 +90,7 @@ void zmq::router_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
errno_assert (rc == 0); errno_assert (rc == 0);
} }
bool routing_id_ok = identify_peer (pipe_); bool routing_id_ok = identify_peer (pipe_, locally_initiated_);
if (routing_id_ok) if (routing_id_ok)
_fq.attach (pipe_); _fq.attach (pipe_);
else else
...@@ -166,7 +168,7 @@ void zmq::router_t::xread_activated (pipe_t *pipe_) ...@@ -166,7 +168,7 @@ void zmq::router_t::xread_activated (pipe_t *pipe_)
if (it == _anonymous_pipes.end ()) if (it == _anonymous_pipes.end ())
_fq.activated (pipe_); _fq.activated (pipe_);
else { else {
bool routing_id_ok = identify_peer (pipe_); bool routing_id_ok = identify_peer (pipe_, false);
if (routing_id_ok) { if (routing_id_ok) {
_anonymous_pipes.erase (it); _anonymous_pipes.erase (it);
_fq.attach (pipe_); _fq.attach (pipe_);
...@@ -440,13 +442,13 @@ int zmq::router_t::get_peer_state (const void *routing_id_, ...@@ -440,13 +442,13 @@ int zmq::router_t::get_peer_state (const void *routing_id_,
return res; return res;
} }
bool zmq::router_t::identify_peer (pipe_t *pipe_) bool zmq::router_t::identify_peer (pipe_t *pipe_, bool locally_initiated_)
{ {
msg_t msg; msg_t msg;
blob_t routing_id; blob_t routing_id;
const std::string connect_routing_id = extract_connect_routing_id (); if (locally_initiated_ && connect_routing_id_is_set ()) {
if (!connect_routing_id.empty ()) { const std::string connect_routing_id = extract_connect_routing_id ();
routing_id.set ( routing_id.set (
reinterpret_cast<const unsigned char *> (connect_routing_id.c_str ()), reinterpret_cast<const unsigned char *> (connect_routing_id.c_str ()),
connect_routing_id.length ()); connect_routing_id.length ());
......
...@@ -52,7 +52,9 @@ class router_t : public routing_socket_base_t ...@@ -52,7 +52,9 @@ class router_t : public routing_socket_base_t
~router_t (); ~router_t ();
// Overrides of functions from socket_base_t. // Overrides of functions from socket_base_t.
void xattach_pipe (zmq::pipe_t *pipe_, bool subscribe_to_all_); void xattach_pipe (zmq::pipe_t *pipe_,
bool subscribe_to_all_,
bool locally_initiated_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_); int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
int xsend (zmq::msg_t *msg_); int xsend (zmq::msg_t *msg_);
int xrecv (zmq::msg_t *msg_); int xrecv (zmq::msg_t *msg_);
...@@ -69,7 +71,7 @@ class router_t : public routing_socket_base_t ...@@ -69,7 +71,7 @@ class router_t : public routing_socket_base_t
private: private:
// Receive peer id and update lookup map // Receive peer id and update lookup map
bool identify_peer (pipe_t *pipe_); bool identify_peer (pipe_t *pipe_, bool locally_initiated);
// Fair queueing object for inbound pipes. // Fair queueing object for inbound pipes.
fq_t _fq; fq_t _fq;
......
...@@ -44,9 +44,12 @@ zmq::scatter_t::~scatter_t () ...@@ -44,9 +44,12 @@ zmq::scatter_t::~scatter_t ()
{ {
} }
void zmq::scatter_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_) void zmq::scatter_t::xattach_pipe (pipe_t *pipe_,
bool subscribe_to_all_,
bool locally_initiated_)
{ {
LIBZMQ_UNUSED (subscribe_to_all_); LIBZMQ_UNUSED (subscribe_to_all_);
LIBZMQ_UNUSED (locally_initiated_);
// Don't delay pipe termination as there is no one // Don't delay pipe termination as there is no one
// to receive the delimiter. // to receive the delimiter.
......
...@@ -49,7 +49,9 @@ class scatter_t : public socket_base_t ...@@ -49,7 +49,9 @@ class scatter_t : public socket_base_t
protected: protected:
// Overrides of functions from socket_base_t. // Overrides of functions from socket_base_t.
void xattach_pipe (zmq::pipe_t *pipe_, bool subscribe_to_all_); void xattach_pipe (zmq::pipe_t *pipe_,
bool subscribe_to_all_,
bool locally_initiated_);
int xsend (zmq::msg_t *msg_); int xsend (zmq::msg_t *msg_);
bool xhas_out (); bool xhas_out ();
void xwrite_activated (zmq::pipe_t *pipe_); void xwrite_activated (zmq::pipe_t *pipe_);
......
...@@ -48,9 +48,12 @@ zmq::server_t::~server_t () ...@@ -48,9 +48,12 @@ zmq::server_t::~server_t ()
zmq_assert (_out_pipes.empty ()); zmq_assert (_out_pipes.empty ());
} }
void zmq::server_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_) void zmq::server_t::xattach_pipe (pipe_t *pipe_,
bool subscribe_to_all_,
bool locally_initiated_)
{ {
LIBZMQ_UNUSED (subscribe_to_all_); LIBZMQ_UNUSED (subscribe_to_all_);
LIBZMQ_UNUSED (locally_initiated_);
zmq_assert (pipe_); zmq_assert (pipe_);
......
...@@ -52,7 +52,9 @@ class server_t : public socket_base_t ...@@ -52,7 +52,9 @@ class server_t : public socket_base_t
~server_t (); ~server_t ();
// Overrides of functions from socket_base_t. // Overrides of functions from socket_base_t.
void xattach_pipe (zmq::pipe_t *pipe_, bool subscribe_to_all_); void xattach_pipe (zmq::pipe_t *pipe_,
bool subscribe_to_all_,
bool locally_initiated_);
int xsend (zmq::msg_t *msg_); int xsend (zmq::msg_t *msg_);
int xrecv (zmq::msg_t *msg_); int xrecv (zmq::msg_t *msg_);
bool xhas_in (); bool xhas_in ();
......
...@@ -341,14 +341,16 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_) ...@@ -341,14 +341,16 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_)
return 0; return 0;
} }
void zmq::socket_base_t::attach_pipe (pipe_t *pipe_, bool subscribe_to_all_) void zmq::socket_base_t::attach_pipe (pipe_t *pipe_,
bool subscribe_to_all_,
bool locally_initiated_)
{ {
// First, register the pipe so that we can terminate it later on. // First, register the pipe so that we can terminate it later on.
pipe_->set_event_sink (this); pipe_->set_event_sink (this);
_pipes.push_back (pipe_); _pipes.push_back (pipe_);
// Let the derived socket type know about new pipe. // Let the derived socket type know about new pipe.
xattach_pipe (pipe_, subscribe_to_all_); xattach_pipe (pipe_, subscribe_to_all_, locally_initiated_);
// If the socket is already being closed, ask any new pipes to terminate // If the socket is already being closed, ask any new pipes to terminate
// straight away. // straight away.
...@@ -553,7 +555,7 @@ int zmq::socket_base_t::bind (const char *addr_) ...@@ -553,7 +555,7 @@ int zmq::socket_base_t::bind (const char *addr_)
errno_assert (rc == 0); errno_assert (rc == 0);
// Attach local end of the pipe to the socket object. // Attach local end of the pipe to the socket object.
attach_pipe (new_pipes[0], true); attach_pipe (new_pipes[0], true, true);
newpipe = new_pipes[0]; newpipe = new_pipes[0];
// Attach remote end of the pipe to the session object later on. // Attach remote end of the pipe to the session object later on.
...@@ -773,7 +775,7 @@ int zmq::socket_base_t::connect (const char *addr_) ...@@ -773,7 +775,7 @@ int zmq::socket_base_t::connect (const char *addr_)
} }
// Attach local end of the pipe to this socket object. // Attach local end of the pipe to this socket object.
attach_pipe (new_pipes[0]); attach_pipe (new_pipes[0], false, true);
// Save last endpoint URI // Save last endpoint URI
_last_endpoint.assign (addr_); _last_endpoint.assign (addr_);
...@@ -959,7 +961,7 @@ int zmq::socket_base_t::connect (const char *addr_) ...@@ -959,7 +961,7 @@ int zmq::socket_base_t::connect (const char *addr_)
errno_assert (rc == 0); errno_assert (rc == 0);
// Attach local end of the pipe to the socket object. // Attach local end of the pipe to the socket object.
attach_pipe (new_pipes[0], subscribe_to_all); attach_pipe (new_pipes[0], subscribe_to_all, true);
newpipe = new_pipes[0]; newpipe = new_pipes[0];
// Attach remote end of the pipe to the session object later on. // Attach remote end of the pipe to the session object later on.
...@@ -1809,6 +1811,11 @@ std::string zmq::routing_socket_base_t::extract_connect_routing_id () ...@@ -1809,6 +1811,11 @@ std::string zmq::routing_socket_base_t::extract_connect_routing_id ()
return res; return res;
} }
bool zmq::routing_socket_base_t::connect_routing_id_is_set ()
{
return !_connect_routing_id.empty ();
}
void zmq::routing_socket_base_t::add_out_pipe (blob_t routing_id_, void zmq::routing_socket_base_t::add_out_pipe (blob_t routing_id_,
pipe_t *pipe_) pipe_t *pipe_)
{ {
......
...@@ -150,7 +150,8 @@ class socket_base_t : public own_t, ...@@ -150,7 +150,8 @@ class socket_base_t : public own_t,
// Concrete algorithms for the x- methods are to be defined by // Concrete algorithms for the x- methods are to be defined by
// individual socket types. // individual socket types.
virtual void xattach_pipe (zmq::pipe_t *pipe_, virtual void xattach_pipe (zmq::pipe_t *pipe_,
bool subscribe_to_all_ = false) = 0; bool subscribe_to_all_ = false,
bool locally_initiated_ = false) = 0;
// The default implementation assumes there are no specific socket // The default implementation assumes there are no specific socket
// options for the particular socket type. If not so, override this // options for the particular socket type. If not so, override this
...@@ -234,7 +235,9 @@ class socket_base_t : public own_t, ...@@ -234,7 +235,9 @@ class socket_base_t : public own_t,
int check_protocol (const std::string &protocol_); int check_protocol (const std::string &protocol_);
// Register the pipe with this socket. // Register the pipe with this socket.
void attach_pipe (zmq::pipe_t *pipe_, bool subscribe_to_all_ = false); void attach_pipe (zmq::pipe_t *pipe_,
bool subscribe_to_all_ = false,
bool locally_initiated_ = false);
// Processes commands sent to this socket (if any). If timeout is -1, // Processes commands sent to this socket (if any). If timeout is -1,
// returns only after at least one command was processed. // returns only after at least one command was processed.
...@@ -311,6 +314,7 @@ class routing_socket_base_t : public socket_base_t ...@@ -311,6 +314,7 @@ class routing_socket_base_t : public socket_base_t
// own methods // own methods
std::string extract_connect_routing_id (); std::string extract_connect_routing_id ();
bool connect_routing_id_is_set ();
struct out_pipe_t struct out_pipe_t
{ {
......
...@@ -57,13 +57,15 @@ zmq::stream_t::~stream_t () ...@@ -57,13 +57,15 @@ zmq::stream_t::~stream_t ()
_prefetched_msg.close (); _prefetched_msg.close ();
} }
void zmq::stream_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_) void zmq::stream_t::xattach_pipe (pipe_t *pipe_,
bool subscribe_to_all_,
bool locally_initiated_)
{ {
LIBZMQ_UNUSED (subscribe_to_all_); LIBZMQ_UNUSED (subscribe_to_all_);
zmq_assert (pipe_); zmq_assert (pipe_);
identify_peer (pipe_); identify_peer (pipe_, locally_initiated_);
_fq.attach (pipe_); _fq.attach (pipe_);
} }
...@@ -264,14 +266,14 @@ bool zmq::stream_t::xhas_out () ...@@ -264,14 +266,14 @@ bool zmq::stream_t::xhas_out ()
return true; return true;
} }
void zmq::stream_t::identify_peer (pipe_t *pipe_) void zmq::stream_t::identify_peer (pipe_t *pipe_, bool locally_initiated_)
{ {
// Always assign routing id for raw-socket // Always assign routing id for raw-socket
unsigned char buffer[5]; unsigned char buffer[5];
buffer[0] = 0; buffer[0] = 0;
blob_t routing_id; blob_t routing_id;
const std::string connect_routing_id = extract_connect_routing_id (); if (locally_initiated_ && connect_routing_id_is_set ()) {
if (!connect_routing_id.empty ()) { const std::string connect_routing_id = extract_connect_routing_id ();
routing_id.set ( routing_id.set (
reinterpret_cast<const unsigned char *> (connect_routing_id.c_str ()), reinterpret_cast<const unsigned char *> (connect_routing_id.c_str ()),
connect_routing_id.length ()); connect_routing_id.length ());
......
...@@ -46,7 +46,9 @@ class stream_t : public routing_socket_base_t ...@@ -46,7 +46,9 @@ class stream_t : public routing_socket_base_t
~stream_t (); ~stream_t ();
// Overrides of functions from socket_base_t. // Overrides of functions from socket_base_t.
void xattach_pipe (zmq::pipe_t *pipe_, bool subscribe_to_all_); void xattach_pipe (zmq::pipe_t *pipe_,
bool subscribe_to_all_,
bool locally_initiated_);
int xsend (zmq::msg_t *msg_); int xsend (zmq::msg_t *msg_);
int xrecv (zmq::msg_t *msg_); int xrecv (zmq::msg_t *msg_);
bool xhas_in (); bool xhas_in ();
...@@ -57,7 +59,7 @@ class stream_t : public routing_socket_base_t ...@@ -57,7 +59,7 @@ class stream_t : public routing_socket_base_t
private: private:
// Generate peer's id and update lookup map // Generate peer's id and update lookup map
void identify_peer (pipe_t *pipe_); void identify_peer (pipe_t *pipe_, bool locally_initiated_);
// Fair queueing object for inbound pipes. // Fair queueing object for inbound pipes.
fq_t _fq; fq_t _fq;
......
...@@ -57,8 +57,12 @@ zmq::xpub_t::~xpub_t () ...@@ -57,8 +57,12 @@ zmq::xpub_t::~xpub_t ()
_welcome_msg.close (); _welcome_msg.close ();
} }
void zmq::xpub_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_) void zmq::xpub_t::xattach_pipe (pipe_t *pipe_,
bool subscribe_to_all_,
bool locally_initiated_)
{ {
LIBZMQ_UNUSED (locally_initiated_);
zmq_assert (pipe_); zmq_assert (pipe_);
_dist.attach (pipe_); _dist.attach (pipe_);
......
...@@ -51,7 +51,9 @@ class xpub_t : public socket_base_t ...@@ -51,7 +51,9 @@ class xpub_t : public socket_base_t
~xpub_t (); ~xpub_t ();
// Implementations of virtual functions from socket_base_t. // Implementations of virtual functions from socket_base_t.
void xattach_pipe (zmq::pipe_t *pipe_, bool subscribe_to_all_ = false); void xattach_pipe (zmq::pipe_t *pipe_,
bool subscribe_to_all_ = false,
bool locally_initiated_ = false);
int xsend (zmq::msg_t *msg_); int xsend (zmq::msg_t *msg_);
bool xhas_out (); bool xhas_out ();
int xrecv (zmq::msg_t *msg_); int xrecv (zmq::msg_t *msg_);
......
...@@ -55,9 +55,12 @@ zmq::xsub_t::~xsub_t () ...@@ -55,9 +55,12 @@ zmq::xsub_t::~xsub_t ()
errno_assert (rc == 0); errno_assert (rc == 0);
} }
void zmq::xsub_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_) void zmq::xsub_t::xattach_pipe (pipe_t *pipe_,
bool subscribe_to_all_,
bool locally_initiated_)
{ {
LIBZMQ_UNUSED (subscribe_to_all_); LIBZMQ_UNUSED (subscribe_to_all_);
LIBZMQ_UNUSED (locally_initiated_);
zmq_assert (pipe_); zmq_assert (pipe_);
_fq.attach (pipe_); _fq.attach (pipe_);
......
...@@ -50,7 +50,9 @@ class xsub_t : public socket_base_t ...@@ -50,7 +50,9 @@ class xsub_t : public socket_base_t
protected: protected:
// Overrides of functions from socket_base_t. // Overrides of functions from socket_base_t.
void xattach_pipe (zmq::pipe_t *pipe_, bool subscribe_to_all_); void xattach_pipe (zmq::pipe_t *pipe_,
bool subscribe_to_all_,
bool locally_initiated_);
int xsend (zmq::msg_t *msg_); int xsend (zmq::msg_t *msg_);
bool xhas_out (); bool xhas_out ();
int xrecv (zmq::msg_t *msg_); int xrecv (zmq::msg_t *msg_);
......
...@@ -189,6 +189,106 @@ void test_router_2_router (bool named_) ...@@ -189,6 +189,106 @@ void test_router_2_router (bool named_)
zmq_ctx_destroy (ctx); zmq_ctx_destroy (ctx);
} }
void test_router_2_router_while_receiving ()
{
void *xbind, *zbind, *yconn;
int ret;
char buff[256];
char msg[] = "hi 1";
const char *wildcard_bind = "tcp://127.0.0.1:*";
int zero = 0;
size_t len = MAX_SOCKET_STRING;
char x_endpoint[MAX_SOCKET_STRING];
char z_endpoint[MAX_SOCKET_STRING];
void *ctx = zmq_ctx_new ();
// Create xbind socket.
xbind = zmq_socket (ctx, ZMQ_ROUTER);
assert (xbind);
ret = zmq_setsockopt (xbind, ZMQ_LINGER, &zero, sizeof (zero));
assert (0 == ret);
ret = zmq_bind (xbind, wildcard_bind);
assert (0 == ret);
ret = zmq_getsockopt (xbind, ZMQ_LAST_ENDPOINT, x_endpoint, &len);
assert (0 == ret);
// Create zbind socket.
zbind = zmq_socket (ctx, ZMQ_ROUTER);
assert (zbind);
ret = zmq_setsockopt (zbind, ZMQ_LINGER, &zero, sizeof (zero));
assert (0 == ret);
ret = zmq_bind (zbind, wildcard_bind);
assert (0 == ret);
ret = zmq_getsockopt (zbind, ZMQ_LAST_ENDPOINT, z_endpoint, &len);
assert (0 == ret);
// Create connection socket.
yconn = zmq_socket (ctx, ZMQ_ROUTER);
assert (yconn);
ret = zmq_setsockopt (yconn, ZMQ_LINGER, &zero, sizeof (zero));
assert (0 == ret);
// set identites for each socket
ret = zmq_setsockopt (xbind, ZMQ_ROUTING_ID, "X", 2);
ret = zmq_setsockopt (yconn, ZMQ_ROUTING_ID, "Y", 2);
ret = zmq_setsockopt (zbind, ZMQ_ROUTING_ID, "Z", 2);
// Connect Y to X using a routing id
ret = zmq_setsockopt (yconn, ZMQ_CONNECT_ROUTING_ID, "X", 2);
assert (0 == ret);
ret = zmq_connect (yconn, x_endpoint);
assert (0 == ret);
// Send some data from Y to X.
ret = zmq_send (yconn, "X", 2, ZMQ_SNDMORE);
assert (2 == ret);
ret = zmq_send (yconn, msg, 5, 0);
assert (5 == ret);
// wait for the Y->X message to be received
msleep (SETTLE_TIME);
// Now X tries to connect to Z and send a message
ret = zmq_setsockopt (xbind, ZMQ_CONNECT_ROUTING_ID, "Z", 2);
assert (0 == ret);
ret = zmq_connect (xbind, z_endpoint);
assert (0 == ret);
// Try to send some data from X to Z.
ret = zmq_send (xbind, "Z", 2, ZMQ_SNDMORE);
assert (2 == ret);
ret = zmq_send (xbind, msg, 5, 0);
assert (5 == ret);
// wait for the X->Z message to be received (so that our non-blocking check will actually
// fail if the message is routed to Y)
msleep (SETTLE_TIME);
// nothing should have been received on the Y socket
ret = zmq_recv (yconn, buff, 256, ZMQ_DONTWAIT);
assert (ret == -1);
assert (zmq_errno () == EAGAIN);
// the message should have been received on the Z socket
ret = zmq_recv (zbind, buff, 256, 0);
assert (ret && 'X' == buff[0]);
ret = zmq_recv (zbind, buff + 128, 128, 0);
assert (5 == ret && 'h' == buff[128]);
ret = zmq_unbind (xbind, x_endpoint);
assert (0 == ret);
ret = zmq_unbind (zbind, z_endpoint);
assert (0 == ret);
ret = zmq_close (yconn);
assert (0 == ret);
ret = zmq_close (xbind);
assert (0 == ret);
ret = zmq_close (zbind);
assert (0 == ret);
zmq_ctx_destroy (ctx);
}
int main (void) int main (void)
{ {
setup_test_environment (); setup_test_environment ();
...@@ -196,6 +296,7 @@ int main (void) ...@@ -196,6 +296,7 @@ int main (void)
test_stream_2_stream (); test_stream_2_stream ();
test_router_2_router (false); test_router_2_router (false);
test_router_2_router (true); test_router_2_router (true);
test_router_2_router_while_receiving ();
return 0; return 0;
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment