Commit 954d7f74 authored by Martin Sustrik's avatar Martin Sustrik

Avoid duplicate creation of pipes for a single session

When a session is being closed down its inbound and outbound
pipe pointers are set to null. If (re) connection happens at
that time, session may try to reinistantiate the pipes which
is wrong. This patch allows session to attach pipes only once
in its lifetime.
Signed-off-by: 's avatarMartin Sustrik <sustrik@250bpm.com>
parent c6d74e0a
...@@ -36,6 +36,7 @@ zmq::session_t::session_t (class io_thread_t *io_thread_, ...@@ -36,6 +36,7 @@ zmq::session_t::session_t (class io_thread_t *io_thread_,
engine (NULL), engine (NULL),
socket (socket_), socket (socket_),
io_thread (io_thread_), io_thread (io_thread_),
pipes_attached (false),
delimiter_processed (false), delimiter_processed (false),
force_terminate (false), force_terminate (false),
state (active) state (active)
...@@ -125,6 +126,9 @@ void zmq::session_t::clean_pipes () ...@@ -125,6 +126,9 @@ void zmq::session_t::clean_pipes ()
void zmq::session_t::attach_pipes (class reader_t *inpipe_, void zmq::session_t::attach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_, const blob_t &peer_identity_) class writer_t *outpipe_, const blob_t &peer_identity_)
{ {
zmq_assert (!pipes_attached);
pipes_attached = true;
if (inpipe_) { if (inpipe_) {
zmq_assert (!in_pipe); zmq_assert (!in_pipe);
in_pipe = inpipe_; in_pipe = inpipe_;
...@@ -218,16 +222,19 @@ void zmq::session_t::process_attach (i_engine *engine_, ...@@ -218,16 +222,19 @@ void zmq::session_t::process_attach (i_engine *engine_,
// Check whether the required pipes already exist. If not so, we'll // Check whether the required pipes already exist. If not so, we'll
// create them and bind them to the socket object. // create them and bind them to the socket object.
if (!pipes_attached) {
zmq_assert (!in_pipe && !out_pipe);
pipes_attached = true;
reader_t *socket_reader = NULL; reader_t *socket_reader = NULL;
writer_t *socket_writer = NULL; writer_t *socket_writer = NULL;
// Create the pipes, if required. // Create the pipes, as required.
if (options.requires_in && !out_pipe) { if (options.requires_in) {
create_pipe (socket, this, options.hwm, options.swap, &socket_reader, create_pipe (socket, this, options.hwm, options.swap, &socket_reader,
&out_pipe); &out_pipe);
out_pipe->set_event_sink (this); out_pipe->set_event_sink (this);
} }
if (options.requires_out && !in_pipe) { if (options.requires_out) {
create_pipe (this, socket, options.hwm, options.swap, &in_pipe, create_pipe (this, socket, options.hwm, options.swap, &in_pipe,
&socket_writer); &socket_writer);
in_pipe->set_event_sink (this); in_pipe->set_event_sink (this);
...@@ -236,6 +243,7 @@ void zmq::session_t::process_attach (i_engine *engine_, ...@@ -236,6 +243,7 @@ void zmq::session_t::process_attach (i_engine *engine_,
// Bind the pipes to the socket object. // Bind the pipes to the socket object.
if (socket_reader || socket_writer) if (socket_reader || socket_writer)
send_bind (socket, socket_reader, socket_writer, peer_identity_); send_bind (socket, socket_reader, socket_writer, peer_identity_);
}
// Plug in the engine. // Plug in the engine.
zmq_assert (!engine); zmq_assert (!engine);
......
...@@ -117,6 +117,9 @@ namespace zmq ...@@ -117,6 +117,9 @@ namespace zmq
// the engines into the same thread. // the engines into the same thread.
class io_thread_t *io_thread; class io_thread_t *io_thread;
// If true, pipes were already attached to this session.
bool pipes_attached;
// If true, delimiter was already read from the inbound pipe. // If true, delimiter was already read from the inbound pipe.
bool delimiter_processed; bool delimiter_processed;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment