Commit fca45921 authored by somdoron's avatar somdoron

problem: zeromq performance got worsen by some changes

parent 115e7de7
...@@ -59,6 +59,12 @@ namespace zmq ...@@ -59,6 +59,12 @@ namespace zmq
// unnecessary network stack traversals. // unnecessary network stack traversals.
in_batch_size = 8192, in_batch_size = 8192,
// Maximal batching size for engines with sending functionality.
// So, if there are 10 messages that fit into the batch size, all of
// them may be written by a single 'send' system call, thus avoiding
// unnecessary network stack traversals.
out_batch_size = 8192,
// Maximal delta between high and low watermark. // Maximal delta between high and low watermark.
max_wm_delta = 1024, max_wm_delta = 1024,
......
...@@ -43,8 +43,8 @@ zmq::options_t::options_t () : ...@@ -43,8 +43,8 @@ zmq::options_t::options_t () :
recovery_ivl (10000), recovery_ivl (10000),
multicast_hops (1), multicast_hops (1),
multicast_maxtpdu (1500), multicast_maxtpdu (1500),
sndbuf (8192), sndbuf (-1),
rcvbuf (8192), rcvbuf (-1),
tos (0), tos (0),
type (-1), type (-1),
linger (-1), linger (-1),
...@@ -146,14 +146,14 @@ int zmq::options_t::setsockopt (int option_, const void *optval_, ...@@ -146,14 +146,14 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
break; break;
case ZMQ_SNDBUF: case ZMQ_SNDBUF:
if (is_int && value >= 0) { if (is_int && value >= -1) {
sndbuf = value; sndbuf = value;
return 0; return 0;
} }
break; break;
case ZMQ_RCVBUF: case ZMQ_RCVBUF:
if (is_int && value >= 0) { if (is_int && value >= -1) {
rcvbuf = value; rcvbuf = value;
return 0; return 0;
} }
......
...@@ -192,10 +192,10 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_, ...@@ -192,10 +192,10 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_,
if (options.raw_socket) { if (options.raw_socket) {
// no handshaking for raw sock, instantiate raw encoder and decoders // no handshaking for raw sock, instantiate raw encoder and decoders
encoder = new (std::nothrow) raw_encoder_t (options.sndbuf); encoder = new (std::nothrow) raw_encoder_t (out_batch_size);
alloc_assert (encoder); alloc_assert (encoder);
decoder = new (std::nothrow) raw_decoder_t (options.rcvbuf); decoder = new (std::nothrow) raw_decoder_t (in_batch_size);
alloc_assert (decoder); alloc_assert (decoder);
// disable handshaking for raw socket // disable handshaking for raw socket
...@@ -374,12 +374,12 @@ void zmq::stream_engine_t::out_event () ...@@ -374,12 +374,12 @@ void zmq::stream_engine_t::out_event ()
outpos = NULL; outpos = NULL;
outsize = encoder->encode (&outpos, 0); outsize = encoder->encode (&outpos, 0);
while (outsize < (size_t) options.sndbuf) { while (outsize < (size_t) out_batch_size) {
if ((this->*next_msg) (&tx_msg) == -1) if ((this->*next_msg) (&tx_msg) == -1)
break; break;
encoder->load_msg (&tx_msg); encoder->load_msg (&tx_msg);
unsigned char *bufptr = outpos + outsize; unsigned char *bufptr = outpos + outsize;
size_t n = encoder->encode (&bufptr, options.sndbuf - outsize); size_t n = encoder->encode (&bufptr, out_batch_size - outsize);
zmq_assert (n > 0); zmq_assert (n > 0);
if (outpos == NULL) if (outpos == NULL)
outpos = bufptr; outpos = bufptr;
...@@ -576,10 +576,10 @@ bool zmq::stream_engine_t::handshake () ...@@ -576,10 +576,10 @@ bool zmq::stream_engine_t::handshake ()
return false; return false;
} }
encoder = new (std::nothrow) v1_encoder_t (options.sndbuf); encoder = new (std::nothrow) v1_encoder_t (out_batch_size);
alloc_assert (encoder); alloc_assert (encoder);
decoder = new (std::nothrow) v1_decoder_t (options.rcvbuf, options.maxmsgsize); decoder = new (std::nothrow) v1_decoder_t (in_batch_size, options.maxmsgsize);
alloc_assert (decoder); alloc_assert (decoder);
// We have already sent the message header. // We have already sent the message header.
...@@ -623,12 +623,11 @@ bool zmq::stream_engine_t::handshake () ...@@ -623,12 +623,11 @@ bool zmq::stream_engine_t::handshake ()
return false; return false;
} }
encoder = new (std::nothrow) v1_encoder_t ( encoder = new (std::nothrow) v1_encoder_t (out_batch_size);
options.sndbuf);
alloc_assert (encoder); alloc_assert (encoder);
decoder = new (std::nothrow) v1_decoder_t ( decoder = new (std::nothrow) v1_decoder_t (
options.rcvbuf, options.maxmsgsize); in_batch_size, options.maxmsgsize);
alloc_assert (decoder); alloc_assert (decoder);
} }
else else
...@@ -639,19 +638,19 @@ bool zmq::stream_engine_t::handshake () ...@@ -639,19 +638,19 @@ bool zmq::stream_engine_t::handshake ()
return false; return false;
} }
encoder = new (std::nothrow) v2_encoder_t (options.sndbuf); encoder = new (std::nothrow) v2_encoder_t (out_batch_size);
alloc_assert (encoder); alloc_assert (encoder);
decoder = new (std::nothrow) v2_decoder_t ( decoder = new (std::nothrow) v2_decoder_t (
options.rcvbuf, options.maxmsgsize); in_batch_size, options.maxmsgsize);
alloc_assert (decoder); alloc_assert (decoder);
} }
else { else {
encoder = new (std::nothrow) v2_encoder_t (options.sndbuf); encoder = new (std::nothrow) v2_encoder_t (out_batch_size);
alloc_assert (encoder); alloc_assert (encoder);
decoder = new (std::nothrow) v2_decoder_t ( decoder = new (std::nothrow) v2_decoder_t (
options.rcvbuf, options.maxmsgsize); in_batch_size, options.maxmsgsize);
alloc_assert (decoder); alloc_assert (decoder);
if (options.mechanism == ZMQ_NULL if (options.mechanism == ZMQ_NULL
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment