Commit de7fc1fc authored by Min RK's avatar Min RK

add n_events argument to zmq_poller_wait_all

avoids unnecessary heap allocations, races on the number of items
parent 187b4bff
...@@ -577,7 +577,7 @@ ZMQ_EXPORT int zmq_poller_add (void *poller, void *socket, void *user_data, sho ...@@ -577,7 +577,7 @@ ZMQ_EXPORT int zmq_poller_add (void *poller, void *socket, void *user_data, sho
ZMQ_EXPORT int zmq_poller_modify (void *poller, void *socket, short events); ZMQ_EXPORT int zmq_poller_modify (void *poller, void *socket, short events);
ZMQ_EXPORT int zmq_poller_remove (void *poller, void *socket); ZMQ_EXPORT int zmq_poller_remove (void *poller, void *socket);
ZMQ_EXPORT int zmq_poller_wait (void *poller, zmq_poller_event_t *event, long timeout); ZMQ_EXPORT int zmq_poller_wait (void *poller, zmq_poller_event_t *event, long timeout);
ZMQ_EXPORT int zmq_poller_wait_all (void *poller, zmq_poller_event_t *events, long timeout); ZMQ_EXPORT int zmq_poller_wait_all (void *poller, zmq_poller_event_t *events, int n_events, long timeout);
#if defined _WIN32 #if defined _WIN32
ZMQ_EXPORT int zmq_poller_add_fd (void *poller, SOCKET fd, void *user_data, short events); ZMQ_EXPORT int zmq_poller_add_fd (void *poller, SOCKET fd, void *user_data, short events);
......
...@@ -380,7 +380,7 @@ int zmq::socket_poller_t::rebuild () ...@@ -380,7 +380,7 @@ int zmq::socket_poller_t::rebuild ()
return 0; return 0;
} }
int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, long timeout_) int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, int n_events_, long timeout_)
{ {
if (need_rebuild) if (need_rebuild)
if (rebuild () == -1) if (rebuild () == -1)
...@@ -441,7 +441,7 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, long tim ...@@ -441,7 +441,7 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, long tim
// Check for the events. // Check for the events.
int i=0; int i=0;
for (items_t::iterator it = items.begin (); it != items.end (); ++i, ++it) { for (items_t::iterator it = items.begin (); it != items.end () && i < n_events_; ++i, ++it) {
events_[i].socket = NULL; events_[i].socket = NULL;
events_[i].fd = 0; events_[i].fd = 0;
......
...@@ -73,7 +73,7 @@ namespace zmq ...@@ -73,7 +73,7 @@ namespace zmq
int modify_fd (fd_t fd, short events); int modify_fd (fd_t fd, short events);
int remove_fd (fd_t fd); int remove_fd (fd_t fd);
int wait (event_t *event, long timeout); int wait (event_t *event, int n_events, long timeout);
inline int size (void) { return items.size (); }; inline int size (void) { return items.size (); };
......
...@@ -1222,46 +1222,37 @@ int zmq_poller_wait (void *poller_, zmq_poller_event_t *event, long timeout_) ...@@ -1222,46 +1222,37 @@ int zmq_poller_wait (void *poller_, zmq_poller_event_t *event, long timeout_)
zmq_assert (event != NULL); zmq_assert (event != NULL);
int n_items = ((zmq::socket_poller_t*)poller_)->size (); int rc = zmq_poller_wait_all(poller_, event, 1, timeout_);
zmq_poller_event_t *events;
events = new zmq_poller_event_t[n_items];
alloc_assert(events);
int rc = zmq_poller_wait_all(poller_, events, timeout_); if (rc < 0) {
if (rc >= 0) {
*event = events[0];
} else {
memset (event, 0, sizeof(zmq_poller_event_t)); memset (event, 0, sizeof(zmq_poller_event_t));
} }
delete [] events;
return rc; return rc;
} }
int zmq_poller_wait_all (void *poller_, zmq_poller_event_t *events, long timeout_) int zmq_poller_wait_all (void *poller_, zmq_poller_event_t *events, int n_events, long timeout_)
{ {
if (!poller_ || !((zmq::socket_poller_t*)poller_)->check_tag ()) { if (!poller_ || !((zmq::socket_poller_t*)poller_)->check_tag ()) {
errno = EFAULT; errno = EFAULT;
return -1; return -1;
} }
if (n_events < 0) {
errno = EINVAL;
return -1;
}
zmq_assert (events != NULL); zmq_assert (events != NULL);
zmq::socket_poller_t::event_t evts[n_events];
int n_items = ((zmq::socket_poller_t*)poller_)->size (); int rc = ((zmq::socket_poller_t*)poller_)->wait (evts, n_events, timeout_);
zmq::socket_poller_t::event_t *evts;
evts = new zmq::socket_poller_t::event_t[n_items];
alloc_assert(evts);
int rc = ((zmq::socket_poller_t*)poller_)->wait (evts, timeout_);
for(int i = 0; i < n_items; ++i) { for(int i = 0; i < n_events; ++i) {
events[i].socket = evts[i].socket; events[i].socket = evts[i].socket;
events[i].fd = evts[i].fd; events[i].fd = evts[i].fd;
events[i].user_data = evts[i].user_data; events[i].user_data = evts[i].user_data;
events[i].events = evts[i].events; events[i].events = evts[i].events;
} }
delete [] evts;
return rc; return rc;
} }
......
...@@ -80,7 +80,7 @@ int zmq_poller_add (void *poller, void *socket, void *user_data, short events); ...@@ -80,7 +80,7 @@ int zmq_poller_add (void *poller, void *socket, void *user_data, short events);
int zmq_poller_modify (void *poller, void *socket, short events); int zmq_poller_modify (void *poller, void *socket, short events);
int zmq_poller_remove (void *poller, void *socket); int zmq_poller_remove (void *poller, void *socket);
int zmq_poller_wait (void *poller, zmq_poller_event_t *event, long timeout); int zmq_poller_wait (void *poller, zmq_poller_event_t *event, long timeout);
int zmq_poller_wait_all (void *poller, zmq_poller_event_t *events, long timeout); int zmq_poller_wait_all (void *poller, zmq_poller_event_t *events, int n_events, long timeout);
#if defined _WIN32 #if defined _WIN32
int zmq_poller_add_fd (void *poller, SOCKET fd, void *user_data, short events); int zmq_poller_add_fd (void *poller, SOCKET fd, void *user_data, short events);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment