Unverified Commit bc467f06 authored by Luca Boccassi's avatar Luca Boccassi Committed by GitHub

Merge pull request #2944 from sigiesec/unity

Problem: insufficient unit tests for poller concept and ypipe
parents e57afec8 94743fd2
...@@ -580,7 +580,136 @@ set (cxx-sources ...@@ -580,7 +580,136 @@ set (cxx-sources
udp_address.cpp udp_address.cpp
scatter.cpp scatter.cpp
gather.cpp gather.cpp
zap_client.cpp) zap_client.cpp
# at least for VS, the header files must also be listed
address.hpp
array.hpp
atomic_counter.hpp
atomic_ptr.hpp
blob.hpp
client.hpp
clock.hpp
command.hpp
condition_variable.hpp
config.hpp
ctx.hpp
curve_client.hpp
curve_client_tools.hpp
curve_mechanism_base.hpp
curve_server.hpp
dbuffer.hpp
dealer.hpp
decoder.hpp
decoder_allocators.hpp
devpoll.hpp
dgram.hpp
dish.hpp
dist.hpp
encoder.hpp
epoll.hpp
err.hpp
fd.hpp
fq.hpp
gather.hpp
gssapi_client.hpp
gssapi_mechanism_base.hpp
gssapi_server.hpp
i_decoder.hpp
i_encoder.hpp
i_engine.hpp
i_mailbox.hpp
i_poll_events.hpp
io_object.hpp
io_thread.hpp
ip.hpp
ipc_address.hpp
ipc_connecter.hpp
ipc_listener.hpp
kqueue.hpp
lb.hpp
likely.hpp
macros.hpp
mailbox.hpp
mailbox_safe.hpp
mechanism.hpp
mechanism_base.hpp
metadata.hpp
msg.hpp
mtrie.hpp
mutex.hpp
norm_engine.hpp
null_mechanism.hpp
object.hpp
options.hpp
own.hpp
pair.hpp
pgm_receiver.hpp
pgm_sender.hpp
pgm_socket.hpp
pipe.hpp
plain_client.hpp
plain_server.hpp
poll.hpp
poller.hpp
poller_base.hpp
pollset.hpp
precompiled.hpp
proxy.hpp
pub.hpp
pull.hpp
push.hpp
radio.hpp
random.hpp
raw_decoder.hpp
raw_encoder.hpp
reaper.hpp
rep.hpp
req.hpp
router.hpp
scatter.hpp
select.hpp
server.hpp
session_base.hpp
signaler.hpp
socket_base.hpp
socket_poller.hpp
socks.hpp
socks_connecter.hpp
stdint.hpp
stream.hpp
stream_engine.hpp
sub.hpp
tcp.hpp
tcp_address.hpp
tcp_connecter.hpp
tcp_listener.hpp
thread.hpp
timers.hpp
tipc_address.hpp
tipc_connecter.hpp
tipc_listener.hpp
trie.hpp
udp_address.hpp
udp_engine.hpp
v1_decoder.hpp
v1_encoder.hpp
v2_decoder.hpp
v2_encoder.hpp
v2_protocol.hpp
vmci.hpp
vmci_address.hpp
vmci_connecter.hpp
vmci_listener.hpp
windows.hpp
wire.hpp
xpub.hpp
xsub.hpp
ypipe.hpp
ypipe_base.hpp
ypipe_conflate.hpp
yqueue.hpp
zap_client.hpp
)
if (MINGW) if (MINGW)
# Generate the right type when using -m32 or -m64 # Generate the right type when using -m32 or -m64
......
...@@ -625,7 +625,8 @@ tests_test_atomics_SOURCES = tests/test_atomics.cpp ...@@ -625,7 +625,8 @@ tests_test_atomics_SOURCES = tests/test_atomics.cpp
tests_test_atomics_LDADD = src/libzmq.la tests_test_atomics_LDADD = src/libzmq.la
tests_test_sockopt_hwm_SOURCES = tests/test_sockopt_hwm.cpp tests_test_sockopt_hwm_SOURCES = tests/test_sockopt_hwm.cpp
tests_test_sockopt_hwm_LDADD = src/libzmq.la tests_test_sockopt_hwm_LDADD = src/libzmq.la ${UNITY_LIBS}
tests_test_sockopt_hwm_CPPFLAGS = ${UNITY_CPPFLAGS}
tests_test_setsockopt_SOURCES = tests/test_setsockopt.cpp tests_test_setsockopt_SOURCES = tests/test_setsockopt.cpp
tests_test_setsockopt_LDADD = src/libzmq.la tests_test_setsockopt_LDADD = src/libzmq.la
......
version: build-{build} version: build-{build}
clone_depth: 1 shallow_clone: true
skip_tags: true skip_tags: true
...@@ -15,6 +15,15 @@ environment: ...@@ -15,6 +15,15 @@ environment:
configuration: Release configuration: Release
WITH_LIBSODIUM: ON WITH_LIBSODIUM: ON
ENABLE_CURVE: ON ENABLE_CURVE: ON
- platform: Win32
configuration: Release
POLLER: poll
WITH_LIBSODIUM: ON
ENABLE_CURVE: ON
APPVEYOR_BUILD_WORKER_IMAGE: Visual Studio 2017
CMAKE_GENERATOR: "Visual Studio 15 2017"
MSVCVERSION: "v141"
MSVCYEAR: "vs2017"
- platform: Win32 - platform: Win32
configuration: Debug configuration: Debug
WITH_LIBSODIUM: ON WITH_LIBSODIUM: ON
...@@ -51,15 +60,6 @@ environment: ...@@ -51,15 +60,6 @@ environment:
CMAKE_GENERATOR: "Visual Studio 15 2017" CMAKE_GENERATOR: "Visual Studio 15 2017"
MSVCVERSION: "v141" MSVCVERSION: "v141"
MSVCYEAR: "vs2017" MSVCYEAR: "vs2017"
- platform: Win32
configuration: Release
POLLER: poll
WITH_LIBSODIUM: ON
ENABLE_CURVE: ON
APPVEYOR_BUILD_WORKER_IMAGE: Visual Studio 2017
CMAKE_GENERATOR: "Visual Studio 15 2017"
MSVCVERSION: "v141"
MSVCYEAR: "vs2017"
matrix: matrix:
fast_finish: false fast_finish: false
......
...@@ -44,5 +44,6 @@ if [ "$DO_CLANG_FORMAT_CHECK" -eq "1" ] ; then ...@@ -44,5 +44,6 @@ if [ "$DO_CLANG_FORMAT_CHECK" -eq "1" ] ; then
exit 1 exit 1
fi fi
else else
( PKG_CONFIG_PATH=${BUILD_PREFIX}/lib/pkgconfig cmake "${CMAKE_OPTS[@]}" .. && make -j5 all VERBOSE=1 && make install && make -j5 test ) || exit 1 export CTEST_OUTPUT_ON_FAILURE=1
( PKG_CONFIG_PATH=${BUILD_PREFIX}/lib/pkgconfig cmake "${CMAKE_OPTS[@]}" .. && make -j5 all VERBOSE=1 && make install && make -j5 test ARGS="-V" ) || exit 1
fi fi
...@@ -37,7 +37,7 @@ ...@@ -37,7 +37,7 @@
#elif defined ZMQ_HAVE_ATOMIC_INTRINSICS #elif defined ZMQ_HAVE_ATOMIC_INTRINSICS
#define ZMQ_ATOMIC_COUNTER_INTRINSIC #define ZMQ_ATOMIC_COUNTER_INTRINSIC
#elif (defined __cplusplus && __cplusplus >= 201103L) \ #elif (defined __cplusplus && __cplusplus >= 201103L) \
|| (defined _MSC_VER && _MSC_VER >= 1700) || (defined _MSC_VER && _MSC_VER >= 1900)
#define ZMQ_ATOMIC_COUNTER_CXX11 #define ZMQ_ATOMIC_COUNTER_CXX11
#elif (defined __i386__ || defined __x86_64__) && defined __GNUC__ #elif (defined __i386__ || defined __x86_64__) && defined __GNUC__
#define ZMQ_ATOMIC_COUNTER_X86 #define ZMQ_ATOMIC_COUNTER_X86
......
...@@ -34,8 +34,8 @@ ...@@ -34,8 +34,8 @@
#define ZMQ_ATOMIC_PTR_MUTEX #define ZMQ_ATOMIC_PTR_MUTEX
#elif defined ZMQ_HAVE_ATOMIC_INTRINSICS #elif defined ZMQ_HAVE_ATOMIC_INTRINSICS
#define ZMQ_ATOMIC_PTR_INTRINSIC #define ZMQ_ATOMIC_PTR_INTRINSIC
#elif ((defined __cplusplus && __cplusplus >= 201103L) \ #elif (defined __cplusplus && __cplusplus >= 201103L) \
|| (defined _MSC_VER && _MSC_VER >= 1700)) || (defined _MSC_VER && _MSC_VER >= 1900)
#define ZMQ_ATOMIC_PTR_CXX11 #define ZMQ_ATOMIC_PTR_CXX11
#elif (defined __i386__ || defined __x86_64__) && defined __GNUC__ #elif (defined __i386__ || defined __x86_64__) && defined __GNUC__
#define ZMQ_ATOMIC_PTR_X86 #define ZMQ_ATOMIC_PTR_X86
......
...@@ -35,6 +35,7 @@ ...@@ -35,6 +35,7 @@
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
#include <unistd.h> #include <unistd.h>
#include <signal.h>
#include <algorithm> #include <algorithm>
#include <new> #include <new>
...@@ -45,8 +46,7 @@ ...@@ -45,8 +46,7 @@
#include "i_poll_events.hpp" #include "i_poll_events.hpp"
zmq::epoll_t::epoll_t (const zmq::thread_ctx_t &ctx_) : zmq::epoll_t::epoll_t (const zmq::thread_ctx_t &ctx_) :
ctx (ctx_), worker_poller_base_t (ctx_)
stopping (false)
{ {
#ifdef ZMQ_USE_EPOLL_CLOEXEC #ifdef ZMQ_USE_EPOLL_CLOEXEC
// Setting this option result in sane behaviour when exec() functions // Setting this option result in sane behaviour when exec() functions
...@@ -62,7 +62,7 @@ zmq::epoll_t::epoll_t (const zmq::thread_ctx_t &ctx_) : ...@@ -62,7 +62,7 @@ zmq::epoll_t::epoll_t (const zmq::thread_ctx_t &ctx_) :
zmq::epoll_t::~epoll_t () zmq::epoll_t::~epoll_t ()
{ {
// Wait till the worker thread exits. // Wait till the worker thread exits.
worker.stop (); stop_worker ();
close (epoll_fd); close (epoll_fd);
for (retired_t::iterator it = retired.begin (); it != retired.end (); for (retired_t::iterator it = retired.begin (); it != retired.end ();
...@@ -73,6 +73,7 @@ zmq::epoll_t::~epoll_t () ...@@ -73,6 +73,7 @@ zmq::epoll_t::~epoll_t ()
zmq::epoll_t::handle_t zmq::epoll_t::add_fd (fd_t fd_, i_poll_events *events_) zmq::epoll_t::handle_t zmq::epoll_t::add_fd (fd_t fd_, i_poll_events *events_)
{ {
check_thread ();
poll_entry_t *pe = new (std::nothrow) poll_entry_t; poll_entry_t *pe = new (std::nothrow) poll_entry_t;
alloc_assert (pe); alloc_assert (pe);
...@@ -96,6 +97,7 @@ zmq::epoll_t::handle_t zmq::epoll_t::add_fd (fd_t fd_, i_poll_events *events_) ...@@ -96,6 +97,7 @@ zmq::epoll_t::handle_t zmq::epoll_t::add_fd (fd_t fd_, i_poll_events *events_)
void zmq::epoll_t::rm_fd (handle_t handle_) void zmq::epoll_t::rm_fd (handle_t handle_)
{ {
check_thread ();
poll_entry_t *pe = (poll_entry_t *) handle_; poll_entry_t *pe = (poll_entry_t *) handle_;
int rc = epoll_ctl (epoll_fd, EPOLL_CTL_DEL, pe->fd, &pe->ev); int rc = epoll_ctl (epoll_fd, EPOLL_CTL_DEL, pe->fd, &pe->ev);
errno_assert (rc != -1); errno_assert (rc != -1);
...@@ -110,6 +112,7 @@ void zmq::epoll_t::rm_fd (handle_t handle_) ...@@ -110,6 +112,7 @@ void zmq::epoll_t::rm_fd (handle_t handle_)
void zmq::epoll_t::set_pollin (handle_t handle_) void zmq::epoll_t::set_pollin (handle_t handle_)
{ {
check_thread ();
poll_entry_t *pe = (poll_entry_t *) handle_; poll_entry_t *pe = (poll_entry_t *) handle_;
pe->ev.events |= EPOLLIN; pe->ev.events |= EPOLLIN;
int rc = epoll_ctl (epoll_fd, EPOLL_CTL_MOD, pe->fd, &pe->ev); int rc = epoll_ctl (epoll_fd, EPOLL_CTL_MOD, pe->fd, &pe->ev);
...@@ -118,6 +121,7 @@ void zmq::epoll_t::set_pollin (handle_t handle_) ...@@ -118,6 +121,7 @@ void zmq::epoll_t::set_pollin (handle_t handle_)
void zmq::epoll_t::reset_pollin (handle_t handle_) void zmq::epoll_t::reset_pollin (handle_t handle_)
{ {
check_thread ();
poll_entry_t *pe = (poll_entry_t *) handle_; poll_entry_t *pe = (poll_entry_t *) handle_;
pe->ev.events &= ~((short) EPOLLIN); pe->ev.events &= ~((short) EPOLLIN);
int rc = epoll_ctl (epoll_fd, EPOLL_CTL_MOD, pe->fd, &pe->ev); int rc = epoll_ctl (epoll_fd, EPOLL_CTL_MOD, pe->fd, &pe->ev);
...@@ -126,6 +130,7 @@ void zmq::epoll_t::reset_pollin (handle_t handle_) ...@@ -126,6 +130,7 @@ void zmq::epoll_t::reset_pollin (handle_t handle_)
void zmq::epoll_t::set_pollout (handle_t handle_) void zmq::epoll_t::set_pollout (handle_t handle_)
{ {
check_thread ();
poll_entry_t *pe = (poll_entry_t *) handle_; poll_entry_t *pe = (poll_entry_t *) handle_;
pe->ev.events |= EPOLLOUT; pe->ev.events |= EPOLLOUT;
int rc = epoll_ctl (epoll_fd, EPOLL_CTL_MOD, pe->fd, &pe->ev); int rc = epoll_ctl (epoll_fd, EPOLL_CTL_MOD, pe->fd, &pe->ev);
...@@ -134,20 +139,16 @@ void zmq::epoll_t::set_pollout (handle_t handle_) ...@@ -134,20 +139,16 @@ void zmq::epoll_t::set_pollout (handle_t handle_)
void zmq::epoll_t::reset_pollout (handle_t handle_) void zmq::epoll_t::reset_pollout (handle_t handle_)
{ {
check_thread ();
poll_entry_t *pe = (poll_entry_t *) handle_; poll_entry_t *pe = (poll_entry_t *) handle_;
pe->ev.events &= ~((short) EPOLLOUT); pe->ev.events &= ~((short) EPOLLOUT);
int rc = epoll_ctl (epoll_fd, EPOLL_CTL_MOD, pe->fd, &pe->ev); int rc = epoll_ctl (epoll_fd, EPOLL_CTL_MOD, pe->fd, &pe->ev);
errno_assert (rc != -1); errno_assert (rc != -1);
} }
void zmq::epoll_t::start ()
{
ctx.start_thread (worker, worker_routine, this);
}
void zmq::epoll_t::stop () void zmq::epoll_t::stop ()
{ {
stopping = true; check_thread ();
} }
int zmq::epoll_t::max_fds () int zmq::epoll_t::max_fds ()
...@@ -159,10 +160,18 @@ void zmq::epoll_t::loop () ...@@ -159,10 +160,18 @@ void zmq::epoll_t::loop ()
{ {
epoll_event ev_buf[max_io_events]; epoll_event ev_buf[max_io_events];
while (!stopping) { while (true) {
// Execute any due timers. // Execute any due timers.
int timeout = (int) execute_timers (); int timeout = (int) execute_timers ();
if (get_load () == 0) {
if (timeout == 0)
break;
// TODO sleep for timeout
continue;
}
// Wait for events. // Wait for events.
int n = epoll_wait (epoll_fd, &ev_buf[0], max_io_events, int n = epoll_wait (epoll_fd, &ev_buf[0], max_io_events,
timeout ? timeout : -1); timeout ? timeout : -1);
...@@ -199,9 +208,4 @@ void zmq::epoll_t::loop () ...@@ -199,9 +208,4 @@ void zmq::epoll_t::loop ()
} }
} }
void zmq::epoll_t::worker_routine (void *arg_)
{
((epoll_t *) arg_)->loop ();
}
#endif #endif
...@@ -50,7 +50,7 @@ struct i_poll_events; ...@@ -50,7 +50,7 @@ struct i_poll_events;
// This class implements socket polling mechanism using the Linux-specific // This class implements socket polling mechanism using the Linux-specific
// epoll mechanism. // epoll mechanism.
class epoll_t : public poller_base_t class epoll_t : public worker_poller_base_t
{ {
public: public:
typedef void *handle_t; typedef void *handle_t;
...@@ -65,21 +65,14 @@ class epoll_t : public poller_base_t ...@@ -65,21 +65,14 @@ class epoll_t : public poller_base_t
void reset_pollin (handle_t handle_); void reset_pollin (handle_t handle_);
void set_pollout (handle_t handle_); void set_pollout (handle_t handle_);
void reset_pollout (handle_t handle_); void reset_pollout (handle_t handle_);
void start ();
void stop (); void stop ();
static int max_fds (); static int max_fds ();
private: private:
// Main worker thread routine.
static void worker_routine (void *arg_);
// Main event loop. // Main event loop.
void loop (); void loop ();
// Reference to ZMQ context.
const thread_ctx_t &ctx;
// Main epoll file descriptor // Main epoll file descriptor
fd_t epoll_fd; fd_t epoll_fd;
...@@ -94,9 +87,6 @@ class epoll_t : public poller_base_t ...@@ -94,9 +87,6 @@ class epoll_t : public poller_base_t
typedef std::vector<poll_entry_t *> retired_t; typedef std::vector<poll_entry_t *> retired_t;
retired_t retired; retired_t retired;
// If true, thread is in the process of shutting down.
bool stopping;
// Handle of the physical thread doing the I/O work. // Handle of the physical thread doing the I/O work.
thread_t worker; thread_t worker;
......
...@@ -78,8 +78,8 @@ int wsa_error_to_errno (int errcode); ...@@ -78,8 +78,8 @@ int wsa_error_to_errno (int errcode);
if (unlikely (!(x))) { \ if (unlikely (!(x))) { \
const char *errstr = zmq::wsa_error (); \ const char *errstr = zmq::wsa_error (); \
if (errstr != NULL) { \ if (errstr != NULL) { \
fprintf (stderr, "Assertion failed: %s (%s:%d)\n", errstr, \ fprintf (stderr, "Assertion failed: %s [%i] (%s:%d)\n", \
__FILE__, __LINE__); \ errstr, WSAGetLastError (), __FILE__, __LINE__); \
fflush (stderr); \ fflush (stderr); \
zmq::zmq_abort (errstr); \ zmq::zmq_abort (errstr); \
} \ } \
......
...@@ -112,8 +112,7 @@ zmq::poller_t *zmq::io_thread_t::get_poller () ...@@ -112,8 +112,7 @@ zmq::poller_t *zmq::io_thread_t::get_poller ()
void zmq::io_thread_t::process_stop () void zmq::io_thread_t::process_stop ()
{ {
if (mailbox_handle) { zmq_assert (mailbox_handle);
poller->rm_fd (mailbox_handle); poller->rm_fd (mailbox_handle);
}
poller->stop (); poller->stop ();
} }
...@@ -31,6 +31,7 @@ ...@@ -31,6 +31,7 @@
#include "ip.hpp" #include "ip.hpp"
#include "err.hpp" #include "err.hpp"
#include "macros.hpp" #include "macros.hpp"
#include "config.hpp"
#if !defined ZMQ_HAVE_WINDOWS #if !defined ZMQ_HAVE_WINDOWS
#include <fcntl.h> #include <fcntl.h>
...@@ -39,12 +40,26 @@ ...@@ -39,12 +40,26 @@
#include <netdb.h> #include <netdb.h>
#include <netinet/in.h> #include <netinet/in.h>
#include <netinet/tcp.h> #include <netinet/tcp.h>
#else
#include "tcp.hpp"
#endif #endif
#if defined ZMQ_HAVE_OPENVMS #if defined ZMQ_HAVE_OPENVMS
#include <ioctl.h> #include <ioctl.h>
#endif #endif
#if defined ZMQ_HAVE_EVENTFD
#include <sys/eventfd.h>
#endif
#if defined ZMQ_HAVE_OPENPGM
#ifdef ZMQ_HAVE_WINDOWS
#define __PGM_WININT_H__
#endif
#include <pgm/pgm.h>
#endif
zmq::fd_t zmq::open_socket (int domain_, int type_, int protocol_) zmq::fd_t zmq::open_socket (int domain_, int type_, int protocol_)
{ {
int rc; int rc;
...@@ -229,3 +244,368 @@ void zmq::bind_to_device (fd_t s_, std::string &bound_device_) ...@@ -229,3 +244,368 @@ void zmq::bind_to_device (fd_t s_, std::string &bound_device_)
LIBZMQ_UNUSED (bound_device_); LIBZMQ_UNUSED (bound_device_);
#endif #endif
} }
bool zmq::initialize_network ()
{
#if defined ZMQ_HAVE_OPENPGM
// Init PGM transport. Ensure threading and timer are enabled. Find PGM
// protocol ID. Note that if you want to use gettimeofday and sleep for
// openPGM timing, set environment variables PGM_TIMER to "GTOD" and
// PGM_SLEEP to "USLEEP".
pgm_error_t *pgm_error = NULL;
const bool ok = pgm_init (&pgm_error);
if (ok != TRUE) {
// Invalid parameters don't set pgm_error_t
zmq_assert (pgm_error != NULL);
if (pgm_error->domain == PGM_ERROR_DOMAIN_TIME
&& (pgm_error->code == PGM_ERROR_FAILED)) {
// Failed to access RTC or HPET device.
pgm_error_free (pgm_error);
errno = EINVAL;
return false;
}
// PGM_ERROR_DOMAIN_ENGINE: WSAStartup errors or missing WSARecvMsg.
zmq_assert (false);
}
#endif
#ifdef ZMQ_HAVE_WINDOWS
// Intialise Windows sockets. Note that WSAStartup can be called multiple
// times given that WSACleanup will be called for each WSAStartup.
WORD version_requested = MAKEWORD (2, 2);
WSADATA wsa_data;
int rc = WSAStartup (version_requested, &wsa_data);
zmq_assert (rc == 0);
zmq_assert (LOBYTE (wsa_data.wVersion) == 2
&& HIBYTE (wsa_data.wVersion) == 2);
#endif
return true;
}
void zmq::shutdown_network ()
{
#ifdef ZMQ_HAVE_WINDOWS
// On Windows, uninitialise socket layer.
int rc = WSACleanup ();
wsa_assert (rc != SOCKET_ERROR);
#endif
#if defined ZMQ_HAVE_OPENPGM
// Shut down the OpenPGM library.
if (pgm_shutdown () != TRUE)
zmq_assert (false);
#endif
}
#if defined ZMQ_HAVE_WINDOWS
static void tune_socket (const SOCKET socket)
{
BOOL tcp_nodelay = 1;
int rc = setsockopt (socket, IPPROTO_TCP, TCP_NODELAY,
(char *) &tcp_nodelay, sizeof tcp_nodelay);
wsa_assert (rc != SOCKET_ERROR);
zmq::tcp_tune_loopback_fast_path (socket);
}
#endif
int zmq::make_fdpair (fd_t *r_, fd_t *w_)
{
#if defined ZMQ_HAVE_EVENTFD
int flags = 0;
#if defined ZMQ_HAVE_EVENTFD_CLOEXEC
// Setting this option result in sane behaviour when exec() functions
// are used. Old sockets are closed and don't block TCP ports, avoid
// leaks, etc.
flags |= EFD_CLOEXEC;
#endif
fd_t fd = eventfd (0, flags);
if (fd == -1) {
errno_assert (errno == ENFILE || errno == EMFILE);
*w_ = *r_ = -1;
return -1;
} else {
*w_ = *r_ = fd;
return 0;
}
#elif defined ZMQ_HAVE_WINDOWS
#if !defined _WIN32_WCE && !defined ZMQ_HAVE_WINDOWS_UWP
// Windows CE does not manage security attributes
SECURITY_DESCRIPTOR sd;
SECURITY_ATTRIBUTES sa;
memset (&sd, 0, sizeof sd);
memset (&sa, 0, sizeof sa);
InitializeSecurityDescriptor (&sd, SECURITY_DESCRIPTOR_REVISION);
SetSecurityDescriptorDacl (&sd, TRUE, 0, FALSE);
sa.nLength = sizeof (SECURITY_ATTRIBUTES);
sa.lpSecurityDescriptor = &sd;
#endif
// This function has to be in a system-wide critical section so that
// two instances of the library don't accidentally create signaler
// crossing the process boundary.
// We'll use named event object to implement the critical section.
// Note that if the event object already exists, the CreateEvent requests
// EVENT_ALL_ACCESS access right. If this fails, we try to open
// the event object asking for SYNCHRONIZE access only.
HANDLE sync = NULL;
// Create critical section only if using fixed signaler port
// Use problematic Event implementation for compatibility if using old port 5905.
// Otherwise use Mutex implementation.
int event_signaler_port = 5905;
if (signaler_port == event_signaler_port) {
#if !defined _WIN32_WCE && !defined ZMQ_HAVE_WINDOWS_UWP
sync =
CreateEventW (&sa, FALSE, TRUE, L"Global\\zmq-signaler-port-sync");
#else
sync =
CreateEventW (NULL, FALSE, TRUE, L"Global\\zmq-signaler-port-sync");
#endif
if (sync == NULL && GetLastError () == ERROR_ACCESS_DENIED)
sync = OpenEventW (SYNCHRONIZE | EVENT_MODIFY_STATE, FALSE,
L"Global\\zmq-signaler-port-sync");
win_assert (sync != NULL);
} else if (signaler_port != 0) {
wchar_t mutex_name[MAX_PATH];
#ifdef __MINGW32__
_snwprintf (mutex_name, MAX_PATH, L"Global\\zmq-signaler-port-%d",
signaler_port);
#else
swprintf (mutex_name, MAX_PATH, L"Global\\zmq-signaler-port-%d",
signaler_port);
#endif
#if !defined _WIN32_WCE && !defined ZMQ_HAVE_WINDOWS_UWP
sync = CreateMutexW (&sa, FALSE, mutex_name);
#else
sync = CreateMutexW (NULL, FALSE, mutex_name);
#endif
if (sync == NULL && GetLastError () == ERROR_ACCESS_DENIED)
sync = OpenMutexW (SYNCHRONIZE, FALSE, mutex_name);
win_assert (sync != NULL);
}
// Windows has no 'socketpair' function. CreatePipe is no good as pipe
// handles cannot be polled on. Here we create the socketpair by hand.
*w_ = INVALID_SOCKET;
*r_ = INVALID_SOCKET;
// Create listening socket.
SOCKET listener;
listener = open_socket (AF_INET, SOCK_STREAM, 0);
wsa_assert (listener != INVALID_SOCKET);
// Set SO_REUSEADDR and TCP_NODELAY on listening socket.
BOOL so_reuseaddr = 1;
int rc = setsockopt (listener, SOL_SOCKET, SO_REUSEADDR,
(char *) &so_reuseaddr, sizeof so_reuseaddr);
wsa_assert (rc != SOCKET_ERROR);
tune_socket (listener);
// Init sockaddr to signaler port.
struct sockaddr_in addr;
memset (&addr, 0, sizeof addr);
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
addr.sin_port = htons (signaler_port);
// Create the writer socket.
*w_ = open_socket (AF_INET, SOCK_STREAM, 0);
wsa_assert (*w_ != INVALID_SOCKET);
// Set TCP_NODELAY on writer socket.
tune_socket (*w_);
if (sync != NULL) {
// Enter the critical section.
DWORD dwrc = WaitForSingleObject (sync, INFINITE);
zmq_assert (dwrc == WAIT_OBJECT_0 || dwrc == WAIT_ABANDONED);
}
// Bind listening socket to signaler port.
rc = bind (listener, (const struct sockaddr *) &addr, sizeof addr);
if (rc != SOCKET_ERROR && signaler_port == 0) {
// Retrieve ephemeral port number
int addrlen = sizeof addr;
rc = getsockname (listener, (struct sockaddr *) &addr, &addrlen);
}
// Listen for incoming connections.
if (rc != SOCKET_ERROR)
rc = listen (listener, 1);
// Connect writer to the listener.
if (rc != SOCKET_ERROR)
rc = connect (*w_, (struct sockaddr *) &addr, sizeof addr);
// Accept connection from writer.
if (rc != SOCKET_ERROR)
*r_ = accept (listener, NULL, NULL);
// Send/receive large chunk to work around TCP slow start
// This code is a workaround for #1608
if (*r_ != INVALID_SOCKET) {
size_t dummy_size =
1024 * 1024; // 1M to overload default receive buffer
unsigned char *dummy = (unsigned char *) malloc (dummy_size);
wsa_assert (dummy);
int still_to_send = (int) dummy_size;
int still_to_recv = (int) dummy_size;
while (still_to_send || still_to_recv) {
int nbytes;
if (still_to_send > 0) {
nbytes =
::send (*w_, (char *) (dummy + dummy_size - still_to_send),
still_to_send, 0);
wsa_assert (nbytes != SOCKET_ERROR);
still_to_send -= nbytes;
}
nbytes = ::recv (*r_, (char *) (dummy + dummy_size - still_to_recv),
still_to_recv, 0);
wsa_assert (nbytes != SOCKET_ERROR);
still_to_recv -= nbytes;
}
free (dummy);
}
// Save errno if error occurred in bind/listen/connect/accept.
int saved_errno = 0;
if (*r_ == INVALID_SOCKET)
saved_errno = WSAGetLastError ();
// We don't need the listening socket anymore. Close it.
rc = closesocket (listener);
wsa_assert (rc != SOCKET_ERROR);
if (sync != NULL) {
// Exit the critical section.
BOOL brc;
if (signaler_port == event_signaler_port)
brc = SetEvent (sync);
else
brc = ReleaseMutex (sync);
win_assert (brc != 0);
// Release the kernel object
brc = CloseHandle (sync);
win_assert (brc != 0);
}
if (*r_ != INVALID_SOCKET) {
#if !defined _WIN32_WCE && !defined ZMQ_HAVE_WINDOWS_UWP
// On Windows, preventing sockets to be inherited by child processes.
BOOL brc = SetHandleInformation ((HANDLE) *r_, HANDLE_FLAG_INHERIT, 0);
win_assert (brc);
#endif
return 0;
} else {
// Cleanup writer if connection failed
if (*w_ != INVALID_SOCKET) {
rc = closesocket (*w_);
wsa_assert (rc != SOCKET_ERROR);
*w_ = INVALID_SOCKET;
}
// Set errno from saved value
errno = wsa_error_to_errno (saved_errno);
return -1;
}
#elif defined ZMQ_HAVE_OPENVMS
// Whilst OpenVMS supports socketpair - it maps to AF_INET only. Further,
// it does not set the socket options TCP_NODELAY and TCP_NODELACK which
// can lead to performance problems.
//
// The bug will be fixed in V5.6 ECO4 and beyond. In the meantime, we'll
// create the socket pair manually.
struct sockaddr_in lcladdr;
memset (&lcladdr, 0, sizeof lcladdr);
lcladdr.sin_family = AF_INET;
lcladdr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
lcladdr.sin_port = 0;
int listener = open_socket (AF_INET, SOCK_STREAM, 0);
errno_assert (listener != -1);
int on = 1;
int rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELAY, &on, sizeof on);
errno_assert (rc != -1);
rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELACK, &on, sizeof on);
errno_assert (rc != -1);
rc = bind (listener, (struct sockaddr *) &lcladdr, sizeof lcladdr);
errno_assert (rc != -1);
socklen_t lcladdr_len = sizeof lcladdr;
rc = getsockname (listener, (struct sockaddr *) &lcladdr, &lcladdr_len);
errno_assert (rc != -1);
rc = listen (listener, 1);
errno_assert (rc != -1);
*w_ = open_socket (AF_INET, SOCK_STREAM, 0);
errno_assert (*w_ != -1);
rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY, &on, sizeof on);
errno_assert (rc != -1);
rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELACK, &on, sizeof on);
errno_assert (rc != -1);
rc = connect (*w_, (struct sockaddr *) &lcladdr, sizeof lcladdr);
errno_assert (rc != -1);
*r_ = accept (listener, NULL, NULL);
errno_assert (*r_ != -1);
close (listener);
return 0;
#else
// All other implementations support socketpair()
int sv[2];
int type = SOCK_STREAM;
// Setting this option result in sane behaviour when exec() functions
// are used. Old sockets are closed and don't block TCP ports, avoid
// leaks, etc.
#if defined ZMQ_HAVE_SOCK_CLOEXEC
type |= SOCK_CLOEXEC;
#endif
int rc = socketpair (AF_UNIX, type, 0, sv);
if (rc == -1) {
errno_assert (errno == ENFILE || errno == EMFILE);
*w_ = *r_ = -1;
return -1;
} else {
// If there's no SOCK_CLOEXEC, let's try the second best option. Note that
// race condition can cause socket not to be closed (if fork happens
// between socket creation and this point).
#if !defined ZMQ_HAVE_SOCK_CLOEXEC && defined FD_CLOEXEC
rc = fcntl (sv[0], F_SETFD, FD_CLOEXEC);
errno_assert (rc != -1);
rc = fcntl (sv[1], F_SETFD, FD_CLOEXEC);
errno_assert (rc != -1);
#endif
*w_ = sv[0];
*r_ = sv[1];
return 0;
}
#endif
}
...@@ -57,6 +57,16 @@ int set_nosigpipe (fd_t s_); ...@@ -57,6 +57,16 @@ int set_nosigpipe (fd_t s_);
// Binds the underlying socket to the given device, eg. VRF or interface // Binds the underlying socket to the given device, eg. VRF or interface
void bind_to_device (fd_t s_, std::string &bound_device_); void bind_to_device (fd_t s_, std::string &bound_device_);
// Initialize network subsystem. May be called multiple times. Each call must be matched by a call to shutdown_network.
bool initialize_network ();
// Shutdown network subsystem. Must be called once for each call to initialize_network before terminating.
void shutdown_network ();
// Creates a pair of sockets (using signaler_port on OS using TCP sockets).
// Returns -1 if we could not make the socket pair successfully
int make_fdpair (fd_t *r_, fd_t *w_);
} }
#endif #endif
...@@ -55,8 +55,7 @@ ...@@ -55,8 +55,7 @@
#endif #endif
zmq::kqueue_t::kqueue_t (const zmq::thread_ctx_t &ctx_) : zmq::kqueue_t::kqueue_t (const zmq::thread_ctx_t &ctx_) :
ctx (ctx_), worker_poller_base_t (ctx_)
stopping (false)
{ {
// Create event queue // Create event queue
kqueue_fd = kqueue (); kqueue_fd = kqueue ();
...@@ -68,12 +67,13 @@ zmq::kqueue_t::kqueue_t (const zmq::thread_ctx_t &ctx_) : ...@@ -68,12 +67,13 @@ zmq::kqueue_t::kqueue_t (const zmq::thread_ctx_t &ctx_) :
zmq::kqueue_t::~kqueue_t () zmq::kqueue_t::~kqueue_t ()
{ {
worker.stop (); stop_worker ();
close (kqueue_fd); close (kqueue_fd);
} }
void zmq::kqueue_t::kevent_add (fd_t fd_, short filter_, void *udata_) void zmq::kqueue_t::kevent_add (fd_t fd_, short filter_, void *udata_)
{ {
check_thread ();
struct kevent ev; struct kevent ev;
EV_SET (&ev, fd_, filter_, EV_ADD, 0, 0, (kevent_udata_t) udata_); EV_SET (&ev, fd_, filter_, EV_ADD, 0, 0, (kevent_udata_t) udata_);
...@@ -93,6 +93,7 @@ void zmq::kqueue_t::kevent_delete (fd_t fd_, short filter_) ...@@ -93,6 +93,7 @@ void zmq::kqueue_t::kevent_delete (fd_t fd_, short filter_)
zmq::kqueue_t::handle_t zmq::kqueue_t::add_fd (fd_t fd_, zmq::kqueue_t::handle_t zmq::kqueue_t::add_fd (fd_t fd_,
i_poll_events *reactor_) i_poll_events *reactor_)
{ {
check_thread ();
poll_entry_t *pe = new (std::nothrow) poll_entry_t; poll_entry_t *pe = new (std::nothrow) poll_entry_t;
alloc_assert (pe); alloc_assert (pe);
...@@ -108,6 +109,7 @@ zmq::kqueue_t::handle_t zmq::kqueue_t::add_fd (fd_t fd_, ...@@ -108,6 +109,7 @@ zmq::kqueue_t::handle_t zmq::kqueue_t::add_fd (fd_t fd_,
void zmq::kqueue_t::rm_fd (handle_t handle_) void zmq::kqueue_t::rm_fd (handle_t handle_)
{ {
check_thread ();
poll_entry_t *pe = (poll_entry_t *) handle_; poll_entry_t *pe = (poll_entry_t *) handle_;
if (pe->flag_pollin) if (pe->flag_pollin)
kevent_delete (pe->fd, EVFILT_READ); kevent_delete (pe->fd, EVFILT_READ);
...@@ -121,6 +123,7 @@ void zmq::kqueue_t::rm_fd (handle_t handle_) ...@@ -121,6 +123,7 @@ void zmq::kqueue_t::rm_fd (handle_t handle_)
void zmq::kqueue_t::set_pollin (handle_t handle_) void zmq::kqueue_t::set_pollin (handle_t handle_)
{ {
check_thread ();
poll_entry_t *pe = (poll_entry_t *) handle_; poll_entry_t *pe = (poll_entry_t *) handle_;
if (likely (!pe->flag_pollin)) { if (likely (!pe->flag_pollin)) {
pe->flag_pollin = true; pe->flag_pollin = true;
...@@ -130,6 +133,7 @@ void zmq::kqueue_t::set_pollin (handle_t handle_) ...@@ -130,6 +133,7 @@ void zmq::kqueue_t::set_pollin (handle_t handle_)
void zmq::kqueue_t::reset_pollin (handle_t handle_) void zmq::kqueue_t::reset_pollin (handle_t handle_)
{ {
check_thread ();
poll_entry_t *pe = (poll_entry_t *) handle_; poll_entry_t *pe = (poll_entry_t *) handle_;
if (likely (pe->flag_pollin)) { if (likely (pe->flag_pollin)) {
pe->flag_pollin = false; pe->flag_pollin = false;
...@@ -139,6 +143,7 @@ void zmq::kqueue_t::reset_pollin (handle_t handle_) ...@@ -139,6 +143,7 @@ void zmq::kqueue_t::reset_pollin (handle_t handle_)
void zmq::kqueue_t::set_pollout (handle_t handle_) void zmq::kqueue_t::set_pollout (handle_t handle_)
{ {
check_thread ();
poll_entry_t *pe = (poll_entry_t *) handle_; poll_entry_t *pe = (poll_entry_t *) handle_;
if (likely (!pe->flag_pollout)) { if (likely (!pe->flag_pollout)) {
pe->flag_pollout = true; pe->flag_pollout = true;
...@@ -148,6 +153,7 @@ void zmq::kqueue_t::set_pollout (handle_t handle_) ...@@ -148,6 +153,7 @@ void zmq::kqueue_t::set_pollout (handle_t handle_)
void zmq::kqueue_t::reset_pollout (handle_t handle_) void zmq::kqueue_t::reset_pollout (handle_t handle_)
{ {
check_thread ();
poll_entry_t *pe = (poll_entry_t *) handle_; poll_entry_t *pe = (poll_entry_t *) handle_;
if (likely (pe->flag_pollout)) { if (likely (pe->flag_pollout)) {
pe->flag_pollout = false; pe->flag_pollout = false;
...@@ -155,14 +161,8 @@ void zmq::kqueue_t::reset_pollout (handle_t handle_) ...@@ -155,14 +161,8 @@ void zmq::kqueue_t::reset_pollout (handle_t handle_)
} }
} }
void zmq::kqueue_t::start ()
{
ctx.start_thread (worker, worker_routine, this);
}
void zmq::kqueue_t::stop () void zmq::kqueue_t::stop ()
{ {
stopping = true;
} }
int zmq::kqueue_t::max_fds () int zmq::kqueue_t::max_fds ()
...@@ -172,10 +172,18 @@ int zmq::kqueue_t::max_fds () ...@@ -172,10 +172,18 @@ int zmq::kqueue_t::max_fds ()
void zmq::kqueue_t::loop () void zmq::kqueue_t::loop ()
{ {
while (!stopping) { while (true) {
// Execute any due timers. // Execute any due timers.
int timeout = (int) execute_timers (); int timeout = (int) execute_timers ();
if (get_load () == 0) {
if (timeout == 0)
break;
// TODO sleep for timeout
continue;
}
// Wait for events. // Wait for events.
struct kevent ev_buf[max_io_events]; struct kevent ev_buf[max_io_events];
timespec ts = {timeout / 1000, (timeout % 1000) * 1000000}; timespec ts = {timeout / 1000, (timeout % 1000) * 1000000};
...@@ -219,9 +227,4 @@ void zmq::kqueue_t::loop () ...@@ -219,9 +227,4 @@ void zmq::kqueue_t::loop ()
} }
} }
void zmq::kqueue_t::worker_routine (void *arg_)
{
((kqueue_t *) arg_)->loop ();
}
#endif #endif
...@@ -49,7 +49,7 @@ struct i_poll_events; ...@@ -49,7 +49,7 @@ struct i_poll_events;
// Implements socket polling mechanism using the BSD-specific // Implements socket polling mechanism using the BSD-specific
// kqueue interface. // kqueue interface.
class kqueue_t : public poller_base_t class kqueue_t : public worker_poller_base_t
{ {
public: public:
typedef void *handle_t; typedef void *handle_t;
...@@ -64,21 +64,14 @@ class kqueue_t : public poller_base_t ...@@ -64,21 +64,14 @@ class kqueue_t : public poller_base_t
void reset_pollin (handle_t handle_); void reset_pollin (handle_t handle_);
void set_pollout (handle_t handle_); void set_pollout (handle_t handle_);
void reset_pollout (handle_t handle_); void reset_pollout (handle_t handle_);
void start ();
void stop (); void stop ();
static int max_fds (); static int max_fds ();
private: private:
// Main worker thread routine.
static void worker_routine (void *arg_);
// Main event loop. // Main event loop.
void loop (); void loop ();
// Reference to ZMQ context.
const thread_ctx_t &ctx;
// File descriptor referring to the kernel event queue. // File descriptor referring to the kernel event queue.
fd_t kqueue_fd; fd_t kqueue_fd;
...@@ -100,12 +93,6 @@ class kqueue_t : public poller_base_t ...@@ -100,12 +93,6 @@ class kqueue_t : public poller_base_t
typedef std::vector<poll_entry_t *> retired_t; typedef std::vector<poll_entry_t *> retired_t;
retired_t retired; retired_t retired;
// If true, thread is in the process of shutting down.
bool stopping;
// Handle of the physical thread doing the I/O work.
thread_t worker;
kqueue_t (const kqueue_t &); kqueue_t (const kqueue_t &);
const kqueue_t &operator= (const kqueue_t &); const kqueue_t &operator= (const kqueue_t &);
......
...@@ -40,22 +40,94 @@ namespace zmq ...@@ -40,22 +40,94 @@ namespace zmq
{ {
struct i_poll_events; struct i_poll_events;
// A build of libzmq must provide an implementation of the poller_t concept. By
// convention, this is done via a typedef.
//
// At the time of writing, the following implementations of the poller_t
// concept exist: zmq::devpoll_t, zmq::epoll_t, zmq::kqueue_t, zmq::poll_t,
// zmq::pollset_t, zmq::select_t
//
// An implementation of the poller_t concept must provide the following public
// methods:
// Returns load of the poller.
// int get_load() const;
//
// Add a timeout to expire in timeout_ milliseconds. After the
// expiration, timer_event on sink_ object will be called with
// argument set to id_.
// void add_timer(int timeout_, zmq::i_poll_events *sink_, int id_);
//
// Cancel the timer created by sink_ object with ID equal to id_.
// void cancel_timer(zmq::i_poll_events *sink_, int id_);
//
// Adds a fd to the poller. Initially, no events are activated. These must
// be activated by the set_* methods using the returned handle_.
// handle_t add_fd(fd_t fd_, zmq::i_poll_events *events_);
//
// Deactivates any events that may be active for the given handle_, and
// removes the fd associated with the given handle_.
// void rm_fd(handle_t handle_);
//
// The set_* and reset_* methods activate resp. deactivate polling for
// input/output readiness on the respective handle_, such that the
// in_event/out_event methods on the associated zmq::i_poll_events object
// will be called.
// Note: while handle_t and fd_t may be the same type, and may even have the
// same values for some implementation, this may not be assumed in general.
// The methods may only be called with the handle returned by add_fd.
// void set_pollin(handle_t handle_);
// void reset_pollin(handle_t handle_);
// void set_pollout(handle_t handle_);//
// void reset_pollout(handle_t handle_);
//
// Starts operation of the poller. See below for details.
// void start();
//
// Request termination of the poller.
// TODO: might be removed in the future, as it has no effect.
// void stop();
//
// Returns the maximum number of fds that can be added to an instance of the
// poller at the same time, or -1 if there is no such fixed limit.
// static int max_fds();
//
// Most of the methods may only be called from a zmq::i_poll_events callback
// function when invoked by the poller (and, therefore, typically from the
// poller's worker thread), with the following exceptions:
// - get_load may be called from outside
// - add_fd and add_timer may be called from outside before start
// - start may be called from outside once
//
// After a poller is started, it waits for the registered events (input/output
// readiness, timeout) to happen, and calls the respective functions on the
// zmq::i_poll_events object. It terminates when no further registrations (fds
// or timers) exist.
//
// Before start, add_fd must have been called at least once. Behavior may be
// undefined otherwise.
//
// If the poller is implemented by a single worker thread (the
// worker_poller_base_t base class may be used to implement such a poller),
// no synchronization is required for the data structures modified by
// add_fd, rm_fd, add_timer, cancel_timer, (re)set_poll(in|out). However,
// reentrancy must be considered, e.g. when one of the functions modifies
// a container that is being iterated by the poller.
// A class that can be used as a base class for implementations of the poller
// concept.
//
// For documentation of the public methods, see the description of the poller_t
// concept.
class poller_base_t class poller_base_t
{ {
public: public:
poller_base_t (); poller_base_t ();
virtual ~poller_base_t (); virtual ~poller_base_t ();
// Returns load of the poller. Note that this function can be // Methods from the poller concept.
// invoked from a different thread!
int get_load () const; int get_load () const;
// Add a timeout to expire in timeout_ milliseconds. After the
// expiration timer_event on sink_ object will be called with
// argument set to id_.
void add_timer (int timeout_, zmq::i_poll_events *sink_, int id_); void add_timer (int timeout_, zmq::i_poll_events *sink_, int id_);
// Cancel the timer created by sink_ object with ID equal to id_.
void cancel_timer (zmq::i_poll_events *sink_, int id_); void cancel_timer (zmq::i_poll_events *sink_, int id_);
protected: protected:
...@@ -87,19 +159,26 @@ class poller_base_t ...@@ -87,19 +159,26 @@ class poller_base_t
const poller_base_t &operator= (const poller_base_t &); const poller_base_t &operator= (const poller_base_t &);
}; };
// base class for a poller with a single worker thread. // Base class for a poller with a single worker thread.
class worker_poller_base_t : public poller_base_t class worker_poller_base_t : public poller_base_t
{ {
public: public:
worker_poller_base_t (const thread_ctx_t &ctx_); worker_poller_base_t (const thread_ctx_t &ctx_);
void stop_worker ();
// Starts the poller. // Methods from the poller concept.
void start (); void start ();
protected: protected:
// Checks whether the currently executing thread is the worker thread
// via an assertion.
// Should be called by the add_fd, removed_fd, set_*, reset_* functions
// to ensure correct usage.
void check_thread (); void check_thread ();
// Stops the worker thread. Should be called from the destructor of the
// leaf class.
void stop_worker ();
private: private:
// Main worker thread routine. // Main worker thread routine.
static void worker_routine (void *arg_); static void worker_routine (void *arg_);
......
...@@ -118,7 +118,7 @@ void zmq::select_t::trigger_events (const fd_entries_t &fd_entries_, ...@@ -118,7 +118,7 @@ void zmq::select_t::trigger_events (const fd_entries_t &fd_entries_,
// Size is cached to avoid iteration through recently added descriptors. // Size is cached to avoid iteration through recently added descriptors.
for (fd_entries_t::size_type i = 0, size = fd_entries_.size (); for (fd_entries_t::size_type i = 0, size = fd_entries_.size ();
i < size && event_count_ > 0; ++i) { i < size && event_count_ > 0; ++i) {
// fd_entries_[i] may not be stored, since calls to // fd_entries_[i] may not be stored, since calls to
// in_event/out_event may reallocate the vector // in_event/out_event may reallocate the vector
if (is_retired_fd (fd_entries_[i])) if (is_retired_fd (fd_entries_[i]))
......
...@@ -67,10 +67,6 @@ ...@@ -67,10 +67,6 @@
#include "ip.hpp" #include "ip.hpp"
#include "tcp.hpp" #include "tcp.hpp"
#if defined ZMQ_HAVE_EVENTFD
#include <sys/eventfd.h>
#endif
#if !defined ZMQ_HAVE_WINDOWS #if !defined ZMQ_HAVE_WINDOWS
#include <unistd.h> #include <unistd.h>
#include <netinet/tcp.h> #include <netinet/tcp.h>
...@@ -384,313 +380,3 @@ void zmq::signaler_t::forked () ...@@ -384,313 +380,3 @@ void zmq::signaler_t::forked ()
make_fdpair (&r, &w); make_fdpair (&r, &w);
} }
#endif #endif
#if defined ZMQ_HAVE_WINDOWS
static void tune_socket (const SOCKET socket)
{
BOOL tcp_nodelay = 1;
int rc = setsockopt (socket, IPPROTO_TCP, TCP_NODELAY,
(char *) &tcp_nodelay, sizeof tcp_nodelay);
wsa_assert (rc != SOCKET_ERROR);
zmq::tcp_tune_loopback_fast_path (socket);
}
#endif
// Returns -1 if we could not make the socket pair successfully
int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
{
#if defined ZMQ_HAVE_EVENTFD
int flags = 0;
#if defined ZMQ_HAVE_EVENTFD_CLOEXEC
// Setting this option result in sane behaviour when exec() functions
// are used. Old sockets are closed and don't block TCP ports, avoid
// leaks, etc.
flags |= EFD_CLOEXEC;
#endif
fd_t fd = eventfd (0, flags);
if (fd == -1) {
errno_assert (errno == ENFILE || errno == EMFILE);
*w_ = *r_ = -1;
return -1;
} else {
*w_ = *r_ = fd;
return 0;
}
#elif defined ZMQ_HAVE_WINDOWS
#if !defined _WIN32_WCE && !defined ZMQ_HAVE_WINDOWS_UWP
// Windows CE does not manage security attributes
SECURITY_DESCRIPTOR sd;
SECURITY_ATTRIBUTES sa;
memset (&sd, 0, sizeof sd);
memset (&sa, 0, sizeof sa);
InitializeSecurityDescriptor (&sd, SECURITY_DESCRIPTOR_REVISION);
SetSecurityDescriptorDacl (&sd, TRUE, 0, FALSE);
sa.nLength = sizeof (SECURITY_ATTRIBUTES);
sa.lpSecurityDescriptor = &sd;
#endif
// This function has to be in a system-wide critical section so that
// two instances of the library don't accidentally create signaler
// crossing the process boundary.
// We'll use named event object to implement the critical section.
// Note that if the event object already exists, the CreateEvent requests
// EVENT_ALL_ACCESS access right. If this fails, we try to open
// the event object asking for SYNCHRONIZE access only.
HANDLE sync = NULL;
// Create critical section only if using fixed signaler port
// Use problematic Event implementation for compatibility if using old port 5905.
// Otherwise use Mutex implementation.
int event_signaler_port = 5905;
if (signaler_port == event_signaler_port) {
#if !defined _WIN32_WCE && !defined ZMQ_HAVE_WINDOWS_UWP
sync =
CreateEventW (&sa, FALSE, TRUE, L"Global\\zmq-signaler-port-sync");
#else
sync =
CreateEventW (NULL, FALSE, TRUE, L"Global\\zmq-signaler-port-sync");
#endif
if (sync == NULL && GetLastError () == ERROR_ACCESS_DENIED)
sync = OpenEventW (SYNCHRONIZE | EVENT_MODIFY_STATE, FALSE,
L"Global\\zmq-signaler-port-sync");
win_assert (sync != NULL);
} else if (signaler_port != 0) {
wchar_t mutex_name[MAX_PATH];
#ifdef __MINGW32__
_snwprintf (mutex_name, MAX_PATH, L"Global\\zmq-signaler-port-%d",
signaler_port);
#else
swprintf (mutex_name, MAX_PATH, L"Global\\zmq-signaler-port-%d",
signaler_port);
#endif
#if !defined _WIN32_WCE && !defined ZMQ_HAVE_WINDOWS_UWP
sync = CreateMutexW (&sa, FALSE, mutex_name);
#else
sync = CreateMutexW (NULL, FALSE, mutex_name);
#endif
if (sync == NULL && GetLastError () == ERROR_ACCESS_DENIED)
sync = OpenMutexW (SYNCHRONIZE, FALSE, mutex_name);
win_assert (sync != NULL);
}
// Windows has no 'socketpair' function. CreatePipe is no good as pipe
// handles cannot be polled on. Here we create the socketpair by hand.
*w_ = INVALID_SOCKET;
*r_ = INVALID_SOCKET;
// Create listening socket.
SOCKET listener;
listener = open_socket (AF_INET, SOCK_STREAM, 0);
wsa_assert (listener != INVALID_SOCKET);
// Set SO_REUSEADDR and TCP_NODELAY on listening socket.
BOOL so_reuseaddr = 1;
int rc = setsockopt (listener, SOL_SOCKET, SO_REUSEADDR,
(char *) &so_reuseaddr, sizeof so_reuseaddr);
wsa_assert (rc != SOCKET_ERROR);
tune_socket (listener);
// Init sockaddr to signaler port.
struct sockaddr_in addr;
memset (&addr, 0, sizeof addr);
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
addr.sin_port = htons (signaler_port);
// Create the writer socket.
*w_ = open_socket (AF_INET, SOCK_STREAM, 0);
wsa_assert (*w_ != INVALID_SOCKET);
// Set TCP_NODELAY on writer socket.
tune_socket (*w_);
if (sync != NULL) {
// Enter the critical section.
DWORD dwrc = WaitForSingleObject (sync, INFINITE);
zmq_assert (dwrc == WAIT_OBJECT_0 || dwrc == WAIT_ABANDONED);
}
// Bind listening socket to signaler port.
rc = bind (listener, (const struct sockaddr *) &addr, sizeof addr);
if (rc != SOCKET_ERROR && signaler_port == 0) {
// Retrieve ephemeral port number
int addrlen = sizeof addr;
rc = getsockname (listener, (struct sockaddr *) &addr, &addrlen);
}
// Listen for incoming connections.
if (rc != SOCKET_ERROR)
rc = listen (listener, 1);
// Connect writer to the listener.
if (rc != SOCKET_ERROR)
rc = connect (*w_, (struct sockaddr *) &addr, sizeof addr);
// Accept connection from writer.
if (rc != SOCKET_ERROR)
*r_ = accept (listener, NULL, NULL);
// Send/receive large chunk to work around TCP slow start
// This code is a workaround for #1608
if (*r_ != INVALID_SOCKET) {
size_t dummy_size =
1024 * 1024; // 1M to overload default receive buffer
unsigned char *dummy = (unsigned char *) malloc (dummy_size);
wsa_assert (dummy);
int still_to_send = (int) dummy_size;
int still_to_recv = (int) dummy_size;
while (still_to_send || still_to_recv) {
int nbytes;
if (still_to_send > 0) {
nbytes =
::send (*w_, (char *) (dummy + dummy_size - still_to_send),
still_to_send, 0);
wsa_assert (nbytes != SOCKET_ERROR);
still_to_send -= nbytes;
}
nbytes = ::recv (*r_, (char *) (dummy + dummy_size - still_to_recv),
still_to_recv, 0);
wsa_assert (nbytes != SOCKET_ERROR);
still_to_recv -= nbytes;
}
free (dummy);
}
// Save errno if error occurred in bind/listen/connect/accept.
int saved_errno = 0;
if (*r_ == INVALID_SOCKET)
saved_errno = WSAGetLastError ();
// We don't need the listening socket anymore. Close it.
rc = closesocket (listener);
wsa_assert (rc != SOCKET_ERROR);
if (sync != NULL) {
// Exit the critical section.
BOOL brc;
if (signaler_port == event_signaler_port)
brc = SetEvent (sync);
else
brc = ReleaseMutex (sync);
win_assert (brc != 0);
// Release the kernel object
brc = CloseHandle (sync);
win_assert (brc != 0);
}
if (*r_ != INVALID_SOCKET) {
#if !defined _WIN32_WCE && !defined ZMQ_HAVE_WINDOWS_UWP
// On Windows, preventing sockets to be inherited by child processes.
BOOL brc = SetHandleInformation ((HANDLE) *r_, HANDLE_FLAG_INHERIT, 0);
win_assert (brc);
#endif
return 0;
} else {
// Cleanup writer if connection failed
if (*w_ != INVALID_SOCKET) {
rc = closesocket (*w_);
wsa_assert (rc != SOCKET_ERROR);
*w_ = INVALID_SOCKET;
}
// Set errno from saved value
errno = wsa_error_to_errno (saved_errno);
return -1;
}
#elif defined ZMQ_HAVE_OPENVMS
// Whilst OpenVMS supports socketpair - it maps to AF_INET only. Further,
// it does not set the socket options TCP_NODELAY and TCP_NODELACK which
// can lead to performance problems.
//
// The bug will be fixed in V5.6 ECO4 and beyond. In the meantime, we'll
// create the socket pair manually.
struct sockaddr_in lcladdr;
memset (&lcladdr, 0, sizeof lcladdr);
lcladdr.sin_family = AF_INET;
lcladdr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
lcladdr.sin_port = 0;
int listener = open_socket (AF_INET, SOCK_STREAM, 0);
errno_assert (listener != -1);
int on = 1;
int rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELAY, &on, sizeof on);
errno_assert (rc != -1);
rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELACK, &on, sizeof on);
errno_assert (rc != -1);
rc = bind (listener, (struct sockaddr *) &lcladdr, sizeof lcladdr);
errno_assert (rc != -1);
socklen_t lcladdr_len = sizeof lcladdr;
rc = getsockname (listener, (struct sockaddr *) &lcladdr, &lcladdr_len);
errno_assert (rc != -1);
rc = listen (listener, 1);
errno_assert (rc != -1);
*w_ = open_socket (AF_INET, SOCK_STREAM, 0);
errno_assert (*w_ != -1);
rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY, &on, sizeof on);
errno_assert (rc != -1);
rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELACK, &on, sizeof on);
errno_assert (rc != -1);
rc = connect (*w_, (struct sockaddr *) &lcladdr, sizeof lcladdr);
errno_assert (rc != -1);
*r_ = accept (listener, NULL, NULL);
errno_assert (*r_ != -1);
close (listener);
return 0;
#else
// All other implementations support socketpair()
int sv[2];
int type = SOCK_STREAM;
// Setting this option result in sane behaviour when exec() functions
// are used. Old sockets are closed and don't block TCP ports, avoid
// leaks, etc.
#if defined ZMQ_HAVE_SOCK_CLOEXEC
type |= SOCK_CLOEXEC;
#endif
int rc = socketpair (AF_UNIX, type, 0, sv);
if (rc == -1) {
errno_assert (errno == ENFILE || errno == EMFILE);
*w_ = *r_ = -1;
return -1;
} else {
// If there's no SOCK_CLOEXEC, let's try the second best option. Note that
// race condition can cause socket not to be closed (if fork happens
// between socket creation and this point).
#if !defined ZMQ_HAVE_SOCK_CLOEXEC && defined FD_CLOEXEC
rc = fcntl (sv[0], F_SETFD, FD_CLOEXEC);
errno_assert (rc != -1);
rc = fcntl (sv[1], F_SETFD, FD_CLOEXEC);
errno_assert (rc != -1);
#endif
*w_ = sv[0];
*r_ = sv[1];
return 0;
}
#endif
}
...@@ -66,10 +66,6 @@ class signaler_t ...@@ -66,10 +66,6 @@ class signaler_t
#endif #endif
private: private:
// Creates a pair of file descriptors that will be used
// to pass the signals.
static int make_fdpair (fd_t *r_, fd_t *w_);
// Underlying write & read file descriptor // Underlying write & read file descriptor
// Will be -1 if an error occurred during initialization, e.g. we // Will be -1 if an error occurred during initialization, e.g. we
// exceeded the number of available handles // exceeded the number of available handles
......
...@@ -108,7 +108,7 @@ class tcp_connecter_t : public own_t, public io_object_t ...@@ -108,7 +108,7 @@ class tcp_connecter_t : public own_t, public io_object_t
// Underlying socket. // Underlying socket.
fd_t s; fd_t s;
// Handle corresponding to the listening socket, if file descriptor is // Handle corresponding to the listening socket, if file descriptor is
// registered with the poller, or NULL. // registered with the poller, or NULL.
handle_t handle; handle_t handle;
......
...@@ -87,6 +87,7 @@ struct iovec ...@@ -87,6 +87,7 @@ struct iovec
#include "signaler.hpp" #include "signaler.hpp"
#include "socket_poller.hpp" #include "socket_poller.hpp"
#include "timers.hpp" #include "timers.hpp"
#include "ip.hpp"
#if defined ZMQ_HAVE_OPENPGM #if defined ZMQ_HAVE_OPENPGM
#define __PGM_WININT_H__ #define __PGM_WININT_H__
...@@ -121,42 +122,11 @@ int zmq_errno (void) ...@@ -121,42 +122,11 @@ int zmq_errno (void)
void *zmq_ctx_new (void) void *zmq_ctx_new (void)
{ {
#if defined ZMQ_HAVE_OPENPGM
// Init PGM transport. Ensure threading and timer are enabled. Find PGM
// protocol ID. Note that if you want to use gettimeofday and sleep for
// openPGM timing, set environment variables PGM_TIMER to "GTOD" and
// PGM_SLEEP to "USLEEP".
pgm_error_t *pgm_error = NULL;
const bool ok = pgm_init (&pgm_error);
if (ok != TRUE) {
// Invalid parameters don't set pgm_error_t
zmq_assert (pgm_error != NULL);
if (pgm_error->domain == PGM_ERROR_DOMAIN_TIME
&& (pgm_error->code == PGM_ERROR_FAILED)) {
// Failed to access RTC or HPET device.
pgm_error_free (pgm_error);
errno = EINVAL;
return NULL;
}
// PGM_ERROR_DOMAIN_ENGINE: WSAStartup errors or missing WSARecvMsg.
zmq_assert (false);
}
#endif
#ifdef ZMQ_HAVE_WINDOWS
// Intialise Windows sockets. Note that WSAStartup can be called multiple
// times given that WSACleanup will be called for each WSAStartup.
// We do this before the ctx constructor since its embedded mailbox_t // We do this before the ctx constructor since its embedded mailbox_t
// object needs Winsock to be up and running. // object needs the network to be up and running (at least on Windows).
WORD version_requested = MAKEWORD (2, 2); if (!zmq::initialize_network ()) {
WSADATA wsa_data; return NULL;
int rc = WSAStartup (version_requested, &wsa_data); }
zmq_assert (rc == 0);
zmq_assert (LOBYTE (wsa_data.wVersion) == 2
&& HIBYTE (wsa_data.wVersion) == 2);
#endif
// Create 0MQ context. // Create 0MQ context.
zmq::ctx_t *ctx = new (std::nothrow) zmq::ctx_t; zmq::ctx_t *ctx = new (std::nothrow) zmq::ctx_t;
...@@ -181,17 +151,7 @@ int zmq_ctx_term (void *ctx_) ...@@ -181,17 +151,7 @@ int zmq_ctx_term (void *ctx_)
// Shut down only if termination was not interrupted by a signal. // Shut down only if termination was not interrupted by a signal.
if (!rc || en != EINTR) { if (!rc || en != EINTR) {
#ifdef ZMQ_HAVE_WINDOWS zmq::shutdown_network ();
// On Windows, uninitialise socket layer.
rc = WSACleanup ();
wsa_assert (rc != SOCKET_ERROR);
#endif
#if defined ZMQ_HAVE_OPENPGM
// Shut down the OpenPGM library.
if (pgm_shutdown () != TRUE)
zmq_assert (false);
#endif
} }
errno = en; errno = en;
...@@ -722,7 +682,7 @@ const char *zmq_msg_gets (const zmq_msg_t *msg_, const char *property_) ...@@ -722,7 +682,7 @@ const char *zmq_msg_gets (const zmq_msg_t *msg_, const char *property_)
} }
} }
// Polling. // Polling.
#if defined ZMQ_HAVE_POLLER #if defined ZMQ_HAVE_POLLER
inline int zmq_poller_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) inline int zmq_poller_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
......
...@@ -208,6 +208,11 @@ if(ZMQ_HAVE_CURVE) ...@@ -208,6 +208,11 @@ if(ZMQ_HAVE_CURVE)
set_tests_properties(test_security_curve PROPERTIES TIMEOUT 60) set_tests_properties(test_security_curve PROPERTIES TIMEOUT 60)
endif() endif()
if(WIN32 AND ${POLLER} MATCHES "poll")
set_tests_properties(test_many_sockets PROPERTIES TIMEOUT 30)
set_tests_properties(test_immediate PROPERTIES TIMEOUT 30)
endif()
#add additional required flags #add additional required flags
#ZMQ_USE_TWEETNACL will already be defined when not using sodium #ZMQ_USE_TWEETNACL will already be defined when not using sodium
if(ZMQ_HAVE_CURVE AND NOT ZMQ_USE_TWEETNACL) if(ZMQ_HAVE_CURVE AND NOT ZMQ_USE_TWEETNACL)
......
...@@ -29,6 +29,16 @@ ...@@ -29,6 +29,16 @@
#include "testutil.hpp" #include "testutil.hpp"
#include <unity.h>
void setUp ()
{
}
void tearDown ()
{
}
const int MAX_SENDS = 10000; const int MAX_SENDS = 10000;
void test_change_before_connected () void test_change_before_connected ()
...@@ -41,9 +51,9 @@ void test_change_before_connected () ...@@ -41,9 +51,9 @@ void test_change_before_connected ()
int val = 2; int val = 2;
rc = zmq_setsockopt (connect_socket, ZMQ_RCVHWM, &val, sizeof (val)); rc = zmq_setsockopt (connect_socket, ZMQ_RCVHWM, &val, sizeof (val));
assert (rc == 0); TEST_ASSERT_EQUAL_INT (0, rc);
rc = zmq_setsockopt (bind_socket, ZMQ_SNDHWM, &val, sizeof (val)); rc = zmq_setsockopt (bind_socket, ZMQ_SNDHWM, &val, sizeof (val));
assert (rc == 0); TEST_ASSERT_EQUAL_INT (0, rc);
zmq_connect (connect_socket, "inproc://a"); zmq_connect (connect_socket, "inproc://a");
zmq_bind (bind_socket, "inproc://a"); zmq_bind (bind_socket, "inproc://a");
...@@ -51,15 +61,15 @@ void test_change_before_connected () ...@@ -51,15 +61,15 @@ void test_change_before_connected ()
size_t placeholder = sizeof (val); size_t placeholder = sizeof (val);
val = 0; val = 0;
rc = zmq_getsockopt (bind_socket, ZMQ_SNDHWM, &val, &placeholder); rc = zmq_getsockopt (bind_socket, ZMQ_SNDHWM, &val, &placeholder);
assert (rc == 0); TEST_ASSERT_EQUAL_INT (0, rc);
assert (val == 2); TEST_ASSERT_EQUAL_INT (2, val);
int send_count = 0; int send_count = 0;
while (send_count < MAX_SENDS while (send_count < MAX_SENDS
&& zmq_send (bind_socket, NULL, 0, ZMQ_DONTWAIT) == 0) && zmq_send (bind_socket, NULL, 0, ZMQ_DONTWAIT) == 0)
++send_count; ++send_count;
assert (send_count == 4); TEST_ASSERT_EQUAL_INT (4, send_count);
zmq_close (bind_socket); zmq_close (bind_socket);
zmq_close (connect_socket); zmq_close (connect_socket);
...@@ -76,29 +86,29 @@ void test_change_after_connected () ...@@ -76,29 +86,29 @@ void test_change_after_connected ()
int val = 1; int val = 1;
rc = zmq_setsockopt (connect_socket, ZMQ_RCVHWM, &val, sizeof (val)); rc = zmq_setsockopt (connect_socket, ZMQ_RCVHWM, &val, sizeof (val));
assert (rc == 0); TEST_ASSERT_EQUAL_INT (0, rc);
rc = zmq_setsockopt (bind_socket, ZMQ_SNDHWM, &val, sizeof (val)); rc = zmq_setsockopt (bind_socket, ZMQ_SNDHWM, &val, sizeof (val));
assert (rc == 0); TEST_ASSERT_EQUAL_INT (0, rc);
zmq_connect (connect_socket, "inproc://a"); zmq_connect (connect_socket, "inproc://a");
zmq_bind (bind_socket, "inproc://a"); zmq_bind (bind_socket, "inproc://a");
val = 5; val = 5;
rc = zmq_setsockopt (bind_socket, ZMQ_SNDHWM, &val, sizeof (val)); rc = zmq_setsockopt (bind_socket, ZMQ_SNDHWM, &val, sizeof (val));
assert (rc == 0); TEST_ASSERT_EQUAL_INT (0, rc);
size_t placeholder = sizeof (val); size_t placeholder = sizeof (val);
val = 0; val = 0;
rc = zmq_getsockopt (bind_socket, ZMQ_SNDHWM, &val, &placeholder); rc = zmq_getsockopt (bind_socket, ZMQ_SNDHWM, &val, &placeholder);
assert (rc == 0); TEST_ASSERT_EQUAL_INT (0, rc);
assert (val == 5); TEST_ASSERT_EQUAL_INT (5, val);
int send_count = 0; int send_count = 0;
while (send_count < MAX_SENDS while (send_count < MAX_SENDS
&& zmq_send (bind_socket, NULL, 0, ZMQ_DONTWAIT) == 0) && zmq_send (bind_socket, NULL, 0, ZMQ_DONTWAIT) == 0)
++send_count; ++send_count;
assert (send_count == 6); TEST_ASSERT_EQUAL_INT (6, send_count);
zmq_close (bind_socket); zmq_close (bind_socket);
zmq_close (connect_socket); zmq_close (connect_socket);
...@@ -120,7 +130,8 @@ int test_fill_up_to_hwm (void *socket, int sndhwm) ...@@ -120,7 +130,8 @@ int test_fill_up_to_hwm (void *socket, int sndhwm)
{ {
int send_count = send_until_wouldblock (socket); int send_count = send_until_wouldblock (socket);
fprintf (stderr, "sndhwm==%i, send_count==%i\n", sndhwm, send_count); fprintf (stderr, "sndhwm==%i, send_count==%i\n", sndhwm, send_count);
assert (send_count <= sndhwm + 1 && send_count > (sndhwm / 10)); TEST_ASSERT_LESS_OR_EQUAL_INT (sndhwm + 1, send_count);
TEST_ASSERT_GREATER_THAN_INT (sndhwm / 10, send_count);
return send_count; return send_count;
} }
...@@ -134,11 +145,11 @@ void test_decrease_when_full () ...@@ -134,11 +145,11 @@ void test_decrease_when_full ()
int val = 1; int val = 1;
rc = zmq_setsockopt (connect_socket, ZMQ_RCVHWM, &val, sizeof (val)); rc = zmq_setsockopt (connect_socket, ZMQ_RCVHWM, &val, sizeof (val));
assert (rc == 0); TEST_ASSERT_EQUAL_INT (0, rc);
int sndhwm = 100; int sndhwm = 100;
rc = zmq_setsockopt (bind_socket, ZMQ_SNDHWM, &sndhwm, sizeof (sndhwm)); rc = zmq_setsockopt (bind_socket, ZMQ_SNDHWM, &sndhwm, sizeof (sndhwm));
assert (rc == 0); TEST_ASSERT_EQUAL_INT (0, rc);
zmq_bind (bind_socket, "inproc://a"); zmq_bind (bind_socket, "inproc://a");
zmq_connect (connect_socket, "inproc://a"); zmq_connect (connect_socket, "inproc://a");
...@@ -149,14 +160,14 @@ void test_decrease_when_full () ...@@ -149,14 +160,14 @@ void test_decrease_when_full ()
// Decrease snd hwm // Decrease snd hwm
sndhwm = 70; sndhwm = 70;
rc = zmq_setsockopt (bind_socket, ZMQ_SNDHWM, &sndhwm, sizeof (sndhwm)); rc = zmq_setsockopt (bind_socket, ZMQ_SNDHWM, &sndhwm, sizeof (sndhwm));
assert (rc == 0); TEST_ASSERT_EQUAL_INT (0, rc);
int sndhwm_read = 0; int sndhwm_read = 0;
size_t sndhwm_read_size = sizeof (sndhwm_read); size_t sndhwm_read_size = sizeof (sndhwm_read);
rc = rc =
zmq_getsockopt (bind_socket, ZMQ_SNDHWM, &sndhwm_read, &sndhwm_read_size); zmq_getsockopt (bind_socket, ZMQ_SNDHWM, &sndhwm_read, &sndhwm_read_size);
assert (rc == 0); TEST_ASSERT_EQUAL_INT (0, rc);
assert (sndhwm_read == sndhwm); TEST_ASSERT_EQUAL_INT (sndhwm, sndhwm_read);
msleep (SETTLE_TIME); msleep (SETTLE_TIME);
...@@ -167,11 +178,11 @@ void test_decrease_when_full () ...@@ -167,11 +178,11 @@ void test_decrease_when_full ()
read_count < MAX_SENDS read_count < MAX_SENDS
&& zmq_recv (connect_socket, &read_data, sizeof (read_data), ZMQ_DONTWAIT) && zmq_recv (connect_socket, &read_data, sizeof (read_data), ZMQ_DONTWAIT)
== sizeof (read_data)) { == sizeof (read_data)) {
assert (read_count == read_data); TEST_ASSERT_EQUAL_INT (read_data, read_count);
++read_count; ++read_count;
} }
assert (read_count == send_count); TEST_ASSERT_EQUAL_INT (send_count, read_count);
// Give io thread some time to catch up // Give io thread some time to catch up
msleep (SETTLE_TIME); msleep (SETTLE_TIME);
...@@ -187,7 +198,12 @@ void test_decrease_when_full () ...@@ -187,7 +198,12 @@ void test_decrease_when_full ()
int main () int main ()
{ {
test_change_before_connected (); setup_test_environment ();
test_change_after_connected ();
test_decrease_when_full (); UNITY_BEGIN ();
RUN_TEST (test_change_before_connected);
RUN_TEST (test_change_after_connected);
RUN_TEST (test_decrease_when_full);
return UNITY_END ();
} }
...@@ -20,9 +20,15 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. ...@@ -20,9 +20,15 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
#include "../tests/testutil.hpp" #include "../tests/testutil.hpp"
#include <poller.hpp> #include <poller.hpp>
#include <i_poll_events.hpp>
#include <ip.hpp>
#include <unity.h> #include <unity.h>
#ifndef _WIN32
#define closesocket close
#endif
void setUp () void setUp ()
{ {
} }
...@@ -36,12 +42,177 @@ void test_create () ...@@ -36,12 +42,177 @@ void test_create ()
zmq::poller_t poller (thread_ctx); zmq::poller_t poller (thread_ctx);
} }
#if 0
// TODO this triggers an assertion. should it be a valid use case?
void test_start_empty ()
{
zmq::thread_ctx_t thread_ctx;
zmq::poller_t poller (thread_ctx);
poller.start ();
msleep (SETTLE_TIME);
}
#endif
struct test_events_t : zmq::i_poll_events
{
test_events_t (zmq::fd_t fd_, zmq::poller_t &poller_) :
fd (fd_),
poller (poller_)
{
}
virtual void in_event ()
{
in_events.add (1);
poller.rm_fd (handle);
handle = (zmq::poller_t::handle_t) NULL;
}
virtual void out_event ()
{
// TODO
}
virtual void timer_event (int id_)
{
LIBZMQ_UNUSED (id_);
timer_events.add (1);
poller.rm_fd (handle);
handle = (zmq::poller_t::handle_t) NULL;
}
void set_handle (zmq::poller_t::handle_t handle_) { handle = handle_; }
zmq::atomic_counter_t in_events, timer_events;
private:
zmq::fd_t fd;
zmq::poller_t &poller;
zmq::poller_t::handle_t handle;
};
void wait_in_events (test_events_t &events)
{
void *watch = zmq_stopwatch_start ();
while (events.in_events.get () < 1) {
#ifdef ZMQ_BUILD_DRAFT
TEST_ASSERT_LESS_OR_EQUAL_MESSAGE (SETTLE_TIME,
zmq_stopwatch_intermediate (watch),
"Timeout waiting for in event");
#endif
}
zmq_stopwatch_stop (watch);
}
void wait_timer_events (test_events_t &events)
{
void *watch = zmq_stopwatch_start ();
while (events.timer_events.get () < 1) {
#ifdef ZMQ_BUILD_DRAFT
TEST_ASSERT_LESS_OR_EQUAL_MESSAGE (SETTLE_TIME,
zmq_stopwatch_intermediate (watch),
"Timeout waiting for timer event");
#endif
}
zmq_stopwatch_stop (watch);
}
void create_nonblocking_fdpair (zmq::fd_t *r, zmq::fd_t *w)
{
int rc = zmq::make_fdpair (r, w);
TEST_ASSERT_EQUAL_INT (0, rc);
TEST_ASSERT_NOT_EQUAL (zmq::retired_fd, *r);
TEST_ASSERT_NOT_EQUAL (zmq::retired_fd, *w);
zmq::unblock_socket (*r);
zmq::unblock_socket (*w);
}
void send_signal (zmq::fd_t w)
{
#if defined ZMQ_HAVE_EVENTFD
const uint64_t inc = 1;
ssize_t sz = write (w, &inc, sizeof (inc));
assert (sz == sizeof (inc));
#else
{
char msg[] = "test";
int rc = send (w, msg, sizeof (msg), 0);
assert (rc == sizeof (msg));
}
#endif
}
void close_fdpair (zmq::fd_t w, zmq::fd_t r)
{
int rc = closesocket (w);
TEST_ASSERT_EQUAL_INT (0, rc);
#if !defined ZMQ_HAVE_EVENTFD
rc = closesocket (r);
TEST_ASSERT_EQUAL_INT (0, rc);
#else
LIBZMQ_UNUSED (r);
#endif
}
void test_add_fd_and_start_and_receive_data ()
{
zmq::thread_ctx_t thread_ctx;
zmq::poller_t poller (thread_ctx);
zmq::fd_t r, w;
create_nonblocking_fdpair (&r, &w);
test_events_t events (r, poller);
zmq::poller_t::handle_t handle = poller.add_fd (r, &events);
events.set_handle (handle);
poller.set_pollin (handle);
poller.start ();
send_signal (w);
wait_in_events (events);
// required cleanup
close_fdpair (w, r);
}
void test_add_fd_and_remove_by_timer ()
{
zmq::fd_t r, w;
create_nonblocking_fdpair (&r, &w);
zmq::thread_ctx_t thread_ctx;
zmq::poller_t poller (thread_ctx);
test_events_t events (r, poller);
zmq::poller_t::handle_t handle = poller.add_fd (r, &events);
events.set_handle (handle);
poller.add_timer (50, &events, 0);
poller.start ();
wait_timer_events (events);
// required cleanup
close_fdpair (w, r);
}
int main (void) int main (void)
{ {
UNITY_BEGIN ();
zmq::initialize_network ();
setup_test_environment (); setup_test_environment ();
UNITY_BEGIN ();
RUN_TEST (test_create); RUN_TEST (test_create);
RUN_TEST (test_add_fd_and_start_and_receive_data);
RUN_TEST (test_add_fd_and_remove_by_timer);
zmq::shutdown_network ();
return UNITY_END (); return UNITY_END ();
} }
...@@ -35,12 +35,53 @@ void test_create () ...@@ -35,12 +35,53 @@ void test_create ()
zmq::ypipe_t<int, 1> ypipe; zmq::ypipe_t<int, 1> ypipe;
} }
void test_check_read_empty ()
{
zmq::ypipe_t<int, 1> ypipe;
TEST_ASSERT_FALSE (ypipe.check_read ());
}
void test_read_empty ()
{
zmq::ypipe_t<int, 1> ypipe;
int read_value = -1;
TEST_ASSERT_FALSE (ypipe.read (&read_value));
TEST_ASSERT_EQUAL (-1, read_value);
}
void test_write_complete_and_check_read_and_read ()
{
const int value = 42;
zmq::ypipe_t<int, 1> ypipe;
ypipe.write (value, false);
TEST_ASSERT_FALSE (ypipe.check_read ());
int read_value = -1;
TEST_ASSERT_FALSE (ypipe.read (&read_value));
TEST_ASSERT_EQUAL_INT (-1, read_value);
}
void test_write_complete_and_flush_and_check_read_and_read ()
{
const int value = 42;
zmq::ypipe_t<int, 1> ypipe;
ypipe.write (value, false);
ypipe.flush ();
TEST_ASSERT_TRUE (ypipe.check_read ());
int read_value = -1;
TEST_ASSERT_TRUE (ypipe.read (&read_value));
TEST_ASSERT_EQUAL_INT (value, read_value);
}
int main (void) int main (void)
{ {
setup_test_environment (); setup_test_environment ();
UNITY_BEGIN (); UNITY_BEGIN ();
RUN_TEST (test_create); RUN_TEST (test_create);
RUN_TEST (test_check_read_empty);
RUN_TEST (test_read_empty);
RUN_TEST (test_write_complete_and_check_read_and_read);
RUN_TEST (test_write_complete_and_flush_and_check_read_and_read);
return UNITY_END (); return UNITY_END ();
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment