Commit cfcab66c authored by jean-airoldie's avatar jean-airoldie

Problem: {in,out}_batch_size must be configured at compiled time

Solution: Added a socket option to configure them at runtime.
parent 4904bf71
...@@ -922,6 +922,41 @@ Default value:: 0 ...@@ -922,6 +922,41 @@ Default value:: 0
Applicable socket types:: ZMQ_ROUTER Applicable socket types:: ZMQ_ROUTER
ZMQ_IN_BATCH_SIZE: Maximal receive batch size
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Sets the maximal amount of messages that can be received in a single
'recv' system call. This can be used to improved throughtput at the expense of
latency and vice-versa.
Cannot be zero.
NOTE: in DRAFT state, not yet available in stable releases.
[horizontal]
Option value type:: int
Option value unit:: messages
Default value:: 8192
Applicable socket types:: All
ZMQ_OUT_BATCH_SIZE: Maximal send batch size
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Sets the maximal amount of messages that can be sent in a single
'send' system call. This can be used to improved throughtput at the expense of
latency and vice-versa.
Cannot be zero.
NOTE: in DRAFT state, not yet available in stable releases.
[horizontal]
Option value type:: int
Option value unit:: messages
Default value:: 8192
Applicable socket types:: All
RETURN VALUE RETURN VALUE
------------ ------------
The _zmq_getsockopt()_ function shall return zero if successful. Otherwise it The _zmq_getsockopt()_ function shall return zero if successful. Otherwise it
......
...@@ -1362,6 +1362,40 @@ Default value:: 0 ...@@ -1362,6 +1362,40 @@ Default value:: 0
Applicable socket types:: ZMQ_ROUTER Applicable socket types:: ZMQ_ROUTER
ZMQ_IN_BATCH_SIZE: Maximal receive batch size
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Sets the maximal amount of messages that can be received in a single
'recv' system call. This can be used to improved throughtput at the expense of
latency and vice-versa.
Cannot be zero.
NOTE: in DRAFT state, not yet available in stable releases.
[horizontal]
Option value type:: int
Option value unit:: messages
Default value:: 8192
Applicable socket types:: All
ZMQ_OUT_BATCH_SIZE: Maximal send batch size
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Sets the maximal amount of messages that can be sent in a single
'send' system call. This can be used to improved throughtput at the expense of
latency and vice-versa.
Cannot be zero.
NOTE: in DRAFT state, not yet available in stable releases.
[horizontal]
Option value type:: int
Option value unit:: messages
Default value:: 8192
Applicable socket types:: All
RETURN VALUE RETURN VALUE
------------ ------------
The _zmq_setsockopt()_ function shall return zero if successful. Otherwise it The _zmq_setsockopt()_ function shall return zero if successful. Otherwise it
......
...@@ -658,6 +658,8 @@ ZMQ_EXPORT void zmq_threadclose (void *thread_); ...@@ -658,6 +658,8 @@ ZMQ_EXPORT void zmq_threadclose (void *thread_);
#define ZMQ_XPUB_MANUAL_LAST_VALUE 98 #define ZMQ_XPUB_MANUAL_LAST_VALUE 98
#define ZMQ_SOCKS_USERNAME 99 #define ZMQ_SOCKS_USERNAME 99
#define ZMQ_SOCKS_PASSWORD 100 #define ZMQ_SOCKS_PASSWORD 100
#define ZMQ_IN_BATCH_SIZE 101
#define ZMQ_OUT_BATCH_SIZE 102
/* DRAFT Context options */ /* DRAFT Context options */
#define ZMQ_ZERO_COPY_RECV 10 #define ZMQ_ZERO_COPY_RECV 10
......
...@@ -52,18 +52,6 @@ enum ...@@ -52,18 +52,6 @@ enum
// real-time behaviour (less latency peaks). // real-time behaviour (less latency peaks).
inbound_poll_rate = 100, inbound_poll_rate = 100,
// Maximal batching size for engines with receiving functionality.
// So, if there are 10 messages that fit into the batch size, all of
// them may be read by a single 'recv' system call, thus avoiding
// unnecessary network stack traversals.
in_batch_size = 8192,
// Maximal batching size for engines with sending functionality.
// So, if there are 10 messages that fit into the batch size, all of
// them may be written by a single 'send' system call, thus avoiding
// unnecessary network stack traversals.
out_batch_size = 8192,
// Maximal delta between high and low watermark. // Maximal delta between high and low watermark.
max_wm_delta = 1024, max_wm_delta = 1024,
......
...@@ -407,7 +407,8 @@ void zmq::norm_engine_t::recv_data (NormObjectHandle object) ...@@ -407,7 +407,8 @@ void zmq::norm_engine_t::recv_data (NormObjectHandle object)
if (NULL == rxState) { if (NULL == rxState) {
// This is a new stream, so create rxState with zmq decoder, etc // This is a new stream, so create rxState with zmq decoder, etc
rxState = new (std::nothrow) rxState = new (std::nothrow)
NormRxStreamState (object, options.maxmsgsize, options.zero_copy); NormRxStreamState (object, options.maxmsgsize, options.zero_copy,
options.in_batch_size);
errno_assert (rxState); errno_assert (rxState);
if (!rxState->Init ()) { if (!rxState->Init ()) {
...@@ -548,10 +549,14 @@ void zmq::norm_engine_t::recv_data (NormObjectHandle object) ...@@ -548,10 +549,14 @@ void zmq::norm_engine_t::recv_data (NormObjectHandle object)
} // end zmq::norm_engine_t::recv_data() } // end zmq::norm_engine_t::recv_data()
zmq::norm_engine_t::NormRxStreamState::NormRxStreamState ( zmq::norm_engine_t::NormRxStreamState::NormRxStreamState (
NormObjectHandle normStream, int64_t maxMsgSize, bool zeroCopy) : NormObjectHandle normStream,
int64_t maxMsgSize,
bool zeroCopy,
int inBatchSize) :
norm_stream (normStream), norm_stream (normStream),
max_msg_size (maxMsgSize), max_msg_size (maxMsgSize),
zero_copy (zeroCopy), zero_copy (zeroCopy),
in_batch_size (inBatchSize),
in_sync (false), in_sync (false),
rx_ready (false), rx_ready (false),
zmq_decoder (NULL), zmq_decoder (NULL),
...@@ -583,7 +588,6 @@ bool zmq::norm_engine_t::NormRxStreamState::Init () ...@@ -583,7 +588,6 @@ bool zmq::norm_engine_t::NormRxStreamState::Init ()
skip_norm_sync = false; skip_norm_sync = false;
if (NULL != zmq_decoder) if (NULL != zmq_decoder)
delete zmq_decoder; delete zmq_decoder;
// Note "in_batch_size" comes from config.h
zmq_decoder = zmq_decoder =
new (std::nothrow) v2_decoder_t (in_batch_size, max_msg_size, zero_copy); new (std::nothrow) v2_decoder_t (in_batch_size, max_msg_size, zero_copy);
alloc_assert (zmq_decoder); alloc_assert (zmq_decoder);
......
...@@ -71,7 +71,8 @@ class norm_engine_t : public io_object_t, public i_engine ...@@ -71,7 +71,8 @@ class norm_engine_t : public io_object_t, public i_engine
public: public:
NormRxStreamState (NormObjectHandle normStream, NormRxStreamState (NormObjectHandle normStream,
int64_t maxMsgSize, int64_t maxMsgSize,
bool zeroCopy); bool zeroCopy,
int inBatchSize);
~NormRxStreamState (); ~NormRxStreamState ();
NormObjectHandle GetStreamHandle () const { return norm_stream; } NormObjectHandle GetStreamHandle () const { return norm_stream; }
...@@ -136,6 +137,7 @@ class norm_engine_t : public io_object_t, public i_engine ...@@ -136,6 +137,7 @@ class norm_engine_t : public io_object_t, public i_engine
NormObjectHandle norm_stream; NormObjectHandle norm_stream;
int64_t max_msg_size; int64_t max_msg_size;
bool zero_copy; bool zero_copy;
int in_batch_size;
bool in_sync; bool in_sync;
bool rx_ready; bool rx_ready;
v2_decoder_t *zmq_decoder; v2_decoder_t *zmq_decoder;
......
...@@ -243,6 +243,8 @@ zmq::options_t::options_t () : ...@@ -243,6 +243,8 @@ zmq::options_t::options_t () :
zap_enforce_domain (false), zap_enforce_domain (false),
loopback_fastpath (false), loopback_fastpath (false),
multicast_loop (true), multicast_loop (true),
in_batch_size (8192),
out_batch_size (8192),
zero_copy (true), zero_copy (true),
router_notify (0), router_notify (0),
monitor_event_version (1) monitor_event_version (1)
...@@ -768,6 +770,22 @@ int zmq::options_t::setsockopt (int option_, ...@@ -768,6 +770,22 @@ int zmq::options_t::setsockopt (int option_,
return do_setsockopt_int_as_bool_relaxed (optval_, optvallen_, return do_setsockopt_int_as_bool_relaxed (optval_, optvallen_,
&multicast_loop); &multicast_loop);
#ifdef ZMQ_BUILD_DRAFT_API
case ZMQ_IN_BATCH_SIZE:
if (is_int && value > 0) {
in_batch_size = value;
return 0;
}
break;
case ZMQ_OUT_BATCH_SIZE:
if (is_int && value > 0) {
out_batch_size = value;
return 0;
}
break;
#endif
default: default:
#if defined(ZMQ_ACT_MILITANT) #if defined(ZMQ_ACT_MILITANT)
// There are valid scenarios for probing with unknown socket option // There are valid scenarios for probing with unknown socket option
...@@ -1184,6 +1202,19 @@ int zmq::options_t::getsockopt (int option_, ...@@ -1184,6 +1202,19 @@ int zmq::options_t::getsockopt (int option_,
return 0; return 0;
} }
break; break;
case ZMQ_IN_BATCH_SIZE:
if (is_int) {
*value = in_batch_size;
return 0;
}
break;
case ZMQ_OUT_BATCH_SIZE:
if (is_int) {
*value = out_batch_size;
return 0;
}
break;
#endif #endif
......
...@@ -264,6 +264,17 @@ struct options_t ...@@ -264,6 +264,17 @@ struct options_t
// Loop sent multicast packets to local sockets // Loop sent multicast packets to local sockets
bool multicast_loop; bool multicast_loop;
// Maximal batching size for engines with receiving functionality.
// So, if there are 10 messages that fit into the batch size, all of
// them may be read by a single 'recv' system call, thus avoiding
// unnecessary network stack traversals.
int in_batch_size;
// Maximal batching size for engines with sending functionality.
// So, if there are 10 messages that fit into the batch size, all of
// them may be written by a single 'send' system call, thus avoiding
// unnecessary network stack traversals.
int out_batch_size;
// Use zero copy strategy for storing message content when decoding. // Use zero copy strategy for storing message content when decoding.
bool zero_copy; bool zero_copy;
......
...@@ -346,10 +346,10 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_) ...@@ -346,10 +346,10 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
// For receiver transport preallocate pgm_msgv array. // For receiver transport preallocate pgm_msgv array.
if (receiver) { if (receiver) {
zmq_assert (in_batch_size > 0); zmq_assert (options.in_batch_size > 0);
size_t max_tsdu_size = get_max_tsdu_size (); size_t max_tsdu_size = get_max_tsdu_size ();
pgm_msgv_len = (int) in_batch_size / max_tsdu_size; pgm_msgv_len = (int) options.in_batch_size / max_tsdu_size;
if ((int) in_batch_size % max_tsdu_size) if ((int) options.in_batch_size % max_tsdu_size)
pgm_msgv_len++; pgm_msgv_len++;
zmq_assert (pgm_msgv_len); zmq_assert (pgm_msgv_len);
......
...@@ -207,10 +207,10 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_, ...@@ -207,10 +207,10 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_,
if (_options.raw_socket) { if (_options.raw_socket) {
// no handshaking for raw sock, instantiate raw encoder and decoders // no handshaking for raw sock, instantiate raw encoder and decoders
_encoder = new (std::nothrow) raw_encoder_t (out_batch_size); _encoder = new (std::nothrow) raw_encoder_t (_options.out_batch_size);
alloc_assert (_encoder); alloc_assert (_encoder);
_decoder = new (std::nothrow) raw_decoder_t (in_batch_size); _decoder = new (std::nothrow) raw_decoder_t (_options.in_batch_size);
alloc_assert (_decoder); alloc_assert (_decoder);
// disable handshaking for raw socket // disable handshaking for raw socket
...@@ -399,12 +399,13 @@ void zmq::stream_engine_t::out_event () ...@@ -399,12 +399,13 @@ void zmq::stream_engine_t::out_event ()
_outpos = NULL; _outpos = NULL;
_outsize = _encoder->encode (&_outpos, 0); _outsize = _encoder->encode (&_outpos, 0);
while (_outsize < static_cast<size_t> (out_batch_size)) { while (_outsize < static_cast<size_t> (_options.out_batch_size)) {
if ((this->*_next_msg) (&_tx_msg) == -1) if ((this->*_next_msg) (&_tx_msg) == -1)
break; break;
_encoder->load_msg (&_tx_msg); _encoder->load_msg (&_tx_msg);
unsigned char *bufptr = _outpos + _outsize; unsigned char *bufptr = _outpos + _outsize;
size_t n = _encoder->encode (&bufptr, out_batch_size - _outsize); size_t n =
_encoder->encode (&bufptr, _options.out_batch_size - _outsize);
zmq_assert (n > 0); zmq_assert (n > 0);
if (_outpos == NULL) if (_outpos == NULL)
_outpos = bufptr; _outpos = bufptr;
...@@ -664,11 +665,11 @@ bool zmq::stream_engine_t::handshake_v1_0_unversioned () ...@@ -664,11 +665,11 @@ bool zmq::stream_engine_t::handshake_v1_0_unversioned ()
return false; return false;
} }
_encoder = new (std::nothrow) v1_encoder_t (out_batch_size); _encoder = new (std::nothrow) v1_encoder_t (_options.out_batch_size);
alloc_assert (_encoder); alloc_assert (_encoder);
_decoder = _decoder = new (std::nothrow)
new (std::nothrow) v1_decoder_t (in_batch_size, _options.maxmsgsize); v1_decoder_t (_options.in_batch_size, _options.maxmsgsize);
alloc_assert (_decoder); alloc_assert (_decoder);
// We have already sent the message header. // We have already sent the message header.
...@@ -716,11 +717,11 @@ bool zmq::stream_engine_t::handshake_v1_0 () ...@@ -716,11 +717,11 @@ bool zmq::stream_engine_t::handshake_v1_0 ()
return false; return false;
} }
_encoder = new (std::nothrow) v1_encoder_t (out_batch_size); _encoder = new (std::nothrow) v1_encoder_t (_options.out_batch_size);
alloc_assert (_encoder); alloc_assert (_encoder);
_decoder = _decoder = new (std::nothrow)
new (std::nothrow) v1_decoder_t (in_batch_size, _options.maxmsgsize); v1_decoder_t (_options.in_batch_size, _options.maxmsgsize);
alloc_assert (_decoder); alloc_assert (_decoder);
return true; return true;
...@@ -734,11 +735,11 @@ bool zmq::stream_engine_t::handshake_v2_0 () ...@@ -734,11 +735,11 @@ bool zmq::stream_engine_t::handshake_v2_0 ()
return false; return false;
} }
_encoder = new (std::nothrow) v2_encoder_t (out_batch_size); _encoder = new (std::nothrow) v2_encoder_t (_options.out_batch_size);
alloc_assert (_encoder); alloc_assert (_encoder);
_decoder = new (std::nothrow) _decoder = new (std::nothrow) v2_decoder_t (
v2_decoder_t (in_batch_size, _options.maxmsgsize, _options.zero_copy); _options.in_batch_size, _options.maxmsgsize, _options.zero_copy);
alloc_assert (_decoder); alloc_assert (_decoder);
return true; return true;
...@@ -746,11 +747,11 @@ bool zmq::stream_engine_t::handshake_v2_0 () ...@@ -746,11 +747,11 @@ bool zmq::stream_engine_t::handshake_v2_0 ()
bool zmq::stream_engine_t::handshake_v3_0 () bool zmq::stream_engine_t::handshake_v3_0 ()
{ {
_encoder = new (std::nothrow) v2_encoder_t (out_batch_size); _encoder = new (std::nothrow) v2_encoder_t (_options.out_batch_size);
alloc_assert (_encoder); alloc_assert (_encoder);
_decoder = new (std::nothrow) _decoder = new (std::nothrow) v2_decoder_t (
v2_decoder_t (in_batch_size, _options.maxmsgsize, _options.zero_copy); _options.in_batch_size, _options.maxmsgsize, _options.zero_copy);
alloc_assert (_decoder); alloc_assert (_decoder);
if (_options.mechanism == ZMQ_NULL if (_options.mechanism == ZMQ_NULL
......
...@@ -55,6 +55,8 @@ ...@@ -55,6 +55,8 @@
#define ZMQ_XPUB_MANUAL_LAST_VALUE 98 #define ZMQ_XPUB_MANUAL_LAST_VALUE 98
#define ZMQ_SOCKS_USERNAME 99 #define ZMQ_SOCKS_USERNAME 99
#define ZMQ_SOCKS_PASSWORD 100 #define ZMQ_SOCKS_PASSWORD 100
#define ZMQ_IN_BATCH_SIZE 101
#define ZMQ_OUT_BATCH_SIZE 102
/* DRAFT Context options */ /* DRAFT Context options */
#define ZMQ_ZERO_COPY_RECV 10 #define ZMQ_ZERO_COPY_RECV 10
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment