fd_signaler.cpp 10.2 KB
Newer Older
Martin Sustrik's avatar
Martin Sustrik committed
1
/*
2
    Copyright (c) 2007-2010 iMatix Corporation
Martin Sustrik's avatar
Martin Sustrik committed
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23

    This file is part of 0MQ.

    0MQ is free software; you can redistribute it and/or modify it under
    the terms of the Lesser GNU General Public License as published by
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    0MQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    Lesser GNU General Public License for more details.

    You should have received a copy of the Lesser GNU General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

#include "fd_signaler.hpp"
#include "platform.hpp"
#include "err.hpp"
#include "fd.hpp"
unknown's avatar
unknown committed
24
#include "ip.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
25

Martin Sustrik's avatar
Martin Sustrik committed
26
#if defined ZMQ_HAVE_OPENVMS
Martin Sustrik's avatar
Martin Sustrik committed
27
#include <netinet/tcp.h>
Martin Sustrik's avatar
Martin Sustrik committed
28
#elif defined ZMQ_HAVE_WINDOWS 
Martin Sustrik's avatar
Martin Sustrik committed
29 30 31 32 33 34
#include "windows.hpp"
#else
#include <unistd.h>
#include <fcntl.h>
#endif

Martin Sustrik's avatar
Martin Sustrik committed
35
#if defined ZMQ_HAVE_EVENTFD
Martin Sustrik's avatar
Martin Sustrik committed
36 37 38

#include <sys/eventfd.h>

Martin Sustrik's avatar
Martin Sustrik committed
39
zmq::fd_signaler_t::fd_signaler_t ()
Martin Sustrik's avatar
Martin Sustrik committed
40 41 42 43 44 45 46 47 48 49 50 51 52
{
    //  Create eventfd object.
    fd = eventfd (0, 0);
    errno_assert (fd != -1);

    //  Set to non-blocking mode.
    int flags = fcntl (fd, F_GETFL, 0);
    if (flags == -1)
        flags = 0;
    int rc = fcntl (fd, F_SETFL, flags | O_NONBLOCK);
    errno_assert (rc != -1);
}

Martin Sustrik's avatar
Martin Sustrik committed
53
zmq::fd_signaler_t::~fd_signaler_t ()
Martin Sustrik's avatar
Martin Sustrik committed
54 55 56 57 58
{
    int rc = close (fd);
    errno_assert (rc != -1);
}

Martin Sustrik's avatar
Martin Sustrik committed
59
void zmq::fd_signaler_t::signal (int signal_)
Martin Sustrik's avatar
Martin Sustrik committed
60
{
Martin Sustrik's avatar
Martin Sustrik committed
61
    zmq_assert (signal_ >= 0 && signal_ < 64);
62
    uint64_t inc = 1;
Martin Sustrik's avatar
Martin Sustrik committed
63
    inc <<= signal_;
64 65
    ssize_t sz = write (fd, &inc, sizeof (uint64_t));
    errno_assert (sz == sizeof (uint64_t));
Martin Sustrik's avatar
Martin Sustrik committed
66 67
}

68 69
uint64_t zmq::fd_signaler_t::poll ()
{
70 71 72 73 74 75 76 77 78 79 80
    //  Set to blocking mode.
    int flags = fcntl (fd, F_GETFL, 0);
    if (flags == -1)
        flags = 0;
    int rc = fcntl (fd, F_SETFL, flags & ~O_NONBLOCK);
    errno_assert (rc != -1);

    uint64_t signals;
    ssize_t sz;
    while (true) {
        sz = read (fd, &signals, sizeof (uint64_t));
Martin Sustrik's avatar
Martin Sustrik committed
81 82 83
        if (sz == -1) {
            if (errno == EAGAIN || errno == EINTR)
                continue;
Martin Sustrik's avatar
Martin Sustrik committed
84
            errno_assert (false);
Martin Sustrik's avatar
Martin Sustrik committed
85 86
        }
        break;
87 88 89 90 91 92 93
    }

    //  Set to non-blocking mode.
    rc = fcntl (fd, F_SETFL, flags | O_NONBLOCK);
    errno_assert (rc != -1);

    return signals;
94 95
}

96
uint64_t zmq::fd_signaler_t::check ()
Martin Sustrik's avatar
Martin Sustrik committed
97
{
98 99
    uint64_t signals;
    ssize_t sz = read (fd, &signals, sizeof (uint64_t));
100
    if (sz == -1 && (errno == EAGAIN || errno == EINTR))
Martin Sustrik's avatar
Martin Sustrik committed
101 102
        return 0;
    errno_assert (sz != -1);
103
    return signals;
Martin Sustrik's avatar
Martin Sustrik committed
104 105
}

Martin Sustrik's avatar
Martin Sustrik committed
106
zmq::fd_t zmq::fd_signaler_t::get_fd ()
Martin Sustrik's avatar
Martin Sustrik committed
107 108 109 110
{
    return fd;
}

Martin Sustrik's avatar
Martin Sustrik committed
111
#elif defined ZMQ_HAVE_WINDOWS
Martin Sustrik's avatar
Martin Sustrik committed
112

Martin Sustrik's avatar
Martin Sustrik committed
113
zmq::fd_signaler_t::fd_signaler_t ()
Martin Sustrik's avatar
Martin Sustrik committed
114
{
Martin Sustrik's avatar
Martin Sustrik committed
115 116
    //  Windows have no 'socketpair' function. CreatePipe is no good as pipe
    //  handles cannot be polled on. Here we create the socketpair by hand.
117

Martin Sustrik's avatar
Martin Sustrik committed
118 119 120 121 122 123 124 125 126 127 128 129
    struct sockaddr_in addr;
    SOCKET listener;
    int addrlen = sizeof (addr);
           
    w = INVALID_SOCKET; 
    r = INVALID_SOCKET;
    
    fd_t rcs = (listener = socket (AF_INET, SOCK_STREAM, 0));
    wsa_assert (rcs != INVALID_SOCKET);

    memset (&addr, 0, sizeof (addr));
    addr.sin_family = AF_INET;
130 131
    addr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
    addr.sin_port = 0;
Martin Sustrik's avatar
Martin Sustrik committed
132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150
            
    int rc = bind (listener, (const struct sockaddr*) &addr, sizeof (addr));
    wsa_assert (rc != SOCKET_ERROR);

    rc = getsockname (listener, (struct sockaddr*) &addr, &addrlen);
    wsa_assert (rc != SOCKET_ERROR);
            
    //  Listen for incomming connections.
    rc = listen (listener, 1);
    wsa_assert (rc != SOCKET_ERROR);
                     
    //  Create the socket.
    w = WSASocket (AF_INET, SOCK_STREAM, 0, NULL, 0,  0);
    wsa_assert (w != INVALID_SOCKET);
                      
    //  Connect to the remote peer.
    rc = connect (w, (sockaddr *) &addr, sizeof (addr));
    wsa_assert (rc != SOCKET_ERROR);
                                    
151
    //  Accept connection from w.
Martin Sustrik's avatar
Martin Sustrik committed
152 153
    r = accept (listener, NULL, NULL);
    wsa_assert (r != INVALID_SOCKET);
Martin Sustrik's avatar
Martin Sustrik committed
154 155 156 157 158 159 160

    //  Set the read site of the pair to non-blocking mode.
    //unsigned long argp = 1;
    //rc = ioctlsocket (r, FIONBIO, &argp);
    //wsa_assert (rc != SOCKET_ERROR);

    //  We don't need the listening socket anymore. Close it.
Martin Sustrik's avatar
Martin Sustrik committed
161 162 163 164
    rc = closesocket (listener);
    wsa_assert (rc != SOCKET_ERROR);
}

Martin Sustrik's avatar
Martin Sustrik committed
165
zmq::fd_signaler_t::~fd_signaler_t ()
Martin Sustrik's avatar
Martin Sustrik committed
166 167 168 169 170 171 172 173
{
    int rc = closesocket (w);
    wsa_assert (rc != SOCKET_ERROR);

    rc = closesocket (r);
    wsa_assert (rc != SOCKET_ERROR);
}

Martin Sustrik's avatar
Martin Sustrik committed
174
void zmq::fd_signaler_t::signal (int signal_)
Martin Sustrik's avatar
Martin Sustrik committed
175
{
176 177 178
    //  TODO: Note that send is a blocking operation.
    //  How should we behave if the signal cannot be written to the signaler?

Martin Sustrik's avatar
Martin Sustrik committed
179
    zmq_assert (signal_ >= 0 && signal_ < 64);
Martin Sustrik's avatar
Martin Sustrik committed
180 181 182 183 184
    char c = (char) signal_;
    int rc = send (w, &c, 1, 0);
    win_assert (rc != SOCKET_ERROR);
}

185
uint64_t zmq::fd_signaler_t::poll ()
Martin Sustrik's avatar
Martin Sustrik committed
186
{
Martin Sustrik's avatar
Martin Sustrik committed
187 188 189 190
    //  TODO: Can we do a blocking read on non-blocking socket?
    //  It's not needed as for now, so let it stay unimplemented.
    zmq_assert (false);
    return 0;
191 192 193 194
}

uint64_t zmq::fd_signaler_t::check ()
{
Martin Sustrik's avatar
Martin Sustrik committed
195 196 197
    unsigned char buffer [32];
    int nbytes = recv (r, (char*) buffer, 32, 0);
    if (nbytes == -1 && WSAGetLastError () == WSAEWOULDBLOCK)
198
        return 0;
Martin Sustrik's avatar
Martin Sustrik committed
199
    wsa_assert (nbytes != -1);
Martin Sustrik's avatar
Martin Sustrik committed
200

201
    uint64_t signals = 0;
Martin Sustrik's avatar
Martin Sustrik committed
202
    for (int pos = 0; pos != nbytes; pos++) {
Martin Sustrik's avatar
Martin Sustrik committed
203
        zmq_assert (buffer [pos] < 64);
204
        signals |= (uint64_t (1) << (buffer [pos]));
Martin Sustrik's avatar
Martin Sustrik committed
205 206 207 208
    }
    return signals;
}

Martin Sustrik's avatar
Martin Sustrik committed
209
zmq::fd_t zmq::fd_signaler_t::get_fd ()
Martin Sustrik's avatar
Martin Sustrik committed
210 211 212 213
{
    return r;
}

214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251
#elif defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_AIX

#include <sys/types.h>
#include <sys/socket.h>

zmq::fd_signaler_t::fd_signaler_t ()
{
    int sv [2];
    int rc = socketpair (AF_UNIX, SOCK_STREAM, 0, sv);
    errno_assert (rc == 0);
    w = sv [0];
    r = sv [1];

    //  Set the reader to non-blocking mode.
    int flags = fcntl (r, F_GETFL, 0);
    if (flags == -1)
        flags = 0;
    rc = fcntl (r, F_SETFL, flags | O_NONBLOCK);
    errno_assert (rc != -1);
}

zmq::fd_signaler_t::~fd_signaler_t ()
{
    close (w);
    close (r);
}

void zmq::fd_signaler_t::signal (int signal_)
{
    zmq_assert (signal_ >= 0 && signal_ < 64);
    unsigned char c = (unsigned char) signal_;
    ssize_t nbytes = send (w, &c, 1, 0);
    errno_assert (nbytes == 1);
}

uint64_t zmq::fd_signaler_t::poll ()
{
    //  Set the reader to blocking mode.
252
    int flags = fcntl (r, F_GETFL, 0);
253 254
    if (flags == -1)
        flags = 0;
255
    int rc = fcntl (r, F_SETFL, flags & ~O_NONBLOCK);
256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291
    errno_assert (rc != -1);

    //  Poll for events.
    uint64_t signals = check ();

    //  Set the reader to non-blocking mode.
    flags = fcntl (r, F_GETFL, 0);
    if (flags == -1)
        flags = 0;
    rc = fcntl (r, F_SETFL, flags | O_NONBLOCK);
    errno_assert (rc != -1);

    return signals;
}

uint64_t zmq::fd_signaler_t::check ()
{
    unsigned char buffer [64];
    ssize_t nbytes = recv (r, buffer, 64, 0);
    if (nbytes == -1 && errno == EAGAIN)
        return 0;
    zmq_assert (nbytes != -1);

    uint64_t signals = 0;
    for (int pos = 0; pos != nbytes; pos ++) {
        zmq_assert (buffer [pos] < 64);
        signals |= (uint64_t (1) << (buffer [pos]));
    }
    return signals;
}

zmq::fd_t zmq::fd_signaler_t::get_fd ()
{
    return r;
}

Martin Sustrik's avatar
Martin Sustrik committed
292 293 294 295 296
#else

#include <sys/types.h>
#include <sys/socket.h>

Martin Sustrik's avatar
Martin Sustrik committed
297
zmq::fd_signaler_t::fd_signaler_t ()
Martin Sustrik's avatar
Martin Sustrik committed
298 299 300 301 302 303 304 305
{
    int sv [2];
    int rc = socketpair (AF_UNIX, SOCK_STREAM, 0, sv);
    errno_assert (rc == 0);
    w = sv [0];
    r = sv [1];
}

Martin Sustrik's avatar
Martin Sustrik committed
306
zmq::fd_signaler_t::~fd_signaler_t ()
Martin Sustrik's avatar
Martin Sustrik committed
307 308 309 310 311
{
    close (w);
    close (r);
}

Martin Sustrik's avatar
Martin Sustrik committed
312
void zmq::fd_signaler_t::signal (int signal_)
Martin Sustrik's avatar
Martin Sustrik committed
313
{
314 315 316
    //  TODO: Note that send is a blocking operation.
    //  How should we behave if the signal cannot be written to the signaler?

Martin Sustrik's avatar
Martin Sustrik committed
317
    zmq_assert (signal_ >= 0 && signal_ < 64);
Martin Sustrik's avatar
Martin Sustrik committed
318 319 320 321 322
    unsigned char c = (unsigned char) signal_;
    ssize_t nbytes = send (w, &c, 1, 0);
    errno_assert (nbytes == 1);
}

323 324
uint64_t zmq::fd_signaler_t::poll ()
{
325 326 327
    unsigned char buffer [64];
    ssize_t nbytes = recv (r, buffer, 64, 0);
    zmq_assert (nbytes != -1);
328

329 330 331 332 333 334
    uint64_t signals = 0;
    for (int pos = 0; pos != nbytes; pos ++) {
        zmq_assert (buffer [pos] < 64);
        signals |= (uint64_t (1) << (buffer [pos]));
    }
    return signals;
335 336 337
}

uint64_t zmq::fd_signaler_t::check ()
Martin Sustrik's avatar
Martin Sustrik committed
338
{
339 340
    unsigned char buffer [64];
    ssize_t nbytes = recv (r, buffer, 64, MSG_DONTWAIT);
341 342 343
    if (nbytes == -1 && errno == EAGAIN)
        return 0;
    zmq_assert (nbytes != -1);
344 345

    uint64_t signals = 0;
Martin Sustrik's avatar
Martin Sustrik committed
346
    for (int pos = 0; pos != nbytes; pos ++) {
Martin Sustrik's avatar
Martin Sustrik committed
347
        zmq_assert (buffer [pos] < 64);
348
        signals |= (uint64_t (1) << (buffer [pos]));
Martin Sustrik's avatar
Martin Sustrik committed
349 350 351 352
    }
    return signals;
}

Martin Sustrik's avatar
Martin Sustrik committed
353
zmq::fd_t zmq::fd_signaler_t::get_fd ()
Martin Sustrik's avatar
Martin Sustrik committed
354 355 356 357 358 359
{
    return r;
}

#endif

Martin Sustrik's avatar
Martin Sustrik committed
360
#if defined ZMQ_HAVE_OPENVMS
Martin Sustrik's avatar
Martin Sustrik committed
361

Martin Sustrik's avatar
Martin Sustrik committed
362
int zmq::fd_signaler_t::socketpair (int domain_, int type_, int protocol_,
Martin Sustrik's avatar
Martin Sustrik committed
363 364 365 366 367 368 369 370
    int sv_ [2])
{
    int listener;
    sockaddr_in lcladdr;
    socklen_t lcladdr_len;
    int rc;
    int on = 1;

Martin Sustrik's avatar
Martin Sustrik committed
371
    zmq_assert (type_ == SOCK_STREAM);
Martin Sustrik's avatar
Martin Sustrik committed
372 373 374 375

    //  Fill in the localhost address (127.0.0.1).
    memset (&lcladdr, 0, sizeof (lcladdr));
    lcladdr.sin_family = AF_INET;
376 377
    lcladdr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
    lcladdr.sin_port = 0;
Martin Sustrik's avatar
Martin Sustrik committed
378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420

    listener = socket (AF_INET, SOCK_STREAM, 0);
    errno_assert (listener != -1);

    rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELAY, &on, sizeof (on));
    errno_assert (rc != -1);

    rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELACK, &on, sizeof (on));
    errno_assert (rc != -1);

    rc = bind(listener, (struct sockaddr*) &lcladdr, sizeof (lcladdr));
    errno_assert (rc != -1);
        
    lcladdr_len = sizeof (lcladdr);

    rc = getsockname (listener, (struct sockaddr*) &lcladdr, &lcladdr_len);
    errno_assert (rc != -1);

    rc = listen (listener, 1);
    errno_assert (rc != -1);

    sv_ [0] = socket (AF_INET, SOCK_STREAM, 0);
    errno_assert (rc != -1);

    rc = setsockopt (sv_ [0], IPPROTO_TCP, TCP_NODELAY, &on, sizeof (on));
    errno_assert (rc != -1);

    rc = setsockopt (sv_ [0], IPPROTO_TCP, TCP_NODELACK, &on, sizeof (on));
    errno_assert (rc != -1);

    rc = connect (sv_ [0], (struct sockaddr*) &lcladdr, sizeof (lcladdr));
    errno_assert (rc != -1);

    sv_ [1] = accept (listener, NULL, NULL);
    errno_assert (sv_ [1] != -1);

    close (listener);

    return 0;
}

#endif