tcp_listener.cpp 8.12 KB
Newer Older
Martin Sustrik's avatar
Martin Sustrik committed
1
/*
Martin Sustrik's avatar
Martin Sustrik committed
2
    Copyright (c) 2009-2011 250bpm s.r.o.
3
    Copyright (c) 2007-2010 iMatix Corporation
4
    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
Martin Sustrik's avatar
Martin Sustrik committed
5 6 7 8

    This file is part of 0MQ.

    0MQ is free software; you can redistribute it and/or modify it under
9
    the terms of the GNU Lesser General Public License as published by
Martin Sustrik's avatar
Martin Sustrik committed
10 11 12 13 14 15
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    0MQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16
    GNU Lesser General Public License for more details.
Martin Sustrik's avatar
Martin Sustrik committed
17

18
    You should have received a copy of the GNU Lesser General Public License
Martin Sustrik's avatar
Martin Sustrik committed
19 20 21
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

22 23
#include <new>

24
#include <string>
25

Martin Sustrik's avatar
Martin Sustrik committed
26
#include "platform.hpp"
27
#include "tcp_listener.hpp"
28
#include "stream_engine.hpp"
29
#include "io_thread.hpp"
30
#include "session_base.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
31 32
#include "config.hpp"
#include "err.hpp"
33
#include "ip.hpp"
34
#include "tcp.hpp"
35
#include "socket_base.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
36

Martin Sustrik's avatar
Martin Sustrik committed
37
#ifdef ZMQ_HAVE_WINDOWS
38 39 40 41 42 43 44 45 46 47
#include "windows.hpp"
#else
#include <unistd.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/tcp.h>
#include <netinet/in.h>
#include <netdb.h>
#include <fcntl.h>
#endif
48 49 50

#ifdef ZMQ_HAVE_OPENVMS
#include <ioctl.h>
51
#endif
Martin Sustrik's avatar
Martin Sustrik committed
52

53 54 55 56 57 58
zmq::tcp_listener_t::tcp_listener_t (io_thread_t *io_thread_,
      socket_base_t *socket_, const options_t &options_) :
    own_t (io_thread_, options_),
    io_object_t (io_thread_),
    s (retired_fd),
    socket (socket_)
59 60 61 62 63
{
}

zmq::tcp_listener_t::~tcp_listener_t ()
{
64
    zmq_assert (s == retired_fd);
65 66
}

67 68 69 70 71 72 73 74 75 76
void zmq::tcp_listener_t::process_plug ()
{
    //  Start polling for incoming connections.
    handle = add_fd (s);
    set_pollin (handle);
}

void zmq::tcp_listener_t::process_term (int linger_)
{
    rm_fd (handle);
77
    close ();
78 79 80 81 82 83 84 85 86
    own_t::process_term (linger_);
}

void zmq::tcp_listener_t::in_event ()
{
    fd_t fd = accept ();

    //  If connection was reset by the peer in the meantime, just ignore it.
    //  TODO: Handle specific errors like ENFILE/EMFILE etc.
87
    if (fd == retired_fd) {
88
        socket->event_accept_failed (endpoint, zmq_errno());
89
        return;
90
    }
91

92
    tune_tcp_socket (fd);
93
    tune_tcp_keepalives (fd, options.tcp_keepalive, options.tcp_keepalive_cnt, options.tcp_keepalive_idle, options.tcp_keepalive_intvl);
94

95
    //  Create the engine object for this connection.
96
    stream_engine_t *engine = new (std::nothrow) stream_engine_t (fd, options, endpoint);
97 98 99 100 101 102 103 104
    alloc_assert (engine);

    //  Choose I/O thread to run connecter in. Given that we are already
    //  running in an I/O thread, there must be at least one available.
    io_thread_t *io_thread = choose_io_thread (options.affinity);
    zmq_assert (io_thread);

    //  Create and launch a session object. 
105
    session_base_t *session = session_base_t::create (io_thread, false, socket,
106
        options, NULL);
107
    errno_assert (session);
108 109 110
    session->inc_seqnum ();
    launch_child (session);
    send_attach (session, engine, false);
111
    socket->event_accepted (endpoint, fd);
112 113
}

114 115 116
void zmq::tcp_listener_t::close ()
{
    zmq_assert (s != retired_fd);
117
#ifdef ZMQ_HAVE_WINDOWS
118 119 120 121 122 123
    int rc = closesocket (s);
    wsa_assert (rc != SOCKET_ERROR);
#else
    int rc = ::close (s);
    errno_assert (rc == 0);
#endif
124
    socket->event_closed (endpoint, s);
125 126
    s = retired_fd;
}
127

128
int zmq::tcp_listener_t::get_address (std::string &addr_)
129
{
130
    // Get the details of the TCP socket
131
    struct sockaddr_storage ss;
AJ Lewis's avatar
AJ Lewis committed
132 133 134
#ifdef ZMQ_HAVE_HPUX
    int sl = sizeof (ss);
#else
135
    socklen_t sl = sizeof (ss);
AJ Lewis's avatar
AJ Lewis committed
136
#endif
137
    int rc = getsockname (s, (struct sockaddr *) &ss, &sl);
138

139
    if (rc != 0) {
140
        addr_.clear ();
141 142
        return rc;
    }
143

144 145
    tcp_address_t addr ((struct sockaddr *) &ss, sl);
    return addr.to_string (addr_);
146 147
}

148
int zmq::tcp_listener_t::set_address (const char *addr_)
149
{
150 151
    //  Convert the textual address into address structure.
    int rc = address.resolve (addr_, true, options.ipv4only ? true : false);
152
    if (rc != 0)
153
        return -1;
154 155

    //  Create a listening socket.
156
    s = open_socket (address.family (), SOCK_STREAM, IPPROTO_TCP);
157 158
#ifdef ZMQ_HAVE_WINDOWS
    if (s == INVALID_SOCKET)
159
        errno = wsa_error_to_errno (WSAGetLastError ());
160 161 162
#endif

    //  IPv6 address family not supported, try automatic downgrade to IPv4.
163
    if (address.family () == AF_INET6 && errno == EAFNOSUPPORT &&
164
          !options.ipv4only) {
165
        rc = address.resolve (addr_, true, true);
166 167
        if (rc != 0)
            return rc;
168
        s = ::socket (address.family (), SOCK_STREAM, IPPROTO_TCP);
169 170
    }

171
#ifdef ZMQ_HAVE_WINDOWS
172
    if (s == INVALID_SOCKET) {
173
        errno = wsa_error_to_errno (WSAGetLastError ());
174 175
        return -1;
    }
176 177 178
    //  On Windows, preventing sockets to be inherited by child processes.
    BOOL brc = SetHandleInformation ((HANDLE) s, HANDLE_FLAG_INHERIT, 0);
    win_assert (brc);
179 180 181 182
#else
    if (s == -1)
        return -1;
#endif
183

184 185
    //  On some systems, IPv4 mapping in IPv6 sockets is disabled by default.
    //  Switch it on in such cases.
186
    if (address.family () == AF_INET6)
187 188
        enable_ipv4_mapping (s);

189 190
    //  Allow reusing of the address.
    int flag = 1;
191
#ifdef ZMQ_HAVE_WINDOWS
192
    rc = setsockopt (s, SOL_SOCKET, SO_EXCLUSIVEADDRUSE,
193 194
        (const char*) &flag, sizeof (int));
    wsa_assert (rc != SOCKET_ERROR);
195 196 197 198
#else
    rc = setsockopt (s, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof (int));
    errno_assert (rc == 0);
#endif
199

200 201
    address.to_string (endpoint);

202
    //  Bind the socket to the network interface and port.
203
    rc = bind (s, address.addr (), address.addrlen ());
204
#ifdef ZMQ_HAVE_WINDOWS
205
    if (rc == SOCKET_ERROR) {
206
        errno = wsa_error_to_errno (WSAGetLastError ());
207
        goto error;
208
    }
Martin Sustrik's avatar
Martin Sustrik committed
209
#else
210
    if (rc != 0)
211
        goto error;
212
#endif
213

214 215
    //  Listen for incomming connections.
    rc = listen (s, options.backlog);
216 217
#ifdef ZMQ_HAVE_WINDOWS
    if (rc == SOCKET_ERROR) {
218
        errno = wsa_error_to_errno (WSAGetLastError ());
219
        goto error;
220
    }
221
#else
Martin Sustrik's avatar
Martin Sustrik committed
222
    if (rc != 0)
223
        goto error;
Brett Cameron's avatar
Brett Cameron committed
224
#endif
225

226
    socket->event_listening (endpoint, s);
Martin Sustrik's avatar
Martin Sustrik committed
227
    return 0;
228 229 230 231 232 233

error:
    int err = errno;
    close ();
    errno = err;
    return -1;
Martin Sustrik's avatar
Martin Sustrik committed
234 235
}

Martin Sustrik's avatar
Martin Sustrik committed
236
zmq::fd_t zmq::tcp_listener_t::accept ()
Martin Sustrik's avatar
Martin Sustrik committed
237
{
238 239
    //  The situation where connection cannot be accepted due to insufficient
    //  resources is considered valid and treated by ignoring the connection.
240
    //  Accept one connection and deal with different failure modes.
Martin Sustrik's avatar
Martin Sustrik committed
241
    zmq_assert (s != retired_fd);
242

243 244
    struct sockaddr_storage ss;
    memset (&ss, 0, sizeof (ss));
AJ Lewis's avatar
AJ Lewis committed
245 246 247
#ifdef ZMQ_HAVE_HPUX
    int ss_len = sizeof (ss);
#else
248
    socklen_t ss_len = sizeof (ss);
AJ Lewis's avatar
AJ Lewis committed
249
#endif
250 251
    fd_t sock = ::accept (s, (struct sockaddr *) &ss, &ss_len);

252
#ifdef ZMQ_HAVE_WINDOWS
253
    if (sock == INVALID_SOCKET) {
254
        wsa_assert (WSAGetLastError () == WSAEWOULDBLOCK ||
255 256 257
            WSAGetLastError () == WSAECONNRESET ||
            WSAGetLastError () == WSAEMFILE ||
            WSAGetLastError () == WSAENOBUFS);
Martin Sustrik's avatar
Martin Sustrik committed
258
        return retired_fd;
259
    }
260 261 262
    //  On Windows, preventing sockets to be inherited by child processes.
    BOOL brc = SetHandleInformation ((HANDLE) sock, HANDLE_FLAG_INHERIT, 0);
    win_assert (brc);
Brett Cameron's avatar
Brett Cameron committed
263
#else
264 265 266
    if (sock == -1) {
        errno_assert (errno == EAGAIN || errno == EWOULDBLOCK ||
            errno == EINTR || errno == ECONNABORTED || errno == EPROTO ||
267 268
            errno == ENOBUFS || errno == ENOMEM || errno == EMFILE ||
            errno == ENFILE);
269 270
        return retired_fd;
    }
Brett Cameron's avatar
Brett Cameron committed
271
#endif
272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292

    if (!options.tcp_accept_filters.empty ()) {
        bool matched = false;
        for (options_t::tcp_accept_filters_t::size_type i = 0; i != options.tcp_accept_filters.size (); ++i) {
            if (options.tcp_accept_filters[i].match_address ((struct sockaddr *) &ss, ss_len)) {
                matched = true;
                break;
            }
        }
        if (!matched) {
#ifdef ZMQ_HAVE_WINDOWS
            int rc = closesocket (sock);
            wsa_assert (rc != SOCKET_ERROR);
#else
            int rc = ::close (sock);
            errno_assert (rc == 0);
#endif
            return retired_fd;
        }
    }

Martin Sustrik's avatar
Martin Sustrik committed
293 294
    return sock;
}