proxy.cpp 6.6 KB
Newer Older
Pieter Hintjens's avatar
Pieter Hintjens committed
1
/*
2
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
Pieter Hintjens's avatar
Pieter Hintjens committed
3

4
    This file is part of libzmq, the ZeroMQ core engine in C++.
Pieter Hintjens's avatar
Pieter Hintjens committed
5

6 7 8
    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
Pieter Hintjens's avatar
Pieter Hintjens committed
9 10
    (at your option) any later version.

11 12 13 14 15 16 17 18 19 20 21 22 23 24
    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.
Pieter Hintjens's avatar
Pieter Hintjens committed
25 26 27 28 29

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30 31 32 33 34 35 36 37 38 39
//  On AIX platform, poll.h has to be included first to get consistent
//  definition of pollfd structure (AIX uses 'reqevents' and 'retnevents'
//  instead of 'events' and 'revents' and defines macros to map from POSIX-y
//  names to AIX-specific names).
//  zmq.h must be included *after* poll.h for AIX to build properly.
//  precompiled.hpp includes include/zmq.h
#if defined ZMQ_POLL_BASED_ON_POLL && defined ZMQ_HAVE_AIX
#include <poll.h>
#endif

40
#include "precompiled.hpp"
Pieter Hintjens's avatar
Pieter Hintjens committed
41
#include <errno.h>
#include <stddef.h>
42
#include "poller.hpp"
43
#include "proxy.hpp"
Pieter Hintjens's avatar
Pieter Hintjens committed
44 45
#include "likely.hpp"

46
#if defined ZMQ_POLL_BASED_ON_POLL && !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_AIX
Pieter Hintjens's avatar
Pieter Hintjens committed
47 48 49
#include <poll.h>
#endif

50 51 52 53 54
// These headers end up pulling in zmq.h somewhere in their include
// dependency chain
#include "socket_base.hpp"
#include "err.hpp"

55
int capture(
56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
        class zmq::socket_base_t *capture_,
        zmq::msg_t& msg_,
        int more_ = 0)
{
    //  Copy message to capture socket if any
    if (capture_) {
        zmq::msg_t ctrl;
        int rc = ctrl.init ();
        if (unlikely (rc < 0))
            return -1;
        rc = ctrl.copy (msg_);
        if (unlikely (rc < 0))
            return -1;
        rc = capture_->send (&ctrl, more_? ZMQ_SNDMORE: 0);
        if (unlikely (rc < 0))
            return -1;
    }
    return 0;
}

76
int forward(
77 78 79
        class zmq::socket_base_t *from_,
        class zmq::socket_base_t *to_,
        class zmq::socket_base_t *capture_,
80
        zmq::msg_t& msg_)
81 82 83
{
    int more;
    size_t moresz;
84
    while (true) {
85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106
        int rc = from_->recv (&msg_, 0);
        if (unlikely (rc < 0))
            return -1;

        moresz = sizeof more;
        rc = from_->getsockopt (ZMQ_RCVMORE, &more, &moresz);
        if (unlikely (rc < 0))
            return -1;

        //  Copy message to capture socket if any
        rc = capture(capture_, msg_, more);
        if (unlikely (rc < 0))
            return -1;

        rc = to_->send (&msg_, more? ZMQ_SNDMORE: 0);
        if (unlikely (rc < 0))
            return -1;
        if (more == 0)
            break;
    }
    return 0;
}
Pieter Hintjens's avatar
Pieter Hintjens committed
107

108
int zmq::proxy (
109 110 111
    class socket_base_t *frontend_,
    class socket_base_t *backend_,
    class socket_base_t *capture_,
112
    class socket_base_t *control_)
Pieter Hintjens's avatar
Pieter Hintjens committed
113 114 115
{
    msg_t msg;
    int rc = msg.init ();
Pieter Hintjens's avatar
Pieter Hintjens committed
116
    if (rc != 0)
Pieter Hintjens's avatar
Pieter Hintjens committed
117 118
        return -1;

119
    //  The algorithm below assumes ratio of requests and replies processed
Pieter Hintjens's avatar
Pieter Hintjens committed
120
    //  under full load to be 1:1.
121

Pieter Hintjens's avatar
Pieter Hintjens committed
122
    int more;
Pieter Hintjens's avatar
Pieter Hintjens committed
123
    size_t moresz;
124
    zmq_pollitem_t items [] = {
125 126
        { frontend_, 0, ZMQ_POLLIN, 0 },
        { backend_, 0, ZMQ_POLLIN, 0 },
127 128 129
        { control_, 0, ZMQ_POLLIN, 0 }
    };
    int qt_poll_items = (control_ ? 3 : 2);
130 131 132 133
    zmq_pollitem_t itemsout [] = {
        { frontend_, 0, ZMQ_POLLOUT, 0 },
        { backend_, 0, ZMQ_POLLOUT, 0 }
    };
Pieter Hintjens's avatar
Pieter Hintjens committed
134 135 136 137 138 139 140 141 142

    //  Proxy can be in these three states
    enum {
        active,
        paused,
        terminated
    } state = active;

    while (state != terminated) {
Pieter Hintjens's avatar
Pieter Hintjens committed
143
        //  Wait while there are either requests or replies to process.
144
        rc = zmq_poll (&items [0], qt_poll_items, -1);
Pieter Hintjens's avatar
Pieter Hintjens committed
145
        if (unlikely (rc < 0))
Pieter Hintjens's avatar
Pieter Hintjens committed
146 147
            return -1;

148
        //  Get the pollout separately because when combining this with pollin it maxes the CPU
149 150 151 152 153 154 155 156
        //  because pollout shall most of the time return directly.
        //  POLLOUT is only checked when frontend and backend sockets are not the same.
        if (frontend_ != backend_) {
            rc = zmq_poll (&itemsout [0], 2, 0);
            if (unlikely (rc < 0)) {
                return -1;
            }
        }
157

158
        //  Process a control command if any
159
        if (control_ && items [2].revents & ZMQ_POLLIN) {
Pieter Hintjens's avatar
Pieter Hintjens committed
160 161 162 163 164 165 166 167 168 169
            rc = control_->recv (&msg, 0);
            if (unlikely (rc < 0))
                return -1;

            moresz = sizeof more;
            rc = control_->getsockopt (ZMQ_RCVMORE, &more, &moresz);
            if (unlikely (rc < 0) || more)
                return -1;

            //  Copy message to capture socket if any
170 171 172 173
            rc = capture(capture_, msg);
            if (unlikely (rc < 0))
                return -1;

174
            if (msg.size () == 5 && memcmp (msg.data (), "PAUSE", 5) == 0)
Pieter Hintjens's avatar
Pieter Hintjens committed
175 176
                state = paused;
            else
177
            if (msg.size () == 6 && memcmp (msg.data (), "RESUME", 6) == 0)
Pieter Hintjens's avatar
Pieter Hintjens committed
178 179
                state = active;
            else
180
            if (msg.size () == 9 && memcmp (msg.data (), "TERMINATE", 9) == 0)
Pieter Hintjens's avatar
Pieter Hintjens committed
181 182 183 184
                state = terminated;
            else {
                //  This is an API error, we should assert
                puts ("E: invalid command sent to proxy");
185
                zmq_assert (false);
Pieter Hintjens's avatar
Pieter Hintjens committed
186
            }
187
        }
188 189
        //  Process a request
        if (state == active
190
        &&  items [0].revents & ZMQ_POLLIN
191
        &&  (frontend_ == backend_ || itemsout [1].revents & ZMQ_POLLOUT)) {
192
            rc = forward(frontend_, backend_, capture_,msg);
193 194 195 196 197
            if (unlikely (rc < 0))
                return -1;
        }
        //  Process a reply
        if (state == active
198
        &&  frontend_ != backend_
199
        &&  items [1].revents & ZMQ_POLLIN
200
        &&  itemsout [0].revents & ZMQ_POLLOUT) {
201
            rc = forward(backend_, frontend_, capture_,msg);
202 203
            if (unlikely (rc < 0))
                return -1;
Pieter Hintjens's avatar
Pieter Hintjens committed
204 205 206 207
        }
    }
    return 0;
}