/*
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file

    This file is part of libzmq, the ZeroMQ core engine in C++.

    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

//  On AIX platform, poll.h has to be included first to get consistent
//  definition of pollfd structure (AIX uses 'reqevents' and 'retnevents'
//  instead of 'events' and 'revents' and defines macros to map from POSIX-y
//  names to AIX-specific names).
//  zmq.h must be included *after* poll.h for AIX to build properly.
//  precompiled.hpp includes include/zmq.h
#if defined ZMQ_POLL_BASED_ON_POLL && defined ZMQ_HAVE_AIX
#include <poll.h>
#endif

#include "precompiled.hpp"
#include <stddef.h>
#include "poller.hpp"
#include "proxy.hpp"
#include "likely.hpp"

#if defined ZMQ_POLL_BASED_ON_POLL && !defined ZMQ_HAVE_WINDOWS                \
  && !defined ZMQ_HAVE_AIX
#include <poll.h>
#endif

// These headers end up pulling in zmq.h somewhere in their include
// dependency chain
#include "socket_base.hpp"
#include "err.hpp"

#ifdef ZMQ_HAVE_POLLER

#include "socket_poller.hpp"

//  Macros for repetitive code.

//  PROXY_CLEANUP() deletes every poller that zmq::proxy () allocates.
//  The macro expands in place and refers to the poller pointers by name, so
//  it must not be used before these variables are initialized. Pointers that
//  are still NULL are fine: deleting a null pointer has no effect.
#define PROXY_CLEANUP()                                                        \
    do {                                                                       \
        delete poller_all;                                                     \
        delete poller_in;                                                      \
        delete poller_control;                                                 \
        delete poller_receive_blocked;                                         \
        delete poller_send_blocked;                                            \
        delete poller_both_blocked;                                            \
        delete poller_frontend_only;                                           \
        delete poller_backend_only;                                            \
    } while (false)

//  CHECK_RC_EXIT_ON_FAILURE() inspects the local 'rc'. When it is negative
//  the pollers are released via PROXY_CLEANUP (), the local 'msg' is closed
//  and the enclosing function returns -1. It can therefore only be used
//  where 'rc', 'msg' and all poller pointers are in scope and initialized.
#define CHECK_RC_EXIT_ON_FAILURE()                                             \
    do {                                                                       \
        if (rc < 0) {                                                          \
            PROXY_CLEANUP ();                                                  \
            return close_and_return (&msg, -1);                                \
        }                                                                      \
    } while (false)


#endif //  ZMQ_HAVE_POLLER


//  Per-socket traffic counters, sent over the control socket in reply to
//  the STATISTICS command

//  Traffic totals kept for one side of the proxy (frontend or backend).
//  A multipart message is counted once; the byte counters accumulate the
//  sizes of all of its frames.
struct zmq_socket_stats_t
{
    uint64_t msg_in;    //  messages received from this socket
    uint64_t bytes_in;  //  payload bytes received from this socket
    uint64_t msg_out;   //  messages sent to this socket
    uint64_t bytes_out; //  payload bytes sent to this socket
};


//  Utility functions shared by both zmq::proxy () implementations

//  Sends a copy of 'msg_' to the capture socket, leaving 'msg_' itself
//  available for forwarding.
//
//  capture_  socket that receives the copy; when NULL the call does nothing.
//  msg_      message (frame) to duplicate.
//  more_     non-zero if further frames of the same multipart message
//            follow; the copy is then sent with ZMQ_SNDMORE.
//
//  Returns 0 on success (or when there is no capture socket) and -1 if
//  initializing, copying or sending the duplicate fails.
int capture (class zmq::socket_base_t *capture_,
             zmq::msg_t &msg_,
             int more_ = 0)
{
    //  No capture socket configured, nothing to copy.
    if (!capture_)
        return 0;

    //  NOTE(review): 'ctrl' is not closed on the failure paths below -
    //  confirm whether msg_t requires an explicit close () there.
    zmq::msg_t ctrl;
    int rc = ctrl.init ();
    if (unlikely (rc < 0))
        return -1;
    rc = ctrl.copy (msg_);
    if (unlikely (rc < 0))
        return -1;
    rc = capture_->send (&ctrl, more_ ? ZMQ_SNDMORE : 0);
    if (unlikely (rc < 0))
        return -1;

    return 0;
}

120 121 122 123 124 125
int forward (class zmq::socket_base_t *from_,
             zmq_socket_stats_t *from_stats,
             class zmq::socket_base_t *to_,
             zmq_socket_stats_t *to_stats,
             class zmq::socket_base_t *capture_,
             zmq::msg_t &msg_)
126 127 128
{
    int more;
    size_t moresz;
129
    size_t complete_msg_size = 0;
130
    while (true) {
131 132 133 134
        int rc = from_->recv (&msg_, 0);
        if (unlikely (rc < 0))
            return -1;

135
        complete_msg_size += msg_.size ();
136

137 138 139 140 141 142
        moresz = sizeof more;
        rc = from_->getsockopt (ZMQ_RCVMORE, &more, &moresz);
        if (unlikely (rc < 0))
            return -1;

        //  Copy message to capture socket if any
143
        rc = capture (capture_, msg_, more);
144 145 146
        if (unlikely (rc < 0))
            return -1;

147
        rc = to_->send (&msg_, more ? ZMQ_SNDMORE : 0);
148 149
        if (unlikely (rc < 0))
            return -1;
150

151 152 153
        if (more == 0)
            break;
    }
154 155 156 157 158 159 160 161 162 163

    // A multipart message counts as 1 packet:
    from_stats->msg_in++;
    from_stats->bytes_in += complete_msg_size;
    to_stats->msg_out++;
    to_stats->bytes_out += complete_msg_size;

    return 0;
}

164
static int loop_and_send_multipart_stat (zmq::socket_base_t *control_,
165 166 167
                                         uint64_t stat,
                                         bool first,
                                         bool more)
168 169 170 171 172 173
{
    int rc;
    zmq::msg_t msg;

    //  VSM of 8 bytes can't fail to init
    msg.init_size (sizeof (uint64_t));
174
    memcpy (msg.data (), (const void *) &stat, sizeof (uint64_t));
175

176
    //  if the first message is handed to the pipe successfully then the HWM
177 178 179 180 181 182 183 184 185
    //  is not full, which means failures are due to interrupts (on Windows pipes
    //  are TCP sockets), so keep retrying
    do {
        rc = control_->send (&msg, more ? ZMQ_SNDMORE : 0);
    } while (!first && rc != 0 && errno == EAGAIN);

    return rc;
}

186 187 188
int reply_stats (class zmq::socket_base_t *control_,
                 zmq_socket_stats_t *frontend_stats,
                 zmq_socket_stats_t *backend_stats)
189
{
190
    // first part: frontend stats - the first send might fail due to HWM
191 192 193
    if (loop_and_send_multipart_stat (control_, frontend_stats->msg_in, true,
                                      true)
        != 0)
194
        return -1;
195

196 197 198 199 200 201
    loop_and_send_multipart_stat (control_, frontend_stats->bytes_in, false,
                                  true);
    loop_and_send_multipart_stat (control_, frontend_stats->msg_out, false,
                                  true);
    loop_and_send_multipart_stat (control_, frontend_stats->bytes_out, false,
                                  true);
202 203

    // second part: backend stats
204
    loop_and_send_multipart_stat (control_, backend_stats->msg_in, false, true);
205 206 207 208 209 210
    loop_and_send_multipart_stat (control_, backend_stats->bytes_in, false,
                                  true);
    loop_and_send_multipart_stat (control_, backend_stats->msg_out, false,
                                  true);
    loop_and_send_multipart_stat (control_, backend_stats->bytes_out, false,
                                  false);
211

212 213
    return 0;
}
Pieter Hintjens's avatar
Pieter Hintjens committed
214

215

216 217
#ifdef ZMQ_HAVE_POLLER

218 219 220 221
int zmq::proxy (class socket_base_t *frontend_,
                class socket_base_t *backend_,
                class socket_base_t *capture_,
                class socket_base_t *control_)
222 223 224 225 226 227 228 229 230 231 232 233 234
{
    msg_t msg;
    int rc = msg.init ();
    if (rc != 0)
        return -1;

    //  The algorithm below assumes ratio of requests and replies processed
    //  under full load to be 1:1.

    int more;
    size_t moresz = sizeof (more);

    //  Proxy can be in these three states
235 236
    enum
    {
237 238 239 240 241 242 243 244 245 246 247
        active,
        paused,
        terminated
    } state = active;

    bool frontend_equal_to_backend;
    bool frontend_in = false;
    bool frontend_out = false;
    bool backend_in = false;
    bool backend_out = false;
    bool control_in = false;
248
    zmq::socket_poller_t::event_t events[3];
249 250
    zmq_socket_stats_t frontend_stats;
    zmq_socket_stats_t backend_stats;
251 252
    memset (&frontend_stats, 0, sizeof (frontend_stats));
    memset (&backend_stats, 0, sizeof (backend_stats));
253 254 255 256

    //  Don't allocate these pollers from stack because they will take more than 900 kB of stack!
    //  On Windows this blows up default stack of 1 MB and aborts the program.
    //  I wanted to use std::shared_ptr here as the best solution but that requires C++11...
257 258 259 260 261 262 263 264
    zmq::socket_poller_t *poller_all =
      new (std::nothrow) zmq::socket_poller_t; //  Poll for everything.
    zmq::socket_poller_t *poller_in = new (std::nothrow) zmq::
      socket_poller_t; //  Poll only 'ZMQ_POLLIN' on all sockets. Initial blocking poll in loop.
    zmq::socket_poller_t *poller_control = new (std::nothrow) zmq::
      socket_poller_t; //  Poll only for 'ZMQ_POLLIN' on 'control_', when proxy is paused.
    zmq::socket_poller_t *poller_receive_blocked = new (std::nothrow)
      zmq::socket_poller_t; //  All except 'ZMQ_POLLIN' on 'frontend_'.
265 266 267

    //  If frontend_==backend_ 'poller_send_blocked' and 'poller_receive_blocked' are the same, 'ZMQ_POLLIN' is ignored.
    //  In that case 'poller_send_blocked' is not used. We need only 'poller_receive_blocked'.
268
    //  We also don't need 'poller_both_blocked', 'poller_backend_only' nor 'poller_frontend_only' no need to initialize it.
269
    //  We save some RAM and time for initialization.
270 271 272 273 274 275 276 277
    zmq::socket_poller_t *poller_send_blocked =
      NULL; //  All except 'ZMQ_POLLIN' on 'backend_'.
    zmq::socket_poller_t *poller_both_blocked =
      NULL; //  All except 'ZMQ_POLLIN' on both 'frontend_' and 'backend_'.
    zmq::socket_poller_t *poller_frontend_only =
      NULL; //  Only 'ZMQ_POLLIN' and 'ZMQ_POLLOUT' on 'frontend_'.
    zmq::socket_poller_t *poller_backend_only =
      NULL; //  Only 'ZMQ_POLLIN' and 'ZMQ_POLLOUT' on 'backend_'.
278 279

    if (frontend_ != backend_) {
280 281 282 283 284 285 286 287
        poller_send_blocked = new (std::nothrow)
          zmq::socket_poller_t; //  All except 'ZMQ_POLLIN' on 'backend_'.
        poller_both_blocked = new (std::nothrow) zmq::
          socket_poller_t; //  All except 'ZMQ_POLLIN' on both 'frontend_' and 'backend_'.
        poller_frontend_only = new (std::nothrow) zmq::
          socket_poller_t; //  Only 'ZMQ_POLLIN' and 'ZMQ_POLLOUT' on 'frontend_'.
        poller_backend_only = new (std::nothrow) zmq::
          socket_poller_t; //  Only 'ZMQ_POLLIN' and 'ZMQ_POLLOUT' on 'backend_'.
288 289 290 291
        frontend_equal_to_backend = false;
    } else
        frontend_equal_to_backend = true;

292 293 294 295
    if (poller_all == NULL || poller_in == NULL || poller_control == NULL
        || poller_receive_blocked == NULL
        || ((poller_send_blocked == NULL || poller_both_blocked == NULL)
            && !frontend_equal_to_backend)) {
296 297 298 299
        PROXY_CLEANUP ();
        return close_and_return (&msg, -1);
    }

300 301
    zmq::socket_poller_t *poller_wait =
      poller_in; //  Poller for blocking wait, initially all 'ZMQ_POLLIN'.
302

303 304 305
    //  Register 'frontend_' and 'backend_' with pollers.
    rc = poller_all->add (frontend_, NULL,
                          ZMQ_POLLIN | ZMQ_POLLOUT); //  Everything.
306
    CHECK_RC_EXIT_ON_FAILURE ();
307
    rc = poller_in->add (frontend_, NULL, ZMQ_POLLIN); //  All 'ZMQ_POLLIN's.
308 309 310 311 312 313 314 315 316
    CHECK_RC_EXIT_ON_FAILURE ();

    if (frontend_equal_to_backend) {
        //  If frontend_==backend_ 'poller_send_blocked' and 'poller_receive_blocked' are the same,
        //  so we don't need 'poller_send_blocked'. We need only 'poller_receive_blocked'.
        //  We also don't need 'poller_both_blocked', no need to initialize it.
        rc = poller_receive_blocked->add (frontend_, NULL, ZMQ_POLLOUT);
        CHECK_RC_EXIT_ON_FAILURE ();
    } else {
317 318
        rc = poller_all->add (backend_, NULL,
                              ZMQ_POLLIN | ZMQ_POLLOUT); //  Everything.
319
        CHECK_RC_EXIT_ON_FAILURE ();
320
        rc = poller_in->add (backend_, NULL, ZMQ_POLLIN); //  All 'ZMQ_POLLIN's.
321
        CHECK_RC_EXIT_ON_FAILURE ();
322 323
        rc = poller_both_blocked->add (
          frontend_, NULL, ZMQ_POLLOUT); //  Waiting only for 'ZMQ_POLLOUT'.
324
        CHECK_RC_EXIT_ON_FAILURE ();
325 326
        rc = poller_both_blocked->add (
          backend_, NULL, ZMQ_POLLOUT); //  Waiting only for 'ZMQ_POLLOUT'.
327
        CHECK_RC_EXIT_ON_FAILURE ();
328 329 330
        rc = poller_send_blocked->add (
          backend_, NULL,
          ZMQ_POLLOUT); //  All except 'ZMQ_POLLIN' on 'backend_'.
331
        CHECK_RC_EXIT_ON_FAILURE ();
332 333 334
        rc = poller_send_blocked->add (
          frontend_, NULL,
          ZMQ_POLLIN | ZMQ_POLLOUT); //  All except 'ZMQ_POLLIN' on 'backend_'.
335
        CHECK_RC_EXIT_ON_FAILURE ();
336 337 338
        rc = poller_receive_blocked->add (
          frontend_, NULL,
          ZMQ_POLLOUT); //  All except 'ZMQ_POLLIN' on 'frontend_'.
339
        CHECK_RC_EXIT_ON_FAILURE ();
340 341 342
        rc = poller_receive_blocked->add (
          backend_, NULL,
          ZMQ_POLLIN | ZMQ_POLLOUT); //  All except 'ZMQ_POLLIN' on 'frontend_'.
343
        CHECK_RC_EXIT_ON_FAILURE ();
344 345
        rc =
          poller_frontend_only->add (frontend_, NULL, ZMQ_POLLIN | ZMQ_POLLOUT);
346
        CHECK_RC_EXIT_ON_FAILURE ();
347 348
        rc =
          poller_backend_only->add (backend_, NULL, ZMQ_POLLIN | ZMQ_POLLOUT);
349
        CHECK_RC_EXIT_ON_FAILURE ();
350 351 352 353 354 355 356 357
    }

    //  Register 'control_' with pollers.
    if (control_ != NULL) {
        rc = poller_all->add (control_, NULL, ZMQ_POLLIN);
        CHECK_RC_EXIT_ON_FAILURE ();
        rc = poller_in->add (control_, NULL, ZMQ_POLLIN);
        CHECK_RC_EXIT_ON_FAILURE ();
358 359 360
        rc = poller_control->add (
          control_, NULL,
          ZMQ_POLLIN); //  When proxy is paused we wait only for ZMQ_POLLIN on 'control_' socket.
361 362 363 364 365 366 367 368
        CHECK_RC_EXIT_ON_FAILURE ();
        rc = poller_receive_blocked->add (control_, NULL, ZMQ_POLLIN);
        CHECK_RC_EXIT_ON_FAILURE ();
        if (!frontend_equal_to_backend) {
            rc = poller_send_blocked->add (control_, NULL, ZMQ_POLLIN);
            CHECK_RC_EXIT_ON_FAILURE ();
            rc = poller_both_blocked->add (control_, NULL, ZMQ_POLLIN);
            CHECK_RC_EXIT_ON_FAILURE ();
369 370 371 372
            rc = poller_frontend_only->add (control_, NULL, ZMQ_POLLIN);
            CHECK_RC_EXIT_ON_FAILURE ();
            rc = poller_backend_only->add (control_, NULL, ZMQ_POLLIN);
            CHECK_RC_EXIT_ON_FAILURE ();
373 374 375 376 377 378 379 380 381 382 383 384 385
        }
    }


    int i;
    bool request_processed, reply_processed;


    while (state != terminated) {
        //  Blocking wait initially only for 'ZMQ_POLLIN' - 'poller_wait' points to 'poller_in'.
        //  If one of receiving end's queue is full ('ZMQ_POLLOUT' not available),
        //  'poller_wait' is pointed to 'poller_receive_blocked', 'poller_send_blocked' or 'poller_both_blocked'.
        rc = poller_wait->wait (events, 3, -1);
386
        if (rc < 0 && errno == EAGAIN)
387 388 389 390 391
            rc = 0;
        CHECK_RC_EXIT_ON_FAILURE ();

        //  Some of events waited for by 'poller_wait' have arrived, now poll for everything without blocking.
        rc = poller_all->wait (events, 3, 0);
392
        if (rc < 0 && errno == EAGAIN)
393 394 395 396 397
            rc = 0;
        CHECK_RC_EXIT_ON_FAILURE ();

        //  Process events.
        for (i = 0; i < rc; i++) {
398 399 400
            if (events[i].socket == frontend_) {
                frontend_in = (events[i].events & ZMQ_POLLIN) != 0;
                frontend_out = (events[i].events & ZMQ_POLLOUT) != 0;
401
            } else
402 403 404 405 406 407 408
              //  This 'if' needs to be after check for 'frontend_' in order never
              //  to be reached in case frontend_==backend_, so we ensure backend_in=false in that case.
              if (events[i].socket == backend_) {
                backend_in = (events[i].events & ZMQ_POLLIN) != 0;
                backend_out = (events[i].events & ZMQ_POLLOUT) != 0;
            } else if (events[i].socket == control_)
                control_in = (events[i].events & ZMQ_POLLIN) != 0;
409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428
        }


        //  Process a control command if any.
        if (control_in) {
            rc = control_->recv (&msg, 0);
            CHECK_RC_EXIT_ON_FAILURE ();
            rc = control_->getsockopt (ZMQ_RCVMORE, &more, &moresz);
            if (unlikely (rc < 0) || more) {
                PROXY_CLEANUP ();
                return close_and_return (&msg, -1);
            }

            //  Copy message to capture socket if any.
            rc = capture (capture_, msg);
            CHECK_RC_EXIT_ON_FAILURE ();

            if (msg.size () == 5 && memcmp (msg.data (), "PAUSE", 5) == 0) {
                state = paused;
                poller_wait = poller_control;
429 430 431 432 433 434 435 436 437
            } else if (msg.size () == 6
                       && memcmp (msg.data (), "RESUME", 6) == 0) {
                state = active;
                poller_wait = poller_in;
            } else {
                if (msg.size () == 9
                    && memcmp (msg.data (), "TERMINATE", 9) == 0)
                    state = terminated;
                else {
438
#ifdef ZMQ_BUILD_DRAFT_API
439 440 441 442 443 444
                    if (msg.size () == 10
                        && memcmp (msg.data (), "STATISTICS", 10) == 0) {
                        rc = reply_stats (control_, &frontend_stats,
                                          &backend_stats);
                        CHECK_RC_EXIT_ON_FAILURE ();
                    } else {
445
#endif
446 447 448
                        //  This is an API error, we assert
                        puts ("E: invalid command sent to proxy");
                        zmq_assert (false);
449
#ifdef ZMQ_BUILD_DRAFT_API
450
                    }
451
#endif
452
                }
453
            }
454
            control_in = false;
455 456 457 458 459 460
        }

        if (state == active) {
            //  Process a request, 'ZMQ_POLLIN' on 'frontend_' and 'ZMQ_POLLOUT' on 'backend_'.
            //  In case of frontend_==backend_ there's no 'ZMQ_POLLOUT' event.
            if (frontend_in && (backend_out || frontend_equal_to_backend)) {
461 462
                rc = forward (frontend_, &frontend_stats, backend_,
                              &backend_stats, capture_, msg);
463 464 465
                CHECK_RC_EXIT_ON_FAILURE ();
                request_processed = true;
                frontend_in = backend_out = false;
466 467
            } else
                request_processed = false;
468 469

            //  Process a reply, 'ZMQ_POLLIN' on 'backend_' and 'ZMQ_POLLOUT' on 'frontend_'.
470
            //  If 'frontend_' and 'backend_' are the same this is not needed because previous processing
471 472 473
            //  covers all of the cases. 'backend_in' is always false if frontend_==backend_ due to
            //  design in 'for' event processing loop.
            if (backend_in && frontend_out) {
474 475
                rc = forward (backend_, &backend_stats, frontend_,
                              &frontend_stats, capture_, msg);
476 477 478
                CHECK_RC_EXIT_ON_FAILURE ();
                reply_processed = true;
                backend_in = frontend_out = false;
479 480
            } else
                reply_processed = false;
481 482 483 484 485

            if (request_processed || reply_processed) {
                //  If request/reply is processed that means we had at least one 'ZMQ_POLLOUT' event.
                //  Enable corresponding 'ZMQ_POLLIN' for blocking wait if any was disabled.
                if (poller_wait != poller_in) {
486
                    if (request_processed) { //  'frontend_' -> 'backend_'
487 488
                        if (poller_wait == poller_both_blocked)
                            poller_wait = poller_send_blocked;
489 490 491
                        else if (poller_wait == poller_receive_blocked
                                 || poller_wait == poller_frontend_only)
                            poller_wait = poller_in;
492
                    }
493
                    if (reply_processed) { //  'backend_' -> 'frontend_'
494 495
                        if (poller_wait == poller_both_blocked)
                            poller_wait = poller_receive_blocked;
496 497 498
                        else if (poller_wait == poller_send_blocked
                                 || poller_wait == poller_backend_only)
                            poller_wait = poller_in;
499 500 501
                    }
                }
            } else {
502 503 504 505
                //  No requests have been processed, there were no 'ZMQ_POLLIN' with corresponding 'ZMQ_POLLOUT' events.
                //  That means that out queue(s) is/are full or one out queue is full and second one has no messages to process.
                //  Disable receiving 'ZMQ_POLLIN' for sockets for which there's no 'ZMQ_POLLOUT',
                //  or wait only on both 'backend_''s or 'frontend_''s 'ZMQ_POLLIN' and 'ZMQ_POLLOUT'.
506
                if (frontend_in) {
507 508 509 510 511 512 513 514
                    if (frontend_out)
                        // If frontend_in and frontend_out are true, obviously backend_in and backend_out are both false.
                        // In that case we need to wait for both 'ZMQ_POLLIN' and 'ZMQ_POLLOUT' only on 'backend_'.
                        // We'll never get here in case of frontend_==backend_ because then frontend_out will always be false.
                        poller_wait = poller_backend_only;
                    else {
                        if (poller_wait == poller_send_blocked)
                            poller_wait = poller_both_blocked;
515 516
                        else if (poller_wait == poller_in)
                            poller_wait = poller_receive_blocked;
517
                    }
518 519 520 521
                }
                if (backend_in) {
                    //  Will never be reached if frontend_==backend_, 'backend_in' will
                    //  always be false due to design in 'for' event processing loop.
522 523 524 525 526 527 528
                    if (backend_out)
                        // If backend_in and backend_out are true, obviously frontend_in and frontend_out are both false.
                        // In that case we need to wait for both 'ZMQ_POLLIN' and 'ZMQ_POLLOUT' only on 'frontend_'.
                        poller_wait = poller_frontend_only;
                    else {
                        if (poller_wait == poller_receive_blocked)
                            poller_wait = poller_both_blocked;
529 530
                        else if (poller_wait == poller_in)
                            poller_wait = poller_send_blocked;
531
                    }
532 533 534 535 536 537 538 539 540 541
                }
            }
        }
    }
    PROXY_CLEANUP ();
    return close_and_return (&msg, 0);
}

#else //  ZMQ_HAVE_POLLER

542 543 544 545
int zmq::proxy (class socket_base_t *frontend_,
                class socket_base_t *backend_,
                class socket_base_t *capture_,
                class socket_base_t *control_)
Pieter Hintjens's avatar
Pieter Hintjens committed
546 547 548
{
    msg_t msg;
    int rc = msg.init ();
Pieter Hintjens's avatar
Pieter Hintjens committed
549
    if (rc != 0)
Pieter Hintjens's avatar
Pieter Hintjens committed
550 551
        return -1;

552
    //  The algorithm below assumes ratio of requests and replies processed
Pieter Hintjens's avatar
Pieter Hintjens committed
553
    //  under full load to be 1:1.
554

Pieter Hintjens's avatar
Pieter Hintjens committed
555
    int more;
Pieter Hintjens's avatar
Pieter Hintjens committed
556
    size_t moresz;
557 558 559
    zmq_pollitem_t items[] = {{frontend_, 0, ZMQ_POLLIN, 0},
                              {backend_, 0, ZMQ_POLLIN, 0},
                              {control_, 0, ZMQ_POLLIN, 0}};
560
    int qt_poll_items = (control_ ? 3 : 2);
561 562
    zmq_pollitem_t itemsout[] = {{frontend_, 0, ZMQ_POLLOUT, 0},
                                 {backend_, 0, ZMQ_POLLOUT, 0}};
Pieter Hintjens's avatar
Pieter Hintjens committed
563

564
    zmq_socket_stats_t frontend_stats;
565
    memset (&frontend_stats, 0, sizeof (frontend_stats));
566
    zmq_socket_stats_t backend_stats;
567
    memset (&backend_stats, 0, sizeof (backend_stats));
568

Pieter Hintjens's avatar
Pieter Hintjens committed
569
    //  Proxy can be in these three states
570 571
    enum
    {
Pieter Hintjens's avatar
Pieter Hintjens committed
572 573 574 575 576 577
        active,
        paused,
        terminated
    } state = active;

    while (state != terminated) {
Pieter Hintjens's avatar
Pieter Hintjens committed
578
        //  Wait while there are either requests or replies to process.
579
        rc = zmq_poll (&items[0], qt_poll_items, -1);
Pieter Hintjens's avatar
Pieter Hintjens committed
580
        if (unlikely (rc < 0))
581
            return close_and_return (&msg, -1);
Pieter Hintjens's avatar
Pieter Hintjens committed
582

583
        //  Get the pollout separately because when combining this with pollin it maxes the CPU
584 585 586
        //  because pollout shall most of the time return directly.
        //  POLLOUT is only checked when frontend and backend sockets are not the same.
        if (frontend_ != backend_) {
587
            rc = zmq_poll (&itemsout[0], 2, 0);
588
            if (unlikely (rc < 0)) {
589
                return close_and_return (&msg, -1);
590 591
            }
        }
592

593
        //  Process a control command if any
594
        if (control_ && items[2].revents & ZMQ_POLLIN) {
Pieter Hintjens's avatar
Pieter Hintjens committed
595 596
            rc = control_->recv (&msg, 0);
            if (unlikely (rc < 0))
597
                return close_and_return (&msg, -1);
Pieter Hintjens's avatar
Pieter Hintjens committed
598 599 600 601

            moresz = sizeof more;
            rc = control_->getsockopt (ZMQ_RCVMORE, &more, &moresz);
            if (unlikely (rc < 0) || more)
602
                return close_and_return (&msg, -1);
Pieter Hintjens's avatar
Pieter Hintjens committed
603 604

            //  Copy message to capture socket if any
605
            rc = capture (capture_, msg);
606
            if (unlikely (rc < 0))
607
                return close_and_return (&msg, -1);
608

609
            if (msg.size () == 5 && memcmp (msg.data (), "PAUSE", 5) == 0)
Pieter Hintjens's avatar
Pieter Hintjens committed
610
                state = paused;
611 612 613 614 615 616
            else if (msg.size () == 6 && memcmp (msg.data (), "RESUME", 6) == 0)
                state = active;
            else if (msg.size () == 9
                     && memcmp (msg.data (), "TERMINATE", 9) == 0)
                state = terminated;
            else {
617
#ifdef ZMQ_BUILD_DRAFT_API
618 619 620 621 622 623 624
                if (msg.size () == 10
                    && memcmp (msg.data (), "STATISTICS", 10) == 0) {
                    rc =
                      reply_stats (control_, &frontend_stats, &backend_stats);
                    if (unlikely (rc < 0))
                        return close_and_return (&msg, -1);
                } else {
625
#endif
626 627 628
                    //  This is an API error, we assert
                    puts ("E: invalid command sent to proxy");
                    zmq_assert (false);
629
#ifdef ZMQ_BUILD_DRAFT_API
630
                }
631
#endif
632
            }
633
        }
634
        //  Process a request
635 636 637 638
        if (state == active && items[0].revents & ZMQ_POLLIN
            && (frontend_ == backend_ || itemsout[1].revents & ZMQ_POLLOUT)) {
            rc = forward (frontend_, &frontend_stats, backend_, &backend_stats,
                          capture_, msg);
639
            if (unlikely (rc < 0))
640
                return close_and_return (&msg, -1);
641 642
        }
        //  Process a reply
643 644 645 646 647
        if (state == active && frontend_ != backend_
            && items[1].revents & ZMQ_POLLIN
            && itemsout[0].revents & ZMQ_POLLOUT) {
            rc = forward (backend_, &backend_stats, frontend_, &frontend_stats,
                          capture_, msg);
648
            if (unlikely (rc < 0))
649
                return close_and_return (&msg, -1);
Pieter Hintjens's avatar
Pieter Hintjens committed
650 651
        }
    }
652 653

    return close_and_return (&msg, 0);
Pieter Hintjens's avatar
Pieter Hintjens committed
654
}
655 656

#endif //  ZMQ_HAVE_POLLER