Commit 476ba22e authored by somdoron's avatar somdoron

ZMQ_Poller support more event types

parent 544d1e34
...@@ -440,19 +440,23 @@ typedef struct zmq_poller_event_t ...@@ -440,19 +440,23 @@ typedef struct zmq_poller_event_t
int fd; int fd;
#endif #endif
void *user_data; void *user_data;
short events;
} zmq_poller_event_t; } zmq_poller_event_t;
ZMQ_EXPORT void *zmq_poller_new (); ZMQ_EXPORT void *zmq_poller_new ();
ZMQ_EXPORT int zmq_poller_close (void *poller); ZMQ_EXPORT int zmq_poller_close (void *poller);
ZMQ_EXPORT int zmq_poller_add_socket (void *poller, void *socket, void *user_data); ZMQ_EXPORT int zmq_poller_add_socket (void *poller, void *socket, void *user_data, short events);
ZMQ_EXPORT int zmq_poller_modify_socket (void *poller, void *socket, short events);
ZMQ_EXPORT int zmq_poller_remove_socket (void *poller, void *socket); ZMQ_EXPORT int zmq_poller_remove_socket (void *poller, void *socket);
ZMQ_EXPORT int zmq_poller_wait (void *poller, zmq_poller_event_t *event, long timeout); ZMQ_EXPORT int zmq_poller_wait (void *poller, zmq_poller_event_t *event, long timeout);
#if defined _WIN32 #if defined _WIN32
ZMQ_EXPORT int zmq_poller_add_fd (void *poller, SOCKET fd, void *user_data); ZMQ_EXPORT int zmq_poller_add_fd (void *poller, SOCKET fd, void *user_data, short events);
ZMQ_EXPORT int zmq_poller_modify_fd (void *poller, SOCKET fd, short events);
ZMQ_EXPORT int zmq_poller_remove_fd (void *poller, SOCKET fd); ZMQ_EXPORT int zmq_poller_remove_fd (void *poller, SOCKET fd);
#else #else
ZMQ_EXPORT int zmq_poller_add_fd (void *poller, int fd, void *user_data); ZMQ_EXPORT int zmq_poller_add_fd (void *poller, int fd, void *user_data, short events);
ZMQ_EXPORT int zmq_poller_modify (void *poller, int fd, short events);
ZMQ_EXPORT int zmq_poller_remove_fd (void *poller, int fd); ZMQ_EXPORT int zmq_poller_remove_fd (void *poller, int fd);
#endif #endif
......
...@@ -61,7 +61,7 @@ bool zmq::socket_poller_t::check_tag () ...@@ -61,7 +61,7 @@ bool zmq::socket_poller_t::check_tag ()
return tag == 0xCAFEBABE; return tag == 0xCAFEBABE;
} }
int zmq::socket_poller_t::add_socket (void *socket_, void* user_data_) int zmq::socket_poller_t::add_socket (void *socket_, void* user_data_, short events_)
{ {
for (events_t::iterator it = events.begin (); it != events.end (); ++it) { for (events_t::iterator it = events.begin (); it != events.end (); ++it) {
if (it->socket == socket_) { if (it->socket == socket_) {
...@@ -81,7 +81,7 @@ int zmq::socket_poller_t::add_socket (void *socket_, void* user_data_) ...@@ -81,7 +81,7 @@ int zmq::socket_poller_t::add_socket (void *socket_, void* user_data_)
return -1; return -1;
} }
event_t event = {socket_, 0, user_data_}; event_t event = {socket_, 0, user_data_, events_};
events.push_back (event); events.push_back (event);
need_rebuild = true; need_rebuild = true;
...@@ -89,9 +89,9 @@ int zmq::socket_poller_t::add_socket (void *socket_, void* user_data_) ...@@ -89,9 +89,9 @@ int zmq::socket_poller_t::add_socket (void *socket_, void* user_data_)
} }
#if defined _WIN32 #if defined _WIN32
int zmq::socket_poller_t::add_fd (SOCKET fd_, void *user_data_) int zmq::socket_poller_t::add_fd (SOCKET fd_, void *user_data_, short events_)
#else #else
int zmq::socket_poller_t::add_fd (int fd_, void *user_data_) int zmq::socket_poller_t::add_fd (int fd_, void *user_data_, short events_)
#endif #endif
{ {
for (events_t::iterator it = events.begin (); it != events.end (); ++it) { for (events_t::iterator it = events.begin (); it != events.end (); ++it) {
...@@ -101,13 +101,59 @@ int zmq::socket_poller_t::add_fd (int fd_, void *user_data_) ...@@ -101,13 +101,59 @@ int zmq::socket_poller_t::add_fd (int fd_, void *user_data_)
} }
} }
event_t event = {NULL, fd_, user_data_}; event_t event = {NULL, fd_, user_data_, events_};
events.push_back (event); events.push_back (event);
need_rebuild = true; need_rebuild = true;
return 0; return 0;
} }
int zmq::socket_poller_t::modify_socket (void *socket_, short events_)
{
events_t::iterator it;
for (it = events.begin (); it != events.end (); ++it) {
if (it->socket == socket_)
break;
}
if (it == events.end()) {
errno = EINVAL;
return -1;
}
it->events = events_;
need_rebuild = true;
return 0;
}
#if defined _WIN32
int zmq::socket_poller_t::modify_fd (SOCKET fd_, short events_)
#else
int zmq::socket_poller_t::modify_fd (int fd_, short events_)
#endif
{
events_t::iterator it;
for (it = events.begin (); it != events.end (); ++it) {
if (!it->socket && it->fd == fd_)
break;
}
if (it == events.end()) {
errno = EINVAL;
return -1;
}
it->events = events_;
need_rebuild = true;
return 0;
}
int zmq::socket_poller_t::remove_socket (void* socket_) int zmq::socket_poller_t::remove_socket (void* socket_)
{ {
events_t::iterator it; events_t::iterator it;
...@@ -180,8 +226,9 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *event_, long time ...@@ -180,8 +226,9 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *event_, long time
} }
for (int i = 0; i < poll_size; i++) { for (int i = 0; i < poll_size; i++) {
if (poll_set [i].revents & ZMQ_POLLIN) { if ((poll_set [i].revents & poll_events [i].events) != 0) {
*event_ = poll_events[i]; *event_ = poll_events[i];
event_->events = poll_set [i].revents & poll_events [i].events;
break; break;
} }
...@@ -216,7 +263,7 @@ void zmq::socket_poller_t::rebuild () ...@@ -216,7 +263,7 @@ void zmq::socket_poller_t::rebuild ()
if (!it->socket) if (!it->socket)
poll_set [event_nbr].fd = it->fd; poll_set [event_nbr].fd = it->fd;
poll_set [event_nbr].events = ZMQ_POLLIN; poll_set [event_nbr].events = it->events;
poll_events [event_nbr] = *it; poll_events [event_nbr] = *it;
} }
......
...@@ -53,15 +53,19 @@ namespace zmq ...@@ -53,15 +53,19 @@ namespace zmq
int fd; int fd;
#endif #endif
void *user_data; void *user_data;
short events;
} event_t; } event_t;
int add_socket (void *socket, void *user_data); int add_socket (void *socket, void *user_data, short events);
int modify_socket (void *socket, short events);
int remove_socket (void *socket); int remove_socket (void *socket);
#if defined _WIN32 #if defined _WIN32
int add_fd (SOCKET fd, void *user_data); int add_fd (SOCKET fd, void *user_data, short events);
int mofify_fd (SOCKET fd, short events);
int remove_fd (SOCKET fd); int remove_fd (SOCKET fd);
#else #else
int add_fd (int fd, void *user_data); int add_fd (int fd, void *user_data, short events);
int modify_fd (int fd, short events);
int remove_fd (int fd); int remove_fd (int fd);
#endif #endif
......
...@@ -1579,20 +1579,20 @@ int zmq_poller_close (void *poller_) ...@@ -1579,20 +1579,20 @@ int zmq_poller_close (void *poller_)
return 0; return 0;
} }
int zmq_poller_add_socket (void *poller_, void *socket_, void *user_data_) int zmq_poller_add_socket (void *poller_, void *socket_, void *user_data_, short events_)
{ {
if (!poller_ || !((zmq::socket_poller_t*)poller_)->check_tag ()) { if (!poller_ || !((zmq::socket_poller_t*)poller_)->check_tag ()) {
errno = EFAULT; errno = EFAULT;
return -1; return -1;
} }
return ((zmq::socket_poller_t*)poller_)->add_socket (socket_, user_data_); return ((zmq::socket_poller_t*)poller_)->add_socket (socket_, user_data_, events_);
} }
#if defined _WIN32 #if defined _WIN32
int zmq_poller_add_fd (void *poller_, SOCKET fd_, void *user_data_) int zmq_poller_add_fd (void *poller_, SOCKET fd_, void *user_data_, short events_)
#else #else
int zmq_poller_add_fd (void *poller_, int fd_, void *user_data_) int zmq_poller_add_fd (void *poller_, int fd_, void *user_data_, short events_)
#endif #endif
{ {
if (!poller_ || !((zmq::socket_poller_t*)poller_)->check_tag ()) { if (!poller_ || !((zmq::socket_poller_t*)poller_)->check_tag ()) {
...@@ -1600,9 +1600,36 @@ int zmq_poller_add_fd (void *poller_, int fd_, void *user_data_) ...@@ -1600,9 +1600,36 @@ int zmq_poller_add_fd (void *poller_, int fd_, void *user_data_)
return -1; return -1;
} }
return ((zmq::socket_poller_t*)poller_)->add_fd (fd_, user_data_); return ((zmq::socket_poller_t*)poller_)->add_fd (fd_, user_data_, events_);
} }
int zmq_poller_modify_socket (void *poller_, void *socket_, short events_)
{
if (!poller_ || !((zmq::socket_poller_t*)poller_)->check_tag ()) {
errno = EFAULT;
return -1;
}
return ((zmq::socket_poller_t*)poller_)->modify_socket (socket_, events_);
}
#if defined _WIN32
int zmq_poller_modify_fd (void *poller_, SOCKET fd_, short events_)
#else
int zmq_poller_modify_fd (void *poller_, int fd_, short events_)
#endif
{
if (!poller_ || !((zmq::socket_poller_t*)poller_)->check_tag ()) {
errno = EFAULT;
return -1;
}
return ((zmq::socket_poller_t*)poller_)->modify_fd (fd_, events_);
}
int zmq_poller_remove_socket (void *poller_, void *socket) int zmq_poller_remove_socket (void *poller_, void *socket)
{ {
if (!poller_ || !((zmq::socket_poller_t*)poller_)->check_tag ()) { if (!poller_ || !((zmq::socket_poller_t*)poller_)->check_tag ()) {
...@@ -1642,6 +1669,7 @@ int zmq_poller_wait (void *poller_, zmq_poller_event_t *event, long timeout_) ...@@ -1642,6 +1669,7 @@ int zmq_poller_wait (void *poller_, zmq_poller_event_t *event, long timeout_)
event->socket = e.socket; event->socket = e.socket;
event->fd = e.fd; event->fd = e.fd;
event->user_data = e.user_data; event->user_data = e.user_data;
event->events = e.events;
return rc; return rc;
} }
......
...@@ -61,7 +61,7 @@ int main (void) ...@@ -61,7 +61,7 @@ int main (void)
// Set up poller // Set up poller
void* poller = zmq_poller_new (); void* poller = zmq_poller_new ();
rc = zmq_poller_add_socket (poller, sink, sink); rc = zmq_poller_add_socket (poller, sink, sink, ZMQ_POLLIN);
assert (rc == 0); assert (rc == 0);
// Send a message // Send a message
...@@ -96,7 +96,7 @@ int main (void) ...@@ -96,7 +96,7 @@ int main (void)
rc = zmq_getsockopt (bowl, ZMQ_FD, &fd, &fd_size); rc = zmq_getsockopt (bowl, ZMQ_FD, &fd, &fd_size);
assert (rc == 0); assert (rc == 0);
rc = zmq_poller_add_fd (poller, fd, bowl); rc = zmq_poller_add_fd (poller, fd, bowl, ZMQ_POLLIN);
assert (rc == 0); assert (rc == 0);
rc = zmq_poller_wait (poller, &event, 500); rc = zmq_poller_wait (poller, &event, 500);
assert (rc == 0); assert (rc == 0);
...@@ -106,7 +106,8 @@ int main (void) ...@@ -106,7 +106,8 @@ int main (void)
zmq_poller_remove_fd (poller, fd); zmq_poller_remove_fd (poller, fd);
// Polling on thread safe sockets // Polling on thread safe sockets
zmq_poller_add_socket (poller, server, NULL); rc = zmq_poller_add_socket (poller, server, NULL, ZMQ_POLLIN);
assert (rc == 0);
rc = zmq_connect (client, "tcp://127.0.0.1:55557"); rc = zmq_connect (client, "tcp://127.0.0.1:55557");
assert (rc == 0); assert (rc == 0);
rc = zmq_send_const (client, data, 1, 0); rc = zmq_send_const (client, data, 1, 0);
...@@ -118,6 +119,15 @@ int main (void) ...@@ -118,6 +119,15 @@ int main (void)
rc = zmq_recv (server, data, 1, 0); rc = zmq_recv (server, data, 1, 0);
assert (rc == 1); assert (rc == 1);
// Polling on pollout
rc = zmq_poller_modify_socket (poller, server, ZMQ_POLLOUT | ZMQ_POLLIN);
assert (rc == 0);
rc = zmq_poller_wait (poller, &event, 0);
assert (rc == 0);
assert (event.socket == server);
assert (event.user_data == NULL);
assert (event.events == ZMQ_POLLOUT);
// Destory poller, sockets and ctx // Destory poller, sockets and ctx
rc = zmq_poller_close (poller); rc = zmq_poller_close (poller);
assert (rc == 0); assert (rc == 0);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment