Commit 718608ce authored by Min RK's avatar Min RK

socket_poller::wait returns only triggered events

Return value is the number of events found. This also propagates to the return value of zmq_poller_wait_all.

zmq_poller_wait was only returning events on the first-registered socket.
parent 872f1e5a
...@@ -412,7 +412,6 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, int n_ev ...@@ -412,7 +412,6 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, int n_ev
uint64_t end = 0; uint64_t end = 0;
bool first_pass = true; bool first_pass = true;
bool found = false;
while (true) { while (true) {
// Compute the timeout for the subsequent poll. // Compute the timeout for the subsequent poll.
...@@ -440,13 +439,13 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, int n_ev ...@@ -440,13 +439,13 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, int n_ev
signaler.recv (); signaler.recv ();
// Check for the events. // Check for the events.
int i = 0; int found = 0;
for (items_t::iterator it = items.begin (); it != items.end () && i < n_events_; ++i, ++it) { for (items_t::iterator it = items.begin (); it != items.end () && found < n_events_; ++it) {
events_[i].socket = NULL; events_[found].socket = NULL;
events_[i].fd = 0; events_[found].fd = 0;
events_[i].user_data = NULL; events_[found].user_data = NULL;
events_[i].events = 0; events_[found].events = 0;
// The poll item is a 0MQ socket. Retrieve pending events // The poll item is a 0MQ socket. Retrieve pending events
// using the ZMQ_EVENTS socket option. // using the ZMQ_EVENTS socket option.
...@@ -458,10 +457,10 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, int n_ev ...@@ -458,10 +457,10 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, int n_ev
} }
if (it->events & events) { if (it->events & events) {
events_[i].socket = it->socket; events_[found].socket = it->socket;
events_[i].user_data = it->user_data; events_[found].user_data = it->user_data;
events_[i].events = it->events & events; events_[found].events = it->events & events;
found = true; ++found;
} }
} }
// Else, the poll item is a raw file descriptor, simply convert // Else, the poll item is a raw file descriptor, simply convert
...@@ -480,16 +479,22 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, int n_ev ...@@ -480,16 +479,22 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, int n_ev
events |= ZMQ_POLLERR; events |= ZMQ_POLLERR;
if (events) { if (events) {
events_[i].socket = NULL; events_[found].socket = NULL;
events_[i].user_data = it->user_data; events_[found].user_data = it->user_data;
events_[i].fd = it->fd; events_[found].fd = it->fd;
events_[i].events = events; events_[found].events = events;
found = true; ++found;
} }
} }
} }
if (found) { if (found) {
return 0; for (int i = found; i < n_events_; ++i) {
events_[i].socket = NULL;
events_[i].fd = 0;
events_[i].user_data = NULL;
events_[i].events = 0;
}
return found;
} }
// If timeout is zero, exit immediately whether there are events or not. // If timeout is zero, exit immediately whether there are events or not.
...@@ -548,7 +553,6 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, int n_ev ...@@ -548,7 +553,6 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, int n_ev
uint64_t end = 0; uint64_t end = 0;
bool first_pass = true; bool first_pass = true;
bool found = false;
fd_set inset, outset, errset; fd_set inset, outset, errset;
while (true) { while (true) {
...@@ -596,8 +600,8 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, int n_ev ...@@ -596,8 +600,8 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, int n_ev
signaler.recv (); signaler.recv ();
// Check for the events. // Check for the events.
int i = 0; int found = 0;
for (items_t::iterator it = items.begin (); it != items.end () && i < n_events_; ++i, ++it) { for (items_t::iterator it = items.begin (); it != items.end () && found < n_events_; ++it) {
// The poll item is a 0MQ socket. Retrieve pending events // The poll item is a 0MQ socket. Retrieve pending events
// using the ZMQ_EVENTS socket option. // using the ZMQ_EVENTS socket option.
...@@ -608,10 +612,10 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, int n_ev ...@@ -608,10 +612,10 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, int n_ev
return -1; return -1;
if (it->events & events) { if (it->events & events) {
events_[i].socket = it->socket; events_[found].socket = it->socket;
events_[i].user_data = it->user_data; events_[found].user_data = it->user_data;
events_[i].events = it->events & events; events_[found].events = it->events & events;
found = true; ++found;
} }
} }
// Else, the poll item is a raw file descriptor, simply convert // Else, the poll item is a raw file descriptor, simply convert
...@@ -627,16 +631,23 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, int n_ev ...@@ -627,16 +631,23 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, int n_ev
events |= ZMQ_POLLERR; events |= ZMQ_POLLERR;
if (events) { if (events) {
events_[i].socket = NULL; events_[found].socket = NULL;
events_[i].user_data = it->user_data; events_[found].user_data = it->user_data;
events_[i].fd = it->fd; events_[found].fd = it->fd;
events_[i].events = events; events_[found].events = events;
found = true; ++found;
} }
} }
} }
if (found) { if (found) {
return 0; // zero-out remaining events
for (int i = found; i < n_events_; ++i) {
events_[i].socket = NULL;
events_[i].fd = 0;
events_[i].user_data = NULL;
events_[i].events = 0;
}
return found;
} }
// If timeout is zero, exit immediately whether there are events or not. // If timeout is zero, exit immediately whether there are events or not.
......
...@@ -755,6 +755,7 @@ inline int zmq_poller_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) ...@@ -755,6 +755,7 @@ inline int zmq_poller_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
// Register sockets with poller // Register sockets with poller
for (int i = 0; i < nitems_; i++) { for (int i = 0; i < nitems_; i++) {
items_[i].revents = 0;
if (items_[i].socket) { if (items_[i].socket) {
// Poll item is a 0MQ socket. // Poll item is a 0MQ socket.
rc = zmq_poller_add (poller, items_[i].socket, NULL, items_[i].events); rc = zmq_poller_add (poller, items_[i].socket, NULL, items_[i].events);
...@@ -782,9 +783,19 @@ inline int zmq_poller_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) ...@@ -782,9 +783,19 @@ inline int zmq_poller_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
return rc; return rc;
} }
// Put the event information where zmq_poll expects it to go. // Transform poller events into zmq_pollitem events
for (int i = 0; i < nitems_; i++) { // items_ contains all items, while events only contains fired events.
items_[i].revents = events[i].events; // The two are still co-ordered, so the step through items
// Checking for matches only on the first event
int found_events = rc;
for (int i = 0, j = 0; i < nitems_ && j < found_events; i++) {
if (
(items_[i].socket && items_[i].socket == events[j].socket) ||
(items_[i].fd && items_[i].fd == events[j].fd)
) {
items_[i].revents = events[j].events;
j++;
}
} }
// Cleanup // Cleanup
...@@ -1284,8 +1295,8 @@ int zmq_poller_wait (void *poller_, zmq_poller_event_t *event, long timeout_) ...@@ -1284,8 +1295,8 @@ int zmq_poller_wait (void *poller_, zmq_poller_event_t *event, long timeout_)
if (rc < 0) { if (rc < 0) {
memset (event, 0, sizeof(zmq_poller_event_t)); memset (event, 0, sizeof(zmq_poller_event_t));
} }
// wait_all returns number of events, but we return 0 for any success
return rc; return rc >= 0 ? 0 : rc;
} }
int zmq_poller_wait_all (void *poller_, zmq_poller_event_t *events, int n_events, long timeout_) int zmq_poller_wait_all (void *poller_, zmq_poller_event_t *events, int n_events, long timeout_)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment