test_heartbeats.cpp 8.25 KB
Newer Older
Jonathan Reams's avatar
Jonathan Reams committed
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
Jonathan Reams's avatar
Jonathan Reams committed
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25

    This file is part of 0MQ.

    0MQ is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    0MQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    GNU Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.

#include "testutil.hpp"
#if defined (ZMQ_HAVE_WINDOWS)
#   include <winsock2.h>
#   include <ws2tcpip.h>
#   include <stdexcept>
#   define close closesocket
typedef SOCKET raw_socket;
Jonathan Reams's avatar
Jonathan Reams committed
27 28
#   include <arpa/inet.h>
typedef int raw_socket;
Jonathan Reams's avatar
Jonathan Reams committed
30 31 32 33 34 35 36 37 38

//  Read one event off the monitor socket; return value and address
//  by reference, if not null, and event number by value. Returns -1
//  in case of error.

static int
get_monitor_event (void *monitor)
    for (int i = 0; i < 2; i++) {
Jonathan Reams's avatar
Jonathan Reams committed
40 41
        //  First frame in message contains event number and value
        zmq_msg_t msg;
42 43
        int rc = zmq_msg_init (&msg);
        assert (rc == 0);
Jonathan Reams's avatar
Jonathan Reams committed
        if (zmq_msg_recv (&msg, monitor, ZMQ_DONTWAIT) == -1) {
            msleep (SETTLE_TIME);
Jonathan Reams's avatar
Jonathan Reams committed
46 47 48 49 50 51 52 53
            continue;           //  Interruped, presumably
        assert (zmq_msg_more (&msg));

        uint8_t *data = (uint8_t *) zmq_msg_data (&msg);
        uint16_t event = *(uint16_t *) (data);

        //  Second frame in message contains event address
54 55
        rc = zmq_msg_init (&msg);
        assert (rc == 0);
Jonathan Reams's avatar
Jonathan Reams committed
56 57 58 59 60 61 62 63 64 65
        if (zmq_msg_recv (&msg, monitor, 0) == -1) {
            return -1;              //  Interruped, presumably
        assert (!zmq_msg_more (&msg));

        return event;
    return -1;

static void
recv_with_retry (raw_socket fd, char *buffer, int bytes) {
68 69 70 71 72 73 74 75 76 77
  int received = 0;
    while (true) {
      int rc = recv(fd, buffer + received, bytes - received, 0);
      assert(rc > 0);
      received += rc;
      assert(received <= bytes);
      if (received == bytes) break;

Jonathan Reams's avatar
Jonathan Reams committed
static void
mock_handshake (raw_socket fd) {
Jonathan Reams's avatar
Jonathan Reams committed
    const uint8_t zmtp_greeting[33] = { 0xff, 0, 0, 0, 0, 0, 0, 0, 0, 0x7f, 3, 0, 'N', 'U', 'L', 'L', 0 };
81 82 83
    char buffer [128];
    memset (buffer, 0, sizeof(buffer));
    memcpy (buffer, zmtp_greeting, sizeof(zmtp_greeting));
Jonathan Reams's avatar
Jonathan Reams committed

85 86
    int rc = send (fd, buffer, 64, 0);
    assert (rc == 64);
Jonathan Reams's avatar
Jonathan Reams committed

    recv_with_retry (fd, buffer, 64);
Jonathan Reams's avatar
Jonathan Reams committed

    const uint8_t zmtp_ready [43] = {
Jonathan Reams's avatar
Jonathan Reams committed
91 92 93 94 95 96 97 98
        4, 41, 5, 'R', 'E', 'A', 'D', 'Y', 11, 'S', 'o', 'c', 'k', 'e', 't', '-', 'T', 'y', 'p', 'e',
        0, 0, 0, 6, 'D', 'E', 'A', 'L', 'E', 'R', 8, 'I', 'd', 'e', 'n', 't', 'i', 't', 'y',
        0, 0, 0, 0

    memset(buffer, 0, sizeof(buffer));
    memcpy(buffer, zmtp_ready, 43);
    rc = send(fd, buffer, 43, 0);
    assert (rc == 43);
Jonathan Reams's avatar
Jonathan Reams committed

    recv_with_retry(fd, buffer, 43);
Jonathan Reams's avatar
Jonathan Reams committed
102 103 104

static void
prep_server_socket(void * ctx, int set_heartbeats, void ** server_out, void ** mon_out)
Jonathan Reams's avatar
Jonathan Reams committed
106 107 108 109 110 111 112 113 114 115
    int rc;
    //  We'll be using this socket in raw mode
    void *server = zmq_socket (ctx, ZMQ_ROUTER);
    assert (server);

    int value = 0;
    rc = zmq_setsockopt (server, ZMQ_LINGER, &value, sizeof (value));
    assert (rc == 0);

    if (set_heartbeats) {
Jonathan Reams's avatar
Jonathan Reams committed
117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153
        value = 50;
        rc = zmq_setsockopt (server, ZMQ_HEARTBEAT_IVL, &value, sizeof(value));
        assert (rc == 0);

    rc = zmq_bind (server, "tcp://");
    assert (rc == 0);

    //  Create and connect a socket for collecting monitor events on dealer
    void *server_mon = zmq_socket (ctx, ZMQ_PAIR);
    assert (server_mon);

    rc = zmq_socket_monitor (server, "inproc://monitor-dealer",
    assert (rc == 0);

    //  Connect to the inproc endpoint so we'll get events
    rc = zmq_connect (server_mon, "inproc://monitor-dealer");
    assert (rc == 0);

    *server_out = server;
    *mon_out = server_mon;

// This checks for a broken TCP connection (or, in this case a stuck one
// where the peer never responds to PINGS). There should be an accepted event
// then a disconnect event.
static void
test_heartbeat_timeout (void)
    int rc;

    //  Set up our context and sockets
    void *ctx = zmq_ctx_new ();
    assert (ctx);

    void * server, * server_mon;
    prep_server_socket (ctx, 1, &server, &server_mon);
Jonathan Reams's avatar
Jonathan Reams committed
155 156

    struct sockaddr_in ip4addr;
    raw_socket s;
Jonathan Reams's avatar
Jonathan Reams committed
158 159

    ip4addr.sin_family = AF_INET;
160 161
    ip4addr.sin_port = htons (5556);
#if defined (ZMQ_HAVE_WINDOWS) && (_WIN32_WINNT < 0x0600)
162 163 164 165
    ip4addr.sin_addr.s_addr = inet_addr ("");
    inet_pton(AF_INET, "", &ip4addr.sin_addr);
Jonathan Reams's avatar
Jonathan Reams committed
166 167 168 169 170 171

    rc = connect (s, (struct sockaddr*) &ip4addr, sizeof ip4addr);
    assert (rc > -1);

    // Mock a ZMTP 3 client so we can forcibly time out a connection
    mock_handshake (s);
Jonathan Reams's avatar
Jonathan Reams committed
173 174 175

    // By now everything should report as connected
    rc = get_monitor_event(server_mon);
    assert (rc == ZMQ_EVENT_ACCEPTED);
Jonathan Reams's avatar
Jonathan Reams committed
177 178 179

    // We should have been disconnected
    rc = get_monitor_event(server_mon);
    assert (rc == ZMQ_EVENT_DISCONNECTED);
Jonathan Reams's avatar
Jonathan Reams committed
181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201


    rc = zmq_close (server);
    assert (rc == 0);

    rc = zmq_close (server_mon);
    assert (rc == 0);

    rc = zmq_ctx_term (ctx);
    assert (rc == 0);

// This checks that peers respect the TTL value in ping messages
// We set up a mock ZMTP 3 client and send a ping message with a TLL
// to a server that is not doing any heartbeating. Then we sleep,
// if the server disconnects the client, then we know the TTL did
// its thing correctly.
static void
test_heartbeat_ttl (void)
    int rc, value;
Jonathan Reams's avatar
Jonathan Reams committed
203 204 205 206 207

    //  Set up our context and sockets
    void *ctx = zmq_ctx_new ();
    assert (ctx);

    void * server, * server_mon, *client;
    prep_server_socket (ctx, 0, &server, &server_mon);
Jonathan Reams's avatar
Jonathan Reams committed

211 212
    client = zmq_socket (ctx, ZMQ_DEALER);
    assert (client != NULL);
Jonathan Reams's avatar
Jonathan Reams committed

214 215
    // Set the heartbeat TTL to 0.1 seconds
    value = 100;
216 217
    rc = zmq_setsockopt (client, ZMQ_HEARTBEAT_TTL, &value, sizeof (value));
    assert (rc == 0);
Jonathan Reams's avatar
Jonathan Reams committed

219 220 221
    // Set the heartbeat interval to much longer than the TTL so that
    // the socket times out oon the remote side.
    value = 250;
222 223
    rc = zmq_setsockopt (client, ZMQ_HEARTBEAT_IVL, &value, sizeof (value));
    assert (rc == 0);
Jonathan Reams's avatar
Jonathan Reams committed

225 226
    rc = zmq_connect (client, "tcp://localhost:5556");
    assert (rc == 0);
Jonathan Reams's avatar
Jonathan Reams committed
227 228

    // By now everything should report as connected
229 230
    rc = get_monitor_event (server_mon);
    assert (rc == ZMQ_EVENT_ACCEPTED);
Jonathan Reams's avatar
Jonathan Reams committed

    msleep (SETTLE_TIME);
Jonathan Reams's avatar
Jonathan Reams committed
233 234

    // We should have been disconnected
235 236
    rc = get_monitor_event (server_mon);
    assert (rc == ZMQ_EVENT_DISCONNECTED);
Jonathan Reams's avatar
Jonathan Reams committed
237 238 239 240 241 242 243

    rc = zmq_close (server);
    assert (rc == 0);

    rc = zmq_close (server_mon);
    assert (rc == 0);

244 245 246
    rc = zmq_close (client);
    assert (rc == 0);

Jonathan Reams's avatar
Jonathan Reams committed
247 248 249 250 251 252 253 254
    rc = zmq_ctx_term (ctx);
    assert (rc == 0);

// This checks for normal operation - that is pings and pongs being
// exchanged normally. There should be an accepted event on the server,
// and then no event afterwards.
static void
test_heartbeat_notimeout (void)
Jonathan Reams's avatar
Jonathan Reams committed
256 257 258 259 260 261 262 263
    int rc;

    //  Set up our context and sockets
    void *ctx = zmq_ctx_new ();
    assert (ctx);

    void * server, * server_mon;
    prep_server_socket(ctx, 1, &server, &server_mon);
Jonathan Reams's avatar
Jonathan Reams committed

266 267
    void * client = zmq_socket (ctx, ZMQ_DEALER);
    rc = zmq_connect (client, "tcp://");
Jonathan Reams's avatar
Jonathan Reams committed
268 269

    // Give it a sec to connect and handshake
    msleep (SETTLE_TIME);
Jonathan Reams's avatar
Jonathan Reams committed
271 272 273

    // By now everything should report as connected
    rc = get_monitor_event(server_mon);
    assert (rc == ZMQ_EVENT_ACCEPTED);
Jonathan Reams's avatar
Jonathan Reams committed
275 276

    // We should still be connected because pings and pongs are happenin'
277 278
    rc = get_monitor_event (server_mon);
    assert (rc == -1);
Jonathan Reams's avatar
Jonathan Reams committed
279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294

    rc = zmq_close (client);
    assert (rc == 0);

    rc = zmq_close (server);
    assert (rc == 0);

    rc = zmq_close (server_mon);
    assert (rc == 0);

    rc = zmq_ctx_term (ctx);
    assert (rc == 0);

int main (void)
295 296 297 298
    setup_test_environment ();
    test_heartbeat_timeout ();
    test_heartbeat_ttl ();
    test_heartbeat_notimeout ();
Jonathan Reams's avatar
Jonathan Reams committed