Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
0a037a74
Unverified
Commit
0a037a74
authored
May 14, 2018
by
Luca Boccassi
Committed by
GitHub
May 14, 2018
Browse files
Options
Browse Files
Download
Plain Diff
Merge pull request #3097 from sigiesec/ping-context
ZMTP 3.1 PING Context not implemented
parents
df2fe88b
be66eacf
Hide whitespace changes
Inline
Side-by-side
Showing
5 changed files
with
220 additions
and
135 deletions
+220
-135
Makefile.am
Makefile.am
+2
-1
stream_engine.cpp
src/stream_engine.cpp
+34
-7
stream_engine.hpp
src/stream_engine.hpp
+3
-0
test_heartbeats.cpp
tests/test_heartbeats.cpp
+156
-127
testutil_unity.hpp
tests/testutil_unity.hpp
+25
-0
No files found.
Makefile.am
View file @
0a037a74
...
@@ -651,7 +651,8 @@ tests_test_setsockopt_SOURCES = tests/test_setsockopt.cpp
...
@@ -651,7 +651,8 @@ tests_test_setsockopt_SOURCES = tests/test_setsockopt.cpp
tests_test_setsockopt_LDADD
=
src/libzmq.la
tests_test_setsockopt_LDADD
=
src/libzmq.la
tests_test_heartbeats_SOURCES
=
tests/test_heartbeats.cpp
tests_test_heartbeats_SOURCES
=
tests/test_heartbeats.cpp
tests_test_heartbeats_LDADD
=
src/libzmq.la
tests_test_heartbeats_LDADD
=
src/libzmq.la
${
UNITY_LIBS
}
tests_test_heartbeats_CPPFLAGS
=
${
UNITY_CPPFLAGS
}
tests_test_stream_exceeds_buffer_SOURCES
=
tests/test_stream_exceeds_buffer.cpp
tests_test_stream_exceeds_buffer_SOURCES
=
tests/test_stream_exceeds_buffer.cpp
tests_test_stream_exceeds_buffer_LDADD
=
src/libzmq.la
tests_test_stream_exceeds_buffer_LDADD
=
src/libzmq.la
...
...
src/stream_engine.cpp
View file @
0a037a74
...
@@ -98,6 +98,8 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_,
...
@@ -98,6 +98,8 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_,
{
{
int
rc
=
tx_msg
.
init
();
int
rc
=
tx_msg
.
init
();
errno_assert
(
rc
==
0
);
errno_assert
(
rc
==
0
);
rc
=
pong_msg
.
init
();
errno_assert
(
rc
==
0
);
// Put the socket into non-blocking mode.
// Put the socket into non-blocking mode.
unblock_socket
(
s
);
unblock_socket
(
s
);
...
@@ -935,9 +937,7 @@ int zmq::stream_engine_t::decode_and_push (msg_t *msg_)
...
@@ -935,9 +937,7 @@ int zmq::stream_engine_t::decode_and_push (msg_t *msg_)
}
}
if
(
msg_
->
flags
()
&
msg_t
::
command
)
{
if
(
msg_
->
flags
()
&
msg_t
::
command
)
{
uint8_t
cmd_id
=
*
((
uint8_t
*
)
msg_
->
data
());
process_command_message
(
msg_
);
if
(
cmd_id
==
4
)
process_heartbeat_message
(
msg_
);
}
}
if
(
metadata
)
if
(
metadata
)
...
@@ -1061,11 +1061,8 @@ int zmq::stream_engine_t::produce_pong_message (msg_t *msg_)
...
@@ -1061,11 +1061,8 @@ int zmq::stream_engine_t::produce_pong_message (msg_t *msg_)
int
rc
=
0
;
int
rc
=
0
;
zmq_assert
(
mechanism
!=
NULL
);
zmq_assert
(
mechanism
!=
NULL
);
rc
=
msg_
->
init_size
(
5
);
rc
=
msg_
->
move
(
pong_msg
);
errno_assert
(
rc
==
0
);
errno_assert
(
rc
==
0
);
msg_
->
set_flags
(
msg_t
::
command
);
memcpy
(
msg_
->
data
(),
"
\4
PONG"
,
5
);
rc
=
mechanism
->
encode
(
msg_
);
rc
=
mechanism
->
encode
(
msg_
);
next_msg
=
&
stream_engine_t
::
pull_and_encode
;
next_msg
=
&
stream_engine_t
::
pull_and_encode
;
...
@@ -1088,9 +1085,39 @@ int zmq::stream_engine_t::process_heartbeat_message (msg_t *msg_)
...
@@ -1088,9 +1085,39 @@ int zmq::stream_engine_t::process_heartbeat_message (msg_t *msg_)
has_ttl_timer
=
true
;
has_ttl_timer
=
true
;
}
}
// As per ZMTP 3.1 the PING command might contain an up to 16 bytes
// context which needs to be PONGed back, so build the pong message
// here and store it. Truncate it if it's too long.
// Given the engine goes straight to out_event, sequential PINGs will
// not be a problem.
size_t
context_len
=
msg_
->
size
()
-
7
>
16
?
16
:
msg_
->
size
()
-
7
;
int
rc
=
pong_msg
.
init_size
(
5
+
context_len
);
errno_assert
(
rc
==
0
);
pong_msg
.
set_flags
(
msg_t
::
command
);
memcpy
(
pong_msg
.
data
(),
"
\4
PONG"
,
5
);
if
(
context_len
>
0
)
memcpy
(((
uint8_t
*
)
pong_msg
.
data
())
+
5
,
((
uint8_t
*
)
msg_
->
data
())
+
7
,
context_len
);
next_msg
=
&
stream_engine_t
::
produce_pong_message
;
next_msg
=
&
stream_engine_t
::
produce_pong_message
;
out_event
();
out_event
();
}
}
return
0
;
return
0
;
}
}
int
zmq
::
stream_engine_t
::
process_command_message
(
msg_t
*
msg_
)
{
uint8_t
cmd_name_size
=
*
((
uint8_t
*
)
msg_
->
data
());
// Malformed command
if
(
msg_
->
size
()
<
cmd_name_size
+
sizeof
(
cmd_name_size
))
return
-
1
;
uint8_t
*
cmd_name
=
((
uint8_t
*
)
msg_
->
data
())
+
1
;
if
(
cmd_name_size
==
4
&&
(
memcmp
(
cmd_name
,
"PING"
,
cmd_name_size
)
==
0
||
memcmp
(
cmd_name
,
"PONG"
,
cmd_name_size
)
==
0
))
return
process_heartbeat_message
(
msg_
);
return
0
;
}
src/stream_engine.hpp
View file @
0a037a74
...
@@ -127,6 +127,7 @@ class stream_engine_t : public io_object_t, public i_engine
...
@@ -127,6 +127,7 @@ class stream_engine_t : public io_object_t, public i_engine
typedef
metadata_t
::
dict_t
properties_t
;
typedef
metadata_t
::
dict_t
properties_t
;
bool
init_properties
(
properties_t
&
properties
);
bool
init_properties
(
properties_t
&
properties
);
int
process_command_message
(
msg_t
*
msg_
);
int
produce_ping_message
(
msg_t
*
msg_
);
int
produce_ping_message
(
msg_t
*
msg_
);
int
process_heartbeat_message
(
msg_t
*
msg_
);
int
process_heartbeat_message
(
msg_t
*
msg_
);
int
produce_pong_message
(
msg_t
*
msg_
);
int
produce_pong_message
(
msg_t
*
msg_
);
...
@@ -138,6 +139,8 @@ class stream_engine_t : public io_object_t, public i_engine
...
@@ -138,6 +139,8 @@ class stream_engine_t : public io_object_t, public i_engine
bool
as_server
;
bool
as_server
;
msg_t
tx_msg
;
msg_t
tx_msg
;
// Need to store PING payload for PONG
msg_t
pong_msg
;
handle_t
handle
;
handle_t
handle
;
...
...
tests/test_heartbeats.cpp
View file @
0a037a74
...
@@ -29,6 +29,19 @@ typedef SOCKET raw_socket;
...
@@ -29,6 +29,19 @@ typedef SOCKET raw_socket;
typedef
int
raw_socket
;
typedef
int
raw_socket
;
#endif
#endif
#include "testutil_unity.hpp"
void
setUp
()
{
setup_test_context
();
}
void
tearDown
()
{
teardown_test_context
();
}
// Read one event off the monitor socket; return value and address
// Read one event off the monitor socket; return value and address
// by reference, if not null, and event number by value. Returns -1
// by reference, if not null, and event number by value. Returns -1
// in case of error.
// in case of error.
...
@@ -38,24 +51,22 @@ static int get_monitor_event (void *monitor)
...
@@ -38,24 +51,22 @@ static int get_monitor_event (void *monitor)
for
(
int
i
=
0
;
i
<
2
;
i
++
)
{
for
(
int
i
=
0
;
i
<
2
;
i
++
)
{
// First frame in message contains event number and value
// First frame in message contains event number and value
zmq_msg_t
msg
;
zmq_msg_t
msg
;
int
rc
=
zmq_msg_init
(
&
msg
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_msg_init
(
&
msg
));
assert
(
rc
==
0
);
if
(
zmq_msg_recv
(
&
msg
,
monitor
,
ZMQ_DONTWAIT
)
==
-
1
)
{
if
(
zmq_msg_recv
(
&
msg
,
monitor
,
ZMQ_DONTWAIT
)
==
-
1
)
{
msleep
(
SETTLE_TIME
);
msleep
(
SETTLE_TIME
);
continue
;
// Interruped, presumably
continue
;
// Interrup
t
ed, presumably
}
}
assert
(
zmq_msg_more
(
&
msg
));
TEST_ASSERT_TRUE
(
zmq_msg_more
(
&
msg
));
uint8_t
*
data
=
(
uint8_t
*
)
zmq_msg_data
(
&
msg
);
uint8_t
*
data
=
(
uint8_t
*
)
zmq_msg_data
(
&
msg
);
uint16_t
event
=
*
(
uint16_t
*
)
(
data
);
uint16_t
event
=
*
(
uint16_t
*
)
(
data
);
// Second frame in message contains event address
// Second frame in message contains event address
rc
=
zmq_msg_init
(
&
msg
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_msg_init
(
&
msg
));
assert
(
rc
==
0
);
if
(
zmq_msg_recv
(
&
msg
,
monitor
,
0
)
==
-
1
)
{
if
(
zmq_msg_recv
(
&
msg
,
monitor
,
0
)
==
-
1
)
{
return
-
1
;
// Interruped, presumably
return
-
1
;
// Interrup
t
ed, presumably
}
}
assert
(
!
zmq_msg_more
(
&
msg
));
TEST_ASSERT_FALSE
(
zmq_msg_more
(
&
msg
));
return
event
;
return
event
;
}
}
...
@@ -66,20 +77,17 @@ static void recv_with_retry (raw_socket fd, char *buffer, int bytes)
...
@@ -66,20 +77,17 @@ static void recv_with_retry (raw_socket fd, char *buffer, int bytes)
{
{
int
received
=
0
;
int
received
=
0
;
while
(
true
)
{
while
(
true
)
{
int
rc
=
recv
(
fd
,
buffer
+
received
,
bytes
-
received
,
0
);
int
rc
=
TEST_ASSERT_SUCCESS_RAW_ERRNO
(
assert
(
rc
>
0
);
recv
(
fd
,
buffer
+
received
,
bytes
-
received
,
0
));
TEST_ASSERT_GREATER_THAN_INT
(
0
,
rc
);
received
+=
rc
;
received
+=
rc
;
assert
(
received
<=
bytes
);
TEST_ASSERT_LESS_OR_EQUAL_INT
(
bytes
,
received
);
if
(
received
==
bytes
)
if
(
received
==
bytes
)
break
;
break
;
// ZMQ_REP READY message is shorter, check the actual socket type
if
(
received
>=
3
&&
buffer
[
received
-
1
]
==
'P'
&&
buffer
[
received
-
2
]
==
'E'
&&
buffer
[
received
-
3
]
==
'R'
)
break
;
}
}
}
}
static
void
mock_handshake
(
raw_socket
fd
)
static
void
mock_handshake
(
raw_socket
fd
,
int
mock_ping
)
{
{
const
uint8_t
zmtp_greeting
[
33
]
=
{
0xff
,
0
,
0
,
0
,
0
,
0
,
0
,
0
,
0
,
const
uint8_t
zmtp_greeting
[
33
]
=
{
0xff
,
0
,
0
,
0
,
0
,
0
,
0
,
0
,
0
,
0x7f
,
3
,
0
,
'N'
,
'U'
,
'L'
,
'L'
,
0
};
0x7f
,
3
,
0
,
'N'
,
'U'
,
'L'
,
'L'
,
0
};
...
@@ -87,8 +95,8 @@ static void mock_handshake (raw_socket fd)
...
@@ -87,8 +95,8 @@ static void mock_handshake (raw_socket fd)
memset
(
buffer
,
0
,
sizeof
(
buffer
));
memset
(
buffer
,
0
,
sizeof
(
buffer
));
memcpy
(
buffer
,
zmtp_greeting
,
sizeof
(
zmtp_greeting
));
memcpy
(
buffer
,
zmtp_greeting
,
sizeof
(
zmtp_greeting
));
int
rc
=
send
(
fd
,
buffer
,
64
,
0
);
int
rc
=
TEST_ASSERT_SUCCESS_RAW_ERRNO
(
send
(
fd
,
buffer
,
64
,
0
)
);
assert
(
rc
==
64
);
TEST_ASSERT_EQUAL_INT
(
64
,
rc
);
recv_with_retry
(
fd
,
buffer
,
64
);
recv_with_retry
(
fd
,
buffer
,
64
);
...
@@ -99,10 +107,40 @@ static void mock_handshake (raw_socket fd)
...
@@ -99,10 +107,40 @@ static void mock_handshake (raw_socket fd)
memset
(
buffer
,
0
,
sizeof
(
buffer
));
memset
(
buffer
,
0
,
sizeof
(
buffer
));
memcpy
(
buffer
,
zmtp_ready
,
43
);
memcpy
(
buffer
,
zmtp_ready
,
43
);
rc
=
send
(
fd
,
buffer
,
43
,
0
);
rc
=
TEST_ASSERT_SUCCESS_RAW_ERRNO
(
send
(
fd
,
buffer
,
43
,
0
)
);
assert
(
rc
==
43
);
TEST_ASSERT_EQUAL_INT
(
43
,
rc
);
// greeting
recv_with_retry
(
fd
,
buffer
,
43
);
recv_with_retry
(
fd
,
buffer
,
43
);
if
(
mock_ping
)
{
// test PING context - should be replicated in the PONG
// to avoid timeouts, do a bulk send
const
uint8_t
zmtp_ping
[
12
]
=
{
4
,
10
,
4
,
'P'
,
'I'
,
'N'
,
'G'
,
0
,
0
,
'L'
,
'O'
,
'L'
};
uint8_t
zmtp_pong
[
10
]
=
{
4
,
8
,
4
,
'P'
,
'O'
,
'N'
,
'G'
,
'L'
,
'O'
,
'L'
};
memset
(
buffer
,
0
,
sizeof
(
buffer
));
memcpy
(
buffer
,
zmtp_ping
,
12
);
rc
=
TEST_ASSERT_SUCCESS_RAW_ERRNO
(
send
(
fd
,
buffer
,
12
,
0
));
TEST_ASSERT_EQUAL_INT
(
12
,
rc
);
// test a larger body that won't fit in a small message and should get
// truncated
memset
(
buffer
,
'z'
,
sizeof
(
buffer
));
memcpy
(
buffer
,
zmtp_ping
,
12
);
buffer
[
1
]
=
65
;
rc
=
TEST_ASSERT_SUCCESS_RAW_ERRNO
(
send
(
fd
,
buffer
,
67
,
0
));
TEST_ASSERT_EQUAL_INT
(
67
,
rc
);
// small pong
recv_with_retry
(
fd
,
buffer
,
10
);
TEST_ASSERT_EQUAL_INT
(
0
,
memcmp
(
zmtp_pong
,
buffer
,
10
));
// large pong
recv_with_retry
(
fd
,
buffer
,
23
);
uint8_t
zmtp_pooong
[
65
]
=
{
4
,
21
,
4
,
'P'
,
'O'
,
'N'
,
'G'
,
'L'
,
'O'
,
'L'
};
memset
(
zmtp_pooong
+
10
,
'z'
,
55
);
TEST_ASSERT_EQUAL_INT
(
0
,
memcmp
(
zmtp_pooong
,
buffer
,
23
));
}
}
}
static
void
setup_curve
(
void
*
socket
,
int
is_server
)
static
void
setup_curve
(
void
*
socket
,
int
is_server
)
...
@@ -133,8 +171,7 @@ static void setup_curve (void *socket, int is_server)
...
@@ -133,8 +171,7 @@ static void setup_curve (void *socket, int is_server)
strlen
(
server_key
));
strlen
(
server_key
));
}
}
static
void
prep_server_socket
(
void
*
ctx
,
static
void
prep_server_socket
(
int
set_heartbeats
,
int
set_heartbeats
,
int
is_curve
,
int
is_curve
,
void
**
server_out
,
void
**
server_out
,
void
**
mon_out
,
void
**
mon_out
,
...
@@ -142,41 +179,34 @@ static void prep_server_socket (void *ctx,
...
@@ -142,41 +179,34 @@ static void prep_server_socket (void *ctx,
size_t
ep_length
,
size_t
ep_length
,
int
socket_type
)
int
socket_type
)
{
{
int
rc
;
// We'll be using this socket in raw mode
// We'll be using this socket in raw mode
void
*
server
=
zmq_socket
(
ctx
,
socket_type
);
void
*
server
=
test_context_socket
(
socket_type
);
assert
(
server
);
int
value
=
0
;
int
value
=
0
;
rc
=
zmq_setsockopt
(
server
,
ZMQ_LINGER
,
&
value
,
sizeof
(
value
));
TEST_ASSERT_SUCCESS_ERRNO
(
assert
(
rc
==
0
);
zmq_setsockopt
(
server
,
ZMQ_LINGER
,
&
value
,
sizeof
(
value
))
);
if
(
set_heartbeats
)
{
if
(
set_heartbeats
)
{
value
=
50
;
value
=
50
;
rc
=
zmq_setsockopt
(
server
,
ZMQ_HEARTBEAT_IVL
,
&
value
,
sizeof
(
value
));
TEST_ASSERT_SUCCESS_ERRNO
(
assert
(
rc
==
0
);
zmq_setsockopt
(
server
,
ZMQ_HEARTBEAT_IVL
,
&
value
,
sizeof
(
value
))
);
}
}
if
(
is_curve
)
if
(
is_curve
)
setup_curve
(
server
,
1
);
setup_curve
(
server
,
1
);
rc
=
zmq_bind
(
server
,
"tcp://127.0.0.1:*"
);
bind_loopback_ipv4
(
server
,
endpoint
,
ep_length
);
assert
(
rc
==
0
);
rc
=
zmq_getsockopt
(
server
,
ZMQ_LAST_ENDPOINT
,
endpoint
,
&
ep_length
);
assert
(
rc
==
0
);
// Create and connect a socket for collecting monitor events on dealer
// Create and connect a socket for collecting monitor events on dealer
void
*
server_mon
=
zmq_socket
(
ctx
,
ZMQ_PAIR
);
void
*
server_mon
=
test_context_socket
(
ZMQ_PAIR
);
assert
(
server_mon
);
rc
=
zmq_socket_monitor
(
server
,
"inproc://monitor-dealer"
,
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_socket_monitor
(
ZMQ_EVENT_CONNECTED
|
ZMQ_EVENT_DISCONNECTED
server
,
"inproc://monitor-dealer"
,
|
ZMQ_EVENT_ACCEPTED
);
ZMQ_EVENT_CONNECTED
|
ZMQ_EVENT_DISCONNECTED
|
ZMQ_EVENT_ACCEPTED
));
assert
(
rc
==
0
);
// Connect to the inproc endpoint so we'll get events
// Connect to the inproc endpoint so we'll get events
rc
=
zmq_connect
(
server_mon
,
"inproc://monitor-dealer"
);
TEST_ASSERT_SUCCESS_ERRNO
(
assert
(
rc
==
0
);
zmq_connect
(
server_mon
,
"inproc://monitor-dealer"
)
);
*
server_out
=
server
;
*
server_out
=
server
;
*
mon_out
=
server_mon
;
*
mon_out
=
server_mon
;
...
@@ -185,17 +215,13 @@ static void prep_server_socket (void *ctx,
...
@@ -185,17 +215,13 @@ static void prep_server_socket (void *ctx,
// This checks for a broken TCP connection (or, in this case a stuck one
// This checks for a broken TCP connection (or, in this case a stuck one
// where the peer never responds to PINGS). There should be an accepted event
// where the peer never responds to PINGS). There should be an accepted event
// then a disconnect event.
// then a disconnect event.
static
void
test_heartbeat_timeout
(
int
server_type
)
static
void
test_heartbeat_timeout
(
int
server_type
,
int
mock_ping
)
{
{
int
rc
;
int
rc
;
char
my_endpoint
[
MAX_SOCKET_STRING
];
char
my_endpoint
[
MAX_SOCKET_STRING
];
// Set up our context and sockets
void
*
ctx
=
zmq_ctx_new
();
assert
(
ctx
);
void
*
server
,
*
server_mon
;
void
*
server
,
*
server_mon
;
prep_server_socket
(
ctx
,
1
,
0
,
&
server
,
&
server_mon
,
my_endpoint
,
prep_server_socket
(
!
mock_ping
,
0
,
&
server
,
&
server_mon
,
my_endpoint
,
MAX_SOCKET_STRING
,
server_type
);
MAX_SOCKET_STRING
,
server_type
);
struct
sockaddr_in
ip4addr
;
struct
sockaddr_in
ip4addr
;
...
@@ -210,30 +236,27 @@ static void test_heartbeat_timeout (int server_type)
...
@@ -210,30 +236,27 @@ static void test_heartbeat_timeout (int server_type)
#endif
#endif
s
=
socket
(
AF_INET
,
SOCK_STREAM
,
IPPROTO_TCP
);
s
=
socket
(
AF_INET
,
SOCK_STREAM
,
IPPROTO_TCP
);
rc
=
connect
(
s
,
(
struct
sockaddr
*
)
&
ip4addr
,
sizeof
ip4addr
);
rc
=
TEST_ASSERT_SUCCESS_RAW_ERRNO
(
assert
(
rc
>
-
1
);
connect
(
s
,
(
struct
sockaddr
*
)
&
ip4addr
,
sizeof
ip4addr
));
TEST_ASSERT_GREATER_THAN_INT
(
-
1
,
rc
);
// Mock a ZMTP 3 client so we can forcibly time out a connection
// Mock a ZMTP 3 client so we can forcibly time out a connection
mock_handshake
(
s
);
mock_handshake
(
s
,
mock_ping
);
// By now everything should report as connected
// By now everything should report as connected
rc
=
get_monitor_event
(
server_mon
);
rc
=
get_monitor_event
(
server_mon
);
assert
(
rc
==
ZMQ_EVENT_ACCEPTED
);
TEST_ASSERT_EQUAL_INT
(
ZMQ_EVENT_ACCEPTED
,
rc
);
// We should have been disconnected
if
(
!
mock_ping
)
{
rc
=
get_monitor_event
(
server_mon
);
// We should have been disconnected
assert
(
rc
==
ZMQ_EVENT_DISCONNECTED
);
rc
=
get_monitor_event
(
server_mon
);
TEST_ASSERT_EQUAL_INT
(
ZMQ_EVENT_DISCONNECTED
,
rc
);
}
close
(
s
);
close
(
s
);
rc
=
zmq_close
(
server
);
test_context_socket_close
(
server
);
assert
(
rc
==
0
);
test_context_socket_close
(
server_mon
);
rc
=
zmq_close
(
server_mon
);
assert
(
rc
==
0
);
rc
=
zmq_ctx_term
(
ctx
);
assert
(
rc
==
0
);
}
}
// This checks that peers respect the TTL value in ping messages
// This checks that peers respect the TTL value in ping messages
...
@@ -246,52 +269,38 @@ static void test_heartbeat_ttl (int client_type, int server_type)
...
@@ -246,52 +269,38 @@ static void test_heartbeat_ttl (int client_type, int server_type)
int
rc
,
value
;
int
rc
,
value
;
char
my_endpoint
[
MAX_SOCKET_STRING
];
char
my_endpoint
[
MAX_SOCKET_STRING
];
// Set up our context and sockets
void
*
ctx
=
zmq_ctx_new
();
assert
(
ctx
);
void
*
server
,
*
server_mon
,
*
client
;
void
*
server
,
*
server_mon
,
*
client
;
prep_server_socket
(
ctx
,
0
,
0
,
&
server
,
&
server_mon
,
my_endpoint
,
prep_server_socket
(
0
,
0
,
&
server
,
&
server_mon
,
my_endpoint
,
MAX_SOCKET_STRING
,
server_type
);
MAX_SOCKET_STRING
,
server_type
);
client
=
zmq_socket
(
ctx
,
client_type
);
client
=
test_context_socket
(
client_type
);
assert
(
client
!=
NULL
);
// Set the heartbeat TTL to 0.1 seconds
// Set the heartbeat TTL to 0.1 seconds
value
=
100
;
value
=
100
;
rc
=
zmq_setsockopt
(
client
,
ZMQ_HEARTBEAT_TTL
,
&
value
,
sizeof
(
value
));
TEST_ASSERT_SUCCESS_ERRNO
(
assert
(
rc
==
0
);
zmq_setsockopt
(
client
,
ZMQ_HEARTBEAT_TTL
,
&
value
,
sizeof
(
value
))
);
// Set the heartbeat interval to much longer than the TTL so that
// Set the heartbeat interval to much longer than the TTL so that
// the socket times out oon the remote side.
// the socket times out oon the remote side.
value
=
250
;
value
=
250
;
rc
=
zmq_setsockopt
(
client
,
ZMQ_HEARTBEAT_IVL
,
&
value
,
sizeof
(
value
));
TEST_ASSERT_SUCCESS_ERRNO
(
assert
(
rc
==
0
);
zmq_setsockopt
(
client
,
ZMQ_HEARTBEAT_IVL
,
&
value
,
sizeof
(
value
))
);
rc
=
zmq_connect
(
client
,
my_endpoint
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_connect
(
client
,
my_endpoint
));
assert
(
rc
==
0
);
// By now everything should report as connected
// By now everything should report as connected
rc
=
get_monitor_event
(
server_mon
);
rc
=
get_monitor_event
(
server_mon
);
assert
(
rc
==
ZMQ_EVENT_ACCEPTED
);
TEST_ASSERT_EQUAL_INT
(
ZMQ_EVENT_ACCEPTED
,
rc
);
msleep
(
SETTLE_TIME
);
msleep
(
SETTLE_TIME
);
// We should have been disconnected
// We should have been disconnected
rc
=
get_monitor_event
(
server_mon
);
rc
=
get_monitor_event
(
server_mon
);
assert
(
rc
==
ZMQ_EVENT_DISCONNECTED
);
TEST_ASSERT_EQUAL_INT
(
ZMQ_EVENT_DISCONNECTED
,
rc
);
rc
=
zmq_close
(
server
);
assert
(
rc
==
0
);
rc
=
zmq_close
(
server_mon
);
assert
(
rc
==
0
);
rc
=
zmq_close
(
client
);
test_context_socket_close
(
server
);
assert
(
rc
==
0
);
test_context_socket_close
(
server_mon
);
test_context_socket_close
(
client
);
rc
=
zmq_ctx_term
(
ctx
);
assert
(
rc
==
0
);
}
}
// This checks for normal operation - that is pings and pongs being
// This checks for normal operation - that is pings and pongs being
...
@@ -303,15 +312,11 @@ test_heartbeat_notimeout (int is_curve, int client_type, int server_type)
...
@@ -303,15 +312,11 @@ test_heartbeat_notimeout (int is_curve, int client_type, int server_type)
int
rc
;
int
rc
;
char
my_endpoint
[
MAX_SOCKET_STRING
];
char
my_endpoint
[
MAX_SOCKET_STRING
];
// Set up our context and sockets
void
*
ctx
=
zmq_ctx_new
();
assert
(
ctx
);
void
*
server
,
*
server_mon
;
void
*
server
,
*
server_mon
;
prep_server_socket
(
ctx
,
1
,
is_curve
,
&
server
,
&
server_mon
,
my_endpoint
,
prep_server_socket
(
1
,
is_curve
,
&
server
,
&
server_mon
,
my_endpoint
,
MAX_SOCKET_STRING
,
server_type
);
MAX_SOCKET_STRING
,
server_type
);
void
*
client
=
zmq_socket
(
ctx
,
client_type
);
void
*
client
=
test_context_socket
(
client_type
);
if
(
is_curve
)
if
(
is_curve
)
setup_curve
(
client
,
0
);
setup_curve
(
client
,
0
);
rc
=
zmq_connect
(
client
,
my_endpoint
);
rc
=
zmq_connect
(
client
,
my_endpoint
);
...
@@ -321,48 +326,72 @@ test_heartbeat_notimeout (int is_curve, int client_type, int server_type)
...
@@ -321,48 +326,72 @@ test_heartbeat_notimeout (int is_curve, int client_type, int server_type)
// By now everything should report as connected
// By now everything should report as connected
rc
=
get_monitor_event
(
server_mon
);
rc
=
get_monitor_event
(
server_mon
);
assert
(
rc
==
ZMQ_EVENT_ACCEPTED
);
TEST_ASSERT_EQUAL_INT
(
ZMQ_EVENT_ACCEPTED
,
rc
);
// We should still be connected because pings and pongs are happenin'
// We should still be connected because pings and pongs are happenin'
rc
=
get_monitor_event
(
server_mon
);
TEST_ASSERT_EQUAL_INT
(
-
1
,
get_monitor_event
(
server_mon
));
assert
(
rc
==
-
1
);
rc
=
zmq_close
(
client
);
assert
(
rc
==
0
);
rc
=
zmq_close
(
server
);
test_context_socket_close
(
client
);
assert
(
rc
==
0
);
test_context_socket_close
(
server
);
test_context_socket_close
(
server_mon
);
}
rc
=
zmq_close
(
server_mon
);
void
test_heartbeat_timeout_router
()
assert
(
rc
==
0
);
{
test_heartbeat_timeout
(
ZMQ_ROUTER
,
0
);
}
rc
=
zmq_ctx_term
(
ctx
);
void
test_heartbeat_timeout_router_mock_ping
()
assert
(
rc
==
0
);
{
test_heartbeat_timeout
(
ZMQ_ROUTER
,
1
);
}
}
#define DEFINE_TESTS(first, second, first_define, second_define) \
void test_heartbeat_ttl_##first##_##second () \
{ \
test_heartbeat_ttl (first_define, second_define); \
} \
void test_heartbeat_notimeout_##first##_##second () \
{ \
test_heartbeat_notimeout (0, first_define, second_define); \
} \
void test_heartbeat_notimeout_##first##_##second##_with_curve () \
{ \
test_heartbeat_notimeout (1, first_define, second_define); \
}
DEFINE_TESTS
(
dealer
,
router
,
ZMQ_DEALER
,
ZMQ_ROUTER
)
DEFINE_TESTS
(
req
,
rep
,
ZMQ_REQ
,
ZMQ_REP
)
DEFINE_TESTS
(
pull
,
push
,
ZMQ_PULL
,
ZMQ_PUSH
)
DEFINE_TESTS
(
sub
,
pub
,
ZMQ_SUB
,
ZMQ_PUB
)
DEFINE_TESTS
(
pair
,
pair
,
ZMQ_PAIR
,
ZMQ_PAIR
)
int
main
(
void
)
int
main
(
void
)
{
{
setup_test_environment
();
setup_test_environment
();
test_heartbeat_timeout
(
ZMQ_ROUTER
);
UNITY_BEGIN
();
test_heartbeat_timeout
(
ZMQ_REP
);
//RUN_TEST (test_heartbeat_timeout_router);
test_heartbeat_ttl
(
ZMQ_DEALER
,
ZMQ_ROUTER
);
RUN_TEST
(
test_heartbeat_timeout_router_mock_ping
);
test_heartbeat_ttl
(
ZMQ_REQ
,
ZMQ_REP
);
test_heartbeat_ttl
(
ZMQ_PULL
,
ZMQ_PUSH
);
//RUN_TEST (test_heartbeat_ttl_dealer_router);
test_heartbeat_ttl
(
ZMQ_SUB
,
ZMQ_PUB
);
//RUN_TEST (test_heartbeat_ttl_req_rep);
test_heartbeat_ttl
(
ZMQ_PAIR
,
ZMQ_PAIR
);
//RUN_TEST (test_heartbeat_ttl_pull_push);
//RUN_TEST (test_heartbeat_ttl_sub_pub);
// Run this test without curve
//RUN_TEST (test_heartbeat_ttl_pair_pair);
test_heartbeat_notimeout
(
0
,
ZMQ_DEALER
,
ZMQ_ROUTER
);
test_heartbeat_notimeout
(
0
,
ZMQ_REQ
,
ZMQ_REP
);
//RUN_TEST (test_heartbeat_notimeout_dealer_router);
test_heartbeat_notimeout
(
0
,
ZMQ_PULL
,
ZMQ_PUSH
);
//RUN_TEST (test_heartbeat_notimeout_req_rep);
test_heartbeat_notimeout
(
0
,
ZMQ_SUB
,
ZMQ_PUB
);
//RUN_TEST (test_heartbeat_notimeout_pull_push);
test_heartbeat_notimeout
(
0
,
ZMQ_PAIR
,
ZMQ_PAIR
);
//RUN_TEST (test_heartbeat_notimeout_sub_pub);
// Then rerun it with curve
//RUN_TEST (test_heartbeat_notimeout_pair_pair);
test_heartbeat_notimeout
(
1
,
ZMQ_DEALER
,
ZMQ_ROUTER
);
test_heartbeat_notimeout
(
1
,
ZMQ_REQ
,
ZMQ_REP
);
//RUN_TEST (test_heartbeat_notimeout_dealer_router_with_curve);
test_heartbeat_notimeout
(
1
,
ZMQ_PULL
,
ZMQ_PUSH
);
//RUN_TEST (test_heartbeat_notimeout_req_rep_with_curve);
test_heartbeat_notimeout
(
1
,
ZMQ_SUB
,
ZMQ_PUB
);
//RUN_TEST (test_heartbeat_notimeout_pull_push_with_curve);
test_heartbeat_notimeout
(
1
,
ZMQ_PAIR
,
ZMQ_PAIR
);
//RUN_TEST (test_heartbeat_notimeout_sub_pub_with_curve);
//RUN_TEST (test_heartbeat_notimeout_pair_pair_with_curve);
return
UNITY_END
();
}
}
tests/testutil_unity.hpp
View file @
0a037a74
...
@@ -57,12 +57,37 @@ int test_assert_success_message_errno_helper (int rc,
...
@@ -57,12 +57,37 @@ int test_assert_success_message_errno_helper (int rc,
return
rc
;
return
rc
;
}
}
int
test_assert_success_message_raw_errno_helper
(
int
rc
,
const
char
*
msg
,
const
char
*
expr
)
{
if
(
rc
==
-
1
)
{
#if defined ZMQ_HAVE_WINDOWS
int
current_errno
=
WSAGetLastError
();
#else
int
current_errno
=
errno
;
#endif
char
buffer
[
512
];
buffer
[
sizeof
(
buffer
)
-
1
]
=
0
;
// to ensure defined behavior with VC++ <= 2013
snprintf
(
buffer
,
sizeof
(
buffer
)
-
1
,
"%s failed%s%s%s, errno = %i"
,
expr
,
msg
?
" (additional info: "
:
""
,
msg
?
msg
:
""
,
msg
?
")"
:
""
,
current_errno
);
TEST_FAIL_MESSAGE
(
buffer
);
}
return
rc
;
}
#define TEST_ASSERT_SUCCESS_MESSAGE_ERRNO(expr, msg) \
#define TEST_ASSERT_SUCCESS_MESSAGE_ERRNO(expr, msg) \
test_assert_success_message_errno_helper (expr, msg, #expr)
test_assert_success_message_errno_helper (expr, msg, #expr)
#define TEST_ASSERT_SUCCESS_ERRNO(expr) \
#define TEST_ASSERT_SUCCESS_ERRNO(expr) \
test_assert_success_message_errno_helper (expr, NULL, #expr)
test_assert_success_message_errno_helper (expr, NULL, #expr)
#define TEST_ASSERT_SUCCESS_RAW_ERRNO(expr) \
test_assert_success_message_raw_errno_helper (expr, NULL, #expr)
#define TEST_ASSERT_FAILURE_ERRNO(error_code, expr) \
#define TEST_ASSERT_FAILURE_ERRNO(error_code, expr) \
{ \
{ \
int rc = (expr); \
int rc = (expr); \
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment