Commit e8e24030 authored by Simon Giesecke's avatar Simon Giesecke

Problem: network initialization and shutdown functions not available for

reuse

Solution: extract into functions defined in ip.hpp

Problem: signaler_t::make_fdpair not reusable

Solution: move make_fdpair to ip.hpp

Problem: epoll worker with no fds cannot be stopped

Solution: use interruptible epoll_pwait call

Problem: insufficient unit tests for poller

Solution: add test cases
parent ecb3b503
...@@ -35,6 +35,7 @@ ...@@ -35,6 +35,7 @@
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
#include <unistd.h> #include <unistd.h>
#include <signal.h>
#include <algorithm> #include <algorithm>
#include <new> #include <new>
...@@ -45,8 +46,7 @@ ...@@ -45,8 +46,7 @@
#include "i_poll_events.hpp" #include "i_poll_events.hpp"
zmq::epoll_t::epoll_t (const zmq::thread_ctx_t &ctx_) : zmq::epoll_t::epoll_t (const zmq::thread_ctx_t &ctx_) :
ctx (ctx_), worker_poller_base_t (ctx_)
stopping (false)
{ {
#ifdef ZMQ_USE_EPOLL_CLOEXEC #ifdef ZMQ_USE_EPOLL_CLOEXEC
// Setting this option result in sane behaviour when exec() functions // Setting this option result in sane behaviour when exec() functions
...@@ -62,7 +62,7 @@ zmq::epoll_t::epoll_t (const zmq::thread_ctx_t &ctx_) : ...@@ -62,7 +62,7 @@ zmq::epoll_t::epoll_t (const zmq::thread_ctx_t &ctx_) :
zmq::epoll_t::~epoll_t () zmq::epoll_t::~epoll_t ()
{ {
// Wait till the worker thread exits. // Wait till the worker thread exits.
worker.stop (); stop_worker ();
close (epoll_fd); close (epoll_fd);
for (retired_t::iterator it = retired.begin (); it != retired.end (); for (retired_t::iterator it = retired.begin (); it != retired.end ();
...@@ -73,6 +73,7 @@ zmq::epoll_t::~epoll_t () ...@@ -73,6 +73,7 @@ zmq::epoll_t::~epoll_t ()
zmq::epoll_t::handle_t zmq::epoll_t::add_fd (fd_t fd_, i_poll_events *events_) zmq::epoll_t::handle_t zmq::epoll_t::add_fd (fd_t fd_, i_poll_events *events_)
{ {
check_thread ();
poll_entry_t *pe = new (std::nothrow) poll_entry_t; poll_entry_t *pe = new (std::nothrow) poll_entry_t;
alloc_assert (pe); alloc_assert (pe);
...@@ -96,6 +97,7 @@ zmq::epoll_t::handle_t zmq::epoll_t::add_fd (fd_t fd_, i_poll_events *events_) ...@@ -96,6 +97,7 @@ zmq::epoll_t::handle_t zmq::epoll_t::add_fd (fd_t fd_, i_poll_events *events_)
void zmq::epoll_t::rm_fd (handle_t handle_) void zmq::epoll_t::rm_fd (handle_t handle_)
{ {
check_thread ();
poll_entry_t *pe = (poll_entry_t *) handle_; poll_entry_t *pe = (poll_entry_t *) handle_;
int rc = epoll_ctl (epoll_fd, EPOLL_CTL_DEL, pe->fd, &pe->ev); int rc = epoll_ctl (epoll_fd, EPOLL_CTL_DEL, pe->fd, &pe->ev);
errno_assert (rc != -1); errno_assert (rc != -1);
...@@ -110,6 +112,7 @@ void zmq::epoll_t::rm_fd (handle_t handle_) ...@@ -110,6 +112,7 @@ void zmq::epoll_t::rm_fd (handle_t handle_)
void zmq::epoll_t::set_pollin (handle_t handle_) void zmq::epoll_t::set_pollin (handle_t handle_)
{ {
check_thread ();
poll_entry_t *pe = (poll_entry_t *) handle_; poll_entry_t *pe = (poll_entry_t *) handle_;
pe->ev.events |= EPOLLIN; pe->ev.events |= EPOLLIN;
int rc = epoll_ctl (epoll_fd, EPOLL_CTL_MOD, pe->fd, &pe->ev); int rc = epoll_ctl (epoll_fd, EPOLL_CTL_MOD, pe->fd, &pe->ev);
...@@ -118,6 +121,7 @@ void zmq::epoll_t::set_pollin (handle_t handle_) ...@@ -118,6 +121,7 @@ void zmq::epoll_t::set_pollin (handle_t handle_)
void zmq::epoll_t::reset_pollin (handle_t handle_) void zmq::epoll_t::reset_pollin (handle_t handle_)
{ {
check_thread ();
poll_entry_t *pe = (poll_entry_t *) handle_; poll_entry_t *pe = (poll_entry_t *) handle_;
pe->ev.events &= ~((short) EPOLLIN); pe->ev.events &= ~((short) EPOLLIN);
int rc = epoll_ctl (epoll_fd, EPOLL_CTL_MOD, pe->fd, &pe->ev); int rc = epoll_ctl (epoll_fd, EPOLL_CTL_MOD, pe->fd, &pe->ev);
...@@ -126,6 +130,7 @@ void zmq::epoll_t::reset_pollin (handle_t handle_) ...@@ -126,6 +130,7 @@ void zmq::epoll_t::reset_pollin (handle_t handle_)
void zmq::epoll_t::set_pollout (handle_t handle_) void zmq::epoll_t::set_pollout (handle_t handle_)
{ {
check_thread ();
poll_entry_t *pe = (poll_entry_t *) handle_; poll_entry_t *pe = (poll_entry_t *) handle_;
pe->ev.events |= EPOLLOUT; pe->ev.events |= EPOLLOUT;
int rc = epoll_ctl (epoll_fd, EPOLL_CTL_MOD, pe->fd, &pe->ev); int rc = epoll_ctl (epoll_fd, EPOLL_CTL_MOD, pe->fd, &pe->ev);
...@@ -134,20 +139,16 @@ void zmq::epoll_t::set_pollout (handle_t handle_) ...@@ -134,20 +139,16 @@ void zmq::epoll_t::set_pollout (handle_t handle_)
void zmq::epoll_t::reset_pollout (handle_t handle_) void zmq::epoll_t::reset_pollout (handle_t handle_)
{ {
check_thread ();
poll_entry_t *pe = (poll_entry_t *) handle_; poll_entry_t *pe = (poll_entry_t *) handle_;
pe->ev.events &= ~((short) EPOLLOUT); pe->ev.events &= ~((short) EPOLLOUT);
int rc = epoll_ctl (epoll_fd, EPOLL_CTL_MOD, pe->fd, &pe->ev); int rc = epoll_ctl (epoll_fd, EPOLL_CTL_MOD, pe->fd, &pe->ev);
errno_assert (rc != -1); errno_assert (rc != -1);
} }
void zmq::epoll_t::start ()
{
ctx.start_thread (worker, worker_routine, this);
}
void zmq::epoll_t::stop () void zmq::epoll_t::stop ()
{ {
stopping = true; check_thread ();
} }
int zmq::epoll_t::max_fds () int zmq::epoll_t::max_fds ()
...@@ -159,10 +160,18 @@ void zmq::epoll_t::loop () ...@@ -159,10 +160,18 @@ void zmq::epoll_t::loop ()
{ {
epoll_event ev_buf[max_io_events]; epoll_event ev_buf[max_io_events];
while (!stopping) { while (true) {
// Execute any due timers. // Execute any due timers.
int timeout = (int) execute_timers (); int timeout = (int) execute_timers ();
if (get_load () == 0) {
if (timeout == 0)
break;
// TODO sleep for timeout
continue;
}
// Wait for events. // Wait for events.
int n = epoll_wait (epoll_fd, &ev_buf[0], max_io_events, int n = epoll_wait (epoll_fd, &ev_buf[0], max_io_events,
timeout ? timeout : -1); timeout ? timeout : -1);
...@@ -199,9 +208,4 @@ void zmq::epoll_t::loop () ...@@ -199,9 +208,4 @@ void zmq::epoll_t::loop ()
} }
} }
void zmq::epoll_t::worker_routine (void *arg_)
{
((epoll_t *) arg_)->loop ();
}
#endif #endif
...@@ -50,7 +50,7 @@ struct i_poll_events; ...@@ -50,7 +50,7 @@ struct i_poll_events;
// This class implements socket polling mechanism using the Linux-specific // This class implements socket polling mechanism using the Linux-specific
// epoll mechanism. // epoll mechanism.
class epoll_t : public poller_base_t class epoll_t : public worker_poller_base_t
{ {
public: public:
typedef void *handle_t; typedef void *handle_t;
...@@ -65,21 +65,14 @@ class epoll_t : public poller_base_t ...@@ -65,21 +65,14 @@ class epoll_t : public poller_base_t
void reset_pollin (handle_t handle_); void reset_pollin (handle_t handle_);
void set_pollout (handle_t handle_); void set_pollout (handle_t handle_);
void reset_pollout (handle_t handle_); void reset_pollout (handle_t handle_);
void start ();
void stop (); void stop ();
static int max_fds (); static int max_fds ();
private: private:
// Main worker thread routine.
static void worker_routine (void *arg_);
// Main event loop. // Main event loop.
void loop (); void loop ();
// Reference to ZMQ context.
const thread_ctx_t &ctx;
// Main epoll file descriptor // Main epoll file descriptor
fd_t epoll_fd; fd_t epoll_fd;
...@@ -94,9 +87,6 @@ class epoll_t : public poller_base_t ...@@ -94,9 +87,6 @@ class epoll_t : public poller_base_t
typedef std::vector<poll_entry_t *> retired_t; typedef std::vector<poll_entry_t *> retired_t;
retired_t retired; retired_t retired;
// If true, thread is in the process of shutting down.
bool stopping;
// Handle of the physical thread doing the I/O work. // Handle of the physical thread doing the I/O work.
thread_t worker; thread_t worker;
......
This diff is collapsed.
...@@ -57,6 +57,16 @@ int set_nosigpipe (fd_t s_); ...@@ -57,6 +57,16 @@ int set_nosigpipe (fd_t s_);
// Binds the underlying socket to the given device, eg. VRF or interface // Binds the underlying socket to the given device, eg. VRF or interface
void bind_to_device (fd_t s_, std::string &bound_device_); void bind_to_device (fd_t s_, std::string &bound_device_);
// Initialize network subsystem. May be called multiple times. Each call must be matched by a call to shutdown_network.
bool initialize_network ();
// Shutdown network subsystem. Must be called once for each call to initialize_network before terminating.
void shutdown_network ();
// Creates a pair of sockets (using signaler_port on OS using TCP sockets).
// Returns -1 if we could not make the socket pair successfully
int make_fdpair (fd_t *r_, fd_t *w_);
} }
#endif #endif
This diff is collapsed.
...@@ -66,10 +66,6 @@ class signaler_t ...@@ -66,10 +66,6 @@ class signaler_t
#endif #endif
private: private:
// Creates a pair of file descriptors that will be used
// to pass the signals.
static int make_fdpair (fd_t *r_, fd_t *w_);
// Underlying write & read file descriptor // Underlying write & read file descriptor
// Will be -1 if an error occurred during initialization, e.g. we // Will be -1 if an error occurred during initialization, e.g. we
// exceeded the number of available handles // exceeded the number of available handles
......
...@@ -87,6 +87,7 @@ struct iovec ...@@ -87,6 +87,7 @@ struct iovec
#include "signaler.hpp" #include "signaler.hpp"
#include "socket_poller.hpp" #include "socket_poller.hpp"
#include "timers.hpp" #include "timers.hpp"
#include "ip.hpp"
#if defined ZMQ_HAVE_OPENPGM #if defined ZMQ_HAVE_OPENPGM
#define __PGM_WININT_H__ #define __PGM_WININT_H__
...@@ -121,42 +122,11 @@ int zmq_errno (void) ...@@ -121,42 +122,11 @@ int zmq_errno (void)
void *zmq_ctx_new (void) void *zmq_ctx_new (void)
{ {
#if defined ZMQ_HAVE_OPENPGM
// Init PGM transport. Ensure threading and timer are enabled. Find PGM
// protocol ID. Note that if you want to use gettimeofday and sleep for
// openPGM timing, set environment variables PGM_TIMER to "GTOD" and
// PGM_SLEEP to "USLEEP".
pgm_error_t *pgm_error = NULL;
const bool ok = pgm_init (&pgm_error);
if (ok != TRUE) {
// Invalid parameters don't set pgm_error_t
zmq_assert (pgm_error != NULL);
if (pgm_error->domain == PGM_ERROR_DOMAIN_TIME
&& (pgm_error->code == PGM_ERROR_FAILED)) {
// Failed to access RTC or HPET device.
pgm_error_free (pgm_error);
errno = EINVAL;
return NULL;
}
// PGM_ERROR_DOMAIN_ENGINE: WSAStartup errors or missing WSARecvMsg.
zmq_assert (false);
}
#endif
#ifdef ZMQ_HAVE_WINDOWS
// Intialise Windows sockets. Note that WSAStartup can be called multiple
// times given that WSACleanup will be called for each WSAStartup.
// We do this before the ctx constructor since its embedded mailbox_t // We do this before the ctx constructor since its embedded mailbox_t
// object needs Winsock to be up and running. // object needs the network to be up and running (at least on Windows).
WORD version_requested = MAKEWORD (2, 2); if (!zmq::initialize_network ()) {
WSADATA wsa_data; return NULL;
int rc = WSAStartup (version_requested, &wsa_data); }
zmq_assert (rc == 0);
zmq_assert (LOBYTE (wsa_data.wVersion) == 2
&& HIBYTE (wsa_data.wVersion) == 2);
#endif
// Create 0MQ context. // Create 0MQ context.
zmq::ctx_t *ctx = new (std::nothrow) zmq::ctx_t; zmq::ctx_t *ctx = new (std::nothrow) zmq::ctx_t;
...@@ -181,17 +151,7 @@ int zmq_ctx_term (void *ctx_) ...@@ -181,17 +151,7 @@ int zmq_ctx_term (void *ctx_)
// Shut down only if termination was not interrupted by a signal. // Shut down only if termination was not interrupted by a signal.
if (!rc || en != EINTR) { if (!rc || en != EINTR) {
#ifdef ZMQ_HAVE_WINDOWS zmq::shutdown_network ();
// On Windows, uninitialise socket layer.
rc = WSACleanup ();
wsa_assert (rc != SOCKET_ERROR);
#endif
#if defined ZMQ_HAVE_OPENPGM
// Shut down the OpenPGM library.
if (pgm_shutdown () != TRUE)
zmq_assert (false);
#endif
} }
errno = en; errno = en;
...@@ -722,7 +682,7 @@ const char *zmq_msg_gets (const zmq_msg_t *msg_, const char *property_) ...@@ -722,7 +682,7 @@ const char *zmq_msg_gets (const zmq_msg_t *msg_, const char *property_)
} }
} }
// Polling. // Polling.
#if defined ZMQ_HAVE_POLLER #if defined ZMQ_HAVE_POLLER
inline int zmq_poller_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) inline int zmq_poller_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
......
...@@ -20,9 +20,15 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. ...@@ -20,9 +20,15 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
#include "../tests/testutil.hpp" #include "../tests/testutil.hpp"
#include <poller.hpp> #include <poller.hpp>
#include <i_poll_events.hpp>
#include <ip.hpp>
#include <unity.h> #include <unity.h>
#ifndef _WIN32
#define closesocket close
#endif
void setUp () void setUp ()
{ {
} }
...@@ -36,12 +42,177 @@ void test_create () ...@@ -36,12 +42,177 @@ void test_create ()
zmq::poller_t poller (thread_ctx); zmq::poller_t poller (thread_ctx);
} }
#if 0
// TODO this triggers an assertion. should it be a valid use case?
void test_start_empty ()
{
zmq::thread_ctx_t thread_ctx;
zmq::poller_t poller (thread_ctx);
poller.start ();
msleep (SETTLE_TIME);
}
#endif
struct test_events_t : zmq::i_poll_events
{
test_events_t (zmq::fd_t fd_, zmq::poller_t &poller_) :
fd (fd_),
poller (poller_)
{
}
virtual void in_event ()
{
in_events.add (1);
poller.rm_fd (handle);
handle = (zmq::poller_t::handle_t) NULL;
}
virtual void out_event ()
{
// TODO
}
virtual void timer_event (int id_)
{
LIBZMQ_UNUSED (id_);
timer_events.add (1);
poller.rm_fd (handle);
handle = (zmq::poller_t::handle_t) NULL;
}
void set_handle (zmq::poller_t::handle_t handle_) { handle = handle_; }
zmq::atomic_counter_t in_events, timer_events;
private:
zmq::fd_t fd;
zmq::poller_t &poller;
zmq::poller_t::handle_t handle;
};
void wait_in_events (test_events_t &events)
{
void *watch = zmq_stopwatch_start ();
while (events.in_events.get () < 1) {
#ifdef ZMQ_BUILD_DRAFT
TEST_ASSERT_LESS_OR_EQUAL_MESSAGE (SETTLE_TIME,
zmq_stopwatch_intermediate (watch),
"Timeout waiting for in event");
#endif
}
zmq_stopwatch_stop (watch);
}
void wait_timer_events (test_events_t &events)
{
void *watch = zmq_stopwatch_start ();
while (events.timer_events.get () < 1) {
#ifdef ZMQ_BUILD_DRAFT
TEST_ASSERT_LESS_OR_EQUAL_MESSAGE (SETTLE_TIME,
zmq_stopwatch_intermediate (watch),
"Timeout waiting for timer event");
#endif
}
zmq_stopwatch_stop (watch);
}
void create_nonblocking_fdpair (zmq::fd_t *r, zmq::fd_t *w)
{
int rc = zmq::make_fdpair (r, w);
TEST_ASSERT_EQUAL_INT (0, rc);
TEST_ASSERT_NOT_EQUAL (zmq::retired_fd, *r);
TEST_ASSERT_NOT_EQUAL (zmq::retired_fd, *w);
zmq::unblock_socket (*r);
zmq::unblock_socket (*w);
}
void send_signal (zmq::fd_t w)
{
#if defined ZMQ_HAVE_EVENTFD
const uint64_t inc = 1;
ssize_t sz = write (w, &inc, sizeof (inc));
assert (sz == sizeof (inc));
#else
{
char msg[] = "test";
int rc = send (w, msg, sizeof (msg), 0);
assert (rc == sizeof (msg));
}
#endif
}
void close_fdpair (zmq::fd_t w, zmq::fd_t r)
{
int rc = closesocket (w);
TEST_ASSERT_EQUAL_INT (0, rc);
#if !defined ZMQ_HAVE_EVENTFD
rc = closesocket (r);
TEST_ASSERT_EQUAL_INT (0, rc);
#else
LIBZMQ_UNUSED (r);
#endif
}
void test_add_fd_and_start_and_receive_data ()
{
zmq::thread_ctx_t thread_ctx;
zmq::poller_t poller (thread_ctx);
zmq::fd_t r, w;
create_nonblocking_fdpair (&r, &w);
test_events_t events (r, poller);
zmq::poller_t::handle_t handle = poller.add_fd (r, &events);
events.set_handle (handle);
poller.set_pollin (handle);
poller.start ();
send_signal (w);
wait_in_events (events);
// required cleanup
close_fdpair (w, r);
}
void test_add_fd_and_remove_by_timer ()
{
zmq::fd_t r, w;
create_nonblocking_fdpair (&r, &w);
zmq::thread_ctx_t thread_ctx;
zmq::poller_t poller (thread_ctx);
test_events_t events (r, poller);
zmq::poller_t::handle_t handle = poller.add_fd (r, &events);
events.set_handle (handle);
poller.add_timer (50, &events, 0);
poller.start ();
wait_timer_events (events);
// required cleanup
close_fdpair (w, r);
}
int main (void) int main (void)
{ {
UNITY_BEGIN ();
zmq::initialize_network ();
setup_test_environment (); setup_test_environment ();
UNITY_BEGIN ();
RUN_TEST (test_create); RUN_TEST (test_create);
RUN_TEST (test_add_fd_and_start_and_receive_data);
RUN_TEST (test_add_fd_and_remove_by_timer);
zmq::shutdown_network ();
return UNITY_END (); return UNITY_END ();
} }
...@@ -35,12 +35,53 @@ void test_create () ...@@ -35,12 +35,53 @@ void test_create ()
zmq::ypipe_t<int, 1> ypipe; zmq::ypipe_t<int, 1> ypipe;
} }
void test_check_read_empty ()
{
zmq::ypipe_t<int, 1> ypipe;
TEST_ASSERT_FALSE (ypipe.check_read ());
}
void test_read_empty ()
{
zmq::ypipe_t<int, 1> ypipe;
int read_value = -1;
TEST_ASSERT_FALSE (ypipe.read (&read_value));
TEST_ASSERT_EQUAL (-1, read_value);
}
void test_write_complete_and_check_read_and_read ()
{
const int value = 42;
zmq::ypipe_t<int, 1> ypipe;
ypipe.write (value, false);
TEST_ASSERT_FALSE (ypipe.check_read ());
int read_value = -1;
TEST_ASSERT_FALSE (ypipe.read (&read_value));
TEST_ASSERT_EQUAL_INT (-1, read_value);
}
void test_write_complete_and_flush_and_check_read_and_read ()
{
const int value = 42;
zmq::ypipe_t<int, 1> ypipe;
ypipe.write (value, false);
ypipe.flush ();
TEST_ASSERT_TRUE (ypipe.check_read ());
int read_value = -1;
TEST_ASSERT_TRUE (ypipe.read (&read_value));
TEST_ASSERT_EQUAL_INT (value, read_value);
}
int main (void) int main (void)
{ {
setup_test_environment (); setup_test_environment ();
UNITY_BEGIN (); UNITY_BEGIN ();
RUN_TEST (test_create); RUN_TEST (test_create);
RUN_TEST (test_check_read_empty);
RUN_TEST (test_read_empty);
RUN_TEST (test_write_complete_and_check_read_and_read);
RUN_TEST (test_write_complete_and_flush_and_check_read_and_read);
return UNITY_END (); return UNITY_END ();
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment