stream_engine.cpp 37.8 KB
Newer Older
1
/*
2
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3

4
    This file is part of libzmq, the ZeroMQ core engine in C++.
5

6 7 8
    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
9 10
    (at your option) any later version.

11 12 13 14 15 16 17 18 19 20 21 22 23 24
    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.
25 26 27 28 29

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include "precompiled.hpp"
31
#include "macros.hpp"
32

33
#include <limits.h>
34
#include <string.h>
35 36

#ifndef ZMQ_HAVE_WINDOWS
37
#include <unistd.h>
38 39
#endif

40
#include <new>
41
#include <sstream>
42

43
#include "stream_engine.hpp"
44
#include "io_thread.hpp"
45
#include "session_base.hpp"
46 47
#include "v1_encoder.hpp"
#include "v1_decoder.hpp"
48 49
#include "v2_encoder.hpp"
#include "v2_decoder.hpp"
50
#include "null_mechanism.hpp"
51 52
#include "plain_client.hpp"
#include "plain_server.hpp"
53 54
#include "gssapi_client.hpp"
#include "gssapi_server.hpp"
55 56
#include "curve_client.hpp"
#include "curve_server.hpp"
57 58
#include "raw_decoder.hpp"
#include "raw_encoder.hpp"
59 60
#include "config.hpp"
#include "err.hpp"
61
#include "ip.hpp"
62
#include "tcp.hpp"
Martin Hurton's avatar
Martin Hurton committed
63 64
#include "likely.hpp"
#include "wire.hpp"
65

66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102
//  Builds a textual description of the peer connected to socket 's_'.
//
//  The base text is whatever zmq::get_peer_ip_address () stores into the
//  string; it is cleared when that call reports address family 0.
//  For PF_UNIX peers the credentials reported by the OS are appended,
//  when the platform supports querying them:
//    - SO_PEERCRED:    ":<uid>:<gid>:<pid>"
//    - LOCAL_PEERCRED: ":<uid>:<first group, if any>:"
//  If the getsockopt () query fails, nothing is appended.
static std::string get_peer_address (zmq::fd_t s_)
{
    std::string peer_address;

    const int family = zmq::get_peer_ip_address (s_, peer_address);
    if (family == 0)
        peer_address.clear ();
#if defined ZMQ_HAVE_SO_PEERCRED
    else if (family == PF_UNIX) {
        struct ucred cred;
        socklen_t size = sizeof (cred);
        if (!getsockopt (s_, SOL_SOCKET, SO_PEERCRED, &cred, &size)) {
            std::ostringstream buf;
            buf << ":" << cred.uid << ":" << cred.gid << ":" << cred.pid;
            peer_address += buf.str ();
        }
    }
#elif defined ZMQ_HAVE_LOCAL_PEERCRED
    else if (family == PF_UNIX) {
        struct xucred cred;
        socklen_t size = sizeof (cred);
        //  This is a free function: it must use its own parameter and
        //  local string, not the engine's '_s' / '_peer_address' members.
        if (!getsockopt (s_, 0, LOCAL_PEERCRED, &cred, &size)
            && cred.cr_version == XUCRED_VERSION) {
            std::ostringstream buf;
            buf << ":" << cred.cr_uid << ":";
            if (cred.cr_ngroups > 0)
                buf << cred.cr_groups[0];
            buf << ":";
            peer_address += buf.str ();
        }
    }
#endif

    return peer_address;
}


103 104
//  Constructs an engine around the already-connected socket 'fd_'.
//  The engine starts in the handshaking state with no encoder, decoder,
//  mechanism or session attached. The socket is switched to non-blocking
//  mode and the peer address string is captured from the socket.
zmq::stream_engine_t::stream_engine_t (fd_t fd_,
                                       const options_t &options_,
                                       const std::string &endpoint_) :
    _s (fd_),
    _handle (static_cast<handle_t> (NULL)),
    _inpos (NULL),
    _insize (0),
    _decoder (NULL),
    _outpos (NULL),
    _outsize (0),
    _encoder (NULL),
    _metadata (NULL),
    _handshaking (true),
    _greeting_size (v2_greeting_size),
    _greeting_bytes_read (0),
    _session (NULL),
    _options (options_),
    _endpoint (endpoint_),
    _plugged (false),
    _next_msg (&stream_engine_t::routing_id_msg),
    _process_msg (&stream_engine_t::process_routing_id_msg),
    _io_error (false),
    _subscription_required (false),
    _mechanism (NULL),
    _input_stopped (false),
    _output_stopped (false),
    _has_handshake_timer (false),
    _has_ttl_timer (false),
    _has_timeout_timer (false),
    _has_heartbeat_timer (false),
    _heartbeat_timeout (0),
    _socket (NULL),
    _peer_address (get_peer_address (_s))
{
    //  Both scratch messages start out as empty, initialised messages.
    const int tx_rc = _tx_msg.init ();
    errno_assert (tx_rc == 0);

    const int pong_rc = _pong_msg.init ();
    errno_assert (pong_rc == 0);

    //  All further I/O on the socket is non-blocking.
    unblock_socket (_s);

    //  When heartbeating is enabled, a timeout option of -1 means
    //  "use the heartbeat interval as the timeout".
    if (_options.heartbeat_interval > 0)
        _heartbeat_timeout = _options.heartbeat_timeout == -1
                               ? _options.heartbeat_interval
                               : _options.heartbeat_timeout;
}

153
//  Destroys the engine. The engine must have been unplugged first.
//  Closes the underlying socket (if still held), releases the transmit
//  message, drops the metadata reference and deletes the encoder,
//  decoder and security mechanism.
zmq::stream_engine_t::~stream_engine_t ()
{
    zmq_assert (!_plugged);

    if (_s != retired_fd) {
#ifdef ZMQ_HAVE_WINDOWS
        const int close_rc = closesocket (_s);
        wsa_assert (close_rc != SOCKET_ERROR);
#else
        int close_rc = close (_s);
#if defined(__FreeBSD_kernel__) || defined(__FreeBSD__)
        //  On FreeBSD close() may report ECONNRESET under load;
        //  that is not treated as a failure.
        if (close_rc == -1 && errno == ECONNRESET)
            close_rc = 0;
#endif
        errno_assert (close_rc == 0);
#endif
        _s = retired_fd;
    }

    const int msg_rc = _tx_msg.close ();
    errno_assert (msg_rc == 0);

    //  Give up our metadata reference; whoever drops the last
    //  reference is responsible for destroying the object.
    if (_metadata != NULL && _metadata->drop_ref ()) {
        LIBZMQ_DELETE (_metadata);
    }

    LIBZMQ_DELETE (_encoder);
    LIBZMQ_DELETE (_decoder);
    LIBZMQ_DELETE (_mechanism);
}

190
void zmq::stream_engine_t::plug (io_thread_t *io_thread_,
191
                                 session_base_t *session_)
192
{
193 194
    zmq_assert (!_plugged);
    _plugged = true;
195 196

    //  Connect to session object.
197
    zmq_assert (!_session);
198
    zmq_assert (session_);
199 200
    _session = session_;
    _socket = _session->get_socket ();
201 202 203

    //  Connect to I/O threads poller object.
    io_object_t::plug (io_thread_);
204 205
    _handle = add_fd (_s);
    _io_error = false;
Martin Hurton's avatar
Martin Hurton committed
206

207
    if (_options.raw_socket) {
208
        // no handshaking for raw sock, instantiate raw encoder and decoders
209 210
        _encoder = new (std::nothrow) raw_encoder_t (out_batch_size);
        alloc_assert (_encoder);
211

212 213
        _decoder = new (std::nothrow) raw_decoder_t (in_batch_size);
        alloc_assert (_decoder);
214 215

        // disable handshaking for raw socket
216
        _handshaking = false;
217

218 219
        _next_msg = &stream_engine_t::pull_msg_from_session;
        _process_msg = &stream_engine_t::push_raw_msg_to_session;
220

Thomas Rodgers's avatar
Thomas Rodgers committed
221
        properties_t properties;
222
        if (init_properties (properties)) {
223
            //  Compile metadata.
224 225 226
            zmq_assert (_metadata == NULL);
            _metadata = new (std::nothrow) metadata_t (properties);
            alloc_assert (_metadata);
227
        }
228

229
        if (_options.raw_notify) {
230 231 232
            //  For raw sockets, send an initial 0-length message to the
            // application so that it knows a peer has connected.
            msg_t connector;
233
            connector.init ();
234
            push_raw_msg_to_session (&connector);
235
            connector.close ();
236
            _session->flush ();
237
        }
238
    } else {
239 240 241
        // start optional timer, to prevent handshake hanging on no input
        set_handshake_timer ();

242
        //  Send the 'length' and 'flags' fields of the routing id message.
243
        //  The 'length' field is encoded in the long format.
244
        _outpos = _greeting_send;
245
        _outpos[_outsize++] = UCHAR_MAX;
246 247 248
        put_uint64 (&_outpos[_outsize], _options.routing_id_size + 1);
        _outsize += 8;
        _outpos[_outsize++] = 0x7f;
249
    }
Martin Hurton's avatar
Martin Hurton committed
250

251 252
    set_pollin (_handle);
    set_pollout (_handle);
253 254 255 256
    //  Flush all the data that may have been already received downstream.
    in_event ();
}

257
void zmq::stream_engine_t::unplug ()
258
{
259 260
    zmq_assert (_plugged);
    _plugged = false;
261

262
    //  Cancel all timers.
263
    if (_has_handshake_timer) {
264
        cancel_timer (handshake_timer_id);
265
        _has_handshake_timer = false;
266 267
    }

268
    if (_has_ttl_timer) {
Jonathan Reams's avatar
Jonathan Reams committed
269
        cancel_timer (heartbeat_ttl_timer_id);
270
        _has_ttl_timer = false;
Jonathan Reams's avatar
Jonathan Reams committed
271 272
    }

273
    if (_has_timeout_timer) {
Jonathan Reams's avatar
Jonathan Reams committed
274
        cancel_timer (heartbeat_timeout_timer_id);
275
        _has_timeout_timer = false;
Jonathan Reams's avatar
Jonathan Reams committed
276 277
    }

278
    if (_has_heartbeat_timer) {
Jonathan Reams's avatar
Jonathan Reams committed
279
        cancel_timer (heartbeat_ivl_timer_id);
280
        _has_heartbeat_timer = false;
Jonathan Reams's avatar
Jonathan Reams committed
281
    }
282
    //  Cancel all fd subscriptions.
283 284
    if (!_io_error)
        rm_fd (_handle);
285 286 287 288

    //  Disconnect from I/O threads poller object.
    io_object_t::unplug ();

289
    _session = NULL;
290 291
}

292
void zmq::stream_engine_t::terminate ()
293 294 295 296 297
{
    unplug ();
    delete this;
}

298
void zmq::stream_engine_t::in_event ()
299
{
300
    zmq_assert (!_io_error);
301

Pieter Hintjens's avatar
Pieter Hintjens committed
302
    //  If still handshaking, receive and process the greeting message.
303
    if (unlikely (_handshaking))
Martin Hurton's avatar
Martin Hurton committed
304 305 306
        if (!handshake ())
            return;

307
    zmq_assert (_decoder);
308 309

    //  If there has been an I/O error, stop polling.
310 311 312
    if (_input_stopped) {
        rm_fd (_handle);
        _io_error = true;
313 314
        return;
    }
315

316
    //  If there's no data to process in the buffer...
317
    if (!_insize) {
318 319 320 321
        //  Retrieve the buffer and read as much data as possible.
        //  Note that buffer can be arbitrarily large. However, we assume
        //  the underlying TCP layer has fixed buffer size and thus the
        //  number of bytes read will be always limited.
322
        size_t bufsize = 0;
323
        _decoder->get_buffer (&_inpos, &bufsize);
324

325
        const int rc = tcp_read (_s, _inpos, bufsize);
326

327
        if (rc == 0) {
328 329
            // connection closed by peer
            errno = EPIPE;
330
            error (connection_error);
331
            return;
332
        }
333 334
        if (rc == -1) {
            if (errno != EAGAIN)
335
                error (connection_error);
336 337
            return;
        }
338

339
        //  Adjust input size
340
        _insize = static_cast<size_t> (rc);
341
        // Adjust buffer size to received bytes
342
        _decoder->resize_buffer (_insize);
Martin Hurton's avatar
Martin Hurton committed
343
    }
344

345 346
    int rc = 0;
    size_t processed = 0;
347

348 349 350 351 352
    while (_insize > 0) {
        rc = _decoder->decode (_inpos, _insize, processed);
        zmq_assert (processed <= _insize);
        _inpos += processed;
        _insize -= processed;
353 354
        if (rc == 0 || rc == -1)
            break;
355
        rc = (this->*_process_msg) (_decoder->msg ());
356 357
        if (rc == -1)
            break;
358 359
    }

360 361 362 363
    //  Tear down the connection if we have failed to decode input data
    //  or the session has rejected the message.
    if (rc == -1) {
        if (errno != EAGAIN) {
364
            error (protocol_error);
365 366
            return;
        }
367 368
        _input_stopped = true;
        reset_pollin (_handle);
Martin Hurton's avatar
Martin Hurton committed
369
    }
370

371
    _session->flush ();
372 373
}

374
void zmq::stream_engine_t::out_event ()
375
{
376
    zmq_assert (!_io_error);
377

378
    //  If write buffer is empty, try to read new data from the encoder.
379
    if (!_outsize) {
Martin Hurton's avatar
Martin Hurton committed
380 381 382
        //  Even when we stop polling as soon as there is no
        //  data to send, the poller may invoke out_event one
        //  more time due to 'speculative write' optimisation.
383 384
        if (unlikely (_encoder == NULL)) {
            zmq_assert (_handshaking);
Martin Hurton's avatar
Martin Hurton committed
385 386 387
            return;
        }

388 389
        _outpos = NULL;
        _outsize = _encoder->encode (&_outpos, 0);
390

391 392
        while (_outsize < static_cast<size_t> (out_batch_size)) {
            if ((this->*_next_msg) (&_tx_msg) == -1)
393
                break;
394 395 396
            _encoder->load_msg (&_tx_msg);
            unsigned char *bufptr = _outpos + _outsize;
            size_t n = _encoder->encode (&bufptr, out_batch_size - _outsize);
397
            zmq_assert (n > 0);
398 399 400
            if (_outpos == NULL)
                _outpos = bufptr;
            _outsize += n;
401
        }
402 403

        //  If there is no data to send, stop polling for output.
404 405 406
        if (_outsize == 0) {
            _output_stopped = true;
            reset_pollout (_handle);
407 408 409 410 411 412
            return;
        }
    }

    //  If there are any data to write in write buffer, write as much as
    //  possible to the socket. Note that amount of data to write can be
413
    //  arbitrarily large. However, we assume that underlying TCP layer has
414 415
    //  limited transmission buffer and thus the actual number of bytes
    //  written should be reasonably modest.
416
    const int nbytes = tcp_write (_s, _outpos, _outsize);
417

Martin Hurton's avatar
Martin Hurton committed
418 419
    //  IO error has occurred. We stop waiting for output events.
    //  The engine is not terminated until we detect input error;
420
    //  this is necessary to prevent losing incoming messages.
421
    if (nbytes == -1) {
422
        reset_pollout (_handle);
423 424 425
        return;
    }

426 427
    _outpos += nbytes;
    _outsize -= nbytes;
Martin Hurton's avatar
Martin Hurton committed
428 429 430

    //  If we are still handshaking and there are no data
    //  to send, stop polling for output.
431 432 433
    if (unlikely (_handshaking))
        if (_outsize == 0)
            reset_pollout (_handle);
434 435
}

436
void zmq::stream_engine_t::restart_output ()
437
{
438
    if (unlikely (_io_error))
439 440
        return;

441 442 443
    if (likely (_output_stopped)) {
        set_pollout (_handle);
        _output_stopped = false;
444
    }
445 446 447 448 449 450 451 452

    //  Speculative write: The assumption is that at the moment new message
    //  was sent by the user the socket is probably available for writing.
    //  Thus we try to write the data to socket avoiding polling for POLLOUT.
    //  Consequently, the latency should be better in request/reply scenarios.
    out_event ();
}

453
bool zmq::stream_engine_t::restart_input ()
454
{
455 456 457
    zmq_assert (_input_stopped);
    zmq_assert (_session != NULL);
    zmq_assert (_decoder != NULL);
458

459
    int rc = (this->*_process_msg) (_decoder->msg ());
460 461
    if (rc == -1) {
        if (errno == EAGAIN)
462
            _session->flush ();
463
        else {
464
            error (protocol_error);
465 466 467
            return false;
        }
        return true;
Martin Hurton's avatar
Martin Hurton committed
468 469
    }

470
    while (_insize > 0) {
471
        size_t processed = 0;
472 473 474 475
        rc = _decoder->decode (_inpos, _insize, processed);
        zmq_assert (processed <= _insize);
        _inpos += processed;
        _insize -= processed;
476 477
        if (rc == 0 || rc == -1)
            break;
478
        rc = (this->*_process_msg) (_decoder->msg ());
479 480 481
        if (rc == -1)
            break;
    }
482

483
    if (rc == -1 && errno == EAGAIN)
484
        _session->flush ();
485
    else if (_io_error) {
486
        error (connection_error);
487 488
        return false;
    } else if (rc == -1) {
489
        error (protocol_error);
490 491 492
        return false;
    }

493
    else {
494 495 496
        _input_stopped = false;
        set_pollin (_handle);
        _session->flush ();
497 498 499 500

        //  Speculative read.
        in_event ();
    }
501 502

    return true;
503 504
}

505 506 507
//  Zero-based offset of the revision field within the received greeting.
//  The byte at this index of the greeting buffer is what the handshake
//  code passes on as the peer's protocol revision.
const size_t revision_pos = 10;

508
bool zmq::stream_engine_t::handshake ()
Martin Hurton's avatar
Martin Hurton committed
509
{
510 511
    zmq_assert (_handshaking);
    zmq_assert (_greeting_bytes_read < _greeting_size);
512
    //  Receive the greeting.
513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541
    const int rc = receive_greeting ();
    if (rc == -1)
        return false;
    const bool unversioned = rc != 0;

    if (!(this
            ->*select_handshake_fun (unversioned,
                                     _greeting_recv[revision_pos])) ())
        return false;

    // Start polling for output if necessary.
    if (_outsize == 0)
        set_pollout (_handle);

    //  Handshaking was successful.
    //  Switch into the normal message flow.
    _handshaking = false;

    if (_has_handshake_timer) {
        cancel_timer (handshake_timer_id);
        _has_handshake_timer = false;
    }

    return true;
}

int zmq::stream_engine_t::receive_greeting ()
{
    bool unversioned = false;
542 543 544
    while (_greeting_bytes_read < _greeting_size) {
        const int n = tcp_read (_s, _greeting_recv + _greeting_bytes_read,
                                _greeting_size - _greeting_bytes_read);
545
        if (n == 0) {
546
            errno = EPIPE;
547
            error (connection_error);
548
            return -1;
549
        }
550 551
        if (n == -1) {
            if (errno != EAGAIN)
552
                error (connection_error);
553
            return -1;
554
        }
Martin Hurton's avatar
Martin Hurton committed
555

556
        _greeting_bytes_read += n;
Martin Hurton's avatar
Martin Hurton committed
557

558 559 560
        //  We have received at least one byte from the peer.
        //  If the first byte is not 0xff, we know that the
        //  peer is using unversioned protocol.
561 562
        if (_greeting_recv[0] != 0xff) {
            unversioned = true;
563
            break;
564
        }
Martin Hurton's avatar
Martin Hurton committed
565

566
        if (_greeting_bytes_read < signature_size)
567
            continue;
Martin Hurton's avatar
Martin Hurton committed
568

569 570
        //  Inspect the right-most bit of the 10th byte (which coincides
        //  with the 'flags' field if a regular message was sent).
571
        //  Zero indicates this is a header of a routing id message
572
        //  (i.e. the peer is using the unversioned protocol).
573 574
        if (!(_greeting_recv[9] & 0x01)) {
            unversioned = true;
575
            break;
576
        }
Martin Hurton's avatar
Martin Hurton committed
577

578
        //  The peer is using versioned protocol.
579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594
        receive_greeting_versioned ();
    }
    return unversioned ? 1 : 0;
}

void zmq::stream_engine_t::receive_greeting_versioned ()
{
    //  Send the major version number.
    if (_outpos + _outsize == _greeting_send + signature_size) {
        if (_outsize == 0)
            set_pollout (_handle);
        _outpos[_outsize++] = 3; //  Major version number
    }

    if (_greeting_bytes_read > signature_size) {
        if (_outpos + _outsize == _greeting_send + signature_size + 1) {
595 596
            if (_outsize == 0)
                set_pollout (_handle);
597

598
            //  Use ZMTP/2.0 to talk to older peers.
599 600
            if (_greeting_recv[revision_pos] == ZMTP_1_0
                || _greeting_recv[revision_pos] == ZMTP_2_0)
601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622
                _outpos[_outsize++] = _options.type;
            else {
                _outpos[_outsize++] = 0; //  Minor version number
                memset (_outpos + _outsize, 0, 20);

                zmq_assert (_options.mechanism == ZMQ_NULL
                            || _options.mechanism == ZMQ_PLAIN
                            || _options.mechanism == ZMQ_CURVE
                            || _options.mechanism == ZMQ_GSSAPI);

                if (_options.mechanism == ZMQ_NULL)
                    memcpy (_outpos + _outsize, "NULL", 4);
                else if (_options.mechanism == ZMQ_PLAIN)
                    memcpy (_outpos + _outsize, "PLAIN", 5);
                else if (_options.mechanism == ZMQ_GSSAPI)
                    memcpy (_outpos + _outsize, "GSSAPI", 6);
                else if (_options.mechanism == ZMQ_CURVE)
                    memcpy (_outpos + _outsize, "CURVE", 5);
                _outsize += 20;
                memset (_outpos + _outsize, 0, 32);
                _outsize += 32;
                _greeting_size = v3_greeting_size;
623
            }
Martin Hurton's avatar
Martin Hurton committed
624 625
        }
    }
626
}
Martin Hurton's avatar
Martin Hurton committed
627

628 629 630 631
//  Chooses the handshake routine matching the peer's protocol.
//
//  unversioned  true if the peer sent no revision number at all.
//  revision     revision byte taken from the peer's greeting; only
//               consulted when 'unversioned' is false.
//
//  Any revision other than ZMTP_1_0 and ZMTP_2_0 selects the v3.0
//  handshake.
zmq::stream_engine_t::handshake_fun_t
zmq::stream_engine_t::select_handshake_fun (bool unversioned,
                                            unsigned char revision)
{
    //  Is the peer using ZMTP/1.0 with no revision number?
    if (unversioned)
        return &stream_engine_t::handshake_v1_0_unversioned;

    if (revision == ZMTP_1_0)
        return &stream_engine_t::handshake_v1_0;

    if (revision == ZMTP_2_0)
        return &stream_engine_t::handshake_v2_0;

    return &stream_engine_t::handshake_v3_0;
}
645

646 647 648 649 650 651 652 653
bool zmq::stream_engine_t::handshake_v1_0_unversioned ()
{
    //  We send and receive rest of routing id message
    if (_session->zap_enabled ()) {
        // reject ZMTP 1.0 connections if ZAP is enabled
        error (protocol_error);
        return false;
    }
654

655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694
    _encoder = new (std::nothrow) v1_encoder_t (out_batch_size);
    alloc_assert (_encoder);

    _decoder =
      new (std::nothrow) v1_decoder_t (in_batch_size, _options.maxmsgsize);
    alloc_assert (_decoder);

    //  We have already sent the message header.
    //  Since there is no way to tell the encoder to
    //  skip the message header, we simply throw that
    //  header data away.
    const size_t header_size =
      _options.routing_id_size + 1 >= UCHAR_MAX ? 10 : 2;
    unsigned char tmp[10], *bufferp = tmp;

    //  Prepare the routing id message and load it into encoder.
    //  Then consume bytes we have already sent to the peer.
    const int rc = _tx_msg.init_size (_options.routing_id_size);
    zmq_assert (rc == 0);
    memcpy (_tx_msg.data (), _options.routing_id, _options.routing_id_size);
    _encoder->load_msg (&_tx_msg);
    const size_t buffer_size = _encoder->encode (&bufferp, header_size);
    zmq_assert (buffer_size == header_size);

    //  Make sure the decoder sees the data we have already received.
    _inpos = _greeting_recv;
    _insize = _greeting_bytes_read;

    //  To allow for interoperability with peers that do not forward
    //  their subscriptions, we inject a phantom subscription message
    //  message into the incoming message stream.
    if (_options.type == ZMQ_PUB || _options.type == ZMQ_XPUB)
        _subscription_required = true;

    //  We are sending our routing id now and the next message
    //  will come from the socket.
    _next_msg = &stream_engine_t::pull_msg_from_session;

    //  We are expecting routing id message.
    _process_msg = &stream_engine_t::process_routing_id_msg;
695

696 697
    return true;
}
Martin Hurton's avatar
Martin Hurton committed
698

699 700 701 702 703 704 705
bool zmq::stream_engine_t::handshake_v1_0 ()
{
    if (_session->zap_enabled ()) {
        // reject ZMTP 1.0 connections if ZAP is enabled
        error (protocol_error);
        return false;
    }
706

707 708
    _encoder = new (std::nothrow) v1_encoder_t (out_batch_size);
    alloc_assert (_encoder);
709

710 711 712
    _decoder =
      new (std::nothrow) v1_decoder_t (in_batch_size, _options.maxmsgsize);
    alloc_assert (_decoder);
713

714 715
    return true;
}
716

717 718 719 720 721 722 723
bool zmq::stream_engine_t::handshake_v2_0 ()
{
    if (_session->zap_enabled ()) {
        // reject ZMTP 2.0 connections if ZAP is enabled
        error (protocol_error);
        return false;
    }
724

725 726 727 728 729 730
    _encoder = new (std::nothrow) v2_encoder_t (out_batch_size);
    alloc_assert (_encoder);

    _decoder = new (std::nothrow)
      v2_decoder_t (in_batch_size, _options.maxmsgsize, _options.zero_copy);
    alloc_assert (_decoder);
731

732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755
    return true;
}

bool zmq::stream_engine_t::handshake_v3_0 ()
{
    _encoder = new (std::nothrow) v2_encoder_t (out_batch_size);
    alloc_assert (_encoder);

    _decoder = new (std::nothrow)
      v2_decoder_t (in_batch_size, _options.maxmsgsize, _options.zero_copy);
    alloc_assert (_decoder);

    if (_options.mechanism == ZMQ_NULL
        && memcmp (_greeting_recv + 12, "NULL\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0",
                   20)
             == 0) {
        _mechanism = new (std::nothrow)
          null_mechanism_t (_session, _peer_address, _options);
        alloc_assert (_mechanism);
    } else if (_options.mechanism == ZMQ_PLAIN
               && memcmp (_greeting_recv + 12,
                          "PLAIN\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20)
                    == 0) {
        if (_options.as_server)
756
            _mechanism = new (std::nothrow)
757 758 759 760 761
              plain_server_t (_session, _peer_address, _options);
        else
            _mechanism = new (std::nothrow) plain_client_t (_session, _options);
        alloc_assert (_mechanism);
    }
762
#ifdef ZMQ_HAVE_CURVE
763 764 765 766 767 768 769 770 771 772 773
    else if (_options.mechanism == ZMQ_CURVE
             && memcmp (_greeting_recv + 12,
                        "CURVE\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20)
                  == 0) {
        if (_options.as_server)
            _mechanism = new (std::nothrow)
              curve_server_t (_session, _peer_address, _options);
        else
            _mechanism = new (std::nothrow) curve_client_t (_session, _options);
        alloc_assert (_mechanism);
    }
774
#endif
Chris Laws's avatar
Chris Laws committed
775
#ifdef HAVE_LIBGSSAPI_KRB5
776 777 778 779 780 781 782 783 784 785 786
    else if (_options.mechanism == ZMQ_GSSAPI
             && memcmp (_greeting_recv + 12,
                        "GSSAPI\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20)
                  == 0) {
        if (_options.as_server)
            _mechanism = new (std::nothrow)
              gssapi_server_t (_session, _peer_address, _options);
        else
            _mechanism =
              new (std::nothrow) gssapi_client_t (_session, _options);
        alloc_assert (_mechanism);
787
    }
788 789 790 791 792 793 794
#endif
    else {
        _session->get_socket ()->event_handshake_failed_protocol (
          _session->get_endpoint (),
          ZMQ_PROTOCOL_ERROR_ZMTP_MECHANISM_MISMATCH);
        error (protocol_error);
        return false;
795
    }
796 797
    _next_msg = &stream_engine_t::next_handshake_command;
    _process_msg = &stream_engine_t::process_handshake_command;
798

Martin Hurton's avatar
Martin Hurton committed
799 800 801
    return true;
}

802
//  Message source used for the very first outgoing message: fills
//  'msg_' with the locally configured routing id and switches the
//  message source to the session for all subsequent messages.
//  Always returns 0.
int zmq::stream_engine_t::routing_id_msg (msg_t *msg_)
{
    const size_t id_size = _options.routing_id_size;

    const int init_rc = msg_->init_size (id_size);
    errno_assert (init_rc == 0);

    //  Nothing to copy for an empty routing id.
    if (id_size > 0) {
        memcpy (msg_->data (), _options.routing_id, id_size);
    }

    _next_msg = &stream_engine_t::pull_msg_from_session;
    return 0;
}
811

812
//  Message processor used for the very first incoming message, which
//  carries the peer's routing id. Depending on the recv_routing_id
//  option the message is either forwarded to the session (flagged as a
//  routing id) or discarded. If a phantom subscription is required it
//  is pushed to the session as well. Afterwards regular messages go
//  straight to the session. Always returns 0.
int zmq::stream_engine_t::process_routing_id_msg (msg_t *msg_)
{
    if (!_options.recv_routing_id) {
        //  Not wanted: drop the content and leave an empty message.
        const int close_rc = msg_->close ();
        errno_assert (close_rc == 0);
        const int init_rc = msg_->init ();
        errno_assert (init_rc == 0);
    } else {
        msg_->set_flags (msg_t::routing_id);
        const int push_rc = _session->push_msg (msg_);
        errno_assert (push_rc == 0);
    }

    if (_subscription_required) {
        //  Inject the subscription message, so that also
        //  ZMQ 2.x peers receive published messages.
        msg_t subscription;
        const int init_rc = subscription.init_size (1);
        errno_assert (init_rc == 0);

        unsigned char *const payload =
          static_cast<unsigned char *> (subscription.data ());
        *payload = 1;

        const int push_rc = _session->push_msg (&subscription);
        errno_assert (push_rc == 0);
    }

    _process_msg = &stream_engine_t::push_msg_to_session;

    return 0;
}
841

842
//  Message source used while the security handshake is in progress.
//  If the mechanism is ready, the engine is switched to normal
//  operation and the first regular message is produced instead.
//  If the mechanism is in the error state, -1 is returned with errno
//  set to EPROTO. Otherwise the mechanism's next handshake command is
//  produced and, on success, flagged as a command.
int zmq::stream_engine_t::next_handshake_command (msg_t *msg_)
{
    zmq_assert (_mechanism != NULL);

    if (_mechanism->status () == mechanism_t::ready) {
        mechanism_ready ();
        return pull_and_encode (msg_);
    }

    if (_mechanism->status () == mechanism_t::error) {
        errno = EPROTO;
        return -1;
    }

    //  Handshake still in progress.
    const int cmd_rc = _mechanism->next_handshake_command (msg_);
    if (cmd_rc == 0) {
        msg_->set_flags (msg_t::command);
    }
    return cmd_rc;
}

863
//  Message processor used while the security handshake is in progress.
//  Passes 'msg_' to the mechanism and returns its result unchanged,
//  except that -1 with errno EPROTO is returned when the mechanism
//  accepted the command but ended up in the error state. After a
//  successfully processed command, stopped output is restarted.
int zmq::stream_engine_t::process_handshake_command (msg_t *msg_)
{
    zmq_assert (_mechanism != NULL);

    const int cmd_rc = _mechanism->process_handshake_command (msg_);
    if (cmd_rc != 0)
        return cmd_rc;

    if (_mechanism->status () == mechanism_t::ready) {
        mechanism_ready ();
    } else if (_mechanism->status () == mechanism_t::error) {
        errno = EPROTO;
        return -1;
    }

    //  The mechanism may now have something to send.
    if (_output_stopped)
        restart_output ();

    return cmd_rc;
}

881 882
void zmq::stream_engine_t::zap_msg_available ()
{
883
    zmq_assert (_mechanism != NULL);
884

885
    const int rc = _mechanism->zap_msg_available ();
886
    if (rc == -1) {
887
        error (protocol_error);
888 889
        return;
    }
890
    if (_input_stopped)
891 892
        if (!restart_input ())
            return;
893
    if (_output_stopped)
894
        restart_output ();
895 896
}

897 898
const char *zmq::stream_engine_t::get_endpoint () const
{
899
    return _endpoint.c_str ();
900 901
}

902
void zmq::stream_engine_t::mechanism_ready ()
903
{
904 905 906
    if (_options.heartbeat_interval > 0) {
        add_timer (_options.heartbeat_interval, heartbeat_ivl_timer_id);
        _has_heartbeat_timer = true;
907 908
    }

909 910
    bool flush_session = false;

911
    if (_options.recv_routing_id) {
912
        msg_t routing_id;
913 914
        _mechanism->peer_routing_id (&routing_id);
        const int rc = _session->push_msg (&routing_id);
915 916 917
        if (rc == -1 && errno == EAGAIN) {
            // If the write is failing at this stage with
            // an EAGAIN the pipe must be being shut down,
918
            // so we can just bail out of the routing id set.
919 920
            return;
        }
921
        errno_assert (rc == 0);
922 923 924 925 926 927 928 929 930 931 932 933 934 935 936
        flush_session = true;
    }

    if (_options.router_notify & ZMQ_NOTIFY_CONNECT) {
        msg_t connect_notification;
        connect_notification.init ();
        const int rc = _session->push_msg (&connect_notification);
        if (rc == -1 && errno == EAGAIN) {
            // If the write is failing at this stage with
            // an EAGAIN the pipe must be being shut down,
            // so we can just bail out of the notification.
            return;
        }
        errno_assert (rc == 0);
        flush_session = true;
937
    }
938

939 940 941
    if (flush_session)
        _session->flush ();

942 943
    _next_msg = &stream_engine_t::pull_and_encode;
    _process_msg = &stream_engine_t::write_credential;
944 945 946

    //  Compile metadata.
    properties_t properties;
947
    init_properties (properties);
948

Pieter Hintjens's avatar
Pieter Hintjens committed
949
    //  Add ZAP properties.
950
    const properties_t &zap_properties = _mechanism->get_zap_properties ();
951
    properties.insert (zap_properties.begin (), zap_properties.end ());
Pieter Hintjens's avatar
Pieter Hintjens committed
952

953
    //  Add ZMTP properties.
954
    const properties_t &zmtp_properties = _mechanism->get_zmtp_properties ();
955
    properties.insert (zmtp_properties.begin (), zmtp_properties.end ());
956

957
    zmq_assert (_metadata == NULL);
958
    if (!properties.empty ()) {
959 960
        _metadata = new (std::nothrow) metadata_t (properties);
        alloc_assert (_metadata);
961
    }
Vincent Tellier's avatar
Vincent Tellier committed
962

963
    _socket->event_handshake_succeeded (_endpoint, 0);
964 965
}

966
//  Fetches the next outbound message from the session into msg_ and
//  returns the session's result code unchanged.
int zmq::stream_engine_t::pull_msg_from_session (msg_t *msg_)
{
    const int result = _session->pull_msg (msg_);
    return result;
}
970

971 972
//  Hands an inbound message to the session and returns the session's
//  result code unchanged.
int zmq::stream_engine_t::push_msg_to_session (msg_t *msg_)
{
    const int result = _session->push_msg (msg_);
    return result;
}

976 977
//  Attaches the engine's metadata (if any, and if not already attached) to
//  the message and then pushes it to the session.
int zmq::stream_engine_t::push_raw_msg_to_session (msg_t *msg_)
{
    const bool needs_metadata = _metadata && msg_->metadata () != _metadata;
    if (needs_metadata) {
        msg_->set_metadata (_metadata);
    }

    return push_msg_to_session (msg_);
}

983 984
//  Inbound handler installed right after the handshake. If the mechanism
//  reports a non-empty user id, it is pushed to the session as a
//  credential-flagged message first. The handler then replaces itself with
//  decode_and_push and forwards msg_ to it. Returns -1 if the credential
//  could not be pushed, otherwise the result of decode_and_push.
int zmq::stream_engine_t::write_credential (msg_t *msg_)
{
    zmq_assert (_mechanism != NULL);
    zmq_assert (_session != NULL);

    const blob_t &user_id = _mechanism->get_user_id ();
    if (user_id.size () > 0) {
        //  Build a message carrying a copy of the user id.
        msg_t credential_msg;
        int rc = credential_msg.init_size (user_id.size ());
        zmq_assert (rc == 0);
        memcpy (credential_msg.data (), user_id.data (), user_id.size ());
        credential_msg.set_flags (msg_t::credential);

        if (_session->push_msg (&credential_msg) == -1) {
            //  The push failed, so the message has to be released here.
            rc = credential_msg.close ();
            errno_assert (rc == 0);
            return -1;
        }
    }

    //  Credential handled; every later message goes straight to decoding.
    _process_msg = &stream_engine_t::decode_and_push;
    return decode_and_push (msg_);
}

1006 1007
//  Pulls the next outbound message from the session and passes it through
//  the mechanism's encode step. Returns 0 on success, -1 if either step
//  reports -1.
int zmq::stream_engine_t::pull_and_encode (msg_t *msg_)
{
    zmq_assert (_mechanism != NULL);

    const bool pulled = _session->pull_msg (msg_) != -1;
    if (!pulled)
        return -1;

    return _mechanism->encode (msg_) == -1 ? -1 : 0;
}

//  Regular inbound handler: decodes the message through the mechanism,
//  cancels any pending heartbeat timeout/TTL timers, processes command
//  frames, attaches metadata and pushes the message to the session.
//  Returns 0 on success and -1 on failure. When the push fails with EAGAIN
//  the handler is switched to push_one_then_decode_and_push so that the
//  already decoded message is retried without being decoded again.
int zmq::stream_engine_t::decode_and_push (msg_t *msg_)
{
    zmq_assert (_mechanism != NULL);

    const int decode_rc = _mechanism->decode (msg_);
    if (decode_rc == -1)
        return -1;

    //  A message arrived, so the pending heartbeat timers are dropped.
    if (_has_timeout_timer) {
        _has_timeout_timer = false;
        cancel_timer (heartbeat_timeout_timer_id);
    }

    if (_has_ttl_timer) {
        _has_ttl_timer = false;
        cancel_timer (heartbeat_ttl_timer_id);
    }

    const bool is_command = (msg_->flags () & msg_t::command) != 0;
    if (is_command)
        process_command_message (msg_);

    if (_metadata)
        msg_->set_metadata (_metadata);

    const int push_rc = _session->push_msg (msg_);
    if (push_rc != -1)
        return 0;

    if (errno == EAGAIN)
        _process_msg = &stream_engine_t::push_one_then_decode_and_push;
    return -1;
}

//  Retry handler for a message that was decoded but could not be pushed.
//  It pushes the message as-is and, once that succeeds, reinstates
//  decode_and_push as the inbound handler. Returns the push result.
int zmq::stream_engine_t::push_one_then_decode_and_push (msg_t *msg_)
{
    const int push_rc = _session->push_msg (msg_);
    if (push_rc != 0)
        return push_rc;

    _process_msg = &stream_engine_t::decode_and_push;
    return push_rc;
}

1056
//  Terminates the engine after a failure described by reason_.
//  Emits the optional raw-socket and router disconnect notifications, raises
//  the handshake-failed and disconnected socket events, informs the session,
//  unplugs the engine and finally destroys it. The object must not be
//  touched after this function returns.
void zmq::stream_engine_t::error (error_reason_t reason_)
{
    const bool notify_raw_peer = _options.raw_socket && _options.raw_notify;
    if (notify_raw_peer) {
        //  For raw sockets, send a final 0-length message to the application
        //  so that it knows the peer has been disconnected.
        msg_t terminator;
        terminator.init ();
        (this->*_process_msg) (&terminator);
        terminator.close ();
    }
    zmq_assert (_session);

    const bool notify_router_disconnect =
      (_options.router_notify & ZMQ_NOTIFY_DISCONNECT) && !_handshaking;
    if (notify_router_disconnect) {
        //  For router sockets with disconnect notification, rollback
        //  any incomplete message in the pipe, and push the disconnect
        //  notification message.
        _session->rollback ();

        msg_t disconnect_notification;
        disconnect_notification.init ();
        _session->push_msg (&disconnect_notification);
    }

    //  Protocol errors have been signaled already at the point where they
    //  occurred, so only other failures during an unfinished handshake are
    //  reported here.
    if (reason_ != protocol_error) {
        const bool handshake_unfinished =
          _mechanism == NULL
          || _mechanism->status () == mechanism_t::handshaking;
        if (handshake_unfinished) {
            const int saved_errno = errno;
            _socket->event_handshake_failed_no_detail (_endpoint, saved_errno);
        }
    }

    _socket->event_disconnected (_endpoint, _s);
    _session->flush ();
    _session->engine_error (reason_);
    unplug ();
    delete this;
}

1094
void zmq::stream_engine_t::set_handshake_timer ()
1095
{
1096
    zmq_assert (!_has_handshake_timer);
1097

1098 1099 1100
    if (!_options.raw_socket && _options.handshake_ivl > 0) {
        add_timer (_options.handshake_ivl, handshake_timer_id);
        _has_handshake_timer = true;
1101
    }
1102 1103
}

1104
//  Adds the engine-level properties to properties_: the peer address and a
//  private "__fd" entry holding the socket descriptor as decimal text.
//  Returns false, leaving properties_ untouched, when no peer address is
//  known; returns true otherwise.
bool zmq::stream_engine_t::init_properties (properties_t &properties_)
{
    const bool has_peer_address = !_peer_address.empty ();
    if (!has_peer_address)
        return false;

    properties_.ZMQ_MAP_INSERT_OR_EMPLACE (
      std::string (ZMQ_MSG_PROPERTY_PEER_ADDRESS), _peer_address);

    //  Private property to support deprecated SRCFD
    std::ostringstream fd_formatter;
    fd_formatter << static_cast<int> (_s);
    std::string fd_text = fd_formatter.str ();
    properties_.ZMQ_MAP_INSERT_OR_EMPLACE (std::string ("__fd"),
                                           ZMQ_MOVE (fd_text));

    return true;
}

1120
void zmq::stream_engine_t::timer_event (int id_)
1121
{
1122
    if (id_ == handshake_timer_id) {
1123
        _has_handshake_timer = false;
Jonathan Reams's avatar
Jonathan Reams committed
1124 1125
        //  handshake timer expired before handshake completed, so engine fail
        error (timeout_error);
1126
    } else if (id_ == heartbeat_ivl_timer_id) {
1127
        _next_msg = &stream_engine_t::produce_ping_message;
1128
        out_event ();
1129
        add_timer (_options.heartbeat_interval, heartbeat_ivl_timer_id);
1130
    } else if (id_ == heartbeat_ttl_timer_id) {
1131
        _has_ttl_timer = false;
1132 1133
        error (timeout_error);
    } else if (id_ == heartbeat_timeout_timer_id) {
1134
        _has_timeout_timer = false;
1135 1136
        error (timeout_error);
    } else
Jonathan Reams's avatar
Jonathan Reams committed
1137
        // There are no other valid timer ids!
1138
        assert (false);
Jonathan Reams's avatar
Jonathan Reams committed
1139 1140
}

1141
//  Builds a PING command into msg_: the "\4PING" name followed by the
//  configured heartbeat TTL in network byte order. The message is encoded
//  through the mechanism, the outbound handler is reset to pull_and_encode,
//  and the heartbeat timeout timer is armed if it is configured and not
//  already running. Returns the result of the encode step.
int zmq::stream_engine_t::produce_ping_message (msg_t *msg_)
{
    zmq_assert (_mechanism != NULL);

    //  16-bit TTL + \4PING == 7
    const size_t ttl_offset = msg_t::ping_cmd_name_size;
    const size_t ping_size = ttl_offset + 2;

    int rc = msg_->init_size (ping_size);
    errno_assert (rc == 0);
    msg_->set_flags (msg_t::command);

    //  Command name first ...
    uint8_t *const payload = static_cast<uint8_t *> (msg_->data ());
    memcpy (payload, "\4PING", msg_t::ping_cmd_name_size);

    //  ... then the TTL, converted to network byte order.
    const uint16_t ttl_net = htons (_options.heartbeat_ttl);
    memcpy (payload + ttl_offset, &ttl_net, sizeof (ttl_net));

    rc = _mechanism->encode (msg_);
    _next_msg = &stream_engine_t::pull_and_encode;

    const bool arm_timeout = !_has_timeout_timer && _heartbeat_timeout > 0;
    if (arm_timeout) {
        add_timer (_heartbeat_timeout, heartbeat_timeout_timer_id);
        _has_timeout_timer = true;
    }

    return rc;
}

1166
//  Emits the PONG prepared earlier in _pong_msg: moves it into msg_, encodes
//  it through the mechanism and resets the outbound handler to
//  pull_and_encode. Returns the result of the encode step.
int zmq::stream_engine_t::produce_pong_message (msg_t *msg_)
{
    zmq_assert (_mechanism != NULL);

    const int move_rc = msg_->move (_pong_msg);
    errno_assert (move_rc == 0);

    const int encode_rc = _mechanism->encode (msg_);
    _next_msg = &stream_engine_t::pull_and_encode;

    return encode_rc;
}

1178
//  Handles a heartbeat command. Only PING needs work: the peer's TTL is read
//  and used to arm the TTL timer, and a PONG echoing the PING context is
//  prepared and sent through out_event. Other messages are ignored.
//
//  Returns 0 on success. Returns -1 with errno set to EPROTO when a PING is
//  too short to contain the mandatory 16-bit TTL field.
int zmq::stream_engine_t::process_heartbeat_message (msg_t *msg_)
{
    if (msg_->is_ping ()) {
        //  16-bit TTL + \4PING == 7
        const size_t ping_ttl_len = msg_t::ping_cmd_name_size + 2;
        const size_t ping_max_ctx_len = 16;

        //  Malformed PING: without this check the TTL read below would run
        //  past the end of the message and the context length computation
        //  (size - ping_ttl_len) would wrap around.
        if (unlikely (msg_->size () < ping_ttl_len)) {
            errno = EPROTO;
            return -1;
        }

        //  Get the remote heartbeat TTL to setup the timer
        uint16_t remote_heartbeat_ttl;
        memcpy (&remote_heartbeat_ttl,
                static_cast<uint8_t *> (msg_->data ())
                  + msg_t::ping_cmd_name_size,
                ping_ttl_len - msg_t::ping_cmd_name_size);
        remote_heartbeat_ttl = ntohs (remote_heartbeat_ttl);

        //  The remote heartbeat is in 10ths of a second
        //  so we multiply it by 100 to get the timer interval in ms.
        //  The product is kept in an int: it can reach 6553500, which does
        //  not fit the 16-bit field and would otherwise be truncated.
        const int ttl_interval_ms =
          static_cast<int> (remote_heartbeat_ttl) * 100;

        if (!_has_ttl_timer && ttl_interval_ms > 0) {
            add_timer (ttl_interval_ms, heartbeat_ttl_timer_id);
            _has_ttl_timer = true;
        }

        //  As per ZMTP 3.1 the PING command might contain an up to 16 bytes
        //  context which needs to be PONGed back, so build the pong message
        //  here and store it. Truncate it if it's too long.
        //  Given the engine goes straight to out_event, sequential PINGs will
        //  not be a problem.
        const size_t context_len =
          std::min (msg_->size () - ping_ttl_len, ping_max_ctx_len);
        const int rc =
          _pong_msg.init_size (msg_t::ping_cmd_name_size + context_len);
        errno_assert (rc == 0);
        _pong_msg.set_flags (msg_t::command);
        memcpy (_pong_msg.data (), "\4PONG", msg_t::ping_cmd_name_size);
        if (context_len > 0)
            memcpy (static_cast<uint8_t *> (_pong_msg.data ())
                      + msg_t::ping_cmd_name_size,
                    static_cast<uint8_t *> (msg_->data ()) + ping_ttl_len,
                    context_len);

        _next_msg = &stream_engine_t::produce_pong_message;
        out_event ();
    }

    return 0;
}
1225 1226 1227

//  Inspects a command frame and tags msg_ with the matching flag (ping,
//  pong, subscribe or cancel) based on its command name. The frame starts
//  with one octet holding the name length, followed by the name itself.
//  Heartbeat commands are additionally passed to process_heartbeat_message.
//
//  Returns -1 for a malformed command (empty, or shorter than its declared
//  name), the heartbeat handler's result for PING/PONG, and 0 otherwise.
int zmq::stream_engine_t::process_command_message (msg_t *msg_)
{
    //  Malformed command: there is not even a name-length octet to read.
    if (unlikely (msg_->size () == 0))
        return -1;

    const uint8_t *const body = static_cast<const uint8_t *> (msg_->data ());
    const uint8_t cmd_name_size = body[0];
    const size_t ping_name_size = msg_t::ping_cmd_name_size - 1;
    const size_t sub_name_size = msg_t::sub_cmd_name_size - 1;
    const size_t cancel_name_size = msg_t::cancel_cmd_name_size - 1;

    //  Malformed command: the declared name does not fit in the message.
    if (unlikely (msg_->size () < cmd_name_size + sizeof (cmd_name_size)))
        return -1;

    //  The name starts right after the length octet. At most one of the
    //  comparisons below can match, so they are chained.
    const uint8_t *const cmd_name = body + 1;
    if (cmd_name_size == ping_name_size
        && memcmp (cmd_name, "PING", cmd_name_size) == 0)
        msg_->set_flags (zmq::msg_t::ping);
    else if (cmd_name_size == ping_name_size
             && memcmp (cmd_name, "PONG", cmd_name_size) == 0)
        msg_->set_flags (zmq::msg_t::pong);
    else if (cmd_name_size == sub_name_size
             && memcmp (cmd_name, "SUBSCRIBE", cmd_name_size) == 0)
        msg_->set_flags (zmq::msg_t::subscribe);
    else if (cmd_name_size == cancel_name_size
             && memcmp (cmd_name, "CANCEL", cmd_name_size) == 0)
        msg_->set_flags (zmq::msg_t::cancel);

    if (msg_->is_ping () || msg_->is_pong ())
        return process_heartbeat_message (msg_);

    return 0;
}