Commit c897af50 authored by Pieter Hintjens's avatar Pieter Hintjens

Merge pull request #1177 from hurtonm/master

Code cleanup
parents 41a9968c 5a497d7d
...@@ -38,7 +38,6 @@ zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_, ...@@ -38,7 +38,6 @@ zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_,
bool active_, class socket_base_t *socket_, const options_t &options_, bool active_, class socket_base_t *socket_, const options_t &options_,
address_t *addr_) address_t *addr_)
{ {
session_base_t *s = NULL; session_base_t *s = NULL;
switch (options_.type) { switch (options_.type) {
case ZMQ_REQ: case ZMQ_REQ:
...@@ -228,14 +227,16 @@ void zmq::session_base_t::pipe_terminated (pipe_t *pipe_) ...@@ -228,14 +227,16 @@ void zmq::session_base_t::pipe_terminated (pipe_t *pipe_)
// If we are waiting for pending messages to be sent, at this point // If we are waiting for pending messages to be sent, at this point
// we are sure that there will be no more messages and we can proceed // we are sure that there will be no more messages and we can proceed
// with termination safely. // with termination safely.
if (pending && !pipe && !zap_pipe && terminating_pipes.empty ()) if (pending && !pipe && !zap_pipe && terminating_pipes.empty ()) {
proceed_with_term (); pending = false;
own_t::process_term (0);
}
} }
void zmq::session_base_t::read_activated (pipe_t *pipe_) void zmq::session_base_t::read_activated (pipe_t *pipe_)
{ {
// Skip activating if we're detaching this pipe // Skip activating if we're detaching this pipe
if (unlikely(pipe_ != pipe && pipe_ != zap_pipe)) { if (unlikely (pipe_ != pipe && pipe_ != zap_pipe)) {
zmq_assert (terminating_pipes.count (pipe_) == 1); zmq_assert (terminating_pipes.count (pipe_) == 1);
return; return;
} }
...@@ -354,9 +355,9 @@ void zmq::session_base_t::process_attach (i_engine *engine_) ...@@ -354,9 +355,9 @@ void zmq::session_base_t::process_attach (i_engine *engine_)
// Remember the local end of the pipe. // Remember the local end of the pipe.
zmq_assert (!pipe); zmq_assert (!pipe);
pipe = pipes [0]; pipe = pipes [0];
// Store engine assoc_fd for lilnking pipe to fd // Store engine assoc_fd for linking pipe to fd
pipe->assoc_fd=engine_->get_assoc_fd(); pipe->assoc_fd = engine_->get_assoc_fd ();
pipes[1]->assoc_fd=pipe->assoc_fd; pipes [1]->assoc_fd = pipe->assoc_fd;
// Ask socket to plug into the remote end of the pipe. // Ask socket to plug into the remote end of the pipe.
send_bind (socket, pipes [1]); send_bind (socket, pipes [1]);
} }
...@@ -409,8 +410,8 @@ void zmq::session_base_t::process_term (int linger_) ...@@ -409,8 +410,8 @@ void zmq::session_base_t::process_term (int linger_)
// If the termination of the pipe happens before the term command is // If the termination of the pipe happens before the term command is
// delivered there's nothing much to do. We can proceed with the // delivered there's nothing much to do. We can proceed with the
// standard termination immediately. // standard termination immediately.
if (!pipe && !zap_pipe) { if (!pipe && !zap_pipe && terminating_pipes.empty ()) {
proceed_with_term (); own_t::process_term (0);
return; return;
} }
...@@ -440,15 +441,6 @@ void zmq::session_base_t::process_term (int linger_) ...@@ -440,15 +441,6 @@ void zmq::session_base_t::process_term (int linger_)
zap_pipe->terminate (false); zap_pipe->terminate (false);
} }
void zmq::session_base_t::proceed_with_term ()
{
// The pending phase has just ended.
pending = false;
// Continue with standard termination.
own_t::process_term (0);
}
void zmq::session_base_t::timer_event (int id_) void zmq::session_base_t::timer_event (int id_)
{ {
// Linger period expired. We can proceed with termination even though // Linger period expired. We can proceed with termination even though
...@@ -465,8 +457,8 @@ void zmq::session_base_t::reconnect () ...@@ -465,8 +457,8 @@ void zmq::session_base_t::reconnect ()
{ {
// For delayed connect situations, terminate the pipe // For delayed connect situations, terminate the pipe
// and reestablish later on // and reestablish later on
if (pipe && options.immediate == 1 if (pipe && options.immediate == 1
&& addr->protocol != "pgm" && addr->protocol != "epgm" && addr->protocol != "pgm" && addr->protocol != "epgm"
&& addr->protocol != "norm") { && addr->protocol != "norm") {
pipe->hiccup (); pipe->hiccup ();
pipe->terminate (false); pipe->terminate (false);
...@@ -578,10 +570,9 @@ void zmq::session_base_t::start_connecting (bool wait_) ...@@ -578,10 +570,9 @@ void zmq::session_base_t::start_connecting (bool wait_)
return; return;
} }
#endif #endif
#ifdef ZMQ_HAVE_NORM #ifdef ZMQ_HAVE_NORM
if (addr->protocol == "norm") if (addr->protocol == "norm") {
{
// At this point we'll create message pipes to the session straight // At this point we'll create message pipes to the session straight
// away. There's no point in delaying it as no concept of 'connect' // away. There's no point in delaying it as no concept of 'connect'
// exists with NORM anyway. // exists with NORM anyway.
......
...@@ -112,12 +112,9 @@ namespace zmq ...@@ -112,12 +112,9 @@ namespace zmq
// Call this function when engine disconnect to get rid of leftovers. // Call this function when engine disconnect to get rid of leftovers.
void clean_pipes (); void clean_pipes ();
// Call this function to move on with the delayed process_term.
void proceed_with_term ();
// If true, this session (re)connects to the peer. Otherwise, it's // If true, this session (re)connects to the peer. Otherwise, it's
// a transient session created by the listener. // a transient session created by the listener.
bool active; const bool active;
// Pipe connecting the session to its socket. // Pipe connecting the session to its socket.
zmq::pipe_t *pipe; zmq::pipe_t *pipe;
......
...@@ -26,11 +26,11 @@ ...@@ -26,11 +26,11 @@
zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_, int sid_) : zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
socket_base_t (parent_, tid_, sid_), socket_base_t (parent_, tid_, sid_),
verbose(false), verbose (false),
more (false) more (false),
lossy (true)
{ {
options.type = ZMQ_XPUB; options.type = ZMQ_XPUB;
lossy = true;
} }
zmq::xpub_t::~xpub_t () zmq::xpub_t::~xpub_t ()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment