stream_engine.cpp 38.1 KB
Newer Older
1
/*
2
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3

4
    This file is part of libzmq, the ZeroMQ core engine in C++.
5

6 7 8
    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
9 10
    (at your option) any later version.

11 12 13 14 15 16 17 18 19 20 21 22 23 24
    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.
25 26 27 28 29

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include "precompiled.hpp"
31
#include "macros.hpp"
32

33
#include <limits.h>
34
#include <string.h>
35 36

#ifndef ZMQ_HAVE_WINDOWS
37
#include <unistd.h>
38 39
#endif

40
#include <new>
41
#include <sstream>
42

43
#include "stream_engine.hpp"
44
#include "io_thread.hpp"
45
#include "session_base.hpp"
46 47
#include "v1_encoder.hpp"
#include "v1_decoder.hpp"
48 49
#include "v2_encoder.hpp"
#include "v2_decoder.hpp"
50
#include "null_mechanism.hpp"
51 52
#include "plain_client.hpp"
#include "plain_server.hpp"
53 54
#include "gssapi_client.hpp"
#include "gssapi_server.hpp"
55 56
#include "curve_client.hpp"
#include "curve_server.hpp"
57 58
#include "raw_decoder.hpp"
#include "raw_encoder.hpp"
59 60
#include "config.hpp"
#include "err.hpp"
61
#include "ip.hpp"
62
#include "tcp.hpp"
Martin Hurton's avatar
Martin Hurton committed
63 64
#include "likely.hpp"
#include "wire.hpp"
65

66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102
//  Builds the peer address string for the socket s_.
//
//  The base text is whatever zmq::get_peer_ip_address writes into the
//  string; it is cleared again when that helper reports family 0.
//  For PF_UNIX peers, where the platform exposes peer credentials, a
//  credential suffix is appended:
//    - SO_PEERCRED:    ":<uid>:<gid>:<pid>"
//    - LOCAL_PEERCRED: ":<uid>:<first group, if any>:"
//  A failing getsockopt simply leaves the string without a suffix.
static std::string get_peer_address (zmq::fd_t s_)
{
    std::string peer_address;

    const int family = zmq::get_peer_ip_address (s_, peer_address);
    if (family == 0)
        peer_address.clear ();
#if defined ZMQ_HAVE_SO_PEERCRED
    else if (family == PF_UNIX) {
        struct ucred cred;
        socklen_t size = sizeof (cred);
        if (!getsockopt (s_, SOL_SOCKET, SO_PEERCRED, &cred, &size)) {
            std::ostringstream buf;
            buf << ":" << cred.uid << ":" << cred.gid << ":" << cred.pid;
            peer_address += buf.str ();
        }
    }
#elif defined ZMQ_HAVE_LOCAL_PEERCRED
    else if (family == PF_UNIX) {
        struct xucred cred;
        socklen_t size = sizeof (cred);
        //  This is a free function, so it must operate on its own
        //  argument and local string; the engine members _s and
        //  _peer_address are not in scope here.
        if (!getsockopt (s_, 0, LOCAL_PEERCRED, &cred, &size)
            && cred.cr_version == XUCRED_VERSION) {
            std::ostringstream buf;
            buf << ":" << cred.cr_uid << ":";
            if (cred.cr_ngroups > 0)
                buf << cred.cr_groups[0];
            buf << ":";
            peer_address += buf.str ();
        }
    }
#endif

    return peer_address;
}


103 104 105 106
//  Creates an engine for descriptor fd_. The engine starts unplugged, in
//  handshaking state, with empty input/output buffers and no encoder,
//  decoder, mechanism or metadata. The peer address string is computed
//  once from the descriptor.
zmq::stream_engine_t::stream_engine_t (
  fd_t fd_,
  const options_t &options_,
  const endpoint_uri_pair_t &endpoint_uri_pair_) :
    _s (fd_),
    _handle (static_cast<handle_t> (NULL)),
    _inpos (NULL),
    _insize (0),
    _decoder (NULL),
    _outpos (NULL),
    _outsize (0),
    _encoder (NULL),
    _metadata (NULL),
    _handshaking (true),
    _greeting_size (v2_greeting_size),
    _greeting_bytes_read (0),
    _session (NULL),
    _options (options_),
    _endpoint_uri_pair (endpoint_uri_pair_),
    _plugged (false),
    _next_msg (&stream_engine_t::routing_id_msg),
    _process_msg (&stream_engine_t::process_routing_id_msg),
    _io_error (false),
    _subscription_required (false),
    _mechanism (NULL),
    _input_stopped (false),
    _output_stopped (false),
    _has_handshake_timer (false),
    _has_ttl_timer (false),
    _has_timeout_timer (false),
    _has_heartbeat_timer (false),
    _heartbeat_timeout (0),
    _socket (NULL),
    _peer_address (get_peer_address (_s))
{
    //  Initialise the two messages owned by the engine.
    const int tx_rc = _tx_msg.init ();
    errno_assert (tx_rc == 0);
    const int pong_rc = _pong_msg.init ();
    errno_assert (pong_rc == 0);

    //  Switch the socket to non-blocking mode.
    unblock_socket (_s);

    //  With heartbeating disabled the timeout keeps its initial zero.
    if (_options.heartbeat_interval <= 0)
        return;

    //  A configured timeout of -1 means "use the heartbeat interval".
    _heartbeat_timeout = _options.heartbeat_timeout;
    if (_heartbeat_timeout == -1)
        _heartbeat_timeout = _options.heartbeat_interval;
}

154
//  Releases everything the engine owns: the socket (unless already
//  retired), the outgoing message, its metadata reference, and the
//  encoder, decoder and mechanism objects. Must only run while unplugged.
//  NOTE(review): _pong_msg is initialised in the constructor but is not
//  closed here -- confirm this is intentional.
zmq::stream_engine_t::~stream_engine_t ()
{
    zmq_assert (!_plugged);

    if (_s != retired_fd) {
#ifdef ZMQ_HAVE_WINDOWS
        const int close_rc = closesocket (_s);
        wsa_assert (close_rc != SOCKET_ERROR);
#else
        int close_rc = close (_s);
#if defined(__FreeBSD_kernel__) || defined(__FreeBSD__)
        //  Under load FreeBSD can report ECONNRESET from close();
        //  that is not treated as a failure.
        if (close_rc == -1 && errno == ECONNRESET)
            close_rc = 0;
#endif
        errno_assert (close_rc == 0);
#endif
        _s = retired_fd;
    }

    const int msg_rc = _tx_msg.close ();
    errno_assert (msg_rc == 0);

    //  Give up our metadata reference; whoever drops the last one
    //  destroys the object.
    if (_metadata != NULL && _metadata->drop_ref ()) {
        LIBZMQ_DELETE (_metadata);
    }

    LIBZMQ_DELETE (_encoder);
    LIBZMQ_DELETE (_decoder);
    LIBZMQ_DELETE (_mechanism);
}

191
void zmq::stream_engine_t::plug (io_thread_t *io_thread_,
192
                                 session_base_t *session_)
193
{
194 195
    zmq_assert (!_plugged);
    _plugged = true;
196 197

    //  Connect to session object.
198
    zmq_assert (!_session);
199
    zmq_assert (session_);
200 201
    _session = session_;
    _socket = _session->get_socket ();
202 203 204

    //  Connect to I/O threads poller object.
    io_object_t::plug (io_thread_);
205 206
    _handle = add_fd (_s);
    _io_error = false;
Martin Hurton's avatar
Martin Hurton committed
207

208
    if (_options.raw_socket) {
209
        // no handshaking for raw sock, instantiate raw encoder and decoders
210 211
        _encoder = new (std::nothrow) raw_encoder_t (out_batch_size);
        alloc_assert (_encoder);
212

213 214
        _decoder = new (std::nothrow) raw_decoder_t (in_batch_size);
        alloc_assert (_decoder);
215 216

        // disable handshaking for raw socket
217
        _handshaking = false;
218

219 220
        _next_msg = &stream_engine_t::pull_msg_from_session;
        _process_msg = &stream_engine_t::push_raw_msg_to_session;
221

Thomas Rodgers's avatar
Thomas Rodgers committed
222
        properties_t properties;
223
        if (init_properties (properties)) {
224
            //  Compile metadata.
225 226 227
            zmq_assert (_metadata == NULL);
            _metadata = new (std::nothrow) metadata_t (properties);
            alloc_assert (_metadata);
228
        }
229

230
        if (_options.raw_notify) {
231 232 233
            //  For raw sockets, send an initial 0-length message to the
            // application so that it knows a peer has connected.
            msg_t connector;
234
            connector.init ();
235
            push_raw_msg_to_session (&connector);
236
            connector.close ();
237
            _session->flush ();
238
        }
239
    } else {
240 241 242
        // start optional timer, to prevent handshake hanging on no input
        set_handshake_timer ();

243
        //  Send the 'length' and 'flags' fields of the routing id message.
244
        //  The 'length' field is encoded in the long format.
245
        _outpos = _greeting_send;
246
        _outpos[_outsize++] = UCHAR_MAX;
247 248 249
        put_uint64 (&_outpos[_outsize], _options.routing_id_size + 1);
        _outsize += 8;
        _outpos[_outsize++] = 0x7f;
250
    }
Martin Hurton's avatar
Martin Hurton committed
251

252 253
    set_pollin (_handle);
    set_pollout (_handle);
254 255 256 257
    //  Flush all the data that may have been already received downstream.
    in_event ();
}

258
void zmq::stream_engine_t::unplug ()
259
{
260 261
    zmq_assert (_plugged);
    _plugged = false;
262

263
    //  Cancel all timers.
264
    if (_has_handshake_timer) {
265
        cancel_timer (handshake_timer_id);
266
        _has_handshake_timer = false;
267 268
    }

269
    if (_has_ttl_timer) {
Jonathan Reams's avatar
Jonathan Reams committed
270
        cancel_timer (heartbeat_ttl_timer_id);
271
        _has_ttl_timer = false;
Jonathan Reams's avatar
Jonathan Reams committed
272 273
    }

274
    if (_has_timeout_timer) {
Jonathan Reams's avatar
Jonathan Reams committed
275
        cancel_timer (heartbeat_timeout_timer_id);
276
        _has_timeout_timer = false;
Jonathan Reams's avatar
Jonathan Reams committed
277 278
    }

279
    if (_has_heartbeat_timer) {
Jonathan Reams's avatar
Jonathan Reams committed
280
        cancel_timer (heartbeat_ivl_timer_id);
281
        _has_heartbeat_timer = false;
Jonathan Reams's avatar
Jonathan Reams committed
282
    }
283
    //  Cancel all fd subscriptions.
284 285
    if (!_io_error)
        rm_fd (_handle);
286 287 288 289

    //  Disconnect from I/O threads poller object.
    io_object_t::unplug ();

290
    _session = NULL;
291 292
}

293
void zmq::stream_engine_t::terminate ()
294 295 296 297 298
{
    unplug ();
    delete this;
}

299
void zmq::stream_engine_t::in_event ()
300 301 302 303 304 305 306
{
    // ignore errors
    const bool res = in_event_internal ();
    LIBZMQ_UNUSED (res);
}

bool zmq::stream_engine_t::in_event_internal ()
307
{
308
    zmq_assert (!_io_error);
309

Pieter Hintjens's avatar
Pieter Hintjens committed
310
    //  If still handshaking, receive and process the greeting message.
311
    if (unlikely (_handshaking))
Martin Hurton's avatar
Martin Hurton committed
312
        if (!handshake ())
313
            return false;
Martin Hurton's avatar
Martin Hurton committed
314

315
    zmq_assert (_decoder);
316 317

    //  If there has been an I/O error, stop polling.
318 319 320
    if (_input_stopped) {
        rm_fd (_handle);
        _io_error = true;
321
        return true; // TODO or return false in this case too?
322
    }
323

324
    //  If there's no data to process in the buffer...
325
    if (!_insize) {
326 327 328 329
        //  Retrieve the buffer and read as much data as possible.
        //  Note that buffer can be arbitrarily large. However, we assume
        //  the underlying TCP layer has fixed buffer size and thus the
        //  number of bytes read will be always limited.
330
        size_t bufsize = 0;
331
        _decoder->get_buffer (&_inpos, &bufsize);
332

333
        const int rc = tcp_read (_s, _inpos, bufsize);
334

335
        if (rc == 0) {
336 337
            // connection closed by peer
            errno = EPIPE;
338
            error (connection_error);
339
            return false;
340
        }
341
        if (rc == -1) {
342
            if (errno != EAGAIN) {
343
                error (connection_error);
344 345 346
                return false;
            }
            return true;
347
        }
348

349
        //  Adjust input size
350
        _insize = static_cast<size_t> (rc);
351
        // Adjust buffer size to received bytes
352
        _decoder->resize_buffer (_insize);
Martin Hurton's avatar
Martin Hurton committed
353
    }
354

355 356
    int rc = 0;
    size_t processed = 0;
357

358 359 360 361 362
    while (_insize > 0) {
        rc = _decoder->decode (_inpos, _insize, processed);
        zmq_assert (processed <= _insize);
        _inpos += processed;
        _insize -= processed;
363 364
        if (rc == 0 || rc == -1)
            break;
365
        rc = (this->*_process_msg) (_decoder->msg ());
366 367
        if (rc == -1)
            break;
368 369
    }

370 371 372 373
    //  Tear down the connection if we have failed to decode input data
    //  or the session has rejected the message.
    if (rc == -1) {
        if (errno != EAGAIN) {
374
            error (protocol_error);
375
            return false;
376
        }
377 378
        _input_stopped = true;
        reset_pollin (_handle);
Martin Hurton's avatar
Martin Hurton committed
379
    }
380

381
    _session->flush ();
382
    return true;
383 384
}

385
void zmq::stream_engine_t::out_event ()
386
{
387
    zmq_assert (!_io_error);
388

389
    //  If write buffer is empty, try to read new data from the encoder.
390
    if (!_outsize) {
Martin Hurton's avatar
Martin Hurton committed
391 392 393
        //  Even when we stop polling as soon as there is no
        //  data to send, the poller may invoke out_event one
        //  more time due to 'speculative write' optimisation.
394 395
        if (unlikely (_encoder == NULL)) {
            zmq_assert (_handshaking);
Martin Hurton's avatar
Martin Hurton committed
396 397 398
            return;
        }

399 400
        _outpos = NULL;
        _outsize = _encoder->encode (&_outpos, 0);
401

402 403
        while (_outsize < static_cast<size_t> (out_batch_size)) {
            if ((this->*_next_msg) (&_tx_msg) == -1)
404
                break;
405 406 407
            _encoder->load_msg (&_tx_msg);
            unsigned char *bufptr = _outpos + _outsize;
            size_t n = _encoder->encode (&bufptr, out_batch_size - _outsize);
408
            zmq_assert (n > 0);
409 410 411
            if (_outpos == NULL)
                _outpos = bufptr;
            _outsize += n;
412
        }
413 414

        //  If there is no data to send, stop polling for output.
415 416 417
        if (_outsize == 0) {
            _output_stopped = true;
            reset_pollout (_handle);
418 419 420 421 422 423
            return;
        }
    }

    //  If there are any data to write in write buffer, write as much as
    //  possible to the socket. Note that amount of data to write can be
424
    //  arbitrarily large. However, we assume that underlying TCP layer has
425 426
    //  limited transmission buffer and thus the actual number of bytes
    //  written should be reasonably modest.
427
    const int nbytes = tcp_write (_s, _outpos, _outsize);
428

Martin Hurton's avatar
Martin Hurton committed
429 430
    //  IO error has occurred. We stop waiting for output events.
    //  The engine is not terminated until we detect input error;
431
    //  this is necessary to prevent losing incoming messages.
432
    if (nbytes == -1) {
433
        reset_pollout (_handle);
434 435 436
        return;
    }

437 438
    _outpos += nbytes;
    _outsize -= nbytes;
Martin Hurton's avatar
Martin Hurton committed
439 440 441

    //  If we are still handshaking and there are no data
    //  to send, stop polling for output.
442 443 444
    if (unlikely (_handshaking))
        if (_outsize == 0)
            reset_pollout (_handle);
445 446
}

447
void zmq::stream_engine_t::restart_output ()
448
{
449
    if (unlikely (_io_error))
450 451
        return;

452 453 454
    if (likely (_output_stopped)) {
        set_pollout (_handle);
        _output_stopped = false;
455
    }
456 457 458 459 460 461 462 463

    //  Speculative write: The assumption is that at the moment new message
    //  was sent by the user the socket is probably available for writing.
    //  Thus we try to write the data to socket avoiding polling for POLLOUT.
    //  Consequently, the latency should be better in request/reply scenarios.
    out_event ();
}

464
bool zmq::stream_engine_t::restart_input ()
465
{
466 467 468
    zmq_assert (_input_stopped);
    zmq_assert (_session != NULL);
    zmq_assert (_decoder != NULL);
469

470
    int rc = (this->*_process_msg) (_decoder->msg ());
471 472
    if (rc == -1) {
        if (errno == EAGAIN)
473
            _session->flush ();
474
        else {
475
            error (protocol_error);
476 477 478
            return false;
        }
        return true;
Martin Hurton's avatar
Martin Hurton committed
479 480
    }

481
    while (_insize > 0) {
482
        size_t processed = 0;
483 484 485 486
        rc = _decoder->decode (_inpos, _insize, processed);
        zmq_assert (processed <= _insize);
        _inpos += processed;
        _insize -= processed;
487 488
        if (rc == 0 || rc == -1)
            break;
489
        rc = (this->*_process_msg) (_decoder->msg ());
490 491 492
        if (rc == -1)
            break;
    }
493

494
    if (rc == -1 && errno == EAGAIN)
495
        _session->flush ();
496
    else if (_io_error) {
497
        error (connection_error);
498 499
        return false;
    } else if (rc == -1) {
500
        error (protocol_error);
501 502 503
        return false;
    }

504
    else {
505 506 507
        _input_stopped = false;
        set_pollin (_handle);
        _session->flush ();
508 509

        //  Speculative read.
510 511
        if (!in_event_internal ())
            return false;
512
    }
513 514

    return true;
515 516
}

517 518 519
//  Position of the revision field in the greeting: the index into the
//  received greeting buffer of the byte that is compared against the
//  ZMTP revision constants.
const size_t revision_pos = 10;

520
bool zmq::stream_engine_t::handshake ()
Martin Hurton's avatar
Martin Hurton committed
521
{
522 523
    zmq_assert (_handshaking);
    zmq_assert (_greeting_bytes_read < _greeting_size);
524
    //  Receive the greeting.
525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553
    const int rc = receive_greeting ();
    if (rc == -1)
        return false;
    const bool unversioned = rc != 0;

    if (!(this
            ->*select_handshake_fun (unversioned,
                                     _greeting_recv[revision_pos])) ())
        return false;

    // Start polling for output if necessary.
    if (_outsize == 0)
        set_pollout (_handle);

    //  Handshaking was successful.
    //  Switch into the normal message flow.
    _handshaking = false;

    if (_has_handshake_timer) {
        cancel_timer (handshake_timer_id);
        _has_handshake_timer = false;
    }

    return true;
}

int zmq::stream_engine_t::receive_greeting ()
{
    bool unversioned = false;
554 555 556
    while (_greeting_bytes_read < _greeting_size) {
        const int n = tcp_read (_s, _greeting_recv + _greeting_bytes_read,
                                _greeting_size - _greeting_bytes_read);
557
        if (n == 0) {
558
            errno = EPIPE;
559
            error (connection_error);
560
            return -1;
561
        }
562 563
        if (n == -1) {
            if (errno != EAGAIN)
564
                error (connection_error);
565
            return -1;
566
        }
Martin Hurton's avatar
Martin Hurton committed
567

568
        _greeting_bytes_read += n;
Martin Hurton's avatar
Martin Hurton committed
569

570 571 572
        //  We have received at least one byte from the peer.
        //  If the first byte is not 0xff, we know that the
        //  peer is using unversioned protocol.
573 574
        if (_greeting_recv[0] != 0xff) {
            unversioned = true;
575
            break;
576
        }
Martin Hurton's avatar
Martin Hurton committed
577

578
        if (_greeting_bytes_read < signature_size)
579
            continue;
Martin Hurton's avatar
Martin Hurton committed
580

581 582
        //  Inspect the right-most bit of the 10th byte (which coincides
        //  with the 'flags' field if a regular message was sent).
583
        //  Zero indicates this is a header of a routing id message
584
        //  (i.e. the peer is using the unversioned protocol).
585 586
        if (!(_greeting_recv[9] & 0x01)) {
            unversioned = true;
587
            break;
588
        }
Martin Hurton's avatar
Martin Hurton committed
589

590
        //  The peer is using versioned protocol.
591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606
        receive_greeting_versioned ();
    }
    return unversioned ? 1 : 0;
}

void zmq::stream_engine_t::receive_greeting_versioned ()
{
    //  Send the major version number.
    if (_outpos + _outsize == _greeting_send + signature_size) {
        if (_outsize == 0)
            set_pollout (_handle);
        _outpos[_outsize++] = 3; //  Major version number
    }

    if (_greeting_bytes_read > signature_size) {
        if (_outpos + _outsize == _greeting_send + signature_size + 1) {
607 608
            if (_outsize == 0)
                set_pollout (_handle);
609

610
            //  Use ZMTP/2.0 to talk to older peers.
611 612
            if (_greeting_recv[revision_pos] == ZMTP_1_0
                || _greeting_recv[revision_pos] == ZMTP_2_0)
613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634
                _outpos[_outsize++] = _options.type;
            else {
                _outpos[_outsize++] = 0; //  Minor version number
                memset (_outpos + _outsize, 0, 20);

                zmq_assert (_options.mechanism == ZMQ_NULL
                            || _options.mechanism == ZMQ_PLAIN
                            || _options.mechanism == ZMQ_CURVE
                            || _options.mechanism == ZMQ_GSSAPI);

                if (_options.mechanism == ZMQ_NULL)
                    memcpy (_outpos + _outsize, "NULL", 4);
                else if (_options.mechanism == ZMQ_PLAIN)
                    memcpy (_outpos + _outsize, "PLAIN", 5);
                else if (_options.mechanism == ZMQ_GSSAPI)
                    memcpy (_outpos + _outsize, "GSSAPI", 6);
                else if (_options.mechanism == ZMQ_CURVE)
                    memcpy (_outpos + _outsize, "CURVE", 5);
                _outsize += 20;
                memset (_outpos + _outsize, 0, 32);
                _outsize += 32;
                _greeting_size = v3_greeting_size;
635
            }
Martin Hurton's avatar
Martin Hurton committed
636 637
        }
    }
638
}
Martin Hurton's avatar
Martin Hurton committed
639

640 641 642 643
//  Picks the handshake routine for the peer: the unversioned ZMTP/1.0
//  routine when no revision number was sent, otherwise the routine for
//  the announced revision. Any revision other than 1.0 or 2.0 gets the
//  3.0 routine.
zmq::stream_engine_t::handshake_fun_t
zmq::stream_engine_t::select_handshake_fun (bool unversioned,
                                            unsigned char revision)
{
    if (unversioned)
        return &stream_engine_t::handshake_v1_0_unversioned;

    if (revision == ZMTP_1_0)
        return &stream_engine_t::handshake_v1_0;

    if (revision == ZMTP_2_0)
        return &stream_engine_t::handshake_v2_0;

    return &stream_engine_t::handshake_v3_0;
}
657

658 659 660 661 662 663 664 665
bool zmq::stream_engine_t::handshake_v1_0_unversioned ()
{
    //  We send and receive rest of routing id message
    if (_session->zap_enabled ()) {
        // reject ZMTP 1.0 connections if ZAP is enabled
        error (protocol_error);
        return false;
    }
666

667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706
    _encoder = new (std::nothrow) v1_encoder_t (out_batch_size);
    alloc_assert (_encoder);

    _decoder =
      new (std::nothrow) v1_decoder_t (in_batch_size, _options.maxmsgsize);
    alloc_assert (_decoder);

    //  We have already sent the message header.
    //  Since there is no way to tell the encoder to
    //  skip the message header, we simply throw that
    //  header data away.
    const size_t header_size =
      _options.routing_id_size + 1 >= UCHAR_MAX ? 10 : 2;
    unsigned char tmp[10], *bufferp = tmp;

    //  Prepare the routing id message and load it into encoder.
    //  Then consume bytes we have already sent to the peer.
    const int rc = _tx_msg.init_size (_options.routing_id_size);
    zmq_assert (rc == 0);
    memcpy (_tx_msg.data (), _options.routing_id, _options.routing_id_size);
    _encoder->load_msg (&_tx_msg);
    const size_t buffer_size = _encoder->encode (&bufferp, header_size);
    zmq_assert (buffer_size == header_size);

    //  Make sure the decoder sees the data we have already received.
    _inpos = _greeting_recv;
    _insize = _greeting_bytes_read;

    //  To allow for interoperability with peers that do not forward
    //  their subscriptions, we inject a phantom subscription message
    //  message into the incoming message stream.
    if (_options.type == ZMQ_PUB || _options.type == ZMQ_XPUB)
        _subscription_required = true;

    //  We are sending our routing id now and the next message
    //  will come from the socket.
    _next_msg = &stream_engine_t::pull_msg_from_session;

    //  We are expecting routing id message.
    _process_msg = &stream_engine_t::process_routing_id_msg;
707

708 709
    return true;
}
Martin Hurton's avatar
Martin Hurton committed
710

711 712 713 714 715 716 717
bool zmq::stream_engine_t::handshake_v1_0 ()
{
    if (_session->zap_enabled ()) {
        // reject ZMTP 1.0 connections if ZAP is enabled
        error (protocol_error);
        return false;
    }
718

719 720
    _encoder = new (std::nothrow) v1_encoder_t (out_batch_size);
    alloc_assert (_encoder);
721

722 723 724
    _decoder =
      new (std::nothrow) v1_decoder_t (in_batch_size, _options.maxmsgsize);
    alloc_assert (_decoder);
725

726 727
    return true;
}
728

729 730 731 732 733 734 735
bool zmq::stream_engine_t::handshake_v2_0 ()
{
    if (_session->zap_enabled ()) {
        // reject ZMTP 2.0 connections if ZAP is enabled
        error (protocol_error);
        return false;
    }
736

737 738 739 740 741 742
    _encoder = new (std::nothrow) v2_encoder_t (out_batch_size);
    alloc_assert (_encoder);

    _decoder = new (std::nothrow)
      v2_decoder_t (in_batch_size, _options.maxmsgsize, _options.zero_copy);
    alloc_assert (_decoder);
743

744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767
    return true;
}

bool zmq::stream_engine_t::handshake_v3_0 ()
{
    _encoder = new (std::nothrow) v2_encoder_t (out_batch_size);
    alloc_assert (_encoder);

    _decoder = new (std::nothrow)
      v2_decoder_t (in_batch_size, _options.maxmsgsize, _options.zero_copy);
    alloc_assert (_decoder);

    if (_options.mechanism == ZMQ_NULL
        && memcmp (_greeting_recv + 12, "NULL\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0",
                   20)
             == 0) {
        _mechanism = new (std::nothrow)
          null_mechanism_t (_session, _peer_address, _options);
        alloc_assert (_mechanism);
    } else if (_options.mechanism == ZMQ_PLAIN
               && memcmp (_greeting_recv + 12,
                          "PLAIN\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20)
                    == 0) {
        if (_options.as_server)
768
            _mechanism = new (std::nothrow)
769 770 771 772 773
              plain_server_t (_session, _peer_address, _options);
        else
            _mechanism = new (std::nothrow) plain_client_t (_session, _options);
        alloc_assert (_mechanism);
    }
774
#ifdef ZMQ_HAVE_CURVE
775 776 777 778 779 780 781 782 783 784 785
    else if (_options.mechanism == ZMQ_CURVE
             && memcmp (_greeting_recv + 12,
                        "CURVE\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20)
                  == 0) {
        if (_options.as_server)
            _mechanism = new (std::nothrow)
              curve_server_t (_session, _peer_address, _options);
        else
            _mechanism = new (std::nothrow) curve_client_t (_session, _options);
        alloc_assert (_mechanism);
    }
786
#endif
Chris Laws's avatar
Chris Laws committed
787
#ifdef HAVE_LIBGSSAPI_KRB5
788 789 790 791 792 793 794 795 796 797 798
    else if (_options.mechanism == ZMQ_GSSAPI
             && memcmp (_greeting_recv + 12,
                        "GSSAPI\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20)
                  == 0) {
        if (_options.as_server)
            _mechanism = new (std::nothrow)
              gssapi_server_t (_session, _peer_address, _options);
        else
            _mechanism =
              new (std::nothrow) gssapi_client_t (_session, _options);
        alloc_assert (_mechanism);
799
    }
800 801 802 803 804 805 806
#endif
    else {
        _session->get_socket ()->event_handshake_failed_protocol (
          _session->get_endpoint (),
          ZMQ_PROTOCOL_ERROR_ZMTP_MECHANISM_MISMATCH);
        error (protocol_error);
        return false;
807
    }
808 809
    _next_msg = &stream_engine_t::next_handshake_command;
    _process_msg = &stream_engine_t::process_handshake_command;
810

Martin Hurton's avatar
Martin Hurton committed
811 812 813
    return true;
}

814
//  Fills msg_ with our routing id and switches the outgoing message
//  source to the session. Always returns 0.
int zmq::stream_engine_t::routing_id_msg (msg_t *msg_)
{
    const size_t id_size = _options.routing_id_size;

    const int rc = msg_->init_size (id_size);
    errno_assert (rc == 0);

    //  An empty routing id leaves nothing to copy.
    if (id_size > 0)
        memcpy (msg_->data (), _options.routing_id, id_size);

    _next_msg = &stream_engine_t::pull_msg_from_session;
    return 0;
}
823

824
//  Handles the peer's routing id message: forwards it to the session
//  when the socket wants routing ids, otherwise discards it. It then
//  injects a subscription message if one is required and switches
//  incoming messages to the regular session path. Always returns 0.
int zmq::stream_engine_t::process_routing_id_msg (msg_t *msg_)
{
    int rc;

    if (!_options.recv_routing_id) {
        //  Not wanted: release the content and leave msg_ initialised.
        rc = msg_->close ();
        errno_assert (rc == 0);
        rc = msg_->init ();
        errno_assert (rc == 0);
    } else {
        msg_->set_flags (msg_t::routing_id);
        rc = _session->push_msg (msg_);
        errno_assert (rc == 0);
    }

    if (_subscription_required) {
        //  Inject a one-byte subscription message (value 1) so that
        //  ZMQ 2.x peers receive published messages too.
        msg_t subscription;
        rc = subscription.init_size (1);
        errno_assert (rc == 0);

        unsigned char *const payload =
          static_cast<unsigned char *> (subscription.data ());
        *payload = 1;

        rc = _session->push_msg (&subscription);
        errno_assert (rc == 0);
    }

    _process_msg = &stream_engine_t::push_msg_to_session;

    return 0;
}
853

854
//  Produces the next outgoing message while the mechanism handshake is
//  in progress. Once the mechanism is ready, the engine is finalised and
//  a regular message is pulled instead. If the mechanism is in error
//  state, -1 is returned with errno set to EPROTO. Otherwise the
//  mechanism supplies the next command.
int zmq::stream_engine_t::next_handshake_command (msg_t *msg_)
{
    zmq_assert (_mechanism != NULL);

    if (_mechanism->status () == mechanism_t::ready) {
        mechanism_ready ();
        return pull_and_encode (msg_);
    }

    if (_mechanism->status () == mechanism_t::error) {
        errno = EPROTO;
        return -1;
    }

    const int rc = _mechanism->next_handshake_command (msg_);

    //  Flag a successfully produced message as a command.
    if (rc == 0)
        msg_->set_flags (msg_t::command);

    return rc;
}

875
//  Feeds an incoming handshake command to the mechanism. When it is
//  accepted, the engine is finalised if the mechanism became ready, or
//  -1 is returned with errno set to EPROTO if it entered error state.
//  Output is restarted if it had been stopped. A result other than 0
//  from the mechanism is returned unchanged.
int zmq::stream_engine_t::process_handshake_command (msg_t *msg_)
{
    zmq_assert (_mechanism != NULL);

    const int rc = _mechanism->process_handshake_command (msg_);
    if (rc != 0)
        return rc;

    if (_mechanism->status () == mechanism_t::ready)
        mechanism_ready ();
    else if (_mechanism->status () == mechanism_t::error) {
        errno = EPROTO;
        return -1;
    }

    if (_output_stopped)
        restart_output ();

    return rc;
}

893 894
void zmq::stream_engine_t::zap_msg_available ()
{
895
    zmq_assert (_mechanism != NULL);
896

897
    const int rc = _mechanism->zap_msg_available ();
898
    if (rc == -1) {
899
        error (protocol_error);
900 901
        return;
    }
902
    if (_input_stopped)
903 904
        if (!restart_input ())
            return;
905
    if (_output_stopped)
906
        restart_output ();
907 908
}

909
//  Accessor: returns a reference to the endpoint URI pair stored in this
//  engine.
const zmq::endpoint_uri_pair_t &zmq::stream_engine_t::get_endpoint () const
{
    return this->_endpoint_uri_pair;
}

914
void zmq::stream_engine_t::mechanism_ready ()
915
{
916 917 918
    if (_options.heartbeat_interval > 0) {
        add_timer (_options.heartbeat_interval, heartbeat_ivl_timer_id);
        _has_heartbeat_timer = true;
919 920
    }

921 922
    bool flush_session = false;

923
    if (_options.recv_routing_id) {
924
        msg_t routing_id;
925 926
        _mechanism->peer_routing_id (&routing_id);
        const int rc = _session->push_msg (&routing_id);
927 928 929
        if (rc == -1 && errno == EAGAIN) {
            // If the write is failing at this stage with
            // an EAGAIN the pipe must be being shut down,
930
            // so we can just bail out of the routing id set.
931 932
            return;
        }
933
        errno_assert (rc == 0);
934 935 936 937 938 939 940 941 942 943 944 945 946 947 948
        flush_session = true;
    }

    if (_options.router_notify & ZMQ_NOTIFY_CONNECT) {
        msg_t connect_notification;
        connect_notification.init ();
        const int rc = _session->push_msg (&connect_notification);
        if (rc == -1 && errno == EAGAIN) {
            // If the write is failing at this stage with
            // an EAGAIN the pipe must be being shut down,
            // so we can just bail out of the notification.
            return;
        }
        errno_assert (rc == 0);
        flush_session = true;
949
    }
950

951 952 953
    if (flush_session)
        _session->flush ();

954 955
    _next_msg = &stream_engine_t::pull_and_encode;
    _process_msg = &stream_engine_t::write_credential;
956 957 958

    //  Compile metadata.
    properties_t properties;
959
    init_properties (properties);
960

Pieter Hintjens's avatar
Pieter Hintjens committed
961
    //  Add ZAP properties.
962
    const properties_t &zap_properties = _mechanism->get_zap_properties ();
963
    properties.insert (zap_properties.begin (), zap_properties.end ());
Pieter Hintjens's avatar
Pieter Hintjens committed
964

965
    //  Add ZMTP properties.
966
    const properties_t &zmtp_properties = _mechanism->get_zmtp_properties ();
967
    properties.insert (zmtp_properties.begin (), zmtp_properties.end ());
968

969
    zmq_assert (_metadata == NULL);
970
    if (!properties.empty ()) {
971 972
        _metadata = new (std::nothrow) metadata_t (properties);
        alloc_assert (_metadata);
973
    }
Vincent Tellier's avatar
Vincent Tellier committed
974

975
    _socket->event_handshake_succeeded (_endpoint_uri_pair, 0);
976 977
}

978
//  Message handler that pulls the next message from the session into msg_
//  and returns the session's result unchanged.
int zmq::stream_engine_t::pull_msg_from_session (msg_t *msg_)
{
    const int rc = _session->pull_msg (msg_);
    return rc;
}
982

983 984
//  Message handler that hands msg_ to the session and returns the
//  session's result unchanged.
int zmq::stream_engine_t::push_msg_to_session (msg_t *msg_)
{
    const int rc = _session->push_msg (msg_);
    return rc;
}

988 989
//  Message handler that attaches the engine's metadata to msg_ (unless the
//  message already carries that very metadata object, or the engine has
//  none) and then forwards the message via push_msg_to_session ().
int zmq::stream_engine_t::push_raw_msg_to_session (msg_t *msg_)
{
    const bool needs_metadata = _metadata && _metadata != msg_->metadata ();
    if (needs_metadata)
        msg_->set_metadata (_metadata);

    return push_msg_to_session (msg_);
}

995 996
//  Message handler used for the first inbound message after the handshake.
//
//  If the mechanism supplies a non-empty user id, it is pushed to the
//  session as a separate message carrying the 'credential' flag. If that
//  push fails, -1 is returned and this handler stays installed. Otherwise
//  decode_and_push becomes the handler and is applied to msg_ right away.
int zmq::stream_engine_t::write_credential (msg_t *msg_)
{
    zmq_assert (_mechanism != NULL);
    zmq_assert (_session != NULL);

    const blob_t &user_id = _mechanism->get_user_id ();
    const size_t user_id_size = user_id.size ();

    if (user_id_size > 0) {
        msg_t credential_msg;
        int rc = credential_msg.init_size (user_id_size);
        zmq_assert (rc == 0);
        memcpy (credential_msg.data (), user_id.data (), user_id_size);
        credential_msg.set_flags (msg_t::credential);

        rc = _session->push_msg (&credential_msg);
        if (rc == -1) {
            //  The session did not take the message; release it here.
            rc = credential_msg.close ();
            errno_assert (rc == 0);
            return -1;
        }
    }

    _process_msg = &stream_engine_t::decode_and_push;
    return decode_and_push (msg_);
}

1018 1019
//  Message handler that pulls the next message from the session and passes
//  it through the mechanism's encode (). Returns 0 when both steps succeed
//  and -1 as soon as either of them reports -1.
int zmq::stream_engine_t::pull_and_encode (msg_t *msg_)
{
    zmq_assert (_mechanism != NULL);

    const bool pulled = _session->pull_msg (msg_) != -1;
    if (!pulled)
        return -1;

    const bool encoded = _mechanism->encode (msg_) != -1;
    return encoded ? 0 : -1;
}

//  Message handler for inbound messages after the handshake.
//
//  The message is passed through the mechanism's decode (); on failure -1
//  is returned. After a successful decode, pending heartbeat timeout and
//  TTL timers are cancelled, command frames are given to
//  process_command_message () (its result is not inspected), the engine's
//  metadata is attached when present, and the message is pushed to the
//  session. If that push fails with EAGAIN, the handler is switched to
//  push_one_then_decode_and_push. Returns 0 on success, -1 on any failure.
int zmq::stream_engine_t::decode_and_push (msg_t *msg_)
{
    zmq_assert (_mechanism != NULL);

    if (_mechanism->decode (msg_) == -1)
        return -1;

    //  Clear the heartbeat timeout timer, if armed.
    if (_has_timeout_timer) {
        _has_timeout_timer = false;
        cancel_timer (heartbeat_timeout_timer_id);
    }

    //  Clear the heartbeat TTL timer, if armed.
    if (_has_ttl_timer) {
        _has_ttl_timer = false;
        cancel_timer (heartbeat_ttl_timer_id);
    }

    const bool is_command = (msg_->flags () & msg_t::command) != 0;
    if (is_command)
        process_command_message (msg_);

    if (_metadata)
        msg_->set_metadata (_metadata);

    if (_session->push_msg (msg_) != -1)
        return 0;

    //  The session could not take the message right now; arrange for the
    //  next call to push without decoding.
    if (errno == EAGAIN)
        _process_msg = &stream_engine_t::push_one_then_decode_and_push;

    return -1;
}

//  Message handler that pushes msg_ to the session without decoding it.
//  Once the push succeeds, decode_and_push is reinstated as the handler.
//  Returns the session's result unchanged.
int zmq::stream_engine_t::push_one_then_decode_and_push (msg_t *msg_)
{
    const int push_rc = _session->push_msg (msg_);
    if (push_rc != 0)
        return push_rc;

    _process_msg = &stream_engine_t::decode_and_push;
    return push_rc;
}

1068
//  Tears the engine down after a failure described by reason_.
//
//  In order: optionally delivers a zero-length terminator message (raw
//  sockets with raw_notify), optionally rolls back the pipe and pushes an
//  empty disconnect notification (router_notify with
//  ZMQ_NOTIFY_DISCONNECT, only after the handshake), raises the
//  handshake-failed and disconnected socket events, flushes the session,
//  reports the error to it, unplugs the engine and finally deletes it.
//
//  The engine is destroyed by this call; callers must not touch any member
//  after it returns.
void zmq::stream_engine_t::error (error_reason_t reason_)
{
    //  Capture errno before anything else runs: the message and session
    //  calls below may overwrite it, and the value on entry is the one
    //  reported with the handshake-failed event.
    const int saved_errno = errno;

    //  The session is used by every step below, so check it up front.
    zmq_assert (_session);

    if (_options.raw_socket && _options.raw_notify) {
        //  For raw sockets, send a final 0-length message to the application
        //  so that it knows the peer has been disconnected.
        //  Return values are not checked here.
        msg_t terminator;
        terminator.init ();
        (this->*_process_msg) (&terminator);
        terminator.close ();
    }

    if ((_options.router_notify & ZMQ_NOTIFY_DISCONNECT) && !_handshaking) {
        //  For router sockets with disconnect notification, rollback
        //  any incomplete message in the pipe, and push the disconnect
        //  notification message.
        _session->rollback ();

        msg_t disconnect_notification;
        disconnect_notification.init ();
        _session->push_msg (&disconnect_notification);
    }

    //  Protocol errors have been signaled already at the point where they
    //  occurred.
    if (reason_ != protocol_error
        && (_mechanism == NULL
            || _mechanism->status () == mechanism_t::handshaking)) {
        _socket->event_handshake_failed_no_detail (_endpoint_uri_pair,
                                                   saved_errno);
    }

    _socket->event_disconnected (_endpoint_uri_pair, _s);
    _session->flush ();
    _session->engine_error (reason_);
    unplug ();
    delete this;
}

1106
void zmq::stream_engine_t::set_handshake_timer ()
1107
{
1108
    zmq_assert (!_has_handshake_timer);
1109

1110 1111 1112
    if (!_options.raw_socket && _options.handshake_ivl > 0) {
        add_timer (_options.handshake_ivl, handshake_timer_id);
        _has_handshake_timer = true;
1113
    }
1114 1115
}

1116
//  Adds the engine-level properties to properties_: the peer address and a
//  private "__fd" entry holding the socket descriptor rendered as a decimal
//  string. Returns false, adding nothing, when the peer address is empty;
//  returns true otherwise.
bool zmq::stream_engine_t::init_properties (properties_t &properties_)
{
    if (_peer_address.empty ())
        return false;

    properties_.ZMQ_MAP_INSERT_OR_EMPLACE (
      std::string (ZMQ_MSG_PROPERTY_PEER_ADDRESS), _peer_address);

    //  Private property to support deprecated SRCFD
    std::ostringstream fd_stream;
    fd_stream << static_cast<int> (_s);
    std::string fd_text = fd_stream.str ();
    properties_.ZMQ_MAP_INSERT_OR_EMPLACE (std::string ("__fd"),
                                           ZMQ_MOVE (fd_text));

    return true;
}

1132
void zmq::stream_engine_t::timer_event (int id_)
1133
{
1134
    if (id_ == handshake_timer_id) {
1135
        _has_handshake_timer = false;
Jonathan Reams's avatar
Jonathan Reams committed
1136 1137
        //  handshake timer expired before handshake completed, so engine fail
        error (timeout_error);
1138
    } else if (id_ == heartbeat_ivl_timer_id) {
1139
        _next_msg = &stream_engine_t::produce_ping_message;
1140
        out_event ();
1141
        add_timer (_options.heartbeat_interval, heartbeat_ivl_timer_id);
1142
    } else if (id_ == heartbeat_ttl_timer_id) {
1143
        _has_ttl_timer = false;
1144 1145
        error (timeout_error);
    } else if (id_ == heartbeat_timeout_timer_id) {
1146
        _has_timeout_timer = false;
1147 1148
        error (timeout_error);
    } else
Jonathan Reams's avatar
Jonathan Reams committed
1149
        // There are no other valid timer ids!
1150
        assert (false);
Jonathan Reams's avatar
Jonathan Reams committed
1151 1152
}

1153
//  Message producer that builds a PING command in msg_: the 5-byte
//  "\4PING" name followed by the configured heartbeat TTL as a 16-bit value
//  in network byte order. The message is then passed through the
//  mechanism's encode (), pull_and_encode is reinstated as the producer,
//  and the heartbeat timeout timer is armed if it is not already running
//  and a positive timeout is configured. Returns the result of encode ().
int zmq::stream_engine_t::produce_ping_message (msg_t *msg_)
{
    // 16-bit TTL + \4PING == 7
    const size_t ping_ttl_len = msg_t::ping_cmd_name_size + 2;
    zmq_assert (_mechanism != NULL);

    int rc = msg_->init_size (ping_ttl_len);
    errno_assert (rc == 0);
    msg_->set_flags (msg_t::command);

    uint8_t *const payload = static_cast<uint8_t *> (msg_->data ());

    //  Command name first ...
    memcpy (payload, "\4PING", msg_t::ping_cmd_name_size);

    //  ... followed by the TTL in network byte order.
    uint16_t ttl_net = htons (_options.heartbeat_ttl);
    memcpy (payload + msg_t::ping_cmd_name_size, &ttl_net, sizeof (ttl_net));

    rc = _mechanism->encode (msg_);
    _next_msg = &stream_engine_t::pull_and_encode;

    const bool arm_timeout = !_has_timeout_timer && _heartbeat_timeout > 0;
    if (arm_timeout) {
        add_timer (_heartbeat_timeout, heartbeat_timeout_timer_id);
        _has_timeout_timer = true;
    }

    return rc;
}

1178
//  Message producer that moves the stored PONG message into msg_, passes
//  it through the mechanism's encode (), and reinstates pull_and_encode as
//  the producer. Returns the result of encode ().
int zmq::stream_engine_t::produce_pong_message (msg_t *msg_)
{
    zmq_assert (_mechanism != NULL);

    const int move_rc = msg_->move (_pong_msg);
    errno_assert (move_rc == 0);

    const int encode_rc = _mechanism->encode (msg_);
    _next_msg = &stream_engine_t::pull_and_encode;

    return encode_rc;
}

1190
//  Handles a heartbeat command. Only PING is acted upon; any other message
//  is ignored and 0 is returned.
//
//  For a PING, the peer's TTL is read and, if positive and no TTL timer is
//  running, the TTL timer is armed. A PONG echoing up to 16 bytes of the
//  PING's context is then prepared in _pong_msg, installed as the next
//  outgoing message, and output is triggered.
//
//  Returns 0 on success. Returns -1 with errno = EPROTO if the PING is too
//  short to contain the mandatory 16-bit TTL; in that case no timer is
//  armed and no PONG is produced.
int zmq::stream_engine_t::process_heartbeat_message (msg_t *msg_)
{
    if (!msg_->is_ping ())
        return 0;

    // 16-bit TTL + \4PING == 7
    const size_t ping_ttl_len = msg_t::ping_cmd_name_size + 2;
    const size_t ping_max_ctx_len = 16;

    //  Refuse a PING that is too short to hold the TTL. Without this
    //  check the TTL read below would run past the end of the frame and
    //  the context length computation would underflow.
    if (unlikely (msg_->size () < ping_ttl_len)) {
        errno = EPROTO;
        return -1;
    }

    const uint8_t *const ping_data =
      static_cast<const uint8_t *> (msg_->data ());

    // Get the remote heartbeat TTL to setup the timer
    uint16_t ttl_net;
    memcpy (&ttl_net, ping_data + msg_t::ping_cmd_name_size,
            sizeof (ttl_net));

    // The remote heartbeat is in 10ths of a second
    // so we multiply it by 100 to get the timer interval in ms.
    // The product is computed as int: in 16 bits it would wrap for any
    // TTL above 655.
    const int remote_heartbeat_ttl_ms =
      static_cast<int> (ntohs (ttl_net)) * 100;

    if (!_has_ttl_timer && remote_heartbeat_ttl_ms > 0) {
        add_timer (remote_heartbeat_ttl_ms, heartbeat_ttl_timer_id);
        _has_ttl_timer = true;
    }

    //  As per ZMTP 3.1 the PING command might contain an up to 16 bytes
    //  context which needs to be PONGed back, so build the pong message
    //  here and store it. Truncate it if it's too long.
    //  Given the engine goes straight to out_event, sequential PINGs will
    //  not be a problem.
    const size_t context_len =
      std::min (msg_->size () - ping_ttl_len, ping_max_ctx_len);
    const int rc =
      _pong_msg.init_size (msg_t::ping_cmd_name_size + context_len);
    errno_assert (rc == 0);
    _pong_msg.set_flags (msg_t::command);
    memcpy (_pong_msg.data (), "\4PONG", msg_t::ping_cmd_name_size);
    if (context_len > 0)
        memcpy (static_cast<uint8_t *> (_pong_msg.data ())
                  + msg_t::ping_cmd_name_size,
                ping_data + ping_ttl_len, context_len);

    _next_msg = &stream_engine_t::produce_pong_message;
    out_event ();

    return 0;
}
1237 1238 1239

//  Classifies a command frame by its name and tags msg_ with the matching
//  flag (ping, pong, subscribe or cancel). PING and PONG frames are then
//  handed to process_heartbeat_message ().
//
//  The frame layout read here is: one octet giving the length of the
//  command name, followed by the name itself.
//
//  Returns -1 for a malformed frame (empty, or shorter than the declared
//  name length plus the length octet), the result of
//  process_heartbeat_message () for PING/PONG, and 0 otherwise.
int zmq::stream_engine_t::process_command_message (msg_t *msg_)
{
    //  An empty frame has no name-length octet; reject it before reading.
    if (unlikely (msg_->size () == 0))
        return -1;

    const uint8_t *const frame = static_cast<const uint8_t *> (msg_->data ());
    const uint8_t cmd_name_size = frame[0];

    //  The *_cmd_name_size constants include the length octet; strip it.
    const size_t ping_name_size = msg_t::ping_cmd_name_size - 1;
    const size_t sub_name_size = msg_t::sub_cmd_name_size - 1;
    const size_t cancel_name_size = msg_t::cancel_cmd_name_size - 1;

    //  Malformed command
    if (unlikely (msg_->size () < cmd_name_size + sizeof (cmd_name_size)))
        return -1;

    //  The names are mutually exclusive, so at most one branch matches.
    const uint8_t *const cmd_name = frame + 1;
    if (cmd_name_size == ping_name_size
        && memcmp (cmd_name, "PING", cmd_name_size) == 0)
        msg_->set_flags (zmq::msg_t::ping);
    else if (cmd_name_size == ping_name_size
             && memcmp (cmd_name, "PONG", cmd_name_size) == 0)
        msg_->set_flags (zmq::msg_t::pong);
    else if (cmd_name_size == sub_name_size
             && memcmp (cmd_name, "SUBSCRIBE", cmd_name_size) == 0)
        msg_->set_flags (zmq::msg_t::subscribe);
    else if (cmd_name_size == cancel_name_size
             && memcmp (cmd_name, "CANCEL", cmd_name_size) == 0)
        msg_->set_flags (zmq::msg_t::cancel);

    if (msg_->is_ping () || msg_->is_pong ())
        return process_heartbeat_message (msg_);

    return 0;
}