Problem: No support to query poller size

Solution: Add zmq_poller_size that queries the number
of objects registered, allowing safer usages of poller
to avoid livelock situations.
parent 4c1d720a
...@@ -14,6 +14,8 @@ SYNOPSIS ...@@ -14,6 +14,8 @@ SYNOPSIS
*int zmq_poller_destroy (void ****'poller_p');* *int zmq_poller_destroy (void ****'poller_p');*
*int zmq_poller_size (void *'poller');*
*int zmq_poller_add (void *'poller', void *'socket', void *'user_data', short 'events');* *int zmq_poller_add (void *'poller', void *'socket', void *'user_data', short 'events');*
*int zmq_poller_modify (void *'poller', void *'socket', short 'events');* *int zmq_poller_modify (void *'poller', void *'socket', short 'events');*
...@@ -51,6 +53,11 @@ instance. _zmq_poller_destroy_ sets the passed pointer to NULL in case of a ...@@ -51,6 +53,11 @@ instance. _zmq_poller_destroy_ sets the passed pointer to NULL in case of a
successful execution. _zmq_poller_destroy_ implicitly unregisters all successful execution. _zmq_poller_destroy_ implicitly unregisters all
registered sockets and file descriptors. registered sockets and file descriptors.
_zmq_poller_size_ queries the number of sockets or file descriptors registered
with a poller. The initial size of a poller is 0, a successful add operation
increases the size by 1 and a successful remove operation decreases the size
by 1. The size is unaffected by the events specified.
_zmq_poller_add_, _zmq_poller_modify_ and _zmq_poller_remove_ manage the 0MQ _zmq_poller_add_, _zmq_poller_modify_ and _zmq_poller_remove_ manage the 0MQ
sockets registered with a poller. sockets registered with a poller.
...@@ -129,7 +136,8 @@ registered objects. Otherwise, a livelock situation may result: If more than ...@@ -129,7 +136,8 @@ registered objects. Otherwise, a livelock situation may result: If more than
'n_events' registered objects have an active event on each call to 'n_events' registered objects have an active event on each call to
_zmq_poller_wait_all_, it might happen that the same subset of registered _zmq_poller_wait_all_, it might happen that the same subset of registered
objects is always returned, and the caller never notices the events on the objects is always returned, and the caller never notices the events on the
others. others. The number of objects registered can be queried with
_zmq_poller_size_.
_zmq_poller_wait_all_ returns the number of valid elements. The valid elements _zmq_poller_wait_all_ returns the number of valid elements. The valid elements
are placed in positions '0' to 'n_events - 1' in the 'events' array. All are placed in positions '0' to 'n_events - 1' in the 'events' array. All
...@@ -219,6 +227,12 @@ On _zmq_poller_destroy_: ...@@ -219,6 +227,12 @@ On _zmq_poller_destroy_:
_poller_p_ did not point to a valid poller. Note that passing an invalid pointer (e.g. _poller_p_ did not point to a valid poller. Note that passing an invalid pointer (e.g.
pointer to deallocated memory) may cause undefined behaviour (e.g. an access violation). pointer to deallocated memory) may cause undefined behaviour (e.g. an access violation).
On _zmq_poller_size_:
*EFAULT*::
_poller_ did not point to a valid poller. Note that passing an
invalid pointer (e.g. pointer to deallocated memory) may cause undefined
behaviour (e.g. an access violation).
On _zmq_poller_add_, _zmq_poller_modify_ and _zmq_poller_remove_: On _zmq_poller_add_, _zmq_poller_modify_ and _zmq_poller_remove_:
*EFAULT*:: *EFAULT*::
_poller_ did not point to a valid poller. Note that passing an _poller_ did not point to a valid poller. Note that passing an
...@@ -264,7 +278,7 @@ available. ...@@ -264,7 +278,7 @@ available.
*EAGAIN*:: *EAGAIN*::
No registered event was signalled before the timeout was reached. No registered event was signalled before the timeout was reached.
On _zmq_poller_fd: On _zmq_poller_fd_:
*EINVAL*:: *EINVAL*::
The poller has no associated file descriptor. The poller has no associated file descriptor.
*EFAULT*:: *EFAULT*::
......
...@@ -735,6 +735,7 @@ typedef struct zmq_poller_event_t ...@@ -735,6 +735,7 @@ typedef struct zmq_poller_event_t
ZMQ_EXPORT void *zmq_poller_new (void); ZMQ_EXPORT void *zmq_poller_new (void);
ZMQ_EXPORT int zmq_poller_destroy (void **poller_p); ZMQ_EXPORT int zmq_poller_destroy (void **poller_p);
ZMQ_EXPORT int zmq_poller_size (void *poller);
ZMQ_EXPORT int ZMQ_EXPORT int
zmq_poller_add (void *poller, void *socket, void *user_data, short events); zmq_poller_add (void *poller, void *socket, void *user_data, short events);
ZMQ_EXPORT int zmq_poller_modify (void *poller, void *socket, short events); ZMQ_EXPORT int zmq_poller_modify (void *poller, void *socket, short events);
......
...@@ -1219,6 +1219,14 @@ static int check_poller_fd_registration_args (void *const poller_, ...@@ -1219,6 +1219,14 @@ static int check_poller_fd_registration_args (void *const poller_,
return 0; return 0;
} }
int zmq_poller_size (void *poller_)
{
if (-1 == check_poller (poller_))
return -1;
return (static_cast<zmq::socket_poller_t *> (poller_))->size ();
}
int zmq_poller_add (void *poller_, void *s_, void *user_data_, short events_) int zmq_poller_add (void *poller_, void *s_, void *user_data_, short events_)
{ {
if (-1 == check_poller_registration_args (poller_, s_) if (-1 == check_poller_registration_args (poller_, s_)
......
...@@ -124,6 +124,7 @@ typedef struct zmq_poller_event_t ...@@ -124,6 +124,7 @@ typedef struct zmq_poller_event_t
void *zmq_poller_new (void); void *zmq_poller_new (void);
int zmq_poller_destroy (void **poller_p_); int zmq_poller_destroy (void **poller_p_);
int zmq_poller_size (void *poller_);
int zmq_poller_add (void *poller_, int zmq_poller_add (void *poller_,
void *socket_, void *socket_,
void *user_data_, void *user_data_,
......
...@@ -60,6 +60,17 @@ void test_null_poller_pointers_destroy_indirect () ...@@ -60,6 +60,17 @@ void test_null_poller_pointers_destroy_indirect ()
TEST_ASSERT_FAILURE_ERRNO (EFAULT, zmq_poller_destroy (&null_poller)); TEST_ASSERT_FAILURE_ERRNO (EFAULT, zmq_poller_destroy (&null_poller));
} }
void test_null_poller_pointers_size_direct ()
{
TEST_ASSERT_FAILURE_ERRNO (EFAULT, zmq_poller_size (NULL));
}
void test_null_poller_pointers_size_indirect ()
{
void *null_poller = NULL;
TEST_ASSERT_FAILURE_ERRNO (EFAULT, zmq_poller_size (&null_poller));
}
void test_null_poller_pointers_add_direct () void test_null_poller_pointers_add_direct ()
{ {
void *socket = test_context_socket (ZMQ_PAIR); void *socket = test_context_socket (ZMQ_PAIR);
...@@ -330,6 +341,42 @@ TEST_CASE_FUNC_PARAM (call_poller_wait_all_null_event_fails_event_count_nonzero, ...@@ -330,6 +341,42 @@ TEST_CASE_FUNC_PARAM (call_poller_wait_all_null_event_fails_event_count_nonzero,
TEST_CASE_FUNC_PARAM (call_poller_wait_all_null_event_fails_event_count_zero, TEST_CASE_FUNC_PARAM (call_poller_wait_all_null_event_fails_event_count_zero,
test_with_valid_poller) test_with_valid_poller)
void call_poller_size (void *poller_, void *socket_)
{
int rc = zmq_poller_size (poller_);
TEST_ASSERT_SUCCESS_ERRNO (rc);
TEST_ASSERT_EQUAL (rc, 0);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_poller_add (poller_, socket_, NULL, ZMQ_POLLIN));
rc = zmq_poller_size (poller_);
TEST_ASSERT_SUCCESS_ERRNO (rc);
TEST_ASSERT_EQUAL (rc, 1);
TEST_ASSERT_SUCCESS_ERRNO (zmq_poller_modify (poller_, socket_, 0));
rc = zmq_poller_size (poller_);
TEST_ASSERT_SUCCESS_ERRNO (rc);
TEST_ASSERT_EQUAL (rc, 1);
fd_t plain_socket = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_poller_add_fd (poller_, plain_socket, NULL, ZMQ_POLLOUT));
rc = zmq_poller_size (poller_);
TEST_ASSERT_SUCCESS_ERRNO (rc);
TEST_ASSERT_EQUAL (rc, 2);
TEST_ASSERT_SUCCESS_ERRNO (zmq_poller_remove (poller_, socket_));
rc = zmq_poller_size (poller_);
TEST_ASSERT_SUCCESS_ERRNO (rc);
TEST_ASSERT_EQUAL (rc, 1);
TEST_ASSERT_SUCCESS_ERRNO (zmq_poller_remove_fd (poller_, plain_socket));
TEST_ASSERT_SUCCESS_ERRNO (close (plain_socket));
rc = zmq_poller_size (poller_);
TEST_ASSERT_SUCCESS_ERRNO (rc);
TEST_ASSERT_EQUAL (rc, 0);
}
void call_poller_add_twice_fails (void *poller_, void *socket_) void call_poller_add_twice_fails (void *poller_, void *socket_)
{ {
TEST_ASSERT_SUCCESS_ERRNO ( TEST_ASSERT_SUCCESS_ERRNO (
...@@ -449,6 +496,7 @@ void call_poller_modify_fd_invalid_events_fails (void *poller_, ...@@ -449,6 +496,7 @@ void call_poller_modify_fd_invalid_events_fails (void *poller_,
TEST_ASSERT_SUCCESS_ERRNO (close (plain_socket)); TEST_ASSERT_SUCCESS_ERRNO (close (plain_socket));
} }
TEST_CASE_FUNC_PARAM (call_poller_size, test_with_empty_poller)
TEST_CASE_FUNC_PARAM (call_poller_add_twice_fails, test_with_empty_poller) TEST_CASE_FUNC_PARAM (call_poller_add_twice_fails, test_with_empty_poller)
TEST_CASE_FUNC_PARAM (call_poller_remove_unregistered_fails, TEST_CASE_FUNC_PARAM (call_poller_remove_unregistered_fails,
test_with_empty_poller) test_with_empty_poller)
...@@ -666,6 +714,8 @@ int main (void) ...@@ -666,6 +714,8 @@ int main (void)
UNITY_BEGIN (); UNITY_BEGIN ();
RUN_TEST (test_null_poller_pointers_destroy_direct); RUN_TEST (test_null_poller_pointers_destroy_direct);
RUN_TEST (test_null_poller_pointers_destroy_indirect); RUN_TEST (test_null_poller_pointers_destroy_indirect);
RUN_TEST (test_null_poller_pointers_size_direct);
RUN_TEST (test_null_poller_pointers_size_indirect);
RUN_TEST (test_null_poller_pointers_add_direct); RUN_TEST (test_null_poller_pointers_add_direct);
RUN_TEST (test_null_poller_pointers_add_indirect); RUN_TEST (test_null_poller_pointers_add_indirect);
RUN_TEST (test_null_poller_pointers_modify_direct); RUN_TEST (test_null_poller_pointers_modify_direct);
...@@ -690,6 +740,7 @@ int main (void) ...@@ -690,6 +740,7 @@ int main (void)
RUN_TEST (test_call_poller_wait_all_null_event_fails_event_count_nonzero); RUN_TEST (test_call_poller_wait_all_null_event_fails_event_count_nonzero);
RUN_TEST (test_call_poller_wait_all_null_event_fails_event_count_zero); RUN_TEST (test_call_poller_wait_all_null_event_fails_event_count_zero);
RUN_TEST (test_call_poller_size);
RUN_TEST (test_call_poller_add_twice_fails); RUN_TEST (test_call_poller_add_twice_fails);
RUN_TEST (test_call_poller_remove_unregistered_fails); RUN_TEST (test_call_poller_remove_unregistered_fails);
RUN_TEST (test_call_poller_modify_unregistered_fails); RUN_TEST (test_call_poller_modify_unregistered_fails);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment