/*
    Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file

    This file is part of libzmq, the ZeroMQ core engine in C++.

    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30 31
#include <new>

32
#include <string>
33
#include <stdio.h>
34

Martin Sustrik's avatar
Martin Sustrik committed
35
#include "platform.hpp"
36
#include "tcp_listener.hpp"
37
#include "stream_engine.hpp"
38
#include "io_thread.hpp"
39
#include "session_base.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
40 41
#include "config.hpp"
#include "err.hpp"
42
#include "ip.hpp"
43
#include "tcp.hpp"
44
#include "socket_base.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
45

Martin Sustrik's avatar
Martin Sustrik committed
46
#ifdef ZMQ_HAVE_WINDOWS
47 48 49 50 51 52 53 54 55 56
#include "windows.hpp"
#else
#include <unistd.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/tcp.h>
#include <netinet/in.h>
#include <netdb.h>
#include <fcntl.h>
#endif
57 58 59

#ifdef ZMQ_HAVE_OPENVMS
#include <ioctl.h>
60
#endif
Martin Sustrik's avatar
Martin Sustrik committed
61

62 63 64 65 66 67
zmq::tcp_listener_t::tcp_listener_t (io_thread_t *io_thread_,
      socket_base_t *socket_, const options_t &options_) :
    own_t (io_thread_, options_),
    io_object_t (io_thread_),
    s (retired_fd),
    socket (socket_)
68 69 70 71 72
{
}

zmq::tcp_listener_t::~tcp_listener_t ()
{
73
    zmq_assert (s == retired_fd);
74 75
}

76 77 78 79 80 81 82 83 84 85
void zmq::tcp_listener_t::process_plug ()
{
    //  Start polling for incoming connections.
    handle = add_fd (s);
    set_pollin (handle);
}

void zmq::tcp_listener_t::process_term (int linger_)
{
    rm_fd (handle);
86
    close ();
87 88 89 90 91 92 93 94 95
    own_t::process_term (linger_);
}

void zmq::tcp_listener_t::in_event ()
{
    fd_t fd = accept ();

    //  If connection was reset by the peer in the meantime, just ignore it.
    //  TODO: Handle specific errors like ENFILE/EMFILE etc.
96
    if (fd == retired_fd) {
97
        socket->event_accept_failed (endpoint, zmq_errno());
98
        return;
99
    }
100

101
    tune_tcp_socket (fd);
102
    tune_tcp_keepalives (fd, options.tcp_keepalive, options.tcp_keepalive_cnt, options.tcp_keepalive_idle, options.tcp_keepalive_intvl);
103

104 105 106
    // remember our fd for ZMQ_SRCFD in messages
    socket->set_fd(fd);

107
    //  Create the engine object for this connection.
108
    stream_engine_t *engine = new (std::nothrow)
109
        stream_engine_t (fd, options, endpoint);
110 111 112 113 114 115 116
    alloc_assert (engine);

    //  Choose I/O thread to run connecter in. Given that we are already
    //  running in an I/O thread, there must be at least one available.
    io_thread_t *io_thread = choose_io_thread (options.affinity);
    zmq_assert (io_thread);

117
    //  Create and launch a session object.
118
    session_base_t *session = session_base_t::create (io_thread, false, socket,
119
        options, NULL);
120
    errno_assert (session);
121 122 123
    session->inc_seqnum ();
    launch_child (session);
    send_attach (session, engine, false);
124
    socket->event_accepted (endpoint, (int) fd);
125 126
}

127 128 129
void zmq::tcp_listener_t::close ()
{
    zmq_assert (s != retired_fd);
130
#ifdef ZMQ_HAVE_WINDOWS
131 132 133 134 135 136
    int rc = closesocket (s);
    wsa_assert (rc != SOCKET_ERROR);
#else
    int rc = ::close (s);
    errno_assert (rc == 0);
#endif
137
    socket->event_closed (endpoint, (int) s);
138 139
    s = retired_fd;
}
140

141
int zmq::tcp_listener_t::get_address (std::string &addr_)
142
{
143
    // Get the details of the TCP socket
144
    struct sockaddr_storage ss;
AJ Lewis's avatar
AJ Lewis committed
145 146 147
#ifdef ZMQ_HAVE_HPUX
    int sl = sizeof (ss);
#else
148
    socklen_t sl = sizeof (ss);
AJ Lewis's avatar
AJ Lewis committed
149
#endif
150
    int rc = getsockname (s, (struct sockaddr *) &ss, &sl);
151

152
    if (rc != 0) {
153
        addr_.clear ();
154 155
        return rc;
    }
156

157 158
    tcp_address_t addr ((struct sockaddr *) &ss, sl);
    return addr.to_string (addr_);
159 160
}

161
int zmq::tcp_listener_t::set_address (const char *addr_)
162
{
163
    //  Convert the textual address into address structure.
Pieter Hintjens's avatar
Pieter Hintjens committed
164
    int rc = address.resolve (addr_, true, options.ipv6);
165
    if (rc != 0)
166
        return -1;
167 168

    //  Create a listening socket.
169
    s = open_socket (address.family (), SOCK_STREAM, IPPROTO_TCP);
170 171
#ifdef ZMQ_HAVE_WINDOWS
    if (s == INVALID_SOCKET)
172
        errno = wsa_error_to_errno (WSAGetLastError ());
173 174 175
#endif

    //  IPv6 address family not supported, try automatic downgrade to IPv4.
Pieter Hintjens's avatar
Pieter Hintjens committed
176 177 178
    if (address.family () == AF_INET6
    && errno == EAFNOSUPPORT
    && options.ipv6) {
179
        rc = address.resolve (addr_, true, true);
180 181
        if (rc != 0)
            return rc;
182
        s = ::socket (address.family (), SOCK_STREAM, IPPROTO_TCP);
183 184
    }

185
#ifdef ZMQ_HAVE_WINDOWS
186
    if (s == INVALID_SOCKET) {
187
        errno = wsa_error_to_errno (WSAGetLastError ());
188 189
        return -1;
    }
190
#if !defined _WIN32_WCE
191 192 193
    //  On Windows, preventing sockets to be inherited by child processes.
    BOOL brc = SetHandleInformation ((HANDLE) s, HANDLE_FLAG_INHERIT, 0);
    win_assert (brc);
194
#endif
195 196 197 198
#else
    if (s == -1)
        return -1;
#endif
199

200 201
    //  On some systems, IPv4 mapping in IPv6 sockets is disabled by default.
    //  Switch it on in such cases.
202
    if (address.family () == AF_INET6)
203
        enable_ipv4_mapping (s);
204

205 206 207 208
    // Set the IP Type-Of-Service for the underlying socket
    if (options.tos != 0)
        set_ip_type_of_service (s, options.tos);

209
    //  Set the socket buffer limits for the underlying socket.
210
    if (options.sndbuf >= 0)
211
        set_tcp_send_buffer (s, options.sndbuf);
212
    if (options.rcvbuf >= 0)
213
        set_tcp_receive_buffer (s, options.rcvbuf);
214

215 216
    //  Allow reusing of the address.
    int flag = 1;
217
#ifdef ZMQ_HAVE_WINDOWS
218
    rc = setsockopt (s, SOL_SOCKET, SO_EXCLUSIVEADDRUSE,
219 220
        (const char*) &flag, sizeof (int));
    wsa_assert (rc != SOCKET_ERROR);
221 222 223 224
#else
    rc = setsockopt (s, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof (int));
    errno_assert (rc == 0);
#endif
225

226 227
    address.to_string (endpoint);

228
    //  Bind the socket to the network interface and port.
229
    rc = bind (s, address.addr (), address.addrlen ());
230
#ifdef ZMQ_HAVE_WINDOWS
231
    if (rc == SOCKET_ERROR) {
232
        errno = wsa_error_to_errno (WSAGetLastError ());
233
        goto error;
234
    }
Martin Sustrik's avatar
Martin Sustrik committed
235
#else
236
    if (rc != 0)
237
        goto error;
238
#endif
239

240
    //  Listen for incoming connections.
241
    rc = listen (s, options.backlog);
242 243
#ifdef ZMQ_HAVE_WINDOWS
    if (rc == SOCKET_ERROR) {
244
        errno = wsa_error_to_errno (WSAGetLastError ());
245
        goto error;
246
    }
247
#else
Martin Sustrik's avatar
Martin Sustrik committed
248
    if (rc != 0)
249
        goto error;
Brett Cameron's avatar
Brett Cameron committed
250
#endif
251

252
    socket->event_listening (endpoint, (int) s);
Martin Sustrik's avatar
Martin Sustrik committed
253
    return 0;
254 255 256 257 258 259

error:
    int err = errno;
    close ();
    errno = err;
    return -1;
Martin Sustrik's avatar
Martin Sustrik committed
260 261
}

Martin Sustrik's avatar
Martin Sustrik committed
262
zmq::fd_t zmq::tcp_listener_t::accept ()
Martin Sustrik's avatar
Martin Sustrik committed
263
{
264 265
    //  The situation where connection cannot be accepted due to insufficient
    //  resources is considered valid and treated by ignoring the connection.
266
    //  Accept one connection and deal with different failure modes.
Martin Sustrik's avatar
Martin Sustrik committed
267
    zmq_assert (s != retired_fd);
268

269 270
    struct sockaddr_storage ss;
    memset (&ss, 0, sizeof (ss));
AJ Lewis's avatar
AJ Lewis committed
271 272 273
#ifdef ZMQ_HAVE_HPUX
    int ss_len = sizeof (ss);
#else
274
    socklen_t ss_len = sizeof (ss);
AJ Lewis's avatar
AJ Lewis committed
275
#endif
276 277
    fd_t sock = ::accept (s, (struct sockaddr *) &ss, &ss_len);

278
#ifdef ZMQ_HAVE_WINDOWS
279
    if (sock == INVALID_SOCKET) {
280
        wsa_assert (WSAGetLastError () == WSAEWOULDBLOCK ||
281 282 283
            WSAGetLastError () == WSAECONNRESET ||
            WSAGetLastError () == WSAEMFILE ||
            WSAGetLastError () == WSAENOBUFS);
Martin Sustrik's avatar
Martin Sustrik committed
284
        return retired_fd;
285
    }
286
#if !defined _WIN32_WCE
287 288 289
    //  On Windows, preventing sockets to be inherited by child processes.
    BOOL brc = SetHandleInformation ((HANDLE) sock, HANDLE_FLAG_INHERIT, 0);
    win_assert (brc);
290
#endif
Brett Cameron's avatar
Brett Cameron committed
291
#else
292 293
    if (sock == -1) {
        errno_assert (errno == EAGAIN || errno == EWOULDBLOCK ||
294
            errno == EINTR || errno == ECONNABORTED || errno == EPROTO ||
295 296
            errno == ENOBUFS || errno == ENOMEM || errno == EMFILE ||
            errno == ENFILE);
297 298
        return retired_fd;
    }
Brett Cameron's avatar
Brett Cameron committed
299
#endif
300

301 302 303 304 305 306 307
    //  Race condition can cause socket not to be closed (if fork happens
    //  between accept and this point).
#ifdef FD_CLOEXEC
    int rc = fcntl (sock, F_SETFD, FD_CLOEXEC);
    errno_assert (rc != -1);
#endif

308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327
    if (!options.tcp_accept_filters.empty ()) {
        bool matched = false;
        for (options_t::tcp_accept_filters_t::size_type i = 0; i != options.tcp_accept_filters.size (); ++i) {
            if (options.tcp_accept_filters[i].match_address ((struct sockaddr *) &ss, ss_len)) {
                matched = true;
                break;
            }
        }
        if (!matched) {
#ifdef ZMQ_HAVE_WINDOWS
            int rc = closesocket (sock);
            wsa_assert (rc != SOCKET_ERROR);
#else
            int rc = ::close (sock);
            errno_assert (rc == 0);
#endif
            return retired_fd;
        }
    }

328 329 330 331
    // Set the IP Type-Of-Service priority for this client socket
    if (options.tos != 0)
        set_ip_type_of_service (sock, options.tos);

Martin Sustrik's avatar
Martin Sustrik committed
332 333
    return sock;
}