tcp_connecter.cpp 13.3 KB
Newer Older
Martin Sustrik's avatar
Martin Sustrik committed
1
/*
2
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
Martin Sustrik's avatar
Martin Sustrik committed
3

4
    This file is part of libzmq, the ZeroMQ core engine in C++.
Martin Sustrik's avatar
Martin Sustrik committed
5

6 7 8
    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
Martin Sustrik's avatar
Martin Sustrik committed
9 10
    (at your option) any later version.

11 12 13 14 15 16 17 18 19 20 21 22 23 24
    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.
Martin Sustrik's avatar
Martin Sustrik committed
25

26
    You should have received a copy of the GNU Lesser General Public License
Martin Sustrik's avatar
Martin Sustrik committed
27 28 29
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include "precompiled.hpp"
31
#include <new>
32 33
#include <string>

34
#include "macros.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
35
#include "tcp_connecter.hpp"
36
#include "stream_engine.hpp"
37
#include "io_thread.hpp"
38
#include "random.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
39
#include "err.hpp"
40
#include "ip.hpp"
41
#include "tcp.hpp"
42 43
#include "address.hpp"
#include "tcp_address.hpp"
44
#include "session_base.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
45

46
#if !defined ZMQ_HAVE_WINDOWS
47 48 49 50 51 52 53 54
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/tcp.h>
#include <netinet/in.h>
#include <netdb.h>
#include <fcntl.h>
55 56 57
#ifdef ZMQ_HAVE_VXWORKS
#include <sockLib.h>
#endif
58 59 60 61
#ifdef ZMQ_HAVE_OPENVMS
#include <ioctl.h>
#endif
#endif
Martin Sustrik's avatar
Martin Sustrik committed
62

63 64 65 66
#ifdef __APPLE__
#include <TargetConditionals.h>
#endif

67
//  Constructs a connecter for the TCP address addr_, acting on behalf of
//  session_. No socket is opened here; _s starts out as retired_fd.
//  If delayed_start_ is set, process_plug () arms the reconnect timer
//  instead of connecting straight away.
zmq::tcp_connecter_t::tcp_connecter_t (class io_thread_t *io_thread_,
                                       class session_base_t *session_,
                                       const options_t &options_,
                                       address_t *addr_,
                                       bool delayed_start_) :
    own_t (io_thread_, options_),
    io_object_t (io_thread_),
    _addr (addr_),
    _s (retired_fd),
    _handle (static_cast<handle_t> (NULL)),
    _delayed_start (delayed_start_),
    _connect_timer_started (false),
    _reconnect_timer_started (false),
    _session (session_),
    _current_reconnect_ivl (options.reconnect_ivl),
    _socket (_session->get_socket ())
{
    //  This connecter only handles TCP endpoints.
    zmq_assert (_addr);
    zmq_assert (_addr->protocol == protocol_name::tcp);

    //  Keep the textual form of the endpoint around; it is passed along
    //  with every socket event raised by this object.
    //  TODO the return value is unused! what if it fails? if this is
    //  impossible or does not matter, change such that endpoint is
    //  initialized using an initializer, and make endpoint const
    _addr->to_string (_endpoint);
}

//  By the time the object is destroyed, all timers must have been
//  cancelled, the socket must no longer be registered for polling and
//  the socket itself must have been closed or handed over.
zmq::tcp_connecter_t::~tcp_connecter_t ()
{
    //  No timer may still be pending.
    zmq_assert (!_connect_timer_started);
    zmq_assert (!_reconnect_timer_started);

    //  No poller registration and no open socket may be left behind.
    zmq_assert (!_handle);
    zmq_assert (_s == retired_fd);
}

100 101
void zmq::tcp_connecter_t::process_plug ()
{
102
    if (_delayed_start)
103
        add_reconnect_timer ();
104 105 106 107
    else
        start_connecting ();
}

108 109
void zmq::tcp_connecter_t::process_term (int linger_)
{
110
    if (_connect_timer_started) {
111
        cancel_timer (connect_timer_id);
112
        _connect_timer_started = false;
113 114
    }

115
    if (_reconnect_timer_started) {
116
        cancel_timer (reconnect_timer_id);
117
        _reconnect_timer_started = false;
118 119
    }

120
    if (_handle) {
121
        rm_handle ();
122 123
    }

124
    if (_s != retired_fd)
125 126 127 128 129
        close ();

    own_t::process_term (linger_);
}

130 131
void zmq::tcp_connecter_t::in_event ()
{
132
    //  We are not polling for incoming data, so we are actually called
133 134 135 136 137 138 139
    //  because of error here. However, we can get error on out event as well
    //  on some platforms, so we'll simply handle both events in the same way.
    out_event ();
}

void zmq::tcp_connecter_t::out_event ()
{
140
    if (_connect_timer_started) {
141
        cancel_timer (connect_timer_id);
142
        _connect_timer_started = false;
143 144
    }

145
    rm_handle ();
146

Martin Hurton's avatar
Martin Hurton committed
147
    const fd_t fd = connect ();
148

149
    //  Handle the error condition by attempt to reconnect.
150
    if (fd == retired_fd || !tune_socket (fd)) {
151 152 153 154
        close ();
        add_reconnect_timer ();
        return;
    }
155

156
    //  Create the engine object for this connection.
157
    stream_engine_t *engine =
158
      new (std::nothrow) stream_engine_t (fd, options, _endpoint);
159 160 161
    alloc_assert (engine);

    //  Attach the engine to the corresponding session object.
162
    send_attach (_session, engine);
163 164 165

    //  Shut the connecter down.
    terminate ();
166

167
    _socket->event_connected (_endpoint, fd);
168 169
}

170 171
void zmq::tcp_connecter_t::rm_handle ()
{
172 173
    rm_fd (_handle);
    _handle = static_cast<handle_t> (NULL);
174 175
}

176 177
void zmq::tcp_connecter_t::timer_event (int id_)
{
178 179
    zmq_assert (id_ == reconnect_timer_id || id_ == connect_timer_id);
    if (id_ == connect_timer_id) {
180
        _connect_timer_started = false;
181
        rm_handle ();
182 183
        close ();
        add_reconnect_timer ();
184
    } else if (id_ == reconnect_timer_id) {
185
        _reconnect_timer_started = false;
186 187
        start_connecting ();
    }
188 189 190 191 192
}

void zmq::tcp_connecter_t::start_connecting ()
{
    //  Open the connecting socket.
Martin Hurton's avatar
Martin Hurton committed
193
    const int rc = open ();
194 195 196

    //  Connect may succeed in synchronous manner.
    if (rc == 0) {
197
        _handle = add_fd (_s);
198 199 200
        out_event ();
    }

201
    //  Connection establishment may be delayed. Poll for its completion.
202
    else if (rc == -1 && errno == EINPROGRESS) {
203 204 205
        _handle = add_fd (_s);
        set_pollout (_handle);
        _socket->event_connect_delayed (_endpoint, zmq_errno ());
206 207 208

        //  add userspace connect timeout
        add_connect_timer ();
209 210 211
    }

    //  Handle any other error condition by eventual reconnect.
212
    else {
213
        if (_s != retired_fd)
214
            close ();
215 216
        add_reconnect_timer ();
    }
217 218
}

219 220 221 222
void zmq::tcp_connecter_t::add_connect_timer ()
{
    if (options.connect_timeout > 0) {
        add_timer (options.connect_timeout, connect_timer_id);
223
        _connect_timer_started = true;
224 225 226
    }
}

Martin Hurton's avatar
Martin Hurton committed
227
void zmq::tcp_connecter_t::add_reconnect_timer ()
228
{
229 230 231 232 233 234
    if (options.reconnect_ivl != -1) {
        const int interval = get_new_reconnect_ivl ();
        add_timer (interval, reconnect_timer_id);
        _socket->event_connect_retried (_endpoint, interval);
        _reconnect_timer_started = true;
    }
235 236 237 238 239
}

int zmq::tcp_connecter_t::get_new_reconnect_ivl ()
{
    //  The new interval is the current interval + random value.
240
    const int interval =
241
      _current_reconnect_ivl + generate_random () % options.reconnect_ivl;
242 243 244

    //  Only change the current reconnect interval  if the maximum reconnect
    //  interval was set and if it's larger than the reconnect interval.
245 246
    if (options.reconnect_ivl_max > 0
        && options.reconnect_ivl_max > options.reconnect_ivl)
247
        //  Calculate the next interval
248 249
        _current_reconnect_ivl =
          std::min (_current_reconnect_ivl * 2, options.reconnect_ivl_max);
Martin Hurton's avatar
Martin Hurton committed
250
    return interval;
251 252
}

unknown's avatar
unknown committed
253 254
int zmq::tcp_connecter_t::open ()
{
255
    zmq_assert (_s == retired_fd);
unknown's avatar
unknown committed
256

257
    //  Resolve the address
258 259
    if (_addr->resolved.tcp_addr != NULL) {
        LIBZMQ_DELETE (_addr->resolved.tcp_addr);
260 261
    }

262 263 264 265
    _addr->resolved.tcp_addr = new (std::nothrow) tcp_address_t ();
    alloc_assert (_addr->resolved.tcp_addr);
    int rc = _addr->resolved.tcp_addr->resolve (_addr->address.c_str (), false,
                                                options.ipv6);
266
    if (rc != 0) {
267
        LIBZMQ_DELETE (_addr->resolved.tcp_addr);
268 269
        return -1;
    }
270 271
    zmq_assert (_addr->resolved.tcp_addr != NULL);
    const tcp_address_t *const tcp_addr = _addr->resolved.tcp_addr;
272

unknown's avatar
unknown committed
273
    //  Create the socket.
274
    _s = open_socket (tcp_addr->family (), SOCK_STREAM, IPPROTO_TCP);
275 276

    //  IPv6 address family not supported, try automatic downgrade to IPv4.
277
    if (_s == zmq::retired_fd && tcp_addr->family () == AF_INET6
278
        && errno == EAFNOSUPPORT && options.ipv6) {
279 280
        rc = _addr->resolved.tcp_addr->resolve (_addr->address.c_str (), false,
                                                false);
281
        if (rc != 0) {
282
            LIBZMQ_DELETE (_addr->resolved.tcp_addr);
283 284
            return -1;
        }
285
        _s = open_socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
286 287
    }

288
    if (_s == retired_fd) {
289 290
        return -1;
    }
unknown's avatar
unknown committed
291

292 293
    //  On some systems, IPv4 mapping in IPv6 sockets is disabled by default.
    //  Switch it on in such cases.
Martin Hurton's avatar
Martin Hurton committed
294
    if (tcp_addr->family () == AF_INET6)
295
        enable_ipv4_mapping (_s);
296

297 298
    // Set the IP Type-Of-Service priority for this socket
    if (options.tos != 0)
299
        set_ip_type_of_service (_s, options.tos);
300

301 302
    // Bind the socket to a device if applicable
    if (!options.bound_device.empty ())
303
        bind_to_device (_s, options.bound_device);
304

305
    // Set the socket to non-blocking mode so that we get async connect().
306
    unblock_socket (_s);
unknown's avatar
unknown committed
307

308 309
    // Set the socket to loopback fastpath if configured.
    if (options.loopback_fastpath)
310
        tcp_tune_loopback_fast_path (_s);
311

312
    //  Set the socket buffer limits for the underlying socket.
313
    if (options.sndbuf >= 0)
314
        set_tcp_send_buffer (_s, options.sndbuf);
315
    if (options.rcvbuf >= 0)
316
        set_tcp_receive_buffer (_s, options.rcvbuf);
317

318 319
    // Set the IP Type-Of-Service for the underlying socket
    if (options.tos != 0)
320
        set_ip_type_of_service (_s, options.tos);
321

322
    // Set a source address for conversations
Martin Hurton's avatar
Martin Hurton committed
323
    if (tcp_addr->has_src_addr ()) {
324 325 326 327
        //  Allow reusing of the address, to connect to different servers
        //  using the same source port on the client.
        int flag = 1;
#ifdef ZMQ_HAVE_WINDOWS
328
        rc = setsockopt (_s, SOL_SOCKET, SO_REUSEADDR,
329
                         reinterpret_cast<const char *> (&flag), sizeof (int));
330
        wsa_assert (rc != SOCKET_ERROR);
331
#elif defined ZMQ_HAVE_VXWORKS
332
        rc = setsockopt (_s, SOL_SOCKET, SO_REUSEADDR, (char *) &flag,
333 334
                         sizeof (int));
        errno_assert (rc == 0);
335
#else
336
        rc = setsockopt (_s, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof (int));
337 338 339
        errno_assert (rc == 0);
#endif

340
#if defined ZMQ_HAVE_VXWORKS
341
        rc = ::bind (_s, (sockaddr *) tcp_addr->src_addr (),
342 343
                     tcp_addr->src_addrlen ());
#else
344
        rc = ::bind (_s, tcp_addr->src_addr (), tcp_addr->src_addrlen ());
345
#endif
Martin Hurton's avatar
Martin Hurton committed
346
        if (rc == -1)
347 348 349
            return -1;
    }

350
        //  Connect to the remote peer.
351
#if defined ZMQ_HAVE_VXWORKS
352
    rc = ::connect (_s, (sockaddr *) tcp_addr->addr (), tcp_addr->addrlen ());
353
#else
354
    rc = ::connect (_s, tcp_addr->addr (), tcp_addr->addrlen ());
355
#endif
356
    //  Connect was successful immediately.
357
    if (rc == 0) {
unknown's avatar
unknown committed
358
        return 0;
359
    }
unknown's avatar
unknown committed
360

361 362
        //  Translate error codes indicating asynchronous connect has been
        //  launched to a uniform EINPROGRESS.
363
#ifdef ZMQ_HAVE_WINDOWS
364
    const int last_error = WSAGetLastError ();
365
    if (last_error == WSAEINPROGRESS || last_error == WSAEWOULDBLOCK)
366
        errno = EINPROGRESS;
367
    else
368
        errno = wsa_error_to_errno (last_error);
369
#else
370
    if (errno == EINTR)
371
        errno = EINPROGRESS;
372
#endif
unknown's avatar
unknown committed
373 374 375
    return -1;
}

376 377
//  Checks the outcome of the asynchronous connect on _s by reading the
//  socket's SO_ERROR.
//
//  On success, returns the connected descriptor and resets _s to
//  retired_fd, i.e. the caller takes over the socket. On failure, returns
//  retired_fd and leaves _s untouched so that the caller can close it.
zmq::fd_t zmq::tcp_connecter_t::connect ()
{
    //  Async connect has finished. Check whether an error occurred.
    int so_error = 0;
#if defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_VXWORKS
    int so_error_len = sizeof so_error;
#else
    socklen_t so_error_len = sizeof so_error;
#endif

    const int rc =
      getsockopt (_s, SOL_SOCKET, SO_ERROR,
                  reinterpret_cast<char *> (&so_error), &so_error_len);

    //  Assert if the error was caused by 0MQ bug.
    //  Networking problems are OK. No need to assert.
#ifdef ZMQ_HAVE_WINDOWS
    zmq_assert (rc == 0);
    if (so_error != 0) {
        if (so_error == WSAEBADF || so_error == WSAENOPROTOOPT
            || so_error == WSAENOTSOCK || so_error == WSAENOBUFS) {
            wsa_assert_no (so_error);
        }
        return retired_fd;
    }
#else
    //  Following code should handle both Berkeley-derived socket
    //  implementations and Solaris: the error is taken either from
    //  SO_ERROR or, if getsockopt itself failed, from errno.
    if (rc == -1)
        so_error = errno;
    if (so_error != 0) {
        errno = so_error;
#if !defined(TARGET_OS_IPHONE) || !TARGET_OS_IPHONE
        errno_assert (errno != EBADF && errno != ENOPROTOOPT
                      && errno != ENOTSOCK && errno != ENOBUFS);
#else
        errno_assert (errno != ENOPROTOOPT && errno != ENOTSOCK
                      && errno != ENOBUFS);
#endif
        return retired_fd;
    }
#endif

    //  Return the newly connected socket and give up ownership of it.
    const fd_t connected_fd = _s;
    _s = retired_fd;
    return connected_fd;
}

424
bool zmq::tcp_connecter_t::tune_socket (const fd_t fd_)
425
{
426
    const int rc = tune_tcp_socket (fd_)
427
                   | tune_tcp_keepalives (
428
                       fd_, options.tcp_keepalive, options.tcp_keepalive_cnt,
429
                       options.tcp_keepalive_idle, options.tcp_keepalive_intvl)
430
                   | tune_tcp_maxrt (fd_, options.tcp_maxrt);
431 432 433
    return rc == 0;
}

434 435
void zmq::tcp_connecter_t::close ()
{
436
    zmq_assert (_s != retired_fd);
437
#ifdef ZMQ_HAVE_WINDOWS
438
    const int rc = closesocket (_s);
439 440
    wsa_assert (rc != SOCKET_ERROR);
#else
441
    const int rc = ::close (_s);
442
    errno_assert (rc == 0);
Martin Sustrik's avatar
Martin Sustrik committed
443
#endif
444 445
    _socket->event_closed (_endpoint, _s);
    _s = retired_fd;
446
}