Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
08622a77
Commit
08622a77
authored
Jul 05, 2013
by
Pieter Hintjens
Browse files
Options
Browse Files
Download
Plain Diff
Merge pull request #609 from ckamm/tests
Make pipeline/reqrep tests try tcp:// endpoints.
parents
5038ef74
dfba19c4
Hide whitespace changes
Inline
Side-by-side
Showing
6 changed files
with
300 additions
and
154 deletions
+300
-154
test_spec_dealer.cpp
tests/test_spec_dealer.cpp
+59
-39
test_spec_pushpull.cpp
tests/test_spec_pushpull.cpp
+94
-38
test_spec_rep.cpp
tests/test_spec_rep.cpp
+36
-22
test_spec_req.cpp
tests/test_spec_req.cpp
+47
-31
test_spec_router.cpp
tests/test_spec_router.cpp
+53
-24
testutil.hpp
tests/testutil.hpp
+11
-0
No files found.
tests/test_spec_dealer.cpp
View file @
08622a77
...
...
@@ -21,12 +21,15 @@
#include <stdlib.h>
#include "testutil.hpp"
const
char
*
bind_address
=
0
;
const
char
*
connect_address
=
0
;
void
test_round_robin_out
(
void
*
ctx
)
{
void
*
dealer
=
zmq_socket
(
ctx
,
ZMQ_DEALER
);
assert
(
dealer
);
int
rc
=
zmq_bind
(
dealer
,
"inproc://b"
);
int
rc
=
zmq_bind
(
dealer
,
bind_address
);
assert
(
rc
==
0
);
const
size_t
N
=
5
;
...
...
@@ -40,10 +43,14 @@ void test_round_robin_out (void *ctx)
rc
=
zmq_setsockopt
(
rep
[
i
],
ZMQ_RCVTIMEO
,
&
timeout
,
sizeof
(
int
));
assert
(
rc
==
0
);
rc
=
zmq_connect
(
rep
[
i
],
"inproc://b"
);
rc
=
zmq_connect
(
rep
[
i
],
connect_address
);
assert
(
rc
==
0
);
}
// Wait for connections.
rc
=
zmq_poll
(
0
,
0
,
100
);
assert
(
rc
==
0
);
// Send N requests
for
(
size_t
i
=
0
;
i
<
N
;
++
i
)
{
...
...
@@ -62,14 +69,16 @@ void test_round_robin_out (void *ctx)
rc
=
zmq_msg_close
(
&
msg
);
assert
(
rc
==
0
);
rc
=
zmq_close
(
dealer
);
assert
(
rc
==
0
);
close_zero_linger
(
dealer
);
for
(
size_t
i
=
0
;
i
<
N
;
++
i
)
{
rc
=
zmq_close
(
rep
[
i
]);
assert
(
rc
==
0
);
close_zero_linger
(
rep
[
i
]);
}
// Wait for disconnects.
rc
=
zmq_poll
(
0
,
0
,
100
);
assert
(
rc
==
0
);
}
void
test_fair_queue_in
(
void
*
ctx
)
...
...
@@ -81,7 +90,7 @@ void test_fair_queue_in (void *ctx)
int
rc
=
zmq_setsockopt
(
receiver
,
ZMQ_RCVTIMEO
,
&
timeout
,
sizeof
(
int
));
assert
(
rc
==
0
);
rc
=
zmq_bind
(
receiver
,
"inproc://a"
);
rc
=
zmq_bind
(
receiver
,
bind_address
);
assert
(
rc
==
0
);
const
size_t
N
=
5
;
...
...
@@ -94,7 +103,7 @@ void test_fair_queue_in (void *ctx)
rc
=
zmq_setsockopt
(
senders
[
i
],
ZMQ_RCVTIMEO
,
&
timeout
,
sizeof
(
int
));
assert
(
rc
==
0
);
rc
=
zmq_connect
(
senders
[
i
],
"inproc://a"
);
rc
=
zmq_connect
(
senders
[
i
],
connect_address
);
assert
(
rc
==
0
);
}
...
...
@@ -111,32 +120,32 @@ void test_fair_queue_in (void *ctx)
// send N requests
for
(
size_t
i
=
0
;
i
<
N
;
++
i
)
{
char
*
str
=
strdup
(
"A"
);
str
[
0
]
+=
i
;
s_send_seq
(
senders
[
i
],
str
,
SEQ_END
);
free
(
str
);
s_send_seq
(
senders
[
i
],
"B"
,
SEQ_END
);
}
// Wait for data.
rc
=
zmq_poll
(
0
,
0
,
50
);
assert
(
rc
==
0
);
// handle N requests
for
(
size_t
i
=
0
;
i
<
N
;
++
i
)
{
char
*
str
=
strdup
(
"A"
);
str
[
0
]
+=
i
;
s_recv_seq
(
receiver
,
str
,
SEQ_END
);
free
(
str
);
s_recv_seq
(
receiver
,
"B"
,
SEQ_END
);
}
rc
=
zmq_msg_close
(
&
msg
);
assert
(
rc
==
0
);
rc
=
zmq_close
(
receiver
);
assert
(
rc
==
0
);
close_zero_linger
(
receiver
);
for
(
size_t
i
=
0
;
i
<
N
;
++
i
)
{
rc
=
zmq_close
(
senders
[
i
]);
assert
(
rc
==
0
);
close_zero_linger
(
senders
[
i
]);
}
// Wait for disconnects.
rc
=
zmq_poll
(
0
,
0
,
100
);
assert
(
rc
==
0
);
}
void
test_destroy_queue_on_disconnect
(
void
*
ctx
)
...
...
@@ -144,26 +153,28 @@ void test_destroy_queue_on_disconnect (void *ctx)
void
*
A
=
zmq_socket
(
ctx
,
ZMQ_DEALER
);
assert
(
A
);
int
rc
=
zmq_bind
(
A
,
"inproc://d"
);
int
rc
=
zmq_bind
(
A
,
bind_address
);
assert
(
rc
==
0
);
void
*
B
=
zmq_socket
(
ctx
,
ZMQ_DEALER
);
assert
(
B
);
rc
=
zmq_connect
(
B
,
"inproc://d"
);
rc
=
zmq_connect
(
B
,
connect_address
);
assert
(
rc
==
0
);
// Send a message in both directions
s_send_seq
(
A
,
"ABC"
,
SEQ_END
);
s_send_seq
(
B
,
"DEF"
,
SEQ_END
);
rc
=
zmq_disconnect
(
B
,
"inproc://d"
);
rc
=
zmq_disconnect
(
B
,
connect_address
);
assert
(
rc
==
0
);
// Disconnect may take time and need command processing.
zmq_pollitem_t
poller
[
2
]
=
{
{
A
,
0
,
0
,
0
},
{
B
,
0
,
0
,
0
}
};
rc
=
zmq_poll
(
poller
,
2
,
100
);
assert
(
rc
==
0
);
rc
=
zmq_poll
(
poller
,
2
,
100
);
assert
(
rc
==
0
);
// No messages should be available, sending should fail.
zmq_msg_t
msg
;
...
...
@@ -178,7 +189,7 @@ void test_destroy_queue_on_disconnect (void *ctx)
assert
(
errno
==
EAGAIN
);
// After a reconnect of B, the messages should still be gone
rc
=
zmq_connect
(
B
,
"inproc://d"
);
rc
=
zmq_connect
(
B
,
connect_address
);
assert
(
rc
==
0
);
rc
=
zmq_msg_recv
(
&
msg
,
A
,
ZMQ_DONTWAIT
);
...
...
@@ -192,10 +203,11 @@ void test_destroy_queue_on_disconnect (void *ctx)
rc
=
zmq_msg_close
(
&
msg
);
assert
(
rc
==
0
);
rc
=
zmq_close
(
A
);
assert
(
rc
==
0
);
close_zero_linger
(
A
);
close_zero_linger
(
B
);
rc
=
zmq_close
(
B
);
// Wait for disconnects.
rc
=
zmq_poll
(
0
,
0
,
100
);
assert
(
rc
==
0
);
}
...
...
@@ -225,21 +237,29 @@ int main ()
void
*
ctx
=
zmq_ctx_new
();
assert
(
ctx
);
// SHALL route outgoing messages to available peers using a round-robin
// strategy.
test_round_robin_out
(
ctx
);
const
char
*
binds
[]
=
{
"inproc://a"
,
"tcp://*:5555"
};
const
char
*
connects
[]
=
{
"inproc://a"
,
"tcp://localhost:5555"
};
for
(
int
i
=
0
;
i
<
2
;
++
i
)
{
bind_address
=
binds
[
i
];
connect_address
=
connects
[
i
];
// SHALL receive incoming messages from its peers using a fair-queuing
// strategy.
test_fair_queue_in
(
ctx
);
// SHALL route outgoing messages to available peers using a round-robin
// strategy.
test_round_robin_out
(
ctx
);
// SHALL block on sending, or return a suitable error, when it has no connected peers.
test_block_on_send_no_peers
(
ctx
);
// SHALL receive incoming messages from its peers using a fair-queuing
// strategy.
test_fair_queue_in
(
ctx
);
// SHALL create a double queue when a peer connects to it. If this peer
// disconnects, the DEALER socket SHALL destroy its double queue and SHALL
// discard any messages it contains.
test_destroy_queue_on_disconnect
(
ctx
);
// SHALL block on sending, or return a suitable error, when it has no connected peers.
test_block_on_send_no_peers
(
ctx
);
// SHALL create a double queue when a peer connects to it. If this peer
// disconnects, the DEALER socket SHALL destroy its double queue and SHALL
// discard any messages it contains.
test_destroy_queue_on_disconnect
(
ctx
);
}
int
rc
=
zmq_ctx_term
(
ctx
);
assert
(
rc
==
0
);
...
...
tests/test_spec_pushpull.cpp
View file @
08622a77
...
...
@@ -21,12 +21,15 @@
#include <stdlib.h>
#include "testutil.hpp"
const
char
*
bind_address
=
0
;
const
char
*
connect_address
=
0
;
void
test_push_round_robin_out
(
void
*
ctx
)
{
void
*
push
=
zmq_socket
(
ctx
,
ZMQ_PUSH
);
assert
(
push
);
int
rc
=
zmq_bind
(
push
,
"inproc://b"
);
int
rc
=
zmq_bind
(
push
,
bind_address
);
assert
(
rc
==
0
);
const
size_t
N
=
5
;
...
...
@@ -40,10 +43,14 @@ void test_push_round_robin_out (void *ctx)
rc
=
zmq_setsockopt
(
pulls
[
i
],
ZMQ_RCVTIMEO
,
&
timeout
,
sizeof
(
int
));
assert
(
rc
==
0
);
rc
=
zmq_connect
(
pulls
[
i
],
"inproc://b"
);
rc
=
zmq_connect
(
pulls
[
i
],
connect_address
);
assert
(
rc
==
0
);
}
// Wait for connections.
rc
=
zmq_poll
(
0
,
0
,
100
);
assert
(
rc
==
0
);
// Send 2N messages
for
(
size_t
i
=
0
;
i
<
N
;
++
i
)
{
...
...
@@ -61,14 +68,16 @@ void test_push_round_robin_out (void *ctx)
s_recv_seq
(
pulls
[
i
],
"DEF"
,
SEQ_END
);
}
rc
=
zmq_close
(
push
);
assert
(
rc
==
0
);
close_zero_linger
(
push
);
for
(
size_t
i
=
0
;
i
<
N
;
++
i
)
{
rc
=
zmq_close
(
pulls
[
i
]);
assert
(
rc
==
0
);
close_zero_linger
(
pulls
[
i
]);
}
// Wait for disconnects.
rc
=
zmq_poll
(
0
,
0
,
100
);
assert
(
rc
==
0
);
}
void
test_pull_fair_queue_in
(
void
*
ctx
)
...
...
@@ -76,7 +85,7 @@ void test_pull_fair_queue_in (void *ctx)
void
*
pull
=
zmq_socket
(
ctx
,
ZMQ_PULL
);
assert
(
pull
);
int
rc
=
zmq_bind
(
pull
,
"inproc://a"
);
int
rc
=
zmq_bind
(
pull
,
bind_address
);
assert
(
rc
==
0
);
const
size_t
N
=
5
;
...
...
@@ -86,38 +95,74 @@ void test_pull_fair_queue_in (void *ctx)
pushs
[
i
]
=
zmq_socket
(
ctx
,
ZMQ_PUSH
);
assert
(
pushs
[
i
]);
rc
=
zmq_connect
(
pushs
[
i
],
"inproc://a"
);
rc
=
zmq_connect
(
pushs
[
i
],
connect_address
);
assert
(
rc
==
0
);
}
// Wait for connections.
rc
=
zmq_poll
(
0
,
0
,
100
);
assert
(
rc
==
0
);
int
first_half
=
0
;
int
second_half
=
0
;
// Send 2N messages
for
(
size_t
i
=
0
;
i
<
N
;
++
i
)
{
char
*
str
=
strdup
(
"A"
);
char
*
str
=
strdup
(
"A"
);
str
[
0
]
+=
i
;
s_send_seq
(
pushs
[
i
],
str
,
SEQ_END
);
first_half
+=
str
[
0
];
str
[
0
]
+=
N
;
s_send_seq
(
pushs
[
i
],
str
,
SEQ_END
);
second_half
+=
str
[
0
];
free
(
str
);
}
// Expect to pull them in order
for
(
size_t
i
=
0
;
i
<
2
*
N
;
++
i
)
// Wait for data.
rc
=
zmq_poll
(
0
,
0
,
100
);
assert
(
rc
==
0
);
zmq_msg_t
msg
;
rc
=
zmq_msg_init
(
&
msg
);
assert
(
rc
==
0
);
// Expect to pull one from each first
for
(
size_t
i
=
0
;
i
<
N
;
++
i
)
{
char
*
str
=
strdup
(
"A"
);
str
[
0
]
+=
i
;
s_recv_seq
(
pull
,
str
,
SEQ_END
);
f
ree
(
str
)
;
rc
=
zmq_msg_recv
(
&
msg
,
pull
,
0
);
assert
(
rc
==
2
)
;
const
char
*
str
=
(
const
char
*
)
zmq_msg_data
(
&
msg
);
f
irst_half
-=
str
[
0
]
;
}
assert
(
first_half
==
0
);
rc
=
zmq_close
(
pull
);
// And then get the second batch
for
(
size_t
i
=
0
;
i
<
N
;
++
i
)
{
rc
=
zmq_msg_recv
(
&
msg
,
pull
,
0
);
assert
(
rc
==
2
);
const
char
*
str
=
(
const
char
*
)
zmq_msg_data
(
&
msg
);
second_half
-=
str
[
0
];
}
assert
(
second_half
==
0
);
rc
=
zmq_msg_close
(
&
msg
);
assert
(
rc
==
0
);
close_zero_linger
(
pull
);
for
(
size_t
i
=
0
;
i
<
N
;
++
i
)
{
rc
=
zmq_close
(
pushs
[
i
]);
assert
(
rc
==
0
);
close_zero_linger
(
pushs
[
i
]);
}
// Wait for disconnects.
rc
=
zmq_poll
(
0
,
0
,
100
);
assert
(
rc
==
0
);
}
void
test_push_block_on_send_no_peers
(
void
*
ctx
)
...
...
@@ -150,7 +195,7 @@ void test_destroy_queue_on_disconnect (void *ctx)
int
rc
=
zmq_setsockopt
(
A
,
ZMQ_SNDHWM
,
&
hwm
,
sizeof
(
hwm
));
assert
(
rc
==
0
);
rc
=
zmq_bind
(
A
,
"inproc://d"
);
rc
=
zmq_bind
(
A
,
bind_address
);
assert
(
rc
==
0
);
void
*
B
=
zmq_socket
(
ctx
,
ZMQ_PULL
);
...
...
@@ -159,7 +204,7 @@ void test_destroy_queue_on_disconnect (void *ctx)
rc
=
zmq_setsockopt
(
B
,
ZMQ_RCVHWM
,
&
hwm
,
sizeof
(
hwm
));
assert
(
rc
==
0
);
rc
=
zmq_connect
(
B
,
"inproc://d"
);
rc
=
zmq_connect
(
B
,
connect_address
);
assert
(
rc
==
0
);
// Send two messages, one should be stuck in A's outgoing queue, the other
...
...
@@ -172,13 +217,15 @@ void test_destroy_queue_on_disconnect (void *ctx)
assert
(
rc
==
-
1
);
assert
(
errno
==
EAGAIN
);
rc
=
zmq_disconnect
(
B
,
"inproc://d"
);
rc
=
zmq_disconnect
(
B
,
connect_address
);
assert
(
rc
==
0
);
// Disconnect may take time and need command processing.
zmq_pollitem_t
poller
[
2
]
=
{
{
A
,
0
,
0
,
0
},
{
B
,
0
,
0
,
0
}
};
rc
=
zmq_poll
(
poller
,
2
,
100
);
assert
(
rc
==
0
);
rc
=
zmq_poll
(
poller
,
2
,
100
);
assert
(
rc
==
0
);
zmq_msg_t
msg
;
rc
=
zmq_msg_init
(
&
msg
);
...
...
@@ -195,7 +242,7 @@ void test_destroy_queue_on_disconnect (void *ctx)
assert
(
errno
==
EAGAIN
);
// Reconnect B
rc
=
zmq_connect
(
B
,
"inproc://d"
);
rc
=
zmq_connect
(
B
,
connect_address
);
assert
(
rc
==
0
);
// Still can't receive old data on B.
...
...
@@ -214,10 +261,11 @@ void test_destroy_queue_on_disconnect (void *ctx)
rc
=
zmq_msg_close
(
&
msg
);
assert
(
rc
==
0
);
rc
=
zmq_close
(
A
);
assert
(
rc
==
0
);
close_zero_linger
(
A
);
close_zero_linger
(
B
);
rc
=
zmq_close
(
B
);
// Wait for disconnects.
rc
=
zmq_poll
(
0
,
0
,
100
);
assert
(
rc
==
0
);
}
...
...
@@ -226,22 +274,30 @@ int main ()
void
*
ctx
=
zmq_ctx_new
();
assert
(
ctx
);
// PUSH: SHALL route outgoing messages to connected peers using a
// round-robin strategy.
test_push_round_robin_out
(
ctx
);
const
char
*
binds
[]
=
{
"inproc://a"
,
"tcp://*:5555"
};
const
char
*
connects
[]
=
{
"inproc://a"
,
"tcp://localhost:5555"
};
for
(
int
i
=
0
;
i
<
2
;
++
i
)
{
bind_address
=
binds
[
i
];
connect_address
=
connects
[
i
];
// PULL: SHALL receive incoming messages from its peers using a fair-queuing
//
strategy.
test_pull_fair_queue_in
(
ctx
);
// PUSH: SHALL route outgoing messages to connected peers using a
// round-robin
strategy.
test_push_round_robin_out
(
ctx
);
// PUSH: SHALL block on sending, or return a suitable error, when it has no
// available peers
.
test_push_block_on_send_no_peers
(
ctx
);
// PULL: SHALL receive incoming messages from its peers using a fair-queuing
// strategy
.
test_pull_fair_queue_in
(
ctx
);
// PUSH and PULL: SHALL create this queue when a peer connects to it. If
// this peer disconnects, the socket SHALL destroy its queue and SHALL
// discard any messages it contains.
test_destroy_queue_on_disconnect
(
ctx
);
// PUSH: SHALL block on sending, or return a suitable error, when it has no
// available peers.
test_push_block_on_send_no_peers
(
ctx
);
// PUSH and PULL: SHALL create this queue when a peer connects to it. If
// this peer disconnects, the socket SHALL destroy its queue and SHALL
// discard any messages it contains.
test_destroy_queue_on_disconnect
(
ctx
);
}
int
rc
=
zmq_ctx_term
(
ctx
);
assert
(
rc
==
0
);
...
...
tests/test_spec_rep.cpp
View file @
08622a77
...
...
@@ -21,6 +21,9 @@
#include <stdlib.h>
#include "testutil.hpp"
const
char
*
bind_address
=
0
;
const
char
*
connect_address
=
0
;
void
test_fair_queue_in
(
void
*
ctx
)
{
void
*
rep
=
zmq_socket
(
ctx
,
ZMQ_REP
);
...
...
@@ -30,7 +33,7 @@ void test_fair_queue_in (void *ctx)
int
rc
=
zmq_setsockopt
(
rep
,
ZMQ_RCVTIMEO
,
&
timeout
,
sizeof
(
int
));
assert
(
rc
==
0
);
rc
=
zmq_bind
(
rep
,
"inproc://a"
);
rc
=
zmq_bind
(
rep
,
bind_address
);
assert
(
rc
==
0
);
const
size_t
N
=
5
;
...
...
@@ -43,7 +46,7 @@ void test_fair_queue_in (void *ctx)
rc
=
zmq_setsockopt
(
reqs
[
i
],
ZMQ_RCVTIMEO
,
&
timeout
,
sizeof
(
int
));
assert
(
rc
==
0
);
rc
=
zmq_connect
(
reqs
[
i
],
"inproc://a"
);
rc
=
zmq_connect
(
reqs
[
i
],
connect_address
);
assert
(
rc
==
0
);
}
...
...
@@ -77,14 +80,16 @@ void test_fair_queue_in (void *ctx)
free
(
str
);
}
rc
=
zmq_close
(
rep
);
assert
(
rc
==
0
);
close_zero_linger
(
rep
);
for
(
size_t
i
=
0
;
i
<
N
;
++
i
)
{
rc
=
zmq_close
(
reqs
[
i
]);
assert
(
rc
==
0
);
close_zero_linger
(
reqs
[
i
]);
}
// Wait for disconnects.
rc
=
zmq_poll
(
0
,
0
,
100
);
assert
(
rc
==
0
);
}
void
test_envelope
(
void
*
ctx
)
...
...
@@ -92,13 +97,13 @@ void test_envelope (void *ctx)
void
*
rep
=
zmq_socket
(
ctx
,
ZMQ_REP
);
assert
(
rep
);
int
rc
=
zmq_bind
(
rep
,
"inproc://b"
);
int
rc
=
zmq_bind
(
rep
,
bind_address
);
assert
(
rc
==
0
);
void
*
dealer
=
zmq_socket
(
ctx
,
ZMQ_DEALER
);
assert
(
dealer
);
rc
=
zmq_connect
(
dealer
,
"inproc://b"
);
rc
=
zmq_connect
(
dealer
,
connect_address
);
assert
(
rc
==
0
);
// minimal envelope
...
...
@@ -113,10 +118,11 @@ void test_envelope (void *ctx)
s_send_seq
(
rep
,
"A"
,
SEQ_END
);
s_recv_seq
(
dealer
,
"X"
,
"Y"
,
0
,
"A"
,
SEQ_END
);
rc
=
zmq_close
(
rep
);
assert
(
rc
==
0
);
close_zero_linger
(
rep
);
close_zero_linger
(
dealer
);
rc
=
zmq_close
(
dealer
);
// Wait for disconnects.
rc
=
zmq_poll
(
0
,
0
,
100
);
assert
(
rc
==
0
);
}
...
...
@@ -125,17 +131,25 @@ int main ()
void
*
ctx
=
zmq_ctx_new
();
assert
(
ctx
);
// SHALL receive incoming messages from its peers using a fair-queuing
// strategy.
test_fair_queue_in
(
ctx
);
// For an incoming message:
// SHALL remove and store the address envelope, including the delimiter.
// SHALL pass the remaining data frames to its calling application.
// SHALL wait for a single reply message from its calling application.
// SHALL prepend the address envelope and delimiter.
// SHALL deliver this message back to the originating peer.
test_envelope
(
ctx
);
const
char
*
binds
[]
=
{
"inproc://a"
,
"tcp://*:5555"
};
const
char
*
connects
[]
=
{
"inproc://a"
,
"tcp://localhost:5555"
};
for
(
int
i
=
0
;
i
<
2
;
++
i
)
{
bind_address
=
binds
[
i
];
connect_address
=
connects
[
i
];
// SHALL receive incoming messages from its peers using a fair-queuing
// strategy.
test_fair_queue_in
(
ctx
);
// For an incoming message:
// SHALL remove and store the address envelope, including the delimiter.
// SHALL pass the remaining data frames to its calling application.
// SHALL wait for a single reply message from its calling application.
// SHALL prepend the address envelope and delimiter.
// SHALL deliver this message back to the originating peer.
test_envelope
(
ctx
);
}
int
rc
=
zmq_ctx_term
(
ctx
);
assert
(
rc
==
0
);
...
...
tests/test_spec_req.cpp
View file @
08622a77
...
...
@@ -20,12 +20,15 @@
#include <stdio.h>
#include "testutil.hpp"
const
char
*
bind_address
=
0
;
const
char
*
connect_address
=
0
;
void
test_round_robin_out
(
void
*
ctx
)
{
void
*
req
=
zmq_socket
(
ctx
,
ZMQ_REQ
);
assert
(
req
);
int
rc
=
zmq_bind
(
req
,
"inproc://b"
);
int
rc
=
zmq_bind
(
req
,
bind_address
);
assert
(
rc
==
0
);
const
size_t
N
=
5
;
...
...
@@ -39,7 +42,7 @@ void test_round_robin_out (void *ctx)
rc
=
zmq_setsockopt
(
rep
[
i
],
ZMQ_RCVTIMEO
,
&
timeout
,
sizeof
(
int
));
assert
(
rc
==
0
);
rc
=
zmq_connect
(
rep
[
i
],
"inproc://b"
);
rc
=
zmq_connect
(
rep
[
i
],
connect_address
);
assert
(
rc
==
0
);
}
...
...
@@ -52,14 +55,16 @@ void test_round_robin_out (void *ctx)
s_recv_seq
(
req
,
"DEF"
,
SEQ_END
);
}
rc
=
zmq_close
(
req
);
assert
(
rc
==
0
);
close_zero_linger
(
req
);
for
(
size_t
i
=
0
;
i
<
N
;
++
i
)
{
rc
=
zmq_close
(
rep
[
i
]);
assert
(
rc
==
0
);
close_zero_linger
(
rep
[
i
]);
}
// Wait for disconnects.
rc
=
zmq_poll
(
0
,
0
,
100
);
assert
(
rc
==
0
);
}
void
test_req_only_listens_to_current_peer
(
void
*
ctx
)
...
...
@@ -70,7 +75,7 @@ void test_req_only_listens_to_current_peer (void *ctx)
int
rc
=
zmq_setsockopt
(
req
,
ZMQ_IDENTITY
,
"A"
,
2
);
assert
(
rc
==
0
);
rc
=
zmq_bind
(
req
,
"inproc://c"
);
rc
=
zmq_bind
(
req
,
bind_address
);
assert
(
rc
==
0
);
const
size_t
N
=
3
;
...
...
@@ -88,7 +93,7 @@ void test_req_only_listens_to_current_peer (void *ctx)
rc
=
zmq_setsockopt
(
router
[
i
],
ZMQ_ROUTER_MANDATORY
,
&
enabled
,
sizeof
(
enabled
));
assert
(
rc
==
0
);
rc
=
zmq_connect
(
router
[
i
],
"inproc://c"
);
rc
=
zmq_connect
(
router
[
i
],
connect_address
);
assert
(
rc
==
0
);
}
...
...
@@ -111,14 +116,16 @@ void test_req_only_listens_to_current_peer (void *ctx)
s_recv_seq
(
req
,
"GOOD"
,
SEQ_END
);
}
rc
=
zmq_close
(
req
);
assert
(
rc
==
0
);
close_zero_linger
(
req
);
for
(
size_t
i
=
0
;
i
<
N
;
++
i
)
{
rc
=
zmq_close
(
router
[
i
]);
assert
(
rc
==
0
);
close_zero_linger
(
router
[
i
]);
}
// Wait for disconnects.
rc
=
zmq_poll
(
0
,
0
,
100
);
assert
(
rc
==
0
);
}
void
test_req_message_format
(
void
*
ctx
)
...
...
@@ -129,10 +136,10 @@ void test_req_message_format (void *ctx)
void
*
router
=
zmq_socket
(
ctx
,
ZMQ_ROUTER
);
assert
(
router
);
int
rc
=
zmq_bind
(
req
,
"inproc://a"
);
int
rc
=
zmq_bind
(
req
,
bind_address
);
assert
(
rc
==
0
);
rc
=
zmq_connect
(
router
,
"inproc://a"
);
rc
=
zmq_connect
(
router
,
connect_address
);
assert
(
rc
==
0
);
// Send a multi-part request.
...
...
@@ -172,10 +179,11 @@ void test_req_message_format (void *ctx)
rc
=
zmq_msg_close
(
&
peer_id_msg
);
assert
(
rc
==
0
);
rc
=
zmq_close
(
req
);
assert
(
rc
==
0
);
close_zero_linger
(
req
);
close_zero_linger
(
router
);
rc
=
zmq_close
(
router
);
// Wait for disconnects.
rc
=
zmq_poll
(
0
,
0
,
100
);
assert
(
rc
==
0
);
}
...
...
@@ -205,23 +213,31 @@ int main ()
void
*
ctx
=
zmq_ctx_new
();
assert
(
ctx
);
// SHALL route outgoing messages to connected peers using a round-robin
// strategy.
test_round_robin_out
(
ctx
);
const
char
*
binds
[]
=
{
"inproc://a"
,
"tcp://*:5555"
};
const
char
*
connects
[]
=
{
"inproc://a"
,
"tcp://localhost:5555"
};
// The request and reply messages SHALL have this format on the wire:
// * A delimiter, consisting of an empty frame, added by the REQ socket.
// * One or more data frames, comprising the message visible to the
// application.
test_req_message_format
(
ctx
);
for
(
int
i
=
0
;
i
<
2
;
++
i
)
{
bind_address
=
binds
[
i
];
connect_address
=
connects
[
i
];
// SHALL block on sending, or return a suitable error, when it has no connected peers.
test_block_on_send_no_peers
(
ctx
);
// SHALL route outgoing messages to connected peers using a round-robin
// strategy.
test_round_robin_out
(
ctx
);
// SHALL accept an incoming message only from the last peer that it sent a
// request to.
// SHALL discard silently any messages received from other peers.
test_req_only_listens_to_current_peer
(
ctx
);
// The request and reply messages SHALL have this format on the wire:
// * A delimiter, consisting of an empty frame, added by the REQ socket.
// * One or more data frames, comprising the message visible to the
// application.
test_req_message_format
(
ctx
);
// SHALL block on sending, or return a suitable error, when it has no connected peers.
test_block_on_send_no_peers
(
ctx
);
// SHALL accept an incoming message only from the last peer that it sent a
// request to.
// SHALL discard silently any messages received from other peers.
test_req_only_listens_to_current_peer
(
ctx
);
}
int
rc
=
zmq_ctx_term
(
ctx
);
assert
(
rc
==
0
);
...
...
tests/test_spec_router.cpp
View file @
08622a77
...
...
@@ -21,6 +21,9 @@
#include <stdlib.h>
#include "testutil.hpp"
const
char
*
bind_address
=
0
;
const
char
*
connect_address
=
0
;
void
test_fair_queue_in
(
void
*
ctx
)
{
void
*
receiver
=
zmq_socket
(
ctx
,
ZMQ_ROUTER
);
...
...
@@ -30,7 +33,7 @@ void test_fair_queue_in (void *ctx)
int
rc
=
zmq_setsockopt
(
receiver
,
ZMQ_RCVTIMEO
,
&
timeout
,
sizeof
(
int
));
assert
(
rc
==
0
);
rc
=
zmq_bind
(
receiver
,
"inproc://a"
);
rc
=
zmq_bind
(
receiver
,
bind_address
);
assert
(
rc
==
0
);
const
size_t
N
=
5
;
...
...
@@ -49,7 +52,7 @@ void test_fair_queue_in (void *ctx)
assert
(
rc
==
0
);
free
(
str
);
rc
=
zmq_connect
(
senders
[
i
],
"inproc://a"
);
rc
=
zmq_connect
(
senders
[
i
],
connect_address
);
assert
(
rc
==
0
);
}
...
...
@@ -63,32 +66,43 @@ void test_fair_queue_in (void *ctx)
s_send_seq
(
senders
[
0
],
"M"
,
SEQ_END
);
s_recv_seq
(
receiver
,
"A"
,
"M"
,
SEQ_END
);
int
sum
=
0
;
// send N requests
for
(
size_t
i
=
0
;
i
<
N
;
++
i
)
{
s_send_seq
(
senders
[
i
],
"M"
,
SEQ_END
);
sum
+=
'A'
+
i
;
}
assert
(
sum
==
N
*
'A'
+
N
*
(
N
-
1
)
/
2
);
// handle N requests
for
(
size_t
i
=
0
;
i
<
N
;
++
i
)
{
char
*
str
=
strdup
(
"A"
);
str
[
0
]
+=
i
;
s_recv_seq
(
receiver
,
str
,
"M"
,
SEQ_END
);
free
(
str
);
rc
=
zmq_msg_recv
(
&
msg
,
receiver
,
0
);
assert
(
rc
==
2
);
const
char
*
id
=
(
const
char
*
)
zmq_msg_data
(
&
msg
);
sum
-=
id
[
0
];
s_recv_seq
(
receiver
,
"M"
,
SEQ_END
);
}
assert
(
sum
==
0
);
rc
=
zmq_msg_close
(
&
msg
);
assert
(
rc
==
0
);
rc
=
zmq_close
(
receiver
);
assert
(
rc
==
0
);
close_zero_linger
(
receiver
);
for
(
size_t
i
=
0
;
i
<
N
;
++
i
)
{
rc
=
zmq_close
(
senders
[
i
]);
assert
(
rc
==
0
);
close_zero_linger
(
senders
[
i
]);
}
// Wait for disconnects.
rc
=
zmq_poll
(
0
,
0
,
100
);
assert
(
rc
==
0
);
}
void
test_destroy_queue_on_disconnect
(
void
*
ctx
)
...
...
@@ -100,7 +114,7 @@ void test_destroy_queue_on_disconnect (void *ctx)
int
rc
=
zmq_setsockopt
(
A
,
ZMQ_ROUTER_MANDATORY
,
&
enabled
,
sizeof
(
enabled
));
assert
(
rc
==
0
);
rc
=
zmq_bind
(
A
,
"inproc://d"
);
rc
=
zmq_bind
(
A
,
bind_address
);
assert
(
rc
==
0
);
void
*
B
=
zmq_socket
(
ctx
,
ZMQ_DEALER
);
...
...
@@ -109,20 +123,26 @@ void test_destroy_queue_on_disconnect (void *ctx)
rc
=
zmq_setsockopt
(
B
,
ZMQ_IDENTITY
,
"B"
,
2
);
assert
(
rc
==
0
);
rc
=
zmq_connect
(
B
,
"inproc://d"
);
rc
=
zmq_connect
(
B
,
connect_address
);
assert
(
rc
==
0
);
// Wait for connection.
rc
=
zmq_poll
(
0
,
0
,
100
);
assert
(
rc
==
0
);
// Send a message in both directions
s_send_seq
(
A
,
"B"
,
"ABC"
,
SEQ_END
);
s_send_seq
(
B
,
"DEF"
,
SEQ_END
);
rc
=
zmq_disconnect
(
B
,
"inproc://d"
);
rc
=
zmq_disconnect
(
B
,
connect_address
);
assert
(
rc
==
0
);
// Disconnect may take time and need command processing.
zmq_pollitem_t
poller
[
2
]
=
{
{
A
,
0
,
0
,
0
},
{
B
,
0
,
0
,
0
}
};
rc
=
zmq_poll
(
poller
,
2
,
100
);
assert
(
rc
==
0
);
rc
=
zmq_poll
(
poller
,
2
,
100
);
assert
(
rc
==
0
);
// No messages should be available, sending should fail.
zmq_msg_t
msg
;
...
...
@@ -137,7 +157,7 @@ void test_destroy_queue_on_disconnect (void *ctx)
assert
(
errno
==
EAGAIN
);
// After a reconnect of B, the messages should still be gone
rc
=
zmq_connect
(
B
,
"inproc://d"
);
rc
=
zmq_connect
(
B
,
connect_address
);
assert
(
rc
==
0
);
rc
=
zmq_msg_recv
(
&
msg
,
A
,
ZMQ_DONTWAIT
);
...
...
@@ -151,10 +171,11 @@ void test_destroy_queue_on_disconnect (void *ctx)
rc
=
zmq_msg_close
(
&
msg
);
assert
(
rc
==
0
);
rc
=
zmq_close
(
A
);
assert
(
rc
==
0
);
close_zero_linger
(
A
);
close_zero_linger
(
B
);
rc
=
zmq_close
(
B
);
// Wait for disconnects.
rc
=
zmq_poll
(
0
,
0
,
100
);
assert
(
rc
==
0
);
}
...
...
@@ -164,14 +185,22 @@ int main ()
void
*
ctx
=
zmq_ctx_new
();
assert
(
ctx
);
// SHALL receive incoming messages from its peers using a fair-queuing
// strategy.
test_fair_queue_in
(
ctx
);
const
char
*
binds
[]
=
{
"inproc://a"
,
"tcp://*:5555"
};
const
char
*
connects
[]
=
{
"inproc://a"
,
"tcp://localhost:5555"
};
for
(
int
i
=
0
;
i
<
2
;
++
i
)
{
bind_address
=
binds
[
i
];
connect_address
=
connects
[
i
];
// SHALL create a double queue when a peer connects to it. If this peer
// disconnects, the ROUTER socket SHALL destroy its double queue and SHALL
// discard any messages it contains.
test_destroy_queue_on_disconnect
(
ctx
);
// SHALL receive incoming messages from its peers using a fair-queuing
// strategy.
test_fair_queue_in
(
ctx
);
// SHALL create a double queue when a peer connects to it. If this peer
// disconnects, the ROUTER socket SHALL destroy its double queue and SHALL
// discard any messages it contains.
test_destroy_queue_on_disconnect
(
ctx
);
}
int
rc
=
zmq_ctx_term
(
ctx
);
assert
(
rc
==
0
);
...
...
tests/testutil.hpp
View file @
08622a77
...
...
@@ -186,4 +186,15 @@ void s_recv_seq (void *socket, ...)
zmq_msg_close
(
&
msg
);
}
// Sets a zero linger period on a socket and closes it.
void
close_zero_linger
(
void
*
socket
)
{
int
linger
=
0
;
int
rc
=
zmq_setsockopt
(
socket
,
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
));
assert
(
rc
==
0
);
rc
=
zmq_close
(
socket
);
assert
(
rc
==
0
);
}
#endif
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment