Commit e9a5bc8d authored by Jonathan Reams's avatar Jonathan Reams

Fix units and default values for heartbeats options

Set the ZMQ_HEARTBEAT_TIMEOUT to default to the value of
ZMQ_HEARTBEAT_IVL if it's not explicitly set.
Change the units of ZMQ_HEARTBEAT_TTL to milliseconds in the API
and round down to the nearest decisecond so that all the options
are using the same units.
Make the maximum heartbeat TTL match the spec (6553 seconds)
parent 2e5435e3
...@@ -278,11 +278,13 @@ ZMQ_HEARTBEAT_TTL: Set the TTL value for ZMTP heartbeats ...@@ -278,11 +278,13 @@ ZMQ_HEARTBEAT_TTL: Set the TTL value for ZMTP heartbeats
The 'ZMQ_HEARTBEAT_TTL' option shall set the timeout on the remote peer for ZMTP The 'ZMQ_HEARTBEAT_TTL' option shall set the timeout on the remote peer for ZMTP
heartbeats. If this option is greater than 0, the remote side shall time out the heartbeats. If this option is greater than 0, the remote side shall time out the
connection if it does not receive any more traffic within the TTL period. This option connection if it does not receive any more traffic within the TTL period. This option
does not have any effect if 'ZMQ_HEARTBEAT_IVL' is not set or is 0. does not have any effect if 'ZMQ_HEARTBEAT_IVL' is not set or is 0. Internally, this
value is rounded down to the nearest decisecond, any value less than 100 will have
no effect.
[horizontal] [horizontal]
Option value type:: uint16_t Option value type:: int
Option value unit:: deciseconds (1/10th of a second) Option value unit:: milliseconds
Default value:: 0 Default value:: 0
Applicable socket types:: all, when using connection-oriented transports Applicable socket types:: all, when using connection-oriented transports
......
...@@ -72,7 +72,7 @@ zmq::options_t::options_t () : ...@@ -72,7 +72,7 @@ zmq::options_t::options_t () :
connected (false), connected (false),
heartbeat_ttl (0), heartbeat_ttl (0),
heartbeat_interval (0), heartbeat_interval (0),
heartbeat_timeout (0) heartbeat_timeout (-1)
{ {
} }
...@@ -530,7 +530,9 @@ int zmq::options_t::setsockopt (int option_, const void *optval_, ...@@ -530,7 +530,9 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
break; break;
case ZMQ_HEARTBEAT_TTL: case ZMQ_HEARTBEAT_TTL:
if (is_int && value >= 0 && value < 0xffff) { // Convert this to deciseconds from milliseconds
value = value / 100;
if (is_int && value >= 0 && value <= 6553) {
heartbeat_ttl = (uint16_t)value; heartbeat_ttl = (uint16_t)value;
return 0; return 0;
} }
...@@ -905,7 +907,8 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_) ...@@ -905,7 +907,8 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
case ZMQ_HEARTBEAT_TTL: case ZMQ_HEARTBEAT_TTL:
if (is_int) { if (is_int) {
*(uint16_t*)value = heartbeat_ttl; // Convert the internal deciseconds value to milliseconds
*value = heartbeat_ttl * 100;
return 0; return 0;
} }
break; break;
......
...@@ -98,6 +98,7 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_, ...@@ -98,6 +98,7 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_,
has_ttl_timer (false), has_ttl_timer (false),
has_timeout_timer (false), has_timeout_timer (false),
has_heartbeat_timer (false), has_heartbeat_timer (false),
heartbeat_timeout (0),
socket (NULL) socket (NULL)
{ {
int rc = tx_msg.init (); int rc = tx_msg.init ();
...@@ -144,6 +145,11 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_, ...@@ -144,6 +145,11 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_,
rc = setsockopt (s, SOL_SOCKET, SO_NOSIGPIPE, &set, sizeof (int)); rc = setsockopt (s, SOL_SOCKET, SO_NOSIGPIPE, &set, sizeof (int));
errno_assert (rc == 0); errno_assert (rc == 0);
#endif #endif
if(options.heartbeat_interval > 0) {
heartbeat_timeout = options.heartbeat_timeout;
if(heartbeat_timeout == -1)
heartbeat_timeout = options.heartbeat_interval;
}
} }
zmq::stream_engine_t::~stream_engine_t () zmq::stream_engine_t::~stream_engine_t ()
...@@ -1032,8 +1038,8 @@ int zmq::stream_engine_t::produce_ping_message(msg_t * msg_) ...@@ -1032,8 +1038,8 @@ int zmq::stream_engine_t::produce_ping_message(msg_t * msg_)
rc = mechanism->encode (msg_); rc = mechanism->encode (msg_);
next_msg = &stream_engine_t::pull_and_encode; next_msg = &stream_engine_t::pull_and_encode;
if(!has_timeout_timer && options.heartbeat_timeout > 0) { if(!has_timeout_timer && heartbeat_timeout > 0) {
add_timer(options.heartbeat_timeout, heartbeat_timeout_timer_id); add_timer(heartbeat_timeout, heartbeat_timeout_timer_id);
has_timeout_timer = true; has_timeout_timer = true;
} }
return rc; return rc;
...@@ -1062,8 +1068,8 @@ int zmq::stream_engine_t::process_heartbeat_message(msg_t * msg_) ...@@ -1062,8 +1068,8 @@ int zmq::stream_engine_t::process_heartbeat_message(msg_t * msg_)
memcpy(&remote_heartbeat_ttl, (uint8_t*)msg_->data() + 5, 2); memcpy(&remote_heartbeat_ttl, (uint8_t*)msg_->data() + 5, 2);
remote_heartbeat_ttl = ntohs(remote_heartbeat_ttl); remote_heartbeat_ttl = ntohs(remote_heartbeat_ttl);
// The remote heartbeat is in 10ths of a second // The remote heartbeat is in 10ths of a second
// so we multiply it by 10 to get the timer interval. // so we multiply it by 100 to get the timer interval in ms.
remote_heartbeat_ttl *= 10; remote_heartbeat_ttl *= 100;
if(!has_ttl_timer && remote_heartbeat_ttl > 0) { if(!has_ttl_timer && remote_heartbeat_ttl > 0) {
add_timer(remote_heartbeat_ttl, heartbeat_ttl_timer_id); add_timer(remote_heartbeat_ttl, heartbeat_ttl_timer_id);
......
...@@ -219,6 +219,7 @@ namespace zmq ...@@ -219,6 +219,7 @@ namespace zmq
bool has_ttl_timer; bool has_ttl_timer;
bool has_timeout_timer; bool has_timeout_timer;
bool has_heartbeat_timer; bool has_heartbeat_timer;
int heartbeat_timeout;
// Socket // Socket
zmq::socket_base_t *socket; zmq::socket_base_t *socket;
......
...@@ -131,10 +131,6 @@ prep_server_socket(void * ctx, int set_heartbeats, int is_curve, void ** server_ ...@@ -131,10 +131,6 @@ prep_server_socket(void * ctx, int set_heartbeats, int is_curve, void ** server_
value = 50; value = 50;
rc = zmq_setsockopt (server, ZMQ_HEARTBEAT_IVL, &value, sizeof(value)); rc = zmq_setsockopt (server, ZMQ_HEARTBEAT_IVL, &value, sizeof(value));
assert (rc == 0); assert (rc == 0);
value = 50;
rc = zmq_setsockopt (server, ZMQ_HEARTBEAT_TIMEOUT, &value, sizeof(value));
assert (rc == 0);
} }
if(is_curve) if(is_curve)
...@@ -216,59 +212,49 @@ test_heartbeat_timeout (void) ...@@ -216,59 +212,49 @@ test_heartbeat_timeout (void)
static void static void
test_heartbeat_ttl (void) test_heartbeat_ttl (void)
{ {
int rc; int rc, value;
// Set up our context and sockets // Set up our context and sockets
void *ctx = zmq_ctx_new (); void *ctx = zmq_ctx_new ();
assert (ctx); assert (ctx);
void * server, * server_mon; void * server, * server_mon, *client;
prep_server_socket(ctx, 0, 0, &server, &server_mon); prep_server_socket(ctx, 0, 0, &server, &server_mon);
struct sockaddr_in ip4addr; client = zmq_socket(ctx, ZMQ_DEALER);
int s; assert(client != NULL);
ip4addr.sin_family = AF_INET; // Set the heartbeat TTL to 0.1 seconds
ip4addr.sin_port = htons(5556); value = 100;
inet_pton(AF_INET, "127.0.0.1", &ip4addr.sin_addr); zmq_setsockopt(client, ZMQ_HEARTBEAT_TTL, &value, sizeof(value));
s = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP); // Set the heartbeat interval to much longer than the TTL so that
rc = connect (s, (struct sockaddr*) &ip4addr, sizeof ip4addr); // the socket times out oon the remote side.
assert (rc > -1); value = 250;
zmq_setsockopt(client, ZMQ_HEARTBEAT_IVL, &value, sizeof(value));
// Mock a ZMTP 3 client so we can forcibly time out a connection rc = zmq_connect(client, "tcp://localhost:5556");
mock_handshake(s); assert(rc == 0);
// By now everything should report as connected // By now everything should report as connected
rc = get_monitor_event(server_mon); rc = get_monitor_event(server_mon);
assert(rc == ZMQ_EVENT_ACCEPTED); assert(rc == ZMQ_EVENT_ACCEPTED);
// This is a ping message with a 0.5 second TTL. msleep(100);
uint8_t ping_message[] = {
0x4, // This specifies that this is a command message
0x7, // The total payload length is 8 bytes
0x4, 'P', 'I', 'N', 'G', // The command name
0, 10 // This is a network-order 16-bit TTL value
};
rc = send(s, (const char*)ping_message, sizeof(ping_message), 0);
assert(rc == sizeof(ping_message));
uint8_t pong_buffer[8] = { 0 };
rc = recv(s, (char*)pong_buffer, 7, 0);
assert(rc == 7 && memcmp(pong_buffer, "\4\5\4PONG", 7) == 0);
// We should have been disconnected // We should have been disconnected
rc = get_monitor_event(server_mon); rc = get_monitor_event(server_mon);
assert(rc == ZMQ_EVENT_DISCONNECTED); assert(rc == ZMQ_EVENT_DISCONNECTED);
close(s);
rc = zmq_close (server); rc = zmq_close (server);
assert (rc == 0); assert (rc == 0);
rc = zmq_close (server_mon); rc = zmq_close (server_mon);
assert (rc == 0); assert (rc == 0);
rc = zmq_close (client);
assert (rc == 0);
rc = zmq_ctx_term (ctx); rc = zmq_ctx_term (ctx);
assert (rc == 0); assert (rc == 0);
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment