Commit a9492a08 authored by Richard Newton's avatar Richard Newton

Merge pull request #770 from hurtonm/master

Code cleanup
parents 39e2b799 973d13d5
...@@ -33,13 +33,13 @@ ...@@ -33,13 +33,13 @@
#include "req.hpp" #include "req.hpp"
zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_, zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_,
bool connect_, class socket_base_t *socket_, const options_t &options_, bool active_, class socket_base_t *socket_, const options_t &options_,
const address_t *addr_) const address_t *addr_)
{ {
session_base_t *s = NULL; session_base_t *s = NULL;
switch (options_.type) { switch (options_.type) {
case ZMQ_REQ: case ZMQ_REQ:
s = new (std::nothrow) req_session_t (io_thread_, connect_, s = new (std::nothrow) req_session_t (io_thread_, active_,
socket_, options_, addr_); socket_, options_, addr_);
break; break;
case ZMQ_DEALER: case ZMQ_DEALER:
...@@ -53,7 +53,7 @@ zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_, ...@@ -53,7 +53,7 @@ zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_,
case ZMQ_PULL: case ZMQ_PULL:
case ZMQ_PAIR: case ZMQ_PAIR:
case ZMQ_STREAM: case ZMQ_STREAM:
s = new (std::nothrow) session_base_t (io_thread_, connect_, s = new (std::nothrow) session_base_t (io_thread_, active_,
socket_, options_, addr_); socket_, options_, addr_);
break; break;
default: default:
...@@ -65,11 +65,11 @@ zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_, ...@@ -65,11 +65,11 @@ zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_,
} }
zmq::session_base_t::session_base_t (class io_thread_t *io_thread_, zmq::session_base_t::session_base_t (class io_thread_t *io_thread_,
bool connect_, class socket_base_t *socket_, const options_t &options_, bool active_, class socket_base_t *socket_, const options_t &options_,
const address_t *addr_) : const address_t *addr_) :
own_t (io_thread_, options_), own_t (io_thread_, options_),
io_object_t (io_thread_), io_object_t (io_thread_),
connect (connect_), active (active_),
pipe (NULL), pipe (NULL),
zap_pipe (NULL), zap_pipe (NULL),
incomplete_in (false), incomplete_in (false),
...@@ -177,23 +177,22 @@ void zmq::session_base_t::flush () ...@@ -177,23 +177,22 @@ void zmq::session_base_t::flush ()
void zmq::session_base_t::clean_pipes () void zmq::session_base_t::clean_pipes ()
{ {
if (pipe) { zmq_assert (pipe != NULL);
// Get rid of half-processed messages in the out pipe. Flush any // Get rid of half-processed messages in the out pipe. Flush any
// unflushed messages upstream. // unflushed messages upstream.
pipe->rollback (); pipe->rollback ();
pipe->flush (); pipe->flush ();
// Remove any half-read message from the in pipe. // Remove any half-read message from the in pipe.
while (incomplete_in) { while (incomplete_in) {
msg_t msg; msg_t msg;
int rc = msg.init (); int rc = msg.init ();
errno_assert (rc == 0); errno_assert (rc == 0);
rc = pull_msg (&msg); rc = pull_msg (&msg);
errno_assert (rc == 0); errno_assert (rc == 0);
rc = msg.close (); rc = msg.close ();
errno_assert (rc == 0); errno_assert (rc == 0);
}
} }
} }
...@@ -208,9 +207,8 @@ void zmq::session_base_t::pipe_terminated (pipe_t *pipe_) ...@@ -208,9 +207,8 @@ void zmq::session_base_t::pipe_terminated (pipe_t *pipe_)
// If this is our current pipe, remove it // If this is our current pipe, remove it
pipe = NULL; pipe = NULL;
else else
if (pipe_ == zap_pipe) { if (pipe_ == zap_pipe)
zap_pipe = NULL; zap_pipe = NULL;
}
else else
// Remove the pipe from the detached pipes set // Remove the pipe from the detached pipes set
terminating_pipes.erase (pipe_); terminating_pipes.erase (pipe_);
...@@ -275,7 +273,7 @@ zmq::socket_base_t *zmq::session_base_t::get_socket () ...@@ -275,7 +273,7 @@ zmq::socket_base_t *zmq::session_base_t::get_socket ()
void zmq::session_base_t::process_plug () void zmq::session_base_t::process_plug ()
{ {
if (connect) if (active)
start_connecting (false); start_connecting (false);
} }
...@@ -364,16 +362,19 @@ void zmq::session_base_t::process_attach (i_engine *engine_) ...@@ -364,16 +362,19 @@ void zmq::session_base_t::process_attach (i_engine *engine_)
engine->plug (io_thread, this); engine->plug (io_thread, this);
} }
void zmq::session_base_t::detach () void zmq::session_base_t::engine_error ()
{ {
// Engine is dead. Let's forget about it. // Engine is dead. Let's forget about it.
engine = NULL; engine = NULL;
// Remove any half-done messages from the pipes. // Remove any half-done messages from the pipes.
clean_pipes (); if (pipe)
clean_pipes ();
// Send the event to the derived class. if (active)
detached (); reconnect ();
else
terminate ();
// Just in case there's only a delimiter in the pipe. // Just in case there's only a delimiter in the pipe.
if (pipe) if (pipe)
...@@ -432,7 +433,6 @@ void zmq::session_base_t::proceed_with_term () ...@@ -432,7 +433,6 @@ void zmq::session_base_t::proceed_with_term ()
void zmq::session_base_t::timer_event (int id_) void zmq::session_base_t::timer_event (int id_)
{ {
// Linger period expired. We can proceed with termination even though // Linger period expired. We can proceed with termination even though
// there are still pending messages to be sent. // there are still pending messages to be sent.
zmq_assert (id_ == linger_timer_id); zmq_assert (id_ == linger_timer_id);
...@@ -443,14 +443,8 @@ void zmq::session_base_t::timer_event (int id_) ...@@ -443,14 +443,8 @@ void zmq::session_base_t::timer_event (int id_)
pipe->terminate (false); pipe->terminate (false);
} }
void zmq::session_base_t::detached () void zmq::session_base_t::reconnect ()
{ {
// Transient session self-destructs after peer disconnects.
if (!connect) {
terminate ();
return;
}
// For delayed connect situations, terminate the pipe // For delayed connect situations, terminate the pipe
// and reestablish later on // and reestablish later on
if (pipe && options.immediate == 1 if (pipe && options.immediate == 1
...@@ -475,7 +469,7 @@ void zmq::session_base_t::detached () ...@@ -475,7 +469,7 @@ void zmq::session_base_t::detached ()
void zmq::session_base_t::start_connecting (bool wait_) void zmq::session_base_t::start_connecting (bool wait_)
{ {
zmq_assert (connect); zmq_assert (active);
// Choose I/O thread to run connecter in. Given that we are already // Choose I/O thread to run connecter in. Given that we are already
// running in an I/O thread, there must be at least one available. // running in an I/O thread, there must be at least one available.
...@@ -506,12 +500,11 @@ void zmq::session_base_t::start_connecting (bool wait_) ...@@ -506,12 +500,11 @@ void zmq::session_base_t::start_connecting (bool wait_)
tipc_connecter_t *connecter = new (std::nothrow) tipc_connecter_t ( tipc_connecter_t *connecter = new (std::nothrow) tipc_connecter_t (
io_thread, this, options, addr, wait_); io_thread, this, options, addr, wait_);
alloc_assert (connecter); alloc_assert (connecter);
launch_child(connecter); launch_child (connecter);
return; return;
} }
#endif #endif
#ifdef ZMQ_HAVE_OPENPGM #ifdef ZMQ_HAVE_OPENPGM
// Both PGM and EPGM transports are using the same infrastructure. // Both PGM and EPGM transports are using the same infrastructure.
......
...@@ -46,7 +46,7 @@ namespace zmq ...@@ -46,7 +46,7 @@ namespace zmq
// Create a session of the particular type. // Create a session of the particular type.
static session_base_t *create (zmq::io_thread_t *io_thread_, static session_base_t *create (zmq::io_thread_t *io_thread_,
bool connect_, zmq::socket_base_t *socket_, bool active_, zmq::socket_base_t *socket_,
const options_t &options_, const address_t *addr_); const options_t &options_, const address_t *addr_);
// To be used once only, when creating the session. // To be used once only, when creating the session.
...@@ -55,7 +55,7 @@ namespace zmq ...@@ -55,7 +55,7 @@ namespace zmq
// Following functions are the interface exposed towards the engine. // Following functions are the interface exposed towards the engine.
virtual void reset (); virtual void reset ();
void flush (); void flush ();
void detach (); void engine_error ();
// i_pipe_events interface implementation. // i_pipe_events interface implementation.
void read_activated (zmq::pipe_t *pipe_); void read_activated (zmq::pipe_t *pipe_);
...@@ -88,7 +88,7 @@ namespace zmq ...@@ -88,7 +88,7 @@ namespace zmq
protected: protected:
session_base_t (zmq::io_thread_t *io_thread_, bool connect_, session_base_t (zmq::io_thread_t *io_thread_, bool active_,
zmq::socket_base_t *socket_, const options_t &options_, zmq::socket_base_t *socket_, const options_t &options_,
const address_t *addr_); const address_t *addr_);
virtual ~session_base_t (); virtual ~session_base_t ();
...@@ -97,7 +97,7 @@ namespace zmq ...@@ -97,7 +97,7 @@ namespace zmq
void start_connecting (bool wait_); void start_connecting (bool wait_);
void detached (); void reconnect ();
// Handlers for incoming commands. // Handlers for incoming commands.
void process_plug (); void process_plug ();
...@@ -116,7 +116,7 @@ namespace zmq ...@@ -116,7 +116,7 @@ namespace zmq
// If true, this session (re)connects to the peer. Otherwise, it's // If true, this session (re)connects to the peer. Otherwise, it's
// a transient session created by the listener. // a transient session created by the listener.
bool connect; bool active;
// Pipe connecting the session to its socket. // Pipe connecting the session to its socket.
zmq::pipe_t *pipe; zmq::pipe_t *pipe;
......
...@@ -743,7 +743,7 @@ void zmq::stream_engine_t::error () ...@@ -743,7 +743,7 @@ void zmq::stream_engine_t::error ()
zmq_assert (session); zmq_assert (session);
socket->event_disconnected (endpoint, s); socket->event_disconnected (endpoint, s);
session->flush (); session->flush ();
session->detach (); session->engine_error ();
unplug (); unplug ();
delete this; delete this;
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment