Commit 8f3113b2 authored by Doron Somech's avatar Doron Somech Committed by GitHub

Merge pull request #2714 from sigiesec/add-poller-tests

Problem: insufficient tests for zmq_poller_*
parents 6f665eb9 f685a3ff
......@@ -271,13 +271,13 @@ int zmq::proxy (
// If one of receiving end's queue is full ('ZMQ_POLLOUT' not available),
// 'poller_wait' is pointed to 'poller_receive_blocked', 'poller_send_blocked' or 'poller_both_blocked'.
rc = poller_wait->wait (events, 3, -1);
if (rc < 0 && errno == ETIMEDOUT)
if (rc < 0 && errno == EAGAIN)
rc = 0;
CHECK_RC_EXIT_ON_FAILURE ();
// Some of events waited for by 'poller_wait' have arrived, now poll for everything without blocking.
rc = poller_all->wait (events, 3, 0);
if (rc < 0 && errno == ETIMEDOUT)
if (rc < 0 && errno == EAGAIN)
rc = 0;
CHECK_RC_EXIT_ON_FAILURE ();
......
......@@ -106,15 +106,16 @@ int zmq::socket_poller_t::add (socket_base_t *socket_, void* user_data_, short e
int thread_safe;
size_t thread_safe_size = sizeof(int);
if (socket_->getsockopt (ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == -1)
return -1;
int rc =
socket_->getsockopt (ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size);
zmq_assert (rc == 0);
if (thread_safe) {
if (signaler == NULL)
signaler = new signaler_t ();
if (socket_->add_signaler (signaler) == -1)
return -1;
rc = socket_->add_signaler (signaler);
zmq_assert (rc == 0);
}
item_t item = {socket_, 0, user_data_, events_
......@@ -236,7 +237,7 @@ int zmq::socket_poller_t::remove_fd (fd_t fd_)
return 0;
}
int zmq::socket_poller_t::rebuild ()
void zmq::socket_poller_t::rebuild ()
{
#if defined ZMQ_POLL_BASED_ON_POLL
......@@ -255,8 +256,9 @@ int zmq::socket_poller_t::rebuild ()
int thread_safe;
size_t thread_safe_size = sizeof(int);
if (it->socket->getsockopt (ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == -1)
return -1;
int rc = it->socket->getsockopt (ZMQ_THREAD_SAFE, &thread_safe,
&thread_safe_size);
zmq_assert (rc == 0);
if (thread_safe) {
if (!use_signaler) {
......@@ -273,7 +275,7 @@ int zmq::socket_poller_t::rebuild ()
}
if (poll_size == 0)
return 0;
return;
pollfds = (pollfd*) malloc (poll_size * sizeof (pollfd));
alloc_assert (pollfds);
......@@ -292,14 +294,14 @@ int zmq::socket_poller_t::rebuild ()
int thread_safe;
size_t thread_safe_size = sizeof(int);
if (it->socket->getsockopt (ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == -1)
return -1;
int rc = it->socket->getsockopt (ZMQ_THREAD_SAFE, &thread_safe,
&thread_safe_size);
zmq_assert (rc == 0);
if (!thread_safe) {
size_t fd_size = sizeof (zmq::fd_t);
if (it->socket->getsockopt (ZMQ_FD, &pollfds [item_nbr].fd, &fd_size) == -1) {
return -1;
}
rc = it->socket->getsockopt (ZMQ_FD, &pollfds [item_nbr].fd, &fd_size);
zmq_assert (rc == 0);
pollfds [item_nbr].events = POLLIN;
item_nbr++;
......@@ -336,8 +338,9 @@ int zmq::socket_poller_t::rebuild ()
int thread_safe;
size_t thread_safe_size = sizeof(int);
if (it->socket->getsockopt (ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == -1)
return -1;
int rc = it->socket->getsockopt (ZMQ_THREAD_SAFE, &thread_safe,
&thread_safe_size);
zmq_assert (rc == 0);
if (thread_safe && it->events) {
use_signaler = true;
......@@ -359,14 +362,15 @@ int zmq::socket_poller_t::rebuild ()
int thread_safe;
size_t thread_safe_size = sizeof(int);
if (it->socket->getsockopt (ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == -1)
return -1;
int rc = it->socket->getsockopt (ZMQ_THREAD_SAFE, &thread_safe,
&thread_safe_size);
zmq_assert (rc == 0);
if (!thread_safe) {
zmq::fd_t notify_fd;
size_t fd_size = sizeof (zmq::fd_t);
if (it->socket->getsockopt (ZMQ_FD, &notify_fd, &fd_size) == -1)
return -1;
rc = it->socket->getsockopt (ZMQ_FD, &notify_fd, &fd_size);
zmq_assert (rc == 0);
FD_SET (notify_fd, &pollset_in);
if (maxfd < notify_fd)
......@@ -395,22 +399,25 @@ int zmq::socket_poller_t::rebuild ()
#endif
need_rebuild = false;
return 0;
}
int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, int n_events_, long timeout_)
{
if (items.empty () && timeout_ < 0) {
errno = EFAULT;
return -1;
}
if (need_rebuild)
if (rebuild () == -1)
return -1;
rebuild ();
#if defined ZMQ_POLL_BASED_ON_POLL
if (unlikely (poll_size == 0)) {
// We'll report an error (timed out) as if the list was non-empty and
// no event occured within the specified timeout. Otherwise the caller
// no event occurred within the specified timeout. Otherwise the caller
// needs to check the return value AND the event to avoid using the
// nullified event data.
errno = ETIMEDOUT;
errno = EAGAIN;
if (timeout_ == 0)
return -1;
#if defined ZMQ_HAVE_WINDOWS
......@@ -527,25 +534,22 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, int n_ev
continue;
}
// The timeout is finite and there are no events. In the first pass
// we get a timestamp of when the polling have begun. (We assume that
// first pass have taken negligible time). We also compute the time
// when the polling should time out.
// The timeout is finite but non-zero and there are no events. In the
// first pass, we get a timestamp of when the polling have begun.
// (We assume that first pass have taken negligible time). We also
// compute the time when the polling should time out.
now = clock.now_ms ();
if (first_pass) {
now = clock.now_ms ();
end = now + timeout_;
if (now == end)
break;
first_pass = false;
continue;
}
// Find out whether timeout have expired.
now = clock.now_ms ();
if (now >= end)
break;
}
errno = ETIMEDOUT;
errno = EAGAIN;
return -1;
#elif defined ZMQ_POLL_BASED_ON_SELECT
......@@ -555,7 +559,7 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, int n_ev
// no event occured within the specified timeout. Otherwise the caller
// needs to check the return value AND the event to avoid using the
// nullified event data.
errno = ETIMEDOUT;
errno = EAGAIN;
if (timeout_ == 0)
return -1;
#if defined ZMQ_HAVE_WINDOWS
......@@ -691,22 +695,19 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, int n_ev
// we get a timestamp of when the polling have begun. (We assume that
// first pass have taken negligible time). We also compute the time
// when the polling should time out.
now = clock.now_ms ();
if (first_pass) {
now = clock.now_ms ();
end = now + timeout_;
if (now == end)
break;
first_pass = false;
continue;
}
// Find out whether timeout have expired.
now = clock.now_ms ();
if (now >= end)
break;
}
errno = ETIMEDOUT;
errno = EAGAIN;
return -1;
#else
......
......@@ -81,7 +81,7 @@ namespace zmq
bool check_tag ();
private:
int rebuild ();
void rebuild ();
// Used to check whether the object is a socket_poller.
uint32_t tag;
......
......@@ -813,7 +813,7 @@ inline int zmq_poller_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
rc = zmq_poller_wait_all (&poller, events, nitems_, timeout_);
if (rc < 0) {
delete [] events;
if (zmq_errno() == ETIMEDOUT) {
if (zmq_errno() == EAGAIN) {
return 0;
}
return rc;
......@@ -1269,6 +1269,10 @@ int zmq_poller_add_fd (void *poller_, int fd_, void *user_data_, short events_)
errno = EFAULT;
return -1;
}
if (fd_ == zmq::retired_fd) {
errno = EBADF;
return -1;
}
return ((zmq::socket_poller_t*)poller_)->add_fd (fd_, user_data_, events_);
}
......@@ -1301,6 +1305,10 @@ int zmq_poller_modify_fd (void *poller_, int fd_, short events_)
errno = EFAULT;
return -1;
}
if (fd_ == zmq::retired_fd) {
errno = EBADF;
return -1;
}
return ((zmq::socket_poller_t*)poller_)->modify_fd (fd_, events_);
}
......@@ -1332,43 +1340,51 @@ int zmq_poller_remove_fd (void *poller_, int fd_)
errno = EFAULT;
return -1;
}
if (fd_ == zmq::retired_fd) {
errno = EBADF;
return -1;
}
return ((zmq::socket_poller_t*)poller_)->remove_fd (fd_);
}
int zmq_poller_wait (void *poller_, zmq_poller_event_t *event, long timeout_)
int zmq_poller_wait (void *poller_, zmq_poller_event_t *event_, long timeout_)
{
if (!poller_ || !((zmq::socket_poller_t*)poller_)->check_tag ()) {
errno = EFAULT;
return -1;
}
if (!event_) {
errno = EFAULT;
return -1;
}
zmq_assert (event != NULL);
int rc = zmq_poller_wait_all(poller_, event, 1, timeout_);
int rc = zmq_poller_wait_all(poller_, event_, 1, timeout_);
if (rc < 0) {
memset (event, 0, sizeof(zmq_poller_event_t));
memset (event_, 0, sizeof(zmq_poller_event_t));
}
// wait_all returns number of events, but we return 0 for any success
return rc >= 0 ? 0 : rc;
}
int zmq_poller_wait_all (void *poller_, zmq_poller_event_t *events, int n_events, long timeout_)
int zmq_poller_wait_all (void *poller_, zmq_poller_event_t *events_, int n_events, long timeout_)
{
if (!poller_ || !((zmq::socket_poller_t*)poller_)->check_tag ()) {
errno = EFAULT;
return -1;
}
if (!events_) {
errno = EFAULT;
return -1;
}
if (n_events < 0) {
errno = EINVAL;
return -1;
}
zmq_assert (events != NULL);
int rc = ((zmq::socket_poller_t*)poller_)->wait ((zmq::socket_poller_t::event_t *)events, n_events, timeout_);
int rc = ((zmq::socket_poller_t*)poller_)->wait ((zmq::socket_poller_t::event_t *)events_, n_events, timeout_);
return rc;
}
......
......@@ -29,6 +29,229 @@
#include "testutil.hpp"
// duplicated from fd.hpp
#ifdef ZMQ_HAVE_WINDOWS
#if defined _MSC_VER &&_MSC_VER <= 1400
typedef UINT_PTR fd_t;
enum {retired_fd = (fd_t)(~0)};
#else
typedef SOCKET fd_t;
enum {retired_fd = (fd_t)INVALID_SOCKET};
#endif
#else
typedef int fd_t;
enum {retired_fd = -1};
#endif
void test_null_poller_pointers (void *ctx)
{
int rc = zmq_poller_destroy (NULL);
assert (rc == -1 && errno == EFAULT);
void *null_poller = NULL;
rc = zmq_poller_destroy (&null_poller);
assert (rc == -1 && errno == EFAULT);
void *socket = zmq_socket (ctx, ZMQ_PAIR);
assert (socket != NULL);
rc = zmq_poller_add (NULL, socket, NULL, ZMQ_POLLIN);
assert (rc == -1 && errno == EFAULT);
rc = zmq_poller_add (&null_poller, socket, NULL, ZMQ_POLLIN);
assert (rc == -1 && errno == EFAULT);
rc = zmq_poller_modify (NULL, socket, ZMQ_POLLIN);
assert (rc == -1 && errno == EFAULT);
rc = zmq_poller_modify (&null_poller, socket, ZMQ_POLLIN);
assert (rc == -1 && errno == EFAULT);
rc = zmq_poller_remove (NULL, socket);
assert (rc == -1 && errno == EFAULT);
rc = zmq_poller_remove (&null_poller, socket);
assert (rc == -1 && errno == EFAULT);
fd_t fd;
size_t fd_size = sizeof fd;
rc = zmq_getsockopt(socket, ZMQ_FD, &fd, &fd_size);
assert (rc == 0);
rc = zmq_poller_add_fd (NULL, fd, NULL, ZMQ_POLLIN);
assert (rc == -1 && errno == EFAULT);
rc = zmq_poller_add_fd (&null_poller, fd, NULL, ZMQ_POLLIN);
assert (rc == -1 && errno == EFAULT);
rc = zmq_poller_modify_fd (NULL, fd, ZMQ_POLLIN);
assert (rc == -1 && errno == EFAULT);
rc = zmq_poller_modify_fd (&null_poller, fd, ZMQ_POLLIN);
assert (rc == -1 && errno == EFAULT);
rc = zmq_poller_remove_fd (NULL, fd);
assert (rc == -1 && errno == EFAULT);
rc = zmq_poller_remove_fd (&null_poller, fd);
assert (rc == -1 && errno == EFAULT);
zmq_poller_event_t event;
rc = zmq_poller_wait (NULL, &event, 0);
assert (rc == -1 && errno == EFAULT);
rc = zmq_poller_wait (&null_poller, &event, 0);
assert (rc == -1 && errno == EFAULT);
rc = zmq_poller_wait_all (NULL, &event, 1, 0);
assert (rc == -1 && errno == EFAULT);
rc = zmq_poller_wait_all (&null_poller, &event, 1, 0);
assert (rc == -1 && errno == EFAULT);
rc = zmq_close (socket);
assert (rc == 0);
}
void test_null_socket_pointers ()
{
void *poller = zmq_poller_new ();
assert (poller != NULL);
int rc = zmq_poller_add (poller, NULL, NULL, ZMQ_POLLIN);
assert (rc == -1 && errno == ENOTSOCK);
rc = zmq_poller_modify (poller, NULL, ZMQ_POLLIN);
assert (rc == -1 && errno == ENOTSOCK);
rc = zmq_poller_remove (poller, NULL);
assert (rc == -1 && errno == ENOTSOCK);
fd_t null_socket_fd = retired_fd;
rc = zmq_poller_add_fd (poller, null_socket_fd, NULL, ZMQ_POLLIN);
assert (rc == -1 && errno == EBADF);
rc = zmq_poller_modify_fd (poller, null_socket_fd, ZMQ_POLLIN);
assert (rc == -1 && errno == EBADF);
rc = zmq_poller_remove_fd (poller, null_socket_fd);
assert (rc == -1 && errno == EBADF);
rc = zmq_poller_destroy (&poller);
assert (rc == 0);
}
void test_null_event_pointers (void *ctx)
{
void *socket = zmq_socket (ctx, ZMQ_PAIR);
assert (socket != NULL);
void *poller = zmq_poller_new ();
assert (poller != NULL);
int rc = zmq_poller_add (poller, socket, NULL, ZMQ_POLLIN);
assert (rc == 0);
rc = zmq_poller_wait (poller, NULL, 0);
assert (rc == -1 && errno == EFAULT);
rc = zmq_poller_wait_all (poller, NULL, 1, 0);
assert (rc == -1 && errno == EFAULT);
// TODO this causes an assertion, which is not consistent if the number
// of events may be 0, the pointer should be allowed to by NULL in that
// case too
#if 0
rc = zmq_poller_wait_all (poller, NULL, 0, 0);
assert (rc == 0);
#endif
rc = zmq_poller_destroy (&poller);
assert (rc == 0);
rc = zmq_close (socket);
assert (rc == 0);
}
void test_add_modify_remove_corner_cases(void *ctx)
{
void *poller = zmq_poller_new ();
assert (poller != NULL);
void *zeromq_socket = zmq_socket (ctx, ZMQ_PAIR);
assert (zeromq_socket != NULL);
int rc = zmq_poller_add (poller, zeromq_socket, NULL, ZMQ_POLLIN);
assert (rc == 0);
// attempt to add the same socket twice
rc = zmq_poller_add (poller, zeromq_socket, NULL, ZMQ_POLLIN);
assert (rc == -1 && errno == EINVAL);
rc = zmq_poller_remove (poller, zeromq_socket);
assert (rc == 0);
// attempt to remove socket that is not present
rc = zmq_poller_remove (poller, zeromq_socket);
assert (rc == -1 && errno == EINVAL);
// attempt to modify socket that is not present
rc = zmq_poller_modify (poller, zeromq_socket, ZMQ_POLLIN);
assert (rc == -1 && errno == EINVAL);
// add a socket with no events
// TODO should this really be legal? it does not make any sense...
rc = zmq_poller_add (poller, zeromq_socket, NULL, 0);
assert (rc == 0);
fd_t plain_socket = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
rc = zmq_poller_add_fd (poller, plain_socket, NULL, ZMQ_POLLIN);
assert (rc == 0);
// attempt to add the same plain socket twice
rc = zmq_poller_add_fd (poller, plain_socket, NULL, ZMQ_POLLIN);
assert (rc == -1 && errno == EINVAL);
rc = zmq_poller_remove_fd (poller, plain_socket);
assert (rc == 0);
// attempt to remove plain socket that is not present
rc = zmq_poller_remove_fd (poller, plain_socket);
assert (rc == -1 && errno == EINVAL);
// attempt to modify plain socket that is not present
rc = zmq_poller_modify_fd (poller, plain_socket, ZMQ_POLLIN);
assert (rc == -1 && errno == EINVAL);
rc = zmq_poller_destroy (&poller);
assert (rc == 0);
rc = zmq_close (zeromq_socket);
assert (rc == 0);
rc = close (plain_socket);
assert (rc == 0);
}
void test_wait_corner_cases (void *ctx)
{
void *poller = zmq_poller_new ();
assert (poller != NULL);
zmq_poller_event_t event;
int rc = zmq_poller_wait(poller, &event, 0);
assert (rc == -1 && errno == EAGAIN);
// this can never return since no socket was registered, and should yield an error
rc = zmq_poller_wait(poller, &event, -1);
assert (rc == -1 && errno == EFAULT);
rc = zmq_poller_wait_all (poller, &event, -1, 0);
assert (rc == -1 && errno == EINVAL);
rc = zmq_poller_wait_all (poller, &event, 0, 0);
assert (rc == -1 && errno == EAGAIN);
// this can never return since no socket was registered, and should yield an error
rc = zmq_poller_wait_all (poller, &event, 0, -1);
assert (rc == -1 && errno == EFAULT);
rc = zmq_poller_destroy (&poller);
assert (rc == 0);
}
int main (void)
{
size_t len = MAX_SOCKET_STRING;
......@@ -70,22 +293,22 @@ int main (void)
#endif
// Set up poller
void* poller = zmq_poller_new ();
void *poller = zmq_poller_new ();
zmq_poller_event_t event;
// waiting on poller with no registered sockets should report error
rc = zmq_poller_wait(poller, &event, 0);
rc = zmq_poller_wait (poller, &event, 0);
assert (rc == -1);
assert (errno == ETIMEDOUT);
assert (errno == EAGAIN);
// register sink
rc = zmq_poller_add (poller, sink, sink, ZMQ_POLLIN);
assert (rc == 0);
// Send a message
char data[1] = {'H'};
rc = zmq_send_const (vent, data, 1, 0);
assert (rc == 1);
assert (rc == 1);
// We expect a message only on the sink
rc = zmq_poller_wait (poller, &event, -1);
......@@ -98,7 +321,7 @@ int main (void)
// We expect timed out
rc = zmq_poller_wait (poller, &event, 0);
assert (rc == -1);
assert (errno == ETIMEDOUT);
assert (errno == EAGAIN);
// Stop polling sink
rc = zmq_poller_remove (poller, sink);
......@@ -108,13 +331,8 @@ int main (void)
rc = zmq_connect (bowl, my_endpoint_0);
assert (rc == 0);
#if defined _WIN32
SOCKET fd;
size_t fd_size = sizeof (SOCKET);
#else
int fd;
size_t fd_size = sizeof (int);
#endif
fd_t fd;
size_t fd_size = sizeof (fd);
rc = zmq_getsockopt (bowl, ZMQ_FD, &fd, &fd_size);
assert (rc == 0);
......@@ -138,12 +356,12 @@ int main (void)
rc = zmq_poller_wait (poller, &event, 500);
assert (rc == 0);
assert (event.socket == server);
assert (event.user_data == NULL);
assert (event.user_data == NULL);
rc = zmq_recv (server, data, 1, 0);
assert (rc == 1);
assert (rc == 1);
// Polling on pollout
rc = zmq_poller_modify (poller, server, ZMQ_POLLOUT | ZMQ_POLLIN);
rc = zmq_poller_modify (poller, server, ZMQ_POLLOUT | ZMQ_POLLIN);
assert (rc == 0);
rc = zmq_poller_wait (poller, &event, 0);
assert (rc == 0);
......@@ -156,7 +374,7 @@ int main (void)
assert (rc == 0);
#endif
// Destory sockets, poller and ctx
// Destroy sockets, poller and ctx
rc = zmq_close (sink);
assert (rc == 0);
rc = zmq_close (vent);
......@@ -170,15 +388,15 @@ int main (void)
assert (rc == 0);
#endif
// Test error - null poller pointers
rc = zmq_poller_destroy (NULL);
assert (rc == -1 && errno == EFAULT);
void *null_poller = NULL;
rc = zmq_poller_destroy (&null_poller);
assert (rc == -1 && errno == EFAULT);
test_null_poller_pointers (ctx);
test_null_socket_pointers ();
test_null_event_pointers (ctx);
test_add_modify_remove_corner_cases (ctx);
test_wait_corner_cases (ctx);
rc = zmq_poller_destroy (&poller);
assert(rc == 0);
assert (rc == 0);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
......
......@@ -47,11 +47,6 @@ void initialise_network (void)
throw std::runtime_error("Could not start WSA");
}
int close (int fd)
{
return closesocket (fd);
}
#else
void initialise_network (void)
......
......@@ -396,4 +396,13 @@ is_ipv6_available(void)
#endif // _WIN32_WINNT < 0x0600
}
#if defined (ZMQ_HAVE_WINDOWS)
int close (int fd)
{
return closesocket (fd);
}
#endif
#endif
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment