Commit a85d1e51 authored by Martin Lucina's avatar Martin Lucina Committed by Martin Sustrik

zmq_poll(): Rewrite to use ZMQ_FD/ZMQ_EVENTS pt2

Rewrite the select()-based zmq_poll() implementation to use
ZMQ_FD and ZMQ_EVENTS.

Also fix some corner cases: We should not pollute revents with
unrequested events, and we don't need to poll on ZMQ_FD at all
if a pollitem with no events set was passed in.
parent 6b1ca2cb
...@@ -382,8 +382,8 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) ...@@ -382,8 +382,8 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
// If the poll item is a 0MQ socket, we poll on the file descriptor // If the poll item is a 0MQ socket, we poll on the file descriptor
// retrieved by the ZMQ_FD socket option. // retrieved by the ZMQ_FD socket option.
if (items_ [i].socket) { if (items_ [i].socket && items_ [i].events) {
size_t zmq_fd_size = sizeof (int); size_t zmq_fd_size = sizeof (zmq::fd_t);
if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, &pollfds [i].fd, if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, &pollfds [i].fd,
&zmq_fd_size) == -1) { &zmq_fd_size) == -1) {
free (pollfds); free (pollfds);
...@@ -410,9 +410,8 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) ...@@ -410,9 +410,8 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
if (rc == -1 && errno == EINTR) { if (rc == -1 && errno == EINTR) {
if (timeout_ < 0) if (timeout_ < 0)
continue; continue;
// Interrupted, no way to determine how much time is remaining
// from timeout so just return for now.
else { else {
// TODO: Calculate remaining timeout and restart poll ().
free (pollfds); free (pollfds);
return 0; return 0;
} }
...@@ -428,15 +427,18 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) ...@@ -428,15 +427,18 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
// The poll item is a 0MQ socket. Retrieve pending events // The poll item is a 0MQ socket. Retrieve pending events
// using the ZMQ_EVENTS socket option. // using the ZMQ_EVENTS socket option.
if (items_ [i].socket) { if (items_ [i].socket && (pollfds [i].revents & POLLIN)) {
if (pollfds [i].revents & POLLIN) { size_t zmq_events_size = sizeof (uint32_t);
size_t zmq_events_size = sizeof (uint32_t); uint32_t zmq_events;
if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, &zmq_events,
&items_ [i].revents, &zmq_events_size) == -1) { &zmq_events_size) == -1) {
free (pollfds); free (pollfds);
return -1; return -1;
}
} }
if ((items_ [i].events & ZMQ_POLLOUT) && (zmq_events & ZMQ_POLLOUT))
items_ [i].revents |= ZMQ_POLLOUT;
if ((items_ [i].events & ZMQ_POLLIN) && (zmq_events & ZMQ_POLLIN))
items_ [i].revents |= ZMQ_POLLIN;
} }
// Else, the poll item is a raw file descriptor, simply convert // Else, the poll item is a raw file descriptor, simply convert
// the events to zmq_pollitem_t-style format. // the events to zmq_pollitem_t-style format.
...@@ -465,152 +467,103 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) ...@@ -465,152 +467,103 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
fd_set pollset_err; fd_set pollset_err;
FD_ZERO (&pollset_err); FD_ZERO (&pollset_err);
zmq::app_thread_t *app_thread = NULL; zmq::fd_t maxfd = 0;
int nsockets = 0;
zmq::fd_t maxfd = zmq::retired_fd;
zmq::fd_t notify_fd = zmq::retired_fd;
// Build the fd_sets for passing to select ().
for (int i = 0; i != nitems_; i++) { for (int i = 0; i != nitems_; i++) {
// 0MQ sockets. // If the poll item is a 0MQ socket we are interested in input on the
if (items_ [i].socket) { // notification file descriptor retrieved by the ZMQ_FD socket option.
if (items_ [i].socket && items_ [i].events) {
// Get the app_thread the socket is living in. If there are two size_t zmq_fd_size = sizeof (zmq::fd_t);
// sockets in the same pollset with different app threads, fail. zmq::fd_t notify_fd;
zmq::socket_base_t *s = (zmq::socket_base_t*) items_ [i].socket; if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, &notify_fd,
if (app_thread) { &zmq_fd_size) == -1)
if (app_thread != s->get_thread ()) { return -1;
errno = EFAULT; FD_SET (notify_fd, &pollset_in);
return -1; if (maxfd < notify_fd)
} maxfd = notify_fd;
}
else
app_thread = s->get_thread ();
nsockets++;
continue;
} }
// Else, the poll item is a raw file descriptor. Convert the poll item
// Raw file descriptors. // events to the appropriate fd_sets.
if (items_ [i].events & ZMQ_POLLIN) else {
FD_SET (items_ [i].fd, &pollset_in); if (items_ [i].events & ZMQ_POLLIN)
if (items_ [i].events & ZMQ_POLLOUT) FD_SET (items_ [i].fd, &pollset_in);
FD_SET (items_ [i].fd, &pollset_out); if (items_ [i].events & ZMQ_POLLOUT)
if (items_ [i].events & ZMQ_POLLERR) FD_SET (items_ [i].fd, &pollset_out);
FD_SET (items_ [i].fd, &pollset_err); if (items_ [i].events & ZMQ_POLLERR)
if (maxfd == zmq::retired_fd || maxfd < items_ [i].fd) FD_SET (items_ [i].fd, &pollset_err);
maxfd = items_ [i].fd; if (maxfd < items_ [i].fd)
} maxfd = items_ [i].fd;
// If there's at least one 0MQ socket in the pollset we have to poll
// for 0MQ commands. If ZMQ_POLL was not set, fail.
if (nsockets) {
notify_fd = app_thread->get_signaler ()->get_fd ();
if (notify_fd == zmq::retired_fd) {
errno = ENOTSUP;
return -1;
} }
FD_SET (notify_fd, &pollset_in);
if (maxfd == zmq::retired_fd || maxfd < notify_fd)
maxfd = notify_fd;
} }
bool block = (timeout_ < 0);
timeval timeout = {timeout_ / 1000000, timeout_ % 1000000}; timeval timeout = {timeout_ / 1000000, timeout_ % 1000000};
timeval zero_timeout = {0, 0};
int nevents = 0; int nevents = 0;
// First iteration just check for events, don't block. Waiting would
// prevent exiting on any events that may already been signaled on
// 0MQ sockets.
fd_set inset, outset, errset; fd_set inset, outset, errset;
memcpy (&inset, &pollset_in, sizeof (fd_set));
memcpy (&outset, &pollset_out, sizeof (fd_set)); // Wait for events. Ignore interrupts if there's infinite timeout.
memcpy (&errset, &pollset_err, sizeof (fd_set)); while (true) {
int rc = select (maxfd, &inset, &outset, &errset, &zero_timeout); memcpy (&inset, &pollset_in, sizeof (fd_set));
memcpy (&outset, &pollset_out, sizeof (fd_set));
memcpy (&errset, &pollset_err, sizeof (fd_set));
int rc = select (maxfd, &inset, &outset, &errset,
(timeout_ < 0) ? NULL : &timeout);
#if defined ZMQ_HAVE_WINDOWS #if defined ZMQ_HAVE_WINDOWS
wsa_assert (rc != SOCKET_ERROR); wsa_assert (rc != SOCKET_ERROR);
#else #else
if (rc == -1 && errno == EINTR && timeout_ >= 0) if (rc == -1 && errno == EINTR) {
return 0; if (timeout_ < 0)
errno_assert (rc >= 0 || (rc == -1 && errno == EINTR)); continue;
else
// TODO: Calculate remaining timeout and restart select ().
return 0;
}
errno_assert (rc >= 0);
#endif #endif
break;
}
while (true) { // Check for the events.
for (int i = 0; i != nitems_; i++) {
// Process 0MQ commands if needed.
if (nsockets && FD_ISSET (notify_fd, &inset))
if (!app_thread->process_commands (false, false)) {
errno = ETERM;
return -1;
}
// Check for the events. items_ [i].revents = 0;
for (int i = 0; i != nitems_; i++) {
// If the poll item is a raw file descriptor, simply convert // The poll item is a 0MQ socket. Retrieve pending events
// the events to zmq_pollitem_t-style format. // using the ZMQ_EVENTS socket option.
if (!items_ [i].socket) { if (items_ [i].socket) {
items_ [i].revents = 0; size_t zmq_fd_size = sizeof (zmq::fd_t);
if (FD_ISSET (items_ [i].fd, &inset)) zmq::fd_t notify_fd;
items_ [i].revents |= ZMQ_POLLIN; if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, &notify_fd,
if (FD_ISSET (items_ [i].fd, &outset)) &zmq_fd_size) == -1)
return -1;
if (FD_ISSET (notify_fd, &inset)) {
size_t zmq_events_size = sizeof (uint32_t);
uint32_t zmq_events;
if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, &zmq_events,
&zmq_events_size) == -1)
return -1;
if ((items_ [i].events & ZMQ_POLLOUT) &&
(zmq_events & ZMQ_POLLOUT))
items_ [i].revents |= ZMQ_POLLOUT; items_ [i].revents |= ZMQ_POLLOUT;
if (FD_ISSET (items_ [i].fd, &errset)) if ((items_ [i].events & ZMQ_POLLIN) &&
items_ [i].revents |= ZMQ_POLLERR; (zmq_events & ZMQ_POLLIN))
if (items_ [i].revents) items_ [i].revents |= ZMQ_POLLIN;
nevents++;
continue;
} }
}
// The poll item is a 0MQ socket. // Else, the poll item is a raw file descriptor, simply convert
zmq::socket_base_t *s = (zmq::socket_base_t*) items_ [i].socket; // the events to zmq_pollitem_t-style format.
items_ [i].revents = 0; else {
if ((items_ [i].events & ZMQ_POLLOUT) && s->has_out ()) if (FD_ISSET (items_ [i].fd, &inset))
items_ [i].revents |= ZMQ_POLLOUT;
if ((items_ [i].events & ZMQ_POLLIN) && s->has_in ())
items_ [i].revents |= ZMQ_POLLIN; items_ [i].revents |= ZMQ_POLLIN;
if (items_ [i].revents) if (FD_ISSET (items_ [i].fd, &outset))
nevents++; items_ [i].revents |= ZMQ_POLLOUT;
if (FD_ISSET (items_ [i].fd, &errset))
items_ [i].revents |= ZMQ_POLLERR;
} }
// If there's at least one event, or if we are asked not to block, if (items_ [i].revents)
// return immediately. nevents++;
if (nevents || (timeout.tv_sec == 0 && timeout.tv_usec == 0))
break;
// Wait for events. Ignore interrupts if there's infinite timeout.
while (true) {
memcpy (&inset, &pollset_in, sizeof (fd_set));
memcpy (&outset, &pollset_out, sizeof (fd_set));
memcpy (&errset, &pollset_err, sizeof (fd_set));
rc = select (maxfd, &inset, &outset, &errset,
block ? NULL : &timeout);
#if defined ZMQ_HAVE_WINDOWS
wsa_assert (rc != SOCKET_ERROR);
#else
if (rc == -1 && errno == EINTR) {
if (timeout_ < 0)
continue;
else {
rc = 0;
break;
}
}
errno_assert (rc >= 0);
#endif
break;
}
// If timeout was hit with no events signaled, return zero.
if (rc == 0)
break;
// If timeout was already applied, we don't want to poll anymore.
// Setting timeout to zero will cause termination of the function
// once the events we've got are processed.
if (!block)
timeout = zero_timeout;
} }
return nevents; return nevents;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment