Commit c9d6ef30 authored by Simon Giesecke's avatar Simon Giesecke

Problem: Member outpipes is duplicate between router_t and stream_t

Solution: extract into common base class routing_socket_base_t, for now as protected
parent 728eddfc
...@@ -152,7 +152,7 @@ void zmq::router_t::xpipe_terminated (pipe_t *pipe_) ...@@ -152,7 +152,7 @@ void zmq::router_t::xpipe_terminated (pipe_t *pipe_)
if (it != _anonymous_pipes.end ()) if (it != _anonymous_pipes.end ())
_anonymous_pipes.erase (it); _anonymous_pipes.erase (it);
else { else {
outpipes_t::iterator iter = _out_pipes.find (pipe_->get_routing_id ()); out_pipes_t::iterator iter = _out_pipes.find (pipe_->get_routing_id ());
zmq_assert (iter != _out_pipes.end ()); zmq_assert (iter != _out_pipes.end ());
_out_pipes.erase (iter); _out_pipes.erase (iter);
_fq.pipe_terminated (pipe_); _fq.pipe_terminated (pipe_);
...@@ -178,7 +178,7 @@ void zmq::router_t::xread_activated (pipe_t *pipe_) ...@@ -178,7 +178,7 @@ void zmq::router_t::xread_activated (pipe_t *pipe_)
void zmq::router_t::xwrite_activated (pipe_t *pipe_) void zmq::router_t::xwrite_activated (pipe_t *pipe_)
{ {
outpipes_t::iterator it; out_pipes_t::iterator it;
for (it = _out_pipes.begin (); it != _out_pipes.end (); ++it) for (it = _out_pipes.begin (); it != _out_pipes.end (); ++it)
if (it->second.pipe == pipe_) if (it->second.pipe == pipe_)
break; break;
...@@ -206,7 +206,7 @@ int zmq::router_t::xsend (msg_t *msg_) ...@@ -206,7 +206,7 @@ int zmq::router_t::xsend (msg_t *msg_)
// router_mandatory is set. // router_mandatory is set.
blob_t routing_id (static_cast<unsigned char *> (msg_->data ()), blob_t routing_id (static_cast<unsigned char *> (msg_->data ()),
msg_->size (), zmq::reference_tag_t ()); msg_->size (), zmq::reference_tag_t ());
outpipes_t::iterator it = _out_pipes.find (routing_id); out_pipes_t::iterator it = _out_pipes.find (routing_id);
if (it != _out_pipes.end ()) { if (it != _out_pipes.end ()) {
_current_out = it->second.pipe; _current_out = it->second.pipe;
...@@ -422,7 +422,7 @@ bool zmq::router_t::xhas_out () ...@@ -422,7 +422,7 @@ bool zmq::router_t::xhas_out ()
return true; return true;
bool has_out = false; bool has_out = false;
outpipes_t::iterator it; out_pipes_t::iterator it;
for (it = _out_pipes.begin (); it != _out_pipes.end (); ++it) for (it = _out_pipes.begin (); it != _out_pipes.end (); ++it)
has_out |= it->second.pipe->check_hwm (); has_out |= it->second.pipe->check_hwm ();
...@@ -440,7 +440,7 @@ int zmq::router_t::get_peer_state (const void *routing_id_, ...@@ -440,7 +440,7 @@ int zmq::router_t::get_peer_state (const void *routing_id_,
int res = 0; int res = 0;
blob_t routing_id_blob ((unsigned char *) routing_id_, routing_id_size_); blob_t routing_id_blob ((unsigned char *) routing_id_, routing_id_size_);
outpipes_t::const_iterator it = _out_pipes.find (routing_id_blob); out_pipes_t::const_iterator it = _out_pipes.find (routing_id_blob);
if (it == _out_pipes.end ()) { if (it == _out_pipes.end ()) {
errno = EHOSTUNREACH; errno = EHOSTUNREACH;
return -1; return -1;
...@@ -492,7 +492,7 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_) ...@@ -492,7 +492,7 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_)
} else { } else {
routing_id.set (static_cast<unsigned char *> (msg.data ()), routing_id.set (static_cast<unsigned char *> (msg.data ()),
msg.size ()); msg.size ());
outpipes_t::iterator it = _out_pipes.find (routing_id); out_pipes_t::iterator it = _out_pipes.find (routing_id);
msg.close (); msg.close ();
if (it != _out_pipes.end ()) { if (it != _out_pipes.end ()) {
......
...@@ -97,19 +97,9 @@ class router_t : public routing_socket_base_t ...@@ -97,19 +97,9 @@ class router_t : public routing_socket_base_t
// If true, more incoming message parts are expected. // If true, more incoming message parts are expected.
bool _more_in; bool _more_in;
struct out_pipe_t
{
zmq::pipe_t *pipe;
bool active;
};
// We keep a set of pipes that have not been identified yet. // We keep a set of pipes that have not been identified yet.
std::set<pipe_t *> _anonymous_pipes; std::set<pipe_t *> _anonymous_pipes;
// Outbound pipes indexed by the peer IDs.
typedef std::map<blob_t, out_pipe_t> outpipes_t;
outpipes_t _out_pipes;
// The pipe we are currently writing to. // The pipe we are currently writing to.
zmq::pipe_t *_current_out; zmq::pipe_t *_current_out;
......
...@@ -308,6 +308,16 @@ class routing_socket_base_t : public socket_base_t ...@@ -308,6 +308,16 @@ class routing_socket_base_t : public socket_base_t
std::string extract_connect_routing_id (); std::string extract_connect_routing_id ();
struct out_pipe_t
{
zmq::pipe_t *pipe;
bool active;
};
// Outbound pipes indexed by the peer IDs.
typedef std::map<blob_t, out_pipe_t> out_pipes_t;
out_pipes_t _out_pipes;
private: private:
// Next assigned name on a zmq_connect() call used by ROUTER and STREAM socket types // Next assigned name on a zmq_connect() call used by ROUTER and STREAM socket types
std::string _connect_routing_id; std::string _connect_routing_id;
......
...@@ -53,7 +53,7 @@ zmq::stream_t::stream_t (class ctx_t *parent_, uint32_t tid_, int sid_) : ...@@ -53,7 +53,7 @@ zmq::stream_t::stream_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
zmq::stream_t::~stream_t () zmq::stream_t::~stream_t ()
{ {
zmq_assert (_outpipes.empty ()); zmq_assert (_out_pipes.empty ());
_prefetched_routing_id.close (); _prefetched_routing_id.close ();
_prefetched_msg.close (); _prefetched_msg.close ();
} }
...@@ -70,9 +70,9 @@ void zmq::stream_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_) ...@@ -70,9 +70,9 @@ void zmq::stream_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
void zmq::stream_t::xpipe_terminated (pipe_t *pipe_) void zmq::stream_t::xpipe_terminated (pipe_t *pipe_)
{ {
outpipes_t::iterator it = _outpipes.find (pipe_->get_routing_id ()); out_pipes_t::iterator it = _out_pipes.find (pipe_->get_routing_id ());
zmq_assert (it != _outpipes.end ()); zmq_assert (it != _out_pipes.end ());
_outpipes.erase (it); _out_pipes.erase (it);
_fq.pipe_terminated (pipe_); _fq.pipe_terminated (pipe_);
if (pipe_ == _current_out) if (pipe_ == _current_out)
_current_out = NULL; _current_out = NULL;
...@@ -85,12 +85,12 @@ void zmq::stream_t::xread_activated (pipe_t *pipe_) ...@@ -85,12 +85,12 @@ void zmq::stream_t::xread_activated (pipe_t *pipe_)
void zmq::stream_t::xwrite_activated (pipe_t *pipe_) void zmq::stream_t::xwrite_activated (pipe_t *pipe_)
{ {
outpipes_t::iterator it; out_pipes_t::iterator it;
for (it = _outpipes.begin (); it != _outpipes.end (); ++it) for (it = _out_pipes.begin (); it != _out_pipes.end (); ++it)
if (it->second.pipe == pipe_) if (it->second.pipe == pipe_)
break; break;
zmq_assert (it != _outpipes.end ()); zmq_assert (it != _out_pipes.end ());
zmq_assert (!it->second.active); zmq_assert (!it->second.active);
it->second.active = true; it->second.active = true;
} }
...@@ -110,9 +110,9 @@ int zmq::stream_t::xsend (msg_t *msg_) ...@@ -110,9 +110,9 @@ int zmq::stream_t::xsend (msg_t *msg_)
// If there's no such pipe return an error // If there's no such pipe return an error
blob_t routing_id (static_cast<unsigned char *> (msg_->data ()), blob_t routing_id (static_cast<unsigned char *> (msg_->data ()),
msg_->size ()); msg_->size ());
outpipes_t::iterator it = _outpipes.find (routing_id); out_pipes_t::iterator it = _out_pipes.find (routing_id);
if (it != _outpipes.end ()) { if (it != _out_pipes.end ()) {
_current_out = it->second.pipe; _current_out = it->second.pipe;
if (!_current_out->check_write ()) { if (!_current_out->check_write ()) {
it->second.active = false; it->second.active = false;
...@@ -288,7 +288,7 @@ void zmq::stream_t::identify_peer (pipe_t *pipe_) ...@@ -288,7 +288,7 @@ void zmq::stream_t::identify_peer (pipe_t *pipe_)
reinterpret_cast<const unsigned char *> (connect_routing_id.c_str ()), reinterpret_cast<const unsigned char *> (connect_routing_id.c_str ()),
connect_routing_id.length ()); connect_routing_id.length ());
// Not allowed to duplicate an existing rid // Not allowed to duplicate an existing rid
zmq_assert (0 == _outpipes.count (routing_id)); zmq_assert (0 == _out_pipes.count (routing_id));
} else { } else {
put_uint32 (buffer + 1, _next_integral_routing_id++); put_uint32 (buffer + 1, _next_integral_routing_id++);
routing_id.set (buffer, sizeof buffer); routing_id.set (buffer, sizeof buffer);
...@@ -298,9 +298,9 @@ void zmq::stream_t::identify_peer (pipe_t *pipe_) ...@@ -298,9 +298,9 @@ void zmq::stream_t::identify_peer (pipe_t *pipe_)
} }
pipe_->set_router_socket_routing_id (routing_id); pipe_->set_router_socket_routing_id (routing_id);
// Add the record into output pipes lookup table // Add the record into output pipes lookup table
outpipe_t outpipe = {pipe_, true}; out_pipe_t outpipe = {pipe_, true};
const bool ok = const bool ok =
_outpipes.ZMQ_MAP_INSERT_OR_EMPLACE (ZMQ_MOVE (routing_id), outpipe) _out_pipes.ZMQ_MAP_INSERT_OR_EMPLACE (ZMQ_MOVE (routing_id), outpipe)
.second; .second;
zmq_assert (ok); zmq_assert (ok);
} }
...@@ -76,16 +76,6 @@ class stream_t : public routing_socket_base_t ...@@ -76,16 +76,6 @@ class stream_t : public routing_socket_base_t
// Holds the prefetched message. // Holds the prefetched message.
msg_t _prefetched_msg; msg_t _prefetched_msg;
struct outpipe_t
{
zmq::pipe_t *pipe;
bool active;
};
// Outbound pipes indexed by the peer IDs.
typedef std::map<blob_t, outpipe_t> outpipes_t;
outpipes_t _outpipes;
// The pipe we are currently writing to. // The pipe we are currently writing to.
zmq::pipe_t *_current_out; zmq::pipe_t *_current_out;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment