test_proxy.cpp 17.6 KB
Newer Older
1
/*
2
    Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file
3

4
    This file is part of libzmq, the ZeroMQ core engine in C++.
5

6 7 8
    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
9 10
    (at your option) any later version.

11 12 13 14 15 16 17 18 19 20 21 22 23 24
    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.
25 26 27 28 29 30 31

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

#include "testutil.hpp"

32 33 34 35 36 37 38
// Asynchronous client-to-server (DEALER to ROUTER) - pure libzmq
//
// While this example runs in a single process, that is to make
// it easier to start and stop the example. Each task may have its own
// context and conceptually acts as a separate process. To have this
// behaviour, it is necessary to replace the inproc transport of the
// control socket by a tcp transport.
39

40 41 42 43
// This is our client task
// It connects to the server, and then sends a request once per second
// It collects responses as they arrive, and it prints them out. We will
// run several client tasks in parallel, each with a different random ID.
44

45
#define CONTENT_SIZE 13
46
#define CONTENT_SIZE_MAX 32
47 48
#define ROUTING_ID_SIZE 10
#define ROUTING_ID_SIZE_MAX 32
49 50
#define QT_WORKERS 5
#define QT_CLIENTS 3
51 52
#define is_verbose 0

53 54
struct thread_data
{
55
    void *ctx;
56 57 58
    int id;
};

59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76
typedef struct
{
    uint64_t msg_in;
    uint64_t bytes_in;
    uint64_t msg_out;
    uint64_t bytes_out;
} zmq_socket_stats_t;

typedef struct
{
    zmq_socket_stats_t frontend;
    zmq_socket_stats_t backend;
} zmq_proxy_stats_t;

void *g_clients_pkts_out = NULL;
void *g_workers_pkts_out = NULL;


77
static void client_task (void *db_)
78
{
79
    struct thread_data *databag = (struct thread_data *) db_;
80
    // Endpoint socket gets random port to avoid test failing when port in use
81 82
    void *endpoint = zmq_socket (databag->ctx, ZMQ_PAIR);
    assert (endpoint);
83
    int linger = 0;
84 85
    int rc = zmq_setsockopt (endpoint, ZMQ_LINGER, &linger, sizeof (linger));
    assert (rc == 0);
86
    char endpoint_source[256];
87
    sprintf (endpoint_source, "inproc://endpoint%d", databag->id);
88 89
    rc = zmq_connect (endpoint, endpoint_source);
    assert (rc == 0);
90
    char *my_endpoint = s_recv (endpoint);
91
    assert (my_endpoint);
92

93 94
    void *client = zmq_socket (databag->ctx, ZMQ_DEALER);
    assert (client);
95

96
    // Control socket receives terminate command from main over inproc
97 98 99 100 101 102 103 104
    void *control = zmq_socket (databag->ctx, ZMQ_SUB);
    assert (control);
    rc = zmq_setsockopt (control, ZMQ_SUBSCRIBE, "", 0);
    assert (rc == 0);
    rc = zmq_setsockopt (control, ZMQ_LINGER, &linger, sizeof (linger));
    assert (rc == 0);
    rc = zmq_connect (control, "inproc://control");
    assert (rc == 0);
105

106
    char content[CONTENT_SIZE_MAX] = {};
107
    // Set random routing id to make tracing easier
108
    char routing_id[ROUTING_ID_SIZE] = {};
109
    sprintf (routing_id, "%04X-%04X", rand () % 0xFFFF, rand () % 0xFFFF);
110 111 112 113
    rc =
      zmq_setsockopt (client, ZMQ_ROUTING_ID, routing_id,
                      ROUTING_ID_SIZE); // includes '\0' as an helper for printf
    assert (rc == 0);
114
    linger = 0;
115 116 117 118
    rc = zmq_setsockopt (client, ZMQ_LINGER, &linger, sizeof (linger));
    assert (rc == 0);
    rc = zmq_connect (client, my_endpoint);
    assert (rc == 0);
119

120 121
    zmq_pollitem_t items[] = {{client, 0, ZMQ_POLLIN, 0},
                              {control, 0, ZMQ_POLLIN, 0}};
122 123
    int request_nbr = 0;
    bool run = true;
124
    bool keep_sending = true;
125
    while (run) {
126 127 128
        // Tick once per 200 ms, pulling in arriving messages
        int centitick;
        for (centitick = 0; centitick < 20; centitick++) {
129
            zmq_poll (items, 2, 10);
130
            if (items[0].revents & ZMQ_POLLIN) {
131 132
                int rcvmore;
                size_t sz = sizeof (rcvmore);
133 134
                rc = zmq_recv (client, content, CONTENT_SIZE_MAX, 0);
                assert (rc == CONTENT_SIZE);
135 136 137 138
                if (is_verbose)
                    printf (
                      "client receive - routing_id = %s    content = %s\n",
                      routing_id, content);
139
                //  Check that message is still the same
140 141 142 143
                assert (memcmp (content, "request #", 9) == 0);
                rc = zmq_getsockopt (client, ZMQ_RCVMORE, &rcvmore, &sz);
                assert (rc == 0);
                assert (!rcvmore);
144
            }
145
            if (items[1].revents & ZMQ_POLLIN) {
146
                rc = zmq_recv (control, content, CONTENT_SIZE_MAX, 0);
147

148 149 150 151 152 153
                if (rc > 0) {
                    content[rc] = 0; // NULL-terminate the command string
                    if (is_verbose)
                        printf (
                          "client receive - routing_id = %s    command = %s\n",
                          routing_id, content);
154 155 156 157 158 159 160 161
                    if (memcmp (content, "TERMINATE", 9) == 0) {
                        run = false;
                        break;
                    }
                    if (memcmp (content, "STOP", 4) == 0) {
                        keep_sending = false;
                        break;
                    }
162 163 164
                }
            }
        }
165

166 167 168 169 170 171
        if (keep_sending) {
            sprintf (content, "request #%03d", ++request_nbr); // CONTENT_SIZE
            if (is_verbose)
                printf ("client send - routing_id = %s    request #%03d\n",
                        routing_id, request_nbr);
            zmq_atomic_counter_inc (g_clients_pkts_out);
172

173 174
            rc = zmq_send (client, content, CONTENT_SIZE, 0);
            assert (rc == CONTENT_SIZE);
175
        }
176
    }
177

178 179 180 181 182 183
    rc = zmq_close (client);
    assert (rc == 0);
    rc = zmq_close (control);
    assert (rc == 0);
    rc = zmq_close (endpoint);
    assert (rc == 0);
184
    free (my_endpoint);
185 186 187 188 189 190 191 192
}

// This is our server task.
// It uses the multithreaded server model to deal requests out to a pool
// of workers and route replies back to clients. One worker can handle
// one request at a time but one client can talk to multiple workers at
// once.

193
static void server_worker (void *ctx_);
194

195
void server_task (void *ctx_)
196 197
{
    // Frontend socket talks to clients over TCP
198
    size_t len = MAX_SOCKET_STRING;
199
    char my_endpoint[MAX_SOCKET_STRING];
200 201
    void *frontend = zmq_socket (ctx_, ZMQ_ROUTER);
    assert (frontend);
202
    int linger = 0;
203 204 205 206 207 208
    int rc = zmq_setsockopt (frontend, ZMQ_LINGER, &linger, sizeof (linger));
    assert (rc == 0);
    rc = zmq_bind (frontend, "tcp://127.0.0.1:*");
    assert (rc == 0);
    rc = zmq_getsockopt (frontend, ZMQ_LAST_ENDPOINT, my_endpoint, &len);
    assert (rc == 0);
209

210
    // Backend socket talks to workers over inproc
211 212 213 214 215 216
    void *backend = zmq_socket (ctx_, ZMQ_DEALER);
    assert (backend);
    rc = zmq_setsockopt (backend, ZMQ_LINGER, &linger, sizeof (linger));
    assert (rc == 0);
    rc = zmq_bind (backend, "inproc://backend");
    assert (rc == 0);
217

218
    // Control socket receives terminate command from main over inproc
219 220 221 222 223 224
    void *control = zmq_socket (ctx_, ZMQ_REP);
    assert (control);
    rc = zmq_setsockopt (control, ZMQ_LINGER, &linger, sizeof (linger));
    assert (rc == 0);
    rc = zmq_connect (control, "inproc://control_proxy");
    assert (rc == 0);
225

226 227
    // Launch pool of worker threads, precise number is not critical
    int thread_nbr;
228
    void *threads[5];
229
    for (thread_nbr = 0; thread_nbr < QT_WORKERS; thread_nbr++)
230
        threads[thread_nbr] = zmq_threadstart (&server_worker, ctx_);
231

232
    // Endpoint socket sends random port to avoid test failing when port in use
233 234
    void *endpoint_receivers[QT_CLIENTS];
    char endpoint_source[256];
235
    for (int i = 0; i < QT_CLIENTS; ++i) {
236 237 238 239 240
        endpoint_receivers[i] = zmq_socket (ctx_, ZMQ_PAIR);
        assert (endpoint_receivers[i]);
        rc = zmq_setsockopt (endpoint_receivers[i], ZMQ_LINGER, &linger,
                             sizeof (linger));
        assert (rc == 0);
241
        sprintf (endpoint_source, "inproc://endpoint%d", i);
242 243
        rc = zmq_bind (endpoint_receivers[i], endpoint_source);
        assert (rc == 0);
244 245 246
    }

    for (int i = 0; i < QT_CLIENTS; ++i) {
247 248
        rc = s_send (endpoint_receivers[i], my_endpoint);
        assert (rc > 0);
249 250
    }

251
    // Connect backend to frontend via a proxy
252 253
    rc = zmq_proxy_steerable (frontend, backend, NULL, control);
    assert (rc == 0);
254

255 256
    for (thread_nbr = 0; thread_nbr < QT_WORKERS; thread_nbr++)
        zmq_threadclose (threads[thread_nbr]);
257

258 259 260 261 262 263
    rc = zmq_close (frontend);
    assert (rc == 0);
    rc = zmq_close (backend);
    assert (rc == 0);
    rc = zmq_close (control);
    assert (rc == 0);
264
    for (int i = 0; i < QT_CLIENTS; ++i) {
265 266
        rc = zmq_close (endpoint_receivers[i]);
        assert (rc == 0);
267
    }
268 269 270 271 272 273
}

// Each worker task works on one request at a time and sends a random number
// of replies back, with random delays between replies:
// The comments in the first column, if suppressed, makes it a poller version

274
static void server_worker (void *ctx_)
275
{
276 277
    void *worker = zmq_socket (ctx_, ZMQ_DEALER);
    assert (worker);
278
    int linger = 0;
279 280 281 282
    int rc = zmq_setsockopt (worker, ZMQ_LINGER, &linger, sizeof (linger));
    assert (rc == 0);
    rc = zmq_connect (worker, "inproc://backend");
    assert (rc == 0);
283

284
    // Control socket receives terminate command from main over inproc
285 286 287 288 289 290 291 292
    void *control = zmq_socket (ctx_, ZMQ_SUB);
    assert (control);
    rc = zmq_setsockopt (control, ZMQ_SUBSCRIBE, "", 0);
    assert (rc == 0);
    rc = zmq_setsockopt (control, ZMQ_LINGER, &linger, sizeof (linger));
    assert (rc == 0);
    rc = zmq_connect (control, "inproc://control");
    assert (rc == 0);
293

294 295 296 297
    char content[CONTENT_SIZE_MAX] =
      {}; // bigger than what we need to check that
    char routing_id[ROUTING_ID_SIZE_MAX] =
      {}; // the size received is the size sent
298 299

    bool run = true;
300
    bool keep_sending = true;
301
    while (run) {
302 303
        rc = zmq_recv (control, content, CONTENT_SIZE_MAX,
                       ZMQ_DONTWAIT); // usually, rc == -1 (no message)
304
        if (rc > 0) {
305
            content[rc] = 0; // NULL-terminate the command string
306
            if (is_verbose)
307
                printf ("server_worker receives command = %s\n", content);
308
            if (memcmp (content, "TERMINATE", 9) == 0)
309
                run = false;
310 311
            if (memcmp (content, "STOP", 4) == 0)
                keep_sending = false;
312 313 314
        }
        // The DEALER socket gives us the reply envelope and message
        // if we don't poll, we have to use ZMQ_DONTWAIT, if we poll, we can block-receive with 0
315 316
        rc = zmq_recv (worker, routing_id, ROUTING_ID_SIZE_MAX, ZMQ_DONTWAIT);
        if (rc == ROUTING_ID_SIZE) {
317 318
            rc = zmq_recv (worker, content, CONTENT_SIZE_MAX, 0);
            assert (rc == CONTENT_SIZE);
319
            if (is_verbose)
320 321
                printf ("server receive - routing_id = %s    content = %s\n",
                        routing_id, content);
322 323

            // Send 0..4 replies back
324
            if (keep_sending) {
325 326
                int reply, replies = rand () % 5;
                for (reply = 0; reply < replies; reply++) {
327 328 329 330
                    // Sleep for some fraction of a second
                    msleep (rand () % 10 + 1);

                    //  Send message from server to client
331 332 333 334
                    if (is_verbose)
                        printf ("server send - routing_id = %s    reply\n",
                                routing_id);
                    zmq_atomic_counter_inc (g_workers_pkts_out);
335

336 337 338 339 340
                    rc = zmq_send (worker, routing_id, ROUTING_ID_SIZE,
                                   ZMQ_SNDMORE);
                    assert (rc == ROUTING_ID_SIZE);
                    rc = zmq_send (worker, content, CONTENT_SIZE, 0);
                    assert (rc == CONTENT_SIZE);
341
                }
342 343 344
            }
        }
    }
345 346 347 348
    rc = zmq_close (worker);
    assert (rc == 0);
    rc = zmq_close (control);
    assert (rc == 0);
349 350
}

351
uint64_t recv_stat (void *sock_, bool last_)
352 353 354 355
{
    uint64_t res;
    zmq_msg_t stats_msg;

356 357 358 359
    int rc = zmq_msg_init (&stats_msg);
    assert (rc == 0);
    rc = zmq_recvmsg (sock_, &stats_msg, 0);
    assert (rc == sizeof (uint64_t));
360
    memcpy (&res, zmq_msg_data (&stats_msg), zmq_msg_size (&stats_msg));
361 362
    rc = zmq_msg_close (&stats_msg);
    assert (rc == 0);
363 364 365

    int more;
    size_t moresz = sizeof more;
366 367 368
    rc = zmq_getsockopt (sock_, ZMQ_RCVMORE, &more, &moresz);
    assert (rc == 0);
    assert ((last_ && !more) || (!last_ && more));
369 370 371 372

    return res;
}

373 374
// Utility function to interrogate the proxy:

375
void check_proxy_stats (void *control_proxy_)
376 377
{
    zmq_proxy_stats_t total_stats;
378
    int rc;
379

380 381
    rc = zmq_send (control_proxy_, "STATISTICS", 10, 0);
    assert (rc == 10);
382 383

    // first frame of the reply contains FRONTEND stats:
384 385 386 387
    total_stats.frontend.msg_in = recv_stat (control_proxy_, false);
    total_stats.frontend.bytes_in = recv_stat (control_proxy_, false);
    total_stats.frontend.msg_out = recv_stat (control_proxy_, false);
    total_stats.frontend.bytes_out = recv_stat (control_proxy_, false);
388 389

    // second frame of the reply contains BACKEND stats:
390 391 392 393
    total_stats.backend.msg_in = recv_stat (control_proxy_, false);
    total_stats.backend.bytes_in = recv_stat (control_proxy_, false);
    total_stats.backend.msg_out = recv_stat (control_proxy_, false);
    total_stats.backend.bytes_out = recv_stat (control_proxy_, true);
394 395 396

    // check stats

397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414
    if (is_verbose) {
        printf (
          "frontend: pkts_in=%lu bytes_in=%lu  pkts_out=%lu bytes_out=%lu\n",
          (unsigned long int) total_stats.frontend.msg_in,
          (unsigned long int) total_stats.frontend.bytes_in,
          (unsigned long int) total_stats.frontend.msg_out,
          (unsigned long int) total_stats.frontend.bytes_out);
        printf (
          "backend: pkts_in=%lu bytes_in=%lu  pkts_out=%lu bytes_out=%lu\n",
          (unsigned long int) total_stats.backend.msg_in,
          (unsigned long int) total_stats.backend.bytes_in,
          (unsigned long int) total_stats.backend.msg_out,
          (unsigned long int) total_stats.backend.bytes_out);

        printf ("clients sent out %d requests\n",
                zmq_atomic_counter_value (g_clients_pkts_out));
        printf ("workers sent out %d replies\n",
                zmq_atomic_counter_value (g_workers_pkts_out));
415
    }
416 417 418 419 420 421 422 423
    assert (total_stats.frontend.msg_in
            == (unsigned) zmq_atomic_counter_value (g_clients_pkts_out));
    assert (total_stats.frontend.msg_out
            == (unsigned) zmq_atomic_counter_value (g_workers_pkts_out));
    assert (total_stats.backend.msg_in
            == (unsigned) zmq_atomic_counter_value (g_workers_pkts_out));
    assert (total_stats.backend.msg_out
            == (unsigned) zmq_atomic_counter_value (g_clients_pkts_out));
424 425
}

426 427 428 429 430

// The main thread simply starts several clients and a server, and then
// waits for the server to finish.

int main (void)
431
{
432 433 434 435 436
    setup_test_environment ();

    void *ctx = zmq_ctx_new ();
    assert (ctx);

437 438 439
    g_clients_pkts_out = zmq_atomic_counter_new ();
    g_workers_pkts_out = zmq_atomic_counter_new ();

440

441
    // Control socket receives terminate command from main over inproc
442 443
    void *control = zmq_socket (ctx, ZMQ_PUB);
    assert (control);
444
    int linger = 0;
445 446 447 448
    int rc = zmq_setsockopt (control, ZMQ_LINGER, &linger, sizeof (linger));
    assert (rc == 0);
    rc = zmq_bind (control, "inproc://control");
    assert (rc == 0);
449

450
    // Control socket receives terminate command from main over inproc
451 452 453 454 455 456
    void *control_proxy = zmq_socket (ctx, ZMQ_REQ);
    assert (control_proxy);
    rc = zmq_setsockopt (control_proxy, ZMQ_LINGER, &linger, sizeof (linger));
    assert (rc == 0);
    rc = zmq_bind (control_proxy, "inproc://control_proxy");
    assert (rc == 0);
457

458 459
    void *threads[QT_CLIENTS + 1];
    struct thread_data databags[QT_CLIENTS + 1];
460
    for (int i = 0; i < QT_CLIENTS; i++) {
461
        databags[i].ctx = ctx;
462 463
        databags[i].id = i;
        threads[i] = zmq_threadstart (&client_task, &databags[i]);
464
    }
465
    threads[QT_CLIENTS] = zmq_threadstart (&server_task, ctx);
466
    msleep (500); // Run for 500 ms then quit
467

468

469 470
    if (is_verbose)
        printf ("stopping all clients and server workers\n");
471 472
    rc = zmq_send (control, "STOP", 4, 0);
    assert (rc == 4);
473

474
    msleep (500); // Wait for all clients and workers to STOP
475

476

477 478
    if (is_verbose)
        printf ("retrieving stats from the proxy\n");
479
    check_proxy_stats (control_proxy);
480 481 482

    if (is_verbose)
        printf ("shutting down all clients and server workers\n");
483 484
    rc = zmq_send (control, "TERMINATE", 9, 0);
    assert (rc == 9);
485

486 487
    if (is_verbose)
        printf ("shutting down the proxy\n");
488 489
    rc = zmq_send (control_proxy, "TERMINATE", 9, 0);
    assert (rc == 9);
490

491 492 493 494 495

    rc = zmq_close (control);
    assert (rc == 0);
    rc = zmq_close (control_proxy);
    assert (rc == 0);
496

497 498 499
    for (int i = 0; i < QT_CLIENTS + 1; i++)
        zmq_threadclose (threads[i]);

500 501 502
    rc = zmq_ctx_term (ctx);
    assert (rc == 0);
    return 0;
503
}