socket_poller.cpp 19 KB
Newer Older
1
/*
2
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29

    This file is part of libzmq, the ZeroMQ core engine in C++.

    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include "precompiled.hpp"
31 32
#include "socket_poller.hpp"
#include "err.hpp"
33
#include "polling_util.hpp"
34
#include "macros.hpp"
35

36 37
#include <limits.h>

38
static bool is_thread_safe (zmq::socket_base_t &socket_)
39
{
40
    // do not use getsockopt here, since that would fail during context termination
41
    return socket_.is_thread_safe ();
42 43
}

44
zmq::socket_poller_t::socket_poller_t () :
45
    _tag (0xCAFEBABE),
46
    _signaler (NULL)
47
#if defined ZMQ_POLL_BASED_ON_POLL
48
    ,
49
    _pollfds (NULL)
50 51
#elif defined ZMQ_POLL_BASED_ON_SELECT
    ,
52
    _max_fd (0)
53
#endif
54
{
55
    rebuild ();
56 57 58 59 60
}

zmq::socket_poller_t::~socket_poller_t ()
{
    //  Mark the socket_poller as dead
61
    _tag = 0xdeadbeef;
62

63 64
    for (items_t::iterator it = _items.begin (), end = _items.end (); it != end;
         ++it) {
65 66 67
        // TODO shouldn't this zmq_assert (it->socket->check_tag ()) instead?
        if (it->socket && it->socket->check_tag ()
            && is_thread_safe (*it->socket)) {
68
            it->socket->remove_signaler (_signaler);
69 70 71
        }
    }

72
    if (_signaler != NULL) {
73
        LIBZMQ_DELETE (_signaler);
74 75
    }

76
#if defined ZMQ_POLL_BASED_ON_POLL
77 78 79
    if (_pollfds) {
        free (_pollfds);
        _pollfds = NULL;
80
    }
81
#endif
82 83
}

84
bool zmq::socket_poller_t::check_tag ()
85
{
86
    return _tag == 0xCAFEBABE;
87 88
}

89 90 91
int zmq::socket_poller_t::add (socket_base_t *socket_,
                               void *user_data_,
                               short events_)
92
{
93 94
    for (items_t::iterator it = _items.begin (), end = _items.end (); it != end;
         ++it) {
95 96 97
        if (it->socket == socket_) {
            errno = EINVAL;
            return -1;
98
        }
99 100
    }

101
    if (is_thread_safe (*socket_)) {
102 103 104
        if (_signaler == NULL) {
            _signaler = new (std::nothrow) signaler_t ();
            if (!_signaler) {
105 106 107
                errno = ENOMEM;
                return -1;
            }
108 109 110
            if (!_signaler->valid ()) {
                delete _signaler;
                _signaler = NULL;
111 112 113 114
                errno = EMFILE;
                return -1;
            }
        }
115

116
        socket_->add_signaler (_signaler);
117
    }
118

119 120 121 122 123
    item_t item = {
        socket_,
        0,
        user_data_,
        events_
124
#if defined ZMQ_POLL_BASED_ON_POLL
125 126
        ,
        -1
127 128
#endif
    };
129
    try {
130
        _items.push_back (item);
131 132 133 134 135
    }
    catch (const std::bad_alloc &) {
        errno = ENOMEM;
        return -1;
    }
136
    _need_rebuild = true;
137 138 139 140

    return 0;
}

141
int zmq::socket_poller_t::add_fd (fd_t fd_, void *user_data_, short events_)
142
{
143 144
    for (items_t::iterator it = _items.begin (), end = _items.end (); it != end;
         ++it) {
145 146 147
        if (!it->socket && it->fd == fd_) {
            errno = EINVAL;
            return -1;
148
        }
149 150
    }

151 152 153 154 155
    item_t item = {
        NULL,
        fd_,
        user_data_,
        events_
156
#if defined ZMQ_POLL_BASED_ON_POLL
157 158
        ,
        -1
159
#endif
160
    };
161
    try {
162
        _items.push_back (item);
163 164 165 166 167
    }
    catch (const std::bad_alloc &) {
        errno = ENOMEM;
        return -1;
    }
168
    _need_rebuild = true;
169 170 171 172

    return 0;
}

173
int zmq::socket_poller_t::modify (socket_base_t *socket_, short events_)
174
{
175
    const items_t::iterator end = _items.end ();
176
    items_t::iterator it;
177

178
    for (it = _items.begin (); it != end; ++it) {
179 180 181 182
        if (it->socket == socket_)
            break;
    }

183
    if (it == end) {
184 185 186 187 188
        errno = EINVAL;
        return -1;
    }

    it->events = events_;
189
    _need_rebuild = true;
190 191 192 193 194

    return 0;
}


195
int zmq::socket_poller_t::modify_fd (fd_t fd_, short events_)
196
{
197
    const items_t::iterator end = _items.end ();
198
    items_t::iterator it;
199

200
    for (it = _items.begin (); it != end; ++it) {
201 202 203 204
        if (!it->socket && it->fd == fd_)
            break;
    }

205
    if (it == end) {
206 207 208
        errno = EINVAL;
        return -1;
    }
209

210
    it->events = events_;
211
    _need_rebuild = true;
212 213

    return 0;
214
}
215 216


217
int zmq::socket_poller_t::remove (socket_base_t *socket_)
218
{
219
    const items_t::iterator end = _items.end ();
220
    items_t::iterator it;
221

222
    for (it = _items.begin (); it != end; ++it) {
223 224 225 226
        if (it->socket == socket_)
            break;
    }

227
    if (it == end) {
228 229 230
        errno = EINVAL;
        return -1;
    }
231

232 233
    _items.erase (it);
    _need_rebuild = true;
234

235
    if (is_thread_safe (*socket_)) {
236
        socket_->remove_signaler (_signaler);
237
    }
238

239 240 241
    return 0;
}

242
int zmq::socket_poller_t::remove_fd (fd_t fd_)
243
{
244
    const items_t::iterator end = _items.end ();
245
    items_t::iterator it;
246

247
    for (it = _items.begin (); it != end; ++it) {
248 249 250 251
        if (!it->socket && it->fd == fd_)
            break;
    }

252
    if (it == end) {
253 254 255
        errno = EINVAL;
        return -1;
    }
256

257 258
    _items.erase (it);
    _need_rebuild = true;
259 260

    return 0;
261
}
262

263
void zmq::socket_poller_t::rebuild ()
264
{
265 266 267 268
    _use_signaler = false;
    _pollset_size = 0;
    _need_rebuild = false;

269
#if defined ZMQ_POLL_BASED_ON_POLL
270

271 272 273
    if (_pollfds) {
        free (_pollfds);
        _pollfds = NULL;
274
    }
275

276 277
    for (items_t::iterator it = _items.begin (), end = _items.end (); it != end;
         ++it) {
278
        if (it->events) {
279
            if (it->socket && is_thread_safe (*it->socket)) {
280 281 282
                if (!_use_signaler) {
                    _use_signaler = true;
                    _pollset_size++;
283
                }
284
            } else
285
                _pollset_size++;
286
        }
287
    }
288

289
    if (_pollset_size == 0)
290
        return;
291

292
    _pollfds = static_cast<pollfd *> (malloc (_pollset_size * sizeof (pollfd)));
293
    alloc_assert (_pollfds);
294 295 296

    int item_nbr = 0;

297
    if (_use_signaler) {
298
        item_nbr = 1;
299 300
        _pollfds[0].fd = _signaler->get_fd ();
        _pollfds[0].events = POLLIN;
301 302
    }

303 304
    for (items_t::iterator it = _items.begin (), end = _items.end (); it != end;
         ++it) {
305 306
        if (it->events) {
            if (it->socket) {
307
                if (!is_thread_safe (*it->socket)) {
308
                    size_t fd_size = sizeof (zmq::fd_t);
309
                    int rc = it->socket->getsockopt (
310
                      ZMQ_FD, &_pollfds[item_nbr].fd, &fd_size);
311
                    zmq_assert (rc == 0);
312

313
                    _pollfds[item_nbr].events = POLLIN;
314 315
                    item_nbr++;
                }
316
            } else {
317 318
                _pollfds[item_nbr].fd = it->fd;
                _pollfds[item_nbr].events =
319 320 321
                  (it->events & ZMQ_POLLIN ? POLLIN : 0)
                  | (it->events & ZMQ_POLLOUT ? POLLOUT : 0)
                  | (it->events & ZMQ_POLLPRI ? POLLPRI : 0);
322
                it->pollfd_index = item_nbr;
323
                item_nbr++;
324 325 326
            }
        }
    }
327

328
#elif defined ZMQ_POLL_BASED_ON_SELECT
329 330 331

    //  Ensure we do not attempt to select () on more than FD_SETSIZE
    //  file descriptors.
332
    zmq_assert (_items.size () <= FD_SETSIZE);
333

334 335 336 337 338 339 340 341
    _pollset_in.resize (_items.size ());
    _pollset_out.resize (_items.size ());
    _pollset_err.resize (_items.size ());

    FD_ZERO (_pollset_in.get ());
    FD_ZERO (_pollset_out.get ());
    FD_ZERO (_pollset_err.get ());

342 343
    for (items_t::iterator it = _items.begin (), end = _items.end (); it != end;
         ++it) {
344
        if (it->socket && is_thread_safe (*it->socket) && it->events) {
345
            _use_signaler = true;
346
            FD_SET (_signaler->get_fd (), _pollset_in.get ());
347
            _pollset_size = 1;
348
            break;
349 350
        }
    }
351

352
    _max_fd = 0;
353 354

    //  Build the fd_sets for passing to select ().
355 356
    for (items_t::iterator it = _items.begin (), end = _items.end (); it != end;
         ++it) {
357 358 359 360
        if (it->events) {
            //  If the poll item is a 0MQ socket we are interested in input on the
            //  notification file descriptor retrieved by the ZMQ_FD socket option.
            if (it->socket) {
361
                if (!is_thread_safe (*it->socket)) {
362 363
                    zmq::fd_t notify_fd;
                    size_t fd_size = sizeof (zmq::fd_t);
364 365
                    int rc =
                      it->socket->getsockopt (ZMQ_FD, &notify_fd, &fd_size);
366
                    zmq_assert (rc == 0);
367

368
                    FD_SET (notify_fd, _pollset_in.get ());
369 370
                    if (_max_fd < notify_fd)
                        _max_fd = notify_fd;
371

372
                    _pollset_size++;
373 374 375 376 377 378
                }
            }
            //  Else, the poll item is a raw file descriptor. Convert the poll item
            //  events to the appropriate fd_sets.
            else {
                if (it->events & ZMQ_POLLIN)
379
                    FD_SET (it->fd, _pollset_in.get ());
380
                if (it->events & ZMQ_POLLOUT)
381
                    FD_SET (it->fd, _pollset_out.get ());
382
                if (it->events & ZMQ_POLLERR)
383
                    FD_SET (it->fd, _pollset_err.get ());
384 385
                if (_max_fd < it->fd)
                    _max_fd = it->fd;
386

387
                _pollset_size++;
388 389 390 391 392
            }
        }
    }

#endif
393 394
}

395
void zmq::socket_poller_t::zero_trail_events (
396
  zmq::socket_poller_t::event_t *events_, int n_events_, int found_)
397
{
398
    for (int i = found_; i < n_events_; ++i) {
399 400 401 402 403 404 405 406 407
        events_[i].socket = NULL;
        events_[i].fd = 0;
        events_[i].user_data = NULL;
        events_[i].events = 0;
    }
}

#if defined ZMQ_POLL_BASED_ON_POLL
int zmq::socket_poller_t::check_events (zmq::socket_poller_t::event_t *events_,
408
                                        int n_events_)
409 410
#elif defined ZMQ_POLL_BASED_ON_SELECT
int zmq::socket_poller_t::check_events (zmq::socket_poller_t::event_t *events_,
411
                                        int n_events_,
412 413 414
                                        fd_set &inset_,
                                        fd_set &outset_,
                                        fd_set &errset_)
415 416 417
#endif
{
    int found = 0;
418 419
    for (items_t::iterator it = _items.begin (), end = _items.end ();
         it != end && found < n_events_; ++it) {
420 421 422 423 424 425
        //  The poll item is a 0MQ socket. Retrieve pending events
        //  using the ZMQ_EVENTS socket option.
        if (it->socket) {
            size_t events_size = sizeof (uint32_t);
            uint32_t events;
            if (it->socket->getsockopt (ZMQ_EVENTS, &events, &events_size)
426
                == -1) {
427 428 429 430 431 432 433 434 435 436 437 438 439 440 441
                return -1;
            }

            if (it->events & events) {
                events_[found].socket = it->socket;
                events_[found].user_data = it->user_data;
                events_[found].events = it->events & events;
                ++found;
            }
        }
        //  Else, the poll item is a raw file descriptor, simply convert
        //  the events to zmq_pollitem_t-style format.
        else {
#if defined ZMQ_POLL_BASED_ON_POLL

442
            short revents = _pollfds[it->pollfd_index].revents;
443 444 445 446 447 448 449 450 451 452 453 454 455 456 457
            short events = 0;

            if (revents & POLLIN)
                events |= ZMQ_POLLIN;
            if (revents & POLLOUT)
                events |= ZMQ_POLLOUT;
            if (revents & POLLPRI)
                events |= ZMQ_POLLPRI;
            if (revents & ~(POLLIN | POLLOUT | POLLPRI))
                events |= ZMQ_POLLERR;

#elif defined ZMQ_POLL_BASED_ON_SELECT

            short events = 0;

458
            if (FD_ISSET (it->fd, &inset_))
459
                events |= ZMQ_POLLIN;
460
            if (FD_ISSET (it->fd, &outset_))
461
                events |= ZMQ_POLLOUT;
462
            if (FD_ISSET (it->fd, &errset_))
463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479
                events |= ZMQ_POLLERR;
#endif //POLL_SELECT

            if (events) {
                events_[found].socket = NULL;
                events_[found].user_data = it->user_data;
                events_[found].fd = it->fd;
                events_[found].events = events;
                ++found;
            }
        }
    }

    return found;
}

//Return 0 if timeout is expired otherwise 1
480
int zmq::socket_poller_t::adjust_timeout (zmq::clock_t &clock_,
481
                                          long timeout_,
482 483 484
                                          uint64_t &now_,
                                          uint64_t &end_,
                                          bool &first_pass_)
485 486 487 488 489 490 491 492 493
{
    //  If socket_poller_t::timeout is zero, exit immediately whether there
    //  are events or not.
    if (timeout_ == 0)
        return 0;

    //  At this point we are meant to wait for events but there are none.
    //  If timeout is infinite we can just loop until we get some events.
    if (timeout_ < 0) {
494 495
        if (first_pass_)
            first_pass_ = false;
496 497 498 499 500 501 502
        return 1;
    }

    //  The timeout is finite and there are no events. In the first pass
    //  we get a timestamp of when the polling have begun. (We assume that
    //  first pass have taken negligible time). We also compute the time
    //  when the polling should time out.
503 504 505 506
    now_ = clock_.now_ms ();
    if (first_pass_) {
        end_ = now_ + timeout_;
        first_pass_ = false;
507 508 509 510
        return 1;
    }

    //  Find out whether timeout have expired.
511
    if (now_ >= end_)
512 513 514 515 516 517
        return 0;

    return 1;
}

int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_,
518 519
                                int n_events_,
                                long timeout_)
520
{
521
    if (_items.empty () && timeout_ < 0) {
522 523 524 525
        errno = EFAULT;
        return -1;
    }

526
    if (_need_rebuild)
527
        rebuild ();
528

529
    if (unlikely (_pollset_size == 0)) {
530
        // We'll report an error (timed out) as if the list was non-empty and
531
        // no event occurred within the specified timeout. Otherwise the caller
532 533
        // needs to check the return value AND the event to avoid using the
        // nullified event data.
534
        errno = EAGAIN;
535
        if (timeout_ == 0)
536
            return -1;
537 538
#if defined ZMQ_HAVE_WINDOWS
        Sleep (timeout_ > 0 ? timeout_ : INFINITE);
539
        return -1;
540 541
#elif defined ZMQ_HAVE_ANDROID
        usleep (timeout_ * 1000);
542
        return -1;
543 544 545 546
#elif defined ZMQ_HAVE_OSX
        usleep (timeout_ * 1000);
        errno = EAGAIN;
        return -1;
547 548 549 550 551 552
#elif defined ZMQ_HAVE_VXWORKS
        struct timespec ns_;
        ns_.tv_sec = timeout_ / 1000;
        ns_.tv_nsec = timeout_ % 1000 * 1000000;
        nanosleep (&ns_, 0);
        return -1;
553
#else
554 555
        usleep (timeout_ * 1000);
        return -1;
556
#endif
557 558
    }

559
#if defined ZMQ_POLL_BASED_ON_POLL
560 561 562
    zmq::clock_t clock;
    uint64_t now = 0;
    uint64_t end = 0;
563

564 565 566 567 568 569 570
    bool first_pass = true;

    while (true) {
        //  Compute the timeout for the subsequent poll.
        int timeout;
        if (first_pass)
            timeout = 0;
571
        else if (timeout_ < 0)
572 573
            timeout = -1;
        else
574 575
            timeout =
              static_cast<int> (std::min<uint64_t> (end - now, INT_MAX));
576 577 578

        //  Wait for events.
        while (true) {
579
            int rc = poll (_pollfds, _pollset_size, timeout);
580 581 582 583 584 585 586 587
            if (rc == -1 && errno == EINTR) {
                return -1;
            }
            errno_assert (rc >= 0);
            break;
        }

        //  Receive the signal from pollfd
588 589
        if (_use_signaler && _pollfds[0].revents & POLLIN)
            _signaler->recv ();
590 591

        //  Check for the events.
592
        int found = check_events (events_, n_events_);
593
        if (found) {
594 595
            if (found > 0)
                zero_trail_events (events_, n_events_, found);
596
            return found;
597
        }
598

599 600
        //  Adjust timeout or break
        if (adjust_timeout (clock, timeout_, now, end, first_pass) == 0)
601
            break;
602
    }
603
    errno = EAGAIN;
604 605 606 607 608 609 610 611
    return -1;

#elif defined ZMQ_POLL_BASED_ON_SELECT

    zmq::clock_t clock;
    uint64_t now = 0;
    uint64_t end = 0;

612
    bool first_pass = true;
613

614 615 616
    optimized_fd_set_t inset (_pollset_size);
    optimized_fd_set_t outset (_pollset_size);
    optimized_fd_set_t errset (_pollset_size);
617 618 619 620 621 622 623 624 625

    while (true) {
        //  Compute the timeout for the subsequent poll.
        timeval timeout;
        timeval *ptimeout;
        if (first_pass) {
            timeout.tv_sec = 0;
            timeout.tv_usec = 0;
            ptimeout = &timeout;
626
        } else if (timeout_ < 0)
627 628
            ptimeout = NULL;
        else {
629 630
            timeout.tv_sec = static_cast<long> ((end - now) / 1000);
            timeout.tv_usec = static_cast<long> ((end - now) % 1000 * 1000);
631 632 633 634 635
            ptimeout = &timeout;
        }

        //  Wait for events. Ignore interrupts if there's infinite timeout.
        while (true) {
636 637 638 639 640 641
            memcpy (inset.get (), _pollset_in.get (),
                    valid_pollset_bytes (*_pollset_in.get ()));
            memcpy (outset.get (), _pollset_out.get (),
                    valid_pollset_bytes (*_pollset_out.get ()));
            memcpy (errset.get (), _pollset_err.get (),
                    valid_pollset_bytes (*_pollset_err.get ()));
642 643
            const int rc = select (static_cast<int> (_max_fd + 1), inset.get (),
                                   outset.get (), errset.get (), ptimeout);
644 645
#if defined ZMQ_HAVE_WINDOWS
            if (unlikely (rc == SOCKET_ERROR)) {
646
                errno = wsa_error_to_errno (WSAGetLastError ());
647 648 649 650 651 652 653 654 655 656 657 658
                wsa_assert (errno == ENOTSOCK);
                return -1;
            }
#else
            if (unlikely (rc == -1)) {
                errno_assert (errno == EINTR || errno == EBADF);
                return -1;
            }
#endif
            break;
        }

659
        if (_use_signaler && FD_ISSET (_signaler->get_fd (), inset.get ()))
660
            _signaler->recv ();
661 662

        //  Check for the events.
663 664
        const int found = check_events (events_, n_events_, *inset.get (),
                                        *outset.get (), *errset.get ());
665
        if (found) {
666 667
            if (found > 0)
                zero_trail_events (events_, n_events_, found);
668
            return found;
669
        }
670

671 672
        //  Adjust timeout or break
        if (adjust_timeout (clock, timeout_, now, end, first_pass) == 0)
673
            break;
674 675
    }

676
    errno = EAGAIN;
677 678 679
    return -1;

#else
680

681 682 683
    //  Exotic platforms that support neither poll() nor select().
    errno = ENOTSUP;
    return -1;
684

685
#endif
686
}