Commit 5b929895 authored by Marc Sune's avatar Marc Sune Committed by Marc Sune

Problem: duplicated socket_poller::wait() code

zmq::socket_poller_t::wait() had an important set of common lines
between POLL and SELECT variant.

Solution: refactor zmq::socket_poller_t::wait() and add the
following methods:

zmq::socket_poller_t::zero_trail_events()
zmq::socket_poller_t::check_events()
zmq::socket_poller_t::adjust_timeout()
Signed-off-by: 's avatarMarc Sune <mardevel@gmail.com>
parent a89d79aa
...@@ -401,83 +401,41 @@ void zmq::socket_poller_t::rebuild () ...@@ -401,83 +401,41 @@ void zmq::socket_poller_t::rebuild ()
need_rebuild = false; need_rebuild = false;
} }
int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, int n_events_, long timeout_) void zmq::socket_poller_t::zero_trail_events (
zmq::socket_poller_t::event_t *events_,
int n_events_,
int found)
{ {
if (items.empty () && timeout_ < 0) { for (int i = found; i < n_events_; ++i) {
errno = EFAULT; events_[i].socket = NULL;
return -1; events_[i].fd = 0;
events_[i].user_data = NULL;
events_[i].events = 0;
} }
}
if (need_rebuild)
rebuild ();
#if defined ZMQ_POLL_BASED_ON_POLL #if defined ZMQ_POLL_BASED_ON_POLL
if (unlikely (poll_size == 0)) { int zmq::socket_poller_t::check_events (zmq::socket_poller_t::event_t *events_,
// We'll report an error (timed out) as if the list was non-empty and int n_events_)
// no event occurred within the specified timeout. Otherwise the caller #elif defined ZMQ_POLL_BASED_ON_SELECT
// needs to check the return value AND the event to avoid using the int zmq::socket_poller_t::check_events (zmq::socket_poller_t::event_t *events_,
// nullified event data. int n_events_,
errno = EAGAIN; fd_set& inset,
if (timeout_ == 0) fd_set& outset,
return -1; fd_set& errset)
#if defined ZMQ_HAVE_WINDOWS
Sleep (timeout_ > 0 ? timeout_ : INFINITE);
return -1;
#elif defined ZMQ_HAVE_ANDROID
usleep (timeout_ * 1000);
return -1;
#else
usleep (timeout_ * 1000);
return -1;
#endif #endif
} {
zmq::clock_t clock;
uint64_t now = 0;
uint64_t end = 0;
bool first_pass = true;
while (true) {
// Compute the timeout for the subsequent poll.
int timeout;
if (first_pass)
timeout = 0;
else
if (timeout_ < 0)
timeout = -1;
else
timeout = end - now;
// Wait for events.
while (true) {
int rc = poll (pollfds, poll_size, timeout);
if (rc == -1 && errno == EINTR) {
return -1;
}
errno_assert (rc >= 0);
break;
}
// Receive the signal from pollfd
if (use_signaler && pollfds[0].revents & POLLIN)
signaler->recv ();
// Check for the events.
int found = 0; int found = 0;
for (items_t::iterator it = items.begin (); it != items.end () && found < n_events_; ++it) { for (items_t::iterator it = items.begin (); it != items.end () &&
found < n_events_; ++it) {
events_[found].socket = NULL;
events_[found].fd = 0;
events_[found].user_data = NULL;
events_[found].events = 0;
// The poll item is a 0MQ socket. Retrieve pending events // The poll item is a 0MQ socket. Retrieve pending events
// using the ZMQ_EVENTS socket option. // using the ZMQ_EVENTS socket option.
if (it->socket) { if (it->socket) {
size_t events_size = sizeof (uint32_t); size_t events_size = sizeof (uint32_t);
uint32_t events; uint32_t events;
if (it->socket->getsockopt (ZMQ_EVENTS, &events, &events_size) == -1) { if (it->socket->getsockopt (ZMQ_EVENTS, &events, &events_size)
== -1) {
return -1; return -1;
} }
...@@ -491,6 +449,9 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, int n_ev ...@@ -491,6 +449,9 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, int n_ev
// Else, the poll item is a raw file descriptor, simply convert // Else, the poll item is a raw file descriptor, simply convert
// the events to zmq_pollitem_t-style format. // the events to zmq_pollitem_t-style format.
else { else {
#if defined ZMQ_POLL_BASED_ON_POLL
short revents = pollfds [it->pollfd_index].revents; short revents = pollfds [it->pollfd_index].revents;
short events = 0; short events = 0;
...@@ -503,6 +464,18 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, int n_ev ...@@ -503,6 +464,18 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, int n_ev
if (revents & ~(POLLIN | POLLOUT | POLLPRI)) if (revents & ~(POLLIN | POLLOUT | POLLPRI))
events |= ZMQ_POLLERR; events |= ZMQ_POLLERR;
#elif defined ZMQ_POLL_BASED_ON_SELECT
short events = 0;
if (FD_ISSET (it->fd, &inset))
events |= ZMQ_POLLIN;
if (FD_ISSET (it->fd, &outset))
events |= ZMQ_POLLOUT;
if (FD_ISSET (it->fd, &errset))
events |= ZMQ_POLLERR;
#endif //POLL_SELECT
if (events) { if (events) {
events_[found].socket = NULL; events_[found].socket = NULL;
events_[found].user_data = it->user_data; events_[found].user_data = it->user_data;
...@@ -512,51 +485,63 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, int n_ev ...@@ -512,51 +485,63 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, int n_ev
} }
} }
} }
if (found) {
for (int i = found; i < n_events_; ++i) {
events_[i].socket = NULL;
events_[i].fd = 0;
events_[i].user_data = NULL;
events_[i].events = 0;
}
return found; return found;
} }
// If timeout is zero, exit immediately whether there are events or not. //Return 0 if timeout is expired otherwise 1
int zmq::socket_poller_t::adjust_timeout (zmq::clock_t& clock, long timeout_,
uint64_t& now,
uint64_t& end,
bool& first_pass)
{
// If socket_poller_t::timeout is zero, exit immediately whether there
// are events or not.
if (timeout_ == 0) if (timeout_ == 0)
break; return 0;
// At this point we are meant to wait for events but there are none. // At this point we are meant to wait for events but there are none.
// If timeout is infinite we can just loop until we get some events. // If timeout is infinite we can just loop until we get some events.
if (timeout_ < 0) { if (timeout_ < 0) {
if (first_pass) if (first_pass)
first_pass = false; first_pass = false;
continue; return 1;
} }
// The timeout is finite but non-zero and there are no events. In the // The timeout is finite and there are no events. In the first pass
// first pass, we get a timestamp of when the polling have begun. // we get a timestamp of when the polling have begun. (We assume that
// (We assume that first pass have taken negligible time). We also // first pass have taken negligible time). We also compute the time
// compute the time when the polling should time out. // when the polling should time out.
now = clock.now_ms (); now = clock.now_ms ();
if (first_pass) { if (first_pass) {
end = now + timeout_; end = now + timeout_;
first_pass = false; first_pass = false;
continue; return 1;
} }
// Find out whether timeout have expired. // Find out whether timeout have expired.
if (now >= end) if (now >= end)
break; return 0;
}
errno = EAGAIN; return 1;
}
int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_,
int n_events_,
long timeout_)
{
if (items.empty () && timeout_ < 0) {
errno = EFAULT;
return -1; return -1;
}
#elif defined ZMQ_POLL_BASED_ON_SELECT if (need_rebuild)
rebuild ();
if (unlikely (poll_size == 0)) { if (unlikely (poll_size == 0)) {
// We'll report an error (timed out) as if the list was non-empty and // We'll report an error (timed out) as if the list was non-empty and
// no event occured within the specified timeout. Otherwise the caller // no event occurred within the specified timeout. Otherwise the caller
// needs to check the return value AND the event to avoid using the // needs to check the return value AND the event to avoid using the
// nullified event data. // nullified event data.
errno = EAGAIN; errno = EAGAIN;
...@@ -565,16 +550,70 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, int n_ev ...@@ -565,16 +550,70 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, int n_ev
#if defined ZMQ_HAVE_WINDOWS #if defined ZMQ_HAVE_WINDOWS
Sleep (timeout_ > 0 ? timeout_ : INFINITE); Sleep (timeout_ > 0 ? timeout_ : INFINITE);
return -1; return -1;
#elif defined ZMQ_HAVE_ANDROID
usleep (timeout_ * 1000);
return -1;
#else #else
usleep (timeout_ * 1000); usleep (timeout_ * 1000);
return -1; return -1;
#endif #endif
} }
#if defined ZMQ_POLL_BASED_ON_POLL
zmq::clock_t clock;
uint64_t now = 0;
uint64_t end = 0;
bool first_pass = true;
while (true) {
// Compute the timeout for the subsequent poll.
int timeout;
if (first_pass)
timeout = 0;
else
if (timeout_ < 0)
timeout = -1;
else
timeout = end - now;
// Wait for events.
while (true) {
int rc = poll (pollfds, poll_size, timeout);
if (rc == -1 && errno == EINTR) {
return -1;
}
errno_assert (rc >= 0);
break;
}
// Receive the signal from pollfd
if (use_signaler && pollfds[0].revents & POLLIN)
signaler->recv ();
// Check for the events.
int found = check_events (events_, n_events_);
if (found) {
if (found > 0)
zero_trail_events (events_, n_events_, found);
return found;
}
// Adjust timeout or break
if (adjust_timeout (clock, timeout_, now, end, first_pass) == 0)
break;
}
errno = EAGAIN;
return -1;
#elif defined ZMQ_POLL_BASED_ON_SELECT
zmq::clock_t clock; zmq::clock_t clock;
uint64_t now = 0; uint64_t now = 0;
uint64_t end = 0; uint64_t end = 0;
bool first_pass = true; bool first_pass = true;
fd_set inset, outset, errset; fd_set inset, outset, errset;
while (true) { while (true) {
...@@ -629,81 +668,15 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, int n_ev ...@@ -629,81 +668,15 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, int n_ev
signaler->recv (); signaler->recv ();
// Check for the events. // Check for the events.
int found = 0; int found = check_events(events_, n_events_, inset, outset, errset);
for (items_t::iterator it = items.begin (); it != items.end () && found < n_events_; ++it) {
// The poll item is a 0MQ socket. Retrieve pending events
// using the ZMQ_EVENTS socket option.
if (it->socket) {
size_t events_size = sizeof (uint32_t);
uint32_t events;
if (it->socket->getsockopt (ZMQ_EVENTS, &events, &events_size) == -1)
return -1;
if (it->events & events) {
events_[found].socket = it->socket;
events_[found].user_data = it->user_data;
events_[found].events = it->events & events;
++found;
}
}
// Else, the poll item is a raw file descriptor, simply convert
// the events to zmq_pollitem_t-style format.
else {
short events = 0;
if (FD_ISSET (it->fd, &inset))
events |= ZMQ_POLLIN;
if (FD_ISSET (it->fd, &outset))
events |= ZMQ_POLLOUT;
if (FD_ISSET (it->fd, &errset))
events |= ZMQ_POLLERR;
if (events) {
events_[found].socket = NULL;
events_[found].user_data = it->user_data;
events_[found].fd = it->fd;
events_[found].events = events;
++found;
}
}
}
if (found) { if (found) {
// zero-out remaining events if (found > 0)
for (int i = found; i < n_events_; ++i) { zero_trail_events (events_, n_events_, found);
events_[i].socket = NULL;
events_[i].fd = 0;
events_[i].user_data = NULL;
events_[i].events = 0;
}
return found; return found;
} }
// If timeout is zero, exit immediately whether there are events or not. // Adjust timeout or break
if (timeout_ == 0) if (adjust_timeout (clock, timeout_, now, end, first_pass) == 0)
break;
// At this point we are meant to wait for events but there are none.
// If timeout is infinite we can just loop until we get some events.
if (timeout_ < 0) {
if (first_pass)
first_pass = false;
continue;
}
// The timeout is finite and there are no events. In the first pass
// we get a timestamp of when the polling have begun. (We assume that
// first pass have taken negligible time). We also compute the time
// when the polling should time out.
now = clock.now_ms ();
if (first_pass) {
end = now + timeout_;
first_pass = false;
continue;
}
// Find out whether timeout have expired.
if (now >= end)
break; break;
} }
...@@ -711,8 +684,10 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, int n_ev ...@@ -711,8 +684,10 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, int n_ev
return -1; return -1;
#else #else
// Exotic platforms that support neither poll() nor select(). // Exotic platforms that support neither poll() nor select().
errno = ENOTSUP; errno = ENOTSUP;
return -1; return -1;
#endif #endif
} }
...@@ -81,6 +81,21 @@ namespace zmq ...@@ -81,6 +81,21 @@ namespace zmq
bool check_tag (); bool check_tag ();
private: private:
void zero_trail_events (zmq::socket_poller_t::event_t *events_,
int n_events_,
int found);
#if defined ZMQ_POLL_BASED_ON_POLL
int check_events (zmq::socket_poller_t::event_t *events_,
int n_events_);
#elif defined ZMQ_POLL_BASED_ON_SELECT
int check_events (zmq::socket_poller_t::event_t *events_, int n_events_,
fd_set& inset,
fd_set& outset,
fd_set& errset);
#endif
int adjust_timeout (zmq::clock_t& clock, long timeout_, uint64_t& now,
uint64_t& end,
bool& first_pass);
void rebuild (); void rebuild ();
// Used to check whether the object is a socket_poller. // Used to check whether the object is a socket_poller.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment