Commit 5b167aa8 authored by Ian Barber's avatar Ian Barber

Revert "Remove the extra outpipe handling as the session is quite capable of…

Revert "Remove the extra outpipe handling as the session is quite capable of delaying the creation of the pipe until the connection has happened. Simply don't build the pipe, and let it do that automatically."

This reverts commit 06485d92.
parent 81b8362a
...@@ -111,6 +111,7 @@ zmq::session_base_t::session_base_t (class io_thread_t *io_thread_, ...@@ -111,6 +111,7 @@ zmq::session_base_t::session_base_t (class io_thread_t *io_thread_,
io_object_t (io_thread_), io_object_t (io_thread_),
connect (connect_), connect (connect_),
pipe (NULL), pipe (NULL),
outpipe (NULL),
incomplete_in (false), incomplete_in (false),
pending (false), pending (false),
engine (NULL), engine (NULL),
...@@ -150,6 +151,13 @@ void zmq::session_base_t::attach_pipe (pipe_t *pipe_) ...@@ -150,6 +151,13 @@ void zmq::session_base_t::attach_pipe (pipe_t *pipe_)
pipe->set_event_sink (this); pipe->set_event_sink (this);
} }
void zmq::session_base_t::onconnect_attach_pipe (pipe_t *pipe_)
{
zmq_assert (!is_terminating ());
zmq_assert (pipe_);
outpipe = pipe_;
}
int zmq::session_base_t::read (msg_t *msg_) int zmq::session_base_t::read (msg_t *msg_)
{ {
// First message to send is identity (if required). // First message to send is identity (if required).
...@@ -228,7 +236,12 @@ void zmq::session_base_t::clean_pipes () ...@@ -228,7 +236,12 @@ void zmq::session_base_t::clean_pipes ()
} }
void zmq::session_base_t::terminated (pipe_t *pipe_) void zmq::session_base_t::terminated (pipe_t *pipe_)
{ {
// If we get a term signal from our held outpipe
// we can safely ignore it.
if (pipe_ == outpipe)
return;
// Drop the reference to the deallocated pipe. // Drop the reference to the deallocated pipe.
zmq_assert (pipe == pipe_); zmq_assert (pipe == pipe_);
pipe = NULL; pipe = NULL;
...@@ -306,10 +319,16 @@ void zmq::session_base_t::process_attach (i_engine *engine_) ...@@ -306,10 +319,16 @@ void zmq::session_base_t::process_attach (i_engine *engine_)
zmq_assert (!pipe); zmq_assert (!pipe);
pipe = pipes [0]; pipe = pipes [0];
// Ask socket to plug into the pipe. // Ask socket to plug into the remote end of the pipe.
send_bind (socket, pipes [1]); send_bind (socket, pipes [1]);
} }
if (outpipe && options.delay_attach_on_connect) {
send_bind (socket, outpipe);
// Forget the outpipe
outpipe = NULL;
}
// Plug in the engine. // Plug in the engine.
zmq_assert (!engine); zmq_assert (!engine);
engine = engine_; engine = engine_;
...@@ -358,6 +377,12 @@ void zmq::session_base_t::process_term (int linger_) ...@@ -358,6 +377,12 @@ void zmq::session_base_t::process_term (int linger_)
// Start pipe termination process. Delay the termination till all messages // Start pipe termination process. Delay the termination till all messages
// are processed in case the linger time is non-zero. // are processed in case the linger time is non-zero.
pipe->terminate (linger_ != 0); pipe->terminate (linger_ != 0);
// If we're storing a pipe to be connected, we can clear that as well
if (outpipe) {
outpipe->set_event_sink (this);
outpipe->terminate (linger_ != 0);
}
// TODO: Should this go into pipe_t::terminate ? // TODO: Should this go into pipe_t::terminate ?
// In case there's no engine and there's only delimiter in the // In case there's no engine and there's only delimiter in the
...@@ -385,6 +410,9 @@ void zmq::session_base_t::timer_event (int id_) ...@@ -385,6 +410,9 @@ void zmq::session_base_t::timer_event (int id_)
// Ask pipe to terminate even though there may be pending messages in it. // Ask pipe to terminate even though there may be pending messages in it.
zmq_assert (pipe); zmq_assert (pipe);
pipe->terminate (false); pipe->terminate (false);
if (outpipe)
outpipe->terminate (false);
} }
void zmq::session_base_t::detached () void zmq::session_base_t::detached ()
......
...@@ -106,7 +106,10 @@ namespace zmq ...@@ -106,7 +106,10 @@ namespace zmq
// Pipe connecting the session to its socket. // Pipe connecting the session to its socket.
zmq::pipe_t *pipe; zmq::pipe_t *pipe;
// Pipe connecting the socket to the client
zmq::pipe_t *outpipe;
// This flag is true if the remainder of the message being processed // This flag is true if the remainder of the message being processed
// is still in the in pipe. // is still in the in pipe.
bool incomplete_in; bool incomplete_in;
......
...@@ -529,28 +529,29 @@ int zmq::socket_base_t::connect (const char *addr_) ...@@ -529,28 +529,29 @@ int zmq::socket_base_t::connect (const char *addr_)
session_base_t *session = session_base_t::create (io_thread, true, this, session_base_t *session = session_base_t::create (io_thread, true, this,
options, paddr); options, paddr);
errno_assert (session); errno_assert (session);
// Create a bi-directional pipe.
object_t *parents [2] = {this, session};
pipe_t *pipes [2] = {NULL, NULL};
int hwms [2] = {options.sndhwm, options.rcvhwm};
bool delays [2] = {options.delay_on_disconnect, options.delay_on_close};
rc = pipepair (parents, pipes, hwms, delays);
errno_assert (rc == 0);
// PGM does not support subscription forwarding; ask for all data to be // PGM does not support subscription forwarding; ask for all data to be
// sent to this pipe. // sent to this pipe.
bool icanhasall = false; bool icanhasall = false;
if (protocol == "pgm" || protocol == "epgm") if (protocol == "pgm" || protocol == "epgm")
icanhasall = true; icanhasall = true;
if (options.delay_attach_on_connect != 1 && icanhasall != true) {
// Create a bi-directional pipe.
object_t *parents [2] = {this, session};
pipe_t *pipes [2] = {NULL, NULL};
int hwms [2] = {options.sndhwm, options.rcvhwm};
bool delays [2] = {options.delay_on_disconnect, options.delay_on_close};
rc = pipepair (parents, pipes, hwms, delays);
errno_assert (rc == 0);
// Attach local end of the pipe to the socket object. // Attach local end of the pipe to the socket object.
if (options.delay_attach_on_connect == 0)
attach_pipe (pipes [0], icanhasall); attach_pipe (pipes [0], icanhasall);
// Attach remote end of the pipe to the session object later on. // Attach remote end of the pipe to the session object later on.
session->attach_pipe (pipes [1]); session->attach_pipe (pipes [1]);
} if (options.delay_attach_on_connect == 1)
session->onconnect_attach_pipe (pipes [0]);
// Save last endpoint URI // Save last endpoint URI
paddr->to_string (options.last_endpoint); paddr->to_string (options.last_endpoint);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment