select.cpp 20.3 KB
Newer Older
Martin Sustrik's avatar
Martin Sustrik committed
1
/*
2
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
Martin Sustrik's avatar
Martin Sustrik committed
3

4
    This file is part of libzmq, the ZeroMQ core engine in C++.
Martin Sustrik's avatar
Martin Sustrik committed
5

6 7 8
    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
Martin Sustrik's avatar
Martin Sustrik committed
9 10
    (at your option) any later version.

11 12 13 14 15 16 17 18 19 20 21 22 23 24
    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.
Martin Sustrik's avatar
Martin Sustrik committed
25

26
    You should have received a copy of the GNU Lesser General Public License
Martin Sustrik's avatar
Martin Sustrik committed
27 28 29
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include "precompiled.hpp"
31
#include "select.hpp"
32
#if defined ZMQ_IOTHREAD_POLLER_USE_SELECT
33

34
#if defined ZMQ_HAVE_WINDOWS
Martin Sustrik's avatar
Martin Sustrik committed
35
#elif defined ZMQ_HAVE_HPUX
Martin Sustrik's avatar
Martin Sustrik committed
36 37 38
#include <sys/param.h>
#include <sys/types.h>
#include <sys/time.h>
Martin Sustrik's avatar
Martin Sustrik committed
39
#elif defined ZMQ_HAVE_OPENVMS
Martin Sustrik's avatar
Martin Sustrik committed
40 41
#include <sys/types.h>
#include <sys/time.h>
42 43 44 45
#elif defined ZMQ_HAVE_VXWORKS
#include <sys/types.h>
#include <sys/time.h>
#include <strings.h>
Martin Sustrik's avatar
Martin Sustrik committed
46 47 48 49 50 51 52 53
#else
#include <sys/select.h>
#endif

#include "err.hpp"
#include "config.hpp"
#include "i_poll_events.hpp"

54
#include <algorithm>
55 56
#include <limits>
#include <climits>
57

58
//  Constructs the poller on top of worker_poller_base_t.
//  On Windows the "currently iterated family" iterator starts out as the
//  end iterator of the family map and every slot of the fd-family cache is
//  marked unused (retired_fd). On other platforms the highest known
//  descriptor starts out as retired_fd, i.e. "none registered yet".
zmq::select_t::select_t (const zmq::thread_ctx_t &ctx_) :
    worker_poller_base_t (ctx_),
#if defined ZMQ_HAVE_WINDOWS
    //  Fine as long as map is not cleared.
    _current_family_entry_it (_family_entries.end ())
#else
    _max_fd (retired_fd)
#endif
{
#if defined ZMQ_HAVE_WINDOWS
    //  An entry whose fd is retired_fd denotes a free cache slot.
    for (size_t slot = 0; slot < fd_family_cache_size; ++slot) {
        _fd_family_cache[slot] = std::make_pair (retired_fd, 0);
    }
#endif
}

73 74
//  Destructor: asks the base class to stop the worker before the poller
//  state is destroyed.
zmq::select_t::~select_t ()
{
    stop_worker ();
}

78
//  Registers descriptor fd_ with the poller and associates it with the
//  event sink events_. The descriptor is initially only watched for errors;
//  read/write interest is enabled via set_pollin/set_pollout.
//
//  fd_     descriptor to register; must not be retired_fd.
//  events_ sink whose in_event/out_event callbacks are invoked from
//          trigger_events.
//  Returns the handle identifying the registration (the descriptor itself).
zmq::select_t::handle_t zmq::select_t::add_fd (fd_t fd_, i_poll_events *events_)
{
    check_thread ();
    zmq_assert (fd_ != retired_fd);

#if !defined ZMQ_HAVE_WINDOWS
    //  FD_SET on a descriptor that does not fit into an fd_set is undefined
    //  behaviour (it writes past the end of the bit array), so fail loudly
    //  instead of silently corrupting memory.
    zmq_assert (fd_ < static_cast<fd_t> (FD_SETSIZE));
#endif

    fd_entry_t fd_entry;
    fd_entry.fd = fd_;
    fd_entry.events = events_;

#if defined ZMQ_HAVE_WINDOWS
    //  On Windows descriptors are grouped by address family, see loop ().
    const u_short family = get_fd_family (fd_);
    wsa_assert (family != AF_UNSPEC);
    family_entry_t &family_entry = _family_entries[family];
#else
    family_entry_t &family_entry = _family_entry;
#endif
    family_entry.fd_entries.push_back (fd_entry);
    FD_SET (fd_, &family_entry.fds_set.error);

#if !defined ZMQ_HAVE_WINDOWS
    //  Keep track of the highest descriptor; select () needs it as nfds.
    if (fd_ > _max_fd)
        _max_fd = fd_;
#endif

    adjust_load (1);

    return fd_;
}

107
//  Linear search for the entry whose descriptor equals handle_.
//  Returns an iterator to the first match, or fd_entries_.end () if no
//  entry matches.
zmq::select_t::fd_entries_t::iterator
zmq::select_t::find_fd_entry_by_handle (fd_entries_t &fd_entries_,
                                        handle_t handle_)
{
    for (fd_entries_t::iterator it = fd_entries_.begin ();
         it != fd_entries_.end (); ++it) {
        if (it->fd == handle_)
            return it;
    }

    return fd_entries_.end ();
}

sigiesec's avatar
sigiesec committed
120 121 122 123 124 125 126
void zmq::select_t::trigger_events (const fd_entries_t &fd_entries_,
                                    const fds_set_t &local_fds_set_,
                                    int event_count_)
{
    //  Size is cached to avoid iteration through recently added descriptors.
    for (fd_entries_t::size_type i = 0, size = fd_entries_.size ();
         i < size && event_count_ > 0; ++i) {
127
        //  fd_entries_[i] may not be stored, since calls to
128
        //  in_event/out_event may reallocate the vector
sigiesec's avatar
sigiesec committed
129

130
        if (is_retired_fd (fd_entries_[i]))
sigiesec's avatar
sigiesec committed
131 132
            continue;

133 134
        if (FD_ISSET (fd_entries_[i].fd, &local_fds_set_.read)) {
            fd_entries_[i].events->in_event ();
sigiesec's avatar
sigiesec committed
135 136 137 138 139 140 141
            --event_count_;
        }

        //  TODO: can the is_retired_fd be true at this point? if it
        //  was retired before, we would already have continued, and I
        //  don't see where it might have been modified
        //  And if rc == 0, we can break instead of continuing
142
        if (is_retired_fd (fd_entries_[i]) || event_count_ == 0)
sigiesec's avatar
sigiesec committed
143 144
            continue;

145 146
        if (FD_ISSET (fd_entries_[i].fd, &local_fds_set_.write)) {
            fd_entries_[i].events->out_event ();
sigiesec's avatar
sigiesec committed
147 148 149 150
            --event_count_;
        }

        //  TODO: same as above
151
        if (is_retired_fd (fd_entries_[i]) || event_count_ == 0)
sigiesec's avatar
sigiesec committed
152 153
            continue;

154 155
        if (FD_ISSET (fd_entries_[i].fd, &local_fds_set_.error)) {
            fd_entries_[i].events->in_event ();
sigiesec's avatar
sigiesec committed
156 157 158 159 160
            --event_count_;
        }
    }
}

sigiesec's avatar
sigiesec committed
161
#if defined ZMQ_HAVE_WINDOWS
162
int zmq::select_t::try_retire_fd_entry (
163
  family_entries_t::iterator family_entry_it_, zmq::fd_t &handle_)
Martin Sustrik's avatar
Martin Sustrik committed
164
{
165
    family_entry_t &family_entry = family_entry_it_->second;
166

167 168
    fd_entries_t::iterator fd_entry_it =
      find_fd_entry_by_handle (family_entry.fd_entries, handle_);
169

170
    if (fd_entry_it == family_entry.fd_entries.end ())
171
        return 0;
172 173 174 175

    fd_entry_t &fd_entry = *fd_entry_it;
    zmq_assert (fd_entry.fd != retired_fd);

176
    if (family_entry_it_ != _current_family_entry_it) {
177
        //  Family is not currently being iterated and can be safely
178 179
        //  modified in-place. So later it can be skipped without
        //  re-verifying its content.
180 181 182 183
        family_entry.fd_entries.erase (fd_entry_it);
    } else {
        //  Otherwise mark removed entries as retired. It will be cleaned up
        //  at the end of the iteration. See zmq::select_t::loop
184
        fd_entry.fd = retired_fd;
185
        family_entry.has_retired = true;
186
    }
187
    family_entry.fds_set.remove_fd (handle_);
188
    return 1;
189
}
sigiesec's avatar
sigiesec committed
190
#endif
191 192 193

//  Unregisters the descriptor identified by handle_.
//
//  The descriptor is removed from all three fd sets. Its entry is either
//  erased right away or marked as retired (fd == retired_fd) and erased
//  later by cleanup_retired (). Exactly one entry must be affected,
//  otherwise the call aborts via zmq_assert.
void zmq::select_t::rm_fd (handle_t handle_)
{
    check_thread ();
    int retired = 0;
#if defined ZMQ_HAVE_WINDOWS
    const u_short family = get_fd_family (handle_);
    if (family != AF_UNSPEC) {
        const family_entries_t::iterator family_entry_it =
          _family_entries.find (family);

        //  try_retire_fd_entry dereferences the iterator, so never hand it
        //  the end iterator. If the family is unknown nothing is retired
        //  and the assertion at the end of this function reports it.
        if (family_entry_it != _family_entries.end ())
            retired += try_retire_fd_entry (family_entry_it, handle_);
    } else {
        //  get_fd_family may fail and return AF_UNSPEC if the socket was not
        //  successfully connected. In that case, we need to look for the
        //  socket in all family_entries.
        const family_entries_t::iterator end = _family_entries.end ();
        for (family_entries_t::iterator family_entry_it =
               _family_entries.begin ();
             family_entry_it != end; ++family_entry_it) {
            retired += try_retire_fd_entry (family_entry_it, handle_);
            if (retired != 0)
                break;
        }
    }
#else
    fd_entries_t::iterator fd_entry_it =
      find_fd_entry_by_handle (_family_entry.fd_entries, handle_);
    //  zmq_assert rather than assert: the iterator is dereferenced right
    //  below, so the check must not depend on NDEBUG.
    zmq_assert (fd_entry_it != _family_entry.fd_entries.end ());

    //  Only mark the entry as retired; it is erased by cleanup_retired ().
    zmq_assert (fd_entry_it->fd != retired_fd);
    fd_entry_it->fd = retired_fd;
    _family_entry.fds_set.remove_fd (handle_);

    ++retired;

    //  If the highest descriptor was removed, recompute the maximum from
    //  the remaining entries (retired entries never exceed retired_fd).
    if (handle_ == _max_fd) {
        _max_fd = retired_fd;
        for (fd_entry_it = _family_entry.fd_entries.begin ();
             fd_entry_it != _family_entry.fd_entries.end (); ++fd_entry_it)
            if (fd_entry_it->fd > _max_fd)
                _max_fd = fd_entry_it->fd;
    }

    _family_entry.has_retired = true;
#endif
    zmq_assert (retired == 1);
    adjust_load (-1);
}

Martin Sustrik's avatar
Martin Sustrik committed
241
//  Starts watching handle_ for readability by adding it to the read set.
void zmq::select_t::set_pollin (handle_t handle_)
{
    check_thread ();
#if defined ZMQ_HAVE_WINDOWS
    //  Each address family has its own fd sets on Windows.
    const u_short fd_family = get_fd_family (handle_);
    wsa_assert (fd_family != AF_UNSPEC);
    fds_set_t &watched = _family_entries[fd_family].fds_set;
#else
    fds_set_t &watched = _family_entry.fds_set;
#endif
    FD_SET (handle_, &watched.read);
}

Martin Sustrik's avatar
Martin Sustrik committed
254
//  Stops watching handle_ for readability by clearing it from the read set.
void zmq::select_t::reset_pollin (handle_t handle_)
{
    check_thread ();
#if defined ZMQ_HAVE_WINDOWS
    //  Each address family has its own fd sets on Windows.
    const u_short fd_family = get_fd_family (handle_);
    wsa_assert (fd_family != AF_UNSPEC);
    fds_set_t &watched = _family_entries[fd_family].fds_set;
#else
    fds_set_t &watched = _family_entry.fds_set;
#endif
    FD_CLR (handle_, &watched.read);
}

Martin Sustrik's avatar
Martin Sustrik committed
267
//  Starts watching handle_ for writability by adding it to the write set.
void zmq::select_t::set_pollout (handle_t handle_)
{
    check_thread ();
#if defined ZMQ_HAVE_WINDOWS
    //  Each address family has its own fd sets on Windows.
    const u_short fd_family = get_fd_family (handle_);
    wsa_assert (fd_family != AF_UNSPEC);
    fds_set_t &watched = _family_entries[fd_family].fds_set;
#else
    fds_set_t &watched = _family_entry.fds_set;
#endif
    FD_SET (handle_, &watched.write);
}

Martin Sustrik's avatar
Martin Sustrik committed
280
//  Stops watching handle_ for writability by clearing it from the write set.
void zmq::select_t::reset_pollout (handle_t handle_)
{
    check_thread ();
#if defined ZMQ_HAVE_WINDOWS
    //  Each address family has its own fd sets on Windows.
    const u_short fd_family = get_fd_family (handle_);
    wsa_assert (fd_family != AF_UNSPEC);
    fds_set_t &watched = _family_entries[fd_family].fds_set;
#else
    fds_set_t &watched = _family_entry.fds_set;
#endif
    FD_CLR (handle_, &watched.write);
}

Martin Sustrik's avatar
Martin Sustrik committed
293
void zmq::select_t::stop ()
Martin Sustrik's avatar
Martin Sustrik committed
294
{
295 296
    check_thread ();
    //  no-op... thread is stopped when no more fds or timers are registered
Martin Sustrik's avatar
Martin Sustrik committed
297 298
}

Richard Newton's avatar
Richard Newton committed
299
//  Returns the capacity of an fd_set, i.e. FD_SETSIZE.
int zmq::select_t::max_fds ()
{
    return FD_SETSIZE;
}

Martin Sustrik's avatar
Martin Sustrik committed
304
void zmq::select_t::loop ()
Martin Sustrik's avatar
Martin Sustrik committed
305
{
306
    while (true) {
307
        //  Execute any due timers.
308
        int timeout = static_cast<int> (execute_timers ());
309 310 311 312

        cleanup_retired ();

#ifdef _WIN32
313
        if (_family_entries.empty ()) {
314
#else
315
        if (_family_entry.fd_entries.empty ()) {
316 317 318 319 320 321 322 323 324
#endif
            zmq_assert (get_load () == 0);

            if (timeout == 0)
                break;

            // TODO sleep for timeout
            continue;
        }
325

326
#if defined ZMQ_HAVE_OSX
sigiesec's avatar
sigiesec committed
327
        struct timeval tv = {(long) (timeout / 1000), timeout % 1000 * 1000};
328
#else
329 330
        struct timeval tv = {static_cast<long> (timeout / 1000),
                             static_cast<long> (timeout % 1000 * 1000)};
331
#endif
332 333 334 335 336 337 338 339 340 341 342

#if defined ZMQ_HAVE_WINDOWS
        /*
            On Windows select does not allow to mix descriptors from different
            service providers. It seems to work for AF_INET and AF_INET6,
            but fails for AF_INET and VMCI. The workaround is to use
            WSAEventSelect and WSAWaitForMultipleEvents to wait, then use
            select to find out what actually changed. WSAWaitForMultipleEvents
            cannot be used alone, because it does not support more than 64 events
            which is not enough.

sigiesec's avatar
sigiesec committed
343
            To reduce unnecessary overhead, WSA is only used when there are more
344 345 346 347 348 349
            than one family. Moreover, AF_INET and AF_INET6 are considered the same
            family because Windows seems to handle them properly.
            See get_fd_family for details.
        */

        //  If there is just one family, there is no reason to use WSA events.
350
        int rc = 0;
351
        const bool use_wsa_events = _family_entries.size () > 1;
sigiesec's avatar
sigiesec committed
352 353 354 355 356 357 358 359
        if (use_wsa_events) {
            // TODO: I don't really understand why we are doing this. If any of
            // the events was signaled, we will call select for each fd_family
            // afterwards. The only benefit is if none of the events was
            // signaled, then we continue early.
            // IMHO, either WSAEventSelect/WSAWaitForMultipleEvents or select
            // should be used, but not both

360 361
            wsa_events_t wsa_events;

sigiesec's avatar
sigiesec committed
362
            for (family_entries_t::iterator family_entry_it =
363 364
                   _family_entries.begin ();
                 family_entry_it != _family_entries.end (); ++family_entry_it) {
sigiesec's avatar
sigiesec committed
365
                family_entry_t &family_entry = family_entry_it->second;
366

sigiesec's avatar
sigiesec committed
367 368 369 370
                for (fd_entries_t::iterator fd_entry_it =
                       family_entry.fd_entries.begin ();
                     fd_entry_it != family_entry.fd_entries.end ();
                     ++fd_entry_it) {
371 372 373
                    fd_t fd = fd_entry_it->fd;

                    //  http://stackoverflow.com/q/35043420/188530
sigiesec's avatar
sigiesec committed
374 375 376
                    if (FD_ISSET (fd, &family_entry.fds_set.read)
                        && FD_ISSET (fd, &family_entry.fds_set.write))
                        rc =
377
                          WSAEventSelect (fd, wsa_events.events[3],
sigiesec's avatar
sigiesec committed
378 379
                                          FD_READ | FD_ACCEPT | FD_CLOSE
                                            | FD_WRITE | FD_CONNECT | FD_OOB);
380
                    else if (FD_ISSET (fd, &family_entry.fds_set.read))
381
                        rc = WSAEventSelect (fd, wsa_events.events[0],
sigiesec's avatar
sigiesec committed
382 383
                                             FD_READ | FD_ACCEPT | FD_CLOSE
                                               | FD_OOB);
384
                    else if (FD_ISSET (fd, &family_entry.fds_set.write))
385
                        rc = WSAEventSelect (fd, wsa_events.events[1],
sigiesec's avatar
sigiesec committed
386
                                             FD_WRITE | FD_CONNECT | FD_OOB);
387
                    else if (FD_ISSET (fd, &family_entry.fds_set.error))
388
                        rc = WSAEventSelect (fd, wsa_events.events[2], FD_OOB);
389 390 391 392 393 394 395 396
                    else
                        rc = 0;

                    wsa_assert (rc != SOCKET_ERROR);
                }
            }

            rc = WSAWaitForMultipleEvents (4, wsa_events.events, FALSE,
sigiesec's avatar
sigiesec committed
397 398
                                           timeout ? timeout : INFINITE, FALSE);
            wsa_assert (rc != (int) WSA_WAIT_FAILED);
399 400 401 402 403 404
            zmq_assert (rc != WSA_WAIT_IO_COMPLETION);

            if (rc == WSA_WAIT_TIMEOUT)
                continue;
        }

405 406 407 408
        for (_current_family_entry_it = _family_entries.begin ();
             _current_family_entry_it != _family_entries.end ();
             ++_current_family_entry_it) {
            family_entry_t &family_entry = _current_family_entry_it->second;
409 410


sigiesec's avatar
sigiesec committed
411
            if (use_wsa_events) {
412 413
                //  There is no reason to wait again after WSAWaitForMultipleEvents.
                //  Simply collect what is ready.
sigiesec's avatar
sigiesec committed
414
                struct timeval tv_nodelay = {0, 0};
sigiesec's avatar
sigiesec committed
415 416 417
                select_family_entry (family_entry, 0, true, tv_nodelay);
            } else {
                select_family_entry (family_entry, 0, timeout > 0, tv);
418 419
            }
        }
Martin Sustrik's avatar
Martin Sustrik committed
420
#else
421
        select_family_entry (_family_entry, _max_fd + 1, timeout > 0, tv);
sigiesec's avatar
sigiesec committed
422 423 424
#endif
    }
}
425

sigiesec's avatar
sigiesec committed
426 427 428 429 430 431 432 433 434
//  Runs select () on the descriptors of one family and dispatches the
//  resulting events.
//
//  family_entry_ family whose descriptors are polled.
//  max_fd_       value passed to select () as its first argument.
//  use_timeout_  if false, select () is called without a timeout.
//  tv_           timeout passed to select () when use_timeout_ is true.
void zmq::select_t::select_family_entry (family_entry_t &family_entry_,
                                         const int max_fd_,
                                         const bool use_timeout_,
                                         struct timeval &tv_)
{
    fd_entries_t &fd_entries = family_entry_.fd_entries;

    //  select will fail when run with empty sets.
    if (fd_entries.empty ())
        return;

    //  select () overwrites the sets it is given, so work on a copy.
    fds_set_t local_fds_set (family_entry_.fds_set);
    struct timeval *const timeout_ptr = use_timeout_ ? &tv_ : NULL;

    const int rc = select (max_fd_, &local_fds_set.read, &local_fds_set.write,
                           &local_fds_set.error, timeout_ptr);

#if defined ZMQ_HAVE_WINDOWS
    wsa_assert (rc != SOCKET_ERROR);
#else
    //  Being interrupted by a signal is the only tolerated failure.
    if (rc == -1) {
        errno_assert (errno == EINTR);
        return;
    }
#endif

    trigger_events (fd_entries, local_fds_set, rc);

    //  Erase entries that were retired by the callbacks above.
    cleanup_retired (family_entry_);
}
453

454 455 456 457 458 459 460
//  Creates three empty descriptor sets (read, write, error).
zmq::select_t::fds_set_t::fds_set_t ()
{
    FD_ZERO (&error);
    FD_ZERO (&write);
    FD_ZERO (&read);
}

sigiesec's avatar
sigiesec committed
461
//  Copy constructor: duplicates the read, write and error sets of other_.
zmq::select_t::fds_set_t::fds_set_t (const fds_set_t &other_)
{
#if defined ZMQ_HAVE_WINDOWS
    //  On Windows we don't need to copy the whole fd_set.
    //  SOCKETs are stored contiguously from the beginning of fd_array in
    //  fd_set, so copying everything up to and including the first fd_count
    //  elements of fd_array is sufficient. This makes the memcpy () much
    //  cheaper if the number of used SOCKETs is much lower than FD_SETSIZE.
    const char *const read_begin =
      reinterpret_cast<const char *> (&other_.read);
    const char *const read_end = reinterpret_cast<const char *> (
      other_.read.fd_array + other_.read.fd_count);
    memcpy (&read, read_begin, read_end - read_begin);

    const char *const write_begin =
      reinterpret_cast<const char *> (&other_.write);
    const char *const write_end = reinterpret_cast<const char *> (
      other_.write.fd_array + other_.write.fd_count);
    memcpy (&write, write_begin, write_end - write_begin);

    const char *const error_begin =
      reinterpret_cast<const char *> (&other_.error);
    const char *const error_end = reinterpret_cast<const char *> (
      other_.error.fd_array + other_.error.fd_count);
    memcpy (&error, error_begin, error_end - error_begin);
#else
    //  Elsewhere the sets are copied in full.
    memcpy (&read, &other_.read, sizeof read);
    memcpy (&write, &other_.write, sizeof write);
    memcpy (&error, &other_.error, sizeof error);
#endif
}

sigiesec's avatar
sigiesec committed
484 485
//  Copy assignment: replaces the read, write and error sets with those of
//  other_ and returns *this.
zmq::select_t::fds_set_t &zmq::select_t::fds_set_t::
operator= (const fds_set_t &other_)
{
    //  memcpy () must not be called with overlapping source and
    //  destination, which is exactly what self-assignment would do.
    if (this == &other_)
        return *this;

#if defined ZMQ_HAVE_WINDOWS
    //  On Windows we don't need to copy the whole fd_set.
    //  SOCKETs are stored contiguously from the beginning of fd_array in
    //  fd_set, so copying everything up to and including the first fd_count
    //  elements of fd_array is sufficient. This makes the memcpy () much
    //  cheaper if the number of used SOCKETs is much lower than FD_SETSIZE.
    memcpy (&read, &other_.read,
            reinterpret_cast<const char *> (other_.read.fd_array
                                            + other_.read.fd_count)
              - reinterpret_cast<const char *> (&other_.read));
    memcpy (&write, &other_.write,
            reinterpret_cast<const char *> (other_.write.fd_array
                                            + other_.write.fd_count)
              - reinterpret_cast<const char *> (&other_.write));
    memcpy (&error, &other_.error,
            reinterpret_cast<const char *> (other_.error.fd_array
                                            + other_.error.fd_count)
              - reinterpret_cast<const char *> (&other_.error));
#else
    //  Elsewhere the sets are copied in full.
    memcpy (&read, &other_.read, sizeof other_.read);
    memcpy (&write, &other_.write, sizeof other_.write);
    memcpy (&error, &other_.error, sizeof other_.error);
#endif
    return *this;
}

sigiesec's avatar
sigiesec committed
509
void zmq::select_t::fds_set_t::remove_fd (const fd_t &fd_)
510 511 512 513 514 515
{
    FD_CLR (fd_, &read);
    FD_CLR (fd_, &write);
    FD_CLR (fd_, &error);
}

516 517 518 519 520 521 522 523 524 525 526 527 528 529 530
//  Erases all entries of family_entry_ that were marked as retired and
//  resets its has_retired flag.
//  Returns true if the family has no entries left afterwards.
bool zmq::select_t::cleanup_retired (family_entry_t &family_entry_)
{
    fd_entries_t &entries = family_entry_.fd_entries;

    if (!family_entry_.has_retired)
        return entries.empty ();

    family_entry_.has_retired = false;

    //  Move the surviving entries to the front, then cut off the tail.
    const fd_entries_t::iterator first_retired =
      std::remove_if (entries.begin (), entries.end (), is_retired_fd);
    entries.erase (first_retired, entries.end ());

    return entries.empty ();
}

void zmq::select_t::cleanup_retired ()
{
#ifdef _WIN32
531 532
    for (family_entries_t::iterator it = _family_entries.begin ();
         it != _family_entries.end ();) {
533
        if (cleanup_retired (it->second))
534
            it = _family_entries.erase (it);
535 536 537 538
        else
            ++it;
    }
#else
539
    cleanup_retired (_family_entry);
540 541 542
#endif
}

543
bool zmq::select_t::is_retired_fd (const fd_entry_t &entry_)
544
{
545
    return (entry_.fd == retired_fd);
546 547
}

548 549 550 551 552
//  A new family starts without any retired entries.
zmq::select_t::family_entry_t::family_entry_t () :
    has_retired (false)
{
}


553 554 555
#if defined ZMQ_HAVE_WINDOWS
//  Returns the address family of fd_ as reported by determine_fd_family.
//  Results are cached, as this is frequently called for the same sockets
//  and determine_fd_family is expensive.
u_short zmq::select_t::get_fd_family (fd_t fd_)
{
    //  Scan the cache. The scan stops at the first unused slot (marked by
    //  retired_fd), which is then the slot a new result is stored in.
    size_t slot = 0;
    while (slot < fd_family_cache_size) {
        const std::pair<fd_t, u_short> &cached = _fd_family_cache[slot];
        if (cached.first == fd_)
            return cached.second;
        if (cached.first == retired_fd)
            break;
        ++slot;
    }

    const u_short family = determine_fd_family (fd_);

    if (slot >= fd_family_cache_size) {
        //  Cache is full: just overwrite a random entry.
        //  Could be optimized by some LRU strategy.
        slot = rand () % fd_family_cache_size;
    }
    _fd_family_cache[slot] = std::make_pair (fd_, family);

    return family;
}

//  Queries the socket fd_ for its address family.
//  Returns AF_INET for datagram sockets and for AF_INET6 sockets, the
//  family reported by getsockname () for other sockets, and AF_UNSPEC if
//  either the socket type or the socket name cannot be obtained.
u_short zmq::select_t::determine_fd_family (fd_t fd_)
{
    int type;
    int type_length = sizeof (int);

    if (getsockopt (fd_, SOL_SOCKET, SO_TYPE,
                    reinterpret_cast<char *> (&type), &type_length)
        != 0)
        return AF_UNSPEC;

    if (type == SOCK_DGRAM)
        return AF_INET;

    //  Use sockaddr_storage instead of sockaddr to accommodate different structure sizes
    sockaddr_storage addr = {0};
    int addr_size = sizeof addr;

    const int rc =
      getsockname (fd_, reinterpret_cast<sockaddr *> (&addr), &addr_size);
    if (rc == SOCKET_ERROR)
        return AF_UNSPEC;

    //  AF_INET and AF_INET6 can be mixed in select
    //  TODO: If proven otherwise, should simply return addr.ss_family
    if (addr.ss_family == AF_INET6)
        return AF_INET;
    return addr.ss_family;
}

//  Creates the four WSA event objects used by loop () (indices 0 to 3) and
//  asserts that each creation succeeded.
zmq::select_t::wsa_events_t::wsa_events_t ()
{
    const int event_count = 4;

    for (int idx = 0; idx < event_count; ++idx) {
        events[idx] = WSACreateEvent ();
        wsa_assert (events[idx] != WSA_INVALID_EVENT);
    }
}

//  Closes the four WSA event objects (indices 0 to 3, in that order) and
//  asserts that each close succeeded.
zmq::select_t::wsa_events_t::~wsa_events_t ()
{
    const int event_count = 4;

    for (int idx = 0; idx < event_count; ++idx) {
        wsa_assert (WSACloseEvent (events[idx]));
    }
}
#endif

630
#endif