/*
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file

    This file is part of libzmq, the ZeroMQ core engine in C++.

    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

#include "precompiled.hpp"
#include <algorithm>
#include <limits>
#include <new>
#include <string>

#include "macros.hpp"
#include "tcp_connecter.hpp"
#include "stream_engine.hpp"
#include "io_thread.hpp"
#include "random.hpp"
#include "err.hpp"
#include "ip.hpp"
#include "tcp.hpp"
#include "address.hpp"
#include "tcp_address.hpp"
#include "session_base.hpp"

#if !defined ZMQ_HAVE_WINDOWS
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/tcp.h>
#include <netinet/in.h>
#include <netdb.h>
#include <fcntl.h>
#ifdef ZMQ_HAVE_OPENVMS
#include <ioctl.h>
#endif
#endif

//  Creates a connecter in its idle state: no descriptor is open
//  (s == retired_fd), no poller handle is registered and neither timer
//  is running.
//
//  io_thread_      forwarded to the own_t and io_object_t bases.
//  session_        session that will receive the engine once connected;
//                  also queried here for the owning socket.
//  options_        forwarded to own_t; the initial reconnect interval is
//                  taken from options.reconnect_ivl.
//  addr_           target address; must be non-NULL with protocol "tcp".
//  delayed_start_  when true, process_plug () arms the reconnect timer
//                  instead of connecting straight away.
zmq::tcp_connecter_t::tcp_connecter_t (class io_thread_t *io_thread_,
      class session_base_t *session_, const options_t &options_,
      address_t *addr_, bool delayed_start_) :
    own_t (io_thread_, options_),
    io_object_t (io_thread_),
    addr (addr_),
    s (retired_fd),
    handle((handle_t)NULL),
    handle_valid (false),
    delayed_start (delayed_start_),
    connect_timer_started (false),
    reconnect_timer_started (false),
    session (session_),
    current_reconnect_ivl (options.reconnect_ivl)
{
    //  Only TCP addresses are handled by this connecter.
    zmq_assert (addr);
    zmq_assert (addr->protocol == "tcp");

    //  Remember the owning socket; it is used for event notifications.
    socket = session->get_socket ();

    //  Cache the textual endpoint passed along with those notifications.
    addr->to_string (endpoint);
}

//  The connecter must be fully wound down before destruction (see
//  process_term): no pending timers, no registered poller handle and no
//  open descriptor.
zmq::tcp_connecter_t::~tcp_connecter_t ()
{
    //  Timers.
    zmq_assert (!connect_timer_started);
    zmq_assert (!reconnect_timer_started);

    //  Poller registration and underlying descriptor.
    zmq_assert (!handle_valid);
    zmq_assert (s == retired_fd);
}

void zmq::tcp_connecter_t::process_plug ()
{
91 92
    if (delayed_start)
        add_reconnect_timer ();
93 94 95 96
    else
        start_connecting ();
}

void zmq::tcp_connecter_t::process_term (int linger_)
{
99 100 101 102 103
    if (connect_timer_started) {
        cancel_timer (connect_timer_id);
        connect_timer_started = false;
    }

104
    if (reconnect_timer_started) {
105
        cancel_timer (reconnect_timer_id);
106
        reconnect_timer_started = false;
107 108 109 110 111 112 113 114 115 116 117 118 119
    }

    if (handle_valid) {
        rm_fd (handle);
        handle_valid = false;
    }

    if (s != retired_fd)
        close ();

    own_t::process_term (linger_);
}

void zmq::tcp_connecter_t::in_event ()
{
122
    //  We are not polling for incoming data, so we are actually called
123 124 125 126 127 128 129
    //  because of error here. However, we can get error on out event as well
    //  on some platforms, so we'll simply handle both events in the same way.
    out_event ();
}

void zmq::tcp_connecter_t::out_event ()
{
130 131 132 133 134
    if (connect_timer_started) {
        cancel_timer (connect_timer_id);
        connect_timer_started = false;
    }

135 136 137
    rm_fd (handle);
    handle_valid = false;

Martin Hurton's avatar
Martin Hurton committed
138
    const fd_t fd = connect ();
139

140 141 142
    //  Handle the error condition by attempt to reconnect.
    if (fd == retired_fd) {
        close ();
Martin Hurton's avatar
Martin Hurton committed
143
        add_reconnect_timer ();
144 145 146
        return;
    }

147 148 149 150 151 152 153 154 155
    int rc = tune_tcp_socket (fd);
    rc = rc | tune_tcp_keepalives (fd, options.tcp_keepalive, options.tcp_keepalive_cnt,
        options.tcp_keepalive_idle, options.tcp_keepalive_intvl);
    rc = rc | tune_tcp_maxrt (fd, options.tcp_maxrt);
    if (rc != 0) {
        close ();
        add_reconnect_timer ();
        return;
    }
156

157
    //  Create the engine object for this connection.
158
    stream_engine_t *engine = new (std::nothrow)
159
        stream_engine_t (fd, options, endpoint);
160 161 162 163 164 165 166
    alloc_assert (engine);

    //  Attach the engine to the corresponding session object.
    send_attach (session, engine);

    //  Shut the connecter down.
    terminate ();
167

168
    socket->event_connected (endpoint, (int) fd);
169 170 171 172
}

void zmq::tcp_connecter_t::timer_event (int id_)
{
173 174 175 176 177 178 179 180 181 182 183 184 185 186
    zmq_assert (id_ == reconnect_timer_id || id_ == connect_timer_id);
    if (id_ == connect_timer_id) {
        connect_timer_started = false;

        rm_fd (handle);
        handle_valid = false;

        close ();
        add_reconnect_timer ();
    }
    else if (id_ == reconnect_timer_id) {
        reconnect_timer_started = false;
        start_connecting ();
    }
187 188 189 190 191
}

void zmq::tcp_connecter_t::start_connecting ()
{
    //  Open the connecting socket.
Martin Hurton's avatar
Martin Hurton committed
192
    const int rc = open ();
193 194 195 196 197 198 199 200

    //  Connect may succeed in synchronous manner.
    if (rc == 0) {
        handle = add_fd (s);
        handle_valid = true;
        out_event ();
    }

201
    //  Connection establishment may be delayed. Poll for its completion.
202 203
    else
    if (rc == -1 && errno == EINPROGRESS) {
204 205 206
        handle = add_fd (s);
        handle_valid = true;
        set_pollout (handle);
207
        socket->event_connect_delayed (endpoint, zmq_errno());
208 209 210

        //  add userspace connect timeout
        add_connect_timer ();
211 212 213
    }

    //  Handle any other error condition by eventual reconnect.
214
    else {
215 216
        if (s != retired_fd)
            close ();
217 218
        add_reconnect_timer ();
    }
219 220
}

//  Arms the userspace connect timeout. A non-positive
//  options.connect_timeout leaves the timeout disabled.
void zmq::tcp_connecter_t::add_connect_timer ()
{
    if (options.connect_timeout <= 0)
        return;

    add_timer (options.connect_timeout, connect_timer_id);
    connect_timer_started = true;
}

void zmq::tcp_connecter_t::add_reconnect_timer ()
230
{
Martin Hurton's avatar
Martin Hurton committed
231 232 233
    const int interval = get_new_reconnect_ivl ();
    add_timer (interval, reconnect_timer_id);
    socket->event_connect_retried (endpoint, interval);
234
    reconnect_timer_started = true;
235 236 237 238 239
}

int zmq::tcp_connecter_t::get_new_reconnect_ivl ()
{
    //  The new interval is the current interval + random value.
Martin Hurton's avatar
Martin Hurton committed
240 241
    const int interval = current_reconnect_ivl +
        generate_random () % options.reconnect_ivl;
242 243 244

    //  Only change the current reconnect interval  if the maximum reconnect
    //  interval was set and if it's larger than the reconnect interval.
245
    if (options.reconnect_ivl_max > 0 &&
Martin Hurton's avatar
Martin Hurton committed
246
        options.reconnect_ivl_max > options.reconnect_ivl)
247
        //  Calculate the next interval
Martin Hurton's avatar
Martin Hurton committed
248 249 250
        current_reconnect_ivl =
            std::min (current_reconnect_ivl * 2, options.reconnect_ivl_max);
    return interval;
251 252
}

int zmq::tcp_connecter_t::open ()
{
    zmq_assert (s == retired_fd);

257 258
    //  Resolve the address
    if (addr->resolved.tcp_addr != NULL) {
259
        LIBZMQ_DELETE(addr->resolved.tcp_addr);
260 261 262 263 264 265 266
    }

    addr->resolved.tcp_addr = new (std::nothrow) tcp_address_t ();
    alloc_assert (addr->resolved.tcp_addr);
    int rc = addr->resolved.tcp_addr->resolve (
        addr->address.c_str (), false, options.ipv6);
    if (rc != 0) {
267
        LIBZMQ_DELETE(addr->resolved.tcp_addr);
268 269 270
        return -1;
    }
    zmq_assert (addr->resolved.tcp_addr != NULL);
Martin Hurton's avatar
Martin Hurton committed
271
    tcp_address_t * const tcp_addr = addr->resolved.tcp_addr;
272

unknown's avatar
unknown committed
273
    //  Create the socket.
Martin Hurton's avatar
Martin Hurton committed
274
    s = open_socket (tcp_addr->family (), SOCK_STREAM, IPPROTO_TCP);
275 276

    //  IPv6 address family not supported, try automatic downgrade to IPv4.
277
    if (s == zmq::retired_fd && tcp_addr->family () == AF_INET6
278 279 280 281 282 283 284 285 286 287 288
    && errno == EAFNOSUPPORT
    && options.ipv6) {
        rc = addr->resolved.tcp_addr->resolve (
            addr->address.c_str (), false, false);
        if (rc != 0) {
            LIBZMQ_DELETE(addr->resolved.tcp_addr);
            return -1;
        }
        s = open_socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
    }

289
#ifdef ZMQ_HAVE_WINDOWS
290
    if (s == INVALID_SOCKET) {
291
        errno = wsa_error_to_errno (WSAGetLastError ());
292 293
        return -1;
    }
294 295 296 297
#else
    if (s == -1)
        return -1;
#endif
unknown's avatar
unknown committed
298

299 300
    //  On some systems, IPv4 mapping in IPv6 sockets is disabled by default.
    //  Switch it on in such cases.
Martin Hurton's avatar
Martin Hurton committed
301
    if (tcp_addr->family () == AF_INET6)
302
        enable_ipv4_mapping (s);
303

304 305 306 307
    // Set the IP Type-Of-Service priority for this socket
    if (options.tos != 0)
        set_ip_type_of_service (s, options.tos);

308 309 310 311
    // Bind the socket to a device if applicable
    if (!options.bound_device.empty ())
        bind_to_device (s, options.bound_device);

312 313
    // Set the socket to non-blocking mode so that we get async connect().
    unblock_socket (s);
unknown's avatar
unknown committed
314

315
    //  Set the socket buffer limits for the underlying socket.
316
    if (options.sndbuf >= 0)
317
        set_tcp_send_buffer (s, options.sndbuf);
318
    if (options.rcvbuf >= 0)
319 320
        set_tcp_receive_buffer (s, options.rcvbuf);

321 322 323 324
    // Set the IP Type-Of-Service for the underlying socket
    if (options.tos != 0)
        set_ip_type_of_service (s, options.tos);

325
    // Set a source address for conversations
Martin Hurton's avatar
Martin Hurton committed
326
    if (tcp_addr->has_src_addr ()) {
327 328 329 330 331 332 333 334 335 336 337 338
        //  Allow reusing of the address, to connect to different servers
        //  using the same source port on the client.
        int flag = 1;
#ifdef ZMQ_HAVE_WINDOWS
        rc = setsockopt (s, SOL_SOCKET, SO_REUSEADDR, (const char*) &flag,
                sizeof (int));
        wsa_assert (rc != SOCKET_ERROR);
#else
        rc = setsockopt (s, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof (int));
        errno_assert (rc == 0);
#endif

Martin Hurton's avatar
Martin Hurton committed
339 340
        rc = ::bind (s, tcp_addr->src_addr (), tcp_addr->src_addrlen ());
        if (rc == -1)
341 342 343
            return -1;
    }

unknown's avatar
unknown committed
344
    //  Connect to the remote peer.
Martin Hurton's avatar
Martin Hurton committed
345
    rc = ::connect (s, tcp_addr->addr (), tcp_addr->addrlen ());
unknown's avatar
unknown committed
346

347
    //  Connect was successful immediately.
unknown's avatar
unknown committed
348 349 350
    if (rc == 0)
        return 0;

351
    //  Translate error codes indicating asynchronous connect has been
352
    //  launched to a uniform EINPROGRESS.
353
#ifdef ZMQ_HAVE_WINDOWS
354 355
    const int last_error = WSAGetLastError();
    if (last_error == WSAEINPROGRESS || last_error == WSAEWOULDBLOCK)
356
        errno = EINPROGRESS;
357
    else
358
        errno = wsa_error_to_errno (last_error);
359
#else
360
    if (errno == EINTR)
361
        errno = EINPROGRESS;
362
#endif
unknown's avatar
unknown committed
363 364 365
    return -1;
}

//  Determines the outcome of a finished connect attempt by reading
//  SO_ERROR from 's'.
//
//  On success, ownership of the descriptor passes to the caller: it is
//  returned and 's' is reset to retired_fd. On failure retired_fd is
//  returned and 's' is left untouched, so the caller still has to close it.
zmq::fd_t zmq::tcp_connecter_t::connect ()
{
    //  Async connect has finished. Check whether an error occurred
    int err = 0;
#ifdef ZMQ_HAVE_HPUX
    int err_len = sizeof err;
#else
    socklen_t err_len = sizeof err;
#endif

    const int rc =
        getsockopt (s, SOL_SOCKET, SO_ERROR, (char*) &err, &err_len);

    //  Assert if the error was caused by 0MQ bug.
    //  Networking problems are OK. No need to assert.
#ifdef ZMQ_HAVE_WINDOWS
    zmq_assert (rc == 0);
    if (err != 0) {
        const bool internal_error = err == WSAEBADF
            || err == WSAENOPROTOOPT
            || err == WSAENOTSOCK
            || err == WSAENOBUFS;
        if (internal_error) {
            wsa_assert_no (err);
        }
        return retired_fd;
    }
#else
    //  Following code should handle both Berkeley-derived socket
    //  implementations and Solaris: the error is taken either from the
    //  option value or, when getsockopt itself fails, from errno.
    if (rc == -1)
        err = errno;
    if (err != 0) {
        errno = err;
        errno_assert (
            errno != EBADF &&
            errno != ENOPROTOOPT &&
            errno != ENOTSOCK &&
            errno != ENOBUFS);
        return retired_fd;
    }
#endif

    //  Return the newly connected socket and give up our reference to it.
    const fd_t connected = s;
    s = retired_fd;
    return connected;
}

void zmq::tcp_connecter_t::close ()
{
    zmq_assert (s != retired_fd);
#ifdef ZMQ_HAVE_WINDOWS
Martin Hurton's avatar
Martin Hurton committed
418
    const int rc = closesocket (s);
419 420
    wsa_assert (rc != SOCKET_ERROR);
#else
Martin Hurton's avatar
Martin Hurton committed
421
    const int rc = ::close (s);
422
    errno_assert (rc == 0);
Martin Sustrik's avatar
Martin Sustrik committed
423
#endif
424
    socket->event_closed (endpoint, (int) s);
425 426
    s = retired_fd;
}