test_heartbeats.cpp 9.75 KB
Newer Older
Jonathan Reams's avatar
Jonathan Reams committed
1
/*
2
    Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file
Jonathan Reams's avatar
Jonathan Reams committed
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25

    This file is part of 0MQ.

    0MQ is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    0MQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

#include "testutil.hpp"
#if defined (ZMQ_HAVE_WINDOWS)
#   include <winsock2.h>
#   include <ws2tcpip.h>
#   include <stdexcept>
#   define close closesocket
26
typedef SOCKET raw_socket;
Jonathan Reams's avatar
Jonathan Reams committed
27 28
#else
#   include <arpa/inet.h>
29
typedef int raw_socket;
Jonathan Reams's avatar
Jonathan Reams committed
30 31 32 33 34 35 36 37 38
#endif

//  Read one event off the monitor socket; return value and address
//  by reference, if not null, and event number by value. Returns -1
//  in case of error.

static int
get_monitor_event (void *monitor)
{
39
    for (int i = 0; i < 2; i++) {
Jonathan Reams's avatar
Jonathan Reams committed
40 41
        //  First frame in message contains event number and value
        zmq_msg_t msg;
42 43
        int rc = zmq_msg_init (&msg);
        assert (rc == 0);
Jonathan Reams's avatar
Jonathan Reams committed
44
        if (zmq_msg_recv (&msg, monitor, ZMQ_DONTWAIT) == -1) {
45
            msleep (SETTLE_TIME);
Jonathan Reams's avatar
Jonathan Reams committed
46 47 48 49 50 51 52 53
            continue;           //  Interruped, presumably
        }
        assert (zmq_msg_more (&msg));

        uint8_t *data = (uint8_t *) zmq_msg_data (&msg);
        uint16_t event = *(uint16_t *) (data);

        //  Second frame in message contains event address
54 55
        rc = zmq_msg_init (&msg);
        assert (rc == 0);
Jonathan Reams's avatar
Jonathan Reams committed
56 57 58 59 60 61 62 63 64 65
        if (zmq_msg_recv (&msg, monitor, 0) == -1) {
            return -1;              //  Interruped, presumably
        }
        assert (!zmq_msg_more (&msg));

        return event;
    }
    return -1;
}

66
static void
67
recv_with_retry (raw_socket fd, char *buffer, int bytes) {
68 69 70 71 72 73 74 75 76 77
  int received = 0;
    while (true) {
      int rc = recv(fd, buffer + received, bytes - received, 0);
      assert(rc > 0);
      received += rc;
      assert(received <= bytes);
      if (received == bytes) break;
    }
}

Jonathan Reams's avatar
Jonathan Reams committed
78
static void
79
mock_handshake (raw_socket fd) {
Jonathan Reams's avatar
Jonathan Reams committed
80
    const uint8_t zmtp_greeting[33] = { 0xff, 0, 0, 0, 0, 0, 0, 0, 0, 0x7f, 3, 0, 'N', 'U', 'L', 'L', 0 };
81 82 83
    char buffer [128];
    memset (buffer, 0, sizeof(buffer));
    memcpy (buffer, zmtp_greeting, sizeof(zmtp_greeting));
Jonathan Reams's avatar
Jonathan Reams committed
84

85 86
    int rc = send (fd, buffer, 64, 0);
    assert (rc == 64);
Jonathan Reams's avatar
Jonathan Reams committed
87

88
    recv_with_retry (fd, buffer, 64);
Jonathan Reams's avatar
Jonathan Reams committed
89

90
    const uint8_t zmtp_ready [43] = {
Jonathan Reams's avatar
Jonathan Reams committed
91 92 93 94 95 96 97 98
        4, 41, 5, 'R', 'E', 'A', 'D', 'Y', 11, 'S', 'o', 'c', 'k', 'e', 't', '-', 'T', 'y', 'p', 'e',
        0, 0, 0, 6, 'D', 'E', 'A', 'L', 'E', 'R', 8, 'I', 'd', 'e', 'n', 't', 'i', 't', 'y',
        0, 0, 0, 0
    };

    memset(buffer, 0, sizeof(buffer));
    memcpy(buffer, zmtp_ready, 43);
    rc = send(fd, buffer, 43, 0);
99
    assert (rc == 43);
Jonathan Reams's avatar
Jonathan Reams committed
100

101
    recv_with_retry(fd, buffer, 43);
Jonathan Reams's avatar
Jonathan Reams committed
102 103 104
}

static void
105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130
setup_curve (void * socket, int is_server) {
    const char *secret_key;
    const char *public_key;
    const char *server_key;

    if (is_server) {
        secret_key = "JTKVSB%%)wK0E.X)V>+}o?pNmC{O&4W4b!Ni{Lh6";
        public_key = "rq:rM>}U?@Lns47E1%kR.o@n%FcmmsL/@{H8]yf7";
        server_key = NULL;
    }
    else {
        secret_key = "D:)Q[IlAW!ahhC2ac:9*A}h:p?([4%wOTJ%JR%cs";
        public_key = "Yne@$w-vo<fVvi]a<NY6T1ed:M$fCG*[IaLV{hID";
        server_key = "rq:rM>}U?@Lns47E1%kR.o@n%FcmmsL/@{H8]yf7";
    }

    zmq_setsockopt (socket, ZMQ_CURVE_SECRETKEY, secret_key, strlen(secret_key));
    zmq_setsockopt (socket, ZMQ_CURVE_PUBLICKEY, public_key, strlen(public_key));
    if (is_server)
        zmq_setsockopt (socket, ZMQ_CURVE_SERVER, &is_server, sizeof(is_server));
    else
        zmq_setsockopt (socket, ZMQ_CURVE_SERVERKEY, server_key, strlen(server_key));
}

static void
prep_server_socket(void * ctx, int set_heartbeats, int is_curve, void ** server_out, void ** mon_out,
131
        char *endpoint, size_t ep_length)
Jonathan Reams's avatar
Jonathan Reams committed
132 133 134 135 136 137 138 139 140 141
{
    int rc;
    //  We'll be using this socket in raw mode
    void *server = zmq_socket (ctx, ZMQ_ROUTER);
    assert (server);

    int value = 0;
    rc = zmq_setsockopt (server, ZMQ_LINGER, &value, sizeof (value));
    assert (rc == 0);

142
    if (set_heartbeats) {
Jonathan Reams's avatar
Jonathan Reams committed
143 144 145 146 147
        value = 50;
        rc = zmq_setsockopt (server, ZMQ_HEARTBEAT_IVL, &value, sizeof(value));
        assert (rc == 0);
    }

148 149 150
    if (is_curve)
        setup_curve(server, 1);

151 152 153
    rc = zmq_bind (server, "tcp://127.0.0.1:*");
    assert (rc == 0);
    rc = zmq_getsockopt (server, ZMQ_LAST_ENDPOINT, endpoint, &ep_length);
Jonathan Reams's avatar
Jonathan Reams committed
154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178
    assert (rc == 0);

    //  Create and connect a socket for collecting monitor events on dealer
    void *server_mon = zmq_socket (ctx, ZMQ_PAIR);
    assert (server_mon);

    rc = zmq_socket_monitor (server, "inproc://monitor-dealer",
          ZMQ_EVENT_CONNECTED | ZMQ_EVENT_DISCONNECTED | ZMQ_EVENT_ACCEPTED);
    assert (rc == 0);

    //  Connect to the inproc endpoint so we'll get events
    rc = zmq_connect (server_mon, "inproc://monitor-dealer");
    assert (rc == 0);

    *server_out = server;
    *mon_out = server_mon;
}

// This checks for a broken TCP connection (or, in this case a stuck one
// where the peer never responds to PINGS). There should be an accepted event
// then a disconnect event.
static void
test_heartbeat_timeout (void)
{
    int rc;
179
    char my_endpoint[MAX_SOCKET_STRING];
Jonathan Reams's avatar
Jonathan Reams committed
180 181 182 183 184 185

    //  Set up our context and sockets
    void *ctx = zmq_ctx_new ();
    assert (ctx);

    void * server, * server_mon;
186
    prep_server_socket (ctx, 1, 0, &server, &server_mon, my_endpoint,
187
            MAX_SOCKET_STRING);
Jonathan Reams's avatar
Jonathan Reams committed
188 189

    struct sockaddr_in ip4addr;
190
    raw_socket s;
Jonathan Reams's avatar
Jonathan Reams committed
191 192

    ip4addr.sin_family = AF_INET;
193
    ip4addr.sin_port = htons (atoi (strrchr (my_endpoint, ':') + 1));
194
#if defined (ZMQ_HAVE_WINDOWS) && (_WIN32_WINNT < 0x0600)
195 196 197 198
    ip4addr.sin_addr.s_addr = inet_addr ("127.0.0.1");
#else
    inet_pton(AF_INET, "127.0.0.1", &ip4addr.sin_addr);
#endif
Jonathan Reams's avatar
Jonathan Reams committed
199 200 201 202 203 204

    s = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
    rc = connect (s, (struct sockaddr*) &ip4addr, sizeof ip4addr);
    assert (rc > -1);

    // Mock a ZMTP 3 client so we can forcibly time out a connection
205
    mock_handshake (s);
Jonathan Reams's avatar
Jonathan Reams committed
206 207 208

    // By now everything should report as connected
    rc = get_monitor_event(server_mon);
209
    assert (rc == ZMQ_EVENT_ACCEPTED);
Jonathan Reams's avatar
Jonathan Reams committed
210 211 212

    // We should have been disconnected
    rc = get_monitor_event(server_mon);
213
    assert (rc == ZMQ_EVENT_DISCONNECTED);
Jonathan Reams's avatar
Jonathan Reams committed
214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234

    close(s);

    rc = zmq_close (server);
    assert (rc == 0);

    rc = zmq_close (server_mon);
    assert (rc == 0);

    rc = zmq_ctx_term (ctx);
    assert (rc == 0);
}

// This checks that peers respect the TTL value in ping messages
// We set up a mock ZMTP 3 client and send a ping message with a TLL
// to a server that is not doing any heartbeating. Then we sleep,
// if the server disconnects the client, then we know the TTL did
// its thing correctly.
static void
test_heartbeat_ttl (void)
{
235
    int rc, value;
236
    char my_endpoint[MAX_SOCKET_STRING];
Jonathan Reams's avatar
Jonathan Reams committed
237 238 239 240 241

    //  Set up our context and sockets
    void *ctx = zmq_ctx_new ();
    assert (ctx);

242
    void * server, * server_mon, *client;
243
    prep_server_socket (ctx, 0, 0, &server, &server_mon, my_endpoint,
244
            MAX_SOCKET_STRING);
Jonathan Reams's avatar
Jonathan Reams committed
245

246 247
    client = zmq_socket (ctx, ZMQ_DEALER);
    assert (client != NULL);
Jonathan Reams's avatar
Jonathan Reams committed
248

249 250
    // Set the heartbeat TTL to 0.1 seconds
    value = 100;
251 252
    rc = zmq_setsockopt (client, ZMQ_HEARTBEAT_TTL, &value, sizeof (value));
    assert (rc == 0);
Jonathan Reams's avatar
Jonathan Reams committed
253

254 255 256
    // Set the heartbeat interval to much longer than the TTL so that
    // the socket times out oon the remote side.
    value = 250;
257 258
    rc = zmq_setsockopt (client, ZMQ_HEARTBEAT_IVL, &value, sizeof (value));
    assert (rc == 0);
Jonathan Reams's avatar
Jonathan Reams committed
259

260
    rc = zmq_connect (client, my_endpoint);
261
    assert (rc == 0);
Jonathan Reams's avatar
Jonathan Reams committed
262 263

    // By now everything should report as connected
264 265
    rc = get_monitor_event (server_mon);
    assert (rc == ZMQ_EVENT_ACCEPTED);
Jonathan Reams's avatar
Jonathan Reams committed
266

267
    msleep (SETTLE_TIME);
Jonathan Reams's avatar
Jonathan Reams committed
268 269

    // We should have been disconnected
270 271
    rc = get_monitor_event (server_mon);
    assert (rc == ZMQ_EVENT_DISCONNECTED);
Jonathan Reams's avatar
Jonathan Reams committed
272 273 274 275 276 277 278

    rc = zmq_close (server);
    assert (rc == 0);

    rc = zmq_close (server_mon);
    assert (rc == 0);

279 280 281
    rc = zmq_close (client);
    assert (rc == 0);

Jonathan Reams's avatar
Jonathan Reams committed
282 283 284 285 286 287 288 289
    rc = zmq_ctx_term (ctx);
    assert (rc == 0);
}

// This checks for normal operation - that is pings and pongs being
// exchanged normally. There should be an accepted event on the server,
// and then no event afterwards.
static void
290
test_heartbeat_notimeout (int is_curve)
Jonathan Reams's avatar
Jonathan Reams committed
291 292
{
    int rc;
293
    char my_endpoint[MAX_SOCKET_STRING];
Jonathan Reams's avatar
Jonathan Reams committed
294 295 296 297 298 299

    //  Set up our context and sockets
    void *ctx = zmq_ctx_new ();
    assert (ctx);

    void * server, * server_mon;
300
    prep_server_socket(ctx, 1, is_curve, &server, &server_mon, my_endpoint,
301
            MAX_SOCKET_STRING);
Jonathan Reams's avatar
Jonathan Reams committed
302

303
    void * client = zmq_socket (ctx, ZMQ_DEALER);
304 305
    if (is_curve)
        setup_curve(client, 0);
306
    rc = zmq_connect (client, my_endpoint);
Jonathan Reams's avatar
Jonathan Reams committed
307 308

    // Give it a sec to connect and handshake
309
    msleep (SETTLE_TIME);
Jonathan Reams's avatar
Jonathan Reams committed
310 311 312

    // By now everything should report as connected
    rc = get_monitor_event(server_mon);
313
    assert (rc == ZMQ_EVENT_ACCEPTED);
Jonathan Reams's avatar
Jonathan Reams committed
314 315

    // We should still be connected because pings and pongs are happenin'
316 317
    rc = get_monitor_event (server_mon);
    assert (rc == -1);
Jonathan Reams's avatar
Jonathan Reams committed
318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333

    rc = zmq_close (client);
    assert (rc == 0);

    rc = zmq_close (server);
    assert (rc == 0);

    rc = zmq_close (server_mon);
    assert (rc == 0);

    rc = zmq_ctx_term (ctx);
    assert (rc == 0);
}

int main (void)
{
334 335 336
    setup_test_environment ();
    test_heartbeat_timeout ();
    test_heartbeat_ttl ();
337 338 339 340
    // Run this test without curve
    test_heartbeat_notimeout (0);
    // Then rerun it with curve
    test_heartbeat_notimeout (1);
Jonathan Reams's avatar
Jonathan Reams committed
341
}