/*
    Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file

    This file is part of libzmq, the ZeroMQ core engine in C++.

    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

#include <errno.h>
#include <stddef.h>
#include <stdio.h>
#include <string.h>
#include "poller.hpp"
#include "proxy.hpp"
#include "likely.hpp"

//  On AIX platform, poll.h has to be included first to get consistent
//  definition of pollfd structure (AIX uses 'reqevents' and 'retnevents'
//  instead of 'events' and 'revents' and defines macros to map from POSIX-y
//  names to AIX-specific names).
#if defined ZMQ_POLL_BASED_ON_POLL
#include <poll.h>
#endif

// These headers end up pulling in zmq.h somewhere in their include
// dependency chain
#include "socket_base.hpp"
#include "err.hpp"

// zmq.h must be included *after* poll.h for AIX to build properly
#include "../include/zmq.h"

int capture(
52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
        class zmq::socket_base_t *capture_,
        zmq::msg_t& msg_,
        int more_ = 0)
{
    //  Copy message to capture socket if any
    if (capture_) {
        zmq::msg_t ctrl;
        int rc = ctrl.init ();
        if (unlikely (rc < 0))
            return -1;
        rc = ctrl.copy (msg_);
        if (unlikely (rc < 0))
            return -1;
        rc = capture_->send (&ctrl, more_? ZMQ_SNDMORE: 0);
        if (unlikely (rc < 0))
            return -1;
    }
    return 0;
}

72
int forward(
73 74 75
        class zmq::socket_base_t *from_,
        class zmq::socket_base_t *to_,
        class zmq::socket_base_t *capture_,
76
        zmq::msg_t& msg_)
77 78 79
{
    int more;
    size_t moresz;
80
    while (true) {
81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102
        int rc = from_->recv (&msg_, 0);
        if (unlikely (rc < 0))
            return -1;

        moresz = sizeof more;
        rc = from_->getsockopt (ZMQ_RCVMORE, &more, &moresz);
        if (unlikely (rc < 0))
            return -1;

        //  Copy message to capture socket if any
        rc = capture(capture_, msg_, more);
        if (unlikely (rc < 0))
            return -1;

        rc = to_->send (&msg_, more? ZMQ_SNDMORE: 0);
        if (unlikely (rc < 0))
            return -1;
        if (more == 0)
            break;
    }
    return 0;
}
Pieter Hintjens's avatar
Pieter Hintjens committed
103

104
int zmq::proxy (
105 106 107
    class socket_base_t *frontend_,
    class socket_base_t *backend_,
    class socket_base_t *capture_,
108
    class socket_base_t *control_)
Pieter Hintjens's avatar
Pieter Hintjens committed
109 110 111
{
    msg_t msg;
    int rc = msg.init ();
Pieter Hintjens's avatar
Pieter Hintjens committed
112
    if (rc != 0)
Pieter Hintjens's avatar
Pieter Hintjens committed
113 114
        return -1;

115
    //  The algorithm below assumes ratio of requests and replies processed
Pieter Hintjens's avatar
Pieter Hintjens committed
116
    //  under full load to be 1:1.
117

Pieter Hintjens's avatar
Pieter Hintjens committed
118
    int more;
Pieter Hintjens's avatar
Pieter Hintjens committed
119
    size_t moresz;
120
    zmq_pollitem_t items [] = {
121 122
        { frontend_, 0, ZMQ_POLLIN, 0 },
        { backend_, 0, ZMQ_POLLIN, 0 },
123 124 125
        { control_, 0, ZMQ_POLLIN, 0 }
    };
    int qt_poll_items = (control_ ? 3 : 2);
126 127 128 129
    zmq_pollitem_t itemsout [] = {
        { frontend_, 0, ZMQ_POLLOUT, 0 },
        { backend_, 0, ZMQ_POLLOUT, 0 }
    };
Pieter Hintjens's avatar
Pieter Hintjens committed
130 131 132 133 134 135 136 137 138

    //  Proxy can be in these three states
    enum {
        active,
        paused,
        terminated
    } state = active;

    while (state != terminated) {
Pieter Hintjens's avatar
Pieter Hintjens committed
139
        //  Wait while there are either requests or replies to process.
140
        rc = zmq_poll (&items [0], qt_poll_items, -1);
Pieter Hintjens's avatar
Pieter Hintjens committed
141
        if (unlikely (rc < 0))
Pieter Hintjens's avatar
Pieter Hintjens committed
142 143
            return -1;

144
        //  Get the pollout separately because when combining this with pollin it maxes the CPU
145 146 147 148 149 150 151 152
        //  because pollout shall most of the time return directly.
        //  POLLOUT is only checked when frontend and backend sockets are not the same.
        if (frontend_ != backend_) {
            rc = zmq_poll (&itemsout [0], 2, 0);
            if (unlikely (rc < 0)) {
                return -1;
            }
        }
153

154
        //  Process a control command if any
155
        if (control_ && items [2].revents & ZMQ_POLLIN) {
Pieter Hintjens's avatar
Pieter Hintjens committed
156 157 158 159 160 161 162 163 164 165
            rc = control_->recv (&msg, 0);
            if (unlikely (rc < 0))
                return -1;

            moresz = sizeof more;
            rc = control_->getsockopt (ZMQ_RCVMORE, &more, &moresz);
            if (unlikely (rc < 0) || more)
                return -1;

            //  Copy message to capture socket if any
166 167 168 169
            rc = capture(capture_, msg);
            if (unlikely (rc < 0))
                return -1;

170
            if (msg.size () == 5 && memcmp (msg.data (), "PAUSE", 5) == 0)
Pieter Hintjens's avatar
Pieter Hintjens committed
171 172
                state = paused;
            else
173
            if (msg.size () == 6 && memcmp (msg.data (), "RESUME", 6) == 0)
Pieter Hintjens's avatar
Pieter Hintjens committed
174 175
                state = active;
            else
176
            if (msg.size () == 9 && memcmp (msg.data (), "TERMINATE", 9) == 0)
Pieter Hintjens's avatar
Pieter Hintjens committed
177 178 179 180
                state = terminated;
            else {
                //  This is an API error, we should assert
                puts ("E: invalid command sent to proxy");
181
                zmq_assert (false);
Pieter Hintjens's avatar
Pieter Hintjens committed
182
            }
183
        }
184 185
        //  Process a request
        if (state == active
186
        &&  items [0].revents & ZMQ_POLLIN
187
        &&  (frontend_ == backend_ || itemsout [1].revents & ZMQ_POLLOUT)) {
188
            rc = forward(frontend_, backend_, capture_,msg);
189 190 191 192 193
            if (unlikely (rc < 0))
                return -1;
        }
        //  Process a reply
        if (state == active
194
        &&  frontend_ != backend_
195
        &&  items [1].revents & ZMQ_POLLIN
196
        &&  itemsout [0].revents & ZMQ_POLLOUT) {
197
            rc = forward(backend_, frontend_, capture_,msg);
198 199
            if (unlikely (rc < 0))
                return -1;
Pieter Hintjens's avatar
Pieter Hintjens committed
200 201 202 203
        }
    }
    return 0;
}