Commit 812e7562 authored by Luca Boccassi's avatar Luca Boccassi Committed by GitHub

Merge pull request #2809 from sigiesec/optimize-select-win

Optimize select on Windows; reduce code duplication in select_t
parents 2b75a9ef f9d7eea6
...@@ -54,10 +54,13 @@ zmq::select_t::select_t (const zmq::ctx_t &ctx_) : ...@@ -54,10 +54,13 @@ zmq::select_t::select_t (const zmq::ctx_t &ctx_) :
current_family_entry_it (family_entries.end ()), current_family_entry_it (family_entries.end ()),
#else #else
maxfd (retired_fd), maxfd (retired_fd),
retired (false),
#endif #endif
stopping (false) stopping (false)
{ {
#if defined ZMQ_HAVE_WINDOWS
for (size_t i = 0; i < fd_family_cache_size; ++i)
fd_family_cache [i] = std::make_pair (retired_fd, 0);
#endif
} }
zmq::select_t::~select_t () zmq::select_t::~select_t ()
...@@ -74,13 +77,12 @@ zmq::select_t::handle_t zmq::select_t::add_fd (fd_t fd_, i_poll_events *events_) ...@@ -74,13 +77,12 @@ zmq::select_t::handle_t zmq::select_t::add_fd (fd_t fd_, i_poll_events *events_)
#if defined ZMQ_HAVE_WINDOWS #if defined ZMQ_HAVE_WINDOWS
u_short family = get_fd_family (fd_); u_short family = get_fd_family (fd_);
wsa_assert (family != AF_UNSPEC); wsa_assert (family != AF_UNSPEC);
family_entry_t& family_entry = family_entries [family]; family_entry_t &family_entry = family_entries [family];
#endif
family_entry.fd_entries.push_back (fd_entry); family_entry.fd_entries.push_back (fd_entry);
FD_SET (fd_, &family_entry.fds_set.error); FD_SET (fd_, &family_entry.fds_set.error);
#else
fd_entries.push_back (fd_entry);
FD_SET (fd_, &fds_set.error);
#if !defined ZMQ_HAVE_WINDOWS
if (fd_ > maxfd) if (fd_ > maxfd)
maxfd = fd_; maxfd = fd_;
#endif #endif
...@@ -103,6 +105,47 @@ zmq::select_t::find_fd_entry_by_handle (fd_entries_t &fd_entries, ...@@ -103,6 +105,47 @@ zmq::select_t::find_fd_entry_by_handle (fd_entries_t &fd_entries,
return fd_entry_it; return fd_entry_it;
} }
void zmq::select_t::trigger_events (const fd_entries_t &fd_entries_,
const fds_set_t &local_fds_set_,
int event_count_)
{
// Size is cached to avoid iteration through recently added descriptors.
for (fd_entries_t::size_type i = 0, size = fd_entries_.size ();
i < size && event_count_ > 0; ++i) {
const fd_entry_t &current_fd_entry = fd_entries_ [i];
if (is_retired_fd (current_fd_entry))
continue;
if (FD_ISSET (current_fd_entry.fd, &local_fds_set_.read)) {
current_fd_entry.events->in_event ();
--event_count_;
}
// TODO: can the is_retired_fd be true at this point? if it
// was retired before, we would already have continued, and I
// don't see where it might have been modified
// And if rc == 0, we can break instead of continuing
if (is_retired_fd (current_fd_entry) || event_count_ == 0)
continue;
if (FD_ISSET (current_fd_entry.fd, &local_fds_set_.write)) {
current_fd_entry.events->out_event ();
--event_count_;
}
// TODO: same as above
if (is_retired_fd (current_fd_entry) || event_count_ == 0)
continue;
if (FD_ISSET (current_fd_entry.fd, &local_fds_set_.error)) {
current_fd_entry.events->in_event ();
--event_count_;
}
}
}
#if defined ZMQ_HAVE_WINDOWS
bool zmq::select_t::try_remove_fd_entry ( bool zmq::select_t::try_remove_fd_entry (
family_entries_t::iterator family_entry_it, zmq::fd_t &handle_) family_entries_t::iterator family_entry_it, zmq::fd_t &handle_)
{ {
...@@ -126,6 +169,7 @@ bool zmq::select_t::try_remove_fd_entry ( ...@@ -126,6 +169,7 @@ bool zmq::select_t::try_remove_fd_entry (
family_entry.fds_set.remove_fd (handle_); family_entry.fds_set.remove_fd (handle_);
return true; return true;
} }
#endif
void zmq::select_t::rm_fd (handle_t handle_) void zmq::select_t::rm_fd (handle_t handle_)
{ {
...@@ -151,21 +195,21 @@ void zmq::select_t::rm_fd (handle_t handle_) ...@@ -151,21 +195,21 @@ void zmq::select_t::rm_fd (handle_t handle_)
} }
#else #else
fd_entries_t::iterator fd_entry_it = fd_entries_t::iterator fd_entry_it =
find_fd_entry_by_handle (fd_entries, handle_); find_fd_entry_by_handle (family_entry.fd_entries, handle_);
assert (fd_entry_it != fd_entries.end ()); assert (fd_entry_it != fd_entries.end ());
fd_entry_it->fd = retired_fd; fd_entry_it->fd = retired_fd;
fds_set.remove_fd (handle_); family_entry.fds_set.remove_fd (handle_);
if (handle_ == maxfd) { if (handle_ == maxfd) {
maxfd = retired_fd; maxfd = retired_fd;
for (fd_entry_it = fd_entries.begin (); fd_entry_it != fd_entries.end (); for (fd_entry_it = family_entry.fd_entries.begin ();
++fd_entry_it) fd_entry_it != family_entry.fd_entries.end (); ++fd_entry_it)
if (fd_entry_it->fd > maxfd) if (fd_entry_it->fd > maxfd)
maxfd = fd_entry_it->fd; maxfd = fd_entry_it->fd;
} }
retired = true; family_entry.retired = true;
#endif #endif
adjust_load (-1); adjust_load (-1);
} }
...@@ -175,10 +219,9 @@ void zmq::select_t::set_pollin (handle_t handle_) ...@@ -175,10 +219,9 @@ void zmq::select_t::set_pollin (handle_t handle_)
#if defined ZMQ_HAVE_WINDOWS #if defined ZMQ_HAVE_WINDOWS
u_short family = get_fd_family (handle_); u_short family = get_fd_family (handle_);
wsa_assert (family != AF_UNSPEC); wsa_assert (family != AF_UNSPEC);
FD_SET (handle_, &family_entries [family].fds_set.read); family_entry_t &family_entry = family_entries [family];
#else
FD_SET (handle_, &fds_set.read);
#endif #endif
FD_SET (handle_, &family_entry.fds_set.read);
} }
void zmq::select_t::reset_pollin (handle_t handle_) void zmq::select_t::reset_pollin (handle_t handle_)
...@@ -186,10 +229,9 @@ void zmq::select_t::reset_pollin (handle_t handle_) ...@@ -186,10 +229,9 @@ void zmq::select_t::reset_pollin (handle_t handle_)
#if defined ZMQ_HAVE_WINDOWS #if defined ZMQ_HAVE_WINDOWS
u_short family = get_fd_family (handle_); u_short family = get_fd_family (handle_);
wsa_assert (family != AF_UNSPEC); wsa_assert (family != AF_UNSPEC);
FD_CLR (handle_, &family_entries [family].fds_set.read); family_entry_t &family_entry = family_entries [family];
#else
FD_CLR (handle_, &fds_set.read);
#endif #endif
FD_CLR (handle_, &family_entry.fds_set.read);
} }
void zmq::select_t::set_pollout (handle_t handle_) void zmq::select_t::set_pollout (handle_t handle_)
...@@ -197,10 +239,9 @@ void zmq::select_t::set_pollout (handle_t handle_) ...@@ -197,10 +239,9 @@ void zmq::select_t::set_pollout (handle_t handle_)
#if defined ZMQ_HAVE_WINDOWS #if defined ZMQ_HAVE_WINDOWS
u_short family = get_fd_family (handle_); u_short family = get_fd_family (handle_);
wsa_assert (family != AF_UNSPEC); wsa_assert (family != AF_UNSPEC);
FD_SET (handle_, &family_entries [family].fds_set.write); family_entry_t &family_entry = family_entries [family];
#else
FD_SET (handle_, &fds_set.write);
#endif #endif
FD_SET (handle_, &family_entry.fds_set.write);
} }
void zmq::select_t::reset_pollout (handle_t handle_) void zmq::select_t::reset_pollout (handle_t handle_)
...@@ -208,10 +249,9 @@ void zmq::select_t::reset_pollout (handle_t handle_) ...@@ -208,10 +249,9 @@ void zmq::select_t::reset_pollout (handle_t handle_)
#if defined ZMQ_HAVE_WINDOWS #if defined ZMQ_HAVE_WINDOWS
u_short family = get_fd_family (handle_); u_short family = get_fd_family (handle_);
wsa_assert (family != AF_UNSPEC); wsa_assert (family != AF_UNSPEC);
FD_CLR (handle_, &family_entries [family].fds_set.write); family_entry_t &family_entry = family_entries [family];
#else
FD_CLR (handle_, &fds_set.write);
#endif #endif
FD_CLR (handle_, &family_entry.fds_set.write);
} }
void zmq::select_t::start () void zmq::select_t::start ()
...@@ -236,9 +276,10 @@ void zmq::select_t::loop () ...@@ -236,9 +276,10 @@ void zmq::select_t::loop ()
int timeout = (int) execute_timers (); int timeout = (int) execute_timers ();
#if defined ZMQ_HAVE_OSX #if defined ZMQ_HAVE_OSX
struct timeval tv = { (long) (timeout / 1000), timeout % 1000 * 1000 }; struct timeval tv = {(long) (timeout / 1000), timeout % 1000 * 1000};
#else #else
struct timeval tv = { (long) (timeout / 1000), (long) (timeout % 1000 * 1000) }; struct timeval tv = {(long) (timeout / 1000),
(long) (timeout % 1000 * 1000)};
#endif #endif
int rc = 0; int rc = 0;
...@@ -253,38 +294,51 @@ void zmq::select_t::loop () ...@@ -253,38 +294,51 @@ void zmq::select_t::loop ()
cannot be used alone, because it does not support more than 64 events cannot be used alone, because it does not support more than 64 events
which is not enough. which is not enough.
To reduce unncessary overhead, WSA is only used when there are more To reduce unnecessary overhead, WSA is only used when there are more
than one family. Moreover, AF_INET and AF_INET6 are considered the same than one family. Moreover, AF_INET and AF_INET6 are considered the same
family because Windows seems to handle them properly. family because Windows seems to handle them properly.
See get_fd_family for details. See get_fd_family for details.
*/ */
// If there is just one family, there is no reason to use WSA events. // If there is just one family, there is no reason to use WSA events.
if (family_entries.size () > 1) { const bool use_wsa_events = family_entries.size () > 1;
if (use_wsa_events) {
// TODO: I don't really understand why we are doing this. If any of
// the events was signaled, we will call select for each fd_family
// afterwards. The only benefit is if none of the events was
// signaled, then we continue early.
// IMHO, either WSAEventSelect/WSAWaitForMultipleEvents or select
// should be used, but not both
wsa_events_t wsa_events; wsa_events_t wsa_events;
for (family_entries_t::iterator family_entry_it = family_entries.begin (); for (family_entries_t::iterator family_entry_it =
family_entry_it != family_entries.end (); ++family_entry_it) { family_entries.begin ();
family_entry_t& family_entry = family_entry_it->second; family_entry_it != family_entries.end (); ++family_entry_it) {
family_entry_t &family_entry = family_entry_it->second;
for (fd_entries_t::iterator fd_entry_it = family_entry.fd_entries.begin (); for (fd_entries_t::iterator fd_entry_it =
fd_entry_it != family_entry.fd_entries.end (); ++fd_entry_it) { family_entry.fd_entries.begin ();
fd_entry_it != family_entry.fd_entries.end ();
++fd_entry_it) {
fd_t fd = fd_entry_it->fd; fd_t fd = fd_entry_it->fd;
// http://stackoverflow.com/q/35043420/188530 // http://stackoverflow.com/q/35043420/188530
if (FD_ISSET (fd, &family_entry.fds_set.read) && if (FD_ISSET (fd, &family_entry.fds_set.read)
FD_ISSET (fd, &family_entry.fds_set.write)) && FD_ISSET (fd, &family_entry.fds_set.write))
rc = WSAEventSelect (fd, wsa_events.events [3], rc =
FD_READ | FD_ACCEPT | FD_CLOSE | FD_WRITE | FD_CONNECT | FD_OOB); WSAEventSelect (fd, wsa_events.events [3],
FD_READ | FD_ACCEPT | FD_CLOSE
| FD_WRITE | FD_CONNECT | FD_OOB);
else if (FD_ISSET (fd, &family_entry.fds_set.read)) else if (FD_ISSET (fd, &family_entry.fds_set.read))
rc = WSAEventSelect (fd, wsa_events.events [0], rc = WSAEventSelect (fd, wsa_events.events [0],
FD_READ | FD_ACCEPT | FD_CLOSE | FD_OOB); FD_READ | FD_ACCEPT | FD_CLOSE
| FD_OOB);
else if (FD_ISSET (fd, &family_entry.fds_set.write)) else if (FD_ISSET (fd, &family_entry.fds_set.write))
rc = WSAEventSelect (fd, wsa_events.events [1], rc = WSAEventSelect (fd, wsa_events.events [1],
FD_WRITE | FD_CONNECT | FD_OOB); FD_WRITE | FD_CONNECT | FD_OOB);
else if (FD_ISSET (fd, &family_entry.fds_set.error)) else if (FD_ISSET (fd, &family_entry.fds_set.error))
rc = WSAEventSelect (fd, wsa_events.events [2], rc = WSAEventSelect (fd, wsa_events.events [2], FD_OOB);
FD_OOB);
else else
rc = 0; rc = 0;
...@@ -293,8 +347,8 @@ void zmq::select_t::loop () ...@@ -293,8 +347,8 @@ void zmq::select_t::loop ()
} }
rc = WSAWaitForMultipleEvents (4, wsa_events.events, FALSE, rc = WSAWaitForMultipleEvents (4, wsa_events.events, FALSE,
timeout ? timeout : INFINITE, FALSE); timeout ? timeout : INFINITE, FALSE);
wsa_assert (rc != (int)WSA_WAIT_FAILED); wsa_assert (rc != (int) WSA_WAIT_FAILED);
zmq_assert (rc != WSA_WAIT_IO_COMPLETION); zmq_assert (rc != WSA_WAIT_IO_COMPLETION);
if (rc == WSA_WAIT_TIMEOUT) if (rc == WSA_WAIT_TIMEOUT)
...@@ -302,111 +356,63 @@ void zmq::select_t::loop () ...@@ -302,111 +356,63 @@ void zmq::select_t::loop ()
} }
for (current_family_entry_it = family_entries.begin (); for (current_family_entry_it = family_entries.begin ();
current_family_entry_it != family_entries.end (); ++current_family_entry_it) { current_family_entry_it != family_entries.end ();
family_entry_t& family_entry = current_family_entry_it->second; ++current_family_entry_it) {
family_entry_t &family_entry = current_family_entry_it->second;
// select will fail when run with empty sets.
if (family_entry.fd_entries.empty ())
continue;
fds_set_t local_fds_set = family_entry.fds_set; if (use_wsa_events) {
if (family_entries.size () > 1) {
// There is no reason to wait again after WSAWaitForMultipleEvents. // There is no reason to wait again after WSAWaitForMultipleEvents.
// Simply collect what is ready. // Simply collect what is ready.
struct timeval tv_nodelay = { 0, 0 }; struct timeval tv_nodelay = {0, 0};
rc = select (0, &local_fds_set.read, &local_fds_set.write, &local_fds_set.error, select_family_entry (family_entry, 0, true, tv_nodelay);
&tv_nodelay); } else {
} select_family_entry (family_entry, 0, timeout > 0, tv);
else
rc = select (0, &local_fds_set.read, &local_fds_set.write,
&local_fds_set.error, timeout > 0 ? &tv : NULL);
wsa_assert (rc != SOCKET_ERROR);
// Size is cached to avoid iteration through recently added descriptors.
for (fd_entries_t::size_type i = 0, size = family_entry.fd_entries.size (); i < size && rc > 0; ++i) {
if (family_entry.fd_entries[i].fd == retired_fd)
continue;
if (FD_ISSET(family_entry.fd_entries[i].fd, &local_fds_set.read)) {
family_entry.fd_entries[i].events->in_event();
--rc;
}
if (family_entry.fd_entries[i].fd == retired_fd || rc == 0)
continue;
if (FD_ISSET(family_entry.fd_entries[i].fd, &local_fds_set.write)) {
family_entry.fd_entries[i].events->out_event();
--rc;
}
if (family_entry.fd_entries[i].fd == retired_fd || rc == 0)
continue;
if (FD_ISSET(family_entry.fd_entries[i].fd, &local_fds_set.error)) {
family_entry.fd_entries[i].events->in_event();
--rc;
}
}
if (family_entry.retired) {
family_entry.retired = false;
family_entry.fd_entries.erase (std::remove_if (family_entry.fd_entries.begin (),
family_entry.fd_entries.end (), is_retired_fd), family_entry.fd_entries.end ());
} }
} }
#else #else
fds_set_t local_fds_set = fds_set; select_family_entry (family_entry, maxfd, timeout > 0, tv);
rc = select (maxfd + 1, &local_fds_set.read, &local_fds_set.write, #endif
&local_fds_set.error, timeout ? &tv : NULL); }
}
if (rc == -1) {
errno_assert (errno == EINTR);
continue;
}
// Size is cached to avoid iteration through just added descriptors.
for (fd_entries_t::size_type i = 0, size = fd_entries.size (); i < size && rc > 0; ++i) {
if (fd_entries [i].fd == retired_fd)
continue;
if (FD_ISSET (fd_entries [i].fd, &local_fds_set.read)) {
fd_entries [i].events->in_event ();
--rc;
}
if (fd_entries [i].fd == retired_fd || rc == 0) void zmq::select_t::select_family_entry (family_entry_t &family_entry_,
continue; const int max_fd_,
const bool use_timeout_,
struct timeval &tv_)
{
// select will fail when run with empty sets.
fd_entries_t &fd_entries = family_entry_.fd_entries;
if (fd_entries.empty ())
return;
if (FD_ISSET (fd_entries [i].fd, &local_fds_set.write)) { fds_set_t local_fds_set = family_entry_.fds_set;
fd_entries [i].events->out_event (); int rc = select (max_fd_, &local_fds_set.read, &local_fds_set.write,
--rc; &local_fds_set.error, use_timeout_ ? &tv_ : NULL);
}
if (fd_entries [i].fd == retired_fd || rc == 0) #if defined ZMQ_HAVE_WINDOWS
continue; wsa_assert (rc != SOCKET_ERROR);
#else
if (rc == -1) {
errno_assert (errno == EINTR);
return;
}
#endif
if (FD_ISSET (fd_entries [i].fd, &local_fds_set.error)) { trigger_events (fd_entries, local_fds_set, rc);
fd_entries [i].events->in_event ();
--rc;
}
}
if (retired) { if (family_entry_.retired) {
retired = false; family_entry_.retired = false;
fd_entries.erase (std::remove_if (fd_entries.begin (), fd_entries.end (), family_entry_.fd_entries.erase (std::remove_if (fd_entries.begin (),
is_retired_fd), fd_entries.end ()); fd_entries.end (),
} is_retired_fd),
#endif family_entry_.fd_entries.end ());
} }
} }
void zmq::select_t::worker_routine (void *arg_) void zmq::select_t::worker_routine (void *arg_)
{ {
((select_t*) arg_)->loop (); ((select_t *) arg_)->loop ();
} }
zmq::select_t::fds_set_t::fds_set_t () zmq::select_t::fds_set_t::fds_set_t ()
...@@ -416,16 +422,22 @@ zmq::select_t::fds_set_t::fds_set_t () ...@@ -416,16 +422,22 @@ zmq::select_t::fds_set_t::fds_set_t ()
FD_ZERO (&error); FD_ZERO (&error);
} }
zmq::select_t::fds_set_t::fds_set_t (const fds_set_t& other_) zmq::select_t::fds_set_t::fds_set_t (const fds_set_t &other_)
{ {
#if defined ZMQ_HAVE_WINDOWS #if defined ZMQ_HAVE_WINDOWS
// On Windows we don't need to copy the whole fd_set. // On Windows we don't need to copy the whole fd_set.
// SOCKETS are continuous from the beginning of fd_array in fd_set. // SOCKETS are continuous from the beginning of fd_array in fd_set.
// We just need to copy fd_count elements of fd_array. // We just need to copy fd_count elements of fd_array.
// We gain huge memcpy() improvement if number of used SOCKETs is much lower than FD_SETSIZE. // We gain huge memcpy() improvement if number of used SOCKETs is much lower than FD_SETSIZE.
memcpy (&read, &other_.read, (char *) (other_.read.fd_array + other_.read.fd_count ) - (char *) &other_.read ); memcpy (&read, &other_.read,
memcpy (&write, &other_.write, (char *) (other_.write.fd_array + other_.write.fd_count) - (char *) &other_.write); (char *) (other_.read.fd_array + other_.read.fd_count)
memcpy (&error, &other_.error, (char *) (other_.error.fd_array + other_.error.fd_count) - (char *) &other_.error); - (char *) &other_.read);
memcpy (&write, &other_.write,
(char *) (other_.write.fd_array + other_.write.fd_count)
- (char *) &other_.write);
memcpy (&error, &other_.error,
(char *) (other_.error.fd_array + other_.error.fd_count)
- (char *) &other_.error);
#else #else
memcpy (&read, &other_.read, sizeof other_.read); memcpy (&read, &other_.read, sizeof other_.read);
memcpy (&write, &other_.write, sizeof other_.write); memcpy (&write, &other_.write, sizeof other_.write);
...@@ -433,16 +445,23 @@ zmq::select_t::fds_set_t::fds_set_t (const fds_set_t& other_) ...@@ -433,16 +445,23 @@ zmq::select_t::fds_set_t::fds_set_t (const fds_set_t& other_)
#endif #endif
} }
zmq::select_t::fds_set_t& zmq::select_t::fds_set_t::operator= (const fds_set_t& other_) zmq::select_t::fds_set_t &zmq::select_t::fds_set_t::
operator= (const fds_set_t &other_)
{ {
#if defined ZMQ_HAVE_WINDOWS #if defined ZMQ_HAVE_WINDOWS
// On Windows we don't need to copy the whole fd_set. // On Windows we don't need to copy the whole fd_set.
// SOCKETS are continuous from the beginning of fd_array in fd_set. // SOCKETS are continuous from the beginning of fd_array in fd_set.
// We just need to copy fd_count elements of fd_array. // We just need to copy fd_count elements of fd_array.
// We gain huge memcpy() improvement if number of used SOCKETs is much lower than FD_SETSIZE. // We gain huge memcpy() improvement if number of used SOCKETs is much lower than FD_SETSIZE.
memcpy (&read, &other_.read, (char *) (other_.read.fd_array + other_.read.fd_count ) - (char *) &other_.read ); memcpy (&read, &other_.read,
memcpy (&write, &other_.write, (char *) (other_.write.fd_array + other_.write.fd_count) - (char *) &other_.write); (char *) (other_.read.fd_array + other_.read.fd_count)
memcpy (&error, &other_.error, (char *) (other_.error.fd_array + other_.error.fd_count) - (char *) &other_.error); - (char *) &other_.read);
memcpy (&write, &other_.write,
(char *) (other_.write.fd_array + other_.write.fd_count)
- (char *) &other_.write);
memcpy (&error, &other_.error,
(char *) (other_.error.fd_array + other_.error.fd_count)
- (char *) &other_.error);
#else #else
memcpy (&read, &other_.read, sizeof other_.read); memcpy (&read, &other_.read, sizeof other_.read);
memcpy (&write, &other_.write, sizeof other_.write); memcpy (&write, &other_.write, sizeof other_.write);
...@@ -451,7 +470,7 @@ zmq::select_t::fds_set_t& zmq::select_t::fds_set_t::operator= (const fds_set_t& ...@@ -451,7 +470,7 @@ zmq::select_t::fds_set_t& zmq::select_t::fds_set_t::operator= (const fds_set_t&
return *this; return *this;
} }
void zmq::select_t::fds_set_t::remove_fd (const fd_t& fd_) void zmq::select_t::fds_set_t::remove_fd (const fd_t &fd_)
{ {
FD_CLR (fd_, &read); FD_CLR (fd_, &read);
FD_CLR (fd_, &write); FD_CLR (fd_, &write);
...@@ -466,20 +485,48 @@ bool zmq::select_t::is_retired_fd (const fd_entry_t &entry) ...@@ -466,20 +485,48 @@ bool zmq::select_t::is_retired_fd (const fd_entry_t &entry)
#if defined ZMQ_HAVE_WINDOWS #if defined ZMQ_HAVE_WINDOWS
u_short zmq::select_t::get_fd_family (fd_t fd_) u_short zmq::select_t::get_fd_family (fd_t fd_)
{ {
// Use sockaddr_storage instead of sockaddr to accomodate differect structure sizes // cache the results of determine_fd_family, as this is frequently called
sockaddr_storage addr = { 0 }; // for the same sockets, and determine_fd_family is expensive
size_t i;
for (i = 0; i < fd_family_cache_size; ++i) {
const std::pair<fd_t, u_short> &entry = fd_family_cache [i];
if (entry.first == fd_) {
return entry.second;
}
if (entry.first == retired_fd)
break;
}
std::pair<fd_t, u_short> res =
std::make_pair (fd_, determine_fd_family (fd_));
if (i < fd_family_cache_size) {
fd_family_cache [i] = res;
} else {
// just overwrite a random entry
// could be optimized by some LRU strategy
fd_family_cache [rand () % fd_family_cache_size] = res;
}
return res.second;
}
u_short zmq::select_t::determine_fd_family (fd_t fd_)
{
// Use sockaddr_storage instead of sockaddr to accommodate different structure sizes
sockaddr_storage addr = {0};
int addr_size = sizeof addr; int addr_size = sizeof addr;
int type; int type;
int type_length = sizeof(int); int type_length = sizeof (int);
int rc = getsockopt(fd_, SOL_SOCKET, SO_TYPE, (char*) &type, &type_length); int rc =
getsockopt (fd_, SOL_SOCKET, SO_TYPE, (char *) &type, &type_length);
if (rc == 0) { if (rc == 0) {
if (type == SOCK_DGRAM) if (type == SOCK_DGRAM)
return AF_INET; return AF_INET;
else { else {
rc = getsockname(fd_, (sockaddr *)&addr, &addr_size); rc = getsockname (fd_, (sockaddr *) &addr, &addr_size);
// AF_INET and AF_INET6 can be mixed in select // AF_INET and AF_INET6 can be mixed in select
// TODO: If proven otherwise, should simply return addr.sa_family // TODO: If proven otherwise, should simply return addr.sa_family
...@@ -491,8 +538,7 @@ u_short zmq::select_t::get_fd_family (fd_t fd_) ...@@ -491,8 +538,7 @@ u_short zmq::select_t::get_fd_family (fd_t fd_)
return AF_UNSPEC; return AF_UNSPEC;
} }
zmq::select_t::family_entry_t::family_entry_t () : zmq::select_t::family_entry_t::family_entry_t () : retired (false)
retired (false)
{ {
} }
......
...@@ -53,122 +53,130 @@ ...@@ -53,122 +53,130 @@
namespace zmq namespace zmq
{ {
struct i_poll_events;
struct i_poll_events; // Implements socket polling mechanism using POSIX.1-2001 select()
// function.
// Implements socket polling mechanism using POSIX.1-2001 select() class select_t : public poller_base_t
// function. {
public:
typedef fd_t handle_t;
class select_t : public poller_base_t select_t (const ctx_t &ctx_);
{ ~select_t ();
public:
typedef fd_t handle_t; // "poller" concept.
handle_t add_fd (fd_t fd_, zmq::i_poll_events *events_);
void rm_fd (handle_t handle_);
void set_pollin (handle_t handle_);
void reset_pollin (handle_t handle_);
void set_pollout (handle_t handle_);
void reset_pollout (handle_t handle_);
void start ();
void stop ();
select_t (const ctx_t &ctx_); static int max_fds ();
~select_t ();
// "poller" concept. private:
handle_t add_fd (fd_t fd_, zmq::i_poll_events *events_); // Main worker thread routine.
void rm_fd (handle_t handle_); static void worker_routine (void *arg_);
void set_pollin (handle_t handle_);
void reset_pollin (handle_t handle_);
void set_pollout (handle_t handle_);
void reset_pollout (handle_t handle_);
void start ();
void stop ();
static int max_fds (); // Main event loop.
void loop ();
private: // Reference to ZMQ context.
const ctx_t &ctx;
// Main worker thread routine. // Internal state.
static void worker_routine (void *arg_); struct fds_set_t
{
fds_set_t ();
fds_set_t (const fds_set_t &other_);
fds_set_t &operator= (const fds_set_t &other_);
// Convenience method to descriptor from all sets.
void remove_fd (const fd_t &fd_);
fd_set read;
fd_set write;
fd_set error;
};
// Main event loop. struct fd_entry_t
void loop (); {
fd_t fd;
zmq::i_poll_events *events;
};
typedef std::vector<fd_entry_t> fd_entries_t;
// Reference to ZMQ context. void trigger_events (const fd_entries_t &fd_entries_,
const ctx_t &ctx; const fds_set_t &local_fds_set_,
int event_count_);
// Internal state. struct family_entry_t
struct fds_set_t {
{ family_entry_t ();
fds_set_t ();
fds_set_t (const fds_set_t& other_);
fds_set_t& operator=(const fds_set_t& other_);
// Convinient method to descriptor from all sets.
void remove_fd (const fd_t& fd_);
fd_set read; fd_entries_t fd_entries;
fd_set write; fds_set_t fds_set;
fd_set error; bool retired;
}; };
struct fd_entry_t void select_family_entry (family_entry_t &family_entry_,
{ int max_fd_,
fd_t fd; bool use_timeout_,
zmq::i_poll_events* events; struct timeval &tv_);
};
typedef std::vector<fd_entry_t> fd_entries_t;
#if defined ZMQ_HAVE_WINDOWS #if defined ZMQ_HAVE_WINDOWS
struct family_entry_t typedef std::map<u_short, family_entry_t> family_entries_t;
{
family_entry_t ();
fd_entries_t fd_entries;
fds_set_t fds_set;
bool retired;
};
typedef std::map<u_short, family_entry_t> family_entries_t;
struct wsa_events_t
{
wsa_events_t ();
~wsa_events_t ();
// read, write, error and readwrite
WSAEVENT events [4];
};
#endif
#if defined ZMQ_HAVE_WINDOWS struct wsa_events_t
family_entries_t family_entries; {
// See loop for details. wsa_events_t ();
family_entries_t::iterator current_family_entry_it; ~wsa_events_t ();
bool try_remove_fd_entry (family_entries_t::iterator family_entry_it, // read, write, error and readwrite
zmq::fd_t &handle_); WSAEVENT events [4];
#else };
fd_entries_t fd_entries;
fds_set_t fds_set;
fd_t maxfd;
bool retired;
#endif
#if defined ZMQ_HAVE_WINDOWS family_entries_t family_entries;
// Socket's family or AF_UNSPEC on error. // See loop for details.
static u_short get_fd_family (fd_t fd_); family_entries_t::iterator current_family_entry_it;
bool try_remove_fd_entry (family_entries_t::iterator family_entry_it,
zmq::fd_t &handle_);
static const size_t fd_family_cache_size = 8;
std::pair<fd_t, u_short> fd_family_cache [fd_family_cache_size];
u_short get_fd_family (fd_t fd_);
// Socket's family or AF_UNSPEC on error.
static u_short determine_fd_family (fd_t fd_);
#else
// on non-Windows, we can treat all fds as one family
family_entry_t family_entry;
fd_t maxfd;
bool retired;
#endif #endif
// Checks if an fd_entry_t is retired.
static bool is_retired_fd (const fd_entry_t &entry);
static fd_entries_t::iterator // Checks if an fd_entry_t is retired.
find_fd_entry_by_handle (fd_entries_t &fd_entries, handle_t handle_); static bool is_retired_fd (const fd_entry_t &entry);
// If true, thread is shutting down. static fd_entries_t::iterator
bool stopping; find_fd_entry_by_handle (fd_entries_t &fd_entries, handle_t handle_);
// Handle of the physical thread doing the I/O work. // If true, thread is shutting down.
thread_t worker; bool stopping;
select_t (const select_t&); // Handle of the physical thread doing the I/O work.
const select_t &operator = (const select_t&); thread_t worker;
};
typedef select_t poller_t; select_t (const select_t &);
const select_t &operator= (const select_t &);
};
typedef select_t poller_t;
} }
#endif #endif
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment