tcp_listener.cpp 10.3 KB
Newer Older
Martin Sustrik's avatar
Martin Sustrik committed
1
/*
2
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
Martin Sustrik's avatar
Martin Sustrik committed
3

4
    This file is part of libzmq, the ZeroMQ core engine in C++.
Martin Sustrik's avatar
Martin Sustrik committed
5

6 7 8
    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
Martin Sustrik's avatar
Martin Sustrik committed
9 10
    (at your option) any later version.

11 12 13 14 15 16 17 18 19 20 21 22 23 24
    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.
Martin Sustrik's avatar
Martin Sustrik committed
25

26
    You should have received a copy of the GNU Lesser General Public License
Martin Sustrik's avatar
Martin Sustrik committed
27 28 29
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include "precompiled.hpp"
31 32
#include <new>

33
#include <string>
34
#include <stdio.h>
35

36
#include "tcp_listener.hpp"
37
#include "stream_engine.hpp"
38
#include "io_thread.hpp"
39
#include "session_base.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
40 41
#include "config.hpp"
#include "err.hpp"
42
#include "ip.hpp"
43
#include "tcp.hpp"
44
#include "socket_base.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
45

46
#ifndef ZMQ_HAVE_WINDOWS
47 48 49 50 51 52 53 54
#include <unistd.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/tcp.h>
#include <netinet/in.h>
#include <netdb.h>
#include <fcntl.h>
#endif
55 56 57

#ifdef ZMQ_HAVE_OPENVMS
#include <ioctl.h>
58
#endif
Martin Sustrik's avatar
Martin Sustrik committed
59

60 61 62 63 64
zmq::tcp_listener_t::tcp_listener_t (io_thread_t *io_thread_,
      socket_base_t *socket_, const options_t &options_) :
    own_t (io_thread_, options_),
    io_object_t (io_thread_),
    s (retired_fd),
65
    handle((handle_t)NULL),
66
    socket (socket_)
67 68 69 70 71
{
}

zmq::tcp_listener_t::~tcp_listener_t ()
{
72
    zmq_assert (s == retired_fd);
73 74
}

75 76 77 78 79 80 81 82 83 84
void zmq::tcp_listener_t::process_plug ()
{
    //  Start polling for incoming connections.
    handle = add_fd (s);
    set_pollin (handle);
}

void zmq::tcp_listener_t::process_term (int linger_)
{
    rm_fd (handle);
85
    close ();
86 87 88 89 90 91 92 93 94
    own_t::process_term (linger_);
}

void zmq::tcp_listener_t::in_event ()
{
    fd_t fd = accept ();

    //  If connection was reset by the peer in the meantime, just ignore it.
    //  TODO: Handle specific errors like ENFILE/EMFILE etc.
95
    if (fd == retired_fd) {
96
        socket->event_accept_failed (endpoint, zmq_errno());
97
        return;
98
    }
99

100 101 102 103 104 105 106 107
    int rc = tune_tcp_socket (fd);
    rc = rc | tune_tcp_keepalives (fd, options.tcp_keepalive, options.tcp_keepalive_cnt,
        options.tcp_keepalive_idle, options.tcp_keepalive_intvl);
    rc = rc | tune_tcp_maxrt (fd, options.tcp_maxrt);
    if (rc != 0) {
        socket->event_accept_failed (endpoint, zmq_errno());
        return;
    }
108

109
    //  Create the engine object for this connection.
110
    stream_engine_t *engine = new (std::nothrow)
111
        stream_engine_t (fd, options, endpoint);
112 113 114 115 116 117 118
    alloc_assert (engine);

    //  Choose I/O thread to run connecter in. Given that we are already
    //  running in an I/O thread, there must be at least one available.
    io_thread_t *io_thread = choose_io_thread (options.affinity);
    zmq_assert (io_thread);

119
    //  Create and launch a session object.
120
    session_base_t *session = session_base_t::create (io_thread, false, socket,
121
        options, NULL);
122
    errno_assert (session);
123 124 125
    session->inc_seqnum ();
    launch_child (session);
    send_attach (session, engine, false);
126
    socket->event_accepted (endpoint, (int) fd);
127 128
}

129 130 131
void zmq::tcp_listener_t::close ()
{
    zmq_assert (s != retired_fd);
132
#ifdef ZMQ_HAVE_WINDOWS
133 134 135 136 137 138
    int rc = closesocket (s);
    wsa_assert (rc != SOCKET_ERROR);
#else
    int rc = ::close (s);
    errno_assert (rc == 0);
#endif
139
    socket->event_closed (endpoint, (int) s);
140 141
    s = retired_fd;
}
142

143
int zmq::tcp_listener_t::get_address (std::string &addr_)
144
{
145
    // Get the details of the TCP socket
146
    struct sockaddr_storage ss;
AJ Lewis's avatar
AJ Lewis committed
147 148 149
#ifdef ZMQ_HAVE_HPUX
    int sl = sizeof (ss);
#else
150
    socklen_t sl = sizeof (ss);
AJ Lewis's avatar
AJ Lewis committed
151
#endif
152
    int rc = getsockname (s, (struct sockaddr *) &ss, &sl);
153

154
    if (rc != 0) {
155
        addr_.clear ();
156 157
        return rc;
    }
158

159 160
    tcp_address_t addr ((struct sockaddr *) &ss, sl);
    return addr.to_string (addr_);
161 162
}

163
int zmq::tcp_listener_t::set_address (const char *addr_)
164
{
165
    //  Convert the textual address into address structure.
Pieter Hintjens's avatar
Pieter Hintjens committed
166
    int rc = address.resolve (addr_, true, options.ipv6);
167
    if (rc != 0)
168
        return -1;
169

170 171
    address.to_string (endpoint);

172 173
    if (options.use_fd != -1) {
        s = options.use_fd;
174 175 176 177
        socket->event_listening (endpoint, (int) s);
        return 0;
    }

178
    //  Create a listening socket.
179
    s = open_socket (address.family (), SOCK_STREAM, IPPROTO_TCP);
180 181

    //  IPv6 address family not supported, try automatic downgrade to IPv4.
182
    if (s == zmq::retired_fd && address.family () == AF_INET6
Pieter Hintjens's avatar
Pieter Hintjens committed
183 184
    && errno == EAFNOSUPPORT
    && options.ipv6) {
185
        rc = address.resolve (addr_, true, false);
186 187
        if (rc != 0)
            return rc;
188
        s = open_socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
189 190
    }

191
#ifdef ZMQ_HAVE_WINDOWS
192
    if (s == INVALID_SOCKET) {
193
        errno = wsa_error_to_errno (WSAGetLastError ());
194 195
        return -1;
    }
196
#if !defined _WIN32_WCE && !defined ZMQ_HAVE_WINDOWS_UWP
197 198 199
    //  On Windows, preventing sockets to be inherited by child processes.
    BOOL brc = SetHandleInformation ((HANDLE) s, HANDLE_FLAG_INHERIT, 0);
    win_assert (brc);
200
#endif
201 202 203 204
#else
    if (s == -1)
        return -1;
#endif
205

206 207
    //  On some systems, IPv4 mapping in IPv6 sockets is disabled by default.
    //  Switch it on in such cases.
208
    if (address.family () == AF_INET6)
209
        enable_ipv4_mapping (s);
210

211 212 213 214
    // Set the IP Type-Of-Service for the underlying socket
    if (options.tos != 0)
        set_ip_type_of_service (s, options.tos);

215 216 217 218
    // Bind the socket to a device if applicable
    if (!options.bound_device.empty ())
        bind_to_device (s, options.bound_device);

219
    //  Set the socket buffer limits for the underlying socket.
220
    if (options.sndbuf >= 0)
221
        set_tcp_send_buffer (s, options.sndbuf);
222
    if (options.rcvbuf >= 0)
223
        set_tcp_receive_buffer (s, options.rcvbuf);
224

225 226
    //  Allow reusing of the address.
    int flag = 1;
227
#ifdef ZMQ_HAVE_WINDOWS
228
    rc = setsockopt (s, SOL_SOCKET, SO_EXCLUSIVEADDRUSE,
229 230
        (const char*) &flag, sizeof (int));
    wsa_assert (rc != SOCKET_ERROR);
231 232 233 234
#else
    rc = setsockopt (s, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof (int));
    errno_assert (rc == 0);
#endif
235 236

    //  Bind the socket to the network interface and port.
237
    rc = bind (s, address.addr (), address.addrlen ());
238
#ifdef ZMQ_HAVE_WINDOWS
239
    if (rc == SOCKET_ERROR) {
240
        errno = wsa_error_to_errno (WSAGetLastError ());
241
        goto error;
242
    }
Martin Sustrik's avatar
Martin Sustrik committed
243
#else
244
    if (rc != 0)
245
        goto error;
246
#endif
247

248
    //  Listen for incoming connections.
249
    rc = listen (s, options.backlog);
250 251
#ifdef ZMQ_HAVE_WINDOWS
    if (rc == SOCKET_ERROR) {
252
        errno = wsa_error_to_errno (WSAGetLastError ());
253
        goto error;
254
    }
255
#else
Martin Sustrik's avatar
Martin Sustrik committed
256
    if (rc != 0)
257
        goto error;
Brett Cameron's avatar
Brett Cameron committed
258
#endif
259

260
    socket->event_listening (endpoint, (int) s);
Martin Sustrik's avatar
Martin Sustrik committed
261
    return 0;
262 263 264 265 266 267

error:
    int err = errno;
    close ();
    errno = err;
    return -1;
Martin Sustrik's avatar
Martin Sustrik committed
268 269
}

Martin Sustrik's avatar
Martin Sustrik committed
270
zmq::fd_t zmq::tcp_listener_t::accept ()
Martin Sustrik's avatar
Martin Sustrik committed
271
{
272 273
    //  The situation where connection cannot be accepted due to insufficient
    //  resources is considered valid and treated by ignoring the connection.
274
    //  Accept one connection and deal with different failure modes.
Martin Sustrik's avatar
Martin Sustrik committed
275
    zmq_assert (s != retired_fd);
276

277 278
    struct sockaddr_storage ss;
    memset (&ss, 0, sizeof (ss));
AJ Lewis's avatar
AJ Lewis committed
279 280 281
#ifdef ZMQ_HAVE_HPUX
    int ss_len = sizeof (ss);
#else
282
    socklen_t ss_len = sizeof (ss);
AJ Lewis's avatar
AJ Lewis committed
283
#endif
284 285 286
#if defined ZMQ_HAVE_SOCK_CLOEXEC
    fd_t sock = ::accept4 (s, (struct sockaddr *) &ss, &ss_len, SOCK_CLOEXEC);
#else
287
    fd_t sock = ::accept (s, (struct sockaddr *) &ss, &ss_len);
288
#endif
289

290
#ifdef ZMQ_HAVE_WINDOWS
291
    if (sock == INVALID_SOCKET) {
292
        const int last_error = WSAGetLastError();
293 294 295 296
        wsa_assert (last_error == WSAEWOULDBLOCK ||
            last_error == WSAECONNRESET ||
            last_error == WSAEMFILE ||
            last_error == WSAENOBUFS);
Martin Sustrik's avatar
Martin Sustrik committed
297
        return retired_fd;
298
    }
299
#if !defined _WIN32_WCE && !defined ZMQ_HAVE_WINDOWS_UWP
300 301 302
    //  On Windows, preventing sockets to be inherited by child processes.
    BOOL brc = SetHandleInformation ((HANDLE) sock, HANDLE_FLAG_INHERIT, 0);
    win_assert (brc);
303
#endif
Brett Cameron's avatar
Brett Cameron committed
304
#else
305 306
    if (sock == -1) {
        errno_assert (errno == EAGAIN || errno == EWOULDBLOCK ||
307
            errno == EINTR || errno == ECONNABORTED || errno == EPROTO ||
308 309
            errno == ENOBUFS || errno == ENOMEM || errno == EMFILE ||
            errno == ENFILE);
310 311
        return retired_fd;
    }
Brett Cameron's avatar
Brett Cameron committed
312
#endif
313

314
#if !defined ZMQ_HAVE_SOCK_CLOEXEC && defined FD_CLOEXEC
315 316 317 318 319 320
    //  Race condition can cause socket not to be closed (if fork happens
    //  between accept and this point).
    int rc = fcntl (sock, F_SETFD, FD_CLOEXEC);
    errno_assert (rc != -1);
#endif

321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340
    if (!options.tcp_accept_filters.empty ()) {
        bool matched = false;
        for (options_t::tcp_accept_filters_t::size_type i = 0; i != options.tcp_accept_filters.size (); ++i) {
            if (options.tcp_accept_filters[i].match_address ((struct sockaddr *) &ss, ss_len)) {
                matched = true;
                break;
            }
        }
        if (!matched) {
#ifdef ZMQ_HAVE_WINDOWS
            int rc = closesocket (sock);
            wsa_assert (rc != SOCKET_ERROR);
#else
            int rc = ::close (sock);
            errno_assert (rc == 0);
#endif
            return retired_fd;
        }
    }

341 342 343 344 345 346 347 348 349 350 351
    if (zmq::set_nosigpipe (sock)) {
#ifdef ZMQ_HAVE_WINDOWS
        int rc = closesocket (sock);
        wsa_assert (rc != SOCKET_ERROR);
#else
        int rc = ::close (sock);
        errno_assert (rc == 0);
#endif
        return retired_fd;
    }

352 353 354 355
    // Set the IP Type-Of-Service priority for this client socket
    if (options.tos != 0)
        set_ip_type_of_service (sock, options.tos);

Martin Sustrik's avatar
Martin Sustrik committed
356 357
    return sock;
}