stream_engine.cpp 32.1 KB
Newer Older
1
/*
2
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3

4
    This file is part of libzmq, the ZeroMQ core engine in C++.
5

6 7 8
    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
9 10
    (at your option) any later version.

11 12 13 14 15 16 17 18 19 20 21 22 23 24
    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.
25 26 27 28 29

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include "precompiled.hpp"
31
#include "macros.hpp"
32 33 34 35
#include "platform.hpp"

#include <string.h>
#include <new>
36
#include <sstream>
37

38
#include "stream_engine.hpp"
39
#include "io_thread.hpp"
40
#include "session_base.hpp"
41 42
#include "v1_encoder.hpp"
#include "v1_decoder.hpp"
43 44
#include "v2_encoder.hpp"
#include "v2_decoder.hpp"
45
#include "null_mechanism.hpp"
46 47
#include "plain_client.hpp"
#include "plain_server.hpp"
48 49
#include "gssapi_client.hpp"
#include "gssapi_server.hpp"
50 51
#include "curve_client.hpp"
#include "curve_server.hpp"
52 53
#include "raw_decoder.hpp"
#include "raw_encoder.hpp"
54 55
#include "config.hpp"
#include "err.hpp"
56
#include "ip.hpp"
57
#include "tcp.hpp"
Martin Hurton's avatar
Martin Hurton committed
58 59
#include "likely.hpp"
#include "wire.hpp"
60

Chris Laws's avatar
Chris Laws committed
61
zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_,
62
                                       const std::string &endpoint_) :
63
    s (fd_),
64 65
    as_server(false),
    handle(NULL),
66 67
    inpos (NULL),
    insize (0),
68
    decoder (NULL),
69 70
    outpos (NULL),
    outsize (0),
71
    encoder (NULL),
72
    metadata (NULL),
Martin Hurton's avatar
Martin Hurton committed
73
    handshaking (true),
74
    greeting_size (v2_greeting_size),
Martin Hurton's avatar
Martin Hurton committed
75
    greeting_bytes_read (0),
76 77
    session (NULL),
    options (options_),
78
    endpoint (endpoint_),
79
    plugged (false),
Martin Hurton's avatar
Martin Hurton committed
80 81
    next_msg (&stream_engine_t::identity_msg),
    process_msg (&stream_engine_t::process_identity_msg),
82 83
    io_error (false),
    subscription_required (false),
84
    mechanism (NULL),
85 86
    input_stopped (false),
    output_stopped (false),
87
    has_handshake_timer (false),
Jonathan Reams's avatar
Jonathan Reams committed
88 89 90
    has_ttl_timer (false),
    has_timeout_timer (false),
    has_heartbeat_timer (false),
91
    heartbeat_timeout (0),
92
    socket (NULL)
93
{
94 95
    int rc = tx_msg.init ();
    errno_assert (rc == 0);
Chris Laws's avatar
Chris Laws committed
96

Martin Hurton's avatar
Martin Hurton committed
97
    //  Put the socket into non-blocking mode.
98
    unblock_socket (s);
99

100 101
    int family = get_peer_ip_address (s, peer_address);
    if (family == 0)
102
        peer_address.clear();
103
#if defined ZMQ_HAVE_SO_PEERCRED
104 105
    else
    if (family == PF_UNIX) {
106 107 108 109 110 111 112 113 114
        struct ucred cred;
        socklen_t size = sizeof (cred);
        if (!getsockopt (s, SOL_SOCKET, SO_PEERCRED, &cred, &size)) {
            std::ostringstream buf;
            buf << ":" << cred.uid << ":" << cred.gid << ":" << cred.pid;
            peer_address += buf.str ();
        }
    }
#elif defined ZMQ_HAVE_LOCAL_PEERCRED
115 116
    else
    if (family == PF_UNIX) {
117 118 119 120 121 122 123 124 125 126 127 128 129
        struct xucred cred;
        socklen_t size = sizeof (cred);
        if (!getsockopt (s, 0, LOCAL_PEERCRED, &cred, &size)
                && cred.cr_version == XUCRED_VERSION) {
            std::ostringstream buf;
            buf << ":" << cred.cr_uid << ":";
            if (cred.cr_ngroups > 0)
                buf << cred.cr_groups[0];
            buf << ":";
            peer_address += buf.str ();
        }
    }
#endif
130

131
#ifdef SO_NOSIGPIPE
132 133 134
    //  Make sure that SIGPIPE signal is not generated when writing to a
    //  connection that was already closed by the peer.
    int set = 1;
135
    rc = setsockopt (s, SOL_SOCKET, SO_NOSIGPIPE, &set, sizeof (int));
136 137
    errno_assert (rc == 0);
#endif
138 139 140 141 142
    if(options.heartbeat_interval > 0) {
        heartbeat_timeout = options.heartbeat_timeout;
        if(heartbeat_timeout == -1)
            heartbeat_timeout = options.heartbeat_interval;
    }
143 144
}

145
zmq::stream_engine_t::~stream_engine_t ()
146 147 148
{
    zmq_assert (!plugged);

149 150
    if (s != retired_fd) {
#ifdef ZMQ_HAVE_WINDOWS
151 152
        int rc = closesocket (s);
        wsa_assert (rc != SOCKET_ERROR);
153
#else
154
        int rc = close (s);
155 156
        errno_assert (rc == 0);
#endif
157
        s = retired_fd;
158
    }
159

160 161 162
    int rc = tx_msg.close ();
    errno_assert (rc == 0);

163 164
    //  Drop reference to metadata and destroy it if we are
    //  the only user.
165 166 167 168 169
    if (metadata != NULL) {
        if (metadata->drop_ref ()) {
            LIBZMQ_DELETE(metadata);
        }
    }
170

171 172 173
    LIBZMQ_DELETE(encoder);
    LIBZMQ_DELETE(decoder);
    LIBZMQ_DELETE(mechanism);
174 175
}

176 177
void zmq::stream_engine_t::plug (io_thread_t *io_thread_,
    session_base_t *session_)
178 179 180 181 182 183 184 185
{
    zmq_assert (!plugged);
    plugged = true;

    //  Connect to session object.
    zmq_assert (!session);
    zmq_assert (session_);
    session = session_;
186
    socket = session-> get_socket ();
187 188 189 190

    //  Connect to I/O threads poller object.
    io_object_t::plug (io_thread_);
    handle = add_fd (s);
191
    io_error = false;
Martin Hurton's avatar
Martin Hurton committed
192

193
    if (options.raw_socket) {
194
        // no handshaking for raw sock, instantiate raw encoder and decoders
195
        encoder = new (std::nothrow) raw_encoder_t (out_batch_size);
196 197
        alloc_assert (encoder);

198
        decoder = new (std::nothrow) raw_decoder_t (in_batch_size);
199 200 201 202
        alloc_assert (decoder);

        // disable handshaking for raw socket
        handshaking = false;
203

Martin Hurton's avatar
Martin Hurton committed
204
        next_msg = &stream_engine_t::pull_msg_from_session;
205 206
        process_msg = &stream_engine_t::push_raw_msg_to_session;

Thomas Rodgers's avatar
Thomas Rodgers committed
207 208
        properties_t properties;
        if (init_properties(properties)) {
209 210 211 212
            //  Compile metadata.
            zmq_assert (metadata == NULL);
            metadata = new (std::nothrow) metadata_t (properties);
        }
213

214 215 216 217 218
        if (options.raw_notify) {
            //  For raw sockets, send an initial 0-length message to the
            // application so that it knows a peer has connected.
            msg_t connector;
            connector.init();
219
            push_raw_msg_to_session (&connector);
220 221 222
            connector.close();
            session->flush ();
        }
Martin Hurton's avatar
Martin Hurton committed
223 224
    }
    else {
225 226 227
        // start optional timer, to prevent handshake hanging on no input
        set_handshake_timer ();

228 229
        //  Send the 'length' and 'flags' fields of the identity message.
        //  The 'length' field is encoded in the long format.
Pieter Hintjens's avatar
Pieter Hintjens committed
230
        outpos = greeting_send;
231 232 233 234 235
        outpos [outsize++] = 0xff;
        put_uint64 (&outpos [outsize], options.identity_size + 1);
        outsize += 8;
        outpos [outsize++] = 0x7f;
    }
Martin Hurton's avatar
Martin Hurton committed
236

237 238 239 240 241 242
    set_pollin (handle);
    set_pollout (handle);
    //  Flush all the data that may have been already received downstream.
    in_event ();
}

243
void zmq::stream_engine_t::unplug ()
244 245 246 247
{
    zmq_assert (plugged);
    plugged = false;

248 249 250 251 252 253
    //  Cancel all timers.
    if (has_handshake_timer) {
        cancel_timer (handshake_timer_id);
        has_handshake_timer = false;
    }

Jonathan Reams's avatar
Jonathan Reams committed
254 255 256 257 258 259 260 261 262 263 264 265 266 267
    if (has_ttl_timer) {
        cancel_timer (heartbeat_ttl_timer_id);
        has_ttl_timer = false;
    }

    if (has_timeout_timer) {
        cancel_timer (heartbeat_timeout_timer_id);
        has_timeout_timer = false;
    }

    if (has_heartbeat_timer) {
        cancel_timer (heartbeat_ivl_timer_id);
        has_heartbeat_timer = false;
    }
268
    //  Cancel all fd subscriptions.
269
    if (!io_error)
Martin Hurton's avatar
Martin Hurton committed
270
        rm_fd (handle);
271 272 273 274 275 276 277

    //  Disconnect from I/O threads poller object.
    io_object_t::unplug ();

    session = NULL;
}

278
void zmq::stream_engine_t::terminate ()
279 280 281 282 283
{
    unplug ();
    delete this;
}

284
void zmq::stream_engine_t::in_event ()
285
{
286
    zmq_assert (!io_error);
287

Pieter Hintjens's avatar
Pieter Hintjens committed
288
    //  If still handshaking, receive and process the greeting message.
Martin Hurton's avatar
Martin Hurton committed
289 290 291 292
    if (unlikely (handshaking))
        if (!handshake ())
            return;

293
    zmq_assert (decoder);
294 295

    //  If there has been an I/O error, stop polling.
296
    if (input_stopped) {
297 298 299 300
        rm_fd (handle);
        io_error = true;
        return;
    }
301 302 303 304 305 306 307 308

    //  If there's no data to process in the buffer...
    if (!insize) {

        //  Retrieve the buffer and read as much data as possible.
        //  Note that buffer can be arbitrarily large. However, we assume
        //  the underlying TCP layer has fixed buffer size and thus the
        //  number of bytes read will be always limited.
309 310
        size_t bufsize = 0;
        decoder->get_buffer (&inpos, &bufsize);
311

312
        const int rc = tcp_read (s, inpos, bufsize);
313

314
        if (rc == 0) {
315
            error (connection_error);
316
            return;
317
        }
318 319
        if (rc == -1) {
            if (errno != EAGAIN)
320
                error (connection_error);
321 322
            return;
        }
323

324
        //  Adjust input size
325
        insize = static_cast <size_t> (rc);
326 327
        // Adjust buffer size to received bytes
        decoder->resize_buffer(insize);
Martin Hurton's avatar
Martin Hurton committed
328
    }
329

330 331
    int rc = 0;
    size_t processed = 0;
332

333 334 335
    while (insize > 0) {
        rc = decoder->decode (inpos, insize, processed);
        zmq_assert (processed <= insize);
336 337
        inpos += processed;
        insize -= processed;
338 339
        if (rc == 0 || rc == -1)
            break;
Martin Hurton's avatar
Martin Hurton committed
340
        rc = (this->*process_msg) (decoder->msg ());
341 342
        if (rc == -1)
            break;
343 344
    }

345 346 347 348
    //  Tear down the connection if we have failed to decode input data
    //  or the session has rejected the message.
    if (rc == -1) {
        if (errno != EAGAIN) {
349
            error (protocol_error);
350 351
            return;
        }
352
        input_stopped = true;
353
        reset_pollin (handle);
Martin Hurton's avatar
Martin Hurton committed
354
    }
355 356

    session->flush ();
357 358
}

359
void zmq::stream_engine_t::out_event ()
360
{
361 362
    zmq_assert (!io_error);

363 364 365
    //  If write buffer is empty, try to read new data from the encoder.
    if (!outsize) {

Martin Hurton's avatar
Martin Hurton committed
366 367 368 369 370 371 372 373
        //  Even when we stop polling as soon as there is no
        //  data to send, the poller may invoke out_event one
        //  more time due to 'speculative write' optimisation.
        if (unlikely (encoder == NULL)) {
            zmq_assert (handshaking);
            return;
        }

374
        outpos = NULL;
375 376
        outsize = encoder->encode (&outpos, 0);

377
        while (outsize < (size_t) out_batch_size) {
Martin Hurton's avatar
Martin Hurton committed
378
            if ((this->*next_msg) (&tx_msg) == -1)
379 380 381
                break;
            encoder->load_msg (&tx_msg);
            unsigned char *bufptr = outpos + outsize;
382
            size_t n = encoder->encode (&bufptr, out_batch_size - outsize);
383 384 385 386 387
            zmq_assert (n > 0);
            if (outpos == NULL)
                outpos = bufptr;
            outsize += n;
        }
388 389 390

        //  If there is no data to send, stop polling for output.
        if (outsize == 0) {
391
            output_stopped = true;
392 393 394 395 396 397 398
            reset_pollout (handle);
            return;
        }
    }

    //  If there are any data to write in write buffer, write as much as
    //  possible to the socket. Note that amount of data to write can be
399
    //  arbitrarily large. However, we assume that underlying TCP layer has
400 401
    //  limited transmission buffer and thus the actual number of bytes
    //  written should be reasonably modest.
402
    const int nbytes = tcp_write (s, outpos, outsize);
403

Martin Hurton's avatar
Martin Hurton committed
404 405
    //  IO error has occurred. We stop waiting for output events.
    //  The engine is not terminated until we detect input error;
406
    //  this is necessary to prevent losing incoming messages.
407
    if (nbytes == -1) {
Martin Hurton's avatar
Martin Hurton committed
408
        reset_pollout (handle);
409 410 411 412 413
        return;
    }

    outpos += nbytes;
    outsize -= nbytes;
Martin Hurton's avatar
Martin Hurton committed
414 415 416 417 418 419

    //  If we are still handshaking and there are no data
    //  to send, stop polling for output.
    if (unlikely (handshaking))
        if (outsize == 0)
            reset_pollout (handle);
420 421
}

422
void zmq::stream_engine_t::restart_output ()
423
{
424 425 426
    if (unlikely (io_error))
        return;

427
    if (likely (output_stopped)) {
428
        set_pollout (handle);
429
        output_stopped = false;
430
    }
431 432 433 434 435 436 437 438

    //  Speculative write: The assumption is that at the moment new message
    //  was sent by the user the socket is probably available for writing.
    //  Thus we try to write the data to socket avoiding polling for POLLOUT.
    //  Consequently, the latency should be better in request/reply scenarios.
    out_event ();
}

439
void zmq::stream_engine_t::restart_input ()
440
{
441
    zmq_assert (input_stopped);
442 443 444
    zmq_assert (session != NULL);
    zmq_assert (decoder != NULL);

Martin Hurton's avatar
Martin Hurton committed
445
    int rc = (this->*process_msg) (decoder->msg ());
446 447 448 449
    if (rc == -1) {
        if (errno == EAGAIN)
            session->flush ();
        else
450
            error (protocol_error);
Martin Hurton's avatar
Martin Hurton committed
451 452 453
        return;
    }

454 455 456 457 458 459 460 461
    while (insize > 0) {
        size_t processed = 0;
        rc = decoder->decode (inpos, insize, processed);
        zmq_assert (processed <= insize);
        inpos += processed;
        insize -= processed;
        if (rc == 0 || rc == -1)
            break;
Martin Hurton's avatar
Martin Hurton committed
462
        rc = (this->*process_msg) (decoder->msg ());
463 464 465
        if (rc == -1)
            break;
    }
466

467 468 469
    if (rc == -1 && errno == EAGAIN)
        session->flush ();
    else
470 471 472 473 474
    if (io_error)
        error (connection_error);
    else
    if (rc == -1)
        error (protocol_error);
475
    else {
476
        input_stopped = false;
477 478 479 480 481 482
        set_pollin (handle);
        session->flush ();

        //  Speculative read.
        in_event ();
    }
483 484
}

485
bool zmq::stream_engine_t::handshake ()
Martin Hurton's avatar
Martin Hurton committed
486
{
487
    zmq_assert (handshaking);
Martin Hurton's avatar
Martin Hurton committed
488
    zmq_assert (greeting_bytes_read < greeting_size);
489
    //  Receive the greeting.
Martin Hurton's avatar
Martin Hurton committed
490
    while (greeting_bytes_read < greeting_size) {
491 492
        const int n = tcp_read (s, greeting_recv + greeting_bytes_read,
                                greeting_size - greeting_bytes_read);
493
        if (n == 0) {
494
            error (connection_error);
495 496
            return false;
        }
497 498
        if (n == -1) {
            if (errno != EAGAIN)
499
                error (connection_error);
500
            return false;
501
        }
Martin Hurton's avatar
Martin Hurton committed
502 503 504

        greeting_bytes_read += n;

505 506 507
        //  We have received at least one byte from the peer.
        //  If the first byte is not 0xff, we know that the
        //  peer is using unversioned protocol.
Pieter Hintjens's avatar
Pieter Hintjens committed
508
        if (greeting_recv [0] != 0xff)
509
            break;
Martin Hurton's avatar
Martin Hurton committed
510

511
        if (greeting_bytes_read < signature_size)
512
            continue;
Martin Hurton's avatar
Martin Hurton committed
513

514 515 516 517
        //  Inspect the right-most bit of the 10th byte (which coincides
        //  with the 'flags' field if a regular message was sent).
        //  Zero indicates this is a header of identity message
        //  (i.e. the peer is using the unversioned protocol).
Pieter Hintjens's avatar
Pieter Hintjens committed
518
        if (!(greeting_recv [9] & 0x01))
519
            break;
Martin Hurton's avatar
Martin Hurton committed
520

521
        //  The peer is using versioned protocol.
522 523
        //  Send the major version number.
        if (outpos + outsize == greeting_send + signature_size) {
Martin Hurton's avatar
Martin Hurton committed
524 525
            if (outsize == 0)
                set_pollout (handle);
526 527 528 529 530 531 532 533 534 535 536 537 538 539 540
            outpos [outsize++] = 3;     //  Major version number
        }

        if (greeting_bytes_read > signature_size) {
            if (outpos + outsize == greeting_send + signature_size + 1) {
                if (outsize == 0)
                    set_pollout (handle);

                //  Use ZMTP/2.0 to talk to older peers.
                if (greeting_recv [10] == ZMTP_1_0
                ||  greeting_recv [10] == ZMTP_2_0)
                    outpos [outsize++] = options.type;
                else {
                    outpos [outsize++] = 0; //  Minor version number
                    memset (outpos + outsize, 0, 20);
541 542 543

                    zmq_assert (options.mechanism == ZMQ_NULL
                            ||  options.mechanism == ZMQ_PLAIN
544 545
                            ||  options.mechanism == ZMQ_CURVE
                            ||  options.mechanism == ZMQ_GSSAPI);
546

547 548 549
                    if (options.mechanism == ZMQ_NULL)
                        memcpy (outpos + outsize, "NULL", 4);
                    else
550
                    if (options.mechanism == ZMQ_PLAIN)
551
                        memcpy (outpos + outsize, "PLAIN", 5);
552
                    else
553 554
                    if (options.mechanism == ZMQ_GSSAPI)
                        memcpy (outpos + outsize, "GSSAPI", 6);
555
                    else
556
                    if (options.mechanism == ZMQ_CURVE)
557
                        memcpy (outpos + outsize, "CURVE", 5);
558 559 560 561 562 563
                    outsize += 20;
                    memset (outpos + outsize, 0, 32);
                    outsize += 32;
                    greeting_size = v3_greeting_size;
                }
            }
Martin Hurton's avatar
Martin Hurton committed
564 565 566
        }
    }

567 568
    //  Position of the revision field in the greeting.
    const size_t revision_pos = 10;
569

570 571
    //  Is the peer using ZMTP/1.0 with no revision number?
    //  If so, we send and receive rest of identity message
Pieter Hintjens's avatar
Pieter Hintjens committed
572
    if (greeting_recv [0] != 0xff || !(greeting_recv [9] & 0x01)) {
Min RK's avatar
Min RK committed
573
        if (session->zap_enabled ()) {
574 575 576 577 578
           // reject ZMTP 1.0 connections if ZAP is enabled
           error (protocol_error);
           return false;
        }

579
        encoder = new (std::nothrow) v1_encoder_t (out_batch_size);
580 581
        alloc_assert (encoder);

582
        decoder = new (std::nothrow) v1_decoder_t (in_batch_size, options.maxmsgsize);
583 584
        alloc_assert (decoder);

Martin Hurton's avatar
Martin Hurton committed
585 586 587 588 589 590
        //  We have already sent the message header.
        //  Since there is no way to tell the encoder to
        //  skip the message header, we simply throw that
        //  header data away.
        const size_t header_size = options.identity_size + 1 >= 255 ? 10 : 2;
        unsigned char tmp [10], *bufferp = tmp;
591 592 593 594 595 596 597

        //  Prepare the identity message and load it into encoder.
        //  Then consume bytes we have already sent to the peer.
        const int rc = tx_msg.init_size (options.identity_size);
        zmq_assert (rc == 0);
        memcpy (tx_msg.data (), options.identity, options.identity_size);
        encoder->load_msg (&tx_msg);
598
        size_t buffer_size = encoder->encode (&bufferp, header_size);
Martin Hurton's avatar
Martin Hurton committed
599 600 601
        zmq_assert (buffer_size == header_size);

        //  Make sure the decoder sees the data we have already received.
Pieter Hintjens's avatar
Pieter Hintjens committed
602
        inpos = greeting_recv;
Martin Hurton's avatar
Martin Hurton committed
603
        insize = greeting_bytes_read;
604 605

        //  To allow for interoperability with peers that do not forward
606 607
        //  their subscriptions, we inject a phantom subscription message
        //  message into the incoming message stream.
608
        if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB)
609
            subscription_required = true;
Martin Hurton's avatar
Martin Hurton committed
610 611 612

        //  We are sending our identity now and the next message
        //  will come from the socket.
Martin Hurton's avatar
Martin Hurton committed
613
        next_msg = &stream_engine_t::pull_msg_from_session;
Martin Hurton's avatar
Martin Hurton committed
614 615

        //  We are expecting identity message.
Martin Hurton's avatar
Martin Hurton committed
616
        process_msg = &stream_engine_t::process_identity_msg;
Martin Hurton's avatar
Martin Hurton committed
617
    }
618
    else
Pieter Hintjens's avatar
Pieter Hintjens committed
619
    if (greeting_recv [revision_pos] == ZMTP_1_0) {
Min RK's avatar
Min RK committed
620
        if (session->zap_enabled ()) {
621 622 623 624 625
           // reject ZMTP 1.0 connections if ZAP is enabled
           error (protocol_error);
           return false;
        }

626
        encoder = new (std::nothrow) v1_encoder_t (out_batch_size);
627 628
        alloc_assert (encoder);

629
        decoder = new (std::nothrow) v1_decoder_t (
630
            in_batch_size, options.maxmsgsize);
631 632
        alloc_assert (decoder);
    }
633 634
    else
    if (greeting_recv [revision_pos] == ZMTP_2_0) {
Min RK's avatar
Min RK committed
635
        if (session->zap_enabled ()) {
636 637 638 639 640
           // reject ZMTP 2.0 connections if ZAP is enabled
           error (protocol_error);
           return false;
        }

641
        encoder = new (std::nothrow) v2_encoder_t (out_batch_size);
642 643 644
        alloc_assert (encoder);

        decoder = new (std::nothrow) v2_decoder_t (
645
            in_batch_size, options.maxmsgsize);
646 647
        alloc_assert (decoder);
    }
648
    else {
649
        encoder = new (std::nothrow) v2_encoder_t (out_batch_size);
650 651
        alloc_assert (encoder);

652
        decoder = new (std::nothrow) v2_decoder_t (
653
                in_batch_size, options.maxmsgsize);
654
        alloc_assert (decoder);
655

656 657
        if (options.mechanism == ZMQ_NULL
        &&  memcmp (greeting_recv + 12, "NULL\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) == 0) {
658 659
            mechanism = new (std::nothrow)
                null_mechanism_t (session, peer_address, options);
660
            alloc_assert (mechanism);
661 662
        }
        else
663 664
        if (options.mechanism == ZMQ_PLAIN
        &&  memcmp (greeting_recv + 12, "PLAIN\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) == 0) {
665 666 667 668 669 670
            if (options.as_server)
                mechanism = new (std::nothrow)
                    plain_server_t (session, peer_address, options);
            else
                mechanism = new (std::nothrow)
                    plain_client_t (options);
671
            alloc_assert (mechanism);
672
        }
673
#ifdef ZMQ_HAVE_CURVE
674
        else
675 676
        if (options.mechanism == ZMQ_CURVE
        &&  memcmp (greeting_recv + 12, "CURVE\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) == 0) {
677
            if (options.as_server)
678 679
                mechanism = new (std::nothrow)
                    curve_server_t (session, peer_address, options);
680 681 682 683 684
            else
                mechanism = new (std::nothrow) curve_client_t (options);
            alloc_assert (mechanism);
        }
#endif
Chris Laws's avatar
Chris Laws committed
685
#ifdef HAVE_LIBGSSAPI_KRB5
686
        else
687 688
        if (options.mechanism == ZMQ_GSSAPI
        &&  memcmp (greeting_recv + 12, "GSSAPI\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) == 0) {
689 690 691 692 693
            if (options.as_server)
                mechanism = new (std::nothrow)
                    gssapi_server_t (session, peer_address, options);
            else
                mechanism = new (std::nothrow) gssapi_client_t (options);
694 695
            alloc_assert (mechanism);
        }
696
#endif
697
        else {
698
            error (protocol_error);
699 700
            return false;
        }
Martin Hurton's avatar
Martin Hurton committed
701 702
        next_msg = &stream_engine_t::next_handshake_command;
        process_msg = &stream_engine_t::process_handshake_command;
703
    }
Martin Hurton's avatar
Martin Hurton committed
704 705 706 707 708 709 710 711 712

    // Start polling for output if necessary.
    if (outsize == 0)
        set_pollout (handle);

    //  Handshaking was successful.
    //  Switch into the normal message flow.
    handshaking = false;

713 714 715 716 717
    if (has_handshake_timer) {
        cancel_timer (handshake_timer_id);
        has_handshake_timer = false;
    }

Martin Hurton's avatar
Martin Hurton committed
718 719 720
    return true;
}

Martin Hurton's avatar
Martin Hurton committed
721
int zmq::stream_engine_t::identity_msg (msg_t *msg_)
722
{
723 724 725 726
    int rc = msg_->init_size (options.identity_size);
    errno_assert (rc == 0);
    if (options.identity_size > 0)
        memcpy (msg_->data (), options.identity, options.identity_size);
Martin Hurton's avatar
Martin Hurton committed
727
    next_msg = &stream_engine_t::pull_msg_from_session;
728 729
    return 0;
}
730

Martin Hurton's avatar
Martin Hurton committed
731
int zmq::stream_engine_t::process_identity_msg (msg_t *msg_)
732 733 734 735
{
    if (options.recv_identity) {
        msg_->set_flags (msg_t::identity);
        int rc = session->push_msg (msg_);
736 737
        errno_assert (rc == 0);
    }
738 739 740 741 742 743 744 745
    else {
        int rc = msg_->close ();
        errno_assert (rc == 0);
        rc = msg_->init ();
        errno_assert (rc == 0);
    }

    if (subscription_required)
Martin Hurton's avatar
Martin Hurton committed
746
        process_msg = &stream_engine_t::write_subscription_msg;
747
    else
Martin Hurton's avatar
Martin Hurton committed
748
        process_msg = &stream_engine_t::push_msg_to_session;
749

750 751
    return 0;
}
752

753
int zmq::stream_engine_t::next_handshake_command (msg_t *msg_)
754
{
755 756
    zmq_assert (mechanism != NULL);

757 758 759 760 761 762 763 764 765 766 767 768 769 770
    if (mechanism->status () == mechanism_t::ready) {
        mechanism_ready ();
        return pull_and_encode (msg_);
    }
    else
    if (mechanism->status () == mechanism_t::error) {
        errno = EPROTO;
        return -1;
    }
    else {
        const int rc = mechanism->next_handshake_command (msg_);
        if (rc == 0)
            msg_->set_flags (msg_t::command);
        return rc;
771
    }
772 773
}

774
int zmq::stream_engine_t::process_handshake_command (msg_t *msg_)
775
{
776
    zmq_assert (mechanism != NULL);
777
    const int rc = mechanism->process_handshake_command (msg_);
778
    if (rc == 0) {
779
        if (mechanism->status () == mechanism_t::ready)
780
            mechanism_ready ();
781 782 783 784 785
        else
        if (mechanism->status () == mechanism_t::error) {
            errno = EPROTO;
            return -1;
        }
786 787
        if (output_stopped)
            restart_output ();
788 789
    }

790
    return rc;
791 792
}

793 794 795 796 797 798
void zmq::stream_engine_t::zap_msg_available ()
{
    zmq_assert (mechanism != NULL);

    const int rc = mechanism->zap_msg_available ();
    if (rc == -1) {
799
        error (protocol_error);
800 801
        return;
    }
802 803 804 805
    if (input_stopped)
        restart_input ();
    if (output_stopped)
        restart_output ();
806 807
}

808
void zmq::stream_engine_t::mechanism_ready ()
809
{
810 811 812 813 814
    if (options.heartbeat_interval > 0) {
        add_timer(options.heartbeat_interval, heartbeat_ivl_timer_id);
        has_heartbeat_timer = true;
    }

815 816 817 818
    if (options.recv_identity) {
        msg_t identity;
        mechanism->peer_identity (&identity);
        const int rc = session->push_msg (&identity);
819 820 821 822 823 824
        if (rc == -1 && errno == EAGAIN) {
            // If the write is failing at this stage with
            // an EAGAIN the pipe must be being shut down,
            // so we can just bail out of the identity set.
            return;
        }
825
        errno_assert (rc == 0);
826
        session->flush ();
827
    }
828

Martin Hurton's avatar
Martin Hurton committed
829 830
    next_msg = &stream_engine_t::pull_and_encode;
    process_msg = &stream_engine_t::write_credential;
831 832 833

    //  Compile metadata.
    properties_t properties;
Thomas Rodgers's avatar
Thomas Rodgers committed
834
    init_properties(properties);
835

Pieter Hintjens's avatar
Pieter Hintjens committed
836 837
    //  Add ZAP properties.
    const properties_t& zap_properties = mechanism->get_zap_properties ();
Thomas Rodgers's avatar
Thomas Rodgers committed
838
    properties.insert(zap_properties.begin (), zap_properties.end ());
Pieter Hintjens's avatar
Pieter Hintjens committed
839

840 841
    //  Add ZMTP properties.
    const properties_t& zmtp_properties = mechanism->get_zmtp_properties ();
Thomas Rodgers's avatar
Thomas Rodgers committed
842
    properties.insert(zmtp_properties.begin (), zmtp_properties.end ());
843

844 845 846
    zmq_assert (metadata == NULL);
    if (!properties.empty ())
        metadata = new (std::nothrow) metadata_t (properties);
847 848
}

849
int zmq::stream_engine_t::pull_msg_from_session (msg_t *msg_)
850
{
851 852
    return session->pull_msg (msg_);
}
853

854 855 856 857 858
int zmq::stream_engine_t::push_msg_to_session (msg_t *msg_)
{
    return session->push_msg (msg_);
}

859
int zmq::stream_engine_t::push_raw_msg_to_session (msg_t *msg_) {
860
    if (metadata && metadata != msg_->metadata())
861 862 863 864
        msg_->set_metadata(metadata);
    return push_msg_to_session(msg_);
}

865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883
int zmq::stream_engine_t::write_credential (msg_t *msg_)
{
    zmq_assert (mechanism != NULL);
    zmq_assert (session != NULL);

    const blob_t credential = mechanism->get_user_id ();
    if (credential.size () > 0) {
        msg_t msg;
        int rc = msg.init_size (credential.size ());
        zmq_assert (rc == 0);
        memcpy (msg.data (), credential.data (), credential.size ());
        msg.set_flags (msg_t::credential);
        rc = session->push_msg (&msg);
        if (rc == -1) {
            rc = msg.close ();
            errno_assert (rc == 0);
            return -1;
        }
    }
Martin Hurton's avatar
Martin Hurton committed
884
    process_msg = &stream_engine_t::decode_and_push;
885 886 887
    return decode_and_push (msg_);
}

888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904
int zmq::stream_engine_t::pull_and_encode (msg_t *msg_)
{
    zmq_assert (mechanism != NULL);

    if (session->pull_msg (msg_) == -1)
        return -1;
    if (mechanism->encode (msg_) == -1)
        return -1;
    return 0;
}

int zmq::stream_engine_t::decode_and_push (msg_t *msg_)
{
    zmq_assert (mechanism != NULL);

    if (mechanism->decode (msg_) == -1)
        return -1;
Jonathan Reams's avatar
Jonathan Reams committed
905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921

    if(has_timeout_timer) {
        has_timeout_timer = false;
        cancel_timer(heartbeat_timeout_timer_id);
    }

    if(has_ttl_timer) {
        has_ttl_timer = false;
        cancel_timer(heartbeat_ttl_timer_id);
    }

    if(msg_->flags() & msg_t::command) {
        uint8_t cmd_id = *((uint8_t*)msg_->data());
        if(cmd_id == 4)
            process_heartbeat_message(msg_);
    }

922
    if (metadata)
923
        msg_->set_metadata (metadata);
924 925
    if (session->push_msg (msg_) == -1) {
        if (errno == EAGAIN)
Martin Hurton's avatar
Martin Hurton committed
926
            process_msg = &stream_engine_t::push_one_then_decode_and_push;
927 928 929 930 931 932 933 934 935
        return -1;
    }
    return 0;
}

int zmq::stream_engine_t::push_one_then_decode_and_push (msg_t *msg_)
{
    const int rc = session->push_msg (msg_);
    if (rc == 0)
Martin Hurton's avatar
Martin Hurton committed
936
        process_msg = &stream_engine_t::decode_and_push;
937 938 939
    return rc;
}

940 941 942
int zmq::stream_engine_t::write_subscription_msg (msg_t *msg_)
{
    msg_t subscription;
943

944 945
    //  Inject the subscription message, so that also
    //  ZMQ 2.x peers receive published messages.
946 947 948 949 950 951
    int rc = subscription.init_size (1);
    errno_assert (rc == 0);
    *(unsigned char*) subscription.data () = 1;
    rc = session->push_msg (&subscription);
    if (rc == -1)
       return -1;
952

Martin Hurton's avatar
Martin Hurton committed
953
    process_msg = &stream_engine_t::push_msg_to_session;
954
    return push_msg_to_session (msg_);
955 956
}

957
void zmq::stream_engine_t::error (error_reason_t reason)
958
{
959
    if (options.raw_socket && options.raw_notify) {
960 961 962 963
        //  For raw sockets, send a final 0-length message to the application
        //  so that it knows the peer has been disconnected.
        msg_t terminator;
        terminator.init();
Martin Hurton's avatar
Martin Hurton committed
964
        (this->*process_msg) (&terminator);
965 966
        terminator.close();
    }
967
    zmq_assert (session);
968
    socket->event_disconnected (endpoint, (int) s);
969
    session->flush ();
970
    session->engine_error (reason);
971 972 973 974
    unplug ();
    delete this;
}

975
void zmq::stream_engine_t::set_handshake_timer ()
976
{
977
    zmq_assert (!has_handshake_timer);
978

979
    if (!options.raw_socket && options.handshake_ivl > 0) {
980 981
        add_timer (options.handshake_ivl, handshake_timer_id);
        has_handshake_timer = true;
982
    }
983 984
}

Thomas Rodgers's avatar
Thomas Rodgers committed
985 986 987
bool zmq::stream_engine_t::init_properties (properties_t & properties) {
    if (peer_address.empty()) return false;
    properties.insert (std::make_pair("Peer-Address", peer_address));
988 989

    //  Private property to support deprecated SRCFD
990 991 992 993
    std::ostringstream stream;
    stream << (int)s;
    std::string fd_string = stream.str();
    properties.insert(std::make_pair("__fd", fd_string));
Thomas Rodgers's avatar
Thomas Rodgers committed
994 995 996
    return true;
}

997
void zmq::stream_engine_t::timer_event (int id_)
998
{
Jonathan Reams's avatar
Jonathan Reams committed
999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027
    if(id_ == handshake_timer_id) {
        has_handshake_timer = false;
        //  handshake timer expired before handshake completed, so engine fail
        error (timeout_error);
    }
    else if(id_ == heartbeat_ivl_timer_id) {
        next_msg = &stream_engine_t::produce_ping_message;
        out_event();
        add_timer(options.heartbeat_interval, heartbeat_ivl_timer_id);
    }
    else if(id_ == heartbeat_ttl_timer_id) {
        has_ttl_timer = false;
        error(timeout_error);
    }
    else if(id_ == heartbeat_timeout_timer_id) {
        has_timeout_timer = false;
        error(timeout_error);
    }
    else
        // There are no other valid timer ids!
        assert(false);
}

int zmq::stream_engine_t::produce_ping_message(msg_t * msg_)
{
    int rc = 0;
    zmq_assert (mechanism != NULL);

    // 16-bit TTL + \4PING == 7
1028
    rc = msg_->init_size(7);
1029 1030
    errno_assert(rc == 0);
    msg_->set_flags(msg_t::command);
Jonathan Reams's avatar
Jonathan Reams committed
1031 1032 1033 1034 1035 1036 1037 1038
    // Copy in the command message
    memcpy(msg_->data(), "\4PING", 5);

    uint16_t ttl_val = htons(options.heartbeat_ttl);
    memcpy(((uint8_t*)msg_->data()) + 5, &ttl_val, sizeof(ttl_val));

    rc = mechanism->encode (msg_);
    next_msg = &stream_engine_t::pull_and_encode;
1039 1040
    if(!has_timeout_timer && heartbeat_timeout > 0) {
        add_timer(heartbeat_timeout, heartbeat_timeout_timer_id);
Jonathan Reams's avatar
Jonathan Reams committed
1041 1042 1043 1044 1045 1046 1047 1048 1049 1050
        has_timeout_timer = true;
    }
    return rc;
}

int zmq::stream_engine_t::produce_pong_message(msg_t * msg_)
{
    int rc = 0;
    zmq_assert (mechanism != NULL);

1051
    rc = msg_->init_size(5);
1052 1053
    errno_assert(rc == 0);
    msg_->set_flags(msg_t::command);
Jonathan Reams's avatar
Jonathan Reams committed
1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069

    memcpy(msg_->data(), "\4PONG", 5);

    rc = mechanism->encode (msg_);
    next_msg = &stream_engine_t::pull_and_encode;
    return rc;
}

int zmq::stream_engine_t::process_heartbeat_message(msg_t * msg_)
{
    if(memcmp(msg_->data(), "\4PING", 5) == 0) {
        uint16_t remote_heartbeat_ttl;
        // Get the remote heartbeat TTL to setup the timer
        memcpy(&remote_heartbeat_ttl, (uint8_t*)msg_->data() + 5, 2);
        remote_heartbeat_ttl = ntohs(remote_heartbeat_ttl);
        // The remote heartbeat is in 10ths of a second
1070 1071
        // so we multiply it by 100 to get the timer interval in ms.
        remote_heartbeat_ttl *= 100;
Jonathan Reams's avatar
Jonathan Reams committed
1072 1073 1074 1075 1076

        if(!has_ttl_timer && remote_heartbeat_ttl > 0) {
            add_timer(remote_heartbeat_ttl, heartbeat_ttl_timer_id);
            has_ttl_timer = true;
        }
1077

Jonathan Reams's avatar
Jonathan Reams committed
1078 1079 1080 1081 1082
        next_msg = &stream_engine_t::produce_pong_message;
        out_event();
    }

    return 0;
1083
}