/*
    Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file

    This file is part of libzmq, the ZeroMQ core engine in C++.

    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

#include "testutil.hpp"

// Asynchronous client-to-server (DEALER to ROUTER) - pure libzmq
//
// While this example runs in a single process, that is to make
// it easier to start and stop the example. Each task may have its own
// context and conceptually acts as a separate process. To have this
// behaviour, it is necessary to replace the inproc transport of the
// control socket by a tcp transport.

// This is our client task
// It connects to the server, and then sends one request per tick (a tick is
// 20 polls of 10 ms, i.e. roughly 200 ms). It collects responses as they
// arrive and, in verbose mode, prints them out. We run several client tasks
// in parallel, each with a different random ID.

//  Size in bytes of every request/reply payload: the text "request #NNN"
//  (12 characters) plus its terminating '\0'.
#define CONTENT_SIZE 13
//  Capacity of the receive buffers; bigger than CONTENT_SIZE so the tests can
//  check that the size received is the size sent.
#define CONTENT_SIZE_MAX 32
//  Size in bytes of a client routing id: "XXXX-XXXX" (9 characters) plus its
//  terminating '\0', which is sent along as a helper for printf.
#define ROUTING_ID_SIZE 10
//  Capacity of the buffer a worker receives routing ids into.
#define ROUTING_ID_SIZE_MAX 32
//  Number of worker threads started by the server task.
#define QT_WORKERS 5
//  Number of client threads started by main.
#define QT_CLIENTS 3
//  Set to 1 to trace every message sent and received on stdout.
#define is_verbose 0

//  Arguments handed to each client thread.
struct thread_data
{
    void *ctx; //  context in which the client creates its sockets
    int id;    //  client index; selects the "inproc://endpoint<id>" socket
};

//  Traffic counters for one side of the proxy. The fields are filled in the
//  order the proxy's STATISTICS reply is read by check_proxy_stats.
typedef struct
{
    uint64_t msg_in;    //  messages received
    uint64_t bytes_in;  //  bytes received
    uint64_t msg_out;   //  messages sent
    uint64_t bytes_out; //  bytes sent
} zmq_socket_stats_t;

//  Complete statistics snapshot of a proxy: one set of traffic counters for
//  its frontend socket and one for its backend socket.
typedef struct
{
    zmq_socket_stats_t frontend, backend;
} zmq_proxy_stats_t;

//  Atomic counter handle, bumped by a worker right before each reply it sends.
void *g_workers_pkts_out = NULL;
//  Atomic counter handle, bumped by a client right before each request it
//  sends. Both handles are created in main and compared against the proxy's
//  own statistics.
void *g_clients_pkts_out = NULL;


77
static void client_task (void *db_)
78
{
79
    struct thread_data *databag = (struct thread_data *) db_;
80 81 82 83 84 85
    // Endpoint socket gets random port to avoid test failing when port in use
    void *endpoint = zmq_socket (databag->ctx, ZMQ_PAIR);
    assert (endpoint);
    int linger = 0;
    int rc = zmq_setsockopt (endpoint, ZMQ_LINGER, &linger, sizeof (linger));
    assert (rc == 0);
86
    char endpoint_source[256];
87 88 89 90 91 92 93
    sprintf (endpoint_source, "inproc://endpoint%d", databag->id);
    rc = zmq_connect (endpoint, endpoint_source);
    assert (rc == 0);
    char *my_endpoint = s_recv (endpoint);
    assert (my_endpoint);

    void *client = zmq_socket (databag->ctx, ZMQ_DEALER);
94 95
    assert (client);

96
    // Control socket receives terminate command from main over inproc
97
    void *control = zmq_socket (databag->ctx, ZMQ_SUB);
98
    assert (control);
99
    rc = zmq_setsockopt (control, ZMQ_SUBSCRIBE, "", 0);
100
    assert (rc == 0);
101 102
    rc = zmq_setsockopt (control, ZMQ_LINGER, &linger, sizeof (linger));
    assert (rc == 0);
103 104 105
    rc = zmq_connect (control, "inproc://control");
    assert (rc == 0);

106
    char content[CONTENT_SIZE_MAX];
107
    // Set random routing id to make tracing easier
108 109 110 111 112
    char routing_id[ROUTING_ID_SIZE];
    sprintf (routing_id, "%04X-%04X", rand () % 0xFFFF, rand () % 0xFFFF);
    rc =
      zmq_setsockopt (client, ZMQ_ROUTING_ID, routing_id,
                      ROUTING_ID_SIZE); // includes '\0' as an helper for printf
113
    assert (rc == 0);
114 115 116
    linger = 0;
    rc = zmq_setsockopt (client, ZMQ_LINGER, &linger, sizeof (linger));
    assert (rc == 0);
117
    rc = zmq_connect (client, my_endpoint);
118 119
    assert (rc == 0);

120 121
    zmq_pollitem_t items[] = {{client, 0, ZMQ_POLLIN, 0},
                              {control, 0, ZMQ_POLLIN, 0}};
122 123
    int request_nbr = 0;
    bool run = true;
124
    bool keep_sending = true;
125 126 127 128 129
    while (run) {
        // Tick once per 200 ms, pulling in arriving messages
        int centitick;
        for (centitick = 0; centitick < 20; centitick++) {
            zmq_poll (items, 2, 10);
130
            if (items[0].revents & ZMQ_POLLIN) {
131 132 133 134
                int rcvmore;
                size_t sz = sizeof (rcvmore);
                rc = zmq_recv (client, content, CONTENT_SIZE_MAX, 0);
                assert (rc == CONTENT_SIZE);
135 136 137 138
                if (is_verbose)
                    printf (
                      "client receive - routing_id = %s    content = %s\n",
                      routing_id, content);
139 140 141 142 143 144
                //  Check that message is still the same
                assert (memcmp (content, "request #", 9) == 0);
                rc = zmq_getsockopt (client, ZMQ_RCVMORE, &rcvmore, &sz);
                assert (rc == 0);
                assert (!rcvmore);
            }
145
            if (items[1].revents & ZMQ_POLLIN) {
146
                rc = zmq_recv (control, content, CONTENT_SIZE_MAX, 0);
147

148 149 150 151 152 153
                if (rc > 0) {
                    content[rc] = 0; // NULL-terminate the command string
                    if (is_verbose)
                        printf (
                          "client receive - routing_id = %s    command = %s\n",
                          routing_id, content);
154 155 156 157 158 159 160 161
                    if (memcmp (content, "TERMINATE", 9) == 0) {
                        run = false;
                        break;
                    }
                    if (memcmp (content, "STOP", 4) == 0) {
                        keep_sending = false;
                        break;
                    }
162 163 164
                }
            }
        }
165

166 167 168 169 170 171
        if (keep_sending) {
            sprintf (content, "request #%03d", ++request_nbr); // CONTENT_SIZE
            if (is_verbose)
                printf ("client send - routing_id = %s    request #%03d\n",
                        routing_id, request_nbr);
            zmq_atomic_counter_inc (g_clients_pkts_out);
172 173 174 175

            rc = zmq_send (client, content, CONTENT_SIZE, 0);
            assert (rc == CONTENT_SIZE);
        }
176
    }
177 178 179 180 181

    rc = zmq_close (client);
    assert (rc == 0);
    rc = zmq_close (control);
    assert (rc == 0);
182 183 184
    rc = zmq_close (endpoint);
    assert (rc == 0);
    free (my_endpoint);
185 186 187 188 189 190 191 192
}

// This is our server task.
// It uses the multithreaded server model to deal requests out to a pool
// of workers and route replies back to clients. One worker can handle
// one request at a time but one client can talk to multiple workers at
// once.

193
static void server_worker (void *ctx_);
194

195
void server_task (void *ctx_)
196 197
{
    // Frontend socket talks to clients over TCP
198 199
    size_t len = MAX_SOCKET_STRING;
    char my_endpoint[MAX_SOCKET_STRING];
200
    void *frontend = zmq_socket (ctx_, ZMQ_ROUTER);
201
    assert (frontend);
202 203 204
    int linger = 0;
    int rc = zmq_setsockopt (frontend, ZMQ_LINGER, &linger, sizeof (linger));
    assert (rc == 0);
205 206 207
    rc = zmq_bind (frontend, "tcp://127.0.0.1:*");
    assert (rc == 0);
    rc = zmq_getsockopt (frontend, ZMQ_LAST_ENDPOINT, my_endpoint, &len);
208 209
    assert (rc == 0);

210
    // Backend socket talks to workers over inproc
211
    void *backend = zmq_socket (ctx_, ZMQ_DEALER);
212
    assert (backend);
213 214
    rc = zmq_setsockopt (backend, ZMQ_LINGER, &linger, sizeof (linger));
    assert (rc == 0);
215 216 217
    rc = zmq_bind (backend, "inproc://backend");
    assert (rc == 0);

218
    // Control socket receives terminate command from main over inproc
219
    void *control = zmq_socket (ctx_, ZMQ_REP);
220
    assert (control);
221 222
    rc = zmq_setsockopt (control, ZMQ_LINGER, &linger, sizeof (linger));
    assert (rc == 0);
223
    rc = zmq_connect (control, "inproc://control_proxy");
224 225
    assert (rc == 0);

226 227
    // Launch pool of worker threads, precise number is not critical
    int thread_nbr;
228
    void *threads[5];
229
    for (thread_nbr = 0; thread_nbr < QT_WORKERS; thread_nbr++)
230
        threads[thread_nbr] = zmq_threadstart (&server_worker, ctx_);
231

232
    // Endpoint socket sends random port to avoid test failing when port in use
233 234
    void *endpoint_receivers[QT_CLIENTS];
    char endpoint_source[256];
235
    for (int i = 0; i < QT_CLIENTS; ++i) {
236
        endpoint_receivers[i] = zmq_socket (ctx_, ZMQ_PAIR);
237 238 239
        assert (endpoint_receivers[i]);
        rc = zmq_setsockopt (endpoint_receivers[i], ZMQ_LINGER, &linger,
                             sizeof (linger));
240 241
        assert (rc == 0);
        sprintf (endpoint_source, "inproc://endpoint%d", i);
242
        rc = zmq_bind (endpoint_receivers[i], endpoint_source);
243 244 245 246
        assert (rc == 0);
    }

    for (int i = 0; i < QT_CLIENTS; ++i) {
247
        rc = s_send (endpoint_receivers[i], my_endpoint);
248 249 250
        assert (rc > 0);
    }

251
    // Connect backend to frontend via a proxy
252 253
    rc = zmq_proxy_steerable (frontend, backend, NULL, control);
    assert (rc == 0);
254

255 256
    for (thread_nbr = 0; thread_nbr < QT_WORKERS; thread_nbr++)
        zmq_threadclose (threads[thread_nbr]);
257 258 259 260 261 262 263

    rc = zmq_close (frontend);
    assert (rc == 0);
    rc = zmq_close (backend);
    assert (rc == 0);
    rc = zmq_close (control);
    assert (rc == 0);
264
    for (int i = 0; i < QT_CLIENTS; ++i) {
265
        rc = zmq_close (endpoint_receivers[i]);
266 267
        assert (rc == 0);
    }
268 269 270 271 272 273
}

// Each worker task works on one request at a time and sends a random number
// of replies back, with random delays between replies:
// The comments in the first column, if suppressed, makes it a poller version

274
static void server_worker (void *ctx_)
275
{
276
    void *worker = zmq_socket (ctx_, ZMQ_DEALER);
277
    assert (worker);
278 279 280 281
    int linger = 0;
    int rc = zmq_setsockopt (worker, ZMQ_LINGER, &linger, sizeof (linger));
    assert (rc == 0);
    rc = zmq_connect (worker, "inproc://backend");
282 283
    assert (rc == 0);

284
    // Control socket receives terminate command from main over inproc
285
    void *control = zmq_socket (ctx_, ZMQ_SUB);
286 287 288
    assert (control);
    rc = zmq_setsockopt (control, ZMQ_SUBSCRIBE, "", 0);
    assert (rc == 0);
289 290
    rc = zmq_setsockopt (control, ZMQ_LINGER, &linger, sizeof (linger));
    assert (rc == 0);
291 292 293
    rc = zmq_connect (control, "inproc://control");
    assert (rc == 0);

294 295
    char content[CONTENT_SIZE_MAX]; //    bigger than what we need to check that
    char routing_id[ROUTING_ID_SIZE_MAX]; // the size received is the size sent
296 297

    bool run = true;
298
    bool keep_sending = true;
299
    while (run) {
300 301
        rc = zmq_recv (control, content, CONTENT_SIZE_MAX,
                       ZMQ_DONTWAIT); // usually, rc == -1 (no message)
302
        if (rc > 0) {
303
            content[rc] = 0; // NULL-terminate the command string
304
            if (is_verbose)
305
                printf ("server_worker receives command = %s\n", content);
306
            if (memcmp (content, "TERMINATE", 9) == 0)
307
                run = false;
308 309
            if (memcmp (content, "STOP", 4) == 0)
                keep_sending = false;
310 311 312
        }
        // The DEALER socket gives us the reply envelope and message
        // if we don't poll, we have to use ZMQ_DONTWAIT, if we poll, we can block-receive with 0
313 314
        rc = zmq_recv (worker, routing_id, ROUTING_ID_SIZE_MAX, ZMQ_DONTWAIT);
        if (rc == ROUTING_ID_SIZE) {
315 316 317
            rc = zmq_recv (worker, content, CONTENT_SIZE_MAX, 0);
            assert (rc == CONTENT_SIZE);
            if (is_verbose)
318 319
                printf ("server receive - routing_id = %s    content = %s\n",
                        routing_id, content);
320 321

            // Send 0..4 replies back
322 323
            if (keep_sending) {
                int reply, replies = rand () % 5;
324 325 326 327 328
                for (reply = 0; reply < replies; reply++) {
                    // Sleep for some fraction of a second
                    msleep (rand () % 10 + 1);

                    //  Send message from server to client
329 330 331 332
                    if (is_verbose)
                        printf ("server send - routing_id = %s    reply\n",
                                routing_id);
                    zmq_atomic_counter_inc (g_workers_pkts_out);
333

334 335
                    rc = zmq_send (worker, routing_id, ROUTING_ID_SIZE,
                                   ZMQ_SNDMORE);
336
                    assert (rc == ROUTING_ID_SIZE);
337 338 339
                    rc = zmq_send (worker, content, CONTENT_SIZE, 0);
                    assert (rc == CONTENT_SIZE);
                }
340 341 342
            }
        }
    }
343 344 345 346 347 348
    rc = zmq_close (worker);
    assert (rc == 0);
    rc = zmq_close (control);
    assert (rc == 0);
}

349
uint64_t recv_stat (void *sock_, bool last_)
350 351 352 353 354 355
{
    uint64_t res;
    zmq_msg_t stats_msg;

    int rc = zmq_msg_init (&stats_msg);
    assert (rc == 0);
356
    rc = zmq_recvmsg (sock_, &stats_msg, 0);
357 358
    assert (rc == sizeof (uint64_t));
    memcpy (&res, zmq_msg_data (&stats_msg), zmq_msg_size (&stats_msg));
359 360 361 362 363
    rc = zmq_msg_close (&stats_msg);
    assert (rc == 0);

    int more;
    size_t moresz = sizeof more;
364
    rc = zmq_getsockopt (sock_, ZMQ_RCVMORE, &more, &moresz);
365
    assert (rc == 0);
366
    assert ((last_ && !more) || (!last_ && more));
367 368 369 370

    return res;
}

371 372
// Utility function to interrogate the proxy:

373
void check_proxy_stats (void *control_proxy_)
374 375 376 377
{
    zmq_proxy_stats_t total_stats;
    int rc;

378
    rc = zmq_send (control_proxy_, "STATISTICS", 10, 0);
379 380 381
    assert (rc == 10);

    // first frame of the reply contains FRONTEND stats:
382 383 384 385
    total_stats.frontend.msg_in = recv_stat (control_proxy_, false);
    total_stats.frontend.bytes_in = recv_stat (control_proxy_, false);
    total_stats.frontend.msg_out = recv_stat (control_proxy_, false);
    total_stats.frontend.bytes_out = recv_stat (control_proxy_, false);
386 387

    // second frame of the reply contains BACKEND stats:
388 389 390 391
    total_stats.backend.msg_in = recv_stat (control_proxy_, false);
    total_stats.backend.bytes_in = recv_stat (control_proxy_, false);
    total_stats.backend.msg_out = recv_stat (control_proxy_, false);
    total_stats.backend.bytes_out = recv_stat (control_proxy_, true);
392 393 394

    // check stats

395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412
    if (is_verbose) {
        printf (
          "frontend: pkts_in=%lu bytes_in=%lu  pkts_out=%lu bytes_out=%lu\n",
          (unsigned long int) total_stats.frontend.msg_in,
          (unsigned long int) total_stats.frontend.bytes_in,
          (unsigned long int) total_stats.frontend.msg_out,
          (unsigned long int) total_stats.frontend.bytes_out);
        printf (
          "backend: pkts_in=%lu bytes_in=%lu  pkts_out=%lu bytes_out=%lu\n",
          (unsigned long int) total_stats.backend.msg_in,
          (unsigned long int) total_stats.backend.bytes_in,
          (unsigned long int) total_stats.backend.msg_out,
          (unsigned long int) total_stats.backend.bytes_out);

        printf ("clients sent out %d requests\n",
                zmq_atomic_counter_value (g_clients_pkts_out));
        printf ("workers sent out %d replies\n",
                zmq_atomic_counter_value (g_workers_pkts_out));
413
    }
414 415 416 417 418 419 420 421
    assert (total_stats.frontend.msg_in
            == (unsigned) zmq_atomic_counter_value (g_clients_pkts_out));
    assert (total_stats.frontend.msg_out
            == (unsigned) zmq_atomic_counter_value (g_workers_pkts_out));
    assert (total_stats.backend.msg_in
            == (unsigned) zmq_atomic_counter_value (g_workers_pkts_out));
    assert (total_stats.backend.msg_out
            == (unsigned) zmq_atomic_counter_value (g_clients_pkts_out));
422 423 424
}


425 426 427
// The main thread simply starts several clients and a server, and then
// waits for the server to finish.

428
int main (void)
429 430 431 432 433
{
    setup_test_environment ();

    void *ctx = zmq_ctx_new ();
    assert (ctx);
434 435 436 437 438

    g_clients_pkts_out = zmq_atomic_counter_new ();
    g_workers_pkts_out = zmq_atomic_counter_new ();


439
    // Control socket receives terminate command from main over inproc
440 441
    void *control = zmq_socket (ctx, ZMQ_PUB);
    assert (control);
442 443 444 445
    int linger = 0;
    int rc = zmq_setsockopt (control, ZMQ_LINGER, &linger, sizeof (linger));
    assert (rc == 0);
    rc = zmq_bind (control, "inproc://control");
446 447
    assert (rc == 0);

448 449 450 451 452 453 454 455
    // Control socket receives terminate command from main over inproc
    void *control_proxy = zmq_socket (ctx, ZMQ_REQ);
    assert (control_proxy);
    rc = zmq_setsockopt (control_proxy, ZMQ_LINGER, &linger, sizeof (linger));
    assert (rc == 0);
    rc = zmq_bind (control_proxy, "inproc://control_proxy");
    assert (rc == 0);

456 457
    void *threads[QT_CLIENTS + 1];
    struct thread_data databags[QT_CLIENTS + 1];
458
    for (int i = 0; i < QT_CLIENTS; i++) {
459 460 461
        databags[i].ctx = ctx;
        databags[i].id = i;
        threads[i] = zmq_threadstart (&client_task, &databags[i]);
462
    }
463
    threads[QT_CLIENTS] = zmq_threadstart (&server_task, ctx);
464
    msleep (500); // Run for 500 ms then quit
465

466 467 468 469 470 471

    if (is_verbose)
        printf ("stopping all clients and server workers\n");
    rc = zmq_send (control, "STOP", 4, 0);
    assert (rc == 4);

472
    msleep (500); // Wait for all clients and workers to STOP
473 474


475
#ifdef ZMQ_BUILD_DRAFT_API
476 477
    if (is_verbose)
        printf ("retrieving stats from the proxy\n");
478
    check_proxy_stats (control_proxy);
479
#endif
480 481 482

    if (is_verbose)
        printf ("shutting down all clients and server workers\n");
483
    rc = zmq_send (control, "TERMINATE", 9, 0);
484
    assert (rc == 9);
485

486 487 488 489 490 491
    if (is_verbose)
        printf ("shutting down the proxy\n");
    rc = zmq_send (control_proxy, "TERMINATE", 9, 0);
    assert (rc == 9);


492 493
    rc = zmq_close (control);
    assert (rc == 0);
494 495
    rc = zmq_close (control_proxy);
    assert (rc == 0);
496

497 498 499
    for (int i = 0; i < QT_CLIENTS + 1; i++)
        zmq_threadclose (threads[i]);

500 501
    rc = zmq_ctx_term (ctx);
    assert (rc == 0);
502
    return 0;
503
}