Commit 035c937e authored by Martin Sustrik's avatar Martin Sustrik

zmq_poll: account for the fact that ZMQ_FD is edge-triggered

parent 67aa7885
...@@ -403,6 +403,7 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) ...@@ -403,6 +403,7 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
} }
} }
bool first_pass = true;
int timeout = timeout_ > 0 ? timeout_ / 1000 : -1; int timeout = timeout_ > 0 ? timeout_ / 1000 : -1;
int nevents = 0; int nevents = 0;
...@@ -410,7 +411,7 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) ...@@ -410,7 +411,7 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
// Wait for events. Ignore interrupts if there's infinite timeout. // Wait for events. Ignore interrupts if there's infinite timeout.
while (true) { while (true) {
int rc = poll (pollfds, nitems_, timeout); int rc = poll (pollfds, nitems_, first_pass ? 0 : timeout);
if (rc == -1 && errno == EINTR) { if (rc == -1 && errno == EINTR) {
if (timeout_ < 0) if (timeout_ < 0)
continue; continue;
...@@ -431,7 +432,7 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) ...@@ -431,7 +432,7 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
// The poll item is a 0MQ socket. Retrieve pending events // The poll item is a 0MQ socket. Retrieve pending events
// using the ZMQ_EVENTS socket option. // using the ZMQ_EVENTS socket option.
if (items_ [i].socket && (pollfds [i].revents & POLLIN)) { if (items_ [i].socket) {
size_t zmq_events_size = sizeof (uint32_t); size_t zmq_events_size = sizeof (uint32_t);
uint32_t zmq_events; uint32_t zmq_events;
if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, &zmq_events, if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, &zmq_events,
...@@ -461,6 +462,13 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) ...@@ -461,6 +462,13 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
nevents++; nevents++;
} }
// If there are no events from the first pass (the one with no
// timout), do at least the second pass so that we wait.
if (first_pass && nevents == 0 && timeout_ != 0) {
first_pass = false;
continue;
}
// If timeout is set to infinite and we have to events to return // If timeout is set to infinite and we have to events to return
// we can restart the polling. // we can restart the polling.
if (timeout == -1 && nevents == 0) if (timeout == -1 && nevents == 0)
...@@ -514,6 +522,8 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) ...@@ -514,6 +522,8 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
} }
} }
bool first_pass = true;
timeval zero_timeout = {0, 0};
timeval timeout = {timeout_ / 1000000, timeout_ % 1000000}; timeval timeout = {timeout_ / 1000000, timeout_ % 1000000};
int nevents = 0; int nevents = 0;
fd_set inset, outset, errset; fd_set inset, outset, errset;
...@@ -526,7 +536,7 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) ...@@ -526,7 +536,7 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
memcpy (&outset, &pollset_out, sizeof (fd_set)); memcpy (&outset, &pollset_out, sizeof (fd_set));
memcpy (&errset, &pollset_err, sizeof (fd_set)); memcpy (&errset, &pollset_err, sizeof (fd_set));
int rc = select (maxfd, &inset, &outset, &errset, int rc = select (maxfd, &inset, &outset, &errset,
(timeout_ < 0) ? NULL : &timeout); first_pass ? &zero_timeout : (timeout_ < 0 ? NULL : &timeout));
#if defined ZMQ_HAVE_WINDOWS #if defined ZMQ_HAVE_WINDOWS
wsa_assert (rc != SOCKET_ERROR); wsa_assert (rc != SOCKET_ERROR);
#else #else
...@@ -553,19 +563,19 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) ...@@ -553,19 +563,19 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
size_t zmq_fd_size = sizeof (zmq::fd_t); size_t zmq_fd_size = sizeof (zmq::fd_t);
zmq::fd_t notify_fd; zmq::fd_t notify_fd;
if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, &notify_fd, if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, &notify_fd,
&zmq_fd_size) == -1) &zmq_fd_size) == -1)
return -1; return -1;
if (FD_ISSET (notify_fd, &inset)) { if (FD_ISSET (notify_fd, &inset)) {
size_t zmq_events_size = sizeof (uint32_t); size_t zmq_events_size = sizeof (uint32_t);
uint32_t zmq_events; uint32_t zmq_events;
if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, &zmq_events, if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, &zmq_events,
&zmq_events_size) == -1) &zmq_events_size) == -1)
return -1; return -1;
if ((items_ [i].events & ZMQ_POLLOUT) && if ((items_ [i].events & ZMQ_POLLOUT) &&
(zmq_events & ZMQ_POLLOUT)) (zmq_events & ZMQ_POLLOUT))
items_ [i].revents |= ZMQ_POLLOUT; items_ [i].revents |= ZMQ_POLLOUT;
if ((items_ [i].events & ZMQ_POLLIN) && if ((items_ [i].events & ZMQ_POLLIN) &&
(zmq_events & ZMQ_POLLIN)) (zmq_events & ZMQ_POLLIN))
items_ [i].revents |= ZMQ_POLLIN; items_ [i].revents |= ZMQ_POLLIN;
} }
} }
...@@ -584,6 +594,13 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) ...@@ -584,6 +594,13 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
nevents++; nevents++;
} }
// If there are no events from the first pass (the one with no
// timout), do at least the second pass so that we wait.
if (first_pass && nevents == 0 && timeout_ != 0) {
first_pass = false;
continue;
}
// If timeout is set to infinite and we have to events to return // If timeout is set to infinite and we have to events to return
// we can restart the polling. // we can restart the polling.
if (timeout_ < 0 && nevents == 0) if (timeout_ < 0 && nevents == 0)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment