Commit bdcaa935 authored by Min RK's avatar Min RK

zmq_poll calls zmq_poller if available

enables zmq_poll on threadsafe sockets only supported in zmq_poller (radio, dish, etc.)
parent c7518993
...@@ -742,10 +742,60 @@ const char *zmq_msg_gets (zmq_msg_t *msg_, const char *property_) ...@@ -742,10 +742,60 @@ const char *zmq_msg_gets (zmq_msg_t *msg_, const char *property_)
// Polling. // Polling.
#if defined ZMQ_HAVE_POLLER
inline int zmq_poller_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
{
// implement zmq_poll on top of zmq_poller
int rc;
zmq_poller_event_t events[nitems_];
void *poller = zmq_poller_new ();
alloc_assert(poller);
// Register sockets with poller
for (int i = 0; i < nitems_; i++) {
if (items_[i].socket) {
// Poll item is a 0MQ socket.
rc = zmq_poller_add (poller, items_[i].socket, NULL, items_[i].events);
if (rc < 0) {
zmq_poller_destroy (&poller);
return rc;
}
} else {
// Poll item is a raw file descriptor.
rc = zmq_poller_add_fd (poller, items_[i].fd, NULL, items_[i].events);
if (rc < 0) {
zmq_poller_destroy (&poller);
return rc;
}
}
}
// Wait for events
rc = zmq_poller_wait_all (poller, events, nitems_, timeout_);
if (rc < 0) {
zmq_poller_destroy (&poller);
return rc;
}
// Put the event information where zmq_poll expects it to go.
for (int i = 0; i < nitems_; i++) {
items_[i].revents = events[i].events;
}
// Cleanup
rc = zmq_poller_destroy (&poller);
return rc;
}
#endif // ZMQ_HAVE_POLLER
int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
{ {
// TODO: the function implementation can just call zmq_pollfd_poll with // TODO: the function implementation can just call zmq_pollfd_poll with
// pollfd as NULL, however pollfd is not yet stable. // pollfd as NULL, however pollfd is not yet stable.
#if defined ZMQ_HAVE_POLLER
// if poller is present, use that.
return zmq_poller_poll(items_, nitems_, timeout_);
#else
#if defined ZMQ_POLL_BASED_ON_POLL #if defined ZMQ_POLL_BASED_ON_POLL
if (unlikely (nitems_ < 0)) { if (unlikely (nitems_ < 0)) {
errno = EINVAL; errno = EINVAL;
...@@ -1094,6 +1144,7 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) ...@@ -1094,6 +1144,7 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
errno = ENOTSUP; errno = ENOTSUP;
return -1; return -1;
#endif #endif
#endif // ZMQ_HAVE_POLLER
} }
// The poller functionality // The poller functionality
......
...@@ -161,6 +161,15 @@ int main (void) ...@@ -161,6 +161,15 @@ int main (void)
rc = msg_send (&msg, radio, "Movies", "Godfather"); rc = msg_send (&msg, radio, "Movies", "Godfather");
assert (rc == 9); assert (rc == 9);
// test zmq_poll with dish
zmq_pollitem_t items [] = {
{ radio, 0, ZMQ_POLLIN, 0 }, // read publications
{ dish, 0, ZMQ_POLLIN, 0 }, // read subscriptions
};
rc = zmq_poll(items, 2, 2000);
assert (rc == 0);
assert (items[1].revents == ZMQ_POLLIN);
// Check the correct message arrived // Check the correct message arrived
rc = msg_recv_cmp (&msg, dish, "Movies", "Godfather"); rc = msg_recv_cmp (&msg, dish, "Movies", "Godfather");
assert (rc == 9); assert (rc == 9);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment