Commit 09fab930 authored by Simon Giesecke's avatar Simon Giesecke

Problem: xwrite_activated duplicated between stream_t and router_t

Solution: pull up into routing_socket_base_t
parent c9d6ef30
...@@ -176,18 +176,6 @@ void zmq::router_t::xread_activated (pipe_t *pipe_) ...@@ -176,18 +176,6 @@ void zmq::router_t::xread_activated (pipe_t *pipe_)
} }
} }
void zmq::router_t::xwrite_activated (pipe_t *pipe_)
{
out_pipes_t::iterator it;
for (it = _out_pipes.begin (); it != _out_pipes.end (); ++it)
if (it->second.pipe == pipe_)
break;
zmq_assert (it != _out_pipes.end ());
zmq_assert (!it->second.active);
it->second.active = true;
}
int zmq::router_t::xsend (msg_t *msg_) int zmq::router_t::xsend (msg_t *msg_)
{ {
// If this is the first part of the message it's the ID of the // If this is the first part of the message it's the ID of the
......
...@@ -59,7 +59,6 @@ class router_t : public routing_socket_base_t ...@@ -59,7 +59,6 @@ class router_t : public routing_socket_base_t
bool xhas_in (); bool xhas_in ();
bool xhas_out (); bool xhas_out ();
void xread_activated (zmq::pipe_t *pipe_); void xread_activated (zmq::pipe_t *pipe_);
void xwrite_activated (zmq::pipe_t *pipe_);
void xpipe_terminated (zmq::pipe_t *pipe_); void xpipe_terminated (zmq::pipe_t *pipe_);
int get_peer_state (const void *identity_, size_t identity_size_) const; int get_peer_state (const void *identity_, size_t identity_size_) const;
......
...@@ -1789,6 +1789,18 @@ int zmq::routing_socket_base_t::xsetsockopt (int option_, ...@@ -1789,6 +1789,18 @@ int zmq::routing_socket_base_t::xsetsockopt (int option_,
return -1; return -1;
} }
void zmq::routing_socket_base_t::xwrite_activated (pipe_t *pipe_)
{
out_pipes_t::iterator it;
for (it = _out_pipes.begin (); it != _out_pipes.end (); ++it)
if (it->second.pipe == pipe_)
break;
zmq_assert (it != _out_pipes.end ());
zmq_assert (!it->second.active);
it->second.active = true;
}
std::string zmq::routing_socket_base_t::extract_connect_routing_id () std::string zmq::routing_socket_base_t::extract_connect_routing_id ()
{ {
std::string res = ZMQ_MOVE (_connect_routing_id); std::string res = ZMQ_MOVE (_connect_routing_id);
......
...@@ -306,11 +306,14 @@ class routing_socket_base_t : public socket_base_t ...@@ -306,11 +306,14 @@ class routing_socket_base_t : public socket_base_t
virtual int virtual int
xsetsockopt (int option_, const void *optval_, size_t optvallen_); xsetsockopt (int option_, const void *optval_, size_t optvallen_);
void xwrite_activated (pipe_t *pipe_);
std::string extract_connect_routing_id (); std::string extract_connect_routing_id ();
struct out_pipe_t struct out_pipe_t
{ {
zmq::pipe_t *pipe; pipe_t *pipe;
bool active; bool active;
}; };
......
...@@ -74,6 +74,8 @@ void zmq::stream_t::xpipe_terminated (pipe_t *pipe_) ...@@ -74,6 +74,8 @@ void zmq::stream_t::xpipe_terminated (pipe_t *pipe_)
zmq_assert (it != _out_pipes.end ()); zmq_assert (it != _out_pipes.end ());
_out_pipes.erase (it); _out_pipes.erase (it);
_fq.pipe_terminated (pipe_); _fq.pipe_terminated (pipe_);
// TODO router_t calls pipe_->rollback() here; should this be done here as
// well? then xpipe_terminated could be pulled up to routing_socket_base_t
if (pipe_ == _current_out) if (pipe_ == _current_out)
_current_out = NULL; _current_out = NULL;
} }
...@@ -83,18 +85,6 @@ void zmq::stream_t::xread_activated (pipe_t *pipe_) ...@@ -83,18 +85,6 @@ void zmq::stream_t::xread_activated (pipe_t *pipe_)
_fq.activated (pipe_); _fq.activated (pipe_);
} }
void zmq::stream_t::xwrite_activated (pipe_t *pipe_)
{
out_pipes_t::iterator it;
for (it = _out_pipes.begin (); it != _out_pipes.end (); ++it)
if (it->second.pipe == pipe_)
break;
zmq_assert (it != _out_pipes.end ());
zmq_assert (!it->second.active);
it->second.active = true;
}
int zmq::stream_t::xsend (msg_t *msg_) int zmq::stream_t::xsend (msg_t *msg_)
{ {
// If this is the first part of the message it's the ID of the // If this is the first part of the message it's the ID of the
......
...@@ -52,7 +52,6 @@ class stream_t : public routing_socket_base_t ...@@ -52,7 +52,6 @@ class stream_t : public routing_socket_base_t
bool xhas_in (); bool xhas_in ();
bool xhas_out (); bool xhas_out ();
void xread_activated (zmq::pipe_t *pipe_); void xread_activated (zmq::pipe_t *pipe_);
void xwrite_activated (zmq::pipe_t *pipe_);
void xpipe_terminated (zmq::pipe_t *pipe_); void xpipe_terminated (zmq::pipe_t *pipe_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_); int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment