Commit f2b235db authored by Martin Sustrik's avatar Martin Sustrik

ZMQII-29: Add timeout to zmq_poll function

parent 7884f454
...@@ -202,7 +202,7 @@ typedef struct ...@@ -202,7 +202,7 @@ typedef struct
short revents; short revents;
} zmq_pollitem_t; } zmq_pollitem_t;
ZMQ_EXPORT int zmq_poll (zmq_pollitem_t *items, int nitems); ZMQ_EXPORT int zmq_poll (zmq_pollitem_t *items, int nitems, long timeout);
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
// Helper functions. // Helper functions.
......
...@@ -33,9 +33,9 @@ namespace zmq ...@@ -33,9 +33,9 @@ namespace zmq
typedef zmq_free_fn free_fn; typedef zmq_free_fn free_fn;
typedef zmq_pollitem_t pollitem_t; typedef zmq_pollitem_t pollitem_t;
inline int poll (zmq_pollitem_t *items_, int nitems_) inline int poll (zmq_pollitem_t *items_, int nitems_, long timeout_ = -1)
{ {
return zmq_poll (items_, nitems_); return zmq_poll (items_, nitems_, timeout_);
} }
class error_t : public std::exception class error_t : public std::exception
......
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
.SH NAME .SH NAME
zmq_poll \- polls for events on a set of 0MQ and POSIX sockets zmq_poll \- polls for events on a set of 0MQ and POSIX sockets
.SH SYNOPSIS .SH SYNOPSIS
.B int zmq_poll (zmq_pollitem_t *items, int nitems); .B int zmq_poll (zmq_pollitem_t *items, int nitems, long timeout);
.SH DESCRIPTION .SH DESCRIPTION
Waits for the events specified by Waits for the events specified by
.IR items .IR items
...@@ -31,7 +31,7 @@ specifies which events to wait for. It's a combination of the values below. ...@@ -31,7 +31,7 @@ specifies which events to wait for. It's a combination of the values below.
Once the call exits, Once the call exits,
.IR revent .IR revent
will be filled with events that have actually occured on the socket. The field will be filled with events that have actually occured on the socket. The field
will contain a combination of the following values. will contain a combination of the values below.
.IP "\fBZMQ_POLLIN\fP" .IP "\fBZMQ_POLLIN\fP"
poll for incoming messages. poll for incoming messages.
...@@ -40,6 +40,12 @@ wait while message can be set socket. Poll will return if a message of at least ...@@ -40,6 +40,12 @@ wait while message can be set socket. Poll will return if a message of at least
one byte can be written to the socket. However, there is no guarantee that one byte can be written to the socket. However, there is no guarantee that
arbitrarily large message can be sent. arbitrarily large message can be sent.
.IR timeout
argument specifies an upper limit on the time for which
.IR zmq_poll
will block, in microseconds. Specifying a negative value in timeout means
an infinite timeout.
.SH RETURN VALUE .SH RETURN VALUE
Function returns number of items signaled or -1 in the case of error. Function returns number of items signaled or -1 in the case of error.
.SH ERRORS .SH ERRORS
......
...@@ -264,7 +264,7 @@ int zmq_recv (void *s_, zmq_msg_t *msg_, int flags_) ...@@ -264,7 +264,7 @@ int zmq_recv (void *s_, zmq_msg_t *msg_, int flags_)
return (((zmq::socket_base_t*) s_)->recv (msg_, flags_)); return (((zmq::socket_base_t*) s_)->recv (msg_, flags_));
} }
int zmq_poll (zmq_pollitem_t *items_, int nitems_) int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
{ {
#if defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_FREEBSD ||\ #if defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_FREEBSD ||\
defined ZMQ_HAVE_OPENBSD || defined ZMQ_HAVE_SOLARIS ||\ defined ZMQ_HAVE_OPENBSD || defined ZMQ_HAVE_SOLARIS ||\
...@@ -321,6 +321,7 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_) ...@@ -321,6 +321,7 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_)
npollfds++; npollfds++;
} }
int timeout = timeout_ > 0 ? timeout_ / 1000 : -1;
int nevents = 0; int nevents = 0;
bool initial = true; bool initial = true;
while (!nevents) { while (!nevents) {
...@@ -328,10 +329,16 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_) ...@@ -328,10 +329,16 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_)
// Wait for activity. In the first iteration just check for events, // Wait for activity. In the first iteration just check for events,
// don't wait. Waiting would prevent exiting on any events that may // don't wait. Waiting would prevent exiting on any events that may
// already be signaled on 0MQ sockets. // already be signaled on 0MQ sockets.
int rc = poll (pollfds, npollfds, initial ? 0 : -1); int rc = poll (pollfds, npollfds, initial ? 0 : timeout);
if (rc == -1 && errno == EINTR) if (rc == -1 && errno == EINTR)
continue; continue;
errno_assert (rc >= 0); errno_assert (rc >= 0);
// If timeout was hit with no events signaled, return zero.
if (!initial && rc == 0)
return 0;
// From now on, perform blocking polling.
initial = false; initial = false;
// Process 0MQ commands if needed. // Process 0MQ commands if needed.
...@@ -426,6 +433,8 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_) ...@@ -426,6 +433,8 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_)
maxfd = notify_fd; maxfd = notify_fd;
} }
timeval timeout = {timeout_ / 1000000, timeout_ % 1000000};
timeval zero_timeout = {0, 0};
int nevents = 0; int nevents = 0;
bool initial = true; bool initial = true;
while (!nevents) { while (!nevents) {
...@@ -433,17 +442,21 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_) ...@@ -433,17 +442,21 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_)
// Wait for activity. In the first iteration just check for events, // Wait for activity. In the first iteration just check for events,
// don't wait. Waiting would prevent exiting on any events that may // don't wait. Waiting would prevent exiting on any events that may
// already be signaled on 0MQ sockets. // already be signaled on 0MQ sockets.
timeval timeout = {0, 0};
int rc = select (maxfd, &pollset_in, &pollset_out, &pollset_err, int rc = select (maxfd, &pollset_in, &pollset_out, &pollset_err,
initial ? &timeout : NULL); initial ? &zero_timeout : &timeout);
#if defined ZMQ_HAVE_WINDOWS #if defined ZMQ_HAVE_WINDOWS
wsa_assert (rc != SOCKET_ERROR); wsa_assert (rc != SOCKET_ERROR);
#else #else
if (rc == -1 && errno == EINTR) if (rc == -1 && errno == EINTR)
continue; continue;
#endif #endif
errno_assert (rc >= 0); errno_assert (rc >= 0);
// If timeout was hit with no events signaled, return zero.
if (!initial && rc == 0)
return 0;
// From now on, perform blocking select.
initial = false; initial = false;
// Process 0MQ commands if needed. // Process 0MQ commands if needed.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment