Commit 5b82b1ba authored by Martin Sustrik's avatar Martin Sustrik

Reaper thread waits for commands rather them retrieving them periodically

Signed-off-by: 's avatarMartin Sustrik <sustrik@250bpm.com>
parent 80ac398b
...@@ -47,6 +47,7 @@ namespace zmq ...@@ -47,6 +47,7 @@ namespace zmq
term, term,
term_ack, term_ack,
reap, reap,
reaped,
done done
} type; } type;
...@@ -125,6 +126,10 @@ namespace zmq ...@@ -125,6 +126,10 @@ namespace zmq
class socket_base_t *socket; class socket_base_t *socket;
} reap; } reap;
// Closed socket notifies the reaper that it's already deallocated.
struct {
} reaped;
// Sent by reaper thread to the term thread when all the sockets // Sent by reaper thread to the term thread when all the sockets
// are successfully deallocated. // are successfully deallocated.
struct { struct {
......
...@@ -118,6 +118,10 @@ void zmq::object_t::process_command (command_t &cmd_) ...@@ -118,6 +118,10 @@ void zmq::object_t::process_command (command_t &cmd_)
process_reap (cmd_.args.reap.socket); process_reap (cmd_.args.reap.socket);
break; break;
case command_t::reaped:
process_reaped ();
break;
default: default:
zmq_assert (false); zmq_assert (false);
} }
...@@ -352,6 +356,17 @@ void zmq::object_t::send_reap (class socket_base_t *socket_) ...@@ -352,6 +356,17 @@ void zmq::object_t::send_reap (class socket_base_t *socket_)
send_command (cmd); send_command (cmd);
} }
void zmq::object_t::send_reaped ()
{
command_t cmd;
#if defined ZMQ_MAKE_VALGRIND_HAPPY
memset (&cmd, 0, sizeof (cmd));
#endif
cmd.destination = ctx->get_reaper ();
cmd.type = command_t::reaped;
send_command (cmd);
}
void zmq::object_t::send_done () void zmq::object_t::send_done ()
{ {
command_t cmd; command_t cmd;
...@@ -430,6 +445,11 @@ void zmq::object_t::process_reap (class socket_base_t *socket_) ...@@ -430,6 +445,11 @@ void zmq::object_t::process_reap (class socket_base_t *socket_)
zmq_assert (false); zmq_assert (false);
} }
void zmq::object_t::process_reaped ()
{
zmq_assert (false);
}
void zmq::object_t::process_seqnum () void zmq::object_t::process_seqnum ()
{ {
zmq_assert (false); zmq_assert (false);
......
...@@ -80,6 +80,7 @@ namespace zmq ...@@ -80,6 +80,7 @@ namespace zmq
void send_term (class own_t *destination_, int linger_); void send_term (class own_t *destination_, int linger_);
void send_term_ack (class own_t *destination_); void send_term_ack (class own_t *destination_);
void send_reap (class socket_base_t *socket_); void send_reap (class socket_base_t *socket_);
void send_reaped ();
void send_done (); void send_done ();
// These handlers can be overloaded by the derived objects. They are // These handlers can be overloaded by the derived objects. They are
...@@ -99,6 +100,7 @@ namespace zmq ...@@ -99,6 +100,7 @@ namespace zmq
virtual void process_term (int linger_); virtual void process_term (int linger_);
virtual void process_term_ack (); virtual void process_term_ack ();
virtual void process_reap (class socket_base_t *socket_); virtual void process_reap (class socket_base_t *socket_);
virtual void process_reaped ();
// Special handler called after a command that requires a seqnum // Special handler called after a command that requires a seqnum
// was processed. The implementation should catch up with its counter // was processed. The implementation should catch up with its counter
......
...@@ -23,8 +23,8 @@ ...@@ -23,8 +23,8 @@
zmq::reaper_t::reaper_t (class ctx_t *ctx_, uint32_t tid_) : zmq::reaper_t::reaper_t (class ctx_t *ctx_, uint32_t tid_) :
object_t (ctx_, tid_), object_t (ctx_, tid_),
terminating (false), sockets (0),
has_timer (false) terminating (false)
{ {
poller = new (std::nothrow) poller_t; poller = new (std::nothrow) poller_t;
zmq_assert (poller); zmq_assert (poller);
...@@ -74,55 +74,20 @@ void zmq::reaper_t::in_event () ...@@ -74,55 +74,20 @@ void zmq::reaper_t::in_event ()
void zmq::reaper_t::out_event () void zmq::reaper_t::out_event ()
{ {
// We are never polling for POLLOUT here. This function is never called.
zmq_assert (false); zmq_assert (false);
} }
void zmq::reaper_t::timer_event (int id_) void zmq::reaper_t::timer_event (int id_)
{ {
zmq_assert (has_timer); zmq_assert (false);
has_timer = false;
reap ();
}
void zmq::reaper_t::reap ()
{
// Try to reap each socket in the list.
for (sockets_t::iterator it = sockets.begin (); it != sockets.end ();) {
if ((*it)->reap ()) {
// MSVC version of STL requires this to be done a spacial way...
#if defined _MSC_VER
it = sockets.erase (it);
#else
sockets.erase (it);
#endif
}
else
++it;
}
// If there are still sockets to reap, wait a while, then try again.
if (!sockets.empty () && !has_timer) {
poller->add_timer (1 , this, 0);
has_timer = true;
return;
}
// No more sockets and the context is already shutting down.
if (terminating) {
send_done ();
poller->rm_fd (mailbox_handle);
poller->stop ();
return;
}
} }
void zmq::reaper_t::process_stop () void zmq::reaper_t::process_stop ()
{ {
terminating = true; terminating = true;
if (sockets.empty ()) { // If there are no sockets beig reaped finish immediately.
if (!sockets) {
send_done (); send_done ();
poller->rm_fd (mailbox_handle); poller->rm_fd (mailbox_handle);
poller->stop (); poller->stop ();
...@@ -133,7 +98,22 @@ void zmq::reaper_t::process_reap (socket_base_t *socket_) ...@@ -133,7 +98,22 @@ void zmq::reaper_t::process_reap (socket_base_t *socket_)
{ {
// Start termination of associated I/O object hierarchy. // Start termination of associated I/O object hierarchy.
socket_->terminate (); socket_->terminate ();
sockets.push_back (socket_);
reap (); // Add the socket to the poller.
socket_->start_reaping (poller);
++sockets;
} }
void zmq::reaper_t::process_reaped ()
{
--sockets;
// If reaped was already asked to terminate and there are no more sockets,
// finish immediately.
if (!sockets && terminating) {
send_done ();
poller->rm_fd (mailbox_handle);
poller->stop ();
}
}
...@@ -47,15 +47,10 @@ namespace zmq ...@@ -47,15 +47,10 @@ namespace zmq
private: private:
void reap ();
// Command handlers. // Command handlers.
void process_stop (); void process_stop ();
void process_reap (class socket_base_t *socket_); void process_reap (class socket_base_t *socket_);
void process_reaped ();
// List of all sockets being terminated.
typedef std::vector <class socket_base_t*> sockets_t;
sockets_t sockets;
// Reaper thread accesses incoming commands via this mailbox. // Reaper thread accesses incoming commands via this mailbox.
mailbox_t mailbox; mailbox_t mailbox;
...@@ -66,12 +61,12 @@ namespace zmq ...@@ -66,12 +61,12 @@ namespace zmq
// I/O multiplexing is performed using a poller object. // I/O multiplexing is performed using a poller object.
poller_t *poller; poller_t *poller;
// Number of sockets being reaped at the moment.
int sockets;
// If true, we were already asked to terminate. // If true, we were already asked to terminate.
bool terminating; bool terminating;
// If true, timer till next reaping is running.
bool has_timer;
reaper_t (const reaper_t&); reaper_t (const reaper_t&);
const reaper_t &operator = (const reaper_t&); const reaper_t &operator = (const reaper_t&);
}; };
......
...@@ -624,24 +624,11 @@ zmq::session_t *zmq::socket_base_t::find_session (const blob_t &name_) ...@@ -624,24 +624,11 @@ zmq::session_t *zmq::socket_base_t::find_session (const blob_t &name_)
return session; return session;
} }
bool zmq::socket_base_t::reap () void zmq::socket_base_t::start_reaping (poller_t *poller_)
{ {
// Process any commands from other threads/sockets that may be available poller = poller_;
// at the moment. Ultimately, socket will be destroyed. handle = poller->add_fd (mailbox.get_fd (), this);
process_commands (false, false); poller->set_pollin (handle);
// If the object was already marked as destroyed, finish the deallocation.
if (destroyed) {
// Remove the socket from the context.
destroy_socket (this);
// Deallocate.
own_t::process_destroy ();
return true;
}
return false;
} }
int zmq::socket_base_t::process_commands (bool block_, bool throttle_) int zmq::socket_base_t::process_commands (bool block_, bool throttle_)
...@@ -762,3 +749,35 @@ int zmq::socket_base_t::xrecv (zmq_msg_t *msg_, int options_) ...@@ -762,3 +749,35 @@ int zmq::socket_base_t::xrecv (zmq_msg_t *msg_, int options_)
return -1; return -1;
} }
void zmq::socket_base_t::in_event ()
{
// Process any commands from other threads/sockets that may be available
// at the moment. Ultimately, socket will be destroyed.
process_commands (false, false);
// If the object was already marked as destroyed, finish the deallocation.
if (destroyed) {
// Remove the socket from the reaper's poller.
poller->rm_fd (handle);
// Remove the socket from the context.
destroy_socket (this);
// Notify the reaper about the fact.
send_reaped ();
// Deallocate.
own_t::process_destroy ();
}
}
void zmq::socket_base_t::out_event ()
{
zmq_assert (false);
}
void zmq::socket_base_t::timer_event (int id_)
{
zmq_assert (false);
}
...@@ -29,7 +29,9 @@ ...@@ -29,7 +29,9 @@
#include "array.hpp" #include "array.hpp"
#include "mutex.hpp" #include "mutex.hpp"
#include "stdint.hpp" #include "stdint.hpp"
#include "poller.hpp"
#include "atomic_counter.hpp" #include "atomic_counter.hpp"
#include "i_poll_events.hpp"
#include "mailbox.hpp" #include "mailbox.hpp"
#include "stdint.hpp" #include "stdint.hpp"
#include "blob.hpp" #include "blob.hpp"
...@@ -40,7 +42,8 @@ namespace zmq ...@@ -40,7 +42,8 @@ namespace zmq
class socket_base_t : class socket_base_t :
public own_t, public own_t,
public array_item_t public array_item_t,
public i_poll_events
{ {
friend class reaper_t; friend class reaper_t;
...@@ -84,9 +87,15 @@ namespace zmq ...@@ -84,9 +87,15 @@ namespace zmq
void activated (class writer_t *pipe_); void activated (class writer_t *pipe_);
void terminated (class writer_t *pipe_); void terminated (class writer_t *pipe_);
// This function should be called only on sockets that are already // Using this function reaper thread ask the socket to regiter with
// closed -- from the reaper thread. It tries to finalise the socket. // its poller.
bool reap (); void start_reaping (poller_t *poller_);
// i_poll_events implementation. This interface is used when socket
// is handled by the poller in the reaper thread.
void in_event ();
void out_event ();
void timer_event (int id_);
protected: protected:
...@@ -157,6 +166,10 @@ namespace zmq ...@@ -157,6 +166,10 @@ namespace zmq
// Socket's mailbox object. // Socket's mailbox object.
mailbox_t mailbox; mailbox_t mailbox;
// Reaper's poller and handle of this socket within it.
poller_t *poller;
poller_t::handle_t handle;
// Timestamp of when commands were processed the last time. // Timestamp of when commands were processed the last time.
uint64_t last_tsc; uint64_t last_tsc;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment