Unverified Commit 4d8e5fb2 authored by Luca Boccassi's avatar Luca Boccassi Committed by GitHub

Merge pull request #3378 from sigiesec/refactor-stream-listeners-and-connecters

Refactor stream listeners and connecters
parents 1aa6f707 e162c8bd
......@@ -821,6 +821,10 @@ set(cxx-sources
stdint.hpp
stream.hpp
stream_engine.hpp
stream_connecter_base.hpp
stream_connecter_base.cpp
stream_listener_base.hpp
stream_listener_base.cpp
sub.hpp
tcp.hpp
tcp_address.hpp
......
......@@ -194,6 +194,10 @@ src_libzmq_la_SOURCES = \
src/stdint.hpp \
src/stream.cpp \
src/stream.hpp \
src/stream_connecter_base.cpp \
src/stream_connecter_base.hpp \
src/stream_listener_base.cpp \
src/stream_listener_base.hpp \
src/stream_engine.cpp \
src/stream_engine.hpp \
src/sub.cpp \
......
......@@ -54,70 +54,18 @@
zmq::ipc_connecter_t::ipc_connecter_t (class io_thread_t *io_thread_,
class session_base_t *session_,
const options_t &options_,
const address_t *addr_,
address_t *addr_,
bool delayed_start_) :
own_t (io_thread_, options_),
io_object_t (io_thread_),
addr (addr_),
s (retired_fd),
handle_valid (false),
delayed_start (delayed_start_),
timer_started (false),
session (session_),
current_reconnect_ivl (options.reconnect_ivl)
stream_connecter_base_t (
io_thread_, session_, options_, addr_, delayed_start_)
{
zmq_assert (addr);
zmq_assert (addr->protocol == protocol_name::ipc);
addr->to_string (endpoint);
socket = session->get_socket ();
}
zmq::ipc_connecter_t::~ipc_connecter_t ()
{
zmq_assert (!timer_started);
zmq_assert (!handle_valid);
zmq_assert (s == retired_fd);
}
void zmq::ipc_connecter_t::process_plug ()
{
if (delayed_start)
add_reconnect_timer ();
else
start_connecting ();
}
void zmq::ipc_connecter_t::process_term (int linger_)
{
if (timer_started) {
cancel_timer (reconnect_timer_id);
timer_started = false;
}
if (handle_valid) {
rm_fd (handle);
handle_valid = false;
}
if (s != retired_fd)
close ();
own_t::process_term (linger_);
}
void zmq::ipc_connecter_t::in_event ()
{
// We are not polling for incoming data, so we are actually called
// because of error here. However, we can get error on out event as well
// on some platforms, so we'll simply handle both events in the same way.
out_event ();
zmq_assert (_addr->protocol == protocol_name::ipc);
}
void zmq::ipc_connecter_t::out_event ()
{
fd_t fd = connect ();
rm_fd (handle);
handle_valid = false;
rm_handle ();
// Handle the error condition by attempt to reconnect.
if (fd == retired_fd) {
......@@ -125,25 +73,8 @@ void zmq::ipc_connecter_t::out_event ()
add_reconnect_timer ();
return;
}
// Create the engine object for this connection.
stream_engine_t *engine =
new (std::nothrow) stream_engine_t (fd, options, endpoint);
alloc_assert (engine);
// Attach the engine to the corresponding session object.
send_attach (session, engine);
// Shut the connecter down.
terminate ();
socket->event_connected (endpoint, fd);
}
void zmq::ipc_connecter_t::timer_event (int id_)
{
zmq_assert (id_ == reconnect_timer_id);
timer_started = false;
start_connecting ();
create_engine (fd);
}
void zmq::ipc_connecter_t::start_connecting ()
......@@ -153,71 +84,44 @@ void zmq::ipc_connecter_t::start_connecting ()
// Connect may succeed in synchronous manner.
if (rc == 0) {
handle = add_fd (s);
handle_valid = true;
_handle = add_fd (_s);
out_event ();
}
// Connection establishment may be delayed. Poll for its completion.
else if (rc == -1 && errno == EINPROGRESS) {
handle = add_fd (s);
handle_valid = true;
set_pollout (handle);
socket->event_connect_delayed (endpoint, zmq_errno ());
_handle = add_fd (_s);
set_pollout (_handle);
_socket->event_connect_delayed (_endpoint, zmq_errno ());
// TODO, tcp_connecter_t adds a connect timer in this case; maybe this
// should be done here as well (and then this could be pulled up to
// stream_connecter_base_t).
}
// Handle any other error condition by eventual reconnect.
else {
if (s != retired_fd)
if (_s != retired_fd)
close ();
add_reconnect_timer ();
}
}
void zmq::ipc_connecter_t::add_reconnect_timer ()
{
if (options.reconnect_ivl != -1) {
int rc_ivl = get_new_reconnect_ivl ();
add_timer (rc_ivl, reconnect_timer_id);
socket->event_connect_retried (endpoint, rc_ivl);
timer_started = true;
}
}
int zmq::ipc_connecter_t::get_new_reconnect_ivl ()
{
// The new interval is the current interval + random value.
int this_interval =
current_reconnect_ivl + (generate_random () % options.reconnect_ivl);
// Only change the current reconnect interval if the maximum reconnect
// interval was set and if it's larger than the reconnect interval.
if (options.reconnect_ivl_max > 0
&& options.reconnect_ivl_max > options.reconnect_ivl) {
// Calculate the next interval
current_reconnect_ivl = current_reconnect_ivl * 2;
if (current_reconnect_ivl >= options.reconnect_ivl_max) {
current_reconnect_ivl = options.reconnect_ivl_max;
}
}
return this_interval;
}
int zmq::ipc_connecter_t::open ()
{
zmq_assert (s == retired_fd);
zmq_assert (_s == retired_fd);
// Create the socket.
s = open_socket (AF_UNIX, SOCK_STREAM, 0);
if (s == -1)
_s = open_socket (AF_UNIX, SOCK_STREAM, 0);
if (_s == -1)
return -1;
// Set the non-blocking flag.
unblock_socket (s);
unblock_socket (_s);
// Connect to the remote peer.
int rc = ::connect (s, addr->resolved.ipc_addr->addr (),
addr->resolved.ipc_addr->addrlen ());
int rc = ::connect (_s, _addr->resolved.ipc_addr->addr (),
_addr->resolved.ipc_addr->addrlen ());
// Connect was successful immediately.
if (rc == 0)
......@@ -234,16 +138,6 @@ int zmq::ipc_connecter_t::open ()
return -1;
}
int zmq::ipc_connecter_t::close ()
{
zmq_assert (s != retired_fd);
int rc = ::close (s);
errno_assert (rc == 0);
socket->event_closed (endpoint, s);
s = retired_fd;
return 0;
}
zmq::fd_t zmq::ipc_connecter_t::connect ()
{
// Following code should handle both Berkeley-derived socket
......@@ -254,7 +148,7 @@ zmq::fd_t zmq::ipc_connecter_t::connect ()
#else
socklen_t len = sizeof (err);
#endif
int rc = getsockopt (s, SOL_SOCKET, SO_ERROR,
int rc = getsockopt (_s, SOL_SOCKET, SO_ERROR,
reinterpret_cast<char *> (&err), &len);
if (rc == -1) {
if (errno == ENOPROTOOPT)
......@@ -272,8 +166,8 @@ zmq::fd_t zmq::ipc_connecter_t::connect ()
return retired_fd;
}
fd_t result = s;
s = retired_fd;
fd_t result = _s;
_s = retired_fd;
return result;
}
......
......@@ -34,17 +34,11 @@
&& !defined ZMQ_HAVE_VXWORKS
#include "fd.hpp"
#include "own.hpp"
#include "stdint.hpp"
#include "io_object.hpp"
#include "stream_connecter_base.hpp"
namespace zmq
{
class io_thread_t;
class session_base_t;
struct address_t;
class ipc_connecter_t : public own_t, public io_object_t
class ipc_connecter_t : public stream_connecter_base_t
{
public:
// If 'delayed_start' is true connecter first waits for a while,
......@@ -52,80 +46,25 @@ class ipc_connecter_t : public own_t, public io_object_t
ipc_connecter_t (zmq::io_thread_t *io_thread_,
zmq::session_base_t *session_,
const options_t &options_,
const address_t *addr_,
address_t *addr_,
bool delayed_start_);
~ipc_connecter_t ();
private:
// ID of the timer used to delay the reconnection.
enum
{
reconnect_timer_id = 1
};
// Handlers for incoming commands.
void process_plug ();
void process_term (int linger_);
// Handlers for I/O events.
void in_event ();
void out_event ();
void timer_event (int id_);
// Internal function to start the actual connection establishment.
void start_connecting ();
// Internal function to add a reconnect timer
void add_reconnect_timer ();
// Internal function to return a reconnect backoff delay.
// Will modify the current_reconnect_ivl used for next call
// Returns the currently used interval
int get_new_reconnect_ivl ();
// Open IPC connecting socket. Returns -1 in case of error,
// 0 if connect was successful immediately. Returns -1 with
// EAGAIN errno if async connect was launched.
int open ();
// Close the connecting socket.
int close ();
// Get the file descriptor of newly created connection. Returns
// retired_fd if the connection was unsuccessful.
fd_t connect ();
// Address to connect to. Owned by session_base_t.
const address_t *addr;
// Underlying socket.
fd_t s;
// Handle corresponding to the listening socket.
handle_t handle;
// If true file descriptor is registered with the poller and 'handle'
// contains valid value.
bool handle_valid;
// If true, connecter is waiting a while before trying to connect.
const bool delayed_start;
// True iff a timer has been started.
bool timer_started;
// Reference to the session we belong to.
zmq::session_base_t *session;
// Current reconnect ivl, updated for backoff strategy
int current_reconnect_ivl;
// String representation of endpoint to connect to
std::string endpoint;
// Socket
zmq::socket_base_t *socket;
ipc_connecter_t (const ipc_connecter_t &);
const ipc_connecter_t &operator= (const ipc_connecter_t &);
};
......
......@@ -37,10 +37,8 @@
#include <string.h>
#include "stream_engine.hpp"
#include "ipc_address.hpp"
#include "io_thread.hpp"
#include "session_base.hpp"
#include "config.hpp"
#include "err.hpp"
#include "ip.hpp"
......@@ -132,34 +130,11 @@ int zmq::ipc_listener_t::create_wildcard_address (std::string &path_,
zmq::ipc_listener_t::ipc_listener_t (io_thread_t *io_thread_,
socket_base_t *socket_,
const options_t &options_) :
own_t (io_thread_, options_),
io_object_t (io_thread_),
has_file (false),
s (retired_fd),
handle (static_cast<handle_t> (NULL)),
socket (socket_)
stream_listener_base_t (io_thread_, socket_, options_),
_has_file (false)
{
}
zmq::ipc_listener_t::~ipc_listener_t ()
{
zmq_assert (s == retired_fd);
}
void zmq::ipc_listener_t::process_plug ()
{
// Start polling for incoming connections.
handle = add_fd (s);
set_pollin (handle);
}
void zmq::ipc_listener_t::process_term (int linger_)
{
rm_fd (handle);
close ();
own_t::process_term (linger_);
}
void zmq::ipc_listener_t::in_event ()
{
fd_t fd = accept ();
......@@ -167,42 +142,21 @@ void zmq::ipc_listener_t::in_event ()
// If connection was reset by the peer in the meantime, just ignore it.
// TODO: Handle specific errors like ENFILE/EMFILE etc.
if (fd == retired_fd) {
socket->event_accept_failed (endpoint, zmq_errno ());
_socket->event_accept_failed (_endpoint, zmq_errno ());
return;
}
// Create the engine object for this connection.
stream_engine_t *engine =
new (std::nothrow) stream_engine_t (fd, options, endpoint);
alloc_assert (engine);
// Choose I/O thread to run connecter in. Given that we are already
// running in an I/O thread, there must be at least one available.
io_thread_t *io_thread = choose_io_thread (options.affinity);
zmq_assert (io_thread);
// Create and launch a session object.
session_base_t *session =
session_base_t::create (io_thread, false, socket, options, NULL);
errno_assert (session);
session->inc_seqnum ();
launch_child (session);
send_attach (session, engine, false);
socket->event_accepted (endpoint, fd);
create_engine (fd);
}
int zmq::ipc_listener_t::get_address (std::string &addr_)
{
struct sockaddr_storage ss;
#ifdef ZMQ_HAVE_HPUX
int sl = sizeof (ss);
#else
socklen_t sl = sizeof (ss);
#endif
int rc = getsockname (s, reinterpret_cast<sockaddr *> (&ss), &sl);
if (rc != 0) {
const zmq_socklen_t sl = get_socket_address (&ss);
if (sl == 0) {
addr_.clear ();
return rc;
return -1;
}
ipc_address_t addr (reinterpret_cast<struct sockaddr *> (&ss), sl);
......@@ -216,7 +170,7 @@ int zmq::ipc_listener_t::set_address (const char *addr_)
// Allow wildcard file
if (options.use_fd == -1 && addr[0] == '*') {
if (create_wildcard_address (tmp_socket_dirname, addr) < 0) {
if (create_wildcard_address (_tmp_socket_dirname, addr) < 0) {
return -1;
}
}
......@@ -229,56 +183,56 @@ int zmq::ipc_listener_t::set_address (const char *addr_)
if (options.use_fd == -1) {
::unlink (addr.c_str ());
}
filename.clear ();
_filename.clear ();
// Initialise the address structure.
ipc_address_t address;
int rc = address.resolve (addr.c_str ());
if (rc != 0) {
if (!tmp_socket_dirname.empty ()) {
if (!_tmp_socket_dirname.empty ()) {
// We need to preserve errno to return to the user
int tmp_errno = errno;
::rmdir (tmp_socket_dirname.c_str ());
tmp_socket_dirname.clear ();
::rmdir (_tmp_socket_dirname.c_str ());
_tmp_socket_dirname.clear ();
errno = tmp_errno;
}
return -1;
}
address.to_string (endpoint);
address.to_string (_endpoint);
if (options.use_fd != -1) {
s = options.use_fd;
_s = options.use_fd;
} else {
// Create a listening socket.
s = open_socket (AF_UNIX, SOCK_STREAM, 0);
if (s == -1) {
if (!tmp_socket_dirname.empty ()) {
_s = open_socket (AF_UNIX, SOCK_STREAM, 0);
if (_s == -1) {
if (!_tmp_socket_dirname.empty ()) {
// We need to preserve errno to return to the user
int tmp_errno = errno;
::rmdir (tmp_socket_dirname.c_str ());
tmp_socket_dirname.clear ();
::rmdir (_tmp_socket_dirname.c_str ());
_tmp_socket_dirname.clear ();
errno = tmp_errno;
}
return -1;
}
// Bind the socket to the file path.
rc = bind (s, const_cast<sockaddr *> (address.addr ()),
rc = bind (_s, const_cast<sockaddr *> (address.addr ()),
address.addrlen ());
if (rc != 0)
goto error;
// Listen for incoming connections.
rc = listen (s, options.backlog);
rc = listen (_s, options.backlog);
if (rc != 0)
goto error;
}
filename = ZMQ_MOVE (addr);
has_file = true;
_filename = ZMQ_MOVE (addr);
_has_file = true;
socket->event_listening (endpoint, s);
_socket->event_listening (_endpoint, _s);
return 0;
error:
......@@ -290,28 +244,28 @@ error:
int zmq::ipc_listener_t::close ()
{
zmq_assert (s != retired_fd);
int fd_for_event = s;
int rc = ::close (s);
zmq_assert (_s != retired_fd);
int fd_for_event = _s;
int rc = ::close (_s);
errno_assert (rc == 0);
s = retired_fd;
_s = retired_fd;
if (has_file && options.use_fd == -1) {
if (_has_file && options.use_fd == -1) {
rc = 0;
if (rc == 0 && !tmp_socket_dirname.empty ()) {
rc = ::rmdir (tmp_socket_dirname.c_str ());
tmp_socket_dirname.clear ();
if (rc == 0 && !_tmp_socket_dirname.empty ()) {
rc = ::rmdir (_tmp_socket_dirname.c_str ());
_tmp_socket_dirname.clear ();
}
if (rc != 0) {
socket->event_close_failed (endpoint, zmq_errno ());
_socket->event_close_failed (_endpoint, zmq_errno ());
return -1;
}
}
socket->event_closed (endpoint, fd_for_event);
_socket->event_closed (_endpoint, fd_for_event);
return 0;
}
......@@ -389,11 +343,11 @@ zmq::fd_t zmq::ipc_listener_t::accept ()
// Accept one connection and deal with different failure modes.
// The situation where connection cannot be accepted due to insufficient
// resources is considered valid and treated by ignoring the connection.
zmq_assert (s != retired_fd);
zmq_assert (_s != retired_fd);
#if defined ZMQ_HAVE_SOCK_CLOEXEC && defined HAVE_ACCEPT4
fd_t sock = ::accept4 (s, NULL, NULL, SOCK_CLOEXEC);
fd_t sock = ::accept4 (_s, NULL, NULL, SOCK_CLOEXEC);
#else
fd_t sock = ::accept (s, NULL, NULL);
fd_t sock = ::accept (_s, NULL, NULL);
#endif
if (sock == -1) {
errno_assert (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR
......
......@@ -36,22 +36,16 @@
#include <string>
#include "fd.hpp"
#include "own.hpp"
#include "stdint.hpp"
#include "io_object.hpp"
#include "stream_listener_base.hpp"
namespace zmq
{
class io_thread_t;
class socket_base_t;
class ipc_listener_t : public own_t, public io_object_t
class ipc_listener_t : public stream_listener_base_t
{
public:
ipc_listener_t (zmq::io_thread_t *io_thread_,
zmq::socket_base_t *socket_,
const options_t &options_);
~ipc_listener_t ();
// Set address to listen on.
int set_address (const char *addr_);
......@@ -60,16 +54,9 @@ class ipc_listener_t : public own_t, public io_object_t
int get_address (std::string &addr_);
private:
// Handlers for incoming commands.
void process_plug ();
void process_term (int linger_);
// Handlers for I/O events.
void in_event ();
// Close the listening socket.
int close ();
// Create wildcard path address
static int create_wildcard_address (std::string &path_, std::string &file_);
......@@ -79,32 +66,22 @@ class ipc_listener_t : public own_t, public io_object_t
bool filter (fd_t sock_);
#endif
int close ();
// Accept the new connection. Returns the file descriptor of the
// newly created connection. The function may return retired_fd
// if the connection was dropped while waiting in the listen backlog.
fd_t accept ();
// True, if the underlying file for UNIX domain socket exists.
bool has_file;
bool _has_file;
// Name of the temporary directory (if any) that has the
// the UNIX domain socket
std::string tmp_socket_dirname;
std::string _tmp_socket_dirname;
// Name of the file associated with the UNIX domain address.
std::string filename;
// Underlying socket.
fd_t s;
// Handle corresponding to the listening socket.
handle_t handle;
// Socket the listener belongs to.
zmq::socket_base_t *socket;
// String representation of endpoint to bind to
std::string endpoint;
std::string _filename;
// Acceptable temporary directory environment variables
static const char *tmp_env_vars[];
......
/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "precompiled.hpp"
#include "stream_connecter_base.hpp"
#include "session_base.hpp"
#include "address.hpp"
#include "random.hpp"
zmq::stream_connecter_base_t::stream_connecter_base_t (
zmq::io_thread_t *io_thread_,
zmq::session_base_t *session_,
const zmq::options_t &options_,
zmq::address_t *addr_,
bool delayed_start_) :
own_t (io_thread_, options_),
io_object_t (io_thread_),
_addr (addr_),
_s (retired_fd),
_handle (static_cast<handle_t> (NULL)),
_socket (session_->get_socket ()),
_delayed_start (delayed_start_),
_reconnect_timer_started (false),
_session (session_),
_current_reconnect_ivl (options.reconnect_ivl)
{
zmq_assert (_addr);
_addr->to_string (_endpoint);
// TODO the return value is unused! what if it fails? if this is impossible
// or does not matter, change such that endpoint in initialized using an
// initializer, and make endpoint const
}
zmq::stream_connecter_base_t::~stream_connecter_base_t ()
{
zmq_assert (!_reconnect_timer_started);
zmq_assert (!_handle);
zmq_assert (_s == retired_fd);
}
void zmq::stream_connecter_base_t::process_plug ()
{
if (_delayed_start)
add_reconnect_timer ();
else
start_connecting ();
}
void zmq::stream_connecter_base_t::process_term (int linger_)
{
if (_reconnect_timer_started) {
cancel_timer (reconnect_timer_id);
_reconnect_timer_started = false;
}
if (_handle) {
rm_handle ();
}
if (_s != retired_fd)
close ();
own_t::process_term (linger_);
}
void zmq::stream_connecter_base_t::add_reconnect_timer ()
{
if (options.reconnect_ivl != -1) {
const int interval = get_new_reconnect_ivl ();
add_timer (interval, reconnect_timer_id);
_socket->event_connect_retried (_endpoint, interval);
_reconnect_timer_started = true;
}
}
int zmq::stream_connecter_base_t::get_new_reconnect_ivl ()
{
// The new interval is the current interval + random value.
const int interval =
_current_reconnect_ivl + generate_random () % options.reconnect_ivl;
// Only change the current reconnect interval if the maximum reconnect
// interval was set and if it's larger than the reconnect interval.
if (options.reconnect_ivl_max > 0
&& options.reconnect_ivl_max > options.reconnect_ivl)
// Calculate the next interval
_current_reconnect_ivl =
std::min (_current_reconnect_ivl * 2, options.reconnect_ivl_max);
return interval;
}
void zmq::stream_connecter_base_t::rm_handle ()
{
rm_fd (_handle);
_handle = static_cast<handle_t> (NULL);
}
void zmq::stream_connecter_base_t::close ()
{
zmq_assert (_s != retired_fd);
#ifdef ZMQ_HAVE_WINDOWS
const int rc = closesocket (_s);
wsa_assert (rc != SOCKET_ERROR);
#else
const int rc = ::close (_s);
errno_assert (rc == 0);
#endif
_socket->event_closed (_endpoint, _s);
_s = retired_fd;
}
void zmq::stream_connecter_base_t::in_event ()
{
// We are not polling for incoming data, so we are actually called
// because of error here. However, we can get error on out event as well
// on some platforms, so we'll simply handle both events in the same way.
out_event ();
}
void zmq::stream_connecter_base_t::create_engine (fd_t fd)
{
// Create the engine object for this connection.
stream_engine_t *engine =
new (std::nothrow) stream_engine_t (fd, options, _endpoint);
alloc_assert (engine);
// Attach the engine to the corresponding session object.
send_attach (_session, engine);
// Shut the connecter down.
terminate ();
_socket->event_connected (_endpoint, fd);
}
void zmq::stream_connecter_base_t::timer_event (int id_)
{
zmq_assert (id_ == reconnect_timer_id);
_reconnect_timer_started = false;
start_connecting ();
}
/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __STREAM_CONNECTER_BASE_HPP_INCLUDED__
#define __STREAM_CONNECTER_BASE_HPP_INCLUDED__
#include "fd.hpp"
#include "own.hpp"
#include "io_object.hpp"
namespace zmq
{
class io_thread_t;
class session_base_t;
struct address_t;
class stream_connecter_base_t : public own_t, public io_object_t
{
public:
// If 'delayed_start' is true connecter first waits for a while,
// then starts connection process.
stream_connecter_base_t (zmq::io_thread_t *io_thread_,
zmq::session_base_t *session_,
const options_t &options_,
address_t *addr_,
bool delayed_start_);
~stream_connecter_base_t ();
protected:
// Handlers for incoming commands.
void process_plug ();
void process_term (int linger_);
// Handlers for I/O events.
void in_event ();
void timer_event (int id_);
// Internal function to create the engine after connection was established.
void create_engine (fd_t fd);
// Internal function to add a reconnect timer
void add_reconnect_timer ();
// Removes the handle from the poller.
void rm_handle ();
// Close the connecting socket.
void close ();
// Address to connect to. Owned by session_base_t.
// It is non-const since some parts may change during opening.
address_t *const _addr;
// Underlying socket.
fd_t _s;
// Handle corresponding to the listening socket, if file descriptor is
// registered with the poller, or NULL.
handle_t _handle;
// String representation of endpoint to connect to
std::string _endpoint;
// Socket
zmq::socket_base_t *const _socket;
private:
// ID of the timer used to delay the reconnection.
enum
{
reconnect_timer_id = 1
};
// Internal function to return a reconnect backoff delay.
// Will modify the current_reconnect_ivl used for next call
// Returns the currently used interval
int get_new_reconnect_ivl ();
virtual void start_connecting () = 0;
// If true, connecter is waiting a while before trying to connect.
const bool _delayed_start;
// True iff a timer has been started.
bool _reconnect_timer_started;
// Reference to the session we belong to.
zmq::session_base_t *const _session;
// Current reconnect ivl, updated for backoff strategy
int _current_reconnect_ivl;
stream_connecter_base_t (const stream_connecter_base_t &);
const stream_connecter_base_t &operator= (const stream_connecter_base_t &);
};
}
#endif
......@@ -63,6 +63,43 @@
#include "likely.hpp"
#include "wire.hpp"
static std::string get_peer_address (zmq::fd_t s_)
{
std::string peer_address;
const int family = zmq::get_peer_ip_address (s_, peer_address);
if (family == 0)
peer_address.clear ();
#if defined ZMQ_HAVE_SO_PEERCRED
else if (family == PF_UNIX) {
struct ucred cred;
socklen_t size = sizeof (cred);
if (!getsockopt (s_, SOL_SOCKET, SO_PEERCRED, &cred, &size)) {
std::ostringstream buf;
buf << ":" << cred.uid << ":" << cred.gid << ":" << cred.pid;
peer_address += buf.str ();
}
}
#elif defined ZMQ_HAVE_LOCAL_PEERCRED
else if (family == PF_UNIX) {
struct xucred cred;
socklen_t size = sizeof (cred);
if (!getsockopt (_s, 0, LOCAL_PEERCRED, &cred, &size)
&& cred.cr_version == XUCRED_VERSION) {
std::ostringstream buf;
buf << ":" << cred.cr_uid << ":";
if (cred.cr_ngroups > 0)
buf << cred.cr_groups[0];
buf << ":";
_peer_address += buf.str ();
}
}
#endif
return peer_address;
}
zmq::stream_engine_t::stream_engine_t (fd_t fd_,
const options_t &options_,
const std::string &endpoint_) :
......@@ -94,7 +131,8 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_,
_has_timeout_timer (false),
_has_heartbeat_timer (false),
_heartbeat_timeout (0),
_socket (NULL)
_socket (NULL),
_peer_address (get_peer_address (_s))
{
int rc = _tx_msg.init ();
errno_assert (rc == 0);
......@@ -104,34 +142,6 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_,
// Put the socket into non-blocking mode.
unblock_socket (_s);
const int family = get_peer_ip_address (_s, _peer_address);
if (family == 0)
_peer_address.clear ();
#if defined ZMQ_HAVE_SO_PEERCRED
else if (family == PF_UNIX) {
struct ucred cred;
socklen_t size = sizeof (cred);
if (!getsockopt (_s, SOL_SOCKET, SO_PEERCRED, &cred, &size)) {
std::ostringstream buf;
buf << ":" << cred.uid << ":" << cred.gid << ":" << cred.pid;
_peer_address += buf.str ();
}
}
#elif defined ZMQ_HAVE_LOCAL_PEERCRED
else if (family == PF_UNIX) {
struct xucred cred;
socklen_t size = sizeof (cred);
if (!getsockopt (_s, 0, LOCAL_PEERCRED, &cred, &size)
&& cred.cr_version == XUCRED_VERSION) {
std::ostringstream buf;
buf << ":" << cred.cr_uid << ":";
if (cred.cr_ngroups > 0)
buf << cred.cr_groups[0];
buf << ":";
_peer_address += buf.str ();
}
}
#endif
if (_options.heartbeat_interval > 0) {
_heartbeat_timeout = _options.heartbeat_timeout;
......
......@@ -191,7 +191,7 @@ class stream_engine_t : public io_object_t, public i_engine
const options_t _options;
// String representation of endpoint
std::string _endpoint;
const std::string _endpoint;
bool _plugged;
......@@ -238,7 +238,7 @@ class stream_engine_t : public io_object_t, public i_engine
// Socket
zmq::socket_base_t *_socket;
std::string _peer_address;
const std::string _peer_address;
stream_engine_t (const stream_engine_t &);
const stream_engine_t &operator= (const stream_engine_t &);
......
/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "precompiled.hpp"
#include "stream_listener_base.hpp"
#include "session_base.hpp"
#include "socket_base.hpp"
#include "stream_engine.hpp"
zmq::stream_listener_base_t::stream_listener_base_t (
zmq::io_thread_t *io_thread_,
zmq::socket_base_t *socket_,
const zmq::options_t &options_) :
own_t (io_thread_, options_),
io_object_t (io_thread_),
_s (retired_fd),
_handle (static_cast<handle_t> (NULL)),
_socket (socket_)
{
}
zmq::stream_listener_base_t::~stream_listener_base_t ()
{
zmq_assert (_s == retired_fd);
zmq_assert (!_handle);
}
zmq::zmq_socklen_t
zmq::stream_listener_base_t::get_socket_address (sockaddr_storage *ss_) const
{
zmq_socklen_t sl = static_cast<zmq_socklen_t> (sizeof (*ss_));
const int rc =
getsockname (_s, reinterpret_cast<struct sockaddr *> (ss_), &sl);
return rc != 0 ? 0 : sl;
}
void zmq::stream_listener_base_t::process_plug ()
{
// Start polling for incoming connections.
_handle = add_fd (_s);
set_pollin (_handle);
}
void zmq::stream_listener_base_t::process_term (int linger_)
{
rm_fd (_handle);
_handle = static_cast<handle_t> (NULL);
close ();
own_t::process_term (linger_);
}
int zmq::stream_listener_base_t::close ()
{
// TODO this is identical to stream_connector_base_t::close
zmq_assert (_s != retired_fd);
#ifdef ZMQ_HAVE_WINDOWS
const int rc = closesocket (_s);
wsa_assert (rc != SOCKET_ERROR);
#else
const int rc = ::close (_s);
errno_assert (rc == 0);
#endif
_socket->event_closed (_endpoint, _s);
_s = retired_fd;
return 0;
}
void zmq::stream_listener_base_t::create_engine (fd_t fd)
{
stream_engine_t *engine =
new (std::nothrow) stream_engine_t (fd, options, _endpoint);
alloc_assert (engine);
// Choose I/O thread to run connecter in. Given that we are already
// running in an I/O thread, there must be at least one available.
io_thread_t *io_thread = choose_io_thread (options.affinity);
zmq_assert (io_thread);
// Create and launch a session object.
session_base_t *session =
session_base_t::create (io_thread, false, _socket, options, NULL);
errno_assert (session);
session->inc_seqnum ();
launch_child (session);
send_attach (session, engine, false);
_socket->event_accepted (_endpoint, fd);
}
/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_STREAM_LISTENER_BASE_HPP_INCLUDED__
#define __ZMQ_STREAM_LISTENER_BASE_HPP_INCLUDED__
#include <string>
#include "fd.hpp"
#include "own.hpp"
#include "stdint.hpp"
#include "io_object.hpp"
#include "tipc_address.hpp"
namespace zmq
{
class io_thread_t;
class socket_base_t;
#if defined(ZMQ_HAVE_HPUX) || defined(ZMQ_HAVE_VXWORKS)
typedef int zmq_socklen_t;
#else
typedef socklen_t zmq_socklen_t;
#endif
class stream_listener_base_t : public own_t, public io_object_t
{
public:
stream_listener_base_t (zmq::io_thread_t *io_thread_,
zmq::socket_base_t *socket_,
const options_t &options_);
~stream_listener_base_t ();
protected:
zmq_socklen_t get_socket_address (sockaddr_storage *ss_) const;
private:
// Handlers for incoming commands.
void process_plug ();
void process_term (int linger_);
protected:
// Close the listening socket.
virtual int close ();
void create_engine (fd_t fd);
// Underlying socket.
fd_t _s;
// Handle corresponding to the listening socket.
handle_t _handle;
// Socket the listener belongs to.
zmq::socket_base_t *_socket;
// String representation of endpoint to bind to
std::string _endpoint;
private:
stream_listener_base_t (const stream_listener_base_t &);
const stream_listener_base_t &operator= (const stream_listener_base_t &);
};
}
#endif
......@@ -35,7 +35,6 @@
#include "tcp_connecter.hpp"
#include "stream_engine.hpp"
#include "io_thread.hpp"
#include "random.hpp"
#include "err.hpp"
#include "ip.hpp"
#include "tcp.hpp"
......@@ -69,40 +68,16 @@ zmq::tcp_connecter_t::tcp_connecter_t (class io_thread_t *io_thread_,
const options_t &options_,
address_t *addr_,
bool delayed_start_) :
own_t (io_thread_, options_),
io_object_t (io_thread_),
_addr (addr_),
_s (retired_fd),
_handle (static_cast<handle_t> (NULL)),
_delayed_start (delayed_start_),
_connect_timer_started (false),
_reconnect_timer_started (false),
_session (session_),
_current_reconnect_ivl (options.reconnect_ivl),
_socket (_session->get_socket ())
stream_connecter_base_t (
io_thread_, session_, options_, addr_, delayed_start_),
_connect_timer_started (false)
{
zmq_assert (_addr);
zmq_assert (_addr->protocol == protocol_name::tcp);
_addr->to_string (_endpoint);
// TODO the return value is unused! what if it fails? if this is impossible
// or does not matter, change such that endpoint in initialized using an
// initializer, and make endpoint const
}
zmq::tcp_connecter_t::~tcp_connecter_t ()
{
zmq_assert (!_connect_timer_started);
zmq_assert (!_reconnect_timer_started);
zmq_assert (!_handle);
zmq_assert (_s == retired_fd);
}
void zmq::tcp_connecter_t::process_plug ()
{
if (_delayed_start)
add_reconnect_timer ();
else
start_connecting ();
}
void zmq::tcp_connecter_t::process_term (int linger_)
......@@ -112,27 +87,7 @@ void zmq::tcp_connecter_t::process_term (int linger_)
_connect_timer_started = false;
}
if (_reconnect_timer_started) {
cancel_timer (reconnect_timer_id);
_reconnect_timer_started = false;
}
if (_handle) {
rm_handle ();
}
if (_s != retired_fd)
close ();
own_t::process_term (linger_);
}
void zmq::tcp_connecter_t::in_event ()
{
// We are not polling for incoming data, so we are actually called
// because of error here. However, we can get error on out event as well
// on some platforms, so we'll simply handle both events in the same way.
out_event ();
stream_connecter_base_t::process_term (linger_);
}
void zmq::tcp_connecter_t::out_event ()
......@@ -142,6 +97,9 @@ void zmq::tcp_connecter_t::out_event ()
_connect_timer_started = false;
}
// TODO this is still very similar to (t)ipc_connecter_t, maybe the
// differences can be factored out
rm_handle ();
const fd_t fd = connect ();
......@@ -153,38 +111,18 @@ void zmq::tcp_connecter_t::out_event ()
return;
}
// Create the engine object for this connection.
stream_engine_t *engine =
new (std::nothrow) stream_engine_t (fd, options, _endpoint);
alloc_assert (engine);
// Attach the engine to the corresponding session object.
send_attach (_session, engine);
// Shut the connecter down.
terminate ();
_socket->event_connected (_endpoint, fd);
}
void zmq::tcp_connecter_t::rm_handle ()
{
rm_fd (_handle);
_handle = static_cast<handle_t> (NULL);
create_engine (fd);
}
void zmq::tcp_connecter_t::timer_event (int id_)
{
zmq_assert (id_ == reconnect_timer_id || id_ == connect_timer_id);
if (id_ == connect_timer_id) {
_connect_timer_started = false;
rm_handle ();
close ();
add_reconnect_timer ();
} else if (id_ == reconnect_timer_id) {
_reconnect_timer_started = false;
start_connecting ();
}
} else
stream_connecter_base_t::timer_event (id_);
}
void zmq::tcp_connecter_t::start_connecting ()
......@@ -224,32 +162,6 @@ void zmq::tcp_connecter_t::add_connect_timer ()
}
}
void zmq::tcp_connecter_t::add_reconnect_timer ()
{
if (options.reconnect_ivl != -1) {
const int interval = get_new_reconnect_ivl ();
add_timer (interval, reconnect_timer_id);
_socket->event_connect_retried (_endpoint, interval);
_reconnect_timer_started = true;
}
}
int zmq::tcp_connecter_t::get_new_reconnect_ivl ()
{
// The new interval is the current interval + random value.
const int interval =
_current_reconnect_ivl + generate_random () % options.reconnect_ivl;
// Only change the current reconnect interval if the maximum reconnect
// interval was set and if it's larger than the reconnect interval.
if (options.reconnect_ivl_max > 0
&& options.reconnect_ivl_max > options.reconnect_ivl)
// Calculate the next interval
_current_reconnect_ivl =
std::min (_current_reconnect_ivl * 2, options.reconnect_ivl_max);
return interval;
}
int zmq::tcp_connecter_t::open ()
{
zmq_assert (_s == retired_fd);
......@@ -430,17 +342,3 @@ bool zmq::tcp_connecter_t::tune_socket (const fd_t fd_)
| tune_tcp_maxrt (fd_, options.tcp_maxrt);
return rc == 0;
}
void zmq::tcp_connecter_t::close ()
{
zmq_assert (_s != retired_fd);
#ifdef ZMQ_HAVE_WINDOWS
const int rc = closesocket (_s);
wsa_assert (rc != SOCKET_ERROR);
#else
const int rc = ::close (_s);
errno_assert (rc == 0);
#endif
_socket->event_closed (_endpoint, _s);
_s = retired_fd;
}
......@@ -31,17 +31,12 @@
#define __TCP_CONNECTER_HPP_INCLUDED__
#include "fd.hpp"
#include "own.hpp"
#include "stdint.hpp"
#include "io_object.hpp"
#include "stream_connecter_base.hpp"
namespace zmq
{
class io_thread_t;
class session_base_t;
struct address_t;
class tcp_connecter_t : public own_t, public io_object_t
class tcp_connecter_t : public stream_connecter_base_t
{
public:
// If 'delayed_start' is true connecter first waits for a while,
......@@ -54,47 +49,30 @@ class tcp_connecter_t : public own_t, public io_object_t
~tcp_connecter_t ();
private:
// ID of the timer used to delay the reconnection.
// ID of the timer used to check the connect timeout, must be different from stream_connecter_base_t::reconnect_timer_id.
enum
{
reconnect_timer_id = 1,
connect_timer_id
connect_timer_id = 2
};
// Handlers for incoming commands.
void process_plug ();
void process_term (int linger_);
// Handlers for I/O events.
void in_event ();
void out_event ();
void timer_event (int id_);
// Removes the handle from the poller.
void rm_handle ();
// Internal function to start the actual connection establishment.
void start_connecting ();
// Internal function to add a connect timer
void add_connect_timer ();
// Internal function to add a reconnect timer
void add_reconnect_timer ();
// Internal function to return a reconnect backoff delay.
// Will modify the current_reconnect_ivl used for next call
// Returns the currently used interval
int get_new_reconnect_ivl ();
// Open TCP connecting socket. Returns -1 in case of error,
// 0 if connect was successful immediately. Returns -1 with
// EAGAIN errno if async connect was launched.
int open ();
// Close the connecting socket.
void close ();
// Get the file descriptor of newly created connection. Returns
// retired_fd if the connection was unsuccessful.
fd_t connect ();
......@@ -102,34 +80,8 @@ class tcp_connecter_t : public own_t, public io_object_t
// Tunes a connected socket.
bool tune_socket (fd_t fd_);
// Address to connect to. Owned by session_base_t.
address_t *const _addr;
// Underlying socket.
fd_t _s;
// Handle corresponding to the listening socket, if file descriptor is
// registered with the poller, or NULL.
handle_t _handle;
// If true, connecter is waiting a while before trying to connect.
const bool _delayed_start;
// True iff a timer has been started.
bool _connect_timer_started;
bool _reconnect_timer_started;
// Reference to the session we belong to.
zmq::session_base_t *const _session;
// Current reconnect ivl, updated for backoff strategy
int _current_reconnect_ivl;
// String representation of endpoint to connect to
std::string _endpoint;
// Socket
zmq::socket_base_t *const _socket;
tcp_connecter_t (const tcp_connecter_t &);
const tcp_connecter_t &operator= (const tcp_connecter_t &);
......
......@@ -34,9 +34,7 @@
#include <stdio.h>
#include "tcp_listener.hpp"
#include "stream_engine.hpp"
#include "io_thread.hpp"
#include "session_base.hpp"
#include "config.hpp"
#include "err.hpp"
#include "ip.hpp"
......@@ -63,35 +61,10 @@
zmq::tcp_listener_t::tcp_listener_t (io_thread_t *io_thread_,
socket_base_t *socket_,
const options_t &options_) :
own_t (io_thread_, options_),
io_object_t (io_thread_),
_s (retired_fd),
_handle (static_cast<handle_t> (NULL)),
_socket (socket_)
stream_listener_base_t (io_thread_, socket_, options_)
{
}
zmq::tcp_listener_t::~tcp_listener_t ()
{
zmq_assert (_s == retired_fd);
zmq_assert (!_handle);
}
void zmq::tcp_listener_t::process_plug ()
{
// Start polling for incoming connections.
_handle = add_fd (_s);
set_pollin (_handle);
}
void zmq::tcp_listener_t::process_term (int linger_)
{
rm_fd (_handle);
_handle = static_cast<handle_t> (NULL);
close ();
own_t::process_term (linger_);
}
void zmq::tcp_listener_t::in_event ()
{
fd_t fd = accept ();
......@@ -115,53 +88,17 @@ void zmq::tcp_listener_t::in_event ()
}
// Create the engine object for this connection.
stream_engine_t *engine =
new (std::nothrow) stream_engine_t (fd, options, _endpoint);
alloc_assert (engine);
// Choose I/O thread to run connecter in. Given that we are already
// running in an I/O thread, there must be at least one available.
io_thread_t *io_thread = choose_io_thread (options.affinity);
zmq_assert (io_thread);
// Create and launch a session object.
session_base_t *session =
session_base_t::create (io_thread, false, _socket, options, NULL);
errno_assert (session);
session->inc_seqnum ();
launch_child (session);
send_attach (session, engine, false);
_socket->event_accepted (_endpoint, fd);
}
void zmq::tcp_listener_t::close ()
{
zmq_assert (_s != retired_fd);
#ifdef ZMQ_HAVE_WINDOWS
int rc = closesocket (_s);
wsa_assert (rc != SOCKET_ERROR);
#else
int rc = ::close (_s);
errno_assert (rc == 0);
#endif
_socket->event_closed (_endpoint, _s);
_s = retired_fd;
create_engine (fd);
}
int zmq::tcp_listener_t::get_address (std::string &addr_)
{
// Get the details of the TCP socket
struct sockaddr_storage ss;
#if defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_VXWORKS
int sl = sizeof (ss);
#else
socklen_t sl = sizeof (ss);
#endif
int rc = getsockname (_s, reinterpret_cast<struct sockaddr *> (&ss), &sl);
if (rc != 0) {
const zmq_socklen_t sl = get_socket_address (&ss);
if (!sl) {
addr_.clear ();
return rc;
return -1;
}
tcp_address_t addr (reinterpret_cast<struct sockaddr *> (&ss), sl);
......
......@@ -31,23 +31,17 @@
#define __ZMQ_TCP_LISTENER_HPP_INCLUDED__
#include "fd.hpp"
#include "own.hpp"
#include "stdint.hpp"
#include "io_object.hpp"
#include "tcp_address.hpp"
#include "stream_listener_base.hpp"
namespace zmq
{
class io_thread_t;
class socket_base_t;
class tcp_listener_t : public own_t, public io_object_t
class tcp_listener_t : public stream_listener_base_t
{
public:
tcp_listener_t (zmq::io_thread_t *io_thread_,
zmq::socket_base_t *socket_,
const options_t &options_);
~tcp_listener_t ();
// Set address to listen on.
int set_address (const char *addr_);
......@@ -56,16 +50,9 @@ class tcp_listener_t : public own_t, public io_object_t
int get_address (std::string &addr_);
private:
// Handlers for incoming commands.
void process_plug ();
void process_term (int linger_);
// Handlers for I/O events.
void in_event ();
// Close the listening socket.
void close ();
// Accept the new connection. Returns the file descriptor of the
// newly created connection. The function may return retired_fd
// if the connection was dropped while waiting in the listen backlog
......@@ -75,18 +62,6 @@ class tcp_listener_t : public own_t, public io_object_t
// Address to listen on.
tcp_address_t _address;
// Underlying socket.
fd_t _s;
// Handle corresponding to the listening socket.
handle_t _handle;
// Socket the listener belongs to.
zmq::socket_base_t *_socket;
// String representation of endpoint to bind to
std::string _endpoint;
tcp_listener_t (const tcp_listener_t &);
const tcp_listener_t &operator= (const tcp_listener_t &);
};
......
......@@ -56,70 +56,18 @@
zmq::tipc_connecter_t::tipc_connecter_t (class io_thread_t *io_thread_,
class session_base_t *session_,
const options_t &options_,
const address_t *addr_,
address_t *addr_,
bool delayed_start_) :
own_t (io_thread_, options_),
io_object_t (io_thread_),
addr (addr_),
s (retired_fd),
handle_valid (false),
delayed_start (delayed_start_),
timer_started (false),
session (session_),
current_reconnect_ivl (options.reconnect_ivl)
stream_connecter_base_t (
io_thread_, session_, options_, addr_, delayed_start_)
{
zmq_assert (addr);
zmq_assert (addr->protocol == "tipc");
addr->to_string (endpoint);
socket = session->get_socket ();
}
zmq::tipc_connecter_t::~tipc_connecter_t ()
{
zmq_assert (!timer_started);
zmq_assert (!handle_valid);
zmq_assert (s == retired_fd);
}
void zmq::tipc_connecter_t::process_plug ()
{
if (delayed_start)
add_reconnect_timer ();
else
start_connecting ();
}
void zmq::tipc_connecter_t::process_term (int linger_)
{
if (timer_started) {
cancel_timer (reconnect_timer_id);
timer_started = false;
}
if (handle_valid) {
rm_fd (handle);
handle_valid = false;
}
if (s != retired_fd)
close ();
own_t::process_term (linger_);
}
void zmq::tipc_connecter_t::in_event ()
{
// We are not polling for incoming data, so we are actually called
// because of error here. However, we can get error on out event as well
// on some platforms, so we'll simply handle both events in the same way.
out_event ();
zmq_assert (_addr->protocol == "tipc");
}
void zmq::tipc_connecter_t::out_event ()
{
fd_t fd = connect ();
rm_fd (handle);
handle_valid = false;
rm_handle ();
// Handle the error condition by attempt to reconnect.
if (fd == retired_fd) {
......@@ -127,25 +75,8 @@ void zmq::tipc_connecter_t::out_event ()
add_reconnect_timer ();
return;
}
// Create the engine object for this connection.
stream_engine_t *engine =
new (std::nothrow) stream_engine_t (fd, options, endpoint);
alloc_assert (engine);
// Attach the engine to the corresponding session object.
send_attach (session, engine);
// Shut the connecter down.
terminate ();
socket->event_connected (endpoint, fd);
}
void zmq::tipc_connecter_t::timer_event (int id_)
{
zmq_assert (id_ == reconnect_timer_id);
timer_started = false;
start_connecting ();
create_engine (fd);
}
void zmq::tipc_connecter_t::start_connecting ()
......@@ -155,79 +86,48 @@ void zmq::tipc_connecter_t::start_connecting ()
// Connect may succeed in synchronous manner.
if (rc == 0) {
handle = add_fd (s);
handle_valid = true;
_handle = add_fd (_s);
out_event ();
}
// Connection establishment may be delayed. Poll for its completion.
else if (rc == -1 && errno == EINPROGRESS) {
handle = add_fd (s);
handle_valid = true;
set_pollout (handle);
socket->event_connect_delayed (endpoint, zmq_errno ());
_handle = add_fd (_s);
set_pollout (_handle);
_socket->event_connect_delayed (_endpoint, zmq_errno ());
}
// Handle any other error condition by eventual reconnect.
else {
if (s != retired_fd)
if (_s != retired_fd)
close ();
add_reconnect_timer ();
}
}
void zmq::tipc_connecter_t::add_reconnect_timer ()
{
if (options.reconnect_ivl != -1) {
int rc_ivl = get_new_reconnect_ivl ();
add_timer (rc_ivl, reconnect_timer_id);
socket->event_connect_retried (endpoint, rc_ivl);
timer_started = true;
}
}
int zmq::tipc_connecter_t::get_new_reconnect_ivl ()
{
// The new interval is the current interval + random value.
int this_interval =
current_reconnect_ivl + (generate_random () % options.reconnect_ivl);
// Only change the current reconnect interval if the maximum reconnect
// interval was set and if it's larger than the reconnect interval.
if (options.reconnect_ivl_max > 0
&& options.reconnect_ivl_max > options.reconnect_ivl) {
// Calculate the next interval
current_reconnect_ivl = current_reconnect_ivl * 2;
if (current_reconnect_ivl >= options.reconnect_ivl_max) {
current_reconnect_ivl = options.reconnect_ivl_max;
}
}
return this_interval;
}
int zmq::tipc_connecter_t::open ()
{
zmq_assert (s == retired_fd);
zmq_assert (_s == retired_fd);
// Cannot connect to random tipc addresses
if (addr->resolved.tipc_addr->is_random ()) {
if (_addr->resolved.tipc_addr->is_random ()) {
errno = EINVAL;
return -1;
}
// Create the socket.
s = open_socket (AF_TIPC, SOCK_STREAM, 0);
if (s == -1)
_s = open_socket (AF_TIPC, SOCK_STREAM, 0);
if (_s == -1)
return -1;
// Set the non-blocking flag.
unblock_socket (s);
unblock_socket (_s);
// Connect to the remote peer.
#ifdef ZMQ_HAVE_VXWORKS
int rc = ::connect (s, (sockaddr *) addr->resolved.tipc_addr->addr (),
addr->resolved.tipc_addr->addrlen ());
#else
int rc = ::connect (s, addr->resolved.tipc_addr->addr (),
addr->resolved.tipc_addr->addrlen ());
int rc = ::connect (_s, _addr->resolved.tipc_addr->addr (),
_addr->resolved.tipc_addr->addrlen ());
#endif
// Connect was successful immediately.
if (rc == 0)
......@@ -243,15 +143,6 @@ int zmq::tipc_connecter_t::open ()
return -1;
}
void zmq::tipc_connecter_t::close ()
{
zmq_assert (s != retired_fd);
int rc = ::close (s);
errno_assert (rc == 0);
socket->event_closed (endpoint, s);
s = retired_fd;
}
zmq::fd_t zmq::tipc_connecter_t::connect ()
{
// Following code should handle both Berkeley-derived socket
......@@ -262,7 +153,7 @@ zmq::fd_t zmq::tipc_connecter_t::connect ()
#else
socklen_t len = sizeof (err);
#endif
int rc = getsockopt (s, SOL_SOCKET, SO_ERROR, (char *) &err, &len);
int rc = getsockopt (_s, SOL_SOCKET, SO_ERROR, (char *) &err, &len);
if (rc == -1)
err = errno;
if (err != 0) {
......@@ -275,8 +166,8 @@ zmq::fd_t zmq::tipc_connecter_t::connect ()
return retired_fd;
}
fd_t result = s;
s = retired_fd;
fd_t result = _s;
_s = retired_fd;
return result;
}
......
......@@ -35,17 +35,11 @@
#if defined ZMQ_HAVE_TIPC
#include "fd.hpp"
#include "own.hpp"
#include "stdint.hpp"
#include "io_object.hpp"
#include "stream_connecter_base.hpp"
namespace zmq
{
class io_thread_t;
class session_base_t;
struct address_t;
class tipc_connecter_t : public own_t, public io_object_t
class tipc_connecter_t : public stream_connecter_base_t
{
public:
// If 'delayed_start' is true connecter first waits for a while,
......@@ -53,75 +47,20 @@ class tipc_connecter_t : public own_t, public io_object_t
tipc_connecter_t (zmq::io_thread_t *io_thread_,
zmq::session_base_t *session_,
const options_t &options_,
const address_t *addr_,
address_t *addr_,
bool delayed_start_);
~tipc_connecter_t ();
private:
// ID of the timer used to delay the reconnection.
enum
{
reconnect_timer_id = 1
};
// Handlers for incoming commands.
void process_plug ();
void process_term (int linger_);
// Handlers for I/O events.
void in_event ();
void out_event ();
void timer_event (int id_);
// Internal function to start the actual connection establishment.
void start_connecting ();
// Internal function to add a reconnect timer
void add_reconnect_timer ();
// Close the connecting socket.
void close ();
// Get the file descriptor of newly created connection. Returns
// retired_fd if the connection was unsuccessful.
fd_t connect ();
// Address to connect to. Owned by session_base_t.
const address_t *addr;
// Underlying socket.
fd_t s;
// Handle corresponding to the listening socket.
handle_t handle;
// If true file descriptor is registered with the poller and 'handle'
// contains valid value.
bool handle_valid;
// If true, connecter is waiting a while before trying to connect.
const bool delayed_start;
// True iff a timer has been started.
bool timer_started;
// Reference to the session we belong to.
zmq::session_base_t *session;
// Current reconnect ivl, updated for backoff strategy
int current_reconnect_ivl;
// String representation of endpoint to connect to
std::string endpoint;
// Socket
zmq::socket_base_t *socket;
// Internal function to return a reconnect backoff delay.
// Will modify the current_reconnect_ivl used for next call
// Returns the currently used interval
int get_new_reconnect_ivl ();
// Open IPC connecting socket. Returns -1 in case of error,
// 0 if connect was successful immediately. Returns -1 with
// EAGAIN errno if async connect was launched.
......
......@@ -37,10 +37,8 @@
#include <string.h>
#include "stream_engine.hpp"
#include "tipc_address.hpp"
#include "io_thread.hpp"
#include "session_base.hpp"
#include "config.hpp"
#include "err.hpp"
#include "ip.hpp"
......@@ -59,32 +57,10 @@
zmq::tipc_listener_t::tipc_listener_t (io_thread_t *io_thread_,
socket_base_t *socket_,
const options_t &options_) :
own_t (io_thread_, options_),
io_object_t (io_thread_),
s (retired_fd),
socket (socket_)
stream_listener_base_t (io_thread_, socket_, options_)
{
}
zmq::tipc_listener_t::~tipc_listener_t ()
{
zmq_assert (s == retired_fd);
}
void zmq::tipc_listener_t::process_plug ()
{
// Start polling for incoming connections.
handle = add_fd (s);
set_pollin (handle);
}
void zmq::tipc_listener_t::process_term (int linger_)
{
rm_fd (handle);
close ();
own_t::process_term (linger_);
}
void zmq::tipc_listener_t::in_event ()
{
fd_t fd = accept ();
......@@ -92,43 +68,21 @@ void zmq::tipc_listener_t::in_event ()
// If connection was reset by the peer in the meantime, just ignore it.
// TODO: Handle specific errors like ENFILE/EMFILE etc.
if (fd == retired_fd) {
socket->event_accept_failed (endpoint, zmq_errno ());
_socket->event_accept_failed (_endpoint, zmq_errno ());
return;
}
// Create the engine object for this connection.
stream_engine_t *engine =
new (std::nothrow) stream_engine_t (fd, options, endpoint);
alloc_assert (engine);
// Choose I/O thread to run connecter in. Given that we are already
// running in an I/O thread, there must be at least one available.
io_thread_t *io_thread = choose_io_thread (options.affinity);
zmq_assert (io_thread);
// Create and launch a session object.
session_base_t *session =
session_base_t::create (io_thread, false, socket, options, NULL);
errno_assert (session);
session->inc_seqnum ();
launch_child (session);
send_attach (session, engine, false);
socket->event_accepted (endpoint, fd);
create_engine (fd);
}
int zmq::tipc_listener_t::get_address (std::string &addr_)
{
struct sockaddr_storage ss;
socklen_t sl = sizeof (ss);
#ifdef ZMQ_HAVE_VXWORKS
int rc = getsockname (s, (sockaddr *) &ss, (int *) &sl);
#else
int rc = getsockname (s, (sockaddr *) &ss, &sl);
#endif
if (rc != 0) {
const zmq_socklen_t sl = get_socket_address (&ss);
if (!sl) {
addr_.clear ();
return rc;
return -1;
}
tipc_address_t addr ((struct sockaddr *) &ss, sl);
......@@ -138,57 +92,52 @@ int zmq::tipc_listener_t::get_address (std::string &addr_)
int zmq::tipc_listener_t::set_address (const char *addr_)
{
// Convert str to address struct
int rc = address.resolve (addr_);
int rc = _address.resolve (addr_);
if (rc != 0)
return -1;
// Cannot bind non-random Port Identity
struct sockaddr_tipc *a = (sockaddr_tipc *) address.addr ();
if (!address.is_random () && a->addrtype == TIPC_ADDR_ID) {
struct sockaddr_tipc *a = (sockaddr_tipc *) _address.addr ();
if (!_address.is_random () && a->addrtype == TIPC_ADDR_ID) {
errno = EINVAL;
return -1;
}
// Create a listening socket.
s = open_socket (AF_TIPC, SOCK_STREAM, 0);
if (s == -1)
_s = open_socket (AF_TIPC, SOCK_STREAM, 0);
if (_s == -1)
return -1;
// If random Port Identity, update address object to reflect the assigned address
if (address.is_random ()) {
if (_address.is_random ()) {
struct sockaddr_storage ss;
#ifdef ZMQ_HAVE_VXWORKS
int sl = sizeof (ss);
#else
socklen_t sl = sizeof (ss);
#endif
int rc = getsockname (s, (sockaddr *) &ss, &sl);
if (rc != 0)
const zmq_socklen_t sl = get_socket_address (&ss);
if (sl == 0)
goto error;
address = tipc_address_t ((struct sockaddr *) &ss, sl);
_address = tipc_address_t ((struct sockaddr *) &ss, sl);
}
address.to_string (endpoint);
_address.to_string (_endpoint);
// Bind the socket to tipc name
if (address.is_service ()) {
if (_address.is_service ()) {
#ifdef ZMQ_HAVE_VXWORKS
rc = bind (s, (sockaddr *) address.addr (), address.addrlen ());
rc = bind (_s, (sockaddr *) address.addr (), address.addrlen ());
#else
rc = bind (s, address.addr (), address.addrlen ());
rc = bind (_s, _address.addr (), _address.addrlen ());
#endif
if (rc != 0)
goto error;
}
// Listen for incoming connections.
rc = listen (s, options.backlog);
rc = listen (_s, options.backlog);
if (rc != 0)
goto error;
socket->event_listening (endpoint, s);
_socket->event_listening (_endpoint, _s);
return 0;
error:
......@@ -198,15 +147,6 @@ error:
return -1;
}
void zmq::tipc_listener_t::close ()
{
zmq_assert (s != retired_fd);
int rc = ::close (s);
errno_assert (rc == 0);
s = retired_fd;
socket->event_closed (endpoint, s);
}
zmq::fd_t zmq::tipc_listener_t::accept ()
{
// Accept one connection and deal with different failure modes.
......@@ -215,11 +155,11 @@ zmq::fd_t zmq::tipc_listener_t::accept ()
struct sockaddr_storage ss = {};
socklen_t ss_len = sizeof (ss);
zmq_assert (s != retired_fd);
zmq_assert (_s != retired_fd);
#ifdef ZMQ_HAVE_VXWORKS
fd_t sock = ::accept (s, (struct sockaddr *) &ss, (int *) &ss_len);
fd_t sock = ::accept (_s, (struct sockaddr *) &ss, (int *) &ss_len);
#else
fd_t sock = ::accept (s, (struct sockaddr *) &ss, &ss_len);
fd_t sock = ::accept (_s, (struct sockaddr *) &ss, &ss_len);
#endif
if (sock == -1) {
errno_assert (errno == EAGAIN || errno == EWOULDBLOCK
......
......@@ -37,23 +37,17 @@
#include <string>
#include "fd.hpp"
#include "own.hpp"
#include "stdint.hpp"
#include "io_object.hpp"
#include "stream_listener_base.hpp"
#include "tipc_address.hpp"
namespace zmq
{
class io_thread_t;
class socket_base_t;
class tipc_listener_t : public own_t, public io_object_t
class tipc_listener_t : public stream_listener_base_t
{
public:
tipc_listener_t (zmq::io_thread_t *io_thread_,
zmq::socket_base_t *socket_,
const options_t &options_);
~tipc_listener_t ();
// Set address to listen on.
int set_address (const char *addr_);
......@@ -62,36 +56,16 @@ class tipc_listener_t : public own_t, public io_object_t
int get_address (std::string &addr_);
private:
// Handlers for incoming commands.
void process_plug ();
void process_term (int linger_);
// Handlers for I/O events.
void in_event ();
// Close the listening socket.
void close ();
// Accept the new connection. Returns the file descriptor of the
// newly created connection. The function may return retired_fd
// if the connection was dropped while waiting in the listen backlog.
fd_t accept ();
// Address to listen on
tipc_address_t address;
// Underlying socket.
fd_t s;
// Handle corresponding to the listening socket.
handle_t handle;
// Socket the listener belongs to.
zmq::socket_base_t *socket;
// String representation of endpoint to bind to
std::string endpoint;
tipc_address_t _address;
tipc_listener_t (const tipc_listener_t &);
const tipc_listener_t &operator= (const tipc_listener_t &);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment