signaler.cpp 20.6 KB
Newer Older
1
/*
2
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3

4
    This file is part of libzmq, the ZeroMQ core engine in C++.
5

6 7 8
    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
9 10
    (at your option) any later version.

11 12 13 14 15 16 17 18 19 20 21 22 23 24
    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.
25 26 27 28 29 30 31 32 33

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

//  On AIX, poll.h has to be included before zmq.h to get consistent
//  definition of pollfd structure (AIX uses 'reqevents' and 'retnevents'
//  instead of 'events' and 'revents' and defines macros to map from POSIX-y
//  names to AIX-specific names).
34 35 36 37 38 39 40 41 42
//  zmq.h must be included *after* poll.h for AIX to build properly.
//  precompiled.hpp includes include/zmq.h
#if defined ZMQ_POLL_BASED_ON_POLL && defined ZMQ_HAVE_AIX
#include <poll.h>
#endif

#include "precompiled.hpp"
#include "poller.hpp"

43
#if defined ZMQ_POLL_BASED_ON_POLL
44
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_AIX
45
#include <poll.h>
46
#endif
47
#elif defined ZMQ_POLL_BASED_ON_SELECT
48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
#if defined ZMQ_HAVE_WINDOWS
#elif defined ZMQ_HAVE_HPUX
#include <sys/param.h>
#include <sys/types.h>
#include <sys/time.h>
#elif defined ZMQ_HAVE_OPENVMS
#include <sys/types.h>
#include <sys/time.h>
#else
#include <sys/select.h>
#endif
#endif

#include "signaler.hpp"
#include "likely.hpp"
63
#include "stdint.hpp"
64
#include "config.hpp"
65 66 67 68
#include "err.hpp"
#include "fd.hpp"
#include "ip.hpp"

69 70 71 72
#if defined ZMQ_HAVE_EVENTFD
#include <sys/eventfd.h>
#endif

73
#if !defined ZMQ_HAVE_WINDOWS
74 75 76 77 78 79
#include <unistd.h>
#include <netinet/tcp.h>
#include <sys/types.h>
#include <sys/socket.h>
#endif

80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111
#if !defined (ZMQ_HAVE_WINDOWS)
// Helper to sleep for specific number of milliseconds (or until signal)
//
static int sleep_ms (unsigned int ms_)
{
    if (ms_ == 0)
        return 0;
#if defined ZMQ_HAVE_WINDOWS
    Sleep (ms_ > 0 ? ms_ : INFINITE);
    return 0;
#elif defined ZMQ_HAVE_ANDROID
    usleep (ms_ * 1000);
    return 0;
#else
    return usleep (ms_ * 1000);
#endif
}

// Helper to wait on close(), for non-blocking sockets, until it completes
// If EAGAIN is received, will sleep briefly (1-100ms) then try again, until
// the overall timeout is reached.
//
static int close_wait_ms (int fd_, unsigned int max_ms_ = 2000)
{
    unsigned int ms_so_far = 0;
    unsigned int step_ms   = max_ms_ / 10;
    if (step_ms < 1)
        step_ms = 1;
    if (step_ms > 100)
        step_ms = 100;

    int rc = 0;       // do not sleep on first attempt
Pieter Hintjens's avatar
Pieter Hintjens committed
112 113
    do {
        if (rc == -1 && errno == EAGAIN) {
114 115 116 117 118 119 120 121 122 123
            sleep_ms (step_ms);
            ms_so_far += step_ms;
        }
        rc = close (fd_);
    } while (ms_so_far < max_ms_ && rc == -1 && errno == EAGAIN);

    return rc;
}
#endif

124 125 126
zmq::signaler_t::signaler_t ()
{
    //  Create the socketpair for signaling.
127 128 129 130
    if (make_fdpair (&r, &w) == 0) {
        unblock_socket (w);
        unblock_socket (r);
    }
131
#ifdef HAVE_FORK
Martin Hurton's avatar
Martin Hurton committed
132
    pid = getpid ();
133
#endif
134 135
}

136 137
// This might get run after some part of construction failed, leaving one or
// both of r and w retired_fd.
138 139
zmq::signaler_t::~signaler_t ()
{
140
#if defined ZMQ_HAVE_EVENTFD
141
    if (r == retired_fd) return;
142
    int rc = close_wait_ms (r);
143 144
    errno_assert (rc == 0);
#elif defined ZMQ_HAVE_WINDOWS
145 146 147 148 149 150 151 152 153 154 155 156 157
    if (w != retired_fd) {
        const struct linger so_linger = { 1, 0 };
        int rc = setsockopt (w, SOL_SOCKET, SO_LINGER,
            (const char *) &so_linger, sizeof so_linger);
        //  Only check shutdown if WSASTARTUP was previously done
        if (rc == 0 || WSAGetLastError () != WSANOTINITIALISED) {
            wsa_assert (rc != SOCKET_ERROR);
            rc = closesocket (w);
            wsa_assert (rc != SOCKET_ERROR);
            if (r == retired_fd) return;
            rc = closesocket (r);
            wsa_assert (rc != SOCKET_ERROR);
        }
Richard Newton's avatar
Richard Newton committed
158
    }
159
#else
160 161 162 163 164
    if (w != retired_fd) {
        int rc = close_wait_ms (w);
        errno_assert (rc == 0);
    }
    if (r != retired_fd) {
165
        int rc = close_wait_ms (r);
166 167
        errno_assert (rc == 0);
    }
168 169 170
#endif
}

Martin Hurton's avatar
Martin Hurton committed
171
zmq::fd_t zmq::signaler_t::get_fd () const
172 173 174 175 176 177
{
    return r;
}

void zmq::signaler_t::send ()
{
Martin Hurton's avatar
Martin Hurton committed
178 179
#if defined HAVE_FORK
    if (unlikely (pid != getpid ())) {
180 181 182 183
        //printf("Child process %d signaler_t::send returning without sending #1\n", getpid());
        return; // do not send anything in forked child context
    }
#endif
184 185 186 187 188
#if defined ZMQ_HAVE_EVENTFD
    const uint64_t inc = 1;
    ssize_t sz = write (w, &inc, sizeof (inc));
    errno_assert (sz == sizeof (inc));
#elif defined ZMQ_HAVE_WINDOWS
189
    unsigned char dummy = 0;
190 191
    while (true) {
        int nbytes = ::send (w, (char*) &dummy, sizeof (dummy), 0);
192
        wsa_assert (nbytes != SOCKET_ERROR);
193 194 195 196 197
        if (unlikely (nbytes == SOCKET_ERROR))
            continue;
        zmq_assert (nbytes == sizeof (dummy));
        break;
    }
198 199 200 201 202 203
#else
    unsigned char dummy = 0;
    while (true) {
        ssize_t nbytes = ::send (w, &dummy, sizeof (dummy), 0);
        if (unlikely (nbytes == -1 && errno == EINTR))
            continue;
204
#if defined(HAVE_FORK)
Martin Hurton's avatar
Martin Hurton committed
205
        if (unlikely (pid != getpid ())) {
206 207 208 209 210
            //printf("Child process %d signaler_t::send returning without sending #2\n", getpid());
            errno = EINTR;
            break;
        }
#endif
Martin Hurton's avatar
Martin Hurton committed
211
        zmq_assert (nbytes == sizeof dummy);
212 213 214 215 216 217 218
        break;
    }
#endif
}

int zmq::signaler_t::wait (int timeout_)
{
219
#ifdef HAVE_FORK
Martin Hurton's avatar
Martin Hurton committed
220
    if (unlikely (pid != getpid ())) {
221
        // we have forked and the file descriptor is closed. Emulate an interrupt
222 223 224 225 226 227 228
        // response.
        //printf("Child process %d signaler_t::wait returning simulating interrupt #1\n", getpid());
        errno = EINTR;
        return -1;
    }
#endif

229
#ifdef ZMQ_POLL_BASED_ON_POLL
230 231 232 233 234
    struct pollfd pfd;
    pfd.fd = r;
    pfd.events = POLLIN;
    int rc = poll (&pfd, 1, timeout_);
    if (unlikely (rc < 0)) {
235
        errno_assert (errno == EINTR);
236 237
        return -1;
    }
238 239
    else
    if (unlikely (rc == 0)) {
240 241 242
        errno = EAGAIN;
        return -1;
    }
243
#ifdef HAVE_FORK
Martin Hurton's avatar
Martin Hurton committed
244 245
    else
    if (unlikely (pid != getpid ())) {
246
        // we have forked and the file descriptor is closed. Emulate an interrupt
247 248 249 250 251 252
        // response.
        //printf("Child process %d signaler_t::wait returning simulating interrupt #2\n", getpid());
        errno = EINTR;
        return -1;
    }
#endif
253 254 255 256
    zmq_assert (rc == 1);
    zmq_assert (pfd.revents & POLLIN);
    return 0;

257
#elif defined ZMQ_POLL_BASED_ON_SELECT
258 259 260 261 262

    fd_set fds;
    FD_ZERO (&fds);
    FD_SET (r, &fds);
    struct timeval timeout;
263 264 265 266
    if (timeout_ >= 0) {
        timeout.tv_sec = timeout_ / 1000;
        timeout.tv_usec = timeout_ % 1000 * 1000;
    }
267
#ifdef ZMQ_HAVE_WINDOWS
268 269
    int rc = select (0, &fds, NULL, NULL,
        timeout_ >= 0 ? &timeout : NULL);
270 271
    wsa_assert (rc != SOCKET_ERROR);
#else
272 273
    int rc = select (r + 1, &fds, NULL, NULL,
        timeout_ >= 0 ? &timeout : NULL);
274
    if (unlikely (rc < 0)) {
275
        errno_assert (errno == EINTR);
276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291
        return -1;
    }
#endif
    if (unlikely (rc == 0)) {
        errno = EAGAIN;
        return -1;
    }
    zmq_assert (rc == 1);
    return 0;

#else
#error
#endif
}

void zmq::signaler_t::recv ()
292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311
{
    //  Attempt to read a signal.
#if defined ZMQ_HAVE_EVENTFD
    uint64_t dummy;
    ssize_t sz = read (r, &dummy, sizeof (dummy));
    errno_assert (sz == sizeof (dummy));

    //  If we accidentally grabbed the next signal(s) along with the current
    //  one, return it back to the eventfd object.
    if (unlikely (dummy > 1)) {
        const uint64_t inc = dummy - 1;
        ssize_t sz2 = write (w, &inc, sizeof (inc));
        errno_assert (sz2 == sizeof (inc));
        return;
    }

    zmq_assert (dummy == 1);
#else
    unsigned char dummy;
#if defined ZMQ_HAVE_WINDOWS
Pieter Hintjens's avatar
Pieter Hintjens committed
312
    int nbytes = ::recv (r, (char *) &dummy, sizeof (dummy), 0);
313 314 315 316 317 318 319 320 321 322 323
    wsa_assert (nbytes != SOCKET_ERROR);
#else
    ssize_t nbytes = ::recv (r, &dummy, sizeof (dummy), 0);
    errno_assert (nbytes >= 0);
#endif
    zmq_assert (nbytes == sizeof (dummy));
    zmq_assert (dummy == 0);
#endif
}

int zmq::signaler_t::recv_failable ()
324 325
{
    //  Attempt to read a signal.
326 327 328
#if defined ZMQ_HAVE_EVENTFD
    uint64_t dummy;
    ssize_t sz = read (r, &dummy, sizeof (dummy));
329 330
    if (sz == -1) {
        errno_assert (errno == EAGAIN);
331
        return -1;
332
    }
333 334 335 336 337 338 339 340 341
    else {
        errno_assert (sz == sizeof (dummy));

        //  If we accidentally grabbed the next signal(s) along with the current
        //  one, return it back to the eventfd object.
        if (unlikely (dummy > 1)) {
            const uint64_t inc = dummy - 1;
            ssize_t sz2 = write (w, &inc, sizeof (inc));
            errno_assert (sz2 == sizeof (inc));
342
            return 0;
343
        }
344

345 346
        zmq_assert (dummy == 1);
    }
347
#else
348
    unsigned char dummy;
Martin Sustrik's avatar
Martin Sustrik committed
349
#if defined ZMQ_HAVE_WINDOWS
Pieter Hintjens's avatar
Pieter Hintjens committed
350
    int nbytes = ::recv (r, (char *) &dummy, sizeof (dummy), 0);
351
    if (nbytes == SOCKET_ERROR) {
352 353 354
        const int last_error = WSAGetLastError();
        if (last_error == WSAEWOULDBLOCK) {
            errno = EAGAIN;
355
            return -1;
356
        }
357 358
        wsa_assert (last_error == WSAEWOULDBLOCK);
    }
359 360
#else
    ssize_t nbytes = ::recv (r, &dummy, sizeof (dummy), 0);
361 362 363 364 365 366 367
    if (nbytes == -1) {
        if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
            errno = EAGAIN;
            return -1;
        }
        errno_assert (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR);
    }
368 369 370
#endif
    zmq_assert (nbytes == sizeof (dummy));
    zmq_assert (dummy == 0);
371
#endif
372
    return 0;
373 374
}

375
#ifdef HAVE_FORK
Martin Hurton's avatar
Martin Hurton committed
376
void zmq::signaler_t::forked ()
377
{
378 379 380
    //  Close file descriptors created in the parent and create new pair
    close (r);
    close (w);
381
    make_fdpair (&r, &w);
382 383 384
}
#endif

Pieter Hintjens's avatar
Pieter Hintjens committed
385
//  Returns -1 if we could not make the socket pair successfully
386 387
int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
{
388
#if defined ZMQ_HAVE_EVENTFD
389 390 391 392 393 394 395 396
    int flags = 0;
#if defined ZMQ_HAVE_EVENTFD_CLOEXEC
    //  Setting this option result in sane behaviour when exec() functions
    //  are used. Old sockets are closed and don't block TCP ports, avoid
    //  leaks, etc.
    flags |= EFD_CLOEXEC;
#endif
    fd_t fd = eventfd (0, flags);
Pieter Hintjens's avatar
Pieter Hintjens committed
397 398 399 400 401 402 403 404 405
    if (fd == -1) {
        errno_assert (errno == ENFILE || errno == EMFILE);
        *w_ = *r_ = -1;
        return -1;
    }
    else {
        *w_ = *r_ = fd;
        return 0;
    }
406 407

#elif defined ZMQ_HAVE_WINDOWS
408
#   if !defined _WIN32_WCE && !defined ZMQ_HAVE_WINDOWS_UWP
409
    //  Windows CE does not manage security attributes
Matt Arsenault's avatar
Matt Arsenault committed
410 411
    SECURITY_DESCRIPTOR sd;
    SECURITY_ATTRIBUTES sa;
Martin Hurton's avatar
Martin Hurton committed
412 413
    memset (&sd, 0, sizeof sd);
    memset (&sa, 0, sizeof sa);
Matthew Metnetsky's avatar
Matthew Metnetsky committed
414

Martin Hurton's avatar
Martin Hurton committed
415 416
    InitializeSecurityDescriptor (&sd, SECURITY_DESCRIPTOR_REVISION);
    SetSecurityDescriptorDacl (&sd, TRUE, 0, FALSE);
Matthew Metnetsky's avatar
Matthew Metnetsky committed
417

Martin Hurton's avatar
Martin Hurton committed
418
    sa.nLength = sizeof (SECURITY_ATTRIBUTES);
Matthew Metnetsky's avatar
Matthew Metnetsky committed
419
    sa.lpSecurityDescriptor = &sd;
Pieter Hintjens's avatar
Pieter Hintjens committed
420
#   endif
421

422 423 424 425
    //  This function has to be in a system-wide critical section so that
    //  two instances of the library don't accidentally create signaler
    //  crossing the process boundary.
    //  We'll use named event object to implement the critical section.
Martin Hurton's avatar
Martin Hurton committed
426 427 428
    //  Note that if the event object already exists, the CreateEvent requests
    //  EVENT_ALL_ACCESS access right. If this fails, we try to open
    //  the event object asking for SYNCHRONIZE access only.
429 430 431
    HANDLE sync = NULL;

    //  Create critical section only if using fixed signaler port
432 433 434 435 436
    //  Use problematic Event implementation for compatibility if using old port 5905.
    //  Otherwise use Mutex implementation.
    int event_signaler_port = 5905;

    if (signaler_port == event_signaler_port) {
437
#       if !defined _WIN32_WCE && !defined ZMQ_HAVE_WINDOWS_UWP
438
        sync = CreateEventW (&sa, FALSE, TRUE, L"Global\\zmq-signaler-port-sync");
439
#       else
440
        sync = CreateEventW (NULL, FALSE, TRUE, L"Global\\zmq-signaler-port-sync");
441 442
#       endif
        if (sync == NULL && GetLastError () == ERROR_ACCESS_DENIED)
443
            sync = OpenEventW (SYNCHRONIZE | EVENT_MODIFY_STATE,
444
                              FALSE, L"Global\\zmq-signaler-port-sync");
445 446 447

        win_assert (sync != NULL);
    }
Martin Hurton's avatar
Martin Hurton committed
448 449 450
    else
    if (signaler_port != 0) {
        wchar_t mutex_name [MAX_PATH];
451 452 453
#       ifdef __MINGW32__
        _snwprintf (mutex_name, MAX_PATH, L"Global\\zmq-signaler-port-%d", signaler_port);
#       else
Martin Hurton's avatar
Martin Hurton committed
454
        swprintf (mutex_name, MAX_PATH, L"Global\\zmq-signaler-port-%d", signaler_port);
455
#       endif
456

457
#       if !defined _WIN32_WCE && !defined ZMQ_HAVE_WINDOWS_UWP
458
        sync = CreateMutexW (&sa, FALSE, mutex_name);
459
#       else
460
        sync = CreateMutexW (NULL, FALSE, mutex_name);
461 462
#       endif
        if (sync == NULL && GetLastError () == ERROR_ACCESS_DENIED)
463
            sync = OpenMutexW (SYNCHRONIZE, FALSE, mutex_name);
464 465 466

        win_assert (sync != NULL);
    }
467

468 469 470 471 472 473 474
    //  Windows has no 'socketpair' function. CreatePipe is no good as pipe
    //  handles cannot be polled on. Here we create the socketpair by hand.
    *w_ = INVALID_SOCKET;
    *r_ = INVALID_SOCKET;

    //  Create listening socket.
    SOCKET listener;
475
    listener = open_socket (AF_INET, SOCK_STREAM, 0);
476 477 478 479 480
    wsa_assert (listener != INVALID_SOCKET);

    //  Set SO_REUSEADDR and TCP_NODELAY on listening socket.
    BOOL so_reuseaddr = 1;
    int rc = setsockopt (listener, SOL_SOCKET, SO_REUSEADDR,
Pieter Hintjens's avatar
Pieter Hintjens committed
481
        (char *) &so_reuseaddr, sizeof so_reuseaddr);
482 483 484
    wsa_assert (rc != SOCKET_ERROR);
    BOOL tcp_nodelay = 1;
    rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELAY,
Pieter Hintjens's avatar
Pieter Hintjens committed
485
        (char *) &tcp_nodelay, sizeof tcp_nodelay);
486 487
    wsa_assert (rc != SOCKET_ERROR);

488
    //  Init sockaddr to signaler port.
489
    struct sockaddr_in addr;
Martin Hurton's avatar
Martin Hurton committed
490
    memset (&addr, 0, sizeof addr);
491 492
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
493
    addr.sin_port = htons (signaler_port);
494 495

    //  Create the writer socket.
496
    *w_ = open_socket (AF_INET, SOCK_STREAM, 0);
497 498 499 500
    wsa_assert (*w_ != INVALID_SOCKET);

    //  Set TCP_NODELAY on writer socket.
    rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY,
Martin Hurton's avatar
Martin Hurton committed
501
        (char *) &tcp_nodelay, sizeof tcp_nodelay);
502 503
    wsa_assert (rc != SOCKET_ERROR);

504 505 506
    if (sync != NULL) {
        //  Enter the critical section.
        DWORD dwrc = WaitForSingleObject (sync, INFINITE);
507
        zmq_assert (dwrc == WAIT_OBJECT_0 || dwrc == WAIT_ABANDONED);
508
    }
509 510

    //  Bind listening socket to signaler port.
Pieter Hintjens's avatar
Pieter Hintjens committed
511
    rc = bind (listener, (const struct sockaddr *) &addr, sizeof addr);
512

513 514
    if (rc != SOCKET_ERROR && signaler_port == 0) {
        //  Retrieve ephemeral port number
Martin Hurton's avatar
Martin Hurton committed
515
        int addrlen = sizeof addr;
Pieter Hintjens's avatar
Pieter Hintjens committed
516
        rc = getsockname (listener, (struct sockaddr *) &addr, &addrlen);
517 518
    }

519 520 521 522
    //  Listen for incoming connections.
    if (rc != SOCKET_ERROR)
        rc = listen (listener, 1);

523
    //  Connect writer to the listener.
524
    if (rc != SOCKET_ERROR)
Pieter Hintjens's avatar
Pieter Hintjens committed
525
        rc = connect (*w_, (struct sockaddr *) &addr, sizeof addr);
526

527 528
    //  Accept connection from writer.
    if (rc != SOCKET_ERROR)
529
        *r_ = accept (listener, NULL, NULL);
530

531 532 533 534 535
    //  Send/receive large chunk to work around TCP slow start
    //  This code is a workaround for #1608
    if (*r_ != INVALID_SOCKET) {
        size_t dummy_size = 1024 * 1024;        //  1M to overload default receive buffer
        unsigned char *dummy = (unsigned char *) malloc (dummy_size);
536 537
        wsa_assert (dummy);

538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553
        int still_to_send = (int) dummy_size;
        int still_to_recv = (int) dummy_size;
        while (still_to_send || still_to_recv) {
            int nbytes;
            if (still_to_send > 0) {
                nbytes = ::send (*w_, (char *) (dummy + dummy_size - still_to_send), still_to_send, 0);
                wsa_assert (nbytes != SOCKET_ERROR);
                still_to_send -= nbytes;
            }
            nbytes = ::recv (*r_, (char *) (dummy + dummy_size - still_to_recv), still_to_recv, 0);
            wsa_assert (nbytes != SOCKET_ERROR);
            still_to_recv -= nbytes;
        }
        free (dummy);
    }

554 555 556 557 558
    //  Save errno if error occurred in bind/listen/connect/accept.
    int saved_errno = 0;
    if (*r_ == INVALID_SOCKET)
        saved_errno = WSAGetLastError ();

559
    //  We don't need the listening socket anymore. Close it.
560
    rc = closesocket (listener);
561
    wsa_assert(rc != SOCKET_ERROR);
562

563 564
    if (sync != NULL) {
        //  Exit the critical section.
565 566 567 568 569
        BOOL brc;
        if (signaler_port == event_signaler_port)
            brc = SetEvent (sync);
        else
            brc = ReleaseMutex (sync);
570
        win_assert (brc != 0);
571

572 573 574 575
        //  Release the kernel object
        brc = CloseHandle (sync);
        win_assert (brc != 0);
    }
576

577
    if (*r_ != INVALID_SOCKET) {
578
#   if !defined _WIN32_WCE && !defined ZMQ_HAVE_WINDOWS_UWP
579
        //  On Windows, preventing sockets to be inherited by child processes.
580
        BOOL brc = SetHandleInformation ((HANDLE) *r_, HANDLE_FLAG_INHERIT, 0);
581
        win_assert (brc);
Pieter Hintjens's avatar
Pieter Hintjens committed
582
#   endif
583
        return 0;
Pieter Hintjens's avatar
Pieter Hintjens committed
584 585
    }
    else {
586
        //  Cleanup writer if connection failed
587 588 589 590 591
        if (*w_ != INVALID_SOCKET) {
            rc = closesocket (*w_);
            wsa_assert (rc != SOCKET_ERROR);
            *w_ = INVALID_SOCKET;
        }
592
        //  Set errno from saved value
593
        errno = wsa_error_to_errno (saved_errno);
594 595
        return -1;
    }
596 597 598 599 600 601 602 603 604

#elif defined ZMQ_HAVE_OPENVMS

    //  Whilst OpenVMS supports socketpair - it maps to AF_INET only.  Further,
    //  it does not set the socket options TCP_NODELAY and TCP_NODELACK which
    //  can lead to performance problems.
    //
    //  The bug will be fixed in V5.6 ECO4 and beyond.  In the meantime, we'll
    //  create the socket pair manually.
605
    struct sockaddr_in lcladdr;
Martin Hurton's avatar
Martin Hurton committed
606
    memset (&lcladdr, 0, sizeof lcladdr);
607 608 609 610
    lcladdr.sin_family = AF_INET;
    lcladdr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
    lcladdr.sin_port = 0;

611
    int listener = open_socket (AF_INET, SOCK_STREAM, 0);
612 613 614
    errno_assert (listener != -1);

    int on = 1;
Martin Hurton's avatar
Martin Hurton committed
615
    int rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELAY, &on, sizeof on);
616 617
    errno_assert (rc != -1);

Martin Hurton's avatar
Martin Hurton committed
618
    rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELACK, &on, sizeof on);
619 620
    errno_assert (rc != -1);

Pieter Hintjens's avatar
Pieter Hintjens committed
621
    rc = bind (listener, (struct sockaddr *) &lcladdr, sizeof lcladdr);
622 623
    errno_assert (rc != -1);

Martin Hurton's avatar
Martin Hurton committed
624
    socklen_t lcladdr_len = sizeof lcladdr;
625

Pieter Hintjens's avatar
Pieter Hintjens committed
626
    rc = getsockname (listener, (struct sockaddr *) &lcladdr, &lcladdr_len);
627 628 629 630 631
    errno_assert (rc != -1);

    rc = listen (listener, 1);
    errno_assert (rc != -1);

632
    *w_ = open_socket (AF_INET, SOCK_STREAM, 0);
633 634
    errno_assert (*w_ != -1);

Martin Hurton's avatar
Martin Hurton committed
635
    rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY, &on, sizeof on);
636 637
    errno_assert (rc != -1);

Martin Hurton's avatar
Martin Hurton committed
638
    rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELACK, &on, sizeof on);
639 640
    errno_assert (rc != -1);

Pieter Hintjens's avatar
Pieter Hintjens committed
641
    rc = connect (*w_, (struct sockaddr *) &lcladdr, sizeof lcladdr);
642 643 644 645 646 647 648 649 650
    errno_assert (rc != -1);

    *r_ = accept (listener, NULL, NULL);
    errno_assert (*r_ != -1);

    close (listener);

    return 0;

Pieter Hintjens's avatar
Pieter Hintjens committed
651 652
#else
    // All other implementations support socketpair()
653
    int sv [2];
654 655 656 657 658 659 660 661
    int type = SOCK_STREAM;
    //  Setting this option result in sane behaviour when exec() functions
    //  are used. Old sockets are closed and don't block TCP ports, avoid
    //  leaks, etc.
#if defined ZMQ_HAVE_SOCK_CLOEXEC
    type |= SOCK_CLOEXEC;
#endif
    int rc = socketpair (AF_UNIX, type, 0, sv);
Pieter Hintjens's avatar
Pieter Hintjens committed
662 663
    if (rc == -1) {
        errno_assert (errno == ENFILE || errno == EMFILE);
664
        *w_ = *r_ = -1;
Pieter Hintjens's avatar
Pieter Hintjens committed
665 666 667
        return -1;
    }
    else {
668 669 670 671 672 673 674 675 676
        //  If there's no SOCK_CLOEXEC, let's try the second best option. Note that
        //  race condition can cause socket not to be closed (if fork happens
        //  between socket creation and this point).
#if !defined ZMQ_HAVE_SOCK_CLOEXEC && defined FD_CLOEXEC
        rc = fcntl (sv [0], F_SETFD, FD_CLOEXEC);
        errno_assert (rc != -1);
        rc = fcntl (sv [1], F_SETFD, FD_CLOEXEC);
        errno_assert (rc != -1);
#endif
Pieter Hintjens's avatar
Pieter Hintjens committed
677 678 679 680
        *w_ = sv [0];
        *r_ = sv [1];
        return 0;
    }
681 682
#endif
}