/*
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file

    This file is part of libzmq, the ZeroMQ core engine in C++.

    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include "precompiled.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
31 32
#include "ip.hpp"
#include "err.hpp"
33
#include "macros.hpp"
34
#include "config.hpp"
35
#include "address.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
36

37
#if !defined ZMQ_HAVE_WINDOWS
38
#include <fcntl.h>
39 40
#include <sys/types.h>
#include <sys/socket.h>
41
#include <sys/stat.h>
42
#include <netdb.h>
43
#include <netinet/in.h>
44
#include <netinet/tcp.h>
45 46 47 48
#include <stdlib.h>
#include <unistd.h>

#include <vector>
49 50
#else
#include "tcp.hpp"
51 52 53
#ifdef ZMQ_HAVE_IPC
#include "ipc_address.hpp"
#endif
54 55

#include <direct.h>
56 57
#endif

58
#if defined ZMQ_HAVE_OPENVMS || defined ZMQ_HAVE_VXWORKS
59 60 61
#include <ioctl.h>
#endif

62 63 64 65 66 67
#if defined ZMQ_HAVE_VXWORKS
#include <unistd.h>
#include <sockLib.h>
#include <ioLib.h>
#endif

68 69 70 71 72 73 74 75 76 77 78 79
#if defined ZMQ_HAVE_EVENTFD
#include <sys/eventfd.h>
#endif

#if defined ZMQ_HAVE_OPENPGM
#ifdef ZMQ_HAVE_WINDOWS
#define __PGM_WININT_H__
#endif

#include <pgm/pgm.h>
#endif

80 81 82 83
#ifdef __APPLE__
#include <TargetConditionals.h>
#endif

84 85 86 87 88 89 90 91
#ifndef ZMQ_HAVE_WINDOWS
//  Names of the environment variables that may designate a temporary
//  directory. The array ends with a null entry so that it can be walked
//  without knowing its length.
static const char *tmp_env_vars[] = {"TMPDIR", "TEMPDIR", "TMP", 0};
#endif

92 93
//  Creates a socket for the given domain, type and protocol.
//
//  Where the platform supports it the socket is created close-on-exec /
//  non-inheritable right away; make_socket_noninheritable () is called
//  afterwards in any case. SIGPIPE suppression is applied through
//  set_nosigpipe ().
//
//  Returns the new descriptor, or retired_fd if the socket could not be
//  created. On Windows errno is then derived from WSAGetLastError ().
zmq::fd_t zmq::open_socket (int domain_, int type_, int protocol_)
{
    int rc;

    //  Setting this option results in sane behaviour when exec() functions
    //  are used. Old sockets are closed and don't block TCP ports etc.
#if defined ZMQ_HAVE_SOCK_CLOEXEC
    type_ |= SOCK_CLOEXEC;
#endif

#if defined ZMQ_HAVE_WINDOWS && defined WSA_FLAG_NO_HANDLE_INHERIT
    //  If supported, create the socket with WSA_FLAG_NO_HANDLE_INHERIT, such
    //  that the race condition in making it non-inheritable later is avoided.
    //  The flags are a bit mask and must be combined with bitwise OR; a
    //  logical OR would collapse them to 1 and drop the no-inherit flag.
    const fd_t s =
      WSASocket (domain_, type_, protocol_, NULL, 0,
                 WSA_FLAG_OVERLAPPED | WSA_FLAG_NO_HANDLE_INHERIT);
#else
    const fd_t s = socket (domain_, type_, protocol_);
#endif
    if (s == retired_fd) {
#ifdef ZMQ_HAVE_WINDOWS
        errno = wsa_error_to_errno (WSAGetLastError ());
#endif
        return retired_fd;
    }

    make_socket_noninheritable (s);

    //  Socket is not yet connected so EINVAL is not a valid networking error
    rc = zmq::set_nosigpipe (s);
    errno_assert (rc == 0);

    return s;
}

127 128
//  Switches the socket s_ into non-blocking mode using whichever mechanism
//  the platform offers (ioctlsocket, ioctl or fcntl). Failure to apply the
//  setting is treated as fatal.
void zmq::unblock_socket (fd_t s_)
{
#if defined ZMQ_HAVE_WINDOWS
    u_long enable = 1;
    const int rc = ioctlsocket (s_, FIONBIO, &enable);
    wsa_assert (rc != SOCKET_ERROR);
#elif defined ZMQ_HAVE_OPENVMS || defined ZMQ_HAVE_VXWORKS
    int enable = 1;
    const int rc = ioctl (s_, FIONBIO, &enable);
    errno_assert (rc != -1);
#else
    //  If the current flags cannot be read, start from an empty flag set.
    const int current = fcntl (s_, F_GETFL, 0);
    const int base = (current == -1) ? 0 : current;

    const int rc = fcntl (s_, F_SETFL, base | O_NONBLOCK);
    errno_assert (rc != -1);
#endif
}

146 147
//  Clears IPV6_V6ONLY on the socket s_. The function does nothing when
//  IPV6_V6ONLY is not defined, and on OpenBSD and DragonFly where the
//  option is deliberately left alone.
void zmq::enable_ipv4_mapping (fd_t s_)
{
    LIBZMQ_UNUSED (s_);

#if defined IPV6_V6ONLY && !defined ZMQ_HAVE_OPENBSD                           \
  && !defined ZMQ_HAVE_DRAGONFLY
    //  The option value type differs between Winsock and BSD sockets.
#ifdef ZMQ_HAVE_WINDOWS
    DWORD v6only = 0;
#else
    int v6only = 0;
#endif
    const int rc =
      setsockopt (s_, IPPROTO_IPV6, IPV6_V6ONLY,
                  reinterpret_cast<char *> (&v6only), sizeof (v6only));
#ifdef ZMQ_HAVE_WINDOWS
    wsa_assert (rc != SOCKET_ERROR);
#else
    errno_assert (rc == 0);
#endif
#endif
}
Matt Arsenault's avatar
Matt Arsenault committed
166

167
//  Stores the numeric address of the remote end of sockfd_ in ip_addr_.
//
//  Returns the address family of the peer on success. Returns 0 when the
//  remote address cannot be obtained or cannot be converted to text; in
//  that case ip_addr_ is left untouched. Errors that would indicate a bad
//  descriptor or bad arguments are asserted on rather than reported.
int zmq::get_peer_ip_address (fd_t sockfd_, std::string &ip_addr_)
{
    struct sockaddr_storage peer;

    const zmq_socklen_t peer_len =
      get_socket_address (sockfd_, socket_end_remote, &peer);

    if (peer_len == 0) {
#ifdef ZMQ_HAVE_WINDOWS
        const int last_error = WSAGetLastError ();
        wsa_assert (last_error != WSANOTINITIALISED && last_error != WSAEFAULT
                    && last_error != WSAEINPROGRESS
                    && last_error != WSAENOTSOCK);
#elif !defined(TARGET_OS_IPHONE) || !TARGET_OS_IPHONE
        errno_assert (errno != EBADF && errno != EFAULT && errno != ENOTSOCK);
#else
        //  EBADF is tolerated when building for iPhone targets.
        errno_assert (errno != EFAULT && errno != ENOTSOCK);
#endif
        return 0;
    }

    //  NI_NUMERICHOST: only the numeric form is wanted, no name lookup.
    char numeric_host[NI_MAXHOST];
    if (getnameinfo (reinterpret_cast<struct sockaddr *> (&peer), peer_len,
                     numeric_host, sizeof numeric_host, NULL, 0,
                     NI_NUMERICHOST)
        != 0)
        return 0;

    ip_addr_ = numeric_host;

    //  Read the family through a generic sockaddr view of the storage.
    union
    {
        struct sockaddr generic;
        struct sockaddr_storage storage;
    } family_view;

    family_view.storage = peer;
    return static_cast<int> (family_view.generic.sa_family);
}
205

206
//  Applies iptos_ as the IP_TOS option of the socket s_ and, where
//  IPV6_TCLASS is available (not on Windows), as the IPv6 traffic class too.
//  A failure to set IP_TOS is fatal; for IPV6_TCLASS only ENOPROTOOPT and
//  EINVAL are tolerated.
void zmq::set_ip_type_of_service (fd_t s_, int iptos_)
{
    char *const tos_value = reinterpret_cast<char *> (&iptos_);

    int rc = setsockopt (s_, IPPROTO_IP, IP_TOS, tos_value, sizeof (iptos_));
#ifdef ZMQ_HAVE_WINDOWS
    wsa_assert (rc != SOCKET_ERROR);
#else
    errno_assert (rc == 0);
#endif

    //  Windows and Hurd do not support IPV6_TCLASS
#if !defined(ZMQ_HAVE_WINDOWS) && defined(IPV6_TCLASS)
    rc =
      setsockopt (s_, IPPROTO_IPV6, IPV6_TCLASS, tos_value, sizeof (iptos_));

    //  If IPv6 is not enabled ENOPROTOOPT will be returned on Linux and
    //  EINVAL on OSX
    if (rc == -1) {
        errno_assert (errno == ENOPROTOOPT || errno == EINVAL);
    }
#endif
}
229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248

//  Enables SO_NOSIGPIPE on the socket s_ where that option exists, so that
//  writing to a connection already closed by the peer does not raise
//  SIGPIPE. On platforms without SO_NOSIGPIPE this is a no-op.
//
//  Returns 0 on success and -1 if setsockopt fails with EINVAL, so that the
//  caller can close the socket and retry; any other failure is fatal.
int zmq::set_nosigpipe (fd_t s_)
{
#ifdef SO_NOSIGPIPE
    int enabled = 1;
    const int rc =
      setsockopt (s_, SOL_SOCKET, SO_NOSIGPIPE, &enabled, sizeof (int));

    //  As per POSIX spec, EINVAL will be returned if the socket was valid
    //  but the connection has been reset by the peer.
    const bool reset_by_peer = (rc != 0) && (errno == EINVAL);
    if (reset_by_peer)
        return -1;

    errno_assert (rc == 0);
#else
    LIBZMQ_UNUSED (s_);
#endif

    return 0;
}
249

250
//  Binds the socket s_ to the network device named by bound_device_ via
//  SO_BINDTODEVICE.
//
//  Returns 0 on success. Returns -1 if the option could not be set; the
//  socket error is first checked by assert_success_or_recoverable (). When
//  the build has no SO_BINDTODEVICE support, returns -1 with errno set to
//  ENOTSUP.
int zmq::bind_to_device (fd_t s_, const std::string &bound_device_)
{
#ifdef ZMQ_HAVE_SO_BINDTODEVICE
    const int rc = setsockopt (s_, SOL_SOCKET, SO_BINDTODEVICE,
                               bound_device_.c_str (), bound_device_.length ());
    if (rc == 0)
        return 0;

    assert_success_or_recoverable (s_, rc);
    return -1;

#else
    LIBZMQ_UNUSED (s_);
    LIBZMQ_UNUSED (bound_device_);

    errno = ENOTSUP;
    return -1;
#endif
}
269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326

//  Performs the process-wide network initialisation: OpenPGM start-up when
//  built with OpenPGM, and WSAStartup (requesting Winsock 2.2) on Windows.
//
//  Returns true on success. Returns false with errno set to EINVAL when
//  OpenPGM reports a failure in its time domain; every other
//  initialisation failure is asserted on.
bool zmq::initialize_network ()
{
#if defined ZMQ_HAVE_OPENPGM

    //  Init PGM transport. Ensure threading and timer are enabled. Find PGM
    //  protocol ID. Note that if you want to use gettimeofday and sleep for
    //  openPGM timing, set environment variables PGM_TIMER to "GTOD" and
    //  PGM_SLEEP to "USLEEP".
    pgm_error_t *pgm_error = NULL;
    const bool pgm_ready = pgm_init (&pgm_error);
    if (pgm_ready != TRUE) {
        //  Invalid parameters don't set pgm_error_t
        zmq_assert (pgm_error != NULL);

        //  Failed to access RTC or HPET device.
        const bool timer_failure =
          pgm_error->domain == PGM_ERROR_DOMAIN_TIME
          && (pgm_error->code == PGM_ERROR_FAILED);
        if (timer_failure) {
            pgm_error_free (pgm_error);
            errno = EINVAL;
            return false;
        }

        //  PGM_ERROR_DOMAIN_ENGINE: WSAStartup errors or missing WSARecvMsg.
        zmq_assert (false);
    }
#endif

#ifdef ZMQ_HAVE_WINDOWS
    //  Initialise Windows sockets. Note that WSAStartup can be called
    //  multiple times given that WSACleanup will be called for each
    //  WSAStartup.
    WSADATA wsa_data;
    const WORD version_requested = MAKEWORD (2, 2);
    const int rc = WSAStartup (version_requested, &wsa_data);
    zmq_assert (rc == 0);
    zmq_assert (LOBYTE (wsa_data.wVersion) == 2
                && HIBYTE (wsa_data.wVersion) == 2);
#endif

    return true;
}

//  Counterpart of initialize_network (): calls WSACleanup on Windows and
//  shuts the OpenPGM library down when built with it. Failures of either
//  step are asserted on.
void zmq::shutdown_network ()
{
#ifdef ZMQ_HAVE_WINDOWS
    //  On Windows, uninitialise socket layer.
    const int rc = WSACleanup ();
    wsa_assert (rc != SOCKET_ERROR);
#endif

#if defined ZMQ_HAVE_OPENPGM
    //  Shut down the OpenPGM library.
    const bool pgm_stopped = (pgm_shutdown () == TRUE);
    if (!pgm_stopped)
        zmq_assert (false);
#endif
}

#if defined ZMQ_HAVE_WINDOWS
327
static void tune_socket (const SOCKET socket_)
328 329
{
    BOOL tcp_nodelay = 1;
330
    int rc =
331
      setsockopt (socket_, IPPROTO_TCP, TCP_NODELAY,
332
                  reinterpret_cast<char *> (&tcp_nodelay), sizeof tcp_nodelay);
333 334
    wsa_assert (rc != SOCKET_ERROR);

335
    zmq::tcp_tune_loopback_fast_path (socket_);
336 337
}

338
//  Builds a connected pair of TCP sockets over the loopback interface, as a
//  hand-made replacement for socketpair () on Windows.
//
//  On success stores the accepted end in *r_ and the connecting end in *w_
//  and returns 0. If bind/listen/connect/accept fails, both outputs end up
//  INVALID_SOCKET, errno is derived from the Winsock error and -1 is
//  returned. Failures of the other steps are asserted on.
//
//  When zmq::signaler_port is non-zero the bind..accept sequence is
//  serialised across processes with a named kernel object: an event for the
//  historic port 5905, a mutex for any other fixed port.
static int make_fdpair_tcpip (zmq::fd_t *r_, zmq::fd_t *w_)
{
#if !defined _WIN32_WCE && !defined ZMQ_HAVE_WINDOWS_UWP
    //  Windows CE does not manage security attributes
    SECURITY_DESCRIPTOR descriptor;
    SECURITY_ATTRIBUTES attributes;
    memset (&descriptor, 0, sizeof descriptor);
    memset (&attributes, 0, sizeof attributes);

    InitializeSecurityDescriptor (&descriptor, SECURITY_DESCRIPTOR_REVISION);
    SetSecurityDescriptorDacl (&descriptor, TRUE, 0, FALSE);

    attributes.nLength = sizeof (SECURITY_ATTRIBUTES);
    attributes.lpSecurityDescriptor = &descriptor;

    SECURITY_ATTRIBUTES *const sync_attributes = &attributes;
#else
    SECURITY_ATTRIBUTES *const sync_attributes = NULL;
#endif

    //  This function has to be in a system-wide critical section so that
    //  two instances of the library don't accidentally create signaler
    //  crossing the process boundary.
    //  We'll use a named kernel object to implement the critical section.
    //  Note that if the object already exists, the Create* call requests
    //  full access. If this fails with ERROR_ACCESS_DENIED, we try to open
    //  the object asking for the minimal access rights only.
    HANDLE sync = NULL;

    //  Create critical section only if using fixed signaler port
    //  Use problematic Event implementation for compatibility if using old
    //  port 5905. Otherwise use Mutex implementation.
    const int event_signaler_port = 5905;

    if (zmq::signaler_port == event_signaler_port) {
        static const wchar_t event_name[] = L"Global\\zmq-signaler-port-sync";

        sync = CreateEventW (sync_attributes, FALSE, TRUE, event_name);
        if (sync == NULL && GetLastError () == ERROR_ACCESS_DENIED)
            sync = OpenEventW (SYNCHRONIZE | EVENT_MODIFY_STATE, FALSE,
                               event_name);

        win_assert (sync != NULL);
    } else if (zmq::signaler_port != 0) {
        //  One mutex per port number.
        wchar_t mutex_name[MAX_PATH];
#ifdef __MINGW32__
        _snwprintf (mutex_name, MAX_PATH, L"Global\\zmq-signaler-port-%d",
                    zmq::signaler_port);
#else
        swprintf (mutex_name, MAX_PATH, L"Global\\zmq-signaler-port-%d",
                  zmq::signaler_port);
#endif

        sync = CreateMutexW (sync_attributes, FALSE, mutex_name);
        if (sync == NULL && GetLastError () == ERROR_ACCESS_DENIED)
            sync = OpenMutexW (SYNCHRONIZE, FALSE, mutex_name);

        win_assert (sync != NULL);
    }

    //  Windows has no 'socketpair' function. CreatePipe is no good as pipe
    //  handles cannot be polled on. Here we create the socketpair by hand.
    *w_ = INVALID_SOCKET;
    *r_ = INVALID_SOCKET;

    //  Create listening socket.
    SOCKET listen_sock = zmq::open_socket (AF_INET, SOCK_STREAM, 0);
    wsa_assert (listen_sock != INVALID_SOCKET);

    //  Set SO_REUSEADDR and TCP_NODELAY on listening socket.
    BOOL reuse_on = 1;
    int rc = setsockopt (listen_sock, SOL_SOCKET, SO_REUSEADDR,
                         reinterpret_cast<char *> (&reuse_on), sizeof reuse_on);
    wsa_assert (rc != SOCKET_ERROR);

    tune_socket (listen_sock);

    //  Init sockaddr to signaler port.
    struct sockaddr_in loopback;
    memset (&loopback, 0, sizeof loopback);
    loopback.sin_family = AF_INET;
    loopback.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
    loopback.sin_port = htons (zmq::signaler_port);

    //  Create the writer socket.
    *w_ = zmq::open_socket (AF_INET, SOCK_STREAM, 0);
    wsa_assert (*w_ != INVALID_SOCKET);

    if (sync != NULL) {
        //  Enter the critical section.
        DWORD dwrc = WaitForSingleObject (sync, INFINITE);
        zmq_assert (dwrc == WAIT_OBJECT_0 || dwrc == WAIT_ABANDONED);
    }

    //  From here on each step runs only if the previous one succeeded, so
    //  that the Winsock error of the first failing call is the one that is
    //  picked up further below.

    //  Bind listening socket to signaler port.
    rc = bind (listen_sock,
               reinterpret_cast<const struct sockaddr *> (&loopback),
               sizeof loopback);

    if (rc != SOCKET_ERROR && zmq::signaler_port == 0) {
        //  Retrieve ephemeral port number
        int loopback_len = sizeof loopback;
        rc = getsockname (listen_sock,
                          reinterpret_cast<struct sockaddr *> (&loopback),
                          &loopback_len);
    }

    //  Listen for incoming connections.
    if (rc != SOCKET_ERROR)
        rc = listen (listen_sock, 1);

    //  Connect writer to the listener.
    if (rc != SOCKET_ERROR)
        rc = connect (*w_, reinterpret_cast<struct sockaddr *> (&loopback),
                      sizeof loopback);

    //  Accept connection from writer.
    if (rc != SOCKET_ERROR) {
        //  Set TCP_NODELAY on writer socket.
        tune_socket (*w_);

        *r_ = accept (listen_sock, NULL, NULL);
    }

    //  Send/receive large chunk to work around TCP slow start
    //  This code is a workaround for #1608
    if (*r_ != INVALID_SOCKET) {
        //  1M to overload default receive buffer
        const size_t scratch_size = 1024 * 1024;
        unsigned char *scratch =
          static_cast<unsigned char *> (malloc (scratch_size));
        wsa_assert (scratch);

        int send_left = static_cast<int> (scratch_size);
        int recv_left = static_cast<int> (scratch_size);
        while (send_left != 0 || recv_left != 0) {
            int nbytes;
            if (send_left > 0) {
                nbytes = ::send (
                  *w_,
                  reinterpret_cast<char *> (scratch + scratch_size - send_left),
                  send_left, 0);
                wsa_assert (nbytes != SOCKET_ERROR);
                send_left -= nbytes;
            }
            nbytes = ::recv (
              *r_,
              reinterpret_cast<char *> (scratch + scratch_size - recv_left),
              recv_left, 0);
            wsa_assert (nbytes != SOCKET_ERROR);
            recv_left -= nbytes;
        }
        free (scratch);
    }

    //  Save errno if error occurred in bind/listen/connect/accept. This has
    //  to happen before any further Winsock call is made.
    int saved_errno = 0;
    if (*r_ == INVALID_SOCKET)
        saved_errno = WSAGetLastError ();

    //  We don't need the listening socket anymore. Close it.
    rc = closesocket (listen_sock);
    wsa_assert (rc != SOCKET_ERROR);

    if (sync != NULL) {
        //  Exit the critical section.
        BOOL brc;
        if (zmq::signaler_port == event_signaler_port)
            brc = SetEvent (sync);
        else
            brc = ReleaseMutex (sync);
        win_assert (brc != 0);

        //  Release the kernel object
        brc = CloseHandle (sync);
        win_assert (brc != 0);
    }

    if (*r_ != INVALID_SOCKET) {
        zmq::make_socket_noninheritable (*r_);
        return 0;
    }

    //  Cleanup writer if connection failed
    if (*w_ != INVALID_SOCKET) {
        rc = closesocket (*w_);
        wsa_assert (rc != SOCKET_ERROR);
        *w_ = INVALID_SOCKET;
    }

    //  Set errno from saved value
    errno = zmq::wsa_error_to_errno (saved_errno);
    return -1;
}
#endif
537

538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553
//  Creates a pair of connected descriptors; *r_ receives the read end and
//  *w_ the write end. Returns 0 on success and -1 on failure.
//
//  The mechanism depends on the platform:
//   - eventfd: a single descriptor serves as both ends;
//   - Windows: an AF_UNIX socket pair when built with IPC support, falling
//     back to make_fdpair_tcpip () when an AF_UNIX socket cannot be opened;
//     plain make_fdpair_tcpip () otherwise;
//   - OpenVMS and VxWorks: a loopback TCP connection built by hand, with
//     every step asserted on;
//   - everything else: socketpair (AF_UNIX).
//
//  In the eventfd and socketpair branches only ENFILE and EMFILE are
//  reported to the caller (both outputs are set to -1); any other failure
//  is fatal.
int zmq::make_fdpair (fd_t *r_, fd_t *w_)
{
#if defined ZMQ_HAVE_EVENTFD
    int flags = 0;
#if defined ZMQ_HAVE_EVENTFD_CLOEXEC
    //  Setting this option results in sane behaviour when exec() functions
    //  are used. Old sockets are closed and don't block TCP ports, avoid
    //  leaks, etc.
    flags |= EFD_CLOEXEC;
#endif
    fd_t fd = eventfd (0, flags);
    if (fd == -1) {
        errno_assert (errno == ENFILE || errno == EMFILE);
        *w_ = *r_ = -1;
        return -1;
    }
    *w_ = *r_ = fd;
    return 0;

#elif defined ZMQ_HAVE_WINDOWS
#ifdef ZMQ_HAVE_IPC
    //  All locals of this branch are declared up front so that none of the
    //  gotos below jumps over an initialisation.
    ipc_address_t address;
    std::string dirname, filename;
    sockaddr_un lcladdr;
    socklen_t lcladdr_len = sizeof lcladdr;
    int rc = 0;
    int saved_errno = 0;

    //  Create a listening socket.
    SOCKET listener = open_socket (AF_UNIX, SOCK_STREAM, 0);
    if (listener == retired_fd) {
        //  This may happen if the library was built on a system supporting
        //  AF_UNIX, but the system running doesn't support it.
        goto try_tcpip;
    }

    //  Without a socket path there is nothing to bind to.
    rc = create_ipc_wildcard_address (dirname, filename);
    if (rc != 0) {
        goto error_closelistener;
    }

    //  Initialise the address structure.
    rc = address.resolve (filename.c_str ());
    if (rc != 0) {
        goto error_closelistener;
    }

    //  Bind the socket to the file path.
    rc = bind (listener, const_cast<sockaddr *> (address.addr ()),
               address.addrlen ());
    if (rc != 0) {
        errno = wsa_error_to_errno (WSAGetLastError ());
        goto error_closelistener;
    }

    //  Listen for incoming connections.
    rc = listen (listener, 1);
    if (rc != 0) {
        errno = wsa_error_to_errno (WSAGetLastError ());
        goto error_closelistener;
    }

    rc = getsockname (listener, reinterpret_cast<struct sockaddr *> (&lcladdr),
                      &lcladdr_len);
    wsa_assert (rc != -1);

    //  Create the client socket.
    *w_ = open_socket (AF_UNIX, SOCK_STREAM, 0);
    if (*w_ == -1) {
        errno = wsa_error_to_errno (WSAGetLastError ());
        goto error_closelistener;
    }

    //  Connect to the remote peer.
    rc = ::connect (*w_, reinterpret_cast<const struct sockaddr *> (&lcladdr),
                    lcladdr_len);
    if (rc == -1) {
        //  Translate the Winsock error before cleanup, like the bind and
        //  listen paths do; otherwise the caller would see a stale errno.
        errno = wsa_error_to_errno (WSAGetLastError ());
        goto error_closeclient;
    }

    //  accept is a Winsock call, so report its failure via wsa_assert.
    *r_ = accept (listener, NULL, NULL);
    wsa_assert (*r_ != -1);

    //  Close the listener socket, we don't need it anymore.
    rc = closesocket (listener);
    wsa_assert (rc == 0);

    return 0;

error_closeclient:
    //  Close the client socket while preserving errno, then fall through to
    //  close the listener as well.
    saved_errno = errno;
    rc = closesocket (*w_);
    wsa_assert (rc == 0);
    errno = saved_errno;

error_closelistener:
    saved_errno = errno;
    rc = closesocket (listener);
    wsa_assert (rc == 0);
    errno = saved_errno;

    return -1;

try_tcpip:
    // try to fallback to TCP/IP
    // TODO: maybe remember this decision permanently?
#endif

    return make_fdpair_tcpip (r_, w_);
#elif defined ZMQ_HAVE_OPENVMS

    //  Whilst OpenVMS supports socketpair - it maps to AF_INET only.  Further,
    //  it does not set the socket options TCP_NODELAY and TCP_NODELACK which
    //  can lead to performance problems.
    //
    //  The bug will be fixed in V5.6 ECO4 and beyond.  In the meantime, we'll
    //  create the socket pair manually.
    struct sockaddr_in lcladdr;
    memset (&lcladdr, 0, sizeof lcladdr);
    lcladdr.sin_family = AF_INET;
    lcladdr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
    lcladdr.sin_port = 0;

    int listener = open_socket (AF_INET, SOCK_STREAM, 0);
    errno_assert (listener != -1);

    int on = 1;
    int rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELAY, &on, sizeof on);
    errno_assert (rc != -1);

    rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELACK, &on, sizeof on);
    errno_assert (rc != -1);

    rc = bind (listener, (struct sockaddr *) &lcladdr, sizeof lcladdr);
    errno_assert (rc != -1);

    socklen_t lcladdr_len = sizeof lcladdr;

    //  Port 0 was requested above; read back the address actually bound.
    rc = getsockname (listener, (struct sockaddr *) &lcladdr, &lcladdr_len);
    errno_assert (rc != -1);

    rc = listen (listener, 1);
    errno_assert (rc != -1);

    *w_ = open_socket (AF_INET, SOCK_STREAM, 0);
    errno_assert (*w_ != -1);

    rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY, &on, sizeof on);
    errno_assert (rc != -1);

    rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELACK, &on, sizeof on);
    errno_assert (rc != -1);

    rc = connect (*w_, (struct sockaddr *) &lcladdr, sizeof lcladdr);
    errno_assert (rc != -1);

    *r_ = accept (listener, NULL, NULL);
    errno_assert (*r_ != -1);

    close (listener);

    return 0;
#elif defined ZMQ_HAVE_VXWORKS
    struct sockaddr_in lcladdr;
    memset (&lcladdr, 0, sizeof lcladdr);
    lcladdr.sin_family = AF_INET;
    lcladdr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
    lcladdr.sin_port = 0;

    int listener = open_socket (AF_INET, SOCK_STREAM, 0);
    errno_assert (listener != -1);

    int on = 1;
    int rc =
      setsockopt (listener, IPPROTO_TCP, TCP_NODELAY, (char *) &on, sizeof on);
    errno_assert (rc != -1);

    rc = bind (listener, (struct sockaddr *) &lcladdr, sizeof lcladdr);
    errno_assert (rc != -1);

    socklen_t lcladdr_len = sizeof lcladdr;

    //  Port 0 was requested above; read back the address actually bound.
    rc = getsockname (listener, (struct sockaddr *) &lcladdr,
                      (int *) &lcladdr_len);
    errno_assert (rc != -1);

    rc = listen (listener, 1);
    errno_assert (rc != -1);

    *w_ = open_socket (AF_INET, SOCK_STREAM, 0);
    errno_assert (*w_ != -1);

    rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY, (char *) &on, sizeof on);
    errno_assert (rc != -1);

    rc = connect (*w_, (struct sockaddr *) &lcladdr, sizeof lcladdr);
    errno_assert (rc != -1);

    *r_ = accept (listener, NULL, NULL);
    errno_assert (*r_ != -1);

    close (listener);

    return 0;
#else
    // All other implementations support socketpair()
    int sv[2];
    int type = SOCK_STREAM;
    //  Setting this option results in sane behaviour when exec() functions
    //  are used. Old sockets are closed and don't block TCP ports, avoid
    //  leaks, etc.
#if defined ZMQ_HAVE_SOCK_CLOEXEC
    type |= SOCK_CLOEXEC;
#endif
    int rc = socketpair (AF_UNIX, type, 0, sv);
    if (rc == -1) {
        errno_assert (errno == ENFILE || errno == EMFILE);
        *w_ = *r_ = -1;
        return -1;
    } else {
        make_socket_noninheritable (sv[0]);
        make_socket_noninheritable (sv[1]);

        *w_ = sv[0];
        *r_ = sv[1];
        return 0;
    }
#endif
}
763

764
//  Prevents sock_ from being passed on to child processes: clears
//  HANDLE_FLAG_INHERIT on Windows, or sets FD_CLOEXEC through fcntl when
//  SOCK_CLOEXEC or accept4 is not available. Does nothing on other
//  configurations. A failure to change the flag is fatal.
void zmq::make_socket_noninheritable (fd_t sock_)
{
#if defined ZMQ_HAVE_WINDOWS && !defined _WIN32_WCE                            \
  && !defined ZMQ_HAVE_WINDOWS_UWP
    //  On Windows, prevent sockets from being inherited by child processes.
    const HANDLE handle = reinterpret_cast<HANDLE> (sock_);
    const BOOL cleared = SetHandleInformation (handle, HANDLE_FLAG_INHERIT, 0);
    win_assert (cleared);
#elif (!defined ZMQ_HAVE_SOCK_CLOEXEC || !defined HAVE_ACCEPT4)                \
  && defined FD_CLOEXEC
    //  If there's no SOCK_CLOEXEC, let's try the second best option.
    //  Race condition can cause socket not to be closed (if fork happens
    //  between accept and this point).
    const int set_rc = fcntl (sock_, F_SETFD, FD_CLOEXEC);
    errno_assert (set_rc != -1);
#else
    LIBZMQ_UNUSED (sock_);
#endif
}
783

784
//  Given the result rc_ of a socket call on s_, returns silently if the call
//  succeeded. Otherwise reads the pending error (SO_ERROR) of the socket and
//  asserts that it is one of the errors accepted below as ordinary
//  networking problems; anything else is treated as a 0MQ bug and aborts.
void zmq::assert_success_or_recoverable (zmq::fd_t s_, int rc_)
{
#ifdef ZMQ_HAVE_WINDOWS
    const bool call_failed = (rc_ == SOCKET_ERROR);
#else
    const bool call_failed = (rc_ == -1);
#endif
    if (!call_failed)
        return;

    //  Check whether an error occurred
    int err = 0;
#if defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_VXWORKS
    int len = sizeof err;
#else
    socklen_t len = sizeof err;
#endif

    const int rc = getsockopt (s_, SOL_SOCKET, SO_ERROR,
                               reinterpret_cast<char *> (&err), &len);

    //  Assert if the error was caused by 0MQ bug.
    //  Networking problems are OK. No need to assert.
#ifdef ZMQ_HAVE_WINDOWS
    zmq_assert (rc == 0);
    if (err == 0)
        return;

    wsa_assert (err == WSAECONNREFUSED || err == WSAECONNRESET
                || err == WSAECONNABORTED || err == WSAEINTR
                || err == WSAETIMEDOUT || err == WSAEHOSTUNREACH
                || err == WSAENETUNREACH || err == WSAENETDOWN
                || err == WSAENETRESET || err == WSAEACCES || err == WSAEINVAL
                || err == WSAEADDRINUSE);
#else
    //  Following code should handle both Berkeley-derived socket
    //  implementations and Solaris: the error arrives either in the option
    //  value or as the errno of a failing getsockopt call.
    if (rc == -1)
        err = errno;
    if (err == 0)
        return;

    //  Publish the error through errno so that the assertion reports it.
    errno = err;
    errno_assert (errno == ECONNREFUSED || errno == ECONNRESET
                  || errno == ECONNABORTED || errno == EINTR
                  || errno == ETIMEDOUT || errno == EHOSTUNREACH
                  || errno == ENETUNREACH || errno == ENETDOWN
                  || errno == ENETRESET || errno == EINVAL);
#endif
}
834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881

#ifdef ZMQ_HAVE_IPC
int zmq::create_ipc_wildcard_address (std::string &path_, std::string &file_)
{
#if defined ZMQ_HAVE_WINDOWS
    char buffer[MAX_PATH];

    {
        const errno_t rc = tmpnam_s (buffer);
        errno_assert (rc == 0);
    }

    // TODO or use CreateDirectoryA and specify permissions?
    const int rc = _mkdir (buffer);
    if (rc != 0) {
        return -1;
    }

    path_.assign (buffer);
    file_ = path_ + "/socket";
#else
    std::string tmp_path;

    // If TMPDIR, TEMPDIR, or TMP are available and are directories, create
    // the socket directory there.
    const char **tmp_env = tmp_env_vars;
    while (tmp_path.empty () && *tmp_env != 0) {
        char *tmpdir = getenv (*tmp_env);
        struct stat statbuf;

        // Confirm it is actually a directory before trying to use
        if (tmpdir != 0 && ::stat (tmpdir, &statbuf) == 0
            && S_ISDIR (statbuf.st_mode)) {
            tmp_path.assign (tmpdir);
            if (*(tmp_path.rbegin ()) != '/') {
                tmp_path.push_back ('/');
            }
        }

        // Try the next environment variable
        ++tmp_env;
    }

    // Append a directory name
    tmp_path.append ("tmpXXXXXX");

    // We need room for tmp_path + trailing NUL
    std::vector<char> buffer (tmp_path.length () + 1);
882
    memcpy (&buffer[0], tmp_path.c_str (), tmp_path.length () + 1);
883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910

#if defined HAVE_MKDTEMP
    // Create the directory.  POSIX requires that mkdtemp() creates the
    // directory with 0700 permissions, meaning the only possible race
    // with socket creation could be the same user.  However, since
    // each socket is created in a directory created by mkdtemp(), and
    // mkdtemp() guarantees a unique directory name, there will be no
    // collision.
    if (mkdtemp (&buffer[0]) == 0) {
        return -1;
    }

    path_.assign (&buffer[0]);
    file_ = path_ + "/socket";
#else
    LIBZMQ_UNUSED (path_);
    int fd = mkstemp (&buffer[0]);
    if (fd == -1)
        return -1;
    ::close (fd);

    file_.assign (&buffer[0]);
#endif
#endif

    return 0;
}
#endif