//  test_router_mandatory_hwm.cpp
/*
    Copyright (c) 2007-2014 Contributors as noted in the AUTHORS file

    This file is part of 0MQ.

    0MQ is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    0MQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

#include <stdio.h>
#include "testutil.hpp"
#include <unistd.h>

// DEBUG shouldn't be defined in sources as it will cause a redefined symbol
// error when it is defined in the build configuration. It appears that the
// intent here is to semi-permanently disable DEBUG tracing statements, so the
// implementation is changed to accommodate that intent.
//#define DEBUG 0
// TRACE_ENABLED guards the progress messages this test prints to stderr;
// it is 0 here, so those messages are disabled.
#define TRACE_ENABLED 0

int main (void)
{
    int rc;
34
    if (TRACE_ENABLED) fprintf(stderr, "Staring router mandatory HWM test ...\n");
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
    setup_test_environment();
    void *ctx = zmq_ctx_new ();
    assert (ctx);
    void *router = zmq_socket (ctx, ZMQ_ROUTER);
    assert (router);

    // Configure router socket to mandatory routing and set HWM and linger
    int mandatory = 1;
    rc = zmq_setsockopt (router, ZMQ_ROUTER_MANDATORY, &mandatory, sizeof (mandatory));
    assert (rc == 0);
    int sndhwm = 1;
    rc = zmq_setsockopt (router, ZMQ_SNDHWM, &sndhwm, sizeof (sndhwm));
    assert (rc == 0);
    int linger = 1;
    rc = zmq_setsockopt (router, ZMQ_LINGER, &linger, sizeof (linger));
    assert (rc == 0);

    rc = zmq_bind (router, "tcp://127.0.0.1:5560");
    assert (rc == 0);

    //  Create dealer called "X" and connect it to our router, configure HWM
    void *dealer = zmq_socket (ctx, ZMQ_DEALER);
    assert (dealer);
    rc = zmq_setsockopt (dealer, ZMQ_IDENTITY, "X", 1);
    assert (rc == 0);
    int rcvhwm = 1;
    rc = zmq_setsockopt (dealer, ZMQ_RCVHWM, &rcvhwm, sizeof (rcvhwm));
    assert (rc == 0);

    rc = zmq_connect (dealer, "tcp://127.0.0.1:5560");
    assert (rc == 0);

    //  Get message from dealer to know when connection is ready
    char buffer [255];
    rc = zmq_send (dealer, "Hello", 5, 0);
    assert (rc == 5);
    rc = zmq_recv (router, buffer, 255, 0);
    assert (rc == 1);
    assert (buffer [0] ==  'X');

    int i;
    const int BUF_SIZE = 65536;
    char buf[BUF_SIZE];
    // Send first batch of messages
    for(i = 0; i < 100000; ++i) {
80 81 82 83 84 85
        if (TRACE_ENABLED) fprintf(stderr, "Sending message %d ...\n", i);
        rc = zmq_send (router, "X", 1, ZMQ_DONTWAIT | ZMQ_SNDMORE);
        if (rc == -1 && zmq_errno() == EAGAIN) break;
        assert (rc == 1);
        rc = zmq_send (router, buf, BUF_SIZE, ZMQ_DONTWAIT);
        assert (rc == BUF_SIZE);
86 87 88 89 90 91 92
    }
    // This should fail after one message but kernel buffering could
    // skew results
    assert (i < 10);
    sleep(1);
    // Send second batch of messages
    for(; i < 100000; ++i) {
93 94 95 96 97 98
        if (TRACE_ENABLED) fprintf(stderr, "Sending message %d (part 2) ...\n", i);
        rc = zmq_send (router, "X", 1, ZMQ_DONTWAIT | ZMQ_SNDMORE);
        if (rc == -1 && zmq_errno() == EAGAIN) break;
        assert (rc == 1);
        rc = zmq_send (router, buf, BUF_SIZE, ZMQ_DONTWAIT);
        assert (rc == BUF_SIZE);
99 100 101 102 103
    }
    // This should fail after two messages but kernel buffering could
    // skew results
    assert (i < 20);

104
    if (TRACE_ENABLED) fprintf(stderr, "Done sending messages.\n");
105 106 107 108 109 110 111 112 113 114 115 116

    rc = zmq_close (router);
    assert (rc == 0);

    rc = zmq_close (dealer);
    assert (rc == 0);

    rc = zmq_ctx_term (ctx);
    assert (rc == 0);

    return 0 ;
}