/*
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file

    This file is part of libzmq, the ZeroMQ core engine in C++.

    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include "precompiled.hpp"
31
#include "macros.hpp"
32

33
#include <limits.h>
34
#include <string.h>
35 36

#ifndef ZMQ_HAVE_WINDOWS
37
#include <unistd.h>
38 39
#endif

40
#include <new>
41
#include <sstream>
42

43
#include "stream_engine.hpp"
44
#include "io_thread.hpp"
45
#include "session_base.hpp"
46 47
#include "v1_encoder.hpp"
#include "v1_decoder.hpp"
48 49
#include "v2_encoder.hpp"
#include "v2_decoder.hpp"
50
#include "null_mechanism.hpp"
51 52
#include "plain_client.hpp"
#include "plain_server.hpp"
53 54
#include "gssapi_client.hpp"
#include "gssapi_server.hpp"
55 56
#include "curve_client.hpp"
#include "curve_server.hpp"
57 58
#include "raw_decoder.hpp"
#include "raw_encoder.hpp"
59 60
#include "config.hpp"
#include "err.hpp"
61
#include "ip.hpp"
62
#include "tcp.hpp"
Martin Hurton's avatar
Martin Hurton committed
63 64
#include "likely.hpp"
#include "wire.hpp"
65

66 67
//  Builds an engine around the already connected descriptor 'fd_'.
//  The engine starts in the handshaking state and is not attached to any
//  session or poller until plug () is called.
zmq::stream_engine_t::stream_engine_t (fd_t fd_,
                                       const options_t &options_,
                                       const std::string &endpoint_) :
    _s (fd_),
    _as_server (false),
    _handle (static_cast<handle_t> (NULL)),
    _inpos (NULL),
    _insize (0),
    _decoder (NULL),
    _outpos (NULL),
    _outsize (0),
    _encoder (NULL),
    _metadata (NULL),
    _handshaking (true),
    _greeting_size (v2_greeting_size),
    _greeting_bytes_read (0),
    _session (NULL),
    _options (options_),
    _endpoint (endpoint_),
    _plugged (false),
    _next_msg (&stream_engine_t::routing_id_msg),
    _process_msg (&stream_engine_t::process_routing_id_msg),
    _io_error (false),
    _subscription_required (false),
    _mechanism (NULL),
    _input_stopped (false),
    _output_stopped (false),
    _has_handshake_timer (false),
    _has_ttl_timer (false),
    _has_timeout_timer (false),
    _has_heartbeat_timer (false),
    _heartbeat_timeout (0),
    _socket (NULL)
{
    //  Both scratch messages start out as initialised, empty messages.
    const int tx_rc = _tx_msg.init ();
    errno_assert (tx_rc == 0);
    const int pong_rc = _pong_msg.init ();
    errno_assert (pong_rc == 0);

    //  Put the socket into non-blocking mode.
    unblock_socket (_s);

    //  Remember the peer address. A family of 0 means it could not be
    //  determined, in which case the address is left empty.
    const int peer_family = get_peer_ip_address (_s, _peer_address);
    if (peer_family == 0) {
        _peer_address.clear ();
    }
#if defined ZMQ_HAVE_SO_PEERCRED
    else if (peer_family == PF_UNIX) {
        //  For UNIX domain peers append ":uid:gid:pid" to the address.
        struct ucred peer_cred;
        socklen_t cred_len = sizeof (peer_cred);
        if (getsockopt (_s, SOL_SOCKET, SO_PEERCRED, &peer_cred, &cred_len)
            == 0) {
            std::ostringstream suffix;
            suffix << ":" << peer_cred.uid << ":" << peer_cred.gid << ":"
                   << peer_cred.pid;
            _peer_address += suffix.str ();
        }
    }
#elif defined ZMQ_HAVE_LOCAL_PEERCRED
    else if (peer_family == PF_UNIX) {
        //  For UNIX domain peers append ":uid:gid:" to the address; the
        //  gid part is the first group, if any, and no pid is appended.
        struct xucred peer_cred;
        socklen_t cred_len = sizeof (peer_cred);
        if (getsockopt (_s, 0, LOCAL_PEERCRED, &peer_cred, &cred_len) == 0
            && peer_cred.cr_version == XUCRED_VERSION) {
            std::ostringstream suffix;
            suffix << ":" << peer_cred.cr_uid << ":";
            if (peer_cred.cr_ngroups > 0)
                suffix << peer_cred.cr_groups[0];
            suffix << ":";
            _peer_address += suffix.str ();
        }
    }
#endif

    //  With heartbeating enabled, a timeout of -1 falls back to the
    //  heartbeat interval itself.
    if (_options.heartbeat_interval > 0) {
        _heartbeat_timeout = _options.heartbeat_timeout == -1
                               ? _options.heartbeat_interval
                               : _options.heartbeat_timeout;
    }
}

144
//  Destroys the engine. The engine must have been unplugged beforehand.
//  Closes the underlying descriptor (if still owned) and releases the
//  transmit message, metadata, encoder, decoder and mechanism.
zmq::stream_engine_t::~stream_engine_t ()
{
    zmq_assert (!_plugged);

    if (_s != retired_fd) {
#ifdef ZMQ_HAVE_WINDOWS
        const int close_rc = closesocket (_s);
        wsa_assert (close_rc != SOCKET_ERROR);
#else
        int close_rc = close (_s);
#if defined(__FreeBSD_kernel__) || defined(__FreeBSD__)
        //  FreeBSD may return ECONNRESET on close() under load but this is
        //  not an error.
        const bool reset_on_close = close_rc == -1 && errno == ECONNRESET;
        if (reset_on_close)
            close_rc = 0;
#endif
        errno_assert (close_rc == 0);
#endif
        _s = retired_fd;
    }

    const int msg_rc = _tx_msg.close ();
    errno_assert (msg_rc == 0);

    //  Give up our reference to the metadata; destroy it only when
    //  drop_ref () reports that we were the last user.
    if (_metadata != NULL && _metadata->drop_ref ()) {
        LIBZMQ_DELETE (_metadata);
    }

    LIBZMQ_DELETE (_encoder);
    LIBZMQ_DELETE (_decoder);
    LIBZMQ_DELETE (_mechanism);
}

181
void zmq::stream_engine_t::plug (io_thread_t *io_thread_,
182
                                 session_base_t *session_)
183
{
184 185
    zmq_assert (!_plugged);
    _plugged = true;
186 187

    //  Connect to session object.
188
    zmq_assert (!_session);
189
    zmq_assert (session_);
190 191
    _session = session_;
    _socket = _session->get_socket ();
192 193 194

    //  Connect to I/O threads poller object.
    io_object_t::plug (io_thread_);
195 196
    _handle = add_fd (_s);
    _io_error = false;
Martin Hurton's avatar
Martin Hurton committed
197

198
    if (_options.raw_socket) {
199
        // no handshaking for raw sock, instantiate raw encoder and decoders
200 201
        _encoder = new (std::nothrow) raw_encoder_t (out_batch_size);
        alloc_assert (_encoder);
202

203 204
        _decoder = new (std::nothrow) raw_decoder_t (in_batch_size);
        alloc_assert (_decoder);
205 206

        // disable handshaking for raw socket
207
        _handshaking = false;
208

209 210
        _next_msg = &stream_engine_t::pull_msg_from_session;
        _process_msg = &stream_engine_t::push_raw_msg_to_session;
211

Thomas Rodgers's avatar
Thomas Rodgers committed
212
        properties_t properties;
213
        if (init_properties (properties)) {
214
            //  Compile metadata.
215 216 217
            zmq_assert (_metadata == NULL);
            _metadata = new (std::nothrow) metadata_t (properties);
            alloc_assert (_metadata);
218
        }
219

220
        if (_options.raw_notify) {
221 222 223
            //  For raw sockets, send an initial 0-length message to the
            // application so that it knows a peer has connected.
            msg_t connector;
224
            connector.init ();
225
            push_raw_msg_to_session (&connector);
226
            connector.close ();
227
            _session->flush ();
228
        }
229
    } else {
230 231 232
        // start optional timer, to prevent handshake hanging on no input
        set_handshake_timer ();

233
        //  Send the 'length' and 'flags' fields of the routing id message.
234
        //  The 'length' field is encoded in the long format.
235
        _outpos = _greeting_send;
236
        _outpos[_outsize++] = UCHAR_MAX;
237 238 239
        put_uint64 (&_outpos[_outsize], _options.routing_id_size + 1);
        _outsize += 8;
        _outpos[_outsize++] = 0x7f;
240
    }
Martin Hurton's avatar
Martin Hurton committed
241

242 243
    set_pollin (_handle);
    set_pollout (_handle);
244 245 246 247
    //  Flush all the data that may have been already received downstream.
    in_event ();
}

248
void zmq::stream_engine_t::unplug ()
249
{
250 251
    zmq_assert (_plugged);
    _plugged = false;
252

253
    //  Cancel all timers.
254
    if (_has_handshake_timer) {
255
        cancel_timer (handshake_timer_id);
256
        _has_handshake_timer = false;
257 258
    }

259
    if (_has_ttl_timer) {
Jonathan Reams's avatar
Jonathan Reams committed
260
        cancel_timer (heartbeat_ttl_timer_id);
261
        _has_ttl_timer = false;
Jonathan Reams's avatar
Jonathan Reams committed
262 263
    }

264
    if (_has_timeout_timer) {
Jonathan Reams's avatar
Jonathan Reams committed
265
        cancel_timer (heartbeat_timeout_timer_id);
266
        _has_timeout_timer = false;
Jonathan Reams's avatar
Jonathan Reams committed
267 268
    }

269
    if (_has_heartbeat_timer) {
Jonathan Reams's avatar
Jonathan Reams committed
270
        cancel_timer (heartbeat_ivl_timer_id);
271
        _has_heartbeat_timer = false;
Jonathan Reams's avatar
Jonathan Reams committed
272
    }
273
    //  Cancel all fd subscriptions.
274 275
    if (!_io_error)
        rm_fd (_handle);
276 277 278 279

    //  Disconnect from I/O threads poller object.
    io_object_t::unplug ();

280
    _session = NULL;
281 282
}

283
void zmq::stream_engine_t::terminate ()
284 285 286 287 288
{
    unplug ();
    delete this;
}

289
void zmq::stream_engine_t::in_event ()
290
{
291
    zmq_assert (!_io_error);
292

Pieter Hintjens's avatar
Pieter Hintjens committed
293
    //  If still handshaking, receive and process the greeting message.
294
    if (unlikely (_handshaking))
Martin Hurton's avatar
Martin Hurton committed
295 296 297
        if (!handshake ())
            return;

298
    zmq_assert (_decoder);
299 300

    //  If there has been an I/O error, stop polling.
301 302 303
    if (_input_stopped) {
        rm_fd (_handle);
        _io_error = true;
304 305
        return;
    }
306

307
    //  If there's no data to process in the buffer...
308
    if (!_insize) {
309 310 311 312
        //  Retrieve the buffer and read as much data as possible.
        //  Note that buffer can be arbitrarily large. However, we assume
        //  the underlying TCP layer has fixed buffer size and thus the
        //  number of bytes read will be always limited.
313
        size_t bufsize = 0;
314
        _decoder->get_buffer (&_inpos, &bufsize);
315

316
        const int rc = tcp_read (_s, _inpos, bufsize);
317

318
        if (rc == 0) {
319 320
            // connection closed by peer
            errno = EPIPE;
321
            error (connection_error);
322
            return;
323
        }
324 325
        if (rc == -1) {
            if (errno != EAGAIN)
326
                error (connection_error);
327 328
            return;
        }
329

330
        //  Adjust input size
331
        _insize = static_cast<size_t> (rc);
332
        // Adjust buffer size to received bytes
333
        _decoder->resize_buffer (_insize);
Martin Hurton's avatar
Martin Hurton committed
334
    }
335

336 337
    int rc = 0;
    size_t processed = 0;
338

339 340 341 342 343
    while (_insize > 0) {
        rc = _decoder->decode (_inpos, _insize, processed);
        zmq_assert (processed <= _insize);
        _inpos += processed;
        _insize -= processed;
344 345
        if (rc == 0 || rc == -1)
            break;
346
        rc = (this->*_process_msg) (_decoder->msg ());
347 348
        if (rc == -1)
            break;
349 350
    }

351 352 353 354
    //  Tear down the connection if we have failed to decode input data
    //  or the session has rejected the message.
    if (rc == -1) {
        if (errno != EAGAIN) {
355
            error (protocol_error);
356 357
            return;
        }
358 359
        _input_stopped = true;
        reset_pollin (_handle);
Martin Hurton's avatar
Martin Hurton committed
360
    }
361

362
    _session->flush ();
363 364
}

365
void zmq::stream_engine_t::out_event ()
366
{
367
    zmq_assert (!_io_error);
368

369
    //  If write buffer is empty, try to read new data from the encoder.
370
    if (!_outsize) {
Martin Hurton's avatar
Martin Hurton committed
371 372 373
        //  Even when we stop polling as soon as there is no
        //  data to send, the poller may invoke out_event one
        //  more time due to 'speculative write' optimisation.
374 375
        if (unlikely (_encoder == NULL)) {
            zmq_assert (_handshaking);
Martin Hurton's avatar
Martin Hurton committed
376 377 378
            return;
        }

379 380
        _outpos = NULL;
        _outsize = _encoder->encode (&_outpos, 0);
381

382 383
        while (_outsize < static_cast<size_t> (out_batch_size)) {
            if ((this->*_next_msg) (&_tx_msg) == -1)
384
                break;
385 386 387
            _encoder->load_msg (&_tx_msg);
            unsigned char *bufptr = _outpos + _outsize;
            size_t n = _encoder->encode (&bufptr, out_batch_size - _outsize);
388
            zmq_assert (n > 0);
389 390 391
            if (_outpos == NULL)
                _outpos = bufptr;
            _outsize += n;
392
        }
393 394

        //  If there is no data to send, stop polling for output.
395 396 397
        if (_outsize == 0) {
            _output_stopped = true;
            reset_pollout (_handle);
398 399 400 401 402 403
            return;
        }
    }

    //  If there are any data to write in write buffer, write as much as
    //  possible to the socket. Note that amount of data to write can be
404
    //  arbitrarily large. However, we assume that underlying TCP layer has
405 406
    //  limited transmission buffer and thus the actual number of bytes
    //  written should be reasonably modest.
407
    const int nbytes = tcp_write (_s, _outpos, _outsize);
408

Martin Hurton's avatar
Martin Hurton committed
409 410
    //  IO error has occurred. We stop waiting for output events.
    //  The engine is not terminated until we detect input error;
411
    //  this is necessary to prevent losing incoming messages.
412
    if (nbytes == -1) {
413
        reset_pollout (_handle);
414 415 416
        return;
    }

417 418
    _outpos += nbytes;
    _outsize -= nbytes;
Martin Hurton's avatar
Martin Hurton committed
419 420 421

    //  If we are still handshaking and there are no data
    //  to send, stop polling for output.
422 423 424
    if (unlikely (_handshaking))
        if (_outsize == 0)
            reset_pollout (_handle);
425 426
}

427
void zmq::stream_engine_t::restart_output ()
428
{
429
    if (unlikely (_io_error))
430 431
        return;

432 433 434
    if (likely (_output_stopped)) {
        set_pollout (_handle);
        _output_stopped = false;
435
    }
436 437 438 439 440 441 442 443

    //  Speculative write: The assumption is that at the moment new message
    //  was sent by the user the socket is probably available for writing.
    //  Thus we try to write the data to socket avoiding polling for POLLOUT.
    //  Consequently, the latency should be better in request/reply scenarios.
    out_event ();
}

444
void zmq::stream_engine_t::restart_input ()
445
{
446 447 448
    zmq_assert (_input_stopped);
    zmq_assert (_session != NULL);
    zmq_assert (_decoder != NULL);
449

450
    int rc = (this->*_process_msg) (_decoder->msg ());
451 452
    if (rc == -1) {
        if (errno == EAGAIN)
453
            _session->flush ();
454
        else
455
            error (protocol_error);
Martin Hurton's avatar
Martin Hurton committed
456 457 458
        return;
    }

459
    while (_insize > 0) {
460
        size_t processed = 0;
461 462 463 464
        rc = _decoder->decode (_inpos, _insize, processed);
        zmq_assert (processed <= _insize);
        _inpos += processed;
        _insize -= processed;
465 466
        if (rc == 0 || rc == -1)
            break;
467
        rc = (this->*_process_msg) (_decoder->msg ());
468 469 470
        if (rc == -1)
            break;
    }
471

472
    if (rc == -1 && errno == EAGAIN)
473 474
        _session->flush ();
    else if (_io_error)
475
        error (connection_error);
476
    else if (rc == -1)
477
        error (protocol_error);
478
    else {
479 480 481
        _input_stopped = false;
        set_pollin (_handle);
        _session->flush ();
482 483 484 485

        //  Speculative read.
        in_event ();
    }
486 487
}

488
bool zmq::stream_engine_t::handshake ()
Martin Hurton's avatar
Martin Hurton committed
489
{
490 491
    zmq_assert (_handshaking);
    zmq_assert (_greeting_bytes_read < _greeting_size);
492
    //  Receive the greeting.
493 494 495
    while (_greeting_bytes_read < _greeting_size) {
        const int n = tcp_read (_s, _greeting_recv + _greeting_bytes_read,
                                _greeting_size - _greeting_bytes_read);
496
        if (n == 0) {
497
            errno = EPIPE;
498
            error (connection_error);
499 500
            return false;
        }
501 502
        if (n == -1) {
            if (errno != EAGAIN)
503
                error (connection_error);
504
            return false;
505
        }
Martin Hurton's avatar
Martin Hurton committed
506

507
        _greeting_bytes_read += n;
Martin Hurton's avatar
Martin Hurton committed
508

509 510 511
        //  We have received at least one byte from the peer.
        //  If the first byte is not 0xff, we know that the
        //  peer is using unversioned protocol.
512
        if (_greeting_recv[0] != 0xff)
513
            break;
Martin Hurton's avatar
Martin Hurton committed
514

515
        if (_greeting_bytes_read < signature_size)
516
            continue;
Martin Hurton's avatar
Martin Hurton committed
517

518 519
        //  Inspect the right-most bit of the 10th byte (which coincides
        //  with the 'flags' field if a regular message was sent).
520
        //  Zero indicates this is a header of a routing id message
521
        //  (i.e. the peer is using the unversioned protocol).
522
        if (!(_greeting_recv[9] & 0x01))
523
            break;
Martin Hurton's avatar
Martin Hurton committed
524

525
        //  The peer is using versioned protocol.
526
        //  Send the major version number.
527 528 529 530
        if (_outpos + _outsize == _greeting_send + signature_size) {
            if (_outsize == 0)
                set_pollout (_handle);
            _outpos[_outsize++] = 3; //  Major version number
531 532
        }

533 534 535 536
        if (_greeting_bytes_read > signature_size) {
            if (_outpos + _outsize == _greeting_send + signature_size + 1) {
                if (_outsize == 0)
                    set_pollout (_handle);
537 538

                //  Use ZMTP/2.0 to talk to older peers.
539 540 541
                if (_greeting_recv[10] == ZMTP_1_0
                    || _greeting_recv[10] == ZMTP_2_0)
                    _outpos[_outsize++] = _options.type;
542
                else {
543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562
                    _outpos[_outsize++] = 0; //  Minor version number
                    memset (_outpos + _outsize, 0, 20);

                    zmq_assert (_options.mechanism == ZMQ_NULL
                                || _options.mechanism == ZMQ_PLAIN
                                || _options.mechanism == ZMQ_CURVE
                                || _options.mechanism == ZMQ_GSSAPI);

                    if (_options.mechanism == ZMQ_NULL)
                        memcpy (_outpos + _outsize, "NULL", 4);
                    else if (_options.mechanism == ZMQ_PLAIN)
                        memcpy (_outpos + _outsize, "PLAIN", 5);
                    else if (_options.mechanism == ZMQ_GSSAPI)
                        memcpy (_outpos + _outsize, "GSSAPI", 6);
                    else if (_options.mechanism == ZMQ_CURVE)
                        memcpy (_outpos + _outsize, "CURVE", 5);
                    _outsize += 20;
                    memset (_outpos + _outsize, 0, 32);
                    _outsize += 32;
                    _greeting_size = v3_greeting_size;
563 564
                }
            }
Martin Hurton's avatar
Martin Hurton committed
565 566 567
        }
    }

568 569
    //  Position of the revision field in the greeting.
    const size_t revision_pos = 10;
570

571
    //  Is the peer using ZMTP/1.0 with no revision number?
572
    //  If so, we send and receive rest of routing id message
573 574
    if (_greeting_recv[0] != 0xff || !(_greeting_recv[9] & 0x01)) {
        if (_session->zap_enabled ()) {
575 576 577
            // reject ZMTP 1.0 connections if ZAP is enabled
            error (protocol_error);
            return false;
578 579
        }

580 581
        _encoder = new (std::nothrow) v1_encoder_t (out_batch_size);
        alloc_assert (_encoder);
582

583 584 585
        _decoder =
          new (std::nothrow) v1_decoder_t (in_batch_size, _options.maxmsgsize);
        alloc_assert (_decoder);
586

Martin Hurton's avatar
Martin Hurton committed
587 588 589 590
        //  We have already sent the message header.
        //  Since there is no way to tell the encoder to
        //  skip the message header, we simply throw that
        //  header data away.
591 592
        const size_t header_size =
          _options.routing_id_size + 1 >= UCHAR_MAX ? 10 : 2;
593
        unsigned char tmp[10], *bufferp = tmp;
594

595
        //  Prepare the routing id message and load it into encoder.
596
        //  Then consume bytes we have already sent to the peer.
597
        const int rc = _tx_msg.init_size (_options.routing_id_size);
598
        zmq_assert (rc == 0);
599 600 601
        memcpy (_tx_msg.data (), _options.routing_id, _options.routing_id_size);
        _encoder->load_msg (&_tx_msg);
        size_t buffer_size = _encoder->encode (&bufferp, header_size);
Martin Hurton's avatar
Martin Hurton committed
602 603 604
        zmq_assert (buffer_size == header_size);

        //  Make sure the decoder sees the data we have already received.
605 606
        _inpos = _greeting_recv;
        _insize = _greeting_bytes_read;
607 608

        //  To allow for interoperability with peers that do not forward
609 610
        //  their subscriptions, we inject a phantom subscription message
        //  message into the incoming message stream.
611 612
        if (_options.type == ZMQ_PUB || _options.type == ZMQ_XPUB)
            _subscription_required = true;
Martin Hurton's avatar
Martin Hurton committed
613

614
        //  We are sending our routing id now and the next message
Martin Hurton's avatar
Martin Hurton committed
615
        //  will come from the socket.
616
        _next_msg = &stream_engine_t::pull_msg_from_session;
Martin Hurton's avatar
Martin Hurton committed
617

618
        //  We are expecting routing id message.
619 620 621
        _process_msg = &stream_engine_t::process_routing_id_msg;
    } else if (_greeting_recv[revision_pos] == ZMTP_1_0) {
        if (_session->zap_enabled ()) {
622 623 624
            // reject ZMTP 1.0 connections if ZAP is enabled
            error (protocol_error);
            return false;
625 626
        }

627 628
        _encoder = new (std::nothrow) v1_encoder_t (out_batch_size);
        alloc_assert (_encoder);
629

630 631 632 633 634
        _decoder =
          new (std::nothrow) v1_decoder_t (in_batch_size, _options.maxmsgsize);
        alloc_assert (_decoder);
    } else if (_greeting_recv[revision_pos] == ZMTP_2_0) {
        if (_session->zap_enabled ()) {
635 636 637
            // reject ZMTP 2.0 connections if ZAP is enabled
            error (protocol_error);
            return false;
638 639
        }

640 641
        _encoder = new (std::nothrow) v2_encoder_t (out_batch_size);
        alloc_assert (_encoder);
642

643 644 645
        _decoder = new (std::nothrow)
          v2_decoder_t (in_batch_size, _options.maxmsgsize, _options.zero_copy);
        alloc_assert (_decoder);
646
    } else {
647 648
        _encoder = new (std::nothrow) v2_encoder_t (out_batch_size);
        alloc_assert (_encoder);
649

650 651 652
        _decoder = new (std::nothrow)
          v2_decoder_t (in_batch_size, _options.maxmsgsize, _options.zero_copy);
        alloc_assert (_decoder);
653

654 655
        if (_options.mechanism == ZMQ_NULL
            && memcmp (_greeting_recv + 12,
656 657
                       "NULL\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20)
                 == 0) {
658 659 660 661 662
            _mechanism = new (std::nothrow)
              null_mechanism_t (_session, _peer_address, _options);
            alloc_assert (_mechanism);
        } else if (_options.mechanism == ZMQ_PLAIN
                   && memcmp (_greeting_recv + 12,
663 664
                              "PLAIN\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20)
                        == 0) {
665 666 667
            if (_options.as_server)
                _mechanism = new (std::nothrow)
                  plain_server_t (_session, _peer_address, _options);
668
            else
669 670 671
                _mechanism =
                  new (std::nothrow) plain_client_t (_session, _options);
            alloc_assert (_mechanism);
672
        }
673
#ifdef ZMQ_HAVE_CURVE
674 675
        else if (_options.mechanism == ZMQ_CURVE
                 && memcmp (_greeting_recv + 12,
676 677
                            "CURVE\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20)
                      == 0) {
678 679 680
            if (_options.as_server)
                _mechanism = new (std::nothrow)
                  curve_server_t (_session, _peer_address, _options);
681
            else
682 683 684
                _mechanism =
                  new (std::nothrow) curve_client_t (_session, _options);
            alloc_assert (_mechanism);
685 686
        }
#endif
Chris Laws's avatar
Chris Laws committed
687
#ifdef HAVE_LIBGSSAPI_KRB5
688 689
        else if (_options.mechanism == ZMQ_GSSAPI
                 && memcmp (_greeting_recv + 12,
690 691
                            "GSSAPI\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20)
                      == 0) {
692 693 694
            if (_options.as_server)
                _mechanism = new (std::nothrow)
                  gssapi_server_t (_session, _peer_address, _options);
695
            else
696 697 698
                _mechanism =
                  new (std::nothrow) gssapi_client_t (_session, _options);
            alloc_assert (_mechanism);
699
        }
700
#endif
701
        else {
702 703
            _session->get_socket ()->event_handshake_failed_protocol (
              _session->get_endpoint (),
704
              ZMQ_PROTOCOL_ERROR_ZMTP_MECHANISM_MISMATCH);
705
            error (protocol_error);
706 707
            return false;
        }
708 709
        _next_msg = &stream_engine_t::next_handshake_command;
        _process_msg = &stream_engine_t::process_handshake_command;
710
    }
Martin Hurton's avatar
Martin Hurton committed
711 712

    // Start polling for output if necessary.
713 714
    if (_outsize == 0)
        set_pollout (_handle);
Martin Hurton's avatar
Martin Hurton committed
715 716 717

    //  Handshaking was successful.
    //  Switch into the normal message flow.
718
    _handshaking = false;
Martin Hurton's avatar
Martin Hurton committed
719

720
    if (_has_handshake_timer) {
721
        cancel_timer (handshake_timer_id);
722
        _has_handshake_timer = false;
723 724
    }

Martin Hurton's avatar
Martin Hurton committed
725 726 727
    return true;
}

728
//  Message source used for the first outbound message: fills 'msg_' with
//  our routing id and switches the source to the session for all
//  subsequent messages. Always returns 0.
int zmq::stream_engine_t::routing_id_msg (msg_t *msg_)
{
    const size_t id_size = _options.routing_id_size;

    const int init_rc = msg_->init_size (id_size);
    errno_assert (init_rc == 0);

    //  Nothing to copy for an empty routing id.
    if (id_size > 0) {
        memcpy (msg_->data (), _options.routing_id, id_size);
    }

    _next_msg = &stream_engine_t::pull_msg_from_session;
    return 0;
}
737

738
//  Message sink used for the first inbound message, which carries the
//  peer's routing id. The message is either forwarded to the session or
//  discarded, then regular message processing is installed.
//  Always returns 0.
int zmq::stream_engine_t::process_routing_id_msg (msg_t *msg_)
{
    if (!_options.recv_routing_id) {
        //  Routing id not wanted: drop it and leave 'msg_' re-initialised.
        const int close_rc = msg_->close ();
        errno_assert (close_rc == 0);
        const int init_rc = msg_->init ();
        errno_assert (init_rc == 0);
    } else {
        msg_->set_flags (msg_t::routing_id);
        const int push_rc = _session->push_msg (msg_);
        errno_assert (push_rc == 0);
    }

    if (_subscription_required) {
        //  Inject the subscription message, so that also
        //  ZMQ 2.x peers receive published messages.
        msg_t phantom_sub;
        const int init_rc = phantom_sub.init_size (1);
        errno_assert (init_rc == 0);
        unsigned char *const body =
          static_cast<unsigned char *> (phantom_sub.data ());
        *body = 1;
        const int push_rc = _session->push_msg (&phantom_sub);
        errno_assert (push_rc == 0);
    }

    _process_msg = &stream_engine_t::push_msg_to_session;

    return 0;
}
767

768
//  Message source used while the security mechanism is handshaking.
//  Once the mechanism is ready, regular traffic is pulled instead; when it
//  is in the error state, -1 is returned with errno set to EPROTO.
//  Otherwise the mechanism's next command is returned, flagged as a
//  command on success.
int zmq::stream_engine_t::next_handshake_command (msg_t *msg_)
{
    zmq_assert (_mechanism != NULL);

    if (_mechanism->status () == mechanism_t::ready) {
        mechanism_ready ();
        return pull_and_encode (msg_);
    }

    if (_mechanism->status () == mechanism_t::error) {
        errno = EPROTO;
        return -1;
    }

    const int cmd_rc = _mechanism->next_handshake_command (msg_);
    if (cmd_rc == 0) {
        msg_->set_flags (msg_t::command);
    }
    return cmd_rc;
}

789
//  Message sink used while the security mechanism is handshaking: feeds
//  'msg_' to the mechanism and reacts to the resulting mechanism state.
//  Returns the mechanism's result, or -1 with errno set to EPROTO when the
//  mechanism ended up in the error state.
int zmq::stream_engine_t::process_handshake_command (msg_t *msg_)
{
    zmq_assert (_mechanism != NULL);

    const int cmd_rc = _mechanism->process_handshake_command (msg_);
    if (cmd_rc != 0) {
        return cmd_rc;
    }

    if (_mechanism->status () == mechanism_t::ready) {
        mechanism_ready ();
    } else if (_mechanism->status () == mechanism_t::error) {
        errno = EPROTO;
        return -1;
    }

    //  The mechanism may now have something to send.
    if (_output_stopped) {
        restart_output ();
    }

    return cmd_rc;
}

807 808
void zmq::stream_engine_t::zap_msg_available ()
{
809
    zmq_assert (_mechanism != NULL);
810

811
    const int rc = _mechanism->zap_msg_available ();
812
    if (rc == -1) {
813
        error (protocol_error);
814 815
        return;
    }
816
    if (_input_stopped)
817
        restart_input ();
818
    if (_output_stopped)
819
        restart_output ();
820 821
}

822 823
const char *zmq::stream_engine_t::get_endpoint () const
{
824
    return _endpoint.c_str ();
825 826
}

827
void zmq::stream_engine_t::mechanism_ready ()
828
{
829 830 831
    if (_options.heartbeat_interval > 0) {
        add_timer (_options.heartbeat_interval, heartbeat_ivl_timer_id);
        _has_heartbeat_timer = true;
832 833
    }

834
    if (_options.recv_routing_id) {
835
        msg_t routing_id;
836 837
        _mechanism->peer_routing_id (&routing_id);
        const int rc = _session->push_msg (&routing_id);
838 839 840
        if (rc == -1 && errno == EAGAIN) {
            // If the write is failing at this stage with
            // an EAGAIN the pipe must be being shut down,
841
            // so we can just bail out of the routing id set.
842 843
            return;
        }
844
        errno_assert (rc == 0);
845
        _session->flush ();
846
    }
847

848 849
    _next_msg = &stream_engine_t::pull_and_encode;
    _process_msg = &stream_engine_t::write_credential;
850 851 852

    //  Compile metadata.
    properties_t properties;
853
    init_properties (properties);
854

Pieter Hintjens's avatar
Pieter Hintjens committed
855
    //  Add ZAP properties.
856
    const properties_t &zap_properties = _mechanism->get_zap_properties ();
857
    properties.insert (zap_properties.begin (), zap_properties.end ());
Pieter Hintjens's avatar
Pieter Hintjens committed
858

859
    //  Add ZMTP properties.
860
    const properties_t &zmtp_properties = _mechanism->get_zmtp_properties ();
861
    properties.insert (zmtp_properties.begin (), zmtp_properties.end ());
862

863
    zmq_assert (_metadata == NULL);
864
    if (!properties.empty ()) {
865 866
        _metadata = new (std::nothrow) metadata_t (properties);
        alloc_assert (_metadata);
867
    }
Vincent Tellier's avatar
Vincent Tellier committed
868

869
#ifdef ZMQ_BUILD_DRAFT_API
870
    _socket->event_handshake_succeeded (_endpoint, 0);
871
#endif
872 873
}

874
//  Message source that simply forwards to the session's pull_msg and
//  returns its result.
int zmq::stream_engine_t::pull_msg_from_session (msg_t *msg_)
{
    const int rc = _session->pull_msg (msg_);
    return rc;
}
878

879 880
//  Message sink that simply forwards to the session's push_msg and
//  returns its result.
int zmq::stream_engine_t::push_msg_to_session (msg_t *msg_)
{
    const int rc = _session->push_msg (msg_);
    return rc;
}

884 885
//  Attaches the engine's metadata to the message (unless there is none or
//  the message already carries it) and then pushes it to the session.
int zmq::stream_engine_t::push_raw_msg_to_session (msg_t *msg_)
{
    const bool needs_metadata =
      _metadata != NULL && msg_->metadata () != _metadata;
    if (needs_metadata)
        msg_->set_metadata (_metadata);

    return push_msg_to_session (msg_);
}

891 892
//  Handler for the first message after the handshake. If the mechanism
//  reports a non-empty user id, a message flagged as credential carrying
//  that id is pushed to the session first; then the handler is switched to
//  decode_and_push, which also processes msg_.
//
//  Returns -1 if the credential could not be pushed, otherwise the result
//  of decode_and_push.
int zmq::stream_engine_t::write_credential (msg_t *msg_)
{
    zmq_assert (_mechanism != NULL);
    zmq_assert (_session != NULL);

    const blob_t &user_id = _mechanism->get_user_id ();
    if (user_id.size () > 0) {
        msg_t credential_msg;
        int rc = credential_msg.init_size (user_id.size ());
        zmq_assert (rc == 0);
        memcpy (credential_msg.data (), user_id.data (), user_id.size ());
        credential_msg.set_flags (msg_t::credential);

        rc = _session->push_msg (&credential_msg);
        if (rc == -1) {
            //  The push failed, so the message is released here.
            rc = credential_msg.close ();
            errno_assert (rc == 0);
            return -1;
        }
    }

    _process_msg = &stream_engine_t::decode_and_push;
    return decode_and_push (msg_);
}

914 915
//  Pulls the next message from the session and passes it through the
//  mechanism's encode step. Returns 0 on success, -1 if either step
//  reports -1.
int zmq::stream_engine_t::pull_and_encode (msg_t *msg_)
{
    zmq_assert (_mechanism != NULL);

    const int pull_rc = _session->pull_msg (msg_);
    if (pull_rc == -1)
        return -1;

    const int encode_rc = _mechanism->encode (msg_);
    return encode_rc == -1 ? -1 : 0;
}

//  Passes an incoming message through the mechanism's decode step, cancels
//  pending heartbeat timers, handles command messages, attaches the
//  engine's metadata and pushes the message to the session.
//
//  Returns 0 on success and -1 on failure. If the session push fails with
//  EAGAIN, the handler is switched to push_one_then_decode_and_push.
int zmq::stream_engine_t::decode_and_push (msg_t *msg_)
{
    zmq_assert (_mechanism != NULL);

    const int decode_rc = _mechanism->decode (msg_);
    if (decode_rc == -1)
        return -1;

    //  A message was received, so the heartbeat timeout and TTL timers
    //  are no longer needed.
    if (_has_timeout_timer) {
        _has_timeout_timer = false;
        cancel_timer (heartbeat_timeout_timer_id);
    }

    if (_has_ttl_timer) {
        _has_ttl_timer = false;
        cancel_timer (heartbeat_ttl_timer_id);
    }

    //  NOTE(review): the result of process_command_message is discarded,
    //  so a malformed command is still pushed to the session below;
    //  confirm this is intended.
    const bool is_command = (msg_->flags () & msg_t::command) != 0;
    if (is_command)
        process_command_message (msg_);

    if (_metadata)
        msg_->set_metadata (_metadata);

    const int push_rc = _session->push_msg (msg_);
    if (push_rc == -1) {
        if (errno == EAGAIN)
            _process_msg = &stream_engine_t::push_one_then_decode_and_push;
        return -1;
    }

    return 0;
}

//  Pushes the given message to the session without decoding it. Once the
//  push succeeds, the handler is switched back to decode_and_push.
//  Returns the result of the push.
int zmq::stream_engine_t::push_one_then_decode_and_push (msg_t *msg_)
{
    const int push_rc = _session->push_msg (msg_);
    if (push_rc != 0)
        return push_rc;

    _process_msg = &stream_engine_t::decode_and_push;
    return push_rc;
}

964
//  Shuts the engine down after a failure: optionally notifies a raw
//  socket's application, emits the monitoring events, reports the error to
//  the session, unplugs the engine and finally destroys it.
//
//  The engine deletes itself here, so callers must not touch it after this
//  function returns.
void zmq::stream_engine_t::error (error_reason_t reason_)
{
    const bool notify_raw_peer = _options.raw_socket && _options.raw_notify;
    if (notify_raw_peer) {
        //  For raw sockets, send a final 0-length message to the application
        //  so that it knows the peer has been disconnected.
        msg_t terminator;
        terminator.init ();
        (this->*_process_msg) (&terminator);
        terminator.close ();
    }
    zmq_assert (_session);

#ifdef ZMQ_BUILD_DRAFT_API
    //  protocol errors have been signaled already at the point where they occurred
    if (reason_ != protocol_error) {
        const bool handshake_incomplete =
          _mechanism == NULL
          || _mechanism->status () == mechanism_t::handshaking;
        if (handshake_incomplete) {
            const int saved_errno = errno;
            _socket->event_handshake_failed_no_detail (_endpoint,
                                                       saved_errno);
        }
    }
#endif

    _socket->event_disconnected (_endpoint, _s);
    _session->flush ();
    _session->engine_error (reason_);
    unplug ();

    //  Must stay the last statement: the object is gone afterwards.
    delete this;
}

991
void zmq::stream_engine_t::set_handshake_timer ()
992
{
993
    zmq_assert (!_has_handshake_timer);
994

995 996 997
    if (!_options.raw_socket && _options.handshake_ivl > 0) {
        add_timer (_options.handshake_ivl, handshake_timer_id);
        _has_handshake_timer = true;
998
    }
999 1000
}

1001
//  Adds the engine-level properties to properties_: the peer address and
//  the private "__fd" entry holding the socket descriptor as text.
//
//  Returns false, leaving properties_ untouched, when the peer address is
//  empty; returns true otherwise.
bool zmq::stream_engine_t::init_properties (properties_t &properties_)
{
    if (_peer_address.empty ())
        return false;

    properties_.ZMQ_MAP_INSERT_OR_EMPLACE (
      std::string (ZMQ_MSG_PROPERTY_PEER_ADDRESS), _peer_address);

    //  Private property to support deprecated SRCFD
    std::ostringstream fd_stream;
    fd_stream << static_cast<int> (_s);
    std::string fd_text = fd_stream.str ();
    properties_.ZMQ_MAP_INSERT_OR_EMPLACE (std::string ("__fd"),
                                           ZMQ_MOVE (fd_text));

    return true;
}

1017
void zmq::stream_engine_t::timer_event (int id_)
1018
{
1019
    if (id_ == handshake_timer_id) {
1020
        _has_handshake_timer = false;
Jonathan Reams's avatar
Jonathan Reams committed
1021 1022
        //  handshake timer expired before handshake completed, so engine fail
        error (timeout_error);
1023
    } else if (id_ == heartbeat_ivl_timer_id) {
1024
        _next_msg = &stream_engine_t::produce_ping_message;
1025
        out_event ();
1026
        add_timer (_options.heartbeat_interval, heartbeat_ivl_timer_id);
1027
    } else if (id_ == heartbeat_ttl_timer_id) {
1028
        _has_ttl_timer = false;
1029 1030
        error (timeout_error);
    } else if (id_ == heartbeat_timeout_timer_id) {
1031
        _has_timeout_timer = false;
1032 1033
        error (timeout_error);
    } else
Jonathan Reams's avatar
Jonathan Reams committed
1034
        // There are no other valid timer ids!
1035
        assert (false);
Jonathan Reams's avatar
Jonathan Reams committed
1036 1037
}

1038
//  Builds a PING command in msg_: the 5-byte "\4PING" name followed by the
//  configured heartbeat TTL as a 16-bit value in network byte order. The
//  message is passed through the mechanism's encode step, whose result is
//  returned.
//
//  Afterwards output reverts to pull_and_encode and, if a heartbeat
//  timeout is configured and not already running, its timer is armed.
int zmq::stream_engine_t::produce_ping_message (msg_t *msg_)
{
    zmq_assert (_mechanism != NULL);

    //  "\4PING" (5 bytes) + 16-bit TTL (2 bytes) == 7
    const size_t ping_name_len = 5;
    const size_t ping_msg_len = 7;

    int rc = msg_->init_size (ping_msg_len);
    errno_assert (rc == 0);
    msg_->set_flags (msg_t::command);

    uint8_t *const payload = static_cast<uint8_t *> (msg_->data ());

    // Copy in the command message
    memcpy (payload, "\4PING", ping_name_len);

    const uint16_t ttl_network_order = htons (_options.heartbeat_ttl);
    memcpy (payload + ping_name_len, &ttl_network_order,
            sizeof (ttl_network_order));

    rc = _mechanism->encode (msg_);
    _next_msg = &stream_engine_t::pull_and_encode;

    if (_heartbeat_timeout > 0 && !_has_timeout_timer) {
        add_timer (_heartbeat_timeout, heartbeat_timeout_timer_id);
        _has_timeout_timer = true;
    }

    return rc;
}

1063
//  Moves the stored PONG message into msg_, passes it through the
//  mechanism's encode step and reverts output to pull_and_encode.
//  Returns the result of the encode step.
int zmq::stream_engine_t::produce_pong_message (msg_t *msg_)
{
    zmq_assert (_mechanism != NULL);

    const int move_rc = msg_->move (_pong_msg);
    errno_assert (move_rc == 0);

    const int encode_rc = _mechanism->encode (msg_);
    _next_msg = &stream_engine_t::pull_and_encode;

    return encode_rc;
}

1076
//  Handles a heartbeat command. Anything that is not a PING needs no action
//  and yields 0. For a PING, the TTL timer is armed from the TTL sent by
//  the peer (if not already running), the PONG reply is prepared in
//  _pong_msg and output is triggered so that it is sent.
//
//  Returns 0 on success. Returns -1 with errno set to EPROTO for a PING
//  that is too short to contain the mandatory 16-bit TTL.
int zmq::stream_engine_t::process_heartbeat_message (msg_t *msg_)
{
    //  Wire layout of a PING: "\4PING" (5 bytes), 16-bit TTL in network
    //  byte order (2 bytes), then an optional context.
    const size_t cmd_name_len = 5;
    const size_t ping_ttl_len = 2;
    const size_t ping_header_len = cmd_name_len + ping_ttl_len;
    const size_t max_context_len = 16;

    const size_t msg_size = msg_->size ();
    if (msg_size < cmd_name_len
        || memcmp (msg_->data (), "\4PING", cmd_name_len) != 0)
        return 0;

    //  Without this check the TTL would be read past the end of the
    //  payload and the context length computation below would underflow.
    if (msg_size < ping_header_len) {
        errno = EPROTO;
        return -1;
    }

    const uint8_t *const ping_data =
      static_cast<const uint8_t *> (msg_->data ());

    // Get the remote heartbeat TTL to setup the timer
    uint16_t ttl_network_order;
    memcpy (&ttl_network_order, ping_data + cmd_name_len, ping_ttl_len);

    // The remote heartbeat is in 10ths of a second
    // so we multiply it by 100 to get the timer interval in ms.
    //  The product is computed as int: it does not fit in 16 bits for TTLs
    //  above 655, which would arm a wrongly short timer.
    const int remote_ttl_ms = static_cast<int> (ntohs (ttl_network_order)) * 100;

    if (!_has_ttl_timer && remote_ttl_ms > 0) {
        add_timer (remote_ttl_ms, heartbeat_ttl_timer_id);
        _has_ttl_timer = true;
    }

    //  As per ZMTP 3.1 the PING command might contain an up to 16 bytes
    //  context which needs to be PONGed back, so build the pong message
    //  here and store it. Truncate it if it's too long.
    //  Given the engine goes straight to out_event, sequential PINGs will
    //  not be a problem.
    const size_t available_context = msg_size - ping_header_len;
    const size_t context_len =
      available_context > max_context_len ? max_context_len : available_context;

    const int rc = _pong_msg.init_size (cmd_name_len + context_len);
    errno_assert (rc == 0);
    _pong_msg.set_flags (msg_t::command);

    uint8_t *const pong_data = static_cast<uint8_t *> (_pong_msg.data ());
    memcpy (pong_data, "\4PONG", cmd_name_len);
    if (context_len > 0)
        memcpy (pong_data + cmd_name_len, ping_data + ping_header_len,
                context_len);

    _next_msg = &stream_engine_t::produce_pong_message;
    out_event ();

    return 0;
}
1113 1114 1115

//  Inspects a command message. The payload starts with a one-byte command
//  name length followed by the name itself. PING and PONG are passed to
//  process_heartbeat_message; every other well-formed command yields 0.
//
//  Returns -1 for a malformed command: an empty payload, or one shorter
//  than the announced command name.
int zmq::stream_engine_t::process_command_message (msg_t *msg_)
{
    //  Malformed command: there is not even a name-length byte to read.
    if (msg_->size () < 1)
        return -1;

    const uint8_t *const payload =
      static_cast<const uint8_t *> (msg_->data ());
    const uint8_t cmd_name_size = payload[0];

    //  Malformed command
    if (msg_->size () < cmd_name_size + sizeof (cmd_name_size))
        return -1;

    const uint8_t *const cmd_name = payload + 1;
    if (cmd_name_size == 4
        && (memcmp (cmd_name, "PING", cmd_name_size) == 0
            || memcmp (cmd_name, "PONG", cmd_name_size) == 0))
        return process_heartbeat_message (msg_);

    return 0;
}