Commit c590873f authored by Simon Giesecke's avatar Simon Giesecke

Problem: complexity of start_connecting

Solution: extract functions for each protocol
parent 31b0a1df
...@@ -550,6 +550,51 @@ void zmq::session_base_t::reconnect () ...@@ -550,6 +550,51 @@ void zmq::session_base_t::reconnect ()
_pipe->hiccup (); _pipe->hiccup ();
} }
zmq::session_base_t::connecter_factory_entry_t
zmq::session_base_t::_connecter_factories[] = {
std::make_pair (protocol_name::tcp,
&zmq::session_base_t::create_connecter_tcp),
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS \
&& !defined ZMQ_HAVE_VXWORKS
std::make_pair (protocol_name::ipc,
&zmq::session_base_t::create_connecter_ipc),
#endif
#if defined ZMQ_HAVE_TIPC
std::make_pair (protocol_name::tipc,
&zmq::session_base_t::create_connecter_tipc),
#endif
#if defined ZMQ_HAVE_VMCI
std::make_pair (protocol_name::vmci,
&zmq::session_base_t::create_connecter_vmci),
#endif
};
zmq::session_base_t::connecter_factory_map_t
zmq::session_base_t::_connecter_factories_map (
_connecter_factories,
_connecter_factories
+ sizeof (_connecter_factories) / sizeof (_connecter_factories[0]));
zmq::session_base_t::start_connecting_entry_t
zmq::session_base_t::_start_connecting_entries[] = {
std::make_pair (protocol_name::udp,
&zmq::session_base_t::start_connecting_udp),
#if defined ZMQ_HAVE_OPENPGM
std::make_pair ("pgm", &zmq::session_base_t::start_connecting_pgm),
std::make_pair ("epgm", &zmq::session_base_t::start_connecting_pgm),
#endif
#if defined ZMQ_HAVE_NORM
std::make_pair ("norm", &zmq::session_base_t::start_connecting_norm),
#endif
};
zmq::session_base_t::start_connecting_map_t
zmq::session_base_t::_start_connecting_map (
_start_connecting_entries,
_start_connecting_entries
+ sizeof (_start_connecting_entries)
/ sizeof (_start_connecting_entries[0]));
void zmq::session_base_t::start_connecting (bool wait_) void zmq::session_base_t::start_connecting (bool wait_)
{ {
zmq_assert (_active); zmq_assert (_active);
...@@ -560,145 +605,160 @@ void zmq::session_base_t::start_connecting (bool wait_) ...@@ -560,145 +605,160 @@ void zmq::session_base_t::start_connecting (bool wait_)
zmq_assert (io_thread); zmq_assert (io_thread);
// Create the connecter object. // Create the connecter object.
own_t *connecter = NULL; const connecter_factory_map_t::const_iterator connecter_factories_it =
if (_addr->protocol == protocol_name::tcp) { _connecter_factories_map.find (_addr->protocol);
if (!options.socks_proxy_address.empty ()) { if (connecter_factories_it != _connecter_factories_map.end ()) {
address_t *proxy_address = new (std::nothrow) own_t *connecter =
address_t (protocol_name::tcp, options.socks_proxy_address, (this->*connecter_factories_it->second) (io_thread, wait_);
this->get_ctx ());
alloc_assert (proxy_address); alloc_assert (connecter);
connecter = new (std::nothrow) socks_connecter_t ( launch_child (connecter);
io_thread, this, options, _addr, proxy_address, wait_); return;
} else {
connecter = new (std::nothrow)
tcp_connecter_t (io_thread, this, options, _addr, wait_);
}
} }
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS \ const start_connecting_map_t::const_iterator start_connecting_it =
&& !defined ZMQ_HAVE_VXWORKS _start_connecting_map.find (_addr->protocol);
else if (_addr->protocol == protocol_name::ipc) { if (start_connecting_it != _start_connecting_map.end ()) {
connecter = new (std::nothrow) (this->*start_connecting_it->second) (io_thread);
ipc_connecter_t (io_thread, this, options, _addr, wait_); return;
} }
zmq_assert (false);
}
#if defined ZMQ_HAVE_VMCI
zmq::own_t *zmq::session_base_t::create_connecter_vmci (io_thread_t *io_thread_,
bool wait_)
{
return new (std::nothrow)
vmci_connecter_t (io_thread_, this, options, _addr, wait_);
}
#endif #endif
#if defined ZMQ_HAVE_TIPC #if defined ZMQ_HAVE_TIPC
else if (_addr->protocol == protocol_name::tipc) { zmq::own_t *zmq::session_base_t::create_connecter_tipc (io_thread_t *io_thread_,
connecter = new (std::nothrow) bool wait_)
tipc_connecter_t (io_thread, this, options, _addr, wait_); {
} return new (std::nothrow)
tipc_connecter_t (io_thread_, this, options, _addr, wait_);
}
#endif #endif
#if defined ZMQ_HAVE_VMCI
else if (_addr->protocol == protocol_name::vmci) { #if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS \
connecter = new (std::nothrow) && !defined ZMQ_HAVE_VXWORKS
vmci_connecter_t (io_thread, this, options, _addr, wait_); zmq::own_t *zmq::session_base_t::create_connecter_ipc (io_thread_t *io_thread_,
} bool wait_)
{
return new (std::nothrow)
ipc_connecter_t (io_thread_, this, options, _addr, wait_);
}
#endif #endif
if (connecter != NULL) {
alloc_assert (connecter);
launch_child (connecter);
return;
}
if (_addr->protocol == protocol_name::udp) { zmq::own_t *zmq::session_base_t::create_connecter_tcp (io_thread_t *io_thread_,
zmq_assert (options.type == ZMQ_DISH || options.type == ZMQ_RADIO bool wait_)
|| options.type == ZMQ_DGRAM); {
if (!options.socks_proxy_address.empty ()) {
udp_engine_t *engine = new (std::nothrow) udp_engine_t (options); address_t *proxy_address = new (std::nothrow) address_t (
alloc_assert (engine); protocol_name::tcp, options.socks_proxy_address, this->get_ctx ());
alloc_assert (proxy_address);
bool recv = false; return new (std::nothrow) socks_connecter_t (
bool send = false; io_thread_, this, options, _addr, proxy_address, wait_);
}
if (options.type == ZMQ_RADIO) { return new (std::nothrow)
send = true; tcp_connecter_t (io_thread_, this, options, _addr, wait_);
recv = false; }
} else if (options.type == ZMQ_DISH) {
send = false;
recv = true;
} else if (options.type == ZMQ_DGRAM) {
send = true;
recv = true;
}
int rc = engine->init (_addr, send, recv); #ifdef ZMQ_HAVE_OPENPGM
void zmq::session_base_t::start_connecting_pgm (io_thread_t *io_thread_)
{
zmq_assert (options.type == ZMQ_PUB || options.type == ZMQ_XPUB
|| options.type == ZMQ_SUB || options.type == ZMQ_XSUB);
// For EPGM transport with UDP encapsulation of PGM is used.
bool const udp_encapsulation = _addr->protocol == "epgm";
// At this point we'll create message pipes to the session straight
// away. There's no point in delaying it as no concept of 'connect'
// exists with PGM anyway.
if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) {
// PGM sender.
pgm_sender_t *pgm_sender =
new (std::nothrow) pgm_sender_t (io_thread_, options);
alloc_assert (pgm_sender);
int rc = pgm_sender->init (udp_encapsulation, _addr->address.c_str ());
errno_assert (rc == 0); errno_assert (rc == 0);
send_attach (this, engine); send_attach (this, pgm_sender);
} else {
// PGM receiver.
pgm_receiver_t *pgm_receiver =
new (std::nothrow) pgm_receiver_t (io_thread_, options);
alloc_assert (pgm_receiver);
return; int rc =
pgm_receiver->init (udp_encapsulation, _addr->address.c_str ());
errno_assert (rc == 0);
send_attach (this, pgm_receiver);
} }
}
#endif
#ifdef ZMQ_HAVE_OPENPGM #ifdef ZMQ_HAVE_NORM
void zmq::session_base_t::start_connecting_norm (io_thread_t *io_thread_)
{
// At this point we'll create message pipes to the session straight
// away. There's no point in delaying it as no concept of 'connect'
// exists with NORM anyway.
if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) {
// NORM sender.
norm_engine_t *norm_sender =
new (std::nothrow) norm_engine_t (io_thread_, options);
alloc_assert (norm_sender);
int rc = norm_sender->init (_addr->address.c_str (), true, false);
errno_assert (rc == 0);
// Both PGM and EPGM transports are using the same infrastructure. send_attach (this, norm_sender);
if (_addr->protocol == "pgm" || _addr->protocol == "epgm") { } else { // ZMQ_SUB or ZMQ_XSUB
zmq_assert (options.type == ZMQ_PUB || options.type == ZMQ_XPUB
|| options.type == ZMQ_SUB || options.type == ZMQ_XSUB);
// For EPGM transport with UDP encapsulation of PGM is used.
bool const udp_encapsulation = _addr->protocol == "epgm";
// At this point we'll create message pipes to the session straight
// away. There's no point in delaying it as no concept of 'connect'
// exists with PGM anyway.
if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) {
// PGM sender.
pgm_sender_t *pgm_sender =
new (std::nothrow) pgm_sender_t (io_thread, options);
alloc_assert (pgm_sender);
int rc =
pgm_sender->init (udp_encapsulation, _addr->address.c_str ());
errno_assert (rc == 0);
send_attach (this, pgm_sender);
} else {
// PGM receiver.
pgm_receiver_t *pgm_receiver =
new (std::nothrow) pgm_receiver_t (io_thread, options);
alloc_assert (pgm_receiver);
int rc =
pgm_receiver->init (udp_encapsulation, _addr->address.c_str ());
errno_assert (rc == 0);
send_attach (this, pgm_receiver);
}
return; // NORM receiver.
norm_engine_t *norm_receiver =
new (std::nothrow) norm_engine_t (io_thread_, options);
alloc_assert (norm_receiver);
int rc = norm_receiver->init (_addr->address.c_str (), false, true);
errno_assert (rc == 0);
send_attach (this, norm_receiver);
} }
}
#endif #endif
#ifdef ZMQ_HAVE_NORM void zmq::session_base_t::start_connecting_udp (io_thread_t * /*io_thread_*/)
if (_addr->protocol == "norm") { {
// At this point we'll create message pipes to the session straight zmq_assert (options.type == ZMQ_DISH || options.type == ZMQ_RADIO
// away. There's no point in delaying it as no concept of 'connect' || options.type == ZMQ_DGRAM);
// exists with NORM anyway.
if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) { udp_engine_t *engine = new (std::nothrow) udp_engine_t (options);
// NORM sender. alloc_assert (engine);
norm_engine_t *norm_sender =
new (std::nothrow) norm_engine_t (io_thread, options); bool recv = false;
alloc_assert (norm_sender); bool send = false;
int rc = norm_sender->init (_addr->address.c_str (), true, false); if (options.type == ZMQ_RADIO) {
errno_assert (rc == 0); send = true;
recv = false;
send_attach (this, norm_sender); } else if (options.type == ZMQ_DISH) {
} else { // ZMQ_SUB or ZMQ_XSUB send = false;
recv = true;
// NORM receiver. } else if (options.type == ZMQ_DGRAM) {
norm_engine_t *norm_receiver = send = true;
new (std::nothrow) norm_engine_t (io_thread, options); recv = true;
alloc_assert (norm_receiver);
int rc = norm_receiver->init (_addr->address.c_str (), false, true);
errno_assert (rc == 0);
send_attach (this, norm_receiver);
}
return;
} }
#endif // ZMQ_HAVE_NORM
zmq_assert (false); const int rc = engine->init (_addr, send, recv);
errno_assert (rc == 0);
send_attach (this, engine);
} }
...@@ -105,6 +105,33 @@ class session_base_t : public own_t, public io_object_t, public i_pipe_events ...@@ -105,6 +105,33 @@ class session_base_t : public own_t, public io_object_t, public i_pipe_events
private: private:
void start_connecting (bool wait_); void start_connecting (bool wait_);
typedef own_t *(session_base_t::*connecter_factory_fun_t) (
io_thread_t *io_thread, bool wait_);
typedef std::pair<std::string, connecter_factory_fun_t>
connecter_factory_entry_t;
static connecter_factory_entry_t _connecter_factories[];
typedef std::map<std::string, connecter_factory_fun_t>
connecter_factory_map_t;
static connecter_factory_map_t _connecter_factories_map;
own_t *create_connecter_vmci (io_thread_t *io_thread_, bool wait_);
own_t *create_connecter_tipc (io_thread_t *io_thread_, bool wait_);
own_t *create_connecter_ipc (io_thread_t *io_thread_, bool wait_);
own_t *create_connecter_tcp (io_thread_t *io_thread_, bool wait_);
typedef void (session_base_t::*start_connecting_fun_t) (
io_thread_t *io_thread);
typedef std::pair<std::string, start_connecting_fun_t>
start_connecting_entry_t;
static start_connecting_entry_t _start_connecting_entries[];
typedef std::map<std::string, start_connecting_fun_t>
start_connecting_map_t;
static start_connecting_map_t _start_connecting_map;
void start_connecting_pgm (io_thread_t *io_thread_);
void start_connecting_norm (io_thread_t *io_thread_);
void start_connecting_udp (io_thread_t *io_thread_);
void reconnect (); void reconnect ();
// Handlers for incoming commands. // Handlers for incoming commands.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment