Commit bbaa494f authored by Martin Sustrik's avatar Martin Sustrik

ZMQII-59: TCP server crashes sometimes when message is send and socket is closed immediately

parent d21bf21a
...@@ -67,6 +67,12 @@ zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_, ...@@ -67,6 +67,12 @@ zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_,
signalers.push_back (io_thread->get_signaler ()); signalers.push_back (io_thread->get_signaler ());
} }
// Create the administrative thread. Nothing special is needed. NULL
// is used instead of signaler given that as for now, administrative
// thread doesn't receive any commands. The only thing it is used for
// is sending 'stop' command to I/O threads on shutdown.
signalers.push_back (NULL);
// Create command pipe matrix. // Create command pipe matrix.
command_pipes = new (std::nothrow) command_pipe_t [signalers.size () * command_pipes = new (std::nothrow) command_pipe_t [signalers.size () *
signalers.size ()]; signalers.size ()];
...@@ -159,6 +165,23 @@ void zmq::dispatcher_t::destroy_socket () ...@@ -159,6 +165,23 @@ void zmq::dispatcher_t::destroy_socket ()
delete this; delete this;
} }
void zmq::dispatcher_t::write (int source_, int destination_,
const command_t &command_)
{
command_pipe_t &pipe =
command_pipes [source_ * signalers.size () + destination_];
pipe.write (command_);
if (!pipe.flush ())
signalers [destination_]->signal (source_);
}
bool zmq::dispatcher_t::read (int source_, int destination_,
command_t *command_)
{
return command_pipes [source_ * signalers.size () +
destination_].read (command_);
}
zmq::app_thread_t *zmq::dispatcher_t::choose_app_thread () zmq::app_thread_t *zmq::dispatcher_t::choose_app_thread ()
{ {
// Check whether thread ID is already assigned. If so, return it. // Check whether thread ID is already assigned. If so, return it.
......
...@@ -70,23 +70,11 @@ namespace zmq ...@@ -70,23 +70,11 @@ namespace zmq
int thread_slot_count (); int thread_slot_count ();
// Send command from the source to the destination. // Send command from the source to the destination.
inline void write (int source_, int destination_, void write (int source_, int destination_, const command_t &command_);
const command_t &command_)
{
command_pipe_t &pipe =
command_pipes [source_ * signalers.size () + destination_];
pipe.write (command_);
if (!pipe.flush ())
signalers [destination_]->signal (source_);
}
// Receive command from the source. Returns false if there is no // Receive command from the source. Returns false if there is no
// command available. // command available.
inline bool read (int source_, int destination_, command_t *command_) bool read (int source_, int destination_, command_t *command_);
{
return command_pipes [source_ * signalers.size () +
destination_].read (command_);
}
// Returns the I/O thread that is the least busy at the moment. // Returns the I/O thread that is the least busy at the moment.
// Taskset specifies which I/O threads are eligible (0 = all). // Taskset specifies which I/O threads are eligible (0 = all).
......
...@@ -146,12 +146,13 @@ zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t taskset_) ...@@ -146,12 +146,13 @@ zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t taskset_)
void zmq::object_t::send_stop () void zmq::object_t::send_stop ()
{ {
// Send command goes always to the current object. To-self pipe is // 'stop' command goes always from administrative thread to
// used exclusively for sending this command. // the current object.
int admin_thread_id = dispatcher->thread_slot_count () - 1;
command_t cmd; command_t cmd;
cmd.destination = this; cmd.destination = this;
cmd.type = command_t::stop; cmd.type = command_t::stop;
dispatcher->write (thread_slot, thread_slot, cmd); dispatcher->write (admin_thread_id, thread_slot, cmd);
} }
void zmq::object_t::send_plug (owned_t *destination_, bool inc_seqnum_) void zmq::object_t::send_plug (owned_t *destination_, bool inc_seqnum_)
...@@ -314,9 +315,6 @@ void zmq::object_t::process_seqnum () ...@@ -314,9 +315,6 @@ void zmq::object_t::process_seqnum ()
void zmq::object_t::send_command (command_t &cmd_) void zmq::object_t::send_command (command_t &cmd_)
{ {
int destination_thread_slot = cmd_.destination->get_thread_slot (); int destination_thread_slot = cmd_.destination->get_thread_slot ();
if (destination_thread_slot == thread_slot) dispatcher->write (thread_slot, destination_thread_slot, cmd_);
cmd_.destination->process_command (cmd_);
else
dispatcher->write (thread_slot, destination_thread_slot, cmd_);
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment