test_spec_pushpull.cpp 16.7 KB
Newer Older
1
/*
2
    Copyright (c) 2007-2018 Contributors as noted in the AUTHORS file
3

4
    This file is part of libzmq, the ZeroMQ core engine in C++.
5

6 7 8
    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
9 10
    (at your option) any later version.

11 12 13 14 15 16 17 18 19 20 21 22 23 24
    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.
25 26 27 28 29 30

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

#include "testutil.hpp"
31 32
#include "testutil_unity.hpp"

33 34 35
#include <stdlib.h>
#include <string.h>

36
SETUP_TEARDOWN_TESTCONTEXT
37

38
char connect_address[MAX_SOCKET_STRING];
39

40 41 42
// PUSH: SHALL route outgoing messages to connected peers using a
// round-robin strategy.
void test_push_round_robin_out (const char *bind_address_)
43
{
44
    void *push = test_context_socket (ZMQ_PUSH);
45

46
    TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (push, bind_address_));
47
    size_t len = MAX_SOCKET_STRING;
48 49
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_getsockopt (push, ZMQ_LAST_ENDPOINT, connect_address, &len));
50

51
    const size_t services = 5;
52
    void *pulls[services];
53
    for (size_t peer = 0; peer < services; ++peer) {
54
        pulls[peer] = test_context_socket (ZMQ_PULL);
55

56
        int timeout = 250;
57 58 59
        TEST_ASSERT_SUCCESS_ERRNO (
          zmq_setsockopt (pulls[peer], ZMQ_RCVTIMEO, &timeout, sizeof (int)));
        TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (pulls[peer], connect_address));
60 61
    }

62
    // Wait for connections.
63
    msleep (SETTLE_TIME);
64

65
    // Send 2N messages
66
    for (size_t peer = 0; peer < services; ++peer)
67
        s_send_seq (push, "ABC", SEQ_END);
68
    for (size_t peer = 0; peer < services; ++peer)
69 70 71
        s_send_seq (push, "DEF", SEQ_END);

    // Expect every PULL got one of each
72
    for (size_t peer = 0; peer < services; ++peer) {
73 74
        s_recv_seq (pulls[peer], "ABC", SEQ_END);
        s_recv_seq (pulls[peer], "DEF", SEQ_END);
75 76
    }

77
    test_context_socket_close_zero_linger (push);
78

79
    for (size_t peer = 0; peer < services; ++peer)
80
        test_context_socket_close_zero_linger (pulls[peer]);
81 82
}

83 84 85
// PULL: SHALL receive incoming messages from its peers using a fair-queuing
// strategy.
void test_pull_fair_queue_in (const char *bind_address_)
86
{
87
    void *pull = test_context_socket (ZMQ_PULL);
88

89
    TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (pull, bind_address_));
90
    size_t len = MAX_SOCKET_STRING;
91 92
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_getsockopt (pull, ZMQ_LAST_ENDPOINT, connect_address, &len));
93

94
    const unsigned char services = 5;
95
    void *pushs[services];
96
    for (unsigned char peer = 0; peer < services; ++peer) {
97
        pushs[peer] = test_context_socket (ZMQ_PUSH);
98

99
        TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (pushs[peer], connect_address));
100 101
    }

102
    // Wait for connections.
103
    msleep (SETTLE_TIME);
104 105 106 107

    int first_half = 0;
    int second_half = 0;

108
    // Send 2N messages
109
    for (unsigned char peer = 0; peer < services; ++peer) {
110
        char *str = strdup ("A");
111

112 113 114
        str[0] += peer;
        s_send_seq (pushs[peer], str, SEQ_END);
        first_half += str[0];
115

116 117 118
        str[0] += services;
        s_send_seq (pushs[peer], str, SEQ_END);
        second_half += str[0];
119

120 121 122
        free (str);
    }

123
    // Wait for data.
124
    msleep (SETTLE_TIME);
125 126

    zmq_msg_t msg;
127
    TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&msg));
128 129

    // Expect to pull one from each first
130
    for (size_t peer = 0; peer < services; ++peer) {
131 132
        TEST_ASSERT_EQUAL_INT (
          2, TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&msg, pull, 0)));
133 134
        const char *str = (const char *) zmq_msg_data (&msg);
        first_half -= str[0];
135
    }
136
    TEST_ASSERT_EQUAL_INT (0, first_half);
137

138
    // And then get the second batch
139
    for (size_t peer = 0; peer < services; ++peer) {
140 141
        TEST_ASSERT_EQUAL_INT (
          2, TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&msg, pull, 0)));
142 143
        const char *str = (const char *) zmq_msg_data (&msg);
        second_half -= str[0];
144
    }
145
    TEST_ASSERT_EQUAL_INT (0, second_half);
146

147
    TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg));
148

149
    test_context_socket_close_zero_linger (pull);
150

151
    for (size_t peer = 0; peer < services; ++peer)
152
        test_context_socket_close_zero_linger (pushs[peer]);
153 154
}

155 156 157
// PUSH: SHALL block on sending, or return a suitable error, when it has no
// available peers.
void test_push_block_on_send_no_peers (const char *bind_address_)
158
{
159
    void *sc = test_context_socket (ZMQ_PUSH);
160

161
    int timeout = 250;
162 163
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (sc, ZMQ_SNDTIMEO, &timeout, sizeof (timeout)));
164

165 166
    TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_send (sc, 0, 0, ZMQ_DONTWAIT));
    TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_send (sc, 0, 0, 0));
167

168
    test_context_socket_close (sc);
169 170
}

171 172 173 174
// PUSH and PULL: SHALL create this queue when a peer connects to it. If
// this peer disconnects, the socket SHALL destroy its queue and SHALL
// discard any messages it contains.
void test_destroy_queue_on_disconnect (const char *bind_address_)
175
{
176
    void *a = test_context_socket (ZMQ_PUSH);
177 178

    int hwm = 1;
179 180
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (a, ZMQ_SNDHWM, &hwm, sizeof (hwm)));
181

182
    TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (a, bind_address_));
183
    size_t len = MAX_SOCKET_STRING;
184 185
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_getsockopt (a, ZMQ_LAST_ENDPOINT, connect_address, &len));
186

187
    void *b = test_context_socket (ZMQ_PULL);
188

189 190
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (b, ZMQ_RCVHWM, &hwm, sizeof (hwm)));
191

192
    TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (b, connect_address));
193 194 195

    // Send two messages, one should be stuck in A's outgoing queue, the other
    // arrives at B.
196 197
    s_send_seq (a, "ABC", SEQ_END);
    s_send_seq (a, "DEF", SEQ_END);
198 199

    // Both queues should now be full, indicated by A blocking on send.
200
    TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_send (a, 0, 0, ZMQ_DONTWAIT));
201

202
    TEST_ASSERT_SUCCESS_ERRNO (zmq_disconnect (b, connect_address));
203 204

    // Disconnect may take time and need command processing.
205
    zmq_pollitem_t poller[2] = {{a, 0, 0, 0}, {b, 0, 0, 0}};
206 207 208 209
    TEST_ASSERT_EQUAL_INT (
      0, TEST_ASSERT_SUCCESS_ERRNO (zmq_poll (poller, 2, 100)));
    TEST_ASSERT_EQUAL_INT (
      0, TEST_ASSERT_SUCCESS_ERRNO (zmq_poll (poller, 2, 100)));
210 211

    zmq_msg_t msg;
212
    TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&msg));
213 214

    // Can't receive old data on B.
215
    TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_msg_recv (&msg, b, ZMQ_DONTWAIT));
216 217

    // Sending fails.
218
    TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_send (a, 0, 0, ZMQ_DONTWAIT));
219 220

    // Reconnect B
221
    TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (b, connect_address));
222 223

    // Still can't receive old data on B.
224
    TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_msg_recv (&msg, b, ZMQ_DONTWAIT));
225 226

    // two messages should be sendable before the queues are filled up.
227 228
    s_send_seq (a, "ABC", SEQ_END);
    s_send_seq (a, "DEF", SEQ_END);
229

230
    TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_send (a, 0, 0, ZMQ_DONTWAIT));
231

232
    TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg));
233

234 235
    test_context_socket_close_zero_linger (a);
    test_context_socket_close_zero_linger (b);
236 237
}

238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371
// PUSH and PULL: SHALL either receive or drop multipart messages atomically.
void test_push_multipart_atomic_drop (const char *bind_address_,
                                      const bool block)
{
    int linger = 0;
    int hwm = 1;

    void *push = test_context_socket (ZMQ_PUSH);
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (push, ZMQ_LINGER, &linger, sizeof (linger)));
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (push, ZMQ_SNDHWM, &hwm, sizeof (hwm)));
    TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (push, bind_address_));
    size_t addr_len = MAX_SOCKET_STRING;
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_getsockopt (push, ZMQ_LAST_ENDPOINT, connect_address, &addr_len));

    void *pull = test_context_socket (ZMQ_PULL);
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (pull, ZMQ_LINGER, &linger, sizeof (linger)));
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (pull, ZMQ_RCVHWM, &hwm, sizeof (hwm)));
    TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (pull, connect_address));

    // Wait for connections.
    msleep (SETTLE_TIME);

    int rc;
    zmq_msg_t msg_data;
    // A large message is needed to overrun the TCP buffers
    const size_t len = 16 * 1024 * 1024;
    size_t zmq_events_size = sizeof (int);
    int zmq_events;

    // Normal case - excercise the queues
    send_string_expect_success (push, "0", ZMQ_SNDMORE);
    send_string_expect_success (push, "0", ZMQ_SNDMORE);
    TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init_size (&msg_data, len));
    memset (zmq_msg_data (&msg_data), 'a', len);
    TEST_ASSERT_EQUAL_INT (len, zmq_msg_send (&msg_data, push, 0));

    recv_string_expect_success (pull, "0", 0);
    recv_string_expect_success (pull, "0", 0);
    zmq_msg_init (&msg_data);
    TEST_ASSERT_EQUAL_INT (len, zmq_msg_recv (&msg_data, pull, 0));
    zmq_msg_close (&msg_data);

    // Fill the HWMs of sender and receiver, one message each
    send_string_expect_success (push, "1", 0);

    send_string_expect_success (push, "2", ZMQ_SNDMORE);
    send_string_expect_success (push, "2", ZMQ_SNDMORE);
    TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init_size (&msg_data, len));
    memset (zmq_msg_data (&msg_data), 'b', len);
    TEST_ASSERT_EQUAL_INT (len, zmq_msg_send (&msg_data, push, 0));

    // Disconnect and simulate a poll (doesn't work on Windows) to
    // let the commands run and let the pipes start to be deallocated
    TEST_ASSERT_SUCCESS_ERRNO (zmq_disconnect (pull, connect_address));

    zmq_getsockopt (push, ZMQ_EVENTS, &zmq_events, &zmq_events_size);
    zmq_getsockopt (pull, ZMQ_EVENTS, &zmq_events, &zmq_events_size);
    msleep (SETTLE_TIME);
    zmq_getsockopt (push, ZMQ_EVENTS, &zmq_events, &zmq_events_size);
    zmq_getsockopt (pull, ZMQ_EVENTS, &zmq_events, &zmq_events_size);

    // Reconnect and immediately push a large message into the pipe,
    // if the problem is reproduced the pipe is in the process of being
    // terminated but still exists (state term_ack_sent) and had already
    // accepted the frame, so with the first frames already gone and
    // unreachable only the last is left, and is stuck in the lb.
    TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (pull, connect_address));

    send_string_expect_success (push, "3", ZMQ_SNDMORE);
    send_string_expect_success (push, "3", ZMQ_SNDMORE);
    TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init_size (&msg_data, len));
    memset (zmq_msg_data (&msg_data), 'c', len);
    if (block) {
        TEST_ASSERT_EQUAL_INT (len,
                               zmq_msg_send (&msg_data, push, ZMQ_SNDMORE));
    } else {
        rc = zmq_msg_send (&msg_data, push, ZMQ_SNDMORE | ZMQ_DONTWAIT);
        // inproc won't fail, much faster to connect/disconnect pipes than TCP
        if (rc == -1) {
            // at this point the new pipe is there and it works
            send_string_expect_success (push, "3", ZMQ_SNDMORE);
            send_string_expect_success (push, "3", ZMQ_SNDMORE);
            TEST_ASSERT_EQUAL_INT (len,
                                   zmq_msg_send (&msg_data, push, ZMQ_SNDMORE));
        }
    }
    send_string_expect_success (push, "3b", 0);

    zmq_getsockopt (push, ZMQ_EVENTS, &zmq_events, &zmq_events_size);
    zmq_getsockopt (pull, ZMQ_EVENTS, &zmq_events, &zmq_events_size);
    msleep (SETTLE_TIME);
    zmq_getsockopt (push, ZMQ_EVENTS, &zmq_events, &zmq_events_size);
    zmq_getsockopt (pull, ZMQ_EVENTS, &zmq_events, &zmq_events_size);

    send_string_expect_success (push, "5", ZMQ_SNDMORE);
    send_string_expect_success (push, "5", ZMQ_SNDMORE);
    TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init_size (&msg_data, len));
    memset (zmq_msg_data (&msg_data), 'd', len);
    TEST_ASSERT_EQUAL_INT (len, zmq_msg_send (&msg_data, push, 0));

    // On very slow machines the message will not be lost, as it will
    // be sent when the new pipe is already in place, so avoid failing
    // and simply carry on as it would be very noisy otherwise.
    // Receive both to avoid leaking metadata.
    // If only the "5" message is received, the problem is reproduced, and
    // without the fix the first message received would be the last large
    // frame of "3".
    char buffer[2];
    rc =
      TEST_ASSERT_SUCCESS_ERRNO (zmq_recv (pull, buffer, sizeof (buffer), 0));
    TEST_ASSERT_EQUAL_INT (1, rc);
    TEST_ASSERT_TRUE (buffer[0] == '3' || buffer[0] == '5');
    if (buffer[0] == '3') {
        recv_string_expect_success (pull, "3", 0);
        zmq_msg_init (&msg_data);
        TEST_ASSERT_EQUAL_INT (len, zmq_msg_recv (&msg_data, pull, 0));
        zmq_msg_close (&msg_data);
        recv_string_expect_success (pull, "3b", 0);
        recv_string_expect_success (pull, "5", 0);
    }
    recv_string_expect_success (pull, "5", 0);
    zmq_msg_init (&msg_data);
    TEST_ASSERT_EQUAL_INT (len, zmq_msg_recv (&msg_data, pull, 0));
    zmq_msg_close (&msg_data);

    test_context_socket_close_zero_linger (pull);
    test_context_socket_close_zero_linger (push);
}

372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387
#define def_test_spec_pushpull(name, bind_address_)                            \
    void test_spec_pushpull_##name##_push_round_robin_out ()                   \
    {                                                                          \
        test_push_round_robin_out (bind_address_);                             \
    }                                                                          \
    void test_spec_pushpull_##name##_pull_fair_queue_in ()                     \
    {                                                                          \
        test_pull_fair_queue_in (bind_address_);                               \
    }                                                                          \
    void test_spec_pushpull_##name##_push_block_on_send_no_peers ()            \
    {                                                                          \
        test_push_block_on_send_no_peers (bind_address_);                      \
    }                                                                          \
    void test_spec_pushpull_##name##_destroy_queue_on_disconnect ()            \
    {                                                                          \
        test_destroy_queue_on_disconnect (bind_address_);                      \
388 389 390 391 392 393 394 395
    }                                                                          \
    void test_spec_pushpull_##name##_push_multipart_atomic_drop_block ()       \
    {                                                                          \
        test_push_multipart_atomic_drop (bind_address_, true);                 \
    }                                                                          \
    void test_spec_pushpull_##name##_push_multipart_atomic_drop_non_block ()   \
    {                                                                          \
        test_push_multipart_atomic_drop (bind_address_, false);                \
396
    }
397

398
def_test_spec_pushpull (inproc, "inproc://a")
399

400
  def_test_spec_pushpull (tcp, "tcp://127.0.0.1:*")
401

402 403 404
    int main ()
{
    setup_test_environment ();
405

406 407 408 409 410 411 412 413 414 415
    UNITY_BEGIN ();
    RUN_TEST (test_spec_pushpull_inproc_push_round_robin_out);
    RUN_TEST (test_spec_pushpull_tcp_push_round_robin_out);
    RUN_TEST (test_spec_pushpull_inproc_pull_fair_queue_in);
    RUN_TEST (test_spec_pushpull_tcp_pull_fair_queue_in);
    RUN_TEST (test_spec_pushpull_inproc_push_block_on_send_no_peers);
    RUN_TEST (test_spec_pushpull_tcp_push_block_on_send_no_peers);
    // TODO Tests disabled until libzmq does this properly
    //RUN_TEST (test_spec_pushpull_inproc_destroy_queue_on_disconnect);
    //RUN_TEST (test_spec_pushpull_tcp_destroy_queue_on_disconnect);
416 417 418 419
    RUN_TEST (test_spec_pushpull_inproc_push_multipart_atomic_drop_block);
    RUN_TEST (test_spec_pushpull_inproc_push_multipart_atomic_drop_non_block);
    RUN_TEST (test_spec_pushpull_tcp_push_multipart_atomic_drop_block);
    RUN_TEST (test_spec_pushpull_tcp_push_multipart_atomic_drop_non_block);
420
    return UNITY_END ();
421
}