Commit 18c51702 authored by Constantin Rack's avatar Constantin Rack

Merge pull request #1638 from jens-auer/tcp_buffer_options

Tcp buffer options to set RECV/SEND buffer
parents c41fe88d 908d6b67
...@@ -736,6 +736,33 @@ Option value unit:: N/A ...@@ -736,6 +736,33 @@ Option value unit:: N/A
Default value:: not set Default value:: not set
Applicable socket types:: all, when using TCP transport Applicable socket types:: all, when using TCP transport
ZMQ_TCP_RECV_BUFFER: Size of the TCP receive buffer
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_RECV_BUFFER' specifies the maximum number of bytes which can
be received by an individual syscall to receive data from the TCP
socket. The buffer size is specified as an integer number from 0 (very small)
to 10 (very large). The default value is 3.
[horizontal]
Option value type:: int
Option value unit:: N/A
Default value:: 3
Applicable socket types:: all, when using TCP transport
ZMQ_TCP_SEND_BUFFER: Size of the TCP receive buffer
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_SEND_BUFFER' specifies the maximum number of bytes which can
be sent by an individual syscall to transmit data to the TCP
socket. The buffer size is specified as an integer number from 0 (very small)
to 10 (very large). The default value is 3.
[horizontal]
Option value type:: int
Option value unit:: N/A
Default value:: 3
Applicable socket types:: all, when using TCP transport
RETURN VALUE RETURN VALUE
------------ ------------
......
...@@ -1071,6 +1071,33 @@ Option value unit:: boolean ...@@ -1071,6 +1071,33 @@ Option value unit:: boolean
Default value:: 1 (true) Default value:: 1 (true)
Applicable socket types:: all, when using TCP transports. Applicable socket types:: all, when using TCP transports.
ZMQ_TCP_RECV_BUFFER: Size of the TCP receive buffer
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_RECV_BUFFER' specifies the maximum number of bytes which can
be received by an individual syscall to receive data from the TCP
socket. The buffer size is specified as an integer number from 0 (very small)
to 10 (very large). The default value is 3.
[horizontal]
Option value type:: int
Option value unit:: N/A
Default value:: 3
Applicable socket types:: all, when using TCP transport
ZMQ_TCP_SEND_BUFFER: Size of the TCP receive buffer
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_SEND_BUFFER' specifies the maximum number of bytes which can
be sent by an individual syscall to transmit data to the TCP
socket. The buffer size is specified as an integer number from 0 (very small)
to 10 (very large). The default value is 3.
[horizontal]
Option value type:: int
Option value unit:: N/A
Default value:: 3
Applicable socket types:: all, when using TCP transport
RETURN VALUE RETURN VALUE
------------ ------------
......
...@@ -323,6 +323,8 @@ ZMQ_EXPORT uint32_t zmq_msg_routing_id (zmq_msg_t *msg); ...@@ -323,6 +323,8 @@ ZMQ_EXPORT uint32_t zmq_msg_routing_id (zmq_msg_t *msg);
#define ZMQ_CONNECT_TIMEOUT 79 #define ZMQ_CONNECT_TIMEOUT 79
#define ZMQ_TCP_RETRANSMIT_TIMEOUT 80 #define ZMQ_TCP_RETRANSMIT_TIMEOUT 80
#define ZMQ_THREAD_SAFE 81 #define ZMQ_THREAD_SAFE 81
#define ZMQ_TCP_RECV_BUFFER 82
#define ZMQ_TCP_SEND_BUFFER 83
/* Message options */ /* Message options */
#define ZMQ_MORE 1 #define ZMQ_MORE 1
......
...@@ -28,6 +28,7 @@ ...@@ -28,6 +28,7 @@
*/ */
#include <string.h> #include <string.h>
#include <cmath>
#include "options.hpp" #include "options.hpp"
#include "err.hpp" #include "err.hpp"
...@@ -65,6 +66,8 @@ zmq::options_t::options_t () : ...@@ -65,6 +66,8 @@ zmq::options_t::options_t () :
tcp_keepalive_cnt (-1), tcp_keepalive_cnt (-1),
tcp_keepalive_idle (-1), tcp_keepalive_idle (-1),
tcp_keepalive_intvl (-1), tcp_keepalive_intvl (-1),
tcp_recv_buffer_size (3),
tcp_send_buffer_size (3),
mechanism (ZMQ_NULL), mechanism (ZMQ_NULL),
as_server (0), as_server (0),
gss_plaintext (false), gss_plaintext (false),
...@@ -280,6 +283,18 @@ int zmq::options_t::setsockopt (int option_, const void *optval_, ...@@ -280,6 +283,18 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
} }
break; break;
case ZMQ_TCP_RECV_BUFFER:
if (is_int && (value >= 0 && value <= 10) ) {
tcp_recv_buffer_size = static_cast<int>(std::pow(2, value)) * 1024;
}
break;
case ZMQ_TCP_SEND_BUFFER:
if (is_int && (value >= 0 && value <= 10) ) {
tcp_send_buffer_size = static_cast<int>(std::pow(2, value)) * 1024;
}
break;
case ZMQ_IMMEDIATE: case ZMQ_IMMEDIATE:
if (is_int && (value == 0 || value == 1)) { if (is_int && (value == 0 || value == 1)) {
immediate = value; immediate = value;
...@@ -790,6 +805,20 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_) ...@@ -790,6 +805,20 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
} }
break; break;
case ZMQ_TCP_SEND_BUFFER:
if (is_int) {
*value = tcp_send_buffer_size;
return 0;
}
break;
case ZMQ_TCP_RECV_BUFFER:
if (is_int) {
*value = tcp_recv_buffer_size;
return 0;
}
break;
case ZMQ_MECHANISM: case ZMQ_MECHANISM:
if (is_int) { if (is_int) {
*value = mechanism; *value = mechanism;
......
...@@ -156,6 +156,10 @@ namespace zmq ...@@ -156,6 +156,10 @@ namespace zmq
typedef std::vector <tcp_address_mask_t> tcp_accept_filters_t; typedef std::vector <tcp_address_mask_t> tcp_accept_filters_t;
tcp_accept_filters_t tcp_accept_filters; tcp_accept_filters_t tcp_accept_filters;
// TCO buffer sizes
int tcp_recv_buffer_size;
int tcp_send_buffer_size;
// IPC accept() filters // IPC accept() filters
# if defined ZMQ_HAVE_SO_PEERCRED || defined ZMQ_HAVE_LOCAL_PEERCRED # if defined ZMQ_HAVE_SO_PEERCRED || defined ZMQ_HAVE_LOCAL_PEERCRED
bool zap_ipc_creds; bool zap_ipc_creds;
......
...@@ -203,10 +203,10 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_, ...@@ -203,10 +203,10 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_,
if (options.raw_socket) { if (options.raw_socket) {
// no handshaking for raw sock, instantiate raw encoder and decoders // no handshaking for raw sock, instantiate raw encoder and decoders
encoder = new (std::nothrow) raw_encoder_t (out_batch_size); encoder = new (std::nothrow) raw_encoder_t (options.tcp_send_buffer_size);
alloc_assert (encoder); alloc_assert (encoder);
decoder = new (std::nothrow) raw_decoder_t (in_batch_size); decoder = new (std::nothrow) raw_decoder_t (options.tcp_recv_buffer_size);
alloc_assert (decoder); alloc_assert (decoder);
// disable handshaking for raw socket // disable handshaking for raw socket
...@@ -385,12 +385,12 @@ void zmq::stream_engine_t::out_event () ...@@ -385,12 +385,12 @@ void zmq::stream_engine_t::out_event ()
outpos = NULL; outpos = NULL;
outsize = encoder->encode (&outpos, 0); outsize = encoder->encode (&outpos, 0);
while (outsize < out_batch_size) { while (outsize < options.tcp_send_buffer_size) {
if ((this->*next_msg) (&tx_msg) == -1) if ((this->*next_msg) (&tx_msg) == -1)
break; break;
encoder->load_msg (&tx_msg); encoder->load_msg (&tx_msg);
unsigned char *bufptr = outpos + outsize; unsigned char *bufptr = outpos + outsize;
size_t n = encoder->encode (&bufptr, out_batch_size - outsize); size_t n = encoder->encode (&bufptr, options.tcp_send_buffer_size - outsize);
zmq_assert (n > 0); zmq_assert (n > 0);
if (outpos == NULL) if (outpos == NULL)
outpos = bufptr; outpos = bufptr;
...@@ -587,10 +587,10 @@ bool zmq::stream_engine_t::handshake () ...@@ -587,10 +587,10 @@ bool zmq::stream_engine_t::handshake ()
return false; return false;
} }
encoder = new (std::nothrow) v1_encoder_t (out_batch_size); encoder = new (std::nothrow) v1_encoder_t (options.tcp_send_buffer_size);
alloc_assert (encoder); alloc_assert (encoder);
decoder = new (std::nothrow) v1_decoder_t (in_batch_size, options.maxmsgsize); decoder = new (std::nothrow) v1_decoder_t (options.tcp_recv_buffer_size, options.maxmsgsize);
alloc_assert (decoder); alloc_assert (decoder);
// We have already sent the message header. // We have already sent the message header.
...@@ -635,11 +635,11 @@ bool zmq::stream_engine_t::handshake () ...@@ -635,11 +635,11 @@ bool zmq::stream_engine_t::handshake ()
} }
encoder = new (std::nothrow) v1_encoder_t ( encoder = new (std::nothrow) v1_encoder_t (
out_batch_size); options.tcp_send_buffer_size);
alloc_assert (encoder); alloc_assert (encoder);
decoder = new (std::nothrow) v1_decoder_t ( decoder = new (std::nothrow) v1_decoder_t (
in_batch_size, options.maxmsgsize); options.tcp_recv_buffer_size, options.maxmsgsize);
alloc_assert (decoder); alloc_assert (decoder);
} }
else else
...@@ -650,19 +650,19 @@ bool zmq::stream_engine_t::handshake () ...@@ -650,19 +650,19 @@ bool zmq::stream_engine_t::handshake ()
return false; return false;
} }
encoder = new (std::nothrow) v2_encoder_t (out_batch_size); encoder = new (std::nothrow) v2_encoder_t (options.tcp_send_buffer_size);
alloc_assert (encoder); alloc_assert (encoder);
decoder = new (std::nothrow) v2_decoder_t ( decoder = new (std::nothrow) v2_decoder_t (
in_batch_size, options.maxmsgsize); options.tcp_recv_buffer_size, options.maxmsgsize);
alloc_assert (decoder); alloc_assert (decoder);
} }
else { else {
encoder = new (std::nothrow) v2_encoder_t (out_batch_size); encoder = new (std::nothrow) v2_encoder_t (options.tcp_send_buffer_size);
alloc_assert (encoder); alloc_assert (encoder);
decoder = new (std::nothrow) v2_decoder_t ( decoder = new (std::nothrow) v2_decoder_t (
in_batch_size, options.maxmsgsize); options.tcp_recv_buffer_size, options.maxmsgsize);
alloc_assert (decoder); alloc_assert (decoder);
if (options.mechanism == ZMQ_NULL if (options.mechanism == ZMQ_NULL
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment