test_proxy.cpp 8.81 KB
Newer Older
1
/*
2
    Copyright (c) 2007-2014 Contributors as noted in the AUTHORS file
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39

    This file is part of 0MQ.

    0MQ is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    0MQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

#include "testutil.hpp"
#include "../include/zmq_utils.h"

// Asynchronous client-to-server (DEALER to ROUTER) - pure libzmq
//
// While this example runs in a single process, that is to make
// it easier to start and stop the example. Each task may have its own
// context and conceptually acts as a separate process. To have this
// behaviour, it is necessary to replace the inproc transport of the
// control socket by a tcp transport.

// This is our client task
// It connects to the server, and then sends a request once per second
// It collects responses as they arrive, and it prints them out. We will
// run several client tasks in parallel, each with a different random ID.

#define CONTENT_SIZE 13
#define CONTENT_SIZE_MAX 32
#define ID_SIZE 10
#define ID_SIZE_MAX 32
40 41
#define QT_WORKERS    5
#define QT_CLIENTS    3
42 43 44 45 46 47 48 49
#define is_verbose 0

static void
client_task (void *ctx)
{
    void *client = zmq_socket (ctx, ZMQ_DEALER);
    assert (client);

50
    // Control socket receives terminate command from main over inproc
51 52 53 54 55 56 57 58
    void *control = zmq_socket (ctx, ZMQ_SUB);
    assert (control);
    int rc = zmq_setsockopt (control, ZMQ_SUBSCRIBE, "", 0);
    assert (rc == 0);
    rc = zmq_connect (control, "inproc://control");
    assert (rc == 0);

    char content [CONTENT_SIZE_MAX];
59 60 61
    // Set random identity to make tracing easier
    char identity [ID_SIZE];
    sprintf (identity, "%04X-%04X", rand() % 0xFFFF, rand() % 0xFFFF);
62 63
    rc = zmq_setsockopt (client, ZMQ_IDENTITY, identity, ID_SIZE); // includes '\0' as an helper for printf
    assert (rc == 0);
64
    rc = zmq_connect (client, "tcp://127.0.0.1:5563");
65 66
    assert (rc == 0);

67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88
    zmq_pollitem_t items [] = { { client, 0, ZMQ_POLLIN, 0 }, { control, 0, ZMQ_POLLIN, 0 } };
    int request_nbr = 0;
    bool run = true;
    while (run) {
        // Tick once per 200 ms, pulling in arriving messages
        int centitick;
        for (centitick = 0; centitick < 20; centitick++) {
            zmq_poll (items, 2, 10);
            if (items [0].revents & ZMQ_POLLIN) {
                int rcvmore;
                size_t sz = sizeof (rcvmore);
                rc = zmq_recv (client, content, CONTENT_SIZE_MAX, 0);
                assert (rc == CONTENT_SIZE);
                if (is_verbose) printf("client receive - identity = %s    content = %s\n", identity, content);
                //  Check that message is still the same
                assert (memcmp (content, "request #", 9) == 0);
                rc = zmq_getsockopt (client, ZMQ_RCVMORE, &rcvmore, &sz);
                assert (rc == 0);
                assert (!rcvmore);
            }
            if (items [1].revents & ZMQ_POLLIN) {
                rc = zmq_recv (control, content, CONTENT_SIZE_MAX, 0);
89 90 91 92
                if (is_verbose) printf("client receive - identity = %s    command = %s\n", identity, content);
                if (memcmp (content, "TERMINATE", 9) == 0) {
                    run = false;
                    break;
93 94 95 96 97 98 99
                }
            }
        }
        sprintf(content, "request #%03d", ++request_nbr); // CONTENT_SIZE
        rc = zmq_send (client, content, CONTENT_SIZE, 0);
        assert (rc == CONTENT_SIZE);
    }
100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115

    rc = zmq_close (client);
    assert (rc == 0);
    rc = zmq_close (control);
    assert (rc == 0);
}

// This is our server task.
// It uses the multithreaded server model to deal requests out to a pool
// of workers and route replies back to clients. One worker can handle
// one request at a time but one client can talk to multiple workers at
// once.

static void server_worker (void *ctx);

void
116
server_task (void *ctx)
117 118 119 120
{
    // Frontend socket talks to clients over TCP
    void *frontend = zmq_socket (ctx, ZMQ_ROUTER);
    assert (frontend);
121
    int rc = zmq_bind (frontend, "tcp://127.0.0.1:5563");
122 123
    assert (rc == 0);

124
    // Backend socket talks to workers over inproc
125 126 127 128 129
    void *backend = zmq_socket (ctx, ZMQ_DEALER);
    assert (backend);
    rc = zmq_bind (backend, "inproc://backend");
    assert (rc == 0);

130
    // Control socket receives terminate command from main over inproc
131 132 133 134 135 136 137
    void *control = zmq_socket (ctx, ZMQ_SUB);
    assert (control);
    rc = zmq_setsockopt (control, ZMQ_SUBSCRIBE, "", 0);
    assert (rc == 0);
    rc = zmq_connect (control, "inproc://control");
    assert (rc == 0);

138 139 140 141 142
    // Launch pool of worker threads, precise number is not critical
    int thread_nbr;
    void* threads [5];
    for (thread_nbr = 0; thread_nbr < QT_WORKERS; thread_nbr++)
        threads[thread_nbr] = zmq_threadstart (&server_worker, ctx);
143

144
    // Connect backend to frontend via a proxy
145
    zmq_proxy_steerable (frontend, backend, NULL, control);
146

147 148
    for (thread_nbr = 0; thread_nbr < QT_WORKERS; thread_nbr++)
        zmq_threadclose (threads[thread_nbr]);
149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169

    rc = zmq_close (frontend);
    assert (rc == 0);
    rc = zmq_close (backend);
    assert (rc == 0);
    rc = zmq_close (control);
    assert (rc == 0);
}

// Each worker task works on one request at a time and sends a random number
// of replies back, with random delays between replies:
// The comments in the first column, if suppressed, makes it a poller version

static void
server_worker (void *ctx)
{
    void *worker = zmq_socket (ctx, ZMQ_DEALER);
    assert (worker);
    int rc = zmq_connect (worker, "inproc://backend");
    assert (rc == 0);

170
    // Control socket receives terminate command from main over inproc
171 172 173 174 175 176 177
    void *control = zmq_socket (ctx, ZMQ_SUB);
    assert (control);
    rc = zmq_setsockopt (control, ZMQ_SUBSCRIBE, "", 0);
    assert (rc == 0);
    rc = zmq_connect (control, "inproc://control");
    assert (rc == 0);

178 179 180 181 182 183 184
    char content [CONTENT_SIZE_MAX]; //    bigger than what we need to check that
    char identity [ID_SIZE_MAX];      // the size received is the size sent

    bool run = true;
    while (run) {
        rc = zmq_recv (control, content, CONTENT_SIZE_MAX, ZMQ_DONTWAIT); // usually, rc == -1 (no message)
        if (rc > 0) {
185
            if (is_verbose)
186
                printf("server_worker receives command = %s\n", content);
187
            if (memcmp (content, "TERMINATE", 9) == 0)
188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211
                run = false;
        }
        // The DEALER socket gives us the reply envelope and message
        // if we don't poll, we have to use ZMQ_DONTWAIT, if we poll, we can block-receive with 0
        rc = zmq_recv (worker, identity, ID_SIZE_MAX, ZMQ_DONTWAIT);
        if (rc == ID_SIZE) {
            rc = zmq_recv (worker, content, CONTENT_SIZE_MAX, 0);
            assert (rc == CONTENT_SIZE);
            if (is_verbose)
                printf ("server receive - identity = %s    content = %s\n", identity, content);

            // Send 0..4 replies back
            int reply, replies = rand() % 5;
            for (reply = 0; reply < replies; reply++) {
                // Sleep for some fraction of a second
                msleep (rand () % 10 + 1);
                //  Send message from server to client
                rc = zmq_send (worker, identity, ID_SIZE, ZMQ_SNDMORE);
                assert (rc == ID_SIZE);
                rc = zmq_send (worker, content, CONTENT_SIZE, 0);
                assert (rc == CONTENT_SIZE);
            }
        }
    }
212 213 214 215 216 217 218 219 220
    rc = zmq_close (worker);
    assert (rc == 0);
    rc = zmq_close (control);
    assert (rc == 0);
}

// The main thread simply starts several clients and a server, and then
// waits for the server to finish.

221
int main (void)
222 223 224 225 226
{
    setup_test_environment ();

    void *ctx = zmq_ctx_new ();
    assert (ctx);
227
    // Control socket receives terminate command from main over inproc
228 229 230 231 232
    void *control = zmq_socket (ctx, ZMQ_PUB);
    assert (control);
    int rc = zmq_bind (control, "inproc://control");
    assert (rc == 0);

233 234 235
    void *threads [QT_CLIENTS + 1];
    for (int i = 0; i < QT_CLIENTS; i++)
        threads[i] = zmq_threadstart  (&client_task, ctx);
236 237
    threads[QT_CLIENTS] = zmq_threadstart  (&server_task, ctx);
    msleep (500); // Run for 500 ms then quit
238

239
    rc = zmq_send (control, "TERMINATE", 9, 0);
240
    assert (rc == 9);
241 242 243 244

    rc = zmq_close (control);
    assert (rc == 0);

245 246 247
    for (int i = 0; i < QT_CLIENTS + 1; i++)
        zmq_threadclose (threads[i]);

248 249
    rc = zmq_ctx_term (ctx);
    assert (rc == 0);
250
    return 0;
251
}