stream_engine.cpp 34.3 KB
Newer Older
1
/*
2
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3

4
    This file is part of libzmq, the ZeroMQ core engine in C++.
5

6 7 8
    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
9 10
    (at your option) any later version.

11 12 13 14 15 16 17 18 19 20 21 22 23 24
    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.
25 26 27 28 29

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include "precompiled.hpp"
31
#include "macros.hpp"
32 33

#include <string.h>
34 35

#ifndef ZMQ_HAVE_WINDOWS
36
#include <unistd.h>
37 38
#endif

39
#include <new>
40
#include <sstream>
41

42
#include "stream_engine.hpp"
43
#include "io_thread.hpp"
44
#include "session_base.hpp"
45 46
#include "v1_encoder.hpp"
#include "v1_decoder.hpp"
47 48
#include "v2_encoder.hpp"
#include "v2_decoder.hpp"
49
#include "null_mechanism.hpp"
50 51
#include "plain_client.hpp"
#include "plain_server.hpp"
52 53
#include "gssapi_client.hpp"
#include "gssapi_server.hpp"
54 55
#include "curve_client.hpp"
#include "curve_server.hpp"
56 57
#include "raw_decoder.hpp"
#include "raw_encoder.hpp"
58 59
#include "config.hpp"
#include "err.hpp"
60
#include "ip.hpp"
61
#include "tcp.hpp"
Martin Hurton's avatar
Martin Hurton committed
62 63
#include "likely.hpp"
#include "wire.hpp"
64

65 66
zmq::stream_engine_t::stream_engine_t (fd_t fd_,
                                       const options_t &options_,
67
                                       const std::string &endpoint_) :
68
    s (fd_),
69 70
    as_server (false),
    handle ((handle_t) NULL),
71 72
    inpos (NULL),
    insize (0),
73
    decoder (NULL),
74 75
    outpos (NULL),
    outsize (0),
76
    encoder (NULL),
77
    metadata (NULL),
Martin Hurton's avatar
Martin Hurton committed
78
    handshaking (true),
79
    greeting_size (v2_greeting_size),
Martin Hurton's avatar
Martin Hurton committed
80
    greeting_bytes_read (0),
81 82
    session (NULL),
    options (options_),
83
    endpoint (endpoint_),
84
    plugged (false),
85 86
    next_msg (&stream_engine_t::routing_id_msg),
    process_msg (&stream_engine_t::process_routing_id_msg),
87 88
    io_error (false),
    subscription_required (false),
89
    mechanism (NULL),
90 91
    input_stopped (false),
    output_stopped (false),
92
    has_handshake_timer (false),
Jonathan Reams's avatar
Jonathan Reams committed
93 94 95
    has_ttl_timer (false),
    has_timeout_timer (false),
    has_heartbeat_timer (false),
96
    heartbeat_timeout (0),
97
    socket (NULL)
98
{
99 100
    int rc = tx_msg.init ();
    errno_assert (rc == 0);
101 102
    rc = pong_msg.init ();
    errno_assert (rc == 0);
Chris Laws's avatar
Chris Laws committed
103

Martin Hurton's avatar
Martin Hurton committed
104
    //  Put the socket into non-blocking mode.
105
    unblock_socket (s);
106

107 108
    int family = get_peer_ip_address (s, peer_address);
    if (family == 0)
109
        peer_address.clear ();
110
#if defined ZMQ_HAVE_SO_PEERCRED
111
    else if (family == PF_UNIX) {
112 113 114 115 116 117 118 119 120
        struct ucred cred;
        socklen_t size = sizeof (cred);
        if (!getsockopt (s, SOL_SOCKET, SO_PEERCRED, &cred, &size)) {
            std::ostringstream buf;
            buf << ":" << cred.uid << ":" << cred.gid << ":" << cred.pid;
            peer_address += buf.str ();
        }
    }
#elif defined ZMQ_HAVE_LOCAL_PEERCRED
121
    else if (family == PF_UNIX) {
122 123 124
        struct xucred cred;
        socklen_t size = sizeof (cred);
        if (!getsockopt (s, 0, LOCAL_PEERCRED, &cred, &size)
125
            && cred.cr_version == XUCRED_VERSION) {
126 127 128 129 130 131 132 133 134
            std::ostringstream buf;
            buf << ":" << cred.cr_uid << ":";
            if (cred.cr_ngroups > 0)
                buf << cred.cr_groups[0];
            buf << ":";
            peer_address += buf.str ();
        }
    }
#endif
135

136
    if (options.heartbeat_interval > 0) {
137
        heartbeat_timeout = options.heartbeat_timeout;
138
        if (heartbeat_timeout == -1)
139 140
            heartbeat_timeout = options.heartbeat_interval;
    }
141 142
}

143
zmq::stream_engine_t::~stream_engine_t ()
144 145 146
{
    zmq_assert (!plugged);

147 148
    if (s != retired_fd) {
#ifdef ZMQ_HAVE_WINDOWS
149 150
        int rc = closesocket (s);
        wsa_assert (rc != SOCKET_ERROR);
151
#else
152
        int rc = close (s);
153
#if defined(__FreeBSD_kernel__) || defined(__FreeBSD__)
154 155 156 157 158
        // FreeBSD may return ECONNRESET on close() under load but this is not
        // an error.
        if (rc == -1 && errno == ECONNRESET)
            rc = 0;
#endif
159 160
        errno_assert (rc == 0);
#endif
161
        s = retired_fd;
162
    }
163

164 165 166
    int rc = tx_msg.close ();
    errno_assert (rc == 0);

167 168
    //  Drop reference to metadata and destroy it if we are
    //  the only user.
169 170
    if (metadata != NULL) {
        if (metadata->drop_ref ()) {
171
            LIBZMQ_DELETE (metadata);
172 173
        }
    }
174

175 176 177
    LIBZMQ_DELETE (encoder);
    LIBZMQ_DELETE (decoder);
    LIBZMQ_DELETE (mechanism);
178 179
}

180
void zmq::stream_engine_t::plug (io_thread_t *io_thread_,
181
                                 session_base_t *session_)
182 183 184 185 186 187 188 189
{
    zmq_assert (!plugged);
    plugged = true;

    //  Connect to session object.
    zmq_assert (!session);
    zmq_assert (session_);
    session = session_;
190
    socket = session->get_socket ();
191 192 193 194

    //  Connect to I/O threads poller object.
    io_object_t::plug (io_thread_);
    handle = add_fd (s);
195
    io_error = false;
Martin Hurton's avatar
Martin Hurton committed
196

197
    if (options.raw_socket) {
198
        // no handshaking for raw sock, instantiate raw encoder and decoders
199
        encoder = new (std::nothrow) raw_encoder_t (out_batch_size);
200 201
        alloc_assert (encoder);

202
        decoder = new (std::nothrow) raw_decoder_t (in_batch_size);
203 204 205 206
        alloc_assert (decoder);

        // disable handshaking for raw socket
        handshaking = false;
207

Martin Hurton's avatar
Martin Hurton committed
208
        next_msg = &stream_engine_t::pull_msg_from_session;
209 210
        process_msg = &stream_engine_t::push_raw_msg_to_session;

Thomas Rodgers's avatar
Thomas Rodgers committed
211
        properties_t properties;
212
        if (init_properties (properties)) {
213 214 215
            //  Compile metadata.
            zmq_assert (metadata == NULL);
            metadata = new (std::nothrow) metadata_t (properties);
216
            alloc_assert (metadata);
217
        }
218

219 220 221 222
        if (options.raw_notify) {
            //  For raw sockets, send an initial 0-length message to the
            // application so that it knows a peer has connected.
            msg_t connector;
223
            connector.init ();
224
            push_raw_msg_to_session (&connector);
225
            connector.close ();
226 227
            session->flush ();
        }
228
    } else {
229 230 231
        // start optional timer, to prevent handshake hanging on no input
        set_handshake_timer ();

232
        //  Send the 'length' and 'flags' fields of the routing id message.
233
        //  The 'length' field is encoded in the long format.
Pieter Hintjens's avatar
Pieter Hintjens committed
234
        outpos = greeting_send;
235 236
        outpos[outsize++] = 0xff;
        put_uint64 (&outpos[outsize], options.routing_id_size + 1);
237
        outsize += 8;
238
        outpos[outsize++] = 0x7f;
239
    }
Martin Hurton's avatar
Martin Hurton committed
240

241 242 243 244 245 246
    set_pollin (handle);
    set_pollout (handle);
    //  Flush all the data that may have been already received downstream.
    in_event ();
}

247
void zmq::stream_engine_t::unplug ()
248 249 250 251
{
    zmq_assert (plugged);
    plugged = false;

252 253 254 255 256 257
    //  Cancel all timers.
    if (has_handshake_timer) {
        cancel_timer (handshake_timer_id);
        has_handshake_timer = false;
    }

Jonathan Reams's avatar
Jonathan Reams committed
258 259 260 261 262 263 264 265 266 267 268 269 270 271
    if (has_ttl_timer) {
        cancel_timer (heartbeat_ttl_timer_id);
        has_ttl_timer = false;
    }

    if (has_timeout_timer) {
        cancel_timer (heartbeat_timeout_timer_id);
        has_timeout_timer = false;
    }

    if (has_heartbeat_timer) {
        cancel_timer (heartbeat_ivl_timer_id);
        has_heartbeat_timer = false;
    }
272
    //  Cancel all fd subscriptions.
273
    if (!io_error)
Martin Hurton's avatar
Martin Hurton committed
274
        rm_fd (handle);
275 276 277 278 279 280 281

    //  Disconnect from I/O threads poller object.
    io_object_t::unplug ();

    session = NULL;
}

282
void zmq::stream_engine_t::terminate ()
283 284 285 286 287
{
    unplug ();
    delete this;
}

288
void zmq::stream_engine_t::in_event ()
289
{
290
    zmq_assert (!io_error);
291

Pieter Hintjens's avatar
Pieter Hintjens committed
292
    //  If still handshaking, receive and process the greeting message.
Martin Hurton's avatar
Martin Hurton committed
293 294 295 296
    if (unlikely (handshaking))
        if (!handshake ())
            return;

297
    zmq_assert (decoder);
298 299

    //  If there has been an I/O error, stop polling.
300
    if (input_stopped) {
301 302 303 304
        rm_fd (handle);
        io_error = true;
        return;
    }
305

306
    //  If there's no data to process in the buffer...
307 308 309 310 311
    if (!insize) {
        //  Retrieve the buffer and read as much data as possible.
        //  Note that buffer can be arbitrarily large. However, we assume
        //  the underlying TCP layer has fixed buffer size and thus the
        //  number of bytes read will be always limited.
312 313
        size_t bufsize = 0;
        decoder->get_buffer (&inpos, &bufsize);
314

315
        const int rc = tcp_read (s, inpos, bufsize);
316

317
        if (rc == 0) {
318 319
            // connection closed by peer
            errno = EPIPE;
320
            error (connection_error);
321
            return;
322
        }
323 324
        if (rc == -1) {
            if (errno != EAGAIN)
325
                error (connection_error);
326 327
            return;
        }
328

329
        //  Adjust input size
330
        insize = static_cast<size_t> (rc);
331
        // Adjust buffer size to received bytes
332
        decoder->resize_buffer (insize);
Martin Hurton's avatar
Martin Hurton committed
333
    }
334

335 336
    int rc = 0;
    size_t processed = 0;
337

338 339 340
    while (insize > 0) {
        rc = decoder->decode (inpos, insize, processed);
        zmq_assert (processed <= insize);
341 342
        inpos += processed;
        insize -= processed;
343 344
        if (rc == 0 || rc == -1)
            break;
Martin Hurton's avatar
Martin Hurton committed
345
        rc = (this->*process_msg) (decoder->msg ());
346 347
        if (rc == -1)
            break;
348 349
    }

350 351 352 353
    //  Tear down the connection if we have failed to decode input data
    //  or the session has rejected the message.
    if (rc == -1) {
        if (errno != EAGAIN) {
354
            error (protocol_error);
355 356
            return;
        }
357
        input_stopped = true;
358
        reset_pollin (handle);
Martin Hurton's avatar
Martin Hurton committed
359
    }
360 361

    session->flush ();
362 363
}

364
void zmq::stream_engine_t::out_event ()
365
{
366 367
    zmq_assert (!io_error);

368 369
    //  If write buffer is empty, try to read new data from the encoder.
    if (!outsize) {
Martin Hurton's avatar
Martin Hurton committed
370 371 372 373 374 375 376 377
        //  Even when we stop polling as soon as there is no
        //  data to send, the poller may invoke out_event one
        //  more time due to 'speculative write' optimisation.
        if (unlikely (encoder == NULL)) {
            zmq_assert (handshaking);
            return;
        }

378
        outpos = NULL;
379 380
        outsize = encoder->encode (&outpos, 0);

381
        while (outsize < (size_t) out_batch_size) {
Martin Hurton's avatar
Martin Hurton committed
382
            if ((this->*next_msg) (&tx_msg) == -1)
383 384 385
                break;
            encoder->load_msg (&tx_msg);
            unsigned char *bufptr = outpos + outsize;
386
            size_t n = encoder->encode (&bufptr, out_batch_size - outsize);
387 388 389 390 391
            zmq_assert (n > 0);
            if (outpos == NULL)
                outpos = bufptr;
            outsize += n;
        }
392 393 394

        //  If there is no data to send, stop polling for output.
        if (outsize == 0) {
395
            output_stopped = true;
396 397 398 399 400 401 402
            reset_pollout (handle);
            return;
        }
    }

    //  If there are any data to write in write buffer, write as much as
    //  possible to the socket. Note that amount of data to write can be
403
    //  arbitrarily large. However, we assume that underlying TCP layer has
404 405
    //  limited transmission buffer and thus the actual number of bytes
    //  written should be reasonably modest.
406
    const int nbytes = tcp_write (s, outpos, outsize);
407

Martin Hurton's avatar
Martin Hurton committed
408 409
    //  IO error has occurred. We stop waiting for output events.
    //  The engine is not terminated until we detect input error;
410
    //  this is necessary to prevent losing incoming messages.
411
    if (nbytes == -1) {
Martin Hurton's avatar
Martin Hurton committed
412
        reset_pollout (handle);
413 414 415 416 417
        return;
    }

    outpos += nbytes;
    outsize -= nbytes;
Martin Hurton's avatar
Martin Hurton committed
418 419 420 421 422 423

    //  If we are still handshaking and there are no data
    //  to send, stop polling for output.
    if (unlikely (handshaking))
        if (outsize == 0)
            reset_pollout (handle);
424 425
}

426
void zmq::stream_engine_t::restart_output ()
427
{
428 429 430
    if (unlikely (io_error))
        return;

431
    if (likely (output_stopped)) {
432
        set_pollout (handle);
433
        output_stopped = false;
434
    }
435 436 437 438 439 440 441 442

    //  Speculative write: The assumption is that at the moment new message
    //  was sent by the user the socket is probably available for writing.
    //  Thus we try to write the data to socket avoiding polling for POLLOUT.
    //  Consequently, the latency should be better in request/reply scenarios.
    out_event ();
}

443
void zmq::stream_engine_t::restart_input ()
444
{
445
    zmq_assert (input_stopped);
446 447 448
    zmq_assert (session != NULL);
    zmq_assert (decoder != NULL);

Martin Hurton's avatar
Martin Hurton committed
449
    int rc = (this->*process_msg) (decoder->msg ());
450 451 452 453
    if (rc == -1) {
        if (errno == EAGAIN)
            session->flush ();
        else
454
            error (protocol_error);
Martin Hurton's avatar
Martin Hurton committed
455 456 457
        return;
    }

458 459 460 461 462 463 464 465
    while (insize > 0) {
        size_t processed = 0;
        rc = decoder->decode (inpos, insize, processed);
        zmq_assert (processed <= insize);
        inpos += processed;
        insize -= processed;
        if (rc == 0 || rc == -1)
            break;
Martin Hurton's avatar
Martin Hurton committed
466
        rc = (this->*process_msg) (decoder->msg ());
467 468 469
        if (rc == -1)
            break;
    }
470

471 472
    if (rc == -1 && errno == EAGAIN)
        session->flush ();
473
    else if (io_error)
474
        error (connection_error);
475
    else if (rc == -1)
476
        error (protocol_error);
477
    else {
478
        input_stopped = false;
479 480 481 482 483 484
        set_pollin (handle);
        session->flush ();

        //  Speculative read.
        in_event ();
    }
485 486
}

487
bool zmq::stream_engine_t::handshake ()
Martin Hurton's avatar
Martin Hurton committed
488
{
489
    zmq_assert (handshaking);
Martin Hurton's avatar
Martin Hurton committed
490
    zmq_assert (greeting_bytes_read < greeting_size);
491
    //  Receive the greeting.
Martin Hurton's avatar
Martin Hurton committed
492
    while (greeting_bytes_read < greeting_size) {
493 494
        const int n = tcp_read (s, greeting_recv + greeting_bytes_read,
                                greeting_size - greeting_bytes_read);
495
        if (n == 0) {
496
            errno = EPIPE;
497
            error (connection_error);
498 499
            return false;
        }
500 501
        if (n == -1) {
            if (errno != EAGAIN)
502
                error (connection_error);
503
            return false;
504
        }
Martin Hurton's avatar
Martin Hurton committed
505 506 507

        greeting_bytes_read += n;

508 509 510
        //  We have received at least one byte from the peer.
        //  If the first byte is not 0xff, we know that the
        //  peer is using unversioned protocol.
511
        if (greeting_recv[0] != 0xff)
512
            break;
Martin Hurton's avatar
Martin Hurton committed
513

514
        if (greeting_bytes_read < signature_size)
515
            continue;
Martin Hurton's avatar
Martin Hurton committed
516

517 518
        //  Inspect the right-most bit of the 10th byte (which coincides
        //  with the 'flags' field if a regular message was sent).
519
        //  Zero indicates this is a header of a routing id message
520
        //  (i.e. the peer is using the unversioned protocol).
521
        if (!(greeting_recv[9] & 0x01))
522
            break;
Martin Hurton's avatar
Martin Hurton committed
523

524
        //  The peer is using versioned protocol.
525 526
        //  Send the major version number.
        if (outpos + outsize == greeting_send + signature_size) {
Martin Hurton's avatar
Martin Hurton committed
527 528
            if (outsize == 0)
                set_pollout (handle);
529
            outpos[outsize++] = 3; //  Major version number
530 531 532 533 534 535 536 537
        }

        if (greeting_bytes_read > signature_size) {
            if (outpos + outsize == greeting_send + signature_size + 1) {
                if (outsize == 0)
                    set_pollout (handle);

                //  Use ZMTP/2.0 to talk to older peers.
538 539 540
                if (greeting_recv[10] == ZMTP_1_0
                    || greeting_recv[10] == ZMTP_2_0)
                    outpos[outsize++] = options.type;
541
                else {
542
                    outpos[outsize++] = 0; //  Minor version number
543
                    memset (outpos + outsize, 0, 20);
544 545

                    zmq_assert (options.mechanism == ZMQ_NULL
546 547 548
                                || options.mechanism == ZMQ_PLAIN
                                || options.mechanism == ZMQ_CURVE
                                || options.mechanism == ZMQ_GSSAPI);
549

550 551
                    if (options.mechanism == ZMQ_NULL)
                        memcpy (outpos + outsize, "NULL", 4);
552
                    else if (options.mechanism == ZMQ_PLAIN)
553
                        memcpy (outpos + outsize, "PLAIN", 5);
554
                    else if (options.mechanism == ZMQ_GSSAPI)
555
                        memcpy (outpos + outsize, "GSSAPI", 6);
556
                    else if (options.mechanism == ZMQ_CURVE)
557
                        memcpy (outpos + outsize, "CURVE", 5);
558 559 560 561 562 563
                    outsize += 20;
                    memset (outpos + outsize, 0, 32);
                    outsize += 32;
                    greeting_size = v3_greeting_size;
                }
            }
Martin Hurton's avatar
Martin Hurton committed
564 565 566
        }
    }

567 568
    //  Position of the revision field in the greeting.
    const size_t revision_pos = 10;
569

570
    //  Is the peer using ZMTP/1.0 with no revision number?
571
    //  If so, we send and receive rest of routing id message
572
    if (greeting_recv[0] != 0xff || !(greeting_recv[9] & 0x01)) {
Min RK's avatar
Min RK committed
573
        if (session->zap_enabled ()) {
574 575 576
            // reject ZMTP 1.0 connections if ZAP is enabled
            error (protocol_error);
            return false;
577 578
        }

579
        encoder = new (std::nothrow) v1_encoder_t (out_batch_size);
580 581
        alloc_assert (encoder);

582 583
        decoder =
          new (std::nothrow) v1_decoder_t (in_batch_size, options.maxmsgsize);
584 585
        alloc_assert (decoder);

Martin Hurton's avatar
Martin Hurton committed
586 587 588 589
        //  We have already sent the message header.
        //  Since there is no way to tell the encoder to
        //  skip the message header, we simply throw that
        //  header data away.
590
        const size_t header_size = options.routing_id_size + 1 >= 255 ? 10 : 2;
591
        unsigned char tmp[10], *bufferp = tmp;
592

593
        //  Prepare the routing id message and load it into encoder.
594
        //  Then consume bytes we have already sent to the peer.
595
        const int rc = tx_msg.init_size (options.routing_id_size);
596
        zmq_assert (rc == 0);
597
        memcpy (tx_msg.data (), options.routing_id, options.routing_id_size);
598
        encoder->load_msg (&tx_msg);
599
        size_t buffer_size = encoder->encode (&bufferp, header_size);
Martin Hurton's avatar
Martin Hurton committed
600 601 602
        zmq_assert (buffer_size == header_size);

        //  Make sure the decoder sees the data we have already received.
Pieter Hintjens's avatar
Pieter Hintjens committed
603
        inpos = greeting_recv;
Martin Hurton's avatar
Martin Hurton committed
604
        insize = greeting_bytes_read;
605 606

        //  To allow for interoperability with peers that do not forward
607 608
        //  their subscriptions, we inject a phantom subscription message
        //  message into the incoming message stream.
609
        if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB)
610
            subscription_required = true;
Martin Hurton's avatar
Martin Hurton committed
611

612
        //  We are sending our routing id now and the next message
Martin Hurton's avatar
Martin Hurton committed
613
        //  will come from the socket.
Martin Hurton's avatar
Martin Hurton committed
614
        next_msg = &stream_engine_t::pull_msg_from_session;
Martin Hurton's avatar
Martin Hurton committed
615

616 617
        //  We are expecting routing id message.
        process_msg = &stream_engine_t::process_routing_id_msg;
618
    } else if (greeting_recv[revision_pos] == ZMTP_1_0) {
Min RK's avatar
Min RK committed
619
        if (session->zap_enabled ()) {
620 621 622
            // reject ZMTP 1.0 connections if ZAP is enabled
            error (protocol_error);
            return false;
623 624
        }

625
        encoder = new (std::nothrow) v1_encoder_t (out_batch_size);
626 627
        alloc_assert (encoder);

628 629
        decoder =
          new (std::nothrow) v1_decoder_t (in_batch_size, options.maxmsgsize);
630
        alloc_assert (decoder);
631
    } else if (greeting_recv[revision_pos] == ZMTP_2_0) {
Min RK's avatar
Min RK committed
632
        if (session->zap_enabled ()) {
633 634 635
            // reject ZMTP 2.0 connections if ZAP is enabled
            error (protocol_error);
            return false;
636 637
        }

638
        encoder = new (std::nothrow) v2_encoder_t (out_batch_size);
639 640
        alloc_assert (encoder);

641 642
        decoder = new (std::nothrow)
          v2_decoder_t (in_batch_size, options.maxmsgsize, options.zero_copy);
643
        alloc_assert (decoder);
644
    } else {
645
        encoder = new (std::nothrow) v2_encoder_t (out_batch_size);
646 647
        alloc_assert (encoder);

648 649
        decoder = new (std::nothrow)
          v2_decoder_t (in_batch_size, options.maxmsgsize, options.zero_copy);
650
        alloc_assert (decoder);
651

652
        if (options.mechanism == ZMQ_NULL
653 654 655
            && memcmp (greeting_recv + 12,
                       "NULL\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20)
                 == 0) {
656
            mechanism = new (std::nothrow)
657
              null_mechanism_t (session, peer_address, options);
658
            alloc_assert (mechanism);
659 660 661 662
        } else if (options.mechanism == ZMQ_PLAIN
                   && memcmp (greeting_recv + 12,
                              "PLAIN\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20)
                        == 0) {
663 664
            if (options.as_server)
                mechanism = new (std::nothrow)
665
                  plain_server_t (session, peer_address, options);
666
            else
667 668
                mechanism =
                  new (std::nothrow) plain_client_t (session, options);
669
            alloc_assert (mechanism);
670
        }
671
#ifdef ZMQ_HAVE_CURVE
672 673 674 675
        else if (options.mechanism == ZMQ_CURVE
                 && memcmp (greeting_recv + 12,
                            "CURVE\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20)
                      == 0) {
676
            if (options.as_server)
677
                mechanism = new (std::nothrow)
678
                  curve_server_t (session, peer_address, options);
679
            else
680 681
                mechanism =
                  new (std::nothrow) curve_client_t (session, options);
682 683 684
            alloc_assert (mechanism);
        }
#endif
Chris Laws's avatar
Chris Laws committed
685
#ifdef HAVE_LIBGSSAPI_KRB5
686 687 688 689
        else if (options.mechanism == ZMQ_GSSAPI
                 && memcmp (greeting_recv + 12,
                            "GSSAPI\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20)
                      == 0) {
690 691
            if (options.as_server)
                mechanism = new (std::nothrow)
692
                  gssapi_server_t (session, peer_address, options);
693
            else
694
                mechanism =
695
                  new (std::nothrow) gssapi_client_t (session, options);
696 697
            alloc_assert (mechanism);
        }
698
#endif
699
        else {
700
            session->get_socket ()->event_handshake_failed_protocol (
701 702
              session->get_endpoint (),
              ZMQ_PROTOCOL_ERROR_ZMTP_MECHANISM_MISMATCH);
703
            error (protocol_error);
704 705
            return false;
        }
Martin Hurton's avatar
Martin Hurton committed
706 707
        next_msg = &stream_engine_t::next_handshake_command;
        process_msg = &stream_engine_t::process_handshake_command;
708
    }
Martin Hurton's avatar
Martin Hurton committed
709 710 711 712 713 714 715 716 717

    // Start polling for output if necessary.
    if (outsize == 0)
        set_pollout (handle);

    //  Handshaking was successful.
    //  Switch into the normal message flow.
    handshaking = false;

718 719 720 721 722
    if (has_handshake_timer) {
        cancel_timer (handshake_timer_id);
        has_handshake_timer = false;
    }

Martin Hurton's avatar
Martin Hurton committed
723 724 725
    return true;
}

726
int zmq::stream_engine_t::routing_id_msg (msg_t *msg_)
727
{
728
    int rc = msg_->init_size (options.routing_id_size);
729
    errno_assert (rc == 0);
730 731
    if (options.routing_id_size > 0)
        memcpy (msg_->data (), options.routing_id, options.routing_id_size);
Martin Hurton's avatar
Martin Hurton committed
732
    next_msg = &stream_engine_t::pull_msg_from_session;
733 734
    return 0;
}
735

736
int zmq::stream_engine_t::process_routing_id_msg (msg_t *msg_)
737
{
738 739
    if (options.recv_routing_id) {
        msg_->set_flags (msg_t::routing_id);
740
        int rc = session->push_msg (msg_);
741
        errno_assert (rc == 0);
742
    } else {
743 744 745 746 747 748
        int rc = msg_->close ();
        errno_assert (rc == 0);
        rc = msg_->init ();
        errno_assert (rc == 0);
    }

749 750 751 752 753 754 755
    if (subscription_required) {
        msg_t subscription;

        //  Inject the subscription message, so that also
        //  ZMQ 2.x peers receive published messages.
        int rc = subscription.init_size (1);
        errno_assert (rc == 0);
756
        *(unsigned char *) subscription.data () = 1;
757 758 759 760 761
        rc = session->push_msg (&subscription);
        errno_assert (rc == 0);
    }

    process_msg = &stream_engine_t::push_msg_to_session;
762

763 764
    return 0;
}
765

766
int zmq::stream_engine_t::next_handshake_command (msg_t *msg_)
767
{
768 769
    zmq_assert (mechanism != NULL);

770 771 772
    if (mechanism->status () == mechanism_t::ready) {
        mechanism_ready ();
        return pull_and_encode (msg_);
773
    } else if (mechanism->status () == mechanism_t::error) {
774 775
        errno = EPROTO;
        return -1;
776
    } else {
777
        const int rc = mechanism->next_handshake_command (msg_);
Vincent Tellier's avatar
Vincent Tellier committed
778

779 780
        if (rc == 0)
            msg_->set_flags (msg_t::command);
Vincent Tellier's avatar
Vincent Tellier committed
781

782
        return rc;
783
    }
784 785
}

786
int zmq::stream_engine_t::process_handshake_command (msg_t *msg_)
787
{
788
    zmq_assert (mechanism != NULL);
789
    const int rc = mechanism->process_handshake_command (msg_);
790
    if (rc == 0) {
791
        if (mechanism->status () == mechanism_t::ready)
792
            mechanism_ready ();
793
        else if (mechanism->status () == mechanism_t::error) {
794 795 796
            errno = EPROTO;
            return -1;
        }
797 798
        if (output_stopped)
            restart_output ();
799 800
    }

801
    return rc;
802 803
}

804 805 806 807 808 809
void zmq::stream_engine_t::zap_msg_available ()
{
    zmq_assert (mechanism != NULL);

    const int rc = mechanism->zap_msg_available ();
    if (rc == -1) {
810
        error (protocol_error);
811 812
        return;
    }
813 814 815 816
    if (input_stopped)
        restart_input ();
    if (output_stopped)
        restart_output ();
817 818
}

819 820 821 822 823
const char *zmq::stream_engine_t::get_endpoint () const
{
    return endpoint.c_str ();
}

824
void zmq::stream_engine_t::mechanism_ready ()
825
{
826
    if (options.heartbeat_interval > 0) {
827
        add_timer (options.heartbeat_interval, heartbeat_ivl_timer_id);
828 829 830
        has_heartbeat_timer = true;
    }

831 832 833 834
    if (options.recv_routing_id) {
        msg_t routing_id;
        mechanism->peer_routing_id (&routing_id);
        const int rc = session->push_msg (&routing_id);
835 836 837
        if (rc == -1 && errno == EAGAIN) {
            // If the write is failing at this stage with
            // an EAGAIN the pipe must be being shut down,
838
            // so we can just bail out of the routing id set.
839 840
            return;
        }
841
        errno_assert (rc == 0);
842
        session->flush ();
843
    }
844

Martin Hurton's avatar
Martin Hurton committed
845 846
    next_msg = &stream_engine_t::pull_and_encode;
    process_msg = &stream_engine_t::write_credential;
847 848 849

    //  Compile metadata.
    properties_t properties;
850
    init_properties (properties);
851

Pieter Hintjens's avatar
Pieter Hintjens committed
852
    //  Add ZAP properties.
853 854
    const properties_t &zap_properties = mechanism->get_zap_properties ();
    properties.insert (zap_properties.begin (), zap_properties.end ());
Pieter Hintjens's avatar
Pieter Hintjens committed
855

856
    //  Add ZMTP properties.
857 858
    const properties_t &zmtp_properties = mechanism->get_zmtp_properties ();
    properties.insert (zmtp_properties.begin (), zmtp_properties.end ());
859

860
    zmq_assert (metadata == NULL);
861
    if (!properties.empty ()) {
862
        metadata = new (std::nothrow) metadata_t (properties);
863 864
        alloc_assert (metadata);
    }
Vincent Tellier's avatar
Vincent Tellier committed
865

866
#ifdef ZMQ_BUILD_DRAFT_API
867
    socket->event_handshake_succeeded (endpoint, 0);
868
#endif
869 870
}

871
int zmq::stream_engine_t::pull_msg_from_session (msg_t *msg_)
872
{
873 874
    return session->pull_msg (msg_);
}
875

876 877 878 879 880
int zmq::stream_engine_t::push_msg_to_session (msg_t *msg_)
{
    return session->push_msg (msg_);
}

881 882 883 884 885
int zmq::stream_engine_t::push_raw_msg_to_session (msg_t *msg_)
{
    if (metadata && metadata != msg_->metadata ())
        msg_->set_metadata (metadata);
    return push_msg_to_session (msg_);
886 887
}

888 889 890 891 892
int zmq::stream_engine_t::write_credential (msg_t *msg_)
{
    zmq_assert (mechanism != NULL);
    zmq_assert (session != NULL);

893
    const blob_t &credential = mechanism->get_user_id ();
894 895 896 897 898 899 900 901 902 903 904 905 906
    if (credential.size () > 0) {
        msg_t msg;
        int rc = msg.init_size (credential.size ());
        zmq_assert (rc == 0);
        memcpy (msg.data (), credential.data (), credential.size ());
        msg.set_flags (msg_t::credential);
        rc = session->push_msg (&msg);
        if (rc == -1) {
            rc = msg.close ();
            errno_assert (rc == 0);
            return -1;
        }
    }
Martin Hurton's avatar
Martin Hurton committed
907
    process_msg = &stream_engine_t::decode_and_push;
908 909 910
    return decode_and_push (msg_);
}

911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927
int zmq::stream_engine_t::pull_and_encode (msg_t *msg_)
{
    zmq_assert (mechanism != NULL);

    if (session->pull_msg (msg_) == -1)
        return -1;
    if (mechanism->encode (msg_) == -1)
        return -1;
    return 0;
}

int zmq::stream_engine_t::decode_and_push (msg_t *msg_)
{
    zmq_assert (mechanism != NULL);

    if (mechanism->decode (msg_) == -1)
        return -1;
Jonathan Reams's avatar
Jonathan Reams committed
928

929
    if (has_timeout_timer) {
Jonathan Reams's avatar
Jonathan Reams committed
930
        has_timeout_timer = false;
931
        cancel_timer (heartbeat_timeout_timer_id);
Jonathan Reams's avatar
Jonathan Reams committed
932 933
    }

934
    if (has_ttl_timer) {
Jonathan Reams's avatar
Jonathan Reams committed
935
        has_ttl_timer = false;
936
        cancel_timer (heartbeat_ttl_timer_id);
Jonathan Reams's avatar
Jonathan Reams committed
937 938
    }

939
    if (msg_->flags () & msg_t::command) {
940
        process_command_message (msg_);
Jonathan Reams's avatar
Jonathan Reams committed
941 942
    }

943
    if (metadata)
944
        msg_->set_metadata (metadata);
945 946
    if (session->push_msg (msg_) == -1) {
        if (errno == EAGAIN)
Martin Hurton's avatar
Martin Hurton committed
947
            process_msg = &stream_engine_t::push_one_then_decode_and_push;
948 949 950 951 952 953 954 955 956
        return -1;
    }
    return 0;
}

int zmq::stream_engine_t::push_one_then_decode_and_push (msg_t *msg_)
{
    const int rc = session->push_msg (msg_);
    if (rc == 0)
Martin Hurton's avatar
Martin Hurton committed
957
        process_msg = &stream_engine_t::decode_and_push;
958 959 960
    return rc;
}

961
void zmq::stream_engine_t::error (error_reason_t reason)
962
{
963
    if (options.raw_socket && options.raw_notify) {
964 965 966
        //  For raw sockets, send a final 0-length message to the application
        //  so that it knows the peer has been disconnected.
        msg_t terminator;
967
        terminator.init ();
Martin Hurton's avatar
Martin Hurton committed
968
        (this->*process_msg) (&terminator);
969
        terminator.close ();
970
    }
971
    zmq_assert (session);
972
#ifdef ZMQ_BUILD_DRAFT_API
973 974 975 976 977 978
    // protocol errors have been signaled already at the point where they occurred
    if (reason != protocol_error
        && (mechanism == NULL
            || mechanism->status () == mechanism_t::handshaking)) {
        int err = errno;
        socket->event_handshake_failed_no_detail (endpoint, err);
979
    }
980
#endif
981
    socket->event_disconnected (endpoint, s);
982
    session->flush ();
983
    session->engine_error (reason);
984 985 986 987
    unplug ();
    delete this;
}

988
void zmq::stream_engine_t::set_handshake_timer ()
989
{
990
    zmq_assert (!has_handshake_timer);
991

992
    if (!options.raw_socket && options.handshake_ivl > 0) {
993 994
        add_timer (options.handshake_ivl, handshake_timer_id);
        has_handshake_timer = true;
995
    }
996 997
}

998 999 1000 1001
bool zmq::stream_engine_t::init_properties (properties_t &properties)
{
    if (peer_address.empty ())
        return false;
1002 1003
    properties.ZMQ_MAP_INSERT_OR_EMPLACE (
      std::string (ZMQ_MSG_PROPERTY_PEER_ADDRESS), peer_address);
1004 1005

    //  Private property to support deprecated SRCFD
1006
    std::ostringstream stream;
1007 1008
    stream << (int) s;
    std::string fd_string = stream.str ();
1009 1010
    properties.ZMQ_MAP_INSERT_OR_EMPLACE (std::string ("__fd"),
                                          ZMQ_MOVE (fd_string));
Thomas Rodgers's avatar
Thomas Rodgers committed
1011 1012 1013
    return true;
}

1014
void zmq::stream_engine_t::timer_event (int id_)
1015
{
1016
    if (id_ == handshake_timer_id) {
Jonathan Reams's avatar
Jonathan Reams committed
1017 1018 1019
        has_handshake_timer = false;
        //  handshake timer expired before handshake completed, so engine fail
        error (timeout_error);
1020
    } else if (id_ == heartbeat_ivl_timer_id) {
Jonathan Reams's avatar
Jonathan Reams committed
1021
        next_msg = &stream_engine_t::produce_ping_message;
1022 1023 1024
        out_event ();
        add_timer (options.heartbeat_interval, heartbeat_ivl_timer_id);
    } else if (id_ == heartbeat_ttl_timer_id) {
Jonathan Reams's avatar
Jonathan Reams committed
1025
        has_ttl_timer = false;
1026 1027
        error (timeout_error);
    } else if (id_ == heartbeat_timeout_timer_id) {
Jonathan Reams's avatar
Jonathan Reams committed
1028
        has_timeout_timer = false;
1029 1030
        error (timeout_error);
    } else
Jonathan Reams's avatar
Jonathan Reams committed
1031
        // There are no other valid timer ids!
1032
        assert (false);
Jonathan Reams's avatar
Jonathan Reams committed
1033 1034
}

1035
int zmq::stream_engine_t::produce_ping_message (msg_t *msg_)
Jonathan Reams's avatar
Jonathan Reams committed
1036 1037 1038 1039 1040
{
    int rc = 0;
    zmq_assert (mechanism != NULL);

    // 16-bit TTL + \4PING == 7
1041 1042 1043
    rc = msg_->init_size (7);
    errno_assert (rc == 0);
    msg_->set_flags (msg_t::command);
Jonathan Reams's avatar
Jonathan Reams committed
1044
    // Copy in the command message
1045
    memcpy (msg_->data (), "\4PING", 5);
Jonathan Reams's avatar
Jonathan Reams committed
1046

1047 1048
    uint16_t ttl_val = htons (options.heartbeat_ttl);
    memcpy (((uint8_t *) msg_->data ()) + 5, &ttl_val, sizeof (ttl_val));
Jonathan Reams's avatar
Jonathan Reams committed
1049 1050 1051

    rc = mechanism->encode (msg_);
    next_msg = &stream_engine_t::pull_and_encode;
1052 1053
    if (!has_timeout_timer && heartbeat_timeout > 0) {
        add_timer (heartbeat_timeout, heartbeat_timeout_timer_id);
Jonathan Reams's avatar
Jonathan Reams committed
1054 1055 1056 1057 1058
        has_timeout_timer = true;
    }
    return rc;
}

1059
int zmq::stream_engine_t::produce_pong_message (msg_t *msg_)
Jonathan Reams's avatar
Jonathan Reams committed
1060 1061 1062 1063
{
    int rc = 0;
    zmq_assert (mechanism != NULL);

1064
    rc = msg_->move (pong_msg);
1065
    errno_assert (rc == 0);
Jonathan Reams's avatar
Jonathan Reams committed
1066 1067 1068 1069 1070 1071

    rc = mechanism->encode (msg_);
    next_msg = &stream_engine_t::pull_and_encode;
    return rc;
}

1072
int zmq::stream_engine_t::process_heartbeat_message (msg_t *msg_)
Jonathan Reams's avatar
Jonathan Reams committed
1073
{
1074
    if (memcmp (msg_->data (), "\4PING", 5) == 0) {
Jonathan Reams's avatar
Jonathan Reams committed
1075 1076
        uint16_t remote_heartbeat_ttl;
        // Get the remote heartbeat TTL to setup the timer
1077 1078
        memcpy (&remote_heartbeat_ttl, (uint8_t *) msg_->data () + 5, 2);
        remote_heartbeat_ttl = ntohs (remote_heartbeat_ttl);
Jonathan Reams's avatar
Jonathan Reams committed
1079
        // The remote heartbeat is in 10ths of a second
1080 1081
        // so we multiply it by 100 to get the timer interval in ms.
        remote_heartbeat_ttl *= 100;
Jonathan Reams's avatar
Jonathan Reams committed
1082

1083 1084
        if (!has_ttl_timer && remote_heartbeat_ttl > 0) {
            add_timer (remote_heartbeat_ttl, heartbeat_ttl_timer_id);
Jonathan Reams's avatar
Jonathan Reams committed
1085 1086
            has_ttl_timer = true;
        }
1087

1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101
        //  As per ZMTP 3.1 the PING command might contain an up to 16 bytes
        //  context which needs to be PONGed back, so build the pong message
        //  here and store it. Truncate it if it's too long.
        //  Given the engine goes straight to out_event, sequential PINGs will
        //  not be a problem.
        size_t context_len = msg_->size () - 7 > 16 ? 16 : msg_->size () - 7;
        int rc = pong_msg.init_size (5 + context_len);
        errno_assert (rc == 0);
        pong_msg.set_flags (msg_t::command);
        memcpy (pong_msg.data (), "\4PONG", 5);
        if (context_len > 0)
            memcpy (((uint8_t *) pong_msg.data ()) + 5,
                    ((uint8_t *) msg_->data ()) + 7, context_len);

Jonathan Reams's avatar
Jonathan Reams committed
1102
        next_msg = &stream_engine_t::produce_pong_message;
1103
        out_event ();
Jonathan Reams's avatar
Jonathan Reams committed
1104 1105 1106
    }

    return 0;
1107
}
1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 1120 1121 1122 1123

int zmq::stream_engine_t::process_command_message (msg_t *msg_)
{
    uint8_t cmd_name_size = *((uint8_t *) msg_->data ());
    //  Malformed command
    if (msg_->size () < cmd_name_size + sizeof (cmd_name_size))
        return -1;

    uint8_t *cmd_name = ((uint8_t *) msg_->data ()) + 1;
    if (cmd_name_size == 4
        && (memcmp (cmd_name, "PING", cmd_name_size) == 0
            || memcmp (cmd_name, "PONG", cmd_name_size) == 0))
        return process_heartbeat_message (msg_);

    return 0;
}