Commit d13933bc authored by Martin Sustrik's avatar Martin Sustrik

I/O object hierarchy implemented

parent ee1f1af0
...@@ -55,6 +55,7 @@ libzmq_la_SOURCES = \ ...@@ -55,6 +55,7 @@ libzmq_la_SOURCES = \
blob.hpp \ blob.hpp \
command.hpp \ command.hpp \
config.hpp \ config.hpp \
connect_session.hpp \
ctx.hpp \ ctx.hpp \
decoder.hpp \ decoder.hpp \
devpoll.hpp \ devpoll.hpp \
...@@ -70,15 +71,17 @@ libzmq_la_SOURCES = \ ...@@ -70,15 +71,17 @@ libzmq_la_SOURCES = \
ip.hpp \ ip.hpp \
i_engine.hpp \ i_engine.hpp \
i_poll_events.hpp \ i_poll_events.hpp \
i_terminate_events.hpp \
kqueue.hpp \ kqueue.hpp \
lb.hpp \ lb.hpp \
likely.hpp \ likely.hpp \
msg_content.hpp \ msg_content.hpp \
msg_store.hpp \ msg_store.hpp \
mutex.hpp \ mutex.hpp \
named_session.hpp \
object.hpp \ object.hpp \
options.hpp \ options.hpp \
owned.hpp \ own.hpp \
pgm_receiver.hpp \ pgm_receiver.hpp \
pgm_sender.hpp \ pgm_sender.hpp \
pgm_socket.hpp \ pgm_socket.hpp \
...@@ -106,6 +109,7 @@ libzmq_la_SOURCES = \ ...@@ -106,6 +109,7 @@ libzmq_la_SOURCES = \
tcp_listener.hpp \ tcp_listener.hpp \
tcp_socket.hpp \ tcp_socket.hpp \
thread.hpp \ thread.hpp \
transient_session.hpp \
uuid.hpp \ uuid.hpp \
windows.hpp \ windows.hpp \
wire.hpp \ wire.hpp \
...@@ -123,6 +127,7 @@ libzmq_la_SOURCES = \ ...@@ -123,6 +127,7 @@ libzmq_la_SOURCES = \
zmq_listener.hpp \ zmq_listener.hpp \
command.cpp \ command.cpp \
ctx.cpp \ ctx.cpp \
connect_session.cpp \
devpoll.cpp \ devpoll.cpp \
epoll.cpp \ epoll.cpp \
err.cpp \ err.cpp \
...@@ -134,9 +139,10 @@ libzmq_la_SOURCES = \ ...@@ -134,9 +139,10 @@ libzmq_la_SOURCES = \
kqueue.cpp \ kqueue.cpp \
lb.cpp \ lb.cpp \
msg_store.cpp \ msg_store.cpp \
named_session.cpp \
object.cpp \ object.cpp \
options.cpp \ options.cpp \
owned.cpp \ own.cpp \
pair.cpp \ pair.cpp \
pgm_receiver.cpp \ pgm_receiver.cpp \
pgm_sender.cpp \ pgm_sender.cpp \
...@@ -160,6 +166,7 @@ libzmq_la_SOURCES = \ ...@@ -160,6 +166,7 @@ libzmq_la_SOURCES = \
tcp_listener.cpp \ tcp_listener.cpp \
tcp_socket.cpp \ tcp_socket.cpp \
thread.cpp \ thread.cpp \
transient_session.cpp \
uuid.cpp \ uuid.cpp \
xrep.cpp \ xrep.cpp \
xreq.cpp \ xreq.cpp \
......
...@@ -61,7 +61,7 @@ namespace zmq ...@@ -61,7 +61,7 @@ namespace zmq
// Sent to socket to let it know about the newly created object. // Sent to socket to let it know about the newly created object.
struct { struct {
class owned_t *object; class own_t *object;
} own; } own;
// Attach the engine to the session. // Attach the engine to the session.
...@@ -104,7 +104,7 @@ namespace zmq ...@@ -104,7 +104,7 @@ namespace zmq
// Sent by I/O object ot the socket to request the shutdown of // Sent by I/O object ot the socket to request the shutdown of
// the I/O object. // the I/O object.
struct { struct {
class owned_t *object; class own_t *object;
} term_req; } term_req;
// Sent by socket to I/O object to start its shutdown. // Sent by socket to I/O object to start its shutdown.
......
/*
Copyright (c) 2007-2010 iMatix Corporation
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "connect_session.hpp"
#include "zmq_connecter.hpp"
zmq::connect_session_t::connect_session_t (class io_thread_t *io_thread_,
class socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_) :
session_t (io_thread_, socket_, options_),
protocol (protocol_),
address (address_)
{
}
zmq::connect_session_t::~connect_session_t ()
{
}
void zmq::connect_session_t::process_plug ()
{
// Start connection process immediately.
start_connecting ();
}
void zmq::connect_session_t::start_connecting ()
{
// Create the connecter object.
// Both TCP and IPC transports are using the same infrastructure.
if (protocol == "tcp" || protocol == "ipc") {
zmq_connecter_t *connecter = new (std::nothrow) zmq_connecter_t (
choose_io_thread (options.affinity), this, options,
protocol.c_str (), address.c_str ());
zmq_assert (connecter);
launch_child (connecter);
return;
}
#if defined ZMQ_HAVE_OPENPGM
// Both PGM and EPGM transports are using the same infrastructure.
if (addr_type == "pgm" || addr_type == "epgm") {
// For EPGM transport with UDP encapsulation of PGM is used.
bool udp_encapsulation = (addr_type == "epgm");
// At this point we'll create message pipes to the session straight
// away. There's no point in delaying it as no concept of 'connect'
// exists with PGM anyway.
if (options.requires_out) {
// PGM sender.
pgm_sender_t *pgm_sender = new (std::nothrow) pgm_sender_t (
choose_io_thread (options.affinity), options);
zmq_assert (pgm_sender);
int rc = pgm_sender->init (udp_encapsulation, addr_args.c_str ());
if (rc != 0) {
delete pgm_sender;
return -1;
}
send_attach (this, pgm_sender, blob_t ());
}
else if (options.requires_in) {
// PGM receiver.
pgm_receiver_t *pgm_receiver = new (std::nothrow) pgm_receiver_t (
choose_io_thread (options.affinity), options);
zmq_assert (pgm_receiver);
int rc = pgm_receiver->init (udp_encapsulation, addr_args.c_str ());
if (rc != 0) {
delete pgm_receiver;
return -1;
}
send_attach (this, pgm_receiver, blob_t ());
}
else
zmq_assert (false);
return;
}
#endif
zmq_assert (false);
}
void zmq::connect_session_t::detach ()
{
// Clean up the mess left over by the failed connection.
clean_pipes ();
// Reconnect.
start_connecting ();
}
/*
Copyright (c) 2007-2010 iMatix Corporation
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_CONNECT_SESSION_HPP_INCLUDED__
#define __ZMQ_CONNECT_SESSION_HPP_INCLUDED__
#include <string>
#include "session.hpp"
namespace zmq
{
// Connect session contains an address to connect to. On disconnect it
// attempts to reconnect.
class connect_session_t : public session_t
{
public:
connect_session_t (class io_thread_t *io_thread_,
class socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_);
~connect_session_t ();
// i_inout interface implementation.
void detach ();
private:
// Start the connection process.
void start_connecting ();
// Command handlers.
void process_plug ();
// Address to connect to.
std::string protocol;
std::string address;
};
}
#endif
...@@ -119,6 +119,7 @@ int zmq::ctx_t::term () ...@@ -119,6 +119,7 @@ int zmq::ctx_t::term ()
// We don't even have to synchronise access to data. // We don't even have to synchronise access to data.
zmq_assert (sockets.empty ()); zmq_assert (sockets.empty ());
// TODO: We are accessing the list of zombies in unsynchronised manner here!
// Get rid of remaining zombie sockets. // Get rid of remaining zombie sockets.
while (!zombies.empty ()) { while (!zombies.empty ()) {
dezombify (); dezombify ();
...@@ -173,7 +174,7 @@ zmq::socket_base_t *zmq::ctx_t::create_socket (int type_) ...@@ -173,7 +174,7 @@ zmq::socket_base_t *zmq::ctx_t::create_socket (int type_)
return s; return s;
} }
void zmq::ctx_t::zombify (socket_base_t *socket_) void zmq::ctx_t::zombify_socket (socket_base_t *socket_)
{ {
// Zombification of socket basically means that its ownership is tranferred // Zombification of socket basically means that its ownership is tranferred
// from the application that created it to the context. // from the application that created it to the context.
...@@ -284,7 +285,8 @@ zmq::socket_base_t *zmq::ctx_t::find_endpoint (const char *addr_) ...@@ -284,7 +285,8 @@ zmq::socket_base_t *zmq::ctx_t::find_endpoint (const char *addr_)
void zmq::ctx_t::dezombify () void zmq::ctx_t::dezombify ()
{ {
// Try to dezombify each zombie in the list. // Try to dezombify each zombie in the list. Note that caller is
// responsible for calling this method in the slot_sync critical section.
for (zombies_t::size_type i = 0; i != zombies.size ();) for (zombies_t::size_type i = 0; i != zombies.size ();)
if (zombies [i]->dezombify ()) { if (zombies [i]->dezombify ()) {
empty_slots.push_back (zombies [i]->get_slot ()); empty_slots.push_back (zombies [i]->get_slot ());
......
...@@ -58,7 +58,7 @@ namespace zmq ...@@ -58,7 +58,7 @@ namespace zmq
class socket_base_t *create_socket (int type_); class socket_base_t *create_socket (int type_);
// Make socket a zombie. // Make socket a zombie.
void zombify (socket_base_t *socket_); void zombify_socket (socket_base_t *socket_);
// Send command to the destination slot. // Send command to the destination slot.
void send_command (uint32_t slot_, const command_t &command_); void send_command (uint32_t slot_, const command_t &command_);
......
...@@ -22,11 +22,14 @@ ...@@ -22,11 +22,14 @@
#include "fq.hpp" #include "fq.hpp"
#include "pipe.hpp" #include "pipe.hpp"
#include "err.hpp" #include "err.hpp"
#include "i_terminate_events.hpp"
zmq::fq_t::fq_t () : zmq::fq_t::fq_t (i_terminate_events *sink_) :
active (0), active (0),
current (0), current (0),
more (false) more (false),
sink (sink_),
terminating (false)
{ {
} }
...@@ -42,6 +45,10 @@ void zmq::fq_t::attach (reader_t *pipe_) ...@@ -42,6 +45,10 @@ void zmq::fq_t::attach (reader_t *pipe_)
pipes.push_back (pipe_); pipes.push_back (pipe_);
pipes.swap (active, pipes.size () - 1); pipes.swap (active, pipes.size () - 1);
active++; active++;
// If we are already terminating, ask the pipe to terminate straight away.
if (terminating)
pipe_->terminate ();
} }
void zmq::fq_t::terminated (reader_t *pipe_) void zmq::fq_t::terminated (reader_t *pipe_)
...@@ -59,15 +66,15 @@ void zmq::fq_t::terminated (reader_t *pipe_) ...@@ -59,15 +66,15 @@ void zmq::fq_t::terminated (reader_t *pipe_)
current = 0; current = 0;
} }
pipes.erase (pipe_); pipes.erase (pipe_);
}
bool zmq::fq_t::has_pipes () if (terminating && pipes.empty ())
{ sink->terminated ();
return !pipes.empty ();
} }
void zmq::fq_t::term_pipes () void zmq::fq_t::terminate ()
{ {
terminating = true;
for (pipes_t::size_type i = 0; i != pipes.size (); i++) for (pipes_t::size_type i = 0; i != pipes.size (); i++)
pipes [i]->terminate (); pipes [i]->terminate ();
} }
......
...@@ -33,12 +33,11 @@ namespace zmq ...@@ -33,12 +33,11 @@ namespace zmq
{ {
public: public:
fq_t (); fq_t (struct i_terminate_events *sink_);
~fq_t (); ~fq_t ();
void attach (reader_t *pipe_); void attach (reader_t *pipe_);
bool has_pipes (); void terminate ();
void term_pipes ();
int recv (zmq_msg_t *msg_, int flags_); int recv (zmq_msg_t *msg_, int flags_);
bool has_in (); bool has_in ();
...@@ -64,6 +63,12 @@ namespace zmq ...@@ -64,6 +63,12 @@ namespace zmq
// there are following parts still waiting in the current pipe. // there are following parts still waiting in the current pipe.
bool more; bool more;
// Object to send events to.
i_terminate_events *sink;
// If true, termination process is already underway.
bool terminating;
fq_t (const fq_t&); fq_t (const fq_t&);
void operator = (const fq_t&); void operator = (const fq_t&);
}; };
......
...@@ -20,8 +20,6 @@ ...@@ -20,8 +20,6 @@
#ifndef __ZMQ_I_ENGINE_HPP_INCLUDED__ #ifndef __ZMQ_I_ENGINE_HPP_INCLUDED__
#define __ZMQ_I_ENGINE_HPP_INCLUDED__ #define __ZMQ_I_ENGINE_HPP_INCLUDED__
#include <stddef.h>
namespace zmq namespace zmq
{ {
...@@ -30,18 +28,19 @@ namespace zmq ...@@ -30,18 +28,19 @@ namespace zmq
virtual ~i_engine () {} virtual ~i_engine () {}
// Plug the engine to the session. // Plug the engine to the session.
virtual void plug (struct i_inout *inout_) = 0; virtual void plug (class io_thread_t *io_thread_,
struct i_inout *inout_) = 0;
// Unplug the engine from the session. // Unplug the engine from the session.
virtual void unplug () = 0; virtual void unplug () = 0;
// This method is called by the session to signalise that there
// are messages to send available.
virtual void revive () = 0;
// This method is called by the session to signalise that more // This method is called by the session to signalise that more
// messages can be written to the pipe. // messages can be written to the pipe.
virtual void resume_input () = 0; virtual void activate_in () = 0;
// This method is called by the session to signalise that there
// are messages to send available.
virtual void activate_out () = 0;
}; };
} }
......
...@@ -31,28 +31,17 @@ namespace zmq ...@@ -31,28 +31,17 @@ namespace zmq
{ {
virtual ~i_inout () {} virtual ~i_inout () {}
// Engine asks to get a message to send to the network. // Engine asks for a message to send to the network.
virtual bool read (::zmq_msg_t *msg_) = 0; virtual bool read (::zmq_msg_t *msg_) = 0;
// Engine sends the incoming message further on downstream. // Engine received message from the network and sends it further on.
virtual bool write (::zmq_msg_t *msg_) = 0; virtual bool write (::zmq_msg_t *msg_) = 0;
// Flush all the previously written messages downstream. // Flush all the previously written messages.
virtual void flush () = 0; virtual void flush () = 0;
// Drop all the references to the engine. The parameter is the object
// to use to reconnect. If reconnection is not required, the argument
// is set to NULL.
virtual void detach (class owned_t *reconnecter_) = 0;
// Returns least loaded I/O thread. // Engine is dead. Drop all the references to it.
virtual class io_thread_t *get_io_thread () = 0; virtual void detach () = 0;
// Return pointer to the owning socket.
virtual class socket_base_t *get_owner () = 0;
// Return ordinal number of the session.
virtual uint64_t get_ordinal () = 0;
}; };
} }
......
/*
Copyright (c) 2007-2010 iMatix Corporation
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_I_TERMINATE_EVENTS_HPP_INCLUDED__
#define __ZMQ_I_TERMINATE_EVENTS_HPP_INCLUDED__
namespace zmq
{
// Algorithms such as fair queueing (fq_t) and load balancing (lb_t)
// use this interface to communicate termination event to the socket.
struct i_terminate_events
{
virtual ~i_terminate_events () {}
virtual void terminated () = 0;
};
}
#endif
...@@ -21,21 +21,35 @@ ...@@ -21,21 +21,35 @@
#include "io_thread.hpp" #include "io_thread.hpp"
#include "err.hpp" #include "err.hpp"
zmq::io_object_t::io_object_t (io_thread_t *io_thread_) zmq::io_object_t::io_object_t (io_thread_t *io_thread_) :
poller (NULL)
{ {
// Retrieve the poller from the thread we are running in. if (io_thread_)
poller = io_thread_->get_poller (); plug (io_thread_);
} }
zmq::io_object_t::~io_object_t () zmq::io_object_t::~io_object_t ()
{ {
} }
void zmq::io_object_t::set_io_thread (io_thread_t *io_thread_) void zmq::io_object_t::plug (io_thread_t *io_thread_)
{ {
zmq_assert (io_thread_);
zmq_assert (!poller);
// Retrieve the poller from the thread we are running in.
poller = io_thread_->get_poller (); poller = io_thread_->get_poller ();
} }
void zmq::io_object_t::unplug ()
{
zmq_assert (poller);
// Forget about old poller in preparation to be migrated
// to a different I/O thread.
poller = NULL;
}
zmq::io_object_t::handle_t zmq::io_object_t::add_fd (fd_t fd_) zmq::io_object_t::handle_t zmq::io_object_t::add_fd (fd_t fd_)
{ {
return poller->add_fd (fd_, this); return poller->add_fd (fd_, this);
......
...@@ -40,15 +40,15 @@ namespace zmq ...@@ -40,15 +40,15 @@ namespace zmq
io_object_t (class io_thread_t *io_thread_ = NULL); io_object_t (class io_thread_t *io_thread_ = NULL);
~io_object_t (); ~io_object_t ();
// When migrating an object from one I/O thread to another, first
// unplug it, then migrate it, then plug it to the new thread.
void plug (class io_thread_t *io_thread_);
void unplug ();
protected: protected:
typedef poller_t::handle_t handle_t; typedef poller_t::handle_t handle_t;
// Derived class can init/swap the underlying I/O thread.
// Caution: Remove all the file descriptors from the old I/O thread
// before swapping to the new one!
void set_io_thread (class io_thread_t *io_thread_);
// Methods to access underlying poller object. // Methods to access underlying poller object.
handle_t add_fd (fd_t fd_); handle_t add_fd (fd_t fd_);
void rm_fd (handle_t handle_); void rm_fd (handle_t handle_);
......
...@@ -22,11 +22,14 @@ ...@@ -22,11 +22,14 @@
#include "lb.hpp" #include "lb.hpp"
#include "pipe.hpp" #include "pipe.hpp"
#include "err.hpp" #include "err.hpp"
#include "i_terminate_events.hpp"
zmq::lb_t::lb_t () : zmq::lb_t::lb_t (i_terminate_events *sink_) :
active (0), active (0),
current (0), current (0),
more (false) more (false),
sink (sink_),
terminating (false)
{ {
} }
...@@ -42,17 +45,22 @@ void zmq::lb_t::attach (writer_t *pipe_) ...@@ -42,17 +45,22 @@ void zmq::lb_t::attach (writer_t *pipe_)
pipes.push_back (pipe_); pipes.push_back (pipe_);
pipes.swap (active, pipes.size () - 1); pipes.swap (active, pipes.size () - 1);
active++; active++;
if (terminating)
pipe_->terminate ();
} }
void zmq::lb_t::term_pipes () void zmq::lb_t::terminate ()
{ {
terminating = true;
for (pipes_t::size_type i = 0; i != pipes.size (); i++) for (pipes_t::size_type i = 0; i != pipes.size (); i++)
pipes [i]->terminate (); pipes [i]->terminate ();
} }
void zmq::lb_t::terminated (writer_t *pipe_) void zmq::lb_t::terminated (writer_t *pipe_)
{ {
// ??? // TODO: ???
zmq_assert (!more || pipes [current] != pipe_); zmq_assert (!more || pipes [current] != pipe_);
// Remove the pipe from the list; adjust number of active pipes // Remove the pipe from the list; adjust number of active pipes
...@@ -63,11 +71,9 @@ void zmq::lb_t::terminated (writer_t *pipe_) ...@@ -63,11 +71,9 @@ void zmq::lb_t::terminated (writer_t *pipe_)
current = 0; current = 0;
} }
pipes.erase (pipe_); pipes.erase (pipe_);
}
bool zmq::lb_t::has_pipes () if (terminating && pipes.empty ())
{ sink->terminated ();
return !pipes.empty ();
} }
void zmq::lb_t::activated (writer_t *pipe_) void zmq::lb_t::activated (writer_t *pipe_)
......
...@@ -32,12 +32,11 @@ namespace zmq ...@@ -32,12 +32,11 @@ namespace zmq
{ {
public: public:
lb_t (); lb_t (struct i_terminate_events *sink_);
~lb_t (); ~lb_t ();
void attach (writer_t *pipe_); void attach (writer_t *pipe_);
void term_pipes (); void terminate ();
bool has_pipes ();
int send (zmq_msg_t *msg_, int flags_); int send (zmq_msg_t *msg_, int flags_);
bool has_out (); bool has_out ();
...@@ -61,6 +60,12 @@ namespace zmq ...@@ -61,6 +60,12 @@ namespace zmq
// True if last we are in the middle of a multipart message. // True if last we are in the middle of a multipart message.
bool more; bool more;
// Object to send events to.
struct i_terminate_events *sink;
// If true, termination process is already underway.
bool terminating;
lb_t (const lb_t&); lb_t (const lb_t&);
void operator = (const lb_t&); void operator = (const lb_t&);
}; };
......
/*
Copyright (c) 2007-2010 iMatix Corporation
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "named_session.hpp"
#include "socket_base.hpp"
/*
zmq::named_session_t::named_session_t (class io_thread_t *io_thread_,
socket_base_t *socket_, const options_t &options_,
const blob_t &name_) :
session_t (io_thread_, socket_, options_),
name (name_)
{
// Make double sure that the session has valid name.
zmq_assert (!name.empty ());
zmq_assert (name [0] != 0);
if (!socket_->register_session (name, this)) {
// TODO: There's already a session with the specified
// identity. We should log the error and drop the
// session.
zmq_assert (false);
}
}
zmq::named_session_t::~named_session_t ()
{
}
void zmq::named_session_t::detach ()
{
// TODO:
zmq_assert (false);
}
void zmq::named_session_t::attached (const blob_t &peer_identity_)
{
if (!peer_identity.empty ()) {
// If both IDs are temporary, no checking is needed.
// TODO: Old ID should be reused in this case...
if (peer_identity.empty () || peer_identity [0] != 0 ||
peer_identity_.empty () || peer_identity_ [0] != 0) {
// If we already know the peer name do nothing, just check whether
// it haven't changed.
zmq_assert (peer_identity == peer_identity_);
}
}
else if (!peer_identity_.empty ()) {
// Store the peer identity.
peer_identity = peer_identity_;
// Register the session using the peer name.
if (!register_session (peer_identity, this)) {
// TODO: There's already a session with the specified
// identity. We should presumably syslog it and drop the
// session.
zmq_assert (false);
}
}
}
void zmq::named_session_t::detached ()
{
socket->unregister_session (peer_identity);
}
*/
/*
Copyright (c) 2007-2010 iMatix Corporation
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_NAMED_SESSION_HPP_INCLUDED__
#define __ZMQ_NAMED_SESSION_HPP_INCLUDED__
#include "session.hpp"
#include "blob.hpp"
namespace zmq
{
// Named session is created by listener object when the peer identifies
// itself by a strong name. Named session survives reconnections.
class named_session_t : public session_t
{
public:
named_session_t (class io_thread_t *io_thread_,
class socket_base_t *socket_, const options_t &options_,
const blob_t &name_);
~named_session_t ();
// i_inout interface implementation.
void detach ();
// Handle events from session_t base class.
void attached (const blob_t &peer_identity_);
void detached ();
private:
// Name of the session. Corresponds to the peer's strong identity.
blob_t name;
};
}
#endif
...@@ -24,7 +24,6 @@ ...@@ -24,7 +24,6 @@
#include "err.hpp" #include "err.hpp"
#include "pipe.hpp" #include "pipe.hpp"
#include "io_thread.hpp" #include "io_thread.hpp"
#include "owned.hpp"
#include "session.hpp" #include "session.hpp"
#include "socket_base.hpp" #include "socket_base.hpp"
...@@ -143,9 +142,9 @@ zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t taskset_) ...@@ -143,9 +142,9 @@ zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t taskset_)
return ctx->choose_io_thread (taskset_); return ctx->choose_io_thread (taskset_);
} }
void zmq::object_t::zombify (socket_base_t *socket_) void zmq::object_t::zombify_socket (socket_base_t *socket_)
{ {
ctx->zombify (socket_); ctx->zombify_socket (socket_);
} }
void zmq::object_t::send_stop () void zmq::object_t::send_stop ()
...@@ -158,7 +157,7 @@ void zmq::object_t::send_stop () ...@@ -158,7 +157,7 @@ void zmq::object_t::send_stop ()
ctx->send_command (slot, cmd); ctx->send_command (slot, cmd);
} }
void zmq::object_t::send_plug (owned_t *destination_, bool inc_seqnum_) void zmq::object_t::send_plug (own_t *destination_, bool inc_seqnum_)
{ {
if (inc_seqnum_) if (inc_seqnum_)
destination_->inc_seqnum (); destination_->inc_seqnum ();
...@@ -169,7 +168,7 @@ void zmq::object_t::send_plug (owned_t *destination_, bool inc_seqnum_) ...@@ -169,7 +168,7 @@ void zmq::object_t::send_plug (owned_t *destination_, bool inc_seqnum_)
send_command (cmd); send_command (cmd);
} }
void zmq::object_t::send_own (socket_base_t *destination_, owned_t *object_) void zmq::object_t::send_own (own_t *destination_, own_t *object_)
{ {
destination_->inc_seqnum (); destination_->inc_seqnum ();
command_t cmd; command_t cmd;
...@@ -206,9 +205,8 @@ void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_, ...@@ -206,9 +205,8 @@ void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_,
send_command (cmd); send_command (cmd);
} }
void zmq::object_t::send_bind (socket_base_t *destination_, void zmq::object_t::send_bind (own_t *destination_, reader_t *in_pipe_,
reader_t *in_pipe_, writer_t *out_pipe_, const blob_t &peer_identity_, writer_t *out_pipe_, const blob_t &peer_identity_, bool inc_seqnum_)
bool inc_seqnum_)
{ {
if (inc_seqnum_) if (inc_seqnum_)
destination_->inc_seqnum (); destination_->inc_seqnum ();
...@@ -269,8 +267,8 @@ void zmq::object_t::send_pipe_term_ack (reader_t *destination_) ...@@ -269,8 +267,8 @@ void zmq::object_t::send_pipe_term_ack (reader_t *destination_)
send_command (cmd); send_command (cmd);
} }
void zmq::object_t::send_term_req (socket_base_t *destination_, void zmq::object_t::send_term_req (own_t *destination_,
owned_t *object_) own_t *object_)
{ {
command_t cmd; command_t cmd;
cmd.destination = destination_; cmd.destination = destination_;
...@@ -279,7 +277,7 @@ void zmq::object_t::send_term_req (socket_base_t *destination_, ...@@ -279,7 +277,7 @@ void zmq::object_t::send_term_req (socket_base_t *destination_,
send_command (cmd); send_command (cmd);
} }
void zmq::object_t::send_term (owned_t *destination_) void zmq::object_t::send_term (own_t *destination_)
{ {
command_t cmd; command_t cmd;
cmd.destination = destination_; cmd.destination = destination_;
...@@ -287,7 +285,7 @@ void zmq::object_t::send_term (owned_t *destination_) ...@@ -287,7 +285,7 @@ void zmq::object_t::send_term (owned_t *destination_)
send_command (cmd); send_command (cmd);
} }
void zmq::object_t::send_term_ack (socket_base_t *destination_) void zmq::object_t::send_term_ack (own_t *destination_)
{ {
command_t cmd; command_t cmd;
cmd.destination = destination_; cmd.destination = destination_;
...@@ -305,7 +303,7 @@ void zmq::object_t::process_plug () ...@@ -305,7 +303,7 @@ void zmq::object_t::process_plug ()
zmq_assert (false); zmq_assert (false);
} }
void zmq::object_t::process_own (owned_t *object_) void zmq::object_t::process_own (own_t *object_)
{ {
zmq_assert (false); zmq_assert (false);
} }
...@@ -342,7 +340,7 @@ void zmq::object_t::process_pipe_term_ack () ...@@ -342,7 +340,7 @@ void zmq::object_t::process_pipe_term_ack ()
zmq_assert (false); zmq_assert (false);
} }
void zmq::object_t::process_term_req (owned_t *object_) void zmq::object_t::process_term_req (own_t *object_)
{ {
zmq_assert (false); zmq_assert (false);
} }
......
...@@ -53,18 +53,19 @@ namespace zmq ...@@ -53,18 +53,19 @@ namespace zmq
// Zombify particular socket. In other words, pass the ownership to // Zombify particular socket. In other words, pass the ownership to
// the context. // the context.
void zombify (class socket_base_t *socket_); void zombify_socket (class socket_base_t *socket_);
// Derived object can use these functions to send commands // Derived object can use these functions to send commands
// to other objects. // to other objects.
void send_stop (); void send_stop ();
void send_plug (class owned_t *destination_, bool inc_seqnum_ = true); void send_plug (class own_t *destination_,
void send_own (class socket_base_t *destination_, bool inc_seqnum_ = true);
class owned_t *object_); void send_own (class own_t *destination_,
class own_t *object_);
void send_attach (class session_t *destination_, void send_attach (class session_t *destination_,
struct i_engine *engine_, const blob_t &peer_identity_, struct i_engine *engine_, const blob_t &peer_identity_,
bool inc_seqnum_ = true); bool inc_seqnum_ = true);
void send_bind (class socket_base_t *destination_, void send_bind (class own_t *destination_,
class reader_t *in_pipe_, class writer_t *out_pipe_, class reader_t *in_pipe_, class writer_t *out_pipe_,
const blob_t &peer_identity_, bool inc_seqnum_ = true); const blob_t &peer_identity_, bool inc_seqnum_ = true);
void send_revive (class object_t *destination_); void send_revive (class object_t *destination_);
...@@ -72,16 +73,16 @@ namespace zmq ...@@ -72,16 +73,16 @@ namespace zmq
uint64_t msgs_read_); uint64_t msgs_read_);
void send_pipe_term (class writer_t *destination_); void send_pipe_term (class writer_t *destination_);
void send_pipe_term_ack (class reader_t *destination_); void send_pipe_term_ack (class reader_t *destination_);
void send_term_req (class socket_base_t *destination_, void send_term_req (class own_t *destination_,
class owned_t *object_); class own_t *object_);
void send_term (class owned_t *destination_); void send_term (class own_t *destination_);
void send_term_ack (class socket_base_t *destination_); void send_term_ack (class own_t *destination_);
// These handlers can be overloaded by the derived objects. They are // These handlers can be overloaded by the derived objects. They are
// called when command arrives from another thread. // called when command arrives from another thread.
virtual void process_stop (); virtual void process_stop ();
virtual void process_plug (); virtual void process_plug ();
virtual void process_own (class owned_t *object_); virtual void process_own (class own_t *object_);
virtual void process_attach (struct i_engine *engine_, virtual void process_attach (struct i_engine *engine_,
const blob_t &peer_identity_); const blob_t &peer_identity_);
virtual void process_bind (class reader_t *in_pipe_, virtual void process_bind (class reader_t *in_pipe_,
...@@ -90,7 +91,7 @@ namespace zmq ...@@ -90,7 +91,7 @@ namespace zmq
virtual void process_reader_info (uint64_t msgs_read_); virtual void process_reader_info (uint64_t msgs_read_);
virtual void process_pipe_term (); virtual void process_pipe_term ();
virtual void process_pipe_term_ack (); virtual void process_pipe_term_ack ();
virtual void process_term_req (class owned_t *object_); virtual void process_term_req (class own_t *object_);
virtual void process_term (); virtual void process_term ();
virtual void process_term_ack (); virtual void process_term_ack ();
......
/*
Copyright (c) 2007-2010 iMatix Corporation
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "own.hpp"
#include "err.hpp"
#include "io_thread.hpp"
zmq::own_t::own_t (class ctx_t *parent_, uint32_t slot_) :
object_t (parent_, slot_),
terminating (false),
sent_seqnum (0),
processed_seqnum (0),
owner (NULL),
term_acks (0)
{
}
zmq::own_t::own_t (io_thread_t *io_thread_) :
object_t (io_thread_),
terminating (false),
sent_seqnum (0),
processed_seqnum (0),
owner (NULL),
term_acks (0)
{
}
zmq::own_t::~own_t ()
{
}
void zmq::own_t::set_owner (own_t *owner_)
{
zmq_assert (!owner);
owner = owner_;
}
void zmq::own_t::inc_seqnum ()
{
// This function may be called from a different thread!
sent_seqnum.add (1);
}
void zmq::own_t::process_seqnum ()
{
// Catch up with counter of processed commands.
processed_seqnum++;
// We may have catched up and still have pending terms acks.
check_term_acks ();
}
void zmq::own_t::launch_child (own_t *object_)
{
// Specify the owner of the object.
object_->set_owner (this);
// Plug the object into the I/O thread.
send_plug (object_);
// Take ownership of the object.
send_own (this, object_);
}
void zmq::own_t::launch_sibling (own_t *object_)
{
// Specify the owner of the object.
object_->set_owner (owner);
// Plug the object into its I/O thread.
send_plug (object_);
// Take ownership of the object.
send_own (owner, object_);
}
void zmq::own_t::process_term_req (own_t *object_)
{
// When shutting down we can ignore termination requests from owned
// objects. The termination request was already sent to the object.
if (terminating)
return;
// If I/O object is well and alive let's ask it to terminate.
owned_t::iterator it = std::find (owned.begin (), owned.end (), object_);
// If not found, we assume that termination request was already sent to
// the object so we can safely ignore the request.
if (it == owned.end ())
return;
owned.erase (it);
register_term_acks (1);
send_term (object_);
}
void zmq::own_t::process_own (own_t *object_)
{
// If the object is already being shut down, new owned objects are
// immediately asked to terminate.
if (terminating) {
register_term_acks (1);
send_term (object_);
return;
}
// Store the reference to the owned object.
owned.insert (object_);
}
void zmq::own_t::terminate ()
{
// If termination is already underway, there's no point
// in starting it anew.
if (terminating)
return;
// As for the root of the ownership tree, there's noone to terminate it,
// so it has to terminate itself.
if (!owner) {
process_term ();
return;
}
// If I am an owned object, I'll ask my owner to terminate me.
send_term_req (owner, this);
}
void zmq::own_t::process_term ()
{
// Double termination should never happen.
zmq_assert (!terminating);
// Send termination request to all owned objects.
for (owned_t::iterator it = owned.begin (); it != owned.end (); it++)
send_term (*it);
register_term_acks (owned.size ());
owned.clear ();
// Start termination process and check whether by chance we cannot
// terminate immediately.
terminating = true;
check_term_acks ();
}
void zmq::own_t::register_term_acks (int count_)
{
term_acks += count_;
}
void zmq::own_t::unregister_term_ack ()
{
zmq_assert (term_acks > 0);
term_acks--;
// This may be a last ack we are waiting for before termination...
check_term_acks ();
}
void zmq::own_t::process_term_ack ()
{
unregister_term_ack ();
}
void zmq::own_t::check_term_acks ()
{
if (terminating && processed_seqnum == sent_seqnum.get () &&
term_acks == 0) {
// Sanity check. There should be no active children at this point.
zmq_assert (owned.empty ());
// The root object has nobody to confirm the termination to.
// Other nodes will confirm the termination to the owner.
if (owner)
send_term_ack (owner);
// Deallocate the resources.
delete this;
}
}
...@@ -17,26 +17,35 @@ ...@@ -17,26 +17,35 @@
along with this program. If not, see <http://www.gnu.org/licenses/>. along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifndef __ZMQ_OWNED_HPP_INCLUDED__ #ifndef __ZMQ_OWN_HPP_INCLUDED__
#define __ZMQ_OWNED_HPP_INCLUDED__ #define __ZMQ_OWN_HPP_INCLUDED__
#include "socket_base.hpp" #include <set>
#include <algorithm>
#include "object.hpp"
#include "atomic_counter.hpp" #include "atomic_counter.hpp"
#include "stdint.hpp" #include "stdint.hpp"
namespace zmq namespace zmq
{ {
// Base class for objects owned by individual sockets. Handles // Base class for objects forming a part of ownership hierarchy.
// initialisation and destruction of such objects. // It handles initialisation and destruction of such objects.
class owned_t : public object_t class own_t : public object_t
{ {
public: public:
// The object will live in parent's thread, however, its lifetime // Note that the owner is unspecified in the constructor.
// will be managed by its owner socket. // It'll be supplied later on when the object is plugged in.
owned_t (object_t *parent_, socket_base_t *owner_);
// The object is not living within an I/O thread. It has it's own
// thread outside of 0MQ infrastructure.
own_t (class ctx_t *parent_, uint32_t slot_);
// The object is living within I/O thread.
own_t (class io_thread_t *io_thread_);
// When another owned object wants to send command to this object // When another owned object wants to send command to this object
// it calls this function to let it know it should not shut down // it calls this function to let it know it should not shut down
...@@ -45,48 +54,77 @@ namespace zmq ...@@ -45,48 +54,77 @@ namespace zmq
protected: protected:
// A mechanism allowing derived owned objects to postpone the // Launch the supplied object and become its owner.
// termination process. Default implementation defines no such delay. void launch_child (own_t *object_);
// Note that the derived object has to call finalise method when the
// delay is over.
virtual bool is_terminable ();
void finalise ();
// Ask owner socket to terminate this object. // Launch the supplied object and make it your sibling (make your
void term (); // owner become its owner as well).
void launch_sibling (own_t *object_);
// Derived object destroys owned_t. No point in allowing others to // Ask owner object to terminate this object. It may take a while
// invoke the destructor. At the same time, it has to be virtual so // while actual termination is started. This function should not be
// that generic owned_t deallocation mechanism destroys specific type // called more than once.
// of the owned object correctly. void terminate ();
virtual ~owned_t ();
// io_object_t defines a new handler used to disconnect the object // Derived object destroys own_t. There's no point in allowing
// from the poller object. Implement the handlen in the derived // others to invoke the destructor. At the same time, it has to be
// classes to ensure sane cleanup. // virtual so that generic own_t deallocation mechanism destroys
virtual void process_unplug () = 0; // specific type of the owned object correctly.
virtual ~own_t ();
// Socket owning this object. When the socket is being closed it's // Term handler is protocted rather than private so that it can
// responsible for shutting down this object. // be intercepted by the derived class. This is useful to add custom
socket_base_t *owner; // steps to the beginning of the termination process.
void process_term ();
// Use following two functions to wait for arbitrary events before
// terminating. Just add number of events to wait for using
// register_tem_acks functions. When event occurs, call
// remove_term_ack. When number of pending acks reaches zero
// object will be deallocated.
void register_term_acks (int count_);
void unregister_term_ack ();
private: private:
// Set owner of the object
void set_owner (own_t *owner_);
// Handlers for incoming commands. // Handlers for incoming commands.
void process_term (); void process_own (own_t *object_);
void process_term_req (own_t *object_);
void process_term_ack ();
void process_seqnum (); void process_seqnum ();
// Check whether all the peding term acks were delivered.
// If so, deallocate this object.
void check_term_acks ();
// True if termination was already initiated. If so, we can destroy
// the object if there are no more child objects or pending term acks.
bool terminating;
// Sequence number of the last command sent to this object. // Sequence number of the last command sent to this object.
atomic_counter_t sent_seqnum; atomic_counter_t sent_seqnum;
// Sequence number of the last command processed by this object. // Sequence number of the last command processed by this object.
uint64_t processed_seqnum; uint64_t processed_seqnum;
// If true, the object is already shutting down. // Socket owning this object. It's responsible for shutting down
bool shutting_down; // this object.
own_t *owner;
// List of all objects owned by this socket. We are responsible
// for deallocating them before we quit.
typedef std::set <own_t*> owned_t;
owned_t owned;
// Number of events we have to get before we can destroy the object.
int term_acks;
owned_t (const owned_t&); own_t (const own_t&);
void operator = (const owned_t&); void operator = (const own_t&);
}; };
} }
......
...@@ -28,7 +28,8 @@ zmq::pair_t::pair_t (class ctx_t *parent_, uint32_t slot_) : ...@@ -28,7 +28,8 @@ zmq::pair_t::pair_t (class ctx_t *parent_, uint32_t slot_) :
inpipe (NULL), inpipe (NULL),
outpipe (NULL), outpipe (NULL),
inpipe_alive (false), inpipe_alive (false),
outpipe_alive (false) outpipe_alive (false),
terminating (false)
{ {
options.requires_in = true; options.requires_in = true;
options.requires_out = true; options.requires_out = true;
...@@ -43,6 +44,7 @@ zmq::pair_t::~pair_t () ...@@ -43,6 +44,7 @@ zmq::pair_t::~pair_t ()
void zmq::pair_t::xattach_pipes (class reader_t *inpipe_, void zmq::pair_t::xattach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_, const blob_t &peer_identity_) class writer_t *outpipe_, const blob_t &peer_identity_)
{ {
zmq_assert (!terminating);
zmq_assert (!inpipe && !outpipe); zmq_assert (!inpipe && !outpipe);
inpipe = inpipe_; inpipe = inpipe_;
...@@ -59,6 +61,9 @@ void zmq::pair_t::terminated (class reader_t *pipe_) ...@@ -59,6 +61,9 @@ void zmq::pair_t::terminated (class reader_t *pipe_)
zmq_assert (pipe_ == inpipe); zmq_assert (pipe_ == inpipe);
inpipe = NULL; inpipe = NULL;
inpipe_alive = false; inpipe_alive = false;
if (terminating)
unregister_term_ack ();
} }
void zmq::pair_t::terminated (class writer_t *pipe_) void zmq::pair_t::terminated (class writer_t *pipe_)
...@@ -66,19 +71,22 @@ void zmq::pair_t::terminated (class writer_t *pipe_) ...@@ -66,19 +71,22 @@ void zmq::pair_t::terminated (class writer_t *pipe_)
zmq_assert (pipe_ == outpipe); zmq_assert (pipe_ == outpipe);
outpipe = NULL; outpipe = NULL;
outpipe_alive = false; outpipe_alive = false;
}
void zmq::pair_t::xterm_pipes () if (terminating)
{ unregister_term_ack ();
if (inpipe)
inpipe->terminate ();
if (outpipe)
outpipe->terminate ();
} }
bool zmq::pair_t::xhas_pipes () void zmq::pair_t::process_term ()
{ {
return inpipe != NULL || outpipe != NULL; zmq_assert (inpipe && outpipe);
terminating = true;
register_term_acks (2);
inpipe->terminate ();
outpipe->terminate ();
socket_base_t::process_term ();
} }
void zmq::pair_t::activated (class reader_t *pipe_) void zmq::pair_t::activated (class reader_t *pipe_)
......
...@@ -39,8 +39,6 @@ namespace zmq ...@@ -39,8 +39,6 @@ namespace zmq
// Overloads of functions from socket_base_t. // Overloads of functions from socket_base_t.
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
const blob_t &peer_identity_); const blob_t &peer_identity_);
void xterm_pipes ();
bool xhas_pipes ();
int xsend (zmq_msg_t *msg_, int flags_); int xsend (zmq_msg_t *msg_, int flags_);
int xrecv (zmq_msg_t *msg_, int flags_); int xrecv (zmq_msg_t *msg_, int flags_);
bool xhas_in (); bool xhas_in ();
...@@ -56,12 +54,17 @@ namespace zmq ...@@ -56,12 +54,17 @@ namespace zmq
private: private:
// Hook into termination process.
void process_term ();
class reader_t *inpipe; class reader_t *inpipe;
class writer_t *outpipe; class writer_t *outpipe;
bool inpipe_alive; bool inpipe_alive;
bool outpipe_alive; bool outpipe_alive;
bool terminating;
pair_t (const pair_t&); pair_t (const pair_t&);
void operator = (const pair_t&); void operator = (const pair_t&);
}; };
......
...@@ -26,7 +26,8 @@ ...@@ -26,7 +26,8 @@
zmq::pub_t::pub_t (class ctx_t *parent_, uint32_t slot_) : zmq::pub_t::pub_t (class ctx_t *parent_, uint32_t slot_) :
socket_base_t (parent_, slot_), socket_base_t (parent_, slot_),
active (0) active (0),
terminating (false)
{ {
options.requires_in = false; options.requires_in = false;
options.requires_out = true; options.requires_out = true;
...@@ -40,6 +41,7 @@ zmq::pub_t::~pub_t () ...@@ -40,6 +41,7 @@ zmq::pub_t::~pub_t ()
void zmq::pub_t::xattach_pipes (class reader_t *inpipe_, void zmq::pub_t::xattach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_, const blob_t &peer_identity_) class writer_t *outpipe_, const blob_t &peer_identity_)
{ {
zmq_assert (!terminating);
zmq_assert (!inpipe_); zmq_assert (!inpipe_);
outpipe_->set_event_sink (this); outpipe_->set_event_sink (this);
...@@ -47,18 +49,26 @@ void zmq::pub_t::xattach_pipes (class reader_t *inpipe_, ...@@ -47,18 +49,26 @@ void zmq::pub_t::xattach_pipes (class reader_t *inpipe_,
pipes.push_back (outpipe_); pipes.push_back (outpipe_);
pipes.swap (active, pipes.size () - 1); pipes.swap (active, pipes.size () - 1);
active++; active++;
if (terminating) {
register_term_acks (1);
outpipe_->terminate ();
}
} }
void zmq::pub_t::xterm_pipes () void zmq::pub_t::process_term ()
{ {
terminating = true;
// Start shutdown process for all the pipes. // Start shutdown process for all the pipes.
for (pipes_t::size_type i = 0; i != pipes.size (); i++) for (pipes_t::size_type i = 0; i != pipes.size (); i++)
pipes [i]->terminate (); pipes [i]->terminate ();
}
bool zmq::pub_t::xhas_pipes () // Wait for pipes to terminate before terminating yourself.
{ register_term_acks (pipes.size ());
return !pipes.empty ();
// Continue with the termination immediately.
socket_base_t::process_term ();
} }
void zmq::pub_t::activated (writer_t *pipe_) void zmq::pub_t::activated (writer_t *pipe_)
...@@ -75,6 +85,10 @@ void zmq::pub_t::terminated (writer_t *pipe_) ...@@ -75,6 +85,10 @@ void zmq::pub_t::terminated (writer_t *pipe_)
if (pipes.index (pipe_) < active) if (pipes.index (pipe_) < active)
active--; active--;
pipes.erase (pipe_); pipes.erase (pipe_);
// If we are already terminating, wait for one term ack less.
if (terminating)
unregister_term_ack ();
} }
int zmq::pub_t::xsend (zmq_msg_t *msg_, int flags_) int zmq::pub_t::xsend (zmq_msg_t *msg_, int flags_)
......
...@@ -37,8 +37,6 @@ namespace zmq ...@@ -37,8 +37,6 @@ namespace zmq
// Implementations of virtual functions from socket_base_t. // Implementations of virtual functions from socket_base_t.
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
const blob_t &peer_identity_); const blob_t &peer_identity_);
void xterm_pipes ();
bool xhas_pipes ();
int xsend (zmq_msg_t *msg_, int flags_); int xsend (zmq_msg_t *msg_, int flags_);
bool xhas_out (); bool xhas_out ();
...@@ -48,6 +46,9 @@ namespace zmq ...@@ -48,6 +46,9 @@ namespace zmq
private: private:
// Hook into the termination process.
void process_term ();
// Write the message to the pipe. Make the pipe inactive if writing // Write the message to the pipe. Make the pipe inactive if writing
// fails. In such a case false is returned. // fails. In such a case false is returned.
bool write (class writer_t *pipe_, zmq_msg_t *msg_); bool write (class writer_t *pipe_, zmq_msg_t *msg_);
...@@ -60,6 +61,9 @@ namespace zmq ...@@ -60,6 +61,9 @@ namespace zmq
// beginning of the pipes array. // beginning of the pipes array.
pipes_t::size_type active; pipes_t::size_type active;
// True if termination process is already underway.
bool terminating;
pub_t (const pub_t&); pub_t (const pub_t&);
void operator = (const pub_t&); void operator = (const pub_t&);
}; };
......
...@@ -23,7 +23,8 @@ ...@@ -23,7 +23,8 @@
#include "err.hpp" #include "err.hpp"
zmq::pull_t::pull_t (class ctx_t *parent_, uint32_t slot_) : zmq::pull_t::pull_t (class ctx_t *parent_, uint32_t slot_) :
socket_base_t (parent_, slot_) socket_base_t (parent_, slot_),
fq (this)
{ {
options.requires_in = true; options.requires_in = true;
options.requires_out = false; options.requires_out = false;
...@@ -40,14 +41,17 @@ void zmq::pull_t::xattach_pipes (class reader_t *inpipe_, ...@@ -40,14 +41,17 @@ void zmq::pull_t::xattach_pipes (class reader_t *inpipe_,
fq.attach (inpipe_); fq.attach (inpipe_);
} }
void zmq::pull_t::xterm_pipes () void zmq::pull_t::process_term ()
{ {
fq.term_pipes (); register_term_acks (1);
fq.terminate ();
socket_base_t::process_term ();
} }
bool zmq::pull_t::xhas_pipes () void zmq::pull_t::terminated ()
{ {
return fq.has_pipes (); unregister_term_ack ();
} }
int zmq::pull_t::xrecv (zmq_msg_t *msg_, int flags_) int zmq::pull_t::xrecv (zmq_msg_t *msg_, int flags_)
......
...@@ -17,32 +17,39 @@ ...@@ -17,32 +17,39 @@
along with this program. If not, see <http://www.gnu.org/licenses/>. along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifndef __ZMQ_UPSTREAM_HPP_INCLUDED__ #ifndef __ZMQ_PULL_HPP_INCLUDED__
#define __ZMQ_UPSTREAM_HPP_INCLUDED__ #define __ZMQ_PULL_HPP_INCLUDED__
#include "i_terminate_events.hpp"
#include "socket_base.hpp" #include "socket_base.hpp"
#include "fq.hpp" #include "fq.hpp"
namespace zmq namespace zmq
{ {
class pull_t : public socket_base_t class pull_t : public socket_base_t, public i_terminate_events
{ {
public: public:
pull_t (class ctx_t *parent_, uint32_t slot_); pull_t (class ctx_t *parent_, uint32_t slot_);
~pull_t (); ~pull_t ();
protected:
// Overloads of functions from socket_base_t. // Overloads of functions from socket_base_t.
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
const blob_t &peer_identity_); const blob_t &peer_identity_);
void xterm_pipes ();
bool xhas_pipes ();
int xrecv (zmq_msg_t *msg_, int flags_); int xrecv (zmq_msg_t *msg_, int flags_);
bool xhas_in (); bool xhas_in ();
private: private:
// i_terminate_events interface implementation.
void terminated ();
// Hook into the termination process.
void process_term ();
// Fair queueing object for inbound pipes. // Fair queueing object for inbound pipes.
fq_t fq; fq_t fq;
......
...@@ -24,7 +24,8 @@ ...@@ -24,7 +24,8 @@
#include "pipe.hpp" #include "pipe.hpp"
zmq::push_t::push_t (class ctx_t *parent_, uint32_t slot_) : zmq::push_t::push_t (class ctx_t *parent_, uint32_t slot_) :
socket_base_t (parent_, slot_) socket_base_t (parent_, slot_),
lb (this)
{ {
options.requires_in = false; options.requires_in = false;
options.requires_out = true; options.requires_out = true;
...@@ -41,14 +42,17 @@ void zmq::push_t::xattach_pipes (class reader_t *inpipe_, ...@@ -41,14 +42,17 @@ void zmq::push_t::xattach_pipes (class reader_t *inpipe_,
lb.attach (outpipe_); lb.attach (outpipe_);
} }
void zmq::push_t::xterm_pipes () void zmq::push_t::process_term ()
{ {
lb.term_pipes (); register_term_acks (1);
lb.terminate ();
socket_base_t::process_term ();
} }
bool zmq::push_t::xhas_pipes () void zmq::push_t::terminated ()
{ {
return lb.has_pipes (); unregister_term_ack ();
} }
int zmq::push_t::xsend (zmq_msg_t *msg_, int flags_) int zmq::push_t::xsend (zmq_msg_t *msg_, int flags_)
......
...@@ -17,32 +17,39 @@ ...@@ -17,32 +17,39 @@
along with this program. If not, see <http://www.gnu.org/licenses/>. along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifndef __ZMQ_DOWNSTREAM_HPP_INCLUDED__ #ifndef __ZMQ_PUSH_HPP_INCLUDED__
#define __ZMQ_DOWNSTREAM_HPP_INCLUDED__ #define __ZMQ_PUSH_HPP_INCLUDED__
#include "i_terminate_events.hpp"
#include "socket_base.hpp" #include "socket_base.hpp"
#include "lb.hpp" #include "lb.hpp"
namespace zmq namespace zmq
{ {
class push_t : public socket_base_t class push_t : public socket_base_t, public i_terminate_events
{ {
public: public:
push_t (class ctx_t *parent_, uint32_t slot_); push_t (class ctx_t *parent_, uint32_t slot_);
~push_t (); ~push_t ();
protected:
// Overloads of functions from socket_base_t. // Overloads of functions from socket_base_t.
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
const blob_t &peer_identity_); const blob_t &peer_identity_);
void xterm_pipes ();
bool xhas_pipes ();
int xsend (zmq_msg_t *msg_, int flags_); int xsend (zmq_msg_t *msg_, int flags_);
bool xhas_out (); bool xhas_out ();
private: private:
// i_terminate_events interface implementation.
void terminated ();
// Hook into the termination process.
void process_term ();
// Load balancer managing the outbound pipes. // Load balancer managing the outbound pipes.
lb_t lb; lb_t lb;
......
...@@ -20,47 +20,25 @@ ...@@ -20,47 +20,25 @@
#include <new> #include <new>
#include "session.hpp" #include "session.hpp"
#include "socket_base.hpp"
#include "i_engine.hpp" #include "i_engine.hpp"
#include "err.hpp" #include "err.hpp"
#include "pipe.hpp" #include "pipe.hpp"
zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_, zmq::session_t::session_t (class io_thread_t *io_thread_,
const options_t &options_) : class socket_base_t *socket_, const options_t &options_) :
owned_t (parent_, owner_), own_t (io_thread_),
options (options_),
in_pipe (NULL), in_pipe (NULL),
incomplete_in (false), incomplete_in (false),
active (true), active (true),
out_pipe (NULL), out_pipe (NULL),
engine (NULL), engine (NULL),
options (options_) socket (socket_),
io_thread (io_thread_),
attach_processed (false),
term_processed (false)
{ {
// It's possible to register the session at this point as it will be
// searched for only on reconnect, i.e. no race condition (session found
// before it is plugged into it's I/O thread) is possible.
ordinal = owner->register_session (this);
}
zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_,
const options_t &options_, const blob_t &peer_identity_) :
owned_t (parent_, owner_),
in_pipe (NULL),
incomplete_in (false),
active (true),
out_pipe (NULL),
engine (NULL),
ordinal (0),
peer_identity (peer_identity_),
options (options_)
{
if (!peer_identity.empty () && peer_identity [0] != 0) {
if (!owner->register_session (peer_identity, this)) {
// TODO: There's already a session with the specified
// identity. We should presumably syslog it and drop the
// session.
zmq_assert (false);
}
}
} }
zmq::session_t::~session_t () zmq::session_t::~session_t ()
...@@ -69,10 +47,10 @@ zmq::session_t::~session_t () ...@@ -69,10 +47,10 @@ zmq::session_t::~session_t ()
zmq_assert (!out_pipe); zmq_assert (!out_pipe);
} }
bool zmq::session_t::is_terminable () void zmq::session_t::terminate ()
{ {
// The session won't send term_ack until both in & out pipe are closed. // TODO:
return !in_pipe && !out_pipe; zmq_assert (false);
} }
bool zmq::session_t::read (::zmq_msg_t *msg_) bool zmq::session_t::read (::zmq_msg_t *msg_)
...@@ -105,17 +83,8 @@ void zmq::session_t::flush () ...@@ -105,17 +83,8 @@ void zmq::session_t::flush ()
out_pipe->flush (); out_pipe->flush ();
} }
void zmq::session_t::detach (owned_t *reconnecter_) void zmq::session_t::clean_pipes ()
{ {
// Plug in the reconnecter object if any.
if (reconnecter_) {
send_plug (reconnecter_);
send_own (owner, reconnecter_);
}
// Engine is terminating itself. No need to deallocate it from here.
engine = NULL;
// Get rid of half-processed messages in the out pipe. Flush any // Get rid of half-processed messages in the out pipe. Flush any
// unflushed messages upstream. // unflushed messages upstream.
if (out_pipe) { if (out_pipe) {
...@@ -135,26 +104,6 @@ void zmq::session_t::detach (owned_t *reconnecter_) ...@@ -135,26 +104,6 @@ void zmq::session_t::detach (owned_t *reconnecter_)
zmq_msg_close (&msg); zmq_msg_close (&msg);
} }
} }
// Terminate transient session.
if (!ordinal && (peer_identity.empty () || peer_identity [0] == 0))
term ();
}
zmq::io_thread_t *zmq::session_t::get_io_thread ()
{
return choose_io_thread (options.affinity);
}
class zmq::socket_base_t *zmq::session_t::get_owner ()
{
return owner;
}
uint64_t zmq::session_t::get_ordinal ()
{
zmq_assert (ordinal);
return ordinal;
} }
void zmq::session_t::attach_pipes (class reader_t *inpipe_, void zmq::session_t::attach_pipes (class reader_t *inpipe_,
...@@ -172,6 +121,9 @@ void zmq::session_t::attach_pipes (class reader_t *inpipe_, ...@@ -172,6 +121,9 @@ void zmq::session_t::attach_pipes (class reader_t *inpipe_,
out_pipe = outpipe_; out_pipe = outpipe_;
out_pipe->set_event_sink (this); out_pipe->set_event_sink (this);
} }
attach_processed = true;
finalise ();
} }
void zmq::session_t::terminated (reader_t *pipe_) void zmq::session_t::terminated (reader_t *pipe_)
...@@ -192,14 +144,14 @@ void zmq::session_t::activated (reader_t *pipe_) ...@@ -192,14 +144,14 @@ void zmq::session_t::activated (reader_t *pipe_)
zmq_assert (in_pipe == pipe_); zmq_assert (in_pipe == pipe_);
active = true; active = true;
if (engine) if (engine)
engine->revive (); engine->activate_out ();
} }
void zmq::session_t::activated (writer_t *pipe_) void zmq::session_t::activated (writer_t *pipe_)
{ {
zmq_assert (out_pipe == pipe_); zmq_assert (out_pipe == pipe_);
if (engine) if (engine)
engine->resume_input (); engine->activate_in ();
} }
void zmq::session_t::process_plug () void zmq::session_t::process_plug ()
...@@ -214,10 +166,9 @@ void zmq::session_t::process_unplug () ...@@ -214,10 +166,9 @@ void zmq::session_t::process_unplug ()
// there may be some commands being sent to the session right now. // there may be some commands being sent to the session right now.
// Unregister the session from the socket. // Unregister the session from the socket.
if (ordinal) // if (!peer_identity.empty () && peer_identity [0] != 0)
owner->unregister_session (ordinal); // unregister_session (peer_identity);
else if (!peer_identity.empty () && peer_identity [0] != 0) // TODO: Should be done in named session.
owner->unregister_session (peer_identity);
// Ask associated pipes to terminate. // Ask associated pipes to terminate.
if (in_pipe) if (in_pipe)
...@@ -232,63 +183,65 @@ void zmq::session_t::process_unplug () ...@@ -232,63 +183,65 @@ void zmq::session_t::process_unplug ()
} }
} }
void zmq::session_t::finalise ()
{
// If all conditions are met, proceed with termination:
// 1. Owner object already asked us to terminate.
// 2. The pipes were already attached to the session.
// 3. Both pipes have already terminated. Note that inbound pipe
// is terminated after delimiter is read, i.e. all messages
// were already sent to the wire.
if (term_processed && attach_processed && !in_pipe && !out_pipe)
own_t::process_term ();
}
void zmq::session_t::process_attach (i_engine *engine_, void zmq::session_t::process_attach (i_engine *engine_,
const blob_t &peer_identity_) const blob_t &peer_identity_)
{ {
if (!peer_identity.empty ()) {
// If both IDs are temporary, no checking is needed.
// TODO: Old ID should be reused in this case...
if (peer_identity.empty () || peer_identity [0] != 0 ||
peer_identity_.empty () || peer_identity_ [0] != 0) {
// If we already know the peer name do nothing, just check whether
// it haven't changed.
zmq_assert (peer_identity == peer_identity_);
}
}
else if (!peer_identity_.empty ()) {
// Store the peer identity.
peer_identity = peer_identity_;
// If the session is not registered with the ordinal, let's register
// it using the peer name.
if (!ordinal) {
if (!owner->register_session (peer_identity, this)) {
// TODO: There's already a session with the specified
// identity. We should presumably syslog it and drop the
// session.
zmq_assert (false);
}
}
}
// Check whether the required pipes already exist. If not so, we'll // Check whether the required pipes already exist. If not so, we'll
// create them and bind them to the socket object. // create them and bind them to the socket object.
reader_t *socket_reader = NULL; reader_t *socket_reader = NULL;
writer_t *socket_writer = NULL; writer_t *socket_writer = NULL;
if (options.requires_in && !out_pipe) { if (options.requires_in && !out_pipe) {
create_pipe (owner, this, options.hwm, options.swap, &socket_reader, create_pipe (socket, this, options.hwm, options.swap, &socket_reader,
&out_pipe); &out_pipe);
out_pipe->set_event_sink (this); out_pipe->set_event_sink (this);
} }
if (options.requires_out && !in_pipe) { if (options.requires_out && !in_pipe) {
create_pipe (this, owner, options.hwm, options.swap, &in_pipe, create_pipe (this, socket, options.hwm, options.swap, &in_pipe,
&socket_writer); &socket_writer);
in_pipe->set_event_sink (this); in_pipe->set_event_sink (this);
} }
if (socket_reader || socket_writer) if (socket_reader || socket_writer)
send_bind (owner, socket_reader, socket_writer, peer_identity); send_bind (socket, socket_reader, socket_writer, peer_identity);
// Plug in the engine. // Plug in the engine.
zmq_assert (!engine); zmq_assert (!engine);
zmq_assert (engine_); zmq_assert (engine_);
engine = engine_; engine = engine_;
engine->plug (this); engine->plug (io_thread, this);
// Trigger the notfication about the attachment.
attached (peer_identity_);
}
void zmq::session_t::process_term ()
{
// Here we are pugging into the own_t's termination mechanism.
// The goal is to postpone the termination till all the pending messages
// are sent to the peer.
term_processed = true;
finalise ();
}
void zmq::session_t::attached (const blob_t &peer_identity_)
{
}
void zmq::session_t::detached ()
{
} }
...@@ -20,8 +20,8 @@ ...@@ -20,8 +20,8 @@
#ifndef __ZMQ_SESSION_HPP_INCLUDED__ #ifndef __ZMQ_SESSION_HPP_INCLUDED__
#define __ZMQ_SESSION_HPP_INCLUDED__ #define __ZMQ_SESSION_HPP_INCLUDED__
#include "own.hpp"
#include "i_inout.hpp" #include "i_inout.hpp"
#include "owned.hpp"
#include "options.hpp" #include "options.hpp"
#include "blob.hpp" #include "blob.hpp"
#include "pipe.hpp" #include "pipe.hpp"
...@@ -30,29 +30,22 @@ namespace zmq ...@@ -30,29 +30,22 @@ namespace zmq
{ {
class session_t : class session_t :
public owned_t, public own_t,
public i_inout, public i_inout,
public i_reader_events, public i_reader_events,
public i_writer_events public i_writer_events
{ {
public: public:
// Creates unnamed session. session_t (class io_thread_t *io_thread_,
session_t (object_t *parent_, socket_base_t *owner_, class socket_base_t *socket_, const options_t &options_);
const options_t &options_);
// Creates named session. // i_inout interface implementation. Note that detach method is not
session_t (object_t *parent_, socket_base_t *owner_, // implemented by generic session. Different session types may handle
const options_t &options_, const blob_t &peer_identity_); // engine disconnection in different ways.
// i_inout interface implementation.
bool read (::zmq_msg_t *msg_); bool read (::zmq_msg_t *msg_);
bool write (::zmq_msg_t *msg_); bool write (::zmq_msg_t *msg_);
void flush (); void flush ();
void detach (owned_t *reconnecter_);
class io_thread_t *get_io_thread ();
class socket_base_t *get_owner ();
uint64_t get_ordinal ();
void attach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, void attach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
const blob_t &peer_identity_); const blob_t &peer_identity_);
...@@ -65,19 +58,40 @@ namespace zmq ...@@ -65,19 +58,40 @@ namespace zmq
void activated (class writer_t *pipe_); void activated (class writer_t *pipe_);
void terminated (class writer_t *pipe_); void terminated (class writer_t *pipe_);
private: protected:
// Forcefully close this session (without sending
// outbound messages to the wire).
void terminate ();
// Two events for the derived session type. Attached is triggered
// when session is attached to a peer, detached is triggered at the
// beginning of the termination process when session is about to
// be detached from the peer.
virtual void attached (const blob_t &peer_identity_);
virtual void detached ();
~session_t (); ~session_t ();
// Define the delayed termination. (I.e. termination is postponed // Remove any half processed messages. Flush unflushed messages.
// till all the data is flushed to the kernel.) // Call this function when engine disconnect to get rid of leftovers.
bool is_terminable (); void clean_pipes ();
// Inherited socket options. These are visible to all session classes.
options_t options;
private:
// Handlers for incoming commands. // Handlers for incoming commands.
void process_plug (); void process_plug ();
void process_unplug (); void process_unplug ();
void process_attach (struct i_engine *engine_, void process_attach (struct i_engine *engine_,
const blob_t &peer_identity_); const blob_t &peer_identity_);
void process_term ();
// Check whether object is ready for termination. If so proceed
// with closing child objects.
void finalise ();
// Inbound pipe, i.e. one the session is getting messages from. // Inbound pipe, i.e. one the session is getting messages from.
class reader_t *in_pipe; class reader_t *in_pipe;
...@@ -92,18 +106,25 @@ namespace zmq ...@@ -92,18 +106,25 @@ namespace zmq
// Outbound pipe, i.e. one the socket is sending messages to. // Outbound pipe, i.e. one the socket is sending messages to.
class writer_t *out_pipe; class writer_t *out_pipe;
// The protocol I/O engine connected to the session.
struct i_engine *engine; struct i_engine *engine;
// Session is identified by ordinal in the case when it was created // Identity of the peer (say the component on the other side
// before connection to the peer was established and thus we are // of TCP connection).
// unaware of peer's identity.
uint64_t ordinal;
// Identity of the peer.
blob_t peer_identity; blob_t peer_identity;
// Inherited socket options. // The socket the session belongs to.
options_t options; class socket_base_t *socket;
// I/O thread the session is living in. It will be used to plug in
// the engines into the same thread.
class io_thread_t *io_thread;
// True if pipes were already attached.
bool attach_processed;
// True if term command was already processed.
bool term_processed;
session_t (const session_t&); session_t (const session_t&);
void operator = (const session_t&); void operator = (const session_t&);
......
This diff is collapsed.
...@@ -20,13 +20,12 @@ ...@@ -20,13 +20,12 @@
#ifndef __ZMQ_SOCKET_BASE_HPP_INCLUDED__ #ifndef __ZMQ_SOCKET_BASE_HPP_INCLUDED__
#define __ZMQ_SOCKET_BASE_HPP_INCLUDED__ #define __ZMQ_SOCKET_BASE_HPP_INCLUDED__
#include <set>
#include <map> #include <map>
#include <vector> #include <vector>
#include "../include/zmq.h" #include "../include/zmq.h"
#include "object.hpp" #include "own.hpp"
#include "yarray_item.hpp" #include "yarray_item.hpp"
#include "mutex.hpp" #include "mutex.hpp"
#include "options.hpp" #include "options.hpp"
...@@ -35,12 +34,13 @@ ...@@ -35,12 +34,13 @@
#include "signaler.hpp" #include "signaler.hpp"
#include "stdint.hpp" #include "stdint.hpp"
#include "blob.hpp" #include "blob.hpp"
#include "own.hpp"
namespace zmq namespace zmq
{ {
class socket_base_t : class socket_base_t :
public object_t, public own_t,
public yarray_item_t public yarray_item_t
{ {
public: public:
...@@ -65,30 +65,16 @@ namespace zmq ...@@ -65,30 +65,16 @@ namespace zmq
int recv (zmq_msg_t *msg_, int flags_); int recv (zmq_msg_t *msg_, int flags_);
int close (); int close ();
// When another owned object wants to send command to this object
// it calls this function to let it know it should not shut down
// before the command is delivered.
void inc_seqnum ();
// These functions are used by the polling mechanism to determine // These functions are used by the polling mechanism to determine
// which events are to be reported from this socket. // which events are to be reported from this socket.
bool has_in (); bool has_in ();
bool has_out (); bool has_out ();
// The list of sessions cannot be accessed via inter-thread // Registry of named sessions.
// commands as it is unacceptable to wait for the completion of the
// action till user application yields control of the application
// thread to 0MQ. Locking is used instead.
// There are two distinct types of sessions: those identified by name
// and those identified by ordinal number. Thus two sets of session
// management functions.
bool register_session (const blob_t &peer_identity_, bool register_session (const blob_t &peer_identity_,
class session_t *session_); class session_t *session_);
void unregister_session (const blob_t &peer_identity_); void unregister_session (const blob_t &peer_identity_);
class session_t *find_session (const blob_t &peer_identity_); class session_t *find_session (const blob_t &peer_identity_);
uint64_t register_session (class session_t *session_);
void unregister_session (uint64_t ordinal_);
class session_t *find_session (uint64_t ordinal_);
// i_reader_events interface implementation. // i_reader_events interface implementation.
void activated (class reader_t *pipe_); void activated (class reader_t *pipe_);
...@@ -99,7 +85,7 @@ namespace zmq ...@@ -99,7 +85,7 @@ namespace zmq
void terminated (class writer_t *pipe_); void terminated (class writer_t *pipe_);
// This function should be called only on zombie sockets. It tries // This function should be called only on zombie sockets. It tries
// to deallocate the zombie and returns true is successful. // to deallocate the zombie. Returns true if zombie is finally dead.
bool dezombify (); bool dezombify ();
protected: protected:
...@@ -109,11 +95,8 @@ namespace zmq ...@@ -109,11 +95,8 @@ namespace zmq
// Concrete algorithms for the x- methods are to be defined by // Concrete algorithms for the x- methods are to be defined by
// individual socket types. // individual socket types.
virtual void xattach_pipes (class reader_t *inpipe_, virtual void xattach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_, const blob_t &peer_identity_) = 0; class writer_t *outpipe_, const blob_t &peer_identity_) = 0;
virtual void xterm_pipes () = 0;
virtual bool xhas_pipes () = 0;
// The default implementation assumes there are no specific socket // The default implementation assumes there are no specific socket
// options for the particular socket type. If not so, overload this // options for the particular socket type. If not so, overload this
...@@ -132,12 +115,22 @@ namespace zmq ...@@ -132,12 +115,22 @@ namespace zmq
// Socket options. // Socket options.
options_t options; options_t options;
// We are declaring termination handler as protected so that
// individual socket types can hook into the termination process
// by overloading it.
void process_term ();
private:
// TODO: Check whether we still need this flag...
// If true, socket was already closed but not yet deallocated // If true, socket was already closed but not yet deallocated
// because either shutdown is in process or there are still pipes // because either shutdown is in process or there are still pipes
// attached to the socket. // attached to the socket.
bool zombie; bool zombie;
private: // Check whether transport protocol, as specified in connect or
// bind, is available and compatible with the socket type.
int check_protocol (const std::string &protocol_);
// If no identity set generate one and call xattach_pipes (). // If no identity set generate one and call xattach_pipes ().
void attach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, void attach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
...@@ -151,12 +144,9 @@ namespace zmq ...@@ -151,12 +144,9 @@ namespace zmq
// Handlers for incoming commands. // Handlers for incoming commands.
void process_stop (); void process_stop ();
void process_own (class owned_t *object_);
void process_bind (class reader_t *in_pipe_, class writer_t *out_pipe_, void process_bind (class reader_t *in_pipe_, class writer_t *out_pipe_,
const blob_t &peer_identity_); const blob_t &peer_identity_);
void process_term_req (class owned_t *object_); void process_unplug ();
void process_term_ack ();
void process_seqnum ();
// App thread's signaler object. // App thread's signaler object.
signaler_t signaler; signaler_t signaler;
...@@ -164,36 +154,18 @@ namespace zmq ...@@ -164,36 +154,18 @@ namespace zmq
// Timestamp of when commands were processed the last time. // Timestamp of when commands were processed the last time.
uint64_t last_processing_time; uint64_t last_processing_time;
// List of all I/O objects owned by this socket. The socket is
// responsible for deallocating them before it quits.
typedef std::set <class owned_t*> io_objects_t;
io_objects_t io_objects;
// Number of I/O objects that were already asked to terminate
// but haven't acknowledged it yet.
int pending_term_acks;
// Number of messages received since last command processing. // Number of messages received since last command processing.
int ticks; int ticks;
// If true there's a half-read message in the socket. // If true there's a half-read message in the socket.
bool rcvmore; bool rcvmore;
// Sequence number of the last command sent to this object.
atomic_counter_t sent_seqnum;
// Sequence number of the last command processed by this object.
uint64_t processed_seqnum;
// Lists of existing sessions. This lists are never referenced from // Lists of existing sessions. This lists are never referenced from
// within the socket, instead they are used by I/O objects owned by // within the socket, instead they are used by objects owned by
// the socket. As those objects can live in different threads, // the socket. As those objects can live in different threads,
// the access is synchronised by mutex. // the access is synchronised by mutex.
typedef std::map <blob_t, session_t*> named_sessions_t; typedef std::map <blob_t, session_t*> sessions_t;
named_sessions_t named_sessions; sessions_t sessions;
typedef std::map <uint64_t, session_t*> unnamed_sessions_t;
unnamed_sessions_t unnamed_sessions;
uint64_t next_ordinal;
mutex_t sessions_sync; mutex_t sessions_sync;
socket_base_t (const socket_base_t&); socket_base_t (const socket_base_t&);
......
...@@ -26,6 +26,7 @@ ...@@ -26,6 +26,7 @@
zmq::sub_t::sub_t (class ctx_t *parent_, uint32_t slot_) : zmq::sub_t::sub_t (class ctx_t *parent_, uint32_t slot_) :
socket_base_t (parent_, slot_), socket_base_t (parent_, slot_),
fq (this),
has_message (false), has_message (false),
more (false) more (false)
{ {
...@@ -46,14 +47,17 @@ void zmq::sub_t::xattach_pipes (class reader_t *inpipe_, ...@@ -46,14 +47,17 @@ void zmq::sub_t::xattach_pipes (class reader_t *inpipe_,
fq.attach (inpipe_); fq.attach (inpipe_);
} }
void zmq::sub_t::xterm_pipes () void zmq::sub_t::process_term ()
{ {
fq.term_pipes (); register_term_acks (1);
fq.terminate ();
socket_base_t::process_term ();
} }
bool zmq::sub_t::xhas_pipes () void zmq::sub_t::terminated ()
{ {
return fq.has_pipes (); unregister_term_ack ();
} }
int zmq::sub_t::xsetsockopt (int option_, const void *optval_, int zmq::sub_t::xsetsockopt (int option_, const void *optval_,
......
...@@ -24,12 +24,13 @@ ...@@ -24,12 +24,13 @@
#include "prefix_tree.hpp" #include "prefix_tree.hpp"
#include "socket_base.hpp" #include "socket_base.hpp"
#include "i_terminate_events.hpp"
#include "fq.hpp" #include "fq.hpp"
namespace zmq namespace zmq
{ {
class sub_t : public socket_base_t class sub_t : public socket_base_t, public i_terminate_events
{ {
public: public:
...@@ -41,14 +42,18 @@ namespace zmq ...@@ -41,14 +42,18 @@ namespace zmq
// Overloads of functions from socket_base_t. // Overloads of functions from socket_base_t.
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
const blob_t &peer_identity_); const blob_t &peer_identity_);
void xterm_pipes ();
bool xhas_pipes ();
int xsetsockopt (int option_, const void *optval_, size_t optvallen_); int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
int xrecv (zmq_msg_t *msg_, int flags_); int xrecv (zmq_msg_t *msg_, int flags_);
bool xhas_in (); bool xhas_in ();
private: private:
// i_terminate_events interface implementation.
void terminated ();
// Hook into the termination process.
void process_term ();
// Check whether the message matches at least one subscription. // Check whether the message matches at least one subscription.
bool match (zmq_msg_t *msg_); bool match (zmq_msg_t *msg_);
......
...@@ -17,61 +17,20 @@ ...@@ -17,61 +17,20 @@
along with this program. If not, see <http://www.gnu.org/licenses/>. along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "owned.hpp" #include "transient_session.hpp"
#include "err.hpp"
zmq::owned_t::owned_t (object_t *parent_, socket_base_t *owner_) : zmq::transient_session_t::transient_session_t (class io_thread_t *io_thread_,
object_t (parent_), class socket_base_t *socket_, const options_t &options_) :
owner (owner_), session_t (io_thread_, socket_, options_)
sent_seqnum (0),
processed_seqnum (0),
shutting_down (false)
{ {
} }
zmq::owned_t::~owned_t () zmq::transient_session_t::~transient_session_t ()
{ {
} }
void zmq::owned_t::inc_seqnum () void zmq::transient_session_t::detach ()
{ {
// This function may be called from a different thread! // There's no way to reestablish a transient session. Tear it down.
sent_seqnum.add (1); terminate ();
} }
void zmq::owned_t::term ()
{
send_term_req (owner, this);
}
void zmq::owned_t::process_term ()
{
zmq_assert (!shutting_down);
shutting_down = true;
finalise ();
}
void zmq::owned_t::process_seqnum ()
{
// Catch up with counter of processed commands.
processed_seqnum++;
finalise ();
}
void zmq::owned_t::finalise ()
{
// If termination request was already received and there are no more
// commands to wait for, terminate the object.
if (shutting_down && processed_seqnum == sent_seqnum.get ()
&& is_terminable ()) {
process_unplug ();
send_term_ack (owner);
delete this;
}
}
bool zmq::owned_t::is_terminable ()
{
return true;
}
/*
Copyright (c) 2007-2010 iMatix Corporation
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_TRANSIENT_SESSION_HPP_INCLUDED__
#define __ZMQ_TRANSIENT_SESSION_HPP_INCLUDED__
#include "session.hpp"
namespace zmq
{
// Transient session is created by the listener when the connected peer
// stays anonymous. Transient session is destroyed on disconnect.
class transient_session_t : public session_t
{
public:
transient_session_t (class io_thread_t *io_thread_,
class socket_base_t *socket_, const options_t &options_);
~transient_session_t ();
// i_inout interface implementation.
void detach ();
};
}
#endif
...@@ -29,7 +29,8 @@ zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t slot_) : ...@@ -29,7 +29,8 @@ zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t slot_) :
prefetched (false), prefetched (false),
more_in (false), more_in (false),
current_out (NULL), current_out (NULL),
more_out (false) more_out (false),
terminating (false)
{ {
options.requires_in = true; options.requires_in = true;
options.requires_out = true; options.requires_out = true;
...@@ -62,16 +63,27 @@ void zmq::xrep_t::xattach_pipes (reader_t *inpipe_, writer_t *outpipe_, ...@@ -62,16 +63,27 @@ void zmq::xrep_t::xattach_pipes (reader_t *inpipe_, writer_t *outpipe_,
inpipe_t inpipe = {inpipe_, peer_identity_, true}; inpipe_t inpipe = {inpipe_, peer_identity_, true};
inpipes.push_back (inpipe); inpipes.push_back (inpipe);
if (terminating) {
register_term_acks (1);
inpipe_->terminate ();
}
} }
void zmq::xrep_t::xterm_pipes () void zmq::xrep_t::process_term ()
{ {
terminating = true;
register_term_acks (inpipes.size () + outpipes.size ());
for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end (); for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end ();
it++) it++)
it->reader->terminate (); it->reader->terminate ();
for (outpipes_t::iterator it = outpipes.begin (); it != outpipes.end (); for (outpipes_t::iterator it = outpipes.begin (); it != outpipes.end ();
it++) it++)
it->second.writer->terminate (); it->second.writer->terminate ();
socket_base_t::process_term ();
} }
void zmq::xrep_t::terminated (reader_t *pipe_) void zmq::xrep_t::terminated (reader_t *pipe_)
...@@ -80,6 +92,8 @@ void zmq::xrep_t::terminated (reader_t *pipe_) ...@@ -80,6 +92,8 @@ void zmq::xrep_t::terminated (reader_t *pipe_)
it++) { it++) {
if (it->reader == pipe_) { if (it->reader == pipe_) {
inpipes.erase (it); inpipes.erase (it);
if (terminating)
unregister_term_ack ();
return; return;
} }
} }
...@@ -94,17 +108,14 @@ void zmq::xrep_t::terminated (writer_t *pipe_) ...@@ -94,17 +108,14 @@ void zmq::xrep_t::terminated (writer_t *pipe_)
outpipes.erase (it); outpipes.erase (it);
if (pipe_ == current_out) if (pipe_ == current_out)
current_out = NULL; current_out = NULL;
if (terminating)
unregister_term_ack ();
return; return;
} }
} }
zmq_assert (false); zmq_assert (false);
} }
bool zmq::xrep_t::xhas_pipes ()
{
return !inpipes.empty () || !outpipes.empty ();
}
void zmq::xrep_t::activated (reader_t *pipe_) void zmq::xrep_t::activated (reader_t *pipe_)
{ {
for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end (); for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end ();
......
...@@ -44,13 +44,16 @@ namespace zmq ...@@ -44,13 +44,16 @@ namespace zmq
// Overloads of functions from socket_base_t. // Overloads of functions from socket_base_t.
void xattach_pipes (reader_t *inpipe_, writer_t *outpipe_, void xattach_pipes (reader_t *inpipe_, writer_t *outpipe_,
const blob_t &peer_identity_); const blob_t &peer_identity_);
void xterm_pipes ();
bool xhas_pipes ();
int xsend (zmq_msg_t *msg_, int flags_); int xsend (zmq_msg_t *msg_, int flags_);
int xrecv (zmq_msg_t *msg_, int flags_); int xrecv (zmq_msg_t *msg_, int flags_);
bool xhas_in (); bool xhas_in ();
bool xhas_out (); bool xhas_out ();
private:
// Hook into the termination process.
void process_term ();
// i_reader_events interface implementation. // i_reader_events interface implementation.
void activated (reader_t *pipe_); void activated (reader_t *pipe_);
void terminated (reader_t *pipe_); void terminated (reader_t *pipe_);
...@@ -59,8 +62,6 @@ namespace zmq ...@@ -59,8 +62,6 @@ namespace zmq
void activated (writer_t *pipe_); void activated (writer_t *pipe_);
void terminated (writer_t *pipe_); void terminated (writer_t *pipe_);
private:
struct inpipe_t struct inpipe_t
{ {
class reader_t *reader; class reader_t *reader;
...@@ -100,6 +101,9 @@ namespace zmq ...@@ -100,6 +101,9 @@ namespace zmq
// If true, more outgoing message parts are expected. // If true, more outgoing message parts are expected.
bool more_out; bool more_out;
// If true, termination process is already underway.
bool terminating;
xrep_t (const xrep_t&); xrep_t (const xrep_t&);
void operator = (const xrep_t&); void operator = (const xrep_t&);
}; };
......
...@@ -23,7 +23,9 @@ ...@@ -23,7 +23,9 @@
#include "err.hpp" #include "err.hpp"
zmq::xreq_t::xreq_t (class ctx_t *parent_, uint32_t slot_) : zmq::xreq_t::xreq_t (class ctx_t *parent_, uint32_t slot_) :
socket_base_t (parent_, slot_) socket_base_t (parent_, slot_),
fq (this),
lb (this)
{ {
options.requires_in = true; options.requires_in = true;
options.requires_out = true; options.requires_out = true;
...@@ -41,15 +43,18 @@ void zmq::xreq_t::xattach_pipes (class reader_t *inpipe_, ...@@ -41,15 +43,18 @@ void zmq::xreq_t::xattach_pipes (class reader_t *inpipe_,
lb.attach (outpipe_); lb.attach (outpipe_);
} }
void zmq::xreq_t::xterm_pipes () void zmq::xreq_t::process_term ()
{ {
fq.term_pipes (); register_term_acks (2);
lb.term_pipes (); fq.terminate ();
lb.terminate ();
socket_base_t::process_term ();
} }
bool zmq::xreq_t::xhas_pipes () void zmq::xreq_t::terminated ()
{ {
return fq.has_pipes () || lb.has_pipes (); unregister_term_ack ();
} }
int zmq::xreq_t::xsend (zmq_msg_t *msg_, int flags_) int zmq::xreq_t::xsend (zmq_msg_t *msg_, int flags_)
......
...@@ -21,24 +21,25 @@ ...@@ -21,24 +21,25 @@
#define __ZMQ_XREQ_HPP_INCLUDED__ #define __ZMQ_XREQ_HPP_INCLUDED__
#include "socket_base.hpp" #include "socket_base.hpp"
#include "i_terminate_events.hpp"
#include "fq.hpp" #include "fq.hpp"
#include "lb.hpp" #include "lb.hpp"
namespace zmq namespace zmq
{ {
class xreq_t : public socket_base_t class xreq_t : public socket_base_t, public i_terminate_events
{ {
public: public:
xreq_t (class ctx_t *parent_, uint32_t slot_); xreq_t (class ctx_t *parent_, uint32_t slot_);
~xreq_t (); ~xreq_t ();
protected:
// Overloads of functions from socket_base_t. // Overloads of functions from socket_base_t.
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
const blob_t &peer_identity_); const blob_t &peer_identity_);
void xterm_pipes ();
bool xhas_pipes ();
int xsend (zmq_msg_t *msg_, int flags_); int xsend (zmq_msg_t *msg_, int flags_);
int xrecv (zmq_msg_t *msg_, int flags_); int xrecv (zmq_msg_t *msg_, int flags_);
bool xhas_in (); bool xhas_in ();
...@@ -46,6 +47,12 @@ namespace zmq ...@@ -46,6 +47,12 @@ namespace zmq
private: private:
// i_terminate_events interface implementation.
void terminated ();
// Hook into the termination process.
void process_term ();
// Messages are fair-queued from inbound pipes. And load-balanced to // Messages are fair-queued from inbound pipes. And load-balanced to
// the outbound pipes. // the outbound pipes.
fq_t fq; fq_t fq;
......
...@@ -25,33 +25,24 @@ ...@@ -25,33 +25,24 @@
#include "io_thread.hpp" #include "io_thread.hpp"
#include "err.hpp" #include "err.hpp"
zmq::zmq_connecter_t::zmq_connecter_t (io_thread_t *parent_, zmq::zmq_connecter_t::zmq_connecter_t (class io_thread_t *io_thread_,
socket_base_t *owner_, const options_t &options_, class session_t *session_, const options_t &options_,
uint64_t session_ordinal_, bool wait_) : const char *protocol_, const char *address_) :
owned_t (parent_, owner_), own_t (io_thread_),
io_object_t (parent_), io_object_t (io_thread_),
handle_valid (false), handle_valid (false),
wait (wait_), wait (false),
session_ordinal (session_ordinal_), session (session_),
options (options_) options (options_)
{ {
int rc = tcp_connecter.set_address (protocol_, address_);
zmq_assert (rc == 0);
} }
zmq::zmq_connecter_t::~zmq_connecter_t () zmq::zmq_connecter_t::~zmq_connecter_t ()
{ {
} }
int zmq::zmq_connecter_t::set_address (const char *protocol_,
const char *address_)
{
int rc = tcp_connecter.set_address (protocol_, address_);
if (rc != 0)
return rc;
protocol = protocol_;
address = address_;
return 0;
}
void zmq::zmq_connecter_t::process_plug () void zmq::zmq_connecter_t::process_plug ()
{ {
if (wait) if (wait)
...@@ -92,15 +83,12 @@ void zmq::zmq_connecter_t::out_event () ...@@ -92,15 +83,12 @@ void zmq::zmq_connecter_t::out_event ()
// Create an init object. // Create an init object.
zmq_init_t *init = new (std::nothrow) zmq_init_t ( zmq_init_t *init = new (std::nothrow) zmq_init_t (
choose_io_thread (options.affinity), owner, choose_io_thread (options.affinity), NULL, session, fd, options);
fd, options, true, protocol.c_str (), address.c_str (),
session_ordinal);
zmq_assert (init); zmq_assert (init);
send_plug (init); launch_sibling (init);
send_own (owner, init);
// Ask owner socket to shut the connecter down. // Shut the connecter down.
term (); terminate ();
} }
void zmq::zmq_connecter_t::timer_event () void zmq::zmq_connecter_t::timer_event ()
......
...@@ -20,9 +20,7 @@ ...@@ -20,9 +20,7 @@
#ifndef __ZMQ_ZMQ_CONNECTER_HPP_INCLUDED__ #ifndef __ZMQ_ZMQ_CONNECTER_HPP_INCLUDED__
#define __ZMQ_ZMQ_CONNECTER_HPP_INCLUDED__ #define __ZMQ_ZMQ_CONNECTER_HPP_INCLUDED__
#include <string> #include "own.hpp"
#include "owned.hpp"
#include "io_object.hpp" #include "io_object.hpp"
#include "tcp_connecter.hpp" #include "tcp_connecter.hpp"
#include "options.hpp" #include "options.hpp"
...@@ -31,17 +29,17 @@ ...@@ -31,17 +29,17 @@
namespace zmq namespace zmq
{ {
class zmq_connecter_t : public owned_t, public io_object_t class zmq_connecter_t : public own_t, public io_object_t
{ {
public: public:
zmq_connecter_t (class io_thread_t *parent_, socket_base_t *owner_, // If 'wait' is true connecter first waits for a while, then starts
const options_t &options_, uint64_t session_ordinal_, bool wait_); // connection process.
zmq_connecter_t (class io_thread_t *io_thread_,
class session_t *session_, const options_t &options_,
const char *protocol_, const char *address_);
~zmq_connecter_t (); ~zmq_connecter_t ();
// Set address to connect to.
int set_address (const char *protocol_, const char *address_);
private: private:
// Handlers for incoming commands. // Handlers for incoming commands.
...@@ -69,16 +67,12 @@ namespace zmq ...@@ -69,16 +67,12 @@ namespace zmq
// If true, connecter is waiting a while before trying to connect. // If true, connecter is waiting a while before trying to connect.
bool wait; bool wait;
// Ordinal of the session to attach to. // Reference to the session we belong to.
uint64_t session_ordinal; class session_t *session;
// Associated socket options. // Associated socket options.
options_t options; options_t options;
// Protocol and address to connect to.
std::string protocol;
std::string address;
zmq_connecter_t (const zmq_connecter_t&); zmq_connecter_t (const zmq_connecter_t&);
void operator = (const zmq_connecter_t&); void operator = (const zmq_connecter_t&);
}; };
......
...@@ -32,10 +32,7 @@ ...@@ -32,10 +32,7 @@
#include "config.hpp" #include "config.hpp"
#include "err.hpp" #include "err.hpp"
zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, fd_t fd_, zmq::zmq_engine_t::zmq_engine_t (fd_t fd_, const options_t &options_) :
const options_t &options_, bool reconnect_,
const char *protocol_, const char *address_) :
io_object_t (parent_),
inpos (NULL), inpos (NULL),
insize (0), insize (0),
decoder (in_batch_size), decoder (in_batch_size),
...@@ -43,14 +40,8 @@ zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, fd_t fd_, ...@@ -43,14 +40,8 @@ zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, fd_t fd_,
outsize (0), outsize (0),
encoder (out_batch_size), encoder (out_batch_size),
inout (NULL), inout (NULL),
options (options_), options (options_)
reconnect (reconnect_)
{ {
if (reconnect) {
protocol = protocol_;
address = address_;
}
// Initialise the underlying socket. // Initialise the underlying socket.
int rc = tcp_socket.open (fd_, options.sndbuf, options.rcvbuf); int rc = tcp_socket.open (fd_, options.sndbuf, options.rcvbuf);
zmq_assert (rc == 0); zmq_assert (rc == 0);
...@@ -60,26 +51,37 @@ zmq::zmq_engine_t::~zmq_engine_t () ...@@ -60,26 +51,37 @@ zmq::zmq_engine_t::~zmq_engine_t ()
{ {
} }
void zmq::zmq_engine_t::plug (i_inout *inout_) void zmq::zmq_engine_t::plug (io_thread_t *io_thread_, i_inout *inout_)
{ {
// Conncet to session/init object.
zmq_assert (!inout); zmq_assert (!inout);
zmq_assert (inout_);
encoder.set_inout (inout_); encoder.set_inout (inout_);
decoder.set_inout (inout_); decoder.set_inout (inout_);
inout = inout_;
// Connect to I/O threads poller object.
io_object_t::plug (io_thread_);
handle = add_fd (tcp_socket.get_fd ()); handle = add_fd (tcp_socket.get_fd ());
set_pollin (handle); set_pollin (handle);
set_pollout (handle); set_pollout (handle);
inout = inout_;
// Flush all the data that may have been already received downstream. // Flush all the data that may have been already received downstream.
in_event (); in_event ();
// TODO: Re-plug to the new I/O thread & poller!
} }
void zmq::zmq_engine_t::unplug () void zmq::zmq_engine_t::unplug ()
{ {
// Cancel all fd subscriptions.
rm_fd (handle); rm_fd (handle);
// Disconnect from I/O threads poller object.
io_object_t::unplug ();
// Disconnect from init/session object.
encoder.set_inout (NULL); encoder.set_inout (NULL);
decoder.set_inout (NULL); decoder.set_inout (NULL);
inout = NULL; inout = NULL;
...@@ -155,7 +157,7 @@ void zmq::zmq_engine_t::out_event () ...@@ -155,7 +157,7 @@ void zmq::zmq_engine_t::out_event ()
outsize -= nbytes; outsize -= nbytes;
} }
void zmq::zmq_engine_t::revive () void zmq::zmq_engine_t::activate_out ()
{ {
set_pollout (handle); set_pollout (handle);
...@@ -166,30 +168,18 @@ void zmq::zmq_engine_t::revive () ...@@ -166,30 +168,18 @@ void zmq::zmq_engine_t::revive ()
out_event (); out_event ();
} }
void zmq::zmq_engine_t::resume_input () void zmq::zmq_engine_t::activate_in ()
{ {
set_pollin (handle); set_pollin (handle);
// Speculative read.
in_event (); in_event ();
} }
void zmq::zmq_engine_t::error () void zmq::zmq_engine_t::error ()
{ {
zmq_assert (inout); zmq_assert (inout);
inout->detach ();
zmq_connecter_t *reconnecter = NULL;
if (reconnect) {
// Create a connecter object to attempt reconnect.
// Ask it to wait for a while before reconnecting.
reconnecter = new (std::nothrow) zmq_connecter_t (
inout->get_io_thread (), inout->get_owner (),
options, inout->get_ordinal (), true);
zmq_assert (reconnecter);
reconnecter->set_address (protocol.c_str(), address.c_str ());
}
inout->detach (reconnecter);
unplug (); unplug ();
delete this; delete this;
} }
...@@ -38,16 +38,14 @@ namespace zmq ...@@ -38,16 +38,14 @@ namespace zmq
{ {
public: public:
zmq_engine_t (class io_thread_t *parent_, fd_t fd_, zmq_engine_t (fd_t fd_, const options_t &options_);
const options_t &options_, bool reconnect_,
const char *protocol_, const char *address_);
~zmq_engine_t (); ~zmq_engine_t ();
// i_engine interface implementation. // i_engine interface implementation.
void plug (struct i_inout *inout_); void plug (class io_thread_t *io_thread_, struct i_inout *inout_);
void unplug (); void unplug ();
void revive (); void activate_in ();
void resume_input (); void activate_out ();
// i_poll_events interface implementation. // i_poll_events interface implementation.
void in_event (); void in_event ();
...@@ -73,10 +71,6 @@ namespace zmq ...@@ -73,10 +71,6 @@ namespace zmq
options_t options; options_t options;
bool reconnect;
std::string protocol;
std::string address;
zmq_engine_t (const zmq_engine_t&); zmq_engine_t (const zmq_engine_t&);
void operator = (const zmq_engine_t&); void operator = (const zmq_engine_t&);
}; };
......
...@@ -20,24 +20,29 @@ ...@@ -20,24 +20,29 @@
#include <string.h> #include <string.h>
#include "zmq_init.hpp" #include "zmq_init.hpp"
#include "transient_session.hpp"
#include "named_session.hpp"
#include "socket_base.hpp"
#include "zmq_engine.hpp" #include "zmq_engine.hpp"
#include "io_thread.hpp" #include "io_thread.hpp"
#include "session.hpp" #include "session.hpp"
#include "uuid.hpp" #include "uuid.hpp"
#include "blob.hpp"
#include "err.hpp" #include "err.hpp"
zmq::zmq_init_t::zmq_init_t (io_thread_t *parent_, socket_base_t *owner_, zmq::zmq_init_t::zmq_init_t (io_thread_t *io_thread_,
fd_t fd_, const options_t &options_, bool reconnect_, socket_base_t *socket_, session_t *session_, fd_t fd_,
const char *protocol_, const char *address_, uint64_t session_ordinal_) : const options_t &options_) :
owned_t (parent_, owner_), own_t (io_thread_),
sent (false), sent (false),
received (false), received (false),
session_ordinal (session_ordinal_), socket (socket_),
options (options_) session (session_),
options (options_),
io_thread (io_thread_)
{ {
// Create the engine object for this connection. // Create the engine object for this connection.
engine = new (std::nothrow) zmq_engine_t (parent_, fd_, options, engine = new (std::nothrow) zmq_engine_t (fd_, options);
reconnect_, protocol_, address_);
zmq_assert (engine); zmq_assert (engine);
} }
...@@ -62,7 +67,7 @@ bool zmq::zmq_init_t::read (::zmq_msg_t *msg_) ...@@ -62,7 +67,7 @@ bool zmq::zmq_init_t::read (::zmq_msg_t *msg_)
// If initialisation is done, pass the engine to the session and // If initialisation is done, pass the engine to the session and
// destroy the init object. // destroy the init object.
finalise (); finalise_initialisation ();
return true; return true;
} }
...@@ -99,44 +104,28 @@ void zmq::zmq_init_t::flush () ...@@ -99,44 +104,28 @@ void zmq::zmq_init_t::flush ()
// If initialisation is done, pass the engine to the session and // If initialisation is done, pass the engine to the session and
// destroy the init object. // destroy the init object.
finalise (); finalise_initialisation ();
} }
void zmq::zmq_init_t::detach (owned_t *reconnecter_) void zmq::zmq_init_t::detach ()
{ {
// This function is called by engine when disconnection occurs. // This function is called by engine when disconnection occurs.
// If required, launch the reconnecter. // If there is an associated session, send it a null engine to let it know
if (reconnecter_) { // that connection process was unsuccesful.
send_plug (reconnecter_); if (session)
send_own (owner, reconnecter_); send_attach (session, NULL, blob_t (), true);
}
// The engine will destroy itself, so let's just drop the pointer here and // The engine will destroy itself, so let's just drop the pointer here and
// start termination of the init object. // start termination of the init object.
engine = NULL; engine = NULL;
term (); terminate ();
}
zmq::io_thread_t *zmq::zmq_init_t::get_io_thread ()
{
return choose_io_thread (options.affinity);
}
class zmq::socket_base_t *zmq::zmq_init_t::get_owner ()
{
return owner;
}
uint64_t zmq::zmq_init_t::get_ordinal ()
{
return session_ordinal;
} }
void zmq::zmq_init_t::process_plug () void zmq::zmq_init_t::process_plug ()
{ {
zmq_assert (engine); zmq_assert (engine);
engine->plug (this); engine->plug (io_thread, this);
} }
void zmq::zmq_init_t::process_unplug () void zmq::zmq_init_t::process_unplug ()
...@@ -145,51 +134,62 @@ void zmq::zmq_init_t::process_unplug () ...@@ -145,51 +134,62 @@ void zmq::zmq_init_t::process_unplug ()
engine->unplug (); engine->unplug ();
} }
void zmq::zmq_init_t::finalise () void zmq::zmq_init_t::finalise_initialisation ()
{ {
if (sent && received) { if (sent && received) {
// Disconnect the engine from the init object. // If we know what session we belong to, it's easy, just send the
engine->unplug (); // engine to that session and destroy the init object.
if (session) {
engine->unplug ();
send_attach (session, engine, peer_identity, true);
engine = NULL;
terminate ();
return;
}
session_t *session = NULL; // All the cases below are listener-based. Therefore we need the socket
// reference so that new sessions can bind to that socket.
// If we have the session ordinal, let's use it to find the session. zmq_assert (socket);
// If it is not found, it means socket is already being shut down
// and the session have been deallocated. // We have no associated session. If the peer has no identity we'll
// TODO: We should check whether the name of the peer haven't changed // create a transient session for the connection.
// upon reconnection. if (peer_identity [0] == 0) {
if (session_ordinal) { session = new (std::nothrow) transient_session_t (io_thread,
session = owner->find_session (session_ordinal); socket, options);
if (!session) { zmq_assert (session);
term (); launch_sibling (session);
return; engine->unplug ();
} send_attach (session, engine, peer_identity, true);
engine = NULL;
terminate ();
return;
} }
else {
// Try to find the session corresponding to the peer's identity.
// If the peer has a unique name, find the associated session. // If found, send the engine to that session and destroy this object.
// If it does not exist, create it. // Note that session's seqnum is incremented by find_session rather
zmq_assert (!peer_identity.empty ()); // than by send_attach.
session = owner->find_session (peer_identity); session = socket->find_session (peer_identity);
if (!session) { if (session) {
session = new (std::nothrow) session_t ( engine->unplug ();
choose_io_thread (options.affinity), owner, options, send_attach (session, engine, peer_identity, false);
peer_identity); engine = NULL;
zmq_assert (session); terminate ();
send_plug (session); return;
send_own (owner, session);
// Reserve a sequence number for following 'attach' command.
session->inc_seqnum ();
}
} }
// No need to increment seqnum as it was already incremented above. // There's no such named session. We have to create one.
send_attach (session, engine, peer_identity, false); // TODO:
zmq_assert (false);
// Destroy the init object. // session = new (std::nothrow) named_session_t (io_thread, socket,
// options, peer_identity);
zmq_assert (session);
launch_sibling (session);
engine->unplug ();
send_attach (session, engine, peer_identity, true);
engine = NULL; engine = NULL;
term (); terminate ();
return;
} }
} }
...@@ -22,7 +22,7 @@ ...@@ -22,7 +22,7 @@
#include "i_inout.hpp" #include "i_inout.hpp"
#include "i_engine.hpp" #include "i_engine.hpp"
#include "owned.hpp" #include "own.hpp"
#include "fd.hpp" #include "fd.hpp"
#include "stdint.hpp" #include "stdint.hpp"
#include "options.hpp" #include "options.hpp"
...@@ -34,28 +34,23 @@ namespace zmq ...@@ -34,28 +34,23 @@ namespace zmq
// The class handles initialisation phase of 0MQ wire-level protocol. // The class handles initialisation phase of 0MQ wire-level protocol.
class zmq_init_t : public owned_t, public i_inout class zmq_init_t : public own_t, public i_inout
{ {
public: public:
zmq_init_t (class io_thread_t *parent_, socket_base_t *owner_, zmq_init_t (class io_thread_t *io_thread_, class socket_base_t *socket_,
fd_t fd_, const options_t &options_, bool reconnect_, class session_t *session_, fd_t fd_, const options_t &options_);
const char *protocol_, const char *address_,
uint64_t session_ordinal_);
~zmq_init_t (); ~zmq_init_t ();
private: private:
void finalise (); void finalise_initialisation ();
// i_inout interface implementation. // i_inout interface implementation.
bool read (::zmq_msg_t *msg_); bool read (::zmq_msg_t *msg_);
bool write (::zmq_msg_t *msg_); bool write (::zmq_msg_t *msg_);
void flush (); void flush ();
void detach (owned_t *reconnecter_); void detach ();
class io_thread_t *get_io_thread ();
class socket_base_t *get_owner ();
uint64_t get_ordinal ();
// Handlers for incoming commands. // Handlers for incoming commands.
void process_plug (); void process_plug ();
...@@ -70,16 +65,24 @@ namespace zmq ...@@ -70,16 +65,24 @@ namespace zmq
// True if peer's identity was already received. // True if peer's identity was already received.
bool received; bool received;
// Socket the object belongs to.
class socket_base_t *socket;
// Reference to the session the init object belongs to.
// If the associated session is unknown and should be found
// depending on peer identity this value is NULL.
class session_t *session;
// Identity of the peer socket. // Identity of the peer socket.
blob_t peer_identity; blob_t peer_identity;
// TCP connecter creates session before the name of the peer is known.
// Thus we know only its ordinal number.
uint64_t session_ordinal;
// Associated socket options. // Associated socket options.
options_t options; options_t options;
// I/O thread the object is living in. It will be used to plug
// the engine into the same I/O thread.
class io_thread_t *io_thread;
zmq_init_t (const zmq_init_t&); zmq_init_t (const zmq_init_t&);
void operator = (const zmq_init_t&); void operator = (const zmq_init_t&);
}; };
......
...@@ -24,11 +24,12 @@ ...@@ -24,11 +24,12 @@
#include "io_thread.hpp" #include "io_thread.hpp"
#include "err.hpp" #include "err.hpp"
zmq::zmq_listener_t::zmq_listener_t (io_thread_t *parent_, zmq::zmq_listener_t::zmq_listener_t (io_thread_t *io_thread_,
socket_base_t *owner_, const options_t &options_) : socket_base_t *socket_, const options_t &options_) :
owned_t (parent_, owner_), own_t (io_thread_),
io_object_t (parent_), io_object_t (io_thread_),
options (options_) options (options_),
socket (socket_)
{ {
} }
...@@ -62,13 +63,11 @@ void zmq::zmq_listener_t::in_event () ...@@ -62,13 +63,11 @@ void zmq::zmq_listener_t::in_event ()
if (fd == retired_fd) if (fd == retired_fd)
return; return;
// Create an init object. // Create and launch an init object.
io_thread_t *io_thread = choose_io_thread (options.affinity);
zmq_init_t *init = new (std::nothrow) zmq_init_t ( zmq_init_t *init = new (std::nothrow) zmq_init_t (
io_thread, owner, fd, options, false, NULL, NULL, 0); choose_io_thread (options.affinity), socket, NULL, fd, options);
zmq_assert (init); zmq_assert (init);
send_plug (init); launch_sibling (init);
send_own (owner, init);
} }
......
...@@ -20,7 +20,7 @@ ...@@ -20,7 +20,7 @@
#ifndef __ZMQ_ZMQ_LISTENER_HPP_INCLUDED__ #ifndef __ZMQ_ZMQ_LISTENER_HPP_INCLUDED__
#define __ZMQ_ZMQ_LISTENER_HPP_INCLUDED__ #define __ZMQ_ZMQ_LISTENER_HPP_INCLUDED__
#include "owned.hpp" #include "own.hpp"
#include "io_object.hpp" #include "io_object.hpp"
#include "tcp_listener.hpp" #include "tcp_listener.hpp"
#include "options.hpp" #include "options.hpp"
...@@ -29,12 +29,12 @@ ...@@ -29,12 +29,12 @@
namespace zmq namespace zmq
{ {
class zmq_listener_t : public owned_t, public io_object_t class zmq_listener_t : public own_t, public io_object_t
{ {
public: public:
zmq_listener_t (class io_thread_t *parent_, socket_base_t *owner_, zmq_listener_t (class io_thread_t *io_thread_,
const options_t &options_); class socket_base_t *socket_, const options_t &options_);
~zmq_listener_t (); ~zmq_listener_t ();
// Set address to listen on. // Set address to listen on.
...@@ -58,6 +58,9 @@ namespace zmq ...@@ -58,6 +58,9 @@ namespace zmq
// Associated socket options. // Associated socket options.
options_t options; options_t options;
// Socket the listerner belongs to.
class socket_base_t *socket;
zmq_listener_t (const zmq_listener_t&); zmq_listener_t (const zmq_listener_t&);
void operator = (const zmq_listener_t&); void operator = (const zmq_listener_t&);
}; };
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment