Commit b19ee99b authored by Dhammika Pathirana's avatar Dhammika Pathirana Committed by Martin Sustrik

fix race condition in session init

Signed-off-by: 's avatarDhammika Pathirana <dhammika@gmail.com>
parent 27e83cc5
......@@ -40,6 +40,7 @@ zmq::zmq_engine_t::zmq_engine_t (fd_t fd_, const options_t &options_) :
outsize (0),
encoder (out_batch_size),
inout (NULL),
ephemeral_inout (NULL),
options (options_),
plugged (false)
{
......@@ -57,8 +58,9 @@ void zmq::zmq_engine_t::plug (io_thread_t *io_thread_, i_inout *inout_)
{
zmq_assert (!plugged);
plugged = true;
ephemeral_inout = NULL;
// Conncet to session/init object.
// Connect to session/init object.
zmq_assert (!inout);
zmq_assert (inout_);
encoder.set_inout (inout_);
......@@ -89,6 +91,7 @@ void zmq::zmq_engine_t::unplug ()
// Disconnect from init/session object.
encoder.set_inout (NULL);
decoder.set_inout (NULL);
ephemeral_inout = inout;
inout = NULL;
}
......@@ -139,7 +142,13 @@ void zmq::zmq_engine_t::in_event ()
}
// Flush all messages the decoder may have produced.
inout->flush ();
// If IO handler has unplugged engine, flush transient IO handler.
if (unlikely (!plugged)) {
zmq_assert (ephemeral_inout);
ephemeral_inout->flush ();
} else {
inout->flush ();
}
if (disconnection)
error ();
......@@ -152,7 +161,14 @@ void zmq::zmq_engine_t::out_event ()
outpos = NULL;
encoder.get_data (&outpos, &outsize);
// If IO handler has unplugged engine, flush transient IO handler.
if (unlikely (!plugged)) {
zmq_assert (ephemeral_inout);
ephemeral_inout->flush ();
return;
}
// If there is no data to send, stop polling for output.
if (outsize == 0) {
reset_pollout (handle);
......
......@@ -70,6 +70,9 @@ namespace zmq
i_inout *inout;
// Detached transient inout handler.
i_inout *ephemeral_inout;
options_t options;
bool plugged;
......
......@@ -34,6 +34,7 @@ zmq::zmq_init_t::zmq_init_t (io_thread_t *io_thread_,
socket_base_t *socket_, session_t *session_, fd_t fd_,
const options_t &options_) :
own_t (io_thread_, options_),
ephemeral_engine (NULL),
sent (false),
received (false),
socket (socket_),
......@@ -64,8 +65,7 @@ bool zmq::zmq_init_t::read (::zmq_msg_t *msg_)
options.identity.size ());
sent = true;
// If initialisation is done, pass the engine to the session and
// destroy the init object.
// Try finalize initialization.
finalise_initialisation ();
return true;
......@@ -92,6 +92,9 @@ bool zmq::zmq_init_t::write (::zmq_msg_t *msg_)
received = true;
// Try finalize initialization.
finalise_initialisation ();
return true;
}
......@@ -101,9 +104,9 @@ void zmq::zmq_init_t::flush ()
if (!received)
return;
// If initialisation is done, pass the engine to the session and
// destroy the init object.
finalise_initialisation ();
// Initialization is done, dispatch engine.
if (ephemeral_engine)
dispatch_engine ();
}
void zmq::zmq_init_t::detach ()
......@@ -134,18 +137,31 @@ void zmq::zmq_init_t::process_unplug ()
}
void zmq::zmq_init_t::finalise_initialisation ()
{
// Unplug and prepare to dispatch engine.
if (sent && received) {
ephemeral_engine = engine;
engine = NULL;
ephemeral_engine->unplug ();
return;
}
}
void zmq::zmq_init_t::dispatch_engine ()
{
if (sent && received) {
// Engine must be detached.
zmq_assert (!engine);
zmq_assert (ephemeral_engine);
// If we know what session we belong to, it's easy, just send the
// engine to that session and destroy the init object. Note that we
// know about the session only if this object is owned by it. Thus,
// lifetime of this object in contained in the lifetime of the session
// so the pointer cannot become invalid without notice.
if (session) {
engine->unplug ();
send_attach (session, engine, peer_identity, true);
engine = NULL;
send_attach (session, ephemeral_engine, peer_identity, true);
terminate ();
return;
}
......@@ -165,9 +181,7 @@ void zmq::zmq_init_t::finalise_initialisation ()
zmq_assert (session);
session->inc_seqnum ();
launch_sibling (session);
engine->unplug ();
send_attach (session, engine, peer_identity, false);
engine = NULL;
send_attach (session, ephemeral_engine, peer_identity, false);
terminate ();
return;
}
......@@ -178,9 +192,7 @@ void zmq::zmq_init_t::finalise_initialisation ()
// than by send_attach.
session = socket->find_session (peer_identity);
if (session) {
engine->unplug ();
send_attach (session, engine, peer_identity, false);
engine = NULL;
send_attach (session, ephemeral_engine, peer_identity, false);
terminate ();
return;
}
......@@ -194,9 +206,7 @@ void zmq::zmq_init_t::finalise_initialisation ()
zmq_assert (session);
session->inc_seqnum ();
launch_sibling (session);
engine->unplug ();
send_attach (session, engine, peer_identity, false);
engine = NULL;
send_attach (session, ephemeral_engine, peer_identity, false);
terminate ();
return;
}
......
......@@ -44,6 +44,7 @@ namespace zmq
private:
void finalise_initialisation ();
void dispatch_engine ();
// i_inout interface implementation.
bool read (::zmq_msg_t *msg_);
......@@ -58,6 +59,9 @@ namespace zmq
// Associated wire-protocol engine.
i_engine *engine;
// Detached transient engine.
i_engine *ephemeral_engine;
// True if our own identity was already sent to the peer.
bool sent;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment