Unverified Commit e2a4d770 authored by Luca Boccassi's avatar Luca Boccassi Committed by GitHub

Merge pull request #3150 from sigiesec/reduce-duplication

Resolve huge stack size requirements problem with select on Windows
parents 89e5f15a 17df28d2
......@@ -624,6 +624,7 @@ set (cxx-sources
plain_server.cpp
poll.cpp
poller_base.cpp
polling_util.cpp
pollset.cpp
proxy.cpp
pub.cpp
......@@ -746,6 +747,7 @@ set (cxx-sources
poll.hpp
poller.hpp
poller_base.hpp
polling_util.hpp
pollset.hpp
precompiled.hpp
proxy.hpp
......
......@@ -48,14 +48,7 @@ cmake -H. -B<build dir> -G"Visual Studio 14 2015 Win64" \
In VS 2012 it is mandatory to increase the default stack size of 1 MB to
at least 2 MB due to implementation of std::map intermittently requiring
substantial amount of stack and causing stack overflow. ZeroMQ generally
needs more stack when FD_SETSIZE is higher.
In all Windows builds it is recommended to start with at least 2 MB stack
size unless application using ZeroMQ is using large number of threads which
can cause substantial consumption of virtual address space, especially if
32 bit build is used.
Generally, programmer needs to tune the stack to balance memory consumption
but never get into situation that stack is overflown.
substantial amount of stack and causing stack overflow.
Windows Builds - Static
=======================
......
......@@ -143,6 +143,8 @@ src_libzmq_la_SOURCES = \
src/poller.hpp \
src/poller_base.cpp \
src/poller_base.hpp \
src/polling_util.cpp \
src/polling_util.hpp \
src/pollset.cpp \
src/pollset.hpp \
src/precompiled.cpp \
......
......@@ -10,6 +10,7 @@ environment:
CMAKE_GENERATOR: "Visual Studio 12 2013"
MSVCVERSION: "v120"
MSVCYEAR: "vs2013"
ENABLE_DRAFTS: ON
matrix:
- platform: Win32
configuration: Release
......@@ -47,6 +48,7 @@ environment:
configuration: Release
WITH_LIBSODIUM: OFF
ENABLE_CURVE: OFF
ENABLE_DRAFTS: OFF
- platform: Win32
configuration: Release
WITH_LIBSODIUM: ON
......@@ -111,7 +113,7 @@ before_build:
# - cmd: set BUILDLOG="%LIBZMQ_SRCDIR%\build.log"
- cmd: md "%LIBZMQ_BUILDDIR%"
- cd "%LIBZMQ_BUILDDIR%"
- cmd: cmake -D CMAKE_INCLUDE_PATH="%SODIUM_INCLUDE_DIR%" -D CMAKE_LIBRARY_PATH="%SODIUM_LIBRARY_DIR%" -D WITH_LIBSODIUM="%WITH_LIBSODIUM%" -D ENABLE_DRAFTS="ON" -D ENABLE_ANALYSIS="%ENABLE_ANALYSIS%" -D ENABLE_CURVE="%ENABLE_CURVE%" -D API_POLLER="%API_POLLER%" -D POLLER="%POLLER%" -D CMAKE_C_FLAGS_RELEASE="/MT" -D CMAKE_C_FLAGS_DEBUG="/MTd" -D WITH_LIBSODIUM="%WITH_LIBSODIUM%" -G "%CMAKE_GENERATOR%" "%APPVEYOR_BUILD_FOLDER%"
- cmd: cmake -D CMAKE_INCLUDE_PATH="%SODIUM_INCLUDE_DIR%" -D CMAKE_LIBRARY_PATH="%SODIUM_LIBRARY_DIR%" -D WITH_LIBSODIUM="%WITH_LIBSODIUM%" -D ENABLE_DRAFTS="%ENABLE_DRAFTS%" -D ENABLE_ANALYSIS="%ENABLE_ANALYSIS%" -D ENABLE_CURVE="%ENABLE_CURVE%" -D API_POLLER="%API_POLLER%" -D POLLER="%POLLER%" -D CMAKE_C_FLAGS_RELEASE="/MT" -D CMAKE_C_FLAGS_DEBUG="/MTd" -D WITH_LIBSODIUM="%WITH_LIBSODIUM%" -G "%CMAKE_GENERATOR%" "%APPVEYOR_BUILD_FOLDER%"
- cmd: cd "%LIBZMQ_SRCDIR%"
build_script:
......
/*
Copyright (c) 2007-2018 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "precompiled.hpp"
#include "polling_util.hpp"
#if defined ZMQ_POLL_BASED_ON_POLL
#include <limits.h>
#include <algorithm>
zmq::timeout_t zmq::compute_timeout (const bool first_pass_,
const long timeout_,
const uint64_t now_,
const uint64_t end_)
{
if (first_pass_)
return 0;
if (timeout_ < 0)
return -1;
return static_cast<zmq::timeout_t> (
std::min<uint64_t> (end_ - now_, INT_MAX));
}
#endif
/*
Copyright (c) 2007-2018 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_SOCKET_POLLING_UTIL_HPP_INCLUDED__
#define __ZMQ_SOCKET_POLLING_UTIL_HPP_INCLUDED__
#include <stdlib.h>
#include <vector>
#include "stdint.hpp"
#include "platform.hpp"
#include "err.hpp"
namespace zmq
{
template <typename T, size_t S> class fast_vector_t
{
public:
explicit fast_vector_t (const size_t nitems_)
{
if (nitems_ > S) {
_buf = static_cast<T *> (malloc (nitems_ * sizeof (T)));
// TODO since this function is called by a client, we could return errno == ENOMEM here
alloc_assert (_buf);
} else {
_buf = _static_buf;
}
}
T &operator[] (const size_t i) { return _buf[i]; }
~fast_vector_t ()
{
if (_buf != _static_buf)
free (_buf);
}
private:
fast_vector_t (const fast_vector_t &);
fast_vector_t &operator= (const fast_vector_t &);
T _static_buf[S];
T *_buf;
};
template <typename T, size_t S> class resizable_fast_vector_t
{
public:
resizable_fast_vector_t () : _dynamic_buf (NULL) {}
void resize (const size_t nitems_)
{
if (_dynamic_buf)
_dynamic_buf->resize (nitems_);
if (nitems_ > S) {
_dynamic_buf = new (std::nothrow) std::vector<T>;
// TODO since this function is called by a client, we could return errno == ENOMEM here
alloc_assert (_dynamic_buf);
}
}
T *get_buf ()
{
// e.g. MSVC 2008 does not have std::vector::data, so we use &...[0]
return _dynamic_buf ? &(*_dynamic_buf)[0] : _static_buf;
}
T &operator[] (const size_t i) { return get_buf ()[i]; }
~resizable_fast_vector_t () { delete _dynamic_buf; }
private:
resizable_fast_vector_t (const resizable_fast_vector_t &);
resizable_fast_vector_t &operator= (const resizable_fast_vector_t &);
T _static_buf[S];
std::vector<T> *_dynamic_buf;
};
#if defined ZMQ_POLL_BASED_ON_POLL
typedef int timeout_t;
timeout_t compute_timeout (const bool first_pass_,
const long timeout_,
const uint64_t now_,
const uint64_t end_);
#elif defined ZMQ_POLL_BASED_ON_SELECT
inline size_t valid_pollset_bytes (const fd_set &pollset_)
{
#if defined ZMQ_HAVE_WINDOWS
// On Windows we don't need to copy the whole fd_set.
// SOCKETS are continuous from the beginning of fd_array in fd_set.
// We just need to copy fd_count elements of fd_array.
// We gain huge memcpy() improvement if number of used SOCKETs is much lower than FD_SETSIZE.
return reinterpret_cast<const char *> (
&pollset_.fd_array[pollset_.fd_count])
- reinterpret_cast<const char *> (&pollset_);
#else
return sizeof (fd_set);
#endif
}
#if defined ZMQ_HAVE_WINDOWS
class optimized_fd_set_t
{
public:
explicit optimized_fd_set_t (size_t nevents_) : _fd_set (nevents_) {}
fd_set *get () { return reinterpret_cast<fd_set *> (&_fd_set[0]); }
private:
fast_vector_t<char, sizeof (u_int) + ZMQ_POLLITEMS_DFLT * sizeof (SOCKET)>
_fd_set;
};
class resizable_optimized_fd_set_t
{
public:
void resize (size_t nevents_) { _fd_set.resize (nevents_); }
fd_set *get () { return reinterpret_cast<fd_set *> (&_fd_set[0]); }
private:
resizable_fast_vector_t<char,
sizeof (u_int)
+ ZMQ_POLLITEMS_DFLT * sizeof (SOCKET)>
_fd_set;
};
#else
class optimized_fd_set_t
{
public:
explicit optimized_fd_set_t (size_t /*nevents_*/) {}
fd_set *get () { return &_fd_set; }
private:
fd_set _fd_set;
};
class resizable_optimized_fd_set_t : public optimized_fd_set_t
{
public:
resizable_optimized_fd_set_t () : optimized_fd_set_t (0) {}
void resize (size_t /*nevents_*/) {}
};
#endif
#endif
}
#endif
......@@ -30,6 +30,7 @@
#include "precompiled.hpp"
#include "socket_poller.hpp"
#include "err.hpp"
#include "polling_util.hpp"
#include <limits.h>
......@@ -41,10 +42,7 @@ static bool is_thread_safe (zmq::socket_base_t &socket_)
zmq::socket_poller_t::socket_poller_t () :
_tag (0xCAFEBABE),
_signaler (NULL),
_need_rebuild (true),
_use_signaler (false),
_pollset_size (0)
_signaler (NULL)
#if defined ZMQ_POLL_BASED_ON_POLL
,
_pollfds (NULL)
......@@ -53,20 +51,7 @@ zmq::socket_poller_t::socket_poller_t () :
_max_fd (0)
#endif
{
#if defined ZMQ_POLL_BASED_ON_SELECT
#if defined ZMQ_HAVE_WINDOWS
// On Windows fd_set contains array of SOCKETs, each 4 bytes.
// For large fd_sets memset() could be expensive and it is unnecessary.
// It is enough to set fd_count to 0, exactly what FD_ZERO() macro does.
FD_ZERO (&_pollset_in);
FD_ZERO (&_pollset_out);
FD_ZERO (&_pollset_err);
#else
memset (&_pollset_in, 0, sizeof (_pollset_in));
memset (&_pollset_out, 0, sizeof (_pollset_out));
memset (&_pollset_err, 0, sizeof (_pollset_err));
#endif
#endif
rebuild ();
}
zmq::socket_poller_t::~socket_poller_t ()
......@@ -270,6 +255,10 @@ int zmq::socket_poller_t::remove_fd (fd_t fd_)
void zmq::socket_poller_t::rebuild ()
{
_use_signaler = false;
_pollset_size = 0;
_need_rebuild = false;
#if defined ZMQ_POLL_BASED_ON_POLL
if (_pollfds) {
......@@ -277,10 +266,6 @@ void zmq::socket_poller_t::rebuild ()
_pollfds = NULL;
}
_use_signaler = false;
_pollset_size = 0;
for (items_t::iterator it = _items.begin (); it != _items.end (); ++it) {
if (it->events) {
if (it->socket && is_thread_safe (*it->socket)) {
......@@ -333,22 +318,22 @@ void zmq::socket_poller_t::rebuild ()
#elif defined ZMQ_POLL_BASED_ON_SELECT
FD_ZERO (&_pollset_in);
FD_ZERO (&_pollset_out);
FD_ZERO (&_pollset_err);
// Ensure we do not attempt to select () on more than FD_SETSIZE
// file descriptors.
zmq_assert (_items.size () <= FD_SETSIZE);
_pollset_size = 0;
_pollset_in.resize (_items.size ());
_pollset_out.resize (_items.size ());
_pollset_err.resize (_items.size ());
_use_signaler = false;
FD_ZERO (_pollset_in.get ());
FD_ZERO (_pollset_out.get ());
FD_ZERO (_pollset_err.get ());
for (items_t::iterator it = _items.begin (); it != _items.end (); ++it) {
if (it->socket && is_thread_safe (*it->socket) && it->events) {
_use_signaler = true;
FD_SET (_signaler->get_fd (), &_pollset_in);
FD_SET (_signaler->get_fd (), _pollset_in.get ());
_pollset_size = 1;
break;
}
......@@ -369,7 +354,7 @@ void zmq::socket_poller_t::rebuild ()
it->socket->getsockopt (ZMQ_FD, &notify_fd, &fd_size);
zmq_assert (rc == 0);
FD_SET (notify_fd, &_pollset_in);
FD_SET (notify_fd, _pollset_in.get ());
if (_max_fd < notify_fd)
_max_fd = notify_fd;
......@@ -380,11 +365,11 @@ void zmq::socket_poller_t::rebuild ()
// events to the appropriate fd_sets.
else {
if (it->events & ZMQ_POLLIN)
FD_SET (it->fd, &_pollset_in);
FD_SET (it->fd, _pollset_in.get ());
if (it->events & ZMQ_POLLOUT)
FD_SET (it->fd, &_pollset_out);
FD_SET (it->fd, _pollset_out.get ());
if (it->events & ZMQ_POLLERR)
FD_SET (it->fd, &_pollset_err);
FD_SET (it->fd, _pollset_err.get ());
if (_max_fd < it->fd)
_max_fd = it->fd;
......@@ -394,8 +379,6 @@ void zmq::socket_poller_t::rebuild ()
}
#endif
_need_rebuild = false;
}
void zmq::socket_poller_t::zero_trail_events (
......@@ -617,7 +600,9 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_,
bool first_pass = true;
fd_set inset, outset, errset;
optimized_fd_set_t inset (_pollset_size);
optimized_fd_set_t outset (_pollset_size);
optimized_fd_set_t errset (_pollset_size);
while (true) {
// Compute the timeout for the subsequent poll.
......@@ -637,34 +622,21 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_,
// Wait for events. Ignore interrupts if there's infinite timeout.
while (true) {
memcpy (inset.get (), _pollset_in.get (),
valid_pollset_bytes (*_pollset_in.get ()));
memcpy (outset.get (), _pollset_out.get (),
valid_pollset_bytes (*_pollset_out.get ()));
memcpy (errset.get (), _pollset_err.get (),
valid_pollset_bytes (*_pollset_err.get ()));
const int rc = select (static_cast<int> (_max_fd + 1), inset.get (),
outset.get (), errset.get (), ptimeout);
#if defined ZMQ_HAVE_WINDOWS
// On Windows we don't need to copy the whole fd_set.
// SOCKETS are continuous from the beginning of fd_array in fd_set.
// We just need to copy fd_count elements of fd_array.
// We gain huge memcpy() improvement if number of used SOCKETs is much lower than FD_SETSIZE.
memcpy (&inset, &_pollset_in,
reinterpret_cast<char *> (_pollset_in.fd_array
+ _pollset_in.fd_count)
- reinterpret_cast<char *> (&_pollset_in));
memcpy (&outset, &_pollset_out,
reinterpret_cast<char *> (_pollset_out.fd_array
+ _pollset_out.fd_count)
- reinterpret_cast<char *> (&_pollset_out));
memcpy (&errset, &_pollset_err,
reinterpret_cast<char *> (_pollset_err.fd_array
+ _pollset_err.fd_count)
- reinterpret_cast<char *> (&_pollset_err));
int rc = select (0, &inset, &outset, &errset, ptimeout);
if (unlikely (rc == SOCKET_ERROR)) {
errno = zmq::wsa_error_to_errno (WSAGetLastError ());
errno = wsa_error_to_errno (WSAGetLastError ());
wsa_assert (errno == ENOTSOCK);
return -1;
}
#else
memcpy (&inset, &_pollset_in, sizeof (fd_set));
memcpy (&outset, &_pollset_out, sizeof (fd_set));
memcpy (&errset, &_pollset_err, sizeof (fd_set));
int rc = select (_max_fd + 1, &inset, &outset, &errset, ptimeout);
if (unlikely (rc == -1)) {
errno_assert (errno == EINTR || errno == EBADF);
return -1;
......@@ -673,11 +645,12 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_,
break;
}
if (_use_signaler && FD_ISSET (_signaler->get_fd (), &inset))
if (_use_signaler && FD_ISSET (_signaler->get_fd (), inset.get ()))
_signaler->recv ();
// Check for the events.
int found = check_events (events_, n_events_, inset, outset, errset);
const int found = check_events (events_, n_events_, *inset.get (),
*outset.get (), *errset.get ());
if (found) {
if (found > 0)
zero_trail_events (events_, n_events_, found);
......
......@@ -50,6 +50,7 @@
#include "socket_base.hpp"
#include "signaler.hpp"
#include "polling_util.hpp"
namespace zmq
{
......@@ -135,9 +136,9 @@ class socket_poller_t
#if defined ZMQ_POLL_BASED_ON_POLL
pollfd *_pollfds;
#elif defined ZMQ_POLL_BASED_ON_SELECT
fd_set _pollset_in;
fd_set _pollset_out;
fd_set _pollset_err;
resizable_optimized_fd_set_t _pollset_in;
resizable_optimized_fd_set_t _pollset_out;
resizable_optimized_fd_set_t _pollset_err;
zmq::fd_t _max_fd;
#endif
......
......@@ -41,6 +41,7 @@
#include "macros.hpp"
#include "poller.hpp"
#if !defined ZMQ_HAVE_POLLER
// On AIX platform, poll.h has to be included first to get consistent
// definition of pollfd structure (AIX uses 'reqevents' and 'retnevents'
// instead of 'events' and 'revents' and defines macros to map from POSIX-y
......@@ -49,6 +50,9 @@
#include <poll.h>
#endif
#include "polling_util.hpp"
#endif
// TODO: determine if this is an issue, since zmq.h is being loaded from pch.
// zmq.h must be included *after* poll.h for AIX to build properly
//#include "../include/zmq.h"
......@@ -800,7 +804,7 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
// if poller is present, use that.
return zmq_poller_poll (items_, nitems_, timeout_);
#else
#if defined ZMQ_POLL_BASED_ON_POLL
#if defined ZMQ_POLL_BASED_ON_POLL || defined ZMQ_POLL_BASED_ON_SELECT
if (unlikely (nitems_ < 0)) {
errno = EINVAL;
return -1;
......@@ -811,14 +815,15 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
#if defined ZMQ_HAVE_WINDOWS
Sleep (timeout_ > 0 ? timeout_ : INFINITE);
return 0;
#elif defined ZMQ_HAVE_ANDROID
usleep (timeout_ * 1000);
return 0;
#elif defined ZMQ_HAVE_VXWORKS
struct timespec ns_;
ns_.tv_sec = timeout_ / 1000;
ns_.tv_nsec = timeout_ % 1000 * 1000000;
return nanosleep (&ns_, 0);
#else
return usleep (timeout_ * 1000);
#endif
}
if (!items_) {
errno = EFAULT;
return -1;
......@@ -827,13 +832,8 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
zmq::clock_t clock;
uint64_t now = 0;
uint64_t end = 0;
pollfd spollfds[ZMQ_POLLITEMS_DFLT];
pollfd *pollfds = spollfds;
if (nitems_ > ZMQ_POLLITEMS_DFLT) {
pollfds = static_cast<pollfd *> (malloc (nitems_ * sizeof (pollfd)));
alloc_assert (pollfds);
}
#if defined ZMQ_POLL_BASED_ON_POLL
zmq::fast_vector_t<pollfd, ZMQ_POLLITEMS_DFLT> pollfds (nitems_);
// Build pollset for poll () system call.
for (int i = 0; i != nitems_; i++) {
......@@ -844,8 +844,6 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
if (zmq_getsockopt (items_[i].socket, ZMQ_FD, &pollfds[i].fd,
&zmq_fd_size)
== -1) {
if (pollfds != spollfds)
free (pollfds);
return -1;
}
pollfds[i].events = items_[i].events ? POLLIN : 0;
......@@ -860,27 +858,71 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
| (items_[i].events & ZMQ_POLLPRI ? POLLPRI : 0);
}
}
#else
// Ensure we do not attempt to select () on more than FD_SETSIZE
// file descriptors.
// TODO since this function is called by a client, we could return errno EINVAL/ENOMEM/... here
zmq_assert (nitems_ <= FD_SETSIZE);
zmq::optimized_fd_set_t pollset_in (nitems_);
FD_ZERO (pollset_in.get ());
zmq::optimized_fd_set_t pollset_out (nitems_);
FD_ZERO (pollset_out.get ());
zmq::optimized_fd_set_t pollset_err (nitems_);
FD_ZERO (pollset_err.get ());
zmq::fd_t maxfd = 0;
// Build the fd_sets for passing to select ().
for (int i = 0; i != nitems_; i++) {
// If the poll item is a 0MQ socket we are interested in input on the
// notification file descriptor retrieved by the ZMQ_FD socket option.
if (items_[i].socket) {
size_t zmq_fd_size = sizeof (zmq::fd_t);
zmq::fd_t notify_fd;
if (zmq_getsockopt (items_[i].socket, ZMQ_FD, &notify_fd,
&zmq_fd_size)
== -1)
return -1;
if (items_[i].events) {
FD_SET (notify_fd, pollset_in.get ());
if (maxfd < notify_fd)
maxfd = notify_fd;
}
}
// Else, the poll item is a raw file descriptor. Convert the poll item
// events to the appropriate fd_sets.
else {
if (items_[i].events & ZMQ_POLLIN)
FD_SET (items_[i].fd, pollset_in.get ());
if (items_[i].events & ZMQ_POLLOUT)
FD_SET (items_[i].fd, pollset_out.get ());
if (items_[i].events & ZMQ_POLLERR)
FD_SET (items_[i].fd, pollset_err.get ());
if (maxfd < items_[i].fd)
maxfd = items_[i].fd;
}
}
zmq::optimized_fd_set_t inset (nitems_);
zmq::optimized_fd_set_t outset (nitems_);
zmq::optimized_fd_set_t errset (nitems_);
#endif
bool first_pass = true;
int nevents = 0;
while (true) {
#if defined ZMQ_POLL_BASED_ON_POLL
// Compute the timeout for the subsequent poll.
int timeout;
if (first_pass)
timeout = 0;
else if (timeout_ < 0)
timeout = -1;
else
timeout =
static_cast<int> (std::min<uint64_t> (end - now, INT_MAX));
zmq::timeout_t timeout =
zmq::compute_timeout (first_pass, timeout_, now, end);
// Wait for events.
{
int rc = poll (pollfds, nitems_, timeout);
int rc = poll (&pollfds[0], nitems_, timeout);
if (rc == -1 && errno == EINTR) {
if (pollfds != spollfds)
free (pollfds);
return -1;
}
errno_assert (rc >= 0);
......@@ -897,8 +939,6 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
if (zmq_getsockopt (items_[i].socket, ZMQ_EVENTS, &zmq_events,
&zmq_events_size)
== -1) {
if (pollfds != spollfds)
free (pollfds);
return -1;
}
if ((items_[i].events & ZMQ_POLLOUT)
......@@ -925,119 +965,8 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
nevents++;
}
// If timeout is zero, exit immediately whether there are events or not.
if (timeout_ == 0)
break;
// If there are events to return, we can exit immediately.
if (nevents)
break;
// At this point we are meant to wait for events but there are none.
// If timeout is infinite we can just loop until we get some events.
if (timeout_ < 0) {
if (first_pass)
first_pass = false;
continue;
}
// The timeout is finite and there are no events. In the first pass
// we get a timestamp of when the polling have begun. (We assume that
// first pass have taken negligible time). We also compute the time
// when the polling should time out.
if (first_pass) {
now = clock.now_ms ();
end = now + timeout_;
if (now == end)
break;
first_pass = false;
continue;
}
// Find out whether timeout have expired.
now = clock.now_ms ();
if (now >= end)
break;
}
if (pollfds != spollfds)
free (pollfds);
return nevents;
#elif defined ZMQ_POLL_BASED_ON_SELECT
if (unlikely (nitems_ < 0)) {
errno = EINVAL;
return -1;
}
if (unlikely (nitems_ == 0)) {
if (timeout_ == 0)
return 0;
#if defined ZMQ_HAVE_WINDOWS
Sleep (timeout_ > 0 ? timeout_ : INFINITE);
return 0;
#elif defined ZMQ_HAVE_VXWORKS
struct timespec ns_;
ns_.tv_sec = timeout_ / 1000;
ns_.tv_nsec = timeout_ % 1000 * 1000000;
return nanosleep (&ns_, 0);
#else
return usleep (timeout_ * 1000);
#endif
}
zmq::clock_t clock;
uint64_t now = 0;
uint64_t end = 0;
// Ensure we do not attempt to select () on more than FD_SETSIZE
// file descriptors.
zmq_assert (nitems_ <= FD_SETSIZE);
fd_set pollset_in;
FD_ZERO (&pollset_in);
fd_set pollset_out;
FD_ZERO (&pollset_out);
fd_set pollset_err;
FD_ZERO (&pollset_err);
zmq::fd_t maxfd = 0;
// Build the fd_sets for passing to select ().
for (int i = 0; i != nitems_; i++) {
// If the poll item is a 0MQ socket we are interested in input on the
// notification file descriptor retrieved by the ZMQ_FD socket option.
if (items_[i].socket) {
size_t zmq_fd_size = sizeof (zmq::fd_t);
zmq::fd_t notify_fd;
if (zmq_getsockopt (items_[i].socket, ZMQ_FD, &notify_fd,
&zmq_fd_size)
== -1)
return -1;
if (items_[i].events) {
FD_SET (notify_fd, &pollset_in);
if (maxfd < notify_fd)
maxfd = notify_fd;
}
}
// Else, the poll item is a raw file descriptor. Convert the poll item
// events to the appropriate fd_sets.
else {
if (items_[i].events & ZMQ_POLLIN)
FD_SET (items_[i].fd, &pollset_in);
if (items_[i].events & ZMQ_POLLOUT)
FD_SET (items_[i].fd, &pollset_out);
if (items_[i].events & ZMQ_POLLERR)
FD_SET (items_[i].fd, &pollset_err);
if (maxfd < items_[i].fd)
maxfd = items_[i].fd;
}
}
bool first_pass = true;
int nevents = 0;
fd_set inset, outset, errset;
while (true) {
// Compute the timeout for the subsequent poll.
timeval timeout;
timeval *ptimeout;
......@@ -1055,34 +984,23 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
// Wait for events. Ignore interrupts if there's infinite timeout.
while (true) {
memcpy (inset.get (), pollset_in.get (),
zmq::valid_pollset_bytes (*pollset_in.get ()));
memcpy (outset.get (), pollset_out.get (),
zmq::valid_pollset_bytes (*pollset_out.get ()));
memcpy (errset.get (), pollset_err.get (),
zmq::valid_pollset_bytes (*pollset_err.get ()));
#if defined ZMQ_HAVE_WINDOWS
// On Windows we don't need to copy the whole fd_set.
// SOCKETS are continuous from the beginning of fd_array in fd_set.
// We just need to copy fd_count elements of fd_array.
// We gain huge memcpy() improvement if number of used SOCKETs is much lower than FD_SETSIZE.
memcpy (&inset, &pollset_in,
reinterpret_cast<char *> (pollset_in.fd_array
+ pollset_in.fd_count)
- reinterpret_cast<char *> (&pollset_in));
memcpy (&outset, &pollset_out,
reinterpret_cast<char *> (pollset_out.fd_array
+ pollset_out.fd_count)
- reinterpret_cast<char *> (&pollset_out));
memcpy (&errset, &pollset_err,
reinterpret_cast<char *> (pollset_err.fd_array
+ pollset_err.fd_count)
- reinterpret_cast<char *> (&pollset_err));
int rc = select (0, &inset, &outset, &errset, ptimeout);
int rc =
select (0, inset.get (), outset.get (), errset.get (), ptimeout);
if (unlikely (rc == SOCKET_ERROR)) {
errno = zmq::wsa_error_to_errno (WSAGetLastError ());
wsa_assert (errno == ENOTSOCK);
return -1;
}
#else
memcpy (&inset, &pollset_in, sizeof (fd_set));
memcpy (&outset, &pollset_out, sizeof (fd_set));
memcpy (&errset, &pollset_err, sizeof (fd_set));
int rc = select (maxfd + 1, &inset, &outset, &errset, ptimeout);
int rc = select (maxfd + 1, inset.get (), outset.get (),
errset.get (), ptimeout);
if (unlikely (rc == -1)) {
errno_assert (errno == EINTR || errno == EBADF);
return -1;
......@@ -1114,17 +1032,18 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
// Else, the poll item is a raw file descriptor, simply convert
// the events to zmq_pollitem_t-style format.
else {
if (FD_ISSET (items_[i].fd, &inset))
if (FD_ISSET (items_[i].fd, inset.get ()))
items_[i].revents |= ZMQ_POLLIN;
if (FD_ISSET (items_[i].fd, &outset))
if (FD_ISSET (items_[i].fd, outset.get ()))
items_[i].revents |= ZMQ_POLLOUT;
if (FD_ISSET (items_[i].fd, &errset))
if (FD_ISSET (items_[i].fd, errset.get ()))
items_[i].revents |= ZMQ_POLLERR;
}
if (items_[i].revents)
nevents++;
}
#endif
// If timeout is zero, exit immediately whether there are events or not.
if (timeout_ == 0)
......@@ -1162,7 +1081,6 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
}
return nevents;
#else
// Exotic platforms that support neither poll() nor select().
errno = ENOTSUP;
......
......@@ -224,7 +224,7 @@ if(WIN32 AND ${POLLER} MATCHES "epoll")
set_tests_properties(test_many_sockets PROPERTIES TIMEOUT 120)
endif()
if(WIN32)
if(WIN32 AND ENABLE_DRAFTS)
set_tests_properties(test_radio_dish PROPERTIES TIMEOUT 30)
endif()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment