test_spec_pushpull.cpp 8.26 KB
Newer Older
1
/*
2
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3

4
    This file is part of libzmq, the ZeroMQ core engine in C++.
5

6 7 8
    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
9 10
    (at your option) any later version.

11 12 13 14 15 16 17 18 19 20 21 22 23 24
    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.
25 26 27 28 29 30 31

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

#include "testutil.hpp"

32 33 34
const char *bind_address = 0;
const char *connect_address = 0;

35 36 37 38 39
void test_push_round_robin_out (void *ctx)
{
    void *push = zmq_socket (ctx, ZMQ_PUSH);
    assert (push);

40
    int rc = zmq_bind (push, bind_address);
41 42
    assert (rc == 0);

43 44 45 46 47
    const size_t services = 5;
    void *pulls [services];
    for (size_t peer = 0; peer < services; ++peer) {
        pulls [peer] = zmq_socket (ctx, ZMQ_PULL);
        assert (pulls [peer]);
48

49
        int timeout = 250;
50
        rc = zmq_setsockopt (pulls [peer], ZMQ_RCVTIMEO, &timeout, sizeof (int));
51 52
        assert (rc == 0);

53
        rc = zmq_connect (pulls [peer], connect_address);
54 55 56
        assert (rc == 0);
    }

57
    // Wait for connections.
58
    msleep (SETTLE_TIME);
59

60
    // Send 2N messages
61
    for (size_t peer = 0; peer < services; ++peer)
62
        s_send_seq (push, "ABC", SEQ_END);
63
    for (size_t peer = 0; peer < services; ++peer)
64 65 66
        s_send_seq (push, "DEF", SEQ_END);

    // Expect every PULL got one of each
67 68 69
    for (size_t peer = 0; peer < services; ++peer) {
        s_recv_seq (pulls [peer], "ABC", SEQ_END);
        s_recv_seq (pulls [peer], "DEF", SEQ_END);
70 71
    }

72
    close_zero_linger (push);
73

74 75
    for (size_t peer = 0; peer < services; ++peer)
        close_zero_linger (pulls [peer]);
76 77

    // Wait for disconnects.
78
    msleep (SETTLE_TIME);
79 80 81 82 83 84 85
}

void test_pull_fair_queue_in (void *ctx)
{
    void *pull = zmq_socket (ctx, ZMQ_PULL);
    assert (pull);

86
    int rc = zmq_bind (pull, bind_address);
87 88
    assert (rc == 0);

89 90 91
    const size_t services = 5;
    void *pushs [services];
    for (size_t peer = 0; peer < services; ++peer)
92
    {
93 94
        pushs [peer] = zmq_socket (ctx, ZMQ_PUSH);
        assert (pushs [peer]);
95

96
        rc = zmq_connect (pushs [peer], connect_address);
97 98 99
        assert (rc == 0);
    }

100
    // Wait for connections.
101
    msleep (SETTLE_TIME);
102 103 104 105

    int first_half = 0;
    int second_half = 0;

106
    // Send 2N messages
107
    for (size_t peer = 0; peer < services; ++peer) {
108 109
        char *str = strdup("A");

110 111 112
        str [0] += peer;
        s_send_seq (pushs [peer], str, SEQ_END);
        first_half += str [0];
113

114 115 116
        str [0] += services;
        s_send_seq (pushs [peer], str, SEQ_END);
        second_half += str [0];
117

118 119 120
        free (str);
    }

121
    // Wait for data.
122
    msleep (SETTLE_TIME);
123 124 125 126 127 128

    zmq_msg_t msg;
    rc = zmq_msg_init (&msg);
    assert (rc == 0);

    // Expect to pull one from each first
129
    for (size_t peer = 0; peer < services; ++peer) {
130 131 132
        rc = zmq_msg_recv (&msg, pull, 0);
        assert (rc == 2);
        const char *str = (const char *)zmq_msg_data (&msg);
133
        first_half -= str [0];
134
    }
135
    assert (first_half == 0);
136

137
    // And then get the second batch
138
    for (size_t peer = 0; peer < services; ++peer) {
139 140 141
        rc = zmq_msg_recv (&msg, pull, 0);
        assert (rc == 2);
        const char *str = (const char *)zmq_msg_data (&msg);
142
        second_half -= str [0];
143 144 145 146
    }
    assert (second_half == 0);

    rc = zmq_msg_close (&msg);
147 148
    assert (rc == 0);

149 150
    close_zero_linger (pull);

151 152
    for (size_t peer = 0; peer < services; ++peer)
        close_zero_linger (pushs [peer]);
153 154

    // Wait for disconnects.
155
    msleep (SETTLE_TIME);
156 157 158 159 160 161 162
}

void test_push_block_on_send_no_peers (void *ctx)
{
    void *sc = zmq_socket (ctx, ZMQ_PUSH);
    assert (sc);

163
    int timeout = 250;
164
    int rc = zmq_setsockopt (sc, ZMQ_SNDTIMEO, &timeout, sizeof (timeout));
165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184
    assert (rc == 0);

    rc = zmq_send (sc, 0, 0, ZMQ_DONTWAIT);
    assert (rc == -1);
    assert (errno == EAGAIN);

    rc = zmq_send (sc, 0, 0, 0);
    assert (rc == -1);
    assert (errno == EAGAIN);

    rc = zmq_close (sc);
    assert (rc == 0);
}

void test_destroy_queue_on_disconnect (void *ctx)
{
    void *A = zmq_socket (ctx, ZMQ_PUSH);
    assert (A);

    int hwm = 1;
185
    int rc = zmq_setsockopt (A, ZMQ_SNDHWM, &hwm, sizeof (hwm));
186 187
    assert (rc == 0);

188
    rc = zmq_bind (A, bind_address);
189 190 191 192 193
    assert (rc == 0);

    void *B = zmq_socket (ctx, ZMQ_PULL);
    assert (B);

194
    rc = zmq_setsockopt (B, ZMQ_RCVHWM, &hwm, sizeof (hwm));
195 196
    assert (rc == 0);

197
    rc = zmq_connect (B, connect_address);
198 199 200 201 202 203 204 205 206 207 208 209
    assert (rc == 0);

    // Send two messages, one should be stuck in A's outgoing queue, the other
    // arrives at B.
    s_send_seq (A, "ABC", SEQ_END);
    s_send_seq (A, "DEF", SEQ_END);

    // Both queues should now be full, indicated by A blocking on send.
    rc = zmq_send (A, 0, 0, ZMQ_DONTWAIT);
    assert (rc == -1);
    assert (errno == EAGAIN);

210
    rc = zmq_disconnect (B, connect_address);
211 212 213
    assert (rc == 0);

    // Disconnect may take time and need command processing.
214
    zmq_pollitem_t poller [2] = { { A, 0, 0, 0 }, { B, 0, 0, 0 } };
215 216
    rc = zmq_poll (poller, 2, 100);
    assert (rc == 0);
217 218
    rc = zmq_poll (poller, 2, 100);
    assert (rc == 0);
219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234

    zmq_msg_t msg;
    rc = zmq_msg_init (&msg);
    assert (rc == 0);

    // Can't receive old data on B.
    rc = zmq_msg_recv (&msg, B, ZMQ_DONTWAIT);
    assert (rc == -1);
    assert (errno == EAGAIN);

    // Sending fails.
    rc = zmq_send (A, 0, 0, ZMQ_DONTWAIT);
    assert (rc == -1);
    assert (errno == EAGAIN);

    // Reconnect B
235
    rc = zmq_connect (B, connect_address);
236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253
    assert (rc == 0);

    // Still can't receive old data on B.
    rc = zmq_msg_recv (&msg, B, ZMQ_DONTWAIT);
    assert (rc == -1);
    assert (errno == EAGAIN);

    // two messages should be sendable before the queues are filled up.
    s_send_seq (A, "ABC", SEQ_END);
    s_send_seq (A, "DEF", SEQ_END);

    rc = zmq_send (A, 0, 0, ZMQ_DONTWAIT);
    assert (rc == -1);
    assert (errno == EAGAIN);

    rc = zmq_msg_close (&msg);
    assert (rc == 0);

254 255
    close_zero_linger (A);
    close_zero_linger (B);
256

257
    // Wait for disconnects.
258
    msleep (SETTLE_TIME);
259 260
}

261
int main (void)
262
{
263
    setup_test_environment();
264 265 266
    void *ctx = zmq_ctx_new ();
    assert (ctx);

267
    const char *binds [] = { "inproc://a", "tcp://127.0.0.1:5555" };
268
    const char *connects [] = { "inproc://a", "tcp://localhost:5555" };
269

270 271 272
    for (int transport = 0; transport < 2; ++transport) {
        bind_address = binds [transport];
        connect_address = connects [transport];
273

274 275 276
        // PUSH: SHALL route outgoing messages to connected peers using a
        // round-robin strategy.
        test_push_round_robin_out (ctx);
277

278 279 280
        // PULL: SHALL receive incoming messages from its peers using a fair-queuing
        // strategy.
        test_pull_fair_queue_in (ctx);
281

282 283 284 285 286 287 288
        // PUSH: SHALL block on sending, or return a suitable error, when it has no
        // available peers.
        test_push_block_on_send_no_peers (ctx);

        // PUSH and PULL: SHALL create this queue when a peer connects to it. If
        // this peer disconnects, the socket SHALL destroy its queue and SHALL
        // discard any messages it contains.
289 290
        // *** Test disabled until libzmq does this properly ***
        // test_destroy_queue_on_disconnect (ctx);
291
    }
292 293 294 295 296 297

    int rc = zmq_ctx_term (ctx);
    assert (rc == 0);

    return 0 ;
}