Commit f0920caf authored by Ian Barber's avatar Ian Barber

Revert "On the advice of Martin Hurton, removed the new command type and just…

Revert "On the advice of Martin Hurton, removed the new command type and just terminated the pipe in a reconnect situation, and notified the socket of the same. This handles the blocking properly, but at the cost of potentially losing in flight messages. However, this is a reasonable trade off given how much simpler it makes the patch."

This reverts commit c13f1d52.
parent b020bd4b
...@@ -47,6 +47,7 @@ namespace zmq ...@@ -47,6 +47,7 @@ namespace zmq
own, own,
attach, attach,
bind, bind,
detach,
activate_read, activate_read,
activate_write, activate_write,
hiccup, hiccup,
...@@ -87,6 +88,11 @@ namespace zmq ...@@ -87,6 +88,11 @@ namespace zmq
struct { struct {
zmq::pipe_t *pipe; zmq::pipe_t *pipe;
} bind; } bind;
// Sent from session to socket to disconnect a pipe
struct {
zmq::pipe_t *pipe;
} detach;
// Sent by pipe writer to inform dormant pipe reader that there // Sent by pipe writer to inform dormant pipe reader that there
// are messages in the pipe. // are messages in the pipe.
......
...@@ -92,6 +92,11 @@ void zmq::object_t::process_command (command_t &cmd_) ...@@ -92,6 +92,11 @@ void zmq::object_t::process_command (command_t &cmd_)
process_seqnum (); process_seqnum ();
break; break;
case command_t::detach:
process_detach (cmd_.args.detach.pipe);
process_seqnum ();
break;
case command_t::hiccup: case command_t::hiccup:
process_hiccup (cmd_.args.hiccup.pipe); process_hiccup (cmd_.args.hiccup.pipe);
break; break;
...@@ -211,6 +216,15 @@ void zmq::object_t::send_bind (own_t *destination_, pipe_t *pipe_, ...@@ -211,6 +216,15 @@ void zmq::object_t::send_bind (own_t *destination_, pipe_t *pipe_,
send_command (cmd); send_command (cmd);
} }
void zmq::object_t::send_detach (own_t *destination_, pipe_t *pipe_)
{
command_t cmd;
cmd.destination = destination_;
cmd.type = command_t::detach;
cmd.args.detach.pipe = pipe_;
send_command (cmd);
}
void zmq::object_t::send_activate_read (pipe_t *destination_) void zmq::object_t::send_activate_read (pipe_t *destination_)
{ {
command_t cmd; command_t cmd;
...@@ -331,6 +345,11 @@ void zmq::object_t::process_bind (pipe_t *pipe_) ...@@ -331,6 +345,11 @@ void zmq::object_t::process_bind (pipe_t *pipe_)
zmq_assert (false); zmq_assert (false);
} }
void zmq::object_t::process_detach (pipe_t *pipe_)
{
zmq_assert (false);
}
void zmq::object_t::process_activate_read () void zmq::object_t::process_activate_read ()
{ {
zmq_assert (false); zmq_assert (false);
......
...@@ -78,6 +78,7 @@ namespace zmq ...@@ -78,6 +78,7 @@ namespace zmq
zmq::i_engine *engine_, bool inc_seqnum_ = true); zmq::i_engine *engine_, bool inc_seqnum_ = true);
void send_bind (zmq::own_t *destination_, zmq::pipe_t *pipe_, void send_bind (zmq::own_t *destination_, zmq::pipe_t *pipe_,
bool inc_seqnum_ = true); bool inc_seqnum_ = true);
void send_detach (own_t *destination_, pipe_t *pipe_);
void send_activate_read (zmq::pipe_t *destination_); void send_activate_read (zmq::pipe_t *destination_);
void send_activate_write (zmq::pipe_t *destination_, void send_activate_write (zmq::pipe_t *destination_,
uint64_t msgs_read_); uint64_t msgs_read_);
...@@ -99,6 +100,7 @@ namespace zmq ...@@ -99,6 +100,7 @@ namespace zmq
virtual void process_own (zmq::own_t *object_); virtual void process_own (zmq::own_t *object_);
virtual void process_attach (zmq::i_engine *engine_); virtual void process_attach (zmq::i_engine *engine_);
virtual void process_bind (zmq::pipe_t *pipe_); virtual void process_bind (zmq::pipe_t *pipe_);
virtual void process_detach (zmq::pipe_t *pipe_);
virtual void process_activate_read (); virtual void process_activate_read ();
virtual void process_activate_write (uint64_t msgs_read_); virtual void process_activate_write (uint64_t msgs_read_);
virtual void process_hiccup (void *pipe_); virtual void process_hiccup (void *pipe_);
......
...@@ -411,14 +411,11 @@ void zmq::session_base_t::detached () ...@@ -411,14 +411,11 @@ void zmq::session_base_t::detached ()
// the socket object to resend all the subscriptions. // the socket object to resend all the subscriptions.
if (pipe && (options.type == ZMQ_SUB || options.type == ZMQ_XSUB)) if (pipe && (options.type == ZMQ_SUB || options.type == ZMQ_XSUB))
pipe->hiccup (); pipe->hiccup ();
// For delayed connect situations, terminate the pipe // For delayed connect situations, hiccup the socket to have it
// and reestablish later on // pause usage of this pipe
if (pipe && options.delay_attach_on_connect == 1) { if (outpipe && options.delay_attach_on_connect == 1)
pipe->terminate (false); send_detach(socket, outpipe);
socket->terminated (pipe);
pipe = NULL;
}
} }
void zmq::session_base_t::start_connecting (bool wait_) void zmq::session_base_t::start_connecting (bool wait_)
......
...@@ -876,6 +876,17 @@ void zmq::socket_base_t::process_destroy () ...@@ -876,6 +876,17 @@ void zmq::socket_base_t::process_destroy ()
destroyed = true; destroyed = true;
} }
void zmq::socket_base_t::process_detach (pipe_t *pipe_)
{
// If we are blocking connecting threads, drop this one
if (options.delay_attach_on_connect == 1) {
zmq_assert (pipe_);
pipes.erase (pipe_);
// Let derived sockets know we're ditching this pipe
xterminated (pipe_);
}
}
int zmq::socket_base_t::xsetsockopt (int option_, const void *optval_, int zmq::socket_base_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_) size_t optvallen_)
{ {
......
...@@ -185,6 +185,9 @@ namespace zmq ...@@ -185,6 +185,9 @@ namespace zmq
void process_bind (zmq::pipe_t *pipe_); void process_bind (zmq::pipe_t *pipe_);
void process_term (int linger_); void process_term (int linger_);
// Allow blocking reconnecting pipes
void process_detach (pipe_t *pipe_);
// Socket's mailbox object. // Socket's mailbox object.
mailbox_t mailbox; mailbox_t mailbox;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment