stream_engine.cpp 37.8 KB
Newer Older
1
/*
2
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3

4
    This file is part of libzmq, the ZeroMQ core engine in C++.
5

6 7 8
    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
9 10
    (at your option) any later version.

11 12 13 14 15 16 17 18 19 20 21 22 23 24
    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.
25 26 27 28 29

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include "precompiled.hpp"
31
#include "macros.hpp"
32

33
#include <limits.h>
34
#include <string.h>
35 36

#ifndef ZMQ_HAVE_WINDOWS
37
#include <unistd.h>
38 39
#endif

40
#include <new>
41
#include <sstream>
42

43
#include "stream_engine.hpp"
44
#include "io_thread.hpp"
45
#include "session_base.hpp"
46 47
#include "v1_encoder.hpp"
#include "v1_decoder.hpp"
48 49
#include "v2_encoder.hpp"
#include "v2_decoder.hpp"
50
#include "null_mechanism.hpp"
51 52
#include "plain_client.hpp"
#include "plain_server.hpp"
53 54
#include "gssapi_client.hpp"
#include "gssapi_server.hpp"
55 56
#include "curve_client.hpp"
#include "curve_server.hpp"
57 58
#include "raw_decoder.hpp"
#include "raw_encoder.hpp"
59 60
#include "config.hpp"
#include "err.hpp"
61
#include "ip.hpp"
62
#include "tcp.hpp"
Martin Hurton's avatar
Martin Hurton committed
63 64
#include "likely.hpp"
#include "wire.hpp"
65

66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102
//  Builds a textual description of the peer connected on socket 's_'.
//
//  The base string is whatever zmq::get_peer_ip_address stores into it; if
//  that call reports family 0 the result is cleared and returned empty.
//  For PF_UNIX peers, and only where the platform exposes peer credentials,
//  a suffix is appended:
//    - SO_PEERCRED:    ":<uid>:<gid>:<pid>"
//    - LOCAL_PEERCRED: ":<uid>:<first group, if any>:"
//  If the credential lookup fails, the address is returned without suffix.
static std::string get_peer_address (zmq::fd_t s_)
{
    std::string peer_address;

    const int family = zmq::get_peer_ip_address (s_, peer_address);
    if (family == 0)
        peer_address.clear ();
#if defined ZMQ_HAVE_SO_PEERCRED
    else if (family == PF_UNIX) {
        struct ucred cred;
        socklen_t size = sizeof (cred);
        if (!getsockopt (s_, SOL_SOCKET, SO_PEERCRED, &cred, &size)) {
            std::ostringstream buf;
            buf << ":" << cred.uid << ":" << cred.gid << ":" << cred.pid;
            peer_address += buf.str ();
        }
    }
#elif defined ZMQ_HAVE_LOCAL_PEERCRED
    else if (family == PF_UNIX) {
        struct xucred cred;
        socklen_t size = sizeof (cred);
        //  This is a free function, so the socket and the result string are
        //  the 's_' parameter and the 'peer_address' local, not the engine's
        //  '_s' / '_peer_address' members.
        if (!getsockopt (s_, 0, LOCAL_PEERCRED, &cred, &size)
            && cred.cr_version == XUCRED_VERSION) {
            std::ostringstream buf;
            buf << ":" << cred.cr_uid << ":";
            if (cred.cr_ngroups > 0)
                buf << cred.cr_groups[0];
            buf << ":";
            peer_address += buf.str ();
        }
    }
#endif

    return peer_address;
}


103 104 105 106
//  Creates an engine on top of the already connected socket 'fd_'.
//  The engine starts unplugged and in the handshaking state, expecting a
//  greeting of v2 size; the first outgoing and incoming messages are
//  handled by the routing id routines.
zmq::stream_engine_t::stream_engine_t (
  fd_t fd_,
  const options_t &options_,
  const endpoint_uri_pair_t &endpoint_uri_pair_) :
    _s (fd_),
    _handle (static_cast<handle_t> (NULL)),
    _inpos (NULL),
    _insize (0),
    _decoder (NULL),
    _outpos (NULL),
    _outsize (0),
    _encoder (NULL),
    _metadata (NULL),
    _handshaking (true),
    _greeting_size (v2_greeting_size),
    _greeting_bytes_read (0),
    _session (NULL),
    _options (options_),
    _endpoint_uri_pair (endpoint_uri_pair_),
    _plugged (false),
    _next_msg (&stream_engine_t::routing_id_msg),
    _process_msg (&stream_engine_t::process_routing_id_msg),
    _io_error (false),
    _subscription_required (false),
    _mechanism (NULL),
    _input_stopped (false),
    _output_stopped (false),
    _has_handshake_timer (false),
    _has_ttl_timer (false),
    _has_timeout_timer (false),
    _has_heartbeat_timer (false),
    _heartbeat_timeout (0),
    _socket (NULL),
    _peer_address (get_peer_address (_s))
{
    //  Initialise the outgoing message and the pong message.
    const int tx_rc = _tx_msg.init ();
    errno_assert (tx_rc == 0);
    const int pong_rc = _pong_msg.init ();
    errno_assert (pong_rc == 0);

    //  The engine only ever works with a non-blocking socket.
    unblock_socket (_s);

    //  When heartbeating is enabled, a timeout option of -1 means
    //  "use the heartbeat interval as the timeout".
    if (_options.heartbeat_interval > 0) {
        _heartbeat_timeout = _options.heartbeat_timeout == -1
                               ? _options.heartbeat_interval
                               : _options.heartbeat_timeout;
    }
}

154
//  Destroys the engine. The engine must have been unplugged beforehand.
//  Closes the socket if it is still owned, closes the outgoing message,
//  releases the metadata reference and deletes encoder, decoder and
//  mechanism.
zmq::stream_engine_t::~stream_engine_t ()
{
    zmq_assert (!_plugged);

    if (_s != retired_fd) {
#ifdef ZMQ_HAVE_WINDOWS
        const int close_rc = closesocket (_s);
        wsa_assert (close_rc != SOCKET_ERROR);
#else
        int close_rc = close (_s);
#if defined(__FreeBSD_kernel__) || defined(__FreeBSD__)
        //  On FreeBSD close() may report ECONNRESET under load; that is
        //  not treated as a failure.
        if (close_rc == -1 && errno == ECONNRESET)
            close_rc = 0;
#endif
        errno_assert (close_rc == 0);
#endif
        _s = retired_fd;
    }

    const int msg_rc = _tx_msg.close ();
    errno_assert (msg_rc == 0);

    //  Give up our metadata reference; whoever drops the last
    //  reference is responsible for destroying the object.
    if (_metadata != NULL && _metadata->drop_ref ()) {
        LIBZMQ_DELETE (_metadata);
    }

    LIBZMQ_DELETE (_encoder);
    LIBZMQ_DELETE (_decoder);
    LIBZMQ_DELETE (_mechanism);
}

191
void zmq::stream_engine_t::plug (io_thread_t *io_thread_,
192
                                 session_base_t *session_)
193
{
194 195
    zmq_assert (!_plugged);
    _plugged = true;
196 197

    //  Connect to session object.
198
    zmq_assert (!_session);
199
    zmq_assert (session_);
200 201
    _session = session_;
    _socket = _session->get_socket ();
202 203 204

    //  Connect to I/O threads poller object.
    io_object_t::plug (io_thread_);
205 206
    _handle = add_fd (_s);
    _io_error = false;
Martin Hurton's avatar
Martin Hurton committed
207

208
    if (_options.raw_socket) {
209
        // no handshaking for raw sock, instantiate raw encoder and decoders
210 211
        _encoder = new (std::nothrow) raw_encoder_t (out_batch_size);
        alloc_assert (_encoder);
212

213 214
        _decoder = new (std::nothrow) raw_decoder_t (in_batch_size);
        alloc_assert (_decoder);
215 216

        // disable handshaking for raw socket
217
        _handshaking = false;
218

219 220
        _next_msg = &stream_engine_t::pull_msg_from_session;
        _process_msg = &stream_engine_t::push_raw_msg_to_session;
221

Thomas Rodgers's avatar
Thomas Rodgers committed
222
        properties_t properties;
223
        if (init_properties (properties)) {
224
            //  Compile metadata.
225 226 227
            zmq_assert (_metadata == NULL);
            _metadata = new (std::nothrow) metadata_t (properties);
            alloc_assert (_metadata);
228
        }
229

230
        if (_options.raw_notify) {
231 232 233
            //  For raw sockets, send an initial 0-length message to the
            // application so that it knows a peer has connected.
            msg_t connector;
234
            connector.init ();
235
            push_raw_msg_to_session (&connector);
236
            connector.close ();
237
            _session->flush ();
238
        }
239
    } else {
240 241 242
        // start optional timer, to prevent handshake hanging on no input
        set_handshake_timer ();

243
        //  Send the 'length' and 'flags' fields of the routing id message.
244
        //  The 'length' field is encoded in the long format.
245
        _outpos = _greeting_send;
246
        _outpos[_outsize++] = UCHAR_MAX;
247 248 249
        put_uint64 (&_outpos[_outsize], _options.routing_id_size + 1);
        _outsize += 8;
        _outpos[_outsize++] = 0x7f;
250
    }
Martin Hurton's avatar
Martin Hurton committed
251

252 253
    set_pollin (_handle);
    set_pollout (_handle);
254 255 256 257
    //  Flush all the data that may have been already received downstream.
    in_event ();
}

258
void zmq::stream_engine_t::unplug ()
259
{
260 261
    zmq_assert (_plugged);
    _plugged = false;
262

263
    //  Cancel all timers.
264
    if (_has_handshake_timer) {
265
        cancel_timer (handshake_timer_id);
266
        _has_handshake_timer = false;
267 268
    }

269
    if (_has_ttl_timer) {
Jonathan Reams's avatar
Jonathan Reams committed
270
        cancel_timer (heartbeat_ttl_timer_id);
271
        _has_ttl_timer = false;
Jonathan Reams's avatar
Jonathan Reams committed
272 273
    }

274
    if (_has_timeout_timer) {
Jonathan Reams's avatar
Jonathan Reams committed
275
        cancel_timer (heartbeat_timeout_timer_id);
276
        _has_timeout_timer = false;
Jonathan Reams's avatar
Jonathan Reams committed
277 278
    }

279
    if (_has_heartbeat_timer) {
Jonathan Reams's avatar
Jonathan Reams committed
280
        cancel_timer (heartbeat_ivl_timer_id);
281
        _has_heartbeat_timer = false;
Jonathan Reams's avatar
Jonathan Reams committed
282
    }
283
    //  Cancel all fd subscriptions.
284 285
    if (!_io_error)
        rm_fd (_handle);
286 287 288 289

    //  Disconnect from I/O threads poller object.
    io_object_t::unplug ();

290
    _session = NULL;
291 292
}

293
void zmq::stream_engine_t::terminate ()
294 295 296 297 298
{
    unplug ();
    delete this;
}

299
void zmq::stream_engine_t::in_event ()
300
{
301
    zmq_assert (!_io_error);
302

Pieter Hintjens's avatar
Pieter Hintjens committed
303
    //  If still handshaking, receive and process the greeting message.
304
    if (unlikely (_handshaking))
Martin Hurton's avatar
Martin Hurton committed
305 306 307
        if (!handshake ())
            return;

308
    zmq_assert (_decoder);
309 310

    //  If there has been an I/O error, stop polling.
311 312 313
    if (_input_stopped) {
        rm_fd (_handle);
        _io_error = true;
314 315
        return;
    }
316

317
    //  If there's no data to process in the buffer...
318
    if (!_insize) {
319 320 321 322
        //  Retrieve the buffer and read as much data as possible.
        //  Note that buffer can be arbitrarily large. However, we assume
        //  the underlying TCP layer has fixed buffer size and thus the
        //  number of bytes read will be always limited.
323
        size_t bufsize = 0;
324
        _decoder->get_buffer (&_inpos, &bufsize);
325

326
        const int rc = tcp_read (_s, _inpos, bufsize);
327

328
        if (rc == 0) {
329 330
            // connection closed by peer
            errno = EPIPE;
331
            error (connection_error);
332
            return;
333
        }
334 335
        if (rc == -1) {
            if (errno != EAGAIN)
336
                error (connection_error);
337 338
            return;
        }
339

340
        //  Adjust input size
341
        _insize = static_cast<size_t> (rc);
342
        // Adjust buffer size to received bytes
343
        _decoder->resize_buffer (_insize);
Martin Hurton's avatar
Martin Hurton committed
344
    }
345

346 347
    int rc = 0;
    size_t processed = 0;
348

349 350 351 352 353
    while (_insize > 0) {
        rc = _decoder->decode (_inpos, _insize, processed);
        zmq_assert (processed <= _insize);
        _inpos += processed;
        _insize -= processed;
354 355
        if (rc == 0 || rc == -1)
            break;
356
        rc = (this->*_process_msg) (_decoder->msg ());
357 358
        if (rc == -1)
            break;
359 360
    }

361 362 363 364
    //  Tear down the connection if we have failed to decode input data
    //  or the session has rejected the message.
    if (rc == -1) {
        if (errno != EAGAIN) {
365
            error (protocol_error);
366 367
            return;
        }
368 369
        _input_stopped = true;
        reset_pollin (_handle);
Martin Hurton's avatar
Martin Hurton committed
370
    }
371

372
    _session->flush ();
373 374
}

375
void zmq::stream_engine_t::out_event ()
376
{
377
    zmq_assert (!_io_error);
378

379
    //  If write buffer is empty, try to read new data from the encoder.
380
    if (!_outsize) {
Martin Hurton's avatar
Martin Hurton committed
381 382 383
        //  Even when we stop polling as soon as there is no
        //  data to send, the poller may invoke out_event one
        //  more time due to 'speculative write' optimisation.
384 385
        if (unlikely (_encoder == NULL)) {
            zmq_assert (_handshaking);
Martin Hurton's avatar
Martin Hurton committed
386 387 388
            return;
        }

389 390
        _outpos = NULL;
        _outsize = _encoder->encode (&_outpos, 0);
391

392 393
        while (_outsize < static_cast<size_t> (out_batch_size)) {
            if ((this->*_next_msg) (&_tx_msg) == -1)
394
                break;
395 396 397
            _encoder->load_msg (&_tx_msg);
            unsigned char *bufptr = _outpos + _outsize;
            size_t n = _encoder->encode (&bufptr, out_batch_size - _outsize);
398
            zmq_assert (n > 0);
399 400 401
            if (_outpos == NULL)
                _outpos = bufptr;
            _outsize += n;
402
        }
403 404

        //  If there is no data to send, stop polling for output.
405 406 407
        if (_outsize == 0) {
            _output_stopped = true;
            reset_pollout (_handle);
408 409 410 411 412 413
            return;
        }
    }

    //  If there are any data to write in write buffer, write as much as
    //  possible to the socket. Note that amount of data to write can be
414
    //  arbitrarily large. However, we assume that underlying TCP layer has
415 416
    //  limited transmission buffer and thus the actual number of bytes
    //  written should be reasonably modest.
417
    const int nbytes = tcp_write (_s, _outpos, _outsize);
418

Martin Hurton's avatar
Martin Hurton committed
419 420
    //  IO error has occurred. We stop waiting for output events.
    //  The engine is not terminated until we detect input error;
421
    //  this is necessary to prevent losing incoming messages.
422
    if (nbytes == -1) {
423
        reset_pollout (_handle);
424 425 426
        return;
    }

427 428
    _outpos += nbytes;
    _outsize -= nbytes;
Martin Hurton's avatar
Martin Hurton committed
429 430 431

    //  If we are still handshaking and there are no data
    //  to send, stop polling for output.
432 433 434
    if (unlikely (_handshaking))
        if (_outsize == 0)
            reset_pollout (_handle);
435 436
}

437
void zmq::stream_engine_t::restart_output ()
438
{
439
    if (unlikely (_io_error))
440 441
        return;

442 443 444
    if (likely (_output_stopped)) {
        set_pollout (_handle);
        _output_stopped = false;
445
    }
446 447 448 449 450 451 452 453

    //  Speculative write: The assumption is that at the moment new message
    //  was sent by the user the socket is probably available for writing.
    //  Thus we try to write the data to socket avoiding polling for POLLOUT.
    //  Consequently, the latency should be better in request/reply scenarios.
    out_event ();
}

454
bool zmq::stream_engine_t::restart_input ()
455
{
456 457 458
    zmq_assert (_input_stopped);
    zmq_assert (_session != NULL);
    zmq_assert (_decoder != NULL);
459

460
    int rc = (this->*_process_msg) (_decoder->msg ());
461 462
    if (rc == -1) {
        if (errno == EAGAIN)
463
            _session->flush ();
464
        else {
465
            error (protocol_error);
466 467 468
            return false;
        }
        return true;
Martin Hurton's avatar
Martin Hurton committed
469 470
    }

471
    while (_insize > 0) {
472
        size_t processed = 0;
473 474 475 476
        rc = _decoder->decode (_inpos, _insize, processed);
        zmq_assert (processed <= _insize);
        _inpos += processed;
        _insize -= processed;
477 478
        if (rc == 0 || rc == -1)
            break;
479
        rc = (this->*_process_msg) (_decoder->msg ());
480 481 482
        if (rc == -1)
            break;
    }
483

484
    if (rc == -1 && errno == EAGAIN)
485
        _session->flush ();
486
    else if (_io_error) {
487
        error (connection_error);
488 489
        return false;
    } else if (rc == -1) {
490
        error (protocol_error);
491 492 493
        return false;
    }

494
    else {
495 496 497
        _input_stopped = false;
        set_pollin (_handle);
        _session->flush ();
498 499 500 501

        //  Speculative read.
        in_event ();
    }
502 503

    return true;
504 505
}

506 507 508
//  Position of the revision field in the greeting: the zero-based index
//  into the received greeting buffer of the byte that holds the peer's
//  protocol revision. That byte is what the handshake code inspects to
//  choose between the ZMTP 1.0, 2.0 and 3.0 handshake routines.
const size_t revision_pos = 10;

509
bool zmq::stream_engine_t::handshake ()
Martin Hurton's avatar
Martin Hurton committed
510
{
511 512
    zmq_assert (_handshaking);
    zmq_assert (_greeting_bytes_read < _greeting_size);
513
    //  Receive the greeting.
514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542
    const int rc = receive_greeting ();
    if (rc == -1)
        return false;
    const bool unversioned = rc != 0;

    if (!(this
            ->*select_handshake_fun (unversioned,
                                     _greeting_recv[revision_pos])) ())
        return false;

    // Start polling for output if necessary.
    if (_outsize == 0)
        set_pollout (_handle);

    //  Handshaking was successful.
    //  Switch into the normal message flow.
    _handshaking = false;

    if (_has_handshake_timer) {
        cancel_timer (handshake_timer_id);
        _has_handshake_timer = false;
    }

    return true;
}

int zmq::stream_engine_t::receive_greeting ()
{
    bool unversioned = false;
543 544 545
    while (_greeting_bytes_read < _greeting_size) {
        const int n = tcp_read (_s, _greeting_recv + _greeting_bytes_read,
                                _greeting_size - _greeting_bytes_read);
546
        if (n == 0) {
547
            errno = EPIPE;
548
            error (connection_error);
549
            return -1;
550
        }
551 552
        if (n == -1) {
            if (errno != EAGAIN)
553
                error (connection_error);
554
            return -1;
555
        }
Martin Hurton's avatar
Martin Hurton committed
556

557
        _greeting_bytes_read += n;
Martin Hurton's avatar
Martin Hurton committed
558

559 560 561
        //  We have received at least one byte from the peer.
        //  If the first byte is not 0xff, we know that the
        //  peer is using unversioned protocol.
562 563
        if (_greeting_recv[0] != 0xff) {
            unversioned = true;
564
            break;
565
        }
Martin Hurton's avatar
Martin Hurton committed
566

567
        if (_greeting_bytes_read < signature_size)
568
            continue;
Martin Hurton's avatar
Martin Hurton committed
569

570 571
        //  Inspect the right-most bit of the 10th byte (which coincides
        //  with the 'flags' field if a regular message was sent).
572
        //  Zero indicates this is a header of a routing id message
573
        //  (i.e. the peer is using the unversioned protocol).
574 575
        if (!(_greeting_recv[9] & 0x01)) {
            unversioned = true;
576
            break;
577
        }
Martin Hurton's avatar
Martin Hurton committed
578

579
        //  The peer is using versioned protocol.
580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595
        receive_greeting_versioned ();
    }
    return unversioned ? 1 : 0;
}

void zmq::stream_engine_t::receive_greeting_versioned ()
{
    //  Send the major version number.
    if (_outpos + _outsize == _greeting_send + signature_size) {
        if (_outsize == 0)
            set_pollout (_handle);
        _outpos[_outsize++] = 3; //  Major version number
    }

    if (_greeting_bytes_read > signature_size) {
        if (_outpos + _outsize == _greeting_send + signature_size + 1) {
596 597
            if (_outsize == 0)
                set_pollout (_handle);
598

599
            //  Use ZMTP/2.0 to talk to older peers.
600 601
            if (_greeting_recv[revision_pos] == ZMTP_1_0
                || _greeting_recv[revision_pos] == ZMTP_2_0)
602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623
                _outpos[_outsize++] = _options.type;
            else {
                _outpos[_outsize++] = 0; //  Minor version number
                memset (_outpos + _outsize, 0, 20);

                zmq_assert (_options.mechanism == ZMQ_NULL
                            || _options.mechanism == ZMQ_PLAIN
                            || _options.mechanism == ZMQ_CURVE
                            || _options.mechanism == ZMQ_GSSAPI);

                if (_options.mechanism == ZMQ_NULL)
                    memcpy (_outpos + _outsize, "NULL", 4);
                else if (_options.mechanism == ZMQ_PLAIN)
                    memcpy (_outpos + _outsize, "PLAIN", 5);
                else if (_options.mechanism == ZMQ_GSSAPI)
                    memcpy (_outpos + _outsize, "GSSAPI", 6);
                else if (_options.mechanism == ZMQ_CURVE)
                    memcpy (_outpos + _outsize, "CURVE", 5);
                _outsize += 20;
                memset (_outpos + _outsize, 0, 32);
                _outsize += 32;
                _greeting_size = v3_greeting_size;
624
            }
Martin Hurton's avatar
Martin Hurton committed
625 626
        }
    }
627
}
Martin Hurton's avatar
Martin Hurton committed
628

629 630 631 632
//  Maps what was learned from the peer's greeting to the handshake
//  routine to run: unversioned peers get the unversioned ZMTP/1.0
//  routine, revisions ZMTP_1_0 and ZMTP_2_0 get their own routines, and
//  every other revision is handled by the v3.0 routine.
zmq::stream_engine_t::handshake_fun_t
zmq::stream_engine_t::select_handshake_fun (bool unversioned,
                                            unsigned char revision)
{
    //  Is the peer using ZMTP/1.0 with no revision number?
    if (unversioned)
        return &stream_engine_t::handshake_v1_0_unversioned;

    if (revision == ZMTP_1_0)
        return &stream_engine_t::handshake_v1_0;

    if (revision == ZMTP_2_0)
        return &stream_engine_t::handshake_v2_0;

    return &stream_engine_t::handshake_v3_0;
}
646

647 648 649 650 651 652 653 654
bool zmq::stream_engine_t::handshake_v1_0_unversioned ()
{
    //  We send and receive rest of routing id message
    if (_session->zap_enabled ()) {
        // reject ZMTP 1.0 connections if ZAP is enabled
        error (protocol_error);
        return false;
    }
655

656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695
    _encoder = new (std::nothrow) v1_encoder_t (out_batch_size);
    alloc_assert (_encoder);

    _decoder =
      new (std::nothrow) v1_decoder_t (in_batch_size, _options.maxmsgsize);
    alloc_assert (_decoder);

    //  We have already sent the message header.
    //  Since there is no way to tell the encoder to
    //  skip the message header, we simply throw that
    //  header data away.
    const size_t header_size =
      _options.routing_id_size + 1 >= UCHAR_MAX ? 10 : 2;
    unsigned char tmp[10], *bufferp = tmp;

    //  Prepare the routing id message and load it into encoder.
    //  Then consume bytes we have already sent to the peer.
    const int rc = _tx_msg.init_size (_options.routing_id_size);
    zmq_assert (rc == 0);
    memcpy (_tx_msg.data (), _options.routing_id, _options.routing_id_size);
    _encoder->load_msg (&_tx_msg);
    const size_t buffer_size = _encoder->encode (&bufferp, header_size);
    zmq_assert (buffer_size == header_size);

    //  Make sure the decoder sees the data we have already received.
    _inpos = _greeting_recv;
    _insize = _greeting_bytes_read;

    //  To allow for interoperability with peers that do not forward
    //  their subscriptions, we inject a phantom subscription message
    //  message into the incoming message stream.
    if (_options.type == ZMQ_PUB || _options.type == ZMQ_XPUB)
        _subscription_required = true;

    //  We are sending our routing id now and the next message
    //  will come from the socket.
    _next_msg = &stream_engine_t::pull_msg_from_session;

    //  We are expecting routing id message.
    _process_msg = &stream_engine_t::process_routing_id_msg;
696

697 698
    return true;
}
Martin Hurton's avatar
Martin Hurton committed
699

700 701 702 703 704 705 706
bool zmq::stream_engine_t::handshake_v1_0 ()
{
    if (_session->zap_enabled ()) {
        // reject ZMTP 1.0 connections if ZAP is enabled
        error (protocol_error);
        return false;
    }
707

708 709
    _encoder = new (std::nothrow) v1_encoder_t (out_batch_size);
    alloc_assert (_encoder);
710

711 712 713
    _decoder =
      new (std::nothrow) v1_decoder_t (in_batch_size, _options.maxmsgsize);
    alloc_assert (_decoder);
714

715 716
    return true;
}
717

718 719 720 721 722 723 724
bool zmq::stream_engine_t::handshake_v2_0 ()
{
    if (_session->zap_enabled ()) {
        // reject ZMTP 2.0 connections if ZAP is enabled
        error (protocol_error);
        return false;
    }
725

726 727 728 729 730 731
    _encoder = new (std::nothrow) v2_encoder_t (out_batch_size);
    alloc_assert (_encoder);

    _decoder = new (std::nothrow)
      v2_decoder_t (in_batch_size, _options.maxmsgsize, _options.zero_copy);
    alloc_assert (_decoder);
732

733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756
    return true;
}

bool zmq::stream_engine_t::handshake_v3_0 ()
{
    _encoder = new (std::nothrow) v2_encoder_t (out_batch_size);
    alloc_assert (_encoder);

    _decoder = new (std::nothrow)
      v2_decoder_t (in_batch_size, _options.maxmsgsize, _options.zero_copy);
    alloc_assert (_decoder);

    if (_options.mechanism == ZMQ_NULL
        && memcmp (_greeting_recv + 12, "NULL\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0",
                   20)
             == 0) {
        _mechanism = new (std::nothrow)
          null_mechanism_t (_session, _peer_address, _options);
        alloc_assert (_mechanism);
    } else if (_options.mechanism == ZMQ_PLAIN
               && memcmp (_greeting_recv + 12,
                          "PLAIN\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20)
                    == 0) {
        if (_options.as_server)
757
            _mechanism = new (std::nothrow)
758 759 760 761 762
              plain_server_t (_session, _peer_address, _options);
        else
            _mechanism = new (std::nothrow) plain_client_t (_session, _options);
        alloc_assert (_mechanism);
    }
763
#ifdef ZMQ_HAVE_CURVE
764 765 766 767 768 769 770 771 772 773 774
    else if (_options.mechanism == ZMQ_CURVE
             && memcmp (_greeting_recv + 12,
                        "CURVE\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20)
                  == 0) {
        if (_options.as_server)
            _mechanism = new (std::nothrow)
              curve_server_t (_session, _peer_address, _options);
        else
            _mechanism = new (std::nothrow) curve_client_t (_session, _options);
        alloc_assert (_mechanism);
    }
775
#endif
Chris Laws's avatar
Chris Laws committed
776
#ifdef HAVE_LIBGSSAPI_KRB5
777 778 779 780 781 782 783 784 785 786 787
    else if (_options.mechanism == ZMQ_GSSAPI
             && memcmp (_greeting_recv + 12,
                        "GSSAPI\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20)
                  == 0) {
        if (_options.as_server)
            _mechanism = new (std::nothrow)
              gssapi_server_t (_session, _peer_address, _options);
        else
            _mechanism =
              new (std::nothrow) gssapi_client_t (_session, _options);
        alloc_assert (_mechanism);
788
    }
789 790 791 792 793 794 795
#endif
    else {
        _session->get_socket ()->event_handshake_failed_protocol (
          _session->get_endpoint (),
          ZMQ_PROTOCOL_ERROR_ZMTP_MECHANISM_MISMATCH);
        error (protocol_error);
        return false;
796
    }
797 798
    _next_msg = &stream_engine_t::next_handshake_command;
    _process_msg = &stream_engine_t::process_handshake_command;
799

Martin Hurton's avatar
Martin Hurton committed
800 801 802
    return true;
}

803
//  Produces the outgoing routing id message: 'msg_' is initialised to the
//  configured routing id size and filled with the routing id. Subsequent
//  outgoing messages are pulled from the session. Always returns 0.
int zmq::stream_engine_t::routing_id_msg (msg_t *msg_)
{
    const int rc = msg_->init_size (_options.routing_id_size);
    errno_assert (rc == 0);

    //  Nothing to copy for an empty routing id.
    if (_options.routing_id_size > 0) {
        memcpy (msg_->data (), _options.routing_id, _options.routing_id_size);
    }

    _next_msg = &stream_engine_t::pull_msg_from_session;
    return 0;
}
812

813
//  Handles the peer's routing id message. It is either forwarded to the
//  session flagged as a routing id, or discarded, depending on the
//  recv_routing_id option. If a subscription is required, a one-byte
//  subscription message is pushed to the session as well. Subsequent
//  incoming messages go straight to the session. Always returns 0.
int zmq::stream_engine_t::process_routing_id_msg (msg_t *msg_)
{
    if (!_options.recv_routing_id) {
        //  Not wanted: drop the content and leave 'msg_' freshly
        //  initialised.
        int rc = msg_->close ();
        errno_assert (rc == 0);
        rc = msg_->init ();
        errno_assert (rc == 0);
    } else {
        msg_->set_flags (msg_t::routing_id);
        const int rc = _session->push_msg (msg_);
        errno_assert (rc == 0);
    }

    if (_subscription_required) {
        //  Inject the subscription message, so that also
        //  ZMQ 2.x peers receive published messages.
        msg_t subscribe_msg;
        int rc = subscribe_msg.init_size (1);
        errno_assert (rc == 0);
        *static_cast<unsigned char *> (subscribe_msg.data ()) = 1;
        rc = _session->push_msg (&subscribe_msg);
        errno_assert (rc == 0);
    }

    _process_msg = &stream_engine_t::push_msg_to_session;

    return 0;
}
842

843
//  Supplies the next outgoing message while the security handshake is in
//  progress. Once the mechanism is ready the engine finishes the
//  handshake and returns the result of pull_and_encode; if the mechanism
//  is in the error state, errno is set to EPROTO and -1 is returned;
//  otherwise the mechanism's next handshake command is returned, flagged
//  as a command on success.
int zmq::stream_engine_t::next_handshake_command (msg_t *msg_)
{
    zmq_assert (_mechanism != NULL);

    if (_mechanism->status () == mechanism_t::ready) {
        mechanism_ready ();
        return pull_and_encode (msg_);
    }

    if (_mechanism->status () == mechanism_t::error) {
        errno = EPROTO;
        return -1;
    }

    const int rc = _mechanism->next_handshake_command (msg_);
    if (rc == 0)
        msg_->set_flags (msg_t::command);

    return rc;
}

864
//  Feeds an incoming handshake command to the mechanism. Returns the
//  mechanism's result unchanged unless the command was accepted but left
//  the mechanism in the error state, in which case errno is set to EPROTO
//  and -1 is returned. After a successfully processed command, output is
//  restarted if it was stopped.
int zmq::stream_engine_t::process_handshake_command (msg_t *msg_)
{
    zmq_assert (_mechanism != NULL);

    const int rc = _mechanism->process_handshake_command (msg_);
    if (rc != 0)
        return rc;

    if (_mechanism->status () == mechanism_t::ready) {
        mechanism_ready ();
    } else if (_mechanism->status () == mechanism_t::error) {
        errno = EPROTO;
        return -1;
    }

    if (_output_stopped)
        restart_output ();

    return rc;
}

882 883
void zmq::stream_engine_t::zap_msg_available ()
{
884
    zmq_assert (_mechanism != NULL);
885

886
    const int rc = _mechanism->zap_msg_available ();
887
    if (rc == -1) {
888
        error (protocol_error);
889 890
        return;
    }
891
    if (_input_stopped)
892 893
        if (!restart_input ())
            return;
894
    if (_output_stopped)
895
        restart_output ();
896 897
}

898
//  Read-only accessor for the endpoint URI pair held by this engine.
const zmq::endpoint_uri_pair_t &zmq::stream_engine_t::get_endpoint () const
{
    return _endpoint_uri_pair;
}

903
void zmq::stream_engine_t::mechanism_ready ()
904
{
905 906 907
    if (_options.heartbeat_interval > 0) {
        add_timer (_options.heartbeat_interval, heartbeat_ivl_timer_id);
        _has_heartbeat_timer = true;
908 909
    }

910 911
    bool flush_session = false;

912
    if (_options.recv_routing_id) {
913
        msg_t routing_id;
914 915
        _mechanism->peer_routing_id (&routing_id);
        const int rc = _session->push_msg (&routing_id);
916 917 918
        if (rc == -1 && errno == EAGAIN) {
            // If the write is failing at this stage with
            // an EAGAIN the pipe must be being shut down,
919
            // so we can just bail out of the routing id set.
920 921
            return;
        }
922
        errno_assert (rc == 0);
923 924 925 926 927 928 929 930 931 932 933 934 935 936 937
        flush_session = true;
    }

    if (_options.router_notify & ZMQ_NOTIFY_CONNECT) {
        msg_t connect_notification;
        connect_notification.init ();
        const int rc = _session->push_msg (&connect_notification);
        if (rc == -1 && errno == EAGAIN) {
            // If the write is failing at this stage with
            // an EAGAIN the pipe must be being shut down,
            // so we can just bail out of the notification.
            return;
        }
        errno_assert (rc == 0);
        flush_session = true;
938
    }
939

940 941 942
    if (flush_session)
        _session->flush ();

943 944
    _next_msg = &stream_engine_t::pull_and_encode;
    _process_msg = &stream_engine_t::write_credential;
945 946 947

    //  Compile metadata.
    properties_t properties;
948
    init_properties (properties);
949

Pieter Hintjens's avatar
Pieter Hintjens committed
950
    //  Add ZAP properties.
951
    const properties_t &zap_properties = _mechanism->get_zap_properties ();
952
    properties.insert (zap_properties.begin (), zap_properties.end ());
Pieter Hintjens's avatar
Pieter Hintjens committed
953

954
    //  Add ZMTP properties.
955
    const properties_t &zmtp_properties = _mechanism->get_zmtp_properties ();
956
    properties.insert (zmtp_properties.begin (), zmtp_properties.end ());
957

958
    zmq_assert (_metadata == NULL);
959
    if (!properties.empty ()) {
960 961
        _metadata = new (std::nothrow) metadata_t (properties);
        alloc_assert (_metadata);
962
    }
Vincent Tellier's avatar
Vincent Tellier committed
963

964
    _socket->event_handshake_succeeded (_endpoint_uri_pair, 0);
965 966
}

967
//  Fetches the next outgoing message from the session without any
//  additional processing; the session's result is returned unchanged.
int zmq::stream_engine_t::pull_msg_from_session (msg_t *msg_)
{
    const int result = _session->pull_msg (msg_);
    return result;
}
971

972 973
//  Passes an incoming message to the session without any additional
//  processing; the session's result is returned unchanged.
int zmq::stream_engine_t::push_msg_to_session (msg_t *msg_)
{
    const int result = _session->push_msg (msg_);
    return result;
}

977 978
//  Attaches the engine's metadata to the message (unless there is none or
//  the message already carries it) and then pushes it to the session.
int zmq::stream_engine_t::push_raw_msg_to_session (msg_t *msg_)
{
    const bool attach = _metadata != NULL && msg_->metadata () != _metadata;
    if (attach)
        msg_->set_metadata (_metadata);

    return push_msg_to_session (msg_);
}

984 985
//  First message handler installed after the handshake. If the mechanism
//  reports a non-empty user id, a copy of it is pushed to the session as a
//  message flagged msg_t::credential. Afterwards the handler switches to
//  decode_and_push and processes msg_ through it.
//  Returns -1 if the credential could not be pushed (the handler is then
//  left unchanged so the push is attempted again on the next call),
//  otherwise the result of decode_and_push.
int zmq::stream_engine_t::write_credential (msg_t *msg_)
{
    zmq_assert (_mechanism != NULL);
    zmq_assert (_session != NULL);

    const blob_t &credential = _mechanism->get_user_id ();
    if (credential.size () > 0) {
        msg_t msg;
        int rc = msg.init_size (credential.size ());
        //  Allocation failures are checked with errno_assert, like every
        //  other init_size call in this file.
        errno_assert (rc == 0);
        memcpy (msg.data (), credential.data (), credential.size ());
        msg.set_flags (msg_t::credential);
        rc = _session->push_msg (&msg);
        if (rc == -1) {
            //  The session did not take the message, so release it here.
            rc = msg.close ();
            errno_assert (rc == 0);
            return -1;
        }
    }

    _process_msg = &stream_engine_t::decode_and_push;
    return decode_and_push (msg_);
}

1007 1008
//  Pulls the next outgoing message from the session and runs it through
//  the mechanism's encode step. Returns 0 on success, -1 if either step
//  fails.
int zmq::stream_engine_t::pull_and_encode (msg_t *msg_)
{
    zmq_assert (_mechanism != NULL);

    const bool pulled = _session->pull_msg (msg_) != -1;
    if (!pulled)
        return -1;

    return _mechanism->encode (msg_) == -1 ? -1 : 0;
}

//  Regular incoming-message handler: runs the message through the
//  mechanism's decode step, cancels the pending heartbeat timeout/TTL
//  timers, processes command messages, attaches metadata and pushes the
//  message to the session. Returns 0 on success, -1 on failure.
int zmq::stream_engine_t::decode_and_push (msg_t *msg_)
{
    zmq_assert (_mechanism != NULL);

    const int decoded = _mechanism->decode (msg_);
    if (decoded == -1)
        return -1;

    //  A message arrived, so the heartbeat timeout timer is disarmed.
    if (_has_timeout_timer) {
        _has_timeout_timer = false;
        cancel_timer (heartbeat_timeout_timer_id);
    }

    //  Likewise for the TTL timer.
    if (_has_ttl_timer) {
        _has_ttl_timer = false;
        cancel_timer (heartbeat_ttl_timer_id);
    }

    const bool is_command = (msg_->flags () & msg_t::command) != 0;
    if (is_command)
        process_command_message (msg_);

    if (_metadata)
        msg_->set_metadata (_metadata);

    if (_session->push_msg (msg_) != -1)
        return 0;

    //  The session cannot take the message right now: remember to push
    //  this very message again before decoding any further ones.
    if (errno == EAGAIN)
        _process_msg = &stream_engine_t::push_one_then_decode_and_push;
    return -1;
}

//  Pushes a message to the session without decoding it and, once that
//  succeeds, switches the handler back to decode_and_push. Returns the
//  session's result.
int zmq::stream_engine_t::push_one_then_decode_and_push (msg_t *msg_)
{
    const int result = _session->push_msg (msg_);
    if (result != 0)
        return result;

    _process_msg = &stream_engine_t::decode_and_push;
    return 0;
}

1057
//  Tears the engine down after a failure: delivers the optional raw-socket
//  and router disconnect notifications, emits the monitoring events,
//  informs the session and finally destroys the engine object itself.
//  The object must not be touched after this call returns.
void zmq::stream_engine_t::error (error_reason_t reason_)
{
    if (_options.raw_socket && _options.raw_notify) {
        //  Raw sockets get a final zero-length message so the application
        //  can tell that the peer has been disconnected.
        msg_t eof_marker;
        eof_marker.init ();
        (this->*_process_msg) (&eof_marker);
        eof_marker.close ();
    }
    zmq_assert (_session);

    const bool notify_disconnect =
      (_options.router_notify & ZMQ_NOTIFY_DISCONNECT) && !_handshaking;
    if (notify_disconnect) {
        //  Router sockets with disconnect notification: drop whatever
        //  incomplete message sits in the pipe, then push the disconnect
        //  notification message.
        _session->rollback ();

        msg_t notice;
        notice.init ();
        _session->push_msg (&notice);
    }

    //  Protocol errors have already been signaled at the point where they
    //  occurred, so only other failures are reported here, and only while
    //  the handshake has not completed.
    if (reason_ != protocol_error) {
        const bool handshake_incomplete =
          _mechanism == NULL
          || _mechanism->status () == mechanism_t::handshaking;
        if (handshake_incomplete) {
            const int saved_errno = errno;
            _socket->event_handshake_failed_no_detail (_endpoint_uri_pair,
                                                       saved_errno);
        }
    }

    _socket->event_disconnected (_endpoint_uri_pair, _s);
    _session->flush ();
    _session->engine_error (reason_);
    unplug ();
    delete this;
}

1095
void zmq::stream_engine_t::set_handshake_timer ()
1096
{
1097
    zmq_assert (!_has_handshake_timer);
1098

1099 1100 1101
    if (!_options.raw_socket && _options.handshake_ivl > 0) {
        add_timer (_options.handshake_ivl, handshake_timer_id);
        _has_handshake_timer = true;
1102
    }
1103 1104
}

1105
//  Adds the engine-level properties (peer address and the socket
//  descriptor rendered as text) to properties_. Returns false, leaving
//  properties_ untouched, when the peer address is empty; true otherwise.
bool zmq::stream_engine_t::init_properties (properties_t &properties_)
{
    if (!_peer_address.empty ()) {
        properties_.ZMQ_MAP_INSERT_OR_EMPLACE (
          std::string (ZMQ_MSG_PROPERTY_PEER_ADDRESS), _peer_address);

        //  Private property to support deprecated SRCFD
        std::ostringstream fd_text;
        fd_text << static_cast<int> (_s);
        std::string fd_value = fd_text.str ();
        properties_.ZMQ_MAP_INSERT_OR_EMPLACE (std::string ("__fd"),
                                               ZMQ_MOVE (fd_value));
        return true;
    }

    return false;
}

1121
void zmq::stream_engine_t::timer_event (int id_)
1122
{
1123
    if (id_ == handshake_timer_id) {
1124
        _has_handshake_timer = false;
Jonathan Reams's avatar
Jonathan Reams committed
1125 1126
        //  handshake timer expired before handshake completed, so engine fail
        error (timeout_error);
1127
    } else if (id_ == heartbeat_ivl_timer_id) {
1128
        _next_msg = &stream_engine_t::produce_ping_message;
1129
        out_event ();
1130
        add_timer (_options.heartbeat_interval, heartbeat_ivl_timer_id);
1131
    } else if (id_ == heartbeat_ttl_timer_id) {
1132
        _has_ttl_timer = false;
1133 1134
        error (timeout_error);
    } else if (id_ == heartbeat_timeout_timer_id) {
1135
        _has_timeout_timer = false;
1136 1137
        error (timeout_error);
    } else
Jonathan Reams's avatar
Jonathan Reams committed
1138
        // There are no other valid timer ids!
1139
        assert (false);
Jonathan Reams's avatar
Jonathan Reams committed
1140 1141
}

1142
//  Builds a PING command ("\4PING" followed by the 16-bit heartbeat TTL in
//  network byte order) in msg_, runs it through the mechanism's encode
//  step, restores the regular outgoing handler and arms the heartbeat
//  timeout timer if it is not already running. Returns the encode result.
int zmq::stream_engine_t::produce_ping_message (msg_t *msg_)
{
    zmq_assert (_mechanism != NULL);

    // 16-bit TTL + \4PING == 7
    const size_t ping_ttl_len = msg_t::ping_cmd_name_size + 2;

    const int init_rc = msg_->init_size (ping_ttl_len);
    errno_assert (init_rc == 0);
    msg_->set_flags (msg_t::command);

    //  Command name first, TTL right behind it.
    uint8_t *const payload = static_cast<uint8_t *> (msg_->data ());
    memcpy (payload, "\4PING", msg_t::ping_cmd_name_size);

    const uint16_t ttl_net = htons (_options.heartbeat_ttl);
    memcpy (payload + msg_t::ping_cmd_name_size, &ttl_net, sizeof (ttl_net));

    const int encode_rc = _mechanism->encode (msg_);
    _next_msg = &stream_engine_t::pull_and_encode;

    const bool arm_timeout = !_has_timeout_timer && _heartbeat_timeout > 0;
    if (arm_timeout) {
        add_timer (_heartbeat_timeout, heartbeat_timeout_timer_id);
        _has_timeout_timer = true;
    }

    return encode_rc;
}

1167
//  Moves the stored PONG message into msg_, runs it through the
//  mechanism's encode step and restores the regular outgoing handler.
//  Returns the encode result.
int zmq::stream_engine_t::produce_pong_message (msg_t *msg_)
{
    zmq_assert (_mechanism != NULL);

    const int move_rc = msg_->move (_pong_msg);
    errno_assert (move_rc == 0);

    const int encode_rc = _mechanism->encode (msg_);
    _next_msg = &stream_engine_t::pull_and_encode;
    return encode_rc;
}

1179
//  Handles a heartbeat command. Only PING needs work: the TTL timer is
//  armed from the TTL the peer sent and a PONG echoing the PING context is
//  prepared and queued as the next outgoing message. Any other message is
//  accepted without action.
//  Returns 0 on success, or -1 with errno set to EPROTO for a PING that is
//  too short to contain the mandatory 16-bit TTL.
int zmq::stream_engine_t::process_heartbeat_message (msg_t *msg_)
{
    if (!msg_->is_ping ())
        return 0;

    // 16-bit TTL + \4PING == 7
    const size_t ping_ttl_len = msg_t::ping_cmd_name_size + 2;
    const size_t ping_max_ctx_len = 16;

    //  The frame comes from the peer and has only been checked to contain
    //  the command name. Without this check the TTL read below would run
    //  past the end of the payload and the context length computation
    //  would underflow.
    if (msg_->size () < ping_ttl_len) {
        errno = EPROTO;
        return -1;
    }

    const uint8_t *const ping_data =
      static_cast<const uint8_t *> (msg_->data ());

    // Get the remote heartbeat TTL to setup the timer
    uint16_t remote_heartbeat_ttl;
    memcpy (&remote_heartbeat_ttl, ping_data + msg_t::ping_cmd_name_size,
            sizeof (remote_heartbeat_ttl));
    remote_heartbeat_ttl = ntohs (remote_heartbeat_ttl);

    // The remote heartbeat is in 10ths of a second
    // so we multiply it by 100 to get the timer interval in ms.
    // The product is kept in an int: it does not fit into 16 bits for
    // TTLs above 655 and would otherwise wrap to a much shorter interval.
    const int remote_heartbeat_ttl_ms =
      static_cast<int> (remote_heartbeat_ttl) * 100;

    if (!_has_ttl_timer && remote_heartbeat_ttl_ms > 0) {
        add_timer (remote_heartbeat_ttl_ms, heartbeat_ttl_timer_id);
        _has_ttl_timer = true;
    }

    //  As per ZMTP 3.1 the PING command might contain an up to 16 bytes
    //  context which needs to be PONGed back, so build the pong message
    //  here and store it. Truncate it if it's too long.
    //  Given the engine goes straight to out_event, sequential PINGs will
    //  not be a problem.
    const size_t context_len =
      std::min (msg_->size () - ping_ttl_len, ping_max_ctx_len);
    const int rc =
      _pong_msg.init_size (msg_t::ping_cmd_name_size + context_len);
    errno_assert (rc == 0);
    _pong_msg.set_flags (msg_t::command);
    memcpy (_pong_msg.data (), "\4PONG", msg_t::ping_cmd_name_size);
    if (context_len > 0)
        memcpy (static_cast<uint8_t *> (_pong_msg.data ())
                  + msg_t::ping_cmd_name_size,
                ping_data + ping_ttl_len, context_len);

    _next_msg = &stream_engine_t::produce_pong_message;
    out_event ();

    return 0;
}
1226 1227 1228

//  Inspects a command message, sets the matching msg_t flag for the
//  commands recognised here (PING, PONG, SUBSCRIBE, CANCEL) and forwards
//  heartbeat commands to process_heartbeat_message.
//  The payload is expected to start with a one-byte name length followed
//  by the command name. Returns -1 for a malformed command, the result of
//  process_heartbeat_message for PING/PONG, and 0 otherwise.
int zmq::stream_engine_t::process_command_message (msg_t *msg_)
{
    //  An empty frame has no name-length byte to read, so reject it before
    //  touching the payload.
    if (unlikely (msg_->size () == 0))
        return -1;

    const uint8_t cmd_name_size =
      *(static_cast<const uint8_t *> (msg_->data ()));
    const size_t ping_name_size = msg_t::ping_cmd_name_size - 1;
    const size_t sub_name_size = msg_t::sub_cmd_name_size - 1;
    const size_t cancel_name_size = msg_t::cancel_cmd_name_size - 1;

    //  Malformed command
    if (unlikely (msg_->size () < cmd_name_size + sizeof (cmd_name_size)))
        return -1;

    //  The name is only compared, never modified.
    const uint8_t *cmd_name =
      static_cast<const uint8_t *> (msg_->data ()) + 1;

    //  At most one of the names can match, so stop at the first hit.
    if (cmd_name_size == ping_name_size
        && memcmp (cmd_name, "PING", cmd_name_size) == 0)
        msg_->set_flags (zmq::msg_t::ping);
    else if (cmd_name_size == ping_name_size
             && memcmp (cmd_name, "PONG", cmd_name_size) == 0)
        msg_->set_flags (zmq::msg_t::pong);
    else if (cmd_name_size == sub_name_size
             && memcmp (cmd_name, "SUBSCRIBE", cmd_name_size) == 0)
        msg_->set_flags (zmq::msg_t::subscribe);
    else if (cmd_name_size == cancel_name_size
             && memcmp (cmd_name, "CANCEL", cmd_name_size) == 0)
        msg_->set_flags (zmq::msg_t::cancel);

    if (msg_->is_ping () || msg_->is_pong ())
        return process_heartbeat_message (msg_);

    return 0;
}