Commit 7146ef85 authored by Martin Sustrik's avatar Martin Sustrik

seqnum mechanism automated

parent cb84580b
...@@ -252,6 +252,8 @@ zmq::socket_base_t *zmq::dispatcher_t::find_endpoint (const char *addr_) ...@@ -252,6 +252,8 @@ zmq::socket_base_t *zmq::dispatcher_t::find_endpoint (const char *addr_)
// Increment the command sequence number of the peer so that it won't // Increment the command sequence number of the peer so that it won't
// get deallocated until "bind" command is issued by the caller. // get deallocated until "bind" command is issued by the caller.
// The subsequent 'bind' has to be called with inc_seqnum parameter
// set to false, so that the seqnum isn't incremented twice.
endpoint->inc_seqnum (); endpoint->inc_seqnum ();
endpoints_sync.unlock (); endpoints_sync.unlock ();
......
...@@ -72,6 +72,7 @@ void zmq::object_t::process_command (command_t &cmd_) ...@@ -72,6 +72,7 @@ void zmq::object_t::process_command (command_t &cmd_)
case command_t::plug: case command_t::plug:
process_plug (); process_plug ();
process_seqnum ();
return; return;
case command_t::own: case command_t::own:
...@@ -80,10 +81,12 @@ void zmq::object_t::process_command (command_t &cmd_) ...@@ -80,10 +81,12 @@ void zmq::object_t::process_command (command_t &cmd_)
case command_t::attach: case command_t::attach:
process_attach (cmd_.args.attach.engine); process_attach (cmd_.args.attach.engine);
process_seqnum ();
return; return;
case command_t::bind: case command_t::bind:
process_bind (cmd_.args.bind.in_pipe, cmd_.args.bind.out_pipe); process_bind (cmd_.args.bind.in_pipe, cmd_.args.bind.out_pipe);
process_seqnum ();
return; return;
case command_t::pipe_term: case command_t::pipe_term:
...@@ -151,10 +154,10 @@ void zmq::object_t::send_stop () ...@@ -151,10 +154,10 @@ void zmq::object_t::send_stop ()
dispatcher->write (thread_slot, thread_slot, cmd); dispatcher->write (thread_slot, thread_slot, cmd);
} }
void zmq::object_t::send_plug (owned_t *destination_) void zmq::object_t::send_plug (owned_t *destination_, bool inc_seqnum_)
{ {
// Let the object know that it cannot shut down till it gets this command. if (inc_seqnum_)
destination_->inc_seqnum (); destination_->inc_seqnum ();
command_t cmd; command_t cmd;
cmd.destination = destination_; cmd.destination = destination_;
...@@ -171,10 +174,12 @@ void zmq::object_t::send_own (socket_base_t *destination_, owned_t *object_) ...@@ -171,10 +174,12 @@ void zmq::object_t::send_own (socket_base_t *destination_, owned_t *object_)
send_command (cmd); send_command (cmd);
} }
void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_) void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_,
bool inc_seqnum_)
{ {
// The assumption here is that command sequence number of the destination if (inc_seqnum_)
// object was already incremented in find_session function. destination_->inc_seqnum ();
command_t cmd; command_t cmd;
cmd.destination = destination_; cmd.destination = destination_;
cmd.type = command_t::attach; cmd.type = command_t::attach;
...@@ -183,8 +188,11 @@ void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_) ...@@ -183,8 +188,11 @@ void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_)
} }
void zmq::object_t::send_bind (socket_base_t *destination_, void zmq::object_t::send_bind (socket_base_t *destination_,
reader_t *in_pipe_, writer_t *out_pipe_) reader_t *in_pipe_, writer_t *out_pipe_, bool inc_seqnum_)
{ {
if (inc_seqnum_)
destination_->inc_seqnum ();
command_t cmd; command_t cmd;
cmd.destination = destination_; cmd.destination = destination_;
cmd.type = command_t::bind; cmd.type = command_t::bind;
...@@ -298,6 +306,11 @@ void zmq::object_t::process_term_ack () ...@@ -298,6 +306,11 @@ void zmq::object_t::process_term_ack ()
zmq_assert (false); zmq_assert (false);
} }
void zmq::object_t::process_seqnum ()
{
zmq_assert (false);
}
void zmq::object_t::send_command (command_t &cmd_) void zmq::object_t::send_command (command_t &cmd_)
{ {
int destination_thread_slot = cmd_.destination->get_thread_slot (); int destination_thread_slot = cmd_.destination->get_thread_slot ();
......
...@@ -63,13 +63,14 @@ namespace zmq ...@@ -63,13 +63,14 @@ namespace zmq
// Derived object can use these functions to send commands // Derived object can use these functions to send commands
// to other objects. // to other objects.
void send_stop (); void send_stop ();
void send_plug (class owned_t *destination_); void send_plug (class owned_t *destination_, bool inc_seqnum_ = true);
void send_own (class socket_base_t *destination_, void send_own (class socket_base_t *destination_,
class owned_t *object_); class owned_t *object_);
void send_attach (class session_t *destination_, void send_attach (class session_t *destination_,
struct i_engine *engine_); struct i_engine *engine_, bool inc_seqnum_ = true);
void send_bind (class socket_base_t *destination_, void send_bind (class socket_base_t *destination_,
class reader_t *in_pipe_, class writer_t *out_pipe_); class reader_t *in_pipe_, class writer_t *out_pipe_,
bool inc_seqnum_ = true);
void send_revive (class object_t *destination_); void send_revive (class object_t *destination_);
void send_pipe_term (class writer_t *destination_); void send_pipe_term (class writer_t *destination_);
void send_pipe_term_ack (class reader_t *destination_); void send_pipe_term_ack (class reader_t *destination_);
...@@ -93,6 +94,11 @@ namespace zmq ...@@ -93,6 +94,11 @@ namespace zmq
virtual void process_term (); virtual void process_term ();
virtual void process_term_ack (); virtual void process_term_ack ();
// Special handler called after a command that requires a seqnum
// was processed. The implementation should catch up with its counter
// of processed commands here.
virtual void process_seqnum ();
// Pointer to the root of the infrastructure. // Pointer to the root of the infrastructure.
class dispatcher_t *dispatcher; class dispatcher_t *dispatcher;
......
...@@ -39,22 +39,6 @@ void zmq::owned_t::inc_seqnum () ...@@ -39,22 +39,6 @@ void zmq::owned_t::inc_seqnum ()
sent_seqnum.add (1); sent_seqnum.add (1);
} }
void zmq::owned_t::process_plug ()
{
// Keep track of how many commands were processed so far.
processed_seqnum++;
finalise_command ();
}
void zmq::owned_t::process_attach (struct i_engine *engine_)
{
// Keep track of how many commands were processed so far.
processed_seqnum++;
finalise_command ();
}
void zmq::owned_t::term () void zmq::owned_t::term ()
{ {
send_term_req (owner, this); send_term_req (owner, this);
...@@ -64,11 +48,17 @@ void zmq::owned_t::process_term () ...@@ -64,11 +48,17 @@ void zmq::owned_t::process_term ()
{ {
zmq_assert (!shutting_down); zmq_assert (!shutting_down);
shutting_down = true; shutting_down = true;
finalise ();
}
finalise_command (); void zmq::owned_t::process_seqnum ()
{
// Catch up with counter of processed commands.
processed_seqnum++;
finalise ();
} }
void zmq::owned_t::finalise_command () void zmq::owned_t::finalise ()
{ {
// If termination request was already received and there are no more // If termination request was already received and there are no more
// commands to wait for, terminate the object. // commands to wait for, terminate the object.
......
...@@ -54,15 +54,6 @@ namespace zmq ...@@ -54,15 +54,6 @@ namespace zmq
// of the owned object correctly. // of the owned object correctly.
virtual ~owned_t (); virtual ~owned_t ();
// Handlers for incoming commands. It's vital that every I/O object
// invokes io_object_t::process_plug at the end of it's own plug
// handler.
void process_plug ();
// It's vital that session invokes io_object_t::process_attach
// at the end of it's own attach handler.
void process_attach (struct i_engine *engine_);
// io_object_t defines a new handler used to disconnect the object // io_object_t defines a new handler used to disconnect the object
// from the poller object. Implement the handlen in the derived // from the poller object. Implement the handlen in the derived
// classes to ensure sane cleanup. // classes to ensure sane cleanup.
...@@ -76,10 +67,9 @@ namespace zmq ...@@ -76,10 +67,9 @@ namespace zmq
// Handlers for incoming commands. // Handlers for incoming commands.
void process_term (); void process_term ();
void process_seqnum ();
// Generic command handler (to be called from all command handlers void finalise ();
// once the processing is done).
void finalise_command ();
// Sequence number of the last command sent to this object. // Sequence number of the last command sent to this object.
atomic_counter_t sent_seqnum; atomic_counter_t sent_seqnum;
......
...@@ -151,12 +151,9 @@ void zmq::session_t::process_plug () ...@@ -151,12 +151,9 @@ void zmq::session_t::process_plug ()
out_pipe->set_endpoint (this); out_pipe->set_endpoint (this);
} }
owner->inc_seqnum ();
send_bind (owner, outbound ? &outbound->reader : NULL, send_bind (owner, outbound ? &outbound->reader : NULL,
inbound ? &inbound->writer : NULL); inbound ? &inbound->writer : NULL);
} }
owned_t::process_plug ();
} }
void zmq::session_t::process_unplug () void zmq::session_t::process_unplug ()
...@@ -190,6 +187,4 @@ void zmq::session_t::process_attach (i_engine *engine_) ...@@ -190,6 +187,4 @@ void zmq::session_t::process_attach (i_engine *engine_)
zmq_assert (engine_); zmq_assert (engine_);
engine = engine_; engine = engine_;
engine->plug (this); engine->plug (this);
owned_t::process_attach (engine_);
} }
...@@ -161,7 +161,7 @@ int zmq::socket_base_t::connect (const char *addr_) ...@@ -161,7 +161,7 @@ int zmq::socket_base_t::connect (const char *addr_)
// was incremented in find_endpoint function. The callee is notified // was incremented in find_endpoint function. The callee is notified
// about the fact via the last parameter. // about the fact via the last parameter.
send_bind (peer, out_pipe ? &out_pipe->reader : NULL, send_bind (peer, out_pipe ? &out_pipe->reader : NULL,
in_pipe ? &in_pipe->writer : NULL); in_pipe ? &in_pipe->writer : NULL, false);
return 0; return 0;
} }
...@@ -247,8 +247,6 @@ int zmq::socket_base_t::connect (const char *addr_) ...@@ -247,8 +247,6 @@ int zmq::socket_base_t::connect (const char *addr_)
return -1; return -1;
} }
// Reserve a sequence number for following 'attach' command.
session->inc_seqnum ();
send_attach (session, pgm_sender); send_attach (session, pgm_sender);
} }
else if (options.requires_in) { else if (options.requires_in) {
...@@ -264,8 +262,6 @@ int zmq::socket_base_t::connect (const char *addr_) ...@@ -264,8 +262,6 @@ int zmq::socket_base_t::connect (const char *addr_)
return -1; return -1;
} }
// Reserve a sequence number for following 'attach' command.
session->inc_seqnum ();
send_attach (session, pgm_receiver); send_attach (session, pgm_receiver);
} }
else else
...@@ -511,7 +507,6 @@ void zmq::socket_base_t::process_own (owned_t *object_) ...@@ -511,7 +507,6 @@ void zmq::socket_base_t::process_own (owned_t *object_)
void zmq::socket_base_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_) void zmq::socket_base_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_)
{ {
processed_seqnum++;
attach_pipes (in_pipe_, out_pipe_); attach_pipes (in_pipe_, out_pipe_);
} }
...@@ -542,3 +537,8 @@ void zmq::socket_base_t::process_term_ack () ...@@ -542,3 +537,8 @@ void zmq::socket_base_t::process_term_ack ()
pending_term_acks--; pending_term_acks--;
} }
void zmq::socket_base_t::process_seqnum ()
{
processed_seqnum++;
}
...@@ -117,6 +117,7 @@ namespace zmq ...@@ -117,6 +117,7 @@ namespace zmq
void process_bind (class reader_t *in_pipe_, class writer_t *out_pipe_); void process_bind (class reader_t *in_pipe_, class writer_t *out_pipe_);
void process_term_req (class owned_t *object_); void process_term_req (class owned_t *object_);
void process_term_ack (); void process_term_ack ();
void process_seqnum ();
// List of all I/O objects owned by this socket. The socket is // List of all I/O objects owned by this socket. The socket is
// responsible for deallocating them before it quits. // responsible for deallocating them before it quits.
......
...@@ -49,7 +49,6 @@ void zmq::zmq_connecter_t::process_plug () ...@@ -49,7 +49,6 @@ void zmq::zmq_connecter_t::process_plug ()
add_timer (); add_timer ();
else else
start_connecting (); start_connecting ();
owned_t::process_plug ();
} }
void zmq::zmq_connecter_t::process_unplug () void zmq::zmq_connecter_t::process_unplug ()
......
...@@ -67,7 +67,8 @@ bool zmq::zmq_connecter_init_t::read (::zmq_msg_t *msg_) ...@@ -67,7 +67,8 @@ bool zmq::zmq_connecter_init_t::read (::zmq_msg_t *msg_)
zmq_assert (false); zmq_assert (false);
} }
send_attach (session, engine); // No need to increment seqnum as it was alredy incremented above.
send_attach (session, engine, false);
engine = NULL; engine = NULL;
// Destroy the init object. // Destroy the init object.
...@@ -113,7 +114,6 @@ void zmq::zmq_connecter_init_t::process_plug () ...@@ -113,7 +114,6 @@ void zmq::zmq_connecter_init_t::process_plug ()
{ {
zmq_assert (engine); zmq_assert (engine);
engine->plug (this); engine->plug (this);
owned_t::process_plug ();
} }
void zmq::zmq_connecter_init_t::process_unplug () void zmq::zmq_connecter_init_t::process_unplug ()
......
...@@ -44,8 +44,6 @@ void zmq::zmq_listener_t::process_plug () ...@@ -44,8 +44,6 @@ void zmq::zmq_listener_t::process_plug ()
// Start polling for incoming connections. // Start polling for incoming connections.
handle = add_fd (tcp_listener.get_fd ()); handle = add_fd (tcp_listener.get_fd ());
set_pollin (handle); set_pollin (handle);
owned_t::process_plug ();
} }
void zmq::zmq_listener_t::process_unplug () void zmq::zmq_listener_t::process_unplug ()
......
...@@ -83,7 +83,10 @@ void zmq::zmq_listener_init_t::flush () ...@@ -83,7 +83,10 @@ void zmq::zmq_listener_init_t::flush ()
// Reserve a sequence number for following 'attach' command. // Reserve a sequence number for following 'attach' command.
session->inc_seqnum (); session->inc_seqnum ();
} }
send_attach (session, engine);
// No need to increment seqnum as it was laready incremented above.
send_attach (session, engine, false);
engine = NULL; engine = NULL;
// Destroy the init object. // Destroy the init object.
...@@ -103,7 +106,6 @@ void zmq::zmq_listener_init_t::process_plug () ...@@ -103,7 +106,6 @@ void zmq::zmq_listener_init_t::process_plug ()
{ {
zmq_assert (engine); zmq_assert (engine);
engine->plug (this); engine->plug (this);
owned_t::process_plug ();
} }
void zmq::zmq_listener_init_t::process_unplug () void zmq::zmq_listener_init_t::process_unplug ()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment