/*
    Copyright (c) 2007-2010 iMatix Corporation

    This file is part of 0MQ.

    0MQ is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    0MQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

20
#include "mailbox.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
21 22 23
#include "platform.hpp"
#include "err.hpp"
#include "fd.hpp"
unknown's avatar
unknown committed
24
#include "ip.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
25

26
#if defined ZMQ_HAVE_WINDOWS
Martin Sustrik's avatar
Martin Sustrik committed
27 28 29 30
#include "windows.hpp"
#else
#include <unistd.h>
#include <fcntl.h>
31
#include <limits.h>
32 33 34 35
#include <netinet/tcp.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
Martin Sustrik's avatar
Martin Sustrik committed
36 37
#endif

38
//  Returns the reader end of the signalling socket pair, i.e. the descriptor
//  that send() writes commands towards and recv() reads them from.
zmq::fd_t zmq::mailbox_t::get_fd ()
{
    return this->r;
}

43
#if defined ZMQ_HAVE_WINDOWS
Martin Sustrik's avatar
Martin Sustrik committed
44

45
//  Windows: creates the signalling socket pair and puts both ends into
//  non-blocking mode. recv() temporarily switches the reader back to
//  blocking mode when a blocking read is requested.
zmq::mailbox_t::mailbox_t ()
{
    //  Establish the connected socket pair used for signalling.
    int rc = make_socketpair (&r, &w);
    errno_assert (rc == 0);

    //  FIONBIO with a non-zero argument enables non-blocking mode.
    //  Writer end first...
    unsigned long nonblocking = 1;
    rc = ioctlsocket (w, FIONBIO, &nonblocking);
    wsa_assert (rc != SOCKET_ERROR);

    //  ...then the reader end.
    nonblocking = 1;
    rc = ioctlsocket (r, FIONBIO, &nonblocking);
    wsa_assert (rc != SOCKET_ERROR);
}

62
//  Windows: closes the writer end and then the reader end of the signalling
//  socket pair. Failing to close either one is treated as fatal.
zmq::mailbox_t::~mailbox_t ()
{
    const fd_t ends [] = {w, r};
    for (int i = 0; i != 2; i++) {
        const int rc = closesocket (ends [i]);
        wsa_assert (rc != SOCKET_ERROR);
    }
}

71
void zmq::mailbox_t::send (const command_t &cmd_)
Martin Sustrik's avatar
Martin Sustrik committed
72
{
73 74 75 76 77 78
    //  TODO: Implement SNDBUF auto-resizing as for POSIX platforms.
    //  In the mean time, the following code with assert if the send()
    //  call would block.
    int nbytes = ::send (w, (char *)&cmd_, sizeof (command_t), 0);
    wsa_assert (nbytes != SOCKET_ERROR);
    zmq_assert (nbytes == sizeof (command_t));
Martin Sustrik's avatar
Martin Sustrik committed
79 80
}

81
//  Windows: reads one command from the signalling socket.
//
//  The reader end is kept non-blocking. For a blocking read it is switched
//  to blocking mode for the duration of the call and restored afterwards.
//
//  Returns 0 on success. Returns -1 with errno set to EAGAIN when the read
//  would block (no command available). Any other failure, or a partial
//  command, is fatal.
int zmq::mailbox_t::recv (command_t *cmd_, bool block_)
{
    unsigned long nonblocking;

    if (block_) {
        nonblocking = 0;
        int rc = ioctlsocket (r, FIONBIO, &nonblocking);
        wsa_assert (rc != SOCKET_ERROR);
    }

    //  Query the socket error right away, before any further socket call.
    int nbytes = ::recv (r, (char *) cmd_, sizeof (command_t), 0);
    const bool would_block =
        nbytes == -1 && WSAGetLastError () == WSAEWOULDBLOCK;

    if (block_) {
        nonblocking = 1;
        int rc = ioctlsocket (r, FIONBIO, &nonblocking);
        wsa_assert (rc != SOCKET_ERROR);
    }

    if (would_block) {
        errno = EAGAIN;
        return -1;
    }

    //  Anything other than a complete command at this point is a bug.
    wsa_assert (nbytes != SOCKET_ERROR);
    zmq_assert (nbytes == sizeof (command_t));

    return 0;
}
119

120
#else
121

122
//  POSIX: creates the signalling socket pair and makes the writer end
//  non-blocking. The reader end is made non-blocking as well only when
//  MSG_DONTWAIT is unavailable. Otherwise recv() requests non-blocking
//  reads per call.
zmq::mailbox_t::mailbox_t ()
{
#ifdef PIPE_BUF
    //  A command must fit into a single atomic write. Otherwise commands
    //  sent concurrently from different threads could get interleaved.
    zmq_assert (sizeof (command_t) <= PIPE_BUF);
#endif

    int rc = make_socketpair (&r, &w);
    errno_assert (rc == 0);

    //  Writer end: add O_NONBLOCK to its current file status flags.
    int fl = fcntl (w, F_GETFL, 0);
    errno_assert (fl >= 0);
    rc = fcntl (w, F_SETFL, fl | O_NONBLOCK);
    errno_assert (rc == 0);

#ifndef MSG_DONTWAIT
    //  Reader end: same treatment, because recv() cannot request a
    //  non-blocking read on a per-call basis on this platform.
    fl = fcntl (r, F_GETFL, 0);
    errno_assert (fl >= 0);
    rc = fcntl (r, F_SETFL, fl | O_NONBLOCK);
    errno_assert (rc == 0);
#endif
}

150
//  POSIX: closes the writer end and then the reader end of the signalling
//  socket pair. A failed close() (e.g. EBADF from a double close) indicates
//  a bug, so it is asserted on rather than silently ignored. This matches
//  the closesocket() checks in the Windows destructor.
zmq::mailbox_t::~mailbox_t ()
{
    int rc = close (w);
    errno_assert (rc == 0);
    rc = close (r);
    errno_assert (rc == 0);
}

156
void zmq::mailbox_t::send (const command_t &cmd_)
157
{
158
    //  Attempt to write an entire command without blocking.
159 160
    ssize_t nbytes;
    do {
161
        nbytes = ::send (w, &cmd_, sizeof (command_t), 0);
162
    } while (nbytes == -1 && errno == EINTR);
163 164

    //  Attempt to increase mailbox SNDBUF if the send failed.
165 166 167
    if (nbytes == -1 && errno == EAGAIN) {
        int old_sndbuf, new_sndbuf;
        socklen_t sndbuf_size = sizeof old_sndbuf;
168

169 170 171 172 173
        //  Retrieve current send buffer size.
        int rc = getsockopt (w, SOL_SOCKET, SO_SNDBUF, &old_sndbuf,
            &sndbuf_size);
        errno_assert (rc == 0);
        new_sndbuf = old_sndbuf * 2;
174

175 176 177
        //  Double the new send buffer size.
        rc = setsockopt (w, SOL_SOCKET, SO_SNDBUF, &new_sndbuf, sndbuf_size);
        errno_assert (rc == 0);
178

179 180 181 182
        //  Verify that the OS actually honored the request.
        rc = getsockopt (w, SOL_SOCKET, SO_SNDBUF, &new_sndbuf, &sndbuf_size);
        errno_assert (rc == 0);
        zmq_assert (new_sndbuf > old_sndbuf);
183

184 185 186 187 188
        //  Retry the sending operation; at this point it must succeed.
        do {
            nbytes = ::send (w, &cmd_, sizeof (command_t), 0);
        } while (nbytes == -1 && errno == EINTR);
    }
189
    errno_assert (nbytes != -1);
190 191 192

    //  This should never happen as we've already checked that command size is
    //  less than PIPE_BUF.
193
    zmq_assert (nbytes == sizeof (command_t));
194 195
}

196
//  POSIX: reads one command from the signalling socket.
//
//  Returns 0 on success. Returns -1 with errno set to:
//    EAGAIN - the read would block, i.e. no command is available
//             (EWOULDBLOCK is reported as EAGAIN);
//    EINTR  - the read was interrupted by a signal.
//  Any other failure, or a partial command, is fatal.
int zmq::mailbox_t::recv (command_t *cmd_, bool block_)
{
#ifdef MSG_DONTWAIT

    //  Attempt to read an entire command. Non-blocking behaviour is requested
    //  per call via MSG_DONTWAIT, so the reader's mode is never changed here.
    ssize_t nbytes = ::recv (r, cmd_, sizeof (command_t),
        block_ ? 0 : MSG_DONTWAIT);
    if (nbytes == -1) {
        if (errno == EAGAIN || errno == EWOULDBLOCK) {
            //  Report "no command available" uniformly as EAGAIN.
            errno = EAGAIN;
            return -1;
        }
        if (errno == EINTR)
            return -1;
    }

#else

    //  The reader is kept non-blocking (see the constructor). If required,
    //  switch it to blocking mode for this call.
    if (block_) {
        int flags = fcntl (r, F_GETFL, 0);
        errno_assert (flags >= 0);
        int rc = fcntl (r, F_SETFL, flags & ~O_NONBLOCK);
        errno_assert (rc == 0);
    }

    //  Attempt to read an entire command. Save errno immediately so that
    //  the fcntl calls below cannot clobber it before it is reported.
    ssize_t nbytes = ::recv (r, cmd_, sizeof (command_t), 0);
    int err = nbytes == -1 ? errno : 0;

    //  Re-set the reader to non-blocking mode.
    if (block_) {
        int flags = fcntl (r, F_GETFL, 0);
        errno_assert (flags >= 0);
        int rc = fcntl (r, F_SETFL, flags | O_NONBLOCK);
        errno_assert (rc == 0);
    }

    if (nbytes == -1) {
        //  Restore the recv() errno, either for the caller or for the
        //  assertion below, reporting EWOULDBLOCK as EAGAIN.
        errno = (err == EWOULDBLOCK) ? EAGAIN : err;
        if (errno == EAGAIN || errno == EINTR)
            return -1;
    }

#endif

    //  Sanity check for success.
    errno_assert (nbytes != -1);

    //  Check whether we haven't got half of command.
    zmq_assert (nbytes == sizeof (command_t));

    return 0;
}
Martin Sustrik's avatar
Martin Sustrik committed
248

249 250
#endif

251
//  Creates a connected pair of stream sockets used for signalling.
//  *w_ receives the writer end and *r_ the reader end.
//  Always returns 0; failures are handled by assertions rather than being
//  reported to the caller.
int zmq::mailbox_t::make_socketpair (fd_t *r_, fd_t *w_)
{
#if defined ZMQ_HAVE_WINDOWS

    //  Windows has no 'socketpair' function. CreatePipe is no good as pipe
    //  handles cannot be polled on. Here we create the socketpair by hand,
    //  connecting two TCP sockets over the loopback interface.
    *w_ = INVALID_SOCKET;
    *r_ = INVALID_SOCKET;

    //  Create listening socket.
    SOCKET listener;
    listener = socket (AF_INET, SOCK_STREAM, 0);
    wsa_assert (listener != INVALID_SOCKET);

    //  Set SO_REUSEADDR and TCP_NODELAY on listening socket.
    BOOL so_reuseaddr = 1;
    int rc = setsockopt (listener, SOL_SOCKET, SO_REUSEADDR,
        (char *)&so_reuseaddr, sizeof (so_reuseaddr));
    wsa_assert (rc != SOCKET_ERROR);
    BOOL tcp_nodelay = 1;
    rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELAY,
        (char *)&tcp_nodelay, sizeof (tcp_nodelay));
    wsa_assert (rc != SOCKET_ERROR);

    //  Bind listening socket to any free local port.
    struct sockaddr_in addr;
    memset (&addr, 0, sizeof (addr));
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
    addr.sin_port = 0;
    rc = bind (listener, (const struct sockaddr*) &addr, sizeof (addr));
    wsa_assert (rc != SOCKET_ERROR);

    //  Retrieve local port listener is bound to (into addr).
    int addrlen = sizeof (addr);
    rc = getsockname (listener, (struct sockaddr*) &addr, &addrlen);
    wsa_assert (rc != SOCKET_ERROR);

    //  Listen for incoming connections.
    rc = listen (listener, 1);
    wsa_assert (rc != SOCKET_ERROR);

    //  Create the writer socket.
    *w_ = WSASocket (AF_INET, SOCK_STREAM, 0, NULL, 0,  0);
    wsa_assert (*w_ != INVALID_SOCKET);

    //  Set TCP_NODELAY on writer socket.
    rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY,
        (char *)&tcp_nodelay, sizeof (tcp_nodelay));
    wsa_assert (rc != SOCKET_ERROR);

    //  Connect writer to the listener.
    rc = connect (*w_, (sockaddr *) &addr, sizeof (addr));
    wsa_assert (rc != SOCKET_ERROR);

    //  Accept connection from writer.
    //  NOTE(review): accept() takes whichever connection reaches the listener
    //  first and does not verify the peer is our writer. Confirm that another
    //  local process racing to connect is not a concern.
    *r_ = accept (listener, NULL, NULL);
    wsa_assert (*r_ != INVALID_SOCKET);

    //  We don't need the listening socket anymore. Close it.
    rc = closesocket (listener);
    wsa_assert (rc != SOCKET_ERROR);

    return 0;

#elif defined ZMQ_HAVE_OPENVMS

    //  Whilst OpenVMS supports socketpair - it maps to AF_INET only.  Further,
    //  it does not set the socket options TCP_NODELAY and TCP_NODELACK which
    //  can lead to performance problems.
    //
    //  The bug will be fixed in V5.6 ECO4 and beyond.  In the meantime, we'll
    //  create the socket pair manually.
    sockaddr_in lcladdr;
    memset (&lcladdr, 0, sizeof (lcladdr));
    lcladdr.sin_family = AF_INET;
    lcladdr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
    lcladdr.sin_port = 0;

    int listener = socket (AF_INET, SOCK_STREAM, 0);
    errno_assert (listener != -1);

    int on = 1;
    int rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELAY, &on, sizeof (on));
    errno_assert (rc != -1);

    rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELACK, &on, sizeof (on));
    errno_assert (rc != -1);

    rc = bind(listener, (struct sockaddr*) &lcladdr, sizeof (lcladdr));
    errno_assert (rc != -1);

    socklen_t lcladdr_len = sizeof (lcladdr);

    //  Retrieve the ephemeral port the listener was bound to.
    rc = getsockname (listener, (struct sockaddr*) &lcladdr, &lcladdr_len);
    errno_assert (rc != -1);

    rc = listen (listener, 1);
    errno_assert (rc != -1);

    *w_ = socket (AF_INET, SOCK_STREAM, 0);
    errno_assert (*w_ != -1);

    rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY, &on, sizeof (on));
    errno_assert (rc != -1);

    rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELACK, &on, sizeof (on));
    errno_assert (rc != -1);

    rc = connect (*w_, (struct sockaddr*) &lcladdr, sizeof (lcladdr));
    errno_assert (rc != -1);

    *r_ = accept (listener, NULL, NULL);
    errno_assert (*r_ != -1);

    //  The listener is no longer needed. As in the Windows branch, a failure
    //  to close it is treated as fatal rather than ignored.
    rc = close (listener);
    errno_assert (rc != -1);

    return 0;

#else // All other implementations support socketpair()

    int sv [2];
    int rc = socketpair (AF_UNIX, SOCK_STREAM, 0, sv);
    errno_assert (rc == 0);
    *w_ = sv [0];
    *r_ = sv [1];
    return 0;

#endif
}
Martin Sustrik's avatar
Martin Sustrik committed
381