Commit 187b4bff authored by Doron Somech's avatar Doron Somech Committed by GitHub

Merge pull request #2128 from minrk/multi-event-poller

Problem: zmq_poller only signals one event
parents 555a0877 2bc97966
...@@ -577,6 +577,7 @@ ZMQ_EXPORT int zmq_poller_add (void *poller, void *socket, void *user_data, sho ...@@ -577,6 +577,7 @@ ZMQ_EXPORT int zmq_poller_add (void *poller, void *socket, void *user_data, sho
ZMQ_EXPORT int zmq_poller_modify (void *poller, void *socket, short events); ZMQ_EXPORT int zmq_poller_modify (void *poller, void *socket, short events);
ZMQ_EXPORT int zmq_poller_remove (void *poller, void *socket); ZMQ_EXPORT int zmq_poller_remove (void *poller, void *socket);
ZMQ_EXPORT int zmq_poller_wait (void *poller, zmq_poller_event_t *event, long timeout); ZMQ_EXPORT int zmq_poller_wait (void *poller, zmq_poller_event_t *event, long timeout);
ZMQ_EXPORT int zmq_poller_wait_all (void *poller, zmq_poller_event_t *events, long timeout);
#if defined _WIN32 #if defined _WIN32
ZMQ_EXPORT int zmq_poller_add_fd (void *poller, SOCKET fd, void *user_data, short events); ZMQ_EXPORT int zmq_poller_add_fd (void *poller, SOCKET fd, void *user_data, short events);
......
...@@ -380,7 +380,7 @@ int zmq::socket_poller_t::rebuild () ...@@ -380,7 +380,7 @@ int zmq::socket_poller_t::rebuild ()
return 0; return 0;
} }
int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *event_, long timeout_) int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, long timeout_)
{ {
if (need_rebuild) if (need_rebuild)
if (rebuild () == -1) if (rebuild () == -1)
...@@ -412,6 +412,7 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *event_, long time ...@@ -412,6 +412,7 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *event_, long time
uint64_t end = 0; uint64_t end = 0;
bool first_pass = true; bool first_pass = true;
bool found = false;
while (true) { while (true) {
// Compute the timeout for the subsequent poll. // Compute the timeout for the subsequent poll.
...@@ -439,7 +440,13 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *event_, long time ...@@ -439,7 +440,13 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *event_, long time
signaler.recv (); signaler.recv ();
// Check for the events. // Check for the events.
for (items_t::iterator it = items.begin (); it != items.end (); ++it) { int i=0;
for (items_t::iterator it = items.begin (); it != items.end (); ++i, ++it) {
events_[i].socket = NULL;
events_[i].fd = 0;
events_[i].user_data = NULL;
events_[i].events = 0;
// The poll item is a 0MQ socket. Retrieve pending events // The poll item is a 0MQ socket. Retrieve pending events
// using the ZMQ_EVENTS socket option. // using the ZMQ_EVENTS socket option.
...@@ -451,12 +458,10 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *event_, long time ...@@ -451,12 +458,10 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *event_, long time
} }
if (it->events & events) { if (it->events & events) {
event_->socket = it->socket; events_[i].socket = it->socket;
event_->user_data = it->user_data; events_[i].user_data = it->user_data;
event_->events = it->events & events; events_[i].events = it->events & events;
found = true;
// If there is event to return, we can exit immediately.
return 0;
} }
} }
// Else, the poll item is a raw file descriptor, simply convert // Else, the poll item is a raw file descriptor, simply convert
...@@ -475,16 +480,17 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *event_, long time ...@@ -475,16 +480,17 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *event_, long time
events |= ZMQ_POLLERR; events |= ZMQ_POLLERR;
if (events) { if (events) {
event_->socket = NULL; events_[i].socket = NULL;
event_->user_data = it->user_data; events_[i].user_data = it->user_data;
event_->fd = it->fd; events_[i].fd = it->fd;
event_->events = events; events_[i].events = events;
found = true;
// If there is event to return, we can exit immediately.
return 0;
} }
} }
} }
if (found) {
return 0;
}
// If timeout is zero, exit immediately whether there are events or not. // If timeout is zero, exit immediately whether there are events or not.
if (timeout_ == 0) if (timeout_ == 0)
...@@ -516,7 +522,6 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *event_, long time ...@@ -516,7 +522,6 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *event_, long time
if (now >= end) if (now >= end)
break; break;
} }
errno = ETIMEDOUT; errno = ETIMEDOUT;
return -1; return -1;
......
...@@ -75,6 +75,8 @@ namespace zmq ...@@ -75,6 +75,8 @@ namespace zmq
int wait (event_t *event, long timeout); int wait (event_t *event, long timeout);
inline int size (void) { return items.size (); };
// Return false if object is not a socket. // Return false if object is not a socket.
bool check_tag (); bool check_tag ();
......
...@@ -1222,15 +1222,46 @@ int zmq_poller_wait (void *poller_, zmq_poller_event_t *event, long timeout_) ...@@ -1222,15 +1222,46 @@ int zmq_poller_wait (void *poller_, zmq_poller_event_t *event, long timeout_)
zmq_assert (event != NULL); zmq_assert (event != NULL);
zmq::socket_poller_t::event_t e; int n_items = ((zmq::socket_poller_t*)poller_)->size ();
memset (&e, 0, sizeof (e)); zmq_poller_event_t *events;
events = new zmq_poller_event_t[n_items];
alloc_assert(events);
int rc = ((zmq::socket_poller_t*)poller_)->wait (&e, timeout_); int rc = zmq_poller_wait_all(poller_, events, timeout_);
event->socket = e.socket; if (rc >= 0) {
event->fd = e.fd; *event = events[0];
event->user_data = e.user_data; } else {
event->events = e.events; memset (event, 0, sizeof(zmq_poller_event_t));
}
delete [] events;
return rc;
}
int zmq_poller_wait_all (void *poller_, zmq_poller_event_t *events, long timeout_)
{
if (!poller_ || !((zmq::socket_poller_t*)poller_)->check_tag ()) {
errno = EFAULT;
return -1;
}
zmq_assert (events != NULL);
int n_items = ((zmq::socket_poller_t*)poller_)->size ();
zmq::socket_poller_t::event_t *evts;
evts = new zmq::socket_poller_t::event_t[n_items];
alloc_assert(evts);
int rc = ((zmq::socket_poller_t*)poller_)->wait (evts, timeout_);
for(int i = 0; i < n_items; ++i) {
events[i].socket = evts[i].socket;
events[i].fd = evts[i].fd;
events[i].user_data = evts[i].user_data;
events[i].events = evts[i].events;
}
delete [] evts;
return rc; return rc;
} }
......
...@@ -80,6 +80,7 @@ int zmq_poller_add (void *poller, void *socket, void *user_data, short events); ...@@ -80,6 +80,7 @@ int zmq_poller_add (void *poller, void *socket, void *user_data, short events);
int zmq_poller_modify (void *poller, void *socket, short events); int zmq_poller_modify (void *poller, void *socket, short events);
int zmq_poller_remove (void *poller, void *socket); int zmq_poller_remove (void *poller, void *socket);
int zmq_poller_wait (void *poller, zmq_poller_event_t *event, long timeout); int zmq_poller_wait (void *poller, zmq_poller_event_t *event, long timeout);
int zmq_poller_wait_all (void *poller, zmq_poller_event_t *events, long timeout);
#if defined _WIN32 #if defined _WIN32
int zmq_poller_add_fd (void *poller, SOCKET fd, void *user_data, short events); int zmq_poller_add_fd (void *poller, SOCKET fd, void *user_data, short events);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment