/*
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file

    This file is part of libzmq, the ZeroMQ core engine in C++.

    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include "precompiled.hpp"
31
#include "macros.hpp"
32 33

#include <string.h>
34 35

#ifndef ZMQ_HAVE_WINDOWS
36
#include <unistd.h>
37 38
#endif

39
#include <new>
40
#include <sstream>
41

42
#include "stream_engine.hpp"
43
#include "io_thread.hpp"
44
#include "session_base.hpp"
45 46
#include "v1_encoder.hpp"
#include "v1_decoder.hpp"
47 48
#include "v2_encoder.hpp"
#include "v2_decoder.hpp"
49
#include "null_mechanism.hpp"
50 51
#include "plain_client.hpp"
#include "plain_server.hpp"
52 53
#include "gssapi_client.hpp"
#include "gssapi_server.hpp"
54 55
#include "curve_client.hpp"
#include "curve_server.hpp"
56 57
#include "raw_decoder.hpp"
#include "raw_encoder.hpp"
58 59
#include "config.hpp"
#include "err.hpp"
60
#include "ip.hpp"
61
#include "tcp.hpp"
Martin Hurton's avatar
Martin Hurton committed
62 63
#include "likely.hpp"
#include "wire.hpp"
64

Chris Laws's avatar
Chris Laws committed
65
//  Builds an engine around an already connected socket.
//
//  fd_        descriptor of the connected socket; the destructor closes it
//  options_   socket options, copied into the engine
//  endpoint_  endpoint string, stored for later use
//
//  The engine starts unplugged and in the handshaking state. The
//  constructor switches the socket to non-blocking mode, records the
//  peer address (extended with peer credentials for PF_UNIX sockets when
//  the platform supports it) and pre-computes the heartbeat timeout.
zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_,
                                       const std::string &endpoint_) :
    s (fd_),
    as_server (false),
    handle ((handle_t) NULL),
    inpos (NULL),
    insize (0),
    decoder (NULL),
    outpos (NULL),
    outsize (0),
    encoder (NULL),
    metadata (NULL),
    handshaking (true),
    greeting_size (v2_greeting_size),
    greeting_bytes_read (0),
    session (NULL),
    options (options_),
    endpoint (endpoint_),
    plugged (false),
    next_msg (&stream_engine_t::identity_msg),
    process_msg (&stream_engine_t::process_identity_msg),
    io_error (false),
    subscription_required (false),
    mechanism (NULL),
    input_stopped (false),
    output_stopped (false),
    has_handshake_timer (false),
    has_ttl_timer (false),
    has_timeout_timer (false),
    has_heartbeat_timer (false),
    heartbeat_timeout (0),
    socket (NULL)
{
    //  The outgoing message slot has to be a valid (empty) message
    //  before anything else touches it.
    const int init_rc = tx_msg.init ();
    errno_assert (init_rc == 0);

    //  Put the socket into non-blocking mode.
    unblock_socket (s);

    //  Remember who we are talking to. A family of 0 means no usable
    //  address was obtained, so make sure the string is empty.
    const int peer_family = get_peer_ip_address (s, peer_address);
    if (peer_family == 0)
        peer_address.clear ();
#if defined ZMQ_HAVE_SO_PEERCRED
    else if (peer_family == PF_UNIX) {
        //  Append ":uid:gid:pid" of the peer process.
        struct ucred peer_cred;
        socklen_t cred_len = sizeof (peer_cred);
        if (getsockopt (s, SOL_SOCKET, SO_PEERCRED, &peer_cred,
                        &cred_len) == 0) {
            std::ostringstream suffix;
            suffix << ":" << peer_cred.uid;
            suffix << ":" << peer_cred.gid;
            suffix << ":" << peer_cred.pid;
            peer_address.append (suffix.str ());
        }
    }
#elif defined ZMQ_HAVE_LOCAL_PEERCRED
    else if (peer_family == PF_UNIX) {
        //  Append ":uid:gid:" of the peer; the group is left empty when
        //  the credentials carry no groups.
        struct xucred peer_cred;
        socklen_t cred_len = sizeof (peer_cred);
        if (getsockopt (s, 0, LOCAL_PEERCRED, &peer_cred, &cred_len) == 0
        &&  peer_cred.cr_version == XUCRED_VERSION) {
            std::ostringstream suffix;
            suffix << ":" << peer_cred.cr_uid << ":";
            if (peer_cred.cr_ngroups > 0)
                suffix << peer_cred.cr_groups [0];
            suffix << ":";
            peer_address.append (suffix.str ());
        }
    }
#endif

    //  With heartbeating enabled, a timeout of -1 means "use the
    //  heartbeat interval as the timeout".
    if (options.heartbeat_interval > 0)
        heartbeat_timeout = (options.heartbeat_timeout == -1)
            ? options.heartbeat_interval
            : options.heartbeat_timeout;
}

142
//  Releases everything the engine owns: the socket, the pending outgoing
//  message, its reference to the metadata and the encoder, decoder and
//  security mechanism objects. The engine must be unplugged by now.
zmq::stream_engine_t::~stream_engine_t ()
{
    zmq_assert (!plugged);

    //  Close the underlying socket unless it has been retired already.
    if (s != retired_fd) {
#ifdef ZMQ_HAVE_WINDOWS
        const int close_rc = closesocket (s);
        wsa_assert (close_rc != SOCKET_ERROR);
#else
        int close_rc = close (s);
#ifdef __FreeBSD_kernel__
        //  FreeBSD may return ECONNRESET on close() under load but this
        //  is not an error.
        if (close_rc == -1 && errno == ECONNRESET)
            close_rc = 0;
#endif
        errno_assert (close_rc == 0);
#endif
        s = retired_fd;
    }

    const int msg_rc = tx_msg.close ();
    errno_assert (msg_rc == 0);

    //  Give up our reference to the metadata; whoever drops the last
    //  reference is responsible for destroying the object.
    if (metadata != NULL && metadata->drop_ref ()) {
        LIBZMQ_DELETE (metadata);
    }

    LIBZMQ_DELETE (encoder);
    LIBZMQ_DELETE (decoder);
    LIBZMQ_DELETE (mechanism);
}

179 180
//  Attaches the engine to an I/O thread and to its session.
//
//  io_thread_  I/O thread whose poller will watch the socket
//  session_    session the engine exchanges messages with (must not be NULL)
//
//  For raw sockets the raw encoder/decoder are installed right away and no
//  handshake takes place; otherwise the first ten bytes of the greeting are
//  queued for sending and the handshake timer is started. In both cases
//  polling is enabled and in_event () is run once.
void zmq::stream_engine_t::plug (io_thread_t *io_thread_,
    session_base_t *session_)
{
    zmq_assert (!plugged);
    plugged = true;

    //  Connect to session object.
    zmq_assert (!session);
    zmq_assert (session_);
    session = session_;
    socket = session->get_socket ();

    //  Connect to I/O threads poller object.
    io_object_t::plug (io_thread_);
    handle = add_fd (s);
    io_error = false;

    if (!options.raw_socket) {
        //  Start optional timer, to prevent handshake hanging on no input.
        set_handshake_timer ();

        //  Send the 'length' and 'flags' fields of the identity message.
        //  The 'length' field is encoded in the long format:
        //  0xff, 8-byte length, then the flags byte.
        outpos = greeting_send;
        unsigned char *cursor = outpos + outsize;
        *cursor++ = 0xff;
        put_uint64 (cursor, options.identity_size + 1);
        cursor += 8;
        *cursor++ = 0x7f;
        outsize += 10;
    }
    else {
        //  No handshaking for raw sockets: instantiate the raw encoder
        //  and decoder immediately.
        encoder = new (std::nothrow) raw_encoder_t (out_batch_size);
        alloc_assert (encoder);

        decoder = new (std::nothrow) raw_decoder_t (in_batch_size);
        alloc_assert (decoder);

        //  Disable handshaking and go straight to the normal message flow.
        handshaking = false;
        next_msg = &stream_engine_t::pull_msg_from_session;
        process_msg = &stream_engine_t::push_raw_msg_to_session;

        //  Compile metadata, if there are any properties to publish.
        properties_t properties;
        if (init_properties (properties)) {
            zmq_assert (metadata == NULL);
            metadata = new (std::nothrow) metadata_t (properties);
            alloc_assert (metadata);
        }

        if (options.raw_notify) {
            //  For raw sockets, send an initial 0-length message to the
            //  application so that it knows a peer has connected.
            msg_t connector;
            connector.init ();
            push_raw_msg_to_session (&connector);
            connector.close ();
            session->flush ();
        }
    }

    set_pollin (handle);
    set_pollout (handle);

    //  Flush all the data that may have been already received downstream.
    in_event ();
}

247
void zmq::stream_engine_t::unplug ()
248 249 250 251
{
    zmq_assert (plugged);
    plugged = false;

252 253 254 255 256 257
    //  Cancel all timers.
    if (has_handshake_timer) {
        cancel_timer (handshake_timer_id);
        has_handshake_timer = false;
    }

Jonathan Reams's avatar
Jonathan Reams committed
258 259 260 261 262 263 264 265 266 267 268 269 270 271
    if (has_ttl_timer) {
        cancel_timer (heartbeat_ttl_timer_id);
        has_ttl_timer = false;
    }

    if (has_timeout_timer) {
        cancel_timer (heartbeat_timeout_timer_id);
        has_timeout_timer = false;
    }

    if (has_heartbeat_timer) {
        cancel_timer (heartbeat_ivl_timer_id);
        has_heartbeat_timer = false;
    }
272
    //  Cancel all fd subscriptions.
273
    if (!io_error)
Martin Hurton's avatar
Martin Hurton committed
274
        rm_fd (handle);
275 276 277 278 279 280 281

    //  Disconnect from I/O threads poller object.
    io_object_t::unplug ();

    session = NULL;
}

282
void zmq::stream_engine_t::terminate ()
283 284 285 286 287
{
    unplug ();
    delete this;
}

288
void zmq::stream_engine_t::in_event ()
289
{
290
    zmq_assert (!io_error);
291

Pieter Hintjens's avatar
Pieter Hintjens committed
292
    //  If still handshaking, receive and process the greeting message.
Martin Hurton's avatar
Martin Hurton committed
293 294 295 296
    if (unlikely (handshaking))
        if (!handshake ())
            return;

297
    zmq_assert (decoder);
298 299

    //  If there has been an I/O error, stop polling.
300
    if (input_stopped) {
301 302 303 304
        rm_fd (handle);
        io_error = true;
        return;
    }
305 306 307 308 309 310 311 312

    //  If there's no data to process in the buffer...
    if (!insize) {

        //  Retrieve the buffer and read as much data as possible.
        //  Note that buffer can be arbitrarily large. However, we assume
        //  the underlying TCP layer has fixed buffer size and thus the
        //  number of bytes read will be always limited.
313 314
        size_t bufsize = 0;
        decoder->get_buffer (&inpos, &bufsize);
315

316
        const int rc = tcp_read (s, inpos, bufsize);
317

318
        if (rc == 0) {
319
            error (connection_error);
320
            return;
321
        }
322 323
        if (rc == -1) {
            if (errno != EAGAIN)
324
                error (connection_error);
325 326
            return;
        }
327

328
        //  Adjust input size
329
        insize = static_cast <size_t> (rc);
330 331
        // Adjust buffer size to received bytes
        decoder->resize_buffer(insize);
Martin Hurton's avatar
Martin Hurton committed
332
    }
333

334 335
    int rc = 0;
    size_t processed = 0;
336

337 338 339
    while (insize > 0) {
        rc = decoder->decode (inpos, insize, processed);
        zmq_assert (processed <= insize);
340 341
        inpos += processed;
        insize -= processed;
342 343
        if (rc == 0 || rc == -1)
            break;
Martin Hurton's avatar
Martin Hurton committed
344
        rc = (this->*process_msg) (decoder->msg ());
345 346
        if (rc == -1)
            break;
347 348
    }

349 350 351 352
    //  Tear down the connection if we have failed to decode input data
    //  or the session has rejected the message.
    if (rc == -1) {
        if (errno != EAGAIN) {
353
            error(protocol_error);
354 355
            return;
        }
356
        input_stopped = true;
357
        reset_pollin (handle);
Martin Hurton's avatar
Martin Hurton committed
358
    }
359 360

    session->flush ();
361 362
}

363
void zmq::stream_engine_t::out_event ()
364
{
365 366
    zmq_assert (!io_error);

367 368 369
    //  If write buffer is empty, try to read new data from the encoder.
    if (!outsize) {

Martin Hurton's avatar
Martin Hurton committed
370 371 372 373 374 375 376 377
        //  Even when we stop polling as soon as there is no
        //  data to send, the poller may invoke out_event one
        //  more time due to 'speculative write' optimisation.
        if (unlikely (encoder == NULL)) {
            zmq_assert (handshaking);
            return;
        }

378
        outpos = NULL;
379 380
        outsize = encoder->encode (&outpos, 0);

381
        while (outsize < (size_t) out_batch_size) {
Martin Hurton's avatar
Martin Hurton committed
382
            if ((this->*next_msg) (&tx_msg) == -1)
383 384 385
                break;
            encoder->load_msg (&tx_msg);
            unsigned char *bufptr = outpos + outsize;
386
            size_t n = encoder->encode (&bufptr, out_batch_size - outsize);
387 388 389 390 391
            zmq_assert (n > 0);
            if (outpos == NULL)
                outpos = bufptr;
            outsize += n;
        }
392 393 394

        //  If there is no data to send, stop polling for output.
        if (outsize == 0) {
395
            output_stopped = true;
396 397 398 399 400 401 402
            reset_pollout (handle);
            return;
        }
    }

    //  If there are any data to write in write buffer, write as much as
    //  possible to the socket. Note that amount of data to write can be
403
    //  arbitrarily large. However, we assume that underlying TCP layer has
404 405
    //  limited transmission buffer and thus the actual number of bytes
    //  written should be reasonably modest.
406
    const int nbytes = tcp_write (s, outpos, outsize);
407

Martin Hurton's avatar
Martin Hurton committed
408 409
    //  IO error has occurred. We stop waiting for output events.
    //  The engine is not terminated until we detect input error;
410
    //  this is necessary to prevent losing incoming messages.
411
    if (nbytes == -1) {
Martin Hurton's avatar
Martin Hurton committed
412
        reset_pollout (handle);
413 414 415 416 417
        return;
    }

    outpos += nbytes;
    outsize -= nbytes;
Martin Hurton's avatar
Martin Hurton committed
418 419 420 421 422 423

    //  If we are still handshaking and there are no data
    //  to send, stop polling for output.
    if (unlikely (handshaking))
        if (outsize == 0)
            reset_pollout (handle);
424 425
}

426
void zmq::stream_engine_t::restart_output ()
427
{
428 429 430
    if (unlikely (io_error))
        return;

431
    if (likely (output_stopped)) {
432
        set_pollout (handle);
433
        output_stopped = false;
434
    }
435 436 437 438 439 440 441 442

    //  Speculative write: The assumption is that at the moment new message
    //  was sent by the user the socket is probably available for writing.
    //  Thus we try to write the data to socket avoiding polling for POLLOUT.
    //  Consequently, the latency should be better in request/reply scenarios.
    out_event ();
}

443
void zmq::stream_engine_t::restart_input ()
444
{
445
    zmq_assert (input_stopped);
446 447 448
    zmq_assert (session != NULL);
    zmq_assert (decoder != NULL);

Martin Hurton's avatar
Martin Hurton committed
449
    int rc = (this->*process_msg) (decoder->msg ());
450 451 452 453
    if (rc == -1) {
        if (errno == EAGAIN)
            session->flush ();
        else
454
            error (protocol_error);
Martin Hurton's avatar
Martin Hurton committed
455 456 457
        return;
    }

458 459 460 461 462 463 464 465
    while (insize > 0) {
        size_t processed = 0;
        rc = decoder->decode (inpos, insize, processed);
        zmq_assert (processed <= insize);
        inpos += processed;
        insize -= processed;
        if (rc == 0 || rc == -1)
            break;
Martin Hurton's avatar
Martin Hurton committed
466
        rc = (this->*process_msg) (decoder->msg ());
467 468 469
        if (rc == -1)
            break;
    }
470

471 472 473
    if (rc == -1 && errno == EAGAIN)
        session->flush ();
    else
474 475 476 477 478
    if (io_error)
        error (connection_error);
    else
    if (rc == -1)
        error (protocol_error);
479
    else {
480
        input_stopped = false;
481 482 483 484 485 486
        set_pollin (handle);
        session->flush ();

        //  Speculative read.
        in_event ();
    }
487 488
}

489
bool zmq::stream_engine_t::handshake ()
Martin Hurton's avatar
Martin Hurton committed
490
{
491
    zmq_assert (handshaking);
Martin Hurton's avatar
Martin Hurton committed
492
    zmq_assert (greeting_bytes_read < greeting_size);
493
    //  Receive the greeting.
Martin Hurton's avatar
Martin Hurton committed
494
    while (greeting_bytes_read < greeting_size) {
495 496
        const int n = tcp_read (s, greeting_recv + greeting_bytes_read,
                                greeting_size - greeting_bytes_read);
497
        if (n == 0) {
498
            error (connection_error);
499 500
            return false;
        }
501 502
        if (n == -1) {
            if (errno != EAGAIN)
503
                error (connection_error);
504
            return false;
505
        }
Martin Hurton's avatar
Martin Hurton committed
506 507 508

        greeting_bytes_read += n;

509 510 511
        //  We have received at least one byte from the peer.
        //  If the first byte is not 0xff, we know that the
        //  peer is using unversioned protocol.
Pieter Hintjens's avatar
Pieter Hintjens committed
512
        if (greeting_recv [0] != 0xff)
513
            break;
Martin Hurton's avatar
Martin Hurton committed
514

515
        if (greeting_bytes_read < signature_size)
516
            continue;
Martin Hurton's avatar
Martin Hurton committed
517

518 519 520 521
        //  Inspect the right-most bit of the 10th byte (which coincides
        //  with the 'flags' field if a regular message was sent).
        //  Zero indicates this is a header of identity message
        //  (i.e. the peer is using the unversioned protocol).
Pieter Hintjens's avatar
Pieter Hintjens committed
522
        if (!(greeting_recv [9] & 0x01))
523
            break;
Martin Hurton's avatar
Martin Hurton committed
524

525
        //  The peer is using versioned protocol.
526 527
        //  Send the major version number.
        if (outpos + outsize == greeting_send + signature_size) {
Martin Hurton's avatar
Martin Hurton committed
528 529
            if (outsize == 0)
                set_pollout (handle);
530 531 532 533 534 535 536 537 538 539 540 541 542 543 544
            outpos [outsize++] = 3;     //  Major version number
        }

        if (greeting_bytes_read > signature_size) {
            if (outpos + outsize == greeting_send + signature_size + 1) {
                if (outsize == 0)
                    set_pollout (handle);

                //  Use ZMTP/2.0 to talk to older peers.
                if (greeting_recv [10] == ZMTP_1_0
                ||  greeting_recv [10] == ZMTP_2_0)
                    outpos [outsize++] = options.type;
                else {
                    outpos [outsize++] = 0; //  Minor version number
                    memset (outpos + outsize, 0, 20);
545 546 547

                    zmq_assert (options.mechanism == ZMQ_NULL
                            ||  options.mechanism == ZMQ_PLAIN
548 549
                            ||  options.mechanism == ZMQ_CURVE
                            ||  options.mechanism == ZMQ_GSSAPI);
550

551 552 553
                    if (options.mechanism == ZMQ_NULL)
                        memcpy (outpos + outsize, "NULL", 4);
                    else
554
                    if (options.mechanism == ZMQ_PLAIN)
555
                        memcpy (outpos + outsize, "PLAIN", 5);
556
                    else
557 558
                    if (options.mechanism == ZMQ_GSSAPI)
                        memcpy (outpos + outsize, "GSSAPI", 6);
559
                    else
560
                    if (options.mechanism == ZMQ_CURVE)
561
                        memcpy (outpos + outsize, "CURVE", 5);
562 563 564 565 566 567
                    outsize += 20;
                    memset (outpos + outsize, 0, 32);
                    outsize += 32;
                    greeting_size = v3_greeting_size;
                }
            }
Martin Hurton's avatar
Martin Hurton committed
568 569 570
        }
    }

571 572
    //  Position of the revision field in the greeting.
    const size_t revision_pos = 10;
573

574 575
    //  Is the peer using ZMTP/1.0 with no revision number?
    //  If so, we send and receive rest of identity message
Pieter Hintjens's avatar
Pieter Hintjens committed
576
    if (greeting_recv [0] != 0xff || !(greeting_recv [9] & 0x01)) {
Min RK's avatar
Min RK committed
577
        if (session->zap_enabled ()) {
578 579 580 581 582
           // reject ZMTP 1.0 connections if ZAP is enabled
           error (protocol_error);
           return false;
        }

583
        encoder = new (std::nothrow) v1_encoder_t (out_batch_size);
584 585
        alloc_assert (encoder);

586
        decoder = new (std::nothrow) v1_decoder_t (in_batch_size, options.maxmsgsize);
587 588
        alloc_assert (decoder);

Martin Hurton's avatar
Martin Hurton committed
589 590 591 592 593 594
        //  We have already sent the message header.
        //  Since there is no way to tell the encoder to
        //  skip the message header, we simply throw that
        //  header data away.
        const size_t header_size = options.identity_size + 1 >= 255 ? 10 : 2;
        unsigned char tmp [10], *bufferp = tmp;
595 596 597 598 599 600 601

        //  Prepare the identity message and load it into encoder.
        //  Then consume bytes we have already sent to the peer.
        const int rc = tx_msg.init_size (options.identity_size);
        zmq_assert (rc == 0);
        memcpy (tx_msg.data (), options.identity, options.identity_size);
        encoder->load_msg (&tx_msg);
602
        size_t buffer_size = encoder->encode (&bufferp, header_size);
Martin Hurton's avatar
Martin Hurton committed
603 604 605
        zmq_assert (buffer_size == header_size);

        //  Make sure the decoder sees the data we have already received.
Pieter Hintjens's avatar
Pieter Hintjens committed
606
        inpos = greeting_recv;
Martin Hurton's avatar
Martin Hurton committed
607
        insize = greeting_bytes_read;
608 609

        //  To allow for interoperability with peers that do not forward
610 611
        //  their subscriptions, we inject a phantom subscription message
        //  message into the incoming message stream.
612
        if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB)
613
            subscription_required = true;
Martin Hurton's avatar
Martin Hurton committed
614 615 616

        //  We are sending our identity now and the next message
        //  will come from the socket.
Martin Hurton's avatar
Martin Hurton committed
617
        next_msg = &stream_engine_t::pull_msg_from_session;
Martin Hurton's avatar
Martin Hurton committed
618 619

        //  We are expecting identity message.
Martin Hurton's avatar
Martin Hurton committed
620
        process_msg = &stream_engine_t::process_identity_msg;
Martin Hurton's avatar
Martin Hurton committed
621
    }
622
    else
Pieter Hintjens's avatar
Pieter Hintjens committed
623
    if (greeting_recv [revision_pos] == ZMTP_1_0) {
Min RK's avatar
Min RK committed
624
        if (session->zap_enabled ()) {
625 626 627 628 629
           // reject ZMTP 1.0 connections if ZAP is enabled
           error (protocol_error);
           return false;
        }

630
        encoder = new (std::nothrow) v1_encoder_t (out_batch_size);
631 632
        alloc_assert (encoder);

633
        decoder = new (std::nothrow) v1_decoder_t (
634
            in_batch_size, options.maxmsgsize);
635 636
        alloc_assert (decoder);
    }
637 638
    else
    if (greeting_recv [revision_pos] == ZMTP_2_0) {
Min RK's avatar
Min RK committed
639
        if (session->zap_enabled ()) {
640 641 642 643 644
           // reject ZMTP 2.0 connections if ZAP is enabled
           error (protocol_error);
           return false;
        }

645
        encoder = new (std::nothrow) v2_encoder_t (out_batch_size);
646 647 648
        alloc_assert (encoder);

        decoder = new (std::nothrow) v2_decoder_t (
649
            in_batch_size, options.maxmsgsize);
650 651
        alloc_assert (decoder);
    }
652
    else {
653
        encoder = new (std::nothrow) v2_encoder_t (out_batch_size);
654 655
        alloc_assert (encoder);

656
        decoder = new (std::nothrow) v2_decoder_t (
657
                in_batch_size, options.maxmsgsize);
658
        alloc_assert (decoder);
659

660 661
        if (options.mechanism == ZMQ_NULL
        &&  memcmp (greeting_recv + 12, "NULL\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) == 0) {
662 663
            mechanism = new (std::nothrow)
                null_mechanism_t (session, peer_address, options);
664
            alloc_assert (mechanism);
665 666
        }
        else
667 668
        if (options.mechanism == ZMQ_PLAIN
        &&  memcmp (greeting_recv + 12, "PLAIN\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) == 0) {
669 670 671 672 673 674
            if (options.as_server)
                mechanism = new (std::nothrow)
                    plain_server_t (session, peer_address, options);
            else
                mechanism = new (std::nothrow)
                    plain_client_t (options);
675
            alloc_assert (mechanism);
676
        }
677
#ifdef ZMQ_HAVE_CURVE
678
        else
679 680
        if (options.mechanism == ZMQ_CURVE
        &&  memcmp (greeting_recv + 12, "CURVE\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) == 0) {
681
            if (options.as_server)
682 683
                mechanism = new (std::nothrow)
                    curve_server_t (session, peer_address, options);
684 685 686 687 688
            else
                mechanism = new (std::nothrow) curve_client_t (options);
            alloc_assert (mechanism);
        }
#endif
Chris Laws's avatar
Chris Laws committed
689
#ifdef HAVE_LIBGSSAPI_KRB5
690
        else
691 692
        if (options.mechanism == ZMQ_GSSAPI
        &&  memcmp (greeting_recv + 12, "GSSAPI\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) == 0) {
693 694 695 696 697
            if (options.as_server)
                mechanism = new (std::nothrow)
                    gssapi_server_t (session, peer_address, options);
            else
                mechanism = new (std::nothrow) gssapi_client_t (options);
698 699
            alloc_assert (mechanism);
        }
700
#endif
701
        else {
702
            error (protocol_error);
703 704
            return false;
        }
Martin Hurton's avatar
Martin Hurton committed
705 706
        next_msg = &stream_engine_t::next_handshake_command;
        process_msg = &stream_engine_t::process_handshake_command;
707
    }
Martin Hurton's avatar
Martin Hurton committed
708 709 710 711 712 713 714 715 716

    // Start polling for output if necessary.
    if (outsize == 0)
        set_pollout (handle);

    //  Handshaking was successful.
    //  Switch into the normal message flow.
    handshaking = false;

717 718 719 720 721
    if (has_handshake_timer) {
        cancel_timer (handshake_timer_id);
        has_handshake_timer = false;
    }

Martin Hurton's avatar
Martin Hurton committed
722 723 724
    return true;
}

Martin Hurton's avatar
Martin Hurton committed
725
//  next_msg handler used for the very first outgoing message: fills msg_
//  with our identity and arranges for all later messages to be pulled
//  from the session. Always returns 0.
int zmq::stream_engine_t::identity_msg (msg_t *msg_)
{
    const int init_rc = msg_->init_size (options.identity_size);
    errno_assert (init_rc == 0);

    //  An empty identity leaves nothing to copy.
    if (options.identity_size > 0)
        memcpy (msg_->data (), options.identity, options.identity_size);

    //  The identity is sent only once.
    next_msg = &stream_engine_t::pull_msg_from_session;
    return 0;
}
734

Martin Hurton's avatar
Martin Hurton committed
735
//  process_msg handler for the first incoming message, which carries the
//  peer's identity. The message is forwarded to the session when the
//  socket wants identities and discarded otherwise; afterwards incoming
//  messages go straight to the session. Always returns 0.
int zmq::stream_engine_t::process_identity_msg (msg_t *msg_)
{
    if (!options.recv_identity) {
        //  Nobody is interested in the identity: drop its content and
        //  leave msg_ as a fresh empty message.
        const int close_rc = msg_->close ();
        errno_assert (close_rc == 0);
        const int init_rc = msg_->init ();
        errno_assert (init_rc == 0);
    }
    else {
        msg_->set_flags (msg_t::identity);
        const int push_rc = session->push_msg (msg_);
        errno_assert (push_rc == 0);
    }

    if (subscription_required) {
        //  Inject the subscription message, so that also
        //  ZMQ 2.x peers receive published messages.
        msg_t subscription;
        const int init_rc = subscription.init_size (1);
        errno_assert (init_rc == 0);
        static_cast <unsigned char *> (subscription.data ()) [0] = 1;
        const int push_rc = session->push_msg (&subscription);
        errno_assert (push_rc == 0);
    }

    //  The identity has been handled; switch to the regular handler.
    process_msg = &stream_engine_t::push_msg_to_session;

    return 0;
}
765

766
//  next_msg handler used while the security handshake is running.
//  Depending on the mechanism's status it either finishes the handshake
//  and continues with regular traffic, fails with EPROTO, or asks the
//  mechanism for its next command and flags it as a command.
//  Returns 0 when msg_ holds something to send, -1 (with errno set)
//  otherwise.
int zmq::stream_engine_t::next_handshake_command (msg_t *msg_)
{
    zmq_assert (mechanism != NULL);

    //  Handshake already complete: switch over and send real data.
    if (mechanism->status () == mechanism_t::ready) {
        mechanism_ready ();
        return pull_and_encode (msg_);
    }

    //  The mechanism has failed; nothing more will be sent.
    if (mechanism->status () == mechanism_t::error) {
        errno = EPROTO;
        return -1;
    }

    //  Still handshaking: let the mechanism produce its next command.
    const int rc = mechanism->next_handshake_command (msg_);
    if (rc == 0)
        msg_->set_flags (msg_t::command);

#ifdef ZMQ_BUILD_DRAFT_API
    //  Producing the command may itself have put the mechanism into the
    //  error state; report that on the socket.
    if (mechanism->status () == mechanism_t::error)
        socket->event_handshake_failed (endpoint, 0);
#endif

    return rc;
}

793
//  process_msg handler used while the security handshake is running:
//  passes the received command to the mechanism and reacts to the
//  resulting status. Returns the mechanism's result, or -1 with errno set
//  to EPROTO when the mechanism ended up in the error state.
int zmq::stream_engine_t::process_handshake_command (msg_t *msg_)
{
    zmq_assert (mechanism != NULL);

    const int rc = mechanism->process_handshake_command (msg_);
    if (rc != 0)
        return rc;

    if (mechanism->status () == mechanism_t::ready)
        mechanism_ready ();
    else
    if (mechanism->status () == mechanism_t::error) {
        errno = EPROTO;
        return -1;
    }

    //  The command just processed may have made new output available.
    if (output_stopped)
        restart_output ();

    return rc;
}

812 813 814 815 816 817
void zmq::stream_engine_t::zap_msg_available ()
{
    zmq_assert (mechanism != NULL);

    const int rc = mechanism->zap_msg_available ();
    if (rc == -1) {
818
        error (protocol_error);
819 820
        return;
    }
821 822 823 824
    if (input_stopped)
        restart_input ();
    if (output_stopped)
        restart_output ();
825 826
}

827
void zmq::stream_engine_t::mechanism_ready ()
828
{
829 830 831 832 833
    if (options.heartbeat_interval > 0) {
        add_timer(options.heartbeat_interval, heartbeat_ivl_timer_id);
        has_heartbeat_timer = true;
    }

834 835 836 837
    if (options.recv_identity) {
        msg_t identity;
        mechanism->peer_identity (&identity);
        const int rc = session->push_msg (&identity);
838 839 840 841 842 843
        if (rc == -1 && errno == EAGAIN) {
            // If the write is failing at this stage with
            // an EAGAIN the pipe must be being shut down,
            // so we can just bail out of the identity set.
            return;
        }
844
        errno_assert (rc == 0);
845
        session->flush ();
846
    }
847

Martin Hurton's avatar
Martin Hurton committed
848 849
    next_msg = &stream_engine_t::pull_and_encode;
    process_msg = &stream_engine_t::write_credential;
850 851 852

    //  Compile metadata.
    properties_t properties;
Thomas Rodgers's avatar
Thomas Rodgers committed
853
    init_properties(properties);
854

Pieter Hintjens's avatar
Pieter Hintjens committed
855 856
    //  Add ZAP properties.
    const properties_t& zap_properties = mechanism->get_zap_properties ();
Thomas Rodgers's avatar
Thomas Rodgers committed
857
    properties.insert(zap_properties.begin (), zap_properties.end ());
Pieter Hintjens's avatar
Pieter Hintjens committed
858

859 860
    //  Add ZMTP properties.
    const properties_t& zmtp_properties = mechanism->get_zmtp_properties ();
Thomas Rodgers's avatar
Thomas Rodgers committed
861
    properties.insert(zmtp_properties.begin (), zmtp_properties.end ());
862

863 864
    zmq_assert (metadata == NULL);
    if (!properties.empty ())
865
    {
866
        metadata = new (std::nothrow) metadata_t (properties);
867 868
        alloc_assert (metadata);
    }
Vincent Tellier's avatar
Vincent Tellier committed
869

870
#ifdef ZMQ_BUILD_DRAFT_API
Vincent Tellier's avatar
Vincent Tellier committed
871
    socket->event_handshake_succeed(endpoint, 0);
872
#endif
873 874
}

875
//  Fetches the next outgoing message from the session without any
//  further processing; the session's return code is passed through.
int zmq::stream_engine_t::pull_msg_from_session (msg_t *msg_)
{
    const int result = session->pull_msg (msg_);
    return result;
}
879

880 881 882 883 884
//  Hands an incoming message to the session without any further
//  processing; the session's return code is passed through.
int zmq::stream_engine_t::push_msg_to_session (msg_t *msg_)
{
    const int result = session->push_msg (msg_);
    return result;
}

885
//  Pushes a message to the session, first attaching the engine's
//  metadata when the engine has some and the message does not already
//  carry that same metadata object.
int zmq::stream_engine_t::push_raw_msg_to_session (msg_t *msg_)
{
    const bool needs_metadata = metadata && metadata != msg_->metadata ();
    if (needs_metadata)
        msg_->set_metadata (metadata);

    return push_msg_to_session (msg_);
}

891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909
//  First-message handler installed by mechanism_ready (). When the
//  mechanism reports a non-empty user id, it is pushed to the session as
//  a message flagged 'credential'. Afterwards process_msg is switched to
//  decode_and_push and the current message is processed through it.
//
//  Returns -1 if the credential could not be pushed; in that case
//  process_msg is left unchanged. Otherwise returns the result of
//  decode_and_push ().
int zmq::stream_engine_t::write_credential (msg_t *msg_)
{
    zmq_assert (mechanism != NULL);
    zmq_assert (session != NULL);

    const blob_t user_id = mechanism->get_user_id ();
    if (user_id.size () > 0) {
        msg_t credential_msg;
        int rc = credential_msg.init_size (user_id.size ());
        zmq_assert (rc == 0);

        memcpy (credential_msg.data (), user_id.data (), user_id.size ());
        credential_msg.set_flags (msg_t::credential);

        if (session->push_msg (&credential_msg) == -1) {
            //  The session did not take the message; release it.
            rc = credential_msg.close ();
            errno_assert (rc == 0);
            return -1;
        }
    }

    process_msg = &stream_engine_t::decode_and_push;
    return decode_and_push (msg_);
}

914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930
//  Pulls the next outgoing message from the session and passes it
//  through the mechanism's encode step. Returns 0 when both steps
//  succeed and -1 as soon as either one fails.
int zmq::stream_engine_t::pull_and_encode (msg_t *msg_)
{
    zmq_assert (mechanism != NULL);

    const bool pulled = session->pull_msg (msg_) != -1;
    if (!pulled)
        return -1;

    return mechanism->encode (msg_) == -1 ? -1 : 0;
}

//  Passes an incoming message through the mechanism's decode step and
//  pushes it to the session.
//
//  Any successfully decoded message cancels the pending heartbeat timeout
//  and TTL timers. Command messages whose first octet is 4 (the leading
//  octet of "\4PING" / "\4PONG") are additionally handed to
//  process_heartbeat_message ().
//
//  Returns 0 on success and -1 on failure. If the session rejects the
//  message with EAGAIN, process_msg is switched to
//  push_one_then_decode_and_push so that the next call pushes without
//  decoding again.
int zmq::stream_engine_t::decode_and_push (msg_t *msg_)
{
    zmq_assert (mechanism != NULL);

    if (mechanism->decode (msg_) == -1)
        return -1;

    if (has_timeout_timer) {
        has_timeout_timer = false;
        cancel_timer (heartbeat_timeout_timer_id);
    }

    if (has_ttl_timer) {
        has_ttl_timer = false;
        cancel_timer (heartbeat_ttl_timer_id);
    }

    //  The message content comes from the peer, so only look at the
    //  first octet when the command actually has a body.
    if ((msg_->flags () & msg_t::command) && msg_->size () > 0) {
        const uint8_t cmd_id = *static_cast<const uint8_t *> (msg_->data ());
        if (cmd_id == 4)
            process_heartbeat_message (msg_);
    }

    if (metadata)
        msg_->set_metadata (metadata);

    if (session->push_msg (msg_) == -1) {
        if (errno == EAGAIN)
            process_msg = &stream_engine_t::push_one_then_decode_and_push;
        return -1;
    }
    return 0;
}

//  Retry handler installed by decode_and_push () after the session
//  refused a message with EAGAIN. It pushes the message as-is (no
//  decoding) and, once that succeeds, restores decode_and_push as the
//  message handler. The session's return code is passed through.
int zmq::stream_engine_t::push_one_then_decode_and_push (msg_t *msg_)
{
    const int result = session->push_msg (msg_);
    if (result != 0)
        return result;

    process_msg = &stream_engine_t::decode_and_push;
    return 0;
}

966
//  Terminates the engine after a failure described by 'reason'.
//
//  Emits the socket events, flushes the session, reports the error to it,
//  unplugs the engine and finally destroys it. The object no longer
//  exists when this function returns, so callers must not touch the
//  engine afterwards.
void zmq::stream_engine_t::error (error_reason_t reason)
{
    const bool notify_raw_peer = options.raw_socket && options.raw_notify;
    if (notify_raw_peer) {
        //  For raw sockets, send a final 0-length message to the
        //  application so that it knows the peer has been disconnected.
        msg_t terminator;
        terminator.init ();
        (this->*process_msg) (&terminator);
        terminator.close ();
    }

    zmq_assert (session);

#ifdef ZMQ_BUILD_DRAFT_API
    //  No mechanism yet, or one that is still handshaking, means the
    //  handshake never completed.
    const bool handshake_incomplete =
        !mechanism || mechanism->status () == mechanism_t::handshaking;
    if (handshake_incomplete)
        socket->event_handshake_failed (endpoint, static_cast<int> (s));
#endif

    socket->event_disconnected (endpoint, static_cast<int> (s));
    session->flush ();
    session->engine_error (reason);
    unplug ();
    delete this;
}

988
void zmq::stream_engine_t::set_handshake_timer ()
989
{
990
    zmq_assert (!has_handshake_timer);
991

992
    if (!options.raw_socket && options.handshake_ivl > 0) {
993 994
        add_timer (options.handshake_ivl, handshake_timer_id);
        has_handshake_timer = true;
995
    }
996 997
}

Thomas Rodgers's avatar
Thomas Rodgers committed
998 999 1000
//  Adds the engine-level properties to 'properties': the peer address
//  under "Peer-Address" and the socket descriptor, as decimal text,
//  under "__fd". Returns false without adding anything when the peer
//  address is empty, true otherwise.
bool zmq::stream_engine_t::init_properties (properties_t & properties)
{
    if (peer_address.empty ())
        return false;

    properties.insert (std::make_pair ("Peer-Address", peer_address));

    //  Private property to support deprecated SRCFD
    std::ostringstream fd_stream;
    fd_stream << static_cast<int> (s);
    properties.insert (std::make_pair ("__fd", fd_stream.str ()));

    return true;
}

1010
void zmq::stream_engine_t::timer_event (int id_)
1011
{
Jonathan Reams's avatar
Jonathan Reams committed
1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040
    if(id_ == handshake_timer_id) {
        has_handshake_timer = false;
        //  handshake timer expired before handshake completed, so engine fail
        error (timeout_error);
    }
    else if(id_ == heartbeat_ivl_timer_id) {
        next_msg = &stream_engine_t::produce_ping_message;
        out_event();
        add_timer(options.heartbeat_interval, heartbeat_ivl_timer_id);
    }
    else if(id_ == heartbeat_ttl_timer_id) {
        has_ttl_timer = false;
        error(timeout_error);
    }
    else if(id_ == heartbeat_timeout_timer_id) {
        has_timeout_timer = false;
        error(timeout_error);
    }
    else
        // There are no other valid timer ids!
        assert(false);
}

//  Builds a heartbeat PING command in 'msg_': the 5 octets "\4PING"
//  followed by options.heartbeat_ttl as a 16-bit value in network byte
//  order. The message is then passed through the mechanism's encode step
//  and next_msg is reset to pull_and_encode. If no timeout timer is
//  pending and a positive heartbeat timeout is set, the timeout timer is
//  armed. Returns the result of the encode step.
int zmq::stream_engine_t::produce_ping_message (msg_t *msg_)
{
    zmq_assert (mechanism != NULL);

    //  16-bit TTL + \4PING == 7
    const size_t name_size = 5;
    const size_t ping_size = 7;

    const int init_rc = msg_->init_size (ping_size);
    errno_assert (init_rc == 0);
    msg_->set_flags (msg_t::command);

    //  Command name first, TTL right behind it.
    uint8_t *const payload = (uint8_t *) msg_->data ();
    memcpy (payload, "\4PING", name_size);

    const uint16_t ttl_network = htons (options.heartbeat_ttl);
    memcpy (payload + name_size, &ttl_network, sizeof (ttl_network));

    const int encode_rc = mechanism->encode (msg_);
    next_msg = &stream_engine_t::pull_and_encode;

    if (!has_timeout_timer && heartbeat_timeout > 0) {
        add_timer (heartbeat_timeout, heartbeat_timeout_timer_id);
        has_timeout_timer = true;
    }
    return encode_rc;
}

//  Builds a heartbeat PONG command in 'msg_' (the 5 octets "\4PONG"),
//  passes it through the mechanism's encode step and resets next_msg to
//  pull_and_encode. Returns the result of the encode step.
int zmq::stream_engine_t::produce_pong_message (msg_t *msg_)
{
    zmq_assert (mechanism != NULL);

    const size_t pong_size = 5;

    const int init_rc = msg_->init_size (pong_size);
    errno_assert (init_rc == 0);
    msg_->set_flags (msg_t::command);

    memcpy (msg_->data (), "\4PONG", pong_size);

    const int encode_rc = mechanism->encode (msg_);
    next_msg = &stream_engine_t::pull_and_encode;
    return encode_rc;
}

//  Reacts to a heartbeat command received from the peer.
//
//  Only PING is acted upon: the TTL that follows the command name is
//  used to arm the TTL timer (unless one is already pending or the TTL
//  is zero), and a PONG is scheduled as the next outgoing message.
//  Any other message is ignored. Always returns 0.
//
//  The message content comes from the peer, so its length is checked
//  before anything is read from it. A PING too short to contain the
//  TTL field is treated as carrying no TTL and is still answered.
int zmq::stream_engine_t::process_heartbeat_message (msg_t *msg_)
{
    //  "\4PING" occupies 5 octets and is followed by a 16-bit TTL.
    const size_t ping_name_size = 5;
    const size_t ping_ttl_size = sizeof (uint16_t);

    if (msg_->size () < ping_name_size
        || memcmp (msg_->data (), "\4PING", ping_name_size) != 0)
        return 0;

    //  The remote heartbeat TTL is in 10ths of a second, so it is
    //  multiplied by 100 to get the timer interval in ms. The product
    //  is computed in an int: the largest TTL (65535) gives 6553500,
    //  which does not fit into 16 bits.
    int remote_ttl_ms = 0;
    if (msg_->size () >= ping_name_size + ping_ttl_size) {
        uint16_t remote_ttl;
        memcpy (&remote_ttl,
                static_cast<const uint8_t *> (msg_->data ()) + ping_name_size,
                ping_ttl_size);
        remote_ttl_ms = static_cast<int> (ntohs (remote_ttl)) * 100;
    }

    if (!has_ttl_timer && remote_ttl_ms > 0) {
        add_timer (remote_ttl_ms, heartbeat_ttl_timer_id);
        has_ttl_timer = true;
    }

    //  Answer the PING: make PONG the next outgoing message and
    //  trigger output.
    next_msg = &stream_engine_t::produce_pong_message;
    out_event ();

    return 0;
}