Commit 42ab88e4 authored by Min RK's avatar Min RK

Merge pull request #1786 from hintjens/master

Cleaning up recent option names
parents 5eccd874 62c66ae7
......@@ -26,20 +26,30 @@ ZMQ_IO_THREADS: Get number of I/O threads
The 'ZMQ_IO_THREADS' argument returns the size of the 0MQ thread pool
for this context.
ZMQ_MAX_SOCKETS: Get maximum number of sockets
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_MAX_SOCKETS' argument returns the maximum number of sockets
allowed for this context.
ZMQ_MAX_MSGSZ: Get maximum message size
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_MAX_MSGSZ' argument returns the maximum size of a message
allowed for this context. Default value is INT_MAX.
ZMQ_SOCKET_LIMIT: Get largest configurable number of sockets
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_SOCKET_LIMIT' argument returns the largest number of sockets that
linkzmq:zmq_ctx_set[3] will accept.
ZMQ_IPV6: Set IPv6 option
~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_IPV6' argument returns the IPv6 option for the context.
ZMQ_BLOCKY: Get blocky setting
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_BLOCKY' argument returns 1 if the context will block on terminate,
......
......@@ -47,6 +47,7 @@ context.
[horizontal]
Default value:: 1
ZMQ_THREAD_SCHED_POLICY: Set scheduling policy for I/O threads
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_THREAD_SCHED_POLICY' argument sets the scheduling policy for
......@@ -58,6 +59,7 @@ This option only applies before creating any sockets on the context.
[horizontal]
Default value:: -1
ZMQ_THREAD_PRIORITY: Set scheduling priority for I/O threads
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_THREAD_PRIORITY' argument sets scheduling priority for
......@@ -69,6 +71,19 @@ This option only applies before creating any sockets on the context.
[horizontal]
Default value:: -1
ZMQ_MAX_MSGSZ: Set maximum message size
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_MAX_MSGSZ' argument sets the maximum allowed size
of a message sent in the context. You can query the maximal
allowed value with linkzmq:zmq_ctx_get[3] using the
'ZMQ_MAX_MSGSZ' option.
[horizontal]
Default value:: INT_MAX
Maximum value:: INT_MAX
ZMQ_MAX_SOCKETS: Set maximum number of sockets
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_MAX_SOCKETS' argument sets the maximum number of sockets allowed
......@@ -78,6 +93,7 @@ linkzmq:zmq_ctx_get[3] using the 'ZMQ_SOCKET_LIMIT' option.
[horizontal]
Default value:: 1024
ZMQ_IPV6: Set IPv6 option
~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_IPV6' argument sets the IPv6 value for all sockets created in
......
......@@ -498,6 +498,7 @@ documentation for the 'SO_RCVBUF' socket option.
[horizontal]
Option value type:: int
Option value unit:: bytes
Default value:: 8192
Applicable socket types:: all
......@@ -612,6 +613,7 @@ documentation for the 'SO_SNDBUF' socket option.
[horizontal]
Option value type:: int
Option value unit:: bytes
Default value:: 8192
Applicable socket types:: all
......@@ -700,14 +702,13 @@ Default value:: -1 (leave to OS default)
Applicable socket types:: all, when using TCP transports.
ZMQ_TCP_RETRANSMIT_TIMEOUT: Retrieve TCP Retransmit Timeout
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
ZMQ_TCP_MAXRT: Retrieve Max TCP Retransmit Timeout
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
On OSes where it is supported, retrieves how long before an unacknowledged TCP
retransmit times out.
The system normally attempts many TCP retransmits following an exponential
backoff strategy. This means that after a network outage, it may take a long
time before the session can be re-established. Setting this option allows
the timeout to happen at a shorter interval.
retransmit times out. The system normally attempts many TCP retransmits
following an exponential backoff strategy. This means that after a network
outage, it may take a long time before the session can be re-established.
Setting this option allows the timeout to happen at a shorter interval.
[horizontal]
Option value type:: int
......@@ -716,9 +717,9 @@ Default value:: 0 (leave to OS default)
Applicable socket types:: all, when using TCP transports.
ZMQ_THREADSAFE: Retrieve socket thread safety
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_THREADSAFE' option shall retrieve a boolean value indicating whether
ZMQ_THREAD_SAFE: Retrieve socket thread safety
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_THREAD_SAFE' option shall retrieve a boolean value indicating whether
or not the socket is threadsafe. Currently 'ZMQ_CLIENT' and 'ZMQ_SERVER' sockets
are threadsafe.
......@@ -764,31 +765,6 @@ Option value unit:: N/A
Default value:: not set
Applicable socket types:: all, when using TCP transport
ZMQ_TCP_RECV_BUFFER: Size of the TCP receive buffer
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_TCP_RECV_BUFFER' specifies the maximum number of bytes which can
be received by an individual syscall to receive data from the TCP socket.
The default value is 8192.
[horizontal]
Option value type:: int
Option value unit:: >0
Default value:: 8192
Applicable socket types:: all, when using TCP transport
ZMQ_TCP_SEND_BUFFER: Size of the TCP receive buffer
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_TCP_SEND_BUFFER' specifies the maximum number of bytes which can
be sent by an individual syscall to transmit data to the TCP socket.
The default value is 8192.
[horizontal]
Option value type:: int
Option value unit:: >0
Default value:: 8192
Applicable socket types:: all, when using TCP transport
ZMQ_VMCI_BUFFER_SIZE: Retrieve buffer size of the VMCI socket
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
......
......@@ -870,14 +870,13 @@ Default value:: -1 (leave to OS default)
Applicable socket types:: all, when using TCP transports.
ZMQ_TCP_RETRANSMIT_TIMEOUT: Set TCP Retransmit Timeout
ZMQ_TCP_MAXRT: Set TCP Maximum Retransmit Timeout
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
On OSes where it is supported, sets how long before an unacknowledged TCP
retransmit times out.
The system normally attempts many TCP retransmits following an exponential
backoff strategy. This means that after a network outage, it may take a long
time before the session can be re-established. Setting this option allows
the timeout to happen at a shorter interval.
retransmit times out. The system normally attempts many TCP retransmits
following an exponential backoff strategy. This means that after a network
outage, it may take a long time before the session can be re-established.
Setting this option allows the timeout to happen at a shorter interval.
[horizontal]
Option value type:: int
......@@ -916,11 +915,11 @@ Default value:: N/A
Applicable socket types:: ZMQ_SUB
ZMQ_XPUB_VERBOSE: provide all subscription messages on XPUB sockets
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Sets the 'XPUB' socket behaviour on new subscriptions and unsubscriptions.
A value of '0' is the default and passes only new subscription messages to
upstream. A value of '1' passes all subscription messages upstream.
ZMQ_XPUB_VERBOSE: pass subscribe messages on XPUB socket
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Sets the 'XPUB' socket behaviour on new subscriptions. If enabled,
the socket passes all subscribe messages to the caller. If disabled,
these are not visible to the caller. The default is 0 (disabled).
[horizontal]
Option value type:: int
......@@ -929,17 +928,12 @@ Default value:: 0
Applicable socket types:: ZMQ_XPUB
ZMQ_XPUB_VERBOSE_UNSUBSCRIBE: provide all unsubscription messages on XPUB sockets
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Sets the 'XPUB' socket behaviour on new subscriptions and unsubscriptions.
A value of '0' is the default and passes only the last unsubscription message to
upstream. A value of '1' passes all unsubscription messages upstream.
This behaviour should be enabled in all the intermediary XPUB sockets if
ZMQ_XPUB_VERBOSE is also being used in order to allow the correct forwarding
of all the unsubscription messages.
NOTE: This behaviour only takes effect when ZMQ_XPUB_VERBOSE is also enabled.
ZMQ_XPUB_VERBOSER: pass subscribe and unsubscribe messages on XPUB socket
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Sets the 'XPUB' socket behaviour on new subscriptions and ubsubscriptions.
If enabled, the socket passes all subscribe and unsubscribe messages to the
caller. If disabled, these are not visible to the caller. The default is 0
(disabled).
[horizontal]
Option value type:: int
......@@ -1108,31 +1102,6 @@ Option value unit:: boolean
Default value:: 1 (true)
Applicable socket types:: all, when using TCP transports.
ZMQ_TCP_RECV_BUFFER: Size of the TCP receive buffer
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_TCP_RECV_BUFFER' specifies the maximum number of bytes which can
be received by an individual syscall to receive data from the TCP socket.
The default value is 8192.
[horizontal]
Option value type:: int
Option value unit:: >0
Default value:: 8192
Applicable socket types:: all, when using TCP transport
ZMQ_TCP_SEND_BUFFER: Size of the TCP receive buffer
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_TCP_SEND_BUFFER' specifies the maximum number of bytes which can
be sent by an individual syscall to transmit data to the TCP socket.
The default value is 8192.
[horizontal]
Option value type:: int
Option value unit:: >0
Default value:: 8192
Applicable socket types:: all, when using TCP transport
ZMQ_VMCI_BUFFER_SIZE: Set buffer size of the VMCI socket
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
......
......@@ -185,13 +185,13 @@ ZMQ_EXPORT void zmq_version (int *major, int *minor, int *patch);
/* 0MQ infrastructure (a.k.a. context) initialisation & termination. */
/******************************************************************************/
/* New API */
/* Context options */
#define ZMQ_IO_THREADS 1
#define ZMQ_MAX_SOCKETS 2
#define ZMQ_SOCKET_LIMIT 3
#define ZMQ_THREAD_PRIORITY 3
#define ZMQ_THREAD_SCHED_POLICY 4
#define ZMQ_MAX_MSGSZ 5
/* Default for new contexts */
#define ZMQ_IO_THREADS_DFLT 1
......@@ -325,6 +325,8 @@ ZMQ_EXPORT const char *zmq_msg_group (zmq_msg_t *msg);
#define ZMQ_HANDSHAKE_IVL 66
#define ZMQ_SOCKS_PROXY 68
#define ZMQ_XPUB_NODROP 69
// All options after this is for version 4.2 and still *draft*
// Subject to arbitrary change without notice
#define ZMQ_BLOCKY 70
#define ZMQ_XPUB_MANUAL 71
#define ZMQ_XPUB_WELCOME_MSG 72
......@@ -333,12 +335,10 @@ ZMQ_EXPORT const char *zmq_msg_group (zmq_msg_t *msg);
#define ZMQ_HEARTBEAT_IVL 75
#define ZMQ_HEARTBEAT_TTL 76
#define ZMQ_HEARTBEAT_TIMEOUT 77
#define ZMQ_XPUB_VERBOSE_UNSUBSCRIBE 78
#define ZMQ_XPUB_VERBOSER 78
#define ZMQ_CONNECT_TIMEOUT 79
#define ZMQ_TCP_RETRANSMIT_TIMEOUT 80
#define ZMQ_TCP_MAXRT 80
#define ZMQ_THREAD_SAFE 81
#define ZMQ_TCP_RECV_BUFFER 82
#define ZMQ_TCP_SEND_BUFFER 83
#define ZMQ_MULTICAST_MAXTPDU 84
#define ZMQ_VMCI_BUFFER_SIZE 85
#define ZMQ_VMCI_BUFFER_MIN_SIZE 86
......
......@@ -36,6 +36,7 @@
#endif
#include <limits>
#include <climits>
#include <new>
#include <string.h>
......@@ -79,6 +80,7 @@ zmq::ctx_t::ctx_t () :
slot_count (0),
slots (NULL),
max_sockets (clipped_maxsocket (ZMQ_MAX_SOCKETS_DFLT)),
max_msgsz (INT_MAX),
io_thread_count (ZMQ_IO_THREADS_DFLT),
blocky (true),
ipv6 (false),
......@@ -251,13 +253,13 @@ int zmq::ctx_t::set (int option_, int optval_)
if (option_ == ZMQ_THREAD_PRIORITY && optval_ >= 0) {
opt_sync.lock();
thread_priority = optval_;
opt_sync.unlock();
opt_sync.unlock ();
}
else
if (option_ == ZMQ_THREAD_SCHED_POLICY && optval_ >= 0) {
opt_sync.lock();
thread_sched_policy = optval_;
opt_sync.unlock();
opt_sync.unlock ();
}
else
if (option_ == ZMQ_BLOCKY && optval_ >= 0) {
......@@ -265,6 +267,12 @@ int zmq::ctx_t::set (int option_, int optval_)
blocky = (optval_ != 0);
opt_sync.unlock ();
}
else
if (option_ == ZMQ_MAX_MSGSZ && optval_ >= 0) {
opt_sync.lock ();
max_msgsz = optval_ < INT_MAX? optval_: INT_MAX;
opt_sync.unlock ();
}
else {
errno = EINVAL;
rc = -1;
......@@ -289,6 +297,9 @@ int zmq::ctx_t::get (int option_)
else
if (option_ == ZMQ_BLOCKY)
rc = blocky;
else
if (option_ == ZMQ_MAX_MSGSZ)
rc = max_msgsz;
else {
errno = EINVAL;
rc = -1;
......
......@@ -199,6 +199,9 @@ namespace zmq
// Maximum number of sockets that can be opened at the same time.
int max_sockets;
// Maximum allowed message size
int max_msgsz;
// Number of I/O threads to launch.
int io_thread_count;
......
......@@ -42,13 +42,13 @@ zmq::options_t::options_t () :
recovery_ivl (10000),
multicast_hops (1),
multicast_maxtpdu (1500),
sndbuf (-1),
rcvbuf (-1),
sndbuf (8192),
rcvbuf (8192),
tos (0),
type (-1),
linger (-1),
connect_timeout (0),
tcp_retransmit_timeout (0),
tcp_maxrt (0),
reconnect_ivl (100),
reconnect_ivl_max (0),
backlog (100),
......@@ -66,8 +66,6 @@ zmq::options_t::options_t () :
tcp_keepalive_cnt (-1),
tcp_keepalive_idle (-1),
tcp_keepalive_intvl (-1),
tcp_recv_buffer_size (8192),
tcp_send_buffer_size (8192),
mechanism (ZMQ_NULL),
as_server (0),
gss_plaintext (false),
......@@ -178,9 +176,9 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
}
break;
case ZMQ_TCP_RETRANSMIT_TIMEOUT:
case ZMQ_TCP_MAXRT:
if (is_int && value >= 0) {
tcp_retransmit_timeout = value;
tcp_maxrt = value;
return 0;
}
break;
......@@ -298,20 +296,6 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
}
break;
case ZMQ_TCP_RECV_BUFFER:
if (is_int && (value > 0) ) {
tcp_recv_buffer_size = static_cast<unsigned int>(value);
return 0;
}
break;
case ZMQ_TCP_SEND_BUFFER:
if (is_int && (value > 0) ) {
tcp_send_buffer_size = static_cast<unsigned int>(value);
return 0;
}
break;
case ZMQ_IMMEDIATE:
if (is_int && (value == 0 || value == 1)) {
immediate = value;
......@@ -745,9 +729,9 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
}
break;
case ZMQ_TCP_RETRANSMIT_TIMEOUT:
case ZMQ_TCP_MAXRT:
if (is_int) {
*value = tcp_retransmit_timeout;
*value = tcp_maxrt;
return 0;
}
break;
......@@ -866,20 +850,6 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
}
break;
case ZMQ_TCP_SEND_BUFFER:
if (is_int) {
*value = tcp_send_buffer_size;
return 0;
}
break;
case ZMQ_TCP_RECV_BUFFER:
if (is_int) {
*value = tcp_recv_buffer_size;
return 0;
}
break;
case ZMQ_MECHANISM:
if (is_int) {
*value = mechanism;
......
......@@ -104,7 +104,7 @@ namespace zmq
// Maximum interval in milliseconds beyond which TCP will timeout
// retransmitted packets.
// Default 0 (unused)
int tcp_retransmit_timeout;
int tcp_maxrt;
// Minimum interval between attempts to reconnect, in milliseconds.
// Default 100ms
......@@ -160,10 +160,6 @@ namespace zmq
typedef std::vector <tcp_address_mask_t> tcp_accept_filters_t;
tcp_accept_filters_t tcp_accept_filters;
// TCP buffer sizes
unsigned int tcp_recv_buffer_size;
unsigned int tcp_send_buffer_size;
// IPC accept() filters
# if defined ZMQ_HAVE_SO_PEERCRED || defined ZMQ_HAVE_LOCAL_PEERCRED
bool zap_ipc_creds;
......
......@@ -245,7 +245,7 @@ namespace zmq
void update_pipe_options(int option_);
// Socket's mailbox object.
i_mailbox* mailbox;
i_mailbox *mailbox;
// List of attached pipes.
typedef array_t <pipe_t, 3> pipes_t;
......
......@@ -203,10 +203,10 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_,
if (options.raw_socket) {
// no handshaking for raw sock, instantiate raw encoder and decoders
encoder = new (std::nothrow) raw_encoder_t (options.tcp_send_buffer_size);
encoder = new (std::nothrow) raw_encoder_t (options.sndbuf);
alloc_assert (encoder);
decoder = new (std::nothrow) raw_decoder_t (options.tcp_recv_buffer_size);
decoder = new (std::nothrow) raw_decoder_t (options.rcvbuf);
alloc_assert (decoder);
// disable handshaking for raw socket
......@@ -385,12 +385,12 @@ void zmq::stream_engine_t::out_event ()
outpos = NULL;
outsize = encoder->encode (&outpos, 0);
while (outsize < options.tcp_send_buffer_size) {
while (outsize < (size_t) options.sndbuf) {
if ((this->*next_msg) (&tx_msg) == -1)
break;
encoder->load_msg (&tx_msg);
unsigned char *bufptr = outpos + outsize;
size_t n = encoder->encode (&bufptr, options.tcp_send_buffer_size - outsize);
size_t n = encoder->encode (&bufptr, options.sndbuf - outsize);
zmq_assert (n > 0);
if (outpos == NULL)
outpos = bufptr;
......@@ -587,10 +587,10 @@ bool zmq::stream_engine_t::handshake ()
return false;
}
encoder = new (std::nothrow) v1_encoder_t (options.tcp_send_buffer_size);
encoder = new (std::nothrow) v1_encoder_t (options.sndbuf);
alloc_assert (encoder);
decoder = new (std::nothrow) v1_decoder_t (options.tcp_recv_buffer_size, options.maxmsgsize);
decoder = new (std::nothrow) v1_decoder_t (options.rcvbuf, options.maxmsgsize);
alloc_assert (decoder);
// We have already sent the message header.
......@@ -635,11 +635,11 @@ bool zmq::stream_engine_t::handshake ()
}
encoder = new (std::nothrow) v1_encoder_t (
options.tcp_send_buffer_size);
options.sndbuf);
alloc_assert (encoder);
decoder = new (std::nothrow) v1_decoder_t (
options.tcp_recv_buffer_size, options.maxmsgsize);
options.rcvbuf, options.maxmsgsize);
alloc_assert (decoder);
}
else
......@@ -650,19 +650,19 @@ bool zmq::stream_engine_t::handshake ()
return false;
}
encoder = new (std::nothrow) v2_encoder_t (options.tcp_send_buffer_size);
encoder = new (std::nothrow) v2_encoder_t (options.sndbuf);
alloc_assert (encoder);
decoder = new (std::nothrow) v2_decoder_t (
options.tcp_recv_buffer_size, options.maxmsgsize);
options.rcvbuf, options.maxmsgsize);
alloc_assert (decoder);
}
else {
encoder = new (std::nothrow) v2_encoder_t (options.tcp_send_buffer_size);
encoder = new (std::nothrow) v2_encoder_t (options.sndbuf);
alloc_assert (encoder);
decoder = new (std::nothrow) v2_decoder_t (
options.tcp_recv_buffer_size, options.maxmsgsize);
options.rcvbuf, options.maxmsgsize);
alloc_assert (decoder);
if (options.mechanism == ZMQ_NULL
......
......@@ -84,7 +84,7 @@ void zmq::set_tcp_send_buffer (fd_t sockfd_, int bufsize_)
void zmq::set_tcp_receive_buffer (fd_t sockfd_, int bufsize_)
{
const int rc = setsockopt (sockfd_, SOL_SOCKET, SO_RCVBUF,
(char*) &bufsize_, sizeof bufsize_);
(char *) &bufsize_, sizeof bufsize_);
#ifdef ZMQ_HAVE_WINDOWS
wsa_assert (rc != SOCKET_ERROR);
#else
......@@ -153,7 +153,7 @@ void zmq::tune_tcp_keepalives (fd_t s_, int keepalive_, int keepalive_cnt_, int
#endif // ZMQ_HAVE_WINDOWS
}
void zmq::tune_tcp_retransmit_timeout (fd_t sockfd_, int timeout_)
void zmq::tune_tcp_maxrt (fd_t sockfd_, int timeout_)
{
if (timeout_ <= 0)
return;
......
......@@ -47,8 +47,8 @@ namespace zmq
// Tunes TCP keep-alives
void tune_tcp_keepalives (fd_t s_, int keepalive_, int keepalive_cnt_, int keepalive_idle_, int keepalive_intvl_);
// Tunes TCP retransmit timeout
void tune_tcp_retransmit_timeout (fd_t sockfd_, int timeout_);
// Tunes TCP max retransmit timeout
void tune_tcp_maxrt (fd_t sockfd_, int timeout_);
// Writes data to the socket. Returns the number of bytes actually
// written (even zero is to be considered to be a success). In case
......
......@@ -146,7 +146,7 @@ void zmq::tcp_connecter_t::out_event ()
tune_tcp_socket (fd);
tune_tcp_keepalives (fd, options.tcp_keepalive, options.tcp_keepalive_cnt, options.tcp_keepalive_idle, options.tcp_keepalive_intvl);
tune_tcp_retransmit_timeout (fd, options.tcp_retransmit_timeout);
tune_tcp_maxrt (fd, options.tcp_maxrt);
// remember our fd for ZMQ_SRCFD in messages
socket->set_fd (fd);
......
......@@ -100,7 +100,7 @@ void zmq::tcp_listener_t::in_event ()
tune_tcp_socket (fd);
tune_tcp_keepalives (fd, options.tcp_keepalive, options.tcp_keepalive_cnt, options.tcp_keepalive_idle, options.tcp_keepalive_intvl);
tune_tcp_retransmit_timeout (fd, options.tcp_retransmit_timeout);
tune_tcp_maxrt (fd, options.tcp_maxrt);
// remember our fd for ZMQ_SRCFD in messages
socket->set_fd(fd);
......
......@@ -133,19 +133,23 @@ void zmq::xpub_t::xwrite_activated (pipe_t *pipe_)
int zmq::xpub_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_)
{
if (option_ == ZMQ_XPUB_VERBOSE || option_ == ZMQ_XPUB_VERBOSE_UNSUBSCRIBE ||
option_ == ZMQ_XPUB_NODROP || option_ == ZMQ_XPUB_MANUAL)
{
if (option_ == ZMQ_XPUB_VERBOSE
|| option_ == ZMQ_XPUB_VERBOSER
|| option_ == ZMQ_XPUB_NODROP
|| option_ == ZMQ_XPUB_MANUAL) {
if (optvallen_ != sizeof(int) || *static_cast <const int*> (optval_) < 0) {
errno = EINVAL;
return -1;
}
if (option_ == ZMQ_XPUB_VERBOSE)
if (option_ == ZMQ_XPUB_VERBOSE) {
verbose_subs = (*static_cast <const int*> (optval_) != 0);
verbose_unsubs = 0;
}
else
if (option_ == ZMQ_XPUB_VERBOSE_UNSUBSCRIBE)
verbose_unsubs = (*static_cast <const int*> (optval_) != 0);
if (option_ == ZMQ_XPUB_VERBOSER) {
verbose_subs = (*static_cast <const int*> (optval_) != 0);
verbose_unsubs = verbose_subs;
}
else
if (option_ == ZMQ_XPUB_NODROP)
lossy = (*static_cast <const int*> (optval_) == 0);
......@@ -155,15 +159,13 @@ int zmq::xpub_t::xsetsockopt (int option_, const void *optval_,
}
else
if (option_ == ZMQ_SUBSCRIBE && manual) {
if (last_pipe != NULL) {
subscriptions.add((unsigned char *)optval_, optvallen_, last_pipe);
}
if (last_pipe != NULL)
subscriptions.add ((unsigned char *)optval_, optvallen_, last_pipe);
}
else
if (option_ == ZMQ_UNSUBSCRIBE && manual) {
if (last_pipe != NULL) {
subscriptions.rm((unsigned char *)optval_, optvallen_, last_pipe);
}
if (last_pipe != NULL)
subscriptions.rm ((unsigned char *)optval_, optvallen_, last_pipe);
}
else
if (option_ == ZMQ_XPUB_WELCOME_MSG) {
......
......@@ -164,12 +164,12 @@ void *zmq_ctx_new (void)
int zmq_ctx_term (void *ctx_)
{
if (!ctx_ || !((zmq::ctx_t*) ctx_)->check_tag ()) {
if (!ctx_ || !((zmq::ctx_t *) ctx_)->check_tag ()) {
errno = EFAULT;
return -1;
}
int rc = ((zmq::ctx_t*) ctx_)->terminate ();
int rc = ((zmq::ctx_t *) ctx_)->terminate ();
int en = errno;
// Shut down only if termination was not interrupted by a signal.
......@@ -193,30 +193,29 @@ int zmq_ctx_term (void *ctx_)
int zmq_ctx_shutdown (void *ctx_)
{
if (!ctx_ || !((zmq::ctx_t*) ctx_)->check_tag ()) {
if (!ctx_ || !((zmq::ctx_t *) ctx_)->check_tag ()) {
errno = EFAULT;
return -1;
}
return ((zmq::ctx_t*) ctx_)->shutdown ();
return ((zmq::ctx_t *) ctx_)->shutdown ();
}
int zmq_ctx_set (void *ctx_, int option_, int optval_)
{
if (!ctx_ || !((zmq::ctx_t*) ctx_)->check_tag ()) {
if (!ctx_ || !((zmq::ctx_t *) ctx_)->check_tag ()) {
errno = EFAULT;
return -1;
}
return ((zmq::ctx_t*) ctx_)->set (option_, optval_);
return ((zmq::ctx_t *) ctx_)->set (option_, optval_);
}
int zmq_ctx_get (void *ctx_, int option_)
{
if (!ctx_ || !((zmq::ctx_t*) ctx_)->check_tag ()) {
if (!ctx_ || !((zmq::ctx_t *) ctx_)->check_tag ()) {
errno = EFAULT;
return -1;
}
return ((zmq::ctx_t*) ctx_)->get (option_);
return ((zmq::ctx_t *) ctx_)->get (option_);
}
// Stable/legacy context API
......@@ -247,11 +246,11 @@ int zmq_ctx_destroy (void *ctx_)
void *zmq_socket (void *ctx_, int type_)
{
if (!ctx_ || !((zmq::ctx_t*) ctx_)->check_tag ()) {
if (!ctx_ || !((zmq::ctx_t *) ctx_)->check_tag ()) {
errno = EFAULT;
return NULL;
}
zmq::ctx_t *ctx = (zmq::ctx_t*) ctx_;
zmq::ctx_t *ctx = (zmq::ctx_t *) ctx_;
zmq::socket_base_t *s = ctx->create_socket (type_);
return (void *) s;
}
......@@ -366,15 +365,20 @@ int zmq_disconnect (void *s_, const char *addr_)
// Sending functions.
static int
static inline int
s_sendmsg (zmq::socket_base_t *s_, zmq_msg_t *msg_, int flags_)
{
size_t sz = zmq_msg_size (msg_);
int rc = s_->send ((zmq::msg_t *) msg_, flags_);
if (unlikely (rc < 0))
return -1;
// truncate returned size to INT_MAX to avoid overflow to negative values
return (int) (sz < INT_MAX ? sz : INT_MAX);
// This is what I'd like to do, my C++ fu is too weak -- PH 2016/02/09
// int max_msgsz = s_->parent->get (ZMQ_MAX_MSGSZ);
size_t max_msgsz = INT_MAX;
// Truncate returned size to INT_MAX to avoid overflow to negative values
return (int) (sz < max_msgsz? sz: max_msgsz);
}
/* To be deprecated once zmq_msg_send() is stable */
......@@ -407,7 +411,6 @@ int zmq_send (void *s_, const void *buf_, size_t len_, int flags_)
errno = err;
return -1;
}
// Note the optimisation here. We don't close the msg object as it is
// empty anyway. This may change when implementation of zmq_msg_t changes.
return rc;
......@@ -433,7 +436,6 @@ int zmq_send_const (void *s_, const void *buf_, size_t len_, int flags_)
errno = err;
return -1;
}
// Note the optimisation here. We don't close the msg object as it is
// empty anyway. This may change when implementation of zmq_msg_t changes.
return rc;
......@@ -484,12 +486,13 @@ int zmq_sendiov (void *s_, iovec *a_, size_t count_, int flags_)
static int
s_recvmsg (zmq::socket_base_t *s_, zmq_msg_t *msg_, int flags_)
{
int rc = s_->recv ((zmq::msg_t*) msg_, flags_);
int rc = s_->recv ((zmq::msg_t *) msg_, flags_);
if (unlikely (rc < 0))
return -1;
// truncate returned size to INT_MAX to avoid overflow to negative values
// Truncate returned size to INT_MAX to avoid overflow to negative values
size_t sz = zmq_msg_size (msg_);
return (int) (sz < INT_MAX ? sz : INT_MAX);
return (int) (sz < INT_MAX? sz: INT_MAX);
}
/* To be deprecated once zmq_msg_recv() is stable */
......
#include "testutil.hpp"
void test_setsockopt_tcp_recv_buffer()
void test_setsockopt_tcp_recv_buffer (void)
{
int rc;
void *ctx = zmq_ctx_new();
void *socket = zmq_socket(ctx, ZMQ_PUSH);
void *ctx = zmq_ctx_new ();
void *socket = zmq_socket (ctx, ZMQ_PUSH);
int val = 0;
size_t placeholder = sizeof(val);
size_t placeholder = sizeof (val);
rc = zmq_getsockopt(socket, ZMQ_TCP_RECV_BUFFER, &val, &placeholder);
assert(rc == 0);
assert(val == 8192);
rc = zmq_getsockopt (socket, ZMQ_RCVBUF, &val, &placeholder);
assert (rc == 0);
assert (val == 8192);
rc = zmq_setsockopt(socket, ZMQ_TCP_RECV_BUFFER, &val, sizeof(val));
assert(rc == 0);
assert(val == 8192);
rc = zmq_setsockopt (socket, ZMQ_RCVBUF, &val, sizeof (val));
assert (rc == 0);
assert (val == 8192);
rc = zmq_getsockopt(socket, ZMQ_TCP_RECV_BUFFER, &val, &placeholder);
assert(rc == 0);
assert(val == 8192);
rc = zmq_getsockopt (socket, ZMQ_RCVBUF, &val, &placeholder);
assert (rc == 0);
assert (val == 8192);
val = 16384;
rc = zmq_setsockopt(socket, ZMQ_TCP_RECV_BUFFER, &val, sizeof(val));
assert(rc == 0);
assert(val == 16384);
rc = zmq_setsockopt (socket, ZMQ_RCVBUF, &val, sizeof (val));
assert (rc == 0);
assert (val == 16384);
rc = zmq_getsockopt(socket, ZMQ_TCP_RECV_BUFFER, &val, &placeholder);
assert(rc == 0);
assert(val == 16384);
rc = zmq_getsockopt (socket, ZMQ_RCVBUF, &val, &placeholder);
assert (rc == 0);
assert (val == 16384);
zmq_close(socket);
zmq_ctx_term(ctx);
zmq_close (socket);
zmq_ctx_term (ctx);
}
void test_setsockopt_tcp_send_buffer()
void test_setsockopt_tcp_send_buffer (void)
{
int rc;
void *ctx = zmq_ctx_new();
void *socket = zmq_socket(ctx, ZMQ_PUSH);
void *ctx = zmq_ctx_new ();
void *socket = zmq_socket (ctx, ZMQ_PUSH);
int val = 0;
size_t placeholder = sizeof(val);
size_t placeholder = sizeof (val);
rc = zmq_getsockopt(socket, ZMQ_TCP_SEND_BUFFER, &val, &placeholder);
assert(rc == 0);
assert(val == 8192);
rc = zmq_getsockopt (socket, ZMQ_SNDBUF, &val, &placeholder);
assert (rc == 0);
assert (val == 8192);
rc = zmq_setsockopt(socket, ZMQ_TCP_SEND_BUFFER, &val, sizeof(val));
assert(rc == 0);
assert(val == 8192);
rc = zmq_setsockopt (socket, ZMQ_SNDBUF, &val, sizeof (val));
assert (rc == 0);
assert (val == 8192);
rc = zmq_getsockopt(socket, ZMQ_TCP_SEND_BUFFER, &val, &placeholder);
assert(rc == 0);
assert(val == 8192);
rc = zmq_getsockopt (socket, ZMQ_SNDBUF, &val, &placeholder);
assert (rc == 0);
assert (val == 8192);
val = 16384;
rc = zmq_setsockopt(socket, ZMQ_TCP_SEND_BUFFER, &val, sizeof(val));
assert(rc == 0);
assert(val == 16384);
rc = zmq_setsockopt (socket, ZMQ_SNDBUF, &val, sizeof (val));
assert (rc == 0);
assert (val == 16384);
rc = zmq_getsockopt(socket, ZMQ_TCP_SEND_BUFFER, &val, &placeholder);
assert(rc == 0);
assert(val == 16384);
rc = zmq_getsockopt (socket, ZMQ_SNDBUF, &val, &placeholder);
assert (rc == 0);
assert (val == 16384);
zmq_close(socket);
zmq_ctx_term(ctx);
zmq_close (socket);
zmq_ctx_term (ctx);
}
void test_setsockopt_use_fd()
void test_setsockopt_use_fd ()
{
int rc;
void *ctx = zmq_ctx_new();
void *socket = zmq_socket(ctx, ZMQ_PUSH);
void *ctx = zmq_ctx_new ();
void *socket = zmq_socket (ctx, ZMQ_PUSH);
int val = 0;
size_t placeholder = sizeof(val);
size_t placeholder = sizeof (val);
rc = zmq_getsockopt(socket, ZMQ_USE_FD, &val, &placeholder);
rc = zmq_getsockopt (socket, ZMQ_USE_FD, &val, &placeholder);
assert(rc == 0);
assert(val == -1);
val = 3;
rc = zmq_setsockopt(socket, ZMQ_USE_FD, &val, sizeof(val));
rc = zmq_setsockopt (socket, ZMQ_USE_FD, &val, sizeof(val));
assert(rc == 0);
assert(val == 3);
rc = zmq_getsockopt(socket, ZMQ_USE_FD, &val, &placeholder);
rc = zmq_getsockopt (socket, ZMQ_USE_FD, &val, &placeholder);
assert(rc == 0);
assert(val == 3);
zmq_close(socket);
zmq_ctx_term(ctx);
zmq_close (socket);
zmq_ctx_term (ctx);
}
int main()
int main (void)
{
test_setsockopt_tcp_recv_buffer();
test_setsockopt_tcp_send_buffer();
test_setsockopt_use_fd();
test_setsockopt_tcp_recv_buffer ();
test_setsockopt_tcp_send_buffer ();
test_setsockopt_use_fd ();
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment