Commit bc4a1ce3 authored by Martin Sustrik's avatar Martin Sustrik

ZMQ_HWM split into ZMQ_SNDHWM and ZMQ_RCVHWM

These new options allow to control the maximum size of the
inbound and outbound message pipe separately.
Signed-off-by: 's avatarMartin Sustrik <sustrik@250bpm.com>
parent 507718ee
......@@ -57,12 +57,12 @@ Default value:: N/A
Applicable socket types:: all
ZMQ_HWM: Retrieve high water mark
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_HWM' option shall retrieve the high water mark for the specified
'socket'. The high water mark is a hard limit on the maximum number of
outstanding messages 0MQ shall queue in memory for any single peer that the
specified 'socket' is communicating with.
ZMQ_SNDHWM: Set high water mark for outbound messages
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_SNDHWM' option shall set the high water mark for outbound messages on
the specified 'socket'. The high water mark is a hard limit on the maximum
number of outstanding messages 0MQ shall queue in memory for any single peer
that the specified 'socket' is communicating with.
If this limit has been reached the socket shall enter an exceptional state and
depending on the socket type, 0MQ shall take appropriate action such as
......@@ -70,7 +70,29 @@ blocking or dropping sent messages. Refer to the individual socket descriptions
in linkzmq:zmq_socket[3] for details on the exact action taken for each socket
type.
The default 'ZMQ_HWM' value of zero means "no limit".
The default 'ZMQ_SNDHWM' value of zero means "no limit".
[horizontal]
Option value type:: int
Option value unit:: messages
Default value:: 0
Applicable socket types:: all
ZMQ_RCVHWM: Set high water mark for inbound messages
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_RCVHWM' option shall set the high water mark for inbound messages on
the specified 'socket'. The high water mark is a hard limit on the maximum
number of outstanding messages 0MQ shall queue in memory for any single peer
that the specified 'socket' is communicating with.
If this limit has been reached the socket shall enter an exceptional state and
depending on the socket type, 0MQ shall take appropriate action such as
blocking or dropping sent messages. Refer to the individual socket descriptions
in linkzmq:zmq_socket[3] for details on the exact action taken for each socket
type.
The default 'ZMQ_RCVHWM' value of zero means "no limit".
[horizontal]
Option value type:: int
......@@ -348,9 +370,9 @@ EXAMPLE
.Retrieving the high water mark
----
/* Retrieve high water mark into hwm */
int hwm;
size_t hwm_size = sizeof (hwm);
rc = zmq_getsockopt (socket, ZMQ_HWM, &hwm, &hwm_size);
int sndhwm;
size_t sndhwm_size = sizeof (sndhwm);
rc = zmq_getsockopt (socket, ZMQ_SNDHWM, &sndhwm, &sndhwm_size);
assert (rc == 0);
----
......
......@@ -25,12 +25,12 @@ argument is the size of the option value in bytes.
The following socket options can be set with the _zmq_setsockopt()_ function:
ZMQ_HWM: Set high water mark
~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_HWM' option shall set the high water mark for the specified 'socket'.
The high water mark is a hard limit on the maximum number of outstanding
messages 0MQ shall queue in memory for any single peer that the specified
'socket' is communicating with.
ZMQ_SNDHWM: Set high water mark for outbound messages
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_SNDHWM' option shall set the high water mark for outbound messages on
the specified 'socket'. The high water mark is a hard limit on the maximum
number of outstanding messages 0MQ shall queue in memory for any single peer
that the specified 'socket' is communicating with.
If this limit has been reached the socket shall enter an exceptional state and
depending on the socket type, 0MQ shall take appropriate action such as
......@@ -38,7 +38,29 @@ blocking or dropping sent messages. Refer to the individual socket descriptions
in linkzmq:zmq_socket[3] for details on the exact action taken for each socket
type.
The default 'ZMQ_HWM' value of zero means "no limit".
The default 'ZMQ_SNDHWM' value of zero means "no limit".
[horizontal]
Option value type:: int
Option value unit:: messages
Default value:: 0
Applicable socket types:: all
ZMQ_RCVHWM: Set high water mark for inbound messages
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_RCVHWM' option shall set the high water mark for inbound messages on
the specified 'socket'. The high water mark is a hard limit on the maximum
number of outstanding messages 0MQ shall queue in memory for any single peer
that the specified 'socket' is communicating with.
If this limit has been reached the socket shall enter an exceptional state and
depending on the socket type, 0MQ shall take appropriate action such as
blocking or dropping sent messages. Refer to the individual socket descriptions
in linkzmq:zmq_socket[3] for details on the exact action taken for each socket
type.
The default 'ZMQ_RCVHWM' value of zero means "no limit".
[horizontal]
Option value type:: int
......
......@@ -183,7 +183,6 @@ ZMQ_EXPORT int zmq_term (void *context);
#define ZMQ_XSUB 10
/* Socket options. */
#define ZMQ_HWM 1
#define ZMQ_AFFINITY 4
#define ZMQ_IDENTITY 5
#define ZMQ_SUBSCRIBE 6
......@@ -201,6 +200,8 @@ ZMQ_EXPORT int zmq_term (void *context);
#define ZMQ_BACKLOG 19
#define ZMQ_RECONNECT_IVL_MAX 21
#define ZMQ_MAXMSGSIZE 22
#define ZMQ_SNDHWM 23
#define ZMQ_RCVHWM 24
/* Send/recv options. */
#define ZMQ_NOBLOCK 1
......
......@@ -26,7 +26,8 @@
#include "err.hpp"
zmq::options_t::options_t () :
hwm (0),
sndhwm (0),
rcvhwm (0),
affinity (0),
rate (100),
recovery_ivl (10000),
......@@ -49,12 +50,20 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
{
switch (option_) {
case ZMQ_HWM:
case ZMQ_SNDHWM:
if (optvallen_ != sizeof (int) || *((int*) optval_) < 0) {
errno = EINVAL;
return -1;
}
hwm = *((int*) optval_);
sndhwm = *((int*) optval_);
return 0;
case ZMQ_RCVHWM:
if (optvallen_ != sizeof (int) || *((int*) optval_) < 0) {
errno = EINVAL;
return -1;
}
rcvhwm = *((int*) optval_);
return 0;
case ZMQ_AFFINITY:
......@@ -168,13 +177,22 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
{
switch (option_) {
case ZMQ_HWM:
case ZMQ_SNDHWM:
if (*optvallen_ < sizeof (int)) {
errno = EINVAL;
return -1;
}
*((int*) optval_) = hwm;
*optvallen_ = sizeof (uint64_t);
*((int*) optval_) = sndhwm;
*optvallen_ = sizeof (int);
return 0;
case ZMQ_RCVHWM:
if (*optvallen_ < sizeof (int)) {
errno = EINVAL;
return -1;
}
*((int*) optval_) = rcvhwm;
*optvallen_ = sizeof (int);
return 0;
case ZMQ_AFFINITY:
......
......@@ -35,8 +35,9 @@ namespace zmq
int setsockopt (int option_, const void *optval_, size_t optvallen_);
int getsockopt (int option_, void *optval_, size_t *optvallen_);
// High-water mark for messages in pipe.
int hwm;
// High-water marks for message pipes.
int sndhwm;
int rcvhwm;
uint64_t affinity;
blob_t identity;
......
......@@ -247,11 +247,13 @@ void zmq::session_t::process_attach (i_engine *engine_,
// Create the pipes, as required.
if (options.requires_in) {
create_pipe (socket, this, options.hwm, &socket_reader, &out_pipe);
create_pipe (socket, this, options.rcvhwm, &socket_reader,
&out_pipe);
out_pipe->set_event_sink (this);
}
if (options.requires_out) {
create_pipe (this, socket, options.hwm, &in_pipe, &socket_writer);
create_pipe (this, socket, options.sndhwm, &in_pipe,
&socket_writer);
in_pipe->set_event_sink (this);
}
......
......@@ -377,20 +377,25 @@ int zmq::socket_base_t::connect (const char *addr_)
// The total HWM for an inproc connection should be the sum of
// the binder's HWM and the connector's HWM.
int hwm;
if (options.hwm == 0 || peer.options.hwm == 0)
hwm = 0;
int sndhwm;
int rcvhwm;
if (options.sndhwm == 0 || peer.options.rcvhwm == 0)
sndhwm = 0;
else
hwm = options.hwm + peer.options.hwm;
sndhwm = options.sndhwm + peer.options.rcvhwm;
if (options.rcvhwm == 0 || peer.options.sndhwm == 0)
rcvhwm = 0;
else
rcvhwm = options.rcvhwm + peer.options.sndhwm;
// Create inbound pipe, if required.
if (options.requires_in)
create_pipe (this, peer.socket, hwm, &inpipe_reader,
create_pipe (this, peer.socket, rcvhwm, &inpipe_reader,
&inpipe_writer);
// Create outbound pipe, if required.
if (options.requires_out)
create_pipe (peer.socket, this, hwm, &outpipe_reader,
create_pipe (peer.socket, this, sndhwm, &outpipe_reader,
&outpipe_writer);
// Attach the pipes to this socket object.
......@@ -429,12 +434,12 @@ int zmq::socket_base_t::connect (const char *addr_)
// Create inbound pipe, if required.
if (options.requires_in)
create_pipe (this, session, options.hwm,
create_pipe (this, session, options.rcvhwm,
&inpipe_reader, &inpipe_writer);
// Create outbound pipe, if required.
if (options.requires_out)
create_pipe (session, this, options.hwm,
create_pipe (session, this, options.sndhwm,
&outpipe_reader, &outpipe_writer);
// Attach the pipes to the socket object.
......
......@@ -33,14 +33,14 @@ int main (int argc, char *argv [])
void *sb = zmq_socket (ctx, ZMQ_PULL);
assert (sb);
int hwm = 2;
int rc = zmq_setsockopt (sb, ZMQ_HWM, &hwm, sizeof (hwm));
int rc = zmq_setsockopt (sb, ZMQ_RCVHWM, &hwm, sizeof (hwm));
assert (rc == 0);
rc = zmq_bind (sb, "inproc://a");
assert (rc == 0);
void *sc = zmq_socket (ctx, ZMQ_PUSH);
assert (sc);
rc = zmq_setsockopt (sc, ZMQ_HWM, &hwm, sizeof (hwm));
rc = zmq_setsockopt (sc, ZMQ_SNDHWM, &hwm, sizeof (hwm));
assert (rc == 0);
rc = zmq_connect (sc, "inproc://a");
assert (rc == 0);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment