Commit 2e39f892 authored by Martin Sustrik's avatar Martin Sustrik

ZMQII-27: Allow setting SNDBUF and RCVBUF size from 0MQ API (POSIX)

parent 72dacc35
......@@ -159,6 +159,8 @@ ZMQ_EXPORT int zmq_term (void *context);
#define ZMQ_RATE 8
#define ZMQ_RECOVERY_IVL 9
#define ZMQ_MCAST_LOOP 10
#define ZMQ_SNDBUF 11
#define ZMQ_RCVBUF 12
#define ZMQ_NOBLOCK 1
#define ZMQ_NOFLUSH 2
......
......@@ -158,6 +158,8 @@
(defconstant rate 8)
(defconstant recovery-ivl 9)
(defconstant mcast-loop 10)
(defconstant sndbuf 11)
(defconstant rcvbuf 12)
(defcfun* ("zmq_setsockopt" %setsockopt) :int
(s :pointer)
......
......@@ -47,6 +47,8 @@ public class Socket
public static final int RATE = 8;
public static final int RECOVERY_IVL = 9;
public static final int MCAST_LOOP = 10;
public static final int SNDBUF = 11;
public static final int RCVBUF = 12;
/**
* Class constructor.
......
......@@ -534,6 +534,12 @@ PyMODINIT_FUNC initlibpyzmq ()
t = PyInt_FromLong (ZMQ_MCAST_LOOP);
PyDict_SetItemString (dict, "MCAST_LOOP", t);
Py_DECREF (t);
t = PyInt_FromLong (ZMQ_SNDBUF);
PyDict_SetItemString (dict, "SNDBUF", t);
Py_DECREF (t);
t = PyInt_FromLong (ZMQ_RCVBUF);
PyDict_SetItemString (dict, "RCVBUF", t);
Py_DECREF (t);
t = PyInt_FromLong (ZMQ_POLL);
PyDict_SetItemString (dict, "POLL", t);
Py_DECREF (t);
......
......@@ -266,6 +266,8 @@ extern "C" void Init_librbzmq ()
rb_define_global_const ("RATE", INT2NUM (ZMQ_RATE));
rb_define_global_const ("RECOVERY_IVL", INT2NUM (ZMQ_RECOVERY_IVL));
rb_define_global_const ("MCAST_LOOP", INT2NUM (ZMQ_MCAST_LOOP));
rb_define_global_const ("SNDBUF", INT2NUM (ZMQ_SNDBUF));
rb_define_global_const ("RCVBUF", INT2NUM (ZMQ_RCVBUF));
rb_define_global_const ("NOBLOCK", INT2NUM (ZMQ_NOBLOCK));
rb_define_global_const ("NOFLUSH", INT2NUM (ZMQ_NOFLUSH));
......
......@@ -17,6 +17,7 @@ High watermark for the message pipes associated with the socket. The water
mark cannot be exceeded. If the messages don't fit into the pipe emergency
mechanisms of the particular socket type are used (block, drop etc.) If HWM
is set to zero, there are no limits for the content of the pipe.
Type: int64_t Unit: bytes Default: 0
.IP "\fBZMQ_LWM\fP"
......@@ -24,6 +25,7 @@ Low watermark makes sense only if high watermark is defined (i.e. is non-zero).
When the emergency state is reached when messages overflow the pipe, the
emergency lasts till the size of the pipe decreases to low watermark.
At that point normal state is resumed.
Type: int64_t Unit: bytes Default: 0
.IP "\fBZMQ_SWAP\fP"
......@@ -31,6 +33,7 @@ Swap allows the pipe to exceed high watermark. However, the data are written
to the disk rather than held in the memory. Until high watermark is
exceeded there is no disk activity involved though. The value of the option
defines maximal size of the swap file.
Type: int64_t Unit: bytes Default: 0
.IP "\fBZMQ_AFFINITY\fP"
......@@ -41,6 +44,7 @@ fairly among the threads in the thread pool. For non-zero values, the lowest
bit corresponds to the thread 1, second lowest bit to the thread 2 etc.
Thus, value of 3 means that from now on newly created sockets will handle
I/O activity exclusively using threads no. 1 and 2.
Type: int64_t Unit: N/A (bitmap) Default: 0
.IP "\fBZMQ_IDENTITY\fP"
......@@ -50,6 +54,7 @@ separated from other runs. However, with identity application reconnects to
existing infrastructure left by the previous run. Thus it may receive
messages that were sent in the meantime, it shares pipe limits with the
previous run etc.
Type: string Unit: N/A Default: NULL
.IP "\fBZMQ_SUBSCRIBE\fP"
......@@ -61,6 +66,7 @@ specific topic ("x.y.z") and/or messages with specific topic prefix
the very beginning of the message. Multiple filters can be attached to
a single 'sub' socket. In that case message passes if it matches at least
one of the filters.
Type: string Unit: N/A Default: N/A
.IP "\fBZMQ_UNSUBSCRIBE\fP"
......@@ -69,12 +75,14 @@ The filter specified must match the string passed to ZMQ_SUBSCRIBE options
exactly. If there were several instances of the same filter created,
this options removes only one of them, leaving the rest in place
and functional.
Type: string Unit: N/A Default: N/A
.IP "\fBZMQ_RATE\fP"
This option applies only to sending side of multicast transports (pgm & udp).
It specifies maximal outgoing data rate that an individual sender socket
can send.
Type: uint64_t Unit: kilobits/second Default: 100
.IP "\fBZMQ_RECOVERY_IVL\fP"
......@@ -84,6 +92,7 @@ Keep in mind that large recovery intervals at high data rates result in
very large recovery buffers, meaning that you can easily overload your box
by setting say 1 minute recovery interval at 1Gb/s rate (requires
7GB in-memory buffer).
Type: uint64_t Unit: seconds Default: 10
.IP "\fBZMQ_MCAST_LOOP\fP"
......@@ -92,8 +101,23 @@ means that the mutlicast packets can be received on the box they were sent
from. Setting the value to 0 disables the loopback functionality which
can have negative impact on the performance. If possible, disable
the loopback in production environments.
Type: uint64_t Unit: N/A (boolean value) Default: 1
.IP "\fBZMQ_SNDBUF\fP"
Sets the underlying kernel transmit buffer size to the specified size. See
.IR SO_SNDBUF
POSIX socket option. Value of zero means leaving the OS default unchanged.
Type: uint64_t Unit: bytes Default: 0
.IP "\fBZMQ_RCVBUF\fP"
Sets the underlying kernel receive buffer size to the specified size. See
.IR SO_RCVBUF
POSIX socket option. Value of zero means leaving the OS default unchanged.
Type: uint64_t Unit: bytes Default: 0
.SH RETURN VALUE
In case of success the function returns zero. Otherwise it returns -1 and
sets
......
......@@ -30,6 +30,8 @@ zmq::options_t::options_t () :
rate (100),
recovery_ivl (10),
use_multicast_loop (true),
sndbuf (0),
rcvbuf (0),
requires_in (false),
requires_out (false)
{
......@@ -106,6 +108,22 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
return -1;
}
return 0;
case ZMQ_SNDBUF:
if (optvallen_ != sizeof (uint64_t)) {
errno = EINVAL;
return -1;
}
sndbuf = *((uint64_t*) optval_);
return 0;
case ZMQ_RCVBUF:
if (optvallen_ != sizeof (uint64_t)) {
errno = EINVAL;
return -1;
}
rcvbuf = *((uint64_t*) optval_);
return 0;
}
errno = EINVAL;
......
......@@ -49,6 +49,9 @@ namespace zmq
// Enable multicast loopback. Default disabled (false).
bool use_multicast_loop;
uint64_t sndbuf;
uint64_t rcvbuf;
// These options are never set by the user directly. Instead they are
// provided by the specific socket type.
bool requires_in;
......
......@@ -34,7 +34,7 @@ zmq::tcp_socket_t::~tcp_socket_t ()
close ();
}
int zmq::tcp_socket_t::open (fd_t fd_)
int zmq::tcp_socket_t::open (fd_t fd_, uint64_t sndbuf_, uint64_t rcvbuf_)
{
zmq_assert (s == retired_fd);
s = fd_;
......@@ -129,10 +129,23 @@ zmq::tcp_socket_t::~tcp_socket_t ()
close ();
}
int zmq::tcp_socket_t::open (fd_t fd_)
int zmq::tcp_socket_t::open (fd_t fd_, uint64_t sndbuf_, uint64_t rcvbuf_)
{
assert (s == retired_fd);
s = fd_;
if (sndbuf_) {
int sz = (int) sndbuf_;
int rc = setsockopt (s, SOL_SOCKET, SO_SNDBUF, &sz, sizeof (int));
errno_assert (rc == 0);
}
if (rcvbuf_) {
int sz = (int) rcvbuf_;
int rc = setsockopt (s, SOL_SOCKET, SO_RCVBUF, &sz, sizeof (int));
errno_assert (rc == 0);
}
return 0;
}
......
......@@ -21,6 +21,7 @@
#define __ZMQ_TCP_SOCKET_HPP_INCLUDED__
#include "fd.hpp"
#include "stdint.hpp"
namespace zmq
{
......@@ -35,7 +36,7 @@ namespace zmq
~tcp_socket_t ();
// Associates a socket with a native socket descriptor.
int open (fd_t fd_);
int open (fd_t fd_, uint64_t sndbuf_, uint64_t rcvbuf_);
// Closes the underlying socket.
int close ();
......
......@@ -31,7 +31,7 @@ zmq::zmq_connecter_init_t::zmq_connecter_init_t (io_thread_t *parent_,
session_name (session_name_)
{
// Create associated engine object.
engine = new zmq_engine_t (parent_, fd_);
engine = new zmq_engine_t (parent_, fd_, options);
zmq_assert (engine);
}
......
......@@ -24,7 +24,8 @@
#include "config.hpp"
#include "err.hpp"
zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, fd_t fd_) :
zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, fd_t fd_,
const options_t &options_) :
io_object_t (parent_),
inbuf (NULL),
insize (0),
......@@ -32,7 +33,8 @@ zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, fd_t fd_) :
outbuf (NULL),
outsize (0),
outpos (0),
inout (NULL)
inout (NULL),
options (options_)
{
// Allocate read & write buffer.
inbuf_storage = (unsigned char*) malloc (in_batch_size);
......@@ -41,7 +43,7 @@ zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, fd_t fd_) :
zmq_assert (outbuf_storage);
// Initialise the underlying socket.
int rc = tcp_socket.open (fd_);
int rc = tcp_socket.open (fd_, options.sndbuf, options.rcvbuf);
zmq_assert (rc == 0);
}
......
......@@ -36,7 +36,8 @@ namespace zmq
{
public:
zmq_engine_t (class io_thread_t *parent_, fd_t fd_);
zmq_engine_t (class io_thread_t *parent_, fd_t fd_,
const options_t &options_);
~zmq_engine_t ();
// i_engine interface implementation.
......@@ -71,6 +72,8 @@ namespace zmq
zmq_encoder_t encoder;
zmq_decoder_t decoder;
options_t options;
zmq_engine_t (const zmq_engine_t&);
void operator = (const zmq_engine_t&);
};
......
......@@ -29,7 +29,7 @@ zmq::zmq_listener_init_t::zmq_listener_init_t (io_thread_t *parent_,
has_peer_identity (false)
{
// Create associated engine object.
engine = new zmq_engine_t (parent_, fd_);
engine = new zmq_engine_t (parent_, fd_, options);
zmq_assert (engine);
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment