/*
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file

    This file is part of libzmq, the ZeroMQ core engine in C++.

    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30 31
#include "precompiled.hpp"
#include "poller.hpp"
32
#include "polling_util.hpp"
33

34
#if defined ZMQ_POLL_BASED_ON_POLL
35
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_AIX
36
#include <poll.h>
37
#endif
38
#elif defined ZMQ_POLL_BASED_ON_SELECT
39 40 41 42 43 44 45 46
#if defined ZMQ_HAVE_WINDOWS
#elif defined ZMQ_HAVE_HPUX
#include <sys/param.h>
#include <sys/types.h>
#include <sys/time.h>
#elif defined ZMQ_HAVE_OPENVMS
#include <sys/types.h>
#include <sys/time.h>
47 48 49 50 51
#elif defined ZMQ_HAVE_VXWORKS
#include <sys/types.h>
#include <sys/time.h>
#include <sockLib.h>
#include <strings.h>
52 53 54 55 56 57 58
#else
#include <sys/select.h>
#endif
#endif

#include "signaler.hpp"
#include "likely.hpp"
59
#include "stdint.hpp"
60
#include "config.hpp"
61 62 63
#include "err.hpp"
#include "fd.hpp"
#include "ip.hpp"
64
#include "tcp.hpp"
65

66
#if !defined ZMQ_HAVE_WINDOWS
67 68 69 70 71 72
#include <unistd.h>
#include <netinet/tcp.h>
#include <sys/types.h>
#include <sys/socket.h>
#endif

73
#if !defined(ZMQ_HAVE_WINDOWS)
74 75 76 77 78 79
//  Suspends the calling thread for roughly ms_ milliseconds.
//
//  A zero duration returns 0 immediately without touching the OS.
//  Otherwise the result of the underlying sleep primitive is returned
//  (usleep, or nanosleep on VxWorks); the Android branch discards the
//  usleep result and always reports 0.
static int sleep_ms (unsigned int ms_)
{
    if (!ms_)
        return 0;

#if defined ZMQ_HAVE_ANDROID
    //  The usleep result is deliberately not propagated on this platform.
    usleep (ms_ * 1000);
    return 0;
#elif defined ZMQ_HAVE_VXWORKS
    //  Split the duration into whole seconds plus a nanosecond remainder.
    struct timespec interval;
    interval.tv_sec = ms_ / 1000;
    interval.tv_nsec = ms_ % 1000 * 1000000;
    return nanosleep (&interval, 0);
#else
    //  usleep counts in microseconds.
    return usleep (ms_ * 1000);
#endif
}

// Helper to wait on close(), for non-blocking sockets, until it completes
// If EAGAIN is received, will sleep briefly (1-100ms) then try again, until
// the overall timeout is reached.
//
static int close_wait_ms (int fd_, unsigned int max_ms_ = 2000)
{
    unsigned int ms_so_far = 0;
100 101 102 103
    const unsigned int min_step_ms = 1;
    const unsigned int max_step_ms = 100;
    const unsigned int step_ms =
      std::min (std::max (min_step_ms, max_ms_ / 10), max_step_ms);
104

105
    int rc = 0; // do not sleep on first attempt
Pieter Hintjens's avatar
Pieter Hintjens committed
106 107
    do {
        if (rc == -1 && errno == EAGAIN) {
108 109 110 111 112 113 114 115 116 117
            sleep_ms (step_ms);
            ms_so_far += step_ms;
        }
        rc = close (fd_);
    } while (ms_so_far < max_ms_ && rc == -1 && errno == EAGAIN);

    return rc;
}
#endif

118 119 120
//  Builds the signaler: obtains the read (_r) and write (_w) descriptors
//  from make_fdpair and, if that succeeded, passes both through
//  unblock_socket (presumably switching them to non-blocking mode -
//  confirm in ip.hpp). When fork support is compiled in, the creating
//  process id is recorded so other members can detect a forked child.
zmq::signaler_t::signaler_t ()
{
    const int rc = make_fdpair (&_r, &_w);
    if (rc == 0) {
        unblock_socket (_w);
        unblock_socket (_r);
    }

#if defined HAVE_FORK
    pid = getpid ();
#endif
}

130
// This might get run after some part of construction failed, leaving one or
131
// both of _r and _w retired_fd.
132 133
zmq::signaler_t::~signaler_t ()
{
134
#if defined ZMQ_HAVE_EVENTFD
135
    if (_r == retired_fd)
136
        return;
137
    int rc = close_wait_ms (_r);
138 139
    errno_assert (rc == 0);
#elif defined ZMQ_HAVE_WINDOWS
140
    if (_w != retired_fd) {
141
        const struct linger so_linger = {1, 0};
142
        int rc = setsockopt (_w, SOL_SOCKET, SO_LINGER,
143 144
                             reinterpret_cast<const char *> (&so_linger),
                             sizeof so_linger);
145 146 147
        //  Only check shutdown if WSASTARTUP was previously done
        if (rc == 0 || WSAGetLastError () != WSANOTINITIALISED) {
            wsa_assert (rc != SOCKET_ERROR);
148
            rc = closesocket (_w);
149
            wsa_assert (rc != SOCKET_ERROR);
150
            if (_r == retired_fd)
151
                return;
152
            rc = closesocket (_r);
153 154
            wsa_assert (rc != SOCKET_ERROR);
        }
Richard Newton's avatar
Richard Newton committed
155
    }
156
#else
157 158
    if (_w != retired_fd) {
        int rc = close_wait_ms (_w);
159 160
        errno_assert (rc == 0);
    }
161 162
    if (_r != retired_fd) {
        int rc = close_wait_ms (_r);
163 164
        errno_assert (rc == 0);
    }
165 166 167
#endif
}

Martin Hurton's avatar
Martin Hurton committed
168
//  Exposes the read-side descriptor (_r) - the same descriptor wait ()
//  polls and recv () reads from. The value is retired_fd if that
//  descriptor was never created.
zmq::fd_t zmq::signaler_t::get_fd () const
{
    return _r;
}

void zmq::signaler_t::send ()
{
Martin Hurton's avatar
Martin Hurton committed
175 176
#if defined HAVE_FORK
    if (unlikely (pid != getpid ())) {
177 178 179 180
        //printf("Child process %d signaler_t::send returning without sending #1\n", getpid());
        return; // do not send anything in forked child context
    }
#endif
181 182
#if defined ZMQ_HAVE_EVENTFD
    const uint64_t inc = 1;
183
    ssize_t sz = write (_w, &inc, sizeof (inc));
184 185
    errno_assert (sz == sizeof (inc));
#elif defined ZMQ_HAVE_WINDOWS
186
    unsigned char dummy = 0;
187 188 189 190
    const int nbytes =
      ::send (_w, reinterpret_cast<char *> (&dummy), sizeof (dummy), 0);
    wsa_assert (nbytes != SOCKET_ERROR);
    zmq_assert (nbytes == sizeof (dummy));
191 192 193
#elif defined ZMQ_HAVE_VXWORKS
    unsigned char dummy = 0;
    while (true) {
194
        ssize_t nbytes = ::send (_w, (char *) &dummy, sizeof (dummy), 0);
195 196 197 198 199 200 201 202 203 204 205 206
        if (unlikely (nbytes == -1 && errno == EINTR))
            continue;
#if defined(HAVE_FORK)
        if (unlikely (pid != getpid ())) {
            //printf("Child process %d signaler_t::send returning without sending #2\n", getpid());
            errno = EINTR;
            break;
        }
#endif
        zmq_assert (nbytes == sizeof dummy);
        break;
    }
207 208 209
#else
    unsigned char dummy = 0;
    while (true) {
210
        ssize_t nbytes = ::send (_w, &dummy, sizeof (dummy), 0);
211 212
        if (unlikely (nbytes == -1 && errno == EINTR))
            continue;
213
#if defined(HAVE_FORK)
Martin Hurton's avatar
Martin Hurton committed
214
        if (unlikely (pid != getpid ())) {
215 216 217 218 219
            //printf("Child process %d signaler_t::send returning without sending #2\n", getpid());
            errno = EINTR;
            break;
        }
#endif
Martin Hurton's avatar
Martin Hurton committed
220
        zmq_assert (nbytes == sizeof dummy);
221 222 223 224 225 226 227
        break;
    }
#endif
}

//  Blocks until the read descriptor (_r) becomes readable or the timeout
//  expires.
//
//  timeout_ is in milliseconds. In the select-based build a negative value
//  passes a NULL timeout to select (); in the poll-based build the value is
//  handed to poll () unchanged.
//
//  Returns 0 when a signal is ready to be received. Returns -1 with errno
//  set to EAGAIN on timeout, or to EINTR when the wait was interrupted or
//  (with fork support) when called from a process other than the creator.
int zmq::signaler_t::wait (int timeout_)
{
#ifdef HAVE_FORK
    if (unlikely (pid != getpid ())) {
        //  We have forked and the file descriptor is closed. Emulate an
        //  interrupt response.
        errno = EINTR;
        return -1;
    }
#endif

#ifdef ZMQ_POLL_BASED_ON_POLL
    struct pollfd reader;
    reader.fd = _r;
    reader.events = POLLIN;

    const int ready = poll (&reader, 1, timeout_);
    if (unlikely (ready < 0)) {
        //  The only tolerated failure is interruption by a signal.
        errno_assert (errno == EINTR);
        return -1;
    }
    if (unlikely (ready == 0)) {
        //  Timed out with nothing to read.
        errno = EAGAIN;
        return -1;
    }
#ifdef HAVE_FORK
    if (unlikely (pid != getpid ())) {
        //  We have forked and the file descriptor is closed. Emulate an
        //  interrupt response.
        errno = EINTR;
        return -1;
    }
#endif
    zmq_assert (ready == 1);
    zmq_assert (reader.revents & POLLIN);
    return 0;

#elif defined ZMQ_POLL_BASED_ON_SELECT

    optimized_fd_set_t readers (1);
    FD_ZERO (readers.get ());
    FD_SET (_r, readers.get ());

    //  A NULL timeout pointer is passed for negative timeouts.
    struct timeval limit;
    struct timeval *limit_arg = NULL;
    if (timeout_ >= 0) {
        limit.tv_sec = timeout_ / 1000;
        limit.tv_usec = timeout_ % 1000 * 1000;
        limit_arg = &limit;
    }
#ifdef ZMQ_HAVE_WINDOWS
    int ready = select (0, readers.get (), NULL, NULL, limit_arg);
    wsa_assert (ready != SOCKET_ERROR);
#else
    int ready = select (_r + 1, readers.get (), NULL, NULL, limit_arg);
    if (unlikely (ready < 0)) {
        //  The only tolerated failure is interruption by a signal.
        errno_assert (errno == EINTR);
        return -1;
    }
#endif
    if (unlikely (ready == 0)) {
        //  Timed out with nothing to read.
        errno = EAGAIN;
        return -1;
    }
    zmq_assert (ready == 1);
    return 0;

#else
#error
#endif
}

void zmq::signaler_t::recv ()
299
{
300
//  Attempt to read a signal.
301 302
#if defined ZMQ_HAVE_EVENTFD
    uint64_t dummy;
303
    ssize_t sz = read (_r, &dummy, sizeof (dummy));
304 305 306 307 308 309
    errno_assert (sz == sizeof (dummy));

    //  If we accidentally grabbed the next signal(s) along with the current
    //  one, return it back to the eventfd object.
    if (unlikely (dummy > 1)) {
        const uint64_t inc = dummy - 1;
310
        ssize_t sz2 = write (_w, &inc, sizeof (inc));
311 312 313 314 315 316 317 318
        errno_assert (sz2 == sizeof (inc));
        return;
    }

    zmq_assert (dummy == 1);
#else
    unsigned char dummy;
#if defined ZMQ_HAVE_WINDOWS
319
    const int nbytes =
320
      ::recv (_r, reinterpret_cast<char *> (&dummy), sizeof (dummy), 0);
321
    wsa_assert (nbytes != SOCKET_ERROR);
322
#elif defined ZMQ_HAVE_VXWORKS
323
    ssize_t nbytes = ::recv (_r, (char *) &dummy, sizeof (dummy), 0);
324
    errno_assert (nbytes >= 0);
325
#else
326
    ssize_t nbytes = ::recv (_r, &dummy, sizeof (dummy), 0);
327 328 329 330 331 332 333 334
    errno_assert (nbytes >= 0);
#endif
    zmq_assert (nbytes == sizeof (dummy));
    zmq_assert (dummy == 0);
#endif
}

int zmq::signaler_t::recv_failable ()
335
{
336
//  Attempt to read a signal.
337 338
#if defined ZMQ_HAVE_EVENTFD
    uint64_t dummy;
339
    ssize_t sz = read (_r, &dummy, sizeof (dummy));
340 341
    if (sz == -1) {
        errno_assert (errno == EAGAIN);
342
        return -1;
343 344
    }
    errno_assert (sz == sizeof (dummy));
345

346 347 348 349 350 351 352
    //  If we accidentally grabbed the next signal(s) along with the current
    //  one, return it back to the eventfd object.
    if (unlikely (dummy > 1)) {
        const uint64_t inc = dummy - 1;
        ssize_t sz2 = write (_w, &inc, sizeof (inc));
        errno_assert (sz2 == sizeof (inc));
        return 0;
353
    }
354 355 356

    zmq_assert (dummy == 1);

357
#else
358
    unsigned char dummy;
Martin Sustrik's avatar
Martin Sustrik committed
359
#if defined ZMQ_HAVE_WINDOWS
360
    const int nbytes =
361
      ::recv (_r, reinterpret_cast<char *> (&dummy), sizeof (dummy), 0);
362
    if (nbytes == SOCKET_ERROR) {
363
        const int last_error = WSAGetLastError ();
364 365
        if (last_error == WSAEWOULDBLOCK) {
            errno = EAGAIN;
366
            return -1;
367
        }
368 369
        wsa_assert (last_error == WSAEWOULDBLOCK);
    }
370
#elif defined ZMQ_HAVE_VXWORKS
371
    ssize_t nbytes = ::recv (_r, (char *) &dummy, sizeof (dummy), 0);
372 373 374 375 376 377 378 379
    if (nbytes == -1) {
        if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
            errno = EAGAIN;
            return -1;
        }
        errno_assert (errno == EAGAIN || errno == EWOULDBLOCK
                      || errno == EINTR);
    }
380
#else
381
    ssize_t nbytes = ::recv (_r, &dummy, sizeof (dummy), 0);
382 383 384 385 386
    if (nbytes == -1) {
        if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
            errno = EAGAIN;
            return -1;
        }
387 388
        errno_assert (errno == EAGAIN || errno == EWOULDBLOCK
                      || errno == EINTR);
389
    }
390 391 392
#endif
    zmq_assert (nbytes == sizeof (dummy));
    zmq_assert (dummy == 0);
393
#endif
394
    return 0;
395 396
}

397 398
bool zmq::signaler_t::valid () const
{
399
    return _w != retired_fd;
400 401
}

402
#ifdef HAVE_FORK
Martin Hurton's avatar
Martin Hurton committed
403
void zmq::signaler_t::forked ()
404
{
405
    //  Close file descriptors created in the parent and create new pair
406 407 408
    close (_r);
    close (_w);
    make_fdpair (&_r, &_w);
409 410
}
#endif