Commit cd125084 authored by Martin Lucina's avatar Martin Lucina Committed by Martin Sustrik

zmq_poll(): Rewrite to use ZMQ_FD/ZMQ_EVENTS pt1

Rewrite zmq_poll() to use ZMQ_FD and ZMQ_EVENTS introduced on the
wip-shutdown branch. Only do the poll()-based version of zmq_poll (), the
select()-based version will not compile at the moment.
parent eb7b8a41
......@@ -364,7 +364,6 @@ int zmq_recv (void *s_, zmq_msg_t *msg_, int flags_)
int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
{
/*
#if defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_FREEBSD ||\
defined ZMQ_HAVE_OPENBSD || defined ZMQ_HAVE_SOLARIS ||\
defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_QNXNTO ||\
......@@ -377,138 +376,81 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
}
pollfd *pollfds = (pollfd*) malloc (nitems_ * sizeof (pollfd));
zmq_assert (pollfds);
int npollfds = 0;
int nsockets = 0;
zmq::app_thread_t *app_thread = NULL;
// Build pollset for poll () system call.
for (int i = 0; i != nitems_; i++) {
// 0MQ sockets.
// If the poll item is a 0MQ socket, we poll on the file descriptor
// retrieved by the ZMQ_FD socket option.
if (items_ [i].socket) {
// Get the app_thread the socket is living in. If there are two
// sockets in the same pollset with different app threads, fail.
zmq::socket_base_t *s = (zmq::socket_base_t*) items_ [i].socket;
if (app_thread) {
if (app_thread != s->get_thread ()) {
free (pollfds);
errno = EFAULT;
return -1;
}
size_t zmq_fd_size = sizeof (int);
if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, &pollfds [i].fd,
&zmq_fd_size) == -1) {
free (pollfds);
return -1;
}
else
app_thread = s->get_thread ();
nsockets++;
continue;
pollfds [i].events = POLLIN;
}
// Raw file descriptors.
pollfds [npollfds].fd = items_ [i].fd;
pollfds [npollfds].events =
(items_ [i].events & ZMQ_POLLIN ? POLLIN : 0) |
(items_ [i].events & ZMQ_POLLOUT ? POLLOUT : 0);
npollfds++;
// Else, the poll item is a raw file descriptor. Just convert the
// events to normal POLLIN/POLLOUT for poll ().
else {
pollfds [i].fd = items_ [i].fd;
pollfds [i].events =
(items_ [i].events & ZMQ_POLLIN ? POLLIN : 0) |
(items_ [i].events & ZMQ_POLLOUT ? POLLOUT : 0);
}
}
// If there's at least one 0MQ socket in the pollset we have to poll
// for 0MQ commands. If ZMQ_POLL was not set, fail.
if (nsockets) {
pollfds [npollfds].fd = app_thread->get_signaler ()->get_fd ();
if (pollfds [npollfds].fd == zmq::retired_fd) {
free (pollfds);
errno = ENOTSUP;
return -1;
}
pollfds [npollfds].events = POLLIN;
npollfds++;
}
// First iteration just check for events, don't block. Waiting would
// prevent exiting on any events that may already been signaled on
// 0MQ sockets.
int rc = poll (pollfds, npollfds, 0);
if (rc == -1 && errno == EINTR && timeout_ >= 0) {
free (pollfds);
return 0;
}
errno_assert (rc >= 0 || (rc == -1 && errno == EINTR));
int timeout = timeout_ > 0 ? timeout_ / 1000 : -1;
int nevents = 0;
// Wait for events. Ignore interrupts if there's infinite timeout.
while (true) {
// Process 0MQ commands if needed.
if (nsockets && pollfds [npollfds -1].revents & POLLIN)
if (!app_thread->process_commands (false, false)) {
free (pollfds);
errno = ETERM;
return -1;
}
// Check for the events.
int pollfd_pos = 0;
for (int i = 0; i != nitems_; i++) {
// If the poll item is a raw file descriptor, simply convert
// the events to zmq_pollitem_t-style format.
if (!items_ [i].socket) {
items_ [i].revents = 0;
if (pollfds [pollfd_pos].revents & POLLIN)
items_ [i].revents |= ZMQ_POLLIN;
if (pollfds [pollfd_pos].revents & POLLOUT)
items_ [i].revents |= ZMQ_POLLOUT;
if (pollfds [pollfd_pos].revents & ~(POLLIN | POLLOUT))
items_ [i].revents |= ZMQ_POLLERR;
if (items_ [i].revents)
nevents++;
pollfd_pos++;
int rc = poll (pollfds, nitems_, timeout);
if (rc == -1 && errno == EINTR) {
if (timeout_ < 0)
continue;
// Interrupted, no way to determine how much time is remaining
// from timeout so just return for now.
else {
free (pollfds);
return 0;
}
// The poll item is a 0MQ socket.
zmq::socket_base_t *s = (zmq::socket_base_t*) items_ [i].socket;
items_ [i].revents = 0;
if ((items_ [i].events & ZMQ_POLLOUT) && s->has_out ())
items_ [i].revents |= ZMQ_POLLOUT;
if ((items_ [i].events & ZMQ_POLLIN) && s->has_in ())
items_ [i].revents |= ZMQ_POLLIN;
if (items_ [i].revents)
nevents++;
}
errno_assert (rc >= 0);
break;
}
// If there's at least one event, or if we are asked not to block,
// return immediately.
if (nevents || !timeout_)
break;
// Check for the events.
for (int i = 0; i != nitems_; i++) {
// Wait for events. Ignore interrupts if there's infinite timeout.
while (true) {
rc = poll (pollfds, npollfds, timeout);
if (rc == -1 && errno == EINTR) {
if (timeout_ < 0)
continue;
else {
rc = 0;
break;
items_ [i].revents = 0;
// The poll item is a 0MQ socket. Retrieve pending events
// using the ZMQ_EVENTS socket option.
if (items_ [i].socket) {
if (pollfds [i].revents & POLLIN) {
size_t zmq_events_size = sizeof (uint32_t);
if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS,
&items_ [i].revents, &zmq_events_size) == -1) {
free (pollfds);
return -1;
}
}
errno_assert (rc >= 0);
break;
}
// If timeout was hit with no events signaled, return zero.
if (rc == 0)
break;
// Else, the poll item is a raw file descriptor, simply convert
// the events to zmq_pollitem_t-style format.
else {
if (pollfds [i].revents & POLLIN)
items_ [i].revents |= ZMQ_POLLIN;
if (pollfds [i].revents & POLLOUT)
items_ [i].revents |= ZMQ_POLLOUT;
if (pollfds [i].revents & ~(POLLIN | POLLOUT))
items_ [i].revents |= ZMQ_POLLERR;
}
// If timeout was already applied, we don't want to poll anymore.
// Setting timeout to zero will cause termination of the function
// once the events we've got are processed.
if (timeout > 0)
timeout = 0;
if (items_ [i].revents)
nevents++;
}
free (pollfds);
......@@ -677,9 +619,6 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
errno = ENOTSUP;
return -1;
#endif
*/
zmq_assert (false);
return -1;
}
int zmq_errno ()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment