ip.cpp 26.8 KB
Newer Older
Martin Sustrik's avatar
Martin Sustrik committed
1
/*
2
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
Martin Sustrik's avatar
Martin Sustrik committed
3

4
    This file is part of libzmq, the ZeroMQ core engine in C++.
Martin Sustrik's avatar
Martin Sustrik committed
5

6 7 8
    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
Martin Sustrik's avatar
Martin Sustrik committed
9 10
    (at your option) any later version.

11 12 13 14 15 16 17 18 19 20 21 22 23 24
    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.
Martin Sustrik's avatar
Martin Sustrik committed
25

26
    You should have received a copy of the GNU Lesser General Public License
Martin Sustrik's avatar
Martin Sustrik committed
27 28 29
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include "precompiled.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
31 32
#include "ip.hpp"
#include "err.hpp"
33
#include "macros.hpp"
34
#include "config.hpp"
35
#include "address.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
36

37
#if !defined ZMQ_HAVE_WINDOWS
38
#include <fcntl.h>
39 40
#include <sys/types.h>
#include <sys/socket.h>
41
#include <sys/stat.h>
42
#include <netdb.h>
43
#include <netinet/in.h>
44
#include <netinet/tcp.h>
45 46 47 48
#include <stdlib.h>
#include <unistd.h>

#include <vector>
49 50
#else
#include "tcp.hpp"
51 52 53
#ifdef ZMQ_HAVE_IPC
#include "ipc_address.hpp"
#endif
54 55

#include <direct.h>
56 57
#endif

58
#if defined ZMQ_HAVE_OPENVMS || defined ZMQ_HAVE_VXWORKS
59 60 61
#include <ioctl.h>
#endif

62 63 64 65 66 67
#if defined ZMQ_HAVE_VXWORKS
#include <unistd.h>
#include <sockLib.h>
#include <ioLib.h>
#endif

68 69 70 71 72 73 74 75 76 77 78 79
#if defined ZMQ_HAVE_EVENTFD
#include <sys/eventfd.h>
#endif

#if defined ZMQ_HAVE_OPENPGM
#ifdef ZMQ_HAVE_WINDOWS
#define __PGM_WININT_H__
#endif

#include <pgm/pgm.h>
#endif

80 81 82 83
#ifdef __APPLE__
#include <TargetConditionals.h>
#endif

84 85 86 87 88 89 90 91
#ifndef ZMQ_HAVE_WINDOWS
// Acceptable temporary directory environment variables
static const char *tmp_env_vars[] = {
  "TMPDIR", "TEMPDIR", "TMP",
  0 // Sentinel
};
#endif

92 93
zmq::fd_t zmq::open_socket (int domain_, int type_, int protocol_)
{
94 95
    int rc;

96 97
    //  Setting this option result in sane behaviour when exec() functions
    //  are used. Old sockets are closed and don't block TCP ports etc.
98
#if defined ZMQ_HAVE_SOCK_CLOEXEC
99 100 101
    type_ |= SOCK_CLOEXEC;
#endif

102 103 104
#if defined ZMQ_HAVE_WINDOWS && defined WSA_FLAG_NO_HANDLE_INHERIT
    // if supported, create socket with WSA_FLAG_NO_HANDLE_INHERIT, such that
    // the race condition in making it non-inheritable later is avoided
105 106 107
    const fd_t s =
      WSASocket (domain_, type_, protocol_, NULL, 0,
                 WSA_FLAG_OVERLAPPED || WSA_FLAG_NO_HANDLE_INHERIT);
108
#else
109
    const fd_t s = socket (domain_, type_, protocol_);
110
#endif
111
    if (s == retired_fd) {
112
#ifdef ZMQ_HAVE_WINDOWS
113
        errno = wsa_error_to_errno (WSAGetLastError ());
114
#endif
115 116
        return retired_fd;
    }
117

118
    make_socket_noninheritable (s);
119

120 121 122 123
    //  Socket is not yet connected so EINVAL is not a valid networking error
    rc = zmq::set_nosigpipe (s);
    errno_assert (rc == 0);

124 125 126
    return s;
}

127 128
void zmq::unblock_socket (fd_t s_)
{
129
#if defined ZMQ_HAVE_WINDOWS
130
    u_long nonblock = 1;
131
    const int rc = ioctlsocket (s_, FIONBIO, &nonblock);
132
    wsa_assert (rc != SOCKET_ERROR);
133
#elif defined ZMQ_HAVE_OPENVMS || defined ZMQ_HAVE_VXWORKS
134 135
    int nonblock = 1;
    int rc = ioctl (s_, FIONBIO, &nonblock);
136 137
    errno_assert (rc != -1);
#else
138 139
    int flags = fcntl (s_, F_GETFL, 0);
    if (flags == -1)
140
        flags = 0;
141
    int rc = fcntl (s_, F_SETFL, flags | O_NONBLOCK);
142 143 144 145
    errno_assert (rc != -1);
#endif
}

146 147
void zmq::enable_ipv4_mapping (fd_t s_)
{
148
    LIBZMQ_UNUSED (s_);
Matt Arsenault's avatar
Matt Arsenault committed
149

150 151
#if defined IPV6_V6ONLY && !defined ZMQ_HAVE_OPENBSD                           \
  && !defined ZMQ_HAVE_DRAGONFLY
152 153 154 155 156
#ifdef ZMQ_HAVE_WINDOWS
    DWORD flag = 0;
#else
    int flag = 0;
#endif
157 158
    const int rc = setsockopt (s_, IPPROTO_IPV6, IPV6_V6ONLY,
                               reinterpret_cast<char *> (&flag), sizeof (flag));
159 160 161 162 163 164 165
#ifdef ZMQ_HAVE_WINDOWS
    wsa_assert (rc != SOCKET_ERROR);
#else
    errno_assert (rc == 0);
#endif
#endif
}
Matt Arsenault's avatar
Matt Arsenault committed
166

167
int zmq::get_peer_ip_address (fd_t sockfd_, std::string &ip_addr_)
168 169 170
{
    struct sockaddr_storage ss;

171
    const zmq_socklen_t addrlen =
172 173 174
      get_socket_address (sockfd_, socket_end_remote, &ss);

    if (addrlen == 0) {
175
#ifdef ZMQ_HAVE_WINDOWS
176 177 178 179
        const int last_error = WSAGetLastError ();
        wsa_assert (last_error != WSANOTINITIALISED && last_error != WSAEFAULT
                    && last_error != WSAEINPROGRESS
                    && last_error != WSAENOTSOCK);
180
#elif !defined(TARGET_OS_IPHONE) || !TARGET_OS_IPHONE
181
        errno_assert (errno != EBADF && errno != EFAULT && errno != ENOTSOCK);
182 183 184
#else
        errno_assert (errno != EFAULT && errno != ENOTSOCK);
#endif
185
        return 0;
186 187
    }

188
    char host[NI_MAXHOST];
189 190 191
    const int rc =
      getnameinfo (reinterpret_cast<struct sockaddr *> (&ss), addrlen, host,
                   sizeof host, NULL, 0, NI_NUMERICHOST);
192
    if (rc != 0)
193
        return 0;
194 195

    ip_addr_ = host;
196

197 198
    union
    {
199 200 201 202 203
        struct sockaddr sa;
        struct sockaddr_storage sa_stor;
    } u;

    u.sa_stor = ss;
204
    return static_cast<int> (u.sa.sa_family);
205
}
206

207
void zmq::set_ip_type_of_service (fd_t s_, int iptos_)
208
{
209
    int rc = setsockopt (s_, IPPROTO_IP, IP_TOS,
210
                         reinterpret_cast<char *> (&iptos_), sizeof (iptos_));
211

212 213 214 215 216 217
#ifdef ZMQ_HAVE_WINDOWS
    wsa_assert (rc != SOCKET_ERROR);
#else
    errno_assert (rc == 0);
#endif

218
    //  Windows and Hurd do not support IPV6_TCLASS
219 220
#if !defined(ZMQ_HAVE_WINDOWS) && defined(IPV6_TCLASS)
    rc = setsockopt (s_, IPPROTO_IPV6, IPV6_TCLASS,
221
                     reinterpret_cast<char *> (&iptos_), sizeof (iptos_));
222

223 224
    //  If IPv6 is not enabled ENOPROTOOPT will be returned on Linux and
    //  EINVAL on OSX
225
    if (rc == -1) {
226
        errno_assert (errno == ENOPROTOOPT || errno == EINVAL);
227
    }
228 229
#endif
}
230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249

int zmq::set_nosigpipe (fd_t s_)
{
#ifdef SO_NOSIGPIPE
    //  Make sure that SIGPIPE signal is not generated when writing to a
    //  connection that was already closed by the peer.
    //  As per POSIX spec, EINVAL will be returned if the socket was valid but
    //  the connection has been reset by the peer. Return an error so that the
    //  socket can be closed and the connection retried if necessary.
    int set = 1;
    int rc = setsockopt (s_, SOL_SOCKET, SO_NOSIGPIPE, &set, sizeof (int));
    if (rc != 0 && errno == EINVAL)
        return -1;
    errno_assert (rc == 0);
#else
    LIBZMQ_UNUSED (s_);
#endif

    return 0;
}
250

251
int zmq::bind_to_device (fd_t s_, const std::string &bound_device_)
252 253
{
#ifdef ZMQ_HAVE_SO_BINDTODEVICE
254 255
    int rc = setsockopt (s_, SOL_SOCKET, SO_BINDTODEVICE,
                         bound_device_.c_str (), bound_device_.length ());
256 257 258 259
    if (rc != 0) {
        assert_success_or_recoverable (s_, rc);
        return -1;
    }
260 261
    return 0;

262 263 264
#else
    LIBZMQ_UNUSED (s_);
    LIBZMQ_UNUSED (bound_device_);
265 266 267

    errno = ENOTSUP;
    return -1;
268 269
#endif
}
270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300

bool zmq::initialize_network ()
{
#if defined ZMQ_HAVE_OPENPGM

    //  Init PGM transport. Ensure threading and timer are enabled. Find PGM
    //  protocol ID. Note that if you want to use gettimeofday and sleep for
    //  openPGM timing, set environment variables PGM_TIMER to "GTOD" and
    //  PGM_SLEEP to "USLEEP".
    pgm_error_t *pgm_error = NULL;
    const bool ok = pgm_init (&pgm_error);
    if (ok != TRUE) {
        //  Invalid parameters don't set pgm_error_t
        zmq_assert (pgm_error != NULL);
        if (pgm_error->domain == PGM_ERROR_DOMAIN_TIME
            && (pgm_error->code == PGM_ERROR_FAILED)) {
            //  Failed to access RTC or HPET device.
            pgm_error_free (pgm_error);
            errno = EINVAL;
            return false;
        }

        //  PGM_ERROR_DOMAIN_ENGINE: WSAStartup errors or missing WSARecvMsg.
        zmq_assert (false);
    }
#endif

#ifdef ZMQ_HAVE_WINDOWS
    //  Intialise Windows sockets. Note that WSAStartup can be called multiple
    //  times given that WSACleanup will be called for each WSAStartup.

301
    const WORD version_requested = MAKEWORD (2, 2);
302
    WSADATA wsa_data;
303
    const int rc = WSAStartup (version_requested, &wsa_data);
304 305 306 307 308 309 310 311 312 313 314 315
    zmq_assert (rc == 0);
    zmq_assert (LOBYTE (wsa_data.wVersion) == 2
                && HIBYTE (wsa_data.wVersion) == 2);
#endif

    return true;
}

void zmq::shutdown_network ()
{
#ifdef ZMQ_HAVE_WINDOWS
    //  On Windows, uninitialise socket layer.
316
    const int rc = WSACleanup ();
317 318 319 320 321 322 323 324 325 326 327
    wsa_assert (rc != SOCKET_ERROR);
#endif

#if defined ZMQ_HAVE_OPENPGM
    //  Shut down the OpenPGM library.
    if (pgm_shutdown () != TRUE)
        zmq_assert (false);
#endif
}

#if defined ZMQ_HAVE_WINDOWS
328
static void tune_socket (const SOCKET socket_)
329 330
{
    BOOL tcp_nodelay = 1;
331
    const int rc =
332
      setsockopt (socket_, IPPROTO_TCP, TCP_NODELAY,
333
                  reinterpret_cast<char *> (&tcp_nodelay), sizeof tcp_nodelay);
334 335
    wsa_assert (rc != SOCKET_ERROR);

336
    zmq::tcp_tune_loopback_fast_path (socket_);
337 338
}

339
static int make_fdpair_tcpip (zmq::fd_t *r_, zmq::fd_t *w_)
340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366
{
#if !defined _WIN32_WCE && !defined ZMQ_HAVE_WINDOWS_UWP
    //  Windows CE does not manage security attributes
    SECURITY_DESCRIPTOR sd;
    SECURITY_ATTRIBUTES sa;
    memset (&sd, 0, sizeof sd);
    memset (&sa, 0, sizeof sa);

    InitializeSecurityDescriptor (&sd, SECURITY_DESCRIPTOR_REVISION);
    SetSecurityDescriptorDacl (&sd, TRUE, 0, FALSE);

    sa.nLength = sizeof (SECURITY_ATTRIBUTES);
    sa.lpSecurityDescriptor = &sd;
#endif

    //  This function has to be in a system-wide critical section so that
    //  two instances of the library don't accidentally create signaler
    //  crossing the process boundary.
    //  We'll use named event object to implement the critical section.
    //  Note that if the event object already exists, the CreateEvent requests
    //  EVENT_ALL_ACCESS access right. If this fails, we try to open
    //  the event object asking for SYNCHRONIZE access only.
    HANDLE sync = NULL;

    //  Create critical section only if using fixed signaler port
    //  Use problematic Event implementation for compatibility if using old port 5905.
    //  Otherwise use Mutex implementation.
367
    const int event_signaler_port = 5905;
368

369
    if (zmq::signaler_port == event_signaler_port) {
370 371 372 373 374 375 376 377 378 379 380 381
#if !defined _WIN32_WCE && !defined ZMQ_HAVE_WINDOWS_UWP
        sync =
          CreateEventW (&sa, FALSE, TRUE, L"Global\\zmq-signaler-port-sync");
#else
        sync =
          CreateEventW (NULL, FALSE, TRUE, L"Global\\zmq-signaler-port-sync");
#endif
        if (sync == NULL && GetLastError () == ERROR_ACCESS_DENIED)
            sync = OpenEventW (SYNCHRONIZE | EVENT_MODIFY_STATE, FALSE,
                               L"Global\\zmq-signaler-port-sync");

        win_assert (sync != NULL);
382
    } else if (zmq::signaler_port != 0) {
383 384 385
        wchar_t mutex_name[MAX_PATH];
#ifdef __MINGW32__
        _snwprintf (mutex_name, MAX_PATH, L"Global\\zmq-signaler-port-%d",
386
                    zmq::signaler_port);
387 388
#else
        swprintf (mutex_name, MAX_PATH, L"Global\\zmq-signaler-port-%d",
389
                  zmq::signaler_port);
390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409
#endif

#if !defined _WIN32_WCE && !defined ZMQ_HAVE_WINDOWS_UWP
        sync = CreateMutexW (&sa, FALSE, mutex_name);
#else
        sync = CreateMutexW (NULL, FALSE, mutex_name);
#endif
        if (sync == NULL && GetLastError () == ERROR_ACCESS_DENIED)
            sync = OpenMutexW (SYNCHRONIZE, FALSE, mutex_name);

        win_assert (sync != NULL);
    }

    //  Windows has no 'socketpair' function. CreatePipe is no good as pipe
    //  handles cannot be polled on. Here we create the socketpair by hand.
    *w_ = INVALID_SOCKET;
    *r_ = INVALID_SOCKET;

    //  Create listening socket.
    SOCKET listener;
410
    listener = zmq::open_socket (AF_INET, SOCK_STREAM, 0);
411 412 413 414 415
    wsa_assert (listener != INVALID_SOCKET);

    //  Set SO_REUSEADDR and TCP_NODELAY on listening socket.
    BOOL so_reuseaddr = 1;
    int rc = setsockopt (listener, SOL_SOCKET, SO_REUSEADDR,
416 417
                         reinterpret_cast<char *> (&so_reuseaddr),
                         sizeof so_reuseaddr);
418 419 420 421 422 423 424 425 426
    wsa_assert (rc != SOCKET_ERROR);

    tune_socket (listener);

    //  Init sockaddr to signaler port.
    struct sockaddr_in addr;
    memset (&addr, 0, sizeof addr);
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
427
    addr.sin_port = htons (zmq::signaler_port);
428 429

    //  Create the writer socket.
430
    *w_ = zmq::open_socket (AF_INET, SOCK_STREAM, 0);
431 432 433 434
    wsa_assert (*w_ != INVALID_SOCKET);

    if (sync != NULL) {
        //  Enter the critical section.
435
        const DWORD dwrc = WaitForSingleObject (sync, INFINITE);
436 437 438 439
        zmq_assert (dwrc == WAIT_OBJECT_0 || dwrc == WAIT_ABANDONED);
    }

    //  Bind listening socket to signaler port.
440 441
    rc = bind (listener, reinterpret_cast<const struct sockaddr *> (&addr),
               sizeof addr);
442

443
    if (rc != SOCKET_ERROR && zmq::signaler_port == 0) {
444 445
        //  Retrieve ephemeral port number
        int addrlen = sizeof addr;
446 447
        rc = getsockname (listener, reinterpret_cast<struct sockaddr *> (&addr),
                          &addrlen);
448 449 450
    }

    //  Listen for incoming connections.
451
    if (rc != SOCKET_ERROR) {
452
        rc = listen (listener, 1);
453
    }
454 455

    //  Connect writer to the listener.
456
    if (rc != SOCKET_ERROR) {
457 458
        rc = connect (*w_, reinterpret_cast<struct sockaddr *> (&addr),
                      sizeof addr);
459
    }
460

461
    //  Accept connection from writer.
462 463 464 465
    if (rc != SOCKET_ERROR) {
        //  Set TCP_NODELAY on writer socket.
        tune_socket (*w_);

466
        *r_ = accept (listener, NULL, NULL);
467
    }
468 469 470 471

    //  Send/receive large chunk to work around TCP slow start
    //  This code is a workaround for #1608
    if (*r_ != INVALID_SOCKET) {
472
        const size_t dummy_size =
473
          1024 * 1024; //  1M to overload default receive buffer
474 475
        unsigned char *dummy =
          static_cast<unsigned char *> (malloc (dummy_size));
476 477
        wsa_assert (dummy);

478 479
        int still_to_send = static_cast<int> (dummy_size);
        int still_to_recv = static_cast<int> (dummy_size);
480 481 482
        while (still_to_send || still_to_recv) {
            int nbytes;
            if (still_to_send > 0) {
483 484 485 486
                nbytes = ::send (
                  *w_,
                  reinterpret_cast<char *> (dummy + dummy_size - still_to_send),
                  still_to_send, 0);
487 488 489
                wsa_assert (nbytes != SOCKET_ERROR);
                still_to_send -= nbytes;
            }
490 491 492 493
            nbytes = ::recv (
              *r_,
              reinterpret_cast<char *> (dummy + dummy_size - still_to_recv),
              still_to_recv, 0);
494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511
            wsa_assert (nbytes != SOCKET_ERROR);
            still_to_recv -= nbytes;
        }
        free (dummy);
    }

    //  Save errno if error occurred in bind/listen/connect/accept.
    int saved_errno = 0;
    if (*r_ == INVALID_SOCKET)
        saved_errno = WSAGetLastError ();

    //  We don't need the listening socket anymore. Close it.
    rc = closesocket (listener);
    wsa_assert (rc != SOCKET_ERROR);

    if (sync != NULL) {
        //  Exit the critical section.
        BOOL brc;
512
        if (zmq::signaler_port == event_signaler_port)
513 514 515 516 517 518 519 520 521 522 523
            brc = SetEvent (sync);
        else
            brc = ReleaseMutex (sync);
        win_assert (brc != 0);

        //  Release the kernel object
        brc = CloseHandle (sync);
        win_assert (brc != 0);
    }

    if (*r_ != INVALID_SOCKET) {
524
        zmq::make_socket_noninheritable (*r_);
525 526
        return 0;
    }
527 528 529 530 531 532 533
    //  Cleanup writer if connection failed
    if (*w_ != INVALID_SOCKET) {
        rc = closesocket (*w_);
        wsa_assert (rc != SOCKET_ERROR);
        *w_ = INVALID_SOCKET;
    }
    //  Set errno from saved value
534
    errno = zmq::wsa_error_to_errno (saved_errno);
535
    return -1;
536 537
}
#endif
538

539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554
int zmq::make_fdpair (fd_t *r_, fd_t *w_)
{
#if defined ZMQ_HAVE_EVENTFD
    int flags = 0;
#if defined ZMQ_HAVE_EVENTFD_CLOEXEC
    //  Setting this option result in sane behaviour when exec() functions
    //  are used. Old sockets are closed and don't block TCP ports, avoid
    //  leaks, etc.
    flags |= EFD_CLOEXEC;
#endif
    fd_t fd = eventfd (0, flags);
    if (fd == -1) {
        errno_assert (errno == ENFILE || errno == EMFILE);
        *w_ = *r_ = -1;
        return -1;
    }
555 556 557
    *w_ = *r_ = fd;
    return 0;

558

559
#elif defined ZMQ_HAVE_WINDOWS
560 561 562 563 564
#ifdef ZMQ_HAVE_IPC
    ipc_address_t address;
    std::string dirname, filename;

    //  Create a listening socket.
565
    const SOCKET listener = open_socket (AF_UNIX, SOCK_STREAM, 0);
566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642
    if (listener == retired_fd) {
        //  This may happen if the library was built on a system supporting AF_UNIX, but the system running doesn't support it.
        goto try_tcpip;
    }

    create_ipc_wildcard_address (dirname, filename);

    //  Initialise the address structure.
    int rc = address.resolve (filename.c_str ());
    if (rc != 0) {
        goto error_closelistener;
    }

    //  Bind the socket to the file path.
    rc = bind (listener, const_cast<sockaddr *> (address.addr ()),
               address.addrlen ());
    if (rc != 0) {
        errno = wsa_error_to_errno (WSAGetLastError ());
        goto error_closelistener;
    }

    //  Listen for incoming connections.
    rc = listen (listener, 1);
    if (rc != 0) {
        errno = wsa_error_to_errno (WSAGetLastError ());
        goto error_closelistener;
    }

    sockaddr_un lcladdr;
    socklen_t lcladdr_len = sizeof lcladdr;

    rc = getsockname (listener, reinterpret_cast<struct sockaddr *> (&lcladdr),
                      &lcladdr_len);
    wsa_assert (rc != -1);

    //  Create the client socket.
    *w_ = open_socket (AF_UNIX, SOCK_STREAM, 0);
    if (*w_ == -1) {
        errno = wsa_error_to_errno (WSAGetLastError ());
        goto error_closelistener;
    }

    //  Connect to the remote peer.
    rc = ::connect (*w_, reinterpret_cast<const struct sockaddr *> (&lcladdr),
                    lcladdr_len);
    if (rc == -1) {
        goto error_closeclient;
    }

    *r_ = accept (listener, NULL, NULL);
    errno_assert (*r_ != -1);

    //  Close the listener socket, we don't need it anymore.
    rc = closesocket (listener);
    wsa_assert (rc == 0);

    return 0;

error_closeclient:
    int saved_errno = errno;
    rc = closesocket (*w_);
    wsa_assert (rc == 0);
    errno = saved_errno;

error_closelistener:
    saved_errno = errno;
    rc = closesocket (listener);
    wsa_assert (rc == 0);
    errno = saved_errno;

    return -1;

try_tcpip:
    // try to fallback to TCP/IP
    // TODO: maybe remember this decision permanently?
#endif

643
    return make_fdpair_tcpip (r_, w_);
644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696
#elif defined ZMQ_HAVE_OPENVMS

    //  Whilst OpenVMS supports socketpair - it maps to AF_INET only.  Further,
    //  it does not set the socket options TCP_NODELAY and TCP_NODELACK which
    //  can lead to performance problems.
    //
    //  The bug will be fixed in V5.6 ECO4 and beyond.  In the meantime, we'll
    //  create the socket pair manually.
    struct sockaddr_in lcladdr;
    memset (&lcladdr, 0, sizeof lcladdr);
    lcladdr.sin_family = AF_INET;
    lcladdr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
    lcladdr.sin_port = 0;

    int listener = open_socket (AF_INET, SOCK_STREAM, 0);
    errno_assert (listener != -1);

    int on = 1;
    int rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELAY, &on, sizeof on);
    errno_assert (rc != -1);

    rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELACK, &on, sizeof on);
    errno_assert (rc != -1);

    rc = bind (listener, (struct sockaddr *) &lcladdr, sizeof lcladdr);
    errno_assert (rc != -1);

    socklen_t lcladdr_len = sizeof lcladdr;

    rc = getsockname (listener, (struct sockaddr *) &lcladdr, &lcladdr_len);
    errno_assert (rc != -1);

    rc = listen (listener, 1);
    errno_assert (rc != -1);

    *w_ = open_socket (AF_INET, SOCK_STREAM, 0);
    errno_assert (*w_ != -1);

    rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY, &on, sizeof on);
    errno_assert (rc != -1);

    rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELACK, &on, sizeof on);
    errno_assert (rc != -1);

    rc = connect (*w_, (struct sockaddr *) &lcladdr, sizeof lcladdr);
    errno_assert (rc != -1);

    *r_ = accept (listener, NULL, NULL);
    errno_assert (*r_ != -1);

    close (listener);

    return 0;
697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715
#elif defined ZMQ_HAVE_VXWORKS
    struct sockaddr_in lcladdr;
    memset (&lcladdr, 0, sizeof lcladdr);
    lcladdr.sin_family = AF_INET;
    lcladdr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
    lcladdr.sin_port = 0;

    int listener = open_socket (AF_INET, SOCK_STREAM, 0);
    errno_assert (listener != -1);

    int on = 1;
    int rc =
      setsockopt (listener, IPPROTO_TCP, TCP_NODELAY, (char *) &on, sizeof on);
    errno_assert (rc != -1);

    rc = bind (listener, (struct sockaddr *) &lcladdr, sizeof lcladdr);
    errno_assert (rc != -1);

    socklen_t lcladdr_len = sizeof lcladdr;
716

717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738
    rc = getsockname (listener, (struct sockaddr *) &lcladdr,
                      (int *) &lcladdr_len);
    errno_assert (rc != -1);

    rc = listen (listener, 1);
    errno_assert (rc != -1);

    *w_ = open_socket (AF_INET, SOCK_STREAM, 0);
    errno_assert (*w_ != -1);

    rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY, (char *) &on, sizeof on);
    errno_assert (rc != -1);

    rc = connect (*w_, (struct sockaddr *) &lcladdr, sizeof lcladdr);
    errno_assert (rc != -1);

    *r_ = accept (listener, NULL, NULL);
    errno_assert (*r_ != -1);

    close (listener);

    return 0;
739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754
#else
    // All other implementations support socketpair()
    int sv[2];
    int type = SOCK_STREAM;
    //  Setting this option result in sane behaviour when exec() functions
    //  are used. Old sockets are closed and don't block TCP ports, avoid
    //  leaks, etc.
#if defined ZMQ_HAVE_SOCK_CLOEXEC
    type |= SOCK_CLOEXEC;
#endif
    int rc = socketpair (AF_UNIX, type, 0, sv);
    if (rc == -1) {
        errno_assert (errno == ENFILE || errno == EMFILE);
        *w_ = *r_ = -1;
        return -1;
    } else {
755 756 757
        make_socket_noninheritable (sv[0]);
        make_socket_noninheritable (sv[1]);

758 759 760 761 762 763
        *w_ = sv[0];
        *r_ = sv[1];
        return 0;
    }
#endif
}
764

765
void zmq::make_socket_noninheritable (fd_t sock_)
766 767 768 769
{
#if defined ZMQ_HAVE_WINDOWS && !defined _WIN32_WCE                            \
  && !defined ZMQ_HAVE_WINDOWS_UWP
    //  On Windows, preventing sockets to be inherited by child processes.
770
    const BOOL brc = SetHandleInformation (reinterpret_cast<HANDLE> (sock_),
771 772
                                           HANDLE_FLAG_INHERIT, 0);
    win_assert (brc);
773
#elif (!defined ZMQ_HAVE_SOCK_CLOEXEC || !defined HAVE_ACCEPT4)                \
774 775 776 777
  && defined FD_CLOEXEC
    //  If there 's no SOCK_CLOEXEC, let's try the second best option.
    //  Race condition can cause socket not to be closed (if fork happens
    //  between accept and this point).
778
    const int rc = fcntl (sock_, F_SETFD, FD_CLOEXEC);
779
    errno_assert (rc != -1);
780 781
#else
    LIBZMQ_UNUSED (sock_);
782 783
#endif
}
784

785
void zmq::assert_success_or_recoverable (zmq::fd_t s_, int rc_)
786
{
787 788 789 790 791 792
#ifdef ZMQ_HAVE_WINDOWS
    if (rc_ != SOCKET_ERROR) {
        return;
    }
#else
    if (rc_ != -1) {
793
        return;
794 795
    }
#endif
796 797 798 799 800 801 802 803 804

    //  Check whether an error occurred
    int err = 0;
#if defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_VXWORKS
    int len = sizeof err;
#else
    socklen_t len = sizeof err;
#endif

805 806
    const int rc = getsockopt (s_, SOL_SOCKET, SO_ERROR,
                               reinterpret_cast<char *> (&err), &len);
807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834

    //  Assert if the error was caused by 0MQ bug.
    //  Networking problems are OK. No need to assert.
#ifdef ZMQ_HAVE_WINDOWS
    zmq_assert (rc == 0);
    if (err != 0) {
        wsa_assert (err == WSAECONNREFUSED || err == WSAECONNRESET
                    || err == WSAECONNABORTED || err == WSAEINTR
                    || err == WSAETIMEDOUT || err == WSAEHOSTUNREACH
                    || err == WSAENETUNREACH || err == WSAENETDOWN
                    || err == WSAENETRESET || err == WSAEACCES
                    || err == WSAEINVAL || err == WSAEADDRINUSE);
    }
#else
    //  Following code should handle both Berkeley-derived socket
    //  implementations and Solaris.
    if (rc == -1)
        err = errno;
    if (err != 0) {
        errno = err;
        errno_assert (errno == ECONNREFUSED || errno == ECONNRESET
                      || errno == ECONNABORTED || errno == EINTR
                      || errno == ETIMEDOUT || errno == EHOSTUNREACH
                      || errno == ENETUNREACH || errno == ENETDOWN
                      || errno == ENETRESET || errno == EINVAL);
    }
#endif
}
835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861

#ifdef ZMQ_HAVE_IPC
int zmq::create_ipc_wildcard_address (std::string &path_, std::string &file_)
{
#if defined ZMQ_HAVE_WINDOWS
    char buffer[MAX_PATH];

    {
        const errno_t rc = tmpnam_s (buffer);
        errno_assert (rc == 0);
    }

    // TODO or use CreateDirectoryA and specify permissions?
    const int rc = _mkdir (buffer);
    if (rc != 0) {
        return -1;
    }

    path_.assign (buffer);
    file_ = path_ + "/socket";
#else
    std::string tmp_path;

    // If TMPDIR, TEMPDIR, or TMP are available and are directories, create
    // the socket directory there.
    const char **tmp_env = tmp_env_vars;
    while (tmp_path.empty () && *tmp_env != 0) {
862
        const char *const tmpdir = getenv (*tmp_env);
863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882
        struct stat statbuf;

        // Confirm it is actually a directory before trying to use
        if (tmpdir != 0 && ::stat (tmpdir, &statbuf) == 0
            && S_ISDIR (statbuf.st_mode)) {
            tmp_path.assign (tmpdir);
            if (*(tmp_path.rbegin ()) != '/') {
                tmp_path.push_back ('/');
            }
        }

        // Try the next environment variable
        ++tmp_env;
    }

    // Append a directory name
    tmp_path.append ("tmpXXXXXX");

    // We need room for tmp_path + trailing NUL
    std::vector<char> buffer (tmp_path.length () + 1);
883
    memcpy (&buffer[0], tmp_path.c_str (), tmp_path.length () + 1);
884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911

#if defined HAVE_MKDTEMP
    // Create the directory.  POSIX requires that mkdtemp() creates the
    // directory with 0700 permissions, meaning the only possible race
    // with socket creation could be the same user.  However, since
    // each socket is created in a directory created by mkdtemp(), and
    // mkdtemp() guarantees a unique directory name, there will be no
    // collision.
    if (mkdtemp (&buffer[0]) == 0) {
        return -1;
    }

    path_.assign (&buffer[0]);
    file_ = path_ + "/socket";
#else
    LIBZMQ_UNUSED (path_);
    int fd = mkstemp (&buffer[0]);
    if (fd == -1)
        return -1;
    ::close (fd);

    file_.assign (&buffer[0]);
#endif
#endif

    return 0;
}
#endif