Commit fcfad568 authored by Bob Beaty's avatar Bob Beaty Committed by Martin Sustrik

Added Recovery Interval in Milliseconds

For very high-speed message systems, the memory used for recovery can get to
be very large. The corrent limitation on that reduction is the ZMQ_RECOVERY_IVL
of 1 sec. I added in an additional option ZMQ_RECOVERY_IVL_MSEC, which is the
Recovery Interval in milliseconds. If used, this will override the previous
one, and allow you to set a sub-second recovery interval. If not set, the
default behavior is to use ZMQ_RECOVERY_IVL.
Signed-off-by: 's avatarBob Beaty <rbeaty@peak6.com>
parent 1d81d2f1
...@@ -167,6 +167,26 @@ Default value:: 10 ...@@ -167,6 +167,26 @@ Default value:: 10
Applicable socket types:: all, when using multicast transports Applicable socket types:: all, when using multicast transports
ZMQ_RECOVERY_IVL_MSEC: Get multicast recovery interval in milliseconds
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_RECOVERY_IVL'_MSEC option shall retrieve the recovery interval, in
milliseconds, for multicast transports using the specified 'socket'. The
recovery interval determines the maximum time in seconds that a receiver
can be absent from a multicast group before unrecoverable data loss will
occur.
For backward compatibility, the default value of 'ZMQ_RECOVERY_IVL_MSEC' is
-1 indicating that the recovery interval should be obtained from the
'ZMQ_RECOVERY_IVL' option. However, if the 'ZMQ_RECOVERY_IVL_MSEC' value is
not zero, then it will take precedence, and be used.
[horizontal]
Option value type:: int64_t
Option value unit:: milliseconds
Default value:: -1
Applicable socket types:: all, when using multicast transports
ZMQ_MCAST_LOOP: Control multicast loop-back ZMQ_MCAST_LOOP: Control multicast loop-back
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_MCAST_LOOP' option controls whether data sent via multicast The 'ZMQ_MCAST_LOOP' option controls whether data sent via multicast
......
...@@ -171,6 +171,30 @@ Default value:: 10 ...@@ -171,6 +171,30 @@ Default value:: 10
Applicable socket types:: all, when using multicast transports Applicable socket types:: all, when using multicast transports
ZMQ_RECOVERY_IVL_MSEC: Set multicast recovery interval in milliseconds
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_RECOVERY_IVL_MSEC' option shall set the recovery interval, specified
in milliseconds (ms) for multicast transports using the specified 'socket'.
The recovery interval determines the maximum time in milliseconds that a
receiver can be absent from a multicast group before unrecoverable data loss
will occur.
A non-zero value of the 'ZMQ_RECOVERY_IVL_MSEC' option will take precedence
over the 'ZMQ_RECOVERY_IVL' option, but since the default for the
'ZMQ_RECOVERY_IVL_MSEC' is -1, the default is to use the 'ZMQ_RECOVERY_IVL'
option value.
CAUTION: Exercise care when setting large recovery intervals as the data
needed for recovery will be held in memory. For example, a 1 minute recovery
interval at a data rate of 1Gbps requires a 7GB in-memory buffer.
[horizontal]
Option value type:: int64_t
Option value unit:: milliseconds
Default value:: -1
Applicable socket types:: all, when using multicast transports
ZMQ_MCAST_LOOP: Control multicast loop-back ZMQ_MCAST_LOOP: Control multicast loop-back
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_MCAST_LOOP' option shall control whether data sent via multicast The 'ZMQ_MCAST_LOOP' option shall control whether data sent via multicast
......
...@@ -202,6 +202,7 @@ ZMQ_EXPORT int zmq_term (void *context); ...@@ -202,6 +202,7 @@ ZMQ_EXPORT int zmq_term (void *context);
#define ZMQ_LINGER 17 #define ZMQ_LINGER 17
#define ZMQ_RECONNECT_IVL 18 #define ZMQ_RECONNECT_IVL 18
#define ZMQ_BACKLOG 19 #define ZMQ_BACKLOG 19
#define ZMQ_RECOVERY_IVL_MSEC 20 /* opt. recovery time, reconcile in 3.x */
/* Send/recv options. */ /* Send/recv options. */
#define ZMQ_NOBLOCK 1 #define ZMQ_NOBLOCK 1
......
...@@ -30,6 +30,7 @@ zmq::options_t::options_t () : ...@@ -30,6 +30,7 @@ zmq::options_t::options_t () :
affinity (0), affinity (0),
rate (100), rate (100),
recovery_ivl (10), recovery_ivl (10),
recovery_ivl_msec (-1),
use_multicast_loop (true), use_multicast_loop (true),
sndbuf (0), sndbuf (0),
rcvbuf (0), rcvbuf (0),
...@@ -101,6 +102,14 @@ int zmq::options_t::setsockopt (int option_, const void *optval_, ...@@ -101,6 +102,14 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
recovery_ivl = (uint32_t) *((int64_t*) optval_); recovery_ivl = (uint32_t) *((int64_t*) optval_);
return 0; return 0;
case ZMQ_RECOVERY_IVL_MSEC:
if (optvallen_ != sizeof (int64_t) || *((int64_t*) optval_) < 0) {
errno = EINVAL;
return -1;
}
recovery_ivl_msec = (int32_t) *((int64_t*) optval_);
return 0;
case ZMQ_MCAST_LOOP: case ZMQ_MCAST_LOOP:
if (optvallen_ != sizeof (int64_t)) { if (optvallen_ != sizeof (int64_t)) {
errno = EINVAL; errno = EINVAL;
...@@ -225,6 +234,15 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_) ...@@ -225,6 +234,15 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
*optvallen_ = sizeof (int64_t); *optvallen_ = sizeof (int64_t);
return 0; return 0;
case ZMQ_RECOVERY_IVL_MSEC:
if (*optvallen_ < sizeof (int64_t)) {
errno = EINVAL;
return -1;
}
*((int64_t*) optval_) = recovery_ivl_msec;
*optvallen_ = sizeof (int64_t);
return 0;
case ZMQ_MCAST_LOOP: case ZMQ_MCAST_LOOP:
if (*optvallen_ < sizeof (int64_t)) { if (*optvallen_ < sizeof (int64_t)) {
errno = EINVAL; errno = EINVAL;
......
...@@ -44,6 +44,8 @@ namespace zmq ...@@ -44,6 +44,8 @@ namespace zmq
// Reliability time interval [s]. Default 10s. // Reliability time interval [s]. Default 10s.
uint32_t recovery_ivl; uint32_t recovery_ivl;
// Reliability time interval [ms]. Default -1 = not used.
int32_t recovery_ivl_msec;
// Enable multicast loopback. Default disabled (false). // Enable multicast loopback. Default disabled (false).
bool use_multicast_loop; bool use_multicast_loop;
......
...@@ -89,8 +89,8 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_) ...@@ -89,8 +89,8 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
errno = EINVAL; errno = EINVAL;
return -1; return -1;
} }
// Recovery interval [s]. // Recovery interval [s] or [ms] - based on the user's call
if (options.recovery_ivl <= 0) { if ((options.recovery_ivl <= 0) && (options.recovery_ivl_msec <= 0)) {
errno = EINVAL; errno = EINVAL;
return -1; return -1;
} }
...@@ -199,8 +199,12 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_) ...@@ -199,8 +199,12 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
if (receiver) { if (receiver) {
const int recv_only = 1, const int recv_only = 1,
rxw_max_rte = options.rate * 1000 / 8, rxw_max_tpdu = (int) pgm_max_tpdu,
rxw_secs = options.recovery_ivl, rxw_sqns = (options.recovery_ivl_msec >= 0 ?
options.recovery_ivl_msec * options.rate /
(1000 * rxw_max_tpdu) :
options.recovery_ivl * options.rate /
rxw_max_tpdu),
peer_expiry = pgm_secs (300), peer_expiry = pgm_secs (300),
spmr_expiry = pgm_msecs (25), spmr_expiry = pgm_msecs (25),
nak_bo_ivl = pgm_msecs (50), nak_bo_ivl = pgm_msecs (50),
...@@ -211,10 +215,8 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_) ...@@ -211,10 +215,8 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_RECV_ONLY, &recv_only, if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_RECV_ONLY, &recv_only,
sizeof (recv_only)) || sizeof (recv_only)) ||
!pgm_setsockopt (sock, IPPROTO_PGM, PGM_RXW_MAX_RTE, &rxw_max_rte, !pgm_setsockopt (sock, IPPROTO_PGM, PGM_RXW_SQNS, &rxw_sqns,
sizeof (rxw_max_rte)) || sizeof (rxw_sqns)) ||
!pgm_setsockopt (sock, IPPROTO_PGM, PGM_RXW_SECS, &rxw_secs,
sizeof (rxw_secs)) ||
!pgm_setsockopt (sock, IPPROTO_PGM, PGM_PEER_EXPIRY, &peer_expiry, !pgm_setsockopt (sock, IPPROTO_PGM, PGM_PEER_EXPIRY, &peer_expiry,
sizeof (peer_expiry)) || sizeof (peer_expiry)) ||
!pgm_setsockopt (sock, IPPROTO_PGM, PGM_SPMR_EXPIRY, &spmr_expiry, !pgm_setsockopt (sock, IPPROTO_PGM, PGM_SPMR_EXPIRY, &spmr_expiry,
...@@ -232,8 +234,12 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_) ...@@ -232,8 +234,12 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
goto err_abort; goto err_abort;
} else { } else {
const int send_only = 1, const int send_only = 1,
txw_max_rte = options.rate * 1000 / 8, txw_max_tpdu = (int) pgm_max_tpdu,
txw_secs = options.recovery_ivl, txw_sqns = (options.recovery_ivl_msec >= 0 ?
options.recovery_ivl_msec * options.rate /
(1000 * txw_max_tpdu) :
options.recovery_ivl * options.rate /
txw_max_tpdu),
ambient_spm = pgm_secs (30), ambient_spm = pgm_secs (30),
heartbeat_spm[] = { pgm_msecs (100), heartbeat_spm[] = { pgm_msecs (100),
pgm_msecs (100), pgm_msecs (100),
...@@ -247,10 +253,8 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_) ...@@ -247,10 +253,8 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_SEND_ONLY, if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_SEND_ONLY,
&send_only, sizeof (send_only)) || &send_only, sizeof (send_only)) ||
!pgm_setsockopt (sock, IPPROTO_PGM, PGM_TXW_MAX_RTE, !pgm_setsockopt (sock, IPPROTO_PGM, PGM_TXW_SQNS,
&txw_max_rte, sizeof (txw_max_rte)) || &txw_sqns, sizeof (txw_sqns)) ||
!pgm_setsockopt (sock, IPPROTO_PGM, PGM_TXW_SECS,
&txw_secs, sizeof (txw_secs)) ||
!pgm_setsockopt (sock, IPPROTO_PGM, PGM_AMBIENT_SPM, !pgm_setsockopt (sock, IPPROTO_PGM, PGM_AMBIENT_SPM,
&ambient_spm, sizeof (ambient_spm)) || &ambient_spm, sizeof (ambient_spm)) ||
!pgm_setsockopt (sock, IPPROTO_PGM, PGM_HEARTBEAT_SPM, !pgm_setsockopt (sock, IPPROTO_PGM, PGM_HEARTBEAT_SPM,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment