Commit 0a03e86e authored by Martin Sustrik's avatar Martin Sustrik

ZMQ_LINGER socket option added.

    1. ZMQ_LINGER option can be set/get
    2. options are part of own_t base class rather than being declared
       separately by individual objects
    3. Linger option is propagated with "term" command so that the
       newest value of it is used rather than the stored old one.
    4. Session sets the linger timer if needed and terminates
       as soon as it expires.
    5. Corresponding documentation updated.
Signed-off-by: 's avatarMartin Sustrik <sustrik@250bpm.com>
parent a1474e30
......@@ -212,6 +212,22 @@ Default value:: 0
Applicable socket types:: all
ZMQ_LINGER: Set linger period for socket shutdown
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_LINGER' option shall retrieve the period for pending outbound
messages to linger in memory after closing the socket. Value of -1 means
infinite. Pending messages will be kept until they are fully transferred to
the peer. Value of 0 means that all the pending messages are dropped immediately
when socket is closed. Positive value means number of milliseconds to keep
trying to send the pending messages before discarding them.
[horizontal]
Option value type:: int
Option value unit:: milliseconds
Default value:: -1
Applicable socket types:: all
ZMQ_FD: Retrieve file descriptor associated with the socket
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_FD' option shall retrieve file descriptor associated with the 0MQ
......
......@@ -216,6 +216,22 @@ Default value:: 0
Applicable socket types:: all
ZMQ_LINGER: Set linger period for socket shutdown
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_LINGER' option shall be set to specify period for pending outbound
messages to linger in memory after closing the socket. Value of -1 means
infinite. Pending messages will be kept until they are fully transferred to
the peer. Value of 0 means that all the pending messages are dropped immediately
when socket is closed. Positive value means number of milliseconds to keep
trying to send the pending messages before discarding them.
[horizontal]
Option value type:: int
Option value unit:: milliseconds
Default value:: -1
Applicable socket types:: all
RETURN VALUE
------------
The _zmq_setsockopt()_ function shall return zero if successful. Otherwise it
......
......@@ -191,6 +191,7 @@ ZMQ_EXPORT int zmq_term (void *context);
#define ZMQ_FD 14
#define ZMQ_EVENTS 15
#define ZMQ_TYPE 16
#define ZMQ_LINGER 17
/* Send/recv options. */
#define ZMQ_NOBLOCK 1
......
......@@ -109,6 +109,7 @@ namespace zmq
// Sent by socket to I/O object to start its shutdown.
struct {
int linger;
} term;
// Sent by I/O object to the socket to acknowledge it has
......
......@@ -106,7 +106,7 @@ void zmq::object_t::process_command (command_t &cmd_)
break;
case command_t::term:
process_term ();
process_term (cmd_.args.term.linger);
break;
case command_t::term_ack:
......@@ -312,7 +312,7 @@ void zmq::object_t::send_term_req (own_t *destination_,
send_command (cmd);
}
void zmq::object_t::send_term (own_t *destination_)
void zmq::object_t::send_term (own_t *destination_, int linger_)
{
command_t cmd;
#if defined ZMQ_MAKE_VALGRIND_HAPPY
......@@ -320,6 +320,7 @@ void zmq::object_t::send_term (own_t *destination_)
#endif
cmd.destination = destination_;
cmd.type = command_t::term;
cmd.args.term.linger = linger_;
send_command (cmd);
}
......@@ -386,7 +387,7 @@ void zmq::object_t::process_term_req (own_t *object_)
zmq_assert (false);
}
void zmq::object_t::process_term ()
void zmq::object_t::process_term (int linger_)
{
zmq_assert (false);
}
......
......@@ -80,7 +80,7 @@ namespace zmq
void send_pipe_term_ack (class reader_t *destination_);
void send_term_req (class own_t *destination_,
class own_t *object_);
void send_term (class own_t *destination_);
void send_term (class own_t *destination_, int linger_);
void send_term_ack (class own_t *destination_);
// These handlers can be overloaded by the derived objects. They are
......@@ -97,7 +97,7 @@ namespace zmq
virtual void process_pipe_term ();
virtual void process_pipe_term_ack ();
virtual void process_term_req (class own_t *object_);
virtual void process_term ();
virtual void process_term (int linger_);
virtual void process_term_ack ();
// Special handler called after a command that requires a seqnum
......
......@@ -34,6 +34,7 @@ zmq::options_t::options_t () :
sndbuf (0),
rcvbuf (0),
type (-1),
linger (-1),
requires_in (false),
requires_out (false),
immediate_connect (true)
......@@ -128,6 +129,14 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
}
rcvbuf = *((uint64_t*) optval_);
return 0;
case ZMQ_LINGER:
if (optvallen_ != sizeof (int)) {
errno = EINVAL;
return -1;
}
linger = *((int*) optval_);
return 0;
}
errno = EINVAL;
......@@ -138,6 +147,15 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
{
switch (option_) {
case ZMQ_LINGER:
if (*optvallen_ < sizeof (int)) {
errno = EINVAL;
return -1;
}
*((int*) optval_) = linger;
*optvallen_ = sizeof (int);
return 0;
case ZMQ_TYPE:
if (*optvallen_ < sizeof (int)) {
errno = EINVAL;
......
......@@ -54,6 +54,9 @@ namespace zmq
// Socket type.
int type;
// Linger time, in milliseconds.
int linger;
// These options are never set by the user directly. Instead they are
// provided by the specific socket type.
bool requires_in;
......
......@@ -31,8 +31,9 @@ zmq::own_t::own_t (class ctx_t *parent_, uint32_t slot_) :
{
}
zmq::own_t::own_t (io_thread_t *io_thread_) :
zmq::own_t::own_t (io_thread_t *io_thread_, const options_t &options_) :
object_t (io_thread_),
options (options_),
terminating (false),
sent_seqnum (0),
processed_seqnum (0),
......@@ -113,16 +114,19 @@ void zmq::own_t::process_term_req (own_t *object_)
owned.erase (it);
register_term_acks (1);
send_term (object_);
// Note that this object is the root of the (partial shutdown) thus, its
// value of linger is used, rather than the value stored by the children.
send_term (object_, options.linger);
}
void zmq::own_t::process_own (own_t *object_)
{
// If the object is already being shut down, new owned objects are
// immediately asked to terminate.
// immediately asked to terminate. Note that linger is set to zero.
if (terminating) {
register_term_acks (1);
send_term (object_);
send_term (object_, 0);
return;
}
......@@ -140,7 +144,7 @@ void zmq::own_t::terminate ()
// As for the root of the ownership tree, there's noone to terminate it,
// so it has to terminate itself.
if (!owner) {
process_term ();
process_term (options.linger);
return;
}
......@@ -148,14 +152,14 @@ void zmq::own_t::terminate ()
send_term_req (owner, this);
}
void zmq::own_t::process_term ()
void zmq::own_t::process_term (int linger_)
{
// Double termination should never happen.
zmq_assert (!terminating);
// Send termination request to all owned objects.
// Send termination request to all owned objects.
for (owned_t::iterator it = owned.begin (); it != owned.end (); it++)
send_term (*it);
send_term (*it, linger_);
register_term_acks (owned.size ());
owned.clear ();
......
......@@ -24,6 +24,7 @@
#include <algorithm>
#include "object.hpp"
#include "options.hpp"
#include "atomic_counter.hpp"
#include "stdint.hpp"
......@@ -45,7 +46,7 @@ namespace zmq
own_t (class ctx_t *parent_, uint32_t slot_);
// The object is living within I/O thread.
own_t (class io_thread_t *io_thread_);
own_t (class io_thread_t *io_thread_, const options_t &options_);
// When another owned object wants to send command to this object
// it calls this function to let it know it should not shut down
......@@ -83,12 +84,15 @@ namespace zmq
// Term handler is protocted rather than private so that it can
// be intercepted by the derived class. This is useful to add custom
// steps to the beginning of the termination process.
void process_term ();
void process_term (int linger_);
// A place to hook in when phyicallal destruction of the object
// is to be delayed.
virtual void process_destroy ();
// Socket options associated with this object.
options_t options;
private:
// Set owner of the object
......
......@@ -86,7 +86,7 @@ void zmq::pair_t::delimited (reader_t *pipe_)
{
}
void zmq::pair_t::process_term ()
void zmq::pair_t::process_term (int linger_)
{
terminating = true;
......@@ -100,7 +100,7 @@ void zmq::pair_t::process_term ()
outpipe->terminate ();
}
socket_base_t::process_term ();
socket_base_t::process_term (linger_);
}
void zmq::pair_t::activated (class reader_t *pipe_)
......
......@@ -56,7 +56,7 @@ namespace zmq
private:
// Hook into termination process.
void process_term ();
void process_term (int linger_);
class reader_t *inpipe;
class writer_t *outpipe;
......
......@@ -56,7 +56,7 @@ void zmq::pub_t::xattach_pipes (class reader_t *inpipe_,
}
}
void zmq::pub_t::process_term ()
void zmq::pub_t::process_term (int linger_)
{
terminating = true;
......@@ -68,7 +68,7 @@ void zmq::pub_t::process_term ()
register_term_acks (pipes.size ());
// Continue with the termination immediately.
socket_base_t::process_term ();
socket_base_t::process_term (linger_);
}
void zmq::pub_t::activated (writer_t *pipe_)
......
......@@ -47,7 +47,7 @@ namespace zmq
private:
// Hook into the termination process.
void process_term ();
void process_term (int linger_);
// Write the message to the pipe. Make the pipe inactive if writing
// fails. In such a case false is returned.
......
......@@ -42,10 +42,10 @@ void zmq::pull_t::xattach_pipes (class reader_t *inpipe_,
fq.attach (inpipe_);
}
void zmq::pull_t::process_term ()
void zmq::pull_t::process_term (int linger_)
{
fq.terminate ();
socket_base_t::process_term ();
socket_base_t::process_term (linger_);
}
int zmq::pull_t::xrecv (zmq_msg_t *msg_, int flags_)
......
......@@ -44,7 +44,7 @@ namespace zmq
private:
// Hook into the termination process.
void process_term ();
void process_term (int linger_);
// Fair queueing object for inbound pipes.
fq_t fq;
......
......@@ -43,10 +43,10 @@ void zmq::push_t::xattach_pipes (class reader_t *inpipe_,
lb.attach (outpipe_);
}
void zmq::push_t::process_term ()
void zmq::push_t::process_term (int linger_)
{
lb.terminate ();
socket_base_t::process_term ();
socket_base_t::process_term (linger_);
}
int zmq::push_t::xsend (zmq_msg_t *msg_, int flags_)
......
......@@ -44,7 +44,7 @@ namespace zmq
private:
// Hook into the termination process.
void process_term ();
void process_term (int linger_);
// Load balancer managing the outbound pipes.
lb_t lb;
......
......@@ -28,8 +28,8 @@
zmq::session_t::session_t (class io_thread_t *io_thread_,
class socket_base_t *socket_, const options_t &options_) :
own_t (io_thread_),
options (options_),
own_t (io_thread_, options_),
io_object_t (io_thread_),
in_pipe (NULL),
incomplete_in (false),
out_pipe (NULL),
......@@ -39,6 +39,7 @@ zmq::session_t::session_t (class io_thread_t *io_thread_,
pipes_attached (false),
delimiter_processed (false),
force_terminate (false),
has_linger_timer (false),
state (active)
{
}
......@@ -60,6 +61,12 @@ void zmq::session_t::proceed_with_term ()
zmq_assert (state == pending);
state = terminating;
// If there's still a pending linger timer, remove it.
if (has_linger_timer) {
cancel_timer (linger_timer_id);
has_linger_timer = false;
}
if (in_pipe) {
register_term_acks (1);
in_pipe->terminate ();
......@@ -69,7 +76,9 @@ void zmq::session_t::proceed_with_term ()
out_pipe->terminate ();
}
own_t::process_term ();
// The session has already waited for the linger period. We don't want
// the child objects to linger any more thus linger is set to zero.
own_t::process_term (0);
}
bool zmq::session_t::read (::zmq_msg_t *msg_)
......@@ -271,11 +280,25 @@ void zmq::session_t::detach ()
in_pipe->check_read ();
}
void zmq::session_t::process_term ()
void zmq::session_t::process_term (int linger_)
{
zmq_assert (state == active);
state = pending;
// If linger is set to zero, we can terminate the session straight away
// not waiting for the pending messages to be sent.
if (linger_ == 0) {
proceed_with_term ();
return;
}
// If there's finite linger value, set up a timer.
if (linger_ > 0) {
zmq_assert (!has_linger_timer);
add_timer (linger_, linger_timer_id);
has_linger_timer = true;
}
// If there's no engine and there's only delimiter in the pipe it wouldn't
// be ever read. Thus we check for it explicitly.
if (in_pipe)
......@@ -291,6 +314,15 @@ void zmq::session_t::process_term ()
proceed_with_term ();
}
void zmq::session_t::timer_event (int id_)
{
// Linger period expired. We can proceed with termination even though
// there are still pending messages to be sent.
zmq_assert (id_ == linger_timer_id);
has_linger_timer = false;
proceed_with_term ();
}
bool zmq::session_t::register_session (const blob_t &name_, session_t *session_)
{
return socket->register_session (name_, session_);
......
......@@ -22,7 +22,7 @@
#include "own.hpp"
#include "i_inout.hpp"
#include "options.hpp"
#include "io_object.hpp"
#include "blob.hpp"
#include "pipe.hpp"
......@@ -31,6 +31,7 @@ namespace zmq
class session_t :
public own_t,
public io_object_t,
public i_inout,
public i_reader_events,
public i_writer_events
......@@ -79,16 +80,16 @@ namespace zmq
~session_t ();
// Inherited socket options. These are visible to all session classes.
options_t options;
private:
// Handlers for incoming commands.
void process_plug ();
void process_attach (struct i_engine *engine_,
const blob_t &peer_identity_);
void process_term ();
void process_term (int linger_);
// i_poll_events handlers.
void timer_event (int id_);
// Remove any half processed messages. Flush unflushed messages.
// Call this function when engine disconnect to get rid of leftovers.
......@@ -127,6 +128,12 @@ namespace zmq
// pending messages in the inbound pipe.
bool force_terminate;
// ID of the linger timer
enum {linger_timer_id = 0x20};
// True is linger timer is running.
bool has_linger_timer;
enum {
active,
pending,
......
......@@ -681,7 +681,7 @@ void zmq::socket_base_t::process_unplug ()
{
}
void zmq::socket_base_t::process_term ()
void zmq::socket_base_t::process_term (int linger_)
{
// Unregister all inproc endpoints associated with this socket.
// Doing this we make sure that no new pipes from other sockets (inproc)
......@@ -689,7 +689,7 @@ void zmq::socket_base_t::process_term ()
unregister_endpoints (this);
// Continue the termination process immediately.
own_t::process_term ();
own_t::process_term (linger_);
}
void zmq::socket_base_t::process_destroy ()
......
......@@ -28,7 +28,6 @@
#include "own.hpp"
#include "array.hpp"
#include "mutex.hpp"
#include "options.hpp"
#include "stdint.hpp"
#include "atomic_counter.hpp"
#include "signaler.hpp"
......@@ -111,13 +110,10 @@ namespace zmq
virtual bool xhas_in ();
virtual int xrecv (zmq_msg_t *msg_, int options_);
// Socket options.
options_t options;
// We are declaring termination handler as protected so that
// individual socket types can hook into the termination process
// by overloading it.
void process_term ();
void process_term (int linger_);
// Delay actual destruction of the socket.
void process_destroy ();
......
......@@ -48,10 +48,10 @@ void zmq::sub_t::xattach_pipes (class reader_t *inpipe_,
fq.attach (inpipe_);
}
void zmq::sub_t::process_term ()
void zmq::sub_t::process_term (int linger_)
{
fq.terminate ();
socket_base_t::process_term ();
socket_base_t::process_term (linger_);
}
int zmq::sub_t::xsetsockopt (int option_, const void *optval_,
......
......@@ -48,7 +48,7 @@ namespace zmq
private:
// Hook into the termination process.
void process_term ();
void process_term (int linger_);
// Check whether the message matches at least one subscription.
bool match (zmq_msg_t *msg_);
......
......@@ -80,7 +80,7 @@ void zmq::xrep_t::xattach_pipes (reader_t *inpipe_, writer_t *outpipe_,
}
}
void zmq::xrep_t::process_term ()
void zmq::xrep_t::process_term (int linger_)
{
terminating = true;
......@@ -93,7 +93,7 @@ void zmq::xrep_t::process_term ()
it++)
it->second.writer->terminate ();
socket_base_t::process_term ();
socket_base_t::process_term (linger_);
}
void zmq::xrep_t::terminated (reader_t *pipe_)
......
......@@ -52,7 +52,7 @@ namespace zmq
private:
// Hook into the termination process.
void process_term ();
void process_term (int linger_);
// i_reader_events interface implementation.
void activated (reader_t *pipe_);
......
......@@ -44,11 +44,11 @@ void zmq::xreq_t::xattach_pipes (class reader_t *inpipe_,
lb.attach (outpipe_);
}
void zmq::xreq_t::process_term ()
void zmq::xreq_t::process_term (int linger_)
{
fq.terminate ();
lb.terminate ();
socket_base_t::process_term ();
socket_base_t::process_term (linger_);
}
int zmq::xreq_t::xsend (zmq_msg_t *msg_, int flags_)
......
......@@ -47,7 +47,7 @@ namespace zmq
private:
// Hook into the termination process.
void process_term ();
void process_term (int linger_);
// Messages are fair-queued from inbound pipes. And load-balanced to
// the outbound pipes.
......
......@@ -36,12 +36,11 @@
zmq::zmq_connecter_t::zmq_connecter_t (class io_thread_t *io_thread_,
class session_t *session_, const options_t &options_,
const char *protocol_, const char *address_) :
own_t (io_thread_),
own_t (io_thread_, options_),
io_object_t (io_thread_),
handle_valid (false),
wait (wait_before_connect),
session (session_),
options (options_)
session (session_)
{
int rc = tcp_connecter.set_address (protocol_, address_);
zmq_assert (rc == 0);
......
......@@ -23,7 +23,6 @@
#include "own.hpp"
#include "io_object.hpp"
#include "tcp_connecter.hpp"
#include "options.hpp"
#include "stdint.hpp"
namespace zmq
......@@ -75,9 +74,6 @@ namespace zmq
// Reference to the session we belong to.
class session_t *session;
// Associated socket options.
options_t options;
zmq_connecter_t (const zmq_connecter_t&);
void operator = (const zmq_connecter_t&);
};
......
......@@ -33,12 +33,11 @@
zmq::zmq_init_t::zmq_init_t (io_thread_t *io_thread_,
socket_base_t *socket_, session_t *session_, fd_t fd_,
const options_t &options_) :
own_t (io_thread_),
own_t (io_thread_, options_),
sent (false),
received (false),
socket (socket_),
session (session_),
options (options_),
io_thread (io_thread_)
{
// Create the engine object for this connection.
......
......@@ -25,7 +25,6 @@
#include "own.hpp"
#include "fd.hpp"
#include "stdint.hpp"
#include "options.hpp"
#include "stdint.hpp"
#include "blob.hpp"
......@@ -76,9 +75,6 @@ namespace zmq
// Identity of the peer socket.
blob_t peer_identity;
// Associated socket options.
options_t options;
// I/O thread the object is living in. It will be used to plug
// the engine into the same I/O thread.
class io_thread_t *io_thread;
......
......@@ -26,9 +26,8 @@
zmq::zmq_listener_t::zmq_listener_t (io_thread_t *io_thread_,
socket_base_t *socket_, const options_t &options_) :
own_t (io_thread_),
own_t (io_thread_, options_),
io_object_t (io_thread_),
options (options_),
socket (socket_)
{
}
......@@ -49,10 +48,10 @@ void zmq::zmq_listener_t::process_plug ()
set_pollin (handle);
}
void zmq::zmq_listener_t::process_term ()
void zmq::zmq_listener_t::process_term (int linger_)
{
rm_fd (handle);
own_t::process_term ();
own_t::process_term (linger_);
}
void zmq::zmq_listener_t::in_event ()
......
......@@ -23,7 +23,6 @@
#include "own.hpp"
#include "io_object.hpp"
#include "tcp_listener.hpp"
#include "options.hpp"
#include "stdint.hpp"
namespace zmq
......@@ -44,7 +43,7 @@ namespace zmq
// Handlers for incoming commands.
void process_plug ();
void process_term ();
void process_term (int linger_);
// Handlers for I/O events.
void in_event ();
......@@ -55,9 +54,6 @@ namespace zmq
// Handle corresponding to the listening socket.
handle_t handle;
// Associated socket options.
options_t options;
// Socket the listerner belongs to.
class socket_base_t *socket;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment