Commit a801b6d8 authored by Martin Sustrik's avatar Martin Sustrik

couple of bugs in shutdown mechanism fixed

parent 131f2e30
......@@ -37,11 +37,11 @@ namespace zmq
stop,
plug,
own,
attach,
bind,
term_req,
term,
term_ack
} type;
union {
......@@ -57,9 +57,14 @@ namespace zmq
// Sent to socket to let it know about the newly created object.
struct {
class object_t *object;
class owned_t *object;
} own;
// Attach the engine to the session.
struct {
class zmq_engine_t *engine;
} attach;
// Sent between objects to establish pipe(s) between them.
struct {
} bind;
......@@ -67,7 +72,7 @@ namespace zmq
// Sent by I/O object ot the socket to request the shutdown of
// the I/O object.
struct {
class object_t *object;
class owned_t *object;
} term_req;
// Sent by socket to I/O object to start its shutdown.
......
......@@ -22,6 +22,10 @@
#include "err.hpp"
#include "io_thread.hpp"
#include "simple_semaphore.hpp"
#include "owned.hpp"
#include "session.hpp"
#include "socket_base.hpp"
#include "zmq_engine.hpp" // TODO: remove this line
zmq::object_t::object_t (dispatcher_t *dispatcher_, int thread_slot_) :
dispatcher (dispatcher_),
......@@ -65,6 +69,10 @@ void zmq::object_t::process_command (command_t &cmd_)
process_own (cmd_.args.own.object);
return;
case command_t::attach:
process_attach (cmd_.args.attach.engine);
return;
case command_t::bind:
process_bind ();
return;
......@@ -101,15 +109,18 @@ void zmq::object_t::send_stop ()
dispatcher->write (thread_slot, thread_slot, cmd);
}
void zmq::object_t::send_plug (object_t *destination_)
void zmq::object_t::send_plug (owned_t *destination_)
{
// Let the object know that it cannot shut down till it gets this command.
destination_->inc_seqnum ();
command_t cmd;
cmd.destination = destination_;
cmd.type = command_t::plug;
send_command (cmd);
}
void zmq::object_t::send_own (object_t *destination_, object_t *object_)
void zmq::object_t::send_own (socket_base_t *destination_, owned_t *object_)
{
command_t cmd;
cmd.destination = destination_;
......@@ -118,6 +129,18 @@ void zmq::object_t::send_own (object_t *destination_, object_t *object_)
send_command (cmd);
}
void zmq::object_t::send_attach (session_t *destination_, zmq_engine_t *engine_)
{
// Let the object know that it cannot shut down till it gets this command.
destination_->inc_seqnum ();
command_t cmd;
cmd.destination = destination_;
cmd.type = command_t::attach;
cmd.args.attach.engine = engine_;
send_command (cmd);
}
void zmq::object_t::send_bind (object_t *destination_)
{
command_t cmd;
......@@ -126,7 +149,8 @@ void zmq::object_t::send_bind (object_t *destination_)
send_command (cmd);
}
void zmq::object_t::send_term_req (object_t *destination_, object_t *object_)
void zmq::object_t::send_term_req (socket_base_t *destination_,
owned_t *object_)
{
command_t cmd;
cmd.destination = destination_;
......@@ -135,7 +159,7 @@ void zmq::object_t::send_term_req (object_t *destination_, object_t *object_)
send_command (cmd);
}
void zmq::object_t::send_term (object_t *destination_)
void zmq::object_t::send_term (owned_t *destination_)
{
command_t cmd;
cmd.destination = destination_;
......@@ -143,7 +167,7 @@ void zmq::object_t::send_term (object_t *destination_)
send_command (cmd);
}
void zmq::object_t::send_term_ack (object_t *destination_)
void zmq::object_t::send_term_ack (socket_base_t *destination_)
{
command_t cmd;
cmd.destination = destination_;
......@@ -161,7 +185,12 @@ void zmq::object_t::process_plug ()
zmq_assert (false);
}
void zmq::object_t::process_own (object_t *object_)
void zmq::object_t::process_own (owned_t *object_)
{
zmq_assert (false);
}
void zmq::object_t::process_attach (zmq_engine_t *engine_)
{
zmq_assert (false);
}
......@@ -171,7 +200,7 @@ void zmq::object_t::process_bind ()
zmq_assert (false);
}
void zmq::object_t::process_term_req (object_t *object_)
void zmq::object_t::process_term_req (owned_t *object_)
{
zmq_assert (false);
}
......
......@@ -49,20 +49,25 @@ namespace zmq
// Derived object can use these functions to send commands
// to other objects.
void send_stop ();
void send_plug (object_t *destination_);
void send_own (object_t *destination_, object_t *object_);
void send_plug (class owned_t *destination_);
void send_own (class socket_base_t *destination_,
class owned_t *object_);
void send_attach (class session_t *destination_,
class zmq_engine_t *engine_);
void send_bind (object_t *destination_);
void send_term_req (object_t *destination_, object_t *object_);
void send_term (object_t *destination_);
void send_term_ack (object_t *destination_);
void send_term_req (class socket_base_t *destination_,
class owned_t *object_);
void send_term (class owned_t *destination_);
void send_term_ack (class socket_base_t *destination_);
// These handlers can be overloaded by the derived objects. They are
// called when command arrives from another thread.
virtual void process_stop ();
virtual void process_plug ();
virtual void process_own (object_t *object_);
virtual void process_own (class owned_t *object_);
virtual void process_attach (class zmq_engine_t *engine_);
virtual void process_bind ();
virtual void process_term_req (object_t *object_);
virtual void process_term_req (class owned_t *object_);
virtual void process_term ();
virtual void process_term_ack ();
......
......@@ -20,11 +20,12 @@
#include "owned.hpp"
#include "err.hpp"
zmq::owned_t::owned_t (object_t *parent_, object_t *owner_) :
zmq::owned_t::owned_t (object_t *parent_, socket_base_t *owner_) :
object_t (parent_),
owner (owner_),
plugged_in (false),
terminated (false)
sent_seqnum (0),
processed_seqnum (0),
shutting_down (false)
{
}
......@@ -32,21 +33,18 @@ zmq::owned_t::~owned_t ()
{
}
void zmq::owned_t::process_plug ()
void zmq::owned_t::inc_seqnum ()
{
zmq_assert (!plugged_in);
// NB: This function may be called from a different thread!
sent_seqnum.add (1);
}
// If termination of the object was already requested, destroy it and
// send the termination acknowledgement.
if (terminated) {
send_term_ack (owner);
delete this;
return;
}
void zmq::owned_t::process_plug ()
{
// Keep track of how many commands were processed so far.
processed_seqnum++;
// Notify the generic termination mechanism (io_object_t) that the object
// is already plugged in.
plugged_in = true;
finalise_command ();
}
void zmq::owned_t::term ()
......@@ -56,19 +54,20 @@ void zmq::owned_t::term ()
void zmq::owned_t::process_term ()
{
zmq_assert (!terminated);
zmq_assert (!shutting_down);
shutting_down = true;
// If termination request has occured even before the object was plugged in
// wait till plugging in happens, then acknowledge the termination.
if (!plugged_in) {
terminated = true;
return;
}
finalise_command ();
}
// Otherwise, destroy the object and acknowledge the termination
// straight away.
void zmq::owned_t::finalise_command ()
{
// If termination request was already received and there are no more
// commands to wait for, terminate the object.
if (shutting_down && processed_seqnum == sent_seqnum.get ()) {
send_term_ack (owner);
process_unplug ();
delete this;
}
}
......@@ -20,7 +20,9 @@
#ifndef __ZMQ_OWNED_HPP_INCLUDED__
#define __ZMQ_OWNED_HPP_INCLUDED__
#include "object.hpp"
#include "socket_base.hpp"
#include "atomic_counter.hpp"
#include "stdint.hpp"
namespace zmq
{
......@@ -34,7 +36,12 @@ namespace zmq
// The object will live in parent's thread, however, its lifetime
// will be managed by its owner socket.
owned_t (object_t *parent_, object_t *owner_);
owned_t (object_t *parent_, socket_base_t *owner_);
// When another owned object wants to send command to this object
// it calls this function to let it know it should not shut down
// before the command is delivered.
void inc_seqnum ();
protected:
......@@ -57,21 +64,27 @@ namespace zmq
// classes to ensure sane cleanup.
virtual void process_unplug () = 0;
// Socket owning this object. It is responsible for destroying
// it when it's being closed.
object_t *owner;
// Socket owning this object. When the socket is being closed it's
// responsible for shutting down this object.
socket_base_t *owner;
private:
// Handlers for incoming commands.
void process_term ();
// Set to true when object is plugged in.
bool plugged_in;
// Generic command handler (to be called from all command handlers
// once the processing is done).
void finalise_command ();
// Sequence number of the last command sent to this object.
atomic_counter_t sent_seqnum;
// Sequence number of the last command processed by this object.
uint64_t processed_seqnum;
// Set to true when object was terminated before it was plugged in.
// In such case destruction is delayed till 'plug' command arrives.
bool terminated;
// If true, the object is already shutting down.
bool shutting_down;
owned_t (const owned_t&);
void operator = (const owned_t&);
......
......@@ -21,7 +21,7 @@
#include "zmq_engine.hpp"
#include "err.hpp"
zmq::session_t::session_t (object_t *parent_, object_t *owner_,
zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_,
zmq_engine_t *engine_) :
owned_t (parent_, owner_),
engine (engine_)
......@@ -48,11 +48,14 @@ void zmq::session_t::flush ()
void zmq::session_t::process_plug ()
{
zmq_assert (engine);
engine->plug (this);
owned_t::process_plug ();
}
void zmq::session_t::process_unplug ()
{
zmq_assert (engine);
engine->unplug ();
delete engine;
}
......@@ -30,7 +30,7 @@ namespace zmq
{
public:
session_t (object_t *parent_, object_t *owner_,
session_t (object_t *parent_, socket_base_t *owner_,
class zmq_engine_t *engine_);
private:
......
......@@ -27,7 +27,9 @@
#include "zmq_listener.hpp"
#include "zmq_connecter.hpp"
#include "io_thread.hpp"
#include "session.hpp"
#include "config.hpp"
#include "owned.hpp"
zmq::socket_base_t::socket_base_t (app_thread_t *parent_) :
object_t (parent_),
......@@ -60,6 +62,9 @@ zmq::socket_base_t::~socket_base_t ()
while (pending_term_acks)
app_thread->process_commands (true);
}
// Check whether there are no session leaks.
zmq_assert (sessions.empty ());
}
int zmq::socket_base_t::setsockopt (int option_, void *optval_,
......@@ -169,12 +174,43 @@ int zmq::socket_base_t::close ()
return 0;
}
void zmq::socket_base_t::process_own (object_t *object_)
void zmq::socket_base_t::register_session (const char *name_,
session_t *session_)
{
sessions_sync.lock ();
bool inserted = sessions.insert (std::make_pair (name_, session_)).second;
zmq_assert (inserted);
sessions_sync.unlock ();
}
void zmq::socket_base_t::unregister_session (const char *name_)
{
sessions_sync.lock ();
sessions_t::iterator it = sessions.find (name_);
zmq_assert (it != sessions.end ());
sessions.erase (it);
sessions_sync.unlock ();
}
zmq::session_t *zmq::socket_base_t::get_session (const char *name_)
{
sessions_sync.lock ();
sessions_t::iterator it = sessions.find (name_);
session_t *session = NULL;
if (it != sessions.end ()) {
session = it->second;
session->inc_seqnum ();
}
sessions_sync.unlock ();
return session;
}
void zmq::socket_base_t::process_own (owned_t *object_)
{
io_objects.insert (object_);
}
void zmq::socket_base_t::process_term_req (object_t *object_)
void zmq::socket_base_t::process_term_req (owned_t *object_)
{
// If I/O object is well and alive ask it to terminate.
io_objects_t::iterator it = std::find (io_objects.begin (),
......
......@@ -21,9 +21,11 @@
#define __ZMQ_SOCKET_BASE_HPP_INCLUDED__
#include <set>
#include <map>
#include <string>
#include "object.hpp"
#include "mutex.hpp"
#include "options.hpp"
#include "stdint.hpp"
......@@ -46,16 +48,26 @@ namespace zmq
virtual int recv (struct zmq_msg *msg_, int flags_);
virtual int close ();
// Functions that owned objects use to manipulate socket's list
// of existing sessions.
// Note that this functionality cannot be implemented via inter-thread
// commands as it is unacceptable to wait for the completion of the
// action till user application yields control of the application
// thread to 0MQ.
void register_session (const char *name_, class session_t *session_);
void unregister_session (const char *name_);
class session_t *get_session (const char *name_);
private:
// Handlers for incoming commands.
void process_own (object_t *object_);
void process_term_req (object_t *object_);
void process_own (class owned_t *object_);
void process_term_req (class owned_t *object_);
void process_term_ack ();
// List of all I/O objects owned by this socket. The socket is
// responsible for deallocating them before it quits.
typedef std::set <object_t*> io_objects_t;
typedef std::set <class owned_t*> io_objects_t;
io_objects_t io_objects;
// Number of I/O objects that were already asked to terminate
......@@ -68,6 +80,14 @@ namespace zmq
// Socket options.
options_t options;
// List of existing sessions. This list is never referenced from within
// the socket, instead it is used by I/O objects owned by the session.
// As those objects can live in different threads, the access is
// synchronised using 'sessions_sync' mutex.
typedef std::map <std::string, session_t*> sessions_t;
sessions_t sessions;
mutex_t sessions_sync;
socket_base_t (const socket_base_t&);
void operator = (const socket_base_t&);
};
......
......@@ -22,8 +22,8 @@
#include "io_thread.hpp"
#include "err.hpp"
zmq::zmq_connecter_t::zmq_connecter_t (io_thread_t *parent_, object_t *owner_,
const options_t &options_) :
zmq::zmq_connecter_t::zmq_connecter_t (io_thread_t *parent_,
socket_base_t *owner_, const options_t &options_) :
owned_t (parent_, owner_),
io_object_t (parent_),
handle_valid (false),
......
......@@ -33,7 +33,7 @@ namespace zmq
{
public:
zmq_connecter_t (class io_thread_t *parent_, object_t *owner_,
zmq_connecter_t (class io_thread_t *parent_, socket_base_t *owner_,
const options_t &options_);
// Set IP address to connect to.
......
......@@ -73,7 +73,6 @@ void zmq::zmq_engine_t::in_event ()
// Read as much data as possible to the read buffer.
insize = tcp_socket.read (inbuf, in_batch_size);
printf ("%d bytes read\n", (int) insize);
inpos = 0;
// Check whether the peer has closed the connection.
......@@ -132,5 +131,5 @@ void zmq::zmq_engine_t::out_event ()
void zmq::zmq_engine_t::error ()
{
zmq_assert (false);
// zmq_assert (false);
}
......@@ -22,8 +22,8 @@
#include "session.hpp"
#include "err.hpp"
zmq::zmq_init_t::zmq_init_t (io_thread_t *parent_, object_t *owner_, fd_t fd_,
bool connected_, const options_t &options_) :
zmq::zmq_init_t::zmq_init_t (io_thread_t *parent_, socket_base_t *owner_,
fd_t fd_, bool connected_, const options_t &options_) :
owned_t (parent_, owner_),
connected (connected_),
options (options_)
......@@ -81,12 +81,14 @@ void zmq::zmq_init_t::flush ()
void zmq::zmq_init_t::process_plug ()
{
zmq_assert (engine);
engine->plug (this);
owned_t::process_plug ();
}
void zmq::zmq_init_t::process_unplug ()
{
if (engine)
engine->unplug ();
}
......
......@@ -44,7 +44,7 @@ namespace zmq
// Set 'connected' to true if the connection was created by 'connect'
// function. If it was accepted from a listening socket, set it to
// false.
zmq_init_t (class io_thread_t *parent_, object_t *owner_, fd_t fd_,
zmq_init_t (class io_thread_t *parent_, socket_base_t *owner_, fd_t fd_,
bool connected_, const options_t &options);
~zmq_init_t ();
......
......@@ -22,8 +22,8 @@
#include "io_thread.hpp"
#include "err.hpp"
zmq::zmq_listener_t::zmq_listener_t (io_thread_t *parent_, object_t *owner_,
const options_t &options_) :
zmq::zmq_listener_t::zmq_listener_t (io_thread_t *parent_,
socket_base_t *owner_, const options_t &options_) :
owned_t (parent_, owner_),
io_object_t (parent_),
options (options_)
......
......@@ -33,7 +33,7 @@ namespace zmq
{
public:
zmq_listener_t (class io_thread_t *parent_, object_t *owner_,
zmq_listener_t (class io_thread_t *parent_, socket_base_t *owner_,
const options_t &options_);
// Set IP address to listen on.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment