signaler.cpp 15.6 KB
Newer Older
1
/*
2
    Copyright (c) 2007-2014 Contributors as noted in the AUTHORS file
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19

    This file is part of 0MQ.

    0MQ is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    0MQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

20
#include "poller.hpp"
21 22 23 24 25

//  On AIX, poll.h has to be included before zmq.h to get consistent
//  definition of pollfd structure (AIX uses 'reqevents' and 'retnevents'
//  instead of 'events' and 'revents' and defines macros to map from POSIX-y
//  names to AIX-specific names).
26
#if defined ZMQ_POLL_BASED_ON_POLL
27
#include <poll.h>
28
#elif defined ZMQ_POLL_BASED_ON_SELECT
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
#if defined ZMQ_HAVE_WINDOWS
#include "windows.hpp"
#elif defined ZMQ_HAVE_HPUX
#include <sys/param.h>
#include <sys/types.h>
#include <sys/time.h>
#elif defined ZMQ_HAVE_OPENVMS
#include <sys/types.h>
#include <sys/time.h>
#else
#include <sys/select.h>
#endif
#endif

#include "signaler.hpp"
#include "likely.hpp"
45
#include "stdint.hpp"
46
#include "config.hpp"
47 48 49 50
#include "err.hpp"
#include "fd.hpp"
#include "ip.hpp"

51 52 53 54
#if defined ZMQ_HAVE_EVENTFD
#include <sys/eventfd.h>
#endif

55 56 57 58 59 60 61 62 63 64
#if defined ZMQ_HAVE_WINDOWS
#include "windows.hpp"
#else
#include <unistd.h>
#include <netinet/tcp.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#endif

65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113
#if !defined (ZMQ_HAVE_WINDOWS)
//  Suspends the calling thread for ms_ milliseconds (or until usleep ()
//  returns early, e.g. because of a signal). A zero duration returns
//  immediately without calling usleep ().
//
//  Returns 0 when ms_ is zero. Otherwise, on Android the result of
//  usleep () is not propagated and 0 is always returned; on every other
//  platform the return value of usleep () is passed through unchanged.
//
//  This helper is only compiled on non-Windows builds (it lives inside the
//  enclosing !ZMQ_HAVE_WINDOWS guard), so no Sleep ()-based variant is
//  needed here.
static int sleep_ms (unsigned int ms_)
{
    if (ms_ == 0)
        return 0;
#if defined ZMQ_HAVE_ANDROID
    usleep (ms_ * 1000);
    return 0;
#else
    return usleep (ms_ * 1000);
#endif
}

// Helper to wait on close(), for non-blocking sockets, until it completes
// If EAGAIN is received, will sleep briefly (1-100ms) then try again, until
// the overall timeout is reached.
//
static int close_wait_ms (int fd_, unsigned int max_ms_ = 2000)
{
    unsigned int ms_so_far = 0;
    unsigned int step_ms   = max_ms_ / 10;
    if (step_ms < 1)
        step_ms = 1;

    if (step_ms > 100)
        step_ms = 100;

    int rc = 0;       // do not sleep on first attempt

    do
    {
        if (rc == -1 && errno == EAGAIN)
        {
            sleep_ms (step_ms);
            ms_so_far += step_ms;
        }

        rc = close (fd_);
    } while (ms_so_far < max_ms_ && rc == -1 && errno == EAGAIN);

    return rc;
}
#endif

114 115 116
zmq::signaler_t::signaler_t ()
{
    //  Set up the descriptor pair used for signaling. unblock_socket () is
    //  applied to both ends only when the pair was actually created
    //  (presumably switching them to non-blocking mode).
    const int rc = make_fdpair (&r, &w);
    if (rc == 0) {
        unblock_socket (w);
        unblock_socket (r);
    }

#ifdef HAVE_FORK
    //  Remember the creating process; send () and wait () compare against
    //  this value to detect that they are running in a forked child.
    pid = getpid ();
#endif
}

//  Releases the signaling descriptors.
//
//  The constructor tolerates a failing make_fdpair (), which leaves both
//  descriptors at -1 (INVALID_SOCKET on Windows). Such descriptors were
//  never opened, so they are skipped here instead of being passed to
//  close ()/closesocket (), which would fail and trip the assertions.
zmq::signaler_t::~signaler_t ()
{
#if defined ZMQ_HAVE_EVENTFD
    //  With eventfd, make_fdpair () stores the same descriptor in both
    //  r and w, so it is closed only once.
    if (r == -1)
        return;
    int rc = close_wait_ms (r);
    errno_assert (rc == 0);
#elif defined ZMQ_HAVE_WINDOWS
    if (w != INVALID_SOCKET) {
        //  Enable SO_LINGER with a zero timeout on the write end before
        //  closing it.
        const struct linger so_linger = { 1, 0 };
        int rc = setsockopt (w, SOL_SOCKET, SO_LINGER,
            (const char *) &so_linger, sizeof so_linger);
        wsa_assert (rc != SOCKET_ERROR);
        rc = closesocket (w);
        wsa_assert (rc != SOCKET_ERROR);
    }
    if (r != INVALID_SOCKET) {
        int rc = closesocket (r);
        wsa_assert (rc != SOCKET_ERROR);
    }
#else
    if (w != -1) {
        int rc = close_wait_ms (w);
        errno_assert (rc == 0);
    }
    if (r != -1) {
        int rc = close_wait_ms (r);
        errno_assert (rc == 0);
    }
#endif
}

Martin Hurton's avatar
Martin Hurton committed
148
zmq::fd_t zmq::signaler_t::get_fd () const
{
    //  Expose the read end of the pair, i.e. the descriptor that wait ()
    //  polls and recv () reads from.
    return r;
}

void zmq::signaler_t::send ()
{
Martin Hurton's avatar
Martin Hurton committed
155 156
#if defined HAVE_FORK
    if (unlikely (pid != getpid ())) {
157 158 159 160
        //printf("Child process %d signaler_t::send returning without sending #1\n", getpid());
        return; // do not send anything in forked child context
    }
#endif
161 162 163 164 165
#if defined ZMQ_HAVE_EVENTFD
    const uint64_t inc = 1;
    ssize_t sz = write (w, &inc, sizeof (inc));
    errno_assert (sz == sizeof (inc));
#elif defined ZMQ_HAVE_WINDOWS
166
    unsigned char dummy = 0;
Martin Sustrik's avatar
Martin Sustrik committed
167
    int nbytes = ::send (w, (char*) &dummy, sizeof (dummy), 0);
168 169 170 171 172 173 174 175
    wsa_assert (nbytes != SOCKET_ERROR);
    zmq_assert (nbytes == sizeof (dummy));
#else
    unsigned char dummy = 0;
    while (true) {
        ssize_t nbytes = ::send (w, &dummy, sizeof (dummy), 0);
        if (unlikely (nbytes == -1 && errno == EINTR))
            continue;
176
#if defined(HAVE_FORK)
Martin Hurton's avatar
Martin Hurton committed
177
        if (unlikely (pid != getpid ())) {
178 179 180 181 182
            //printf("Child process %d signaler_t::send returning without sending #2\n", getpid());
            errno = EINTR;
            break;
        }
#endif
Martin Hurton's avatar
Martin Hurton committed
183
        zmq_assert (nbytes == sizeof dummy);
184 185 186 187 188 189 190
        break;
    }
#endif
}

//  Waits up to timeout_ milliseconds for the read end to become readable.
//  In the select-based variant a negative timeout_ means no time limit;
//  in the poll-based variant timeout_ is handed to poll () unchanged.
//
//  Returns 0 when a signal is available. Returns -1 with errno set to
//  EAGAIN on timeout, or to EINTR when the wait was interrupted or the
//  call is made from a forked child.
int zmq::signaler_t::wait (int timeout_)
{
#ifdef HAVE_FORK
    //  We are in a forked child (pid differs from the one stored in the
    //  object). Emulate an interrupt response.
    if (unlikely (pid != getpid ())) {
        errno = EINTR;
        return -1;
    }
#endif

#ifdef ZMQ_POLL_BASED_ON_POLL
    struct pollfd pfd;
    pfd.fd = r;
    pfd.events = POLLIN;

    const int nready = poll (&pfd, 1, timeout_);
    if (unlikely (nready < 0)) {
        //  The only failure tolerated here is an interrupted call.
        errno_assert (errno == EINTR);
        return -1;
    }
    if (unlikely (nready == 0)) {
        //  Timed out.
        errno = EAGAIN;
        return -1;
    }
#ifdef HAVE_FORK
    //  A fork happened while we were polling. Emulate an interrupt
    //  response.
    if (unlikely (pid != getpid ())) {
        errno = EINTR;
        return -1;
    }
#endif
    zmq_assert (nready == 1);
    zmq_assert (pfd.revents & POLLIN);
    return 0;

#elif defined ZMQ_POLL_BASED_ON_SELECT

    fd_set fds;
    FD_ZERO (&fds);
    FD_SET (r, &fds);

    //  A NULL timeout pointer makes select () wait without a time limit.
    struct timeval timeout;
    struct timeval *timeout_ptr = NULL;
    if (timeout_ >= 0) {
        timeout.tv_sec = timeout_ / 1000;
        timeout.tv_usec = timeout_ % 1000 * 1000;
        timeout_ptr = &timeout;
    }

#ifdef ZMQ_HAVE_WINDOWS
    const int nready = select (0, &fds, NULL, NULL, timeout_ptr);
    wsa_assert (nready != SOCKET_ERROR);
#else
    const int nready = select (r + 1, &fds, NULL, NULL, timeout_ptr);
    if (unlikely (nready < 0)) {
        //  The only failure tolerated here is an interrupted call.
        errno_assert (errno == EINTR);
        return -1;
    }
#endif
    if (unlikely (nready == 0)) {
        //  Timed out.
        errno = EAGAIN;
        return -1;
    }
    zmq_assert (nready == 1);
    return 0;

#else
#error
#endif
}

void zmq::signaler_t::recv ()
{
    //  Attempt to read a signal.
266 267 268 269
#if defined ZMQ_HAVE_EVENTFD
    uint64_t dummy;
    ssize_t sz = read (r, &dummy, sizeof (dummy));
    errno_assert (sz == sizeof (dummy));
270 271 272 273 274

    //  If we accidentally grabbed the next signal along with the current
    //  one, return it back to the eventfd object.
    if (unlikely (dummy == 2)) {
        const uint64_t inc = 1;
275 276
        ssize_t sz2 = write (w, &inc, sizeof (inc));
        errno_assert (sz2 == sizeof (inc));
277 278 279
        return;
    }

280 281
    zmq_assert (dummy == 1);
#else
282
    unsigned char dummy;
Martin Sustrik's avatar
Martin Sustrik committed
283 284
#if defined ZMQ_HAVE_WINDOWS
    int nbytes = ::recv (r, (char*) &dummy, sizeof (dummy), 0);
285 286 287 288 289 290 291
    wsa_assert (nbytes != SOCKET_ERROR);
#else
    ssize_t nbytes = ::recv (r, &dummy, sizeof (dummy), 0);
    errno_assert (nbytes >= 0);
#endif
    zmq_assert (nbytes == sizeof (dummy));
    zmq_assert (dummy == 0);
292
#endif
293 294
}

295
#ifdef HAVE_FORK
//  Replaces the descriptors created in the parent process with a freshly
//  created pair. The result of make_fdpair () is not checked here; when it
//  reports failure it leaves both descriptors set to -1.
void zmq::signaler_t::forked ()
{
    //  Close file descriptors created in the parent. Descriptors equal to
    //  -1 were never opened and are skipped.
    if (r != -1)
        close (r);

    //  With eventfd, make_fdpair () stores one and the same descriptor in
    //  both r and w; it must not be closed a second time.
    if (w != -1 && w != r)
        close (w);

    //  Create the new pair.
    make_fdpair (&r, &w);
}
#endif

Pieter Hintjens's avatar
Pieter Hintjens committed
305
//  Returns -1 if we could not make the socket pair successfully
306 307
int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
{
308 309
#if defined ZMQ_HAVE_EVENTFD
    fd_t fd = eventfd (0, 0);
Pieter Hintjens's avatar
Pieter Hintjens committed
310 311 312 313 314 315 316 317 318
    if (fd == -1) {
        errno_assert (errno == ENFILE || errno == EMFILE);
        *w_ = *r_ = -1;
        return -1;
    }
    else {
        *w_ = *r_ = fd;
        return 0;
    }
319 320

#elif defined ZMQ_HAVE_WINDOWS
Pieter Hintjens's avatar
Pieter Hintjens committed
321
#   if !defined _WIN32_WCE
322
    // Windows CE does not manage security attributes
Matt Arsenault's avatar
Matt Arsenault committed
323 324
    SECURITY_DESCRIPTOR sd;
    SECURITY_ATTRIBUTES sa;
Martin Hurton's avatar
Martin Hurton committed
325 326
    memset (&sd, 0, sizeof sd);
    memset (&sa, 0, sizeof sa);
Matthew Metnetsky's avatar
Matthew Metnetsky committed
327

Martin Hurton's avatar
Martin Hurton committed
328 329
    InitializeSecurityDescriptor (&sd, SECURITY_DESCRIPTOR_REVISION);
    SetSecurityDescriptorDacl (&sd, TRUE, 0, FALSE);
Matthew Metnetsky's avatar
Matthew Metnetsky committed
330

Martin Hurton's avatar
Martin Hurton committed
331
    sa.nLength = sizeof (SECURITY_ATTRIBUTES);
Matthew Metnetsky's avatar
Matthew Metnetsky committed
332
    sa.lpSecurityDescriptor = &sd;
Pieter Hintjens's avatar
Pieter Hintjens committed
333
#   endif
334

335 336 337 338
    //  This function has to be in a system-wide critical section so that
    //  two instances of the library don't accidentally create signaler
    //  crossing the process boundary.
    //  We'll use named event object to implement the critical section.
Martin Hurton's avatar
Martin Hurton committed
339 340 341
    //  Note that if the event object already exists, the CreateEvent requests
    //  EVENT_ALL_ACCESS access right. If this fails, we try to open
    //  the event object asking for SYNCHRONIZE access only.
342 343 344
    HANDLE sync = NULL;

    //  Create critical section only if using fixed signaler port
345 346 347 348 349
    //  Use problematic Event implementation for compatibility if using old port 5905.
    //  Otherwise use Mutex implementation.
    int event_signaler_port = 5905;

    if (signaler_port == event_signaler_port) {
350
#       if !defined _WIN32_WCE
351
        sync = CreateEventW (&sa, FALSE, TRUE, L"Global\\zmq-signaler-port-sync");
352
#       else
353
        sync = CreateEventW (NULL, FALSE, TRUE, L"Global\\zmq-signaler-port-sync");
354 355
#       endif
        if (sync == NULL && GetLastError () == ERROR_ACCESS_DENIED)
356
            sync = OpenEventW (SYNCHRONIZE | EVENT_MODIFY_STATE,
357
                              FALSE, L"Global\\zmq-signaler-port-sync");
358 359 360

        win_assert (sync != NULL);
    }
Martin Hurton's avatar
Martin Hurton committed
361 362 363
    else
    if (signaler_port != 0) {
        wchar_t mutex_name [MAX_PATH];
364 365 366
#       ifdef __MINGW32__
        _snwprintf (mutex_name, MAX_PATH, L"Global\\zmq-signaler-port-%d", signaler_port);
#       else
Martin Hurton's avatar
Martin Hurton committed
367
        swprintf (mutex_name, MAX_PATH, L"Global\\zmq-signaler-port-%d", signaler_port);
368
#       endif
369 370

#       if !defined _WIN32_WCE
371
        sync = CreateMutexW (&sa, FALSE, mutex_name);
372
#       else
373
        sync = CreateMutexW (NULL, FALSE, mutex_name);
374 375
#       endif
        if (sync == NULL && GetLastError () == ERROR_ACCESS_DENIED)
376
            sync = OpenMutexW (SYNCHRONIZE, FALSE, mutex_name);
377 378 379

        win_assert (sync != NULL);
    }
380

381 382 383 384 385 386 387
    //  Windows has no 'socketpair' function. CreatePipe is no good as pipe
    //  handles cannot be polled on. Here we create the socketpair by hand.
    *w_ = INVALID_SOCKET;
    *r_ = INVALID_SOCKET;

    //  Create listening socket.
    SOCKET listener;
388
    listener = open_socket (AF_INET, SOCK_STREAM, 0);
389 390 391 392 393
    wsa_assert (listener != INVALID_SOCKET);

    //  Set SO_REUSEADDR and TCP_NODELAY on listening socket.
    BOOL so_reuseaddr = 1;
    int rc = setsockopt (listener, SOL_SOCKET, SO_REUSEADDR,
Martin Hurton's avatar
Martin Hurton committed
394
        (char *)&so_reuseaddr, sizeof so_reuseaddr);
395 396 397
    wsa_assert (rc != SOCKET_ERROR);
    BOOL tcp_nodelay = 1;
    rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELAY,
Martin Hurton's avatar
Martin Hurton committed
398
        (char *)&tcp_nodelay, sizeof tcp_nodelay);
399 400
    wsa_assert (rc != SOCKET_ERROR);

401
    //  Init sockaddr to signaler port.
402
    struct sockaddr_in addr;
Martin Hurton's avatar
Martin Hurton committed
403
    memset (&addr, 0, sizeof addr);
404 405
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
406
    addr.sin_port = htons (signaler_port);
407 408

    //  Create the writer socket.
409
    *w_ = open_socket (AF_INET, SOCK_STREAM, 0);
410 411 412 413
    wsa_assert (*w_ != INVALID_SOCKET);

    //  Set TCP_NODELAY on writer socket.
    rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY,
Martin Hurton's avatar
Martin Hurton committed
414
        (char *) &tcp_nodelay, sizeof tcp_nodelay);
415 416
    wsa_assert (rc != SOCKET_ERROR);

417 418 419
    if (sync != NULL) {
        //  Enter the critical section.
        DWORD dwrc = WaitForSingleObject (sync, INFINITE);
420
        zmq_assert (dwrc == WAIT_OBJECT_0 || dwrc == WAIT_ABANDONED);
421
    }
422 423

    //  Bind listening socket to signaler port.
Martin Hurton's avatar
Martin Hurton committed
424
    rc = bind (listener, (const struct sockaddr*) &addr, sizeof addr);
425

426 427
    if (rc != SOCKET_ERROR && signaler_port == 0) {
        //  Retrieve ephemeral port number
Martin Hurton's avatar
Martin Hurton committed
428
        int addrlen = sizeof addr;
429 430 431
        rc = getsockname (listener, (struct sockaddr*) &addr, &addrlen);
    }

432 433 434 435
    //  Listen for incoming connections.
    if (rc != SOCKET_ERROR)
        rc = listen (listener, 1);

436
    //  Connect writer to the listener.
437
    if (rc != SOCKET_ERROR)
Martin Hurton's avatar
Martin Hurton committed
438
        rc = connect (*w_, (struct sockaddr*) &addr, sizeof addr);
439

440 441
    //  Accept connection from writer.
    if (rc != SOCKET_ERROR)
442
        *r_ = accept (listener, NULL, NULL);
443 444 445 446 447 448

    //  Save errno if error occurred in bind/listen/connect/accept.
    int saved_errno = 0;
    if (*r_ == INVALID_SOCKET)
        saved_errno = WSAGetLastError ();

449
    //  We don't need the listening socket anymore. Close it.
450
    closesocket (listener);
451

452 453
    if (sync != NULL) {
        //  Exit the critical section.
454 455 456 457 458
        BOOL brc;
        if (signaler_port == event_signaler_port)
            brc = SetEvent (sync);
        else
            brc = ReleaseMutex (sync);
459
        win_assert (brc != 0);
460

461 462 463 464
        //  Release the kernel object
        brc = CloseHandle (sync);
        win_assert (brc != 0);
    }
465

466
    if (*r_ != INVALID_SOCKET) {
Pieter Hintjens's avatar
Pieter Hintjens committed
467
#   if !defined _WIN32_WCE
468
        //  On Windows, preventing sockets to be inherited by child processes.
469
        BOOL brc = SetHandleInformation ((HANDLE) *r_, HANDLE_FLAG_INHERIT, 0);
470
        win_assert (brc);
Pieter Hintjens's avatar
Pieter Hintjens committed
471
#   endif
472
        return 0;
Pieter Hintjens's avatar
Pieter Hintjens committed
473 474
    }
    else {
475
        //  Cleanup writer if connection failed
476 477 478 479 480
        if (*w_ != INVALID_SOCKET) {
            rc = closesocket (*w_);
            wsa_assert (rc != SOCKET_ERROR);
            *w_ = INVALID_SOCKET;
        }
481
        //  Set errno from saved value
482
        errno = wsa_error_to_errno (saved_errno);
483 484
        return -1;
    }
485 486 487 488 489 490 491 492 493

#elif defined ZMQ_HAVE_OPENVMS

    //  Whilst OpenVMS supports socketpair - it maps to AF_INET only.  Further,
    //  it does not set the socket options TCP_NODELAY and TCP_NODELACK which
    //  can lead to performance problems.
    //
    //  The bug will be fixed in V5.6 ECO4 and beyond.  In the meantime, we'll
    //  create the socket pair manually.
494
    struct sockaddr_in lcladdr;
Martin Hurton's avatar
Martin Hurton committed
495
    memset (&lcladdr, 0, sizeof lcladdr);
496 497 498 499
    lcladdr.sin_family = AF_INET;
    lcladdr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
    lcladdr.sin_port = 0;

500
    int listener = open_socket (AF_INET, SOCK_STREAM, 0);
501 502 503
    errno_assert (listener != -1);

    int on = 1;
Martin Hurton's avatar
Martin Hurton committed
504
    int rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELAY, &on, sizeof on);
505 506
    errno_assert (rc != -1);

Martin Hurton's avatar
Martin Hurton committed
507
    rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELACK, &on, sizeof on);
508 509
    errno_assert (rc != -1);

Martin Hurton's avatar
Martin Hurton committed
510
    rc = bind (listener, (struct sockaddr*) &lcladdr, sizeof lcladdr);
511 512
    errno_assert (rc != -1);

Martin Hurton's avatar
Martin Hurton committed
513
    socklen_t lcladdr_len = sizeof lcladdr;
514 515 516 517 518 519 520

    rc = getsockname (listener, (struct sockaddr*) &lcladdr, &lcladdr_len);
    errno_assert (rc != -1);

    rc = listen (listener, 1);
    errno_assert (rc != -1);

521
    *w_ = open_socket (AF_INET, SOCK_STREAM, 0);
522 523
    errno_assert (*w_ != -1);

Martin Hurton's avatar
Martin Hurton committed
524
    rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY, &on, sizeof on);
525 526
    errno_assert (rc != -1);

Martin Hurton's avatar
Martin Hurton committed
527
    rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELACK, &on, sizeof on);
528 529
    errno_assert (rc != -1);

Martin Hurton's avatar
Martin Hurton committed
530
    rc = connect (*w_, (struct sockaddr*) &lcladdr, sizeof lcladdr);
531 532 533 534 535 536 537 538 539
    errno_assert (rc != -1);

    *r_ = accept (listener, NULL, NULL);
    errno_assert (*r_ != -1);

    close (listener);

    return 0;

Pieter Hintjens's avatar
Pieter Hintjens committed
540 541
#else
    // All other implementations support socketpair()
542 543
    int sv [2];
    int rc = socketpair (AF_UNIX, SOCK_STREAM, 0, sv);
Pieter Hintjens's avatar
Pieter Hintjens committed
544 545
    if (rc == -1) {
        errno_assert (errno == ENFILE || errno == EMFILE);
546
        *w_ = *r_ = -1;
Pieter Hintjens's avatar
Pieter Hintjens committed
547 548 549 550 551 552 553
        return -1;
    }
    else {
        *w_ = sv [0];
        *r_ = sv [1];
        return 0;
    }
554 555
#endif
}