Commit 531d3ebc authored by Ian Barber's avatar Ian Barber

Remove delay options

These were exposed to users, but have subsequently been removed as
sockopts. They are currently only being used by ZAP, so I've moved it to
a simpl function call (actually it's only used in one case even in that,
so there may be a further simplification possible there).
parent cb35fd7b
...@@ -43,8 +43,6 @@ zmq::options_t::options_t () : ...@@ -43,8 +43,6 @@ zmq::options_t::options_t () :
sndtimeo (-1), sndtimeo (-1),
ipv6 (0), ipv6 (0),
immediate (0), immediate (0),
delay_on_close (true),
delay_on_disconnect (true),
filter (false), filter (false),
recv_identity (false), recv_identity (false),
raw_sock (false), raw_sock (false),
......
...@@ -97,14 +97,6 @@ namespace zmq ...@@ -97,14 +97,6 @@ namespace zmq
// on a socket with only connecting pipes would block // on a socket with only connecting pipes would block
int immediate; int immediate;
// If true, session reads all the pending messages from the pipe and
// sends them to the network when socket is closed.
bool delay_on_close;
// If true, socket reads all the messages from the pipe and delivers
// them to the user when the peer terminates.
bool delay_on_disconnect;
// If 1, (X)SUB socket should filter the messages. If 0, it should not. // If 1, (X)SUB socket should filter the messages. If 0, it should not.
bool filter; bool filter;
......
...@@ -24,7 +24,7 @@ ...@@ -24,7 +24,7 @@
#include "err.hpp" #include "err.hpp"
int zmq::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2], int zmq::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2],
int hwms_ [2], bool delays_ [2]) int hwms_ [2])
{ {
// Creates two pipe objects. These objects are connected by two ypipes, // Creates two pipe objects. These objects are connected by two ypipes,
// each to pass messages in one direction. // each to pass messages in one direction.
...@@ -35,10 +35,10 @@ int zmq::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2], ...@@ -35,10 +35,10 @@ int zmq::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2],
alloc_assert (upipe2); alloc_assert (upipe2);
pipes_ [0] = new (std::nothrow) pipe_t (parents_ [0], upipe1, upipe2, pipes_ [0] = new (std::nothrow) pipe_t (parents_ [0], upipe1, upipe2,
hwms_ [1], hwms_ [0], delays_ [0]); hwms_ [1], hwms_ [0]);
alloc_assert (pipes_ [0]); alloc_assert (pipes_ [0]);
pipes_ [1] = new (std::nothrow) pipe_t (parents_ [1], upipe2, upipe1, pipes_ [1] = new (std::nothrow) pipe_t (parents_ [1], upipe2, upipe1,
hwms_ [0], hwms_ [1], delays_ [1]); hwms_ [0], hwms_ [1]);
alloc_assert (pipes_ [1]); alloc_assert (pipes_ [1]);
pipes_ [0]->set_peer (pipes_ [1]); pipes_ [0]->set_peer (pipes_ [1]);
...@@ -48,7 +48,7 @@ int zmq::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2], ...@@ -48,7 +48,7 @@ int zmq::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2],
} }
zmq::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_, zmq::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,
int inhwm_, int outhwm_, bool delay_) : int inhwm_, int outhwm_) :
object_t (parent_), object_t (parent_),
inpipe (inpipe_), inpipe (inpipe_),
outpipe (outpipe_), outpipe (outpipe_),
...@@ -62,7 +62,7 @@ zmq::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_, ...@@ -62,7 +62,7 @@ zmq::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,
peer (NULL), peer (NULL),
sink (NULL), sink (NULL),
state (active), state (active),
delay (delay_) delay (true)
{ {
} }
...@@ -314,6 +314,11 @@ void zmq::pipe_t::process_pipe_term_ack () ...@@ -314,6 +314,11 @@ void zmq::pipe_t::process_pipe_term_ack ()
delete this; delete this;
} }
void zmq::pipe_t::set_nodelay ()
{
this->delay = false;
}
void zmq::pipe_t::terminate (bool delay_) void zmq::pipe_t::terminate (bool delay_)
{ {
// Overload the value specified at pipe creation. // Overload the value specified at pipe creation.
......
...@@ -41,7 +41,7 @@ namespace zmq ...@@ -41,7 +41,7 @@ namespace zmq
// pipe receives all the pending messages before terminating, otherwise it // pipe receives all the pending messages before terminating, otherwise it
// terminates straight away. // terminates straight away.
int pipepair (zmq::object_t *parents_ [2], zmq::pipe_t* pipes_ [2], int pipepair (zmq::object_t *parents_ [2], zmq::pipe_t* pipes_ [2],
int hwms_ [2], bool delays_ [2]); int hwms_ [2]);
struct i_pipe_events struct i_pipe_events
{ {
...@@ -65,7 +65,7 @@ namespace zmq ...@@ -65,7 +65,7 @@ namespace zmq
{ {
// This allows pipepair to create pipe objects. // This allows pipepair to create pipe objects.
friend int pipepair (zmq::object_t *parents_ [2], friend int pipepair (zmq::object_t *parents_ [2],
zmq::pipe_t* pipes_ [2], int hwms_ [2], bool delays_ [2]); zmq::pipe_t* pipes_ [2], int hwms_ [2]);
public: public:
...@@ -100,6 +100,9 @@ namespace zmq ...@@ -100,6 +100,9 @@ namespace zmq
// all the messages on the fly. Causes 'hiccuped' event to be generated // all the messages on the fly. Causes 'hiccuped' event to be generated
// in the peer. // in the peer.
void hiccup (); void hiccup ();
// Ensure the pipe wont block on receiving pipe_term.
void set_nodelay ();
// Ask pipe to terminate. The termination will happen asynchronously // Ask pipe to terminate. The termination will happen asynchronously
// and user will be notified about actual deallocation by 'terminated' // and user will be notified about actual deallocation by 'terminated'
...@@ -125,7 +128,7 @@ namespace zmq ...@@ -125,7 +128,7 @@ namespace zmq
// Constructor is private. Pipe can only be created using // Constructor is private. Pipe can only be created using
// pipepair function. // pipepair function.
pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_, pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,
int inhwm_, int outhwm_, bool delay_); int inhwm_, int outhwm_);
// Pipepair uses this function to let us know about // Pipepair uses this function to let us know about
// the peer pipe object. // the peer pipe object.
......
...@@ -299,14 +299,15 @@ int zmq::session_base_t::zap_connect () ...@@ -299,14 +299,15 @@ int zmq::session_base_t::zap_connect ()
object_t *parents [2] = {this, peer.socket}; object_t *parents [2] = {this, peer.socket};
pipe_t *new_pipes [2] = {NULL, NULL}; pipe_t *new_pipes [2] = {NULL, NULL};
int hwms [2] = {0, 0}; int hwms [2] = {0, 0};
bool delays [2] = {false, false}; int rc = pipepair (parents, new_pipes, hwms);
int rc = pipepair (parents, new_pipes, hwms, delays);
errno_assert (rc == 0); errno_assert (rc == 0);
// Attach local end of the pipe to this socket object. // Attach local end of the pipe to this socket object.
zap_pipe = new_pipes [0]; zap_pipe = new_pipes [0];
zap_pipe->set_nodelay ();
zap_pipe->set_event_sink (this); zap_pipe->set_event_sink (this);
new_pipes [1]->set_nodelay ();
send_bind (peer.socket, new_pipes [1], false); send_bind (peer.socket, new_pipes [1], false);
// Send empty identity if required by the peer. // Send empty identity if required by the peer.
...@@ -332,8 +333,7 @@ void zmq::session_base_t::process_attach (i_engine *engine_) ...@@ -332,8 +333,7 @@ void zmq::session_base_t::process_attach (i_engine *engine_)
object_t *parents [2] = {this, socket}; object_t *parents [2] = {this, socket};
pipe_t *pipes [2] = {NULL, NULL}; pipe_t *pipes [2] = {NULL, NULL};
int hwms [2] = {options.rcvhwm, options.sndhwm}; int hwms [2] = {options.rcvhwm, options.sndhwm};
bool delays [2] = {options.delay_on_close, options.delay_on_disconnect}; int rc = pipepair (parents, pipes, hwms);
int rc = pipepair (parents, pipes, hwms, delays);
errno_assert (rc == 0); errno_assert (rc == 0);
// Plug the local end of the pipe. // Plug the local end of the pipe.
...@@ -378,7 +378,7 @@ void zmq::session_base_t::process_term (int linger_) ...@@ -378,7 +378,7 @@ void zmq::session_base_t::process_term (int linger_)
// If the termination of the pipe happens before the term command is // If the termination of the pipe happens before the term command is
// delivered there's nothing much to do. We can proceed with the // delivered there's nothing much to do. We can proceed with the
// stadard termination immediately. // standard termination immediately.
if (!pipe && !zap_pipe) { if (!pipe && !zap_pipe) {
proceed_with_term (); proceed_with_term ();
return; return;
......
...@@ -451,8 +451,7 @@ int zmq::socket_base_t::connect (const char *addr_) ...@@ -451,8 +451,7 @@ int zmq::socket_base_t::connect (const char *addr_)
object_t *parents [2] = {this, peer.socket}; object_t *parents [2] = {this, peer.socket};
pipe_t *new_pipes [2] = {NULL, NULL}; pipe_t *new_pipes [2] = {NULL, NULL};
int hwms [2] = {sndhwm, rcvhwm}; int hwms [2] = {sndhwm, rcvhwm};
bool delays [2] = {options.delay_on_disconnect, options.delay_on_close}; int rc = pipepair (parents, new_pipes, hwms);
int rc = pipepair (parents, new_pipes, hwms, delays);
errno_assert (rc == 0); errno_assert (rc == 0);
// Attach local end of the pipe to this socket object. // Attach local end of the pipe to this socket object.
...@@ -555,8 +554,7 @@ int zmq::socket_base_t::connect (const char *addr_) ...@@ -555,8 +554,7 @@ int zmq::socket_base_t::connect (const char *addr_)
object_t *parents [2] = {this, session}; object_t *parents [2] = {this, session};
pipe_t *new_pipes [2] = {NULL, NULL}; pipe_t *new_pipes [2] = {NULL, NULL};
int hwms [2] = {options.sndhwm, options.rcvhwm}; int hwms [2] = {options.sndhwm, options.rcvhwm};
bool delays [2] = {options.delay_on_disconnect, options.delay_on_close}; rc = pipepair (parents, new_pipes, hwms);
rc = pipepair (parents, new_pipes, hwms, delays);
errno_assert (rc == 0); errno_assert (rc == 0);
// Attach local end of the pipe to the socket object. // Attach local end of the pipe to the socket object.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment