stream_engine.cpp 24.3 KB
Newer Older
1
/*
2
    Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35

    This file is part of 0MQ.

    0MQ is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    0MQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

#include "platform.hpp"
#if defined ZMQ_HAVE_WINDOWS
#include "windows.hpp"
#else
#include <unistd.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/tcp.h>
#include <netinet/in.h>
#include <netdb.h>
#include <fcntl.h>
#endif

#include <string.h>
#include <new>

36
#include "stream_engine.hpp"
37
#include "io_thread.hpp"
38
#include "session_base.hpp"
39 40
#include "v1_encoder.hpp"
#include "v1_decoder.hpp"
41 42
#include "v2_encoder.hpp"
#include "v2_decoder.hpp"
43
#include "null_mechanism.hpp"
44
#include "plain_mechanism.hpp"
45 46
#include "curve_client.hpp"
#include "curve_server.hpp"
47 48
#include "raw_decoder.hpp"
#include "raw_encoder.hpp"
49 50
#include "config.hpp"
#include "err.hpp"
51
#include "ip.hpp"
Martin Hurton's avatar
Martin Hurton committed
52 53
#include "likely.hpp"
#include "wire.hpp"
54

55 56
zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_, 
                                       const std::string &endpoint_) :
57
    s (fd_),
58 59
    inpos (NULL),
    insize (0),
60
    decoder (NULL),
61 62
    outpos (NULL),
    outsize (0),
63
    encoder (NULL),
Martin Hurton's avatar
Martin Hurton committed
64
    handshaking (true),
65
    greeting_size (v2_greeting_size),
Martin Hurton's avatar
Martin Hurton committed
66
    greeting_bytes_read (0),
67 68
    session (NULL),
    options (options_),
69
    endpoint (endpoint_),
70
    plugged (false),
71
    terminating (false),
72 73
    read_msg (&stream_engine_t::read_identity),
    write_msg (&stream_engine_t::write_identity),
74 75
    io_error (false),
    subscription_required (false),
76
    mechanism (NULL),
77 78
    input_stopped (false),
    output_stopped (false),
79
    socket (NULL)
80
{
81 82
    int rc = tx_msg.init ();
    errno_assert (rc == 0);
83
    
Martin Hurton's avatar
Martin Hurton committed
84
    //  Put the socket into non-blocking mode.
85
    unblock_socket (s);
86

87 88 89
    if (!get_peer_ip_address (s, peer_address))
        peer_address = "";

90
#ifdef SO_NOSIGPIPE
91 92 93
    //  Make sure that SIGPIPE signal is not generated when writing to a
    //  connection that was already closed by the peer.
    int set = 1;
94
    rc = setsockopt (s, SOL_SOCKET, SO_NOSIGPIPE, &set, sizeof (int));
95 96
    errno_assert (rc == 0);
#endif
97 98
}

99
zmq::stream_engine_t::~stream_engine_t ()
100 101 102
{
    zmq_assert (!plugged);

103 104
    if (s != retired_fd) {
#ifdef ZMQ_HAVE_WINDOWS
105 106
        int rc = closesocket (s);
        wsa_assert (rc != SOCKET_ERROR);
107
#else
108
        int rc = close (s);
109 110
        errno_assert (rc == 0);
#endif
111
        s = retired_fd;
112
    }
113

114 115 116
    int rc = tx_msg.close ();
    errno_assert (rc == 0);

117 118 119
    delete encoder;
    delete decoder;
    delete mechanism;
120 121
}

122 123
void zmq::stream_engine_t::plug (io_thread_t *io_thread_,
    session_base_t *session_)
124 125 126 127 128 129 130 131
{
    zmq_assert (!plugged);
    plugged = true;

    //  Connect to session object.
    zmq_assert (!session);
    zmq_assert (session_);
    session = session_;
132
    socket = session-> get_socket ();
133 134 135 136

    //  Connect to I/O threads poller object.
    io_object_t::plug (io_thread_);
    handle = add_fd (s);
137
    io_error = false;
Martin Hurton's avatar
Martin Hurton committed
138

Martin Hurton's avatar
Martin Hurton committed
139
    if (options.raw_sock) {
140
        // no handshaking for raw sock, instantiate raw encoder and decoders
141
        encoder = new (std::nothrow) raw_encoder_t (out_batch_size);
142 143
        alloc_assert (encoder);

144
        decoder = new (std::nothrow) raw_decoder_t (in_batch_size);
145 146 147 148
        alloc_assert (decoder);

        // disable handshaking for raw socket
        handshaking = false;
149 150 151

        read_msg = &stream_engine_t::pull_msg_from_session;
        write_msg = &stream_engine_t::push_msg_to_session;
Martin Hurton's avatar
Martin Hurton committed
152 153
    }
    else {
154 155
        //  Send the 'length' and 'flags' fields of the identity message.
        //  The 'length' field is encoded in the long format.
Pieter Hintjens's avatar
Pieter Hintjens committed
156
        outpos = greeting_send;
157 158 159 160 161
        outpos [outsize++] = 0xff;
        put_uint64 (&outpos [outsize], options.identity_size + 1);
        outsize += 8;
        outpos [outsize++] = 0x7f;
    }
Martin Hurton's avatar
Martin Hurton committed
162

163 164 165 166 167 168
    set_pollin (handle);
    set_pollout (handle);
    //  Flush all the data that may have been already received downstream.
    in_event ();
}

169
void zmq::stream_engine_t::unplug ()
170 171 172 173 174
{
    zmq_assert (plugged);
    plugged = false;

    //  Cancel all fd subscriptions.
175
    if (!io_error)
Martin Hurton's avatar
Martin Hurton committed
176
        rm_fd (handle);
177 178 179 180 181 182 183

    //  Disconnect from I/O threads poller object.
    io_object_t::unplug ();

    session = NULL;
}

184
void zmq::stream_engine_t::terminate ()
185
{
186 187 188 189 190
    if (!terminating && encoder && encoder->has_data ()) {
        //  Give io_thread a chance to send in the buffer
        terminating = true;
        return;
    }
191 192 193 194
    unplug ();
    delete this;
}

195
void zmq::stream_engine_t::in_event ()
196
{
197 198
    assert (!io_error);

Pieter Hintjens's avatar
Pieter Hintjens committed
199
    //  If still handshaking, receive and process the greeting message.
Martin Hurton's avatar
Martin Hurton committed
200 201 202 203
    if (unlikely (handshaking))
        if (!handshake ())
            return;

204
    zmq_assert (decoder);
205 206

    //  If there has been an I/O error, stop polling.
207
    if (input_stopped) {
208 209 210 211
        rm_fd (handle);
        io_error = true;
        return;
    }
212 213 214 215 216 217 218 219

    //  If there's no data to process in the buffer...
    if (!insize) {

        //  Retrieve the buffer and read as much data as possible.
        //  Note that buffer can be arbitrarily large. However, we assume
        //  the underlying TCP layer has fixed buffer size and thus the
        //  number of bytes read will be always limited.
220
        decoder->get_buffer (&inpos, &insize);
221
        const int bytes_read = read (inpos, insize);
222 223

        //  Check whether the peer has closed the connection.
224 225 226
        if (bytes_read == -1) {
            error ();
            return;
227 228
        }

229 230
        //  Adjust input size
        insize = static_cast <size_t> (bytes_read);
Martin Hurton's avatar
Martin Hurton committed
231
    }
232

233 234
    int rc = 0;
    size_t processed = 0;
235

236 237 238
    while (insize > 0) {
        rc = decoder->decode (inpos, insize, processed);
        zmq_assert (processed <= insize);
239 240
        inpos += processed;
        insize -= processed;
241 242
        if (rc == 0 || rc == -1)
            break;
243
        rc = (this->*write_msg) (decoder->msg ());
244 245
        if (rc == -1)
            break;
246 247
    }

248 249 250 251
    //  Tear down the connection if we have failed to decode input data
    //  or the session has rejected the message.
    if (rc == -1) {
        if (errno != EAGAIN) {
Martin Hurton's avatar
Martin Hurton committed
252
            error ();
253 254
            return;
        }
255
        input_stopped = true;
256
        reset_pollin (handle);
Martin Hurton's avatar
Martin Hurton committed
257
    }
258 259

    session->flush ();
260 261
}

262
void zmq::stream_engine_t::out_event ()
263
{
264 265
    zmq_assert (!io_error);

266 267 268
    //  If write buffer is empty, try to read new data from the encoder.
    if (!outsize) {

Martin Hurton's avatar
Martin Hurton committed
269 270 271 272 273 274 275 276
        //  Even when we stop polling as soon as there is no
        //  data to send, the poller may invoke out_event one
        //  more time due to 'speculative write' optimisation.
        if (unlikely (encoder == NULL)) {
            zmq_assert (handshaking);
            return;
        }

277
        outpos = NULL;
278 279 280
        outsize = encoder->encode (&outpos, 0);

        while (outsize < out_batch_size) {
281
            if ((this->*read_msg) (&tx_msg) == -1)
282 283 284 285 286 287 288 289 290
                break;
            encoder->load_msg (&tx_msg);
            unsigned char *bufptr = outpos + outsize;
            size_t n = encoder->encode (&bufptr, out_batch_size - outsize);
            zmq_assert (n > 0);
            if (outpos == NULL)
                outpos = bufptr;
            outsize += n;
        }
291 292 293

        //  If there is no data to send, stop polling for output.
        if (outsize == 0) {
294
            output_stopped = true;
295 296 297 298 299 300 301
            reset_pollout (handle);
            return;
        }
    }

    //  If there are any data to write in write buffer, write as much as
    //  possible to the socket. Note that amount of data to write can be
302
    //  arbitrarily large. However, we assume that underlying TCP layer has
303 304 305 306
    //  limited transmission buffer and thus the actual number of bytes
    //  written should be reasonably modest.
    int nbytes = write (outpos, outsize);

Martin Hurton's avatar
Martin Hurton committed
307 308
    //  IO error has occurred. We stop waiting for output events.
    //  The engine is not terminated until we detect input error;
309
    //  this is necessary to prevent losing incoming messages.
310
    if (nbytes == -1) {
Martin Hurton's avatar
Martin Hurton committed
311
        reset_pollout (handle);
312 313
        if (unlikely (terminating))
            terminate ();
314 315 316 317 318
        return;
    }

    outpos += nbytes;
    outsize -= nbytes;
Martin Hurton's avatar
Martin Hurton committed
319 320 321 322 323 324

    //  If we are still handshaking and there are no data
    //  to send, stop polling for output.
    if (unlikely (handshaking))
        if (outsize == 0)
            reset_pollout (handle);
325 326 327 328

    if (unlikely (terminating))
        if (outsize == 0)
            terminate ();
329 330
}

331
void zmq::stream_engine_t::restart_output ()
332
{
333 334 335
    if (unlikely (io_error))
        return;

336
    if (likely (output_stopped)) {
337
        set_pollout (handle);
338
        output_stopped = false;
339
    }
340 341 342 343 344 345 346 347

    //  Speculative write: The assumption is that at the moment new message
    //  was sent by the user the socket is probably available for writing.
    //  Thus we try to write the data to socket avoiding polling for POLLOUT.
    //  Consequently, the latency should be better in request/reply scenarios.
    out_event ();
}

348
void zmq::stream_engine_t::restart_input ()
349
{
350
    zmq_assert (input_stopped);
351 352 353
    zmq_assert (session != NULL);
    zmq_assert (decoder != NULL);

354
    int rc = (this->*write_msg) (decoder->msg ());
355 356 357 358 359
    if (rc == -1) {
        if (errno == EAGAIN)
            session->flush ();
        else
            error ();
Martin Hurton's avatar
Martin Hurton committed
360 361 362
        return;
    }

363 364 365 366 367 368 369 370
    while (insize > 0) {
        size_t processed = 0;
        rc = decoder->decode (inpos, insize, processed);
        zmq_assert (processed <= insize);
        inpos += processed;
        insize -= processed;
        if (rc == 0 || rc == -1)
            break;
371
        rc = (this->*write_msg) (decoder->msg ());
372 373 374
        if (rc == -1)
            break;
    }
375

376 377 378 379 380 381
    if (rc == -1 && errno == EAGAIN)
        session->flush ();
    else
    if (rc == -1 || io_error)
        error ();
    else {
382
        input_stopped = false;
383 384 385 386 387 388
        set_pollin (handle);
        session->flush ();

        //  Speculative read.
        in_event ();
    }
389 390
}

391
bool zmq::stream_engine_t::handshake ()
Martin Hurton's avatar
Martin Hurton committed
392
{
393
    zmq_assert (handshaking);
Martin Hurton's avatar
Martin Hurton committed
394
    zmq_assert (greeting_bytes_read < greeting_size);
395
    //  Receive the greeting.
Martin Hurton's avatar
Martin Hurton committed
396
    while (greeting_bytes_read < greeting_size) {
Pieter Hintjens's avatar
Pieter Hintjens committed
397
        const int n = read (greeting_recv + greeting_bytes_read,
Martin Hurton's avatar
Martin Hurton committed
398
                            greeting_size - greeting_bytes_read);
399 400 401 402
        if (n == -1) {
            error ();
            return false;
        }
Martin Hurton's avatar
Martin Hurton committed
403
        if (n == 0)
404
            return false;
Martin Hurton's avatar
Martin Hurton committed
405 406 407

        greeting_bytes_read += n;

408 409 410
        //  We have received at least one byte from the peer.
        //  If the first byte is not 0xff, we know that the
        //  peer is using unversioned protocol.
Pieter Hintjens's avatar
Pieter Hintjens committed
411
        if (greeting_recv [0] != 0xff)
412
            break;
Martin Hurton's avatar
Martin Hurton committed
413

414
        if (greeting_bytes_read < signature_size)
415
            continue;
Martin Hurton's avatar
Martin Hurton committed
416

417 418 419 420
        //  Inspect the right-most bit of the 10th byte (which coincides
        //  with the 'flags' field if a regular message was sent).
        //  Zero indicates this is a header of identity message
        //  (i.e. the peer is using the unversioned protocol).
Pieter Hintjens's avatar
Pieter Hintjens committed
421
        if (!(greeting_recv [9] & 0x01))
422
            break;
Martin Hurton's avatar
Martin Hurton committed
423

424
        //  The peer is using versioned protocol.
425 426
        //  Send the major version number.
        if (outpos + outsize == greeting_send + signature_size) {
Martin Hurton's avatar
Martin Hurton committed
427 428
            if (outsize == 0)
                set_pollout (handle);
429 430 431 432 433 434 435 436 437 438 439 440 441 442 443
            outpos [outsize++] = 3;     //  Major version number
        }

        if (greeting_bytes_read > signature_size) {
            if (outpos + outsize == greeting_send + signature_size + 1) {
                if (outsize == 0)
                    set_pollout (handle);

                //  Use ZMTP/2.0 to talk to older peers.
                if (greeting_recv [10] == ZMTP_1_0
                ||  greeting_recv [10] == ZMTP_2_0)
                    outpos [outsize++] = options.type;
                else {
                    outpos [outsize++] = 0; //  Minor version number
                    memset (outpos + outsize, 0, 20);
444 445 446 447 448

                    zmq_assert (options.mechanism == ZMQ_NULL
                            ||  options.mechanism == ZMQ_PLAIN
                            ||  options.mechanism == ZMQ_CURVE);

449 450 451
                    if (options.mechanism == ZMQ_NULL)
                        memcpy (outpos + outsize, "NULL", 4);
                    else
452
                    if (options.mechanism == ZMQ_PLAIN)
453
                        memcpy (outpos + outsize, "PLAIN", 5);
454 455
                    else
                        memcpy (outpos + outsize, "CURVE", 5);
456 457 458 459 460 461
                    outsize += 20;
                    memset (outpos + outsize, 0, 32);
                    outsize += 32;
                    greeting_size = v3_greeting_size;
                }
            }
Martin Hurton's avatar
Martin Hurton committed
462 463 464
        }
    }

465 466
    //  Position of the revision field in the greeting.
    const size_t revision_pos = 10;
467

468 469
    //  Is the peer using ZMTP/1.0 with no revision number?
    //  If so, we send and receive rest of identity message
Pieter Hintjens's avatar
Pieter Hintjens committed
470
    if (greeting_recv [0] != 0xff || !(greeting_recv [9] & 0x01)) {
471
        encoder = new (std::nothrow) v1_encoder_t (out_batch_size);
472 473
        alloc_assert (encoder);

474
        decoder = new (std::nothrow) v1_decoder_t (in_batch_size, options.maxmsgsize);
475 476
        alloc_assert (decoder);

Martin Hurton's avatar
Martin Hurton committed
477 478 479 480 481 482
        //  We have already sent the message header.
        //  Since there is no way to tell the encoder to
        //  skip the message header, we simply throw that
        //  header data away.
        const size_t header_size = options.identity_size + 1 >= 255 ? 10 : 2;
        unsigned char tmp [10], *bufferp = tmp;
483 484 485 486 487 488 489

        //  Prepare the identity message and load it into encoder.
        //  Then consume bytes we have already sent to the peer.
        const int rc = tx_msg.init_size (options.identity_size);
        zmq_assert (rc == 0);
        memcpy (tx_msg.data (), options.identity, options.identity_size);
        encoder->load_msg (&tx_msg);
490
        size_t buffer_size = encoder->encode (&bufferp, header_size);
Martin Hurton's avatar
Martin Hurton committed
491 492 493
        zmq_assert (buffer_size == header_size);

        //  Make sure the decoder sees the data we have already received.
Pieter Hintjens's avatar
Pieter Hintjens committed
494
        inpos = greeting_recv;
Martin Hurton's avatar
Martin Hurton committed
495
        insize = greeting_bytes_read;
496 497

        //  To allow for interoperability with peers that do not forward
498 499
        //  their subscriptions, we inject a phantom subscription message
        //  message into the incoming message stream.
500
        if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB)
501
            subscription_required = true;
Martin Hurton's avatar
Martin Hurton committed
502
    }
503
    else
Pieter Hintjens's avatar
Pieter Hintjens committed
504
    if (greeting_recv [revision_pos] == ZMTP_1_0) {
505 506
        encoder = new (std::nothrow) v1_encoder_t (
            out_batch_size);
507 508
        alloc_assert (encoder);

509 510
        decoder = new (std::nothrow) v1_decoder_t (
            in_batch_size, options.maxmsgsize);
511 512
        alloc_assert (decoder);
    }
513 514 515 516 517 518 519 520 521
    else
    if (greeting_recv [revision_pos] == ZMTP_2_0) {
        encoder = new (std::nothrow) v2_encoder_t (out_batch_size);
        alloc_assert (encoder);

        decoder = new (std::nothrow) v2_decoder_t (
            in_batch_size, options.maxmsgsize);
        alloc_assert (decoder);
    }
522
    else {
523
        encoder = new (std::nothrow) v2_encoder_t (out_batch_size);
524 525
        alloc_assert (encoder);

526
        decoder = new (std::nothrow) v2_decoder_t (
527
            in_batch_size, options.maxmsgsize);
528
        alloc_assert (decoder);
529 530

        if (memcmp (greeting_recv + 12, "NULL\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) == 0) {
531 532
            mechanism = new (std::nothrow)
                null_mechanism_t (session, peer_address, options);
533
            alloc_assert (mechanism);
534 535 536
        }
        else
        if (memcmp (greeting_recv + 12, "PLAIN\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) == 0) {
537 538
            mechanism = new (std::nothrow)
                plain_mechanism_t (session, peer_address, options);
539
            alloc_assert (mechanism);
540
        }
541 542 543 544
#ifdef HAVE_LIBSODIUM
        else
        if (memcmp (greeting_recv + 12, "CURVE\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) == 0) {
            if (options.as_server)
545 546
                mechanism = new (std::nothrow)
                    curve_server_t (session, peer_address, options);
547 548 549 550 551
            else
                mechanism = new (std::nothrow) curve_client_t (options);
            alloc_assert (mechanism);
        }
#endif
552 553 554 555
        else {
            error ();
            return false;
        }
556 557
        read_msg = &stream_engine_t::next_handshake_command;
        write_msg = &stream_engine_t::process_handshake_command;
558
    }
Martin Hurton's avatar
Martin Hurton committed
559 560 561 562 563 564 565 566 567 568 569 570

    // Start polling for output if necessary.
    if (outsize == 0)
        set_pollout (handle);

    //  Handshaking was successful.
    //  Switch into the normal message flow.
    handshaking = false;

    return true;
}

571
int zmq::stream_engine_t::read_identity (msg_t *msg_)
572
{
573 574 575 576 577 578 579
    int rc = msg_->init_size (options.identity_size);
    errno_assert (rc == 0);
    if (options.identity_size > 0)
        memcpy (msg_->data (), options.identity, options.identity_size);
    read_msg = &stream_engine_t::pull_msg_from_session;
    return 0;
}
580

581 582 583 584 585
int zmq::stream_engine_t::write_identity (msg_t *msg_)
{
    if (options.recv_identity) {
        msg_->set_flags (msg_t::identity);
        int rc = session->push_msg (msg_);
586 587
        errno_assert (rc == 0);
    }
588 589 590 591 592 593 594 595 596 597 598
    else {
        int rc = msg_->close ();
        errno_assert (rc == 0);
        rc = msg_->init ();
        errno_assert (rc == 0);
    }

    if (subscription_required)
        write_msg = &stream_engine_t::write_subscription_msg;
    else
        write_msg = &stream_engine_t::push_msg_to_session;
599

600 601
    return 0;
}
602

603
int zmq::stream_engine_t::next_handshake_command (msg_t *msg_)
604
{
605 606
    zmq_assert (mechanism != NULL);

607
    const int rc = mechanism->next_handshake_command (msg_);
608
    if (rc == 0) {
609
        msg_->set_flags (msg_t::command);
610 611 612
        if (mechanism->is_handshake_complete ())
            mechanism_ready ();
    }
613

614
    return rc;
615 616
}

617
int zmq::stream_engine_t::process_handshake_command (msg_t *msg_)
618
{
619
    zmq_assert (mechanism != NULL);
620
    const int rc = mechanism->process_handshake_command (msg_);
621 622 623
    if (rc == 0) {
        if (mechanism->is_handshake_complete ())
            mechanism_ready ();
624 625
        if (output_stopped)
            restart_output ();
626 627
    }

628
    return rc;
629 630
}

631 632 633 634 635 636 637 638 639
void zmq::stream_engine_t::zap_msg_available ()
{
    zmq_assert (mechanism != NULL);

    const int rc = mechanism->zap_msg_available ();
    if (rc == -1) {
        error ();
        return;
    }
640 641 642 643
    if (input_stopped)
        restart_input ();
    if (output_stopped)
        restart_output ();
644 645
}

646
void zmq::stream_engine_t::mechanism_ready ()
647
{
648 649 650 651
    if (options.recv_identity) {
        msg_t identity;
        mechanism->peer_identity (&identity);
        const int rc = session->push_msg (&identity);
652 653 654 655 656 657
        if (rc == -1 && errno == EAGAIN) {
            // If the write is failing at this stage with
            // an EAGAIN the pipe must be being shut down,
            // so we can just bail out of the identity set.
            return;
        }
658
        errno_assert (rc == 0);
659
        session->flush ();
660
    }
661

662 663
    read_msg = &stream_engine_t::pull_and_encode;
    write_msg = &stream_engine_t::decode_and_push;
664 665
}

666
int zmq::stream_engine_t::pull_msg_from_session (msg_t *msg_)
667
{
668 669
    return session->pull_msg (msg_);
}
670

671 672 673 674 675
int zmq::stream_engine_t::push_msg_to_session (msg_t *msg_)
{
    return session->push_msg (msg_);
}

676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708
int zmq::stream_engine_t::pull_and_encode (msg_t *msg_)
{
    zmq_assert (mechanism != NULL);

    if (session->pull_msg (msg_) == -1)
        return -1;
    if (mechanism->encode (msg_) == -1)
        return -1;
    return 0;
}

int zmq::stream_engine_t::decode_and_push (msg_t *msg_)
{
    zmq_assert (mechanism != NULL);

    if (mechanism->decode (msg_) == -1)
        return -1;
    if (session->push_msg (msg_) == -1) {
        if (errno == EAGAIN)
            write_msg = &stream_engine_t::push_one_then_decode_and_push;
        return -1;
    }
    return 0;
}

int zmq::stream_engine_t::push_one_then_decode_and_push (msg_t *msg_)
{
    const int rc = session->push_msg (msg_);
    if (rc == 0)
        write_msg = &stream_engine_t::decode_and_push;
    return rc;
}

709 710 711
int zmq::stream_engine_t::write_subscription_msg (msg_t *msg_)
{
    msg_t subscription;
712

713 714
    //  Inject the subscription message, so that also
    //  ZMQ 2.x peers receive published messages.
715 716 717 718 719 720
    int rc = subscription.init_size (1);
    errno_assert (rc == 0);
    *(unsigned char*) subscription.data () = 1;
    rc = session->push_msg (&subscription);
    if (rc == -1)
       return -1;
721

722 723
    write_msg = &stream_engine_t::push_msg_to_session;
    return push_msg_to_session (msg_);
724 725
}

726
void zmq::stream_engine_t::error ()
727 728
{
    zmq_assert (session);
729
    socket->event_disconnected (endpoint, s);
730
    session->flush ();
731 732 733 734 735
    session->detach ();
    unplug ();
    delete this;
}

736
int zmq::stream_engine_t::write (const void *data_, size_t size_)
737
{
738 739
#ifdef ZMQ_HAVE_WINDOWS

740 741 742 743 744 745
    int nbytes = send (s, (char*) data_, (int) size_, 0);

    //  If not a single byte can be written to the socket in non-blocking mode
    //  we'll get an error (this may happen during the speculative write).
    if (nbytes == SOCKET_ERROR && WSAGetLastError () == WSAEWOULDBLOCK)
        return 0;
746
        
747
    //  Signalise peer failure.
748
    if (nbytes == SOCKET_ERROR && (
749 750 751 752 753 754 755 756 757
          WSAGetLastError () == WSAENETDOWN ||
          WSAGetLastError () == WSAENETRESET ||
          WSAGetLastError () == WSAEHOSTUNREACH ||
          WSAGetLastError () == WSAECONNABORTED ||
          WSAGetLastError () == WSAETIMEDOUT ||
          WSAGetLastError () == WSAECONNRESET))
        return -1;

    wsa_assert (nbytes != SOCKET_ERROR);
Martin Hurton's avatar
Martin Hurton committed
758
    return nbytes;
759 760

#else
761

762 763 764 765 766 767 768 769 770 771
    ssize_t nbytes = send (s, data_, size_, 0);

    //  Several errors are OK. When speculative write is being done we may not
    //  be able to write a single byte from the socket. Also, SIGSTOP issued
    //  by a debugging tool can result in EINTR error.
    if (nbytes == -1 && (errno == EAGAIN || errno == EWOULDBLOCK ||
          errno == EINTR))
        return 0;

    //  Signalise peer failure.
772 773 774 775 776 777 778 779 780 781 782
    if (nbytes == -1) {
        errno_assert (errno != EACCES
                   && errno != EBADF
                   && errno != EDESTADDRREQ
                   && errno != EFAULT
                   && errno != EINVAL
                   && errno != EISCONN
                   && errno != EMSGSIZE
                   && errno != ENOMEM
                   && errno != ENOTSOCK
                   && errno != EOPNOTSUPP);
783
        return -1;
784
    }
785

786
    return static_cast <int> (nbytes);
787 788

#endif
789 790
}

791
int zmq::stream_engine_t::read (void *data_, size_t size_)
792
{
793 794
#ifdef ZMQ_HAVE_WINDOWS

795 796 797 798 799 800 801 802
    int nbytes = recv (s, (char*) data_, (int) size_, 0);

    //  If not a single byte can be read from the socket in non-blocking mode
    //  we'll get an error (this may happen during the speculative read).
    if (nbytes == SOCKET_ERROR && WSAGetLastError () == WSAEWOULDBLOCK)
        return 0;

    //  Connection failure.
803
    if (nbytes == SOCKET_ERROR && (
804 805 806 807 808 809 810 811 812 813 814 815 816 817 818
          WSAGetLastError () == WSAENETDOWN ||
          WSAGetLastError () == WSAENETRESET ||
          WSAGetLastError () == WSAECONNABORTED ||
          WSAGetLastError () == WSAETIMEDOUT ||
          WSAGetLastError () == WSAECONNRESET ||
          WSAGetLastError () == WSAECONNREFUSED ||
          WSAGetLastError () == WSAENOTCONN))
        return -1;

    wsa_assert (nbytes != SOCKET_ERROR);

    //  Orderly shutdown by the other peer.
    if (nbytes == 0)
        return -1; 

Martin Hurton's avatar
Martin Hurton committed
819
    return nbytes;
820 821 822 823 824 825

#else

    ssize_t nbytes = recv (s, data_, size_, 0);

    //  Several errors are OK. When speculative read is being done we may not
826
    //  be able to read a single byte from the socket. Also, SIGSTOP issued
827 828 829 830 831 832
    //  by a debugging tool can result in EINTR error.
    if (nbytes == -1 && (errno == EAGAIN || errno == EWOULDBLOCK ||
          errno == EINTR))
        return 0;

    //  Signalise peer failure.
833 834 835 836 837 838
    if (nbytes == -1) {
        errno_assert (errno != EBADF
                   && errno != EFAULT
                   && errno != EINVAL
                   && errno != ENOMEM
                   && errno != ENOTSOCK);
839
        return -1;
840
    }
841 842 843 844 845

    //  Orderly shutdown by the peer.
    if (nbytes == 0)
        return -1;

846
    return static_cast <int> (nbytes);
847 848

#endif
849
}