/*
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file

    This file is part of libzmq, the ZeroMQ core engine in C++.

    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

#include "precompiled.hpp"
#include "stream_connecter_base.hpp"
#include "session_base.hpp"
#include "address.hpp"
#include "random.hpp"

#ifndef ZMQ_HAVE_WINDOWS
#include <unistd.h>
#else
#include <winsock2.h>
#endif

#include <algorithm>
#include <limits>
#include <new>

//  Constructs a connecter in the "not connected" state: no socket
//  (retired_fd), no registered handle and no reconnect timer running.
//
//  io_thread_      forwarded to the own_t and io_object_t bases.
//  session_        session this connecter works for; also used to look up
//                  the owning socket. Dereferenced here, so must not be null.
//  options_        forwarded to the own_t base.
//  addr_           address to connect to; must not be null (asserted).
//  delayed_start_  if true, process_plug arms the reconnect timer instead of
//                  starting to connect right away.
zmq::stream_connecter_base_t::stream_connecter_base_t (
  zmq::io_thread_t *io_thread_,
  zmq::session_base_t *session_,
  const zmq::options_t &options_,
  zmq::address_t *addr_,
  bool delayed_start_) :
    own_t (io_thread_, options_),
    io_object_t (io_thread_),
    _addr (addr_),
    _s (retired_fd),
    _handle (static_cast<handle_t> (NULL)),
    _socket (session_->get_socket ()),
    _delayed_start (delayed_start_),
    _reconnect_timer_started (false),
    _session (session_),
    //  Reads the 'options' member rather than the 'options_' parameter;
    //  presumably that member is set up by the own_t base initialised above.
    _current_reconnect_ivl (options.reconnect_ivl)
{
    zmq_assert (_addr);

    //  Cache the textual form of the address; it is used as the endpoint
    //  when reporting socket events.
    _addr->to_string (_endpoint);
    //  TODO the return value is unused! What if it fails? If this is
    //  impossible or does not matter, change such that _endpoint is
    //  initialized using an initializer, and make _endpoint const.
}

//  Destructor. Releases nothing itself; it only checks that the object was
//  already shut down (process_term cancels the timer, removes the handle
//  and closes the socket).
zmq::stream_connecter_base_t::~stream_connecter_base_t ()
{
    //  No reconnect timer may still be pending.
    zmq_assert (!_reconnect_timer_started);
    //  The handle must already have been removed (see rm_handle).
    zmq_assert (!_handle);
    //  The underlying socket must already be closed (see close).
    zmq_assert (_s == retired_fd);
}

//  Plug handler: either connects right away or, for a delayed start,
//  waits for the reconnect timer first.
void zmq::stream_connecter_base_t::process_plug ()
{
    if (!_delayed_start) {
        //  Immediate start requested.
        start_connecting ();
        return;
    }

    //  Delayed start: the first attempt happens when the timer fires.
    add_reconnect_timer ();
}

//  Termination handler: undoes whatever is currently active (pending
//  reconnect timer, registered handle, open socket) and then hands the
//  termination over to own_t with the given linger value.
void zmq::stream_connecter_base_t::process_term (int linger_)
{
    //  Disarm a pending reconnect timer, if any.
    if (_reconnect_timer_started) {
        cancel_timer (reconnect_timer_id);
        _reconnect_timer_started = false;
    }

    //  Drop the registered handle, if any.
    if (_handle)
        rm_handle ();

    //  Close the socket if one is still open.
    if (_s != retired_fd) {
        close ();
    }

    own_t::process_term (linger_);
}

void zmq::stream_connecter_base_t::add_reconnect_timer ()
{
    if (options.reconnect_ivl != -1) {
        const int interval = get_new_reconnect_ivl ();
        add_timer (interval, reconnect_timer_id);
105 106
        _socket->event_connect_retried (
          make_unconnected_connect_endpoint_pair (_endpoint), interval);
107 108 109 110 111 112
        _reconnect_timer_started = true;
    }
}

int zmq::stream_connecter_base_t::get_new_reconnect_ivl ()
{
113 114 115 116
    //  TODO should the random jitter be really based on the configured initial
    //  reconnect interval options.reconnect_ivl, or better on the
    //  _current_reconnect_ivl?

117
    //  The new interval is the current interval + random value.
118
    const int random_jitter = generate_random () % options.reconnect_ivl;
119
    const int interval =
120 121 122
      _current_reconnect_ivl < std::numeric_limits<int>::max () - random_jitter
        ? _current_reconnect_ivl + random_jitter
        : std::numeric_limits<int>::max ();
123

124
    //  Only change the new current reconnect interval if the maximum reconnect
125 126
    //  interval was set and if it's larger than the reconnect interval.
    if (options.reconnect_ivl_max > 0
127
        && options.reconnect_ivl_max > options.reconnect_ivl) {
128 129
        //  Calculate the next interval
        _current_reconnect_ivl =
130 131 132 133 134
          _current_reconnect_ivl < std::numeric_limits<int>::max () / 2
            ? std::min (_current_reconnect_ivl * 2, options.reconnect_ivl_max)
            : options.reconnect_ivl_max;
    }

135 136
    return interval;
}

void zmq::stream_connecter_base_t::rm_handle ()
{
    rm_fd (_handle);
    _handle = static_cast<handle_t> (NULL);
}

void zmq::stream_connecter_base_t::close ()
{
146 147
    // TODO before, this was an assertion for _s != retired_fd, but this does not match usage of close
    if (_s != retired_fd) {
148
#ifdef ZMQ_HAVE_WINDOWS
149 150
        const int rc = closesocket (_s);
        wsa_assert (rc != SOCKET_ERROR);
151
#else
152 153
        const int rc = ::close (_s);
        errno_assert (rc == 0);
154
#endif
155 156 157 158
        _socket->event_closed (
          make_unconnected_connect_endpoint_pair (_endpoint), _s);
        _s = retired_fd;
    }
159
}

//  Input-readiness handler; simply forwards to out_event.
void zmq::stream_connecter_base_t::in_event ()
{
    //  We are not polling for incoming data, so we are actually called
    //  because of error here. However, we can get error on out event as well
    //  on some platforms, so we'll simply handle both events in the same way.
    out_event ();
}

//  Creates a stream engine for the given descriptor, attaches it to the
//  session, initiates termination of this connecter and reports the
//  connection as a socket event.
//
//  fd              descriptor handed to the new engine.
//  local_address_  local endpoint string, paired with _endpoint as the
//                  remote side for the engine and the event.
void zmq::stream_connecter_base_t::create_engine (
  fd_t fd, const std::string &local_address_)
{
    const endpoint_uri_pair_t endpoint_pair (local_address_, _endpoint,
                                             endpoint_type_connect);

    //  Create the engine object for this connection.
    stream_engine_t *engine =
      new (std::nothrow) stream_engine_t (fd, options, endpoint_pair);
    alloc_assert (engine);

    //  Attach the engine to the corresponding session object.
    send_attach (_session, engine);

    //  Shut the connecter down.
    terminate ();

    //  Report the connection; note this happens after terminate () above.
    _socket->event_connected (endpoint_pair, fd);
}

//  Timer handler. The only timer this class arms is the reconnect timer,
//  so any other id is a programming error.
void zmq::stream_connecter_base_t::timer_event (int id_)
{
    zmq_assert (id_ == reconnect_timer_id);
    //  The timer has fired, so it is no longer pending. The flag is cleared
    //  before start_connecting () is called.
    _reconnect_timer_started = false;
    start_connecting ();
}