/*
    Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file

    This file is part of 0MQ.

    0MQ is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    0MQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

20
#include "poller.hpp"
21 22 23 24 25

//  On AIX, poll.h has to be included before zmq.h to get consistent
//  definition of pollfd structure (AIX uses 'reqevents' and 'retnevents'
//  instead of 'events' and 'revents' and defines macros to map from POSIX-y
//  names to AIX-specific names).
26
#if defined ZMQ_POLL_BASED_ON_POLL
27
#include <poll.h>
28
#elif defined ZMQ_POLL_BASED_ON_SELECT
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
#if defined ZMQ_HAVE_WINDOWS
#include "windows.hpp"
#elif defined ZMQ_HAVE_HPUX
#include <sys/param.h>
#include <sys/types.h>
#include <sys/time.h>
#elif defined ZMQ_HAVE_OPENVMS
#include <sys/types.h>
#include <sys/time.h>
#else
#include <sys/select.h>
#endif
#endif

#include "signaler.hpp"
#include "likely.hpp"
45
#include "stdint.hpp"
46
#include "config.hpp"
47 48 49 50
#include "err.hpp"
#include "fd.hpp"
#include "ip.hpp"

51 52 53 54
#if defined ZMQ_HAVE_EVENTFD
#include <sys/eventfd.h>
#endif

55 56 57 58 59 60 61 62 63 64
#if defined ZMQ_HAVE_WINDOWS
#include "windows.hpp"
#else
#include <unistd.h>
#include <netinet/tcp.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#endif

65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113
#if !defined (ZMQ_HAVE_WINDOWS)
// Helper to sleep for specific number of milliseconds (or until signal)
//
static int sleep_ms (unsigned int ms_)
{
    if (ms_ == 0)
        return 0;
#if defined ZMQ_HAVE_WINDOWS
    Sleep (ms_ > 0 ? ms_ : INFINITE);
    return 0;
#elif defined ZMQ_HAVE_ANDROID
    usleep (ms_ * 1000);
    return 0;
#else
    return usleep (ms_ * 1000);
#endif
}

// Helper to wait on close(), for non-blocking sockets, until it completes
// If EAGAIN is received, will sleep briefly (1-100ms) then try again, until
// the overall timeout is reached.
//
static int close_wait_ms (int fd_, unsigned int max_ms_ = 2000)
{
    unsigned int ms_so_far = 0;
    unsigned int step_ms   = max_ms_ / 10;
    if (step_ms < 1)
        step_ms = 1;

    if (step_ms > 100)
        step_ms = 100;

    int rc = 0;       // do not sleep on first attempt

    do
    {
        if (rc == -1 && errno == EAGAIN)
        {
            sleep_ms (step_ms);
            ms_so_far += step_ms;
        }

        rc = close (fd_);
    } while (ms_so_far < max_ms_ && rc == -1 && errno == EAGAIN);

    return rc;
}
#endif

114 115 116
zmq::signaler_t::signaler_t ()
{
    //  Create the socketpair for signaling.
Pieter Hintjens's avatar
Pieter Hintjens committed
117 118 119 120
    if (make_fdpair (&r, &w) == 0) {
        unblock_socket (w);
        unblock_socket (r);
    }
121
#ifdef HAVE_FORK
Martin Hurton's avatar
Martin Hurton committed
122
    pid = getpid ();
123
#endif
124 125 126 127
}

zmq::signaler_t::~signaler_t ()
{
128
#if defined ZMQ_HAVE_EVENTFD
129
    int rc = close_wait_ms (r);
130 131
    errno_assert (rc == 0);
#elif defined ZMQ_HAVE_WINDOWS
Martin Hurton's avatar
Martin Hurton committed
132
    const struct linger so_linger = { 1, 0 };
133
    int rc = setsockopt (w, SOL_SOCKET, SO_LINGER,
Martin Hurton's avatar
Martin Hurton committed
134
        (const char *) &so_linger, sizeof so_linger);
135 136 137 138 139 140 141 142
    //  Only check shutdown if WSASTARTUP was previously done
    if (rc == 0 || WSAGetLastError () != WSANOTINITIALISED) {
        wsa_assert (rc != SOCKET_ERROR);
        rc = closesocket (w);
        wsa_assert (rc != SOCKET_ERROR);
        rc = closesocket (r);
        wsa_assert (rc != SOCKET_ERROR);
    }
143
#else
144
    int rc = close_wait_ms (w);
145
    errno_assert (rc == 0);
146
    rc = close_wait_ms (r);
147
    errno_assert (rc == 0);
148 149 150
#endif
}

Martin Hurton's avatar
Martin Hurton committed
151
zmq::fd_t zmq::signaler_t::get_fd () const
152 153 154 155 156 157
{
    return r;
}

void zmq::signaler_t::send ()
{
Martin Hurton's avatar
Martin Hurton committed
158 159
#if defined HAVE_FORK
    if (unlikely (pid != getpid ())) {
160 161 162 163
        //printf("Child process %d signaler_t::send returning without sending #1\n", getpid());
        return; // do not send anything in forked child context
    }
#endif
164 165 166 167 168
#if defined ZMQ_HAVE_EVENTFD
    const uint64_t inc = 1;
    ssize_t sz = write (w, &inc, sizeof (inc));
    errno_assert (sz == sizeof (inc));
#elif defined ZMQ_HAVE_WINDOWS
169
    unsigned char dummy = 0;
Martin Sustrik's avatar
Martin Sustrik committed
170
    int nbytes = ::send (w, (char*) &dummy, sizeof (dummy), 0);
171 172 173 174 175 176 177 178
    wsa_assert (nbytes != SOCKET_ERROR);
    zmq_assert (nbytes == sizeof (dummy));
#else
    unsigned char dummy = 0;
    while (true) {
        ssize_t nbytes = ::send (w, &dummy, sizeof (dummy), 0);
        if (unlikely (nbytes == -1 && errno == EINTR))
            continue;
179
#if defined(HAVE_FORK)
Martin Hurton's avatar
Martin Hurton committed
180
        if (unlikely (pid != getpid ())) {
181 182 183 184 185
            //printf("Child process %d signaler_t::send returning without sending #2\n", getpid());
            errno = EINTR;
            break;
        }
#endif
Martin Hurton's avatar
Martin Hurton committed
186
        zmq_assert (nbytes == sizeof dummy);
187 188 189 190 191 192 193
        break;
    }
#endif
}

int zmq::signaler_t::wait (int timeout_)
{
194
#ifdef HAVE_FORK
Martin Hurton's avatar
Martin Hurton committed
195
    if (unlikely (pid != getpid ())) {
196 197 198 199 200 201 202 203
        // we have forked and the file descriptor is closed. Emulate an interupt
        // response.
        //printf("Child process %d signaler_t::wait returning simulating interrupt #1\n", getpid());
        errno = EINTR;
        return -1;
    }
#endif

204
#ifdef ZMQ_POLL_BASED_ON_POLL
205 206 207 208 209
    struct pollfd pfd;
    pfd.fd = r;
    pfd.events = POLLIN;
    int rc = poll (&pfd, 1, timeout_);
    if (unlikely (rc < 0)) {
210
        errno_assert (errno == EINTR);
211 212
        return -1;
    }
213 214
    else
    if (unlikely (rc == 0)) {
215 216 217
        errno = EAGAIN;
        return -1;
    }
218
#ifdef HAVE_FORK
Martin Hurton's avatar
Martin Hurton committed
219 220
    else
    if (unlikely (pid != getpid ())) {
221 222 223 224 225 226 227
        // we have forked and the file descriptor is closed. Emulate an interupt
        // response.
        //printf("Child process %d signaler_t::wait returning simulating interrupt #2\n", getpid());
        errno = EINTR;
        return -1;
    }
#endif
228 229 230 231
    zmq_assert (rc == 1);
    zmq_assert (pfd.revents & POLLIN);
    return 0;

232
#elif defined ZMQ_POLL_BASED_ON_SELECT
233 234 235 236 237

    fd_set fds;
    FD_ZERO (&fds);
    FD_SET (r, &fds);
    struct timeval timeout;
238 239 240 241
    if (timeout_ >= 0) {
        timeout.tv_sec = timeout_ / 1000;
        timeout.tv_usec = timeout_ % 1000 * 1000;
    }
242
#ifdef ZMQ_HAVE_WINDOWS
243 244
    int rc = select (0, &fds, NULL, NULL,
        timeout_ >= 0 ? &timeout : NULL);
245 246
    wsa_assert (rc != SOCKET_ERROR);
#else
247 248
    int rc = select (r + 1, &fds, NULL, NULL,
        timeout_ >= 0 ? &timeout : NULL);
249
    if (unlikely (rc < 0)) {
250
        errno_assert (errno == EINTR);
251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268
        return -1;
    }
#endif
    if (unlikely (rc == 0)) {
        errno = EAGAIN;
        return -1;
    }
    zmq_assert (rc == 1);
    return 0;

#else
#error
#endif
}

void zmq::signaler_t::recv ()
{
    //  Attempt to read a signal.
269 270 271 272
#if defined ZMQ_HAVE_EVENTFD
    uint64_t dummy;
    ssize_t sz = read (r, &dummy, sizeof (dummy));
    errno_assert (sz == sizeof (dummy));
273 274 275 276 277

    //  If we accidentally grabbed the next signal along with the current
    //  one, return it back to the eventfd object.
    if (unlikely (dummy == 2)) {
        const uint64_t inc = 1;
278 279
        ssize_t sz2 = write (w, &inc, sizeof (inc));
        errno_assert (sz2 == sizeof (inc));
280 281 282
        return;
    }

283 284
    zmq_assert (dummy == 1);
#else
285
    unsigned char dummy;
Martin Sustrik's avatar
Martin Sustrik committed
286 287
#if defined ZMQ_HAVE_WINDOWS
    int nbytes = ::recv (r, (char*) &dummy, sizeof (dummy), 0);
288 289 290 291 292 293 294
    wsa_assert (nbytes != SOCKET_ERROR);
#else
    ssize_t nbytes = ::recv (r, &dummy, sizeof (dummy), 0);
    errno_assert (nbytes >= 0);
#endif
    zmq_assert (nbytes == sizeof (dummy));
    zmq_assert (dummy == 0);
295
#endif
296 297
}

298
#ifdef HAVE_FORK
Martin Hurton's avatar
Martin Hurton committed
299
void zmq::signaler_t::forked ()
300
{
301 302 303
    //  Close file descriptors created in the parent and create new pair
    close (r);
    close (w);
Pieter Hintjens's avatar
Pieter Hintjens committed
304
    make_fdpair (&r, &w);
305 306 307
}
#endif

Pieter Hintjens's avatar
Pieter Hintjens committed
308
//  Returns -1 if we could not make the socket pair successfully
309 310
int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
{
311 312
#if defined ZMQ_HAVE_EVENTFD
    fd_t fd = eventfd (0, 0);
Pieter Hintjens's avatar
Pieter Hintjens committed
313 314 315 316 317 318 319 320 321
    if (fd == -1) {
        errno_assert (errno == ENFILE || errno == EMFILE);
        *w_ = *r_ = -1;
        return -1;
    }
    else {
        *w_ = *r_ = fd;
        return 0;
    }
322 323

#elif defined ZMQ_HAVE_WINDOWS
Pieter Hintjens's avatar
Pieter Hintjens committed
324
#   if !defined _WIN32_WCE
325
    // Windows CE does not manage security attributes
Matt Arsenault's avatar
Matt Arsenault committed
326 327
    SECURITY_DESCRIPTOR sd;
    SECURITY_ATTRIBUTES sa;
Martin Hurton's avatar
Martin Hurton committed
328 329
    memset (&sd, 0, sizeof sd);
    memset (&sa, 0, sizeof sa);
Matthew Metnetsky's avatar
Matthew Metnetsky committed
330

Martin Hurton's avatar
Martin Hurton committed
331 332
    InitializeSecurityDescriptor (&sd, SECURITY_DESCRIPTOR_REVISION);
    SetSecurityDescriptorDacl (&sd, TRUE, 0, FALSE);
Matthew Metnetsky's avatar
Matthew Metnetsky committed
333

Martin Hurton's avatar
Martin Hurton committed
334
    sa.nLength = sizeof (SECURITY_ATTRIBUTES);
Matthew Metnetsky's avatar
Matthew Metnetsky committed
335
    sa.lpSecurityDescriptor = &sd;
Pieter Hintjens's avatar
Pieter Hintjens committed
336
#   endif
337

338 339 340 341
    //  This function has to be in a system-wide critical section so that
    //  two instances of the library don't accidentally create signaler
    //  crossing the process boundary.
    //  We'll use named event object to implement the critical section.
Martin Hurton's avatar
Martin Hurton committed
342 343 344
    //  Note that if the event object already exists, the CreateEvent requests
    //  EVENT_ALL_ACCESS access right. If this fails, we try to open
    //  the event object asking for SYNCHRONIZE access only.
345 346 347
    HANDLE sync = NULL;

    //  Create critical section only if using fixed signaler port
348 349 350 351 352
    //  Use problematic Event implementation for compatibility if using old port 5905.
    //  Otherwise use Mutex implementation.
    int event_signaler_port = 5905;

    if (signaler_port == event_signaler_port) {
353
#       if !defined _WIN32_WCE
354
        sync = CreateEventW (&sa, FALSE, TRUE, L"Global\\zmq-signaler-port-sync");
355
#       else
356
        sync = CreateEventW (NULL, FALSE, TRUE, L"Global\\zmq-signaler-port-sync");
357 358
#       endif
        if (sync == NULL && GetLastError () == ERROR_ACCESS_DENIED)
359
            sync = OpenEventW (SYNCHRONIZE | EVENT_MODIFY_STATE,
360
                              FALSE, L"Global\\zmq-signaler-port-sync");
361 362 363

        win_assert (sync != NULL);
    }
Martin Hurton's avatar
Martin Hurton committed
364 365 366
    else
    if (signaler_port != 0) {
        wchar_t mutex_name [MAX_PATH];
367 368 369
#       ifdef __MINGW32__
        _snwprintf (mutex_name, MAX_PATH, L"Global\\zmq-signaler-port-%d", signaler_port);
#       else
Martin Hurton's avatar
Martin Hurton committed
370
        swprintf (mutex_name, MAX_PATH, L"Global\\zmq-signaler-port-%d", signaler_port);
371
#       endif
372 373

#       if !defined _WIN32_WCE
374
        sync = CreateMutexW (&sa, FALSE, mutex_name);
375
#       else
376
        sync = CreateMutexW (NULL, FALSE, mutex_name);
377 378
#       endif
        if (sync == NULL && GetLastError () == ERROR_ACCESS_DENIED)
379
            sync = OpenMutexW (SYNCHRONIZE, FALSE, mutex_name);
380 381 382

        win_assert (sync != NULL);
    }
383

384 385 386 387 388 389 390
    //  Windows has no 'socketpair' function. CreatePipe is no good as pipe
    //  handles cannot be polled on. Here we create the socketpair by hand.
    *w_ = INVALID_SOCKET;
    *r_ = INVALID_SOCKET;

    //  Create listening socket.
    SOCKET listener;
391
    listener = open_socket (AF_INET, SOCK_STREAM, 0);
392 393 394 395 396
    wsa_assert (listener != INVALID_SOCKET);

    //  Set SO_REUSEADDR and TCP_NODELAY on listening socket.
    BOOL so_reuseaddr = 1;
    int rc = setsockopt (listener, SOL_SOCKET, SO_REUSEADDR,
Martin Hurton's avatar
Martin Hurton committed
397
        (char *)&so_reuseaddr, sizeof so_reuseaddr);
398 399 400
    wsa_assert (rc != SOCKET_ERROR);
    BOOL tcp_nodelay = 1;
    rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELAY,
Martin Hurton's avatar
Martin Hurton committed
401
        (char *)&tcp_nodelay, sizeof tcp_nodelay);
402 403
    wsa_assert (rc != SOCKET_ERROR);

404
    //  Init sockaddr to signaler port.
405
    struct sockaddr_in addr;
Martin Hurton's avatar
Martin Hurton committed
406
    memset (&addr, 0, sizeof addr);
407 408
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
409
    addr.sin_port = htons (signaler_port);
410 411

    //  Create the writer socket.
412
    *w_ = open_socket (AF_INET, SOCK_STREAM, 0);
413 414 415 416
    wsa_assert (*w_ != INVALID_SOCKET);

    //  Set TCP_NODELAY on writer socket.
    rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY,
Martin Hurton's avatar
Martin Hurton committed
417
        (char *) &tcp_nodelay, sizeof tcp_nodelay);
418 419
    wsa_assert (rc != SOCKET_ERROR);

420 421 422
    if (sync != NULL) {
        //  Enter the critical section.
        DWORD dwrc = WaitForSingleObject (sync, INFINITE);
423
        zmq_assert (dwrc == WAIT_OBJECT_0 || dwrc == WAIT_ABANDONED);
424
    }
425 426

    //  Bind listening socket to signaler port.
Martin Hurton's avatar
Martin Hurton committed
427
    rc = bind (listener, (const struct sockaddr*) &addr, sizeof addr);
428

429 430
    if (rc != SOCKET_ERROR && signaler_port == 0) {
        //  Retrieve ephemeral port number
Martin Hurton's avatar
Martin Hurton committed
431
        int addrlen = sizeof addr;
432 433 434
        rc = getsockname (listener, (struct sockaddr*) &addr, &addrlen);
    }

435 436 437 438
    //  Listen for incoming connections.
    if (rc != SOCKET_ERROR)
        rc = listen (listener, 1);

439
    //  Connect writer to the listener.
440
    if (rc != SOCKET_ERROR)
Martin Hurton's avatar
Martin Hurton committed
441
        rc = connect (*w_, (struct sockaddr*) &addr, sizeof addr);
442

443 444
    //  Accept connection from writer.
    if (rc != SOCKET_ERROR)
445
        *r_ = accept (listener, NULL, NULL);
446 447 448 449 450 451

    //  Save errno if error occurred in bind/listen/connect/accept.
    int saved_errno = 0;
    if (*r_ == INVALID_SOCKET)
        saved_errno = WSAGetLastError ();

452
    //  We don't need the listening socket anymore. Close it.
453
    closesocket (listener);
454

455 456
    if (sync != NULL) {
        //  Exit the critical section.
457 458 459 460 461
        BOOL brc;
        if (signaler_port == event_signaler_port)
            brc = SetEvent (sync);
        else
            brc = ReleaseMutex (sync);
462
        win_assert (brc != 0);
463

464 465 466 467
        //  Release the kernel object
        brc = CloseHandle (sync);
        win_assert (brc != 0);
    }
468

469
    if (*r_ != INVALID_SOCKET) {
Pieter Hintjens's avatar
Pieter Hintjens committed
470
#   if !defined _WIN32_WCE
471
        //  On Windows, preventing sockets to be inherited by child processes.
472
        BOOL brc = SetHandleInformation ((HANDLE) *r_, HANDLE_FLAG_INHERIT, 0);
473
        win_assert (brc);
Pieter Hintjens's avatar
Pieter Hintjens committed
474
#   endif
475
        return 0;
Pieter Hintjens's avatar
Pieter Hintjens committed
476 477
    }
    else {
478
        //  Cleanup writer if connection failed
479 480 481 482 483
        if (*w_ != INVALID_SOCKET) {
            rc = closesocket (*w_);
            wsa_assert (rc != SOCKET_ERROR);
            *w_ = INVALID_SOCKET;
        }
484
        //  Set errno from saved value
485
        errno = wsa_error_to_errno (saved_errno);
486 487
        return -1;
    }
488 489 490 491 492 493 494 495 496

#elif defined ZMQ_HAVE_OPENVMS

    //  Whilst OpenVMS supports socketpair - it maps to AF_INET only.  Further,
    //  it does not set the socket options TCP_NODELAY and TCP_NODELACK which
    //  can lead to performance problems.
    //
    //  The bug will be fixed in V5.6 ECO4 and beyond.  In the meantime, we'll
    //  create the socket pair manually.
497
    struct sockaddr_in lcladdr;
Martin Hurton's avatar
Martin Hurton committed
498
    memset (&lcladdr, 0, sizeof lcladdr);
499 500 501 502
    lcladdr.sin_family = AF_INET;
    lcladdr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
    lcladdr.sin_port = 0;

503
    int listener = open_socket (AF_INET, SOCK_STREAM, 0);
504 505 506
    errno_assert (listener != -1);

    int on = 1;
Martin Hurton's avatar
Martin Hurton committed
507
    int rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELAY, &on, sizeof on);
508 509
    errno_assert (rc != -1);

Martin Hurton's avatar
Martin Hurton committed
510
    rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELACK, &on, sizeof on);
511 512
    errno_assert (rc != -1);

Martin Hurton's avatar
Martin Hurton committed
513
    rc = bind (listener, (struct sockaddr*) &lcladdr, sizeof lcladdr);
514 515
    errno_assert (rc != -1);

Martin Hurton's avatar
Martin Hurton committed
516
    socklen_t lcladdr_len = sizeof lcladdr;
517 518 519 520 521 522 523

    rc = getsockname (listener, (struct sockaddr*) &lcladdr, &lcladdr_len);
    errno_assert (rc != -1);

    rc = listen (listener, 1);
    errno_assert (rc != -1);

524
    *w_ = open_socket (AF_INET, SOCK_STREAM, 0);
525 526
    errno_assert (*w_ != -1);

Martin Hurton's avatar
Martin Hurton committed
527
    rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY, &on, sizeof on);
528 529
    errno_assert (rc != -1);

Martin Hurton's avatar
Martin Hurton committed
530
    rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELACK, &on, sizeof on);
531 532
    errno_assert (rc != -1);

Martin Hurton's avatar
Martin Hurton committed
533
    rc = connect (*w_, (struct sockaddr*) &lcladdr, sizeof lcladdr);
534 535 536 537 538 539 540 541 542
    errno_assert (rc != -1);

    *r_ = accept (listener, NULL, NULL);
    errno_assert (*r_ != -1);

    close (listener);

    return 0;

Pieter Hintjens's avatar
Pieter Hintjens committed
543 544
#else
    // All other implementations support socketpair()
545 546
    int sv [2];
    int rc = socketpair (AF_UNIX, SOCK_STREAM, 0, sv);
Pieter Hintjens's avatar
Pieter Hintjens committed
547 548
    if (rc == -1) {
        errno_assert (errno == ENFILE || errno == EMFILE);
549
        *w_ = *r_ = -1;
Pieter Hintjens's avatar
Pieter Hintjens committed
550 551 552 553 554 555 556
        return -1;
    }
    else {
        *w_ = sv [0];
        *r_ = sv [1];
        return 0;
    }
557 558
#endif
}