tcp_listener.cpp 8.86 KB
Newer Older
Martin Sustrik's avatar
Martin Sustrik committed
1
/*
    Copyright (c) 2007-2014 Contributors as noted in the AUTHORS file

    This file is part of 0MQ.

    0MQ is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    0MQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

20 21
#include <new>

22
#include <string>
23
#include <stdio.h>
24

Martin Sustrik's avatar
Martin Sustrik committed
25
#include "platform.hpp"
26
#include "tcp_listener.hpp"
27
#include "stream_engine.hpp"
28
#include "io_thread.hpp"
29
#include "session_base.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
30 31
#include "config.hpp"
#include "err.hpp"
32
#include "ip.hpp"
33
#include "tcp.hpp"
34
#include "socket_base.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
35

Martin Sustrik's avatar
Martin Sustrik committed
36
#ifdef ZMQ_HAVE_WINDOWS
37 38 39 40 41 42 43 44 45 46
#include "windows.hpp"
#else
#include <unistd.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/tcp.h>
#include <netinet/in.h>
#include <netdb.h>
#include <fcntl.h>
#endif
47 48 49

#ifdef ZMQ_HAVE_OPENVMS
#include <ioctl.h>
50
#endif
Martin Sustrik's avatar
Martin Sustrik committed
51

52 53 54 55 56 57
//  Constructs a listener that is not yet associated with any OS socket:
//  's' starts out as retired_fd and is only assigned in set_address ().
//  io_thread_ and options_ are forwarded to own_t, io_thread_ to
//  io_object_t; socket_ is stored and later used for event notifications.
zmq::tcp_listener_t::tcp_listener_t (
      io_thread_t *io_thread_,
      socket_base_t *socket_,
      const options_t &options_) :
    own_t (io_thread_, options_),
    io_object_t (io_thread_),
    s (retired_fd),
    socket (socket_)
{
    //  Nothing else to do; the listening socket is created in set_address ().
}

//  Destroys the listener. The underlying socket must no longer be held at
//  this point (close () resets 's' to retired_fd).
zmq::tcp_listener_t::~tcp_listener_t ()
{
    zmq_assert (s == retired_fd);
}

66 67 68 69 70 71 72 73 74 75
//  Registers the listening socket 's' via add_fd (), remembers the returned
//  handle and enables input polling on it, so that incoming connections
//  are reported.
void zmq::tcp_listener_t::process_plug ()
{
    handle = add_fd (s);

    //  Start polling for incoming connections.
    set_pollin (handle);
}

void zmq::tcp_listener_t::process_term (int linger_)
{
    rm_fd (handle);
76
    close ();
77 78 79 80 81 82 83 84 85
    own_t::process_term (linger_);
}

void zmq::tcp_listener_t::in_event ()
{
    fd_t fd = accept ();

    //  If connection was reset by the peer in the meantime, just ignore it.
    //  TODO: Handle specific errors like ENFILE/EMFILE etc.
86
    if (fd == retired_fd) {
87
        socket->event_accept_failed (endpoint, zmq_errno());
88
        return;
89
    }
90

91
    tune_tcp_socket (fd);
92
    tune_tcp_keepalives (fd, options.tcp_keepalive, options.tcp_keepalive_cnt, options.tcp_keepalive_idle, options.tcp_keepalive_intvl);
93

94 95 96
    // remember our fd for ZMQ_SRCFD in messages
    socket->set_fd(fd);

97
    //  Create the engine object for this connection.
98
    stream_engine_t *engine = new (std::nothrow)
99
        stream_engine_t (fd, options, endpoint);
100 101 102 103 104 105 106
    alloc_assert (engine);

    //  Choose I/O thread to run connecter in. Given that we are already
    //  running in an I/O thread, there must be at least one available.
    io_thread_t *io_thread = choose_io_thread (options.affinity);
    zmq_assert (io_thread);

107
    //  Create and launch a session object.
108
    session_base_t *session = session_base_t::create (io_thread, false, socket,
109
        options, NULL);
110
    errno_assert (session);
111 112 113
    session->inc_seqnum ();
    launch_child (session);
    send_attach (session, engine, false);
114
    socket->event_accepted (endpoint, fd);
115 116
}

117 118 119
void zmq::tcp_listener_t::close ()
{
    zmq_assert (s != retired_fd);
120
#ifdef ZMQ_HAVE_WINDOWS
121 122 123 124 125 126
    int rc = closesocket (s);
    wsa_assert (rc != SOCKET_ERROR);
#else
    int rc = ::close (s);
    errno_assert (rc == 0);
#endif
127
    socket->event_closed (endpoint, s);
128 129
    s = retired_fd;
}
130

131
int zmq::tcp_listener_t::get_address (std::string &addr_)
132
{
133
    // Get the details of the TCP socket
134
    struct sockaddr_storage ss;
AJ Lewis's avatar
AJ Lewis committed
135 136 137
#ifdef ZMQ_HAVE_HPUX
    int sl = sizeof (ss);
#else
138
    socklen_t sl = sizeof (ss);
AJ Lewis's avatar
AJ Lewis committed
139
#endif
140
    int rc = getsockname (s, (struct sockaddr *) &ss, &sl);
141

142
    if (rc != 0) {
143
        addr_.clear ();
144 145
        return rc;
    }
146

147 148
    tcp_address_t addr ((struct sockaddr *) &ss, sl);
    return addr.to_string (addr_);
149 150
}

151
int zmq::tcp_listener_t::set_address (const char *addr_)
152
{
153
    //  Convert the textual address into address structure.
Pieter Hintjens's avatar
Pieter Hintjens committed
154
    int rc = address.resolve (addr_, true, options.ipv6);
155
    if (rc != 0)
156
        return -1;
157 158

    //  Create a listening socket.
159
    s = open_socket (address.family (), SOCK_STREAM, IPPROTO_TCP);
160 161
#ifdef ZMQ_HAVE_WINDOWS
    if (s == INVALID_SOCKET)
162
        errno = wsa_error_to_errno (WSAGetLastError ());
163 164 165
#endif

    //  IPv6 address family not supported, try automatic downgrade to IPv4.
Pieter Hintjens's avatar
Pieter Hintjens committed
166 167 168
    if (address.family () == AF_INET6
    && errno == EAFNOSUPPORT
    && options.ipv6) {
169
        rc = address.resolve (addr_, true, true);
170 171
        if (rc != 0)
            return rc;
172
        s = ::socket (address.family (), SOCK_STREAM, IPPROTO_TCP);
173 174
    }

175
#ifdef ZMQ_HAVE_WINDOWS
176
    if (s == INVALID_SOCKET) {
177
        errno = wsa_error_to_errno (WSAGetLastError ());
178 179
        return -1;
    }
180
#if !defined _WIN32_WCE
181 182 183
    //  On Windows, preventing sockets to be inherited by child processes.
    BOOL brc = SetHandleInformation ((HANDLE) s, HANDLE_FLAG_INHERIT, 0);
    win_assert (brc);
184
#endif
185 186 187 188
#else
    if (s == -1)
        return -1;
#endif
189

190 191
    //  On some systems, IPv4 mapping in IPv6 sockets is disabled by default.
    //  Switch it on in such cases.
192
    if (address.family () == AF_INET6)
193
        enable_ipv4_mapping (s);
194

195 196 197 198
    // Set the IP Type-Of-Service for the underlying socket
    if (options.tos != 0)
        set_ip_type_of_service (s, options.tos);

199 200 201 202 203
    //  Set the socket buffer limits for the underlying socket.
    if (options.sndbuf != 0)
        set_tcp_send_buffer (s, options.sndbuf);
    if (options.rcvbuf != 0)
        set_tcp_receive_buffer (s, options.rcvbuf);
204

205 206
    //  Allow reusing of the address.
    int flag = 1;
207
#ifdef ZMQ_HAVE_WINDOWS
208
    rc = setsockopt (s, SOL_SOCKET, SO_EXCLUSIVEADDRUSE,
209 210
        (const char*) &flag, sizeof (int));
    wsa_assert (rc != SOCKET_ERROR);
211 212 213 214
#else
    rc = setsockopt (s, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof (int));
    errno_assert (rc == 0);
#endif
215

216 217
    address.to_string (endpoint);

218
    //  Bind the socket to the network interface and port.
219
    rc = bind (s, address.addr (), address.addrlen ());
220
#ifdef ZMQ_HAVE_WINDOWS
221
    if (rc == SOCKET_ERROR) {
222
        errno = wsa_error_to_errno (WSAGetLastError ());
223
        goto error;
224
    }
Martin Sustrik's avatar
Martin Sustrik committed
225
#else
226
    if (rc != 0)
227
        goto error;
228
#endif
229

230
    //  Listen for incoming connections.
231
    rc = listen (s, options.backlog);
232 233
#ifdef ZMQ_HAVE_WINDOWS
    if (rc == SOCKET_ERROR) {
234
        errno = wsa_error_to_errno (WSAGetLastError ());
235
        goto error;
236
    }
237
#else
Martin Sustrik's avatar
Martin Sustrik committed
238
    if (rc != 0)
239
        goto error;
Brett Cameron's avatar
Brett Cameron committed
240
#endif
241

242
    socket->event_listening (endpoint, s);
Martin Sustrik's avatar
Martin Sustrik committed
243
    return 0;
244 245 246 247 248 249

error:
    int err = errno;
    close ();
    errno = err;
    return -1;
Martin Sustrik's avatar
Martin Sustrik committed
250 251
}

Martin Sustrik's avatar
Martin Sustrik committed
252
//  Accepts one pending connection on the listening socket.
//
//  Returns the descriptor of the accepted connection, or retired_fd when
//  no connection was obtained (one of the tolerated accept errors) or when
//  the peer's address matched none of the configured accept filters.
zmq::fd_t zmq::tcp_listener_t::accept ()
{
    //  The situation where connection cannot be accepted due to insufficient
    //  resources is considered valid and treated by ignoring the connection.
    //  Accept one connection and deal with different failure modes.
    zmq_assert (s != retired_fd);

    //  Zero-initialised storage that receives the peer's address.
    struct sockaddr_storage peer;
    memset (&peer, 0, sizeof (peer));
#ifdef ZMQ_HAVE_HPUX
    int peer_len = sizeof (peer);
#else
    socklen_t peer_len = sizeof (peer);
#endif
    fd_t conn = ::accept (s, (struct sockaddr *) &peer, &peer_len);

#ifdef ZMQ_HAVE_WINDOWS
    if (conn == INVALID_SOCKET) {
        //  Only these errors are tolerated; anything else is fatal.
        wsa_assert (WSAGetLastError () == WSAEWOULDBLOCK ||
            WSAGetLastError () == WSAECONNRESET ||
            WSAGetLastError () == WSAEMFILE ||
            WSAGetLastError () == WSAENOBUFS);
        return retired_fd;
    }
#if !defined _WIN32_WCE
    //  On Windows, preventing sockets to be inherited by child processes.
    BOOL inherit_rc = SetHandleInformation ((HANDLE) conn,
        HANDLE_FLAG_INHERIT, 0);
    win_assert (inherit_rc);
#endif
#else
    if (conn == -1) {
        //  Only these errors are tolerated; anything else is fatal.
        errno_assert (errno == EAGAIN || errno == EWOULDBLOCK ||
            errno == EINTR || errno == ECONNABORTED || errno == EPROTO ||
            errno == ENOBUFS || errno == ENOMEM || errno == EMFILE ||
            errno == ENFILE);
        return retired_fd;
    }
#endif

    //  Race condition can cause socket not to be closed (if fork happens
    //  between accept and this point).
#ifdef FD_CLOEXEC
    int cloexec_rc = fcntl (conn, F_SETFD, FD_CLOEXEC);
    errno_assert (cloexec_rc != -1);
#endif

    //  If accept filters are configured, the peer has to match at least one
    //  of them; otherwise the connection is closed right away.
    if (!options.tcp_accept_filters.empty ()) {
        typedef options_t::tcp_accept_filters_t::size_type filter_index_t;
        const filter_index_t filter_count =
            options.tcp_accept_filters.size ();

        bool allowed = false;
        for (filter_index_t idx = 0; !allowed && idx != filter_count; ++idx)
            allowed = options.tcp_accept_filters [idx].match_address (
                (struct sockaddr *) &peer, peer_len);

        if (!allowed) {
#ifdef ZMQ_HAVE_WINDOWS
            int close_rc = closesocket (conn);
            wsa_assert (close_rc != SOCKET_ERROR);
#else
            int close_rc = ::close (conn);
            errno_assert (close_rc == 0);
#endif
            return retired_fd;
        }
    }

    //  Set the IP Type-Of-Service priority for this client socket.
    if (options.tos != 0)
        set_ip_type_of_service (conn, options.tos);

    return conn;
}