signaler.cpp 8.64 KB
Newer Older
Martin Sustrik's avatar
Martin Sustrik committed
1
/*
2
    Copyright (c) 2007-2010 iMatix Corporation
Martin Sustrik's avatar
Martin Sustrik committed
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19

    This file is part of 0MQ.

    0MQ is free software; you can redistribute it and/or modify it under
    the terms of the Lesser GNU General Public License as published by
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    0MQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    Lesser GNU General Public License for more details.

    You should have received a copy of the Lesser GNU General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

20
#include "signaler.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
21 22 23
#include "platform.hpp"
#include "err.hpp"
#include "fd.hpp"
unknown's avatar
unknown committed
24
#include "ip.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
25

Martin Sustrik's avatar
Martin Sustrik committed
26
#if defined ZMQ_HAVE_OPENVMS
Martin Sustrik's avatar
Martin Sustrik committed
27
#include <netinet/tcp.h>
Martin Sustrik's avatar
Martin Sustrik committed
28
#elif defined ZMQ_HAVE_WINDOWS 
Martin Sustrik's avatar
Martin Sustrik committed
29 30 31 32
#include "windows.hpp"
#else
#include <unistd.h>
#include <fcntl.h>
33
#include <limits.h>
Martin Sustrik's avatar
Martin Sustrik committed
34 35
#endif

36 37 38 39 40
zmq::fd_t zmq::signaler_t::get_fd ()
{
    return r;
}

41
#if defined ZMQ_HAVE_WINDOWS
Martin Sustrik's avatar
Martin Sustrik committed
42

43
zmq::signaler_t::signaler_t () :
Martin Sustrik's avatar
Martin Sustrik committed
44
{
Martin Sustrik's avatar
Martin Sustrik committed
45 46
    //  Windows have no 'socketpair' function. CreatePipe is no good as pipe
    //  handles cannot be polled on. Here we create the socketpair by hand.
47

Martin Sustrik's avatar
Martin Sustrik committed
48 49 50 51 52 53 54 55 56 57 58 59
    struct sockaddr_in addr;
    SOCKET listener;
    int addrlen = sizeof (addr);
           
    w = INVALID_SOCKET; 
    r = INVALID_SOCKET;
    
    fd_t rcs = (listener = socket (AF_INET, SOCK_STREAM, 0));
    wsa_assert (rcs != INVALID_SOCKET);

    memset (&addr, 0, sizeof (addr));
    addr.sin_family = AF_INET;
60 61
    addr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
    addr.sin_port = 0;
Martin Sustrik's avatar
Martin Sustrik committed
62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80
            
    int rc = bind (listener, (const struct sockaddr*) &addr, sizeof (addr));
    wsa_assert (rc != SOCKET_ERROR);

    rc = getsockname (listener, (struct sockaddr*) &addr, &addrlen);
    wsa_assert (rc != SOCKET_ERROR);
            
    //  Listen for incomming connections.
    rc = listen (listener, 1);
    wsa_assert (rc != SOCKET_ERROR);
                     
    //  Create the socket.
    w = WSASocket (AF_INET, SOCK_STREAM, 0, NULL, 0,  0);
    wsa_assert (w != INVALID_SOCKET);
                      
    //  Connect to the remote peer.
    rc = connect (w, (sockaddr *) &addr, sizeof (addr));
    wsa_assert (rc != SOCKET_ERROR);
                                    
81
    //  Accept connection from w.
Martin Sustrik's avatar
Martin Sustrik committed
82 83
    r = accept (listener, NULL, NULL);
    wsa_assert (r != INVALID_SOCKET);
Martin Sustrik's avatar
Martin Sustrik committed
84 85

    //  Set the read site of the pair to non-blocking mode.
86 87 88
    unsigned long argp = 1;
    rc = ioctlsocket (r, FIONBIO, &argp);
    wsa_assert (rc != SOCKET_ERROR);
Martin Sustrik's avatar
Martin Sustrik committed
89 90

    //  We don't need the listening socket anymore. Close it.
Martin Sustrik's avatar
Martin Sustrik committed
91 92 93 94
    rc = closesocket (listener);
    wsa_assert (rc != SOCKET_ERROR);
}

95
zmq::signaler_t::~signaler_t ()
Martin Sustrik's avatar
Martin Sustrik committed
96 97 98 99 100 101 102 103
{
    int rc = closesocket (w);
    wsa_assert (rc != SOCKET_ERROR);

    rc = closesocket (r);
    wsa_assert (rc != SOCKET_ERROR);
}

104
void zmq::signaler_t::send (const command_t &cmd_)
Martin Sustrik's avatar
Martin Sustrik committed
105
{
106 107
    //  TODO: Note that send is a blocking operation.
    //  How should we behave if the signal cannot be written to the signaler?
108 109
    //  Even worse: What if half of a command is written?
    int rc = send (w, (char*) &cmd_, sizeof (command_t), 0);
Martin Sustrik's avatar
Martin Sustrik committed
110
    win_assert (rc != SOCKET_ERROR);
111
    zmq_assert (rc == sizeof (command_t));
Martin Sustrik's avatar
Martin Sustrik committed
112 113
}

114
bool zmq::signaler_t::recv (command_t *cmd_, bool block_)
Martin Sustrik's avatar
Martin Sustrik committed
115
{
116
    if (block_) {
117

118 119 120 121 122
        //  Switch to blocking mode.
        unsigned long argp = 0;
        int rc = ioctlsocket (r, FIONBIO, &argp);
        wsa_assert (rc != SOCKET_ERROR);
    }
123

124 125 126 127 128 129 130
    bool result;
    int nbytes = recv (r, (char*) cmd_, sizeof (command_t), 0);
    if (nbytes == -1 && WSAGetLastError () == WSAEWOULDBLOCK) {
        result = false;
    }
    else {
        wsa_assert (nbytes != -1);
131

132 133
        //  Check whether we haven't got half of a signal.
        zmq_assert (nbytes % sizeof (uint32_t) == 0);
Martin Sustrik's avatar
Martin Sustrik committed
134

135
        result = true;
Martin Sustrik's avatar
Martin Sustrik committed
136 137
    }

138
    if (block_) {
139

140 141 142 143 144
        //  Switch back to non-blocking mode.
        unsigned long argp = 1;
        int rc = ioctlsocket (r, FIONBIO, &argp);
        wsa_assert (rc != SOCKET_ERROR);
    }
145

146
    return result;
Martin Sustrik's avatar
Martin Sustrik committed
147 148
}

149 150 151 152 153
#elif defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_AIX

#include <sys/types.h>
#include <sys/socket.h>

154
zmq::signaler_t::signaler_t ()
155 156 157 158 159 160 161 162 163 164 165 166 167 168 169
{
    int sv [2];
    int rc = socketpair (AF_UNIX, SOCK_STREAM, 0, sv);
    errno_assert (rc == 0);
    w = sv [0];
    r = sv [1];

    //  Set the reader to non-blocking mode.
    int flags = fcntl (r, F_GETFL, 0);
    if (flags == -1)
        flags = 0;
    rc = fcntl (r, F_SETFL, flags | O_NONBLOCK);
    errno_assert (rc != -1);
}

170
zmq::signaler_t::~signaler_t ()
171 172 173 174 175
{
    close (w);
    close (r);
}

176
void zmq::signaler_t::send (const command_t &cmd_)
177
{
178
    ssize_t nbytes = send (w, &cmd_, sizeof (command_t), 0);
179
    errno_assert (nbytes != -1);
180
    zmq_assert (nbytes == sizeof (command_t));
181 182
}

183
bool zmq::signaler_t::recv (command_t &cmd_, bool block_)
184
{
185 186 187 188 189 190 191 192 193
    if (block_) {

        //  Set the reader to blocking mode.
        int flags = fcntl (r, F_GETFL, 0);
        if (flags == -1)
            flags = 0;
        int rc = fcntl (r, F_SETFL, flags & ~O_NONBLOCK);
        errno_assert (rc != -1);
    }
194

195 196 197 198 199 200 201
    bool result;
    ssize_t nbytes = recv (r, buffer, sizeof (command_t), 0);
    if (nbytes == -1 && errno == EAGAIN) {
        result = false;
    }
    else {
        zmq_assert (nbytes != -1);
202

203 204
        //  Check whether we haven't got half of command.
        zmq_assert (nbytes == sizeof (command_t));
205

206
        result = true;
207
    }
208

209 210 211 212 213 214 215 216 217
   if (block_)

        //  Set the reader to non-blocking mode.
        int flags = fcntl (r, F_GETFL, 0);
        if (flags == -1)
            flags = 0;
        int rc = fcntl (r, F_SETFL, flags | O_NONBLOCK);
        errno_assert (rc != -1);
    }
218

219
    return result;
220 221
}

Martin Sustrik's avatar
Martin Sustrik committed
222 223 224 225 226
#else

#include <sys/types.h>
#include <sys/socket.h>

227
zmq::signaler_t::signaler_t ()
Martin Sustrik's avatar
Martin Sustrik committed
228
{
229 230 231 232 233
    //  Make sure that command can be written to the socket in atomic fashion.
    //  If this wasn't guaranteed, commands from different threads would be
    //  interleaved.
    zmq_assert (sizeof (command_t) <= PIPE_BUF);

Martin Sustrik's avatar
Martin Sustrik committed
234 235 236 237 238 239 240
    int sv [2];
    int rc = socketpair (AF_UNIX, SOCK_STREAM, 0, sv);
    errno_assert (rc == 0);
    w = sv [0];
    r = sv [1];
}

241
zmq::signaler_t::~signaler_t ()
Martin Sustrik's avatar
Martin Sustrik committed
242 243 244 245 246
{
    close (w);
    close (r);
}

247
void zmq::signaler_t::send (const command_t &cmd_)
Martin Sustrik's avatar
Martin Sustrik committed
248
{
249
    //  TODO: Note that send is a blocking operation.
250 251
    //  How should we behave if the command cannot be written to the signaler?
    ssize_t nbytes = ::send (w, &cmd_, sizeof (command_t), 0);
252
    errno_assert (nbytes != -1);
253 254 255 256

    //  This should never happen as we've already checked that command size is
    //  less than PIPE_BUF.
    zmq_assert (nbytes == sizeof (command_t));
Martin Sustrik's avatar
Martin Sustrik committed
257 258
}

259
bool zmq::signaler_t::recv (command_t *cmd_, bool block_)
260
{
261 262
    ssize_t nbytes = ::recv (r, cmd_, sizeof (command_t),
        block_ ? 0 : MSG_DONTWAIT);
263

264 265 266
    //  If there's no signal available return false.
    if (nbytes == -1 && errno == EAGAIN)
        return false;
267

268
    errno_assert (nbytes != -1);
Martin Sustrik's avatar
Martin Sustrik committed
269

270 271
    //  Check whether we haven't got half of command.
    zmq_assert (nbytes == sizeof (command_t));
272

273
    return true;
Martin Sustrik's avatar
Martin Sustrik committed
274 275 276 277
}

#endif

Martin Sustrik's avatar
Martin Sustrik committed
278
#if defined ZMQ_HAVE_OPENVMS
Martin Sustrik's avatar
Martin Sustrik committed
279

280
int zmq::signaler_t::socketpair (int domain_, int type_, int protocol_,
Martin Sustrik's avatar
Martin Sustrik committed
281 282 283 284 285 286 287 288
    int sv_ [2])
{
    int listener;
    sockaddr_in lcladdr;
    socklen_t lcladdr_len;
    int rc;
    int on = 1;

Martin Sustrik's avatar
Martin Sustrik committed
289
    zmq_assert (type_ == SOCK_STREAM);
Martin Sustrik's avatar
Martin Sustrik committed
290 291 292 293

    //  Fill in the localhost address (127.0.0.1).
    memset (&lcladdr, 0, sizeof (lcladdr));
    lcladdr.sin_family = AF_INET;
294 295
    lcladdr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
    lcladdr.sin_port = 0;
Martin Sustrik's avatar
Martin Sustrik committed
296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338

    listener = socket (AF_INET, SOCK_STREAM, 0);
    errno_assert (listener != -1);

    rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELAY, &on, sizeof (on));
    errno_assert (rc != -1);

    rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELACK, &on, sizeof (on));
    errno_assert (rc != -1);

    rc = bind(listener, (struct sockaddr*) &lcladdr, sizeof (lcladdr));
    errno_assert (rc != -1);
        
    lcladdr_len = sizeof (lcladdr);

    rc = getsockname (listener, (struct sockaddr*) &lcladdr, &lcladdr_len);
    errno_assert (rc != -1);

    rc = listen (listener, 1);
    errno_assert (rc != -1);

    sv_ [0] = socket (AF_INET, SOCK_STREAM, 0);
    errno_assert (rc != -1);

    rc = setsockopt (sv_ [0], IPPROTO_TCP, TCP_NODELAY, &on, sizeof (on));
    errno_assert (rc != -1);

    rc = setsockopt (sv_ [0], IPPROTO_TCP, TCP_NODELACK, &on, sizeof (on));
    errno_assert (rc != -1);

    rc = connect (sv_ [0], (struct sockaddr*) &lcladdr, sizeof (lcladdr));
    errno_assert (rc != -1);

    sv_ [1] = accept (listener, NULL, NULL);
    errno_assert (sv_ [1] != -1);

    close (listener);

    return 0;
}

#endif