tcp_connecter.cpp 13 KB
Newer Older
Martin Sustrik's avatar
Martin Sustrik committed
1
/*
2
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
Martin Sustrik's avatar
Martin Sustrik committed
3

4
    This file is part of libzmq, the ZeroMQ core engine in C++.
Martin Sustrik's avatar
Martin Sustrik committed
5

6 7 8
    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
Martin Sustrik's avatar
Martin Sustrik committed
9 10
    (at your option) any later version.

11 12 13 14 15 16 17 18 19 20 21 22 23 24
    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.
Martin Sustrik's avatar
Martin Sustrik committed
25

26
    You should have received a copy of the GNU Lesser General Public License
Martin Sustrik's avatar
Martin Sustrik committed
27 28 29
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include "precompiled.hpp"
31
#include <new>
32 33
#include <string>

34
#include "macros.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
35
#include "tcp_connecter.hpp"
36
#include "stream_engine.hpp"
37
#include "io_thread.hpp"
38
#include "random.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
39
#include "err.hpp"
40
#include "ip.hpp"
41
#include "tcp.hpp"
42 43
#include "address.hpp"
#include "tcp_address.hpp"
44
#include "session_base.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
45

46
#if !defined ZMQ_HAVE_WINDOWS
47 48 49 50 51 52 53 54
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/tcp.h>
#include <netinet/in.h>
#include <netdb.h>
#include <fcntl.h>
55 56 57
#ifdef ZMQ_HAVE_VXWORKS
#include <sockLib.h>
#endif
58 59 60 61
#ifdef ZMQ_HAVE_OPENVMS
#include <ioctl.h>
#endif
#endif
Martin Sustrik's avatar
Martin Sustrik committed
62

63
zmq::tcp_connecter_t::tcp_connecter_t (class io_thread_t *io_thread_,
64 65 66 67
                                       class session_base_t *session_,
                                       const options_t &options_,
                                       address_t *addr_,
                                       bool delayed_start_) :
68 69
    own_t (io_thread_, options_),
    io_object_t (io_thread_),
70
    addr (addr_),
71
    s (retired_fd),
72
    handle (static_cast<handle_t> (NULL)),
73
    delayed_start (delayed_start_),
74
    connect_timer_started (false),
75
    reconnect_timer_started (false),
76
    session (session_),
77 78
    current_reconnect_ivl (options.reconnect_ivl),
    socket (session->get_socket ())
unknown's avatar
unknown committed
79
{
80 81
    zmq_assert (addr);
    zmq_assert (addr->protocol == "tcp");
82
    addr->to_string (endpoint);
83 84 85
    // TODO the return value is unused! what if it fails? if this is impossible
    // or does not matter, change such that endpoint in initialized using an
    // initializer, and make endpoint const
unknown's avatar
unknown committed
86 87 88 89
}

zmq::tcp_connecter_t::~tcp_connecter_t ()
{
90
    zmq_assert (!connect_timer_started);
91
    zmq_assert (!reconnect_timer_started);
92
    zmq_assert (!handle);
93
    zmq_assert (s == retired_fd);
unknown's avatar
unknown committed
94 95
}

96 97
void zmq::tcp_connecter_t::process_plug ()
{
98 99
    if (delayed_start)
        add_reconnect_timer ();
100 101 102 103
    else
        start_connecting ();
}

104 105
void zmq::tcp_connecter_t::process_term (int linger_)
{
106 107 108 109 110
    if (connect_timer_started) {
        cancel_timer (connect_timer_id);
        connect_timer_started = false;
    }

111
    if (reconnect_timer_started) {
112
        cancel_timer (reconnect_timer_id);
113
        reconnect_timer_started = false;
114 115
    }

116 117
    if (handle) {
        rm_handle ();
118 119 120 121 122 123 124 125
    }

    if (s != retired_fd)
        close ();

    own_t::process_term (linger_);
}

126 127
void zmq::tcp_connecter_t::in_event ()
{
128
    //  We are not polling for incoming data, so we are actually called
129 130 131 132 133 134 135
    //  because of error here. However, we can get error on out event as well
    //  on some platforms, so we'll simply handle both events in the same way.
    out_event ();
}

void zmq::tcp_connecter_t::out_event ()
{
136 137 138 139 140
    if (connect_timer_started) {
        cancel_timer (connect_timer_id);
        connect_timer_started = false;
    }

141
    rm_handle ();
142

Martin Hurton's avatar
Martin Hurton committed
143
    const fd_t fd = connect ();
144

145
    //  Handle the error condition by attempt to reconnect.
146
    if (fd == retired_fd || !tune_socket (fd)) {
147 148 149 150
        close ();
        add_reconnect_timer ();
        return;
    }
151

152
    //  Create the engine object for this connection.
153 154
    stream_engine_t *engine =
      new (std::nothrow) stream_engine_t (fd, options, endpoint);
155 156 157 158 159 160 161
    alloc_assert (engine);

    //  Attach the engine to the corresponding session object.
    send_attach (session, engine);

    //  Shut the connecter down.
    terminate ();
162

163
    socket->event_connected (endpoint, fd);
164 165
}

166 167 168
void zmq::tcp_connecter_t::rm_handle ()
{
    rm_fd (handle);
169
    handle = static_cast<handle_t> (NULL);
170 171
}

172 173
void zmq::tcp_connecter_t::timer_event (int id_)
{
174 175 176
    zmq_assert (id_ == reconnect_timer_id || id_ == connect_timer_id);
    if (id_ == connect_timer_id) {
        connect_timer_started = false;
177
        rm_handle ();
178 179
        close ();
        add_reconnect_timer ();
180
    } else if (id_ == reconnect_timer_id) {
181 182 183
        reconnect_timer_started = false;
        start_connecting ();
    }
184 185 186 187 188
}

void zmq::tcp_connecter_t::start_connecting ()
{
    //  Open the connecting socket.
Martin Hurton's avatar
Martin Hurton committed
189
    const int rc = open ();
190 191 192 193 194 195 196

    //  Connect may succeed in synchronous manner.
    if (rc == 0) {
        handle = add_fd (s);
        out_event ();
    }

197
    //  Connection establishment may be delayed. Poll for its completion.
198
    else if (rc == -1 && errno == EINPROGRESS) {
199 200
        handle = add_fd (s);
        set_pollout (handle);
201
        socket->event_connect_delayed (endpoint, zmq_errno ());
202 203 204

        //  add userspace connect timeout
        add_connect_timer ();
205 206 207
    }

    //  Handle any other error condition by eventual reconnect.
208
    else {
209 210
        if (s != retired_fd)
            close ();
211 212
        add_reconnect_timer ();
    }
213 214
}

215 216 217 218 219 220 221 222
void zmq::tcp_connecter_t::add_connect_timer ()
{
    if (options.connect_timeout > 0) {
        add_timer (options.connect_timeout, connect_timer_id);
        connect_timer_started = true;
    }
}

Martin Hurton's avatar
Martin Hurton committed
223
void zmq::tcp_connecter_t::add_reconnect_timer ()
224
{
Martin Hurton's avatar
Martin Hurton committed
225 226 227
    const int interval = get_new_reconnect_ivl ();
    add_timer (interval, reconnect_timer_id);
    socket->event_connect_retried (endpoint, interval);
228
    reconnect_timer_started = true;
229 230 231 232 233
}

int zmq::tcp_connecter_t::get_new_reconnect_ivl ()
{
    //  The new interval is the current interval + random value.
234 235
    const int interval =
      current_reconnect_ivl + generate_random () % options.reconnect_ivl;
236 237 238

    //  Only change the current reconnect interval  if the maximum reconnect
    //  interval was set and if it's larger than the reconnect interval.
239 240
    if (options.reconnect_ivl_max > 0
        && options.reconnect_ivl_max > options.reconnect_ivl)
241
        //  Calculate the next interval
Martin Hurton's avatar
Martin Hurton committed
242
        current_reconnect_ivl =
243
          std::min (current_reconnect_ivl * 2, options.reconnect_ivl_max);
Martin Hurton's avatar
Martin Hurton committed
244
    return interval;
245 246
}

unknown's avatar
unknown committed
247 248 249 250
int zmq::tcp_connecter_t::open ()
{
    zmq_assert (s == retired_fd);

251 252
    //  Resolve the address
    if (addr->resolved.tcp_addr != NULL) {
253
        LIBZMQ_DELETE (addr->resolved.tcp_addr);
254 255 256 257
    }

    addr->resolved.tcp_addr = new (std::nothrow) tcp_address_t ();
    alloc_assert (addr->resolved.tcp_addr);
258 259
    int rc = addr->resolved.tcp_addr->resolve (addr->address.c_str (), false,
                                               options.ipv6);
260
    if (rc != 0) {
261
        LIBZMQ_DELETE (addr->resolved.tcp_addr);
262 263 264
        return -1;
    }
    zmq_assert (addr->resolved.tcp_addr != NULL);
265
    tcp_address_t *const tcp_addr = addr->resolved.tcp_addr;
266

unknown's avatar
unknown committed
267
    //  Create the socket.
Martin Hurton's avatar
Martin Hurton committed
268
    s = open_socket (tcp_addr->family (), SOCK_STREAM, IPPROTO_TCP);
269 270

    //  IPv6 address family not supported, try automatic downgrade to IPv4.
271
    if (s == zmq::retired_fd && tcp_addr->family () == AF_INET6
272 273 274
        && errno == EAFNOSUPPORT && options.ipv6) {
        rc = addr->resolved.tcp_addr->resolve (addr->address.c_str (), false,
                                               false);
275
        if (rc != 0) {
276
            LIBZMQ_DELETE (addr->resolved.tcp_addr);
277 278 279 280 281
            return -1;
        }
        s = open_socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
    }

282
#ifdef ZMQ_HAVE_WINDOWS
283
    if (s == INVALID_SOCKET) {
284
        errno = wsa_error_to_errno (WSAGetLastError ());
285 286
        return -1;
    }
287 288 289 290
#else
    if (s == -1)
        return -1;
#endif
unknown's avatar
unknown committed
291

292 293
    //  On some systems, IPv4 mapping in IPv6 sockets is disabled by default.
    //  Switch it on in such cases.
Martin Hurton's avatar
Martin Hurton committed
294
    if (tcp_addr->family () == AF_INET6)
295
        enable_ipv4_mapping (s);
296

297 298 299 300
    // Set the IP Type-Of-Service priority for this socket
    if (options.tos != 0)
        set_ip_type_of_service (s, options.tos);

301 302 303 304
    // Bind the socket to a device if applicable
    if (!options.bound_device.empty ())
        bind_to_device (s, options.bound_device);

305 306
    // Set the socket to non-blocking mode so that we get async connect().
    unblock_socket (s);
unknown's avatar
unknown committed
307

308 309 310 311
    // Set the socket to loopback fastpath if configured.
    if (options.loopback_fastpath)
        tcp_tune_loopback_fast_path (s);

312
    //  Set the socket buffer limits for the underlying socket.
313
    if (options.sndbuf >= 0)
314
        set_tcp_send_buffer (s, options.sndbuf);
315
    if (options.rcvbuf >= 0)
316 317
        set_tcp_receive_buffer (s, options.rcvbuf);

318 319 320 321
    // Set the IP Type-Of-Service for the underlying socket
    if (options.tos != 0)
        set_ip_type_of_service (s, options.tos);

322
    // Set a source address for conversations
Martin Hurton's avatar
Martin Hurton committed
323
    if (tcp_addr->has_src_addr ()) {
324 325 326 327
        //  Allow reusing of the address, to connect to different servers
        //  using the same source port on the client.
        int flag = 1;
#ifdef ZMQ_HAVE_WINDOWS
328 329
        rc = setsockopt (s, SOL_SOCKET, SO_REUSEADDR,
                         reinterpret_cast<const char *> (&flag), sizeof (int));
330
        wsa_assert (rc != SOCKET_ERROR);
331 332 333 334
#elif defined ZMQ_HAVE_VXWORKS
        rc = setsockopt (s, SOL_SOCKET, SO_REUSEADDR, (char *) &flag,
                         sizeof (int));
        errno_assert (rc == 0);
335 336 337 338 339
#else
        rc = setsockopt (s, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof (int));
        errno_assert (rc == 0);
#endif

340 341 342 343
#if defined ZMQ_HAVE_VXWORKS
        rc = ::bind (s, (sockaddr *) tcp_addr->src_addr (),
                     tcp_addr->src_addrlen ());
#else
Martin Hurton's avatar
Martin Hurton committed
344
        rc = ::bind (s, tcp_addr->src_addr (), tcp_addr->src_addrlen ());
345
#endif
Martin Hurton's avatar
Martin Hurton committed
346
        if (rc == -1)
347 348 349
            return -1;
    }

350
        //  Connect to the remote peer.
351 352 353
#if defined ZMQ_HAVE_VXWORKS
    rc = ::connect (s, (sockaddr *) tcp_addr->addr (), tcp_addr->addrlen ());
#else
Martin Hurton's avatar
Martin Hurton committed
354
    rc = ::connect (s, tcp_addr->addr (), tcp_addr->addrlen ());
355
#endif
356
    //  Connect was successful immediately.
357
    if (rc == 0) {
unknown's avatar
unknown committed
358
        return 0;
359
    }
unknown's avatar
unknown committed
360

361 362
        //  Translate error codes indicating asynchronous connect has been
        //  launched to a uniform EINPROGRESS.
363
#ifdef ZMQ_HAVE_WINDOWS
364
    const int last_error = WSAGetLastError ();
365
    if (last_error == WSAEINPROGRESS || last_error == WSAEWOULDBLOCK)
366
        errno = EINPROGRESS;
367
    else
368
        errno = wsa_error_to_errno (last_error);
369
#else
370
    if (errno == EINTR)
371
        errno = EINPROGRESS;
372
#endif
unknown's avatar
unknown committed
373 374 375
    return -1;
}

376 377
zmq::fd_t zmq::tcp_connecter_t::connect ()
{
Pieter Hintjens's avatar
Pieter Hintjens committed
378
    //  Async connect has finished. Check whether an error occurred
379
    int err = 0;
380
#if defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_VXWORKS
Martin Hurton's avatar
Martin Hurton committed
381
    int len = sizeof err;
382
#else
Martin Hurton's avatar
Martin Hurton committed
383
    socklen_t len = sizeof err;
384 385
#endif

386 387
    const int rc = getsockopt (s, SOL_SOCKET, SO_ERROR,
                               reinterpret_cast<char *> (&err), &len);
388 389 390 391

    //  Assert if the error was caused by 0MQ bug.
    //  Networking problems are OK. No need to assert.
#ifdef ZMQ_HAVE_WINDOWS
392 393
    zmq_assert (rc == 0);
    if (err != 0) {
394 395
        if (err == WSAEBADF || err == WSAENOPROTOOPT || err == WSAENOTSOCK
            || err == WSAENOBUFS) {
396 397
            wsa_assert_no (err);
        }
Martin Hurton's avatar
Martin Hurton committed
398
        return retired_fd;
399
    }
Brett Cameron's avatar
Brett Cameron committed
400
#else
Martin Sustrik's avatar
Martin Sustrik committed
401 402 403 404 405 406
    //  Following code should handle both Berkeley-derived socket
    //  implementations and Solaris.
    if (rc == -1)
        err = errno;
    if (err != 0) {
        errno = err;
407 408
        errno_assert (errno != EBADF && errno != ENOPROTOOPT
                      && errno != ENOTSOCK && errno != ENOBUFS);
Martin Sustrik's avatar
Martin Sustrik committed
409 410
        return retired_fd;
    }
411
#endif
Martin Sustrik's avatar
Martin Sustrik committed
412

413
    //  Return the newly connected socket.
Martin Hurton's avatar
Martin Hurton committed
414
    const fd_t result = s;
Martin Sustrik's avatar
Martin Sustrik committed
415 416 417 418
    s = retired_fd;
    return result;
}

419 420 421 422 423 424 425 426 427 428
bool zmq::tcp_connecter_t::tune_socket (const fd_t fd)
{
    const int rc = tune_tcp_socket (fd)
                   | tune_tcp_keepalives (
                       fd, options.tcp_keepalive, options.tcp_keepalive_cnt,
                       options.tcp_keepalive_idle, options.tcp_keepalive_intvl)
                   | tune_tcp_maxrt (fd, options.tcp_maxrt);
    return rc == 0;
}

429 430 431 432
void zmq::tcp_connecter_t::close ()
{
    zmq_assert (s != retired_fd);
#ifdef ZMQ_HAVE_WINDOWS
Martin Hurton's avatar
Martin Hurton committed
433
    const int rc = closesocket (s);
434 435
    wsa_assert (rc != SOCKET_ERROR);
#else
Martin Hurton's avatar
Martin Hurton committed
436
    const int rc = ::close (s);
437
    errno_assert (rc == 0);
Martin Sustrik's avatar
Martin Sustrik committed
438
#endif
439
    socket->event_closed (endpoint, s);
440 441
    s = retired_fd;
}