Commit 5ac900a9 authored by Simon Giesecke's avatar Simon Giesecke

Problem: huge size of socket_poller_t object on Windows with select polling

Solution: use resizable_optimized_fd_set_t
parent 4a651251
...@@ -31,6 +31,7 @@ ...@@ -31,6 +31,7 @@
#define __ZMQ_SOCKET_POLLING_UTIL_HPP_INCLUDED__ #define __ZMQ_SOCKET_POLLING_UTIL_HPP_INCLUDED__
#include <stdlib.h> #include <stdlib.h>
#include <vector>
#include "stdint.hpp" #include "stdint.hpp"
#include "platform.hpp" #include "platform.hpp"
...@@ -68,6 +69,40 @@ template <typename T, size_t S> class fast_vector_t ...@@ -68,6 +69,40 @@ template <typename T, size_t S> class fast_vector_t
T *_buf; T *_buf;
}; };
template <typename T, size_t S> class resizable_fast_vector_t
{
public:
resizable_fast_vector_t () : _dynamic_buf (NULL) {}
void resize (const size_t nitems_)
{
if (_dynamic_buf)
_dynamic_buf->resize (nitems_);
if (nitems_ > S) {
_dynamic_buf = new (std::nothrow) std::vector<T>;
// TODO since this function is called by a client, we could return errno == ENOMEM here
alloc_assert (_dynamic_buf);
}
}
T *get_buf ()
{
// e.g. MSVC 2008 does not have std::vector::data, so we use &...[0]
return _dynamic_buf ? &(*_dynamic_buf)[0] : _static_buf;
}
T &operator[] (const size_t i) { return get_buf ()[i]; }
~resizable_fast_vector_t () { delete _dynamic_buf; }
private:
resizable_fast_vector_t (const resizable_fast_vector_t &);
resizable_fast_vector_t &operator= (const resizable_fast_vector_t &);
T _static_buf[S];
std::vector<T> *_dynamic_buf;
};
#if defined ZMQ_POLL_BASED_ON_POLL #if defined ZMQ_POLL_BASED_ON_POLL
typedef int timeout_t; typedef int timeout_t;
...@@ -104,6 +139,20 @@ class optimized_fd_set_t ...@@ -104,6 +139,20 @@ class optimized_fd_set_t
fast_vector_t<char, sizeof (u_int) + ZMQ_POLLITEMS_DFLT * sizeof (SOCKET)> fast_vector_t<char, sizeof (u_int) + ZMQ_POLLITEMS_DFLT * sizeof (SOCKET)>
_fd_set; _fd_set;
}; };
class resizable_optimized_fd_set_t
{
public:
void resize (size_t nevents_) { _fd_set.resize (nevents_); }
fd_set *get () { return reinterpret_cast<fd_set *> (&_fd_set[0]); }
private:
resizable_fast_vector_t<char,
sizeof (u_int)
+ ZMQ_POLLITEMS_DFLT * sizeof (SOCKET)>
_fd_set;
};
#else #else
class optimized_fd_set_t class optimized_fd_set_t
{ {
...@@ -115,6 +164,14 @@ class optimized_fd_set_t ...@@ -115,6 +164,14 @@ class optimized_fd_set_t
private: private:
fd_set _fd_set; fd_set _fd_set;
}; };
class resizable_optimized_fd_set_t : public optimized_fd_set_t
{
public:
resizable_optimized_fd_set_t () : optimized_fd_set_t (0) {}
void resize (size_t /*nevents_*/) {}
};
#endif #endif
#endif #endif
} }
......
...@@ -318,18 +318,22 @@ void zmq::socket_poller_t::rebuild () ...@@ -318,18 +318,22 @@ void zmq::socket_poller_t::rebuild ()
#elif defined ZMQ_POLL_BASED_ON_SELECT #elif defined ZMQ_POLL_BASED_ON_SELECT
FD_ZERO (&_pollset_in);
FD_ZERO (&_pollset_out);
FD_ZERO (&_pollset_err);
// Ensure we do not attempt to select () on more than FD_SETSIZE // Ensure we do not attempt to select () on more than FD_SETSIZE
// file descriptors. // file descriptors.
zmq_assert (_items.size () <= FD_SETSIZE); zmq_assert (_items.size () <= FD_SETSIZE);
_pollset_in.resize (_items.size ());
_pollset_out.resize (_items.size ());
_pollset_err.resize (_items.size ());
FD_ZERO (_pollset_in.get ());
FD_ZERO (_pollset_out.get ());
FD_ZERO (_pollset_err.get ());
for (items_t::iterator it = _items.begin (); it != _items.end (); ++it) { for (items_t::iterator it = _items.begin (); it != _items.end (); ++it) {
if (it->socket && is_thread_safe (*it->socket) && it->events) { if (it->socket && is_thread_safe (*it->socket) && it->events) {
_use_signaler = true; _use_signaler = true;
FD_SET (_signaler->get_fd (), &_pollset_in); FD_SET (_signaler->get_fd (), _pollset_in.get ());
_pollset_size = 1; _pollset_size = 1;
break; break;
} }
...@@ -350,7 +354,7 @@ void zmq::socket_poller_t::rebuild () ...@@ -350,7 +354,7 @@ void zmq::socket_poller_t::rebuild ()
it->socket->getsockopt (ZMQ_FD, &notify_fd, &fd_size); it->socket->getsockopt (ZMQ_FD, &notify_fd, &fd_size);
zmq_assert (rc == 0); zmq_assert (rc == 0);
FD_SET (notify_fd, &_pollset_in); FD_SET (notify_fd, _pollset_in.get ());
if (_max_fd < notify_fd) if (_max_fd < notify_fd)
_max_fd = notify_fd; _max_fd = notify_fd;
...@@ -361,11 +365,11 @@ void zmq::socket_poller_t::rebuild () ...@@ -361,11 +365,11 @@ void zmq::socket_poller_t::rebuild ()
// events to the appropriate fd_sets. // events to the appropriate fd_sets.
else { else {
if (it->events & ZMQ_POLLIN) if (it->events & ZMQ_POLLIN)
FD_SET (it->fd, &_pollset_in); FD_SET (it->fd, _pollset_in.get ());
if (it->events & ZMQ_POLLOUT) if (it->events & ZMQ_POLLOUT)
FD_SET (it->fd, &_pollset_out); FD_SET (it->fd, _pollset_out.get ());
if (it->events & ZMQ_POLLERR) if (it->events & ZMQ_POLLERR)
FD_SET (it->fd, &_pollset_err); FD_SET (it->fd, _pollset_err.get ());
if (_max_fd < it->fd) if (_max_fd < it->fd)
_max_fd = it->fd; _max_fd = it->fd;
...@@ -618,12 +622,12 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, ...@@ -618,12 +622,12 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_,
// Wait for events. Ignore interrupts if there's infinite timeout. // Wait for events. Ignore interrupts if there's infinite timeout.
while (true) { while (true) {
memcpy (inset.get (), &_pollset_in, memcpy (inset.get (), _pollset_in.get (),
valid_pollset_bytes (_pollset_in)); valid_pollset_bytes (*_pollset_in.get ()));
memcpy (outset.get (), &_pollset_out, memcpy (outset.get (), _pollset_out.get (),
valid_pollset_bytes (_pollset_out)); valid_pollset_bytes (*_pollset_out.get ()));
memcpy (errset.get (), &_pollset_err, memcpy (errset.get (), _pollset_err.get (),
valid_pollset_bytes (_pollset_err)); valid_pollset_bytes (*_pollset_err.get ()));
const int rc = select (static_cast<int> (_max_fd + 1), inset.get (), const int rc = select (static_cast<int> (_max_fd + 1), inset.get (),
outset.get (), errset.get (), ptimeout); outset.get (), errset.get (), ptimeout);
#if defined ZMQ_HAVE_WINDOWS #if defined ZMQ_HAVE_WINDOWS
...@@ -641,7 +645,7 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, ...@@ -641,7 +645,7 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_,
break; break;
} }
if (_use_signaler && FD_ISSET (_signaler->get_fd (), &inset)) if (_use_signaler && FD_ISSET (_signaler->get_fd (), inset.get ()))
_signaler->recv (); _signaler->recv ();
// Check for the events. // Check for the events.
......
...@@ -50,6 +50,7 @@ ...@@ -50,6 +50,7 @@
#include "socket_base.hpp" #include "socket_base.hpp"
#include "signaler.hpp" #include "signaler.hpp"
#include "polling_util.hpp"
namespace zmq namespace zmq
{ {
...@@ -135,9 +136,9 @@ class socket_poller_t ...@@ -135,9 +136,9 @@ class socket_poller_t
#if defined ZMQ_POLL_BASED_ON_POLL #if defined ZMQ_POLL_BASED_ON_POLL
pollfd *_pollfds; pollfd *_pollfds;
#elif defined ZMQ_POLL_BASED_ON_SELECT #elif defined ZMQ_POLL_BASED_ON_SELECT
fd_set _pollset_in; resizable_optimized_fd_set_t _pollset_in;
fd_set _pollset_out; resizable_optimized_fd_set_t _pollset_out;
fd_set _pollset_err; resizable_optimized_fd_set_t _pollset_err;
zmq::fd_t _max_fd; zmq::fd_t _max_fd;
#endif #endif
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment