tcp_listener.cpp 10.7 KB
Newer Older
Martin Sustrik's avatar
Martin Sustrik committed
1
/*
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file

    This file is part of libzmq, the ZeroMQ core engine in C++.

    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include "precompiled.hpp"
31 32
#include <new>

33
#include <string>
34
#include <stdio.h>
35

36
#include "tcp_listener.hpp"
37
#include "stream_engine.hpp"
38
#include "io_thread.hpp"
39
#include "session_base.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
40 41
#include "config.hpp"
#include "err.hpp"
42
#include "ip.hpp"
43
#include "tcp.hpp"
44
#include "socket_base.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
45

46
#ifndef ZMQ_HAVE_WINDOWS
47 48 49 50 51 52 53 54
#include <unistd.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/tcp.h>
#include <netinet/in.h>
#include <netdb.h>
#include <fcntl.h>
#endif
55 56 57

#ifdef ZMQ_HAVE_OPENVMS
#include <ioctl.h>
58
#endif
Martin Sustrik's avatar
Martin Sustrik committed
59

60
//  Builds a listener that is not yet associated with any OS socket.
//  The descriptor starts out as retired_fd and the poller handle as NULL;
//  'socket_' is remembered so that listener events can be reported to it.
zmq::tcp_listener_t::tcp_listener_t (io_thread_t *io_thread_,
                                     socket_base_t *socket_,
                                     const options_t &options_) :
    own_t (io_thread_, options_),
    io_object_t (io_thread_),
    s (retired_fd),
    handle ((handle_t) NULL),
    socket (socket_)
{
    //  Nothing to do here; the underlying socket is set up by set_address ().
}

//  Destruction is only legal once the listener has been shut down: the
//  descriptor has to be retired and the poller handle cleared, which is
//  the state process_term () leaves behind.
zmq::tcp_listener_t::~tcp_listener_t ()
{
    zmq_assert (s == retired_fd);
    zmq_assert (!handle);
}

77 78 79 80 81 82 83 84 85 86
//  Hooks the listening descriptor into the poller.
void zmq::tcp_listener_t::process_plug ()
{
    //  Register the descriptor and ask for poll-in notifications on it,
    //  so that in_event () gets a chance to accept incoming connections.
    handle = add_fd (s);
    set_pollin (handle);
}

void zmq::tcp_listener_t::process_term (int linger_)
{
    rm_fd (handle);
87
    handle = (handle_t) NULL;
88
    close ();
89 90 91 92 93 94 95 96 97
    own_t::process_term (linger_);
}

void zmq::tcp_listener_t::in_event ()
{
    fd_t fd = accept ();

    //  If connection was reset by the peer in the meantime, just ignore it.
    //  TODO: Handle specific errors like ENFILE/EMFILE etc.
98
    if (fd == retired_fd) {
99
        socket->event_accept_failed (endpoint, zmq_errno ());
100
        return;
101
    }
102

103
    int rc = tune_tcp_socket (fd);
104 105 106 107
    rc = rc
         | tune_tcp_keepalives (
             fd, options.tcp_keepalive, options.tcp_keepalive_cnt,
             options.tcp_keepalive_idle, options.tcp_keepalive_intvl);
108 109
    rc = rc | tune_tcp_maxrt (fd, options.tcp_maxrt);
    if (rc != 0) {
110
        socket->event_accept_failed (endpoint, zmq_errno ());
111 112
        return;
    }
113

114
    //  Create the engine object for this connection.
115 116
    stream_engine_t *engine =
      new (std::nothrow) stream_engine_t (fd, options, endpoint);
117 118 119 120 121 122 123
    alloc_assert (engine);

    //  Choose I/O thread to run connecter in. Given that we are already
    //  running in an I/O thread, there must be at least one available.
    io_thread_t *io_thread = choose_io_thread (options.affinity);
    zmq_assert (io_thread);

124
    //  Create and launch a session object.
125 126
    session_base_t *session =
      session_base_t::create (io_thread, false, socket, options, NULL);
127
    errno_assert (session);
128 129 130
    session->inc_seqnum ();
    launch_child (session);
    send_attach (session, engine, false);
131
    socket->event_accepted (endpoint, (int) fd);
132 133
}

134 135 136
void zmq::tcp_listener_t::close ()
{
    zmq_assert (s != retired_fd);
137
#ifdef ZMQ_HAVE_WINDOWS
138 139 140 141 142 143
    int rc = closesocket (s);
    wsa_assert (rc != SOCKET_ERROR);
#else
    int rc = ::close (s);
    errno_assert (rc == 0);
#endif
144
    socket->event_closed (endpoint, (int) s);
145 146
    s = retired_fd;
}
147

148
int zmq::tcp_listener_t::get_address (std::string &addr_)
149
{
150
    // Get the details of the TCP socket
151
    struct sockaddr_storage ss;
AJ Lewis's avatar
AJ Lewis committed
152 153 154
#ifdef ZMQ_HAVE_HPUX
    int sl = sizeof (ss);
#else
155
    socklen_t sl = sizeof (ss);
AJ Lewis's avatar
AJ Lewis committed
156
#endif
157
    int rc = getsockname (s, (struct sockaddr *) &ss, &sl);
158

159
    if (rc != 0) {
160
        addr_.clear ();
161 162
        return rc;
    }
163

164 165
    tcp_address_t addr ((struct sockaddr *) &ss, sl);
    return addr.to_string (addr_);
166 167
}

168
int zmq::tcp_listener_t::set_address (const char *addr_)
169
{
170
    //  Convert the textual address into address structure.
Pieter Hintjens's avatar
Pieter Hintjens committed
171
    int rc = address.resolve (addr_, true, options.ipv6);
172
    if (rc != 0)
173
        return -1;
174

175 176
    address.to_string (endpoint);

177 178
    if (options.use_fd != -1) {
        s = options.use_fd;
179 180 181 182
        socket->event_listening (endpoint, (int) s);
        return 0;
    }

183
    //  Create a listening socket.
184
    s = open_socket (address.family (), SOCK_STREAM, IPPROTO_TCP);
185 186

    //  IPv6 address family not supported, try automatic downgrade to IPv4.
187
    if (s == zmq::retired_fd && address.family () == AF_INET6
188
        && errno == EAFNOSUPPORT && options.ipv6) {
189
        rc = address.resolve (addr_, true, false);
190 191
        if (rc != 0)
            return rc;
192
        s = open_socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
193 194
    }

195
#ifdef ZMQ_HAVE_WINDOWS
196
    if (s == INVALID_SOCKET) {
197
        errno = wsa_error_to_errno (WSAGetLastError ());
198 199
        return -1;
    }
200
#if !defined _WIN32_WCE && !defined ZMQ_HAVE_WINDOWS_UWP
201 202 203
    //  On Windows, preventing sockets to be inherited by child processes.
    BOOL brc = SetHandleInformation ((HANDLE) s, HANDLE_FLAG_INHERIT, 0);
    win_assert (brc);
204
#endif
205 206 207 208
#else
    if (s == -1)
        return -1;
#endif
209

210 211
    //  On some systems, IPv4 mapping in IPv6 sockets is disabled by default.
    //  Switch it on in such cases.
212
    if (address.family () == AF_INET6)
213
        enable_ipv4_mapping (s);
214

215 216 217 218
    // Set the IP Type-Of-Service for the underlying socket
    if (options.tos != 0)
        set_ip_type_of_service (s, options.tos);

219 220 221 222
    // Set the socket to loopback fastpath if configured.
    if (options.loopback_fastpath)
        tcp_tune_loopback_fast_path (s);

223 224 225 226
    // Bind the socket to a device if applicable
    if (!options.bound_device.empty ())
        bind_to_device (s, options.bound_device);

227
    //  Set the socket buffer limits for the underlying socket.
228
    if (options.sndbuf >= 0)
229
        set_tcp_send_buffer (s, options.sndbuf);
230
    if (options.rcvbuf >= 0)
231
        set_tcp_receive_buffer (s, options.rcvbuf);
232

233 234
    //  Allow reusing of the address.
    int flag = 1;
235
#ifdef ZMQ_HAVE_WINDOWS
236 237
    rc = setsockopt (s, SOL_SOCKET, SO_EXCLUSIVEADDRUSE, (const char *) &flag,
                     sizeof (int));
238
    wsa_assert (rc != SOCKET_ERROR);
239 240 241 242
#else
    rc = setsockopt (s, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof (int));
    errno_assert (rc == 0);
#endif
243 244

    //  Bind the socket to the network interface and port.
245
    rc = bind (s, address.addr (), address.addrlen ());
246
#ifdef ZMQ_HAVE_WINDOWS
247
    if (rc == SOCKET_ERROR) {
248
        errno = wsa_error_to_errno (WSAGetLastError ());
249
        goto error;
250
    }
Martin Sustrik's avatar
Martin Sustrik committed
251
#else
252
    if (rc != 0)
253
        goto error;
254
#endif
255

256
    //  Listen for incoming connections.
257
    rc = listen (s, options.backlog);
258 259
#ifdef ZMQ_HAVE_WINDOWS
    if (rc == SOCKET_ERROR) {
260
        errno = wsa_error_to_errno (WSAGetLastError ());
261
        goto error;
262
    }
263
#else
Martin Sustrik's avatar
Martin Sustrik committed
264
    if (rc != 0)
265
        goto error;
Brett Cameron's avatar
Brett Cameron committed
266
#endif
267

268
    socket->event_listening (endpoint, (int) s);
Martin Sustrik's avatar
Martin Sustrik committed
269
    return 0;
270 271 272 273 274 275

error:
    int err = errno;
    close ();
    errno = err;
    return -1;
Martin Sustrik's avatar
Martin Sustrik committed
276 277
}

Martin Sustrik's avatar
Martin Sustrik committed
278
//  Accepts one pending connection on the listening socket.
//  Returns the new descriptor, or retired_fd when no connection could be
//  accepted (only the error codes asserted below are tolerated), when the
//  peer address matches none of the configured accept filters, or when
//  set_nosigpipe () reports a failure. In the latter two cases the accepted
//  socket is closed before returning.
zmq::fd_t zmq::tcp_listener_t::accept ()
{
    //  Running out of resources while accepting is considered a valid
    //  situation; it is handled by simply not accepting the connection.
    zmq_assert (s != retired_fd);

    struct sockaddr_storage peer;
    memset (&peer, 0, sizeof (peer));
#ifdef ZMQ_HAVE_HPUX
    int peer_len = sizeof (peer);
#else
    socklen_t peer_len = sizeof (peer);
#endif
#if defined ZMQ_HAVE_SOCK_CLOEXEC && defined HAVE_ACCEPT4
    fd_t sock =
      ::accept4 (s, (struct sockaddr *) &peer, &peer_len, SOCK_CLOEXEC);
#else
    fd_t sock = ::accept (s, (struct sockaddr *) &peer, &peer_len);
#endif

#ifdef ZMQ_HAVE_WINDOWS
    if (sock == INVALID_SOCKET) {
        const int last_error = WSAGetLastError ();
        wsa_assert (last_error == WSAEWOULDBLOCK || last_error == WSAECONNRESET
                    || last_error == WSAEMFILE || last_error == WSAENOBUFS);
        return retired_fd;
    }
#if !defined _WIN32_WCE && !defined ZMQ_HAVE_WINDOWS_UWP
    //  On Windows, prevent the socket from being inherited by child
    //  processes.
    BOOL brc = SetHandleInformation ((HANDLE) sock, HANDLE_FLAG_INHERIT, 0);
    win_assert (brc);
#endif
#else
    if (sock == -1) {
        errno_assert (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR
                      || errno == ECONNABORTED || errno == EPROTO
                      || errno == ENOBUFS || errno == ENOMEM || errno == EMFILE
                      || errno == ENFILE);
        return retired_fd;
    }
#endif

#if (!defined ZMQ_HAVE_SOCK_CLOEXEC || !defined HAVE_ACCEPT4)                  \
  && defined FD_CLOEXEC
    //  A race condition can cause the socket not to be closed (if fork
    //  happens between accept and this point).
    int cloexec_rc = fcntl (sock, F_SETFD, FD_CLOEXEC);
    errno_assert (cloexec_rc != -1);
#endif

    //  Work out whether the connection has to be dropped. When accept
    //  filters are configured, the peer address must match at least one.
    bool drop = false;
    if (!options.tcp_accept_filters.empty ()) {
        bool matched = false;
        const options_t::tcp_accept_filters_t::size_type filter_count =
          options.tcp_accept_filters.size ();
        for (options_t::tcp_accept_filters_t::size_type idx = 0;
             !matched && idx != filter_count; ++idx) {
            matched = options.tcp_accept_filters[idx].match_address (
              (struct sockaddr *) &peer, peer_len);
        }
        drop = !matched;
    }

    //  A connection that passed the filters is still dropped when
    //  set_nosigpipe () returns a non-zero result.
    if (!drop && zmq::set_nosigpipe (sock))
        drop = true;

    if (drop) {
#ifdef ZMQ_HAVE_WINDOWS
        int close_rc = closesocket (sock);
        wsa_assert (close_rc != SOCKET_ERROR);
#else
        int close_rc = ::close (sock);
        errno_assert (close_rc == 0);
#endif
        return retired_fd;
    }

    //  Set the IP Type-Of-Service priority for this client socket.
    if (options.tos != 0)
        set_ip_type_of_service (sock, options.tos);

    return sock;
}