Commit 6f6466f0 authored by Ian Barber's avatar Ian Barber

Fix a number of whitespace issues in various parts of the code, add validation…

Fix a number of whitespace issues in various parts of the code, add validation to most calls on the test and take a first stab at implementing the reconnection pipe blocking.

It didn't seem straightforward to use any of the existing process calls, so I have added a new command to command_t and friends called detach. This instructs the socket_base to remove the pipe from it's pipe list. The session base stores a copy of the outpipe, and will resend the bind command on reconnection. This should allow balancing again.
parent 06485d92
...@@ -47,6 +47,7 @@ namespace zmq ...@@ -47,6 +47,7 @@ namespace zmq
own, own,
attach, attach,
bind, bind,
detach,
activate_read, activate_read,
activate_write, activate_write,
hiccup, hiccup,
...@@ -87,6 +88,11 @@ namespace zmq ...@@ -87,6 +88,11 @@ namespace zmq
struct { struct {
zmq::pipe_t *pipe; zmq::pipe_t *pipe;
} bind; } bind;
// Sent from session to socket to disconnect a pipe
struct {
zmq::pipe_t *pipe;
} detach;
// Sent by pipe writer to inform dormant pipe reader that there // Sent by pipe writer to inform dormant pipe reader that there
// are messages in the pipe. // are messages in the pipe.
......
...@@ -92,6 +92,11 @@ void zmq::object_t::process_command (command_t &cmd_) ...@@ -92,6 +92,11 @@ void zmq::object_t::process_command (command_t &cmd_)
process_seqnum (); process_seqnum ();
break; break;
case command_t::detach:
process_detach (cmd_.args.detach.pipe);
process_seqnum ();
break;
case command_t::hiccup: case command_t::hiccup:
process_hiccup (cmd_.args.hiccup.pipe); process_hiccup (cmd_.args.hiccup.pipe);
break; break;
...@@ -211,6 +216,15 @@ void zmq::object_t::send_bind (own_t *destination_, pipe_t *pipe_, ...@@ -211,6 +216,15 @@ void zmq::object_t::send_bind (own_t *destination_, pipe_t *pipe_,
send_command (cmd); send_command (cmd);
} }
void zmq::object_t::send_detach (own_t *destination_, pipe_t *pipe_)
{
command_t cmd;
cmd.destination = destination_;
cmd.type = command_t::detach;
cmd.args.detach.pipe = pipe_;
send_command (cmd);
}
void zmq::object_t::send_activate_read (pipe_t *destination_) void zmq::object_t::send_activate_read (pipe_t *destination_)
{ {
command_t cmd; command_t cmd;
...@@ -331,6 +345,11 @@ void zmq::object_t::process_bind (pipe_t *pipe_) ...@@ -331,6 +345,11 @@ void zmq::object_t::process_bind (pipe_t *pipe_)
zmq_assert (false); zmq_assert (false);
} }
void zmq::object_t::process_detach (pipe_t *pipe_)
{
zmq_assert (false);
}
void zmq::object_t::process_activate_read () void zmq::object_t::process_activate_read ()
{ {
zmq_assert (false); zmq_assert (false);
......
...@@ -78,6 +78,7 @@ namespace zmq ...@@ -78,6 +78,7 @@ namespace zmq
zmq::i_engine *engine_, bool inc_seqnum_ = true); zmq::i_engine *engine_, bool inc_seqnum_ = true);
void send_bind (zmq::own_t *destination_, zmq::pipe_t *pipe_, void send_bind (zmq::own_t *destination_, zmq::pipe_t *pipe_,
bool inc_seqnum_ = true); bool inc_seqnum_ = true);
void send_detach (own_t *destination_, pipe_t *pipe_);
void send_activate_read (zmq::pipe_t *destination_); void send_activate_read (zmq::pipe_t *destination_);
void send_activate_write (zmq::pipe_t *destination_, void send_activate_write (zmq::pipe_t *destination_,
uint64_t msgs_read_); uint64_t msgs_read_);
...@@ -99,6 +100,7 @@ namespace zmq ...@@ -99,6 +100,7 @@ namespace zmq
virtual void process_own (zmq::own_t *object_); virtual void process_own (zmq::own_t *object_);
virtual void process_attach (zmq::i_engine *engine_); virtual void process_attach (zmq::i_engine *engine_);
virtual void process_bind (zmq::pipe_t *pipe_); virtual void process_bind (zmq::pipe_t *pipe_);
virtual void process_detach (zmq::pipe_t *pipe_);
virtual void process_activate_read (); virtual void process_activate_read ();
virtual void process_activate_write (uint64_t msgs_read_); virtual void process_activate_write (uint64_t msgs_read_);
virtual void process_hiccup (void *pipe_); virtual void process_hiccup (void *pipe_);
......
...@@ -228,7 +228,7 @@ void zmq::session_base_t::clean_pipes () ...@@ -228,7 +228,7 @@ void zmq::session_base_t::clean_pipes ()
} }
void zmq::session_base_t::terminated (pipe_t *pipe_) void zmq::session_base_t::terminated (pipe_t *pipe_)
{ {
// Drop the reference to the deallocated pipe. // Drop the reference to the deallocated pipe.
zmq_assert (pipe == pipe_); zmq_assert (pipe == pipe_);
pipe = NULL; pipe = NULL;
...@@ -306,9 +306,15 @@ void zmq::session_base_t::process_attach (i_engine *engine_) ...@@ -306,9 +306,15 @@ void zmq::session_base_t::process_attach (i_engine *engine_)
zmq_assert (!pipe); zmq_assert (!pipe);
pipe = pipes [0]; pipe = pipes [0];
// Remember the remote end of the pipe if required
if (options.delay_attach_on_connect == 1)
outpipe = pipes [1];
// Ask socket to plug into the pipe. // Ask socket to plug into the pipe.
send_bind (socket, pipes [1]); send_bind (socket, pipes [1]);
} }
else if (outpipe && (options.delay_attach_on_connect == 1))
send_bind (socket, outpipe);
// Plug in the engine. // Plug in the engine.
zmq_assert (!engine); zmq_assert (!engine);
...@@ -405,6 +411,11 @@ void zmq::session_base_t::detached () ...@@ -405,6 +411,11 @@ void zmq::session_base_t::detached ()
// the socket object to resend all the subscriptions. // the socket object to resend all the subscriptions.
if (pipe && (options.type == ZMQ_SUB || options.type == ZMQ_XSUB)) if (pipe && (options.type == ZMQ_SUB || options.type == ZMQ_XSUB))
pipe->hiccup (); pipe->hiccup ();
// For delayed connect situations, hiccup the socket to have it
// pause usage of this pipe
if (outpipe && options.delay_attach_on_connect == 1)
send_detach(socket, outpipe);
} }
void zmq::session_base_t::start_connecting (bool wait_) void zmq::session_base_t::start_connecting (bool wait_)
......
...@@ -52,9 +52,6 @@ namespace zmq ...@@ -52,9 +52,6 @@ namespace zmq
// To be used once only, when creating the session. // To be used once only, when creating the session.
void attach_pipe (zmq::pipe_t *pipe_); void attach_pipe (zmq::pipe_t *pipe_);
// To be used once only, for delayed connection
void onconnect_attach_pipe (pipe_t *pipe_);
// Following functions are the interface exposed towards the engine. // Following functions are the interface exposed towards the engine.
virtual int read (msg_t *msg_); virtual int read (msg_t *msg_);
...@@ -106,7 +103,10 @@ namespace zmq ...@@ -106,7 +103,10 @@ namespace zmq
// Pipe connecting the session to its socket. // Pipe connecting the session to its socket.
zmq::pipe_t *pipe; zmq::pipe_t *pipe;
// Socket end of pipe, in case of reconnection
zmq::pipe_t *outpipe;
// This flag is true if the remainder of the message being processed // This flag is true if the remainder of the message being processed
// is still in the in pipe. // is still in the in pipe.
bool incomplete_in; bool incomplete_in;
......
...@@ -529,13 +529,13 @@ int zmq::socket_base_t::connect (const char *addr_) ...@@ -529,13 +529,13 @@ int zmq::socket_base_t::connect (const char *addr_)
session_base_t *session = session_base_t::create (io_thread, true, this, session_base_t *session = session_base_t::create (io_thread, true, this,
options, paddr); options, paddr);
errno_assert (session); errno_assert (session);
// PGM does not support subscription forwarding; ask for all data to be // PGM does not support subscription forwarding; ask for all data to be
// sent to this pipe. // sent to this pipe.
bool icanhasall = false; bool icanhasall = false;
if (protocol == "pgm" || protocol == "epgm") if (protocol == "pgm" || protocol == "epgm")
icanhasall = true; icanhasall = true;
if (options.delay_attach_on_connect != 1 && icanhasall != true) { if (options.delay_attach_on_connect != 1 && icanhasall != true) {
// Create a bi-directional pipe. // Create a bi-directional pipe.
object_t *parents [2] = {this, session}; object_t *parents [2] = {this, session};
...@@ -547,7 +547,7 @@ int zmq::socket_base_t::connect (const char *addr_) ...@@ -547,7 +547,7 @@ int zmq::socket_base_t::connect (const char *addr_)
// Attach local end of the pipe to the socket object. // Attach local end of the pipe to the socket object.
attach_pipe (pipes [0], icanhasall); attach_pipe (pipes [0], icanhasall);
// Attach remote end of the pipe to the session object later on. // Attach remote end of the pipe to the session object later on.
session->attach_pipe (pipes [1]); session->attach_pipe (pipes [1]);
} }
...@@ -876,6 +876,17 @@ void zmq::socket_base_t::process_destroy () ...@@ -876,6 +876,17 @@ void zmq::socket_base_t::process_destroy ()
destroyed = true; destroyed = true;
} }
void zmq::socket_base_t::process_detach (pipe_t *pipe_)
{
// If we are blocking connecting threads, drop this one
if (options.delay_attach_on_connect == 1) {
zmq_assert (pipe_);
pipes.erase (pipe_);
// Let derived sockets know we're ditching this pipe
xterminated (pipe_);
}
}
int zmq::socket_base_t::xsetsockopt (int option_, const void *optval_, int zmq::socket_base_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_) size_t optvallen_)
{ {
......
...@@ -185,6 +185,9 @@ namespace zmq ...@@ -185,6 +185,9 @@ namespace zmq
void process_bind (zmq::pipe_t *pipe_); void process_bind (zmq::pipe_t *pipe_);
void process_term (int linger_); void process_term (int linger_);
// Allow blocking reconnecting pipes
void process_detach (pipe_t *pipe_);
// Socket's mailbox object. // Socket's mailbox object.
mailbox_t mailbox; mailbox_t mailbox;
......
...@@ -36,25 +36,33 @@ int main (int argc, char *argv []) ...@@ -36,25 +36,33 @@ int main (int argc, char *argv [])
int seen = 0; int seen = 0;
void *context = zmq_ctx_new(); void *context = zmq_ctx_new();
assert (context);
void *to = zmq_socket(context, ZMQ_PULL); void *to = zmq_socket(context, ZMQ_PULL);
assert (to);
val = 0; val = 0;
zmq_setsockopt(to, ZMQ_LINGER, &val, sizeof(val)); rc = zmq_setsockopt(to, ZMQ_LINGER, &val, sizeof(val));
zmq_bind(to, "tcp://*:5555"); assert (rc == 0);
rc = zmq_bind(to, "tcp://*:5555");
assert (rc == 0);
// Create a socket pushing to two endpoints - only 1 message should arrive. // Create a socket pushing to two endpoints - only 1 message should arrive.
void *from = zmq_socket (context, ZMQ_PUSH); void *from = zmq_socket (context, ZMQ_PUSH);
assert(from);
val = 0; val = 0;
zmq_setsockopt(from, ZMQ_LINGER, &val, sizeof(val)); zmq_setsockopt (from, ZMQ_LINGER, &val, sizeof(val));
rc = zmq_connect(from, "tcp://localhost:5556"); rc = zmq_connect (from, "tcp://localhost:5556");
assert (rc == 0); assert (rc == 0);
rc = zmq_connect(from, "tcp://localhost:5555"); rc = zmq_connect (from, "tcp://localhost:5555");
assert (rc == 0); assert (rc == 0);
for (int i = 0; i < 10; ++i) for (int i = 0; i < 10; ++i)
{ {
std::string message("message "); std::string message("message ");
message += ('0' + i); message += ('0' + i);
zmq_send(from, message.data(), message.size(), 0); rc = zmq_send (from, message.data(), message.size(), 0);
assert(rc >= 0);
} }
sleep(1); sleep(1);
...@@ -62,7 +70,7 @@ int main (int argc, char *argv []) ...@@ -62,7 +70,7 @@ int main (int argc, char *argv [])
for (int i = 0; i < 10; ++i) for (int i = 0; i < 10; ++i)
{ {
memset(&buffer, 0, sizeof(buffer)); memset(&buffer, 0, sizeof(buffer));
rc = zmq_recv(to, &buffer, sizeof(buffer), ZMQ_DONTWAIT); rc = zmq_recv (to, &buffer, sizeof(buffer), ZMQ_DONTWAIT);
if( rc == -1) if( rc == -1)
break; break;
seen++; seen++;
...@@ -81,27 +89,39 @@ int main (int argc, char *argv []) ...@@ -81,27 +89,39 @@ int main (int argc, char *argv [])
context = zmq_ctx_new(); context = zmq_ctx_new();
std::cout << " Rerunning with DELAY_ATTACH_ON_CONNECT\n"; std::cout << " Rerunning with DELAY_ATTACH_ON_CONNECT\n";
to = zmq_socket(context, ZMQ_PULL); to = zmq_socket (context, ZMQ_PULL);
zmq_bind(to, "tcp://*:5560"); assert (to);
rc = zmq_bind (to, "tcp://*:5560");
assert(rc == 0);
val = 0; val = 0;
zmq_setsockopt(to, ZMQ_LINGER, &val, sizeof(val)); rc = zmq_setsockopt (to, ZMQ_LINGER, &val, sizeof(val));
assert (rc == 0);
// Create a socket pushing to two endpoints - all messages should arrive. // Create a socket pushing to two endpoints - all messages should arrive.
from = zmq_socket (context, ZMQ_PUSH); from = zmq_socket (context, ZMQ_PUSH);
assert (from);
val = 0; val = 0;
zmq_setsockopt(from, ZMQ_LINGER, &val, sizeof(val)); rc = zmq_setsockopt (from, ZMQ_LINGER, &val, sizeof(val));
assert (rc == 0);
val = 1; val = 1;
zmq_setsockopt(from, ZMQ_DELAY_ATTACH_ON_CONNECT, &val, sizeof(val)); rc = zmq_setsockopt (from, ZMQ_DELAY_ATTACH_ON_CONNECT, &val, sizeof(val));
rc = zmq_connect(from, "tcp://localhost:5561");
assert (rc == 0); assert (rc == 0);
rc = zmq_connect(from, "tcp://localhost:5560");
rc = zmq_connect (from, "tcp://localhost:5561");
assert (rc == 0);
rc = zmq_connect (from, "tcp://localhost:5560");
assert (rc == 0); assert (rc == 0);
for (int i = 0; i < 10; ++i) for (int i = 0; i < 10; ++i)
{ {
std::string message("message "); std::string message("message ");
message += ('0' + i); message += ('0' + i);
zmq_send(from, message.data(), message.size(), 0); rc = zmq_send (from, message.data(), message.size(), 0);
assert (rc >= 0);
} }
sleep(1); sleep(1);
...@@ -110,13 +130,9 @@ int main (int argc, char *argv []) ...@@ -110,13 +130,9 @@ int main (int argc, char *argv [])
for (int i = 0; i < 10; ++i) for (int i = 0; i < 10; ++i)
{ {
memset(&buffer, 0, sizeof(buffer)); memset(&buffer, 0, sizeof(buffer));
rc = zmq_recv(to, &buffer, sizeof(buffer), ZMQ_DONTWAIT); rc = zmq_recv (to, &buffer, sizeof(buffer), ZMQ_DONTWAIT);
if( rc == -1) { assert (rc != -1);
break;
}
seen++;
} }
assert (seen == 10);
rc = zmq_close (from); rc = zmq_close (from);
assert (rc == 0); assert (rc == 0);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment