select.cpp 20.1 KB
Newer Older
Martin Sustrik's avatar
Martin Sustrik committed
1
/*
2
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
Martin Sustrik's avatar
Martin Sustrik committed
3

4
    This file is part of libzmq, the ZeroMQ core engine in C++.
Martin Sustrik's avatar
Martin Sustrik committed
5

6 7 8
    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
Martin Sustrik's avatar
Martin Sustrik committed
9 10
    (at your option) any later version.

11 12 13 14 15 16 17 18 19 20 21 22 23 24
    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.
Martin Sustrik's avatar
Martin Sustrik committed
25

26
    You should have received a copy of the GNU Lesser General Public License
Martin Sustrik's avatar
Martin Sustrik committed
27 28 29
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include "precompiled.hpp"
31
#include "select.hpp"
32
#if defined ZMQ_IOTHREAD_POLLER_USE_SELECT
33

34
#if defined ZMQ_HAVE_WINDOWS
Martin Sustrik's avatar
Martin Sustrik committed
35
#elif defined ZMQ_HAVE_HPUX
Martin Sustrik's avatar
Martin Sustrik committed
36 37 38
#include <sys/param.h>
#include <sys/types.h>
#include <sys/time.h>
Martin Sustrik's avatar
Martin Sustrik committed
39
#elif defined ZMQ_HAVE_OPENVMS
Martin Sustrik's avatar
Martin Sustrik committed
40 41
#include <sys/types.h>
#include <sys/time.h>
42 43 44 45
#elif defined ZMQ_HAVE_VXWORKS
#include <sys/types.h>
#include <sys/time.h>
#include <strings.h>
Martin Sustrik's avatar
Martin Sustrik committed
46 47 48 49 50 51 52 53
#else
#include <sys/select.h>
#endif

#include "err.hpp"
#include "config.hpp"
#include "i_poll_events.hpp"

54
#include <algorithm>
55 56
#include <limits>
#include <climits>
57

58
zmq::select_t::select_t (const zmq::thread_ctx_t &ctx_) :
59
    worker_poller_base_t (ctx_),
60 61
#if defined ZMQ_HAVE_WINDOWS
    //  Fine as long as map is not cleared.
62
    current_family_entry_it (family_entries.end ())
63
#else
64
    maxfd (retired_fd)
65
#endif
Martin Sustrik's avatar
Martin Sustrik committed
66
{
67 68
#if defined ZMQ_HAVE_WINDOWS
    for (size_t i = 0; i < fd_family_cache_size; ++i)
69
        fd_family_cache[i] = std::make_pair (retired_fd, 0);
70
#endif
Martin Sustrik's avatar
Martin Sustrik committed
71 72
}

73 74
zmq::select_t::~select_t ()
{
75
    stop_worker ();
76 77
}

78
zmq::select_t::handle_t zmq::select_t::add_fd (fd_t fd_, i_poll_events *events_)
Martin Sustrik's avatar
Martin Sustrik committed
79
{
80
    check_thread ();
81 82
    zmq_assert (fd_ != retired_fd);

83 84 85
    fd_entry_t fd_entry;
    fd_entry.fd = fd_;
    fd_entry.events = events_;
86

87 88 89
#if defined ZMQ_HAVE_WINDOWS
    u_short family = get_fd_family (fd_);
    wsa_assert (family != AF_UNSPEC);
90
    family_entry_t &family_entry = family_entries[family];
sigiesec's avatar
sigiesec committed
91
#endif
92 93
    family_entry.fd_entries.push_back (fd_entry);
    FD_SET (fd_, &family_entry.fds_set.error);
Martin Sustrik's avatar
Martin Sustrik committed
94

sigiesec's avatar
sigiesec committed
95
#if !defined ZMQ_HAVE_WINDOWS
Martin Sustrik's avatar
Martin Sustrik committed
96 97
    if (fd_ > maxfd)
        maxfd = fd_;
98
#endif
Martin Sustrik's avatar
Martin Sustrik committed
99

100
    adjust_load (1);
Martin Sustrik's avatar
Martin Sustrik committed
101

102
    return fd_;
Martin Sustrik's avatar
Martin Sustrik committed
103 104
}

105
zmq::select_t::fd_entries_t::iterator
106
zmq::select_t::find_fd_entry_by_handle (fd_entries_t &fd_entries_,
107 108 109
                                        handle_t handle_)
{
    fd_entries_t::iterator fd_entry_it;
110
    for (fd_entry_it = fd_entries_.begin (); fd_entry_it != fd_entries_.end ();
111 112 113 114 115 116 117
         ++fd_entry_it)
        if (fd_entry_it->fd == handle_)
            break;

    return fd_entry_it;
}

sigiesec's avatar
sigiesec committed
118 119 120 121 122 123 124
void zmq::select_t::trigger_events (const fd_entries_t &fd_entries_,
                                    const fds_set_t &local_fds_set_,
                                    int event_count_)
{
    //  Size is cached to avoid iteration through recently added descriptors.
    for (fd_entries_t::size_type i = 0, size = fd_entries_.size ();
         i < size && event_count_ > 0; ++i) {
125
        //  fd_entries_[i] may not be stored, since calls to
126
        //  in_event/out_event may reallocate the vector
sigiesec's avatar
sigiesec committed
127

128
        if (is_retired_fd (fd_entries_[i]))
sigiesec's avatar
sigiesec committed
129 130
            continue;

131 132
        if (FD_ISSET (fd_entries_[i].fd, &local_fds_set_.read)) {
            fd_entries_[i].events->in_event ();
sigiesec's avatar
sigiesec committed
133 134 135 136 137 138 139
            --event_count_;
        }

        //  TODO: can the is_retired_fd be true at this point? if it
        //  was retired before, we would already have continued, and I
        //  don't see where it might have been modified
        //  And if rc == 0, we can break instead of continuing
140
        if (is_retired_fd (fd_entries_[i]) || event_count_ == 0)
sigiesec's avatar
sigiesec committed
141 142
            continue;

143 144
        if (FD_ISSET (fd_entries_[i].fd, &local_fds_set_.write)) {
            fd_entries_[i].events->out_event ();
sigiesec's avatar
sigiesec committed
145 146 147 148
            --event_count_;
        }

        //  TODO: same as above
149
        if (is_retired_fd (fd_entries_[i]) || event_count_ == 0)
sigiesec's avatar
sigiesec committed
150 151
            continue;

152 153
        if (FD_ISSET (fd_entries_[i].fd, &local_fds_set_.error)) {
            fd_entries_[i].events->in_event ();
sigiesec's avatar
sigiesec committed
154 155 156 157 158
            --event_count_;
        }
    }
}

sigiesec's avatar
sigiesec committed
159
#if defined ZMQ_HAVE_WINDOWS
160
int zmq::select_t::try_retire_fd_entry (
161
  family_entries_t::iterator family_entry_it_, zmq::fd_t &handle_)
Martin Sustrik's avatar
Martin Sustrik committed
162
{
163
    family_entry_t &family_entry = family_entry_it_->second;
164

165 166
    fd_entries_t::iterator fd_entry_it =
      find_fd_entry_by_handle (family_entry.fd_entries, handle_);
167

168
    if (fd_entry_it == family_entry.fd_entries.end ())
169
        return 0;
170 171 172 173

    fd_entry_t &fd_entry = *fd_entry_it;
    zmq_assert (fd_entry.fd != retired_fd);

174
    if (family_entry_it_ != current_family_entry_it) {
175
        //  Family is not currently being iterated and can be safely
176 177
        //  modified in-place. So later it can be skipped without
        //  re-verifying its content.
178 179 180 181
        family_entry.fd_entries.erase (fd_entry_it);
    } else {
        //  Otherwise mark removed entries as retired. It will be cleaned up
        //  at the end of the iteration. See zmq::select_t::loop
182
        fd_entry.fd = retired_fd;
183
        family_entry.has_retired = true;
184
    }
185
    family_entry.fds_set.remove_fd (handle_);
186
    return 1;
187
}
sigiesec's avatar
sigiesec committed
188
#endif
189 190 191

void zmq::select_t::rm_fd (handle_t handle_)
{
192
    check_thread ();
193
    int retired = 0;
194 195 196 197 198 199
#if defined ZMQ_HAVE_WINDOWS
    u_short family = get_fd_family (handle_);
    if (family != AF_UNSPEC) {
        family_entries_t::iterator family_entry_it =
          family_entries.find (family);

200
        retired += try_retire_fd_entry (family_entry_it, handle_);
201 202 203 204 205 206 207 208
    } else {
        //  get_fd_family may fail and return AF_UNSPEC if the socket was not
        //  successfully connected. In that case, we need to look for the
        //  socket in all family_entries.
        family_entries_t::iterator end = family_entries.end ();
        for (family_entries_t::iterator family_entry_it =
               family_entries.begin ();
             family_entry_it != end; ++family_entry_it) {
209
            if (retired += try_retire_fd_entry (family_entry_it, handle_)) {
210
                break;
211
            }
212 213
        }
    }
214
#else
215
    fd_entries_t::iterator fd_entry_it =
sigiesec's avatar
sigiesec committed
216
      find_fd_entry_by_handle (family_entry.fd_entries, handle_);
217
    assert (fd_entry_it != family_entry.fd_entries.end ());
Martin Sustrik's avatar
Martin Sustrik committed
218

219
    zmq_assert (fd_entry_it->fd != retired_fd);
220
    fd_entry_it->fd = retired_fd;
sigiesec's avatar
sigiesec committed
221
    family_entry.fds_set.remove_fd (handle_);
Martin Sustrik's avatar
Martin Sustrik committed
222

223 224
    ++retired;

225
    if (handle_ == maxfd) {
Martin Sustrik's avatar
Martin Sustrik committed
226
        maxfd = retired_fd;
sigiesec's avatar
sigiesec committed
227 228
        for (fd_entry_it = family_entry.fd_entries.begin ();
             fd_entry_it != family_entry.fd_entries.end (); ++fd_entry_it)
229 230
            if (fd_entry_it->fd > maxfd)
                maxfd = fd_entry_it->fd;
Martin Sustrik's avatar
Martin Sustrik committed
231 232
    }

233
    family_entry.has_retired = true;
234
#endif
235
    zmq_assert (retired == 1);
236
    adjust_load (-1);
Martin Sustrik's avatar
Martin Sustrik committed
237 238
}

Martin Sustrik's avatar
Martin Sustrik committed
239
void zmq::select_t::set_pollin (handle_t handle_)
Martin Sustrik's avatar
Martin Sustrik committed
240
{
241
    check_thread ();
242 243 244
#if defined ZMQ_HAVE_WINDOWS
    u_short family = get_fd_family (handle_);
    wsa_assert (family != AF_UNSPEC);
245
    family_entry_t &family_entry = family_entries[family];
246
#endif
sigiesec's avatar
sigiesec committed
247
    FD_SET (handle_, &family_entry.fds_set.read);
Martin Sustrik's avatar
Martin Sustrik committed
248 249
}

Martin Sustrik's avatar
Martin Sustrik committed
250
void zmq::select_t::reset_pollin (handle_t handle_)
Martin Sustrik's avatar
Martin Sustrik committed
251
{
252
    check_thread ();
253 254 255
#if defined ZMQ_HAVE_WINDOWS
    u_short family = get_fd_family (handle_);
    wsa_assert (family != AF_UNSPEC);
256
    family_entry_t &family_entry = family_entries[family];
257
#endif
sigiesec's avatar
sigiesec committed
258
    FD_CLR (handle_, &family_entry.fds_set.read);
Martin Sustrik's avatar
Martin Sustrik committed
259 260
}

Martin Sustrik's avatar
Martin Sustrik committed
261
void zmq::select_t::set_pollout (handle_t handle_)
Martin Sustrik's avatar
Martin Sustrik committed
262
{
263
    check_thread ();
264 265 266
#if defined ZMQ_HAVE_WINDOWS
    u_short family = get_fd_family (handle_);
    wsa_assert (family != AF_UNSPEC);
267
    family_entry_t &family_entry = family_entries[family];
268
#endif
sigiesec's avatar
sigiesec committed
269
    FD_SET (handle_, &family_entry.fds_set.write);
Martin Sustrik's avatar
Martin Sustrik committed
270 271
}

Martin Sustrik's avatar
Martin Sustrik committed
272
void zmq::select_t::reset_pollout (handle_t handle_)
Martin Sustrik's avatar
Martin Sustrik committed
273
{
274
    check_thread ();
275 276 277
#if defined ZMQ_HAVE_WINDOWS
    u_short family = get_fd_family (handle_);
    wsa_assert (family != AF_UNSPEC);
278
    family_entry_t &family_entry = family_entries[family];
279
#endif
sigiesec's avatar
sigiesec committed
280
    FD_CLR (handle_, &family_entry.fds_set.write);
Martin Sustrik's avatar
Martin Sustrik committed
281 282
}

Martin Sustrik's avatar
Martin Sustrik committed
283
void zmq::select_t::stop ()
Martin Sustrik's avatar
Martin Sustrik committed
284
{
285 286
    check_thread ();
    //  no-op... thread is stopped when no more fds or timers are registered
Martin Sustrik's avatar
Martin Sustrik committed
287 288
}

Richard Newton's avatar
Richard Newton committed
289
int zmq::select_t::max_fds ()
290 291 292 293
{
    return FD_SETSIZE;
}

Martin Sustrik's avatar
Martin Sustrik committed
294
void zmq::select_t::loop ()
Martin Sustrik's avatar
Martin Sustrik committed
295
{
296
    while (true) {
297
        //  Execute any due timers.
298
        int timeout = static_cast<int> (execute_timers ());
299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314

        cleanup_retired ();

#ifdef _WIN32
        if (family_entries.empty ()) {
#else
        if (family_entry.fd_entries.empty ()) {
#endif
            zmq_assert (get_load () == 0);

            if (timeout == 0)
                break;

            // TODO sleep for timeout
            continue;
        }
315

316
#if defined ZMQ_HAVE_OSX
sigiesec's avatar
sigiesec committed
317
        struct timeval tv = {(long) (timeout / 1000), timeout % 1000 * 1000};
318
#else
319 320
        struct timeval tv = {static_cast<long> (timeout / 1000),
                             static_cast<long> (timeout % 1000 * 1000)};
321
#endif
322 323 324 325 326 327 328 329 330 331 332

#if defined ZMQ_HAVE_WINDOWS
        /*
            On Windows select does not allow to mix descriptors from different
            service providers. It seems to work for AF_INET and AF_INET6,
            but fails for AF_INET and VMCI. The workaround is to use
            WSAEventSelect and WSAWaitForMultipleEvents to wait, then use
            select to find out what actually changed. WSAWaitForMultipleEvents
            cannot be used alone, because it does not support more than 64 events
            which is not enough.

sigiesec's avatar
sigiesec committed
333
            To reduce unnecessary overhead, WSA is only used when there are more
334 335 336 337 338 339
            than one family. Moreover, AF_INET and AF_INET6 are considered the same
            family because Windows seems to handle them properly.
            See get_fd_family for details.
        */

        //  If there is just one family, there is no reason to use WSA events.
340
        int rc = 0;
sigiesec's avatar
sigiesec committed
341 342 343 344 345 346 347 348 349
        const bool use_wsa_events = family_entries.size () > 1;
        if (use_wsa_events) {
            // TODO: I don't really understand why we are doing this. If any of
            // the events was signaled, we will call select for each fd_family
            // afterwards. The only benefit is if none of the events was
            // signaled, then we continue early.
            // IMHO, either WSAEventSelect/WSAWaitForMultipleEvents or select
            // should be used, but not both

350 351
            wsa_events_t wsa_events;

sigiesec's avatar
sigiesec committed
352 353 354 355
            for (family_entries_t::iterator family_entry_it =
                   family_entries.begin ();
                 family_entry_it != family_entries.end (); ++family_entry_it) {
                family_entry_t &family_entry = family_entry_it->second;
356

sigiesec's avatar
sigiesec committed
357 358 359 360
                for (fd_entries_t::iterator fd_entry_it =
                       family_entry.fd_entries.begin ();
                     fd_entry_it != family_entry.fd_entries.end ();
                     ++fd_entry_it) {
361 362 363
                    fd_t fd = fd_entry_it->fd;

                    //  http://stackoverflow.com/q/35043420/188530
sigiesec's avatar
sigiesec committed
364 365 366
                    if (FD_ISSET (fd, &family_entry.fds_set.read)
                        && FD_ISSET (fd, &family_entry.fds_set.write))
                        rc =
367
                          WSAEventSelect (fd, wsa_events.events[3],
sigiesec's avatar
sigiesec committed
368 369
                                          FD_READ | FD_ACCEPT | FD_CLOSE
                                            | FD_WRITE | FD_CONNECT | FD_OOB);
370
                    else if (FD_ISSET (fd, &family_entry.fds_set.read))
371
                        rc = WSAEventSelect (fd, wsa_events.events[0],
sigiesec's avatar
sigiesec committed
372 373
                                             FD_READ | FD_ACCEPT | FD_CLOSE
                                               | FD_OOB);
374
                    else if (FD_ISSET (fd, &family_entry.fds_set.write))
375
                        rc = WSAEventSelect (fd, wsa_events.events[1],
sigiesec's avatar
sigiesec committed
376
                                             FD_WRITE | FD_CONNECT | FD_OOB);
377
                    else if (FD_ISSET (fd, &family_entry.fds_set.error))
378
                        rc = WSAEventSelect (fd, wsa_events.events[2], FD_OOB);
379 380 381 382 383 384 385 386
                    else
                        rc = 0;

                    wsa_assert (rc != SOCKET_ERROR);
                }
            }

            rc = WSAWaitForMultipleEvents (4, wsa_events.events, FALSE,
sigiesec's avatar
sigiesec committed
387 388
                                           timeout ? timeout : INFINITE, FALSE);
            wsa_assert (rc != (int) WSA_WAIT_FAILED);
389 390 391 392 393 394 395
            zmq_assert (rc != WSA_WAIT_IO_COMPLETION);

            if (rc == WSA_WAIT_TIMEOUT)
                continue;
        }

        for (current_family_entry_it = family_entries.begin ();
sigiesec's avatar
sigiesec committed
396 397 398
             current_family_entry_it != family_entries.end ();
             ++current_family_entry_it) {
            family_entry_t &family_entry = current_family_entry_it->second;
399 400


sigiesec's avatar
sigiesec committed
401
            if (use_wsa_events) {
402 403
                //  There is no reason to wait again after WSAWaitForMultipleEvents.
                //  Simply collect what is ready.
sigiesec's avatar
sigiesec committed
404
                struct timeval tv_nodelay = {0, 0};
sigiesec's avatar
sigiesec committed
405 406 407
                select_family_entry (family_entry, 0, true, tv_nodelay);
            } else {
                select_family_entry (family_entry, 0, timeout > 0, tv);
408 409
            }
        }
Martin Sustrik's avatar
Martin Sustrik committed
410
#else
411
        select_family_entry (family_entry, maxfd + 1, timeout > 0, tv);
sigiesec's avatar
sigiesec committed
412 413 414
#endif
    }
}
415

sigiesec's avatar
sigiesec committed
416 417 418 419 420 421 422 423 424
void zmq::select_t::select_family_entry (family_entry_t &family_entry_,
                                         const int max_fd_,
                                         const bool use_timeout_,
                                         struct timeval &tv_)
{
    //  select will fail when run with empty sets.
    fd_entries_t &fd_entries = family_entry_.fd_entries;
    if (fd_entries.empty ())
        return;
Martin Sustrik's avatar
Martin Sustrik committed
425

sigiesec's avatar
sigiesec committed
426 427 428
    fds_set_t local_fds_set = family_entry_.fds_set;
    int rc = select (max_fd_, &local_fds_set.read, &local_fds_set.write,
                     &local_fds_set.error, use_timeout_ ? &tv_ : NULL);
Martin Sustrik's avatar
Martin Sustrik committed
429

sigiesec's avatar
sigiesec committed
430 431 432 433 434 435 436
#if defined ZMQ_HAVE_WINDOWS
    wsa_assert (rc != SOCKET_ERROR);
#else
    if (rc == -1) {
        errno_assert (errno == EINTR);
        return;
    }
437
#endif
sigiesec's avatar
sigiesec committed
438 439 440

    trigger_events (fd_entries, local_fds_set, rc);

441
    cleanup_retired (family_entry_);
Martin Sustrik's avatar
Martin Sustrik committed
442
}
443

444 445 446 447 448 449 450
zmq::select_t::fds_set_t::fds_set_t ()
{
    FD_ZERO (&read);
    FD_ZERO (&write);
    FD_ZERO (&error);
}

sigiesec's avatar
sigiesec committed
451
zmq::select_t::fds_set_t::fds_set_t (const fds_set_t &other_)
452
{
453 454 455 456 457
#if defined ZMQ_HAVE_WINDOWS
    // On Windows we don't need to copy the whole fd_set.
    // SOCKETS are continuous from the beginning of fd_array in fd_set.
    // We just need to copy fd_count elements of fd_array.
    // We gain huge memcpy() improvement if number of used SOCKETs is much lower than FD_SETSIZE.
sigiesec's avatar
sigiesec committed
458 459 460 461 462 463 464 465 466
    memcpy (&read, &other_.read,
            (char *) (other_.read.fd_array + other_.read.fd_count)
              - (char *) &other_.read);
    memcpy (&write, &other_.write,
            (char *) (other_.write.fd_array + other_.write.fd_count)
              - (char *) &other_.write);
    memcpy (&error, &other_.error,
            (char *) (other_.error.fd_array + other_.error.fd_count)
              - (char *) &other_.error);
467
#else
468 469 470
    memcpy (&read, &other_.read, sizeof other_.read);
    memcpy (&write, &other_.write, sizeof other_.write);
    memcpy (&error, &other_.error, sizeof other_.error);
471
#endif
472 473
}

sigiesec's avatar
sigiesec committed
474 475
zmq::select_t::fds_set_t &zmq::select_t::fds_set_t::
operator= (const fds_set_t &other_)
476
{
477 478 479 480 481
#if defined ZMQ_HAVE_WINDOWS
    // On Windows we don't need to copy the whole fd_set.
    // SOCKETS are continuous from the beginning of fd_array in fd_set.
    // We just need to copy fd_count elements of fd_array.
    // We gain huge memcpy() improvement if number of used SOCKETs is much lower than FD_SETSIZE.
sigiesec's avatar
sigiesec committed
482 483 484 485 486 487 488 489 490
    memcpy (&read, &other_.read,
            (char *) (other_.read.fd_array + other_.read.fd_count)
              - (char *) &other_.read);
    memcpy (&write, &other_.write,
            (char *) (other_.write.fd_array + other_.write.fd_count)
              - (char *) &other_.write);
    memcpy (&error, &other_.error,
            (char *) (other_.error.fd_array + other_.error.fd_count)
              - (char *) &other_.error);
491
#else
492 493 494
    memcpy (&read, &other_.read, sizeof other_.read);
    memcpy (&write, &other_.write, sizeof other_.write);
    memcpy (&error, &other_.error, sizeof other_.error);
495
#endif
496 497 498
    return *this;
}

sigiesec's avatar
sigiesec committed
499
void zmq::select_t::fds_set_t::remove_fd (const fd_t &fd_)
500 501 502 503 504 505
{
    FD_CLR (fd_, &read);
    FD_CLR (fd_, &write);
    FD_CLR (fd_, &error);
}

506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532
bool zmq::select_t::cleanup_retired (family_entry_t &family_entry_)
{
    if (family_entry_.has_retired) {
        family_entry_.has_retired = false;
        family_entry_.fd_entries.erase (
          std::remove_if (family_entry_.fd_entries.begin (),
                          family_entry_.fd_entries.end (), is_retired_fd),
          family_entry_.fd_entries.end ());
    }
    return family_entry_.fd_entries.empty ();
}

void zmq::select_t::cleanup_retired ()
{
#ifdef _WIN32
    for (family_entries_t::iterator it = family_entries.begin ();
         it != family_entries.end ();) {
        if (cleanup_retired (it->second))
            it = family_entries.erase (it);
        else
            ++it;
    }
#else
    cleanup_retired (family_entry);
#endif
}

533
bool zmq::select_t::is_retired_fd (const fd_entry_t &entry_)
534
{
535
    return (entry_.fd == retired_fd);
536 537
}

538 539 540 541 542
zmq::select_t::family_entry_t::family_entry_t () : has_retired (false)
{
}


543 544 545
#if defined ZMQ_HAVE_WINDOWS
u_short zmq::select_t::get_fd_family (fd_t fd_)
{
546 547 548
    // cache the results of determine_fd_family, as this is frequently called
    // for the same sockets, and determine_fd_family is expensive
    size_t i;
sigiesec's avatar
sigiesec committed
549
    for (i = 0; i < fd_family_cache_size; ++i) {
550
        const std::pair<fd_t, u_short> &entry = fd_family_cache[i];
sigiesec's avatar
sigiesec committed
551
        if (entry.first == fd_) {
552 553 554 555 556 557
            return entry.second;
        }
        if (entry.first == retired_fd)
            break;
    }

sigiesec's avatar
sigiesec committed
558 559
    std::pair<fd_t, u_short> res =
      std::make_pair (fd_, determine_fd_family (fd_));
560
    if (i < fd_family_cache_size) {
561
        fd_family_cache[i] = res;
sigiesec's avatar
sigiesec committed
562
    } else {
563 564
        // just overwrite a random entry
        // could be optimized by some LRU strategy
565
        fd_family_cache[rand () % fd_family_cache_size] = res;
566 567 568 569 570 571 572 573
    }

    return res.second;
}

u_short zmq::select_t::determine_fd_family (fd_t fd_)
{
    //  Use sockaddr_storage instead of sockaddr to accommodate different structure sizes
sigiesec's avatar
sigiesec committed
574
    sockaddr_storage addr = {0};
575
    int addr_size = sizeof addr;
576

577
    int type;
sigiesec's avatar
sigiesec committed
578
    int type_length = sizeof (int);
579

580 581
    int rc = getsockopt (fd_, SOL_SOCKET, SO_TYPE,
                         reinterpret_cast<char *> (&type), &type_length);
582 583 584 585 586

    if (rc == 0) {
        if (type == SOCK_DGRAM)
            return AF_INET;
        else {
587 588
            rc = getsockname (fd_, reinterpret_cast<sockaddr *> (&addr),
                              &addr_size);
589 590 591 592

            //  AF_INET and AF_INET6 can be mixed in select
            //  TODO: If proven otherwise, should simply return addr.sa_family
            if (rc != SOCKET_ERROR)
593
                return addr.ss_family == AF_INET6 ? AF_INET : addr.ss_family;
594
        }
595
    }
596

597
    return AF_UNSPEC;
598 599 600 601
}

zmq::select_t::wsa_events_t::wsa_events_t ()
{
602 603 604 605 606 607 608 609
    events[0] = WSACreateEvent ();
    wsa_assert (events[0] != WSA_INVALID_EVENT);
    events[1] = WSACreateEvent ();
    wsa_assert (events[1] != WSA_INVALID_EVENT);
    events[2] = WSACreateEvent ();
    wsa_assert (events[2] != WSA_INVALID_EVENT);
    events[3] = WSACreateEvent ();
    wsa_assert (events[3] != WSA_INVALID_EVENT);
610 611 612 613
}

zmq::select_t::wsa_events_t::~wsa_events_t ()
{
614 615 616 617
    wsa_assert (WSACloseEvent (events[0]));
    wsa_assert (WSACloseEvent (events[1]));
    wsa_assert (WSACloseEvent (events[2]));
    wsa_assert (WSACloseEvent (events[3]));
618 619 620
}
#endif

621
#endif