Commit d7923f08 authored by Fabien Ninoles's avatar Fabien Ninoles Committed by Martin Sustrik

Add sockopt ZMQ_RCVTIMEO/ZMQ_SNDTIMEO.

- Add doc and tests
- Add options and setup
- Wait using poll/select
Signed-off-by: 's avatarFabien Ninoles <fabien@tzone.org>
Signed-off-by: 's avatarMartin Sustrik <sustrik@250bpm.com>
parent 65d2b703
...@@ -28,6 +28,7 @@ tests/test_reqrep_ipc ...@@ -28,6 +28,7 @@ tests/test_reqrep_ipc
tests/test_reqrep_tcp tests/test_reqrep_tcp
tests/test_shutdown_stress tests/test_shutdown_stress
tests/test_hwm tests/test_hwm
tests/test_timeo
src/platform.hpp* src/platform.hpp*
src/stamp-h1 src/stamp-h1
devices/zmq_forwarder/zmq_forwarder devices/zmq_forwarder/zmq_forwarder
......
...@@ -255,7 +255,8 @@ interval for the specified 'socket'. This is the maximum period 0MQ shall wait ...@@ -255,7 +255,8 @@ interval for the specified 'socket'. This is the maximum period 0MQ shall wait
between attempts to reconnect. On each reconnect attempt, the previous interval between attempts to reconnect. On each reconnect attempt, the previous interval
shall be doubled untill ZMQ_RECONNECT_IVL_MAX is reached. This allows for shall be doubled untill ZMQ_RECONNECT_IVL_MAX is reached. This allows for
exponential backoff strategy. Default value means no exponential backoff is exponential backoff strategy. Default value means no exponential backoff is
performed and reconnect interval calculations are only based on ZMQ_RECONNECT_IVL. performed and reconnect interval calculations are only based on
ZMQ_RECONNECT_IVL.
NOTE: Values less than ZMQ_RECONNECT_IVL will be ignored. NOTE: Values less than ZMQ_RECONNECT_IVL will be ignored.
...@@ -324,6 +325,38 @@ Default value:: 1 ...@@ -324,6 +325,38 @@ Default value:: 1
Applicable socket types:: ZMQ_SUB, ZMQ_XSUB Applicable socket types:: ZMQ_SUB, ZMQ_XSUB
ZMQ_RCVTIMEO: Maximum time before a socket operation returns with EAGAIN
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Retrieve the timeout for recv operation on the socket. If the value is `0`,
_zmq_recv(3)_ will return immediately, with a EAGAIN error if there is no
message to receive. If the value is `-1`, it will block until a message is
available. For all other values, it will wait for a message for that amount
of time before returning with an EAGAIN error.
[horizontal]
Option value type:: int
Option value unit:: milliseconds
Default value:: -1 (infinite)
Applicable socket types:: all
ZMQ_SNDTIMEO: Maximum time before a socket operation returns with EAGAIN
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Retrieve the timeout for send operation on the socket. If the value is `0`,
_zmq_send(3)_ will return immediately, with a EAGAIN error if the message
cannot be sent. If the value is `-1`, it will block until the message is sent.
For all other values, it will try to send the message for that amount of time
before returning with an EAGAIN error.
[horizontal]
Option value type:: int
Option value unit:: milliseconds
Default value:: -1 (infinite)
Applicable socket types:: all
ZMQ_FD: Retrieve file descriptor associated with the socket ZMQ_FD: Retrieve file descriptor associated with the socket
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_FD' option shall retrieve the file descriptor associated with the The 'ZMQ_FD' option shall retrieve the file descriptor associated with the
......
...@@ -328,6 +328,38 @@ Default value:: 1 ...@@ -328,6 +328,38 @@ Default value:: 1
Applicable socket types:: ZMQ_SUB, ZMQ_XSUB Applicable socket types:: ZMQ_SUB, ZMQ_XSUB
ZMQ_RCVTIMEO: Maximum time before a recv operation returns with EAGAIN
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Sets the timeout for receive operation on the socket. If the value is `0`,
_zmq_recv(3)_ will return immediately, with a EAGAIN error if there is no
message to receive. If the value is `-1`, it will block until a message is
available. For all other values, it will wait for a message for that amount
of time before returning with an EAGAIN error.
[horizontal]
Option value type:: int
Option value unit:: milliseconds
Default value:: -1 (infinite)
Applicable socket types:: all
ZMQ_SNDTIMEO: Maximum time before a send operation returns with EAGAIN
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Sets the timeout for send operation on the socket. If the value is `0`,
_zmq_send(3)_ will return immediately, with a EAGAIN error if the message
cannot be sent. If the value is `-1`, it will block until the message is sent.
For all other values, it will try to send the message for that amount of time
before returning with an EAGAIN error.
[horizontal]
Option value type:: int
Option value unit:: milliseconds
Default value:: -1 (infinite)
Applicable socket types:: all
RETURN VALUE RETURN VALUE
------------ ------------
The _zmq_setsockopt()_ function shall return zero if successful. Otherwise it The _zmq_setsockopt()_ function shall return zero if successful. Otherwise it
......
...@@ -181,6 +181,8 @@ ZMQ_EXPORT int zmq_term (void *context); ...@@ -181,6 +181,8 @@ ZMQ_EXPORT int zmq_term (void *context);
#define ZMQ_RCVHWM 24 #define ZMQ_RCVHWM 24
#define ZMQ_MULTICAST_HOPS 25 #define ZMQ_MULTICAST_HOPS 25
#define ZMQ_FILTER 26 #define ZMQ_FILTER 26
#define ZMQ_RCVTIMEO 27
#define ZMQ_SNDTIMEO 28
/* Send/recv options. */ /* Send/recv options. */
#define ZMQ_DONTWAIT 1 #define ZMQ_DONTWAIT 1
......
...@@ -88,6 +88,8 @@ int main (int argc, char *argv []) ...@@ -88,6 +88,8 @@ int main (int argc, char *argv [])
} }
} }
zmq_sleep (2);
rc = zmq_close (s); rc = zmq_close (s);
if (rc != 0) { if (rc != 0) {
printf ("error in zmq_close: %s\n", zmq_strerror (errno)); printf ("error in zmq_close: %s\n", zmq_strerror (errno));
......
...@@ -142,7 +142,7 @@ int zmq::ctx_t::terminate () ...@@ -142,7 +142,7 @@ int zmq::ctx_t::terminate ()
// Wait till reaper thread closes all the sockets. // Wait till reaper thread closes all the sockets.
command_t cmd; command_t cmd;
int rc = term_mailbox.recv (&cmd, true); int rc = term_mailbox.recv (&cmd, -1);
if (rc == -1 && errno == EINTR) if (rc == -1 && errno == EINTR)
return -1; return -1;
zmq_assert (rc == 0); zmq_assert (rc == 0);
......
...@@ -70,7 +70,7 @@ void zmq::io_thread_t::in_event () ...@@ -70,7 +70,7 @@ void zmq::io_thread_t::in_event ()
// Get the next command. If there is none, exit. // Get the next command. If there is none, exit.
command_t cmd; command_t cmd;
int rc = mailbox.recv (&cmd, false); int rc = mailbox.recv (&cmd, 0);
if (rc != 0 && errno == EINTR) if (rc != 0 && errno == EINTR)
continue; continue;
if (rc != 0 && errno == EAGAIN) if (rc != 0 && errno == EAGAIN)
......
...@@ -18,8 +18,33 @@ ...@@ -18,8 +18,33 @@
along with this program. If not, see <http://www.gnu.org/licenses/>. along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "mailbox.hpp"
#include "platform.hpp" #include "platform.hpp"
#if defined ZMQ_FORCE_SELECT
#define ZMQ_RCVTIMEO_BASED_ON_SELECT
#elif defined ZMQ_FORCE_POLL
#define ZMQ_RCVTIMEO_BASED_ON_POLL
#elif defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_FREEBSD ||\
defined ZMQ_HAVE_OPENBSD || defined ZMQ_HAVE_SOLARIS ||\
defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_QNXNTO ||\
defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_AIX ||\
defined ZMQ_HAVE_NETBSD
#define ZMQ_RCVTIMEO_BASED_ON_POLL
#elif defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS
#define ZMQ_RCVTIMEO_BASED_ON_SELECT
#endif
// On AIX, poll.h has to be included before zmq.h to get consistent
// definition of pollfd structure (AIX uses 'reqevents' and 'retnevents'
// instead of 'events' and 'revents' and defines macros to map from POSIX-y
// names to AIX-specific names).
#if defined ZMQ_RCVTIMEO_BASED_ON_POLL
#include <poll.h>
#elif defined ZMQ_RCVTIMEO_BASED_ON_SELECT
#include <sys/select.h>
#endif
#include "mailbox.hpp"
#include "err.hpp" #include "err.hpp"
#include "fd.hpp" #include "fd.hpp"
#include "ip.hpp" #include "ip.hpp"
...@@ -79,10 +104,14 @@ void zmq::mailbox_t::send (const command_t &cmd_) ...@@ -79,10 +104,14 @@ void zmq::mailbox_t::send (const command_t &cmd_)
zmq_assert (nbytes == sizeof (command_t)); zmq_assert (nbytes == sizeof (command_t));
} }
int zmq::mailbox_t::recv (command_t *cmd_, bool block_) int zmq::mailbox_t::recv (command_t *cmd_, int timeout_)
{ {
// If there's a finite timeout, poll on the fd.
if (timeout_ > 0)
return recv_timeout (cmd_, timeout_);
// If required, set the reader to blocking mode. // If required, set the reader to blocking mode.
if (block_) { if (timeout_ < 0) {
unsigned long argp = 0; unsigned long argp = 0;
int rc = ioctlsocket (r, FIONBIO, &argp); int rc = ioctlsocket (r, FIONBIO, &argp);
wsa_assert (rc != SOCKET_ERROR); wsa_assert (rc != SOCKET_ERROR);
...@@ -97,7 +126,7 @@ int zmq::mailbox_t::recv (command_t *cmd_, bool block_) ...@@ -97,7 +126,7 @@ int zmq::mailbox_t::recv (command_t *cmd_, bool block_)
err = EAGAIN; err = EAGAIN;
// Re-set the reader to non-blocking mode. // Re-set the reader to non-blocking mode.
if (block_) { if (timeout_ < 0) {
unsigned long argp = 1; unsigned long argp = 1;
int rc = ioctlsocket (r, FIONBIO, &argp); int rc = ioctlsocket (r, FIONBIO, &argp);
wsa_assert (rc != SOCKET_ERROR); wsa_assert (rc != SOCKET_ERROR);
...@@ -194,20 +223,24 @@ void zmq::mailbox_t::send (const command_t &cmd_) ...@@ -194,20 +223,24 @@ void zmq::mailbox_t::send (const command_t &cmd_)
zmq_assert (nbytes == sizeof (command_t)); zmq_assert (nbytes == sizeof (command_t));
} }
int zmq::mailbox_t::recv (command_t *cmd_, bool block_) int zmq::mailbox_t::recv (command_t *cmd_, int timeout_)
{ {
// If there's a finite timeout, poll on the fd.
if (timeout_ > 0)
return recv_timeout (cmd_, timeout_);
#ifdef MSG_DONTWAIT #ifdef MSG_DONTWAIT
// Attempt to read an entire command. Returns EAGAIN if non-blocking // Attempt to read an entire command. Returns EAGAIN if non-blocking
// mode is requested and a command is not available. // mode is requested and a command is not available.
ssize_t nbytes = ::recv (r, cmd_, sizeof (command_t), ssize_t nbytes = ::recv (r, cmd_, sizeof (command_t),
block_ ? 0 : MSG_DONTWAIT); timeout_ < 0 ? 0 : MSG_DONTWAIT);
if (nbytes == -1 && (errno == EAGAIN || errno == EINTR)) if (nbytes == -1 && (errno == EAGAIN || errno == EINTR))
return -1; return -1;
#else #else
// If required, set the reader to blocking mode. // If required, set the reader to blocking mode.
if (block_) { if (timeout_ < 0) {
int flags = fcntl (r, F_GETFL, 0); int flags = fcntl (r, F_GETFL, 0);
errno_assert (flags >= 0); errno_assert (flags >= 0);
int rc = fcntl (r, F_SETFL, flags & ~O_NONBLOCK); int rc = fcntl (r, F_SETFL, flags & ~O_NONBLOCK);
...@@ -223,7 +256,7 @@ int zmq::mailbox_t::recv (command_t *cmd_, bool block_) ...@@ -223,7 +256,7 @@ int zmq::mailbox_t::recv (command_t *cmd_, bool block_)
err = errno; err = errno;
// Re-set the reader to non-blocking mode. // Re-set the reader to non-blocking mode.
if (block_) { if (timeout_ < 0) {
int flags = fcntl (r, F_GETFL, 0); int flags = fcntl (r, F_GETFL, 0);
errno_assert (flags >= 0); errno_assert (flags >= 0);
int rc = fcntl (r, F_SETFL, flags | O_NONBLOCK); int rc = fcntl (r, F_SETFL, flags | O_NONBLOCK);
...@@ -380,3 +413,60 @@ int zmq::mailbox_t::make_socketpair (fd_t *r_, fd_t *w_) ...@@ -380,3 +413,60 @@ int zmq::mailbox_t::make_socketpair (fd_t *r_, fd_t *w_)
#endif #endif
} }
int zmq::mailbox_t::recv_timeout (command_t *cmd_, int timeout_)
{
#ifdef ZMQ_RCVTIMEO_BASED_ON_POLL
struct pollfd pfd;
pfd.fd = r;
pfd.events = POLLIN;
int rc = poll (&pfd, 1, timeout_);
if (unlikely (rc < 0)) {
zmq_assert (errno == EINTR);
return -1;
}
else if (unlikely (rc == 0)) {
errno = EAGAIN;
return -1;
}
zmq_assert (rc == 1);
zmq_assert (pfd.revents & POLLIN);
#elif defined ZMQ_RCVTIMEO_BASED_ON_SELECT
fd_set fds;
FD_ZERO (&fds);
FD_SET (r, &fds);
struct timeval timeout;
timeout.tv_sec = timeout_ / 1000;
timeout.tv_usec = timeout_ % 1000 * 1000;
int rc = select (r + 1, &fds, NULL, NULL, &timeout);
if (unlikely (rc < 0)) {
zmq_assert (errno == EINTR);
return -1;
}
else if (unlikely (rc == 0)) {
errno = EAGAIN;
return -1;
}
zmq_assert (rc == 1);
#else
#error
#endif
// The file descriptor is ready for reading. Extract one command out of it.
ssize_t nbytes = ::recv (r, cmd_, sizeof (command_t), 0);
if (unlikely (rc < 0 && errno == EINTR))
return -1;
zmq_assert (nbytes == sizeof (command_t));
return 0;
}
#if defined ZMQ_RCVTIMEO_BASED_ON_SELECT
#undef ZMQ_RCVTIMEO_BASED_ON_SELECT
#endif
#if defined ZMQ_RCVTIMEO_BASED_ON_POLL
#undef ZMQ_RCVTIMEO_BASED_ON_POLL
#endif
...@@ -41,7 +41,7 @@ namespace zmq ...@@ -41,7 +41,7 @@ namespace zmq
fd_t get_fd (); fd_t get_fd ();
void send (const command_t &cmd_); void send (const command_t &cmd_);
int recv (command_t *cmd_, bool block_); int recv (command_t *cmd_, int timeout_);
private: private:
...@@ -52,6 +52,11 @@ namespace zmq ...@@ -52,6 +52,11 @@ namespace zmq
// Platform-dependent function to create a socketpair. // Platform-dependent function to create a socketpair.
static int make_socketpair (fd_t *r_, fd_t *w_); static int make_socketpair (fd_t *r_, fd_t *w_);
// Receives a command with the specific timeout.
// This function is not to be used for non-blocking or inifinitely
// blocking recvs.
int recv_timeout (command_t *cmd_, int timeout_);
// Disable copying of mailbox_t object. // Disable copying of mailbox_t object.
mailbox_t (const mailbox_t&); mailbox_t (const mailbox_t&);
const mailbox_t &operator = (const mailbox_t&); const mailbox_t &operator = (const mailbox_t&);
......
...@@ -39,6 +39,8 @@ zmq::options_t::options_t () : ...@@ -39,6 +39,8 @@ zmq::options_t::options_t () :
backlog (100), backlog (100),
maxmsgsize (-1), maxmsgsize (-1),
filter (1), filter (1),
rcvtimeo (-1),
sndtimeo (-1),
immediate_connect (true) immediate_connect (true)
{ {
} }
...@@ -182,6 +184,22 @@ int zmq::options_t::setsockopt (int option_, const void *optval_, ...@@ -182,6 +184,22 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
filter = *((int*) optval_); filter = *((int*) optval_);
return 0; return 0;
case ZMQ_RCVTIMEO:
if (optvallen_ != sizeof (int)) {
errno = EINVAL;
return -1;
}
rcvtimeo = *((int*) optval_);
return 0;
case ZMQ_SNDTIMEO:
if (optvallen_ != sizeof (int)) {
errno = EINVAL;
return -1;
}
sndtimeo = *((int*) optval_);
return 0;
} }
errno = EINVAL; errno = EINVAL;
...@@ -336,6 +354,24 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_) ...@@ -336,6 +354,24 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
*optvallen_ = sizeof (int); *optvallen_ = sizeof (int);
return 0; return 0;
case ZMQ_RCVTIMEO:
if (*optvallen_ < sizeof (int)) {
errno = EINVAL;
return -1;
}
*((int*) optval_) = rcvtimeo;
*optvallen_ = sizeof (int);
return 0;
case ZMQ_SNDTIMEO:
if (*optvallen_ < sizeof (int)) {
errno = EINVAL;
return -1;
}
*((int*) optval_) = sndtimeo;
*optvallen_ = sizeof (int);
return 0;
} }
errno = EINVAL; errno = EINVAL;
......
...@@ -78,6 +78,10 @@ namespace zmq ...@@ -78,6 +78,10 @@ namespace zmq
// If 1, (X)SUB socket should filter the messages. If 0, it should not. // If 1, (X)SUB socket should filter the messages. If 0, it should not.
int filter; int filter;
// The timeout for send/recv operations for this socket.
int rcvtimeo;
int sndtimeo;
// If true, when connecting, pipes are created immediately without // If true, when connecting, pipes are created immediately without
// waiting for the connection to be established. That way the socket // waiting for the connection to be established. That way the socket
// is not aware of the peer's identity, however, it is able to send // is not aware of the peer's identity, however, it is able to send
......
...@@ -61,7 +61,7 @@ void zmq::reaper_t::in_event () ...@@ -61,7 +61,7 @@ void zmq::reaper_t::in_event ()
// Get the next command. If there is none, exit. // Get the next command. If there is none, exit.
command_t cmd; command_t cmd;
int rc = mailbox.recv (&cmd, false); int rc = mailbox.recv (&cmd, 0);
if (rc != 0 && errno == EINTR) if (rc != 0 && errno == EINTR)
continue; continue;
if (rc != 0 && errno == EAGAIN) if (rc != 0 && errno == EAGAIN)
......
...@@ -288,7 +288,7 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_, ...@@ -288,7 +288,7 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_,
errno = EINVAL; errno = EINVAL;
return -1; return -1;
} }
int rc = process_commands (false, false); int rc = process_commands (0, false);
if (rc != 0 && (errno == EINTR || errno == ETERM)) if (rc != 0 && (errno == EINTR || errno == ETERM))
return -1; return -1;
errno_assert (rc == 0); errno_assert (rc == 0);
...@@ -475,7 +475,7 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_) ...@@ -475,7 +475,7 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_)
} }
// Process pending commands, if any. // Process pending commands, if any.
int rc = process_commands (false, true); int rc = process_commands (0, true);
if (unlikely (rc != 0)) if (unlikely (rc != 0))
return -1; return -1;
...@@ -487,20 +487,38 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_) ...@@ -487,20 +487,38 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_)
rc = xsend (msg_, flags_); rc = xsend (msg_, flags_);
if (rc == 0) if (rc == 0)
return 0; return 0;
if (unlikely (errno != EAGAIN))
return -1;
// In case of non-blocking send we'll simply propagate // In case of non-blocking send we'll simply propagate
// the error - including EAGAIN - upwards. // the error - including EAGAIN - up the stack.
if (flags_ & ZMQ_DONTWAIT) if (flags_ & ZMQ_DONTWAIT || options.sndtimeo == 0)
return -1; return -1;
// Compute the time when the timeout should occur.
// If the timeout is infite, don't care.
clock_t clock ;
int timeout = options.sndtimeo;
uint64_t end = timeout < 0 ? 0 : (clock.now_ms () + timeout);
// Oops, we couldn't send the message. Wait for the next // Oops, we couldn't send the message. Wait for the next
// command, process it and try to send the message again. // command, process it and try to send the message again.
while (rc != 0) { // If timeout is reached in the meantime, return EAGAIN.
if (errno != EAGAIN) while (true) {
return -1; if (unlikely (process_commands (timeout, false) != 0))
if (unlikely (process_commands (true, false) != 0))
return -1; return -1;
rc = xsend (msg_, flags_); rc = xsend (msg_, flags_);
if (rc == 0)
break;
if (unlikely (errno != EAGAIN))
return -1;
if (timeout > 0) {
timeout = end - clock.now_ms ();
if (timeout <= 0) {
errno = EAGAIN;
return -1;
}
}
} }
return 0; return 0;
} }
...@@ -521,7 +539,8 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_) ...@@ -521,7 +539,8 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
// Get the message. // Get the message.
int rc = xrecv (msg_, flags_); int rc = xrecv (msg_, flags_);
int err = errno; if (unlikely (rc != 0 && errno != EAGAIN))
return -1;
// Once every inbound_poll_rate messages check for signals and process // Once every inbound_poll_rate messages check for signals and process
// incoming commands. This happens only if we are not polling altogether // incoming commands. This happens only if we are not polling altogether
...@@ -532,7 +551,7 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_) ...@@ -532,7 +551,7 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
// described above) from the one used by 'send'. This is because counting // described above) from the one used by 'send'. This is because counting
// ticks is more efficient than doing RDTSC all the time. // ticks is more efficient than doing RDTSC all the time.
if (++ticks == inbound_poll_rate) { if (++ticks == inbound_poll_rate) {
if (unlikely (process_commands (false, false) != 0)) if (unlikely (process_commands (0, false) != 0))
return -1; return -1;
ticks = 0; ticks = 0;
} }
...@@ -545,17 +564,12 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_) ...@@ -545,17 +564,12 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
return 0; return 0;
} }
// If we don't have the message, restore the original cause of the problem.
errno = err;
// If the message cannot be fetched immediately, there are two scenarios. // If the message cannot be fetched immediately, there are two scenarios.
// For non-blocking recv, commands are processed in case there's an // For non-blocking recv, commands are processed in case there's an
// activate_reader command already waiting int a command pipe. // activate_reader command already waiting int a command pipe.
// If it's not, return EAGAIN. // If it's not, return EAGAIN.
if (flags_ & ZMQ_DONTWAIT) { if (flags_ & ZMQ_DONTWAIT || options.rcvtimeo == 0) {
if (errno != EAGAIN) if (unlikely (process_commands (0, false) != 0))
return -1;
if (unlikely (process_commands (false, false) != 0))
return -1; return -1;
ticks = 0; ticks = 0;
...@@ -568,17 +582,33 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_) ...@@ -568,17 +582,33 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
return rc; return rc;
} }
// Compute the time when the timeout should occur.
// If the timeout is infite, don't care.
clock_t clock ;
int timeout = options.rcvtimeo;
uint64_t end = timeout < 0 ? 0 : (clock.now_ms () + timeout);
// In blocking scenario, commands are processed over and over again until // In blocking scenario, commands are processed over and over again until
// we are able to fetch a message. // we are able to fetch a message.
bool block = (ticks != 0); bool block = (ticks != 0);
while (rc != 0) { while (true) {
if (errno != EAGAIN) if (unlikely (process_commands (block ? timeout : 0, false) != 0))
return -1;
if (unlikely (process_commands (block, false) != 0))
return -1; return -1;
rc = xrecv (msg_, flags_); rc = xrecv (msg_, flags_);
ticks = 0; if (rc == 0) {
ticks = 0;
break;
}
if (unlikely (errno != EAGAIN))
return -1;
block = true; block = true;
if (timeout > 0) {
timeout = end - clock.now_ms ();
if (timeout <= 0) {
errno = EAGAIN;
return -1;
}
}
} }
rcvmore = msg_->flags () & msg_t::more; rcvmore = msg_->flags () & msg_t::more;
...@@ -658,18 +688,20 @@ void zmq::socket_base_t::start_reaping (poller_t *poller_) ...@@ -658,18 +688,20 @@ void zmq::socket_base_t::start_reaping (poller_t *poller_)
check_destroy (); check_destroy ();
} }
int zmq::socket_base_t::process_commands (bool block_, bool throttle_) int zmq::socket_base_t::process_commands (int timeout_, bool throttle_)
{ {
int rc; int rc;
command_t cmd; command_t cmd;
if (block_) { if (timeout_ != 0) {
rc = mailbox.recv (&cmd, true);
if (rc == -1 && errno == EINTR) // If we are asked to wait, simply ask mailbox to wait.
return -1; rc = mailbox.recv (&cmd, timeout_);
errno_assert (rc == 0);
} }
else { else {
// If we are asked not to wait, check whether we haven't processed
// commands recently, so that we can throttle the new commands.
// Get the CPU's tick counter. If 0, the counter is not available. // Get the CPU's tick counter. If 0, the counter is not available.
uint64_t tsc = zmq::clock_t::rdtsc (); uint64_t tsc = zmq::clock_t::rdtsc ();
...@@ -690,7 +722,7 @@ int zmq::socket_base_t::process_commands (bool block_, bool throttle_) ...@@ -690,7 +722,7 @@ int zmq::socket_base_t::process_commands (bool block_, bool throttle_)
} }
// Check whether there are any commands pending for this thread. // Check whether there are any commands pending for this thread.
rc = mailbox.recv (&cmd, false); rc = mailbox.recv (&cmd, 0);
} }
// Process all the commands available at the moment. // Process all the commands available at the moment.
...@@ -701,7 +733,7 @@ int zmq::socket_base_t::process_commands (bool block_, bool throttle_) ...@@ -701,7 +733,7 @@ int zmq::socket_base_t::process_commands (bool block_, bool throttle_)
return -1; return -1;
errno_assert (rc == 0); errno_assert (rc == 0);
cmd.destination->process_command (cmd); cmd.destination->process_command (cmd);
rc = mailbox.recv (&cmd, false); rc = mailbox.recv (&cmd, 0);
} }
if (ctx_terminated) { if (ctx_terminated) {
...@@ -797,9 +829,11 @@ void zmq::socket_base_t::xhiccuped (pipe_t *pipe_) ...@@ -797,9 +829,11 @@ void zmq::socket_base_t::xhiccuped (pipe_t *pipe_)
void zmq::socket_base_t::in_event () void zmq::socket_base_t::in_event ()
{ {
// Process any commands from other threads/sockets that may be available // This function is invoked only once the socket is running in the context
// at the moment. Ultimately, socket will be destroyed. // of the reaper thread. Process any commands from other threads/sockets
process_commands (false, false); // that may be available at the moment. Ultimately, the socket will
// be destroyed.
process_commands (0, false);
check_destroy (); check_destroy ();
} }
......
...@@ -160,11 +160,11 @@ namespace zmq ...@@ -160,11 +160,11 @@ namespace zmq
// Register the pipe with this socket. // Register the pipe with this socket.
void attach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_); void attach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_);
// Processes commands sent to this socket (if any). If 'block' is // Processes commands sent to this socket (if any). If timeout is -1,
// set to true, returns only after at least one command was processed. // returns only after at least one command was processed.
// If throttle argument is true, commands are processed at most once // If throttle argument is true, commands are processed at most once
// in a predefined time period. // in a predefined time period.
int process_commands (bool block_, bool throttle_); int process_commands (int timeout_, bool throttle_);
// Handlers for incoming commands. // Handlers for incoming commands.
void process_stop (); void process_stop ();
......
...@@ -5,7 +5,8 @@ noinst_PROGRAMS = test_pair_inproc \ ...@@ -5,7 +5,8 @@ noinst_PROGRAMS = test_pair_inproc \
test_pair_tcp \ test_pair_tcp \
test_reqrep_inproc \ test_reqrep_inproc \
test_reqrep_tcp \ test_reqrep_tcp \
test_hwm test_hwm \
test_timeo
if !ON_MINGW if !ON_MINGW
noinst_PROGRAMS += test_shutdown_stress \ noinst_PROGRAMS += test_shutdown_stress \
...@@ -21,6 +22,8 @@ test_reqrep_tcp_SOURCES = test_reqrep_tcp.cpp testutil.hpp ...@@ -21,6 +22,8 @@ test_reqrep_tcp_SOURCES = test_reqrep_tcp.cpp testutil.hpp
test_hwm_SOURCES = test_hwm.cpp test_hwm_SOURCES = test_hwm.cpp
test_timeo_SOURCES = test_timeo.cpp
if !ON_MINGW if !ON_MINGW
test_shutdown_stress_SOURCES = test_shutdown_stress.cpp test_shutdown_stress_SOURCES = test_shutdown_stress.cpp
test_pair_ipc_SOURCES = test_pair_ipc.cpp testutil.hpp test_pair_ipc_SOURCES = test_pair_ipc.cpp testutil.hpp
......
/*
Copyright (c) 2007-2011 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <assert.h>
#include <string.h>
#include <pthread.h>
#include "../include/zmq.h"
#include "../include/zmq_utils.h"
extern "C"
{
void *worker(void *ctx)
{
// Worker thread connects after delay of 1 second. Then it waits
// for 1 more second, so that async connect has time to succeed.
zmq_sleep (1);
void *sc = zmq_socket (ctx, ZMQ_PUSH);
assert (sc);
int rc = zmq_connect (sc, "inproc://timeout_test");
assert (rc == 0);
zmq_sleep (1);
rc = zmq_close (sc);
assert (rc == 0);
return NULL;
}
}
int main (int argc, char *argv [])
{
void *ctx = zmq_init (1);
assert (ctx);
// Create a disconnected socket.
void *sb = zmq_socket (ctx, ZMQ_PULL);
assert (sb);
int rc = zmq_bind (sb, "inproc://timeout_test");
assert (rc == 0);
// Check whether non-blocking recv returns immediately.
char buf [] = "12345678ABCDEFGH12345678abcdefgh";
rc = zmq_recv (sb, buf, 32, ZMQ_DONTWAIT);
assert (rc == -1);
assert (zmq_errno() == EAGAIN);
// Check whether recv timeout is honoured.
int timeout = 500;
size_t timeout_size = sizeof timeout;
rc = zmq_setsockopt(sb, ZMQ_RCVTIMEO, &timeout, timeout_size);
assert (rc == 0);
void *watch = zmq_stopwatch_start ();
rc = zmq_recv (sb, buf, 32, 0);
assert (rc == -1);
assert (zmq_errno () == EAGAIN);
unsigned long elapsed = zmq_stopwatch_stop (watch);
assert (elapsed > 440000 && elapsed < 550000);
// Check whether connection during the wait doesn't distort the timeout.
timeout = 2000;
rc = zmq_setsockopt(sb, ZMQ_RCVTIMEO, &timeout, timeout_size);
assert (rc == 0);
pthread_t thread;
rc = pthread_create (&thread, NULL, worker, ctx);
assert (rc == 0);
watch = zmq_stopwatch_start ();
rc = zmq_recv (sb, buf, 32, 0);
assert (rc == -1);
assert (zmq_errno () == EAGAIN);
elapsed = zmq_stopwatch_stop (watch);
assert (elapsed > 1900000 && elapsed < 2100000);
rc = pthread_join (thread, NULL);
assert (rc == 0);
// Check that timeouts don't break normal message transfer.
void *sc = zmq_socket (ctx, ZMQ_PUSH);
assert (sc);
rc = zmq_setsockopt(sb, ZMQ_RCVTIMEO, &timeout, timeout_size);
assert (rc == 0);
rc = zmq_setsockopt(sb, ZMQ_SNDTIMEO, &timeout, timeout_size);
assert (rc == 0);
rc = zmq_connect (sc, "inproc://timeout_test");
assert (rc == 0);
rc = zmq_send (sc, buf, 32, 0);
assert (rc == 32);
rc = zmq_recv (sb, buf, 32, 0);
assert (rc == 32);
// Clean-up.
rc = zmq_close (sc);
assert (rc == 0);
rc = zmq_close (sb);
assert (rc == 0);
rc = zmq_term (ctx);
assert (rc == 0);
return 0 ;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment