Commit 80ac398b authored by Martin Sustrik's avatar Martin Sustrik

Initial implementation of reaper thread.

Reaper thread destroys the socket asynchronously.
zmq_term() can be interrupted by a signal (EINTR).
zmq_socket() will return ETERM after zmq_term() was called.
Signed-off-by: 's avatarMartin Sustrik <sustrik@250bpm.com>
parent 889424e6
...@@ -315,7 +315,8 @@ ERRORS ...@@ -315,7 +315,8 @@ ERRORS
The requested socket 'type' is invalid. The requested socket 'type' is invalid.
*EFAULT*:: *EFAULT*::
The provided 'context' was not valid (NULL). The provided 'context' was not valid (NULL).
*ETERM*::
The context specified was terminated.
SEE ALSO SEE ALSO
-------- --------
......
...@@ -47,6 +47,8 @@ ERRORS ...@@ -47,6 +47,8 @@ ERRORS
------ ------
*EFAULT*:: *EFAULT*::
The provided 'context' was not valid (NULL). The provided 'context' was not valid (NULL).
*EINTR*::
Termination was interrupted by a signal. It can be restarted if needed.
SEE ALSO SEE ALSO
......
...@@ -107,6 +107,7 @@ libzmq_la_SOURCES = \ ...@@ -107,6 +107,7 @@ libzmq_la_SOURCES = \
pub.hpp \ pub.hpp \
pull.hpp \ pull.hpp \
push.hpp \ push.hpp \
reaper.hpp \
rep.hpp \ rep.hpp \
req.hpp \ req.hpp \
select.hpp \ select.hpp \
...@@ -166,6 +167,7 @@ libzmq_la_SOURCES = \ ...@@ -166,6 +167,7 @@ libzmq_la_SOURCES = \
poller_base.cpp \ poller_base.cpp \
pull.cpp \ pull.cpp \
push.cpp \ push.cpp \
reaper.cpp \
pub.cpp \ pub.cpp \
rep.cpp \ rep.cpp \
req.cpp \ req.cpp \
......
...@@ -45,7 +45,9 @@ namespace zmq ...@@ -45,7 +45,9 @@ namespace zmq
pipe_term_ack, pipe_term_ack,
term_req, term_req,
term, term,
term_ack term_ack,
reap,
done
} type; } type;
union { union {
...@@ -117,6 +119,17 @@ namespace zmq ...@@ -117,6 +119,17 @@ namespace zmq
struct { struct {
} term_ack; } term_ack;
// Transfers the ownership of the closed socket
// to the reaper thread.
struct {
class socket_base_t *socket;
} reap;
// Sent by reaper thread to the term thread when all the sockets
// are successfully deallocated.
struct {
} done;
} args; } args;
}; };
......
...@@ -24,6 +24,7 @@ ...@@ -24,6 +24,7 @@
#include "socket_base.hpp" #include "socket_base.hpp"
#include "io_thread.hpp" #include "io_thread.hpp"
#include "platform.hpp" #include "platform.hpp"
#include "reaper.hpp"
#include "err.hpp" #include "err.hpp"
#include "pipe.hpp" #include "pipe.hpp"
...@@ -34,7 +35,7 @@ ...@@ -34,7 +35,7 @@
#endif #endif
zmq::ctx_t::ctx_t (uint32_t io_threads_) : zmq::ctx_t::ctx_t (uint32_t io_threads_) :
no_sockets_notify (false) terminating (false)
{ {
int rc; int rc;
...@@ -49,13 +50,23 @@ zmq::ctx_t::ctx_t (uint32_t io_threads_) : ...@@ -49,13 +50,23 @@ zmq::ctx_t::ctx_t (uint32_t io_threads_) :
HIBYTE (wsa_data.wVersion) == 2); HIBYTE (wsa_data.wVersion) == 2);
#endif #endif
// Initialise the array of mailboxes. +1 accounts for internal log socket. // Initialise the array of mailboxes. Additional three slots are for
slot_count = max_sockets + io_threads_ + 1; // internal log socket and the zmq_term thread the reaper thread.
slot_count = max_sockets + io_threads_ + 3;
slots = (mailbox_t**) malloc (sizeof (mailbox_t*) * slot_count); slots = (mailbox_t**) malloc (sizeof (mailbox_t*) * slot_count);
zmq_assert (slots); zmq_assert (slots);
// Initialise the infrastructure for zmq_term thread.
slots [term_tid] = &term_mailbox;
// Create the reaper thread.
reaper = new (std::nothrow) reaper_t (this, reaper_tid);
zmq_assert (reaper);
slots [reaper_tid] = reaper->get_mailbox ();
reaper->start ();
// Create I/O thread objects and launch them. // Create I/O thread objects and launch them.
for (uint32_t i = 0; i != io_threads_; i++) { for (uint32_t i = 2; i != io_threads_ + 2; i++) {
io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i); io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i);
zmq_assert (io_thread); zmq_assert (io_thread);
io_threads.push_back (io_thread); io_threads.push_back (io_thread);
...@@ -65,7 +76,7 @@ zmq::ctx_t::ctx_t (uint32_t io_threads_) : ...@@ -65,7 +76,7 @@ zmq::ctx_t::ctx_t (uint32_t io_threads_) :
// In the unused part of the slot array, create a list of empty slots. // In the unused part of the slot array, create a list of empty slots.
for (int32_t i = (int32_t) slot_count - 1; for (int32_t i = (int32_t) slot_count - 1;
i >= (int32_t) io_threads_; i--) { i >= (int32_t) io_threads_ + 2; i--) {
empty_slots.push_back (i); empty_slots.push_back (i);
slots [i] = NULL; slots [i] = NULL;
} }
...@@ -79,9 +90,8 @@ zmq::ctx_t::ctx_t (uint32_t io_threads_) : ...@@ -79,9 +90,8 @@ zmq::ctx_t::ctx_t (uint32_t io_threads_) :
zmq::ctx_t::~ctx_t () zmq::ctx_t::~ctx_t ()
{ {
// Check that there are no remaining open or zombie sockets. // Check that there are no remaining sockets.
zmq_assert (sockets.empty ()); zmq_assert (sockets.empty ());
zmq_assert (zombies.empty ());
// Ask I/O threads to terminate. If stop signal wasn't sent to I/O // Ask I/O threads to terminate. If stop signal wasn't sent to I/O
// thread subsequent invocation of destructor would hang-up. // thread subsequent invocation of destructor would hang-up.
...@@ -92,6 +102,9 @@ zmq::ctx_t::~ctx_t () ...@@ -92,6 +102,9 @@ zmq::ctx_t::~ctx_t ()
for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) for (io_threads_t::size_type i = 0; i != io_threads.size (); i++)
delete io_threads [i]; delete io_threads [i];
// Deallocate the reaper thread object.
delete reaper;
// Deallocate the array of mailboxes. No special work is // Deallocate the array of mailboxes. No special work is
// needed as mailboxes themselves were deallocated with their // needed as mailboxes themselves were deallocated with their
// corresponding io_thread/socket objects. // corresponding io_thread/socket objects.
...@@ -106,52 +119,42 @@ zmq::ctx_t::~ctx_t () ...@@ -106,52 +119,42 @@ zmq::ctx_t::~ctx_t ()
int zmq::ctx_t::terminate () int zmq::ctx_t::terminate ()
{ {
// Close the logging infrastructure. // Check whether termination was already underway, but interrupted and now
log_sync.lock (); // restarted.
int rc = log_socket->close ();
zmq_assert (rc == 0);
log_socket = NULL;
log_sync.unlock ();
// First send stop command to sockets so that any
// blocking calls are interrupted.
slot_sync.lock (); slot_sync.lock ();
for (sockets_t::size_type i = 0; i != sockets.size (); i++) bool restarted = terminating;
sockets [i]->stop ();
if (!sockets.empty ())
no_sockets_notify = true;
slot_sync.unlock (); slot_sync.unlock ();
// Find out whether there are any open sockets to care about. // First attempt to terminate the context.
// If there are open sockets, sleep till they are closed. Note that we can if (!restarted) {
// use no_sockets_notify safely out of the critical section as once set
// its value is never changed again. // Close the logging infrastructure.
if (no_sockets_notify) log_sync.lock ();
no_sockets_sync.wait (); int rc = log_socket->close ();
zmq_assert (rc == 0);
log_socket = NULL;
log_sync.unlock ();
// First send stop command to sockets so that any blocking calls can be
// interrupted. If there are no sockets we can ask reaper thread to stop.
slot_sync.lock ();
terminating = true;
for (sockets_t::size_type i = 0; i != sockets.size (); i++)
sockets [i]->stop ();
if (sockets.empty ())
reaper->stop ();
slot_sync.unlock ();
}
// Note that the lock won't block anyone here. There's noone else having // Wait till reaper thread closes all the sockets.
// open sockets anyway. The only purpose of the lock is to double-check all command_t cmd;
// the CPU caches have been synchronised. int rc = term_mailbox.recv (&cmd, true);
if (rc == -1 && errno == EINTR)
return -1;
zmq_assert (rc == 0);
zmq_assert (cmd.type == command_t::done);
slot_sync.lock (); slot_sync.lock ();
// At this point there should be no active sockets. What we have is a set
// of zombies waiting to be dezombified.
zmq_assert (sockets.empty ()); zmq_assert (sockets.empty ());
// Get rid of remaining zombie sockets.
while (!zombies.empty ()) {
dezombify ();
// Sleep for 1ms not to end up busy-looping in the case the I/O threads
// are still busy sending data. We can possibly add a grand poll here
// (polling for fds associated with all the zombie sockets), but it's
// probably not worth of implementing it.
#if defined ZMQ_HAVE_WINDOWS
Sleep (1);
#else
usleep (1000);
#endif
}
slot_sync.unlock (); slot_sync.unlock ();
// Deallocate the resources. // Deallocate the resources.
...@@ -164,8 +167,12 @@ zmq::socket_base_t *zmq::ctx_t::create_socket (int type_) ...@@ -164,8 +167,12 @@ zmq::socket_base_t *zmq::ctx_t::create_socket (int type_)
{ {
slot_sync.lock (); slot_sync.lock ();
// Free the slots, if possible. // Once zmq_term() was called, we can't create new sockets.
dezombify (); if (terminating) {
slot_sync.unlock ();
errno = ETERM;
return NULL;
}
// If max_sockets limit was reached, return error. // If max_sockets limit was reached, return error.
if (empty_slots.empty ()) { if (empty_slots.empty ()) {
...@@ -193,29 +200,31 @@ zmq::socket_base_t *zmq::ctx_t::create_socket (int type_) ...@@ -193,29 +200,31 @@ zmq::socket_base_t *zmq::ctx_t::create_socket (int type_)
return s; return s;
} }
void zmq::ctx_t::zombify_socket (socket_base_t *socket_) void zmq::ctx_t::destroy_socket (class socket_base_t *socket_)
{ {
// Zombification of socket basically means that its ownership is tranferred
// from the application that created it to the context.
// Note that the lock provides the memory barrier needed to migrate
// zombie-to-be socket from it's native thread to shared data area
// synchronised by slot_sync.
slot_sync.lock (); slot_sync.lock ();
sockets.erase (socket_);
zombies.push_back (socket_);
// Try to get rid of at least some zombie sockets at this point. // Free the associared thread slot.
dezombify (); uint32_t tid = socket_->get_tid ();
empty_slots.push_back (tid);
slots [tid] = NULL;
// If shutdown thread is interested in notification about no more // Remove the socket from the list of sockets.
// open sockets, notify it now. sockets.erase (socket_);
if (sockets.empty () && no_sockets_notify)
no_sockets_sync.post (); // If zmq_term() was already called and there are no more socket
// we can ask reaper thread to terminate.
if (terminating && sockets.empty ())
reaper->stop ();
slot_sync.unlock (); slot_sync.unlock ();
} }
zmq::object_t *zmq::ctx_t::get_reaper ()
{
return reaper;
}
void zmq::ctx_t::send_command (uint32_t tid_, const command_t &command_) void zmq::ctx_t::send_command (uint32_t tid_, const command_t &command_)
{ {
slots [tid_]->send (command_); slots [tid_]->send (command_);
...@@ -309,25 +318,4 @@ void zmq::ctx_t::log (zmq_msg_t *msg_) ...@@ -309,25 +318,4 @@ void zmq::ctx_t::log (zmq_msg_t *msg_)
log_sync.unlock (); log_sync.unlock ();
} }
void zmq::ctx_t::dezombify ()
{
// Try to dezombify each zombie in the list. Note that caller is
// responsible for calling this method in the slot_sync critical section.
for (zombies_t::iterator it = zombies.begin (); it != zombies.end ();) {
uint32_t tid = (*it)->get_tid ();
if ((*it)->dezombify ()) {
#if defined _MSC_VER
// HP implementation of STL requires doing it this way...
it = zombies.erase (it);
#else
zombies.erase (it);
#endif
empty_slots.push_back (tid);
slots [tid] = NULL;
}
else
++it;
}
}
...@@ -64,11 +64,9 @@ namespace zmq ...@@ -64,11 +64,9 @@ namespace zmq
// after the last one is closed. // after the last one is closed.
int terminate (); int terminate ();
// Create a socket. // Create and destroy a socket.
class socket_base_t *create_socket (int type_); class socket_base_t *create_socket (int type_);
void destroy_socket (class socket_base_t *socket_);
// Make socket a zombie.
void zombify_socket (socket_base_t *socket_);
// Send command to the destination thread. // Send command to the destination thread.
void send_command (uint32_t tid_, const command_t &command_); void send_command (uint32_t tid_, const command_t &command_);
...@@ -78,6 +76,9 @@ namespace zmq ...@@ -78,6 +76,9 @@ namespace zmq
// Returns NULL is no I/O thread is available. // Returns NULL is no I/O thread is available.
class io_thread_t *choose_io_thread (uint64_t affinity_); class io_thread_t *choose_io_thread (uint64_t affinity_);
// Returns reaper thread object.
class object_t *get_reaper ();
// Management of inproc endpoints. // Management of inproc endpoints.
int register_endpoint (const char *addr_, endpoint_t &endpoint_); int register_endpoint (const char *addr_, endpoint_t &endpoint_);
void unregister_endpoints (class socket_base_t *socket_); void unregister_endpoints (class socket_base_t *socket_);
...@@ -86,41 +87,36 @@ namespace zmq ...@@ -86,41 +87,36 @@ namespace zmq
// Logging. // Logging.
void log (zmq_msg_t *msg_); void log (zmq_msg_t *msg_);
enum {
term_tid = 0,
reaper_tid = 1
};
private: private:
~ctx_t (); ~ctx_t ();
// Sockets belonging to this context. // Sockets belonging to this context. We need the list so that
// we can notify the sockets when zmq_term() is called. The sockets
// will return ETERM then.
typedef array_t <socket_base_t> sockets_t; typedef array_t <socket_base_t> sockets_t;
sockets_t sockets; sockets_t sockets;
// List of sockets that were already closed but not yet deallocated.
// These sockets still have some pipes and I/O objects attached.
typedef std::vector <socket_base_t*> zombies_t;
zombies_t zombies;
// List of unused thread slots. // List of unused thread slots.
typedef std::vector <uint32_t> emtpy_slots_t; typedef std::vector <uint32_t> emtpy_slots_t;
emtpy_slots_t empty_slots; emtpy_slots_t empty_slots;
// If true, shutdown thread wants to be informed when there are no // If true, zmq_term was already called.
// more open sockets. Do so by posting no_sockets_sync semaphore. bool terminating;
// Note that this variable is synchronised by slot_sync mutex.
bool no_sockets_notify;
// Object used by zmq_term to wait while all the sockets are closed
// by different application threads.
semaphore_t no_sockets_sync;
// Synchronisation of accesses to global slot-related data: // Synchronisation of accesses to global slot-related data:
// sockets, zombies, empty_slots, terminated. It also synchronises // sockets, empty_slots, terminating. It also synchronises
// access to zombie sockets as such (as oposed to slots) and provides // access to zombie sockets as such (as oposed to slots) and provides
// a memory barrier to ensure that all CPU cores see the same data. // a memory barrier to ensure that all CPU cores see the same data.
mutex_t slot_sync; mutex_t slot_sync;
// This function attempts to deallocate as many zombie sockets as // The reaper thread.
// possible. It must be called within a slot_sync critical section. class reaper_t *reaper;
void dezombify ();
// I/O threads. // I/O threads.
typedef std::vector <class io_thread_t*> io_threads_t; typedef std::vector <class io_thread_t*> io_threads_t;
...@@ -130,6 +126,9 @@ namespace zmq ...@@ -130,6 +126,9 @@ namespace zmq
uint32_t slot_count; uint32_t slot_count;
mailbox_t **slots; mailbox_t **slots;
// Mailbox for zmq_term thread.
mailbox_t term_mailbox;
// List of inproc endpoints within this context. // List of inproc endpoints within this context.
typedef std::map <std::string, endpoint_t> endpoints_t; typedef std::map <std::string, endpoint_t> endpoints_t;
endpoints_t endpoints; endpoints_t endpoints;
......
...@@ -75,7 +75,7 @@ void zmq::io_thread_t::in_event () ...@@ -75,7 +75,7 @@ void zmq::io_thread_t::in_event ()
if (rc != 0 && errno == EINTR) if (rc != 0 && errno == EINTR)
continue; continue;
if (rc != 0 && errno == EAGAIN) if (rc != 0 && errno == EAGAIN)
break; break;
errno_assert (rc == 0); errno_assert (rc == 0);
// Process the command. // Process the command.
......
...@@ -77,6 +77,9 @@ namespace zmq ...@@ -77,6 +77,9 @@ namespace zmq
// I/O multiplexing is performed using a poller object. // I/O multiplexing is performed using a poller object.
poller_t *poller; poller_t *poller;
io_thread_t (const io_thread_t&);
const io_thread_t &operator = (const io_thread_t&);
}; };
} }
......
...@@ -114,6 +114,10 @@ void zmq::object_t::process_command (command_t &cmd_) ...@@ -114,6 +114,10 @@ void zmq::object_t::process_command (command_t &cmd_)
process_term_ack (); process_term_ack ();
break; break;
case command_t::reap:
process_reap (cmd_.args.reap.socket);
break;
default: default:
zmq_assert (false); zmq_assert (false);
} }
...@@ -138,6 +142,11 @@ zmq::endpoint_t zmq::object_t::find_endpoint (const char *addr_) ...@@ -138,6 +142,11 @@ zmq::endpoint_t zmq::object_t::find_endpoint (const char *addr_)
return ctx->find_endpoint (addr_); return ctx->find_endpoint (addr_);
} }
void zmq::object_t::destroy_socket (socket_base_t *socket_)
{
ctx->destroy_socket (socket_);
}
void zmq::object_t::log (zmq_msg_t *msg_) void zmq::object_t::log (zmq_msg_t *msg_)
{ {
ctx->log (msg_); ctx->log (msg_);
...@@ -148,11 +157,6 @@ zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t affinity_) ...@@ -148,11 +157,6 @@ zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t affinity_)
return ctx->choose_io_thread (affinity_); return ctx->choose_io_thread (affinity_);
} }
void zmq::object_t::zombify_socket (socket_base_t *socket_)
{
ctx->zombify_socket (socket_);
}
void zmq::object_t::send_stop () void zmq::object_t::send_stop ()
{ {
// 'stop' command goes always from administrative thread to // 'stop' command goes always from administrative thread to
...@@ -336,6 +340,29 @@ void zmq::object_t::send_term_ack (own_t *destination_) ...@@ -336,6 +340,29 @@ void zmq::object_t::send_term_ack (own_t *destination_)
send_command (cmd); send_command (cmd);
} }
void zmq::object_t::send_reap (class socket_base_t *socket_)
{
command_t cmd;
#if defined ZMQ_MAKE_VALGRIND_HAPPY
memset (&cmd, 0, sizeof (cmd));
#endif
cmd.destination = ctx->get_reaper ();
cmd.type = command_t::reap;
cmd.args.reap.socket = socket_;
send_command (cmd);
}
void zmq::object_t::send_done ()
{
command_t cmd;
#if defined ZMQ_MAKE_VALGRIND_HAPPY
memset (&cmd, 0, sizeof (cmd));
#endif
cmd.destination = NULL;
cmd.type = command_t::done;
ctx->send_command (ctx_t::term_tid, cmd);
}
void zmq::object_t::process_stop () void zmq::object_t::process_stop ()
{ {
zmq_assert (false); zmq_assert (false);
...@@ -398,6 +425,11 @@ void zmq::object_t::process_term_ack () ...@@ -398,6 +425,11 @@ void zmq::object_t::process_term_ack ()
zmq_assert (false); zmq_assert (false);
} }
void zmq::object_t::process_reap (class socket_base_t *socket_)
{
zmq_assert (false);
}
void zmq::object_t::process_seqnum () void zmq::object_t::process_seqnum ()
{ {
zmq_assert (false); zmq_assert (false);
......
...@@ -49,6 +49,7 @@ namespace zmq ...@@ -49,6 +49,7 @@ namespace zmq
int register_endpoint (const char *addr_, struct endpoint_t &endpoint_); int register_endpoint (const char *addr_, struct endpoint_t &endpoint_);
void unregister_endpoints (class socket_base_t *socket_); void unregister_endpoints (class socket_base_t *socket_);
struct endpoint_t find_endpoint (const char *addr_); struct endpoint_t find_endpoint (const char *addr_);
void destroy_socket (class socket_base_t *socket_);
// Logs an message. // Logs an message.
void log (zmq_msg_t *msg_); void log (zmq_msg_t *msg_);
...@@ -56,10 +57,6 @@ namespace zmq ...@@ -56,10 +57,6 @@ namespace zmq
// Chooses least loaded I/O thread. // Chooses least loaded I/O thread.
class io_thread_t *choose_io_thread (uint64_t affinity_); class io_thread_t *choose_io_thread (uint64_t affinity_);
// Zombify particular socket. In other words, pass the ownership to
// the context.
void zombify_socket (class socket_base_t *socket_);
// Derived object can use these functions to send commands // Derived object can use these functions to send commands
// to other objects. // to other objects.
void send_stop (); void send_stop ();
...@@ -82,6 +79,8 @@ namespace zmq ...@@ -82,6 +79,8 @@ namespace zmq
class own_t *object_); class own_t *object_);
void send_term (class own_t *destination_, int linger_); void send_term (class own_t *destination_, int linger_);
void send_term_ack (class own_t *destination_); void send_term_ack (class own_t *destination_);
void send_reap (class socket_base_t *socket_);
void send_done ();
// These handlers can be overloaded by the derived objects. They are // These handlers can be overloaded by the derived objects. They are
// called when command arrives from another thread. // called when command arrives from another thread.
...@@ -99,6 +98,7 @@ namespace zmq ...@@ -99,6 +98,7 @@ namespace zmq
virtual void process_term_req (class own_t *object_); virtual void process_term_req (class own_t *object_);
virtual void process_term (int linger_); virtual void process_term (int linger_);
virtual void process_term_ack (); virtual void process_term_ack ();
virtual void process_reap (class socket_base_t *socket_);
// Special handler called after a command that requires a seqnum // Special handler called after a command that requires a seqnum
// was processed. The implementation should catch up with its counter // was processed. The implementation should catch up with its counter
......
/*
Copyright (c) 2007-2010 iMatix Corporation
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "reaper.hpp"
#include "socket_base.hpp"
#include "err.hpp"
zmq::reaper_t::reaper_t (class ctx_t *ctx_, uint32_t tid_) :
object_t (ctx_, tid_),
terminating (false),
has_timer (false)
{
poller = new (std::nothrow) poller_t;
zmq_assert (poller);
mailbox_handle = poller->add_fd (mailbox.get_fd (), this);
poller->set_pollin (mailbox_handle);
}
zmq::reaper_t::~reaper_t ()
{
delete poller;
}
zmq::mailbox_t *zmq::reaper_t::get_mailbox ()
{
return &mailbox;
}
void zmq::reaper_t::start ()
{
// Start the thread.
poller->start ();
}
void zmq::reaper_t::stop ()
{
send_stop ();
}
void zmq::reaper_t::in_event ()
{
while (true) {
// Get the next command. If there is none, exit.
command_t cmd;
int rc = mailbox.recv (&cmd, false);
if (rc != 0 && errno == EINTR)
continue;
if (rc != 0 && errno == EAGAIN)
break;
errno_assert (rc == 0);
// Process the command.
cmd.destination->process_command (cmd);
}
}
void zmq::reaper_t::out_event ()
{
// We are never polling for POLLOUT here. This function is never called.
zmq_assert (false);
}
void zmq::reaper_t::timer_event (int id_)
{
zmq_assert (has_timer);
has_timer = false;
reap ();
}
void zmq::reaper_t::reap ()
{
// Try to reap each socket in the list.
for (sockets_t::iterator it = sockets.begin (); it != sockets.end ();) {
if ((*it)->reap ()) {
// MSVC version of STL requires this to be done a spacial way...
#if defined _MSC_VER
it = sockets.erase (it);
#else
sockets.erase (it);
#endif
}
else
++it;
}
// If there are still sockets to reap, wait a while, then try again.
if (!sockets.empty () && !has_timer) {
poller->add_timer (1 , this, 0);
has_timer = true;
return;
}
// No more sockets and the context is already shutting down.
if (terminating) {
send_done ();
poller->rm_fd (mailbox_handle);
poller->stop ();
return;
}
}
void zmq::reaper_t::process_stop ()
{
terminating = true;
if (sockets.empty ()) {
send_done ();
poller->rm_fd (mailbox_handle);
poller->stop ();
}
}
void zmq::reaper_t::process_reap (socket_base_t *socket_)
{
// Start termination of associated I/O object hierarchy.
socket_->terminate ();
sockets.push_back (socket_);
reap ();
}
/*
Copyright (c) 2007-2010 iMatix Corporation
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_REAPER_HPP_INCLUDED__
#define __ZMQ_REAPER_HPP_INCLUDED__
#include "object.hpp"
#include "mailbox.hpp"
#include "poller.hpp"
#include "i_poll_events.hpp"
namespace zmq
{
class reaper_t : public object_t, public i_poll_events
{
public:
reaper_t (class ctx_t *ctx_, uint32_t tid_);
~reaper_t ();
mailbox_t *get_mailbox ();
void start ();
void stop ();
// i_poll_events implementation.
void in_event ();
void out_event ();
void timer_event (int id_);
private:
void reap ();
// Command handlers.
void process_stop ();
void process_reap (class socket_base_t *socket_);
// List of all sockets being terminated.
typedef std::vector <class socket_base_t*> sockets_t;
sockets_t sockets;
// Reaper thread accesses incoming commands via this mailbox.
mailbox_t mailbox;
// Handle associated with mailbox' file descriptor.
poller_t::handle_t mailbox_handle;
// I/O multiplexing is performed using a poller object.
poller_t *poller;
// If true, we were already asked to terminate.
bool terminating;
// If true, timer till next reaping is running.
bool has_timer;
reaper_t (const reaper_t&);
const reaper_t &operator = (const reaper_t&);
};
}
#endif
...@@ -570,13 +570,10 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_) ...@@ -570,13 +570,10 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
int zmq::socket_base_t::close () int zmq::socket_base_t::close ()
{ {
// Start termination of associated I/O object hierarchy. // Transfer the ownership of the socket from this application thread
terminate (); // to the reaper thread which will take care of the rest of shutdown
// process.
// Ask context to zombify this socket. In other words, transfer send_reap (this);
// the ownership of the socket from this application thread
// to the context which will take care of the rest of shutdown process.
zombify_socket (this);
return 0; return 0;
} }
...@@ -627,7 +624,7 @@ zmq::session_t *zmq::socket_base_t::find_session (const blob_t &name_) ...@@ -627,7 +624,7 @@ zmq::session_t *zmq::socket_base_t::find_session (const blob_t &name_)
return session; return session;
} }
bool zmq::socket_base_t::dezombify () bool zmq::socket_base_t::reap ()
{ {
// Process any commands from other threads/sockets that may be available // Process any commands from other threads/sockets that may be available
// at the moment. Ultimately, socket will be destroyed. // at the moment. Ultimately, socket will be destroyed.
...@@ -635,6 +632,11 @@ bool zmq::socket_base_t::dezombify () ...@@ -635,6 +632,11 @@ bool zmq::socket_base_t::dezombify ()
// If the object was already marked as destroyed, finish the deallocation. // If the object was already marked as destroyed, finish the deallocation.
if (destroyed) { if (destroyed) {
// Remove the socket from the context.
destroy_socket (this);
// Deallocate.
own_t::process_destroy (); own_t::process_destroy ();
return true; return true;
} }
......
...@@ -42,6 +42,8 @@ namespace zmq ...@@ -42,6 +42,8 @@ namespace zmq
public own_t, public own_t,
public array_item_t public array_item_t
{ {
friend class reaper_t;
public: public:
// Create a socket of a specified type. // Create a socket of a specified type.
...@@ -82,9 +84,9 @@ namespace zmq ...@@ -82,9 +84,9 @@ namespace zmq
void activated (class writer_t *pipe_); void activated (class writer_t *pipe_);
void terminated (class writer_t *pipe_); void terminated (class writer_t *pipe_);
// This function should be called only on zombie sockets. It tries // This function should be called only on sockets that are already
// to deallocate the zombie. Returns true is object is destroyed. // closed -- from the reaper thread. It tries to finalise the socket.
bool dezombify (); bool reap ();
protected: protected:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment