/*
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file

    This file is part of libzmq, the ZeroMQ core engine in C++.

    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include "macros.hpp"
31 32 33 34 35 36 37 38 39 40 41
#include "platform.hpp"
#if defined ZMQ_HAVE_WINDOWS
#include "windows.hpp"
#else
#include <unistd.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/tcp.h>
#include <netinet/in.h>
#include <netdb.h>
#include <fcntl.h>
42 43 44
#if defined ZMQ_HAVE_OPENBSD
#define ucred sockpeercred
#endif
45 46 47 48
#endif

#include <string.h>
#include <new>
49
#include <sstream>
50

51
#include "stream_engine.hpp"
52
#include "io_thread.hpp"
53
#include "session_base.hpp"
54 55
#include "v1_encoder.hpp"
#include "v1_decoder.hpp"
56 57
#include "v2_encoder.hpp"
#include "v2_decoder.hpp"
58
#include "null_mechanism.hpp"
59 60
#include "plain_client.hpp"
#include "plain_server.hpp"
61 62
#include "gssapi_client.hpp"
#include "gssapi_server.hpp"
63 64
#include "curve_client.hpp"
#include "curve_server.hpp"
65 66
#include "raw_decoder.hpp"
#include "raw_encoder.hpp"
67 68
#include "config.hpp"
#include "err.hpp"
69
#include "ip.hpp"
70
#include "tcp.hpp"
Martin Hurton's avatar
Martin Hurton committed
71 72
#include "likely.hpp"
#include "wire.hpp"
73

Chris Laws's avatar
Chris Laws committed
74
//  Wraps an already-open socket descriptor in an engine. The engine starts
//  out unplugged and in the handshaking state, with no codecs, no security
//  mechanism, no metadata and no timers armed.
//
//  fd_        descriptor of the underlying socket; closed by the destructor.
//  options_   socket options, copied into the engine.
//  endpoint_  endpoint string, copied into the engine.
zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_,
                                       const std::string &endpoint_) :
    s (fd_),
    inpos (NULL),
    insize (0),
    decoder (NULL),
    outpos (NULL),
    outsize (0),
    encoder (NULL),
    metadata (NULL),
    handshaking (true),
    greeting_size (v2_greeting_size),
    greeting_bytes_read (0),
    session (NULL),
    options (options_),
    endpoint (endpoint_),
    plugged (false),
    next_msg (&stream_engine_t::identity_msg),
    process_msg (&stream_engine_t::process_identity_msg),
    io_error (false),
    subscription_required (false),
    mechanism (NULL),
    input_stopped (false),
    output_stopped (false),
    has_handshake_timer (false),
    has_ttl_timer (false),
    has_timeout_timer (false),
    has_heartbeat_timer (false),
    heartbeat_timeout (0),
    socket (NULL)
{
    int rc = tx_msg.init ();
    errno_assert (rc == 0);

    //  The engine is event-driven, so the socket must not block.
    unblock_socket (s);

    //  Remember the peer's address. A reported family of 0 leaves us with
    //  an empty address string.
    const int family = get_peer_ip_address (s, peer_address);
    if (family == 0)
        peer_address.clear ();
#if defined ZMQ_HAVE_SO_PEERCRED
    else if (family == PF_UNIX) {
        //  For UNIX-domain peers append ":uid:gid:pid" to the address.
        struct ucred peer_cred;
        socklen_t cred_len = sizeof (peer_cred);
        if (getsockopt (s, SOL_SOCKET, SO_PEERCRED, &peer_cred, &cred_len) == 0) {
            std::ostringstream suffix;
            suffix << ":" << peer_cred.uid << ":" << peer_cred.gid
                   << ":" << peer_cred.pid;
            peer_address += suffix.str ();
        }
    }
#elif defined ZMQ_HAVE_LOCAL_PEERCRED
    else if (family == PF_UNIX) {
        //  For UNIX-domain peers append ":uid:gid:" to the address; the
        //  group is left empty when the credentials list no groups.
        struct xucred peer_cred;
        socklen_t cred_len = sizeof (peer_cred);
        if (getsockopt (s, 0, LOCAL_PEERCRED, &peer_cred, &cred_len) == 0
                && peer_cred.cr_version == XUCRED_VERSION) {
            std::ostringstream suffix;
            suffix << ":" << peer_cred.cr_uid << ":";
            if (peer_cred.cr_ngroups > 0)
                suffix << peer_cred.cr_groups [0];
            suffix << ":";
            peer_address += suffix.str ();
        }
    }
#endif

#ifdef SO_NOSIGPIPE
    //  Writing to a connection the peer has already closed must not
    //  raise SIGPIPE.
    int no_sigpipe = 1;
    rc = setsockopt (s, SOL_SOCKET, SO_NOSIGPIPE, &no_sigpipe,
                     sizeof (no_sigpipe));
    errno_assert (rc == 0);
#endif

    //  With heartbeating enabled, a timeout of -1 means "use the
    //  heartbeat interval as the timeout".
    if (options.heartbeat_interval > 0) {
        heartbeat_timeout = options.heartbeat_timeout;
        if (heartbeat_timeout == -1)
            heartbeat_timeout = options.heartbeat_interval;
    }
}

156
//  Releases everything the engine owns: the socket descriptor, the pending
//  outbound message, its metadata reference, both codecs and the security
//  mechanism. The engine must already be unplugged.
zmq::stream_engine_t::~stream_engine_t ()
{
    zmq_assert (!plugged);

    //  Close the socket unless it has already been retired.
    if (s != retired_fd) {
#ifdef ZMQ_HAVE_WINDOWS
        const int close_rc = closesocket (s);
        wsa_assert (close_rc != SOCKET_ERROR);
#else
        const int close_rc = close (s);
        errno_assert (close_rc == 0);
#endif
        s = retired_fd;
    }

    const int msg_rc = tx_msg.close ();
    errno_assert (msg_rc == 0);

    //  Give up our reference to the metadata; destroy it only when
    //  drop_ref () tells us we were its last user.
    if (metadata != NULL && metadata->drop_ref ()) {
        LIBZMQ_DELETE(metadata);
    }

    LIBZMQ_DELETE(encoder);
    LIBZMQ_DELETE(decoder);
    LIBZMQ_DELETE(mechanism);
}

187 188
//  Attaches the engine to a session and to the poller of the given I/O
//  thread, then starts either raw-mode processing or the ZMTP greeting.
//
//  io_thread_  I/O thread whose poller will watch the socket.
//  session_    session this engine exchanges messages with; must be
//              non-NULL, and the engine must not be plugged already.
void zmq::stream_engine_t::plug (io_thread_t *io_thread_,
    session_base_t *session_)
{
    zmq_assert (!plugged);
    plugged = true;

    //  Connect to session object.
    zmq_assert (!session);
    zmq_assert (session_);
    session = session_;
    socket = session->get_socket ();

    //  Connect to I/O threads poller object.
    io_object_t::plug (io_thread_);
    handle = add_fd (s);
    io_error = false;

    if (options.raw_socket) {
        //  Raw sockets do no handshaking: create the raw codecs at once
        //  and go straight to the normal message flow.
        encoder = new (std::nothrow) raw_encoder_t (options.sndbuf);
        alloc_assert (encoder);

        decoder = new (std::nothrow) raw_decoder_t (options.rcvbuf);
        alloc_assert (decoder);

        handshaking = false;

        next_msg = &stream_engine_t::pull_msg_from_session;
        process_msg = &stream_engine_t::push_raw_msg_to_session;

        //  Compile metadata. The allocation is checked in the same way as
        //  the encoder and decoder above, so that running out of memory
        //  does not silently leave the connection without metadata.
        properties_t properties;
        if (init_properties (properties)) {
            zmq_assert (metadata == NULL);
            metadata = new (std::nothrow) metadata_t (properties);
            alloc_assert (metadata);
        }

        if (options.raw_notify) {
            //  For raw sockets, send an initial 0-length message to the
            //  application so that it knows a peer has connected.
            msg_t connector;
            const int rc = connector.init ();
            errno_assert (rc == 0);
            //  The push result is deliberately not checked; the message
            //  is closed below either way.
            push_raw_msg_to_session (&connector);
            connector.close ();
            session->flush ();
        }
    }
    else {
        //  Start optional timer, to prevent handshake hanging on no input.
        set_handshake_timer ();

        //  Send the 'length' and 'flags' fields of the identity message.
        //  The 'length' field is encoded in the long format.
        outpos = greeting_send;
        outpos [outsize++] = 0xff;
        put_uint64 (&outpos [outsize], options.identity_size + 1);
        outsize += 8;
        outpos [outsize++] = 0x7f;
    }

    set_pollin (handle);
    set_pollout (handle);
    //  Flush all the data that may have been already received downstream.
    in_event ();
}

254
void zmq::stream_engine_t::unplug ()
255 256 257 258
{
    zmq_assert (plugged);
    plugged = false;

259 260 261 262 263 264
    //  Cancel all timers.
    if (has_handshake_timer) {
        cancel_timer (handshake_timer_id);
        has_handshake_timer = false;
    }

Jonathan Reams's avatar
Jonathan Reams committed
265 266 267 268 269 270 271 272 273 274 275 276 277 278
    if (has_ttl_timer) {
        cancel_timer (heartbeat_ttl_timer_id);
        has_ttl_timer = false;
    }

    if (has_timeout_timer) {
        cancel_timer (heartbeat_timeout_timer_id);
        has_timeout_timer = false;
    }

    if (has_heartbeat_timer) {
        cancel_timer (heartbeat_ivl_timer_id);
        has_heartbeat_timer = false;
    }
279
    //  Cancel all fd subscriptions.
280
    if (!io_error)
Martin Hurton's avatar
Martin Hurton committed
281
        rm_fd (handle);
282 283 284 285 286 287 288

    //  Disconnect from I/O threads poller object.
    io_object_t::unplug ();

    session = NULL;
}

289
void zmq::stream_engine_t::terminate ()
290 291 292 293 294
{
    unplug ();
    delete this;
}

295
void zmq::stream_engine_t::in_event ()
296
{
297
    zmq_assert (!io_error);
298

Pieter Hintjens's avatar
Pieter Hintjens committed
299
    //  If still handshaking, receive and process the greeting message.
Martin Hurton's avatar
Martin Hurton committed
300 301 302 303
    if (unlikely (handshaking))
        if (!handshake ())
            return;

304
    zmq_assert (decoder);
305 306

    //  If there has been an I/O error, stop polling.
307
    if (input_stopped) {
308 309 310 311
        rm_fd (handle);
        io_error = true;
        return;
    }
312 313 314 315 316 317 318 319

    //  If there's no data to process in the buffer...
    if (!insize) {

        //  Retrieve the buffer and read as much data as possible.
        //  Note that buffer can be arbitrarily large. However, we assume
        //  the underlying TCP layer has fixed buffer size and thus the
        //  number of bytes read will be always limited.
320 321
        size_t bufsize = 0;
        decoder->get_buffer (&inpos, &bufsize);
322

323
        const int rc = tcp_read (s, inpos, bufsize);
324

325
        if (rc == 0) {
326
            error (connection_error);
327
            return;
328
        }
329 330
        if (rc == -1) {
            if (errno != EAGAIN)
331
                error (connection_error);
332 333
            return;
        }
334

335
        //  Adjust input size
336
        insize = static_cast <size_t> (rc);
337 338
        // Adjust buffer size to received bytes
        decoder->resize_buffer(insize);
Martin Hurton's avatar
Martin Hurton committed
339
    }
340

341 342
    int rc = 0;
    size_t processed = 0;
343

344 345 346
    while (insize > 0) {
        rc = decoder->decode (inpos, insize, processed);
        zmq_assert (processed <= insize);
347 348
        inpos += processed;
        insize -= processed;
349 350
        if (rc == 0 || rc == -1)
            break;
Martin Hurton's avatar
Martin Hurton committed
351
        rc = (this->*process_msg) (decoder->msg ());
352 353
        if (rc == -1)
            break;
354 355
    }

356 357 358 359
    //  Tear down the connection if we have failed to decode input data
    //  or the session has rejected the message.
    if (rc == -1) {
        if (errno != EAGAIN) {
360
            error (protocol_error);
361 362
            return;
        }
363
        input_stopped = true;
364
        reset_pollin (handle);
Martin Hurton's avatar
Martin Hurton committed
365
    }
366 367

    session->flush ();
368 369
}

370
void zmq::stream_engine_t::out_event ()
371
{
372 373
    zmq_assert (!io_error);

374 375 376
    //  If write buffer is empty, try to read new data from the encoder.
    if (!outsize) {

Martin Hurton's avatar
Martin Hurton committed
377 378 379 380 381 382 383 384
        //  Even when we stop polling as soon as there is no
        //  data to send, the poller may invoke out_event one
        //  more time due to 'speculative write' optimisation.
        if (unlikely (encoder == NULL)) {
            zmq_assert (handshaking);
            return;
        }

385
        outpos = NULL;
386 387
        outsize = encoder->encode (&outpos, 0);

388
        while (outsize < (size_t) options.sndbuf) {
Martin Hurton's avatar
Martin Hurton committed
389
            if ((this->*next_msg) (&tx_msg) == -1)
390 391 392
                break;
            encoder->load_msg (&tx_msg);
            unsigned char *bufptr = outpos + outsize;
393
            size_t n = encoder->encode (&bufptr, options.sndbuf - outsize);
394 395 396 397 398
            zmq_assert (n > 0);
            if (outpos == NULL)
                outpos = bufptr;
            outsize += n;
        }
399 400 401

        //  If there is no data to send, stop polling for output.
        if (outsize == 0) {
402
            output_stopped = true;
403 404 405 406 407 408 409
            reset_pollout (handle);
            return;
        }
    }

    //  If there are any data to write in write buffer, write as much as
    //  possible to the socket. Note that amount of data to write can be
410
    //  arbitrarily large. However, we assume that underlying TCP layer has
411 412
    //  limited transmission buffer and thus the actual number of bytes
    //  written should be reasonably modest.
413
    const int nbytes = tcp_write (s, outpos, outsize);
414

Martin Hurton's avatar
Martin Hurton committed
415 416
    //  IO error has occurred. We stop waiting for output events.
    //  The engine is not terminated until we detect input error;
417
    //  this is necessary to prevent losing incoming messages.
418
    if (nbytes == -1) {
Martin Hurton's avatar
Martin Hurton committed
419
        reset_pollout (handle);
420 421 422 423 424
        return;
    }

    outpos += nbytes;
    outsize -= nbytes;
Martin Hurton's avatar
Martin Hurton committed
425 426 427 428 429 430

    //  If we are still handshaking and there are no data
    //  to send, stop polling for output.
    if (unlikely (handshaking))
        if (outsize == 0)
            reset_pollout (handle);
431 432
}

433
void zmq::stream_engine_t::restart_output ()
434
{
435 436 437
    if (unlikely (io_error))
        return;

438
    if (likely (output_stopped)) {
439
        set_pollout (handle);
440
        output_stopped = false;
441
    }
442 443 444 445 446 447 448 449

    //  Speculative write: The assumption is that at the moment new message
    //  was sent by the user the socket is probably available for writing.
    //  Thus we try to write the data to socket avoiding polling for POLLOUT.
    //  Consequently, the latency should be better in request/reply scenarios.
    out_event ();
}

450
void zmq::stream_engine_t::restart_input ()
451
{
452
    zmq_assert (input_stopped);
453 454 455
    zmq_assert (session != NULL);
    zmq_assert (decoder != NULL);

Martin Hurton's avatar
Martin Hurton committed
456
    int rc = (this->*process_msg) (decoder->msg ());
457 458 459 460
    if (rc == -1) {
        if (errno == EAGAIN)
            session->flush ();
        else
461
            error (protocol_error);
Martin Hurton's avatar
Martin Hurton committed
462 463 464
        return;
    }

465 466 467 468 469 470 471 472
    while (insize > 0) {
        size_t processed = 0;
        rc = decoder->decode (inpos, insize, processed);
        zmq_assert (processed <= insize);
        inpos += processed;
        insize -= processed;
        if (rc == 0 || rc == -1)
            break;
Martin Hurton's avatar
Martin Hurton committed
473
        rc = (this->*process_msg) (decoder->msg ());
474 475 476
        if (rc == -1)
            break;
    }
477

478 479 480
    if (rc == -1 && errno == EAGAIN)
        session->flush ();
    else
481 482 483 484 485
    if (io_error)
        error (connection_error);
    else
    if (rc == -1)
        error (protocol_error);
486
    else {
487
        input_stopped = false;
488 489 490 491 492 493
        set_pollin (handle);
        session->flush ();

        //  Speculative read.
        in_event ();
    }
494 495
}

496
bool zmq::stream_engine_t::handshake ()
Martin Hurton's avatar
Martin Hurton committed
497
{
498
    zmq_assert (handshaking);
Martin Hurton's avatar
Martin Hurton committed
499
    zmq_assert (greeting_bytes_read < greeting_size);
500
    //  Receive the greeting.
Martin Hurton's avatar
Martin Hurton committed
501
    while (greeting_bytes_read < greeting_size) {
502 503
        const int n = tcp_read (s, greeting_recv + greeting_bytes_read,
                                greeting_size - greeting_bytes_read);
504
        if (n == 0) {
505
            error (connection_error);
506 507
            return false;
        }
508 509
        if (n == -1) {
            if (errno != EAGAIN)
510
                error (connection_error);
511
            return false;
512
        }
Martin Hurton's avatar
Martin Hurton committed
513 514 515

        greeting_bytes_read += n;

516 517 518
        //  We have received at least one byte from the peer.
        //  If the first byte is not 0xff, we know that the
        //  peer is using unversioned protocol.
Pieter Hintjens's avatar
Pieter Hintjens committed
519
        if (greeting_recv [0] != 0xff)
520
            break;
Martin Hurton's avatar
Martin Hurton committed
521

522
        if (greeting_bytes_read < signature_size)
523
            continue;
Martin Hurton's avatar
Martin Hurton committed
524

525 526 527 528
        //  Inspect the right-most bit of the 10th byte (which coincides
        //  with the 'flags' field if a regular message was sent).
        //  Zero indicates this is a header of identity message
        //  (i.e. the peer is using the unversioned protocol).
Pieter Hintjens's avatar
Pieter Hintjens committed
529
        if (!(greeting_recv [9] & 0x01))
530
            break;
Martin Hurton's avatar
Martin Hurton committed
531

532
        //  The peer is using versioned protocol.
533 534
        //  Send the major version number.
        if (outpos + outsize == greeting_send + signature_size) {
Martin Hurton's avatar
Martin Hurton committed
535 536
            if (outsize == 0)
                set_pollout (handle);
537 538 539 540 541 542 543 544 545 546 547 548 549 550 551
            outpos [outsize++] = 3;     //  Major version number
        }

        if (greeting_bytes_read > signature_size) {
            if (outpos + outsize == greeting_send + signature_size + 1) {
                if (outsize == 0)
                    set_pollout (handle);

                //  Use ZMTP/2.0 to talk to older peers.
                if (greeting_recv [10] == ZMTP_1_0
                ||  greeting_recv [10] == ZMTP_2_0)
                    outpos [outsize++] = options.type;
                else {
                    outpos [outsize++] = 0; //  Minor version number
                    memset (outpos + outsize, 0, 20);
552 553 554

                    zmq_assert (options.mechanism == ZMQ_NULL
                            ||  options.mechanism == ZMQ_PLAIN
555 556
                            ||  options.mechanism == ZMQ_CURVE
                            ||  options.mechanism == ZMQ_GSSAPI);
557

558 559 560
                    if (options.mechanism == ZMQ_NULL)
                        memcpy (outpos + outsize, "NULL", 4);
                    else
561
                    if (options.mechanism == ZMQ_PLAIN)
562
                        memcpy (outpos + outsize, "PLAIN", 5);
563
                    else
564 565
                    if (options.mechanism == ZMQ_GSSAPI)
                        memcpy (outpos + outsize, "GSSAPI", 6);
566
                    else
567
                    if (options.mechanism == ZMQ_CURVE)
568
                        memcpy (outpos + outsize, "CURVE", 5);
569 570 571 572 573 574
                    outsize += 20;
                    memset (outpos + outsize, 0, 32);
                    outsize += 32;
                    greeting_size = v3_greeting_size;
                }
            }
Martin Hurton's avatar
Martin Hurton committed
575 576 577
        }
    }

578 579
    //  Position of the revision field in the greeting.
    const size_t revision_pos = 10;
580

581 582
    //  Is the peer using ZMTP/1.0 with no revision number?
    //  If so, we send and receive rest of identity message
Pieter Hintjens's avatar
Pieter Hintjens committed
583
    if (greeting_recv [0] != 0xff || !(greeting_recv [9] & 0x01)) {
Min RK's avatar
Min RK committed
584
        if (session->zap_enabled ()) {
585 586 587 588 589
           // reject ZMTP 1.0 connections if ZAP is enabled
           error (protocol_error);
           return false;
        }

590
        encoder = new (std::nothrow) v1_encoder_t (options.sndbuf);
591 592
        alloc_assert (encoder);

593
        decoder = new (std::nothrow) v1_decoder_t (options.rcvbuf, options.maxmsgsize);
594 595
        alloc_assert (decoder);

Martin Hurton's avatar
Martin Hurton committed
596 597 598 599 600 601
        //  We have already sent the message header.
        //  Since there is no way to tell the encoder to
        //  skip the message header, we simply throw that
        //  header data away.
        const size_t header_size = options.identity_size + 1 >= 255 ? 10 : 2;
        unsigned char tmp [10], *bufferp = tmp;
602 603 604 605 606 607 608

        //  Prepare the identity message and load it into encoder.
        //  Then consume bytes we have already sent to the peer.
        const int rc = tx_msg.init_size (options.identity_size);
        zmq_assert (rc == 0);
        memcpy (tx_msg.data (), options.identity, options.identity_size);
        encoder->load_msg (&tx_msg);
609
        size_t buffer_size = encoder->encode (&bufferp, header_size);
Martin Hurton's avatar
Martin Hurton committed
610 611 612
        zmq_assert (buffer_size == header_size);

        //  Make sure the decoder sees the data we have already received.
Pieter Hintjens's avatar
Pieter Hintjens committed
613
        inpos = greeting_recv;
Martin Hurton's avatar
Martin Hurton committed
614
        insize = greeting_bytes_read;
615 616

        //  To allow for interoperability with peers that do not forward
617 618
        //  their subscriptions, we inject a phantom subscription message
        //  message into the incoming message stream.
619
        if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB)
620
            subscription_required = true;
Martin Hurton's avatar
Martin Hurton committed
621 622 623

        //  We are sending our identity now and the next message
        //  will come from the socket.
Martin Hurton's avatar
Martin Hurton committed
624
        next_msg = &stream_engine_t::pull_msg_from_session;
Martin Hurton's avatar
Martin Hurton committed
625 626

        //  We are expecting identity message.
Martin Hurton's avatar
Martin Hurton committed
627
        process_msg = &stream_engine_t::process_identity_msg;
Martin Hurton's avatar
Martin Hurton committed
628
    }
629
    else
Pieter Hintjens's avatar
Pieter Hintjens committed
630
    if (greeting_recv [revision_pos] == ZMTP_1_0) {
Min RK's avatar
Min RK committed
631
        if (session->zap_enabled ()) {
632 633 634 635 636
           // reject ZMTP 1.0 connections if ZAP is enabled
           error (protocol_error);
           return false;
        }

637
        encoder = new (std::nothrow) v1_encoder_t (
638
           options.sndbuf);
639 640
        alloc_assert (encoder);

641
        decoder = new (std::nothrow) v1_decoder_t (
642
            options.rcvbuf, options.maxmsgsize);
643 644
        alloc_assert (decoder);
    }
645 646
    else
    if (greeting_recv [revision_pos] == ZMTP_2_0) {
Min RK's avatar
Min RK committed
647
        if (session->zap_enabled ()) {
648 649 650 651 652
           // reject ZMTP 2.0 connections if ZAP is enabled
           error (protocol_error);
           return false;
        }

653
        encoder = new (std::nothrow) v2_encoder_t (options.sndbuf);
654 655 656
        alloc_assert (encoder);

        decoder = new (std::nothrow) v2_decoder_t (
657
            options.rcvbuf, options.maxmsgsize);
658 659
        alloc_assert (decoder);
    }
660
    else {
661
        encoder = new (std::nothrow) v2_encoder_t (options.sndbuf);
662 663
        alloc_assert (encoder);

664
        decoder = new (std::nothrow) v2_decoder_t (
665
                options.rcvbuf, options.maxmsgsize);
666
        alloc_assert (decoder);
667

668 669
        if (options.mechanism == ZMQ_NULL
        &&  memcmp (greeting_recv + 12, "NULL\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) == 0) {
670 671
            mechanism = new (std::nothrow)
                null_mechanism_t (session, peer_address, options);
672
            alloc_assert (mechanism);
673 674
        }
        else
675 676
        if (options.mechanism == ZMQ_PLAIN
        &&  memcmp (greeting_recv + 12, "PLAIN\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) == 0) {
677 678 679 680 681 682
            if (options.as_server)
                mechanism = new (std::nothrow)
                    plain_server_t (session, peer_address, options);
            else
                mechanism = new (std::nothrow)
                    plain_client_t (options);
683
            alloc_assert (mechanism);
684
        }
685
#ifdef ZMQ_HAVE_CURVE
686
        else
687 688
        if (options.mechanism == ZMQ_CURVE
        &&  memcmp (greeting_recv + 12, "CURVE\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) == 0) {
689
            if (options.as_server)
690 691
                mechanism = new (std::nothrow)
                    curve_server_t (session, peer_address, options);
692 693 694 695 696
            else
                mechanism = new (std::nothrow) curve_client_t (options);
            alloc_assert (mechanism);
        }
#endif
Chris Laws's avatar
Chris Laws committed
697
#ifdef HAVE_LIBGSSAPI_KRB5
698
        else
699 700
        if (options.mechanism == ZMQ_GSSAPI
        &&  memcmp (greeting_recv + 12, "GSSAPI\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) == 0) {
701 702 703 704 705
            if (options.as_server)
                mechanism = new (std::nothrow)
                    gssapi_server_t (session, peer_address, options);
            else
                mechanism = new (std::nothrow) gssapi_client_t (options);
706 707
            alloc_assert (mechanism);
        }
708
#endif
709
        else {
710
            error (protocol_error);
711 712
            return false;
        }
Martin Hurton's avatar
Martin Hurton committed
713 714
        next_msg = &stream_engine_t::next_handshake_command;
        process_msg = &stream_engine_t::process_handshake_command;
715
    }
Martin Hurton's avatar
Martin Hurton committed
716 717 718 719 720 721 722 723 724

    // Start polling for output if necessary.
    if (outsize == 0)
        set_pollout (handle);

    //  Handshaking was successful.
    //  Switch into the normal message flow.
    handshaking = false;

725 726 727 728 729
    if (has_handshake_timer) {
        cancel_timer (handshake_timer_id);
        has_handshake_timer = false;
    }

Martin Hurton's avatar
Martin Hurton committed
730 731 732
    return true;
}

Martin Hurton's avatar
Martin Hurton committed
733
//  next_msg handler used for the first outbound message: fills msg_ with
//  our identity and makes every later message come from the session.
//  Always returns 0.
int zmq::stream_engine_t::identity_msg (msg_t *msg_)
{
    const int rc = msg_->init_size (options.identity_size);
    errno_assert (rc == 0);

    //  An empty identity leaves nothing to copy.
    if (options.identity_size > 0)
        memcpy (msg_->data (), options.identity, options.identity_size);

    next_msg = &stream_engine_t::pull_msg_from_session;
    return 0;
}
742

Martin Hurton's avatar
Martin Hurton committed
743
//  process_msg handler used for the first inbound message, which carries
//  the peer's identity. The message is either passed to the session
//  flagged as an identity, or discarded, depending on
//  options.recv_identity. Always returns 0.
int zmq::stream_engine_t::process_identity_msg (msg_t *msg_)
{
    if (!options.recv_identity) {
        //  The socket does not want identities: drop the content and
        //  leave msg_ as a freshly initialised message.
        int rc = msg_->close ();
        errno_assert (rc == 0);
        rc = msg_->init ();
        errno_assert (rc == 0);
    }
    else {
        msg_->set_flags (msg_t::identity);
        const int rc = session->push_msg (msg_);
        errno_assert (rc == 0);
    }

    //  Choose the handler for the messages that follow: the subscription
    //  writer comes first when handshake () asked for one.
    if (subscription_required)
        process_msg = &stream_engine_t::write_subscription_msg;
    else
        process_msg = &stream_engine_t::push_msg_to_session;

    return 0;
}
764

765
//  next_msg handler used while the security handshake is running.
//  Produces the mechanism's next handshake command, or, once the
//  mechanism reports ready, finishes the handshake and returns the first
//  regular message instead. Returns -1 with errno set to EPROTO when the
//  mechanism is in the error state; otherwise returns whatever the
//  delegated call returns.
int zmq::stream_engine_t::next_handshake_command (msg_t *msg_)
{
    zmq_assert (mechanism != NULL);

    if (mechanism->status () == mechanism_t::ready) {
        mechanism_ready ();
        return pull_and_encode (msg_);
    }

    if (mechanism->status () == mechanism_t::error) {
        errno = EPROTO;
        return -1;
    }

    //  Still handshaking: ask the mechanism and mark its output as a
    //  command.
    const int rc = mechanism->next_handshake_command (msg_);
    if (rc == 0)
        msg_->set_flags (msg_t::command);
    return rc;
}

786
//  process_msg handler used while the security handshake is running.
//  Passes the received command to the mechanism and reacts to the state
//  the mechanism ends up in. Returns the mechanism's result, or -1 with
//  errno set to EPROTO when the mechanism moved to the error state.
int zmq::stream_engine_t::process_handshake_command (msg_t *msg_)
{
    zmq_assert (mechanism != NULL);

    const int rc = mechanism->process_handshake_command (msg_);
    if (rc != 0)
        return rc;

    if (mechanism->status () == mechanism_t::ready)
        mechanism_ready ();
    else
    if (mechanism->status () == mechanism_t::error) {
        errno = EPROTO;
        return -1;
    }

    //  The command may have unblocked our side of the handshake, so wake
    //  up output if it had gone idle.
    if (output_stopped)
        restart_output ();

    return rc;
}

805 806 807 808 809 810
void zmq::stream_engine_t::zap_msg_available ()
{
    zmq_assert (mechanism != NULL);

    const int rc = mechanism->zap_msg_available ();
    if (rc == -1) {
811
        error (protocol_error);
812 813
        return;
    }
814 815 816 817
    if (input_stopped)
        restart_input ();
    if (output_stopped)
        restart_output ();
818 819
}

820
void zmq::stream_engine_t::mechanism_ready ()
821
{
822 823 824 825 826
    if (options.heartbeat_interval > 0) {
        add_timer(options.heartbeat_interval, heartbeat_ivl_timer_id);
        has_heartbeat_timer = true;
    }

827 828 829 830
    if (options.recv_identity) {
        msg_t identity;
        mechanism->peer_identity (&identity);
        const int rc = session->push_msg (&identity);
831 832 833 834 835 836
        if (rc == -1 && errno == EAGAIN) {
            // If the write is failing at this stage with
            // an EAGAIN the pipe must be being shut down,
            // so we can just bail out of the identity set.
            return;
        }
837
        errno_assert (rc == 0);
838
        session->flush ();
839
    }
840

Martin Hurton's avatar
Martin Hurton committed
841 842
    next_msg = &stream_engine_t::pull_and_encode;
    process_msg = &stream_engine_t::write_credential;
843 844 845

    //  Compile metadata.
    properties_t properties;
Thomas Rodgers's avatar
Thomas Rodgers committed
846
    init_properties(properties);
847

Pieter Hintjens's avatar
Pieter Hintjens committed
848 849
    //  Add ZAP properties.
    const properties_t& zap_properties = mechanism->get_zap_properties ();
Thomas Rodgers's avatar
Thomas Rodgers committed
850
    properties.insert(zap_properties.begin (), zap_properties.end ());
Pieter Hintjens's avatar
Pieter Hintjens committed
851

852 853
    //  Add ZMTP properties.
    const properties_t& zmtp_properties = mechanism->get_zmtp_properties ();
Thomas Rodgers's avatar
Thomas Rodgers committed
854
    properties.insert(zmtp_properties.begin (), zmtp_properties.end ());
855

856 857 858
    zmq_assert (metadata == NULL);
    if (!properties.empty ())
        metadata = new (std::nothrow) metadata_t (properties);
859 860
}

861
//  Fetches the next outgoing message from the session into msg_ and
//  returns the session's result code unchanged.
int zmq::stream_engine_t::pull_msg_from_session (msg_t *msg_)
{
    const int pull_rc = session->pull_msg (msg_);
    return pull_rc;
}
865

866 867 868 869 870
//  Hands msg_ over to the session and returns the session's result
//  code unchanged.
int zmq::stream_engine_t::push_msg_to_session (msg_t *msg_)
{
    const int push_rc = session->push_msg (msg_);
    return push_rc;
}

871
//  Pushes msg_ to the session, first attaching the engine's metadata
//  when the engine has some and the message does not already carry it.
int zmq::stream_engine_t::push_raw_msg_to_session (msg_t *msg_)
{
    const bool attach_metadata = metadata && metadata != msg_->metadata ();
    if (attach_metadata)
        msg_->set_metadata (metadata);

    return push_msg_to_session (msg_);
}

877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895
//  First process_msg handler installed after the handshake.
//
//  If the mechanism reports a non-empty user id, it is copied into a
//  message flagged as credential and pushed to the session. When that
//  push fails the credential message is closed and -1 is returned;
//  process_msg is left untouched in that case.
//  Otherwise process_msg is switched to decode_and_push and msg_ is
//  handled by it right away.
int zmq::stream_engine_t::write_credential (msg_t *msg_)
{
    zmq_assert (mechanism != NULL);
    zmq_assert (session != NULL);

    const blob_t user_id = mechanism->get_user_id ();
    const size_t user_id_size = user_id.size ();

    if (user_id_size > 0) {
        msg_t credential_msg;
        int rc = credential_msg.init_size (user_id_size);
        zmq_assert (rc == 0);
        memcpy (credential_msg.data (), user_id.data (), user_id_size);
        credential_msg.set_flags (msg_t::credential);

        if (session->push_msg (&credential_msg) == -1) {
            //  The session did not take the message; release it.
            rc = credential_msg.close ();
            errno_assert (rc == 0);
            return -1;
        }
    }

    process_msg = &stream_engine_t::decode_and_push;
    return decode_and_push (msg_);
}

900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916
//  Pulls the next message from the session and passes it through the
//  mechanism's encode (). Returns -1 if either step reports -1 (the
//  encode step is skipped when the pull fails), 0 otherwise.
int zmq::stream_engine_t::pull_and_encode (msg_t *msg_)
{
    zmq_assert (mechanism != NULL);

    const bool failed = session->pull_msg (msg_) == -1
                     || mechanism->encode (msg_) == -1;
    return failed ? -1 : 0;
}

//  Decodes a received message with the mechanism and pushes it to the
//  session.
//
//  After a successful decode any pending heartbeat timeout and TTL
//  timers are cancelled. Command messages whose first byte is 4 are
//  additionally passed to process_heartbeat_message ().
//
//  Returns 0 on success and -1 when decoding or pushing fails. When the
//  push fails with EAGAIN, process_msg is switched to
//  push_one_then_decode_and_push.
int zmq::stream_engine_t::decode_and_push (msg_t *msg_)
{
    zmq_assert (mechanism != NULL);

    if (mechanism->decode (msg_) == -1)
        return -1;

    if (has_timeout_timer) {
        has_timeout_timer = false;
        cancel_timer (heartbeat_timeout_timer_id);
    }

    if (has_ttl_timer) {
        has_ttl_timer = false;
        cancel_timer (heartbeat_ttl_timer_id);
    }

    //  Only look at the first byte when there is one; an empty command
    //  message has no payload to inspect.
    if ((msg_->flags () & msg_t::command) && msg_->size () > 0) {
        const uint8_t cmd_id = *static_cast <const uint8_t *> (msg_->data ());
        if (cmd_id == 4)
            process_heartbeat_message (msg_);
    }

    if (metadata)
        msg_->set_metadata (metadata);

    if (session->push_msg (msg_) == -1) {
        if (errno == EAGAIN)
            process_msg = &stream_engine_t::push_one_then_decode_and_push;
        return -1;
    }
    return 0;
}

//  Pushes msg_ to the session without decoding it. Once the push
//  succeeds, process_msg is switched back to decode_and_push.
//  Returns the session's result code.
int zmq::stream_engine_t::push_one_then_decode_and_push (msg_t *msg_)
{
    const int push_rc = session->push_msg (msg_);
    if (push_rc != 0)
        return push_rc;

    process_msg = &stream_engine_t::decode_and_push;
    return push_rc;
}

952 953 954
//  Pushes a one-byte message with value 1 to the session ahead of msg_.
//  Per the original comment, this subscription message is injected so
//  that ZMQ 2.x peers also receive published messages.
//
//  Returns -1 if the injected message cannot be pushed (process_msg is
//  then left unchanged). Otherwise process_msg is switched to
//  push_msg_to_session and msg_ is pushed through it.
int zmq::stream_engine_t::write_subscription_msg (msg_t *msg_)
{
    msg_t subscribe_msg;
    const int init_rc = subscribe_msg.init_size (1);
    errno_assert (init_rc == 0);

    unsigned char *payload = (unsigned char*) subscribe_msg.data ();
    *payload = 1;

    if (session->push_msg (&subscribe_msg) == -1)
        return -1;

    process_msg = &stream_engine_t::push_msg_to_session;
    return push_msg_to_session (msg_);
}

969
//  Terminates the engine because of the given error reason.
//
//  The disconnect event is reported on the socket, the session is
//  flushed and told about the error, the engine is unplugged and then
//  deletes itself. No member may be touched after this call returns.
void zmq::stream_engine_t::error (error_reason_t reason)
{
    const bool notify_raw_peer = options.raw_socket && options.raw_notify;
    if (notify_raw_peer) {
        //  For raw sockets, send a final 0-length message to the
        //  application so that it knows the peer has been disconnected.
        msg_t terminator;
        terminator.init ();
        (this->*process_msg) (&terminator);
        terminator.close ();
    }

    zmq_assert (session);

    socket->event_disconnected (endpoint, (int) s);
    session->flush ();
    session->engine_error (reason);

    unplug ();
    delete this;
}

987
void zmq::stream_engine_t::set_handshake_timer ()
988
{
989
    zmq_assert (!has_handshake_timer);
990

991
    if (!options.raw_socket && options.handshake_ivl > 0) {
992 993
        add_timer (options.handshake_ivl, handshake_timer_id);
        has_handshake_timer = true;
994
    }
995 996
}

Thomas Rodgers's avatar
Thomas Rodgers committed
997 998 999 1000 1001 1002
//  Adds the "Peer-Address" property when the peer address is known.
//  Returns true if the property was supplied, false if the peer address
//  is empty (properties is then left untouched).
bool zmq::stream_engine_t::init_properties (properties_t & properties)
{
    const bool has_peer_address = !peer_address.empty ();
    if (has_peer_address)
        properties.insert (std::make_pair("Peer-Address", peer_address));
    return has_peer_address;
}

1003
void zmq::stream_engine_t::timer_event (int id_)
1004
{
Jonathan Reams's avatar
Jonathan Reams committed
1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043
    if(id_ == handshake_timer_id) {
        has_handshake_timer = false;
        //  handshake timer expired before handshake completed, so engine fail
        error (timeout_error);
    }
    else if(id_ == heartbeat_ivl_timer_id) {
        next_msg = &stream_engine_t::produce_ping_message;
        out_event();
        add_timer(options.heartbeat_interval, heartbeat_ivl_timer_id);
    }
    else if(id_ == heartbeat_ttl_timer_id) {
        has_ttl_timer = false;
        error(timeout_error);
    }
    else if(id_ == heartbeat_timeout_timer_id) {
        has_timeout_timer = false;
        error(timeout_error);
    }
    else
        // There are no other valid timer ids!
        assert(false);
}

//  Builds a PING command in msg_: the 5-byte name "\4PING" followed by
//  options.heartbeat_ttl as a 16-bit value in network byte order.
//
//  The message is passed through the mechanism's encode (), next_msg is
//  reset to pull_and_encode and, if no timeout timer is pending and
//  heartbeat_timeout is positive, the heartbeat timeout timer is armed.
//
//  Returns the result of mechanism->encode ().
int zmq::stream_engine_t::produce_ping_message (msg_t * msg_)
{
    zmq_assert (mechanism != NULL);

    // 16-bit TTL + \4PING == 7
    int rc = msg_->init_size (7);
    errno_assert (rc == 0);
    msg_->set_flags (msg_t::command);
    // Copy in the command message
    memcpy (msg_->data (), "\4PING", 5);

    const uint16_t ttl_val = htons (options.heartbeat_ttl);
    memcpy (((uint8_t*) msg_->data ()) + 5, &ttl_val, sizeof (ttl_val));

    rc = mechanism->encode (msg_);
    next_msg = &stream_engine_t::pull_and_encode;
    if (!has_timeout_timer && heartbeat_timeout > 0) {
        add_timer (heartbeat_timeout, heartbeat_timeout_timer_id);
        has_timeout_timer = true;
    }
    return rc;
}

//  Builds a PONG command ("\4PONG", 5 bytes) in msg_, passes it through
//  the mechanism's encode () and resets next_msg to pull_and_encode.
//
//  Returns the result of mechanism->encode ().
int zmq::stream_engine_t::produce_pong_message (msg_t * msg_)
{
    zmq_assert (mechanism != NULL);

    int rc = msg_->init_size (5);
    errno_assert (rc == 0);
    msg_->set_flags (msg_t::command);

    memcpy (msg_->data (), "\4PONG", 5);

    rc = mechanism->encode (msg_);
    next_msg = &stream_engine_t::pull_and_encode;
    return rc;
}

//  Handles a received heartbeat command.
//
//  Only PING is acted upon: the peer's TTL is read from the two bytes
//  following the "\4PING" name, a TTL timer is armed when none is
//  pending and the TTL is non-zero, and a PONG is scheduled as the next
//  outgoing message.
//
//  Messages that are not a PING, or that are too short to hold the name
//  plus the 16-bit TTL, are ignored. Always returns 0.
int zmq::stream_engine_t::process_heartbeat_message (msg_t * msg_)
{
    //  "\4PING" is 5 bytes, followed by a 16-bit TTL.
    const size_t ping_name_size = 5;
    const size_t ping_min_size = ping_name_size + sizeof (uint16_t);

    //  Check the length first so neither the comparison nor the TTL
    //  copy reads past the end of the message.
    if (msg_->size () < ping_min_size
    ||  memcmp (msg_->data (), "\4PING", ping_name_size) != 0)
        return 0;

    // Get the remote heartbeat TTL to setup the timer
    uint16_t remote_heartbeat_ttl;
    memcpy (&remote_heartbeat_ttl,
            static_cast <uint8_t *> (msg_->data ()) + ping_name_size,
            sizeof (remote_heartbeat_ttl));
    remote_heartbeat_ttl = ntohs (remote_heartbeat_ttl);

    // The remote heartbeat is in 10ths of a second
    // so we multiply it by 100 to get the timer interval in ms.
    // The product is computed as int: it does not fit in 16 bits
    // once the TTL exceeds 655.
    const int ttl_interval_ms = static_cast <int> (remote_heartbeat_ttl) * 100;

    if (!has_ttl_timer && ttl_interval_ms > 0) {
        add_timer (ttl_interval_ms, heartbeat_ttl_timer_id);
        has_ttl_timer = true;
    }

    next_msg = &stream_engine_t::produce_pong_message;
    out_event ();

    return 0;
}