tcp_connecter.cpp 11.7 KB
Newer Older
Martin Sustrik's avatar
Martin Sustrik committed
1
/*
2
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
Martin Sustrik's avatar
Martin Sustrik committed
3

4
    This file is part of libzmq, the ZeroMQ core engine in C++.
Martin Sustrik's avatar
Martin Sustrik committed
5

6 7 8
    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
Martin Sustrik's avatar
Martin Sustrik committed
9 10
    (at your option) any later version.

11 12 13 14 15 16 17 18 19 20 21 22 23 24
    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.
Martin Sustrik's avatar
Martin Sustrik committed
25

26
    You should have received a copy of the GNU Lesser General Public License
Martin Sustrik's avatar
Martin Sustrik committed
27 28 29
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include "precompiled.hpp"
31
#include <new>
32 33
#include <string>

34
#include "macros.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
35
#include "tcp_connecter.hpp"
36
#include "stream_engine.hpp"
37
#include "io_thread.hpp"
38
#include "random.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
39
#include "err.hpp"
40
#include "ip.hpp"
41
#include "tcp.hpp"
42 43
#include "address.hpp"
#include "tcp_address.hpp"
44
#include "session_base.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
45

46
#if !defined ZMQ_HAVE_WINDOWS
47 48 49 50 51 52 53 54 55 56 57 58
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/tcp.h>
#include <netinet/in.h>
#include <netdb.h>
#include <fcntl.h>
#ifdef ZMQ_HAVE_OPENVMS
#include <ioctl.h>
#endif
#endif
Martin Sustrik's avatar
Martin Sustrik committed
59

60
//  Builds a connecter for the TCP endpoint described by 'addr_'. The object
//  starts out with no socket (s == retired_fd), no registered poller handle
//  and no running timers; the actual connection attempt is launched later,
//  from process_plug ().
zmq::tcp_connecter_t::tcp_connecter_t (class io_thread_t *io_thread_,
      class session_base_t *session_, const options_t &options_,
      address_t *addr_, bool delayed_start_) :
    own_t (io_thread_, options_),
    io_object_t (io_thread_),
    addr (addr_),
    s (retired_fd),
    handle ((handle_t) NULL),
    handle_valid (false),
    delayed_start (delayed_start_),
    connect_timer_started (false),
    reconnect_timer_started (false),
    session (session_),
    current_reconnect_ivl (options.reconnect_ivl)
{
    //  This class handles TCP endpoints only; anything else is a caller bug.
    zmq_assert (addr);
    zmq_assert (addr->protocol == "tcp");

    //  Remember the owning socket (used for the event_* notifications) and
    //  the textual form of the endpoint.
    socket = session->get_socket ();
    addr->to_string (endpoint);
}

//  By the time the object is destroyed, process_term () or out_event () must
//  already have released every resource; the assertions below verify that.
zmq::tcp_connecter_t::~tcp_connecter_t ()
{
    //  No timer may still be pending.
    zmq_assert (!connect_timer_started);
    zmq_assert (!reconnect_timer_started);

    //  The socket must neither be registered with the poller nor be open.
    zmq_assert (!handle_valid);
    zmq_assert (s == retired_fd);
}

89 90
void zmq::tcp_connecter_t::process_plug ()
{
91 92
    if (delayed_start)
        add_reconnect_timer ();
93 94 95 96
    else
        start_connecting ();
}

97 98
void zmq::tcp_connecter_t::process_term (int linger_)
{
99 100 101 102 103
    if (connect_timer_started) {
        cancel_timer (connect_timer_id);
        connect_timer_started = false;
    }

104
    if (reconnect_timer_started) {
105
        cancel_timer (reconnect_timer_id);
106
        reconnect_timer_started = false;
107 108 109 110 111 112 113 114 115 116 117 118 119
    }

    if (handle_valid) {
        rm_fd (handle);
        handle_valid = false;
    }

    if (s != retired_fd)
        close ();

    own_t::process_term (linger_);
}

120 121
void zmq::tcp_connecter_t::in_event ()
{
122
    //  We are not polling for incoming data, so we are actually called
123 124 125 126 127 128 129
    //  because of error here. However, we can get error on out event as well
    //  on some platforms, so we'll simply handle both events in the same way.
    out_event ();
}

void zmq::tcp_connecter_t::out_event ()
{
130 131 132 133 134
    if (connect_timer_started) {
        cancel_timer (connect_timer_id);
        connect_timer_started = false;
    }

135 136 137
    rm_fd (handle);
    handle_valid = false;

Martin Hurton's avatar
Martin Hurton committed
138
    const fd_t fd = connect ();
139 140 141
    //  Handle the error condition by attempt to reconnect.
    if (fd == retired_fd) {
        close ();
Martin Hurton's avatar
Martin Hurton committed
142
        add_reconnect_timer ();
143 144 145
        return;
    }

146
    tune_tcp_socket (fd);
147 148
    tune_tcp_keepalives (fd, options.tcp_keepalive, options.tcp_keepalive_cnt,
            options.tcp_keepalive_idle, options.tcp_keepalive_intvl);
149
    tune_tcp_maxrt (fd, options.tcp_maxrt);
150

151
    //  Create the engine object for this connection.
152
    stream_engine_t *engine = new (std::nothrow)
153
        stream_engine_t (fd, options, endpoint);
154 155 156 157 158 159 160
    alloc_assert (engine);

    //  Attach the engine to the corresponding session object.
    send_attach (session, engine);

    //  Shut the connecter down.
    terminate ();
161

162
    socket->event_connected (endpoint, (int) fd);
163 164 165 166
}

void zmq::tcp_connecter_t::timer_event (int id_)
{
167 168 169 170 171 172 173 174 175 176 177 178 179 180
    zmq_assert (id_ == reconnect_timer_id || id_ == connect_timer_id);
    if (id_ == connect_timer_id) {
        connect_timer_started = false;

        rm_fd (handle);
        handle_valid = false;

        close ();
        add_reconnect_timer ();
    }
    else if (id_ == reconnect_timer_id) {
        reconnect_timer_started = false;
        start_connecting ();
    }
181 182 183 184 185
}

void zmq::tcp_connecter_t::start_connecting ()
{
    //  Open the connecting socket.
Martin Hurton's avatar
Martin Hurton committed
186
    const int rc = open ();
187 188 189 190 191 192 193 194

    //  Connect may succeed in synchronous manner.
    if (rc == 0) {
        handle = add_fd (s);
        handle_valid = true;
        out_event ();
    }

195
    //  Connection establishment may be delayed. Poll for its completion.
196 197
    else
    if (rc == -1 && errno == EINPROGRESS) {
198 199 200
        handle = add_fd (s);
        handle_valid = true;
        set_pollout (handle);
201
        socket->event_connect_delayed (endpoint, zmq_errno());
202 203 204

        //  add userspace connect timeout
        add_connect_timer ();
205 206 207
    }

    //  Handle any other error condition by eventual reconnect.
208
    else {
209 210
        if (s != retired_fd)
            close ();
211 212
        add_reconnect_timer ();
    }
213 214
}

215 216 217 218 219 220 221 222
//  Arms the connect timeout timer. A non-positive options.connect_timeout
//  leaves the timer unarmed.
void zmq::tcp_connecter_t::add_connect_timer ()
{
    if (options.connect_timeout <= 0)
        return;

    add_timer (options.connect_timeout, connect_timer_id);
    connect_timer_started = true;
}

Martin Hurton's avatar
Martin Hurton committed
223
void zmq::tcp_connecter_t::add_reconnect_timer ()
224
{
Martin Hurton's avatar
Martin Hurton committed
225 226 227
    const int interval = get_new_reconnect_ivl ();
    add_timer (interval, reconnect_timer_id);
    socket->event_connect_retried (endpoint, interval);
228
    reconnect_timer_started = true;
229 230 231 232 233
}

int zmq::tcp_connecter_t::get_new_reconnect_ivl ()
{
    //  The new interval is the current interval + random value.
Martin Hurton's avatar
Martin Hurton committed
234 235
    const int interval = current_reconnect_ivl +
        generate_random () % options.reconnect_ivl;
236 237 238

    //  Only change the current reconnect interval  if the maximum reconnect
    //  interval was set and if it's larger than the reconnect interval.
239
    if (options.reconnect_ivl_max > 0 &&
Martin Hurton's avatar
Martin Hurton committed
240
        options.reconnect_ivl_max > options.reconnect_ivl)
241
        //  Calculate the next interval
Martin Hurton's avatar
Martin Hurton committed
242 243 244
        current_reconnect_ivl =
            std::min (current_reconnect_ivl * 2, options.reconnect_ivl_max);
    return interval;
245 246
}

unknown's avatar
unknown committed
247 248 249 250
int zmq::tcp_connecter_t::open ()
{
    zmq_assert (s == retired_fd);

251 252
    //  Resolve the address
    if (addr->resolved.tcp_addr != NULL) {
253
        LIBZMQ_DELETE(addr->resolved.tcp_addr);
254 255 256 257 258 259 260
    }

    addr->resolved.tcp_addr = new (std::nothrow) tcp_address_t ();
    alloc_assert (addr->resolved.tcp_addr);
    int rc = addr->resolved.tcp_addr->resolve (
        addr->address.c_str (), false, options.ipv6);
    if (rc != 0) {
261
        LIBZMQ_DELETE(addr->resolved.tcp_addr);
262 263 264
        return -1;
    }
    zmq_assert (addr->resolved.tcp_addr != NULL);
Martin Hurton's avatar
Martin Hurton committed
265
    tcp_address_t * const tcp_addr = addr->resolved.tcp_addr;
266

unknown's avatar
unknown committed
267
    //  Create the socket.
Martin Hurton's avatar
Martin Hurton committed
268
    s = open_socket (tcp_addr->family (), SOCK_STREAM, IPPROTO_TCP);
269 270

    //  IPv6 address family not supported, try automatic downgrade to IPv4.
271
    if (s == zmq::retired_fd && tcp_addr->family () == AF_INET6
272 273 274 275 276 277 278 279 280 281 282
    && errno == EAFNOSUPPORT
    && options.ipv6) {
        rc = addr->resolved.tcp_addr->resolve (
            addr->address.c_str (), false, false);
        if (rc != 0) {
            LIBZMQ_DELETE(addr->resolved.tcp_addr);
            return -1;
        }
        s = open_socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
    }

283
#ifdef ZMQ_HAVE_WINDOWS
284
    if (s == INVALID_SOCKET) {
285
        errno = wsa_error_to_errno (WSAGetLastError ());
286 287
        return -1;
    }
288 289 290 291
#else
    if (s == -1)
        return -1;
#endif
unknown's avatar
unknown committed
292

293 294
    //  On some systems, IPv4 mapping in IPv6 sockets is disabled by default.
    //  Switch it on in such cases.
Martin Hurton's avatar
Martin Hurton committed
295
    if (tcp_addr->family () == AF_INET6)
296
        enable_ipv4_mapping (s);
297

298 299 300 301
    // Set the IP Type-Of-Service priority for this socket
    if (options.tos != 0)
        set_ip_type_of_service (s, options.tos);

302 303
    // Set the socket to non-blocking mode so that we get async connect().
    unblock_socket (s);
unknown's avatar
unknown committed
304

305
    //  Set the socket buffer limits for the underlying socket.
306
    if (options.sndbuf >= 0)
307
        set_tcp_send_buffer (s, options.sndbuf);
308
    if (options.rcvbuf >= 0)
309 310
        set_tcp_receive_buffer (s, options.rcvbuf);

311 312 313 314
    // Set the IP Type-Of-Service for the underlying socket
    if (options.tos != 0)
        set_ip_type_of_service (s, options.tos);

315
    // Set a source address for conversations
Martin Hurton's avatar
Martin Hurton committed
316
    if (tcp_addr->has_src_addr ()) {
317 318 319 320 321 322 323 324 325 326 327 328
        //  Allow reusing of the address, to connect to different servers
        //  using the same source port on the client.
        int flag = 1;
#ifdef ZMQ_HAVE_WINDOWS
        rc = setsockopt (s, SOL_SOCKET, SO_REUSEADDR, (const char*) &flag,
                sizeof (int));
        wsa_assert (rc != SOCKET_ERROR);
#else
        rc = setsockopt (s, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof (int));
        errno_assert (rc == 0);
#endif

Martin Hurton's avatar
Martin Hurton committed
329 330
        rc = ::bind (s, tcp_addr->src_addr (), tcp_addr->src_addrlen ());
        if (rc == -1)
331 332 333
            return -1;
    }

unknown's avatar
unknown committed
334
    //  Connect to the remote peer.
Martin Hurton's avatar
Martin Hurton committed
335
    rc = ::connect (s, tcp_addr->addr (), tcp_addr->addrlen ());
unknown's avatar
unknown committed
336

337
    //  Connect was successful immediately.
unknown's avatar
unknown committed
338 339 340
    if (rc == 0)
        return 0;

341
    //  Translate error codes indicating asynchronous connect has been
342
    //  launched to a uniform EINPROGRESS.
343
#ifdef ZMQ_HAVE_WINDOWS
344 345
    const int last_error = WSAGetLastError();
    if (last_error == WSAEINPROGRESS || last_error == WSAEWOULDBLOCK)
346
        errno = EINPROGRESS;
347
    else
348
        errno = wsa_error_to_errno (last_error);
349
#else
350
    if (errno == EINTR)
351
        errno = EINPROGRESS;
352
#endif
unknown's avatar
unknown committed
353 354 355
    return -1;
}

356 357
//  Retrieves the outcome of the asynchronous connect. On success, ownership
//  of the socket passes to the caller: the descriptor is returned and 's' is
//  reset to retired_fd. On failure retired_fd is returned and 's' is left
//  untouched, so the caller still has to close it.
zmq::fd_t zmq::tcp_connecter_t::connect ()
{
    //  Async connect has finished. Ask the socket whether an error occurred.
    int err = 0;
#ifdef ZMQ_HAVE_HPUX
    int len = sizeof err;
#else
    socklen_t len = sizeof err;
#endif

    const int rc = getsockopt (s, SOL_SOCKET, SO_ERROR,
        reinterpret_cast <char*> (&err), &len);

    //  Assert if the error was caused by 0MQ bug.
    //  Networking problems are OK. No need to assert.
#ifdef ZMQ_HAVE_WINDOWS
    zmq_assert (rc == 0);
    if (err != 0) {
        const bool caused_by_bug = err == WSAEBADF
            || err == WSAENOPROTOOPT
            || err == WSAENOTSOCK
            || err == WSAENOBUFS;
        if (caused_by_bug) {
            wsa_assert_no (err);
        }
        return retired_fd;
    }
#else
    //  Following code should handle both Berkeley-derived socket
    //  implementations and Solaris: the error is taken either from the
    //  SO_ERROR value or, when getsockopt itself fails, from errno.
    if (rc == -1)
        err = errno;
    if (err != 0) {
        errno = err;
        const bool network_problem = errno != EBADF
            && errno != ENOPROTOOPT
            && errno != ENOTSOCK
            && errno != ENOBUFS;
        errno_assert (network_problem);
        return retired_fd;
    }
#endif

    //  Hand the newly connected socket over to the caller.
    const fd_t connected = s;
    s = retired_fd;
    return connected;
}

404 405 406 407
void zmq::tcp_connecter_t::close ()
{
    zmq_assert (s != retired_fd);
#ifdef ZMQ_HAVE_WINDOWS
Martin Hurton's avatar
Martin Hurton committed
408
    const int rc = closesocket (s);
409 410
    wsa_assert (rc != SOCKET_ERROR);
#else
Martin Hurton's avatar
Martin Hurton committed
411
    const int rc = ::close (s);
412
    errno_assert (rc == 0);
Martin Sustrik's avatar
Martin Sustrik committed
413
#endif
414
    socket->event_closed (endpoint, (int) s);
415 416
    s = retired_fd;
}