stream_engine.cpp 32.3 KB
Newer Older
/*
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file

    This file is part of libzmq, the ZeroMQ core engine in C++.

    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include "precompiled.hpp"
31
#include "macros.hpp"
32 33

#include <string.h>
34 35

#ifndef ZMQ_HAVE_WINDOWS
36
#include <unistd.h>
37 38
#endif

39
#include <new>
40
#include <sstream>
41

42
#include "stream_engine.hpp"
43
#include "io_thread.hpp"
44
#include "session_base.hpp"
45 46
#include "v1_encoder.hpp"
#include "v1_decoder.hpp"
47 48
#include "v2_encoder.hpp"
#include "v2_decoder.hpp"
49
#include "null_mechanism.hpp"
50 51
#include "plain_client.hpp"
#include "plain_server.hpp"
52 53
#include "gssapi_client.hpp"
#include "gssapi_server.hpp"
54 55
#include "curve_client.hpp"
#include "curve_server.hpp"
56 57
#include "raw_decoder.hpp"
#include "raw_encoder.hpp"
58 59
#include "config.hpp"
#include "err.hpp"
60
#include "ip.hpp"
61
#include "tcp.hpp"
Martin Hurton's avatar
Martin Hurton committed
62 63
#include "likely.hpp"
#include "wire.hpp"
64

Chris Laws's avatar
Chris Laws committed
65
//  Builds an engine around the descriptor fd_. The engine starts in the
//  handshaking state with the identity handlers installed, unblocks the
//  socket, records the peer address (plus peer credentials for PF_UNIX
//  peers where the platform exposes them) and derives the effective
//  heartbeat timeout from the options.
zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_,
                                       const std::string &endpoint_) :
    s (fd_),
    as_server (false),
    handle ((handle_t) NULL),
    inpos (NULL),
    insize (0),
    decoder (NULL),
    outpos (NULL),
    outsize (0),
    encoder (NULL),
    metadata (NULL),
    handshaking (true),
    greeting_size (v2_greeting_size),
    greeting_bytes_read (0),
    session (NULL),
    options (options_),
    endpoint (endpoint_),
    plugged (false),
    next_msg (&stream_engine_t::identity_msg),
    process_msg (&stream_engine_t::process_identity_msg),
    io_error (false),
    subscription_required (false),
    mechanism (NULL),
    input_stopped (false),
    output_stopped (false),
    has_handshake_timer (false),
    has_ttl_timer (false),
    has_timeout_timer (false),
    has_heartbeat_timer (false),
    heartbeat_timeout (0),
    socket (NULL)
{
    int rc = tx_msg.init ();
    errno_assert (rc == 0);

    //  Put the socket into non-blocking mode.
    unblock_socket (s);

    //  A family of 0 means the peer address could not be determined;
    //  make sure no stale text is left in peer_address in that case.
    const int peer_family = get_peer_ip_address (s, peer_address);
    if (peer_family == 0)
        peer_address.clear ();
#if defined ZMQ_HAVE_SO_PEERCRED
    else if (peer_family == PF_UNIX) {
        //  Append ":uid:gid:pid" of the peer to the address.
        struct ucred peer_cred;
        socklen_t cred_len = sizeof (peer_cred);
        if (getsockopt (s, SOL_SOCKET, SO_PEERCRED,
                        &peer_cred, &cred_len) == 0) {
            std::ostringstream suffix;
            suffix << ":" << peer_cred.uid << ":" << peer_cred.gid
                   << ":" << peer_cred.pid;
            peer_address += suffix.str ();
        }
    }
#elif defined ZMQ_HAVE_LOCAL_PEERCRED
    else if (peer_family == PF_UNIX) {
        //  Append ":uid:gid:" of the peer to the address; the gid part
        //  stays empty when no group is reported.
        struct xucred peer_cred;
        socklen_t cred_len = sizeof (peer_cred);
        if (getsockopt (s, 0, LOCAL_PEERCRED, &peer_cred, &cred_len) == 0
        &&  peer_cred.cr_version == XUCRED_VERSION) {
            std::ostringstream suffix;
            suffix << ":" << peer_cred.cr_uid << ":";
            if (peer_cred.cr_ngroups > 0)
                suffix << peer_cred.cr_groups [0];
            suffix << ":";
            peer_address += suffix.str ();
        }
    }
#endif

#ifdef SO_NOSIGPIPE
    //  Make sure that SIGPIPE signal is not generated when writing to a
    //  connection that was already closed by the peer.
    int enable = 1;
    rc = setsockopt (s, SOL_SOCKET, SO_NOSIGPIPE, &enable, sizeof (int));
    errno_assert (rc == 0);
#endif

    //  With heartbeating enabled, a timeout of -1 falls back to the
    //  heartbeat interval itself.
    if (options.heartbeat_interval > 0)
        heartbeat_timeout = options.heartbeat_timeout == -1 ?
            options.heartbeat_interval : options.heartbeat_timeout;
}

149
//  Destroys an unplugged engine: closes the descriptor if it is still
//  open, closes the outgoing message, drops the metadata reference and
//  deletes the encoder, decoder and mechanism.
zmq::stream_engine_t::~stream_engine_t ()
{
    zmq_assert (!plugged);

    if (s != retired_fd) {
#ifdef ZMQ_HAVE_WINDOWS
        const int close_rc = closesocket (s);
        wsa_assert (close_rc != SOCKET_ERROR);
#else
        int close_rc = close (s);
#ifdef __FreeBSD_kernel__
        //  FreeBSD may return ECONNRESET on close() under load but this
        //  is not an error.
        if (close_rc == -1 && errno == ECONNRESET)
            close_rc = 0;
#endif
        errno_assert (close_rc == 0);
#endif
        s = retired_fd;
    }

    const int msg_rc = tx_msg.close ();
    errno_assert (msg_rc == 0);

    //  Give up our metadata reference; the object is deleted only when
    //  drop_ref () reports that we were the last holder.
    if (metadata != NULL && metadata->drop_ref ()) {
        LIBZMQ_DELETE(metadata);
    }

    LIBZMQ_DELETE(encoder);
    LIBZMQ_DELETE(decoder);
    LIBZMQ_DELETE(mechanism);
}

186 187
//  Attaches the engine to a session and to the I/O thread's poller.
//
//  io_thread_  I/O thread whose poller will watch the descriptor.
//  session_    session that exchanges messages with this engine; must be
//              non-NULL, and the engine must not already be plugged.
//
//  Raw sockets skip the handshake: the raw encoder/decoder are installed
//  immediately and, when raw_notify is set, an empty message is pushed to
//  the session. All other sockets arm the handshake timer and queue the
//  first ten bytes of the greeting. Finally polling is enabled in both
//  directions and in_event () is invoked once.
void zmq::stream_engine_t::plug (io_thread_t *io_thread_,
    session_base_t *session_)
{
    zmq_assert (!plugged);
    plugged = true;

    //  Connect to session object.
    zmq_assert (!session);
    zmq_assert (session_);
    session = session_;
    socket = session->get_socket ();

    //  Connect to I/O threads poller object.
    io_object_t::plug (io_thread_);
    handle = add_fd (s);
    io_error = false;

    if (options.raw_socket) {
        //  No handshaking for raw sockets; instantiate the raw encoder
        //  and decoder right away.
        encoder = new (std::nothrow) raw_encoder_t (out_batch_size);
        alloc_assert (encoder);

        decoder = new (std::nothrow) raw_decoder_t (in_batch_size);
        alloc_assert (decoder);

        //  Disable handshaking for raw socket.
        handshaking = false;

        next_msg = &stream_engine_t::pull_msg_from_session;
        process_msg = &stream_engine_t::push_raw_msg_to_session;

        properties_t properties;
        if (init_properties (properties)) {
            //  Compile metadata.
            zmq_assert (metadata == NULL);
            metadata = new (std::nothrow) metadata_t (properties);
            //  A nothrow allocation can yield NULL; fail loudly here, as
            //  for the encoder and decoder above, rather than carrying on
            //  without metadata.
            alloc_assert (metadata);
        }

        if (options.raw_notify) {
            //  For raw sockets, send an initial 0-length message to the
            //  application so that it knows a peer has connected.
            msg_t connector;
            int rc = connector.init ();
            errno_assert (rc == 0);
            push_raw_msg_to_session (&connector);
            rc = connector.close ();
            errno_assert (rc == 0);
            session->flush ();
        }
    }
    else {
        //  Start optional timer, to prevent handshake hanging on no input.
        set_handshake_timer ();

        //  Send the 'length' and 'flags' fields of the identity message.
        //  The 'length' field is encoded in the long format.
        outpos = greeting_send;
        outpos [outsize++] = 0xff;
        put_uint64 (&outpos [outsize], options.identity_size + 1);
        outsize += 8;
        outpos [outsize++] = 0x7f;
    }

    set_pollin (handle);
    set_pollout (handle);
    //  Flush all the data that may have been already received downstream.
    in_event ();
}

253
void zmq::stream_engine_t::unplug ()
254 255 256 257
{
    zmq_assert (plugged);
    plugged = false;

258 259 260 261 262 263
    //  Cancel all timers.
    if (has_handshake_timer) {
        cancel_timer (handshake_timer_id);
        has_handshake_timer = false;
    }

Jonathan Reams's avatar
Jonathan Reams committed
264 265 266 267 268 269 270 271 272 273 274 275 276 277
    if (has_ttl_timer) {
        cancel_timer (heartbeat_ttl_timer_id);
        has_ttl_timer = false;
    }

    if (has_timeout_timer) {
        cancel_timer (heartbeat_timeout_timer_id);
        has_timeout_timer = false;
    }

    if (has_heartbeat_timer) {
        cancel_timer (heartbeat_ivl_timer_id);
        has_heartbeat_timer = false;
    }
278
    //  Cancel all fd subscriptions.
279
    if (!io_error)
Martin Hurton's avatar
Martin Hurton committed
280
        rm_fd (handle);
281 282 283 284 285 286 287

    //  Disconnect from I/O threads poller object.
    io_object_t::unplug ();

    session = NULL;
}

288
void zmq::stream_engine_t::terminate ()
289 290 291 292 293
{
    unplug ();
    delete this;
}

294
void zmq::stream_engine_t::in_event ()
295
{
296
    zmq_assert (!io_error);
297

Pieter Hintjens's avatar
Pieter Hintjens committed
298
    //  If still handshaking, receive and process the greeting message.
Martin Hurton's avatar
Martin Hurton committed
299 300 301 302
    if (unlikely (handshaking))
        if (!handshake ())
            return;

303
    zmq_assert (decoder);
304 305

    //  If there has been an I/O error, stop polling.
306
    if (input_stopped) {
307 308 309 310
        rm_fd (handle);
        io_error = true;
        return;
    }
311 312 313 314 315 316 317 318

    //  If there's no data to process in the buffer...
    if (!insize) {

        //  Retrieve the buffer and read as much data as possible.
        //  Note that buffer can be arbitrarily large. However, we assume
        //  the underlying TCP layer has fixed buffer size and thus the
        //  number of bytes read will be always limited.
319 320
        size_t bufsize = 0;
        decoder->get_buffer (&inpos, &bufsize);
321

322
        const int rc = tcp_read (s, inpos, bufsize);
323

324
        if (rc == 0) {
325
            error (connection_error);
326
            return;
327
        }
328 329
        if (rc == -1) {
            if (errno != EAGAIN)
330
                error (connection_error);
331 332
            return;
        }
333

334
        //  Adjust input size
335
        insize = static_cast <size_t> (rc);
336 337
        // Adjust buffer size to received bytes
        decoder->resize_buffer(insize);
Martin Hurton's avatar
Martin Hurton committed
338
    }
339

340 341
    int rc = 0;
    size_t processed = 0;
342

343 344 345
    while (insize > 0) {
        rc = decoder->decode (inpos, insize, processed);
        zmq_assert (processed <= insize);
346 347
        inpos += processed;
        insize -= processed;
348 349
        if (rc == 0 || rc == -1)
            break;
Martin Hurton's avatar
Martin Hurton committed
350
        rc = (this->*process_msg) (decoder->msg ());
351 352
        if (rc == -1)
            break;
353 354
    }

355 356 357 358
    //  Tear down the connection if we have failed to decode input data
    //  or the session has rejected the message.
    if (rc == -1) {
        if (errno != EAGAIN) {
359
            error (protocol_error);
360 361
            return;
        }
362
        input_stopped = true;
363
        reset_pollin (handle);
Martin Hurton's avatar
Martin Hurton committed
364
    }
365 366

    session->flush ();
367 368
}

369
void zmq::stream_engine_t::out_event ()
370
{
371 372
    zmq_assert (!io_error);

373 374 375
    //  If write buffer is empty, try to read new data from the encoder.
    if (!outsize) {

Martin Hurton's avatar
Martin Hurton committed
376 377 378 379 380 381 382 383
        //  Even when we stop polling as soon as there is no
        //  data to send, the poller may invoke out_event one
        //  more time due to 'speculative write' optimisation.
        if (unlikely (encoder == NULL)) {
            zmq_assert (handshaking);
            return;
        }

384
        outpos = NULL;
385 386
        outsize = encoder->encode (&outpos, 0);

387
        while (outsize < (size_t) out_batch_size) {
Martin Hurton's avatar
Martin Hurton committed
388
            if ((this->*next_msg) (&tx_msg) == -1)
389 390 391
                break;
            encoder->load_msg (&tx_msg);
            unsigned char *bufptr = outpos + outsize;
392
            size_t n = encoder->encode (&bufptr, out_batch_size - outsize);
393 394 395 396 397
            zmq_assert (n > 0);
            if (outpos == NULL)
                outpos = bufptr;
            outsize += n;
        }
398 399 400

        //  If there is no data to send, stop polling for output.
        if (outsize == 0) {
401
            output_stopped = true;
402 403 404 405 406 407 408
            reset_pollout (handle);
            return;
        }
    }

    //  If there are any data to write in write buffer, write as much as
    //  possible to the socket. Note that amount of data to write can be
409
    //  arbitrarily large. However, we assume that underlying TCP layer has
410 411
    //  limited transmission buffer and thus the actual number of bytes
    //  written should be reasonably modest.
412
    const int nbytes = tcp_write (s, outpos, outsize);
413

Martin Hurton's avatar
Martin Hurton committed
414 415
    //  IO error has occurred. We stop waiting for output events.
    //  The engine is not terminated until we detect input error;
416
    //  this is necessary to prevent losing incoming messages.
417
    if (nbytes == -1) {
Martin Hurton's avatar
Martin Hurton committed
418
        reset_pollout (handle);
419 420 421 422 423
        return;
    }

    outpos += nbytes;
    outsize -= nbytes;
Martin Hurton's avatar
Martin Hurton committed
424 425 426 427 428 429

    //  If we are still handshaking and there are no data
    //  to send, stop polling for output.
    if (unlikely (handshaking))
        if (outsize == 0)
            reset_pollout (handle);
430 431
}

432
void zmq::stream_engine_t::restart_output ()
433
{
434 435 436
    if (unlikely (io_error))
        return;

437
    if (likely (output_stopped)) {
438
        set_pollout (handle);
439
        output_stopped = false;
440
    }
441 442 443 444 445 446 447 448

    //  Speculative write: The assumption is that at the moment new message
    //  was sent by the user the socket is probably available for writing.
    //  Thus we try to write the data to socket avoiding polling for POLLOUT.
    //  Consequently, the latency should be better in request/reply scenarios.
    out_event ();
}

449
void zmq::stream_engine_t::restart_input ()
450
{
451
    zmq_assert (input_stopped);
452 453 454
    zmq_assert (session != NULL);
    zmq_assert (decoder != NULL);

Martin Hurton's avatar
Martin Hurton committed
455
    int rc = (this->*process_msg) (decoder->msg ());
456 457 458 459
    if (rc == -1) {
        if (errno == EAGAIN)
            session->flush ();
        else
460
            error (protocol_error);
Martin Hurton's avatar
Martin Hurton committed
461 462 463
        return;
    }

464 465 466 467 468 469 470 471
    while (insize > 0) {
        size_t processed = 0;
        rc = decoder->decode (inpos, insize, processed);
        zmq_assert (processed <= insize);
        inpos += processed;
        insize -= processed;
        if (rc == 0 || rc == -1)
            break;
Martin Hurton's avatar
Martin Hurton committed
472
        rc = (this->*process_msg) (decoder->msg ());
473 474 475
        if (rc == -1)
            break;
    }
476

477 478 479
    if (rc == -1 && errno == EAGAIN)
        session->flush ();
    else
480 481 482 483 484
    if (io_error)
        error (connection_error);
    else
    if (rc == -1)
        error (protocol_error);
485
    else {
486
        input_stopped = false;
487 488 489 490 491 492
        set_pollin (handle);
        session->flush ();

        //  Speculative read.
        in_event ();
    }
493 494
}

495
bool zmq::stream_engine_t::handshake ()
Martin Hurton's avatar
Martin Hurton committed
496
{
497
    zmq_assert (handshaking);
Martin Hurton's avatar
Martin Hurton committed
498
    zmq_assert (greeting_bytes_read < greeting_size);
499
    //  Receive the greeting.
Martin Hurton's avatar
Martin Hurton committed
500
    while (greeting_bytes_read < greeting_size) {
501 502
        const int n = tcp_read (s, greeting_recv + greeting_bytes_read,
                                greeting_size - greeting_bytes_read);
503
        if (n == 0) {
504
            error (connection_error);
505 506
            return false;
        }
507 508
        if (n == -1) {
            if (errno != EAGAIN)
509
                error (connection_error);
510
            return false;
511
        }
Martin Hurton's avatar
Martin Hurton committed
512 513 514

        greeting_bytes_read += n;

515 516 517
        //  We have received at least one byte from the peer.
        //  If the first byte is not 0xff, we know that the
        //  peer is using unversioned protocol.
Pieter Hintjens's avatar
Pieter Hintjens committed
518
        if (greeting_recv [0] != 0xff)
519
            break;
Martin Hurton's avatar
Martin Hurton committed
520

521
        if (greeting_bytes_read < signature_size)
522
            continue;
Martin Hurton's avatar
Martin Hurton committed
523

524 525 526 527
        //  Inspect the right-most bit of the 10th byte (which coincides
        //  with the 'flags' field if a regular message was sent).
        //  Zero indicates this is a header of identity message
        //  (i.e. the peer is using the unversioned protocol).
Pieter Hintjens's avatar
Pieter Hintjens committed
528
        if (!(greeting_recv [9] & 0x01))
529
            break;
Martin Hurton's avatar
Martin Hurton committed
530

531
        //  The peer is using versioned protocol.
532 533
        //  Send the major version number.
        if (outpos + outsize == greeting_send + signature_size) {
Martin Hurton's avatar
Martin Hurton committed
534 535
            if (outsize == 0)
                set_pollout (handle);
536 537 538 539 540 541 542 543 544 545 546 547 548 549 550
            outpos [outsize++] = 3;     //  Major version number
        }

        if (greeting_bytes_read > signature_size) {
            if (outpos + outsize == greeting_send + signature_size + 1) {
                if (outsize == 0)
                    set_pollout (handle);

                //  Use ZMTP/2.0 to talk to older peers.
                if (greeting_recv [10] == ZMTP_1_0
                ||  greeting_recv [10] == ZMTP_2_0)
                    outpos [outsize++] = options.type;
                else {
                    outpos [outsize++] = 0; //  Minor version number
                    memset (outpos + outsize, 0, 20);
551 552 553

                    zmq_assert (options.mechanism == ZMQ_NULL
                            ||  options.mechanism == ZMQ_PLAIN
554 555
                            ||  options.mechanism == ZMQ_CURVE
                            ||  options.mechanism == ZMQ_GSSAPI);
556

557 558 559
                    if (options.mechanism == ZMQ_NULL)
                        memcpy (outpos + outsize, "NULL", 4);
                    else
560
                    if (options.mechanism == ZMQ_PLAIN)
561
                        memcpy (outpos + outsize, "PLAIN", 5);
562
                    else
563 564
                    if (options.mechanism == ZMQ_GSSAPI)
                        memcpy (outpos + outsize, "GSSAPI", 6);
565
                    else
566
                    if (options.mechanism == ZMQ_CURVE)
567
                        memcpy (outpos + outsize, "CURVE", 5);
568 569 570 571 572 573
                    outsize += 20;
                    memset (outpos + outsize, 0, 32);
                    outsize += 32;
                    greeting_size = v3_greeting_size;
                }
            }
Martin Hurton's avatar
Martin Hurton committed
574 575 576
        }
    }

577 578
    //  Position of the revision field in the greeting.
    const size_t revision_pos = 10;
579

580 581
    //  Is the peer using ZMTP/1.0 with no revision number?
    //  If so, we send and receive rest of identity message
Pieter Hintjens's avatar
Pieter Hintjens committed
582
    if (greeting_recv [0] != 0xff || !(greeting_recv [9] & 0x01)) {
Min RK's avatar
Min RK committed
583
        if (session->zap_enabled ()) {
584 585 586 587 588
           // reject ZMTP 1.0 connections if ZAP is enabled
           error (protocol_error);
           return false;
        }

589
        encoder = new (std::nothrow) v1_encoder_t (out_batch_size);
590 591
        alloc_assert (encoder);

592
        decoder = new (std::nothrow) v1_decoder_t (in_batch_size, options.maxmsgsize);
593 594
        alloc_assert (decoder);

Martin Hurton's avatar
Martin Hurton committed
595 596 597 598 599 600
        //  We have already sent the message header.
        //  Since there is no way to tell the encoder to
        //  skip the message header, we simply throw that
        //  header data away.
        const size_t header_size = options.identity_size + 1 >= 255 ? 10 : 2;
        unsigned char tmp [10], *bufferp = tmp;
601 602 603 604 605 606 607

        //  Prepare the identity message and load it into encoder.
        //  Then consume bytes we have already sent to the peer.
        const int rc = tx_msg.init_size (options.identity_size);
        zmq_assert (rc == 0);
        memcpy (tx_msg.data (), options.identity, options.identity_size);
        encoder->load_msg (&tx_msg);
608
        size_t buffer_size = encoder->encode (&bufferp, header_size);
Martin Hurton's avatar
Martin Hurton committed
609 610 611
        zmq_assert (buffer_size == header_size);

        //  Make sure the decoder sees the data we have already received.
Pieter Hintjens's avatar
Pieter Hintjens committed
612
        inpos = greeting_recv;
Martin Hurton's avatar
Martin Hurton committed
613
        insize = greeting_bytes_read;
614 615

        //  To allow for interoperability with peers that do not forward
616 617
        //  their subscriptions, we inject a phantom subscription message
        //  message into the incoming message stream.
618
        if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB)
619
            subscription_required = true;
Martin Hurton's avatar
Martin Hurton committed
620 621 622

        //  We are sending our identity now and the next message
        //  will come from the socket.
Martin Hurton's avatar
Martin Hurton committed
623
        next_msg = &stream_engine_t::pull_msg_from_session;
Martin Hurton's avatar
Martin Hurton committed
624 625

        //  We are expecting identity message.
Martin Hurton's avatar
Martin Hurton committed
626
        process_msg = &stream_engine_t::process_identity_msg;
Martin Hurton's avatar
Martin Hurton committed
627
    }
628
    else
Pieter Hintjens's avatar
Pieter Hintjens committed
629
    if (greeting_recv [revision_pos] == ZMTP_1_0) {
Min RK's avatar
Min RK committed
630
        if (session->zap_enabled ()) {
631 632 633 634 635
           // reject ZMTP 1.0 connections if ZAP is enabled
           error (protocol_error);
           return false;
        }

636
        encoder = new (std::nothrow) v1_encoder_t (out_batch_size);
637 638
        alloc_assert (encoder);

639
        decoder = new (std::nothrow) v1_decoder_t (
640
            in_batch_size, options.maxmsgsize);
641 642
        alloc_assert (decoder);
    }
643 644
    else
    if (greeting_recv [revision_pos] == ZMTP_2_0) {
Min RK's avatar
Min RK committed
645
        if (session->zap_enabled ()) {
646 647 648 649 650
           // reject ZMTP 2.0 connections if ZAP is enabled
           error (protocol_error);
           return false;
        }

651
        encoder = new (std::nothrow) v2_encoder_t (out_batch_size);
652 653 654
        alloc_assert (encoder);

        decoder = new (std::nothrow) v2_decoder_t (
655
            in_batch_size, options.maxmsgsize);
656 657
        alloc_assert (decoder);
    }
658
    else {
659
        encoder = new (std::nothrow) v2_encoder_t (out_batch_size);
660 661
        alloc_assert (encoder);

662
        decoder = new (std::nothrow) v2_decoder_t (
663
                in_batch_size, options.maxmsgsize);
664
        alloc_assert (decoder);
665

666 667
        if (options.mechanism == ZMQ_NULL
        &&  memcmp (greeting_recv + 12, "NULL\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) == 0) {
668 669
            mechanism = new (std::nothrow)
                null_mechanism_t (session, peer_address, options);
670
            alloc_assert (mechanism);
671 672
        }
        else
673 674
        if (options.mechanism == ZMQ_PLAIN
        &&  memcmp (greeting_recv + 12, "PLAIN\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) == 0) {
675 676 677 678 679 680
            if (options.as_server)
                mechanism = new (std::nothrow)
                    plain_server_t (session, peer_address, options);
            else
                mechanism = new (std::nothrow)
                    plain_client_t (options);
681
            alloc_assert (mechanism);
682
        }
683
#ifdef ZMQ_HAVE_CURVE
684
        else
685 686
        if (options.mechanism == ZMQ_CURVE
        &&  memcmp (greeting_recv + 12, "CURVE\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) == 0) {
687
            if (options.as_server)
688 689
                mechanism = new (std::nothrow)
                    curve_server_t (session, peer_address, options);
690 691 692 693 694
            else
                mechanism = new (std::nothrow) curve_client_t (options);
            alloc_assert (mechanism);
        }
#endif
Chris Laws's avatar
Chris Laws committed
695
#ifdef HAVE_LIBGSSAPI_KRB5
696
        else
697 698
        if (options.mechanism == ZMQ_GSSAPI
        &&  memcmp (greeting_recv + 12, "GSSAPI\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) == 0) {
699 700 701 702 703
            if (options.as_server)
                mechanism = new (std::nothrow)
                    gssapi_server_t (session, peer_address, options);
            else
                mechanism = new (std::nothrow) gssapi_client_t (options);
704 705
            alloc_assert (mechanism);
        }
706
#endif
707
        else {
708
            error (protocol_error);
709 710
            return false;
        }
Martin Hurton's avatar
Martin Hurton committed
711 712
        next_msg = &stream_engine_t::next_handshake_command;
        process_msg = &stream_engine_t::process_handshake_command;
713
    }
Martin Hurton's avatar
Martin Hurton committed
714 715 716 717 718 719 720 721 722

    // Start polling for output if necessary.
    if (outsize == 0)
        set_pollout (handle);

    //  Handshaking was successful.
    //  Switch into the normal message flow.
    handshaking = false;

723 724 725 726 727
    if (has_handshake_timer) {
        cancel_timer (handshake_timer_id);
        has_handshake_timer = false;
    }

Martin Hurton's avatar
Martin Hurton committed
728 729 730
    return true;
}

Martin Hurton's avatar
Martin Hurton committed
731
//  next_msg handler producing our identity message. Fills msg_ with the
//  configured identity and switches next_msg so that subsequent messages
//  are pulled from the session. Always returns 0.
int zmq::stream_engine_t::identity_msg (msg_t *msg_)
{
    const int init_rc = msg_->init_size (options.identity_size);
    errno_assert (init_rc == 0);

    //  Nothing to copy for an empty identity.
    if (options.identity_size > 0)
        memcpy (msg_->data (), options.identity, options.identity_size);

    next_msg = &stream_engine_t::pull_msg_from_session;
    return 0;
}
740

Martin Hurton's avatar
Martin Hurton committed
741
//  process_msg handler for the peer's identity message. Forwards the
//  message to the session (flagged as identity) when recv_identity is
//  set and resets it otherwise, then selects the handler for the
//  messages that follow. Always returns 0.
int zmq::stream_engine_t::process_identity_msg (msg_t *msg_)
{
    if (!options.recv_identity) {
        //  The identity is not wanted; reset the message to empty.
        int rc = msg_->close ();
        errno_assert (rc == 0);
        rc = msg_->init ();
        errno_assert (rc == 0);
    }
    else {
        msg_->set_flags (msg_t::identity);
        const int rc = session->push_msg (msg_);
        errno_assert (rc == 0);
    }

    process_msg = subscription_required ?
        &stream_engine_t::write_subscription_msg :
        &stream_engine_t::push_msg_to_session;

    return 0;
}
762

763
//  next_msg handler used while the security mechanism is handshaking.
//  Once the mechanism is ready, switches to normal traffic and returns
//  the result of pull_and_encode (). If the mechanism reports an error,
//  returns -1 with errno set to EPROTO. Otherwise returns whatever the
//  mechanism's next_handshake_command () returns, flagging the message
//  as a command on success.
int zmq::stream_engine_t::next_handshake_command (msg_t *msg_)
{
    zmq_assert (mechanism != NULL);

    if (mechanism->status () == mechanism_t::ready) {
        mechanism_ready ();
        return pull_and_encode (msg_);
    }

    if (mechanism->status () == mechanism_t::error) {
        errno = EPROTO;
        return -1;
    }

    const int rc = mechanism->next_handshake_command (msg_);
    if (rc == 0)
        msg_->set_flags (msg_t::command);
    return rc;
}

784
//  process_msg handler used while the security mechanism is handshaking.
//  Hands msg_ to the mechanism and returns its result unchanged, except
//  that a mechanism ending up in the error state yields -1 with errno
//  set to EPROTO. On success, output is restarted if it was stopped.
int zmq::stream_engine_t::process_handshake_command (msg_t *msg_)
{
    zmq_assert (mechanism != NULL);

    const int rc = mechanism->process_handshake_command (msg_);
    if (rc != 0)
        return rc;

    if (mechanism->status () == mechanism_t::ready)
        mechanism_ready ();
    else if (mechanism->status () == mechanism_t::error) {
        errno = EPROTO;
        return -1;
    }

    if (output_stopped)
        restart_output ();

    return 0;
}

803 804 805 806 807 808
void zmq::stream_engine_t::zap_msg_available ()
{
    zmq_assert (mechanism != NULL);

    const int rc = mechanism->zap_msg_available ();
    if (rc == -1) {
809
        error (protocol_error);
810 811
        return;
    }
812 813 814 815
    if (input_stopped)
        restart_input ();
    if (output_stopped)
        restart_output ();
816 817
}

818
void zmq::stream_engine_t::mechanism_ready ()
819
{
820 821 822 823 824
    if (options.heartbeat_interval > 0) {
        add_timer(options.heartbeat_interval, heartbeat_ivl_timer_id);
        has_heartbeat_timer = true;
    }

825 826 827 828
    if (options.recv_identity) {
        msg_t identity;
        mechanism->peer_identity (&identity);
        const int rc = session->push_msg (&identity);
829 830 831 832 833 834
        if (rc == -1 && errno == EAGAIN) {
            // If the write is failing at this stage with
            // an EAGAIN the pipe must be being shut down,
            // so we can just bail out of the identity set.
            return;
        }
835
        errno_assert (rc == 0);
836
        session->flush ();
837
    }
838

Martin Hurton's avatar
Martin Hurton committed
839 840
    next_msg = &stream_engine_t::pull_and_encode;
    process_msg = &stream_engine_t::write_credential;
841 842 843

    //  Compile metadata.
    properties_t properties;
Thomas Rodgers's avatar
Thomas Rodgers committed
844
    init_properties(properties);
845

Pieter Hintjens's avatar
Pieter Hintjens committed
846 847
    //  Add ZAP properties.
    const properties_t& zap_properties = mechanism->get_zap_properties ();
Thomas Rodgers's avatar
Thomas Rodgers committed
848
    properties.insert(zap_properties.begin (), zap_properties.end ());
Pieter Hintjens's avatar
Pieter Hintjens committed
849

850 851
    //  Add ZMTP properties.
    const properties_t& zmtp_properties = mechanism->get_zmtp_properties ();
Thomas Rodgers's avatar
Thomas Rodgers committed
852
    properties.insert(zmtp_properties.begin (), zmtp_properties.end ());
853

854 855 856
    zmq_assert (metadata == NULL);
    if (!properties.empty ())
        metadata = new (std::nothrow) metadata_t (properties);
857 858
}

859
//  Fetches the next message from the session into msg_ and passes the
//  session's result code through unchanged.
int zmq::stream_engine_t::pull_msg_from_session (msg_t *msg_)
{
    const int result = session->pull_msg (msg_);
    return result;
}
863

864 865 866 867 868
//  Hands msg_ to the session and passes the session's result code
//  through unchanged. Installed as a process_msg handler.
int zmq::stream_engine_t::push_msg_to_session (msg_t *msg_)
{
    const int result = session->push_msg (msg_);
    return result;
}

869
//  Same as push_msg_to_session, but first attaches the engine's metadata
//  to the message unless the message already carries that very object.
int zmq::stream_engine_t::push_raw_msg_to_session (msg_t *msg_)
{
    const bool needs_metadata = metadata && metadata != msg_->metadata ();
    if (needs_metadata)
        msg_->set_metadata (metadata);

    return push_msg_to_session (msg_);
}

875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893
//  process_msg handler installed by mechanism_ready. Pushes the
//  mechanism's user id (if there is one) to the session as a credential
//  message, then switches to decode_and_push and processes msg_ with it.
//  Returns -1 if the credential could not be pushed; the handler is then
//  left in place.
int zmq::stream_engine_t::write_credential (msg_t *msg_)
{
    zmq_assert (mechanism != NULL);
    zmq_assert (session != NULL);

    const blob_t user_id = mechanism->get_user_id ();
    if (user_id.size () != 0) {
        msg_t credential_msg;
        int rc = credential_msg.init_size (user_id.size ());
        zmq_assert (rc == 0);
        memcpy (credential_msg.data (), user_id.data (), user_id.size ());
        credential_msg.set_flags (msg_t::credential);

        if (session->push_msg (&credential_msg) == -1) {
            //  The push failed, so release the message ourselves.
            rc = credential_msg.close ();
            errno_assert (rc == 0);
            return -1;
        }
    }

    process_msg = &stream_engine_t::decode_and_push;
    return decode_and_push (msg_);
}

898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914
//  next_msg handler for regular traffic: pulls a message from the session
//  and runs it through the mechanism's encode step. Returns 0 when both
//  steps succeed and -1 otherwise; encode is not attempted when the pull
//  fails.
int zmq::stream_engine_t::pull_and_encode (msg_t *msg_)
{
    zmq_assert (mechanism != NULL);

    const bool succeeded = session->pull_msg (msg_) != -1
                           && mechanism->encode (msg_) != -1;
    return succeeded ? 0 : -1;
}

//  process_msg handler for regular traffic. Decodes msg_ through the
//  mechanism, cancels pending heartbeat timers, dispatches heartbeat
//  commands, attaches metadata and pushes the message to the session.
//  Returns 0 on success and -1 when decoding or the push fails. If the
//  push fails with EAGAIN, the handler is switched to
//  push_one_then_decode_and_push so the same message is pushed again
//  without being decoded a second time.
int zmq::stream_engine_t::decode_and_push (msg_t *msg_)
{
    zmq_assert (mechanism != NULL);

    if (mechanism->decode (msg_) == -1)
        return -1;

    //  A message arrived from the peer, so drop the pending heartbeat
    //  deadlines.
    if (has_timeout_timer) {
        has_timeout_timer = false;
        cancel_timer (heartbeat_timeout_timer_id);
    }

    if (has_ttl_timer) {
        has_ttl_timer = false;
        cancel_timer (heartbeat_ttl_timer_id);
    }

    //  The heartbeat commands built in this file start with the byte 4
    //  ("\4PING", "\4PONG"). Only inspect that byte when the message
    //  actually has one: an empty command body must not be read.
    if ((msg_->flags () & msg_t::command) && msg_->size () > 0) {
        const uint8_t cmd_id = *static_cast<const uint8_t *> (msg_->data ());
        if (cmd_id == 4)
            process_heartbeat_message (msg_);
    }

    if (metadata)
        msg_->set_metadata (metadata);

    if (session->push_msg (msg_) == -1) {
        if (errno == EAGAIN)
            process_msg = &stream_engine_t::push_one_then_decode_and_push;
        return -1;
    }
    return 0;
}

//  process_msg handler used after decode_and_push hit EAGAIN: pushes
//  msg_ as is (without decoding it) and, once that succeeds, switches
//  back to decode_and_push. Returns the session's result code.
int zmq::stream_engine_t::push_one_then_decode_and_push (msg_t *msg_)
{
    const int result = session->push_msg (msg_);
    if (result != 0)
        return result;

    process_msg = &stream_engine_t::decode_and_push;
    return 0;
}

950 951 952
//  process_msg handler that pushes a one-byte message with the value 1 to
//  the session before the first real message, then switches to
//  push_msg_to_session and forwards msg_ with it. Returns -1 if the
//  injected message could not be pushed; the handler is then left in place.
int zmq::stream_engine_t::write_subscription_msg (msg_t *msg_)
{
    //  Inject the subscription message, so that also
    //  ZMQ 2.x peers receive published messages.
    msg_t subscribe_all;
    const int rc = subscribe_all.init_size (1);
    errno_assert (rc == 0);

    unsigned char *const payload =
        static_cast<unsigned char *> (subscribe_all.data ());
    *payload = 1;

    if (session->push_msg (&subscribe_all) == -1)
        return -1;

    process_msg = &stream_engine_t::push_msg_to_session;
    return push_msg_to_session (msg_);
}

967
//  Tears the engine down after a failure: optionally notifies a raw
//  socket's application, emits the disconnected event, flushes and informs
//  the session, unplugs the engine and finally destroys it. The object
//  must not be used after this call.
void zmq::stream_engine_t::error (error_reason_t reason)
{
    const bool notify_raw_peer = options.raw_socket && options.raw_notify;
    if (notify_raw_peer) {
        //  For raw sockets, send a final 0-length message to the application
        //  so that it knows the peer has been disconnected.
        msg_t terminator;
        terminator.init ();
        (this->*process_msg) (&terminator);
        terminator.close ();
    }

    zmq_assert (session);
    socket->event_disconnected (endpoint, static_cast<int> (s));
    session->flush ();
    session->engine_error (reason);
    unplug ();

    //  The engine destroys itself here.
    delete this;
}

985
void zmq::stream_engine_t::set_handshake_timer ()
986
{
987
    zmq_assert (!has_handshake_timer);
988

989
    if (!options.raw_socket && options.handshake_ivl > 0) {
990 991
        add_timer (options.handshake_ivl, handshake_timer_id);
        has_handshake_timer = true;
992
    }
993 994
}

Thomas Rodgers's avatar
Thomas Rodgers committed
995 996 997
//  Adds the engine-level properties ("Peer-Address" and the private
//  "__fd") to the given property set. Returns false, leaving the set
//  untouched, when the peer address is empty; returns true otherwise.
bool zmq::stream_engine_t::init_properties (properties_t & properties)
{
    if (peer_address.empty ())
        return false;

    properties.insert (std::make_pair ("Peer-Address", peer_address));

    //  Private property to support deprecated SRCFD
    std::ostringstream fd_text;
    fd_text << static_cast<int> (s);
    properties.insert (std::make_pair ("__fd", fd_text.str ()));

    return true;
}

1007
void zmq::stream_engine_t::timer_event (int id_)
1008
{
Jonathan Reams's avatar
Jonathan Reams committed
1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037
    if(id_ == handshake_timer_id) {
        has_handshake_timer = false;
        //  handshake timer expired before handshake completed, so engine fail
        error (timeout_error);
    }
    else if(id_ == heartbeat_ivl_timer_id) {
        next_msg = &stream_engine_t::produce_ping_message;
        out_event();
        add_timer(options.heartbeat_interval, heartbeat_ivl_timer_id);
    }
    else if(id_ == heartbeat_ttl_timer_id) {
        has_ttl_timer = false;
        error(timeout_error);
    }
    else if(id_ == heartbeat_timeout_timer_id) {
        has_timeout_timer = false;
        error(timeout_error);
    }
    else
        // There are no other valid timer ids!
        assert(false);
}

//  next_msg handler that builds a PING command: the five bytes "\4PING"
//  followed by options.heartbeat_ttl as a 16-bit value in network byte
//  order. The message is run through the mechanism's encode step, whose
//  result is returned. The handler then reverts to pull_and_encode and the
//  heartbeat timeout timer is armed if it is enabled and not yet running.
int zmq::stream_engine_t::produce_ping_message(msg_t * msg_)
{
    zmq_assert (mechanism != NULL);

    //  16-bit TTL + \4PING == 7
    int rc = msg_->init_size (7);
    errno_assert (rc == 0);
    msg_->set_flags (msg_t::command);

    //  Command name first, TTL right behind it.
    uint8_t *const body = static_cast<uint8_t *> (msg_->data ());
    memcpy (body, "\4PING", 5);
    const uint16_t ttl_network = htons (options.heartbeat_ttl);
    memcpy (body + 5, &ttl_network, sizeof (ttl_network));

    rc = mechanism->encode (msg_);
    next_msg = &stream_engine_t::pull_and_encode;

    if (heartbeat_timeout > 0 && !has_timeout_timer) {
        add_timer (heartbeat_timeout, heartbeat_timeout_timer_id);
        has_timeout_timer = true;
    }
    return rc;
}

//  next_msg handler that builds a PONG command (the five bytes "\4PONG"),
//  runs it through the mechanism's encode step and reverts the handler to
//  pull_and_encode. Returns the result of the encode step.
int zmq::stream_engine_t::produce_pong_message(msg_t * msg_)
{
    zmq_assert (mechanism != NULL);

    const int init_rc = msg_->init_size (5);
    errno_assert (init_rc == 0);
    msg_->set_flags (msg_t::command);
    memcpy (msg_->data (), "\4PONG", 5);

    const int encode_rc = mechanism->encode (msg_);
    next_msg = &stream_engine_t::pull_and_encode;
    return encode_rc;
}

//  Handles an inbound heartbeat command. A PING arms the TTL timer from
//  the TTL the peer announced (unless one is already running or the TTL
//  is zero) and schedules a PONG as the next outbound message. Any other
//  message is ignored.
//  Returns 0 normally, or -1 with errno set to EPROTO when a PING is too
//  short to contain its 16-bit TTL field.
int zmq::stream_engine_t::process_heartbeat_message(msg_t * msg_)
{
    //  "\4PING" is 5 bytes; the 16-bit TTL in network byte order follows.
    const size_t ping_name_size = 5;
    const size_t ping_min_size = ping_name_size + sizeof (uint16_t);

    //  Never compare more bytes than the message contains.
    if (msg_->size () < ping_name_size
        || memcmp (msg_->data (), "\4PING", ping_name_size) != 0)
        return 0;

    //  A PING without a complete TTL field is malformed; reading the TTL
    //  anyway would run past the end of the message body.
    if (msg_->size () < ping_min_size) {
        errno = EPROTO;
        return -1;
    }

    //  Get the remote heartbeat TTL to setup the timer
    uint16_t ttl_network;
    memcpy (&ttl_network,
            static_cast<const uint8_t *> (msg_->data ()) + ping_name_size,
            sizeof (ttl_network));

    //  The remote heartbeat is in 10ths of a second
    //  so we multiply it by 100 to get the timer interval in ms.
    //  The product can reach 6553500, which does not fit into 16 bits,
    //  so the multiplication is done on an int.
    const int ttl_ms = static_cast<int> (ntohs (ttl_network)) * 100;

    if (!has_ttl_timer && ttl_ms > 0) {
        add_timer (ttl_ms, heartbeat_ttl_timer_id);
        has_ttl_timer = true;
    }

    //  Answer with a PONG as the next outbound message.
    next_msg = &stream_engine_t::produce_pong_message;
    out_event ();

    return 0;
}