/*
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file

    This file is part of libzmq, the ZeroMQ core engine in C++.

    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

#include "precompiled.hpp"
#include "macros.hpp"
#include "ip.hpp"
#include "tcp.hpp"
#include "err.hpp"

#include <limits.h>

#if !defined ZMQ_HAVE_WINDOWS
#include <fcntl.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#endif

#if defined ZMQ_HAVE_OPENVMS
#include <ioctl.h>
#endif

48
int zmq::tune_tcp_socket (fd_t s_)
49 50 51 52 53 54 55
{
    //  Disable Nagle's algorithm. We are doing data batching on 0MQ level,
    //  so using Nagle wouldn't improve throughput in anyway, but it would
    //  hurt latency.
    int nodelay = 1;
    int rc = setsockopt (s_, IPPROTO_TCP, TCP_NODELAY, (char*) &nodelay,
        sizeof (int));
56
    tcp_assert_tuning_error (s_, rc);
57 58
    if (rc != 0)
        return rc;
59 60

#ifdef ZMQ_HAVE_OPENVMS
61
    //  Disable delayed acknowledgements as they hurt latency significantly.
62 63 64
    int nodelack = 1;
    rc = setsockopt (s_, IPPROTO_TCP, TCP_NODELACK, (char*) &nodelack,
        sizeof (int));
65
    tcp_assert_tuning_error (s_, rc);
66
#endif
67
    return rc;
68 69
}

70
int zmq::set_tcp_send_buffer (fd_t sockfd_, int bufsize_)
71 72 73
{
    const int rc = setsockopt (sockfd_, SOL_SOCKET, SO_SNDBUF,
        (char*) &bufsize_, sizeof bufsize_);
74
    tcp_assert_tuning_error (sockfd_, rc);
75
    return rc;
76 77
}

78
int zmq::set_tcp_receive_buffer (fd_t sockfd_, int bufsize_)
79 80
{
    const int rc = setsockopt (sockfd_, SOL_SOCKET, SO_RCVBUF,
81
        (char *) &bufsize_, sizeof bufsize_);
82
    tcp_assert_tuning_error (sockfd_, rc);
83
    return rc;
84 85
}

86
int zmq::tune_tcp_keepalives (fd_t s_, int keepalive_, int keepalive_cnt_,
87
        int keepalive_idle_, int keepalive_intvl_)
88
{
89
    // These options are used only under certain #ifdefs below.
90 91 92 93
    LIBZMQ_UNUSED (keepalive_);
    LIBZMQ_UNUSED (keepalive_cnt_);
    LIBZMQ_UNUSED (keepalive_idle_);
    LIBZMQ_UNUSED (keepalive_intvl_);
94 95

    // If none of the #ifdefs apply, then s_ is unused.
96
    LIBZMQ_UNUSED (s_);
97

98 99
    //  Tuning TCP keep-alives if platform allows it
    //  All values = -1 means skip and leave it for OS
100
#ifdef ZMQ_HAVE_WINDOWS
101 102 103
    if (keepalive_ != -1) {
        tcp_keepalive keepalive_opts;
        keepalive_opts.onoff = keepalive_;
104 105 106 107
        keepalive_opts.keepalivetime = keepalive_idle_ != -1 ?
                                            keepalive_idle_ * 1000 : 7200000;
        keepalive_opts.keepaliveinterval = keepalive_intvl_ != -1 ?
                                            keepalive_intvl_ * 1000 : 1000;
108
        DWORD num_bytes_returned;
109 110 111
        int rc = WSAIoctl (s_, SIO_KEEPALIVE_VALS, &keepalive_opts,
            sizeof (keepalive_opts), NULL, 0, &num_bytes_returned, NULL, NULL);
        tcp_assert_tuning_error (s_, rc);
112 113
        if (rc == SOCKET_ERROR)
            return rc;
114
    }
115
#else
116 117
#ifdef ZMQ_HAVE_SO_KEEPALIVE
    if (keepalive_ != -1) {
118 119
        int rc = setsockopt (s_, SOL_SOCKET, SO_KEEPALIVE,
                (char*) &keepalive_, sizeof (int));
120
        tcp_assert_tuning_error (s_, rc);
121 122
        if (rc != 0)
            return rc;
123 124 125

#ifdef ZMQ_HAVE_TCP_KEEPCNT
        if (keepalive_cnt_ != -1) {
126 127
            int rc = setsockopt (s_, IPPROTO_TCP, TCP_KEEPCNT,
                    &keepalive_cnt_, sizeof (int));
128
            tcp_assert_tuning_error (s_, rc);
129 130
            if (rc != 0)
                return rc;
131 132 133 134 135
        }
#endif // ZMQ_HAVE_TCP_KEEPCNT

#ifdef ZMQ_HAVE_TCP_KEEPIDLE
        if (keepalive_idle_ != -1) {
136 137
            int rc = setsockopt (s_, IPPROTO_TCP, TCP_KEEPIDLE,
                    &keepalive_idle_, sizeof (int));
138
            tcp_assert_tuning_error (s_, rc);
139 140
            if (rc != 0)
                return rc;
141 142 143 144
        }
#else // ZMQ_HAVE_TCP_KEEPIDLE
#ifdef ZMQ_HAVE_TCP_KEEPALIVE
        if (keepalive_idle_ != -1) {
145 146
            int rc = setsockopt (s_, IPPROTO_TCP, TCP_KEEPALIVE,
                    &keepalive_idle_, sizeof (int));
147
            tcp_assert_tuning_error (s_, rc);
148 149
            if (rc != 0)
                return rc;
150 151 152 153 154 155
        }
#endif // ZMQ_HAVE_TCP_KEEPALIVE
#endif // ZMQ_HAVE_TCP_KEEPIDLE

#ifdef ZMQ_HAVE_TCP_KEEPINTVL
        if (keepalive_intvl_ != -1) {
156 157
            int rc = setsockopt (s_, IPPROTO_TCP, TCP_KEEPINTVL,
                    &keepalive_intvl_, sizeof (int));
158
            tcp_assert_tuning_error (s_, rc);
159 160
            if (rc != 0)
                return rc;
161 162 163 164
        }
#endif // ZMQ_HAVE_TCP_KEEPINTVL
    }
#endif // ZMQ_HAVE_SO_KEEPALIVE
165
#endif // ZMQ_HAVE_WINDOWS
166 167
    
    return 0;
168
}
169

170
int zmq::tune_tcp_maxrt (fd_t sockfd_, int timeout_)
171 172
{
    if (timeout_ <= 0)
173
        return 0;
174

175 176
    LIBZMQ_UNUSED (sockfd_);

177 178 179
#if defined (ZMQ_HAVE_WINDOWS) && defined (TCP_MAXRT)
    // msdn says it's supported in >= Vista, >= Windows Server 2003
    timeout_ /= 1000;    // in seconds
180
    int rc = setsockopt (sockfd_, IPPROTO_TCP, TCP_MAXRT, (char*) &timeout_,
181 182
        sizeof (timeout_));
    tcp_assert_tuning_error (sockfd_, rc);
183
    return rc;
184 185
// FIXME: should be ZMQ_HAVE_TCP_USER_TIMEOUT
#elif defined (TCP_USER_TIMEOUT)
186
    int rc = setsockopt (sockfd_, IPPROTO_TCP, TCP_USER_TIMEOUT, &timeout_,
187 188
        sizeof (timeout_));
    tcp_assert_tuning_error (sockfd_, rc);
189
    return rc;
190
#endif
191
    return 0;
192 193 194
}

 //  Writes up to size_ bytes from data_ to socket s_.
//  Returns the number of bytes written (possibly fewer than requested),
//  0 if nothing could be written right now (would-block, EINTR, or the
//  Windows WSAENOBUFS workaround), or -1 on a peer/network failure.
//  Any other error is treated as a 0MQ bug and asserts.
int zmq::tcp_write (fd_t s_, const void *data_, size_t size_)
{
    //  The byte count is returned as an int, so never ask for more than
    //  INT_MAX bytes in one call. send() may already transfer fewer bytes
    //  than requested, so callers must handle short writes anyway. Without
    //  this clamp a larger request would be truncated when converted to int:
    //  Windows would get a corrupted length and POSIX would mis-report
    //  the count.
    if (size_ > static_cast <size_t> (INT_MAX))
        size_ = static_cast <size_t> (INT_MAX);

#ifdef ZMQ_HAVE_WINDOWS

    const int nbytes = send (s_, static_cast <const char *> (data_),
        static_cast <int> (size_), 0);

    //  If not a single byte can be written to the socket in non-blocking mode
    //  we'll get an error (this may happen during the speculative write).
    const int last_error = WSAGetLastError ();
    if (nbytes == SOCKET_ERROR && last_error == WSAEWOULDBLOCK)
        return 0;

    //  Signalise peer failure.
    if (nbytes == SOCKET_ERROR && (
          last_error == WSAENETDOWN     ||
          last_error == WSAENETRESET    ||
          last_error == WSAEHOSTUNREACH ||
          last_error == WSAECONNABORTED ||
          last_error == WSAETIMEDOUT    ||
          last_error == WSAECONNRESET
        ))
        return -1;

    //  Circumvent a Windows bug:
    //  See https://support.microsoft.com/en-us/kb/201213
    //  See https://zeromq.jira.com/browse/LIBZMQ-195
    if (nbytes == SOCKET_ERROR && last_error == WSAENOBUFS)
        return 0;

    wsa_assert (nbytes != SOCKET_ERROR);
    return nbytes;

#else
    const ssize_t nbytes = send (s_, data_, size_, 0);

    //  Several errors are OK. When speculative write is being done we may not
    //  be able to write a single byte from the socket. Also, SIGSTOP issued
    //  by a debugging tool can result in EINTR error.
    if (nbytes == -1 && (errno == EAGAIN || errno == EWOULDBLOCK ||
          errno == EINTR))
        return 0;

    //  Signalise peer failure.
    if (nbytes == -1) {
        errno_assert (errno != EACCES
                   && errno != EBADF
                   && errno != EDESTADDRREQ
                   && errno != EFAULT
                   && errno != EISCONN
                   && errno != EMSGSIZE
                   && errno != ENOMEM
                   && errno != ENOTSOCK
                   && errno != EOPNOTSUPP);
        return -1;
    }

    //  Safe: nbytes <= size_ <= INT_MAX thanks to the clamp above.
    return static_cast <int> (nbytes);

#endif
}

//  Reads up to size_ bytes from socket s_ into data_.
//  Returns the number of bytes read (0 when the recv call returns 0), or -1
//  with errno set. errno is EAGAIN when no data is available right now
//  (would-block, or EINTR on POSIX). Errors that indicate a 0MQ bug assert.
int zmq::tcp_read (fd_t s_, void *data_, size_t size_)
{
    //  The byte count is returned as an int, so never request more than
    //  INT_MAX bytes in one call. recv() may already return fewer bytes than
    //  requested, so callers must handle short reads anyway. Without this
    //  clamp a larger request would be truncated when converted to int.
    if (size_ > static_cast <size_t> (INT_MAX))
        size_ = static_cast <size_t> (INT_MAX);

#ifdef ZMQ_HAVE_WINDOWS

    const int rc = recv (s_, static_cast <char *> (data_),
        static_cast <int> (size_), 0);

    //  If not a single byte can be read from the socket in non-blocking mode
    //  we'll get an error (this may happen during the speculative read).
    if (rc == SOCKET_ERROR) {
        const int last_error = WSAGetLastError ();
        if (last_error == WSAEWOULDBLOCK) {
            errno = EAGAIN;
        }
        else {
            wsa_assert (last_error == WSAENETDOWN ||
                last_error == WSAENETRESET        ||
                last_error == WSAECONNABORTED     ||
                last_error == WSAETIMEDOUT        ||
                last_error == WSAECONNRESET       ||
                last_error == WSAECONNREFUSED     ||
                last_error == WSAENOTCONN);
            errno = wsa_error_to_errno (last_error);
        }
    }

    return rc == SOCKET_ERROR ? -1 : rc;

#else

    const ssize_t rc = recv (s_, data_, size_, 0);

    //  Several errors are OK. When speculative read is being done we may not
    //  be able to read a single byte from the socket. Also, SIGSTOP issued
    //  by a debugging tool can result in EINTR error.
    if (rc == -1) {
        errno_assert (errno != EBADF
                   && errno != EFAULT
                   && errno != ENOMEM
                   && errno != ENOTSOCK);
        if (errno == EWOULDBLOCK || errno == EINTR)
            errno = EAGAIN;
    }

    //  Safe: rc <= size_ <= INT_MAX thanks to the clamp above.
    return static_cast <int> (rc);

#endif
}
302

303
void zmq::tcp_assert_tuning_error (zmq::fd_t s_, int rc_)
304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323
{
    if (rc_ == 0)
        return;

    //  Check whether an error occurred
    int err = 0;
#ifdef ZMQ_HAVE_HPUX
    int len = sizeof err;
#else
    socklen_t len = sizeof err;
#endif
    
    int rc = getsockopt (s_, SOL_SOCKET, SO_ERROR, (char*) &err, &len);
    
    //  Assert if the error was caused by 0MQ bug.
    //  Networking problems are OK. No need to assert.
#ifdef ZMQ_HAVE_WINDOWS
    zmq_assert (rc == 0);
    if (err != 0) {
        wsa_assert (err == WSAECONNREFUSED
324
                 || err == WSAECONNRESET
325
                 || err == WSAECONNABORTED
326 327
                 || err == WSAEINTR
                 || err == WSAETIMEDOUT
328 329 330
                 || err == WSAEHOSTUNREACH
                 || err == WSAENETUNREACH
                 || err == WSAENETDOWN
331
                 || err == WSAENETRESET
332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351
                 || err == WSAEACCES
                 || err == WSAEINVAL
                 || err == WSAEADDRINUSE);
    }
#else
    //  Following code should handle both Berkeley-derived socket
    //  implementations and Solaris.
    if (rc == -1)
        err = errno;
    if (err != 0) {
        errno = err;
        errno_assert (
            errno == ECONNREFUSED ||
            errno == ECONNRESET ||
            errno == ECONNABORTED ||
            errno == EINTR ||
            errno == ETIMEDOUT ||
            errno == EHOSTUNREACH ||
            errno == ENETUNREACH ||
            errno == ENETDOWN ||
352
            errno == ENETRESET ||
353 354 355 356
            errno == EINVAL);
    }
#endif
}