tcp_listener.cpp 9.6 KB
Newer Older
Martin Sustrik's avatar
Martin Sustrik committed
1
/*
2
    Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file
Martin Sustrik's avatar
Martin Sustrik committed
3

4
    This file is part of libzmq, the ZeroMQ core engine in C++.
Martin Sustrik's avatar
Martin Sustrik committed
5

6 7 8
    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
Martin Sustrik's avatar
Martin Sustrik committed
9 10
    (at your option) any later version.

11 12 13 14 15 16 17 18 19 20 21 22 23 24
    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.
Martin Sustrik's avatar
Martin Sustrik committed
25

26
    You should have received a copy of the GNU Lesser General Public License
Martin Sustrik's avatar
Martin Sustrik committed
27 28 29
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30 31
#include <new>

32
#include <string>
33
#include <stdio.h>
34

Martin Sustrik's avatar
Martin Sustrik committed
35
#include "platform.hpp"
36
#include "tcp_listener.hpp"
37
#include "stream_engine.hpp"
38
#include "io_thread.hpp"
39
#include "session_base.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
40 41
#include "config.hpp"
#include "err.hpp"
42
#include "ip.hpp"
43
#include "tcp.hpp"
44
#include "socket_base.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
45

Martin Sustrik's avatar
Martin Sustrik committed
46
#ifdef ZMQ_HAVE_WINDOWS
47 48 49 50 51 52 53 54 55 56
#include "windows.hpp"
#else
#include <unistd.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/tcp.h>
#include <netinet/in.h>
#include <netdb.h>
#include <fcntl.h>
#endif
57 58 59

#ifdef ZMQ_HAVE_OPENVMS
#include <ioctl.h>
60
#endif
Martin Sustrik's avatar
Martin Sustrik committed
61

62 63 64 65 66 67
zmq::tcp_listener_t::tcp_listener_t (io_thread_t *io_thread_,
      socket_base_t *socket_, const options_t &options_) :
    own_t (io_thread_, options_),
    io_object_t (io_thread_),
    s (retired_fd),
    socket (socket_)
68 69 70 71 72
{
}

zmq::tcp_listener_t::~tcp_listener_t ()
{
73
    zmq_assert (s == retired_fd);
74 75
}

76 77 78 79 80 81 82 83 84 85
void zmq::tcp_listener_t::process_plug ()
{
    //  Start polling for incoming connections.
    handle = add_fd (s);
    set_pollin (handle);
}

void zmq::tcp_listener_t::process_term (int linger_)
{
    rm_fd (handle);
86
    close ();
87 88 89 90 91 92 93 94 95
    own_t::process_term (linger_);
}

void zmq::tcp_listener_t::in_event ()
{
    fd_t fd = accept ();

    //  If connection was reset by the peer in the meantime, just ignore it.
    //  TODO: Handle specific errors like ENFILE/EMFILE etc.
96
    if (fd == retired_fd) {
97
        socket->event_accept_failed (endpoint, zmq_errno());
98
        return;
99
    }
100

101
    tune_tcp_socket (fd);
102
    tune_tcp_keepalives (fd, options.tcp_keepalive, options.tcp_keepalive_cnt, options.tcp_keepalive_idle, options.tcp_keepalive_intvl);
103
    tune_tcp_retransmit_timeout (fd, options.tcp_retransmit_timeout);
104

105 106 107
    // remember our fd for ZMQ_SRCFD in messages
    socket->set_fd(fd);

108
    //  Create the engine object for this connection.
109
    stream_engine_t *engine = new (std::nothrow)
110
        stream_engine_t (fd, options, endpoint);
111 112 113 114 115 116 117
    alloc_assert (engine);

    //  Choose I/O thread to run connecter in. Given that we are already
    //  running in an I/O thread, there must be at least one available.
    io_thread_t *io_thread = choose_io_thread (options.affinity);
    zmq_assert (io_thread);

118
    //  Create and launch a session object.
119
    session_base_t *session = session_base_t::create (io_thread, false, socket,
120
        options, NULL);
121
    errno_assert (session);
122 123 124
    session->inc_seqnum ();
    launch_child (session);
    send_attach (session, engine, false);
125
    socket->event_accepted (endpoint, (int) fd);
126 127
}

128 129 130
void zmq::tcp_listener_t::close ()
{
    zmq_assert (s != retired_fd);
131
#ifdef ZMQ_HAVE_WINDOWS
132 133 134 135 136 137
    int rc = closesocket (s);
    wsa_assert (rc != SOCKET_ERROR);
#else
    int rc = ::close (s);
    errno_assert (rc == 0);
#endif
138
    socket->event_closed (endpoint, (int) s);
139 140
    s = retired_fd;
}
141

142
int zmq::tcp_listener_t::get_address (std::string &addr_)
143
{
144
    // Get the details of the TCP socket
145
    struct sockaddr_storage ss;
AJ Lewis's avatar
AJ Lewis committed
146 147 148
#ifdef ZMQ_HAVE_HPUX
    int sl = sizeof (ss);
#else
149
    socklen_t sl = sizeof (ss);
AJ Lewis's avatar
AJ Lewis committed
150
#endif
151
    int rc = getsockname (s, (struct sockaddr *) &ss, &sl);
152

153
    if (rc != 0) {
154
        addr_.clear ();
155 156
        return rc;
    }
157

158 159
    tcp_address_t addr ((struct sockaddr *) &ss, sl);
    return addr.to_string (addr_);
160 161
}

162
int zmq::tcp_listener_t::set_address (const char *addr_)
163
{
164
    //  Convert the textual address into address structure.
Pieter Hintjens's avatar
Pieter Hintjens committed
165
    int rc = address.resolve (addr_, true, options.ipv6);
166
    if (rc != 0)
167
        return -1;
168 169

    //  Create a listening socket.
170
    s = open_socket (address.family (), SOCK_STREAM, IPPROTO_TCP);
171 172
#ifdef ZMQ_HAVE_WINDOWS
    if (s == INVALID_SOCKET)
173
        errno = wsa_error_to_errno (WSAGetLastError ());
174 175 176
#endif

    //  IPv6 address family not supported, try automatic downgrade to IPv4.
Pieter Hintjens's avatar
Pieter Hintjens committed
177 178 179
    if (address.family () == AF_INET6
    && errno == EAFNOSUPPORT
    && options.ipv6) {
180
        rc = address.resolve (addr_, true, true);
181 182
        if (rc != 0)
            return rc;
183
        s = ::socket (address.family (), SOCK_STREAM, IPPROTO_TCP);
184 185
    }

186
#ifdef ZMQ_HAVE_WINDOWS
187
    if (s == INVALID_SOCKET) {
188
        errno = wsa_error_to_errno (WSAGetLastError ());
189 190
        return -1;
    }
191
#if !defined _WIN32_WCE
192 193 194
    //  On Windows, preventing sockets to be inherited by child processes.
    BOOL brc = SetHandleInformation ((HANDLE) s, HANDLE_FLAG_INHERIT, 0);
    win_assert (brc);
195
#endif
196 197 198 199
#else
    if (s == -1)
        return -1;
#endif
200

201 202
    //  On some systems, IPv4 mapping in IPv6 sockets is disabled by default.
    //  Switch it on in such cases.
203
    if (address.family () == AF_INET6)
204
        enable_ipv4_mapping (s);
205

206 207 208 209
    // Set the IP Type-Of-Service for the underlying socket
    if (options.tos != 0)
        set_ip_type_of_service (s, options.tos);

210
    //  Set the socket buffer limits for the underlying socket.
211
    if (options.sndbuf >= 0)
212
        set_tcp_send_buffer (s, options.sndbuf);
213
    if (options.rcvbuf >= 0)
214
        set_tcp_receive_buffer (s, options.rcvbuf);
215

216 217
    //  Allow reusing of the address.
    int flag = 1;
218
#ifdef ZMQ_HAVE_WINDOWS
219
    rc = setsockopt (s, SOL_SOCKET, SO_EXCLUSIVEADDRUSE,
220 221
        (const char*) &flag, sizeof (int));
    wsa_assert (rc != SOCKET_ERROR);
222 223 224 225
#else
    rc = setsockopt (s, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof (int));
    errno_assert (rc == 0);
#endif
226

227 228
    address.to_string (endpoint);

229
    //  Bind the socket to the network interface and port.
230
    rc = bind (s, address.addr (), address.addrlen ());
231
#ifdef ZMQ_HAVE_WINDOWS
232
    if (rc == SOCKET_ERROR) {
233
        errno = wsa_error_to_errno (WSAGetLastError ());
234
        goto error;
235
    }
Martin Sustrik's avatar
Martin Sustrik committed
236
#else
237
    if (rc != 0)
238
        goto error;
239
#endif
240

241
    //  Listen for incoming connections.
242
    rc = listen (s, options.backlog);
243 244
#ifdef ZMQ_HAVE_WINDOWS
    if (rc == SOCKET_ERROR) {
245
        errno = wsa_error_to_errno (WSAGetLastError ());
246
        goto error;
247
    }
248
#else
Martin Sustrik's avatar
Martin Sustrik committed
249
    if (rc != 0)
250
        goto error;
Brett Cameron's avatar
Brett Cameron committed
251
#endif
252

253
    socket->event_listening (endpoint, (int) s);
Martin Sustrik's avatar
Martin Sustrik committed
254
    return 0;
255 256 257 258 259 260

error:
    int err = errno;
    close ();
    errno = err;
    return -1;
Martin Sustrik's avatar
Martin Sustrik committed
261 262
}

Martin Sustrik's avatar
Martin Sustrik committed
263
zmq::fd_t zmq::tcp_listener_t::accept ()
Martin Sustrik's avatar
Martin Sustrik committed
264
{
265 266
    //  The situation where connection cannot be accepted due to insufficient
    //  resources is considered valid and treated by ignoring the connection.
267
    //  Accept one connection and deal with different failure modes.
Martin Sustrik's avatar
Martin Sustrik committed
268
    zmq_assert (s != retired_fd);
269

270 271
    struct sockaddr_storage ss;
    memset (&ss, 0, sizeof (ss));
AJ Lewis's avatar
AJ Lewis committed
272 273 274
#ifdef ZMQ_HAVE_HPUX
    int ss_len = sizeof (ss);
#else
275
    socklen_t ss_len = sizeof (ss);
AJ Lewis's avatar
AJ Lewis committed
276
#endif
277 278
    fd_t sock = ::accept (s, (struct sockaddr *) &ss, &ss_len);

279
#ifdef ZMQ_HAVE_WINDOWS
280
    if (sock == INVALID_SOCKET) {
281 282 283 284 285
		const int last_error = WSAGetLastError();
        wsa_assert (last_error == WSAEWOULDBLOCK ||
            last_error == WSAECONNRESET ||
            last_error == WSAEMFILE ||
            last_error == WSAENOBUFS);
Martin Sustrik's avatar
Martin Sustrik committed
286
        return retired_fd;
287
    }
288
#if !defined _WIN32_WCE
289 290 291
    //  On Windows, preventing sockets to be inherited by child processes.
    BOOL brc = SetHandleInformation ((HANDLE) sock, HANDLE_FLAG_INHERIT, 0);
    win_assert (brc);
292
#endif
Brett Cameron's avatar
Brett Cameron committed
293
#else
294 295
    if (sock == -1) {
        errno_assert (errno == EAGAIN || errno == EWOULDBLOCK ||
296
            errno == EINTR || errno == ECONNABORTED || errno == EPROTO ||
297 298
            errno == ENOBUFS || errno == ENOMEM || errno == EMFILE ||
            errno == ENFILE);
299 300
        return retired_fd;
    }
Brett Cameron's avatar
Brett Cameron committed
301
#endif
302

303 304 305 306 307 308 309
    //  Race condition can cause socket not to be closed (if fork happens
    //  between accept and this point).
#ifdef FD_CLOEXEC
    int rc = fcntl (sock, F_SETFD, FD_CLOEXEC);
    errno_assert (rc != -1);
#endif

310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329
    if (!options.tcp_accept_filters.empty ()) {
        bool matched = false;
        for (options_t::tcp_accept_filters_t::size_type i = 0; i != options.tcp_accept_filters.size (); ++i) {
            if (options.tcp_accept_filters[i].match_address ((struct sockaddr *) &ss, ss_len)) {
                matched = true;
                break;
            }
        }
        if (!matched) {
#ifdef ZMQ_HAVE_WINDOWS
            int rc = closesocket (sock);
            wsa_assert (rc != SOCKET_ERROR);
#else
            int rc = ::close (sock);
            errno_assert (rc == 0);
#endif
            return retired_fd;
        }
    }

330 331 332 333
    // Set the IP Type-Of-Service priority for this client socket
    if (options.tos != 0)
        set_ip_type_of_service (sock, options.tos);

Martin Sustrik's avatar
Martin Sustrik committed
334 335
    return sock;
}