Commit 43620b3d authored by Martin Sustrik's avatar Martin Sustrik

Multi-hop REQ/REP, part X., optional delayed creation of pipes during connect

parent 4405250d
...@@ -34,6 +34,7 @@ zmq::options_t::options_t () : ...@@ -34,6 +34,7 @@ zmq::options_t::options_t () :
rcvbuf (0), rcvbuf (0),
requires_in (false), requires_in (false),
requires_out (false), requires_out (false),
immediate_connect (true),
traceroute (false) traceroute (false)
{ {
} }
......
...@@ -56,6 +56,12 @@ namespace zmq ...@@ -56,6 +56,12 @@ namespace zmq
bool requires_in; bool requires_in;
bool requires_out; bool requires_out;
// If true, when connecting, pipes are created immediately without
// waiting for the connection to be established. That way the socket
// is not aware of the peer's identity, however, it is able to send
// messages straight away.
bool immediate_connect;
// If true, socket requires tracerouting the messages. // If true, socket requires tracerouting the messages.
bool traceroute; bool traceroute;
}; };
......
...@@ -32,6 +32,11 @@ zmq::rep_t::rep_t (class app_thread_t *parent_) : ...@@ -32,6 +32,11 @@ zmq::rep_t::rep_t (class app_thread_t *parent_) :
{ {
options.requires_in = true; options.requires_in = true;
options.requires_out = true; options.requires_out = true;
// We don't need immediate connect. We'll be able to send messages
// (replies) only when connection is established and thus requests
// can arrive anyway.
options.immediate_connect = false;
} }
zmq::rep_t::~rep_t () zmq::rep_t::~rep_t ()
......
...@@ -220,33 +220,31 @@ void zmq::session_t::process_attach (i_engine *engine_, ...@@ -220,33 +220,31 @@ void zmq::session_t::process_attach (i_engine *engine_,
} }
} }
// If session is created by 'connect' function, it has the pipes set // Check whether the required pipes already exist. If not so, we'll
// already. Otherwise, it's being created by the listener and the pipes // create them and bind them to the socket object.
// are yet to be created. reader_t *socket_reader = NULL;
if (!in_pipe && !out_pipe) { writer_t *socket_writer = NULL;
pipe_t *inbound = NULL; if (options.requires_in && !out_pipe) {
pipe_t *outbound = NULL; pipe_t *pipe = new (std::nothrow) pipe_t (owner, this,
if (options.requires_out) {
inbound = new (std::nothrow) pipe_t (this, owner,
options.hwm, options.lwm); options.hwm, options.lwm);
zmq_assert (inbound); zmq_assert (pipe);
in_pipe = &inbound->reader; out_pipe = &pipe->writer;
in_pipe->set_endpoint (this); out_pipe->set_endpoint (this);
socket_reader = &pipe->reader;
} }
if (options.requires_in) { if (options.requires_out && !in_pipe) {
outbound = new (std::nothrow) pipe_t (owner, this, pipe_t *pipe = new (std::nothrow) pipe_t (this, owner,
options.hwm, options.lwm); options.hwm, options.lwm);
zmq_assert (outbound); zmq_assert (pipe);
out_pipe = &outbound->writer; in_pipe = &pipe->reader;
out_pipe->set_endpoint (this); in_pipe->set_endpoint (this);
socket_writer = &pipe->writer;
} }
send_bind (owner, outbound ? &outbound->reader : NULL, if (socket_reader || socket_writer)
inbound ? &inbound->writer : NULL, peer_identity); send_bind (owner, socket_reader, socket_writer, peer_identity);
}
// Plug in the engine. // Plug in the engine.
zmq_assert (!engine); zmq_assert (!engine);
......
...@@ -141,6 +141,10 @@ int zmq::socket_base_t::connect (const char *addr_) ...@@ -141,6 +141,10 @@ int zmq::socket_base_t::connect (const char *addr_)
if (addr_type == "inproc") { if (addr_type == "inproc") {
// TODO: inproc connect is specific with respect to creating pipes
// as there's no 'reconnect' functionality implemented. Once that
// is in place we should follow generic pipe creation algorithm.
// Find the peer socket. // Find the peer socket.
socket_base_t *peer = find_endpoint (addr_args.c_str ()); socket_base_t *peer = find_endpoint (addr_args.c_str ());
if (!peer) if (!peer)
...@@ -182,6 +186,11 @@ int zmq::socket_base_t::connect (const char *addr_) ...@@ -182,6 +186,11 @@ int zmq::socket_base_t::connect (const char *addr_)
this, options); this, options);
zmq_assert (session); zmq_assert (session);
// If 'immediate connect' feature is required, we'll created the pipes
// to the session straight away. Otherwise, they'll be created by the
// session once the connection is established.
if (options.immediate_connect) {
pipe_t *in_pipe = NULL; pipe_t *in_pipe = NULL;
pipe_t *out_pipe = NULL; pipe_t *out_pipe = NULL;
...@@ -207,6 +216,7 @@ int zmq::socket_base_t::connect (const char *addr_) ...@@ -207,6 +216,7 @@ int zmq::socket_base_t::connect (const char *addr_)
// Attach the pipes to the session object. // Attach the pipes to the session object.
session->attach_pipes (out_pipe ? &out_pipe->reader : NULL, session->attach_pipes (out_pipe ? &out_pipe->reader : NULL,
in_pipe ? &in_pipe->writer : NULL); in_pipe ? &in_pipe->writer : NULL);
}
// Activate the session. // Activate the session.
send_plug (session); send_plug (session);
...@@ -215,6 +225,8 @@ int zmq::socket_base_t::connect (const char *addr_) ...@@ -215,6 +225,8 @@ int zmq::socket_base_t::connect (const char *addr_)
if (addr_type == "tcp" || addr_type == "ipc") { if (addr_type == "tcp" || addr_type == "ipc") {
#if defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS #if defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS
// Windows named pipes are not compatible with Winsock API.
// There's no UNIX domain socket implementation on OpenVMS.
if (addr_type == "ipc") { if (addr_type == "ipc") {
errno = EPROTONOSUPPORT; errno = EPROTONOSUPPORT;
return -1; return -1;
...@@ -254,6 +266,9 @@ int zmq::socket_base_t::connect (const char *addr_) ...@@ -254,6 +266,9 @@ int zmq::socket_base_t::connect (const char *addr_)
if (addr_type == "udp") if (addr_type == "udp")
udp_encapsulation = true; udp_encapsulation = true;
// At this point we'll create message pipes to the session straight
// away. There's no point in delaying it as no concept of 'connect'
// exists with PGM anyway.
if (options.requires_out) { if (options.requires_out) {
// PGM sender. // PGM sender.
......
...@@ -28,6 +28,10 @@ zmq::xrep_t::xrep_t (class app_thread_t *parent_) : ...@@ -28,6 +28,10 @@ zmq::xrep_t::xrep_t (class app_thread_t *parent_) :
options.requires_in = true; options.requires_in = true;
options.requires_out = true; options.requires_out = true;
// On connect, pipes are created only after initial handshaking.
// That way we are aware of the peer's identity when binding to the pipes.
options.immediate_connect = false;
// XREP socket adds identity to inbound messages and strips identity // XREP socket adds identity to inbound messages and strips identity
// from the outbound messages. // from the outbound messages.
options.traceroute = true; options.traceroute = true;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment