signaler.cpp 11.5 KB
Newer Older
1
/*
2
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3

4
    This file is part of libzmq, the ZeroMQ core engine in C++.
5

6 7 8
    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
9 10
    (at your option) any later version.

11 12 13 14 15 16 17 18 19 20 21 22 23 24
    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.
25 26 27 28 29

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30 31
#include "precompiled.hpp"
#include "poller.hpp"
32
#include "polling_util.hpp"
33

34
#if defined ZMQ_POLL_BASED_ON_POLL
35
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_AIX
36
#include <poll.h>
37
#endif
38
#elif defined ZMQ_POLL_BASED_ON_SELECT
39 40 41 42 43 44 45 46
#if defined ZMQ_HAVE_WINDOWS
#elif defined ZMQ_HAVE_HPUX
#include <sys/param.h>
#include <sys/types.h>
#include <sys/time.h>
#elif defined ZMQ_HAVE_OPENVMS
#include <sys/types.h>
#include <sys/time.h>
47 48 49 50 51
#elif defined ZMQ_HAVE_VXWORKS
#include <sys/types.h>
#include <sys/time.h>
#include <sockLib.h>
#include <strings.h>
52 53 54 55 56 57 58
#else
#include <sys/select.h>
#endif
#endif

#include "signaler.hpp"
#include "likely.hpp"
59
#include "stdint.hpp"
60
#include "config.hpp"
61 62 63
#include "err.hpp"
#include "fd.hpp"
#include "ip.hpp"
64
#include "tcp.hpp"
65

66
#if !defined ZMQ_HAVE_WINDOWS
67 68 69 70 71 72
#include <unistd.h>
#include <netinet/tcp.h>
#include <sys/types.h>
#include <sys/socket.h>
#endif

73
#if !defined(ZMQ_HAVE_WINDOWS)
74 75 76 77 78 79 80 81 82 83 84 85
// Helper to sleep for specific number of milliseconds (or until signal)
//
static int sleep_ms (unsigned int ms_)
{
    if (ms_ == 0)
        return 0;
#if defined ZMQ_HAVE_WINDOWS
    Sleep (ms_ > 0 ? ms_ : INFINITE);
    return 0;
#elif defined ZMQ_HAVE_ANDROID
    usleep (ms_ * 1000);
    return 0;
86 87 88 89 90
#elif defined ZMQ_HAVE_VXWORKS
    struct timespec ns_;
    ns_.tv_sec = ms_ / 1000;
    ns_.tv_nsec = ms_ % 1000 * 1000000;
    return nanosleep (&ns_, 0);
91 92 93 94 95 96 97 98 99 100 101 102
#else
    return usleep (ms_ * 1000);
#endif
}

// Helper to wait on close(), for non-blocking sockets, until it completes
// If EAGAIN is received, will sleep briefly (1-100ms) then try again, until
// the overall timeout is reached.
//
static int close_wait_ms (int fd_, unsigned int max_ms_ = 2000)
{
    unsigned int ms_so_far = 0;
103 104 105 106
    const unsigned int min_step_ms = 1;
    const unsigned int max_step_ms = 100;
    const unsigned int step_ms =
      std::min (std::max (min_step_ms, max_ms_ / 10), max_step_ms);
107

108
    int rc = 0; // do not sleep on first attempt
Pieter Hintjens's avatar
Pieter Hintjens committed
109 110
    do {
        if (rc == -1 && errno == EAGAIN) {
111 112 113 114 115 116 117 118 119 120
            sleep_ms (step_ms);
            ms_so_far += step_ms;
        }
        rc = close (fd_);
    } while (ms_so_far < max_ms_ && rc == -1 && errno == EAGAIN);

    return rc;
}
#endif

121 122 123
zmq::signaler_t::signaler_t ()
{
    //  Create the socketpair for signaling.
124 125 126
    if (make_fdpair (&_r, &_w) == 0) {
        unblock_socket (_w);
        unblock_socket (_r);
127
    }
128
#ifdef HAVE_FORK
Martin Hurton's avatar
Martin Hurton committed
129
    pid = getpid ();
130
#endif
131 132
}

133
// This might get run after some part of construction failed, leaving one or
134
// both of _r and _w retired_fd.
135 136
zmq::signaler_t::~signaler_t ()
{
137
#if defined ZMQ_HAVE_EVENTFD
138
    if (_r == retired_fd)
139
        return;
140
    int rc = close_wait_ms (_r);
141 142
    errno_assert (rc == 0);
#elif defined ZMQ_HAVE_WINDOWS
143
    if (_w != retired_fd) {
144
        const struct linger so_linger = {1, 0};
145
        int rc = setsockopt (_w, SOL_SOCKET, SO_LINGER,
146 147
                             reinterpret_cast<const char *> (&so_linger),
                             sizeof so_linger);
148 149 150
        //  Only check shutdown if WSASTARTUP was previously done
        if (rc == 0 || WSAGetLastError () != WSANOTINITIALISED) {
            wsa_assert (rc != SOCKET_ERROR);
151
            rc = closesocket (_w);
152
            wsa_assert (rc != SOCKET_ERROR);
153
            if (_r == retired_fd)
154
                return;
155
            rc = closesocket (_r);
156 157
            wsa_assert (rc != SOCKET_ERROR);
        }
Richard Newton's avatar
Richard Newton committed
158
    }
159
#else
160 161
    if (_w != retired_fd) {
        int rc = close_wait_ms (_w);
162 163
        errno_assert (rc == 0);
    }
164 165
    if (_r != retired_fd) {
        int rc = close_wait_ms (_r);
166 167
        errno_assert (rc == 0);
    }
168 169 170
#endif
}

Martin Hurton's avatar
Martin Hurton committed
171
zmq::fd_t zmq::signaler_t::get_fd () const
172
{
173
    return _r;
174 175 176 177
}

void zmq::signaler_t::send ()
{
Martin Hurton's avatar
Martin Hurton committed
178 179
#if defined HAVE_FORK
    if (unlikely (pid != getpid ())) {
180 181 182 183
        //printf("Child process %d signaler_t::send returning without sending #1\n", getpid());
        return; // do not send anything in forked child context
    }
#endif
184 185
#if defined ZMQ_HAVE_EVENTFD
    const uint64_t inc = 1;
186
    ssize_t sz = write (_w, &inc, sizeof (inc));
187 188
    errno_assert (sz == sizeof (inc));
#elif defined ZMQ_HAVE_WINDOWS
189
    unsigned char dummy = 0;
190 191 192 193
    const int nbytes =
      ::send (_w, reinterpret_cast<char *> (&dummy), sizeof (dummy), 0);
    wsa_assert (nbytes != SOCKET_ERROR);
    zmq_assert (nbytes == sizeof (dummy));
194 195 196
#elif defined ZMQ_HAVE_VXWORKS
    unsigned char dummy = 0;
    while (true) {
197
        ssize_t nbytes = ::send (_w, (char *) &dummy, sizeof (dummy), 0);
198 199 200 201 202 203 204 205 206 207 208 209
        if (unlikely (nbytes == -1 && errno == EINTR))
            continue;
#if defined(HAVE_FORK)
        if (unlikely (pid != getpid ())) {
            //printf("Child process %d signaler_t::send returning without sending #2\n", getpid());
            errno = EINTR;
            break;
        }
#endif
        zmq_assert (nbytes == sizeof dummy);
        break;
    }
210 211 212
#else
    unsigned char dummy = 0;
    while (true) {
213
        ssize_t nbytes = ::send (_w, &dummy, sizeof (dummy), 0);
214 215
        if (unlikely (nbytes == -1 && errno == EINTR))
            continue;
216
#if defined(HAVE_FORK)
Martin Hurton's avatar
Martin Hurton committed
217
        if (unlikely (pid != getpid ())) {
218 219 220 221 222
            //printf("Child process %d signaler_t::send returning without sending #2\n", getpid());
            errno = EINTR;
            break;
        }
#endif
Martin Hurton's avatar
Martin Hurton committed
223
        zmq_assert (nbytes == sizeof dummy);
224 225 226 227 228 229 230
        break;
    }
#endif
}

int zmq::signaler_t::wait (int timeout_)
{
231
#ifdef HAVE_FORK
Martin Hurton's avatar
Martin Hurton committed
232
    if (unlikely (pid != getpid ())) {
233
        // we have forked and the file descriptor is closed. Emulate an interrupt
234 235 236 237 238 239 240
        // response.
        //printf("Child process %d signaler_t::wait returning simulating interrupt #1\n", getpid());
        errno = EINTR;
        return -1;
    }
#endif

241
#ifdef ZMQ_POLL_BASED_ON_POLL
242
    struct pollfd pfd;
243
    pfd.fd = _r;
244
    pfd.events = POLLIN;
245
    const int rc = poll (&pfd, 1, timeout_);
246
    if (unlikely (rc < 0)) {
247
        errno_assert (errno == EINTR);
248
        return -1;
249 250
    }
    if (unlikely (rc == 0)) {
251 252 253
        errno = EAGAIN;
        return -1;
    }
254
#ifdef HAVE_FORK
255
    if (unlikely (pid != getpid ())) {
256
        // we have forked and the file descriptor is closed. Emulate an interrupt
257 258 259 260 261 262
        // response.
        //printf("Child process %d signaler_t::wait returning simulating interrupt #2\n", getpid());
        errno = EINTR;
        return -1;
    }
#endif
263 264 265 266
    zmq_assert (rc == 1);
    zmq_assert (pfd.revents & POLLIN);
    return 0;

267
#elif defined ZMQ_POLL_BASED_ON_SELECT
268

269 270 271
    optimized_fd_set_t fds (FD_SETSIZE);
    FD_ZERO (fds.get ());
    FD_SET (_r, fds.get ());
272
    struct timeval timeout;
273 274 275 276
    if (timeout_ >= 0) {
        timeout.tv_sec = timeout_ / 1000;
        timeout.tv_usec = timeout_ % 1000 * 1000;
    }
277
#ifdef ZMQ_HAVE_WINDOWS
278 279
    int rc =
      select (0, fds.get (), NULL, NULL, timeout_ >= 0 ? &timeout : NULL);
280 281
    wsa_assert (rc != SOCKET_ERROR);
#else
282 283
    int rc =
      select (_r + 1, fds.get (), NULL, NULL, timeout_ >= 0 ? &timeout : NULL);
284
    if (unlikely (rc < 0)) {
285
        errno_assert (errno == EINTR);
286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301
        return -1;
    }
#endif
    if (unlikely (rc == 0)) {
        errno = EAGAIN;
        return -1;
    }
    zmq_assert (rc == 1);
    return 0;

#else
#error
#endif
}

void zmq::signaler_t::recv ()
302
{
303
//  Attempt to read a signal.
304 305
#if defined ZMQ_HAVE_EVENTFD
    uint64_t dummy;
306
    ssize_t sz = read (_r, &dummy, sizeof (dummy));
307 308 309 310 311 312
    errno_assert (sz == sizeof (dummy));

    //  If we accidentally grabbed the next signal(s) along with the current
    //  one, return it back to the eventfd object.
    if (unlikely (dummy > 1)) {
        const uint64_t inc = dummy - 1;
313
        ssize_t sz2 = write (_w, &inc, sizeof (inc));
314 315 316 317 318 319 320 321
        errno_assert (sz2 == sizeof (inc));
        return;
    }

    zmq_assert (dummy == 1);
#else
    unsigned char dummy;
#if defined ZMQ_HAVE_WINDOWS
322
    const int nbytes =
323
      ::recv (_r, reinterpret_cast<char *> (&dummy), sizeof (dummy), 0);
324
    wsa_assert (nbytes != SOCKET_ERROR);
325
#elif defined ZMQ_HAVE_VXWORKS
326
    ssize_t nbytes = ::recv (_r, (char *) &dummy, sizeof (dummy), 0);
327
    errno_assert (nbytes >= 0);
328
#else
329
    ssize_t nbytes = ::recv (_r, &dummy, sizeof (dummy), 0);
330 331 332 333 334 335 336 337
    errno_assert (nbytes >= 0);
#endif
    zmq_assert (nbytes == sizeof (dummy));
    zmq_assert (dummy == 0);
#endif
}

int zmq::signaler_t::recv_failable ()
338
{
339
//  Attempt to read a signal.
340 341
#if defined ZMQ_HAVE_EVENTFD
    uint64_t dummy;
342
    ssize_t sz = read (_r, &dummy, sizeof (dummy));
343 344
    if (sz == -1) {
        errno_assert (errno == EAGAIN);
345
        return -1;
346 347
    }
    errno_assert (sz == sizeof (dummy));
348

349 350 351 352 353 354 355
    //  If we accidentally grabbed the next signal(s) along with the current
    //  one, return it back to the eventfd object.
    if (unlikely (dummy > 1)) {
        const uint64_t inc = dummy - 1;
        ssize_t sz2 = write (_w, &inc, sizeof (inc));
        errno_assert (sz2 == sizeof (inc));
        return 0;
356
    }
357 358 359

    zmq_assert (dummy == 1);

360
#else
361
    unsigned char dummy;
Martin Sustrik's avatar
Martin Sustrik committed
362
#if defined ZMQ_HAVE_WINDOWS
363
    const int nbytes =
364
      ::recv (_r, reinterpret_cast<char *> (&dummy), sizeof (dummy), 0);
365
    if (nbytes == SOCKET_ERROR) {
366
        const int last_error = WSAGetLastError ();
367 368
        if (last_error == WSAEWOULDBLOCK) {
            errno = EAGAIN;
369
            return -1;
370
        }
371 372
        wsa_assert (last_error == WSAEWOULDBLOCK);
    }
373
#elif defined ZMQ_HAVE_VXWORKS
374
    ssize_t nbytes = ::recv (_r, (char *) &dummy, sizeof (dummy), 0);
375 376 377 378 379 380 381 382
    if (nbytes == -1) {
        if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
            errno = EAGAIN;
            return -1;
        }
        errno_assert (errno == EAGAIN || errno == EWOULDBLOCK
                      || errno == EINTR);
    }
383
#else
384
    ssize_t nbytes = ::recv (_r, &dummy, sizeof (dummy), 0);
385 386 387 388 389
    if (nbytes == -1) {
        if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
            errno = EAGAIN;
            return -1;
        }
390 391
        errno_assert (errno == EAGAIN || errno == EWOULDBLOCK
                      || errno == EINTR);
392
    }
393 394 395
#endif
    zmq_assert (nbytes == sizeof (dummy));
    zmq_assert (dummy == 0);
396
#endif
397
    return 0;
398 399
}

400 401
bool zmq::signaler_t::valid () const
{
402
    return _w != retired_fd;
403 404
}

405
#ifdef HAVE_FORK
Martin Hurton's avatar
Martin Hurton committed
406
void zmq::signaler_t::forked ()
407
{
408
    //  Close file descriptors created in the parent and create new pair
409 410 411
    close (_r);
    close (_w);
    make_fdpair (&_r, &_w);
412 413
}
#endif