stream_engine.cpp 26.4 KB
Newer Older
1
/*
2
    Copyright (c) 2007-2014 Contributors as noted in the AUTHORS file
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34

    This file is part of 0MQ.

    0MQ is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    0MQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

#include "platform.hpp"
#if defined ZMQ_HAVE_WINDOWS
#include "windows.hpp"
#else
#include <unistd.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/tcp.h>
#include <netinet/in.h>
#include <netdb.h>
#include <fcntl.h>
#endif

#include <string.h>
#include <new>
35
#include <sstream>
36

37
#include "stream_engine.hpp"
38
#include "io_thread.hpp"
39
#include "session_base.hpp"
40 41
#include "v1_encoder.hpp"
#include "v1_decoder.hpp"
42 43
#include "v2_encoder.hpp"
#include "v2_decoder.hpp"
44
#include "null_mechanism.hpp"
45
#include "plain_mechanism.hpp"
46 47
#include "curve_client.hpp"
#include "curve_server.hpp"
48 49
#include "raw_decoder.hpp"
#include "raw_encoder.hpp"
50 51
#include "config.hpp"
#include "err.hpp"
52
#include "ip.hpp"
Martin Hurton's avatar
Martin Hurton committed
53 54
#include "likely.hpp"
#include "wire.hpp"
55

56 57
zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_, 
                                       const std::string &endpoint_) :
58
    s (fd_),
59 60
    inpos (NULL),
    insize (0),
61
    decoder (NULL),
62 63
    outpos (NULL),
    outsize (0),
64
    encoder (NULL),
Martin Hurton's avatar
Martin Hurton committed
65
    handshaking (true),
66
    greeting_size (v2_greeting_size),
Martin Hurton's avatar
Martin Hurton committed
67
    greeting_bytes_read (0),
68 69
    session (NULL),
    options (options_),
70
    endpoint (endpoint_),
71
    plugged (false),
72 73
    read_msg (&stream_engine_t::read_identity),
    write_msg (&stream_engine_t::write_identity),
74 75
    io_error (false),
    subscription_required (false),
76
    mechanism (NULL),
77 78
    input_stopped (false),
    output_stopped (false),
79
    socket (NULL)
80
{
81 82
    int rc = tx_msg.init ();
    errno_assert (rc == 0);
83
    
Martin Hurton's avatar
Martin Hurton committed
84
    //  Put the socket into non-blocking mode.
85
    unblock_socket (s);
86

87 88
    int family = get_peer_ip_address (s, peer_address);
    if (family == 0)
89
        peer_address = "";
90
#if defined ZMQ_HAVE_SO_PEERCRED
91 92
    else
    if (family == PF_UNIX) {
93 94 95 96 97 98 99 100 101
        struct ucred cred;
        socklen_t size = sizeof (cred);
        if (!getsockopt (s, SOL_SOCKET, SO_PEERCRED, &cred, &size)) {
            std::ostringstream buf;
            buf << ":" << cred.uid << ":" << cred.gid << ":" << cred.pid;
            peer_address += buf.str ();
        }
    }
#elif defined ZMQ_HAVE_LOCAL_PEERCRED
102 103
    else
    if (family == PF_UNIX) {
104 105 106 107 108 109 110 111 112 113 114 115 116
        struct xucred cred;
        socklen_t size = sizeof (cred);
        if (!getsockopt (s, 0, LOCAL_PEERCRED, &cred, &size)
                && cred.cr_version == XUCRED_VERSION) {
            std::ostringstream buf;
            buf << ":" << cred.cr_uid << ":";
            if (cred.cr_ngroups > 0)
                buf << cred.cr_groups[0];
            buf << ":";
            peer_address += buf.str ();
        }
    }
#endif
117

118
#ifdef SO_NOSIGPIPE
119 120 121
    //  Make sure that SIGPIPE signal is not generated when writing to a
    //  connection that was already closed by the peer.
    int set = 1;
122
    rc = setsockopt (s, SOL_SOCKET, SO_NOSIGPIPE, &set, sizeof (int));
123 124
    errno_assert (rc == 0);
#endif
125 126
}

127
zmq::stream_engine_t::~stream_engine_t ()
128 129 130
{
    zmq_assert (!plugged);

131 132
    if (s != retired_fd) {
#ifdef ZMQ_HAVE_WINDOWS
133 134
        int rc = closesocket (s);
        wsa_assert (rc != SOCKET_ERROR);
135
#else
136
        int rc = close (s);
137 138
        errno_assert (rc == 0);
#endif
139
        s = retired_fd;
140
    }
141

142 143 144
    int rc = tx_msg.close ();
    errno_assert (rc == 0);

145 146 147
    delete encoder;
    delete decoder;
    delete mechanism;
148 149
}

150 151
void zmq::stream_engine_t::plug (io_thread_t *io_thread_,
    session_base_t *session_)
152 153 154 155 156 157 158 159
{
    zmq_assert (!plugged);
    plugged = true;

    //  Connect to session object.
    zmq_assert (!session);
    zmq_assert (session_);
    session = session_;
160
    socket = session-> get_socket ();
161 162 163 164

    //  Connect to I/O threads poller object.
    io_object_t::plug (io_thread_);
    handle = add_fd (s);
165
    io_error = false;
Martin Hurton's avatar
Martin Hurton committed
166

Martin Hurton's avatar
Martin Hurton committed
167
    if (options.raw_sock) {
168
        // no handshaking for raw sock, instantiate raw encoder and decoders
169
        encoder = new (std::nothrow) raw_encoder_t (out_batch_size);
170 171
        alloc_assert (encoder);

172
        decoder = new (std::nothrow) raw_decoder_t (in_batch_size);
173 174 175 176
        alloc_assert (decoder);

        // disable handshaking for raw socket
        handshaking = false;
177 178 179

        read_msg = &stream_engine_t::pull_msg_from_session;
        write_msg = &stream_engine_t::push_msg_to_session;
180 181 182 183 184

        //  For raw sockets, send an initial 0-length message to the
        // application so that it knows a peer has connected.
        msg_t connector;
        connector.init();
185
        (this->*write_msg) (&connector);
186 187
        connector.close();
        session->flush ();
Martin Hurton's avatar
Martin Hurton committed
188 189
    }
    else {
190 191
        //  Send the 'length' and 'flags' fields of the identity message.
        //  The 'length' field is encoded in the long format.
Pieter Hintjens's avatar
Pieter Hintjens committed
192
        outpos = greeting_send;
193 194 195 196 197
        outpos [outsize++] = 0xff;
        put_uint64 (&outpos [outsize], options.identity_size + 1);
        outsize += 8;
        outpos [outsize++] = 0x7f;
    }
Martin Hurton's avatar
Martin Hurton committed
198

199 200 201 202 203 204
    set_pollin (handle);
    set_pollout (handle);
    //  Flush all the data that may have been already received downstream.
    in_event ();
}

205
void zmq::stream_engine_t::unplug ()
206 207 208 209 210
{
    zmq_assert (plugged);
    plugged = false;

    //  Cancel all fd subscriptions.
211
    if (!io_error)
Martin Hurton's avatar
Martin Hurton committed
212
        rm_fd (handle);
213 214 215 216 217 218 219

    //  Disconnect from I/O threads poller object.
    io_object_t::unplug ();

    session = NULL;
}

220
void zmq::stream_engine_t::terminate ()
221 222 223 224 225
{
    unplug ();
    delete this;
}

226
void zmq::stream_engine_t::in_event ()
227
{
228
    zmq_assert (!io_error);
229

Pieter Hintjens's avatar
Pieter Hintjens committed
230
    //  If still handshaking, receive and process the greeting message.
Martin Hurton's avatar
Martin Hurton committed
231 232 233 234
    if (unlikely (handshaking))
        if (!handshake ())
            return;

235
    zmq_assert (decoder);
236 237

    //  If there has been an I/O error, stop polling.
238
    if (input_stopped) {
239 240 241 242
        rm_fd (handle);
        io_error = true;
        return;
    }
243 244 245 246 247 248 249 250

    //  If there's no data to process in the buffer...
    if (!insize) {

        //  Retrieve the buffer and read as much data as possible.
        //  Note that buffer can be arbitrarily large. However, we assume
        //  the underlying TCP layer has fixed buffer size and thus the
        //  number of bytes read will be always limited.
251 252
        size_t bufsize = 0;
        decoder->get_buffer (&inpos, &bufsize);
253

254 255
        int const rc = read (inpos, bufsize);
        if (rc == 0) {
256 257
            error ();
            return;
258
        }
259 260 261 262 263
        if (rc == -1) {
            if (errno != EAGAIN)
                error ();
            return;
        }
264

265
        //  Adjust input size
266
        insize = static_cast <size_t> (rc);
Martin Hurton's avatar
Martin Hurton committed
267
    }
268

269 270
    int rc = 0;
    size_t processed = 0;
271

272 273 274
    while (insize > 0) {
        rc = decoder->decode (inpos, insize, processed);
        zmq_assert (processed <= insize);
275 276
        inpos += processed;
        insize -= processed;
277 278
        if (rc == 0 || rc == -1)
            break;
279
        rc = (this->*write_msg) (decoder->msg ());
280 281
        if (rc == -1)
            break;
282 283
    }

284 285 286 287
    //  Tear down the connection if we have failed to decode input data
    //  or the session has rejected the message.
    if (rc == -1) {
        if (errno != EAGAIN) {
Martin Hurton's avatar
Martin Hurton committed
288
            error ();
289 290
            return;
        }
291
        input_stopped = true;
292
        reset_pollin (handle);
Martin Hurton's avatar
Martin Hurton committed
293
    }
294 295

    session->flush ();
296 297
}

298
void zmq::stream_engine_t::out_event ()
299
{
300 301
    zmq_assert (!io_error);

302 303 304
    //  If write buffer is empty, try to read new data from the encoder.
    if (!outsize) {

Martin Hurton's avatar
Martin Hurton committed
305 306 307 308 309 310 311 312
        //  Even when we stop polling as soon as there is no
        //  data to send, the poller may invoke out_event one
        //  more time due to 'speculative write' optimisation.
        if (unlikely (encoder == NULL)) {
            zmq_assert (handshaking);
            return;
        }

313
        outpos = NULL;
314 315 316
        outsize = encoder->encode (&outpos, 0);

        while (outsize < out_batch_size) {
317
            if ((this->*read_msg) (&tx_msg) == -1)
318 319 320 321 322 323 324 325 326
                break;
            encoder->load_msg (&tx_msg);
            unsigned char *bufptr = outpos + outsize;
            size_t n = encoder->encode (&bufptr, out_batch_size - outsize);
            zmq_assert (n > 0);
            if (outpos == NULL)
                outpos = bufptr;
            outsize += n;
        }
327 328 329

        //  If there is no data to send, stop polling for output.
        if (outsize == 0) {
330
            output_stopped = true;
331 332 333 334 335 336 337
            reset_pollout (handle);
            return;
        }
    }

    //  If there are any data to write in write buffer, write as much as
    //  possible to the socket. Note that amount of data to write can be
338
    //  arbitrarily large. However, we assume that underlying TCP layer has
339 340 341 342
    //  limited transmission buffer and thus the actual number of bytes
    //  written should be reasonably modest.
    int nbytes = write (outpos, outsize);

Martin Hurton's avatar
Martin Hurton committed
343 344
    //  IO error has occurred. We stop waiting for output events.
    //  The engine is not terminated until we detect input error;
345
    //  this is necessary to prevent losing incoming messages.
346
    if (nbytes == -1) {
Martin Hurton's avatar
Martin Hurton committed
347
        reset_pollout (handle);
348 349 350 351 352
        return;
    }

    outpos += nbytes;
    outsize -= nbytes;
Martin Hurton's avatar
Martin Hurton committed
353 354 355 356 357 358

    //  If we are still handshaking and there are no data
    //  to send, stop polling for output.
    if (unlikely (handshaking))
        if (outsize == 0)
            reset_pollout (handle);
359 360
}

361
void zmq::stream_engine_t::restart_output ()
362
{
363 364 365
    if (unlikely (io_error))
        return;

366
    if (likely (output_stopped)) {
367
        set_pollout (handle);
368
        output_stopped = false;
369
    }
370 371 372 373 374 375 376 377

    //  Speculative write: The assumption is that at the moment new message
    //  was sent by the user the socket is probably available for writing.
    //  Thus we try to write the data to socket avoiding polling for POLLOUT.
    //  Consequently, the latency should be better in request/reply scenarios.
    out_event ();
}

378
void zmq::stream_engine_t::restart_input ()
379
{
380
    zmq_assert (input_stopped);
381 382 383
    zmq_assert (session != NULL);
    zmq_assert (decoder != NULL);

384
    int rc = (this->*write_msg) (decoder->msg ());
385 386 387 388 389
    if (rc == -1) {
        if (errno == EAGAIN)
            session->flush ();
        else
            error ();
Martin Hurton's avatar
Martin Hurton committed
390 391 392
        return;
    }

393 394 395 396 397 398 399 400
    while (insize > 0) {
        size_t processed = 0;
        rc = decoder->decode (inpos, insize, processed);
        zmq_assert (processed <= insize);
        inpos += processed;
        insize -= processed;
        if (rc == 0 || rc == -1)
            break;
401
        rc = (this->*write_msg) (decoder->msg ());
402 403 404
        if (rc == -1)
            break;
    }
405

406 407 408 409 410 411
    if (rc == -1 && errno == EAGAIN)
        session->flush ();
    else
    if (rc == -1 || io_error)
        error ();
    else {
412
        input_stopped = false;
413 414 415 416 417 418
        set_pollin (handle);
        session->flush ();

        //  Speculative read.
        in_event ();
    }
419 420
}

421
bool zmq::stream_engine_t::handshake ()
Martin Hurton's avatar
Martin Hurton committed
422
{
423
    zmq_assert (handshaking);
Martin Hurton's avatar
Martin Hurton committed
424
    zmq_assert (greeting_bytes_read < greeting_size);
425
    //  Receive the greeting.
Martin Hurton's avatar
Martin Hurton committed
426
    while (greeting_bytes_read < greeting_size) {
Pieter Hintjens's avatar
Pieter Hintjens committed
427
        const int n = read (greeting_recv + greeting_bytes_read,
Martin Hurton's avatar
Martin Hurton committed
428
                            greeting_size - greeting_bytes_read);
429
        if (n == 0) {
430 431 432
            error ();
            return false;
        }
433 434 435
        if (n == -1) {
            if (errno != EAGAIN)
                error ();
436
            return false;
437
        }
Martin Hurton's avatar
Martin Hurton committed
438 439 440

        greeting_bytes_read += n;

441 442 443
        //  We have received at least one byte from the peer.
        //  If the first byte is not 0xff, we know that the
        //  peer is using unversioned protocol.
Pieter Hintjens's avatar
Pieter Hintjens committed
444
        if (greeting_recv [0] != 0xff)
445
            break;
Martin Hurton's avatar
Martin Hurton committed
446

447
        if (greeting_bytes_read < signature_size)
448
            continue;
Martin Hurton's avatar
Martin Hurton committed
449

450 451 452 453
        //  Inspect the right-most bit of the 10th byte (which coincides
        //  with the 'flags' field if a regular message was sent).
        //  Zero indicates this is a header of identity message
        //  (i.e. the peer is using the unversioned protocol).
Pieter Hintjens's avatar
Pieter Hintjens committed
454
        if (!(greeting_recv [9] & 0x01))
455
            break;
Martin Hurton's avatar
Martin Hurton committed
456

457
        //  The peer is using versioned protocol.
458 459
        //  Send the major version number.
        if (outpos + outsize == greeting_send + signature_size) {
Martin Hurton's avatar
Martin Hurton committed
460 461
            if (outsize == 0)
                set_pollout (handle);
462 463 464 465 466 467 468 469 470 471 472 473 474 475 476
            outpos [outsize++] = 3;     //  Major version number
        }

        if (greeting_bytes_read > signature_size) {
            if (outpos + outsize == greeting_send + signature_size + 1) {
                if (outsize == 0)
                    set_pollout (handle);

                //  Use ZMTP/2.0 to talk to older peers.
                if (greeting_recv [10] == ZMTP_1_0
                ||  greeting_recv [10] == ZMTP_2_0)
                    outpos [outsize++] = options.type;
                else {
                    outpos [outsize++] = 0; //  Minor version number
                    memset (outpos + outsize, 0, 20);
477 478 479 480 481

                    zmq_assert (options.mechanism == ZMQ_NULL
                            ||  options.mechanism == ZMQ_PLAIN
                            ||  options.mechanism == ZMQ_CURVE);

482 483 484
                    if (options.mechanism == ZMQ_NULL)
                        memcpy (outpos + outsize, "NULL", 4);
                    else
485
                    if (options.mechanism == ZMQ_PLAIN)
486
                        memcpy (outpos + outsize, "PLAIN", 5);
487 488
                    else
                        memcpy (outpos + outsize, "CURVE", 5);
489 490 491 492 493 494
                    outsize += 20;
                    memset (outpos + outsize, 0, 32);
                    outsize += 32;
                    greeting_size = v3_greeting_size;
                }
            }
Martin Hurton's avatar
Martin Hurton committed
495 496 497
        }
    }

498 499
    //  Position of the revision field in the greeting.
    const size_t revision_pos = 10;
500

501 502
    //  Is the peer using ZMTP/1.0 with no revision number?
    //  If so, we send and receive rest of identity message
Pieter Hintjens's avatar
Pieter Hintjens committed
503
    if (greeting_recv [0] != 0xff || !(greeting_recv [9] & 0x01)) {
504
        encoder = new (std::nothrow) v1_encoder_t (out_batch_size);
505 506
        alloc_assert (encoder);

507
        decoder = new (std::nothrow) v1_decoder_t (in_batch_size, options.maxmsgsize);
508 509
        alloc_assert (decoder);

Martin Hurton's avatar
Martin Hurton committed
510 511 512 513 514 515
        //  We have already sent the message header.
        //  Since there is no way to tell the encoder to
        //  skip the message header, we simply throw that
        //  header data away.
        const size_t header_size = options.identity_size + 1 >= 255 ? 10 : 2;
        unsigned char tmp [10], *bufferp = tmp;
516 517 518 519 520 521 522

        //  Prepare the identity message and load it into encoder.
        //  Then consume bytes we have already sent to the peer.
        const int rc = tx_msg.init_size (options.identity_size);
        zmq_assert (rc == 0);
        memcpy (tx_msg.data (), options.identity, options.identity_size);
        encoder->load_msg (&tx_msg);
523
        size_t buffer_size = encoder->encode (&bufferp, header_size);
Martin Hurton's avatar
Martin Hurton committed
524 525 526
        zmq_assert (buffer_size == header_size);

        //  Make sure the decoder sees the data we have already received.
Pieter Hintjens's avatar
Pieter Hintjens committed
527
        inpos = greeting_recv;
Martin Hurton's avatar
Martin Hurton committed
528
        insize = greeting_bytes_read;
529 530

        //  To allow for interoperability with peers that do not forward
531 532
        //  their subscriptions, we inject a phantom subscription message
        //  message into the incoming message stream.
533
        if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB)
534
            subscription_required = true;
Martin Hurton's avatar
Martin Hurton committed
535 536 537 538 539 540 541

        //  We are sending our identity now and the next message
        //  will come from the socket.
        read_msg = &stream_engine_t::pull_msg_from_session;

        //  We are expecting identity message.
        write_msg = &stream_engine_t::write_identity;
Martin Hurton's avatar
Martin Hurton committed
542
    }
543
    else
Pieter Hintjens's avatar
Pieter Hintjens committed
544
    if (greeting_recv [revision_pos] == ZMTP_1_0) {
545 546
        encoder = new (std::nothrow) v1_encoder_t (
            out_batch_size);
547 548
        alloc_assert (encoder);

549 550
        decoder = new (std::nothrow) v1_decoder_t (
            in_batch_size, options.maxmsgsize);
551 552
        alloc_assert (decoder);
    }
553 554 555 556 557 558 559 560 561
    else
    if (greeting_recv [revision_pos] == ZMTP_2_0) {
        encoder = new (std::nothrow) v2_encoder_t (out_batch_size);
        alloc_assert (encoder);

        decoder = new (std::nothrow) v2_decoder_t (
            in_batch_size, options.maxmsgsize);
        alloc_assert (decoder);
    }
562
    else {
563
        encoder = new (std::nothrow) v2_encoder_t (out_batch_size);
564 565
        alloc_assert (encoder);

566
        decoder = new (std::nothrow) v2_decoder_t (
567
            in_batch_size, options.maxmsgsize);
568
        alloc_assert (decoder);
569 570

        if (memcmp (greeting_recv + 12, "NULL\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) == 0) {
571 572
            mechanism = new (std::nothrow)
                null_mechanism_t (session, peer_address, options);
573
            alloc_assert (mechanism);
574 575 576
        }
        else
        if (memcmp (greeting_recv + 12, "PLAIN\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) == 0) {
577 578
            mechanism = new (std::nothrow)
                plain_mechanism_t (session, peer_address, options);
579
            alloc_assert (mechanism);
580
        }
581 582 583 584
#ifdef HAVE_LIBSODIUM
        else
        if (memcmp (greeting_recv + 12, "CURVE\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) == 0) {
            if (options.as_server)
585 586
                mechanism = new (std::nothrow)
                    curve_server_t (session, peer_address, options);
587 588 589 590 591
            else
                mechanism = new (std::nothrow) curve_client_t (options);
            alloc_assert (mechanism);
        }
#endif
592 593 594 595
        else {
            error ();
            return false;
        }
596 597
        read_msg = &stream_engine_t::next_handshake_command;
        write_msg = &stream_engine_t::process_handshake_command;
598
    }
Martin Hurton's avatar
Martin Hurton committed
599 600 601 602 603 604 605 606 607 608 609 610

    // Start polling for output if necessary.
    if (outsize == 0)
        set_pollout (handle);

    //  Handshaking was successful.
    //  Switch into the normal message flow.
    handshaking = false;

    return true;
}

611
int zmq::stream_engine_t::read_identity (msg_t *msg_)
612
{
613 614 615 616 617 618 619
    int rc = msg_->init_size (options.identity_size);
    errno_assert (rc == 0);
    if (options.identity_size > 0)
        memcpy (msg_->data (), options.identity, options.identity_size);
    read_msg = &stream_engine_t::pull_msg_from_session;
    return 0;
}
620

621 622 623 624 625
int zmq::stream_engine_t::write_identity (msg_t *msg_)
{
    if (options.recv_identity) {
        msg_->set_flags (msg_t::identity);
        int rc = session->push_msg (msg_);
626 627
        errno_assert (rc == 0);
    }
628 629 630 631 632 633 634 635 636 637 638
    else {
        int rc = msg_->close ();
        errno_assert (rc == 0);
        rc = msg_->init ();
        errno_assert (rc == 0);
    }

    if (subscription_required)
        write_msg = &stream_engine_t::write_subscription_msg;
    else
        write_msg = &stream_engine_t::push_msg_to_session;
639

640 641
    return 0;
}
642

643
int zmq::stream_engine_t::next_handshake_command (msg_t *msg_)
644
{
645 646
    zmq_assert (mechanism != NULL);

647
    const int rc = mechanism->next_handshake_command (msg_);
648
    if (rc == 0) {
649
        msg_->set_flags (msg_t::command);
650 651 652
        if (mechanism->is_handshake_complete ())
            mechanism_ready ();
    }
653

654
    return rc;
655 656
}

657
int zmq::stream_engine_t::process_handshake_command (msg_t *msg_)
658
{
659
    zmq_assert (mechanism != NULL);
660
    const int rc = mechanism->process_handshake_command (msg_);
661 662 663
    if (rc == 0) {
        if (mechanism->is_handshake_complete ())
            mechanism_ready ();
664 665
        if (output_stopped)
            restart_output ();
666 667
    }

668
    return rc;
669 670
}

671 672 673 674 675 676 677 678 679
void zmq::stream_engine_t::zap_msg_available ()
{
    zmq_assert (mechanism != NULL);

    const int rc = mechanism->zap_msg_available ();
    if (rc == -1) {
        error ();
        return;
    }
680 681 682 683
    if (input_stopped)
        restart_input ();
    if (output_stopped)
        restart_output ();
684 685
}

686
void zmq::stream_engine_t::mechanism_ready ()
687
{
688 689 690 691
    if (options.recv_identity) {
        msg_t identity;
        mechanism->peer_identity (&identity);
        const int rc = session->push_msg (&identity);
692 693 694 695 696 697
        if (rc == -1 && errno == EAGAIN) {
            // If the write is failing at this stage with
            // an EAGAIN the pipe must be being shut down,
            // so we can just bail out of the identity set.
            return;
        }
698
        errno_assert (rc == 0);
699
        session->flush ();
700
    }
701

702
    read_msg = &stream_engine_t::pull_and_encode;
703
    write_msg = &stream_engine_t::write_credential;
704 705
}

706
int zmq::stream_engine_t::pull_msg_from_session (msg_t *msg_)
707
{
708 709
    return session->pull_msg (msg_);
}
710

711 712 713 714 715
int zmq::stream_engine_t::push_msg_to_session (msg_t *msg_)
{
    return session->push_msg (msg_);
}

716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738
int zmq::stream_engine_t::write_credential (msg_t *msg_)
{
    zmq_assert (mechanism != NULL);
    zmq_assert (session != NULL);

    const blob_t credential = mechanism->get_user_id ();
    if (credential.size () > 0) {
        msg_t msg;
        int rc = msg.init_size (credential.size ());
        zmq_assert (rc == 0);
        memcpy (msg.data (), credential.data (), credential.size ());
        msg.set_flags (msg_t::credential);
        rc = session->push_msg (&msg);
        if (rc == -1) {
            rc = msg.close ();
            errno_assert (rc == 0);
            return -1;
        }
    }
    write_msg = &stream_engine_t::decode_and_push;
    return decode_and_push (msg_);
}

739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771
int zmq::stream_engine_t::pull_and_encode (msg_t *msg_)
{
    zmq_assert (mechanism != NULL);

    if (session->pull_msg (msg_) == -1)
        return -1;
    if (mechanism->encode (msg_) == -1)
        return -1;
    return 0;
}

int zmq::stream_engine_t::decode_and_push (msg_t *msg_)
{
    zmq_assert (mechanism != NULL);

    if (mechanism->decode (msg_) == -1)
        return -1;
    if (session->push_msg (msg_) == -1) {
        if (errno == EAGAIN)
            write_msg = &stream_engine_t::push_one_then_decode_and_push;
        return -1;
    }
    return 0;
}

int zmq::stream_engine_t::push_one_then_decode_and_push (msg_t *msg_)
{
    const int rc = session->push_msg (msg_);
    if (rc == 0)
        write_msg = &stream_engine_t::decode_and_push;
    return rc;
}

772 773 774
int zmq::stream_engine_t::write_subscription_msg (msg_t *msg_)
{
    msg_t subscription;
775

776 777
    //  Inject the subscription message, so that also
    //  ZMQ 2.x peers receive published messages.
778 779 780 781 782 783
    int rc = subscription.init_size (1);
    errno_assert (rc == 0);
    *(unsigned char*) subscription.data () = 1;
    rc = session->push_msg (&subscription);
    if (rc == -1)
       return -1;
784

785 786
    write_msg = &stream_engine_t::push_msg_to_session;
    return push_msg_to_session (msg_);
787 788
}

789
void zmq::stream_engine_t::error ()
790
{
791 792 793 794 795 796 797 798
    if (options.raw_sock) {
        //  For raw sockets, send a final 0-length message to the application
        //  so that it knows the peer has been disconnected.
        msg_t terminator;
        terminator.init();
        (this->*write_msg) (&terminator);
        terminator.close();
    }
799
    zmq_assert (session);
800
    socket->event_disconnected (endpoint, s);
801
    session->flush ();
Martin Hurton's avatar
Martin Hurton committed
802
    session->engine_error ();
803 804 805 806
    unplug ();
    delete this;
}

807
int zmq::stream_engine_t::write (const void *data_, size_t size_)
808
{
809 810
#ifdef ZMQ_HAVE_WINDOWS

811 812 813 814 815 816
    int nbytes = send (s, (char*) data_, (int) size_, 0);

    //  If not a single byte can be written to the socket in non-blocking mode
    //  we'll get an error (this may happen during the speculative write).
    if (nbytes == SOCKET_ERROR && WSAGetLastError () == WSAEWOULDBLOCK)
        return 0;
817
        
818
    //  Signalise peer failure.
819
    if (nbytes == SOCKET_ERROR && (
820 821 822 823 824 825 826 827 828
          WSAGetLastError () == WSAENETDOWN ||
          WSAGetLastError () == WSAENETRESET ||
          WSAGetLastError () == WSAEHOSTUNREACH ||
          WSAGetLastError () == WSAECONNABORTED ||
          WSAGetLastError () == WSAETIMEDOUT ||
          WSAGetLastError () == WSAECONNRESET))
        return -1;

    wsa_assert (nbytes != SOCKET_ERROR);
Martin Hurton's avatar
Martin Hurton committed
829
    return nbytes;
830 831

#else
832

833 834 835 836 837 838 839 840 841 842
    ssize_t nbytes = send (s, data_, size_, 0);

    //  Several errors are OK. When speculative write is being done we may not
    //  be able to write a single byte from the socket. Also, SIGSTOP issued
    //  by a debugging tool can result in EINTR error.
    if (nbytes == -1 && (errno == EAGAIN || errno == EWOULDBLOCK ||
          errno == EINTR))
        return 0;

    //  Signalise peer failure.
843 844 845 846 847 848 849 850 851 852 853
    if (nbytes == -1) {
        errno_assert (errno != EACCES
                   && errno != EBADF
                   && errno != EDESTADDRREQ
                   && errno != EFAULT
                   && errno != EINVAL
                   && errno != EISCONN
                   && errno != EMSGSIZE
                   && errno != ENOMEM
                   && errno != ENOTSOCK
                   && errno != EOPNOTSUPP);
854
        return -1;
855
    }
856

857
    return static_cast <int> (nbytes);
858 859

#endif
860 861
}

862
int zmq::stream_engine_t::read (void *data_, size_t size_)
863
{
864 865
#ifdef ZMQ_HAVE_WINDOWS

866
    const int rc = recv (s, (char*) data_, (int) size_, 0);
867 868 869

    //  If not a single byte can be read from the socket in non-blocking mode
    //  we'll get an error (this may happen during the speculative read).
870 871 872 873 874 875 876 877 878 879 880 881 882 883
    if (rc == SOCKET_ERROR) {
        if (WSAGetLastError () == WSAEWOULDBLOCK)
            errno = EAGAIN;
        else {
            wsa_assert (WSAGetLastError () == WSAENETDOWN
                     || WSAGetLastError () == WSAENETRESET
                     || WSAGetLastError () == WSAECONNABORTED
                     || WSAGetLastError () == WSAETIMEDOUT
                     || WSAGetLastError () == WSAECONNRESET
                     || WSAGetLastError () == WSAECONNREFUSED
                     || WSAGetLastError () == WSAENOTCONN);
            errno = wsa_error_to_errno (WSAGetLastError ());
        }
    }
884

885
    return rc == SOCKET_ERROR? -1: rc;
886 887 888

#else

889
    const ssize_t rc = recv (s, data_, size_, 0);
890 891

    //  Several errors are OK. When speculative read is being done we may not
892
    //  be able to read a single byte from the socket. Also, SIGSTOP issued
893
    //  by a debugging tool can result in EINTR error.
894
    if (rc == -1) {
895 896 897 898 899
        errno_assert (errno != EBADF
                   && errno != EFAULT
                   && errno != EINVAL
                   && errno != ENOMEM
                   && errno != ENOTSOCK);
900 901
        if (errno == EWOULDBLOCK || errno == EINTR)
            errno = EAGAIN;
902
    }
903

904
    return static_cast <int> (rc);
905 906

#endif
907
}