Commit aebff623 authored by Martin Sustrik's avatar Martin Sustrik

ZMQII-28: Bidirectional introduction on TCP connection establishment

parent b3bd4c15
...@@ -115,12 +115,11 @@ libzmq_la_SOURCES = app_thread.hpp \ ...@@ -115,12 +115,11 @@ libzmq_la_SOURCES = app_thread.hpp \
ypollset.hpp \ ypollset.hpp \
yqueue.hpp \ yqueue.hpp \
zmq_connecter.hpp \ zmq_connecter.hpp \
zmq_connecter_init.hpp \
zmq_decoder.hpp \ zmq_decoder.hpp \
zmq_encoder.hpp \ zmq_encoder.hpp \
zmq_engine.hpp \ zmq_engine.hpp \
zmq_init.hpp \
zmq_listener.hpp \ zmq_listener.hpp \
zmq_listener_init.hpp \
app_thread.cpp \ app_thread.cpp \
devpoll.cpp \ devpoll.cpp \
dispatcher.cpp \ dispatcher.cpp \
...@@ -161,12 +160,11 @@ libzmq_la_SOURCES = app_thread.hpp \ ...@@ -161,12 +160,11 @@ libzmq_la_SOURCES = app_thread.hpp \
ypollset.cpp \ ypollset.cpp \
zmq.cpp \ zmq.cpp \
zmq_connecter.cpp \ zmq_connecter.cpp \
zmq_connecter_init.cpp \
zmq_decoder.cpp \ zmq_decoder.cpp \
zmq_encoder.cpp \ zmq_encoder.cpp \
zmq_engine.cpp \ zmq_engine.cpp \
zmq_listener.cpp \ zmq_init.cpp \
zmq_listener_init.cpp zmq_listener.cpp
libzmq_la_LDFLAGS = -version-info @LTVER@ @LIBZMQ_EXTRA_LDFLAFS@ libzmq_la_LDFLAGS = -version-info @LTVER@ @LIBZMQ_EXTRA_LDFLAFS@
......
...@@ -22,6 +22,8 @@ ...@@ -22,6 +22,8 @@
#include "../bindings/c/zmq.h" #include "../bindings/c/zmq.h"
#include "stdint.hpp"
namespace zmq namespace zmq
{ {
...@@ -47,8 +49,8 @@ namespace zmq ...@@ -47,8 +49,8 @@ namespace zmq
// Return pointer to the owning socket. // Return pointer to the owning socket.
virtual class socket_base_t *get_owner () = 0; virtual class socket_base_t *get_owner () = 0;
// Returns the name of associated session. // Return ordinal number of the session.
virtual const char *get_session_name () = 0; virtual uint64_t get_ordinal () = 0;
}; };
} }
......
...@@ -37,11 +37,10 @@ ...@@ -37,11 +37,10 @@
#include "i_inout.hpp" #include "i_inout.hpp"
zmq::pgm_receiver_t::pgm_receiver_t (class io_thread_t *parent_, zmq::pgm_receiver_t::pgm_receiver_t (class io_thread_t *parent_,
const options_t &options_, const char *session_name_) : const options_t &options_) :
io_object_t (parent_), io_object_t (parent_),
pgm_socket (true, options_), pgm_socket (true, options_),
options (options_), options (options_),
session_name (session_name_),
inout (NULL) inout (NULL)
{ {
} }
......
...@@ -47,8 +47,7 @@ namespace zmq ...@@ -47,8 +47,7 @@ namespace zmq
// Creates gm_engine. Underlying PGM connection is initialised // Creates gm_engine. Underlying PGM connection is initialised
// using network_ parameter. // using network_ parameter.
pgm_receiver_t (class io_thread_t *parent_, const options_t &options_, pgm_receiver_t (class io_thread_t *parent_, const options_t &options_);
const char *session_name_);
~pgm_receiver_t (); ~pgm_receiver_t ();
int init (bool udp_encapsulation_, const char *network_); int init (bool udp_encapsulation_, const char *network_);
...@@ -94,9 +93,6 @@ namespace zmq ...@@ -94,9 +93,6 @@ namespace zmq
// Socket options. // Socket options.
options_t options; options_t options;
// Name of the session associated with the connecter.
std::string session_name;
// Parent session. // Parent session.
i_inout *inout; i_inout *inout;
......
...@@ -33,12 +33,11 @@ ...@@ -33,12 +33,11 @@
#include "wire.hpp" #include "wire.hpp"
zmq::pgm_sender_t::pgm_sender_t (io_thread_t *parent_, zmq::pgm_sender_t::pgm_sender_t (io_thread_t *parent_,
const options_t &options_, const char *session_name_) : const options_t &options_) :
io_object_t (parent_), io_object_t (parent_),
encoder (0, false), encoder (0, false),
pgm_socket (false, options_), pgm_socket (false, options_),
options (options_), options (options_),
session_name (session_name_),
inout (NULL), inout (NULL),
out_buffer (NULL), out_buffer (NULL),
out_buffer_size (0), out_buffer_size (0),
......
...@@ -42,8 +42,7 @@ namespace zmq ...@@ -42,8 +42,7 @@ namespace zmq
{ {
public: public:
pgm_sender_t (class io_thread_t *parent_, const options_t &options_, pgm_sender_t (class io_thread_t *parent_, const options_t &options_);
const char *session_name_);
~pgm_sender_t (); ~pgm_sender_t ();
int init (bool udp_encapsulation_, const char *network_); int init (bool udp_encapsulation_, const char *network_);
...@@ -74,9 +73,6 @@ namespace zmq ...@@ -74,9 +73,6 @@ namespace zmq
// Socket options. // Socket options.
options_t options; options_t options;
// Name of the session associated with the connecter.
std::string session_name;
// Poll handle associated with PGM socket. // Poll handle associated with PGM socket.
handle_t handle; handle_t handle;
handle_t uplink_handle; handle_t uplink_handle;
......
...@@ -25,16 +25,41 @@ ...@@ -25,16 +25,41 @@
#include "pipe.hpp" #include "pipe.hpp"
zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_, zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_,
const char *name_, const options_t &options_, bool reconnect_) : const options_t &options_) :
owned_t (parent_, owner_), owned_t (parent_, owner_),
in_pipe (NULL), in_pipe (NULL),
active (true), active (true),
out_pipe (NULL), out_pipe (NULL),
engine (NULL), engine (NULL),
name (name_), options (options_)
options (options_),
reconnect (reconnect_)
{ {
type = unnamed;
// It's possible to register the session at this point as it will be
// searched for only on reconnect, i.e. no race condition (session found
// before it is plugged into it's I/O thread) is possible.
ordinal = owner->register_session (this);
}
zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_,
const options_t &options_, const char *name_) :
owned_t (parent_, owner_),
in_pipe (NULL),
active (true),
out_pipe (NULL),
engine (NULL),
options (options_)
{
if (name_) {
type = named;
name = name_;
ordinal = 0;
}
else {
type = transient;
// TODO: Generate unique name here.
ordinal = 0;
}
} }
zmq::session_t::~session_t () zmq::session_t::~session_t ()
...@@ -78,8 +103,8 @@ void zmq::session_t::detach (owned_t *reconnecter_) ...@@ -78,8 +103,8 @@ void zmq::session_t::detach (owned_t *reconnecter_)
// Engine is terminating itself. No need to deallocate it from here. // Engine is terminating itself. No need to deallocate it from here.
engine = NULL; engine = NULL;
// In the case od anonymous connection, terminate the session. // Terminate transient session.
if (name.empty ()) if (type == transient)
term (); term ();
} }
...@@ -93,9 +118,11 @@ class zmq::socket_base_t *zmq::session_t::get_owner () ...@@ -93,9 +118,11 @@ class zmq::socket_base_t *zmq::session_t::get_owner ()
return owner; return owner;
} }
const char *zmq::session_t::get_session_name () uint64_t zmq::session_t::get_ordinal ()
{ {
return name.c_str (); zmq_assert (type == unnamed);
zmq_assert (ordinal);
return ordinal;
} }
void zmq::session_t::attach_pipes (class reader_t *inpipe_, void zmq::session_t::attach_pipes (class reader_t *inpipe_,
...@@ -181,11 +208,12 @@ void zmq::session_t::process_plug () ...@@ -181,11 +208,12 @@ void zmq::session_t::process_plug ()
void zmq::session_t::process_unplug () void zmq::session_t::process_unplug ()
{ {
// Unregister the session from the socket. // Unregister the session from the socket. There's nothing to do here
if (!name.empty ()) { // for transient sessions.
bool ok = owner->unregister_session (name.c_str ()); if (type == unnamed)
zmq_assert (ok); owner->unregister_session (ordinal);
} else if (type == named)
owner->unregister_session (name.c_str ());
// Ask associated pipes to terminate. // Ask associated pipes to terminate.
if (in_pipe) { if (in_pipe) {
......
...@@ -34,8 +34,14 @@ namespace zmq ...@@ -34,8 +34,14 @@ namespace zmq
{ {
public: public:
session_t (object_t *parent_, socket_base_t *owner_, const char *name_, // Creates unnamed session.
const options_t &options_, bool reconnect_); session_t (object_t *parent_, socket_base_t *owner_,
const options_t &options_);
// Creates named session. If name is NULL, transient session with
// auto-generated name is created.
session_t (object_t *parent_, socket_base_t *owner_,
const options_t &options_, const char *name_);
// i_inout interface implementation. // i_inout interface implementation.
bool read (::zmq_msg_t *msg_); bool read (::zmq_msg_t *msg_);
...@@ -44,7 +50,7 @@ namespace zmq ...@@ -44,7 +50,7 @@ namespace zmq
void detach (owned_t *reconnecter_); void detach (owned_t *reconnecter_);
class io_thread_t *get_io_thread (); class io_thread_t *get_io_thread ();
class socket_base_t *get_owner (); class socket_base_t *get_owner ();
const char *get_session_name (); uint64_t get_ordinal ();
// i_endpoint interface implementation. // i_endpoint interface implementation.
void attach_pipes (class reader_t *inpipe_, class writer_t *outpipe_); void attach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
...@@ -73,6 +79,15 @@ namespace zmq ...@@ -73,6 +79,15 @@ namespace zmq
struct i_engine *engine; struct i_engine *engine;
enum {
transient,
named,
unnamed
} type;
// Ordinal of the session (if any).
uint64_t ordinal;
// The name of the session. One that is used to register it with // The name of the session. One that is used to register it with
// socket-level repository of sessions. // socket-level repository of sessions.
std::string name; std::string name;
...@@ -80,9 +95,6 @@ namespace zmq ...@@ -80,9 +95,6 @@ namespace zmq
// Inherited socket options. // Inherited socket options.
options_t options; options_t options;
// If true, reconnection is required after connection breaks.
bool reconnect;
session_t (const session_t&); session_t (const session_t&);
void operator = (const session_t&); void operator = (const session_t&);
}; };
......
...@@ -32,7 +32,6 @@ ...@@ -32,7 +32,6 @@
#include "session.hpp" #include "session.hpp"
#include "config.hpp" #include "config.hpp"
#include "owned.hpp" #include "owned.hpp"
#include "uuid.hpp"
#include "pipe.hpp" #include "pipe.hpp"
#include "err.hpp" #include "err.hpp"
#include "platform.hpp" #include "platform.hpp"
...@@ -46,7 +45,8 @@ zmq::socket_base_t::socket_base_t (app_thread_t *parent_) : ...@@ -46,7 +45,8 @@ zmq::socket_base_t::socket_base_t (app_thread_t *parent_) :
app_thread (parent_), app_thread (parent_),
shutting_down (false), shutting_down (false),
sent_seqnum (0), sent_seqnum (0),
processed_seqnum (0) processed_seqnum (0),
next_ordinal (1)
{ {
} }
...@@ -114,10 +114,6 @@ int zmq::socket_base_t::bind (const char *addr_) ...@@ -114,10 +114,6 @@ int zmq::socket_base_t::bind (const char *addr_)
int zmq::socket_base_t::connect (const char *addr_) int zmq::socket_base_t::connect (const char *addr_)
{ {
// Generate a unique name for the session.
std::string session_name ("#");
session_name += uuid_t ().to_string ();
// Parse addr_ string. // Parse addr_ string.
std::string addr_type; std::string addr_type;
std::string addr_args; std::string addr_args;
...@@ -170,10 +166,10 @@ int zmq::socket_base_t::connect (const char *addr_) ...@@ -170,10 +166,10 @@ int zmq::socket_base_t::connect (const char *addr_)
return 0; return 0;
} }
// Create the session. // Create unnamed session.
io_thread_t *io_thread = choose_io_thread (options.affinity); io_thread_t *io_thread = choose_io_thread (options.affinity);
session_t *session = new (std::nothrow) session_t (io_thread, this, session_t *session = new (std::nothrow) session_t (io_thread,
session_name.c_str (), options, true); this, options);
zmq_assert (session); zmq_assert (session);
pipe_t *in_pipe = NULL; pipe_t *in_pipe = NULL;
...@@ -213,7 +209,7 @@ int zmq::socket_base_t::connect (const char *addr_) ...@@ -213,7 +209,7 @@ int zmq::socket_base_t::connect (const char *addr_)
// it is established. // it is established.
zmq_connecter_t *connecter = new (std::nothrow) zmq_connecter_t ( zmq_connecter_t *connecter = new (std::nothrow) zmq_connecter_t (
choose_io_thread (options.affinity), this, options, choose_io_thread (options.affinity), this, options,
session_name.c_str (), false); session->get_ordinal (), false);
zmq_assert (connecter); zmq_assert (connecter);
int rc = connecter->set_address (addr_args.c_str ()); int rc = connecter->set_address (addr_args.c_str ());
if (rc != 0) { if (rc != 0) {
...@@ -245,8 +241,7 @@ int zmq::socket_base_t::connect (const char *addr_) ...@@ -245,8 +241,7 @@ int zmq::socket_base_t::connect (const char *addr_)
// PGM sender. // PGM sender.
pgm_sender_t *pgm_sender = new (std::nothrow) pgm_sender_t ( pgm_sender_t *pgm_sender = new (std::nothrow) pgm_sender_t (
choose_io_thread (options.affinity), options, choose_io_thread (options.affinity), options);
session_name.c_str ());
zmq_assert (pgm_sender); zmq_assert (pgm_sender);
int rc = pgm_sender->init (udp_encapsulation, addr_args.c_str ()); int rc = pgm_sender->init (udp_encapsulation, addr_args.c_str ());
...@@ -261,8 +256,7 @@ int zmq::socket_base_t::connect (const char *addr_) ...@@ -261,8 +256,7 @@ int zmq::socket_base_t::connect (const char *addr_)
// PGM receiver. // PGM receiver.
pgm_receiver_t *pgm_receiver = new (std::nothrow) pgm_receiver_t ( pgm_receiver_t *pgm_receiver = new (std::nothrow) pgm_receiver_t (
choose_io_thread (options.affinity), options, choose_io_thread (options.affinity), options);
session_name.c_str ());
zmq_assert (pgm_receiver); zmq_assert (pgm_receiver);
int rc = pgm_receiver->init (udp_encapsulation, addr_args.c_str ()); int rc = pgm_receiver->init (udp_encapsulation, addr_args.c_str ());
...@@ -408,7 +402,8 @@ int zmq::socket_base_t::close () ...@@ -408,7 +402,8 @@ int zmq::socket_base_t::close ()
// Check whether there are no session leaks. // Check whether there are no session leaks.
sessions_sync.lock (); sessions_sync.lock ();
zmq_assert (sessions.empty ()); zmq_assert (named_sessions.empty ());
zmq_assert (unnamed_sessions.empty ());
sessions_sync.unlock (); sessions_sync.unlock ();
delete this; delete this;
...@@ -445,36 +440,74 @@ bool zmq::socket_base_t::register_session (const char *name_, ...@@ -445,36 +440,74 @@ bool zmq::socket_base_t::register_session (const char *name_,
session_t *session_) session_t *session_)
{ {
sessions_sync.lock (); sessions_sync.lock ();
bool registered = sessions.insert (std::make_pair (name_, session_)).second; bool registered =
named_sessions.insert (std::make_pair (name_, session_)).second;
sessions_sync.unlock (); sessions_sync.unlock ();
return registered; return registered;
} }
bool zmq::socket_base_t::unregister_session (const char *name_) void zmq::socket_base_t::unregister_session (const char *name_)
{ {
sessions_sync.lock (); sessions_sync.lock ();
sessions_t::iterator it = sessions.find (name_); named_sessions_t::iterator it = named_sessions.find (name_);
bool unregistered = (it != sessions.end ()); zmq_assert (it != named_sessions.end ());
sessions.erase (it); named_sessions.erase (it);
sessions_sync.unlock (); sessions_sync.unlock ();
return unregistered;
} }
zmq::session_t *zmq::socket_base_t::find_session (const char *name_) zmq::session_t *zmq::socket_base_t::find_session (const char *name_)
{ {
sessions_sync.lock (); sessions_sync.lock ();
sessions_t::iterator it = sessions.find (name_); named_sessions_t::iterator it = named_sessions.find (name_);
if (it == sessions.end ()) { if (it == named_sessions.end ()) {
sessions_sync.unlock ();
return NULL;
}
session_t *session = it->second;
// Prepare the session for subsequent attach command.
session->inc_seqnum ();
sessions_sync.unlock ();
return session;
}
uint64_t zmq::socket_base_t::register_session (session_t *session_)
{
sessions_sync.lock ();
uint64_t ordinal = next_ordinal;
next_ordinal++;
unnamed_sessions.insert (std::make_pair (ordinal, session_)).second;
sessions_sync.unlock ();
return ordinal;
}
void zmq::socket_base_t::unregister_session (uint64_t ordinal_)
{
sessions_sync.lock ();
unnamed_sessions_t::iterator it = unnamed_sessions.find (ordinal_);
zmq_assert (it != unnamed_sessions.end ());
unnamed_sessions.erase (it);
sessions_sync.unlock ();
}
zmq::session_t *zmq::socket_base_t::find_session (uint64_t ordinal_)
{
sessions_sync.lock ();
unnamed_sessions_t::iterator it = unnamed_sessions.find (ordinal_);
if (it == unnamed_sessions.end ()) {
sessions_sync.unlock (); sessions_sync.unlock ();
return NULL; return NULL;
} }
session_t *session = it->second;
// Prepare the session for subsequent attach command. // Prepare the session for subsequent attach command.
it->second->inc_seqnum (); session->inc_seqnum ();
sessions_sync.unlock (); sessions_sync.unlock ();
return it->second; return session;
} }
void zmq::socket_base_t::kill (reader_t *pipe_) void zmq::socket_base_t::kill (reader_t *pipe_)
......
...@@ -34,6 +34,7 @@ ...@@ -34,6 +34,7 @@
#include "options.hpp" #include "options.hpp"
#include "stdint.hpp" #include "stdint.hpp"
#include "atomic_counter.hpp" #include "atomic_counter.hpp"
#include "stdint.hpp"
namespace zmq namespace zmq
{ {
...@@ -74,9 +75,15 @@ namespace zmq ...@@ -74,9 +75,15 @@ namespace zmq
// commands as it is unacceptable to wait for the completion of the // commands as it is unacceptable to wait for the completion of the
// action till user application yields control of the application // action till user application yields control of the application
// thread to 0MQ. Locking is used instead. // thread to 0MQ. Locking is used instead.
// There are two distinct types of sessions: those identified by name
// and those identified by ordinal number. Thus two sets of session
// management functions.
bool register_session (const char *name_, class session_t *session_); bool register_session (const char *name_, class session_t *session_);
bool unregister_session (const char *name_); void unregister_session (const char *name_);
class session_t *find_session (const char *name_); class session_t *find_session (const char *name_);
uint64_t register_session (class session_t *session_);
void unregister_session (uint64_t ordinal_);
class session_t *find_session (uint64_t ordinal_);
// i_endpoint interface implementation. // i_endpoint interface implementation.
void attach_pipes (class reader_t *inpipe_, class writer_t *outpipe_); void attach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
...@@ -144,15 +151,15 @@ namespace zmq ...@@ -144,15 +151,15 @@ namespace zmq
// Sequence number of the last command processed by this object. // Sequence number of the last command processed by this object.
uint64_t processed_seqnum; uint64_t processed_seqnum;
// List of existing sessions. This list is never referenced from within // Lists of existing sessions. This lists are never referenced from
// the socket, instead it is used by I/O objects owned by the session. // within the socket, instead they are used by I/O objects owned by
// As those objects can live in different threads, the access is // the socket. As those objects can live in different threads,
// synchronised using 'sessions_sync' mutex. // the access is synchronised by mutex.
// Local sessions are those named by the local instance of 0MQ. typedef std::map <std::string, session_t*> named_sessions_t;
// Remote sessions are the sessions who's identities are provided by named_sessions_t named_sessions;
// the remote party. typedef std::map <uint64_t, session_t*> unnamed_sessions_t;
typedef std::map <std::string, session_t*> sessions_t; unnamed_sessions_t unnamed_sessions;
sessions_t sessions; uint64_t next_ordinal;
mutex_t sessions_sync; mutex_t sessions_sync;
socket_base_t (const socket_base_t&); socket_base_t (const socket_base_t&);
......
...@@ -20,19 +20,20 @@ ...@@ -20,19 +20,20 @@
#include <new> #include <new>
#include "zmq_connecter.hpp" #include "zmq_connecter.hpp"
#include "zmq_connecter_init.hpp" #include "zmq_engine.hpp"
#include "zmq_init.hpp"
#include "io_thread.hpp" #include "io_thread.hpp"
#include "err.hpp" #include "err.hpp"
zmq::zmq_connecter_t::zmq_connecter_t (io_thread_t *parent_, zmq::zmq_connecter_t::zmq_connecter_t (io_thread_t *parent_,
socket_base_t *owner_, const options_t &options_, socket_base_t *owner_, const options_t &options_,
const char *session_name_, bool wait_) : uint64_t session_ordinal_, bool wait_) :
owned_t (parent_, owner_), owned_t (parent_, owner_),
io_object_t (parent_), io_object_t (parent_),
handle_valid (false), handle_valid (false),
wait (wait_), wait (wait_),
options (options_), session_ordinal (session_ordinal_),
session_name (session_name_) options (options_)
{ {
} }
...@@ -88,9 +89,9 @@ void zmq::zmq_connecter_t::out_event () ...@@ -88,9 +89,9 @@ void zmq::zmq_connecter_t::out_event ()
} }
// Create an init object. // Create an init object.
io_thread_t *io_thread = choose_io_thread (options.affinity); zmq_init_t *init = new (std::nothrow) zmq_init_t (
zmq_connecter_init_t *init = new (std::nothrow) zmq_connecter_init_t ( choose_io_thread (options.affinity), owner,
io_thread, owner, fd, options, session_name.c_str (), address.c_str ()); fd, options, true, address.c_str (), session_ordinal);
zmq_assert (init); zmq_assert (init);
send_plug (init); send_plug (init);
send_own (owner, init); send_own (owner, init);
......
...@@ -36,7 +36,7 @@ namespace zmq ...@@ -36,7 +36,7 @@ namespace zmq
public: public:
zmq_connecter_t (class io_thread_t *parent_, socket_base_t *owner_, zmq_connecter_t (class io_thread_t *parent_, socket_base_t *owner_,
const options_t &options_, const char *session_name_, bool wait_); const options_t &options_, uint64_t session_ordinal_, bool wait_);
~zmq_connecter_t (); ~zmq_connecter_t ();
// Set IP address to connect to. // Set IP address to connect to.
...@@ -69,12 +69,12 @@ namespace zmq ...@@ -69,12 +69,12 @@ namespace zmq
// If true, connecter is waiting a while before trying to connect. // If true, connecter is waiting a while before trying to connect.
bool wait; bool wait;
// Ordinal of the session to attach to.
uint64_t session_ordinal;
// Associated socket options. // Associated socket options.
options_t options; options_t options;
// Name of the session associated with the connecter.
std::string session_name;
// Address to connect to. // Address to connect to.
std::string address; std::string address;
......
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <new>
#include "zmq_connecter_init.hpp"
#include "zmq_connecter.hpp"
#include "io_thread.hpp"
#include "session.hpp"
#include "err.hpp"
zmq::zmq_connecter_init_t::zmq_connecter_init_t (io_thread_t *parent_,
socket_base_t *owner_, fd_t fd_, const options_t &options_,
const char *session_name_, const char *address_) :
owned_t (parent_, owner_),
options (options_),
session_name (session_name_)
{
// Create associated engine object.
engine = new (std::nothrow) zmq_engine_t (parent_, fd_, options, true,
address_);
zmq_assert (engine);
}
zmq::zmq_connecter_init_t::~zmq_connecter_init_t ()
{
if (engine)
delete engine;
}
bool zmq::zmq_connecter_init_t::read (::zmq_msg_t *msg_)
{
// Send identity.
int rc = zmq_msg_init_size (msg_, options.identity.size ());
zmq_assert (rc == 0);
memcpy (zmq_msg_data (msg_), options.identity.c_str (),
options.identity.size ());
// Initialisation is done at this point. Disconnect the engine from
// the init object.
engine->unplug ();
// Find the session associated with this connecter. If it doesn't exist
// drop the newly created connection. If it does, attach it to the
// connection.
session_t *session = NULL;
if (!session_name.empty ())
session = owner->find_session (session_name.c_str ());
if (!session) {
// TODO:
// The socket is already closing. The session is already shut down,
// so no point in continuing with connecting. Shut the connection down.
zmq_assert (false);
}
// No need to increment seqnum as it was alredy incremented above.
send_attach (session, engine, false);
engine = NULL;
// Destroy the init object.
term ();
return true;
}
bool zmq::zmq_connecter_init_t::write (::zmq_msg_t *msg_)
{
return false;
}
void zmq::zmq_connecter_init_t::flush ()
{
// We are not expecting any messages. No point in flushing.
}
void zmq::zmq_connecter_init_t::detach (owned_t *reconnecter_)
{
// Plug in the reconnecter object.
zmq_assert (reconnecter_);
send_plug (reconnecter_);
send_own (owner, reconnecter_);
// This function is called by engine when disconnection occurs.
// The engine will destroy itself, so we just drop the pointer here and
// start termination of the init object.
engine = NULL;
term ();
}
zmq::io_thread_t *zmq::zmq_connecter_init_t::get_io_thread ()
{
return choose_io_thread (options.affinity);
}
class zmq::socket_base_t *zmq::zmq_connecter_init_t::get_owner ()
{
return owner;
}
const char *zmq::zmq_connecter_init_t::get_session_name ()
{
return session_name.c_str ();
}
void zmq::zmq_connecter_init_t::process_plug ()
{
zmq_assert (engine);
engine->plug (this);
}
void zmq::zmq_connecter_init_t::process_unplug ()
{
if (engine)
engine->unplug ();
}
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_ZMQ_CONNECTER_INIT_HPP_INCLUDED__
#define __ZMQ_ZMQ_CONNECTER_INIT_HPP_INCLUDED__
#include <string>
#include "i_inout.hpp"
#include "owned.hpp"
#include "zmq_engine.hpp"
#include "stdint.hpp"
#include "fd.hpp"
#include "options.hpp"
namespace zmq
{
// The class handles initialisation phase of native 0MQ wire-level
// protocol on the connecting side of the connection.
class zmq_connecter_init_t : public owned_t, public i_inout
{
public:
zmq_connecter_init_t (class io_thread_t *parent_, socket_base_t *owner_,
fd_t fd_, const options_t &options, const char *session_name_,
const char *address_);
~zmq_connecter_init_t ();
private:
// i_inout interface implementation.
bool read (::zmq_msg_t *msg_);
bool write (::zmq_msg_t *msg_);
void flush ();
void detach (owned_t *reconnecter_);
class io_thread_t *get_io_thread ();
class socket_base_t *get_owner ();
const char *get_session_name ();
// Handlers for incoming commands.
void process_plug ();
void process_unplug ();
// Engine is created by zmq_connecter_init_t object. Once the
// initialisation phase is over it is passed to a session object,
// possibly running in a different I/O thread.
zmq_engine_t *engine;
// Associated socket options.
options_t options;
// Name of the session to bind new connection to.
std::string session_name;
zmq_connecter_init_t (const zmq_connecter_init_t&);
void operator = (const zmq_connecter_init_t&);
};
}
#endif
...@@ -159,7 +159,7 @@ void zmq::zmq_engine_t::error () ...@@ -159,7 +159,7 @@ void zmq::zmq_engine_t::error ()
// Ask it to wait for a while before reconnecting. // Ask it to wait for a while before reconnecting.
reconnecter = new (std::nothrow) zmq_connecter_t ( reconnecter = new (std::nothrow) zmq_connecter_t (
inout->get_io_thread (), inout->get_owner (), inout->get_io_thread (), inout->get_owner (),
options, inout->get_session_name (), true); options, inout->get_ordinal (), true);
zmq_assert (reconnecter); zmq_assert (reconnecter);
reconnecter->set_address (address.c_str ()); reconnecter->set_address (address.c_str ());
} }
......
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "zmq_init.hpp"
#include "zmq_engine.hpp"
#include "io_thread.hpp"
#include "session.hpp"
#include "err.hpp"
zmq::zmq_init_t::zmq_init_t (io_thread_t *parent_, socket_base_t *owner_,
fd_t fd_, const options_t &options_, bool reconnect_,
const char *address_, uint64_t session_ordinal_) :
owned_t (parent_, owner_),
sent (false),
received (false),
session_ordinal (session_ordinal_),
options (options_)
{
// Create the engine object for this connection.
engine = new (std::nothrow) zmq_engine_t (parent_, fd_, options,
reconnect_, address_);
zmq_assert (engine);
}
zmq::zmq_init_t::~zmq_init_t ()
{
if (engine)
delete engine;
}
bool zmq::zmq_init_t::read (::zmq_msg_t *msg_)
{
// If the identity was already sent, do nothing.
if (sent)
return false;
// Send the identity.
int rc = zmq_msg_init_size (msg_, options.identity.size ());
zmq_assert (rc == 0);
memcpy (zmq_msg_data (msg_), options.identity.c_str (),
options.identity.size ());
sent = true;
// If initialisation is done, pass the engine to the session and
// destroy the init object.
finalise ();
return true;
}
bool zmq::zmq_init_t::write (::zmq_msg_t *msg_)
{
// If identity was already received, we are not interested
// in subsequent messages.
if (received)
return false;
// Retreieve the remote identity.
peer_identity.assign ((const char*) zmq_msg_data (msg_),
zmq_msg_size (msg_));
received = true;
return true;
}
void zmq::zmq_init_t::flush ()
{
// Check if there's anything to flush.
if (!received)
return;
// If initialisation is done, pass the engine to the session and
// destroy the init object.
finalise ();
}
void zmq::zmq_init_t::detach (owned_t *reconnecter_)
{
// This function is called by engine when disconnection occurs.
// If required, launch the reconnecter.
if (reconnecter_) {
send_plug (reconnecter_);
send_own (owner, reconnecter_);
}
// The engine will destroy itself, so let's just drop the pointer here and
// start termination of the init object.
engine = NULL;
term ();
}
zmq::io_thread_t *zmq::zmq_init_t::get_io_thread ()
{
return choose_io_thread (options.affinity);
}
class zmq::socket_base_t *zmq::zmq_init_t::get_owner ()
{
return owner;
}
uint64_t zmq::zmq_init_t::get_ordinal ()
{
zmq_assert (false);
}
void zmq::zmq_init_t::process_plug ()
{
zmq_assert (engine);
engine->plug (this);
}
void zmq::zmq_init_t::process_unplug ()
{
if (engine)
engine->unplug ();
}
void zmq::zmq_init_t::finalise ()
{
if (sent && received) {
// Disconnect the engine from the init object.
engine->unplug ();
session_t *session = NULL;
// If we have the session ordinal, let's use it to find the session.
// If it is not found, it means socket is already being shut down
// and the session have been deallocated.
// TODO: We should check whether the name of the peer haven't changed
// upon reconnection.
if (session_ordinal) {
session = owner->find_session (session_ordinal);
if (!session) {
term ();
return;
}
}
// If the peer has a unique name, find the associated session. If it
// doesn't exist, create it.
else if (!peer_identity.empty ()) {
session = owner->find_session (peer_identity.c_str ());
if (!session) {
session = new (std::nothrow) session_t (
choose_io_thread (options.affinity), owner, options,
peer_identity.c_str ());
zmq_assert (session);
send_plug (session);
send_own (owner, session);
// Reserve a sequence number for following 'attach' command.
session->inc_seqnum ();
}
}
// If the other party has no specific identity, let's create a
// transient session.
else {
session = new (std::nothrow) session_t (
choose_io_thread (options.affinity), owner, options, NULL);
zmq_assert (session);
send_plug (session);
send_own (owner, session);
// Reserve a sequence number for following 'attach' command.
session->inc_seqnum ();
}
// No need to increment seqnum as it was laready incremented above.
send_attach (session, engine, false);
// Destroy the init object.
engine = NULL;
term ();
}
}
...@@ -17,34 +17,37 @@ ...@@ -17,34 +17,37 @@
along with this program. If not, see <http://www.gnu.org/licenses/>. along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifndef __ZMQ_ZMQ_LISTENER_INIT_HPP_INCLUDED__ #ifndef __ZMQ_ZMQ_INIT_HPP_INCLUDED__
#define __ZMQ_ZMQ_LISTENER_INIT_HPP_INCLUDED__ #define __ZMQ_ZMQ_INIT_HPP_INCLUDED__
#include <string> #include <string>
#include "i_inout.hpp" #include "i_inout.hpp"
#include "i_engine.hpp"
#include "owned.hpp" #include "owned.hpp"
#include "zmq_engine.hpp"
#include "stdint.hpp"
#include "fd.hpp" #include "fd.hpp"
#include "stdint.hpp"
#include "options.hpp" #include "options.hpp"
#include "stdint.hpp"
namespace zmq namespace zmq
{ {
// The class handles initialisation phase of native 0MQ wire-level // The class handles initialisation phase of 0MQ wire-level protocol.
// protocol on the listening side of the connection.
class zmq_listener_init_t : public owned_t, public i_inout class zmq_init_t : public owned_t, public i_inout
{ {
public: public:
zmq_listener_init_t (class io_thread_t *parent_, socket_base_t *owner_, zmq_init_t (class io_thread_t *parent_, socket_base_t *owner_,
fd_t fd_, const options_t &options); fd_t fd_, const options_t &options_, bool reconnect_,
~zmq_listener_init_t (); const char *address_, uint64_t session_ordinal_);
~zmq_init_t ();
private: private:
void finalise ();
// i_inout interface implementation. // i_inout interface implementation.
bool read (::zmq_msg_t *msg_); bool read (::zmq_msg_t *msg_);
bool write (::zmq_msg_t *msg_); bool write (::zmq_msg_t *msg_);
...@@ -52,26 +55,33 @@ namespace zmq ...@@ -52,26 +55,33 @@ namespace zmq
void detach (owned_t *reconnecter_); void detach (owned_t *reconnecter_);
class io_thread_t *get_io_thread (); class io_thread_t *get_io_thread ();
class socket_base_t *get_owner (); class socket_base_t *get_owner ();
const char *get_session_name (); uint64_t get_ordinal ();
// Handlers for incoming commands. // Handlers for incoming commands.
void process_plug (); void process_plug ();
void process_unplug (); void process_unplug ();
// Engine is created by zmq_listener_init_t object. Once the // Associated wite-protocol engine.
// initialisation phase is over it is passed to a session object, i_engine *engine;
// possibly running in a different I/O thread.
zmq_engine_t *engine;
// Associated socket options. // True if our own identity was already sent to the peer.
options_t options; bool sent;
// Indetity on the other end of the connection. // True if peer's identity was already received.
bool has_peer_identity; bool received;
// Identity of the peer socket.
std::string peer_identity; std::string peer_identity;
zmq_listener_init_t (const zmq_listener_init_t&); // TCP connecter creates session before the name of the peer is known.
void operator = (const zmq_listener_init_t&); // Thus we know only its ordinal number.
uint64_t session_ordinal;
// Associated socket options.
options_t options;
zmq_init_t (const zmq_init_t&);
void operator = (const zmq_init_t&);
}; };
} }
......
...@@ -20,7 +20,7 @@ ...@@ -20,7 +20,7 @@
#include <new> #include <new>
#include "zmq_listener.hpp" #include "zmq_listener.hpp"
#include "zmq_listener_init.hpp" #include "zmq_init.hpp"
#include "io_thread.hpp" #include "io_thread.hpp"
#include "err.hpp" #include "err.hpp"
...@@ -64,8 +64,8 @@ void zmq::zmq_listener_t::in_event () ...@@ -64,8 +64,8 @@ void zmq::zmq_listener_t::in_event ()
// Create an init object. // Create an init object.
io_thread_t *io_thread = choose_io_thread (options.affinity); io_thread_t *io_thread = choose_io_thread (options.affinity);
zmq_listener_init_t *init = new (std::nothrow) zmq_listener_init_t ( zmq_init_t *init = new (std::nothrow) zmq_init_t (
io_thread, owner, fd, options); io_thread, owner, fd, options, false, NULL, 0);
zmq_assert (init); zmq_assert (init);
send_plug (init); send_plug (init);
send_own (owner, init); send_own (owner, init);
......
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <new>
#include "zmq_listener_init.hpp"
#include "io_thread.hpp"
#include "session.hpp"
#include "err.hpp"
zmq::zmq_listener_init_t::zmq_listener_init_t (io_thread_t *parent_,
socket_base_t *owner_, fd_t fd_, const options_t &options_) :
owned_t (parent_, owner_),
options (options_),
has_peer_identity (false)
{
// Create associated engine object.
engine = new (std::nothrow) zmq_engine_t (parent_, fd_, options,
false, NULL);
zmq_assert (engine);
}
zmq::zmq_listener_init_t::~zmq_listener_init_t ()
{
if (engine)
delete engine;
}
bool zmq::zmq_listener_init_t::read (::zmq_msg_t *msg_)
{
return false;
}
bool zmq::zmq_listener_init_t::write (::zmq_msg_t *msg_)
{
// Once we've got peer's identity we aren't interested in subsequent
// messages.
if (has_peer_identity)
return false;
// Retreieve the remote identity. We'll use it as a local session name.
has_peer_identity = true;
peer_identity.assign ((const char*) zmq_msg_data (msg_),
zmq_msg_size (msg_));
return true;
}
void zmq::zmq_listener_init_t::flush ()
{
if (!has_peer_identity)
return;
// Initialisation is done. Disconnect the engine from the init object.
engine->unplug ();
// Have a look whether the session already exists. If it does, attach it
// to the engine. If it doesn't create it first.
session_t *session = NULL;
if (!peer_identity.empty ())
session = owner->find_session (peer_identity.c_str ());
if (!session) {
io_thread_t *io_thread = choose_io_thread (options.affinity);
session = new (std::nothrow) session_t (io_thread, owner,
peer_identity.c_str (), options, false);
zmq_assert (session);
send_plug (session);
send_own (owner, session);
// Reserve a sequence number for following 'attach' command.
session->inc_seqnum ();
}
// No need to increment seqnum as it was laready incremented above.
send_attach (session, engine, false);
engine = NULL;
// Destroy the init object.
term ();
}
void zmq::zmq_listener_init_t::detach (owned_t *reconnecter_)
{
// On the listening side of the connection we are never reconnecting.
zmq_assert (reconnecter_ == NULL);
// This function is called by engine when disconnection occurs.
// The engine will destroy itself, so we just drop the pointer here and
// start termination of the init object.
engine = NULL;
term ();
}
zmq::io_thread_t *zmq::zmq_listener_init_t::get_io_thread ()
{
return choose_io_thread (options.affinity);
}
class zmq::socket_base_t *zmq::zmq_listener_init_t::get_owner ()
{
return owner;
}
const char *zmq::zmq_listener_init_t::get_session_name ()
{
zmq_assert (false);
return NULL;
}
void zmq::zmq_listener_init_t::process_plug ()
{
zmq_assert (engine);
engine->plug (this);
}
void zmq::zmq_listener_init_t::process_unplug ()
{
if (engine)
engine->unplug ();
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment