stream_engine.cpp 24.8 KB
Newer Older
1
/*
2
    Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35

    This file is part of 0MQ.

    0MQ is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    0MQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

#include "platform.hpp"
#if defined ZMQ_HAVE_WINDOWS
#include "windows.hpp"
#else
#include <unistd.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/tcp.h>
#include <netinet/in.h>
#include <netdb.h>
#include <fcntl.h>
#endif

#include <string.h>
#include <new>

36
#include "stream_engine.hpp"
37
#include "io_thread.hpp"
38
#include "session_base.hpp"
39 40
#include "v1_encoder.hpp"
#include "v1_decoder.hpp"
41 42
#include "v2_encoder.hpp"
#include "v2_decoder.hpp"
43 44
#include "raw_decoder.hpp"
#include "raw_encoder.hpp"
45 46
#include "config.hpp"
#include "err.hpp"
47
#include "ip.hpp"
Martin Hurton's avatar
Martin Hurton committed
48 49
#include "likely.hpp"
#include "wire.hpp"
50

51
//  Builds an engine around the socket descriptor fd_. The constructor only
//  prepares the descriptor (non-blocking mode, optional buffer sizes,
//  SIGPIPE suppression where available); no polling takes place until
//  plug () is called.
zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_,
      const std::string &endpoint_) :
    s (fd_),
    inpos (NULL),
    insize (0),
    decoder (NULL),
    outpos (NULL),
    outsize (0),
    encoder (NULL),
    handshaking (true),
    greeting_size (v2_greeting_size),
    greeting_bytes_read (0),
    session (NULL),
    options (options_),
    endpoint (endpoint_),
    plugged (false),
    terminating (false),
    read_msg (&stream_engine_t::read_identity),
    write_msg (&stream_engine_t::write_identity),
    io_error (false),
    congested (false),
    subscription_required (false),
    output_paused (false),
    ready_command_received (false),
    socket (NULL)
{
    //  The outgoing message slot must always hold an initialised message.
    int rc = tx_msg.init ();
    errno_assert (rc == 0);

    //  All reads and writes done by the engine are non-blocking.
    unblock_socket (s);

    //  Apply the configured send buffer size; a zero option skips the call.
    if (options.sndbuf) {
        rc = setsockopt (s, SOL_SOCKET, SO_SNDBUF,
            (char*) &options.sndbuf, sizeof (int));
#ifdef ZMQ_HAVE_WINDOWS
        wsa_assert (rc != SOCKET_ERROR);
#else
        errno_assert (rc == 0);
#endif
    }

    //  Likewise for the receive buffer size.
    if (options.rcvbuf) {
        rc = setsockopt (s, SOL_SOCKET, SO_RCVBUF,
            (char*) &options.rcvbuf, sizeof (int));
#ifdef ZMQ_HAVE_WINDOWS
        wsa_assert (rc != SOCKET_ERROR);
#else
        errno_assert (rc == 0);
#endif
    }

#ifdef SO_NOSIGPIPE
    //  Writing to a connection the peer has already closed must not
    //  raise SIGPIPE.
    int set = 1;
    rc = setsockopt (s, SOL_SOCKET, SO_NOSIGPIPE, &set, sizeof (int));
    errno_assert (rc == 0);
#endif
}

110
//  Releases the socket descriptor, the outgoing message slot and the
//  encoder/decoder pair. The engine must have been unplugged beforehand.
zmq::stream_engine_t::~stream_engine_t ()
{
    zmq_assert (!plugged);

    //  Close the descriptor unless it has already been retired.
    if (s != retired_fd) {
#ifdef ZMQ_HAVE_WINDOWS
        const int close_rc = closesocket (s);
        wsa_assert (close_rc != SOCKET_ERROR);
#else
        const int close_rc = close (s);
        errno_assert (close_rc == 0);
#endif
        s = retired_fd;
    }

    const int msg_rc = tx_msg.close ();
    errno_assert (msg_rc == 0);

    //  Deleting a null pointer is a no-op, so no explicit checks are needed.
    delete encoder;
    delete decoder;
}

134 135
//  Attaches the engine to a session and to the poller of the given I/O
//  thread, prepares either the raw codecs or the first bytes of the
//  greeting, and starts polling in both directions.
void zmq::stream_engine_t::plug (io_thread_t *io_thread_,
    session_base_t *session_)
{
    zmq_assert (!plugged);
    plugged = true;

    //  Bind to the session and remember the socket it reports.
    zmq_assert (!session);
    zmq_assert (session_);
    session = session_;
    socket = session->get_socket ();

    //  Register the descriptor with the I/O thread's poller.
    io_object_t::plug (io_thread_);
    handle = add_fd (s);
    io_error = false;

    if (!options.raw_sock) {
        //  Queue the 'length' and 'flags' fields of the identity message.
        //  The 'length' field uses the long (0xff + 8 byte) format.
        outpos = greeting_send;
        outpos [outsize++] = 0xff;
        put_uint64 (&outpos [outsize], options.identity_size + 1);
        outsize += 8;
        outpos [outsize++] = 0x7f;
    }
    else {
        //  Raw sockets exchange no greeting: create the raw codecs right
        //  away and go straight to the normal message flow.
        encoder = new (std::nothrow) raw_encoder_t (out_batch_size);
        alloc_assert (encoder);

        decoder = new (std::nothrow) raw_decoder_t (in_batch_size);
        alloc_assert (decoder);

        handshaking = false;

        read_msg = &stream_engine_t::pull_msg_from_session;
        write_msg = &stream_engine_t::push_msg_to_session;
    }

    set_pollin (handle);
    set_pollout (handle);

    //  Process any data that may already be waiting on the socket.
    in_event ();
}

181
void zmq::stream_engine_t::unplug ()
182 183 184 185 186
{
    zmq_assert (plugged);
    plugged = false;

    //  Cancel all fd subscriptions.
187
    if (!io_error)
Martin Hurton's avatar
Martin Hurton committed
188
        rm_fd (handle);
189 190 191 192 193 194 195

    //  Disconnect from I/O threads poller object.
    io_object_t::unplug ();

    session = NULL;
}

196
void zmq::stream_engine_t::terminate ()
197
{
198 199 200 201 202
    if (!terminating && encoder && encoder->has_data ()) {
        //  Give io_thread a chance to send in the buffer
        terminating = true;
        return;
    }
203 204 205 206
    unplug ();
    delete this;
}

207
void zmq::stream_engine_t::in_event ()
208
{
209 210
    assert (!io_error);

Pieter Hintjens's avatar
Pieter Hintjens committed
211
    //  If still handshaking, receive and process the greeting message.
Martin Hurton's avatar
Martin Hurton committed
212 213 214 215
    if (unlikely (handshaking))
        if (!handshake ())
            return;

216
    zmq_assert (decoder);
217 218 219 220 221 222 223

    //  If there has been an I/O error, stop polling.
    if (congested) {
        rm_fd (handle);
        io_error = true;
        return;
    }
224 225 226 227 228 229 230 231

    //  If there's no data to process in the buffer...
    if (!insize) {

        //  Retrieve the buffer and read as much data as possible.
        //  Note that buffer can be arbitrarily large. However, we assume
        //  the underlying TCP layer has fixed buffer size and thus the
        //  number of bytes read will be always limited.
232
        decoder->get_buffer (&inpos, &insize);
233
        const int bytes_read = read (inpos, insize);
234 235

        //  Check whether the peer has closed the connection.
236 237 238
        if (bytes_read == -1) {
            error ();
            return;
239 240
        }

241 242
        //  Adjust input size
        insize = static_cast <size_t> (bytes_read);
Martin Hurton's avatar
Martin Hurton committed
243
    }
244

245 246
    int rc = 0;
    size_t processed = 0;
247

248 249 250
    while (insize > 0) {
        rc = decoder->decode (inpos, insize, processed);
        zmq_assert (processed <= insize);
251 252
        inpos += processed;
        insize -= processed;
253 254
        if (rc == 0 || rc == -1)
            break;
255
        rc = (this->*write_msg) (decoder->msg ());
256 257
        if (rc == -1)
            break;
258 259
    }

260 261 262 263
    //  Tear down the connection if we have failed to decode input data
    //  or the session has rejected the message.
    if (rc == -1) {
        if (errno != EAGAIN) {
Martin Hurton's avatar
Martin Hurton committed
264
            error ();
265 266 267 268
            return;
        }
        congested = true;
        reset_pollin (handle);
Martin Hurton's avatar
Martin Hurton committed
269
    }
270 271

    session->flush ();
272 273
}

274
void zmq::stream_engine_t::out_event ()
275
{
276 277
    zmq_assert (!io_error);

278 279 280
    //  If write buffer is empty, try to read new data from the encoder.
    if (!outsize) {

Martin Hurton's avatar
Martin Hurton committed
281 282 283 284 285 286 287 288
        //  Even when we stop polling as soon as there is no
        //  data to send, the poller may invoke out_event one
        //  more time due to 'speculative write' optimisation.
        if (unlikely (encoder == NULL)) {
            zmq_assert (handshaking);
            return;
        }

289
        outpos = NULL;
290 291 292
        outsize = encoder->encode (&outpos, 0);

        while (outsize < out_batch_size) {
293
            if ((this->*read_msg) (&tx_msg) == -1)
294 295 296 297 298 299 300 301 302
                break;
            encoder->load_msg (&tx_msg);
            unsigned char *bufptr = outpos + outsize;
            size_t n = encoder->encode (&bufptr, out_batch_size - outsize);
            zmq_assert (n > 0);
            if (outpos == NULL)
                outpos = bufptr;
            outsize += n;
        }
303 304 305 306 307 308 309 310 311 312

        //  If there is no data to send, stop polling for output.
        if (outsize == 0) {
            reset_pollout (handle);
            return;
        }
    }

    //  If there are any data to write in write buffer, write as much as
    //  possible to the socket. Note that amount of data to write can be
313
    //  arbitrarily large. However, we assume that underlying TCP layer has
314 315 316 317
    //  limited transmission buffer and thus the actual number of bytes
    //  written should be reasonably modest.
    int nbytes = write (outpos, outsize);

Martin Hurton's avatar
Martin Hurton committed
318 319
    //  IO error has occurred. We stop waiting for output events.
    //  The engine is not terminated until we detect input error;
320
    //  this is necessary to prevent losing incoming messages.
321
    if (nbytes == -1) {
Martin Hurton's avatar
Martin Hurton committed
322
        reset_pollout (handle);
323 324
        if (unlikely (terminating))
            terminate ();
325 326 327 328 329
        return;
    }

    outpos += nbytes;
    outsize -= nbytes;
Martin Hurton's avatar
Martin Hurton committed
330 331 332 333 334 335

    //  If we are still handshaking and there are no data
    //  to send, stop polling for output.
    if (unlikely (handshaking))
        if (outsize == 0)
            reset_pollout (handle);
336 337 338 339

    if (unlikely (terminating))
        if (outsize == 0)
            terminate ();
340 341
}

342
void zmq::stream_engine_t::activate_out ()
343
{
344 345 346
    if (unlikely (io_error))
        return;

347 348 349 350 351 352 353 354 355
    set_pollout (handle);

    //  Speculative write: The assumption is that at the moment new message
    //  was sent by the user the socket is probably available for writing.
    //  Thus we try to write the data to socket avoiding polling for POLLOUT.
    //  Consequently, the latency should be better in request/reply scenarios.
    out_event ();
}

356
void zmq::stream_engine_t::activate_in ()
357
{
358 359 360 361
    zmq_assert (congested);
    zmq_assert (session != NULL);
    zmq_assert (decoder != NULL);

362
    int rc = (this->*write_msg) (decoder->msg ());
363 364 365 366 367
    if (rc == -1) {
        if (errno == EAGAIN)
            session->flush ();
        else
            error ();
Martin Hurton's avatar
Martin Hurton committed
368 369 370
        return;
    }

371 372 373 374 375 376 377 378
    while (insize > 0) {
        size_t processed = 0;
        rc = decoder->decode (inpos, insize, processed);
        zmq_assert (processed <= insize);
        inpos += processed;
        insize -= processed;
        if (rc == 0 || rc == -1)
            break;
379
        rc = (this->*write_msg) (decoder->msg ());
380 381 382
        if (rc == -1)
            break;
    }
383

384 385 386 387 388 389 390 391 392 393 394 395 396
    if (rc == -1 && errno == EAGAIN)
        session->flush ();
    else
    if (rc == -1 || io_error)
        error ();
    else {
        congested = false;
        set_pollin (handle);
        session->flush ();

        //  Speculative read.
        in_event ();
    }
397 398
}

399
bool zmq::stream_engine_t::handshake ()
Martin Hurton's avatar
Martin Hurton committed
400
{
401
    zmq_assert (handshaking);
Martin Hurton's avatar
Martin Hurton committed
402 403
    zmq_assert (greeting_bytes_read < greeting_size);

404
    //  Receive the greeting.
Martin Hurton's avatar
Martin Hurton committed
405
    while (greeting_bytes_read < greeting_size) {
Pieter Hintjens's avatar
Pieter Hintjens committed
406
        const int n = read (greeting_recv + greeting_bytes_read,
Martin Hurton's avatar
Martin Hurton committed
407
                            greeting_size - greeting_bytes_read);
408 409 410 411
        if (n == -1) {
            error ();
            return false;
        }
Martin Hurton's avatar
Martin Hurton committed
412
        if (n == 0)
413
            return false;
Martin Hurton's avatar
Martin Hurton committed
414 415 416

        greeting_bytes_read += n;

417 418 419
        //  We have received at least one byte from the peer.
        //  If the first byte is not 0xff, we know that the
        //  peer is using unversioned protocol.
Pieter Hintjens's avatar
Pieter Hintjens committed
420
        if (greeting_recv [0] != 0xff)
421
            break;
Martin Hurton's avatar
Martin Hurton committed
422

423
        if (greeting_bytes_read < signature_size)
424
            continue;
Martin Hurton's avatar
Martin Hurton committed
425

426 427 428 429
        //  Inspect the right-most bit of the 10th byte (which coincides
        //  with the 'flags' field if a regular message was sent).
        //  Zero indicates this is a header of identity message
        //  (i.e. the peer is using the unversioned protocol).
Pieter Hintjens's avatar
Pieter Hintjens committed
430
        if (!(greeting_recv [9] & 0x01))
431
            break;
Martin Hurton's avatar
Martin Hurton committed
432

433
        //  The peer is using versioned protocol.
434 435
        //  Send the major version number.
        if (outpos + outsize == greeting_send + signature_size) {
Martin Hurton's avatar
Martin Hurton committed
436 437
            if (outsize == 0)
                set_pollout (handle);
438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459
            outpos [outsize++] = 3;     //  Major version number
        }

        if (greeting_bytes_read > signature_size) {
            if (outpos + outsize == greeting_send + signature_size + 1) {
                if (outsize == 0)
                    set_pollout (handle);

                //  Use ZMTP/2.0 to talk to older peers.
                if (greeting_recv [10] == ZMTP_1_0
                ||  greeting_recv [10] == ZMTP_2_0)
                    outpos [outsize++] = options.type;
                else {
                    outpos [outsize++] = 0; //  Minor version number
                    memset (outpos + outsize, 0, 20);
                    memcpy (outpos + outsize, "NULL", 4);
                    outsize += 20;
                    memset (outpos + outsize, 0, 32);
                    outsize += 32;
                    greeting_size = v3_greeting_size;
                }
            }
Martin Hurton's avatar
Martin Hurton committed
460 461 462
        }
    }

463 464
    //  Position of the revision field in the greeting.
    const size_t revision_pos = 10;
465

466 467
    //  Is the peer using ZMTP/1.0 with no revision number?
    //  If so, we send and receive rest of identity message
Pieter Hintjens's avatar
Pieter Hintjens committed
468
    if (greeting_recv [0] != 0xff || !(greeting_recv [9] & 0x01)) {
469
        encoder = new (std::nothrow) v1_encoder_t (out_batch_size);
470 471
        alloc_assert (encoder);

472
        decoder = new (std::nothrow) v1_decoder_t (in_batch_size, options.maxmsgsize);
473 474
        alloc_assert (decoder);

Martin Hurton's avatar
Martin Hurton committed
475 476 477 478 479 480
        //  We have already sent the message header.
        //  Since there is no way to tell the encoder to
        //  skip the message header, we simply throw that
        //  header data away.
        const size_t header_size = options.identity_size + 1 >= 255 ? 10 : 2;
        unsigned char tmp [10], *bufferp = tmp;
481
        size_t buffer_size = encoder->encode (&bufferp, header_size);
Martin Hurton's avatar
Martin Hurton committed
482 483 484
        zmq_assert (buffer_size == header_size);

        //  Make sure the decoder sees the data we have already received.
Pieter Hintjens's avatar
Pieter Hintjens committed
485
        inpos = greeting_recv;
Martin Hurton's avatar
Martin Hurton committed
486
        insize = greeting_bytes_read;
487 488

        //  To allow for interoperability with peers that do not forward
Pieter Hintjens's avatar
Pieter Hintjens committed
489
        //  their subscriptions, we inject a phony subscription
490
        //  message into the incomming message stream.
491
        if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB)
492
            subscription_required = true;
Martin Hurton's avatar
Martin Hurton committed
493
    }
494
    else
Pieter Hintjens's avatar
Pieter Hintjens committed
495
    if (greeting_recv [revision_pos] == ZMTP_1_0) {
496 497
        encoder = new (std::nothrow) v1_encoder_t (
            out_batch_size);
498 499
        alloc_assert (encoder);

500 501
        decoder = new (std::nothrow) v1_decoder_t (
            in_batch_size, options.maxmsgsize);
502 503
        alloc_assert (decoder);
    }
504 505 506 507 508 509 510 511 512
    else
    if (greeting_recv [revision_pos] == ZMTP_2_0) {
        encoder = new (std::nothrow) v2_encoder_t (out_batch_size);
        alloc_assert (encoder);

        decoder = new (std::nothrow) v2_decoder_t (
            in_batch_size, options.maxmsgsize);
        alloc_assert (decoder);
    }
513
    else {
514
        encoder = new (std::nothrow) v2_encoder_t (out_batch_size);
515 516
        alloc_assert (encoder);

517
        decoder = new (std::nothrow) v2_decoder_t (
518
            in_batch_size, options.maxmsgsize);
519
        alloc_assert (decoder);
520 521 522 523 524 525 526 527 528

        if (memcmp (greeting_recv + 12, "NULL\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) == 0) {
            read_msg = &stream_engine_t::send_ready_command;
            write_msg = &stream_engine_t::receive_ready_command;
        }
        else {
            error ();
            return false;
        }
529
    }
Martin Hurton's avatar
Martin Hurton committed
530 531 532 533 534 535 536 537 538 539 540 541

    // Start polling for output if necessary.
    if (outsize == 0)
        set_pollout (handle);

    //  Handshaking was successful.
    //  Switch into the normal message flow.
    handshaking = false;

    return true;
}

542
//  First read_msg handler: fills msg_ with the configured identity and
//  switches read_msg over to pulling messages from the session.
//  Always returns 0.
int zmq::stream_engine_t::read_identity (msg_t *msg_)
{
    const int rc = msg_->init_size (options.identity_size);
    errno_assert (rc == 0);

    //  An empty identity leaves the message body empty.
    if (options.identity_size > 0)
        memcpy (msg_->data (), options.identity, options.identity_size);

    read_msg = &stream_engine_t::pull_msg_from_session;
    return 0;
}
551

552 553 554 555 556
//  First write_msg handler: consumes the peer's identity message. The
//  message is forwarded to the session when options.recv_identity is set
//  and discarded otherwise. Then picks the next write_msg handler.
//  Always returns 0.
int zmq::stream_engine_t::write_identity (msg_t *msg_)
{
    if (!options.recv_identity) {
        //  Drop the identity and leave msg_ as a fresh, empty message.
        int rc = msg_->close ();
        errno_assert (rc == 0);
        rc = msg_->init ();
        errno_assert (rc == 0);
    }
    else {
        msg_->set_flags (msg_t::identity);
        const int rc = session->push_msg (msg_);
        errno_assert (rc == 0);
    }

    //  The next incoming message is preceded by an injected subscription
    //  when the handshake asked for one.
    write_msg = subscription_required
        ? &stream_engine_t::write_subscription_msg
        : &stream_engine_t::push_msg_to_session;

    return 0;
}
573

574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702
//  read_msg handler used right after a NULL-mechanism greeting: builds the
//  READY command (command name followed by a property list) in msg_ and
//  selects the next read_msg handler. Always returns 0.
int zmq::stream_engine_t::send_ready_command (msg_t *msg_)
{
    //  Scratch space in which the command is assembled.
    unsigned char * const command_buffer = (unsigned char *) malloc (512);
    alloc_assert (command_buffer);

    unsigned char *cursor = command_buffer;

    //  Command name, space-padded to eight bytes.
    memcpy (cursor, "READY   ", 8);
    cursor += 8;

    //  Socket type property.
    const char * const type_name = socket_type_string (options.type);
    cursor += add_property (cursor, "Socket-Type",
        type_name, strlen (type_name));

    //  Identity property, sent by REQ, DEALER and ROUTER sockets only.
    const bool announces_identity = options.type == ZMQ_REQ
                                ||  options.type == ZMQ_DEALER
                                ||  options.type == ZMQ_ROUTER;
    if (announces_identity)
        cursor += add_property (cursor, "Identity",
            options.identity, options.identity_size);

    //  Copy the assembled command into the message and release the scratch
    //  buffer.
    const size_t command_size = cursor - command_buffer;
    const int rc = msg_->init_size (command_size);
    errno_assert (rc == 0);
    memcpy (msg_->data (), command_buffer, command_size);
    free (command_buffer);

    //  Regular messages flow only after the peer's READY has been seen.
    read_msg = ready_command_received
        ? &stream_engine_t::pull_msg_from_session
        : &stream_engine_t::wait;

    return 0;
}

//  write_msg handler used right after a NULL-mechanism greeting: parses the
//  peer's READY command held in msg_.
//
//  Layout parsed here: 8-byte command name "READY   ", then zero or more
//  properties, each encoded as
//      1-byte name length | name | 4-byte value length | value
//
//  Returns 0 on success; msg_ is then reset to an empty message and
//  write_msg switches to push_msg_to_session. Returns -1 with errno set to
//  EPROTO when the command is malformed, including a property list that is
//  cut short inside a property.
int zmq::stream_engine_t::receive_ready_command (msg_t *msg_)
{
    const unsigned char * const command_buffer =
        static_cast <unsigned char *> (msg_->data ());
    const size_t command_size = msg_->size ();

    const unsigned char *ptr = command_buffer;
    size_t bytes_left = command_size;

    if (bytes_left < 8 || memcmp (ptr, "READY   ", 8)) {
        errno = EPROTO;
        return -1;
    }

    ptr += 8;
    bytes_left -= 8;

    //  Parse the property list. Every truncation is reported right where
    //  it is detected; merely leaving the loop would let a list that ends
    //  exactly after a name or after a length field pass as valid.
    while (bytes_left > 1) {
        const size_t name_length = static_cast <size_t> (*ptr);
        ptr += 1;
        bytes_left -= 1;

        if (bytes_left < name_length) {
            errno = EPROTO;
            return -1;
        }
        const std::string name = std::string ((const char *) ptr, name_length);
        ptr += name_length;
        bytes_left -= name_length;

        if (bytes_left < 4) {
            errno = EPROTO;
            return -1;
        }
        const size_t value_length = static_cast <size_t> (get_uint32 (ptr));
        ptr += 4;
        bytes_left -= 4;

        if (bytes_left < value_length) {
            errno = EPROTO;
            return -1;
        }
        const unsigned char * const value = ptr;
        ptr += value_length;
        bytes_left -= value_length;

        if (name == "Socket-Type") {
            //  Implement socket type checking
        }
        else
        if (name == "Identity") {
            //  Forward the peer's identity to the session when requested.
            if (options.recv_identity) {
                msg_t identity;
                int rc = identity.init_size (value_length);
                errno_assert (rc == 0);
                memcpy (identity.data (), value, value_length);
                identity.set_flags (msg_t::identity);
                rc = session->push_msg (&identity);
                errno_assert (rc == 0);
            }
        }
    }

    //  A single stray byte cannot start a property.
    if (bytes_left > 0) {
        errno = EPROTO;
        return -1;
    }

    //  The command is consumed; leave msg_ as a fresh, empty message.
    int rc = msg_->close ();
    errno_assert (rc == 0);
    rc = msg_->init ();
    errno_assert (rc == 0);

    write_msg = &stream_engine_t::push_msg_to_session;

    //  Output that was held back in wait () may now resume.
    ready_command_received = true;
    if (output_paused) {
        activate_out ();
        output_paused = false;
    }

    return 0;
}

//  read_msg handler used between sending our READY command and receiving
//  the peer's. Until the peer's READY arrives it reports EAGAIN and marks
//  output as paused; afterwards it hands over to pull_msg_from_session.
int zmq::stream_engine_t::wait (msg_t *msg_)
{
    if (!ready_command_received) {
        output_paused = true;
        errno = EAGAIN;
        return -1;
    }

    read_msg = &stream_engine_t::pull_msg_from_session;
    return pull_msg_from_session (msg_);
}

703
//  Regular read_msg handler: fetches the next outgoing message from the
//  session and passes the session's return code through unchanged.
int zmq::stream_engine_t::pull_msg_from_session (msg_t *msg_)
{
    const int rc = session->pull_msg (msg_);
    return rc;
}
707

708 709 710 711 712 713 714 715
//  Regular write_msg handler: hands a decoded message to the session and
//  passes the session's return code through unchanged.
int zmq::stream_engine_t::push_msg_to_session (msg_t *msg_)
{
    const int rc = session->push_msg (msg_);
    return rc;
}

//  write_msg handler that pushes a one-byte subscription message (value 1)
//  to the session ahead of msg_, so that ZMQ 2.x peers also receive
//  published messages. Returns -1 if the session refuses the subscription;
//  the handler then stays installed. Otherwise it switches to
//  push_msg_to_session and forwards msg_.
int zmq::stream_engine_t::write_subscription_msg (msg_t *msg_)
{
    //  Build the phony subscription.
    msg_t subscription;
    const int init_rc = subscription.init_size (1);
    errno_assert (init_rc == 0);
    *(unsigned char*) subscription.data () = 1;

    if (session->push_msg (&subscription) == -1)
        return -1;

    //  The injection happens once; later messages go straight through.
    write_msg = &stream_engine_t::push_msg_to_session;
    return push_msg_to_session (msg_);
}

730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752
//  Serialises one property at ptr as
//      1-byte name length | name | 4-byte value length | value
//  and returns the number of bytes written. The caller must provide a
//  buffer large enough for 1 + strlen (name) + 4 + value_len bytes.
//
//  name       NUL-terminated property name, at most 255 characters.
//  value      Property value; value_len bytes are copied from it.
//  value_len  Length of the value, at most 2^31 - 1.
size_t zmq::stream_engine_t::add_property (unsigned char *ptr,
    const char *name, const void *value, size_t value_len)
{
    const size_t name_len = strlen (name);
    zmq_assert (name_len <= 255);
    *ptr++ = static_cast <unsigned char> (name_len);
    memcpy (ptr, name, name_len);
    ptr += name_len;

    //  In C++ '^' is bitwise XOR, so the former bound "(2^31) - 1"
    //  evaluated to 28 and aborted on any value longer than 28 bytes.
    //  The intended limit is 2^31 - 1.
    zmq_assert (value_len <= 0x7FFFFFFF);
    put_uint32 (ptr, static_cast <uint32_t> (value_len));
    ptr += 4;
    memcpy (ptr, value, value_len);

    return 1 + name_len + 4 + value_len;
}

//  Maps a socket type number to the name sent in the "Socket-Type"
//  property. socket_type must be a valid index into the table below;
//  anything else trips the assertion.
const char *zmq::stream_engine_t::socket_type_string (int socket_type) {
    //  Static, so the table is not rebuilt on every call.
    static const char * const names [] = {"PAIR", "PUB", "SUB", "REQ", "REP",
                                          "DEALER", "ROUTER", "PULL", "PUSH",
                                          "XPUB", "XSUB"};
    //  The bound follows the table, so the two cannot drift apart.
    const int names_count =
        static_cast <int> (sizeof names / sizeof names [0]);
    zmq_assert (socket_type >= 0 && socket_type < names_count);
    return names [socket_type];
}

753
void zmq::stream_engine_t::error ()
754 755
{
    zmq_assert (session);
756
    socket->event_disconnected (endpoint, s);
757
    session->flush ();
758 759 760 761 762
    session->detach ();
    unplug ();
    delete this;
}

763
int zmq::stream_engine_t::write (const void *data_, size_t size_)
764
{
765 766
#ifdef ZMQ_HAVE_WINDOWS

767 768 769 770 771 772 773 774
    int nbytes = send (s, (char*) data_, (int) size_, 0);

    //  If not a single byte can be written to the socket in non-blocking mode
    //  we'll get an error (this may happen during the speculative write).
    if (nbytes == SOCKET_ERROR && WSAGetLastError () == WSAEWOULDBLOCK)
        return 0;
		
    //  Signalise peer failure.
775
    if (nbytes == SOCKET_ERROR && (
776 777 778 779 780 781 782 783 784
          WSAGetLastError () == WSAENETDOWN ||
          WSAGetLastError () == WSAENETRESET ||
          WSAGetLastError () == WSAEHOSTUNREACH ||
          WSAGetLastError () == WSAECONNABORTED ||
          WSAGetLastError () == WSAETIMEDOUT ||
          WSAGetLastError () == WSAECONNRESET))
        return -1;

    wsa_assert (nbytes != SOCKET_ERROR);
Martin Hurton's avatar
Martin Hurton committed
785
    return nbytes;
786 787

#else
788

789 790 791 792 793 794 795 796 797 798
    ssize_t nbytes = send (s, data_, size_, 0);

    //  Several errors are OK. When speculative write is being done we may not
    //  be able to write a single byte from the socket. Also, SIGSTOP issued
    //  by a debugging tool can result in EINTR error.
    if (nbytes == -1 && (errno == EAGAIN || errno == EWOULDBLOCK ||
          errno == EINTR))
        return 0;

    //  Signalise peer failure.
799 800 801 802 803 804 805 806 807 808 809
    if (nbytes == -1) {
        errno_assert (errno != EACCES
                   && errno != EBADF
                   && errno != EDESTADDRREQ
                   && errno != EFAULT
                   && errno != EINVAL
                   && errno != EISCONN
                   && errno != EMSGSIZE
                   && errno != ENOMEM
                   && errno != ENOTSOCK
                   && errno != EOPNOTSUPP);
810
        return -1;
811
    }
812

813
    return static_cast <int> (nbytes);
814 815

#endif
816 817
}

818
int zmq::stream_engine_t::read (void *data_, size_t size_)
819
{
820 821
#ifdef ZMQ_HAVE_WINDOWS

822 823 824 825 826 827 828 829
    int nbytes = recv (s, (char*) data_, (int) size_, 0);

    //  If not a single byte can be read from the socket in non-blocking mode
    //  we'll get an error (this may happen during the speculative read).
    if (nbytes == SOCKET_ERROR && WSAGetLastError () == WSAEWOULDBLOCK)
        return 0;

    //  Connection failure.
830
    if (nbytes == SOCKET_ERROR && (
831 832 833 834 835 836 837 838 839 840 841 842 843 844 845
          WSAGetLastError () == WSAENETDOWN ||
          WSAGetLastError () == WSAENETRESET ||
          WSAGetLastError () == WSAECONNABORTED ||
          WSAGetLastError () == WSAETIMEDOUT ||
          WSAGetLastError () == WSAECONNRESET ||
          WSAGetLastError () == WSAECONNREFUSED ||
          WSAGetLastError () == WSAENOTCONN))
        return -1;

    wsa_assert (nbytes != SOCKET_ERROR);

    //  Orderly shutdown by the other peer.
    if (nbytes == 0)
        return -1; 

Martin Hurton's avatar
Martin Hurton committed
846
    return nbytes;
847 848 849 850 851 852

#else

    ssize_t nbytes = recv (s, data_, size_, 0);

    //  Several errors are OK. When speculative read is being done we may not
853
    //  be able to read a single byte from the socket. Also, SIGSTOP issued
854 855 856 857 858 859
    //  by a debugging tool can result in EINTR error.
    if (nbytes == -1 && (errno == EAGAIN || errno == EWOULDBLOCK ||
          errno == EINTR))
        return 0;

    //  Signalise peer failure.
860 861 862 863 864 865
    if (nbytes == -1) {
        errno_assert (errno != EBADF
                   && errno != EFAULT
                   && errno != EINVAL
                   && errno != ENOMEM
                   && errno != ENOTSOCK);
866
        return -1;
867
    }
868 869 870 871 872

    //  Orderly shutdown by the peer.
    if (nbytes == 0)
        return -1;

873
    return static_cast <int> (nbytes);
874 875

#endif
876
}