/*
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file

    This file is part of libzmq, the ZeroMQ core engine in C++.

    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

//  On AIX, poll.h has to be included before zmq.h to get consistent
//  definition of pollfd structure (AIX uses 'reqevents' and 'retnevents'
//  instead of 'events' and 'revents' and defines macros to map from POSIX-y
//  names to AIX-specific names).
//  zmq.h must be included *after* poll.h for AIX to build properly.
//  precompiled.hpp includes include/zmq.h
#if defined ZMQ_POLL_BASED_ON_POLL && defined ZMQ_HAVE_AIX
#include <poll.h>
#endif

#include "precompiled.hpp"
#include "poller.hpp"

43
#if defined ZMQ_POLL_BASED_ON_POLL
44
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_AIX
45
#include <poll.h>
46
#endif
47
#elif defined ZMQ_POLL_BASED_ON_SELECT
48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
#if defined ZMQ_HAVE_WINDOWS
#elif defined ZMQ_HAVE_HPUX
#include <sys/param.h>
#include <sys/types.h>
#include <sys/time.h>
#elif defined ZMQ_HAVE_OPENVMS
#include <sys/types.h>
#include <sys/time.h>
#else
#include <sys/select.h>
#endif
#endif

#include "signaler.hpp"
#include "likely.hpp"
63
#include "stdint.hpp"
64
#include "config.hpp"
65 66 67 68
#include "err.hpp"
#include "fd.hpp"
#include "ip.hpp"

69 70 71 72
#if defined ZMQ_HAVE_EVENTFD
#include <sys/eventfd.h>
#endif

73
#if !defined ZMQ_HAVE_WINDOWS
74 75 76 77 78 79
#include <unistd.h>
#include <netinet/tcp.h>
#include <sys/types.h>
#include <sys/socket.h>
#endif

80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111
#if !defined (ZMQ_HAVE_WINDOWS)
//  Helper to sleep for a specific number of milliseconds (or until a signal
//  interrupts the sleep). Returns 0 on success, otherwise the non-zero
//  result of usleep() (with errno set by usleep()).
//
//  POSIX allows usleep() to fail with EINVAL for arguments of one second or
//  more, and ms_ * 1000 would overflow unsigned int for very large ms_, so
//  the sleep is split into chunks of at most 999 ms.
//  This section is only compiled on non-Windows platforms, so no Sleep()
//  fallback is needed here.
static int sleep_ms (unsigned int ms_)
{
    const unsigned int max_chunk_ms = 999;
    while (ms_ > 0) {
        const unsigned int chunk_ms = ms_ > max_chunk_ms ? max_chunk_ms : ms_;
        ms_ -= chunk_ms;
#if defined ZMQ_HAVE_ANDROID
        //  On Android the result of usleep() is deliberately not checked.
        usleep (chunk_ms * 1000);
#else
        const int rc = usleep (chunk_ms * 1000);
        if (rc != 0)
            return rc;
#endif
    }
    return 0;
}

//  Helper to close a descriptor, for non-blocking sockets, until it
//  completes. If close() fails with EAGAIN it sleeps for max_ms_ / 10
//  milliseconds (clamped to 1-100 ms) and tries again, until the accumulated
//  sleep time reaches max_ms_. Returns the result of the last close() call.
static int close_wait_ms (int fd_, unsigned int max_ms_ = 2000)
{
    unsigned int step_ms = max_ms_ / 10;
    if (step_ms < 1)
        step_ms = 1;
    if (step_ms > 100)
        step_ms = 100;

    //  The first attempt is made immediately, without sleeping.
    int rc = close (fd_);
    unsigned int ms_so_far = 0;
    while (rc == -1 && errno == EAGAIN && ms_so_far < max_ms_) {
        sleep_ms (step_ms);
        ms_so_far += step_ms;
        rc = close (fd_);
    }
    return rc;
}
#endif

zmq::signaler_t::signaler_t ()
{
    //  Build the descriptor pair used for signaling. Both ends are switched
    //  to non-blocking mode only when the pair was created successfully.
    const int pair_rc = make_fdpair (&r, &w);
    if (pair_rc == 0) {
        unblock_socket (w);
        unblock_socket (r);
    }

#ifdef HAVE_FORK
    //  Remember the creating process; send() and wait() compare against it
    //  to detect that they are running in a forked child.
    pid = getpid ();
#endif
}

// This might get run after some part of construction failed, leaving one or
// both of r and w retired_fd.
138 139
zmq::signaler_t::~signaler_t ()
{
140
#if defined ZMQ_HAVE_EVENTFD
141
    if (r == retired_fd) return;
142
    int rc = close_wait_ms (r);
143 144
    errno_assert (rc == 0);
#elif defined ZMQ_HAVE_WINDOWS
145 146 147 148 149 150 151 152 153 154 155 156 157
    if (w != retired_fd) {
        const struct linger so_linger = { 1, 0 };
        int rc = setsockopt (w, SOL_SOCKET, SO_LINGER,
            (const char *) &so_linger, sizeof so_linger);
        //  Only check shutdown if WSASTARTUP was previously done
        if (rc == 0 || WSAGetLastError () != WSANOTINITIALISED) {
            wsa_assert (rc != SOCKET_ERROR);
            rc = closesocket (w);
            wsa_assert (rc != SOCKET_ERROR);
            if (r == retired_fd) return;
            rc = closesocket (r);
            wsa_assert (rc != SOCKET_ERROR);
        }
Richard Newton's avatar
Richard Newton committed
158
    }
159
#else
160 161 162 163 164
    if (w != retired_fd) {
        int rc = close_wait_ms (w);
        errno_assert (rc == 0);
    }
    if (r != retired_fd) {
165
        int rc = close_wait_ms (r);
166 167
        errno_assert (rc == 0);
    }
168 169 170
#endif
}

Martin Hurton's avatar
Martin Hurton committed
171
//  Returns the reading end of the signaler, i.e. the descriptor that wait()
//  polls for input.
zmq::fd_t zmq::signaler_t::get_fd () const
{
    const fd_t reading_end = r;
    return reading_end;
}

void zmq::signaler_t::send ()
{
Martin Hurton's avatar
Martin Hurton committed
178 179
#if defined HAVE_FORK
    if (unlikely (pid != getpid ())) {
180 181 182 183
        //printf("Child process %d signaler_t::send returning without sending #1\n", getpid());
        return; // do not send anything in forked child context
    }
#endif
184 185 186 187 188
#if defined ZMQ_HAVE_EVENTFD
    const uint64_t inc = 1;
    ssize_t sz = write (w, &inc, sizeof (inc));
    errno_assert (sz == sizeof (inc));
#elif defined ZMQ_HAVE_WINDOWS
189
    unsigned char dummy = 0;
Pieter Hintjens's avatar
Pieter Hintjens committed
190
    int nbytes = ::send (w, (char *) &dummy, sizeof (dummy), 0);
191 192 193 194 195 196 197 198
    wsa_assert (nbytes != SOCKET_ERROR);
    zmq_assert (nbytes == sizeof (dummy));
#else
    unsigned char dummy = 0;
    while (true) {
        ssize_t nbytes = ::send (w, &dummy, sizeof (dummy), 0);
        if (unlikely (nbytes == -1 && errno == EINTR))
            continue;
199
#if defined(HAVE_FORK)
Martin Hurton's avatar
Martin Hurton committed
200
        if (unlikely (pid != getpid ())) {
201 202 203 204 205
            //printf("Child process %d signaler_t::send returning without sending #2\n", getpid());
            errno = EINTR;
            break;
        }
#endif
Martin Hurton's avatar
Martin Hurton committed
206
        zmq_assert (nbytes == sizeof dummy);
207 208 209 210 211 212 213
        break;
    }
#endif
}

//  Waits up to timeout_ milliseconds (a negative value waits indefinitely)
//  for the reading end to become readable.
//  Returns 0 when a signal is pending. Otherwise returns -1 with errno set to
//  EAGAIN on timeout, or EINTR when interrupted or when called in a forked
//  child.
int zmq::signaler_t::wait (int timeout_)
{
#ifdef HAVE_FORK
    //  We have forked and the file descriptor is closed. Emulate an
    //  interrupt response.
    if (unlikely (pid != getpid ())) {
        errno = EINTR;
        return -1;
    }
#endif

#ifdef ZMQ_POLL_BASED_ON_POLL
    struct pollfd item;
    item.fd = r;
    item.events = POLLIN;
    const int ready = poll (&item, 1, timeout_);

    if (unlikely (ready < 0)) {
        //  Only an interrupting signal is tolerated here.
        errno_assert (errno == EINTR);
        return -1;
    }
    if (unlikely (ready == 0)) {
        //  Timed out with nothing to read.
        errno = EAGAIN;
        return -1;
    }
#ifdef HAVE_FORK
    //  Re-check after poll() returned: report a fork as an interruption.
    if (unlikely (pid != getpid ())) {
        errno = EINTR;
        return -1;
    }
#endif
    zmq_assert (ready == 1);
    zmq_assert (item.revents & POLLIN);
    return 0;

#elif defined ZMQ_POLL_BASED_ON_SELECT
    fd_set readable;
    FD_ZERO (&readable);
    FD_SET (r, &readable);

    //  A NULL timeout pointer makes select() block indefinitely.
    struct timeval tv;
    struct timeval *tv_ptr = NULL;
    if (timeout_ >= 0) {
        tv.tv_sec = timeout_ / 1000;
        tv.tv_usec = timeout_ % 1000 * 1000;
        tv_ptr = &tv;
    }

#ifdef ZMQ_HAVE_WINDOWS
    //  Winsock ignores the first argument of select().
    const int ready = select (0, &readable, NULL, NULL, tv_ptr);
    wsa_assert (ready != SOCKET_ERROR);
#else
    const int ready = select (r + 1, &readable, NULL, NULL, tv_ptr);
    if (unlikely (ready < 0)) {
        errno_assert (errno == EINTR);
        return -1;
    }
#endif
    if (unlikely (ready == 0)) {
        errno = EAGAIN;
        return -1;
    }
    zmq_assert (ready == 1);
    return 0;

#else
#error
#endif
}

void zmq::signaler_t::recv ()
287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306
{
    //  Attempt to read a signal.
#if defined ZMQ_HAVE_EVENTFD
    uint64_t dummy;
    ssize_t sz = read (r, &dummy, sizeof (dummy));
    errno_assert (sz == sizeof (dummy));

    //  If we accidentally grabbed the next signal(s) along with the current
    //  one, return it back to the eventfd object.
    if (unlikely (dummy > 1)) {
        const uint64_t inc = dummy - 1;
        ssize_t sz2 = write (w, &inc, sizeof (inc));
        errno_assert (sz2 == sizeof (inc));
        return;
    }

    zmq_assert (dummy == 1);
#else
    unsigned char dummy;
#if defined ZMQ_HAVE_WINDOWS
Pieter Hintjens's avatar
Pieter Hintjens committed
307
    int nbytes = ::recv (r, (char *) &dummy, sizeof (dummy), 0);
308 309 310 311 312 313 314 315 316 317 318
    wsa_assert (nbytes != SOCKET_ERROR);
#else
    ssize_t nbytes = ::recv (r, &dummy, sizeof (dummy), 0);
    errno_assert (nbytes >= 0);
#endif
    zmq_assert (nbytes == sizeof (dummy));
    zmq_assert (dummy == 0);
#endif
}

int zmq::signaler_t::recv_failable ()
319 320
{
    //  Attempt to read a signal.
321 322 323
#if defined ZMQ_HAVE_EVENTFD
    uint64_t dummy;
    ssize_t sz = read (r, &dummy, sizeof (dummy));
324 325
    if (sz == -1) {
        errno_assert (errno == EAGAIN);
326
        return -1;
327
    }
328 329 330 331 332 333 334 335 336
    else {
        errno_assert (sz == sizeof (dummy));

        //  If we accidentally grabbed the next signal(s) along with the current
        //  one, return it back to the eventfd object.
        if (unlikely (dummy > 1)) {
            const uint64_t inc = dummy - 1;
            ssize_t sz2 = write (w, &inc, sizeof (inc));
            errno_assert (sz2 == sizeof (inc));
337
            return 0;
338
        }
339

340 341
        zmq_assert (dummy == 1);
    }
342
#else
343
    unsigned char dummy;
Martin Sustrik's avatar
Martin Sustrik committed
344
#if defined ZMQ_HAVE_WINDOWS
Pieter Hintjens's avatar
Pieter Hintjens committed
345
    int nbytes = ::recv (r, (char *) &dummy, sizeof (dummy), 0);
346
    if (nbytes == SOCKET_ERROR) {
347 348 349
        const int last_error = WSAGetLastError();
        if (last_error == WSAEWOULDBLOCK) {
            errno = EAGAIN;
350
            return -1;
351
        }
352 353
        wsa_assert (last_error == WSAEWOULDBLOCK);
    }
354 355
#else
    ssize_t nbytes = ::recv (r, &dummy, sizeof (dummy), 0);
356 357 358 359 360 361 362
    if (nbytes == -1) {
        if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
            errno = EAGAIN;
            return -1;
        }
        errno_assert (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR);
    }
363 364 365
#endif
    zmq_assert (nbytes == sizeof (dummy));
    zmq_assert (dummy == 0);
366
#endif
367
    return 0;
368 369
}

370
#ifdef HAVE_FORK
//  Replaces the descriptors inherited from the parent process with a fresh
//  pair, configured the same way the constructor configures them.
//  NOTE(review): pid is not refreshed here, so send() and wait() in the child
//  still see pid != getpid () and keep treating the signaler as inert --
//  confirm this is intended.
void zmq::signaler_t::forked ()
{
    //  Close file descriptors created in the parent. With eventfd, r and w
    //  are the same descriptor, so it must be closed only once; a second
    //  close() could hit an unrelated descriptor that reused the number.
    if (r != retired_fd)
        close (r);
    if (w != retired_fd && w != r)
        close (w);

    //  Create the new pair and make both ends non-blocking, as the
    //  constructor does; recv_failable() relies on non-blocking reads to
    //  report EAGAIN instead of blocking.
    if (make_fdpair (&r, &w) == 0) {
        unblock_socket (w);
        unblock_socket (r);
    }
}
#endif

//  Returns -1 if we could not make the socket pair successfully
381 382
int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
{
383
#if defined ZMQ_HAVE_EVENTFD
384 385 386 387 388 389 390 391
    int flags = 0;
#if defined ZMQ_HAVE_EVENTFD_CLOEXEC
    //  Setting this option result in sane behaviour when exec() functions
    //  are used. Old sockets are closed and don't block TCP ports, avoid
    //  leaks, etc.
    flags |= EFD_CLOEXEC;
#endif
    fd_t fd = eventfd (0, flags);
Pieter Hintjens's avatar
Pieter Hintjens committed
392 393 394 395 396 397 398 399 400
    if (fd == -1) {
        errno_assert (errno == ENFILE || errno == EMFILE);
        *w_ = *r_ = -1;
        return -1;
    }
    else {
        *w_ = *r_ = fd;
        return 0;
    }
401 402

#elif defined ZMQ_HAVE_WINDOWS
Pieter Hintjens's avatar
Pieter Hintjens committed
403
#   if !defined _WIN32_WCE
404
    //  Windows CE does not manage security attributes
Matt Arsenault's avatar
Matt Arsenault committed
405 406
    SECURITY_DESCRIPTOR sd;
    SECURITY_ATTRIBUTES sa;
Martin Hurton's avatar
Martin Hurton committed
407 408
    memset (&sd, 0, sizeof sd);
    memset (&sa, 0, sizeof sa);
Matthew Metnetsky's avatar
Matthew Metnetsky committed
409

Martin Hurton's avatar
Martin Hurton committed
410 411
    InitializeSecurityDescriptor (&sd, SECURITY_DESCRIPTOR_REVISION);
    SetSecurityDescriptorDacl (&sd, TRUE, 0, FALSE);
Matthew Metnetsky's avatar
Matthew Metnetsky committed
412

Martin Hurton's avatar
Martin Hurton committed
413
    sa.nLength = sizeof (SECURITY_ATTRIBUTES);
Matthew Metnetsky's avatar
Matthew Metnetsky committed
414
    sa.lpSecurityDescriptor = &sd;
Pieter Hintjens's avatar
Pieter Hintjens committed
415
#   endif
416

417 418 419 420
    //  This function has to be in a system-wide critical section so that
    //  two instances of the library don't accidentally create signaler
    //  crossing the process boundary.
    //  We'll use named event object to implement the critical section.
Martin Hurton's avatar
Martin Hurton committed
421 422 423
    //  Note that if the event object already exists, the CreateEvent requests
    //  EVENT_ALL_ACCESS access right. If this fails, we try to open
    //  the event object asking for SYNCHRONIZE access only.
424 425 426
    HANDLE sync = NULL;

    //  Create critical section only if using fixed signaler port
427 428 429 430 431
    //  Use problematic Event implementation for compatibility if using old port 5905.
    //  Otherwise use Mutex implementation.
    int event_signaler_port = 5905;

    if (signaler_port == event_signaler_port) {
432
#       if !defined _WIN32_WCE
433
        sync = CreateEventW (&sa, FALSE, TRUE, L"Global\\zmq-signaler-port-sync");
434
#       else
435
        sync = CreateEventW (NULL, FALSE, TRUE, L"Global\\zmq-signaler-port-sync");
436 437
#       endif
        if (sync == NULL && GetLastError () == ERROR_ACCESS_DENIED)
438
            sync = OpenEventW (SYNCHRONIZE | EVENT_MODIFY_STATE,
439
                              FALSE, L"Global\\zmq-signaler-port-sync");
440 441 442

        win_assert (sync != NULL);
    }
Martin Hurton's avatar
Martin Hurton committed
443 444 445
    else
    if (signaler_port != 0) {
        wchar_t mutex_name [MAX_PATH];
446 447 448
#       ifdef __MINGW32__
        _snwprintf (mutex_name, MAX_PATH, L"Global\\zmq-signaler-port-%d", signaler_port);
#       else
Martin Hurton's avatar
Martin Hurton committed
449
        swprintf (mutex_name, MAX_PATH, L"Global\\zmq-signaler-port-%d", signaler_port);
450
#       endif
451 452

#       if !defined _WIN32_WCE
453
        sync = CreateMutexW (&sa, FALSE, mutex_name);
454
#       else
455
        sync = CreateMutexW (NULL, FALSE, mutex_name);
456 457
#       endif
        if (sync == NULL && GetLastError () == ERROR_ACCESS_DENIED)
458
            sync = OpenMutexW (SYNCHRONIZE, FALSE, mutex_name);
459 460 461

        win_assert (sync != NULL);
    }
462

463 464 465 466 467 468 469
    //  Windows has no 'socketpair' function. CreatePipe is no good as pipe
    //  handles cannot be polled on. Here we create the socketpair by hand.
    *w_ = INVALID_SOCKET;
    *r_ = INVALID_SOCKET;

    //  Create listening socket.
    SOCKET listener;
470
    listener = open_socket (AF_INET, SOCK_STREAM, 0);
471 472 473 474 475
    wsa_assert (listener != INVALID_SOCKET);

    //  Set SO_REUSEADDR and TCP_NODELAY on listening socket.
    BOOL so_reuseaddr = 1;
    int rc = setsockopt (listener, SOL_SOCKET, SO_REUSEADDR,
Pieter Hintjens's avatar
Pieter Hintjens committed
476
        (char *) &so_reuseaddr, sizeof so_reuseaddr);
477 478 479
    wsa_assert (rc != SOCKET_ERROR);
    BOOL tcp_nodelay = 1;
    rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELAY,
Pieter Hintjens's avatar
Pieter Hintjens committed
480
        (char *) &tcp_nodelay, sizeof tcp_nodelay);
481 482
    wsa_assert (rc != SOCKET_ERROR);

483
    //  Init sockaddr to signaler port.
484
    struct sockaddr_in addr;
Martin Hurton's avatar
Martin Hurton committed
485
    memset (&addr, 0, sizeof addr);
486 487
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
488
    addr.sin_port = htons (signaler_port);
489 490

    //  Create the writer socket.
491
    *w_ = open_socket (AF_INET, SOCK_STREAM, 0);
492 493 494 495
    wsa_assert (*w_ != INVALID_SOCKET);

    //  Set TCP_NODELAY on writer socket.
    rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY,
Martin Hurton's avatar
Martin Hurton committed
496
        (char *) &tcp_nodelay, sizeof tcp_nodelay);
497 498
    wsa_assert (rc != SOCKET_ERROR);

499 500 501
    if (sync != NULL) {
        //  Enter the critical section.
        DWORD dwrc = WaitForSingleObject (sync, INFINITE);
502
        zmq_assert (dwrc == WAIT_OBJECT_0 || dwrc == WAIT_ABANDONED);
503
    }
504 505

    //  Bind listening socket to signaler port.
Pieter Hintjens's avatar
Pieter Hintjens committed
506
    rc = bind (listener, (const struct sockaddr *) &addr, sizeof addr);
507

508 509
    if (rc != SOCKET_ERROR && signaler_port == 0) {
        //  Retrieve ephemeral port number
Martin Hurton's avatar
Martin Hurton committed
510
        int addrlen = sizeof addr;
Pieter Hintjens's avatar
Pieter Hintjens committed
511
        rc = getsockname (listener, (struct sockaddr *) &addr, &addrlen);
512 513
    }

514 515 516 517
    //  Listen for incoming connections.
    if (rc != SOCKET_ERROR)
        rc = listen (listener, 1);

518
    //  Connect writer to the listener.
519
    if (rc != SOCKET_ERROR)
Pieter Hintjens's avatar
Pieter Hintjens committed
520
        rc = connect (*w_, (struct sockaddr *) &addr, sizeof addr);
521

522 523
    //  Accept connection from writer.
    if (rc != SOCKET_ERROR)
524
        *r_ = accept (listener, NULL, NULL);
525

526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546
    //  Send/receive large chunk to work around TCP slow start
    //  This code is a workaround for #1608
    if (*r_ != INVALID_SOCKET) {
        size_t dummy_size = 1024 * 1024;        //  1M to overload default receive buffer
        unsigned char *dummy = (unsigned char *) malloc (dummy_size);
        int still_to_send = (int) dummy_size;
        int still_to_recv = (int) dummy_size;
        while (still_to_send || still_to_recv) {
            int nbytes;
            if (still_to_send > 0) {
                nbytes = ::send (*w_, (char *) (dummy + dummy_size - still_to_send), still_to_send, 0);
                wsa_assert (nbytes != SOCKET_ERROR);
                still_to_send -= nbytes;
            }
            nbytes = ::recv (*r_, (char *) (dummy + dummy_size - still_to_recv), still_to_recv, 0);
            wsa_assert (nbytes != SOCKET_ERROR);
            still_to_recv -= nbytes;
        }
        free (dummy);
    }

547 548 549 550 551
    //  Save errno if error occurred in bind/listen/connect/accept.
    int saved_errno = 0;
    if (*r_ == INVALID_SOCKET)
        saved_errno = WSAGetLastError ();

552
    //  We don't need the listening socket anymore. Close it.
553
    rc = closesocket (listener);
554
    wsa_assert(rc != SOCKET_ERROR);
555

556 557
    if (sync != NULL) {
        //  Exit the critical section.
558 559 560 561 562
        BOOL brc;
        if (signaler_port == event_signaler_port)
            brc = SetEvent (sync);
        else
            brc = ReleaseMutex (sync);
563
        win_assert (brc != 0);
564

565 566 567 568
        //  Release the kernel object
        brc = CloseHandle (sync);
        win_assert (brc != 0);
    }
569

570
    if (*r_ != INVALID_SOCKET) {
Pieter Hintjens's avatar
Pieter Hintjens committed
571
#   if !defined _WIN32_WCE
572
        //  On Windows, preventing sockets to be inherited by child processes.
573
        BOOL brc = SetHandleInformation ((HANDLE) *r_, HANDLE_FLAG_INHERIT, 0);
574
        win_assert (brc);
Pieter Hintjens's avatar
Pieter Hintjens committed
575
#   endif
576
        return 0;
Pieter Hintjens's avatar
Pieter Hintjens committed
577 578
    }
    else {
579
        //  Cleanup writer if connection failed
580 581 582 583 584
        if (*w_ != INVALID_SOCKET) {
            rc = closesocket (*w_);
            wsa_assert (rc != SOCKET_ERROR);
            *w_ = INVALID_SOCKET;
        }
585
        //  Set errno from saved value
586
        errno = wsa_error_to_errno (saved_errno);
587 588
        return -1;
    }
589 590 591 592 593 594 595 596 597

#elif defined ZMQ_HAVE_OPENVMS

    //  Whilst OpenVMS supports socketpair - it maps to AF_INET only.  Further,
    //  it does not set the socket options TCP_NODELAY and TCP_NODELACK which
    //  can lead to performance problems.
    //
    //  The bug will be fixed in V5.6 ECO4 and beyond.  In the meantime, we'll
    //  create the socket pair manually.
598
    struct sockaddr_in lcladdr;
Martin Hurton's avatar
Martin Hurton committed
599
    memset (&lcladdr, 0, sizeof lcladdr);
600 601 602 603
    lcladdr.sin_family = AF_INET;
    lcladdr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
    lcladdr.sin_port = 0;

604
    int listener = open_socket (AF_INET, SOCK_STREAM, 0);
605 606 607
    errno_assert (listener != -1);

    int on = 1;
Martin Hurton's avatar
Martin Hurton committed
608
    int rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELAY, &on, sizeof on);
609 610
    errno_assert (rc != -1);

Martin Hurton's avatar
Martin Hurton committed
611
    rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELACK, &on, sizeof on);
612 613
    errno_assert (rc != -1);

Pieter Hintjens's avatar
Pieter Hintjens committed
614
    rc = bind (listener, (struct sockaddr *) &lcladdr, sizeof lcladdr);
615 616
    errno_assert (rc != -1);

Martin Hurton's avatar
Martin Hurton committed
617
    socklen_t lcladdr_len = sizeof lcladdr;
618

Pieter Hintjens's avatar
Pieter Hintjens committed
619
    rc = getsockname (listener, (struct sockaddr *) &lcladdr, &lcladdr_len);
620 621 622 623 624
    errno_assert (rc != -1);

    rc = listen (listener, 1);
    errno_assert (rc != -1);

625
    *w_ = open_socket (AF_INET, SOCK_STREAM, 0);
626 627
    errno_assert (*w_ != -1);

Martin Hurton's avatar
Martin Hurton committed
628
    rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY, &on, sizeof on);
629 630
    errno_assert (rc != -1);

Martin Hurton's avatar
Martin Hurton committed
631
    rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELACK, &on, sizeof on);
632 633
    errno_assert (rc != -1);

Pieter Hintjens's avatar
Pieter Hintjens committed
634
    rc = connect (*w_, (struct sockaddr *) &lcladdr, sizeof lcladdr);
635 636 637 638 639 640 641 642 643
    errno_assert (rc != -1);

    *r_ = accept (listener, NULL, NULL);
    errno_assert (*r_ != -1);

    close (listener);

    return 0;

Pieter Hintjens's avatar
Pieter Hintjens committed
644 645
#else
    // All other implementations support socketpair()
646
    int sv [2];
647 648 649 650 651 652 653 654
    int type = SOCK_STREAM;
    //  Setting this option result in sane behaviour when exec() functions
    //  are used. Old sockets are closed and don't block TCP ports, avoid
    //  leaks, etc.
#if defined ZMQ_HAVE_SOCK_CLOEXEC
    type |= SOCK_CLOEXEC;
#endif
    int rc = socketpair (AF_UNIX, type, 0, sv);
Pieter Hintjens's avatar
Pieter Hintjens committed
655 656
    if (rc == -1) {
        errno_assert (errno == ENFILE || errno == EMFILE);
657
        *w_ = *r_ = -1;
Pieter Hintjens's avatar
Pieter Hintjens committed
658 659 660
        return -1;
    }
    else {
661 662 663 664 665 666 667 668 669
        //  If there's no SOCK_CLOEXEC, let's try the second best option. Note that
        //  race condition can cause socket not to be closed (if fork happens
        //  between socket creation and this point).
#if !defined ZMQ_HAVE_SOCK_CLOEXEC && defined FD_CLOEXEC
        rc = fcntl (sv [0], F_SETFD, FD_CLOEXEC);
        errno_assert (rc != -1);
        rc = fcntl (sv [1], F_SETFD, FD_CLOEXEC);
        errno_assert (rc != -1);
#endif
Pieter Hintjens's avatar
Pieter Hintjens committed
670 671 672 673
        *w_ = sv [0];
        *r_ = sv [1];
        return 0;
    }
674 675
#endif
}