stream_engine.cpp 32.8 KB
Newer Older
1
/*
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file

    This file is part of libzmq, the ZeroMQ core engine in C++.

    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include "precompiled.hpp"
31
#include "macros.hpp"
32 33

#include <string.h>
34 35

#ifndef ZMQ_HAVE_WINDOWS
36
#include <unistd.h>
37 38
#endif

39
#include <new>
40
#include <sstream>
41

42
#include "stream_engine.hpp"
43
#include "io_thread.hpp"
44
#include "session_base.hpp"
45 46
#include "v1_encoder.hpp"
#include "v1_decoder.hpp"
47 48
#include "v2_encoder.hpp"
#include "v2_decoder.hpp"
49
#include "null_mechanism.hpp"
50 51
#include "plain_client.hpp"
#include "plain_server.hpp"
52 53
#include "gssapi_client.hpp"
#include "gssapi_server.hpp"
54 55
#include "curve_client.hpp"
#include "curve_server.hpp"
56 57
#include "raw_decoder.hpp"
#include "raw_encoder.hpp"
58 59
#include "config.hpp"
#include "err.hpp"
60
#include "ip.hpp"
61
#include "tcp.hpp"
Martin Hurton's avatar
Martin Hurton committed
62 63
#include "likely.hpp"
#include "wire.hpp"
64

Chris Laws's avatar
Chris Laws committed
65
//  Creates an engine on top of the socket descriptor 'fd_'.
//
//  The constructor initialises the outbound message, switches the socket
//  to non-blocking mode, resolves the textual peer address and derives
//  the effective heartbeat timeout from the options. The engine starts in
//  the handshaking state, expecting a v2-sized greeting.
zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_,
                                       const std::string &endpoint_) :
    s (fd_),
    as_server (false),
    handle ((handle_t) NULL),
    inpos (NULL),
    insize (0),
    decoder (NULL),
    outpos (NULL),
    outsize (0),
    encoder (NULL),
    metadata (NULL),
    handshaking (true),
    greeting_size (v2_greeting_size),
    greeting_bytes_read (0),
    session (NULL),
    options (options_),
    endpoint (endpoint_),
    plugged (false),
    next_msg (&stream_engine_t::routing_id_msg),
    process_msg (&stream_engine_t::process_routing_id_msg),
    io_error (false),
    subscription_required (false),
    mechanism (NULL),
    input_stopped (false),
    output_stopped (false),
    has_handshake_timer (false),
    has_ttl_timer (false),
    has_timeout_timer (false),
    has_heartbeat_timer (false),
    heartbeat_timeout (0),
    socket (NULL)
{
    const int rc = tx_msg.init ();
    errno_assert (rc == 0);

    //  The engine never blocks on the socket.
    unblock_socket (s);

    //  A zero family means the peer address could not be obtained;
    //  make sure no partial text is left behind in that case.
    const int family = get_peer_ip_address (s, peer_address);
    if (family == 0)
        peer_address.clear ();
#if defined ZMQ_HAVE_SO_PEERCRED
    else if (family == PF_UNIX) {
        //  Append ":uid:gid:pid" of the peer process to the address.
        struct ucred cred;
        socklen_t cred_size = sizeof (cred);
        if (getsockopt (s, SOL_SOCKET, SO_PEERCRED, &cred, &cred_size) == 0) {
            std::ostringstream suffix;
            suffix << ":" << cred.uid << ":" << cred.gid << ":" << cred.pid;
            peer_address += suffix.str ();
        }
    }
#elif defined ZMQ_HAVE_LOCAL_PEERCRED
    else if (family == PF_UNIX) {
        //  Append ":uid:gid:" of the peer to the address; the group is
        //  left empty when the credentials carry no groups.
        struct xucred cred;
        socklen_t cred_size = sizeof (cred);
        if (getsockopt (s, 0, LOCAL_PEERCRED, &cred, &cred_size) == 0
            && cred.cr_version == XUCRED_VERSION) {
            std::ostringstream suffix;
            suffix << ":" << cred.cr_uid << ":";
            if (cred.cr_ngroups > 0)
                suffix << cred.cr_groups [0];
            suffix << ":";
            peer_address += suffix.str ();
        }
    }
#endif

    //  With heartbeating enabled, a timeout of -1 means "use the
    //  heartbeat interval as the timeout".
    if (options.heartbeat_interval > 0)
        heartbeat_timeout = options.heartbeat_timeout == -1
                              ? options.heartbeat_interval
                              : options.heartbeat_timeout;
}

142
//  Destroys the engine. The engine must already be unplugged.
//
//  Closes the socket (unless the descriptor was retired), closes the
//  outbound message, drops this engine's reference to the metadata and
//  deletes the encoder, decoder and security mechanism.
zmq::stream_engine_t::~stream_engine_t ()
{
    zmq_assert (!plugged);

    if (s != retired_fd) {
#ifdef ZMQ_HAVE_WINDOWS
        const int rc = closesocket (s);
        wsa_assert (rc != SOCKET_ERROR);
#else
        int rc = close (s);
#ifdef __FreeBSD_kernel__
        //  Under load FreeBSD may report ECONNRESET from close ();
        //  that is not treated as a failure.
        if (rc == -1 && errno == ECONNRESET)
            rc = 0;
#endif
        errno_assert (rc == 0);
#endif
        s = retired_fd;
    }

    const int rc = tx_msg.close ();
    errno_assert (rc == 0);

    //  Release our reference to the metadata; the object is destroyed
    //  only when drop_ref () reports that we were the last user.
    if (metadata != NULL && metadata->drop_ref ()) {
        LIBZMQ_DELETE (metadata);
    }

    LIBZMQ_DELETE (encoder);
    LIBZMQ_DELETE (decoder);
    LIBZMQ_DELETE (mechanism);
}

179 180
//  Attaches the engine to a session and to the poller of 'io_thread_'.
//
//  For raw sockets no handshake takes place: raw encoder/decoder are
//  installed straight away and, if requested, an empty message is pushed
//  to the session to announce the new peer. Otherwise the first ten bytes
//  of the greeting are queued for sending and the handshake timer is armed.
void zmq::stream_engine_t::plug (io_thread_t *io_thread_,
                                 session_base_t *session_)
{
    zmq_assert (!plugged);
    plugged = true;

    //  Bind to the session and remember its socket.
    zmq_assert (!session);
    zmq_assert (session_);
    session = session_;
    socket = session->get_socket ();

    //  Register the descriptor with the I/O thread's poller.
    io_object_t::plug (io_thread_);
    handle = add_fd (s);
    io_error = false;

    if (!options.raw_socket) {
        //  Optional timer guarding against a handshake that never
        //  receives any input.
        set_handshake_timer ();

        //  Queue the 'length' (long format) and 'flags' fields of the
        //  routing id message.
        outpos = greeting_send;
        outpos [outsize++] = 0xff;
        put_uint64 (&outpos [outsize], options.routing_id_size + 1);
        outsize += 8;
        outpos [outsize++] = 0x7f;
    }
    else {
        //  Raw sockets skip the handshake entirely.
        encoder = new (std::nothrow) raw_encoder_t (out_batch_size);
        alloc_assert (encoder);

        decoder = new (std::nothrow) raw_decoder_t (in_batch_size);
        alloc_assert (decoder);

        handshaking = false;

        next_msg = &stream_engine_t::pull_msg_from_session;
        process_msg = &stream_engine_t::push_raw_msg_to_session;

        //  Compile metadata, if there are any properties to record.
        properties_t properties;
        if (init_properties (properties)) {
            zmq_assert (metadata == NULL);
            metadata = new (std::nothrow) metadata_t (properties);
            alloc_assert (metadata);
        }

        if (options.raw_notify) {
            //  Tell the application a peer has connected by delivering
            //  an initial zero-length message.
            msg_t connector;
            connector.init ();
            push_raw_msg_to_session (&connector);
            connector.close ();
            session->flush ();
        }
    }

    set_pollin (handle);
    set_pollout (handle);

    //  Process whatever data may already be waiting on the socket.
    in_event ();
}

247
void zmq::stream_engine_t::unplug ()
248 249 250 251
{
    zmq_assert (plugged);
    plugged = false;

252 253 254 255 256 257
    //  Cancel all timers.
    if (has_handshake_timer) {
        cancel_timer (handshake_timer_id);
        has_handshake_timer = false;
    }

Jonathan Reams's avatar
Jonathan Reams committed
258 259 260 261 262 263 264 265 266 267 268 269 270 271
    if (has_ttl_timer) {
        cancel_timer (heartbeat_ttl_timer_id);
        has_ttl_timer = false;
    }

    if (has_timeout_timer) {
        cancel_timer (heartbeat_timeout_timer_id);
        has_timeout_timer = false;
    }

    if (has_heartbeat_timer) {
        cancel_timer (heartbeat_ivl_timer_id);
        has_heartbeat_timer = false;
    }
272
    //  Cancel all fd subscriptions.
273
    if (!io_error)
Martin Hurton's avatar
Martin Hurton committed
274
        rm_fd (handle);
275 276 277 278 279 280 281

    //  Disconnect from I/O threads poller object.
    io_object_t::unplug ();

    session = NULL;
}

282
void zmq::stream_engine_t::terminate ()
283 284 285 286 287
{
    unplug ();
    delete this;
}

288
void zmq::stream_engine_t::in_event ()
289
{
290
    zmq_assert (!io_error);
291

Pieter Hintjens's avatar
Pieter Hintjens committed
292
    //  If still handshaking, receive and process the greeting message.
Martin Hurton's avatar
Martin Hurton committed
293 294 295 296
    if (unlikely (handshaking))
        if (!handshake ())
            return;

297
    zmq_assert (decoder);
298 299

    //  If there has been an I/O error, stop polling.
300
    if (input_stopped) {
301 302 303 304
        rm_fd (handle);
        io_error = true;
        return;
    }
305

306
    //  If there's no data to process in the buffer... 
307 308 309 310 311 312
    if (!insize) {

        //  Retrieve the buffer and read as much data as possible.
        //  Note that buffer can be arbitrarily large. However, we assume
        //  the underlying TCP layer has fixed buffer size and thus the
        //  number of bytes read will be always limited.
313 314
        size_t bufsize = 0;
        decoder->get_buffer (&inpos, &bufsize);
315

316
        const int rc = tcp_read (s, inpos, bufsize);
317

318
        if (rc == 0) {
319 320
            // connection closed by peer
            errno = EPIPE;
321
            error (connection_error);
322
            return;
323
        }
324 325
        if (rc == -1) {
            if (errno != EAGAIN)
326
                error (connection_error);
327 328
            return;
        }
329

330
        //  Adjust input size
331
        insize = static_cast <size_t> (rc);
332 333
        // Adjust buffer size to received bytes
        decoder->resize_buffer(insize);
Martin Hurton's avatar
Martin Hurton committed
334
    }
335

336 337
    int rc = 0;
    size_t processed = 0;
338

339 340 341
    while (insize > 0) {
        rc = decoder->decode (inpos, insize, processed);
        zmq_assert (processed <= insize);
342 343
        inpos += processed;
        insize -= processed;
344 345
        if (rc == 0 || rc == -1)
            break;
Martin Hurton's avatar
Martin Hurton committed
346
        rc = (this->*process_msg) (decoder->msg ());
347 348
        if (rc == -1)
            break;
349 350
    }

351 352 353 354
    //  Tear down the connection if we have failed to decode input data
    //  or the session has rejected the message.
    if (rc == -1) {
        if (errno != EAGAIN) {
355
            error(protocol_error);
356 357
            return;
        }
358
        input_stopped = true;
359
        reset_pollin (handle);
Martin Hurton's avatar
Martin Hurton committed
360
    }
361 362

    session->flush ();
363 364
}

365
void zmq::stream_engine_t::out_event ()
366
{
367 368
    zmq_assert (!io_error);

369 370 371
    //  If write buffer is empty, try to read new data from the encoder.
    if (!outsize) {

Martin Hurton's avatar
Martin Hurton committed
372 373 374 375 376 377 378 379
        //  Even when we stop polling as soon as there is no
        //  data to send, the poller may invoke out_event one
        //  more time due to 'speculative write' optimisation.
        if (unlikely (encoder == NULL)) {
            zmq_assert (handshaking);
            return;
        }

380
        outpos = NULL;
381 382
        outsize = encoder->encode (&outpos, 0);

383
        while (outsize < (size_t) out_batch_size) {
Martin Hurton's avatar
Martin Hurton committed
384
            if ((this->*next_msg) (&tx_msg) == -1)
385 386 387
                break;
            encoder->load_msg (&tx_msg);
            unsigned char *bufptr = outpos + outsize;
388
            size_t n = encoder->encode (&bufptr, out_batch_size - outsize);
389 390 391 392 393
            zmq_assert (n > 0);
            if (outpos == NULL)
                outpos = bufptr;
            outsize += n;
        }
394 395 396

        //  If there is no data to send, stop polling for output.
        if (outsize == 0) {
397
            output_stopped = true;
398 399 400 401 402 403 404
            reset_pollout (handle);
            return;
        }
    }

    //  If there are any data to write in write buffer, write as much as
    //  possible to the socket. Note that amount of data to write can be
405
    //  arbitrarily large. However, we assume that underlying TCP layer has
406 407
    //  limited transmission buffer and thus the actual number of bytes
    //  written should be reasonably modest.
408
    const int nbytes = tcp_write (s, outpos, outsize);
409

Martin Hurton's avatar
Martin Hurton committed
410 411
    //  IO error has occurred. We stop waiting for output events.
    //  The engine is not terminated until we detect input error;
412
    //  this is necessary to prevent losing incoming messages.
413
    if (nbytes == -1) {
Martin Hurton's avatar
Martin Hurton committed
414
        reset_pollout (handle);
415 416 417 418 419
        return;
    }

    outpos += nbytes;
    outsize -= nbytes;
Martin Hurton's avatar
Martin Hurton committed
420 421 422 423 424 425

    //  If we are still handshaking and there are no data
    //  to send, stop polling for output.
    if (unlikely (handshaking))
        if (outsize == 0)
            reset_pollout (handle);
426 427
}

428
void zmq::stream_engine_t::restart_output ()
429
{
430 431 432
    if (unlikely (io_error))
        return;

433
    if (likely (output_stopped)) {
434
        set_pollout (handle);
435
        output_stopped = false;
436
    }
437 438 439 440 441 442 443 444

    //  Speculative write: The assumption is that at the moment new message
    //  was sent by the user the socket is probably available for writing.
    //  Thus we try to write the data to socket avoiding polling for POLLOUT.
    //  Consequently, the latency should be better in request/reply scenarios.
    out_event ();
}

445
void zmq::stream_engine_t::restart_input ()
446
{
447
    zmq_assert (input_stopped);
448 449 450
    zmq_assert (session != NULL);
    zmq_assert (decoder != NULL);

Martin Hurton's avatar
Martin Hurton committed
451
    int rc = (this->*process_msg) (decoder->msg ());
452 453 454 455
    if (rc == -1) {
        if (errno == EAGAIN)
            session->flush ();
        else
456
            error (protocol_error);
Martin Hurton's avatar
Martin Hurton committed
457 458 459
        return;
    }

460 461 462 463 464 465 466 467
    while (insize > 0) {
        size_t processed = 0;
        rc = decoder->decode (inpos, insize, processed);
        zmq_assert (processed <= insize);
        inpos += processed;
        insize -= processed;
        if (rc == 0 || rc == -1)
            break;
Martin Hurton's avatar
Martin Hurton committed
468
        rc = (this->*process_msg) (decoder->msg ());
469 470 471
        if (rc == -1)
            break;
    }
472

473 474 475
    if (rc == -1 && errno == EAGAIN)
        session->flush ();
    else
476 477 478 479 480
    if (io_error)
        error (connection_error);
    else
    if (rc == -1)
        error (protocol_error);
481
    else {
482
        input_stopped = false;
483 484 485 486 487 488
        set_pollin (handle);
        session->flush ();

        //  Speculative read.
        in_event ();
    }
489 490
}

491
bool zmq::stream_engine_t::handshake ()
Martin Hurton's avatar
Martin Hurton committed
492
{
493
    zmq_assert (handshaking);
Martin Hurton's avatar
Martin Hurton committed
494
    zmq_assert (greeting_bytes_read < greeting_size);
495
    //  Receive the greeting.
Martin Hurton's avatar
Martin Hurton committed
496
    while (greeting_bytes_read < greeting_size) {
497 498
        const int n = tcp_read (s, greeting_recv + greeting_bytes_read,
                                greeting_size - greeting_bytes_read);
499
        if (n == 0) {
500
            errno = EPIPE;
501
            error (connection_error);
502 503
            return false;
        }
504 505
        if (n == -1) {
            if (errno != EAGAIN)
506
                error (connection_error);
507
            return false;
508
        }
Martin Hurton's avatar
Martin Hurton committed
509 510 511

        greeting_bytes_read += n;

512 513 514
        //  We have received at least one byte from the peer.
        //  If the first byte is not 0xff, we know that the
        //  peer is using unversioned protocol.
Pieter Hintjens's avatar
Pieter Hintjens committed
515
        if (greeting_recv [0] != 0xff)
516
            break;
Martin Hurton's avatar
Martin Hurton committed
517

518
        if (greeting_bytes_read < signature_size)
519
            continue;
Martin Hurton's avatar
Martin Hurton committed
520

521 522
        //  Inspect the right-most bit of the 10th byte (which coincides
        //  with the 'flags' field if a regular message was sent).
523
        //  Zero indicates this is a header of a routing id message
524
        //  (i.e. the peer is using the unversioned protocol).
Pieter Hintjens's avatar
Pieter Hintjens committed
525
        if (!(greeting_recv [9] & 0x01))
526
            break;
Martin Hurton's avatar
Martin Hurton committed
527

528
        //  The peer is using versioned protocol.
529 530
        //  Send the major version number.
        if (outpos + outsize == greeting_send + signature_size) {
Martin Hurton's avatar
Martin Hurton committed
531 532
            if (outsize == 0)
                set_pollout (handle);
533 534 535 536 537 538 539 540 541 542 543 544 545 546 547
            outpos [outsize++] = 3;     //  Major version number
        }

        if (greeting_bytes_read > signature_size) {
            if (outpos + outsize == greeting_send + signature_size + 1) {
                if (outsize == 0)
                    set_pollout (handle);

                //  Use ZMTP/2.0 to talk to older peers.
                if (greeting_recv [10] == ZMTP_1_0
                ||  greeting_recv [10] == ZMTP_2_0)
                    outpos [outsize++] = options.type;
                else {
                    outpos [outsize++] = 0; //  Minor version number
                    memset (outpos + outsize, 0, 20);
548 549 550

                    zmq_assert (options.mechanism == ZMQ_NULL
                            ||  options.mechanism == ZMQ_PLAIN
551 552
                            ||  options.mechanism == ZMQ_CURVE
                            ||  options.mechanism == ZMQ_GSSAPI);
553

554 555 556
                    if (options.mechanism == ZMQ_NULL)
                        memcpy (outpos + outsize, "NULL", 4);
                    else
557
                    if (options.mechanism == ZMQ_PLAIN)
558
                        memcpy (outpos + outsize, "PLAIN", 5);
559
                    else
560 561
                    if (options.mechanism == ZMQ_GSSAPI)
                        memcpy (outpos + outsize, "GSSAPI", 6);
562
                    else
563
                    if (options.mechanism == ZMQ_CURVE)
564
                        memcpy (outpos + outsize, "CURVE", 5);
565 566 567 568 569 570
                    outsize += 20;
                    memset (outpos + outsize, 0, 32);
                    outsize += 32;
                    greeting_size = v3_greeting_size;
                }
            }
Martin Hurton's avatar
Martin Hurton committed
571 572 573
        }
    }

574 575
    //  Position of the revision field in the greeting.
    const size_t revision_pos = 10;
576

577
    //  Is the peer using ZMTP/1.0 with no revision number?
578
    //  If so, we send and receive rest of routing id message
Pieter Hintjens's avatar
Pieter Hintjens committed
579
    if (greeting_recv [0] != 0xff || !(greeting_recv [9] & 0x01)) {
Min RK's avatar
Min RK committed
580
        if (session->zap_enabled ()) {
581 582 583 584 585
           // reject ZMTP 1.0 connections if ZAP is enabled
           error (protocol_error);
           return false;
        }

586
        encoder = new (std::nothrow) v1_encoder_t (out_batch_size);
587 588
        alloc_assert (encoder);

589
        decoder = new (std::nothrow) v1_decoder_t (in_batch_size, options.maxmsgsize);
590 591
        alloc_assert (decoder);

Martin Hurton's avatar
Martin Hurton committed
592 593 594 595
        //  We have already sent the message header.
        //  Since there is no way to tell the encoder to
        //  skip the message header, we simply throw that
        //  header data away.
596
        const size_t header_size = options.routing_id_size + 1 >= 255 ? 10 : 2;
Martin Hurton's avatar
Martin Hurton committed
597
        unsigned char tmp [10], *bufferp = tmp;
598

599
        //  Prepare the routing id message and load it into encoder.
600
        //  Then consume bytes we have already sent to the peer.
601
        const int rc = tx_msg.init_size (options.routing_id_size);
602
        zmq_assert (rc == 0);
603
        memcpy (tx_msg.data (), options.routing_id, options.routing_id_size);
604
        encoder->load_msg (&tx_msg);
605
        size_t buffer_size = encoder->encode (&bufferp, header_size);
Martin Hurton's avatar
Martin Hurton committed
606 607 608
        zmq_assert (buffer_size == header_size);

        //  Make sure the decoder sees the data we have already received.
Pieter Hintjens's avatar
Pieter Hintjens committed
609
        inpos = greeting_recv;
Martin Hurton's avatar
Martin Hurton committed
610
        insize = greeting_bytes_read;
611 612

        //  To allow for interoperability with peers that do not forward
613 614
        //  their subscriptions, we inject a phantom subscription message
        //  message into the incoming message stream.
615
        if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB)
616
            subscription_required = true;
Martin Hurton's avatar
Martin Hurton committed
617

618
        //  We are sending our routing id now and the next message
Martin Hurton's avatar
Martin Hurton committed
619
        //  will come from the socket.
Martin Hurton's avatar
Martin Hurton committed
620
        next_msg = &stream_engine_t::pull_msg_from_session;
Martin Hurton's avatar
Martin Hurton committed
621

622 623
        //  We are expecting routing id message.
        process_msg = &stream_engine_t::process_routing_id_msg;
Martin Hurton's avatar
Martin Hurton committed
624
    }
625
    else
Pieter Hintjens's avatar
Pieter Hintjens committed
626
    if (greeting_recv [revision_pos] == ZMTP_1_0) {
Min RK's avatar
Min RK committed
627
        if (session->zap_enabled ()) {
628 629 630 631 632
           // reject ZMTP 1.0 connections if ZAP is enabled
           error (protocol_error);
           return false;
        }

633
        encoder = new (std::nothrow) v1_encoder_t (out_batch_size);
634 635
        alloc_assert (encoder);

636
        decoder = new (std::nothrow) v1_decoder_t (
637
            in_batch_size, options.maxmsgsize);
638 639
        alloc_assert (decoder);
    }
640 641
    else
    if (greeting_recv [revision_pos] == ZMTP_2_0) {
Min RK's avatar
Min RK committed
642
        if (session->zap_enabled ()) {
643 644 645 646 647
           // reject ZMTP 2.0 connections if ZAP is enabled
           error (protocol_error);
           return false;
        }

648
        encoder = new (std::nothrow) v2_encoder_t (out_batch_size);
649 650 651
        alloc_assert (encoder);

        decoder = new (std::nothrow) v2_decoder_t (
652
            in_batch_size, options.maxmsgsize);
653 654
        alloc_assert (decoder);
    }
655
    else {
656
        encoder = new (std::nothrow) v2_encoder_t (out_batch_size);
657 658
        alloc_assert (encoder);

659
        decoder = new (std::nothrow) v2_decoder_t (
660
                in_batch_size, options.maxmsgsize);
661
        alloc_assert (decoder);
662

663 664
        if (options.mechanism == ZMQ_NULL
        &&  memcmp (greeting_recv + 12, "NULL\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) == 0) {
665 666
            mechanism = new (std::nothrow)
                null_mechanism_t (session, peer_address, options);
667
            alloc_assert (mechanism);
668 669
        }
        else
670 671
        if (options.mechanism == ZMQ_PLAIN
        &&  memcmp (greeting_recv + 12, "PLAIN\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) == 0) {
672 673 674 675 676
            if (options.as_server)
                mechanism = new (std::nothrow)
                    plain_server_t (session, peer_address, options);
            else
                mechanism = new (std::nothrow)
677
                    plain_client_t (session, options);
678
            alloc_assert (mechanism);
679
        }
680
#ifdef ZMQ_HAVE_CURVE
681
        else
682 683
        if (options.mechanism == ZMQ_CURVE
        &&  memcmp (greeting_recv + 12, "CURVE\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) == 0) {
684
            if (options.as_server)
685 686
                mechanism = new (std::nothrow)
                    curve_server_t (session, peer_address, options);
687
            else
688 689
                mechanism =
                  new (std::nothrow) curve_client_t (session, options);
690 691 692
            alloc_assert (mechanism);
        }
#endif
Chris Laws's avatar
Chris Laws committed
693
#ifdef HAVE_LIBGSSAPI_KRB5
694
        else
695 696
        if (options.mechanism == ZMQ_GSSAPI
        &&  memcmp (greeting_recv + 12, "GSSAPI\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) == 0) {
697 698 699 700
            if (options.as_server)
                mechanism = new (std::nothrow)
                    gssapi_server_t (session, peer_address, options);
            else
701 702
                mechanism =
                    new (std::nothrow) gssapi_client_t (session, options);
703 704
            alloc_assert (mechanism);
        }
705
#endif
706
        else {
707 708
            session->get_socket ()->event_handshake_failed_protocol (
              session->get_endpoint (), ZMQ_PROTOCOL_ERROR_ZMTP_MECHANISM_MISMATCH);
709
            error (protocol_error);
710 711
            return false;
        }
Martin Hurton's avatar
Martin Hurton committed
712 713
        next_msg = &stream_engine_t::next_handshake_command;
        process_msg = &stream_engine_t::process_handshake_command;
714
    }
Martin Hurton's avatar
Martin Hurton committed
715 716 717 718 719 720 721 722 723

    // Start polling for output if necessary.
    if (outsize == 0)
        set_pollout (handle);

    //  Handshaking was successful.
    //  Switch into the normal message flow.
    handshaking = false;

724 725 726 727 728
    if (has_handshake_timer) {
        cancel_timer (handshake_timer_id);
        has_handshake_timer = false;
    }

Martin Hurton's avatar
Martin Hurton committed
729 730 731
    return true;
}

732
//  'next_msg' handler that produces the local routing id message.
//  Fills 'msg_' with the configured routing id and switches 'next_msg'
//  to pull subsequent messages from the session. Always returns 0.
int zmq::stream_engine_t::routing_id_msg (msg_t *msg_)
{
    const size_t id_size = options.routing_id_size;

    const int rc = msg_->init_size (id_size);
    errno_assert (rc == 0);

    //  An empty routing id needs no copying.
    if (id_size > 0)
        memcpy (msg_->data (), options.routing_id, id_size);

    next_msg = &stream_engine_t::pull_msg_from_session;
    return 0;
}
741

742
//  'process_msg' handler for the peer's routing id message.
//  The message is either forwarded to the session flagged as a routing
//  id or discarded, depending on 'options.recv_routing_id'. If required,
//  a subscription message is pushed to the session afterwards. Finally
//  'process_msg' is switched to regular message delivery.
//  Always returns 0.
int zmq::stream_engine_t::process_routing_id_msg (msg_t *msg_)
{
    if (!options.recv_routing_id) {
        //  Nobody wants the routing id: drop it and leave 'msg_' as an
        //  empty, initialised message.
        int rc = msg_->close ();
        errno_assert (rc == 0);
        rc = msg_->init ();
        errno_assert (rc == 0);
    }
    else {
        msg_->set_flags (msg_t::routing_id);
        const int rc = session->push_msg (msg_);
        errno_assert (rc == 0);
    }

    if (subscription_required) {
        //  Inject the subscription message, so that also
        //  ZMQ 2.x peers receive published messages.
        msg_t subscription;
        int rc = subscription.init_size (1);
        errno_assert (rc == 0);
        *(unsigned char *) subscription.data () = 1;
        rc = session->push_msg (&subscription);
        errno_assert (rc == 0);
    }

    process_msg = &stream_engine_t::push_msg_to_session;

    return 0;
}
772

773
//  'next_msg' handler used while the security mechanism is handshaking.
//  - mechanism ready: finishes the handshake and returns the result of
//    pull_and_encode () for 'msg_';
//  - mechanism in error: sets errno to EPROTO and returns -1;
//  - otherwise: asks the mechanism for its next handshake command and,
//    on success, flags the message as a command.
int zmq::stream_engine_t::next_handshake_command (msg_t *msg_)
{
    zmq_assert (mechanism != NULL);

    if (mechanism->status () == mechanism_t::ready) {
        mechanism_ready ();
        return pull_and_encode (msg_);
    }

    if (mechanism->status () == mechanism_t::error) {
        errno = EPROTO;
        return -1;
    }

    const int rc = mechanism->next_handshake_command (msg_);
    if (rc == 0)
        msg_->set_flags (msg_t::command);

    return rc;
}

796
//  'process_msg' handler used while the security mechanism is
//  handshaking. Passes 'msg_' to the mechanism and returns its result,
//  except that a mechanism ending up in the error state after a
//  successful call yields -1 with errno set to EPROTO.
int zmq::stream_engine_t::process_handshake_command (msg_t *msg_)
{
    zmq_assert (mechanism != NULL);

    const int rc = mechanism->process_handshake_command (msg_);
    if (rc != 0)
        return rc;

    if (mechanism->status () == mechanism_t::ready)
        mechanism_ready ();
    else if (mechanism->status () == mechanism_t::error) {
        errno = EPROTO;
        return -1;
    }

    //  The mechanism may now have something to send.
    if (output_stopped)
        restart_output ();

    return rc;
}

815 816 817 818 819 820
void zmq::stream_engine_t::zap_msg_available ()
{
    zmq_assert (mechanism != NULL);

    const int rc = mechanism->zap_msg_available ();
    if (rc == -1) {
821
        error (protocol_error);
822 823
        return;
    }
824 825 826 827
    if (input_stopped)
        restart_input ();
    if (output_stopped)
        restart_output ();
828 829
}

830 831 832 833 834
//  Returns the endpoint string this engine was created with, as a
//  C string owned by the engine.
const char *zmq::stream_engine_t::get_endpoint () const
{
    const char *const endpoint_cstr = endpoint.c_str ();
    return endpoint_cstr;
}

835
void zmq::stream_engine_t::mechanism_ready ()
836
{
837 838 839 840 841
    if (options.heartbeat_interval > 0) {
        add_timer(options.heartbeat_interval, heartbeat_ivl_timer_id);
        has_heartbeat_timer = true;
    }

842 843 844 845
    if (options.recv_routing_id) {
        msg_t routing_id;
        mechanism->peer_routing_id (&routing_id);
        const int rc = session->push_msg (&routing_id);
846 847 848
        if (rc == -1 && errno == EAGAIN) {
            // If the write is failing at this stage with
            // an EAGAIN the pipe must be being shut down,
849
            // so we can just bail out of the routing id set.
850 851
            return;
        }
852
        errno_assert (rc == 0);
853
        session->flush ();
854
    }
855

Martin Hurton's avatar
Martin Hurton committed
856 857
    next_msg = &stream_engine_t::pull_and_encode;
    process_msg = &stream_engine_t::write_credential;
858 859 860

    //  Compile metadata.
    properties_t properties;
Thomas Rodgers's avatar
Thomas Rodgers committed
861
    init_properties(properties);
862

Pieter Hintjens's avatar
Pieter Hintjens committed
863 864
    //  Add ZAP properties.
    const properties_t& zap_properties = mechanism->get_zap_properties ();
Thomas Rodgers's avatar
Thomas Rodgers committed
865
    properties.insert(zap_properties.begin (), zap_properties.end ());
Pieter Hintjens's avatar
Pieter Hintjens committed
866

867 868
    //  Add ZMTP properties.
    const properties_t& zmtp_properties = mechanism->get_zmtp_properties ();
Thomas Rodgers's avatar
Thomas Rodgers committed
869
    properties.insert(zmtp_properties.begin (), zmtp_properties.end ());
870

871 872
    zmq_assert (metadata == NULL);
    if (!properties.empty ())
873
    {
874
        metadata = new (std::nothrow) metadata_t (properties);
875 876
        alloc_assert (metadata);
    }
Vincent Tellier's avatar
Vincent Tellier committed
877

878
#ifdef ZMQ_BUILD_DRAFT_API
879
    socket->event_handshake_succeeded (endpoint, 0);
880
#endif
881 882
}

883
//  Fetches the next outgoing message straight from the session and
//  forwards the session's return code.
int zmq::stream_engine_t::pull_msg_from_session (msg_t *msg_)
{
    const int result = session->pull_msg (msg_);
    return result;
}
887

888 889 890 891 892
//  Delivers an incoming message straight to the session and forwards the
//  session's return code.
int zmq::stream_engine_t::push_msg_to_session (msg_t *msg_)
{
    const int result = session->push_msg (msg_);
    return result;
}

893
//  Attaches the engine's metadata to the message (unless it is absent or
//  already attached) and then pushes the message to the session.
int zmq::stream_engine_t::push_raw_msg_to_session (msg_t *msg_)
{
    const bool needs_metadata = metadata && metadata != msg_->metadata ();
    if (needs_metadata)
        msg_->set_metadata (metadata);

    return push_msg_to_session (msg_);
}

899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917
//  Message handler installed by mechanism_ready (). Pushes the user id
//  reported by the mechanism (if non-empty) to the session as a credential
//  frame, then switches to decode_and_push and processes msg_ with it.
//  Returns -1 if the credential frame could not be pushed; the handler is
//  left unchanged in that case.
int zmq::stream_engine_t::write_credential (msg_t *msg_)
{
    zmq_assert (mechanism != NULL);
    zmq_assert (session != NULL);

    const blob_t user_id = mechanism->get_user_id ();
    if (user_id.size () > 0) {
        msg_t credential_msg;
        const int init_rc = credential_msg.init_size (user_id.size ());
        zmq_assert (init_rc == 0);
        memcpy (credential_msg.data (), user_id.data (), user_id.size ());
        credential_msg.set_flags (msg_t::credential);

        if (session->push_msg (&credential_msg) == -1) {
            //  The frame was not accepted; release it before bailing out.
            const int close_rc = credential_msg.close ();
            errno_assert (close_rc == 0);
            return -1;
        }
    }

    process_msg = &stream_engine_t::decode_and_push;
    return decode_and_push (msg_);
}

922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938
//  Pulls the next message from the session and passes it through the
//  mechanism's encode step. Returns 0 when both steps succeed, -1 as soon
//  as either of them reports -1.
int zmq::stream_engine_t::pull_and_encode (msg_t *msg_)
{
    zmq_assert (mechanism != NULL);

    const int pulled = session->pull_msg (msg_);
    if (pulled == -1)
        return -1;

    return mechanism->encode (msg_) == -1 ? -1 : 0;
}

//  Decodes an incoming message with the mechanism, treats it as proof of
//  life from the peer (pending heartbeat timeout/TTL timers are cancelled),
//  handles heartbeat commands and finally pushes the message to the
//  session. Returns 0 on success and -1 on failure; when the session
//  reports EAGAIN the handler is switched so that the same message is
//  pushed again on the next call.
int zmq::stream_engine_t::decode_and_push (msg_t *msg_)
{
    zmq_assert (mechanism != NULL);

    if (mechanism->decode (msg_) == -1)
        return -1;

    if (has_timeout_timer) {
        has_timeout_timer = false;
        cancel_timer (heartbeat_timeout_timer_id);
    }

    if (has_ttl_timer) {
        has_ttl_timer = false;
        cancel_timer (heartbeat_ttl_timer_id);
    }

    if (msg_->flags () & msg_t::command) {
        //  The first body byte is inspected to spot heartbeat commands
        //  (4 matches the "\4" prefix of "\4PING"). The frame comes from
        //  the peer, so only read that byte when the body is not empty.
        if (msg_->size () > 0) {
            const uint8_t cmd_id =
              *static_cast<const uint8_t *> (msg_->data ());
            if (cmd_id == 4)
                process_heartbeat_message (msg_);
        }
    }

    if (metadata)
        msg_->set_metadata (metadata);
    if (session->push_msg (msg_) == -1) {
        //  The session is full for now: remember to push this very
        //  message before decoding any further input.
        if (errno == EAGAIN)
            process_msg = &stream_engine_t::push_one_then_decode_and_push;
        return -1;
    }
    return 0;
}

//  Retries pushing a message that decode_and_push could not deliver.
//  Once the session accepts it, normal decode_and_push handling resumes.
int zmq::stream_engine_t::push_one_then_decode_and_push (msg_t *msg_)
{
    const int result = session->push_msg (msg_);
    if (result != 0)
        return result;

    process_msg = &stream_engine_t::decode_and_push;
    return result;
}

974
//  Tears the engine down after a failure: optionally notifies a raw
//  socket's application, emits the monitoring events, reports the error
//  to the session, unplugs the engine and finally destroys it. The object
//  must not be touched by the caller after this returns.
void zmq::stream_engine_t::error (error_reason_t reason)
{
    const bool notify_raw_peer = options.raw_socket && options.raw_notify;
    if (notify_raw_peer) {
        //  For raw sockets, send a final 0-length message to the application
        //  so that it knows the peer has been disconnected.
        msg_t terminator;
        terminator.init ();
        (this->*process_msg) (&terminator);
        terminator.close ();
    }
    zmq_assert (session);

#ifdef ZMQ_BUILD_DRAFT_API
    // protocol errors have been signaled already at the point where they occurred
    if (reason != protocol_error) {
        if (mechanism == NULL
            || mechanism->status () == mechanism_t::handshaking) {
            //  Capture errno before any further call can overwrite it.
            const int saved_errno = errno;
            socket->event_handshake_failed_no_detail (endpoint, saved_errno);
        }
    }
#endif

    socket->event_disconnected (endpoint, (int) s);
    session->flush ();
    session->engine_error (reason);
    unplug ();
    delete this;
}

1001
void zmq::stream_engine_t::set_handshake_timer ()
1002
{
1003
    zmq_assert (!has_handshake_timer);
1004

1005
    if (!options.raw_socket && options.handshake_ivl > 0) {
1006 1007
        add_timer (options.handshake_ivl, handshake_timer_id);
        has_handshake_timer = true;
1008
    }
1009 1010
}

Thomas Rodgers's avatar
Thomas Rodgers committed
1011 1012
//  Seeds the metadata properties with the peer address and the socket
//  descriptor. Returns false, leaving properties untouched, when no peer
//  address is known; true otherwise.
bool zmq::stream_engine_t::init_properties (properties_t &properties)
{
    if (peer_address.empty ())
        return false;

    properties.insert (
      std::make_pair (ZMQ_MSG_PROPERTY_PEER_ADDRESS, peer_address));

    //  Private property to support deprecated SRCFD
    std::ostringstream fd_stream;
    fd_stream << (int) s;
    const std::string fd_text = fd_stream.str ();
    properties.insert (std::make_pair ("__fd", fd_text));

    return true;
}

1024
void zmq::stream_engine_t::timer_event (int id_)
1025
{
Jonathan Reams's avatar
Jonathan Reams committed
1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054
    if(id_ == handshake_timer_id) {
        has_handshake_timer = false;
        //  handshake timer expired before handshake completed, so engine fail
        error (timeout_error);
    }
    else if(id_ == heartbeat_ivl_timer_id) {
        next_msg = &stream_engine_t::produce_ping_message;
        out_event();
        add_timer(options.heartbeat_interval, heartbeat_ivl_timer_id);
    }
    else if(id_ == heartbeat_ttl_timer_id) {
        has_ttl_timer = false;
        error(timeout_error);
    }
    else if(id_ == heartbeat_timeout_timer_id) {
        has_timeout_timer = false;
        error(timeout_error);
    }
    else
        // There are no other valid timer ids!
        assert(false);
}

//  Builds a PING command ("\4PING" followed by the configured heartbeat
//  TTL in network byte order), encodes it with the mechanism, restores the
//  regular outgoing handler and arms the heartbeat timeout timer if it is
//  configured and not already running. Returns the encode result.
int zmq::stream_engine_t::produce_ping_message (msg_t *msg_)
{
    zmq_assert (mechanism != NULL);

    // 16-bit TTL + \4PING == 7
    int rc = msg_->init_size (7);
    errno_assert (rc == 0);
    msg_->set_flags (msg_t::command);

    // Copy in the command message, then append the TTL right after it.
    uint8_t *const body = (uint8_t *) msg_->data ();
    memcpy (body, "\4PING", 5);

    const uint16_t ttl_net = htons (options.heartbeat_ttl);
    memcpy (body + 5, &ttl_net, sizeof (ttl_net));

    rc = mechanism->encode (msg_);
    next_msg = &stream_engine_t::pull_and_encode;

    const bool arm_timeout = !has_timeout_timer && heartbeat_timeout > 0;
    if (arm_timeout) {
        add_timer (heartbeat_timeout, heartbeat_timeout_timer_id);
        has_timeout_timer = true;
    }
    return rc;
}

//  Builds a PONG command ("\4PONG"), encodes it with the mechanism and
//  restores the regular outgoing handler. Returns the encode result.
int zmq::stream_engine_t::produce_pong_message (msg_t *msg_)
{
    zmq_assert (mechanism != NULL);

    const int init_rc = msg_->init_size (5);
    errno_assert (init_rc == 0);
    msg_->set_flags (msg_t::command);

    memcpy (msg_->data (), "\4PONG", 5);

    const int encode_rc = mechanism->encode (msg_);
    next_msg = &stream_engine_t::pull_and_encode;
    return encode_rc;
}

//  Handles an incoming heartbeat command. For a PING the peer's TTL is
//  used to arm the TTL timer (unless one is already running or the TTL is
//  zero) and a PONG is scheduled as the next outgoing message. Anything
//  else, including a frame too short to hold a PING with its TTL, is
//  ignored. Always returns 0.
int zmq::stream_engine_t::process_heartbeat_message (msg_t *msg_)
{
    //  A PING body is "\4PING" (5 bytes) followed by a 16-bit TTL.
    const size_t ping_name_size = 5;
    const size_t ping_ttl_size = sizeof (uint16_t);

    //  The frame comes from the peer: never read past its end.
    if (msg_->size () < ping_name_size + ping_ttl_size)
        return 0;

    const uint8_t *const body = static_cast<const uint8_t *> (msg_->data ());
    if (memcmp (body, "\4PING", ping_name_size) == 0) {
        // Get the remote heartbeat TTL to setup the timer
        uint16_t ttl_net;
        memcpy (&ttl_net, body + ping_name_size, ping_ttl_size);

        // The remote heartbeat is in 10ths of a second
        // so we multiply it by 100 to get the timer interval in ms.
        //  Widen to int first: in 16 bits the product wraps for any TTL
        //  above 655 tenths of a second.
        const int ttl_ms = static_cast<int> (ntohs (ttl_net)) * 100;

        if (!has_ttl_timer && ttl_ms > 0) {
            add_timer (ttl_ms, heartbeat_ttl_timer_id);
            has_ttl_timer = true;
        }

        //  Answer with a PONG as the next outgoing message.
        next_msg = &stream_engine_t::produce_pong_message;
        out_event ();
    }

    return 0;
}