/*
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file

    This file is part of libzmq, the ZeroMQ core engine in C++.

    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

#include "precompiled.hpp"
#include <cstdlib>
#include <limits>
#include <new>
#include <string>

#include "macros.hpp"
#include "socks_connecter.hpp"
#include "stream_engine.hpp"
#include "random.hpp"
#include "err.hpp"
#include "ip.hpp"
#include "tcp.hpp"
#include "address.hpp"
#include "tcp_address.hpp"
#include "session_base.hpp"
#include "socks.hpp"

#ifndef ZMQ_HAVE_WINDOWS
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#if defined ZMQ_HAVE_VXWORKS
#include <sockLib.h>
#endif
#endif

//  Builds a connecter that reaches the TCP address 'addr_' through the
//  proxy described by 'proxy_addr_'.
//
//  io_thread_      I/O thread the object is registered with.
//  session_        session that later receives the engine; its socket is
//                  cached for event notifications.
//  options_        socket options copied into the base object.
//  addr_           final destination; must use the tcp protocol.
//  proxy_addr_     proxy endpoint; deleted by the destructor.
//  delayed_start_  when true, process_plug () arms the reconnect timer
//                  instead of connecting straight away.
zmq::socks_connecter_t::socks_connecter_t (class io_thread_t *io_thread_,
                                           class session_base_t *session_,
                                           const options_t &options_,
                                           address_t *addr_,
                                           address_t *proxy_addr_,
                                           bool delayed_start_) :
    own_t (io_thread_, options_),
    io_object_t (io_thread_),
    _addr (addr_),
    _proxy_addr (proxy_addr_),
    _status (unplugged),
    _s (retired_fd),
    _handle (static_cast<handle_t> (NULL)),
    _handle_valid (false),
    _delayed_start (delayed_start_),
    _timer_started (false),
    _session (session_),
    _current_reconnect_ivl (options.reconnect_ivl)
{
    zmq_assert (_addr);
    zmq_assert (_addr->protocol == protocol_name::tcp);

    //  Both pointers are dereferenced right below; fail with a readable
    //  assertion rather than with a null-pointer access.
    zmq_assert (_proxy_addr);
    zmq_assert (_session);

    //  Events are reported against the proxy endpoint.
    _proxy_addr->to_string (_endpoint);
    _socket = _session->get_socket ();
}

//  Releases the proxy address. By the time the object is destroyed no
//  connecting socket may be left open.
zmq::socks_connecter_t::~socks_connecter_t ()
{
    //  A live descriptor at this point would be leaked.
    zmq_assert (_s == retired_fd);

    //  The proxy address is released here; '_addr' is left untouched.
    LIBZMQ_DELETE (_proxy_addr);
}

void zmq::socks_connecter_t::process_plug ()
{
88
    if (_delayed_start)
89 90 91 92 93 94 95
        start_timer ();
    else
        initiate_connect ();
}

void zmq::socks_connecter_t::process_term (int linger_)
{
96
    switch (_status) {
97 98 99 100 101 102 103 104 105 106
        case unplugged:
            break;
        case waiting_for_reconnect_time:
            cancel_timer (reconnect_timer_id);
            break;
        case waiting_for_proxy_connection:
        case sending_greeting:
        case waiting_for_choice:
        case sending_request:
        case waiting_for_response:
107 108
            rm_fd (_handle);
            if (_s != retired_fd)
109 110 111 112 113 114 115 116 117
                close ();
            break;
    }

    own_t::process_term (linger_);
}

void zmq::socks_connecter_t::in_event ()
{
118
    zmq_assert (_status != unplugged && _status != waiting_for_reconnect_time);
119

120 121
    if (_status == waiting_for_choice) {
        int rc = _choice_decoder.input (_s);
122 123
        if (rc == 0 || rc == -1)
            error ();
124 125
        else if (_choice_decoder.message_ready ()) {
            const socks_choice_t choice = _choice_decoder.decode ();
126 127 128 129 130 131
            rc = process_server_response (choice);
            if (rc == -1)
                error ();
            else {
                std::string hostname = "";
                uint16_t port = 0;
132
                if (parse_address (_addr->address, hostname, port) == -1)
133 134
                    error ();
                else {
135
                    _request_encoder.encode (
136
                      socks_request_t (1, hostname, port));
137 138 139
                    reset_pollin (_handle);
                    set_pollout (_handle);
                    _status = sending_request;
140 141
                }
            }
142
        }
143 144
    } else if (_status == waiting_for_response) {
        int rc = _response_decoder.input (_s);
145 146
        if (rc == 0 || rc == -1)
            error ();
147 148
        else if (_response_decoder.message_ready ()) {
            const socks_response_t response = _response_decoder.decode ();
149
            rc = process_server_response (response);
150 151 152 153
            if (rc == -1)
                error ();
            else {
                //  Create the engine object for this connection.
154
                stream_engine_t *engine =
155
                  new (std::nothrow) stream_engine_t (_s, options, _endpoint);
156 157 158
                alloc_assert (engine);

                //  Attach the engine to the corresponding session object.
159
                send_attach (_session, engine);
160

161
                _socket->event_connected (_endpoint, _s);
162

163 164 165
                rm_fd (_handle);
                _s = -1;
                _status = unplugged;
166 167 168 169 170

                //  Shut the connecter down.
                terminate ();
            }
        }
171
    } else
172 173 174 175 176
        error ();
}

void zmq::socks_connecter_t::out_event ()
{
177 178
    zmq_assert (_status == waiting_for_proxy_connection
                || _status == sending_greeting || _status == sending_request);
179

180
    if (_status == waiting_for_proxy_connection) {
181
        const int rc = static_cast<int> (check_proxy_connection ());
182 183 184
        if (rc == -1)
            error ();
        else {
185 186 187
            _greeting_encoder.encode (
              socks_greeting_t (socks_no_auth_required));
            _status = sending_greeting;
188
        }
189 190 191
    } else if (_status == sending_greeting) {
        zmq_assert (_greeting_encoder.has_pending_data ());
        const int rc = _greeting_encoder.output (_s);
192 193
        if (rc == -1 || rc == 0)
            error ();
194 195 196 197
        else if (!_greeting_encoder.has_pending_data ()) {
            reset_pollout (_handle);
            set_pollin (_handle);
            _status = waiting_for_choice;
198
        }
199
    } else {
200 201
        zmq_assert (_request_encoder.has_pending_data ());
        const int rc = _request_encoder.output (_s);
202 203
        if (rc == -1 || rc == 0)
            error ();
204 205 206 207
        else if (!_request_encoder.has_pending_data ()) {
            reset_pollout (_handle);
            set_pollin (_handle);
            _status = waiting_for_response;
208 209 210 211 212 213 214 215 216 217 218
        }
    }
}

void zmq::socks_connecter_t::initiate_connect ()
{
    //  Open the connecting socket.
    const int rc = connect_to_proxy ();

    //  Connect may succeed in synchronous manner.
    if (rc == 0) {
219 220 221
        _handle = add_fd (_s);
        set_pollout (_handle);
        _status = sending_greeting;
222 223
    }
    //  Connection establishment may be delayed. Poll for its completion.
224
    else if (errno == EINPROGRESS) {
225 226 227 228
        _handle = add_fd (_s);
        set_pollout (_handle);
        _status = waiting_for_proxy_connection;
        _socket->event_connect_delayed (_endpoint, zmq_errno ());
229 230 231
    }
    //  Handle any other error condition by eventual reconnect.
    else {
232
        if (_s != retired_fd)
233 234 235 236 237 238
            close ();
        start_timer ();
    }
}

int zmq::socks_connecter_t::process_server_response (
239
  const socks_choice_t &response_)
240 241
{
    //  We do not support any authentication method for now.
242
    return response_.method == 0 ? 0 : -1;
243 244 245
}

int zmq::socks_connecter_t::process_server_response (
246
  const socks_response_t &response_)
247
{
248
    return response_.response_code == 0 ? 0 : -1;
249 250 251 252
}

void zmq::socks_connecter_t::timer_event (int id_)
{
253
    zmq_assert (_status == waiting_for_reconnect_time);
254 255 256 257 258 259
    zmq_assert (id_ == reconnect_timer_id);
    initiate_connect ();
}

void zmq::socks_connecter_t::error ()
{
260
    rm_fd (_handle);
261
    close ();
262 263 264 265
    _greeting_encoder.reset ();
    _choice_decoder.reset ();
    _request_encoder.reset ();
    _response_decoder.reset ();
266 267 268 269 270
    start_timer ();
}

void zmq::socks_connecter_t::start_timer ()
{
271 272 273 274 275 276
    if (options.reconnect_ivl != -1) {
        const int interval = get_new_reconnect_ivl ();
        add_timer (interval, reconnect_timer_id);
        _status = waiting_for_reconnect_time;
        _socket->event_connect_retried (_endpoint, interval);
    }
277 278 279 280 281
}

int zmq::socks_connecter_t::get_new_reconnect_ivl ()
{
    //  The new interval is the current interval + random value.
282
    const int interval =
283
      _current_reconnect_ivl + generate_random () % options.reconnect_ivl;
284 285 286

    //  Only change the current reconnect interval  if the maximum reconnect
    //  interval was set and if it's larger than the reconnect interval.
287 288
    if (options.reconnect_ivl_max > 0
        && options.reconnect_ivl_max > options.reconnect_ivl)
289
        //  Calculate the next interval
290 291
        _current_reconnect_ivl =
          std::min (_current_reconnect_ivl * 2, options.reconnect_ivl_max);
292 293 294 295 296
    return interval;
}

int zmq::socks_connecter_t::connect_to_proxy ()
{
297
    zmq_assert (_s == retired_fd);
298 299

    //  Resolve the address
300 301 302
    LIBZMQ_DELETE (_proxy_addr->resolved.tcp_addr);
    _proxy_addr->resolved.tcp_addr = new (std::nothrow) tcp_address_t ();
    alloc_assert (_proxy_addr->resolved.tcp_addr);
303

304 305
    int rc = _proxy_addr->resolved.tcp_addr->resolve (
      _proxy_addr->address.c_str (), false, options.ipv6);
306
    if (rc != 0) {
307
        LIBZMQ_DELETE (_proxy_addr->resolved.tcp_addr);
308 309
        return -1;
    }
310 311
    zmq_assert (_proxy_addr->resolved.tcp_addr != NULL);
    const tcp_address_t *tcp_addr = _proxy_addr->resolved.tcp_addr;
312 313

    //  Create the socket.
314 315
    _s = open_socket (tcp_addr->family (), SOCK_STREAM, IPPROTO_TCP);
    if (_s == retired_fd)
316 317 318 319 320
        return -1;

    //  On some systems, IPv4 mapping in IPv6 sockets is disabled by default.
    //  Switch it on in such cases.
    if (tcp_addr->family () == AF_INET6)
321
        enable_ipv4_mapping (_s);
322 323 324

    // Set the IP Type-Of-Service priority for this socket
    if (options.tos != 0)
325
        set_ip_type_of_service (_s, options.tos);
326

327 328
    // Bind the socket to a device if applicable
    if (!options.bound_device.empty ())
329
        bind_to_device (_s, options.bound_device);
330

331
    // Set the socket to non-blocking mode so that we get async connect().
332
    unblock_socket (_s);
333 334

    //  Set the socket buffer limits for the underlying socket.
335
    if (options.sndbuf >= 0)
336
        set_tcp_send_buffer (_s, options.sndbuf);
337
    if (options.rcvbuf >= 0)
338
        set_tcp_receive_buffer (_s, options.rcvbuf);
339 340 341

    // Set the IP Type-Of-Service for the underlying socket
    if (options.tos != 0)
342
        set_ip_type_of_service (_s, options.tos);
343 344 345

    // Set a source address for conversations
    if (tcp_addr->has_src_addr ()) {
346
#if defined ZMQ_HAVE_VXWORKS
347
        rc = ::bind (_s, (sockaddr *) tcp_addr->src_addr (),
348 349
                     tcp_addr->src_addrlen ());
#else
350
        rc = ::bind (_s, tcp_addr->src_addr (), tcp_addr->src_addrlen ());
351
#endif
352 353 354 355 356 357
        if (rc == -1) {
            close ();
            return -1;
        }
    }

358
        //  Connect to the remote peer.
359
#if defined ZMQ_HAVE_VXWORKS
360
    rc = ::connect (_s, (sockaddr *) tcp_addr->addr (), tcp_addr->addrlen ());
361
#else
362
    rc = ::connect (_s, tcp_addr->addr (), tcp_addr->addrlen ());
363
#endif
364
    //  Connect was successful immediately.
365 366 367
    if (rc == 0)
        return 0;

368 369
        //  Translate error codes indicating asynchronous connect has been
        //  launched to a uniform EINPROGRESS.
370
#ifdef ZMQ_HAVE_WINDOWS
371
    const int last_error = WSAGetLastError ();
372
    if (last_error == WSAEINPROGRESS || last_error == WSAEWOULDBLOCK)
373 374
        errno = EINPROGRESS;
    else {
375
        errno = wsa_error_to_errno (last_error);
376 377 378 379 380 381 382 383 384 385 386 387 388
        close ();
    }
#else
    if (errno == EINTR)
        errno = EINPROGRESS;
#endif
    return -1;
}

//  Called once the asynchronous connect has completed. Reads the pending
//  socket error and, if there is none, applies the TCP and keep-alive
//  settings. Returns 0 on success and -1 on failure.
zmq::fd_t zmq::socks_connecter_t::check_proxy_connection ()
{
    //  Async connect has finished. Check whether an error occurred
    int err = 0;
#if defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_VXWORKS
    int err_size = sizeof err;
#else
    socklen_t err_size = sizeof err;
#endif

    int rc = getsockopt (_s, SOL_SOCKET, SO_ERROR,
                         reinterpret_cast<char *> (&err), &err_size);

    //  Assert if the error was caused by 0MQ bug.
    //  Networking problems are OK. No need to assert.
#ifdef ZMQ_HAVE_WINDOWS
    zmq_assert (rc == 0);
    if (err != 0) {
        wsa_assert (err == WSAECONNREFUSED || err == WSAETIMEDOUT
                    || err == WSAECONNABORTED || err == WSAEHOSTUNREACH
                    || err == WSAENETUNREACH || err == WSAENETDOWN
                    || err == WSAEACCES || err == WSAEINVAL
                    || err == WSAEADDRINUSE);
        return -1;
    }
#else
    //  Following code should handle both Berkeley-derived socket
    //  implementations and Solaris: the failure is reported either through
    //  the option value or through the return code plus errno.
    if (rc == -1)
        err = errno;
    if (err != 0) {
        errno = err;
        errno_assert (errno == ECONNREFUSED || errno == ECONNRESET
                      || errno == ETIMEDOUT || errno == EHOSTUNREACH
                      || errno == ENETUNREACH || errno == ENETDOWN
                      || errno == EINVAL);
        return -1;
    }
#endif

    //  Both tuning steps always run; either one failing fails the check.
    const int tune_rc = tune_tcp_socket (_s);
    const int keepalive_rc = tune_tcp_keepalives (
      _s, options.tcp_keepalive, options.tcp_keepalive_cnt,
      options.tcp_keepalive_idle, options.tcp_keepalive_intvl);
    rc = tune_rc | keepalive_rc;
    if (rc != 0)
        return -1;

    return 0;
}

void zmq::socks_connecter_t::close ()
{
438
    zmq_assert (_s != retired_fd);
439
#ifdef ZMQ_HAVE_WINDOWS
440
    const int rc = closesocket (_s);
441 442
    wsa_assert (rc != SOCKET_ERROR);
#else
443
    const int rc = ::close (_s);
444 445
    errno_assert (rc == 0);
#endif
446 447
    _socket->event_closed (_endpoint, _s);
    _s = retired_fd;
448 449
}

//  Splits 'address_' of the form "host:port" or "[ipv6]:port" at its last
//  ':' into a hostname and a port.
//
//  address_   text to parse.
//  hostname_  receives the host part, with enclosing brackets stripped.
//  port_      receives the port; written only on success.
//
//  Returns 0 on success. Returns -1 with errno set to EINVAL when there is
//  no ':' separator or the port is not a plain decimal number in the range
//  1..65535.
int zmq::socks_connecter_t::parse_address (const std::string &address_,
                                           std::string &hostname_,
                                           uint16_t &port_)
{
    //  Find the ':' at end that separates address from the port number.
    const size_t idx = address_.rfind (':');
    if (idx == std::string::npos) {
        errno = EINVAL;
        return -1;
    }

    //  Extract hostname; a bracketed literal loses its brackets.
    if (idx < 2 || address_[0] != '[' || address_[idx - 1] != ']')
        hostname_ = address_.substr (0, idx);
    else
        hostname_ = address_.substr (1, idx - 2);

    //  Separate the hostname/port.
    const std::string port_str = address_.substr (idx + 1);

    //  Parse the port number (0 is not a valid port). Unlike atoi followed
    //  by a narrowing cast, this rejects trailing garbage and values that
    //  do not fit into 16 bits instead of silently wrapping them.
    char *parse_end = NULL;
    const long port = std::strtol (port_str.c_str (), &parse_end, 10);
    const bool consumed_all =
      parse_end != port_str.c_str () && *parse_end == '\0';
    if (!consumed_all || port < 1 || port > 65535) {
        errno = EINVAL;
        return -1;
    }

    port_ = static_cast<uint16_t> (port);
    return 0;
}