stream_engine.cpp 31.9 KB
Newer Older
1
/*
2
    Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file
3

4
    This file is part of libzmq, the ZeroMQ core engine in C++.
5

6 7 8
    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
9 10
    (at your option) any later version.

11 12 13 14 15 16 17 18 19 20 21 22 23 24
    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.
25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

#include "platform.hpp"
#if defined ZMQ_HAVE_WINDOWS
#include "windows.hpp"
#else
#include <unistd.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/tcp.h>
#include <netinet/in.h>
#include <netdb.h>
#include <fcntl.h>
41 42 43
#if defined ZMQ_HAVE_OPENBSD
#define ucred sockpeercred
#endif
44 45 46 47
#endif

#include <string.h>
#include <new>
48
#include <sstream>
49

50
#include "stream_engine.hpp"
51
#include "io_thread.hpp"
52
#include "session_base.hpp"
53 54
#include "v1_encoder.hpp"
#include "v1_decoder.hpp"
55 56
#include "v2_encoder.hpp"
#include "v2_decoder.hpp"
57
#include "null_mechanism.hpp"
58 59
#include "plain_client.hpp"
#include "plain_server.hpp"
60 61
#include "gssapi_client.hpp"
#include "gssapi_server.hpp"
62 63
#include "curve_client.hpp"
#include "curve_server.hpp"
64 65
#include "raw_decoder.hpp"
#include "raw_encoder.hpp"
66 67
#include "config.hpp"
#include "err.hpp"
68
#include "ip.hpp"
69
#include "tcp.hpp"
Martin Hurton's avatar
Martin Hurton committed
70 71
#include "likely.hpp"
#include "wire.hpp"
72

Chris Laws's avatar
Chris Laws committed
73
//  Builds an engine around the descriptor fd_. The engine starts out in
//  the handshaking state, expecting a v2-sized greeting, with the identity
//  message handlers installed. The descriptor is switched to non-blocking
//  mode and the textual peer address is captured (with peer credentials
//  appended for PF_UNIX sockets where the platform exposes them).
zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_,
                                       const std::string &endpoint_) :
    s (fd_),
    inpos (NULL),
    insize (0),
    decoder (NULL),
    outpos (NULL),
    outsize (0),
    encoder (NULL),
    metadata (NULL),
    handshaking (true),
    greeting_size (v2_greeting_size),
    greeting_bytes_read (0),
    session (NULL),
    options (options_),
    endpoint (endpoint_),
    plugged (false),
    next_msg (&stream_engine_t::identity_msg),
    process_msg (&stream_engine_t::process_identity_msg),
    io_error (false),
    subscription_required (false),
    mechanism (NULL),
    input_stopped (false),
    output_stopped (false),
    has_handshake_timer (false),
    has_ttl_timer (false),
    has_timeout_timer (false),
    has_heartbeat_timer (false),
    heartbeat_timeout (0),
    socket (NULL)
{
    int rc = tx_msg.init ();
    errno_assert (rc == 0);

    //  The engine never blocks on the descriptor.
    unblock_socket (s);

    //  A zero family means the peer address could not be determined;
    //  in that case make sure no partial text is left behind.
    const int family = get_peer_ip_address (s, peer_address);
    if (family == 0)
        peer_address.clear ();
#if defined ZMQ_HAVE_SO_PEERCRED
    else if (family == PF_UNIX) {
        //  Append ":uid:gid:pid" of the connected peer.
        struct ucred cred;
        socklen_t size = sizeof (cred);
        if (!getsockopt (s, SOL_SOCKET, SO_PEERCRED, &cred, &size)) {
            std::ostringstream buf;
            buf << ":" << cred.uid << ":" << cred.gid << ":" << cred.pid;
            peer_address += buf.str ();
        }
    }
#elif defined ZMQ_HAVE_LOCAL_PEERCRED
    else if (family == PF_UNIX) {
        //  Append ":uid:gid:" of the connected peer; the gid part stays
        //  empty when the credential carries no groups.
        struct xucred cred;
        socklen_t size = sizeof (cred);
        if (!getsockopt (s, 0, LOCAL_PEERCRED, &cred, &size)
                && cred.cr_version == XUCRED_VERSION) {
            std::ostringstream buf;
            buf << ":" << cred.cr_uid << ":";
            if (cred.cr_ngroups > 0)
                buf << cred.cr_groups[0];
            buf << ":";
            peer_address += buf.str ();
        }
    }
#endif

#ifdef SO_NOSIGPIPE
    //  Writing to a connection the peer has already closed must not
    //  raise SIGPIPE.
    int set = 1;
    rc = setsockopt (s, SOL_SOCKET, SO_NOSIGPIPE, &set, sizeof (int));
    errno_assert (rc == 0);
#endif

    //  With heartbeating enabled, a timeout of -1 means "use the
    //  heartbeat interval as the timeout".
    if (options.heartbeat_interval > 0) {
        heartbeat_timeout = options.heartbeat_timeout;
        if (heartbeat_timeout == -1)
            heartbeat_timeout = options.heartbeat_interval;
    }
}

155
//  Releases everything the engine owns: the socket descriptor (unless it
//  has already been retired), the outgoing message, its reference to the
//  metadata, and the encoder, decoder and security mechanism objects.
//  The engine must have been unplugged beforehand.
zmq::stream_engine_t::~stream_engine_t ()
{
    zmq_assert (!plugged);

    if (s != retired_fd) {
#ifdef ZMQ_HAVE_WINDOWS
        const int close_rc = closesocket (s);
        wsa_assert (close_rc != SOCKET_ERROR);
#else
        const int close_rc = close (s);
        errno_assert (close_rc == 0);
#endif
        s = retired_fd;
    }

    const int msg_rc = tx_msg.close ();
    errno_assert (msg_rc == 0);

    //  Give up our metadata reference; drop_ref () reporting true means
    //  we are responsible for deleting the object.
    if (metadata != NULL && metadata->drop_ref ())
        delete metadata;

    delete encoder;
    delete decoder;
    delete mechanism;
}

184 185
//  Attaches the engine to the given I/O thread and session and starts
//  polling the socket.
//
//  For raw sockets no handshake takes place: raw encoder/decoder are
//  created immediately, metadata is compiled and, if raw_notify is set,
//  an empty message is pushed to the session to announce the peer.
//  Otherwise the handshake timer is armed and the first ten bytes of the
//  greeting (the signature) are queued for sending.
//
//  io_thread_  I/O thread whose poller will watch the descriptor.
//  session_    Session to exchange messages with; must be non-NULL.
void zmq::stream_engine_t::plug (io_thread_t *io_thread_,
    session_base_t *session_)
{
    zmq_assert (!plugged);
    plugged = true;

    //  Connect to session object.
    zmq_assert (!session);
    zmq_assert (session_);
    session = session_;
    socket = session->get_socket ();

    //  Connect to I/O threads poller object.
    io_object_t::plug (io_thread_);
    handle = add_fd (s);
    io_error = false;

    if (options.raw_socket) {
        //  No handshaking for raw sockets; instantiate the raw encoder
        //  and decoder straight away.
        encoder = new (std::nothrow) raw_encoder_t (out_batch_size);
        alloc_assert (encoder);

        decoder = new (std::nothrow) raw_decoder_t (in_batch_size);
        alloc_assert (decoder);

        //  Disable handshaking for raw socket.
        handshaking = false;

        next_msg = &stream_engine_t::pull_msg_from_session;
        process_msg = &stream_engine_t::push_raw_msg_to_session;

        properties_t properties;
        if (init_properties (properties)) {
            //  Compile metadata.
            zmq_assert (metadata == NULL);
            metadata = new (std::nothrow) metadata_t (properties);
            //  A nothrow allocation may yield NULL; check it the same
            //  way every other allocation in this file is checked.
            alloc_assert (metadata);
        }

        if (options.raw_notify) {
            //  For raw sockets, send an initial 0-length message to the
            //  application so that it knows a peer has connected.
            msg_t connector;
            connector.init ();
            push_raw_msg_to_session (&connector);
            connector.close ();
            session->flush ();
        }
    }
    else {
        //  Start optional timer, to prevent handshake hanging on no input.
        set_handshake_timer ();

        //  Send the 'length' and 'flags' fields of the identity message.
        //  The 'length' field is encoded in the long format.
        outpos = greeting_send;
        outpos [outsize++] = 0xff;
        put_uint64 (&outpos [outsize], options.identity_size + 1);
        outsize += 8;
        outpos [outsize++] = 0x7f;
    }

    set_pollin (handle);
    set_pollout (handle);
    //  Flush all the data that may have been already received downstream.
    in_event ();
}

251
void zmq::stream_engine_t::unplug ()
252 253 254 255
{
    zmq_assert (plugged);
    plugged = false;

256 257 258 259 260 261
    //  Cancel all timers.
    if (has_handshake_timer) {
        cancel_timer (handshake_timer_id);
        has_handshake_timer = false;
    }

Jonathan Reams's avatar
Jonathan Reams committed
262 263 264 265 266 267 268 269 270 271 272 273 274 275
    if (has_ttl_timer) {
        cancel_timer (heartbeat_ttl_timer_id);
        has_ttl_timer = false;
    }

    if (has_timeout_timer) {
        cancel_timer (heartbeat_timeout_timer_id);
        has_timeout_timer = false;
    }

    if (has_heartbeat_timer) {
        cancel_timer (heartbeat_ivl_timer_id);
        has_heartbeat_timer = false;
    }
276
    //  Cancel all fd subscriptions.
277
    if (!io_error)
Martin Hurton's avatar
Martin Hurton committed
278
        rm_fd (handle);
279 280 281 282 283 284 285

    //  Disconnect from I/O threads poller object.
    io_object_t::unplug ();

    session = NULL;
}

286
void zmq::stream_engine_t::terminate ()
287 288 289 290 291
{
    unplug ();
    delete this;
}

292
void zmq::stream_engine_t::in_event ()
293
{
294
    zmq_assert (!io_error);
295

Pieter Hintjens's avatar
Pieter Hintjens committed
296
    //  If still handshaking, receive and process the greeting message.
Martin Hurton's avatar
Martin Hurton committed
297 298 299 300
    if (unlikely (handshaking))
        if (!handshake ())
            return;

301
    zmq_assert (decoder);
302 303

    //  If there has been an I/O error, stop polling.
304
    if (input_stopped) {
305 306 307 308
        rm_fd (handle);
        io_error = true;
        return;
    }
309 310 311 312 313 314 315 316

    //  If there's no data to process in the buffer...
    if (!insize) {

        //  Retrieve the buffer and read as much data as possible.
        //  Note that buffer can be arbitrarily large. However, we assume
        //  the underlying TCP layer has fixed buffer size and thus the
        //  number of bytes read will be always limited.
317 318
        size_t bufsize = 0;
        decoder->get_buffer (&inpos, &bufsize);
319

320
        const int rc = tcp_read (s, inpos, bufsize);
321

322
        if (rc == 0) {
323
            error (connection_error);
324
            return;
325
        }
326 327
        if (rc == -1) {
            if (errno != EAGAIN)
328
                error (connection_error);
329 330
            return;
        }
331

332
        //  Adjust input size
333
        insize = static_cast <size_t> (rc);
334 335
        // Adjust buffer size to received bytes
        decoder->resize_buffer(insize);
Martin Hurton's avatar
Martin Hurton committed
336
    }
337

338 339
    int rc = 0;
    size_t processed = 0;
340

341 342 343
    while (insize > 0) {
        rc = decoder->decode (inpos, insize, processed);
        zmq_assert (processed <= insize);
344 345
        inpos += processed;
        insize -= processed;
346 347
        if (rc == 0 || rc == -1)
            break;
Martin Hurton's avatar
Martin Hurton committed
348
        rc = (this->*process_msg) (decoder->msg ());
349 350
        if (rc == -1)
            break;
351 352
    }

353 354 355 356
    //  Tear down the connection if we have failed to decode input data
    //  or the session has rejected the message.
    if (rc == -1) {
        if (errno != EAGAIN) {
357
            error (protocol_error);
358 359
            return;
        }
360
        input_stopped = true;
361
        reset_pollin (handle);
Martin Hurton's avatar
Martin Hurton committed
362
    }
363 364

    session->flush ();
365 366
}

367
void zmq::stream_engine_t::out_event ()
368
{
369 370
    zmq_assert (!io_error);

371 372 373
    //  If write buffer is empty, try to read new data from the encoder.
    if (!outsize) {

Martin Hurton's avatar
Martin Hurton committed
374 375 376 377 378 379 380 381
        //  Even when we stop polling as soon as there is no
        //  data to send, the poller may invoke out_event one
        //  more time due to 'speculative write' optimisation.
        if (unlikely (encoder == NULL)) {
            zmq_assert (handshaking);
            return;
        }

382
        outpos = NULL;
383 384 385
        outsize = encoder->encode (&outpos, 0);

        while (outsize < out_batch_size) {
Martin Hurton's avatar
Martin Hurton committed
386
            if ((this->*next_msg) (&tx_msg) == -1)
387 388 389 390 391 392 393 394 395
                break;
            encoder->load_msg (&tx_msg);
            unsigned char *bufptr = outpos + outsize;
            size_t n = encoder->encode (&bufptr, out_batch_size - outsize);
            zmq_assert (n > 0);
            if (outpos == NULL)
                outpos = bufptr;
            outsize += n;
        }
396 397 398

        //  If there is no data to send, stop polling for output.
        if (outsize == 0) {
399
            output_stopped = true;
400 401 402 403 404 405 406
            reset_pollout (handle);
            return;
        }
    }

    //  If there are any data to write in write buffer, write as much as
    //  possible to the socket. Note that amount of data to write can be
407
    //  arbitrarily large. However, we assume that underlying TCP layer has
408 409
    //  limited transmission buffer and thus the actual number of bytes
    //  written should be reasonably modest.
410
    const int nbytes = tcp_write (s, outpos, outsize);
411

Martin Hurton's avatar
Martin Hurton committed
412 413
    //  IO error has occurred. We stop waiting for output events.
    //  The engine is not terminated until we detect input error;
414
    //  this is necessary to prevent losing incoming messages.
415
    if (nbytes == -1) {
Martin Hurton's avatar
Martin Hurton committed
416
        reset_pollout (handle);
417 418 419 420 421
        return;
    }

    outpos += nbytes;
    outsize -= nbytes;
Martin Hurton's avatar
Martin Hurton committed
422 423 424 425 426 427

    //  If we are still handshaking and there are no data
    //  to send, stop polling for output.
    if (unlikely (handshaking))
        if (outsize == 0)
            reset_pollout (handle);
428 429
}

430
void zmq::stream_engine_t::restart_output ()
431
{
432 433 434
    if (unlikely (io_error))
        return;

435
    if (likely (output_stopped)) {
436
        set_pollout (handle);
437
        output_stopped = false;
438
    }
439 440 441 442 443 444 445 446

    //  Speculative write: The assumption is that at the moment new message
    //  was sent by the user the socket is probably available for writing.
    //  Thus we try to write the data to socket avoiding polling for POLLOUT.
    //  Consequently, the latency should be better in request/reply scenarios.
    out_event ();
}

447
void zmq::stream_engine_t::restart_input ()
448
{
449
    zmq_assert (input_stopped);
450 451 452
    zmq_assert (session != NULL);
    zmq_assert (decoder != NULL);

Martin Hurton's avatar
Martin Hurton committed
453
    int rc = (this->*process_msg) (decoder->msg ());
454 455 456 457
    if (rc == -1) {
        if (errno == EAGAIN)
            session->flush ();
        else
458
            error (protocol_error);
Martin Hurton's avatar
Martin Hurton committed
459 460 461
        return;
    }

462 463 464 465 466 467 468 469
    while (insize > 0) {
        size_t processed = 0;
        rc = decoder->decode (inpos, insize, processed);
        zmq_assert (processed <= insize);
        inpos += processed;
        insize -= processed;
        if (rc == 0 || rc == -1)
            break;
Martin Hurton's avatar
Martin Hurton committed
470
        rc = (this->*process_msg) (decoder->msg ());
471 472 473
        if (rc == -1)
            break;
    }
474

475 476 477
    if (rc == -1 && errno == EAGAIN)
        session->flush ();
    else
478 479 480 481 482
    if (io_error)
        error (connection_error);
    else
    if (rc == -1)
        error (protocol_error);
483
    else {
484
        input_stopped = false;
485 486 487 488 489 490
        set_pollin (handle);
        session->flush ();

        //  Speculative read.
        in_event ();
    }
491 492
}

493
bool zmq::stream_engine_t::handshake ()
Martin Hurton's avatar
Martin Hurton committed
494
{
495
    zmq_assert (handshaking);
Martin Hurton's avatar
Martin Hurton committed
496
    zmq_assert (greeting_bytes_read < greeting_size);
497
    //  Receive the greeting.
Martin Hurton's avatar
Martin Hurton committed
498
    while (greeting_bytes_read < greeting_size) {
499 500
        const int n = tcp_read (s, greeting_recv + greeting_bytes_read,
                                greeting_size - greeting_bytes_read);
501
        if (n == 0) {
502
            error (connection_error);
503 504
            return false;
        }
505 506
        if (n == -1) {
            if (errno != EAGAIN)
507
                error (connection_error);
508
            return false;
509
        }
Martin Hurton's avatar
Martin Hurton committed
510 511 512

        greeting_bytes_read += n;

513 514 515
        //  We have received at least one byte from the peer.
        //  If the first byte is not 0xff, we know that the
        //  peer is using unversioned protocol.
Pieter Hintjens's avatar
Pieter Hintjens committed
516
        if (greeting_recv [0] != 0xff)
517
            break;
Martin Hurton's avatar
Martin Hurton committed
518

519
        if (greeting_bytes_read < signature_size)
520
            continue;
Martin Hurton's avatar
Martin Hurton committed
521

522 523 524 525
        //  Inspect the right-most bit of the 10th byte (which coincides
        //  with the 'flags' field if a regular message was sent).
        //  Zero indicates this is a header of identity message
        //  (i.e. the peer is using the unversioned protocol).
Pieter Hintjens's avatar
Pieter Hintjens committed
526
        if (!(greeting_recv [9] & 0x01))
527
            break;
Martin Hurton's avatar
Martin Hurton committed
528

529
        //  The peer is using versioned protocol.
530 531
        //  Send the major version number.
        if (outpos + outsize == greeting_send + signature_size) {
Martin Hurton's avatar
Martin Hurton committed
532 533
            if (outsize == 0)
                set_pollout (handle);
534 535 536 537 538 539 540 541 542 543 544 545 546 547 548
            outpos [outsize++] = 3;     //  Major version number
        }

        if (greeting_bytes_read > signature_size) {
            if (outpos + outsize == greeting_send + signature_size + 1) {
                if (outsize == 0)
                    set_pollout (handle);

                //  Use ZMTP/2.0 to talk to older peers.
                if (greeting_recv [10] == ZMTP_1_0
                ||  greeting_recv [10] == ZMTP_2_0)
                    outpos [outsize++] = options.type;
                else {
                    outpos [outsize++] = 0; //  Minor version number
                    memset (outpos + outsize, 0, 20);
549 550 551

                    zmq_assert (options.mechanism == ZMQ_NULL
                            ||  options.mechanism == ZMQ_PLAIN
552 553
                            ||  options.mechanism == ZMQ_CURVE
                            ||  options.mechanism == ZMQ_GSSAPI);
554

555 556 557
                    if (options.mechanism == ZMQ_NULL)
                        memcpy (outpos + outsize, "NULL", 4);
                    else
558
                    if (options.mechanism == ZMQ_PLAIN)
559
                        memcpy (outpos + outsize, "PLAIN", 5);
560
                    else
561 562
                    if (options.mechanism == ZMQ_GSSAPI)
                        memcpy (outpos + outsize, "GSSAPI", 6);
563
                    else
564
                    if (options.mechanism == ZMQ_CURVE)
565
                        memcpy (outpos + outsize, "CURVE", 5);
566 567 568 569 570 571
                    outsize += 20;
                    memset (outpos + outsize, 0, 32);
                    outsize += 32;
                    greeting_size = v3_greeting_size;
                }
            }
Martin Hurton's avatar
Martin Hurton committed
572 573 574
        }
    }

575 576
    //  Position of the revision field in the greeting.
    const size_t revision_pos = 10;
577

578 579
    //  Is the peer using ZMTP/1.0 with no revision number?
    //  If so, we send and receive rest of identity message
Pieter Hintjens's avatar
Pieter Hintjens committed
580
    if (greeting_recv [0] != 0xff || !(greeting_recv [9] & 0x01)) {
Min RK's avatar
Min RK committed
581
        if (session->zap_enabled ()) {
582 583 584 585 586
           // reject ZMTP 1.0 connections if ZAP is enabled
           error (protocol_error);
           return false;
        }

587
        encoder = new (std::nothrow) v1_encoder_t (out_batch_size);
588 589
        alloc_assert (encoder);

590
        decoder = new (std::nothrow) v1_decoder_t (in_batch_size, options.maxmsgsize);
591 592
        alloc_assert (decoder);

Martin Hurton's avatar
Martin Hurton committed
593 594 595 596 597 598
        //  We have already sent the message header.
        //  Since there is no way to tell the encoder to
        //  skip the message header, we simply throw that
        //  header data away.
        const size_t header_size = options.identity_size + 1 >= 255 ? 10 : 2;
        unsigned char tmp [10], *bufferp = tmp;
599 600 601 602 603 604 605

        //  Prepare the identity message and load it into encoder.
        //  Then consume bytes we have already sent to the peer.
        const int rc = tx_msg.init_size (options.identity_size);
        zmq_assert (rc == 0);
        memcpy (tx_msg.data (), options.identity, options.identity_size);
        encoder->load_msg (&tx_msg);
606
        size_t buffer_size = encoder->encode (&bufferp, header_size);
Martin Hurton's avatar
Martin Hurton committed
607 608 609
        zmq_assert (buffer_size == header_size);

        //  Make sure the decoder sees the data we have already received.
Pieter Hintjens's avatar
Pieter Hintjens committed
610
        inpos = greeting_recv;
Martin Hurton's avatar
Martin Hurton committed
611
        insize = greeting_bytes_read;
612 613

        //  To allow for interoperability with peers that do not forward
614 615
        //  their subscriptions, we inject a phantom subscription message
        //  message into the incoming message stream.
616
        if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB)
617
            subscription_required = true;
Martin Hurton's avatar
Martin Hurton committed
618 619 620

        //  We are sending our identity now and the next message
        //  will come from the socket.
Martin Hurton's avatar
Martin Hurton committed
621
        next_msg = &stream_engine_t::pull_msg_from_session;
Martin Hurton's avatar
Martin Hurton committed
622 623

        //  We are expecting identity message.
Martin Hurton's avatar
Martin Hurton committed
624
        process_msg = &stream_engine_t::process_identity_msg;
Martin Hurton's avatar
Martin Hurton committed
625
    }
626
    else
Pieter Hintjens's avatar
Pieter Hintjens committed
627
    if (greeting_recv [revision_pos] == ZMTP_1_0) {
Min RK's avatar
Min RK committed
628
        if (session->zap_enabled ()) {
629 630 631 632 633
           // reject ZMTP 1.0 connections if ZAP is enabled
           error (protocol_error);
           return false;
        }

634 635
        encoder = new (std::nothrow) v1_encoder_t (
            out_batch_size);
636 637
        alloc_assert (encoder);

638 639
        decoder = new (std::nothrow) v1_decoder_t (
            in_batch_size, options.maxmsgsize);
640 641
        alloc_assert (decoder);
    }
642 643
    else
    if (greeting_recv [revision_pos] == ZMTP_2_0) {
Min RK's avatar
Min RK committed
644
        if (session->zap_enabled ()) {
645 646 647 648 649
           // reject ZMTP 2.0 connections if ZAP is enabled
           error (protocol_error);
           return false;
        }

650 651 652 653 654 655 656
        encoder = new (std::nothrow) v2_encoder_t (out_batch_size);
        alloc_assert (encoder);

        decoder = new (std::nothrow) v2_decoder_t (
            in_batch_size, options.maxmsgsize);
        alloc_assert (decoder);
    }
657
    else {
658
        encoder = new (std::nothrow) v2_encoder_t (out_batch_size);
659 660
        alloc_assert (encoder);

661
        decoder = new (std::nothrow) v2_decoder_t (
662
            in_batch_size, options.maxmsgsize);
663
        alloc_assert (decoder);
664

665 666
        if (options.mechanism == ZMQ_NULL
        &&  memcmp (greeting_recv + 12, "NULL\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) == 0) {
667 668
            mechanism = new (std::nothrow)
                null_mechanism_t (session, peer_address, options);
669
            alloc_assert (mechanism);
670 671
        }
        else
672 673
        if (options.mechanism == ZMQ_PLAIN
        &&  memcmp (greeting_recv + 12, "PLAIN\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) == 0) {
674 675 676 677 678 679
            if (options.as_server)
                mechanism = new (std::nothrow)
                    plain_server_t (session, peer_address, options);
            else
                mechanism = new (std::nothrow)
                    plain_client_t (options);
680
            alloc_assert (mechanism);
681
        }
682 683
#ifdef HAVE_LIBSODIUM
        else
684 685
        if (options.mechanism == ZMQ_CURVE
        &&  memcmp (greeting_recv + 12, "CURVE\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) == 0) {
686
            if (options.as_server)
687 688
                mechanism = new (std::nothrow)
                    curve_server_t (session, peer_address, options);
689 690 691 692 693
            else
                mechanism = new (std::nothrow) curve_client_t (options);
            alloc_assert (mechanism);
        }
#endif
Chris Laws's avatar
Chris Laws committed
694
#ifdef HAVE_LIBGSSAPI_KRB5
695
        else
696 697
        if (options.mechanism == ZMQ_GSSAPI
        &&  memcmp (greeting_recv + 12, "GSSAPI\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) == 0) {
698 699 700 701 702
            if (options.as_server)
                mechanism = new (std::nothrow)
                    gssapi_server_t (session, peer_address, options);
            else
                mechanism = new (std::nothrow) gssapi_client_t (options);
703 704
            alloc_assert (mechanism);
        }
705
#endif
706
        else {
707
            error (protocol_error);
708 709
            return false;
        }
Martin Hurton's avatar
Martin Hurton committed
710 711
        next_msg = &stream_engine_t::next_handshake_command;
        process_msg = &stream_engine_t::process_handshake_command;
Jonathan Reams's avatar
Jonathan Reams committed
712 713 714 715 716

        if(options.heartbeat_interval > 0) {
            add_timer(options.heartbeat_interval, heartbeat_ivl_timer_id);
            has_heartbeat_timer = true;
        }
717
    }
Martin Hurton's avatar
Martin Hurton committed
718 719 720 721 722 723 724 725 726

    // Start polling for output if necessary.
    if (outsize == 0)
        set_pollout (handle);

    //  Handshaking was successful.
    //  Switch into the normal message flow.
    handshaking = false;

727 728 729 730 731
    if (has_handshake_timer) {
        cancel_timer (handshake_timer_id);
        has_handshake_timer = false;
    }

Martin Hurton's avatar
Martin Hurton committed
732 733 734
    return true;
}

Martin Hurton's avatar
Martin Hurton committed
735
//  next_msg handler used for the very first outgoing message: fills msg_
//  with the locally configured identity and then switches next_msg so
//  that subsequent messages are pulled from the session. Always returns 0.
int zmq::stream_engine_t::identity_msg (msg_t *msg_)
{
    const int init_rc = msg_->init_size (options.identity_size);
    errno_assert (init_rc == 0);

    //  Nothing to copy for an empty identity.
    if (options.identity_size > 0)
        memcpy (msg_->data (), options.identity, options.identity_size);

    next_msg = &stream_engine_t::pull_msg_from_session;
    return 0;
}
744

Martin Hurton's avatar
Martin Hurton committed
745
//  process_msg handler for the peer's identity message. Depending on
//  options.recv_identity the message is either flagged as an identity and
//  pushed to the session, or discarded and re-initialised. Afterwards the
//  handler for subsequent messages is installed. Always returns 0.
int zmq::stream_engine_t::process_identity_msg (msg_t *msg_)
{
    if (options.recv_identity) {
        msg_->set_flags (msg_t::identity);
        const int push_rc = session->push_msg (msg_);
        errno_assert (push_rc == 0);
    }
    else {
        //  The identity is not wanted: release it and leave msg_ in an
        //  initialised state.
        const int close_rc = msg_->close ();
        errno_assert (close_rc == 0);
        const int init_rc = msg_->init ();
        errno_assert (init_rc == 0);
    }

    //  Pick the handler for the messages that follow.
    if (!subscription_required)
        process_msg = &stream_engine_t::push_msg_to_session;
    else
        process_msg = &stream_engine_t::write_subscription_msg;

    return 0;
}
766

767
//  next_msg handler used while the security handshake is in progress.
//  If the mechanism is ready, the engine switches to normal operation and
//  fetches a regular message. If the mechanism is in the error state, -1
//  is returned with errno set to EPROTO. Otherwise the mechanism's next
//  handshake command is produced in msg_ and flagged as a command.
int zmq::stream_engine_t::next_handshake_command (msg_t *msg_)
{
    zmq_assert (mechanism != NULL);

    if (mechanism->status () == mechanism_t::ready) {
        mechanism_ready ();
        return pull_and_encode (msg_);
    }

    if (mechanism->status () == mechanism_t::error) {
        errno = EPROTO;
        return -1;
    }

    const int rc = mechanism->next_handshake_command (msg_);
    if (rc == 0)
        msg_->set_flags (msg_t::command);
    return rc;
}

788
//  process_msg handler used while the security handshake is in progress.
//  Passes the command to the mechanism and returns its result unchanged
//  when that is non-zero. On success the mechanism state is inspected:
//  'ready' completes the handshake, 'error' yields -1 with errno set to
//  EPROTO. Output is restarted if it had been stopped.
int zmq::stream_engine_t::process_handshake_command (msg_t *msg_)
{
    zmq_assert (mechanism != NULL);

    const int rc = mechanism->process_handshake_command (msg_);
    if (rc != 0)
        return rc;

    if (mechanism->status () == mechanism_t::ready)
        mechanism_ready ();
    else if (mechanism->status () == mechanism_t::error) {
        errno = EPROTO;
        return -1;
    }

    if (output_stopped)
        restart_output ();

    return rc;
}

807 808 809 810 811 812
void zmq::stream_engine_t::zap_msg_available ()
{
    zmq_assert (mechanism != NULL);

    const int rc = mechanism->zap_msg_available ();
    if (rc == -1) {
813
        error (protocol_error);
814 815
        return;
    }
816 817 818 819
    if (input_stopped)
        restart_input ();
    if (output_stopped)
        restart_output ();
820 821
}

822
void zmq::stream_engine_t::mechanism_ready ()
823
{
824 825 826 827
    if (options.recv_identity) {
        msg_t identity;
        mechanism->peer_identity (&identity);
        const int rc = session->push_msg (&identity);
828 829 830 831 832 833
        if (rc == -1 && errno == EAGAIN) {
            // If the write is failing at this stage with
            // an EAGAIN the pipe must be being shut down,
            // so we can just bail out of the identity set.
            return;
        }
834
        errno_assert (rc == 0);
835
        session->flush ();
836
    }
837

Martin Hurton's avatar
Martin Hurton committed
838 839
    next_msg = &stream_engine_t::pull_and_encode;
    process_msg = &stream_engine_t::write_credential;
840 841 842

    //  Compile metadata.
    properties_t properties;
Thomas Rodgers's avatar
Thomas Rodgers committed
843
    init_properties(properties);
844

Pieter Hintjens's avatar
Pieter Hintjens committed
845 846
    //  Add ZAP properties.
    const properties_t& zap_properties = mechanism->get_zap_properties ();
Thomas Rodgers's avatar
Thomas Rodgers committed
847
    properties.insert(zap_properties.begin (), zap_properties.end ());
Pieter Hintjens's avatar
Pieter Hintjens committed
848

849 850
    //  Add ZMTP properties.
    const properties_t& zmtp_properties = mechanism->get_zmtp_properties ();
Thomas Rodgers's avatar
Thomas Rodgers committed
851
    properties.insert(zmtp_properties.begin (), zmtp_properties.end ());
852

853 854 855
    zmq_assert (metadata == NULL);
    if (!properties.empty ())
        metadata = new (std::nothrow) metadata_t (properties);
856 857
}

858
//  Fetches the next outbound message from the session, unmodified.
//  Returns whatever session->pull_msg () returns.
int zmq::stream_engine_t::pull_msg_from_session (msg_t *msg_)
{
    const int result = session->pull_msg (msg_);
    return result;
}
862

863 864 865 866 867
//  Delivers an inbound message to the session, unmodified.
//  Returns whatever session->push_msg () returns.
int zmq::stream_engine_t::push_msg_to_session (msg_t *msg_)
{
    const int result = session->push_msg (msg_);
    return result;
}

868 869 870 871 872 873
//  Attaches the engine's metadata (when there is any) to the message
//  and then delivers it to the session.
int zmq::stream_engine_t::push_raw_msg_to_session (msg_t *msg_)
{
    if (metadata != NULL) {
        msg_->set_metadata (metadata);
    }
    return push_msg_to_session (msg_);
}

874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892
//  Message handler used for the first inbound message after the
//  handshake: pushes the mechanism's user id to the session as a
//  credential message (if it is non-empty), then switches to
//  decode_and_push () and processes the message with it.
//  Returns -1 if the credential could not be pushed, otherwise the
//  result of decode_and_push ().
int zmq::stream_engine_t::write_credential (msg_t *msg_)
{
    zmq_assert (mechanism != NULL);
    zmq_assert (session != NULL);

    const blob_t user_id = mechanism->get_user_id ();
    const size_t user_id_size = user_id.size ();

    if (user_id_size > 0) {
        msg_t credential_msg;
        int rc = credential_msg.init_size (user_id_size);
        zmq_assert (rc == 0);
        memcpy (credential_msg.data (), user_id.data (), user_id_size);
        credential_msg.set_flags (msg_t::credential);

        if (session->push_msg (&credential_msg) == -1) {
            //  The session did not take the message; release it.
            rc = credential_msg.close ();
            errno_assert (rc == 0);
            return -1;
        }
    }

    process_msg = &stream_engine_t::decode_and_push;
    return decode_and_push (msg_);
}

897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913
//  Pulls the next outbound message from the session and passes it
//  through the mechanism's encode step.
//  Returns 0 when both steps succeed, -1 as soon as either one fails.
int zmq::stream_engine_t::pull_and_encode (msg_t *msg_)
{
    zmq_assert (mechanism != NULL);

    //  Short-circuit evaluation keeps the original order: encode is
    //  attempted only after a successful pull.
    const bool succeeded = session->pull_msg (msg_) != -1
                        && mechanism->encode (msg_) != -1;
    return succeeded ? 0 : -1;
}

//  Regular inbound message handler: decodes the message through the
//  mechanism, cancels pending heartbeat timers, handles heartbeat
//  commands, attaches metadata and pushes the message to the session.
//  Returns 0 on success and -1 on failure. If the session push fails
//  with EAGAIN, the handler is switched to
//  push_one_then_decode_and_push () for the next attempt.
int zmq::stream_engine_t::decode_and_push (msg_t *msg_)
{
    zmq_assert (mechanism != NULL);

    if (mechanism->decode (msg_) == -1)
        return -1;

    //  Any successfully decoded message cancels the pending heartbeat
    //  timeout and TTL timers.
    if (has_timeout_timer) {
        has_timeout_timer = false;
        cancel_timer (heartbeat_timeout_timer_id);
    }

    if (has_ttl_timer) {
        has_ttl_timer = false;
        cancel_timer (heartbeat_ttl_timer_id);
    }

    //  The first byte of a command body is compared against 4, which
    //  matches the leading "\4" of the "\4PING" / "\4PONG" bodies.
    //  The body comes from the peer, so make sure it is non-empty
    //  before reading that byte.
    if ((msg_->flags () & msg_t::command) && msg_->size () > 0) {
        const uint8_t cmd_id =
            *static_cast <const uint8_t *> (msg_->data ());
        if (cmd_id == 4)
            process_heartbeat_message (msg_);
    }

    if (metadata)
        msg_->set_metadata (metadata);
    if (session->push_msg (msg_) == -1) {
        if (errno == EAGAIN)
            process_msg = &stream_engine_t::push_one_then_decode_and_push;
        return -1;
    }
    return 0;
}

//  Pushes the given message to the session without decoding it; once
//  the push succeeds, the handler is switched back to
//  decode_and_push (). Returns the result of the push.
int zmq::stream_engine_t::push_one_then_decode_and_push (msg_t *msg_)
{
    const int result = session->push_msg (msg_);
    if (result != 0)
        return result;

    process_msg = &stream_engine_t::decode_and_push;
    return 0;
}

949 950 951
//  Message handler that first injects a one-byte subscription message
//  (value 1) into the session, then switches to push_msg_to_session ()
//  and delivers the actual message with it.
//  Returns -1 if the injected message could not be pushed.
int zmq::stream_engine_t::write_subscription_msg (msg_t *msg_)
{
    //  Inject the subscription message, so that also
    //  ZMQ 2.x peers receive published messages.
    msg_t sub_msg;
    const int init_rc = sub_msg.init_size (1);
    errno_assert (init_rc == 0);

    unsigned char *payload = static_cast <unsigned char *> (sub_msg.data ());
    payload [0] = 1;

    if (session->push_msg (&sub_msg) == -1)
        return -1;

    process_msg = &stream_engine_t::push_msg_to_session;
    return push_msg_to_session (msg_);
}

966
//  Terminates the engine: optionally notifies a raw socket's
//  application with an empty message, emits the disconnected event,
//  reports the error to the session, unplugs the engine and destroys
//  it. The object is deleted on return, so callers must not touch the
//  engine afterwards.
void zmq::stream_engine_t::error (error_reason_t reason)
{
    //  The session is used by the message handler invoked below, so
    //  verify it before its first use rather than after.
    zmq_assert (session);

    if (options.raw_socket && options.raw_notify) {
        //  For raw sockets, send a final 0-length message to the application
        //  so that it knows the peer has been disconnected.
        msg_t terminator;
        const int rc = terminator.init ();
        errno_assert (rc == 0);
        //  Delivery is best-effort: the handler's result is deliberately
        //  not acted upon because the engine is going away regardless.
        (this->*process_msg) (&terminator);
        terminator.close ();
    }

    socket->event_disconnected (endpoint, (int) s);
    session->flush ();
    session->engine_error (reason);
    unplug ();
    delete this;
}

984
void zmq::stream_engine_t::set_handshake_timer ()
985
{
986
    zmq_assert (!has_handshake_timer);
987

988
    if (!options.raw_socket && options.handshake_ivl > 0) {
989 990
        add_timer (options.handshake_ivl, handshake_timer_id);
        has_handshake_timer = true;
991
    }
992 993
}

Thomas Rodgers's avatar
Thomas Rodgers committed
994 995 996 997 998 999
//  Adds the "Peer-Address" property when the peer address is known.
//  Returns true if the property was offered to the map, false when
//  peer_address is empty and the map is left untouched.
bool zmq::stream_engine_t::init_properties (properties_t & properties)
{
    if (!peer_address.empty ()) {
        properties.insert (std::make_pair ("Peer-Address", peer_address));
        return true;
    }
    return false;
}

1000
void zmq::stream_engine_t::timer_event (int id_)
1001
{
Jonathan Reams's avatar
Jonathan Reams committed
1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040
    if(id_ == handshake_timer_id) {
        has_handshake_timer = false;
        //  handshake timer expired before handshake completed, so engine fail
        error (timeout_error);
    }
    else if(id_ == heartbeat_ivl_timer_id) {
        next_msg = &stream_engine_t::produce_ping_message;
        out_event();
        add_timer(options.heartbeat_interval, heartbeat_ivl_timer_id);
    }
    else if(id_ == heartbeat_ttl_timer_id) {
        has_ttl_timer = false;
        error(timeout_error);
    }
    else if(id_ == heartbeat_timeout_timer_id) {
        has_timeout_timer = false;
        error(timeout_error);
    }
    else
        // There are no other valid timer ids!
        assert(false);
}

//  Builds a PING command ("\4PING" followed by options.heartbeat_ttl as
//  a 16-bit value in network byte order), encodes it through the
//  mechanism, restores the regular outbound handler and arms the
//  heartbeat timeout timer if it is configured and not already running.
//  Returns the result of the mechanism's encode step.
int zmq::stream_engine_t::produce_ping_message (msg_t * msg_)
{
    zmq_assert (mechanism != NULL);

    //  16-bit TTL + \4PING == 7
    int rc = msg_->init_size (7);
    //  Do not write into the message body unless it was really set up.
    errno_assert (rc == 0);
    msg_->set_flags (msg_t::command);
    //  Copy in the command message
    memcpy (msg_->data (), "\4PING", 5);

    const uint16_t ttl_val = htons (options.heartbeat_ttl);
    memcpy (static_cast <uint8_t *> (msg_->data ()) + 5,
            &ttl_val, sizeof (ttl_val));

    rc = mechanism->encode (msg_);
    //  The PING is a one-shot message; go back to regular traffic
    //  regardless of the encode result.
    next_msg = &stream_engine_t::pull_and_encode;
    if (!has_timeout_timer && heartbeat_timeout > 0) {
        add_timer (heartbeat_timeout, heartbeat_timeout_timer_id);
        has_timeout_timer = true;
    }
    return rc;
}

//  Builds a PONG command (the 5 bytes "\4PONG"), encodes it through
//  the mechanism and restores the regular outbound handler.
//  Returns the result of the mechanism's encode step.
int zmq::stream_engine_t::produce_pong_message (msg_t * msg_)
{
    zmq_assert (mechanism != NULL);

    int rc = msg_->init_size (5);
    //  Do not write into the message body unless it was really set up.
    errno_assert (rc == 0);
    msg_->set_flags (msg_t::command);

    memcpy (msg_->data (), "\4PONG", 5);

    rc = mechanism->encode (msg_);
    //  The PONG is a one-shot message; go back to regular traffic
    //  regardless of the encode result.
    next_msg = &stream_engine_t::pull_and_encode;
    return rc;
}

//  Handles a received heartbeat command. For a PING, arms the TTL timer
//  from the TTL carried in the message (if it is non-zero and no TTL
//  timer is running) and schedules a PONG as the next outbound message.
//  Messages that are not a PING are ignored.
//  Returns 0 normally, or -1 with errno set to EPROTO for a PING that
//  is too short to carry its TTL field. The caller in decode_and_push ()
//  does not act on the return value.
int zmq::stream_engine_t::process_heartbeat_message (msg_t * msg_)
{
    //  Layout of a PING body: the 5 bytes "\4PING" followed by a
    //  16-bit TTL in network byte order.
    const size_t ping_name_size = 5;
    const size_t ping_min_size = ping_name_size + sizeof (uint16_t);

    //  The body comes from the peer; never compare or read past its end.
    if (msg_->size () < ping_name_size
    ||  memcmp (msg_->data (), "\4PING", ping_name_size) != 0)
        return 0;

    if (msg_->size () < ping_min_size) {
        errno = EPROTO;
        return -1;
    }

    //  Get the remote heartbeat TTL to setup the timer
    uint16_t ttl_network_order;
    memcpy (&ttl_network_order,
            static_cast <const uint8_t *> (msg_->data ()) + ping_name_size,
            sizeof (ttl_network_order));

    //  The remote heartbeat is in 10ths of a second
    //  so we multiply it by 100 to get the timer interval in ms.
    //  The product is computed as an int: multiplying inside a 16-bit
    //  variable would wrap around for any TTL above 655.
    const int ttl_ms = static_cast <int> (ntohs (ttl_network_order)) * 100;

    if (!has_ttl_timer && ttl_ms > 0) {
        add_timer (ttl_ms, heartbeat_ttl_timer_id);
        has_ttl_timer = true;
    }

    //  Answer with a PONG as the next outbound message.
    next_msg = &stream_engine_t::produce_pong_message;
    out_event ();

    return 0;
}