/*
    Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file

    This file is part of 0MQ.

    0MQ is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    0MQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

#include "testutil.hpp"
#if defined(ZMQ_HAVE_WINDOWS)
#include <winsock2.h>
#include <ws2tcpip.h>
#include <stdexcept>
#define close closesocket
typedef SOCKET raw_socket;
#else
#include <arpa/inet.h>
typedef int raw_socket;
#endif

#include <limits.h>
#include <stdlib.h>
#include <string.h>

// TODO remove this here, either ensure that UINT16_MAX is always properly
// defined or handle this at a more central location
#ifndef UINT16_MAX
#define UINT16_MAX 65535
#endif

#include "testutil_unity.hpp"

void setUp ()
{
44
    setup_test_context ();
45 46 47 48
}

void tearDown ()
{
49
    teardown_test_context ();
50 51 52
}


Jonathan Reams's avatar
Jonathan Reams committed
53 54 55 56
//  Read one event off the monitor socket; return value and address
//  by reference, if not null, and event number by value. Returns -1
//  in case of error.

57
static int get_monitor_event (void *monitor)
Jonathan Reams's avatar
Jonathan Reams committed
58
{
59
    for (int i = 0; i < 2; i++) {
Jonathan Reams's avatar
Jonathan Reams committed
60 61
        //  First frame in message contains event number and value
        zmq_msg_t msg;
62
        TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&msg));
Jonathan Reams's avatar
Jonathan Reams committed
63
        if (zmq_msg_recv (&msg, monitor, ZMQ_DONTWAIT) == -1) {
64
            msleep (SETTLE_TIME);
65
            continue; //  Interrupted, presumably
Jonathan Reams's avatar
Jonathan Reams committed
66
        }
67
        TEST_ASSERT_TRUE (zmq_msg_more (&msg));
Jonathan Reams's avatar
Jonathan Reams committed
68 69 70 71 72

        uint8_t *data = (uint8_t *) zmq_msg_data (&msg);
        uint16_t event = *(uint16_t *) (data);

        //  Second frame in message contains event address
73
        TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&msg));
Jonathan Reams's avatar
Jonathan Reams committed
74
        if (zmq_msg_recv (&msg, monitor, 0) == -1) {
75
            return -1; //  Interrupted, presumably
Jonathan Reams's avatar
Jonathan Reams committed
76
        }
77
        TEST_ASSERT_FALSE (zmq_msg_more (&msg));
Jonathan Reams's avatar
Jonathan Reams committed
78 79 80 81 82 83

        return event;
    }
    return -1;
}

84 85 86
static void recv_with_retry (raw_socket fd, char *buffer, int bytes)
{
    int received = 0;
87
    while (true) {
88 89 90
        int rc = TEST_ASSERT_SUCCESS_RAW_ERRNO (
          recv (fd, buffer + received, bytes - received, 0));
        TEST_ASSERT_GREATER_THAN_INT (0, rc);
91
        received += rc;
92
        TEST_ASSERT_LESS_OR_EQUAL_INT (bytes, received);
93 94
        if (received == bytes)
            break;
95 96 97
    }
}

98
static void mock_handshake (raw_socket fd, int mock_ping)
99 100 101 102 103 104
{
    const uint8_t zmtp_greeting[33] = {0xff, 0, 0, 0,   0,   0,   0,   0, 0,
                                       0x7f, 3, 0, 'N', 'U', 'L', 'L', 0};
    char buffer[128];
    memset (buffer, 0, sizeof (buffer));
    memcpy (buffer, zmtp_greeting, sizeof (zmtp_greeting));
Jonathan Reams's avatar
Jonathan Reams committed
105

106 107
    int rc = TEST_ASSERT_SUCCESS_RAW_ERRNO (send (fd, buffer, 64, 0));
    TEST_ASSERT_EQUAL_INT (64, rc);
Jonathan Reams's avatar
Jonathan Reams committed
108

109
    recv_with_retry (fd, buffer, 64);
Jonathan Reams's avatar
Jonathan Reams committed
110

111 112 113 114
    const uint8_t zmtp_ready[43] = {
      4,   41,  5,   'R', 'E', 'A', 'D', 'Y', 11,  'S', 'o', 'c', 'k', 'e', 't',
      '-', 'T', 'y', 'p', 'e', 0,   0,   0,   6,   'D', 'E', 'A', 'L', 'E', 'R',
      8,   'I', 'd', 'e', 'n', 't', 'i', 't', 'y', 0,   0,   0,   0};
Jonathan Reams's avatar
Jonathan Reams committed
115

116 117
    memset (buffer, 0, sizeof (buffer));
    memcpy (buffer, zmtp_ready, 43);
118 119
    rc = TEST_ASSERT_SUCCESS_RAW_ERRNO (send (fd, buffer, 43, 0));
    TEST_ASSERT_EQUAL_INT (43, rc);
Jonathan Reams's avatar
Jonathan Reams committed
120

121
    //  greeting
122
    recv_with_retry (fd, buffer, 43);
123 124 125 126 127 128 129 130 131

    if (mock_ping) {
        //  test PING context - should be replicated in the PONG
        //  to avoid timeouts, do a bulk send
        const uint8_t zmtp_ping[12] = {4,   10, 4, 'P', 'I', 'N',
                                       'G', 0,  0, 'L', 'O', 'L'};
        uint8_t zmtp_pong[10] = {4, 8, 4, 'P', 'O', 'N', 'G', 'L', 'O', 'L'};
        memset (buffer, 0, sizeof (buffer));
        memcpy (buffer, zmtp_ping, 12);
132 133
        rc = TEST_ASSERT_SUCCESS_RAW_ERRNO (send (fd, buffer, 12, 0));
        TEST_ASSERT_EQUAL_INT (12, rc);
134 135 136 137 138 139

        //  test a larger body that won't fit in a small message and should get
        //  truncated
        memset (buffer, 'z', sizeof (buffer));
        memcpy (buffer, zmtp_ping, 12);
        buffer[1] = 65;
140 141
        rc = TEST_ASSERT_SUCCESS_RAW_ERRNO (send (fd, buffer, 67, 0));
        TEST_ASSERT_EQUAL_INT (67, rc);
142 143 144

        //  small pong
        recv_with_retry (fd, buffer, 10);
145
        TEST_ASSERT_EQUAL_INT (0, memcmp (zmtp_pong, buffer, 10));
146 147 148 149
        //  large pong
        recv_with_retry (fd, buffer, 23);
        uint8_t zmtp_pooong[65] = {4, 21, 4, 'P', 'O', 'N', 'G', 'L', 'O', 'L'};
        memset (zmtp_pooong + 10, 'z', 55);
150
        TEST_ASSERT_EQUAL_INT (0, memcmp (zmtp_pooong, buffer, 23));
151
    }
Jonathan Reams's avatar
Jonathan Reams committed
152 153
}

154 155
static void setup_curve (void *socket, int is_server)
{
156 157 158 159 160 161 162 163
    const char *secret_key;
    const char *public_key;
    const char *server_key;

    if (is_server) {
        secret_key = "JTKVSB%%)wK0E.X)V>+}o?pNmC{O&4W4b!Ni{Lh6";
        public_key = "rq:rM>}U?@Lns47E1%kR.o@n%FcmmsL/@{H8]yf7";
        server_key = NULL;
164
    } else {
165 166 167 168 169
        secret_key = "D:)Q[IlAW!ahhC2ac:9*A}h:p?([4%wOTJ%JR%cs";
        public_key = "Yne@$w-vo<fVvi]a<NY6T1ed:M$fCG*[IaLV{hID";
        server_key = "rq:rM>}U?@Lns47E1%kR.o@n%FcmmsL/@{H8]yf7";
    }

170 171 172 173
    zmq_setsockopt (socket, ZMQ_CURVE_SECRETKEY, secret_key,
                    strlen (secret_key));
    zmq_setsockopt (socket, ZMQ_CURVE_PUBLICKEY, public_key,
                    strlen (public_key));
174
    if (is_server)
175 176
        zmq_setsockopt (socket, ZMQ_CURVE_SERVER, &is_server,
                        sizeof (is_server));
177
    else
178 179
        zmq_setsockopt (socket, ZMQ_CURVE_SERVERKEY, server_key,
                        strlen (server_key));
180 181
}

182
static void prep_server_socket (int set_heartbeats,
183 184 185 186
                                int is_curve,
                                void **server_out,
                                void **mon_out,
                                char *endpoint,
187 188
                                size_t ep_length,
                                int socket_type)
Jonathan Reams's avatar
Jonathan Reams committed
189 190
{
    //  We'll be using this socket in raw mode
191
    void *server = test_context_socket (socket_type);
Jonathan Reams's avatar
Jonathan Reams committed
192 193

    int value = 0;
194 195
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (server, ZMQ_LINGER, &value, sizeof (value)));
Jonathan Reams's avatar
Jonathan Reams committed
196

197
    if (set_heartbeats) {
Jonathan Reams's avatar
Jonathan Reams committed
198
        value = 50;
199 200
        TEST_ASSERT_SUCCESS_ERRNO (
          zmq_setsockopt (server, ZMQ_HEARTBEAT_IVL, &value, sizeof (value)));
Jonathan Reams's avatar
Jonathan Reams committed
201 202
    }

203
    if (is_curve)
204
        setup_curve (server, 1);
205

206
    bind_loopback_ipv4 (server, endpoint, ep_length);
Jonathan Reams's avatar
Jonathan Reams committed
207 208

    //  Create and connect a socket for collecting monitor events on dealer
209
    void *server_mon = test_context_socket (ZMQ_PAIR);
Jonathan Reams's avatar
Jonathan Reams committed
210

211 212 213
    TEST_ASSERT_SUCCESS_ERRNO (zmq_socket_monitor (
      server, "inproc://monitor-dealer",
      ZMQ_EVENT_CONNECTED | ZMQ_EVENT_DISCONNECTED | ZMQ_EVENT_ACCEPTED));
Jonathan Reams's avatar
Jonathan Reams committed
214 215

    //  Connect to the inproc endpoint so we'll get events
216 217
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_connect (server_mon, "inproc://monitor-dealer"));
Jonathan Reams's avatar
Jonathan Reams committed
218 219 220 221 222 223 224 225

    *server_out = server;
    *mon_out = server_mon;
}

// This checks for a broken TCP connection (or, in this case a stuck one
// where the peer never responds to PINGS). There should be an accepted event
// then a disconnect event.
226
static void test_heartbeat_timeout (int server_type, int mock_ping)
Jonathan Reams's avatar
Jonathan Reams committed
227 228
{
    int rc;
229
    char my_endpoint[MAX_SOCKET_STRING];
Jonathan Reams's avatar
Jonathan Reams committed
230

231
    void *server, *server_mon;
232
    prep_server_socket (!mock_ping, 0, &server, &server_mon, my_endpoint,
233
                        MAX_SOCKET_STRING, server_type);
Jonathan Reams's avatar
Jonathan Reams committed
234 235

    struct sockaddr_in ip4addr;
236
    raw_socket s;
Jonathan Reams's avatar
Jonathan Reams committed
237 238

    ip4addr.sin_family = AF_INET;
239
    ip4addr.sin_port = htons (atoi (strrchr (my_endpoint, ':') + 1));
240
#if defined(ZMQ_HAVE_WINDOWS) && (_WIN32_WINNT < 0x0600)
241 242
    ip4addr.sin_addr.s_addr = inet_addr ("127.0.0.1");
#else
243
    inet_pton (AF_INET, "127.0.0.1", &ip4addr.sin_addr);
244
#endif
Jonathan Reams's avatar
Jonathan Reams committed
245 246

    s = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
247 248 249
    rc = TEST_ASSERT_SUCCESS_RAW_ERRNO (
      connect (s, (struct sockaddr *) &ip4addr, sizeof ip4addr));
    TEST_ASSERT_GREATER_THAN_INT (-1, rc);
Jonathan Reams's avatar
Jonathan Reams committed
250 251

    // Mock a ZMTP 3 client so we can forcibly time out a connection
252
    mock_handshake (s, mock_ping);
Jonathan Reams's avatar
Jonathan Reams committed
253 254

    // By now everything should report as connected
255
    rc = get_monitor_event (server_mon);
256
    TEST_ASSERT_EQUAL_INT (ZMQ_EVENT_ACCEPTED, rc);
Jonathan Reams's avatar
Jonathan Reams committed
257

258 259 260
    if (!mock_ping) {
        // We should have been disconnected
        rc = get_monitor_event (server_mon);
261
        TEST_ASSERT_EQUAL_INT (ZMQ_EVENT_DISCONNECTED, rc);
262
    }
Jonathan Reams's avatar
Jonathan Reams committed
263

264
    close (s);
Jonathan Reams's avatar
Jonathan Reams committed
265

266 267
    test_context_socket_close (server);
    test_context_socket_close (server_mon);
Jonathan Reams's avatar
Jonathan Reams committed
268 269 270 271 272 273 274
}

// This checks that peers respect the TTL value in ping messages
// We set up a mock ZMTP 3 client and send a ping message with a TLL
// to a server that is not doing any heartbeating. Then we sleep,
// if the server disconnects the client, then we know the TTL did
// its thing correctly.
275
static void test_heartbeat_ttl (int client_type, int server_type)
Jonathan Reams's avatar
Jonathan Reams committed
276
{
277
    int rc, value;
278
    char my_endpoint[MAX_SOCKET_STRING];
Jonathan Reams's avatar
Jonathan Reams committed
279

280
    void *server, *server_mon, *client;
281
    prep_server_socket (0, 0, &server, &server_mon, my_endpoint,
282
                        MAX_SOCKET_STRING, server_type);
Jonathan Reams's avatar
Jonathan Reams committed
283

284
    client = test_context_socket (client_type);
Jonathan Reams's avatar
Jonathan Reams committed
285

286 287
    // Set the heartbeat TTL to 0.1 seconds
    value = 100;
288 289
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (client, ZMQ_HEARTBEAT_TTL, &value, sizeof (value)));
Jonathan Reams's avatar
Jonathan Reams committed
290

291 292 293
    // Set the heartbeat interval to much longer than the TTL so that
    // the socket times out oon the remote side.
    value = 250;
294 295
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (client, ZMQ_HEARTBEAT_IVL, &value, sizeof (value)));
Jonathan Reams's avatar
Jonathan Reams committed
296

297
    TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (client, my_endpoint));
Jonathan Reams's avatar
Jonathan Reams committed
298 299

    // By now everything should report as connected
300
    rc = get_monitor_event (server_mon);
301
    TEST_ASSERT_EQUAL_INT (ZMQ_EVENT_ACCEPTED, rc);
Jonathan Reams's avatar
Jonathan Reams committed
302

303
    msleep (SETTLE_TIME);
Jonathan Reams's avatar
Jonathan Reams committed
304 305

    // We should have been disconnected
306
    rc = get_monitor_event (server_mon);
307
    TEST_ASSERT_EQUAL_INT (ZMQ_EVENT_DISCONNECTED, rc);
Jonathan Reams's avatar
Jonathan Reams committed
308

309 310 311
    test_context_socket_close (server);
    test_context_socket_close (server_mon);
    test_context_socket_close (client);
Jonathan Reams's avatar
Jonathan Reams committed
312 313 314 315 316
}

// This checks for normal operation - that is pings and pongs being
// exchanged normally. There should be an accepted event on the server,
// and then no event afterwards.
317 318
static void
test_heartbeat_notimeout (int is_curve, int client_type, int server_type)
Jonathan Reams's avatar
Jonathan Reams committed
319 320
{
    int rc;
321
    char my_endpoint[MAX_SOCKET_STRING];
Jonathan Reams's avatar
Jonathan Reams committed
322

323
    void *server, *server_mon;
324
    prep_server_socket (1, is_curve, &server, &server_mon, my_endpoint,
325
                        MAX_SOCKET_STRING, server_type);
Jonathan Reams's avatar
Jonathan Reams committed
326

327
    void *client = test_context_socket (client_type);
328
    if (is_curve)
329
        setup_curve (client, 0);
330
    rc = zmq_connect (client, my_endpoint);
Jonathan Reams's avatar
Jonathan Reams committed
331 332

    // Give it a sec to connect and handshake
333
    msleep (SETTLE_TIME);
Jonathan Reams's avatar
Jonathan Reams committed
334 335

    // By now everything should report as connected
336
    rc = get_monitor_event (server_mon);
337
    TEST_ASSERT_EQUAL_INT (ZMQ_EVENT_ACCEPTED, rc);
Jonathan Reams's avatar
Jonathan Reams committed
338 339

    // We should still be connected because pings and pongs are happenin'
340
    TEST_ASSERT_EQUAL_INT (-1, get_monitor_event (server_mon));
Jonathan Reams's avatar
Jonathan Reams committed
341

342 343 344
    test_context_socket_close (client);
    test_context_socket_close (server);
    test_context_socket_close (server_mon);
Jonathan Reams's avatar
Jonathan Reams committed
345 346
}

347 348
void test_heartbeat_timeout_router ()
{
349 350 351 352 353 354
    test_heartbeat_timeout (ZMQ_ROUTER, 0);
}

//  ROUTER server without heartbeats of its own; the mock client sends PINGs
//  and verifies the PONG replies.
void test_heartbeat_timeout_router_mock_ping ()
{
    const int mock_ping = 1;
    test_heartbeat_timeout (ZMQ_ROUTER, mock_ping);
}

//  Generates the parameterless test entry points for one client/server
//  socket pairing:
//    test_heartbeat_ttl_<client>_<server>
//    test_heartbeat_notimeout_<client>_<server>
//    test_heartbeat_notimeout_<client>_<server>_with_curve
//  `client` and `server` are the name fragments, `client_type` and
//  `server_type` the matching ZMQ socket type constants.
#define DEFINE_TESTS(client, server, client_type, server_type)                 \
    void test_heartbeat_ttl_##client##_##server ()                             \
    {                                                                          \
        test_heartbeat_ttl (client_type, server_type);                         \
    }                                                                          \
    void test_heartbeat_notimeout_##client##_##server ()                       \
    {                                                                          \
        test_heartbeat_notimeout (0, client_type, server_type);                \
    }                                                                          \
    void test_heartbeat_notimeout_##client##_##server##_with_curve ()          \
    {                                                                          \
        test_heartbeat_notimeout (1, client_type, server_type);                \
    }

//  One set of tests per socket pairing; the first socket connects, the
//  second one binds.
DEFINE_TESTS (dealer, router, ZMQ_DEALER, ZMQ_ROUTER)
DEFINE_TESTS (req, rep, ZMQ_REQ, ZMQ_REP)
DEFINE_TESTS (pull, push, ZMQ_PULL, ZMQ_PUSH)
DEFINE_TESTS (sub, pub, ZMQ_SUB, ZMQ_PUB)
DEFINE_TESTS (pair, pair, ZMQ_PAIR, ZMQ_PAIR)

//  NOTE(review): despite its name this is the number of milliseconds in one
//  decisecond (100). The identifier is kept because other tests use it.
const int deciseconds_per_millisecond = 100;

//  Largest TTL, in milliseconds, the tests expect ZMQ_HEARTBEAT_TTL to
//  accept: UINT16_MAX whole deciseconds plus the last millisecond before the
//  next decisecond starts.
const int heartbeat_ttl_max =
  UINT16_MAX * deciseconds_per_millisecond + (deciseconds_per_millisecond - 1);

//  Set ZMQ_HEARTBEAT_TTL to `value` on a fresh PAIR socket and check that
//  reading the option back yields `value` rounded down to a multiple of
//  deciseconds_per_millisecond.
void test_setsockopt_heartbeat_success (const int value)
{
    const int expected =
      (value / deciseconds_per_millisecond) * deciseconds_per_millisecond;

    void *const socket = test_context_socket (ZMQ_PAIR);
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (socket, ZMQ_HEARTBEAT_TTL, &value, sizeof (value)));

    int actual = 0;
    size_t actual_size = sizeof (actual);
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_getsockopt (socket, ZMQ_HEARTBEAT_TTL, &actual, &actual_size));

    TEST_ASSERT_EQUAL_INT (expected, actual);

    test_context_socket_close (socket);
}

//  The largest supported TTL must be accepted and read back (rounded down).
void test_setsockopt_heartbeat_ttl_max ()
{
    const int ttl = heartbeat_ttl_max;
    test_setsockopt_heartbeat_success (ttl);
}

//  One millisecond past the largest supported TTL must be rejected with
//  EINVAL.
void test_setsockopt_heartbeat_ttl_more_than_max_fails ()
{
    const int too_large = heartbeat_ttl_max + 1;

    void *const socket = test_context_socket (ZMQ_PAIR);
    TEST_ASSERT_FAILURE_ERRNO (EINVAL,
                               zmq_setsockopt (socket, ZMQ_HEARTBEAT_TTL,
                                               &too_large, sizeof (too_large)));

    test_context_socket_close (socket);
}

//  A TTL just below one decisecond must be accepted; it reads back as 0.
void test_setsockopt_heartbeat_ttl_near_zero ()
{
    const int ttl = deciseconds_per_millisecond - 1;
    test_setsockopt_heartbeat_success (ttl);
}

Jonathan Reams's avatar
Jonathan Reams committed
419 420
int main (void)
{
421
    setup_test_environment ();
422

423 424
    UNITY_BEGIN ();

425
    RUN_TEST (test_heartbeat_timeout_router);
426 427
    RUN_TEST (test_heartbeat_timeout_router_mock_ping);

428 429 430 431 432 433
    RUN_TEST (test_heartbeat_ttl_dealer_router);
    RUN_TEST (test_heartbeat_ttl_req_rep);
    RUN_TEST (test_heartbeat_ttl_pull_push);
    RUN_TEST (test_heartbeat_ttl_sub_pub);
    RUN_TEST (test_heartbeat_ttl_pair_pair);

434 435 436 437
    RUN_TEST (test_setsockopt_heartbeat_ttl_max);
    RUN_TEST (test_setsockopt_heartbeat_ttl_more_than_max_fails);
    RUN_TEST (test_setsockopt_heartbeat_ttl_near_zero);

438 439 440 441 442 443 444 445 446 447 448
    RUN_TEST (test_heartbeat_notimeout_dealer_router);
    RUN_TEST (test_heartbeat_notimeout_req_rep);
    RUN_TEST (test_heartbeat_notimeout_pull_push);
    RUN_TEST (test_heartbeat_notimeout_sub_pub);
    RUN_TEST (test_heartbeat_notimeout_pair_pair);

    RUN_TEST (test_heartbeat_notimeout_dealer_router_with_curve);
    RUN_TEST (test_heartbeat_notimeout_req_rep_with_curve);
    RUN_TEST (test_heartbeat_notimeout_pull_push_with_curve);
    RUN_TEST (test_heartbeat_notimeout_sub_pub_with_curve);
    RUN_TEST (test_heartbeat_notimeout_pair_pair_with_curve);
449

450
    return UNITY_END ();
Jonathan Reams's avatar
Jonathan Reams committed
451
}