Unverified Commit c6c63481 authored by Luca Boccassi's avatar Luca Boccassi Committed by GitHub

Merge pull request #3142 from sigiesec/analyze

Remove duplication between stream_t and router_t
parents 2dfdcaff 5a343fc2
...@@ -63,7 +63,7 @@ void zmq::mechanism_t::peer_routing_id (msg_t *msg_) ...@@ -63,7 +63,7 @@ void zmq::mechanism_t::peer_routing_id (msg_t *msg_)
void zmq::mechanism_t::set_user_id (const void *data_, size_t size_) void zmq::mechanism_t::set_user_id (const void *data_, size_t size_)
{ {
_user_id.set (static_cast<const unsigned char *> (data_), size_); _user_id.set (static_cast<const unsigned char *> (data_), size_);
zap_properties.ZMQ_MAP_INSERT_OR_EMPLACE ( _zap_properties.ZMQ_MAP_INSERT_OR_EMPLACE (
std::string (ZMQ_MSG_PROPERTY_USER_ID), std::string (ZMQ_MSG_PROPERTY_USER_ID),
std::string (reinterpret_cast<const char *> (data_), size_)); std::string (reinterpret_cast<const char *> (data_), size_));
} }
...@@ -268,7 +268,7 @@ int zmq::mechanism_t::parse_metadata (const unsigned char *ptr_, ...@@ -268,7 +268,7 @@ int zmq::mechanism_t::parse_metadata (const unsigned char *ptr_,
if (rc == -1) if (rc == -1)
return -1; return -1;
} }
(zap_flag_ ? zap_properties : zmtp_properties) (zap_flag_ ? _zap_properties : _zmtp_properties)
.ZMQ_MAP_INSERT_OR_EMPLACE ( .ZMQ_MAP_INSERT_OR_EMPLACE (
name, name,
std::string (reinterpret_cast<const char *> (value), value_length)); std::string (reinterpret_cast<const char *> (value), value_length));
......
...@@ -81,9 +81,12 @@ class mechanism_t ...@@ -81,9 +81,12 @@ class mechanism_t
const blob_t &get_user_id () const; const blob_t &get_user_id () const;
const metadata_t::dict_t &get_zmtp_properties () { return zmtp_properties; } const metadata_t::dict_t &get_zmtp_properties ()
{
return _zmtp_properties;
}
const metadata_t::dict_t &get_zap_properties () { return zap_properties; } const metadata_t::dict_t &get_zap_properties () { return _zap_properties; }
protected: protected:
// Only used to identify the socket for the Socket-Type // Only used to identify the socket for the Socket-Type
...@@ -123,15 +126,15 @@ class mechanism_t ...@@ -123,15 +126,15 @@ class mechanism_t
virtual int virtual int
property (const std::string &name_, const void *value_, size_t length_); property (const std::string &name_, const void *value_, size_t length_);
const options_t options;
private:
// Properties received from ZMTP peer. // Properties received from ZMTP peer.
metadata_t::dict_t zmtp_properties; metadata_t::dict_t _zmtp_properties;
// Properties received from ZAP server. // Properties received from ZAP server.
metadata_t::dict_t zap_properties; metadata_t::dict_t _zap_properties;
const options_t options;
private:
blob_t _routing_id; blob_t _routing_id;
blob_t _user_id; blob_t _user_id;
......
...@@ -37,7 +37,7 @@ ...@@ -37,7 +37,7 @@
#include "err.hpp" #include "err.hpp"
zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_, int sid_) : zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
socket_base_t (parent_, tid_, sid_), routing_socket_base_t (parent_, tid_, sid_),
_prefetched (false), _prefetched (false),
_routing_id_sent (false), _routing_id_sent (false),
_current_in (NULL), _current_in (NULL),
...@@ -63,8 +63,6 @@ zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_, int sid_) : ...@@ -63,8 +63,6 @@ zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
zmq::router_t::~router_t () zmq::router_t::~router_t ()
{ {
zmq_assert (_anonymous_pipes.empty ()); zmq_assert (_anonymous_pipes.empty ());
;
zmq_assert (_out_pipes.empty ());
_prefetched_id.close (); _prefetched_id.close ();
_prefetched_msg.close (); _prefetched_msg.close ();
} }
...@@ -99,21 +97,12 @@ int zmq::router_t::xsetsockopt (int option_, ...@@ -99,21 +97,12 @@ int zmq::router_t::xsetsockopt (int option_,
const void *optval_, const void *optval_,
size_t optvallen_) size_t optvallen_)
{ {
bool is_int = (optvallen_ == sizeof (int)); const bool is_int = (optvallen_ == sizeof (int));
int value = 0; int value = 0;
if (is_int) if (is_int)
memcpy (&value, optval_, sizeof (int)); memcpy (&value, optval_, sizeof (int));
switch (option_) { switch (option_) {
case ZMQ_CONNECT_ROUTING_ID:
// TODO why isn't it possible to set an empty connect_routing_id
// (which is the default value)
if (optval_ && optvallen_) {
connect_routing_id.assign ((char *) optval_, optvallen_);
return 0;
}
break;
case ZMQ_ROUTER_RAW: case ZMQ_ROUTER_RAW:
if (is_int && value >= 0) { if (is_int && value >= 0) {
_raw_socket = (value != 0); _raw_socket = (value != 0);
...@@ -147,7 +136,8 @@ int zmq::router_t::xsetsockopt (int option_, ...@@ -147,7 +136,8 @@ int zmq::router_t::xsetsockopt (int option_,
break; break;
default: default:
break; return routing_socket_base_t::xsetsockopt (option_, optval_,
optvallen_);
} }
errno = EINVAL; errno = EINVAL;
return -1; return -1;
...@@ -160,9 +150,7 @@ void zmq::router_t::xpipe_terminated (pipe_t *pipe_) ...@@ -160,9 +150,7 @@ void zmq::router_t::xpipe_terminated (pipe_t *pipe_)
if (it != _anonymous_pipes.end ()) if (it != _anonymous_pipes.end ())
_anonymous_pipes.erase (it); _anonymous_pipes.erase (it);
else { else {
outpipes_t::iterator iter = _out_pipes.find (pipe_->get_routing_id ()); erase_out_pipe (pipe_);
zmq_assert (iter != _out_pipes.end ());
_out_pipes.erase (iter);
_fq.pipe_terminated (pipe_); _fq.pipe_terminated (pipe_);
pipe_->rollback (); pipe_->rollback ();
if (pipe_ == _current_out) if (pipe_ == _current_out)
...@@ -184,18 +172,6 @@ void zmq::router_t::xread_activated (pipe_t *pipe_) ...@@ -184,18 +172,6 @@ void zmq::router_t::xread_activated (pipe_t *pipe_)
} }
} }
void zmq::router_t::xwrite_activated (pipe_t *pipe_)
{
outpipes_t::iterator it;
for (it = _out_pipes.begin (); it != _out_pipes.end (); ++it)
if (it->second.pipe == pipe_)
break;
zmq_assert (it != _out_pipes.end ());
zmq_assert (!it->second.active);
it->second.active = true;
}
int zmq::router_t::xsend (msg_t *msg_) int zmq::router_t::xsend (msg_t *msg_)
{ {
// If this is the first part of the message it's the ID of the // If this is the first part of the message it's the ID of the
...@@ -211,19 +187,19 @@ int zmq::router_t::xsend (msg_t *msg_) ...@@ -211,19 +187,19 @@ int zmq::router_t::xsend (msg_t *msg_)
// Find the pipe associated with the routing id stored in the prefix. // Find the pipe associated with the routing id stored in the prefix.
// If there's no such pipe just silently ignore the message, unless // If there's no such pipe just silently ignore the message, unless
// router_mandatory is set. // router_mandatory is set. ;
blob_t routing_id (static_cast<unsigned char *> (msg_->data ()), out_pipe_t *out_pipe = lookup_out_pipe (
msg_->size (), zmq::reference_tag_t ()); blob_t (static_cast<unsigned char *> (msg_->data ()),
outpipes_t::iterator it = _out_pipes.find (routing_id); msg_->size (), zmq::reference_tag_t ()));
if (it != _out_pipes.end ()) { if (out_pipe) {
_current_out = it->second.pipe; _current_out = out_pipe->pipe;
// Check whether pipe is closed or not // Check whether pipe is closed or not
if (!_current_out->check_write ()) { if (!_current_out->check_write ()) {
// Check whether pipe is full or not // Check whether pipe is full or not
bool pipe_full = !_current_out->check_hwm (); bool pipe_full = !_current_out->check_hwm ();
it->second.active = false; out_pipe->active = false;
_current_out = NULL; _current_out = NULL;
if (_mandatory) { if (_mandatory) {
...@@ -420,6 +396,11 @@ bool zmq::router_t::xhas_in () ...@@ -420,6 +396,11 @@ bool zmq::router_t::xhas_in ()
return true; return true;
} }
static bool check_pipe_hwm (const zmq::pipe_t &pipe)
{
return pipe.check_hwm ();
}
bool zmq::router_t::xhas_out () bool zmq::router_t::xhas_out ()
{ {
// In theory, ROUTER socket is always ready for writing (except when // In theory, ROUTER socket is always ready for writing (except when
...@@ -429,12 +410,7 @@ bool zmq::router_t::xhas_out () ...@@ -429,12 +410,7 @@ bool zmq::router_t::xhas_out ()
if (!_mandatory) if (!_mandatory)
return true; return true;
bool has_out = false; return any_of_out_pipes (check_pipe_hwm);
outpipes_t::iterator it;
for (it = _out_pipes.begin (); it != _out_pipes.end (); ++it)
has_out |= it->second.pipe->check_hwm ();
return has_out;
} }
const zmq::blob_t &zmq::router_t::get_credential () const const zmq::blob_t &zmq::router_t::get_credential () const
...@@ -448,14 +424,13 @@ int zmq::router_t::get_peer_state (const void *routing_id_, ...@@ -448,14 +424,13 @@ int zmq::router_t::get_peer_state (const void *routing_id_,
int res = 0; int res = 0;
blob_t routing_id_blob ((unsigned char *) routing_id_, routing_id_size_); blob_t routing_id_blob ((unsigned char *) routing_id_, routing_id_size_);
outpipes_t::const_iterator it = _out_pipes.find (routing_id_blob); const out_pipe_t *out_pipe = lookup_out_pipe (routing_id_blob);
if (it == _out_pipes.end ()) { if (!out_pipe) {
errno = EHOSTUNREACH; errno = EHOSTUNREACH;
return -1; return -1;
} }
const out_pipe_t &outpipe = it->second; if (out_pipe->pipe->check_hwm ())
if (outpipe.pipe->check_hwm ())
res |= ZMQ_POLLOUT; res |= ZMQ_POLLOUT;
/** \todo does it make any sense to check the inpipe as well? */ /** \todo does it make any sense to check the inpipe as well? */
...@@ -466,16 +441,15 @@ int zmq::router_t::get_peer_state (const void *routing_id_, ...@@ -466,16 +441,15 @@ int zmq::router_t::get_peer_state (const void *routing_id_,
bool zmq::router_t::identify_peer (pipe_t *pipe_) bool zmq::router_t::identify_peer (pipe_t *pipe_)
{ {
msg_t msg; msg_t msg;
bool ok;
blob_t routing_id; blob_t routing_id;
if (connect_routing_id.length ()) { const std::string connect_routing_id = extract_connect_routing_id ();
routing_id.set ((unsigned char *) connect_routing_id.c_str (), if (!connect_routing_id.empty ()) {
routing_id.set (
reinterpret_cast<const unsigned char *> (connect_routing_id.c_str ()),
connect_routing_id.length ()); connect_routing_id.length ());
connect_routing_id.clear (); // Not allowed to duplicate an existing rid
outpipes_t::iterator it = _out_pipes.find (routing_id); zmq_assert (!has_out_pipe (routing_id));
if (it != _out_pipes.end ())
zmq_assert (false); // Not allowed to duplicate an existing rid
} else if ( } else if (
options options
.raw_socket) { // Always assign an integral routing id for raw-socket .raw_socket) { // Always assign an integral routing id for raw-socket
...@@ -486,7 +460,7 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_) ...@@ -486,7 +460,7 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_)
} else if (!options.raw_socket) { } else if (!options.raw_socket) {
// Pick up handshake cases and also case where next integral routing id is set // Pick up handshake cases and also case where next integral routing id is set
msg.init (); msg.init ();
ok = pipe_->read (&msg); bool ok = pipe_->read (&msg);
if (!ok) if (!ok)
return false; return false;
...@@ -500,10 +474,13 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_) ...@@ -500,10 +474,13 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_)
} else { } else {
routing_id.set (static_cast<unsigned char *> (msg.data ()), routing_id.set (static_cast<unsigned char *> (msg.data ()),
msg.size ()); msg.size ());
outpipes_t::iterator it = _out_pipes.find (routing_id);
msg.close (); msg.close ();
if (it != _out_pipes.end ()) { // Try to remove an existing routing id entry to allow the new
// connection to take the routing id.
out_pipe_t existing_outpipe = try_erase_out_pipe (routing_id);
if (existing_outpipe.pipe) {
if (!_handover) if (!_handover)
// Ignore peers with duplicate ID // Ignore peers with duplicate ID
return false; return false;
...@@ -516,19 +493,10 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_) ...@@ -516,19 +493,10 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_)
put_uint32 (buf + 1, _next_integral_routing_id++); put_uint32 (buf + 1, _next_integral_routing_id++);
blob_t new_routing_id (buf, sizeof buf); blob_t new_routing_id (buf, sizeof buf);
it->second.pipe->set_router_socket_routing_id (new_routing_id); existing_outpipe.pipe->set_router_socket_routing_id (
out_pipe_t existing_outpipe = {it->second.pipe, new_routing_id);
it->second.active};
ok = _out_pipes add_out_pipe (ZMQ_MOVE (new_routing_id), existing_outpipe.pipe);
.ZMQ_MAP_INSERT_OR_EMPLACE (ZMQ_MOVE (new_routing_id),
existing_outpipe)
.second;
zmq_assert (ok);
// Remove the existing routing id entry to allow the new
// connection to take the routing id.
_out_pipes.erase (it);
if (existing_outpipe.pipe == _current_in) if (existing_outpipe.pipe == _current_in)
_terminate_current_in = true; _terminate_current_in = true;
...@@ -539,11 +507,7 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_) ...@@ -539,11 +507,7 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_)
} }
pipe_->set_router_socket_routing_id (routing_id); pipe_->set_router_socket_routing_id (routing_id);
// Add the record into output pipes lookup table add_out_pipe (ZMQ_MOVE (routing_id), pipe_);
out_pipe_t outpipe = {pipe_, true};
ok = _out_pipes.ZMQ_MAP_INSERT_OR_EMPLACE (ZMQ_MOVE (routing_id), outpipe)
.second;
zmq_assert (ok);
return true; return true;
} }
...@@ -45,7 +45,7 @@ class ctx_t; ...@@ -45,7 +45,7 @@ class ctx_t;
class pipe_t; class pipe_t;
// TODO: This class uses O(n) scheduling. Rewrite it to use O(1) algorithm. // TODO: This class uses O(n) scheduling. Rewrite it to use O(1) algorithm.
class router_t : public socket_base_t class router_t : public routing_socket_base_t
{ {
public: public:
router_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_); router_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_);
...@@ -59,7 +59,6 @@ class router_t : public socket_base_t ...@@ -59,7 +59,6 @@ class router_t : public socket_base_t
bool xhas_in (); bool xhas_in ();
bool xhas_out (); bool xhas_out ();
void xread_activated (zmq::pipe_t *pipe_); void xread_activated (zmq::pipe_t *pipe_);
void xwrite_activated (zmq::pipe_t *pipe_);
void xpipe_terminated (zmq::pipe_t *pipe_); void xpipe_terminated (zmq::pipe_t *pipe_);
int get_peer_state (const void *identity_, size_t identity_size_) const; int get_peer_state (const void *identity_, size_t identity_size_) const;
...@@ -97,19 +96,9 @@ class router_t : public socket_base_t ...@@ -97,19 +96,9 @@ class router_t : public socket_base_t
// If true, more incoming message parts are expected. // If true, more incoming message parts are expected.
bool _more_in; bool _more_in;
struct out_pipe_t
{
zmq::pipe_t *pipe;
bool active;
};
// We keep a set of pipes that have not been identified yet. // We keep a set of pipes that have not been identified yet.
std::set<pipe_t *> _anonymous_pipes; std::set<pipe_t *> _anonymous_pipes;
// Outbound pipes indexed by the peer IDs.
typedef std::map<blob_t, out_pipe_t> outpipes_t;
outpipes_t _out_pipes;
// The pipe we are currently writing to. // The pipe we are currently writing to.
zmq::pipe_t *_current_out; zmq::pipe_t *_current_out;
......
...@@ -1762,3 +1762,102 @@ void zmq::socket_base_t::stop_monitor (bool send_monitor_stopped_event_) ...@@ -1762,3 +1762,102 @@ void zmq::socket_base_t::stop_monitor (bool send_monitor_stopped_event_)
_monitor_events = 0; _monitor_events = 0;
} }
} }
zmq::routing_socket_base_t::routing_socket_base_t (class ctx_t *parent_,
uint32_t tid_,
int sid_) :
socket_base_t (parent_, tid_, sid_)
{
}
zmq::routing_socket_base_t::~routing_socket_base_t ()
{
zmq_assert (_out_pipes.empty ());
}
int zmq::routing_socket_base_t::xsetsockopt (int option_,
const void *optval_,
size_t optvallen_)
{
switch (option_) {
case ZMQ_CONNECT_ROUTING_ID:
// TODO why isn't it possible to set an empty connect_routing_id
// (which is the default value)
if (optval_ && optvallen_) {
_connect_routing_id.assign (static_cast<const char *> (optval_),
optvallen_);
return 0;
}
break;
}
errno = EINVAL;
return -1;
}
void zmq::routing_socket_base_t::xwrite_activated (pipe_t *pipe_)
{
out_pipes_t::iterator it;
for (it = _out_pipes.begin (); it != _out_pipes.end (); ++it)
if (it->second.pipe == pipe_)
break;
zmq_assert (it != _out_pipes.end ());
zmq_assert (!it->second.active);
it->second.active = true;
}
std::string zmq::routing_socket_base_t::extract_connect_routing_id ()
{
std::string res = ZMQ_MOVE (_connect_routing_id);
_connect_routing_id.clear ();
return res;
}
void zmq::routing_socket_base_t::add_out_pipe (blob_t routing_id, pipe_t *pipe_)
{
// Add the record into output pipes lookup table
const out_pipe_t outpipe = {pipe_, true};
const bool ok =
_out_pipes.ZMQ_MAP_INSERT_OR_EMPLACE (ZMQ_MOVE (routing_id), outpipe)
.second;
zmq_assert (ok);
}
bool zmq::routing_socket_base_t::has_out_pipe (const blob_t &routing_id) const
{
return 0 != _out_pipes.count (routing_id);
}
zmq::routing_socket_base_t::out_pipe_t *
zmq::routing_socket_base_t::lookup_out_pipe (const blob_t &routing_id)
{
// TODO we could probably avoid constructor a temporary blob_t to call this function
out_pipes_t::iterator it = _out_pipes.find (routing_id);
return it == _out_pipes.end () ? NULL : &it->second;
}
const zmq::routing_socket_base_t::out_pipe_t *
zmq::routing_socket_base_t::lookup_out_pipe (const blob_t &routing_id) const
{
// TODO we could probably avoid constructor a temporary blob_t to call this function
out_pipes_t::const_iterator it = _out_pipes.find (routing_id);
return it == _out_pipes.end () ? NULL : &it->second;
}
void zmq::routing_socket_base_t::erase_out_pipe (pipe_t *pipe_)
{
const size_t erased = _out_pipes.erase (pipe_->get_routing_id ());
zmq_assert (erased);
}
zmq::routing_socket_base_t::out_pipe_t
zmq::routing_socket_base_t::try_erase_out_pipe (const blob_t &routing_id)
{
const out_pipes_t::iterator it = _out_pipes.find (routing_id);
out_pipe_t res = {NULL, false};
if (it != _out_pipes.end ()) {
res = it->second;
_out_pipes.erase (it);
}
return res;
}
...@@ -184,9 +184,6 @@ class socket_base_t : public own_t, ...@@ -184,9 +184,6 @@ class socket_base_t : public own_t,
// Delay actual destruction of the socket. // Delay actual destruction of the socket.
void process_destroy (); void process_destroy ();
// Next assigned name on a zmq_connect() call used by ROUTER and STREAM socket types
std::string connect_routing_id;
private: private:
// test if event should be sent and then dispatch it // test if event should be sent and then dispatch it
void event (const std::string &addr_, intptr_t fd_, int type_); void event (const std::string &addr_, intptr_t fd_, int type_);
...@@ -300,6 +297,53 @@ class socket_base_t : public own_t, ...@@ -300,6 +297,53 @@ class socket_base_t : public own_t,
socket_base_t (const socket_base_t &); socket_base_t (const socket_base_t &);
const socket_base_t &operator= (const socket_base_t &); const socket_base_t &operator= (const socket_base_t &);
}; };
class routing_socket_base_t : public socket_base_t
{
protected:
routing_socket_base_t (class ctx_t *parent_, uint32_t tid_, int sid_);
~routing_socket_base_t ();
// methods from socket_base_t
virtual int
xsetsockopt (int option_, const void *optval_, size_t optvallen_);
virtual void xwrite_activated (pipe_t *pipe_);
// own methods
std::string extract_connect_routing_id ();
struct out_pipe_t
{
pipe_t *pipe;
bool active;
};
void add_out_pipe (blob_t routing_id, pipe_t *pipe_);
bool has_out_pipe (const blob_t &routing_id) const;
out_pipe_t *lookup_out_pipe (const blob_t &routing_id);
const out_pipe_t *lookup_out_pipe (const blob_t &routing_id) const;
void erase_out_pipe (pipe_t *pipe_);
out_pipe_t try_erase_out_pipe (const blob_t &routing_id);
template <typename Func> bool any_of_out_pipes (Func func)
{
bool res = false;
for (out_pipes_t::iterator it = _out_pipes.begin ();
it != _out_pipes.end (); ++it) {
if (res |= func (*it->second.pipe))
break;
}
return res;
}
private:
// Outbound pipes indexed by the peer IDs.
typedef std::map<blob_t, out_pipe_t> out_pipes_t;
out_pipes_t _out_pipes;
// Next assigned name on a zmq_connect() call used by ROUTER and STREAM socket types
std::string _connect_routing_id;
};
} }
#endif #endif
...@@ -37,7 +37,7 @@ ...@@ -37,7 +37,7 @@
#include "err.hpp" #include "err.hpp"
zmq::stream_t::stream_t (class ctx_t *parent_, uint32_t tid_, int sid_) : zmq::stream_t::stream_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
socket_base_t (parent_, tid_, sid_), routing_socket_base_t (parent_, tid_, sid_),
_prefetched (false), _prefetched (false),
_routing_id_sent (false), _routing_id_sent (false),
_current_out (NULL), _current_out (NULL),
...@@ -53,7 +53,6 @@ zmq::stream_t::stream_t (class ctx_t *parent_, uint32_t tid_, int sid_) : ...@@ -53,7 +53,6 @@ zmq::stream_t::stream_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
zmq::stream_t::~stream_t () zmq::stream_t::~stream_t ()
{ {
zmq_assert (_outpipes.empty ());
_prefetched_routing_id.close (); _prefetched_routing_id.close ();
_prefetched_msg.close (); _prefetched_msg.close ();
} }
...@@ -70,10 +69,10 @@ void zmq::stream_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_) ...@@ -70,10 +69,10 @@ void zmq::stream_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
void zmq::stream_t::xpipe_terminated (pipe_t *pipe_) void zmq::stream_t::xpipe_terminated (pipe_t *pipe_)
{ {
outpipes_t::iterator it = _outpipes.find (pipe_->get_routing_id ()); erase_out_pipe (pipe_);
zmq_assert (it != _outpipes.end ());
_outpipes.erase (it);
_fq.pipe_terminated (pipe_); _fq.pipe_terminated (pipe_);
// TODO router_t calls pipe_->rollback() here; should this be done here as
// well? then xpipe_terminated could be pulled up to routing_socket_base_t
if (pipe_ == _current_out) if (pipe_ == _current_out)
_current_out = NULL; _current_out = NULL;
} }
...@@ -83,18 +82,6 @@ void zmq::stream_t::xread_activated (pipe_t *pipe_) ...@@ -83,18 +82,6 @@ void zmq::stream_t::xread_activated (pipe_t *pipe_)
_fq.activated (pipe_); _fq.activated (pipe_);
} }
void zmq::stream_t::xwrite_activated (pipe_t *pipe_)
{
outpipes_t::iterator it;
for (it = _outpipes.begin (); it != _outpipes.end (); ++it)
if (it->second.pipe == pipe_)
break;
zmq_assert (it != _outpipes.end ());
zmq_assert (!it->second.active);
it->second.active = true;
}
int zmq::stream_t::xsend (msg_t *msg_) int zmq::stream_t::xsend (msg_t *msg_)
{ {
// If this is the first part of the message it's the ID of the // If this is the first part of the message it's the ID of the
...@@ -108,14 +95,15 @@ int zmq::stream_t::xsend (msg_t *msg_) ...@@ -108,14 +95,15 @@ int zmq::stream_t::xsend (msg_t *msg_)
if (msg_->flags () & msg_t::more) { if (msg_->flags () & msg_t::more) {
// Find the pipe associated with the routing id stored in the prefix. // Find the pipe associated with the routing id stored in the prefix.
// If there's no such pipe return an error // If there's no such pipe return an error
blob_t routing_id (static_cast<unsigned char *> (msg_->data ()),
msg_->size ());
outpipes_t::iterator it = _outpipes.find (routing_id);
if (it != _outpipes.end ()) { out_pipe_t *out_pipe = lookup_out_pipe (
_current_out = it->second.pipe; blob_t (static_cast<unsigned char *> (msg_->data ()),
msg_->size (), reference_tag_t ()));
if (out_pipe) {
_current_out = out_pipe->pipe;
if (!_current_out->check_write ()) { if (!_current_out->check_write ()) {
it->second.active = false; out_pipe->active = false;
_current_out = NULL; _current_out = NULL;
errno = EAGAIN; errno = EAGAIN;
return -1; return -1;
...@@ -177,25 +165,14 @@ int zmq::stream_t::xsetsockopt (int option_, ...@@ -177,25 +165,14 @@ int zmq::stream_t::xsetsockopt (int option_,
size_t optvallen_) size_t optvallen_)
{ {
switch (option_) { switch (option_) {
case ZMQ_CONNECT_ROUTING_ID:
// TODO why isn't it possible to set an empty connect_routing_id
// (which is the default value)
if (optval_ && optvallen_) {
connect_routing_id.assign ((char *) optval_, optvallen_);
return 0;
}
break;
case ZMQ_STREAM_NOTIFY: case ZMQ_STREAM_NOTIFY:
return do_setsockopt_int_as_bool_strict (optval_, optvallen_, return do_setsockopt_int_as_bool_strict (optval_, optvallen_,
&options.raw_notify); &options.raw_notify);
break;
default: default:
break; return routing_socket_base_t::xsetsockopt (option_, optval_,
optvallen_);
} }
errno = EINVAL;
return -1;
} }
int zmq::stream_t::xrecv (msg_t *msg_) int zmq::stream_t::xrecv (msg_t *msg_)
...@@ -293,12 +270,13 @@ void zmq::stream_t::identify_peer (pipe_t *pipe_) ...@@ -293,12 +270,13 @@ void zmq::stream_t::identify_peer (pipe_t *pipe_)
unsigned char buffer[5]; unsigned char buffer[5];
buffer[0] = 0; buffer[0] = 0;
blob_t routing_id; blob_t routing_id;
if (connect_routing_id.length ()) { const std::string connect_routing_id = extract_connect_routing_id ();
routing_id.set ((unsigned char *) connect_routing_id.c_str (), if (!connect_routing_id.empty ()) {
routing_id.set (
reinterpret_cast<const unsigned char *> (connect_routing_id.c_str ()),
connect_routing_id.length ()); connect_routing_id.length ());
connect_routing_id.clear (); // Not allowed to duplicate an existing rid
outpipes_t::iterator it = _outpipes.find (routing_id); zmq_assert (!has_out_pipe (routing_id));
zmq_assert (it == _outpipes.end ());
} else { } else {
put_uint32 (buffer + 1, _next_integral_routing_id++); put_uint32 (buffer + 1, _next_integral_routing_id++);
routing_id.set (buffer, sizeof buffer); routing_id.set (buffer, sizeof buffer);
...@@ -307,10 +285,5 @@ void zmq::stream_t::identify_peer (pipe_t *pipe_) ...@@ -307,10 +285,5 @@ void zmq::stream_t::identify_peer (pipe_t *pipe_)
static_cast<unsigned char> (routing_id.size ()); static_cast<unsigned char> (routing_id.size ());
} }
pipe_->set_router_socket_routing_id (routing_id); pipe_->set_router_socket_routing_id (routing_id);
// Add the record into output pipes lookup table add_out_pipe (ZMQ_MOVE (routing_id), pipe_);
outpipe_t outpipe = {pipe_, true};
const bool ok =
_outpipes.ZMQ_MAP_INSERT_OR_EMPLACE (ZMQ_MOVE (routing_id), outpipe)
.second;
zmq_assert (ok);
} }
...@@ -39,7 +39,7 @@ namespace zmq ...@@ -39,7 +39,7 @@ namespace zmq
class ctx_t; class ctx_t;
class pipe_t; class pipe_t;
class stream_t : public socket_base_t class stream_t : public routing_socket_base_t
{ {
public: public:
stream_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_); stream_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_);
...@@ -52,7 +52,6 @@ class stream_t : public socket_base_t ...@@ -52,7 +52,6 @@ class stream_t : public socket_base_t
bool xhas_in (); bool xhas_in ();
bool xhas_out (); bool xhas_out ();
void xread_activated (zmq::pipe_t *pipe_); void xread_activated (zmq::pipe_t *pipe_);
void xwrite_activated (zmq::pipe_t *pipe_);
void xpipe_terminated (zmq::pipe_t *pipe_); void xpipe_terminated (zmq::pipe_t *pipe_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_); int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
...@@ -76,16 +75,6 @@ class stream_t : public socket_base_t ...@@ -76,16 +75,6 @@ class stream_t : public socket_base_t
// Holds the prefetched message. // Holds the prefetched message.
msg_t _prefetched_msg; msg_t _prefetched_msg;
struct outpipe_t
{
zmq::pipe_t *pipe;
bool active;
};
// Outbound pipes indexed by the peer IDs.
typedef std::map<blob_t, outpipe_t> outpipes_t;
outpipes_t _outpipes;
// The pipe we are currently writing to. // The pipe we are currently writing to.
zmq::pipe_t *_current_out; zmq::pipe_t *_current_out;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment