Commit 56faac7f authored by Martin Sustrik's avatar Martin Sustrik

zmq_poll returns prematurely even if infinite timeout is set - fixed

parent 3cb84b5c
...@@ -375,6 +375,7 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) ...@@ -375,6 +375,7 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
errno = EFAULT; errno = EFAULT;
return -1; return -1;
} }
pollfd *pollfds = (pollfd*) malloc (nitems_ * sizeof (pollfd)); pollfd *pollfds = (pollfd*) malloc (nitems_ * sizeof (pollfd));
zmq_assert (pollfds); zmq_assert (pollfds);
...@@ -405,55 +406,67 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) ...@@ -405,55 +406,67 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
int timeout = timeout_ > 0 ? timeout_ / 1000 : -1; int timeout = timeout_ > 0 ? timeout_ / 1000 : -1;
int nevents = 0; int nevents = 0;
// Wait for events. Ignore interrupts if there's infinite timeout.
while (true) { while (true) {
int rc = poll (pollfds, nitems_, timeout);
if (rc == -1 && errno == EINTR) { // Wait for events. Ignore interrupts if there's infinite timeout.
if (timeout_ < 0) while (true) {
continue; int rc = poll (pollfds, nitems_, timeout);
else { if (rc == -1 && errno == EINTR) {
// TODO: Calculate remaining timeout and restart poll (). if (timeout_ < 0)
free (pollfds); continue;
return 0; else {
// TODO: Calculate remaining timeout and restart poll ().
free (pollfds);
return 0;
}
} }
errno_assert (rc >= 0);
break;
} }
errno_assert (rc >= 0);
break;
}
// Check for the events. // Check for the events.
for (int i = 0; i != nitems_; i++) { for (int i = 0; i != nitems_; i++) {
items_ [i].revents = 0; items_ [i].revents = 0;
// The poll item is a 0MQ socket. Retrieve pending events // The poll item is a 0MQ socket. Retrieve pending events
// using the ZMQ_EVENTS socket option. // using the ZMQ_EVENTS socket option.
if (items_ [i].socket && (pollfds [i].revents & POLLIN)) { if (items_ [i].socket && (pollfds [i].revents & POLLIN)) {
size_t zmq_events_size = sizeof (uint32_t); size_t zmq_events_size = sizeof (uint32_t);
uint32_t zmq_events; uint32_t zmq_events;
if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, &zmq_events, if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, &zmq_events,
&zmq_events_size) == -1) { &zmq_events_size) == -1) {
free (pollfds); free (pollfds);
return -1; return -1;
}
if ((items_ [i].events & ZMQ_POLLOUT) &&
(zmq_events & ZMQ_POLLOUT))
items_ [i].revents |= ZMQ_POLLOUT;
if ((items_ [i].events & ZMQ_POLLIN) &&
(zmq_events & ZMQ_POLLIN))
items_ [i].revents |= ZMQ_POLLIN;
} }
if ((items_ [i].events & ZMQ_POLLOUT) && (zmq_events & ZMQ_POLLOUT)) // Else, the poll item is a raw file descriptor, simply convert
items_ [i].revents |= ZMQ_POLLOUT; // the events to zmq_pollitem_t-style format.
if ((items_ [i].events & ZMQ_POLLIN) && (zmq_events & ZMQ_POLLIN)) else {
items_ [i].revents |= ZMQ_POLLIN; if (pollfds [i].revents & POLLIN)
} items_ [i].revents |= ZMQ_POLLIN;
// Else, the poll item is a raw file descriptor, simply convert if (pollfds [i].revents & POLLOUT)
// the events to zmq_pollitem_t-style format. items_ [i].revents |= ZMQ_POLLOUT;
else { if (pollfds [i].revents & ~(POLLIN | POLLOUT))
if (pollfds [i].revents & POLLIN) items_ [i].revents |= ZMQ_POLLERR;
items_ [i].revents |= ZMQ_POLLIN; }
if (pollfds [i].revents & POLLOUT)
items_ [i].revents |= ZMQ_POLLOUT; if (items_ [i].revents)
if (pollfds [i].revents & ~(POLLIN | POLLOUT)) nevents++;
items_ [i].revents |= ZMQ_POLLERR;
} }
if (items_ [i].revents) // If timeout is set to infinite and we have to events to return
nevents++; // we can restart the polling.
if (timeout == -1 && nevents == 0)
continue;
break;
} }
free (pollfds); free (pollfds);
...@@ -505,73 +518,84 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) ...@@ -505,73 +518,84 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
int nevents = 0; int nevents = 0;
fd_set inset, outset, errset; fd_set inset, outset, errset;
// Wait for events. Ignore interrupts if there's infinite timeout.
while (true) { while (true) {
memcpy (&inset, &pollset_in, sizeof (fd_set));
memcpy (&outset, &pollset_out, sizeof (fd_set)); // Wait for events. Ignore interrupts if there's infinite timeout.
memcpy (&errset, &pollset_err, sizeof (fd_set)); while (true) {
int rc = select (maxfd, &inset, &outset, &errset, memcpy (&inset, &pollset_in, sizeof (fd_set));
(timeout_ < 0) ? NULL : &timeout); memcpy (&outset, &pollset_out, sizeof (fd_set));
memcpy (&errset, &pollset_err, sizeof (fd_set));
int rc = select (maxfd, &inset, &outset, &errset,
(timeout_ < 0) ? NULL : &timeout);
#if defined ZMQ_HAVE_WINDOWS #if defined ZMQ_HAVE_WINDOWS
wsa_assert (rc != SOCKET_ERROR); wsa_assert (rc != SOCKET_ERROR);
#else #else
if (rc == -1 && errno == EINTR) { if (rc == -1 && errno == EINTR) {
if (timeout_ < 0) if (timeout_ < 0)
continue; continue;
else else
// TODO: Calculate remaining timeout and restart select (). // TODO: Calculate remaining timeout and restart select ().
return 0; return 0;
} }
errno_assert (rc >= 0); errno_assert (rc >= 0);
#endif #endif
break; break;
} }
// Check for the events. // Check for the events.
for (int i = 0; i != nitems_; i++) { for (int i = 0; i != nitems_; i++) {
items_ [i].revents = 0; items_ [i].revents = 0;
// The poll item is a 0MQ socket. Retrieve pending events // The poll item is a 0MQ socket. Retrieve pending events
// using the ZMQ_EVENTS socket option. // using the ZMQ_EVENTS socket option.
if (items_ [i].socket) { if (items_ [i].socket) {
size_t zmq_fd_size = sizeof (zmq::fd_t); size_t zmq_fd_size = sizeof (zmq::fd_t);
zmq::fd_t notify_fd; zmq::fd_t notify_fd;
if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, &notify_fd, if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, &notify_fd,
&zmq_fd_size) == -1) &zmq_fd_size) == -1)
return -1;
if (FD_ISSET (notify_fd, &inset)) {
size_t zmq_events_size = sizeof (uint32_t);
uint32_t zmq_events;
if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, &zmq_events,
&zmq_events_size) == -1)
return -1; return -1;
if ((items_ [i].events & ZMQ_POLLOUT) && if (FD_ISSET (notify_fd, &inset)) {
(zmq_events & ZMQ_POLLOUT)) size_t zmq_events_size = sizeof (uint32_t);
items_ [i].revents |= ZMQ_POLLOUT; uint32_t zmq_events;
if ((items_ [i].events & ZMQ_POLLIN) && if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, &zmq_events,
(zmq_events & ZMQ_POLLIN)) &zmq_events_size) == -1)
return -1;
if ((items_ [i].events & ZMQ_POLLOUT) &&
(zmq_events & ZMQ_POLLOUT))
items_ [i].revents |= ZMQ_POLLOUT;
if ((items_ [i].events & ZMQ_POLLIN) &&
(zmq_events & ZMQ_POLLIN))
items_ [i].revents |= ZMQ_POLLIN;
}
}
// Else, the poll item is a raw file descriptor, simply convert
// the events to zmq_pollitem_t-style format.
else {
if (FD_ISSET (items_ [i].fd, &inset))
items_ [i].revents |= ZMQ_POLLIN; items_ [i].revents |= ZMQ_POLLIN;
if (FD_ISSET (items_ [i].fd, &outset))
items_ [i].revents |= ZMQ_POLLOUT;
if (FD_ISSET (items_ [i].fd, &errset))
items_ [i].revents |= ZMQ_POLLERR;
} }
if (items_ [i].revents)
nevents++;
} }
// Else, the poll item is a raw file descriptor, simply convert
// the events to zmq_pollitem_t-style format.
else {
if (FD_ISSET (items_ [i].fd, &inset))
items_ [i].revents |= ZMQ_POLLIN;
if (FD_ISSET (items_ [i].fd, &outset))
items_ [i].revents |= ZMQ_POLLOUT;
if (FD_ISSET (items_ [i].fd, &errset))
items_ [i].revents |= ZMQ_POLLERR;
}
if (items_ [i].revents) // If timeout is set to infinite and we have to events to return
nevents++; // we can restart the polling.
if (timeout_ < 0 && nevents == 0)
continue;
break;
} }
return nevents; return nevents;
#else #else
// Exotic platforms that support neither poll() nor select().
errno = ENOTSUP; errno = ENOTSUP;
return -1; return -1;
#endif #endif
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment