Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
bbc9a611
Commit
bbc9a611
authored
Jul 14, 2013
by
Christian Kamm
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Fix coding style in spec tests.
parent
524bd7ac
Hide whitespace changes
Inline
Side-by-side
Showing
5 changed files
with
130 additions
and
161 deletions
+130
-161
test_spec_dealer.cpp
tests/test_spec_dealer.cpp
+29
-37
test_spec_pushpull.cpp
tests/test_spec_pushpull.cpp
+43
-56
test_spec_rep.cpp
tests/test_spec_rep.cpp
+26
-31
test_spec_req.cpp
tests/test_spec_req.cpp
+4
-4
test_spec_router.cpp
tests/test_spec_router.cpp
+28
-33
No files found.
tests/test_spec_dealer.cpp
View file @
bbc9a611
...
@@ -34,16 +34,15 @@ void test_round_robin_out (void *ctx)
...
@@ -34,16 +34,15 @@ void test_round_robin_out (void *ctx)
const
size_t
services
=
5
;
const
size_t
services
=
5
;
void
*
rep
[
services
];
void
*
rep
[
services
];
for
(
size_t
i
=
0
;
i
<
services
;
++
i
)
for
(
size_t
peer
=
0
;
peer
<
services
;
++
peer
)
{
{
rep
[
peer
]
=
zmq_socket
(
ctx
,
ZMQ_REP
);
rep
[
i
]
=
zmq_socket
(
ctx
,
ZMQ_REP
);
assert
(
rep
[
peer
]);
assert
(
rep
[
i
]);
int
timeout
=
100
;
int
timeout
=
100
;
rc
=
zmq_setsockopt
(
rep
[
i
],
ZMQ_RCVTIMEO
,
&
timeout
,
sizeof
(
int
));
rc
=
zmq_setsockopt
(
rep
[
peer
],
ZMQ_RCVTIMEO
,
&
timeout
,
sizeof
(
int
));
assert
(
rc
==
0
);
assert
(
rc
==
0
);
rc
=
zmq_connect
(
rep
[
i
],
connect_address
);
rc
=
zmq_connect
(
rep
[
peer
],
connect_address
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
}
}
...
@@ -53,28 +52,22 @@ void test_round_robin_out (void *ctx)
...
@@ -53,28 +52,22 @@ void test_round_robin_out (void *ctx)
// Send all requests
// Send all requests
for
(
size_t
i
=
0
;
i
<
services
;
++
i
)
for
(
size_t
i
=
0
;
i
<
services
;
++
i
)
{
s_send_seq
(
dealer
,
0
,
"ABC"
,
SEQ_END
);
s_send_seq
(
dealer
,
0
,
"ABC"
,
SEQ_END
);
}
// Expect every REP got one message
// Expect every REP got one message
zmq_msg_t
msg
;
zmq_msg_t
msg
;
zmq_msg_init
(
&
msg
);
zmq_msg_init
(
&
msg
);
for
(
size_t
i
=
0
;
i
<
services
;
++
i
)
for
(
size_t
peer
=
0
;
peer
<
services
;
++
peer
)
{
s_recv_seq
(
rep
[
peer
],
"ABC"
,
SEQ_END
);
s_recv_seq
(
rep
[
i
],
"ABC"
,
SEQ_END
);
}
rc
=
zmq_msg_close
(
&
msg
);
rc
=
zmq_msg_close
(
&
msg
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
close_zero_linger
(
dealer
);
close_zero_linger
(
dealer
);
for
(
size_t
i
=
0
;
i
<
services
;
++
i
)
for
(
size_t
peer
=
0
;
peer
<
services
;
++
peer
)
{
close_zero_linger
(
rep
[
peer
]);
close_zero_linger
(
rep
[
i
]);
}
// Wait for disconnects.
// Wait for disconnects.
rc
=
zmq_poll
(
0
,
0
,
100
);
rc
=
zmq_poll
(
0
,
0
,
100
);
...
@@ -87,7 +80,7 @@ void test_fair_queue_in (void *ctx)
...
@@ -87,7 +80,7 @@ void test_fair_queue_in (void *ctx)
assert
(
receiver
);
assert
(
receiver
);
int
timeout
=
100
;
int
timeout
=
100
;
int
rc
=
zmq_setsockopt
(
receiver
,
ZMQ_RCVTIMEO
,
&
timeout
,
sizeof
(
int
));
int
rc
=
zmq_setsockopt
(
receiver
,
ZMQ_RCVTIMEO
,
&
timeout
,
sizeof
(
int
));
assert
(
rc
==
0
);
assert
(
rc
==
0
);
rc
=
zmq_bind
(
receiver
,
bind_address
);
rc
=
zmq_bind
(
receiver
,
bind_address
);
...
@@ -95,15 +88,14 @@ void test_fair_queue_in (void *ctx)
...
@@ -95,15 +88,14 @@ void test_fair_queue_in (void *ctx)
const
size_t
services
=
5
;
const
size_t
services
=
5
;
void
*
senders
[
services
];
void
*
senders
[
services
];
for
(
size_t
i
=
0
;
i
<
services
;
++
i
)
for
(
size_t
peer
=
0
;
peer
<
services
;
++
peer
)
{
{
senders
[
peer
]
=
zmq_socket
(
ctx
,
ZMQ_DEALER
);
senders
[
i
]
=
zmq_socket
(
ctx
,
ZMQ_DEALER
);
assert
(
senders
[
peer
]);
assert
(
senders
[
i
]);
rc
=
zmq_setsockopt
(
senders
[
i
],
ZMQ_RCVTIMEO
,
&
timeout
,
sizeof
(
int
));
rc
=
zmq_setsockopt
(
senders
[
peer
],
ZMQ_RCVTIMEO
,
&
timeout
,
sizeof
(
int
));
assert
(
rc
==
0
);
assert
(
rc
==
0
);
rc
=
zmq_connect
(
senders
[
i
],
connect_address
);
rc
=
zmq_connect
(
senders
[
peer
],
connect_address
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
}
}
...
@@ -111,22 +103,22 @@ void test_fair_queue_in (void *ctx)
...
@@ -111,22 +103,22 @@ void test_fair_queue_in (void *ctx)
rc
=
zmq_msg_init
(
&
msg
);
rc
=
zmq_msg_init
(
&
msg
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
s_send_seq
(
senders
[
0
],
"A"
,
SEQ_END
);
s_send_seq
(
senders
[
0
],
"A"
,
SEQ_END
);
s_recv_seq
(
receiver
,
"A"
,
SEQ_END
);
s_recv_seq
(
receiver
,
"A"
,
SEQ_END
);
s_send_seq
(
senders
[
0
],
"A"
,
SEQ_END
);
s_send_seq
(
senders
[
0
],
"A"
,
SEQ_END
);
s_recv_seq
(
receiver
,
"A"
,
SEQ_END
);
s_recv_seq
(
receiver
,
"A"
,
SEQ_END
);
// send our requests
// send our requests
for
(
size_t
i
=
0
;
i
<
services
;
++
i
)
for
(
size_t
peer
=
0
;
peer
<
services
;
++
peer
)
s_send_seq
(
senders
[
i
],
"B"
,
SEQ_END
);
s_send_seq
(
senders
[
peer
],
"B"
,
SEQ_END
);
// Wait for data.
// Wait for data.
rc
=
zmq_poll
(
0
,
0
,
50
);
rc
=
zmq_poll
(
0
,
0
,
50
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
// handle the requests
// handle the requests
for
(
size_t
i
=
0
;
i
<
services
;
++
i
)
for
(
size_t
peer
=
0
;
peer
<
services
;
++
peer
)
s_recv_seq
(
receiver
,
"B"
,
SEQ_END
);
s_recv_seq
(
receiver
,
"B"
,
SEQ_END
);
rc
=
zmq_msg_close
(
&
msg
);
rc
=
zmq_msg_close
(
&
msg
);
...
@@ -134,8 +126,8 @@ void test_fair_queue_in (void *ctx)
...
@@ -134,8 +126,8 @@ void test_fair_queue_in (void *ctx)
close_zero_linger
(
receiver
);
close_zero_linger
(
receiver
);
for
(
size_t
i
=
0
;
i
<
services
;
++
i
)
for
(
size_t
peer
=
0
;
peer
<
services
;
++
peer
)
close_zero_linger
(
senders
[
i
]);
close_zero_linger
(
senders
[
peer
]);
// Wait for disconnects.
// Wait for disconnects.
rc
=
zmq_poll
(
0
,
0
,
100
);
rc
=
zmq_poll
(
0
,
0
,
100
);
...
@@ -164,7 +156,7 @@ void test_destroy_queue_on_disconnect (void *ctx)
...
@@ -164,7 +156,7 @@ void test_destroy_queue_on_disconnect (void *ctx)
assert
(
rc
==
0
);
assert
(
rc
==
0
);
// Disconnect may take time and need command processing.
// Disconnect may take time and need command processing.
zmq_pollitem_t
poller
[
2
]
=
{
{
A
,
0
,
0
,
0
},
{
B
,
0
,
0
,
0
}
};
zmq_pollitem_t
poller
[
2
]
=
{
{
A
,
0
,
0
,
0
},
{
B
,
0
,
0
,
0
}
};
rc
=
zmq_poll
(
poller
,
2
,
100
);
rc
=
zmq_poll
(
poller
,
2
,
100
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
rc
=
zmq_poll
(
poller
,
2
,
100
);
rc
=
zmq_poll
(
poller
,
2
,
100
);
...
@@ -211,7 +203,7 @@ void test_block_on_send_no_peers (void *ctx)
...
@@ -211,7 +203,7 @@ void test_block_on_send_no_peers (void *ctx)
assert
(
sc
);
assert
(
sc
);
int
timeout
=
100
;
int
timeout
=
100
;
int
rc
=
zmq_setsockopt
(
sc
,
ZMQ_SNDTIMEO
,
&
timeout
,
sizeof
(
timeout
));
int
rc
=
zmq_setsockopt
(
sc
,
ZMQ_SNDTIMEO
,
&
timeout
,
sizeof
(
timeout
));
assert
(
rc
==
0
);
assert
(
rc
==
0
);
rc
=
zmq_send
(
sc
,
0
,
0
,
ZMQ_DONTWAIT
);
rc
=
zmq_send
(
sc
,
0
,
0
,
ZMQ_DONTWAIT
);
...
@@ -231,12 +223,12 @@ int main (void)
...
@@ -231,12 +223,12 @@ int main (void)
void
*
ctx
=
zmq_ctx_new
();
void
*
ctx
=
zmq_ctx_new
();
assert
(
ctx
);
assert
(
ctx
);
const
char
*
binds
[]
=
{
"inproc://a"
,
"tcp://*:5555"
};
const
char
*
binds
[]
=
{
"inproc://a"
,
"tcp://*:5555"
};
const
char
*
connects
[]
=
{
"inproc://a"
,
"tcp://localhost:5555"
};
const
char
*
connects
[]
=
{
"inproc://a"
,
"tcp://localhost:5555"
};
for
(
int
i
=
0
;
i
<
2
;
++
i
)
{
for
(
int
transports
=
0
;
transports
<
2
;
++
transports
)
{
bind_address
=
binds
[
i
];
bind_address
=
binds
[
transports
];
connect_address
=
connects
[
i
];
connect_address
=
connects
[
transports
];
// SHALL route outgoing messages to available peers using a round-robin
// SHALL route outgoing messages to available peers using a round-robin
// strategy.
// strategy.
...
...
tests/test_spec_pushpull.cpp
View file @
bbc9a611
...
@@ -32,18 +32,17 @@ void test_push_round_robin_out (void *ctx)
...
@@ -32,18 +32,17 @@ void test_push_round_robin_out (void *ctx)
int
rc
=
zmq_bind
(
push
,
bind_address
);
int
rc
=
zmq_bind
(
push
,
bind_address
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
const
size_t
N
=
5
;
const
size_t
services
=
5
;
void
*
pulls
[
N
];
void
*
pulls
[
services
];
for
(
size_t
i
=
0
;
i
<
N
;
++
i
)
for
(
size_t
peer
=
0
;
peer
<
services
;
++
peer
)
{
{
pulls
[
peer
]
=
zmq_socket
(
ctx
,
ZMQ_PULL
);
pulls
[
i
]
=
zmq_socket
(
ctx
,
ZMQ_PULL
);
assert
(
pulls
[
peer
]);
assert
(
pulls
[
i
]);
int
timeout
=
100
;
int
timeout
=
100
;
rc
=
zmq_setsockopt
(
pulls
[
i
],
ZMQ_RCVTIMEO
,
&
timeout
,
sizeof
(
int
));
rc
=
zmq_setsockopt
(
pulls
[
peer
],
ZMQ_RCVTIMEO
,
&
timeout
,
sizeof
(
int
));
assert
(
rc
==
0
);
assert
(
rc
==
0
);
rc
=
zmq_connect
(
pulls
[
i
],
connect_address
);
rc
=
zmq_connect
(
pulls
[
peer
],
connect_address
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
}
}
...
@@ -52,28 +51,21 @@ void test_push_round_robin_out (void *ctx)
...
@@ -52,28 +51,21 @@ void test_push_round_robin_out (void *ctx)
assert
(
rc
==
0
);
assert
(
rc
==
0
);
// Send 2N messages
// Send 2N messages
for
(
size_t
i
=
0
;
i
<
N
;
++
i
)
for
(
size_t
peer
=
0
;
peer
<
services
;
++
peer
)
{
s_send_seq
(
push
,
"ABC"
,
SEQ_END
);
s_send_seq
(
push
,
"ABC"
,
SEQ_END
);
}
for
(
size_t
peer
=
0
;
peer
<
services
;
++
peer
)
for
(
size_t
i
=
0
;
i
<
N
;
++
i
)
{
s_send_seq
(
push
,
"DEF"
,
SEQ_END
);
s_send_seq
(
push
,
"DEF"
,
SEQ_END
);
}
// Expect every PULL got one of each
// Expect every PULL got one of each
for
(
size_t
i
=
0
;
i
<
N
;
++
i
)
for
(
size_t
peer
=
0
;
peer
<
services
;
++
peer
)
{
{
s_recv_seq
(
pulls
[
peer
],
"ABC"
,
SEQ_END
);
s_recv_seq
(
pulls
[
i
],
"ABC"
,
SEQ_END
);
s_recv_seq
(
pulls
[
peer
],
"DEF"
,
SEQ_END
);
s_recv_seq
(
pulls
[
i
],
"DEF"
,
SEQ_END
);
}
}
close_zero_linger
(
push
);
close_zero_linger
(
push
);
for
(
size_t
i
=
0
;
i
<
N
;
++
i
)
for
(
size_t
peer
=
0
;
peer
<
services
;
++
peer
)
{
close_zero_linger
(
pulls
[
peer
]);
close_zero_linger
(
pulls
[
i
]);
}
// Wait for disconnects.
// Wait for disconnects.
rc
=
zmq_poll
(
0
,
0
,
100
);
rc
=
zmq_poll
(
0
,
0
,
100
);
...
@@ -88,14 +80,14 @@ void test_pull_fair_queue_in (void *ctx)
...
@@ -88,14 +80,14 @@ void test_pull_fair_queue_in (void *ctx)
int
rc
=
zmq_bind
(
pull
,
bind_address
);
int
rc
=
zmq_bind
(
pull
,
bind_address
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
const
size_t
N
=
5
;
const
size_t
services
=
5
;
void
*
pushs
[
N
];
void
*
pushs
[
services
];
for
(
size_t
i
=
0
;
i
<
N
;
++
i
)
for
(
size_t
peer
=
0
;
peer
<
services
;
++
peer
)
{
{
pushs
[
i
]
=
zmq_socket
(
ctx
,
ZMQ_PUSH
);
pushs
[
peer
]
=
zmq_socket
(
ctx
,
ZMQ_PUSH
);
assert
(
pushs
[
i
]);
assert
(
pushs
[
peer
]);
rc
=
zmq_connect
(
pushs
[
i
],
connect_address
);
rc
=
zmq_connect
(
pushs
[
peer
],
connect_address
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
}
}
...
@@ -107,17 +99,16 @@ void test_pull_fair_queue_in (void *ctx)
...
@@ -107,17 +99,16 @@ void test_pull_fair_queue_in (void *ctx)
int
second_half
=
0
;
int
second_half
=
0
;
// Send 2N messages
// Send 2N messages
for
(
size_t
i
=
0
;
i
<
N
;
++
i
)
for
(
size_t
peer
=
0
;
peer
<
services
;
++
peer
)
{
{
char
*
str
=
strdup
(
"A"
);
char
*
str
=
strdup
(
"A"
);
str
[
0
]
+=
i
;
str
[
0
]
+=
peer
;
s_send_seq
(
pushs
[
i
],
str
,
SEQ_END
);
s_send_seq
(
pushs
[
peer
],
str
,
SEQ_END
);
first_half
+=
str
[
0
];
first_half
+=
str
[
0
];
str
[
0
]
+=
N
;
str
[
0
]
+=
services
;
s_send_seq
(
pushs
[
i
],
str
,
SEQ_END
);
s_send_seq
(
pushs
[
peer
],
str
,
SEQ_END
);
second_half
+=
str
[
0
];
second_half
+=
str
[
0
];
free
(
str
);
free
(
str
);
}
}
...
@@ -131,22 +122,20 @@ void test_pull_fair_queue_in (void *ctx)
...
@@ -131,22 +122,20 @@ void test_pull_fair_queue_in (void *ctx)
assert
(
rc
==
0
);
assert
(
rc
==
0
);
// Expect to pull one from each first
// Expect to pull one from each first
for
(
size_t
i
=
0
;
i
<
N
;
++
i
)
for
(
size_t
peer
=
0
;
peer
<
services
;
++
peer
)
{
{
rc
=
zmq_msg_recv
(
&
msg
,
pull
,
0
);
rc
=
zmq_msg_recv
(
&
msg
,
pull
,
0
);
assert
(
rc
==
2
);
assert
(
rc
==
2
);
const
char
*
str
=
(
const
char
*
)
zmq_msg_data
(
&
msg
);
const
char
*
str
=
(
const
char
*
)
zmq_msg_data
(
&
msg
);
first_half
-=
str
[
0
];
first_half
-=
str
[
0
];
}
}
assert
(
first_half
==
0
);
assert
(
first_half
==
0
);
// And then get the second batch
// And then get the second batch
for
(
size_t
i
=
0
;
i
<
N
;
++
i
)
for
(
size_t
peer
=
0
;
peer
<
services
;
++
peer
)
{
{
rc
=
zmq_msg_recv
(
&
msg
,
pull
,
0
);
rc
=
zmq_msg_recv
(
&
msg
,
pull
,
0
);
assert
(
rc
==
2
);
assert
(
rc
==
2
);
const
char
*
str
=
(
const
char
*
)
zmq_msg_data
(
&
msg
);
const
char
*
str
=
(
const
char
*
)
zmq_msg_data
(
&
msg
);
second_half
-=
str
[
0
];
second_half
-=
str
[
0
];
}
}
assert
(
second_half
==
0
);
assert
(
second_half
==
0
);
...
@@ -155,10 +144,8 @@ void test_pull_fair_queue_in (void *ctx)
...
@@ -155,10 +144,8 @@ void test_pull_fair_queue_in (void *ctx)
close_zero_linger
(
pull
);
close_zero_linger
(
pull
);
for
(
size_t
i
=
0
;
i
<
N
;
++
i
)
for
(
size_t
peer
=
0
;
peer
<
services
;
++
peer
)
{
close_zero_linger
(
pushs
[
peer
]);
close_zero_linger
(
pushs
[
i
]);
}
// Wait for disconnects.
// Wait for disconnects.
rc
=
zmq_poll
(
0
,
0
,
100
);
rc
=
zmq_poll
(
0
,
0
,
100
);
...
@@ -171,7 +158,7 @@ void test_push_block_on_send_no_peers (void *ctx)
...
@@ -171,7 +158,7 @@ void test_push_block_on_send_no_peers (void *ctx)
assert
(
sc
);
assert
(
sc
);
int
timeout
=
100
;
int
timeout
=
100
;
int
rc
=
zmq_setsockopt
(
sc
,
ZMQ_SNDTIMEO
,
&
timeout
,
sizeof
(
timeout
));
int
rc
=
zmq_setsockopt
(
sc
,
ZMQ_SNDTIMEO
,
&
timeout
,
sizeof
(
timeout
));
assert
(
rc
==
0
);
assert
(
rc
==
0
);
rc
=
zmq_send
(
sc
,
0
,
0
,
ZMQ_DONTWAIT
);
rc
=
zmq_send
(
sc
,
0
,
0
,
ZMQ_DONTWAIT
);
...
@@ -192,7 +179,7 @@ void test_destroy_queue_on_disconnect (void *ctx)
...
@@ -192,7 +179,7 @@ void test_destroy_queue_on_disconnect (void *ctx)
assert
(
A
);
assert
(
A
);
int
hwm
=
1
;
int
hwm
=
1
;
int
rc
=
zmq_setsockopt
(
A
,
ZMQ_SNDHWM
,
&
hwm
,
sizeof
(
hwm
));
int
rc
=
zmq_setsockopt
(
A
,
ZMQ_SNDHWM
,
&
hwm
,
sizeof
(
hwm
));
assert
(
rc
==
0
);
assert
(
rc
==
0
);
rc
=
zmq_bind
(
A
,
bind_address
);
rc
=
zmq_bind
(
A
,
bind_address
);
...
@@ -201,7 +188,7 @@ void test_destroy_queue_on_disconnect (void *ctx)
...
@@ -201,7 +188,7 @@ void test_destroy_queue_on_disconnect (void *ctx)
void
*
B
=
zmq_socket
(
ctx
,
ZMQ_PULL
);
void
*
B
=
zmq_socket
(
ctx
,
ZMQ_PULL
);
assert
(
B
);
assert
(
B
);
rc
=
zmq_setsockopt
(
B
,
ZMQ_RCVHWM
,
&
hwm
,
sizeof
(
hwm
));
rc
=
zmq_setsockopt
(
B
,
ZMQ_RCVHWM
,
&
hwm
,
sizeof
(
hwm
));
assert
(
rc
==
0
);
assert
(
rc
==
0
);
rc
=
zmq_connect
(
B
,
connect_address
);
rc
=
zmq_connect
(
B
,
connect_address
);
...
@@ -221,7 +208,7 @@ void test_destroy_queue_on_disconnect (void *ctx)
...
@@ -221,7 +208,7 @@ void test_destroy_queue_on_disconnect (void *ctx)
assert
(
rc
==
0
);
assert
(
rc
==
0
);
// Disconnect may take time and need command processing.
// Disconnect may take time and need command processing.
zmq_pollitem_t
poller
[
2
]
=
{
{
A
,
0
,
0
,
0
},
{
B
,
0
,
0
,
0
}
};
zmq_pollitem_t
poller
[
2
]
=
{
{
A
,
0
,
0
,
0
},
{
B
,
0
,
0
,
0
}
};
rc
=
zmq_poll
(
poller
,
2
,
100
);
rc
=
zmq_poll
(
poller
,
2
,
100
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
rc
=
zmq_poll
(
poller
,
2
,
100
);
rc
=
zmq_poll
(
poller
,
2
,
100
);
...
@@ -269,17 +256,17 @@ void test_destroy_queue_on_disconnect (void *ctx)
...
@@ -269,17 +256,17 @@ void test_destroy_queue_on_disconnect (void *ctx)
assert
(
rc
==
0
);
assert
(
rc
==
0
);
}
}
int
main
()
int
main
(
void
)
{
{
void
*
ctx
=
zmq_ctx_new
();
void
*
ctx
=
zmq_ctx_new
();
assert
(
ctx
);
assert
(
ctx
);
const
char
*
binds
[]
=
{
"inproc://a"
,
"tcp://*:5555"
};
const
char
*
binds
[]
=
{
"inproc://a"
,
"tcp://*:5555"
};
const
char
*
connects
[]
=
{
"inproc://a"
,
"tcp://localhost:5555"
};
const
char
*
connects
[]
=
{
"inproc://a"
,
"tcp://localhost:5555"
};
for
(
int
i
=
0
;
i
<
2
;
++
i
)
{
for
(
int
transport
=
0
;
transport
<
2
;
++
transport
)
{
bind_address
=
binds
[
i
];
bind_address
=
binds
[
transport
];
connect_address
=
connects
[
i
];
connect_address
=
connects
[
transport
];
// PUSH: SHALL route outgoing messages to connected peers using a
// PUSH: SHALL route outgoing messages to connected peers using a
// round-robin strategy.
// round-robin strategy.
...
...
tests/test_spec_rep.cpp
View file @
bbc9a611
...
@@ -30,62 +30,57 @@ void test_fair_queue_in (void *ctx)
...
@@ -30,62 +30,57 @@ void test_fair_queue_in (void *ctx)
assert
(
rep
);
assert
(
rep
);
int
timeout
=
100
;
int
timeout
=
100
;
int
rc
=
zmq_setsockopt
(
rep
,
ZMQ_RCVTIMEO
,
&
timeout
,
sizeof
(
int
));
int
rc
=
zmq_setsockopt
(
rep
,
ZMQ_RCVTIMEO
,
&
timeout
,
sizeof
(
int
));
assert
(
rc
==
0
);
assert
(
rc
==
0
);
rc
=
zmq_bind
(
rep
,
bind_address
);
rc
=
zmq_bind
(
rep
,
bind_address
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
const
size_t
N
=
5
;
const
size_t
services
=
5
;
void
*
reqs
[
N
];
void
*
reqs
[
services
];
for
(
size_t
i
=
0
;
i
<
N
;
++
i
)
for
(
size_t
peer
=
0
;
peer
<
services
;
++
peer
)
{
{
reqs
[
peer
]
=
zmq_socket
(
ctx
,
ZMQ_REQ
);
reqs
[
i
]
=
zmq_socket
(
ctx
,
ZMQ_REQ
);
assert
(
reqs
[
peer
]);
assert
(
reqs
[
i
]);
rc
=
zmq_setsockopt
(
reqs
[
i
],
ZMQ_RCVTIMEO
,
&
timeout
,
sizeof
(
int
));
rc
=
zmq_setsockopt
(
reqs
[
peer
],
ZMQ_RCVTIMEO
,
&
timeout
,
sizeof
(
int
));
assert
(
rc
==
0
);
assert
(
rc
==
0
);
rc
=
zmq_connect
(
reqs
[
i
],
connect_address
);
rc
=
zmq_connect
(
reqs
[
peer
],
connect_address
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
}
}
s_send_seq
(
reqs
[
0
],
"A"
,
SEQ_END
);
s_send_seq
(
reqs
[
0
],
"A"
,
SEQ_END
);
s_recv_seq
(
rep
,
"A"
,
SEQ_END
);
s_recv_seq
(
rep
,
"A"
,
SEQ_END
);
s_send_seq
(
rep
,
"A"
,
SEQ_END
);
s_send_seq
(
rep
,
"A"
,
SEQ_END
);
s_recv_seq
(
reqs
[
0
],
"A"
,
SEQ_END
);
s_recv_seq
(
reqs
[
0
],
"A"
,
SEQ_END
);
s_send_seq
(
reqs
[
0
],
"A"
,
SEQ_END
);
s_send_seq
(
reqs
[
0
],
"A"
,
SEQ_END
);
s_recv_seq
(
rep
,
"A"
,
SEQ_END
);
s_recv_seq
(
rep
,
"A"
,
SEQ_END
);
s_send_seq
(
rep
,
"A"
,
SEQ_END
);
s_send_seq
(
rep
,
"A"
,
SEQ_END
);
s_recv_seq
(
reqs
[
0
],
"A"
,
SEQ_END
);
s_recv_seq
(
reqs
[
0
],
"A"
,
SEQ_END
);
// send N requests
// send N requests
for
(
size_t
i
=
0
;
i
<
N
;
++
i
)
for
(
size_t
peer
=
0
;
peer
<
services
;
++
peer
)
{
{
char
*
str
=
strdup
(
"A"
);
char
*
str
=
strdup
(
"A"
);
str
[
0
]
+=
i
;
str
[
0
]
+=
peer
;
s_send_seq
(
reqs
[
i
],
str
,
SEQ_END
);
s_send_seq
(
reqs
[
peer
],
str
,
SEQ_END
);
free
(
str
);
free
(
str
);
}
}
// handle N requests
// handle N requests
for
(
size_t
i
=
0
;
i
<
N
;
++
i
)
for
(
size_t
peer
=
0
;
peer
<
services
;
++
peer
)
{
{
char
*
str
=
strdup
(
"A"
);
char
*
str
=
strdup
(
"A"
);
str
[
0
]
+=
i
;
str
[
0
]
+=
peer
;
s_recv_seq
(
rep
,
str
,
SEQ_END
);
s_recv_seq
(
rep
,
str
,
SEQ_END
);
s_send_seq
(
rep
,
str
,
SEQ_END
);
s_send_seq
(
rep
,
str
,
SEQ_END
);
s_recv_seq
(
reqs
[
i
],
str
,
SEQ_END
);
s_recv_seq
(
reqs
[
peer
],
str
,
SEQ_END
);
free
(
str
);
free
(
str
);
}
}
close_zero_linger
(
rep
);
close_zero_linger
(
rep
);
for
(
size_t
i
=
0
;
i
<
N
;
++
i
)
for
(
size_t
peer
=
0
;
peer
<
services
;
++
peer
)
{
close_zero_linger
(
reqs
[
peer
]);
close_zero_linger
(
reqs
[
i
]);
}
// Wait for disconnects.
// Wait for disconnects.
rc
=
zmq_poll
(
0
,
0
,
100
);
rc
=
zmq_poll
(
0
,
0
,
100
);
...
@@ -126,17 +121,17 @@ void test_envelope (void *ctx)
...
@@ -126,17 +121,17 @@ void test_envelope (void *ctx)
assert
(
rc
==
0
);
assert
(
rc
==
0
);
}
}
int
main
()
int
main
(
void
)
{
{
void
*
ctx
=
zmq_ctx_new
();
void
*
ctx
=
zmq_ctx_new
();
assert
(
ctx
);
assert
(
ctx
);
const
char
*
binds
[]
=
{
"inproc://a"
,
"tcp://*:5555"
};
const
char
*
binds
[]
=
{
"inproc://a"
,
"tcp://*:5555"
};
const
char
*
connects
[]
=
{
"inproc://a"
,
"tcp://localhost:5555"
};
const
char
*
connects
[]
=
{
"inproc://a"
,
"tcp://localhost:5555"
};
for
(
int
i
=
0
;
i
<
2
;
++
i
)
{
for
(
int
transport
=
0
;
transport
<
2
;
++
transport
)
{
bind_address
=
binds
[
i
];
bind_address
=
binds
[
transport
];
connect_address
=
connects
[
i
];
connect_address
=
connects
[
transport
];
// SHALL receive incoming messages from its peers using a fair-queuing
// SHALL receive incoming messages from its peers using a fair-queuing
// strategy.
// strategy.
...
...
tests/test_spec_req.cpp
View file @
bbc9a611
...
@@ -88,11 +88,11 @@ void test_req_only_listens_to_current_peer (void *ctx)
...
@@ -88,11 +88,11 @@ void test_req_only_listens_to_current_peer (void *ctx)
assert
(
router
[
i
]);
assert
(
router
[
i
]);
int
timeout
=
100
;
int
timeout
=
100
;
rc
=
zmq_setsockopt
(
router
[
i
],
ZMQ_RCVTIMEO
,
&
timeout
,
sizeof
(
timeout
));
rc
=
zmq_setsockopt
(
router
[
i
],
ZMQ_RCVTIMEO
,
&
timeout
,
sizeof
(
timeout
));
assert
(
rc
==
0
);
assert
(
rc
==
0
);
int
enabled
=
1
;
int
enabled
=
1
;
rc
=
zmq_setsockopt
(
router
[
i
],
ZMQ_ROUTER_MANDATORY
,
&
enabled
,
sizeof
(
enabled
));
rc
=
zmq_setsockopt
(
router
[
i
],
ZMQ_ROUTER_MANDATORY
,
&
enabled
,
sizeof
(
enabled
));
assert
(
rc
==
0
);
assert
(
rc
==
0
);
rc
=
zmq_connect
(
router
[
i
],
connect_address
);
rc
=
zmq_connect
(
router
[
i
],
connect_address
);
...
@@ -154,7 +154,7 @@ void test_req_message_format (void *ctx)
...
@@ -154,7 +154,7 @@ void test_req_message_format (void *ctx)
zmq_msg_copy
(
&
peer_id_msg
,
&
msg
);
zmq_msg_copy
(
&
peer_id_msg
,
&
msg
);
int
more
=
0
;
int
more
=
0
;
size_t
more_size
=
sizeof
(
more
);
size_t
more_size
=
sizeof
(
more
);
rc
=
zmq_getsockopt
(
router
,
ZMQ_RCVMORE
,
&
more
,
&
more_size
);
rc
=
zmq_getsockopt
(
router
,
ZMQ_RCVMORE
,
&
more
,
&
more_size
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
assert
(
more
);
assert
(
more
);
...
@@ -190,7 +190,7 @@ void test_block_on_send_no_peers (void *ctx)
...
@@ -190,7 +190,7 @@ void test_block_on_send_no_peers (void *ctx)
assert
(
sc
);
assert
(
sc
);
int
timeout
=
100
;
int
timeout
=
100
;
int
rc
=
zmq_setsockopt
(
sc
,
ZMQ_SNDTIMEO
,
&
timeout
,
sizeof
(
timeout
));
int
rc
=
zmq_setsockopt
(
sc
,
ZMQ_SNDTIMEO
,
&
timeout
,
sizeof
(
timeout
));
assert
(
rc
==
0
);
assert
(
rc
==
0
);
rc
=
zmq_send
(
sc
,
0
,
0
,
ZMQ_DONTWAIT
);
rc
=
zmq_send
(
sc
,
0
,
0
,
ZMQ_DONTWAIT
);
...
...
tests/test_spec_router.cpp
View file @
bbc9a611
...
@@ -30,29 +30,28 @@ void test_fair_queue_in (void *ctx)
...
@@ -30,29 +30,28 @@ void test_fair_queue_in (void *ctx)
assert
(
receiver
);
assert
(
receiver
);
int
timeout
=
100
;
int
timeout
=
100
;
int
rc
=
zmq_setsockopt
(
receiver
,
ZMQ_RCVTIMEO
,
&
timeout
,
sizeof
(
int
));
int
rc
=
zmq_setsockopt
(
receiver
,
ZMQ_RCVTIMEO
,
&
timeout
,
sizeof
(
int
));
assert
(
rc
==
0
);
assert
(
rc
==
0
);
rc
=
zmq_bind
(
receiver
,
bind_address
);
rc
=
zmq_bind
(
receiver
,
bind_address
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
const
size_t
N
=
5
;
const
size_t
services
=
5
;
void
*
senders
[
N
];
void
*
senders
[
services
];
for
(
size_t
i
=
0
;
i
<
N
;
++
i
)
for
(
size_t
peer
=
0
;
peer
<
services
;
++
peer
)
{
{
senders
[
peer
]
=
zmq_socket
(
ctx
,
ZMQ_DEALER
);
senders
[
i
]
=
zmq_socket
(
ctx
,
ZMQ_DEALER
);
assert
(
senders
[
peer
]);
assert
(
senders
[
i
]);
rc
=
zmq_setsockopt
(
senders
[
i
],
ZMQ_RCVTIMEO
,
&
timeout
,
sizeof
(
int
));
rc
=
zmq_setsockopt
(
senders
[
peer
],
ZMQ_RCVTIMEO
,
&
timeout
,
sizeof
(
int
));
assert
(
rc
==
0
);
assert
(
rc
==
0
);
char
*
str
=
strdup
(
"A"
);
char
*
str
=
strdup
(
"A"
);
str
[
0
]
+=
i
;
str
[
0
]
+=
peer
;
rc
=
zmq_setsockopt
(
senders
[
i
],
ZMQ_IDENTITY
,
str
,
2
);
rc
=
zmq_setsockopt
(
senders
[
peer
],
ZMQ_IDENTITY
,
str
,
2
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
free
(
str
);
free
(
str
);
rc
=
zmq_connect
(
senders
[
i
],
connect_address
);
rc
=
zmq_connect
(
senders
[
peer
],
connect_address
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
}
}
...
@@ -60,30 +59,28 @@ void test_fair_queue_in (void *ctx)
...
@@ -60,30 +59,28 @@ void test_fair_queue_in (void *ctx)
rc
=
zmq_msg_init
(
&
msg
);
rc
=
zmq_msg_init
(
&
msg
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
s_send_seq
(
senders
[
0
],
"M"
,
SEQ_END
);
s_send_seq
(
senders
[
0
],
"M"
,
SEQ_END
);
s_recv_seq
(
receiver
,
"A"
,
"M"
,
SEQ_END
);
s_recv_seq
(
receiver
,
"A"
,
"M"
,
SEQ_END
);
s_send_seq
(
senders
[
0
],
"M"
,
SEQ_END
);
s_send_seq
(
senders
[
0
],
"M"
,
SEQ_END
);
s_recv_seq
(
receiver
,
"A"
,
"M"
,
SEQ_END
);
s_recv_seq
(
receiver
,
"A"
,
"M"
,
SEQ_END
);
int
sum
=
0
;
int
sum
=
0
;
// send N requests
// send N requests
for
(
size_t
i
=
0
;
i
<
N
;
++
i
)
for
(
size_t
peer
=
0
;
peer
<
services
;
++
peer
)
{
{
s_send_seq
(
senders
[
peer
],
"M"
,
SEQ_END
);
s_send_seq
(
senders
[
i
],
"M"
,
SEQ_END
);
sum
+=
'A'
+
peer
;
sum
+=
'A'
+
i
;
}
}
assert
(
sum
==
N
*
'A'
+
N
*
(
N
-
1
)
/
2
);
assert
(
sum
==
services
*
'A'
+
services
*
(
services
-
1
)
/
2
);
// handle N requests
// handle N requests
for
(
size_t
i
=
0
;
i
<
N
;
++
i
)
for
(
size_t
peer
=
0
;
peer
<
services
;
++
peer
)
{
{
rc
=
zmq_msg_recv
(
&
msg
,
receiver
,
0
);
rc
=
zmq_msg_recv
(
&
msg
,
receiver
,
0
);
assert
(
rc
==
2
);
assert
(
rc
==
2
);
const
char
*
id
=
(
const
char
*
)
zmq_msg_data
(
&
msg
);
const
char
*
id
=
(
const
char
*
)
zmq_msg_data
(
&
msg
);
sum
-=
id
[
0
];
sum
-=
id
[
0
];
s_recv_seq
(
receiver
,
"M"
,
SEQ_END
);
s_recv_seq
(
receiver
,
"M"
,
SEQ_END
);
}
}
...
@@ -95,10 +92,8 @@ void test_fair_queue_in (void *ctx)
...
@@ -95,10 +92,8 @@ void test_fair_queue_in (void *ctx)
close_zero_linger
(
receiver
);
close_zero_linger
(
receiver
);
for
(
size_t
i
=
0
;
i
<
N
;
++
i
)
for
(
size_t
peer
=
0
;
peer
<
services
;
++
peer
)
{
close_zero_linger
(
senders
[
peer
]);
close_zero_linger
(
senders
[
i
]);
}
// Wait for disconnects.
// Wait for disconnects.
rc
=
zmq_poll
(
0
,
0
,
100
);
rc
=
zmq_poll
(
0
,
0
,
100
);
...
@@ -111,7 +106,7 @@ void test_destroy_queue_on_disconnect (void *ctx)
...
@@ -111,7 +106,7 @@ void test_destroy_queue_on_disconnect (void *ctx)
assert
(
A
);
assert
(
A
);
int
enabled
=
1
;
int
enabled
=
1
;
int
rc
=
zmq_setsockopt
(
A
,
ZMQ_ROUTER_MANDATORY
,
&
enabled
,
sizeof
(
enabled
));
int
rc
=
zmq_setsockopt
(
A
,
ZMQ_ROUTER_MANDATORY
,
&
enabled
,
sizeof
(
enabled
));
assert
(
rc
==
0
);
assert
(
rc
==
0
);
rc
=
zmq_bind
(
A
,
bind_address
);
rc
=
zmq_bind
(
A
,
bind_address
);
...
@@ -138,7 +133,7 @@ void test_destroy_queue_on_disconnect (void *ctx)
...
@@ -138,7 +133,7 @@ void test_destroy_queue_on_disconnect (void *ctx)
assert
(
rc
==
0
);
assert
(
rc
==
0
);
// Disconnect may take time and need command processing.
// Disconnect may take time and need command processing.
zmq_pollitem_t
poller
[
2
]
=
{
{
A
,
0
,
0
,
0
},
{
B
,
0
,
0
,
0
}
};
zmq_pollitem_t
poller
[
2
]
=
{
{
A
,
0
,
0
,
0
},
{
B
,
0
,
0
,
0
}
};
rc
=
zmq_poll
(
poller
,
2
,
100
);
rc
=
zmq_poll
(
poller
,
2
,
100
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
rc
=
zmq_poll
(
poller
,
2
,
100
);
rc
=
zmq_poll
(
poller
,
2
,
100
);
...
@@ -180,17 +175,17 @@ void test_destroy_queue_on_disconnect (void *ctx)
...
@@ -180,17 +175,17 @@ void test_destroy_queue_on_disconnect (void *ctx)
}
}
int
main
()
int
main
(
void
)
{
{
void
*
ctx
=
zmq_ctx_new
();
void
*
ctx
=
zmq_ctx_new
();
assert
(
ctx
);
assert
(
ctx
);
const
char
*
binds
[]
=
{
"inproc://a"
,
"tcp://*:5555"
};
const
char
*
binds
[]
=
{
"inproc://a"
,
"tcp://*:5555"
};
const
char
*
connects
[]
=
{
"inproc://a"
,
"tcp://localhost:5555"
};
const
char
*
connects
[]
=
{
"inproc://a"
,
"tcp://localhost:5555"
};
for
(
int
i
=
0
;
i
<
2
;
++
i
)
{
for
(
int
transport
=
0
;
transport
<
2
;
++
transport
)
{
bind_address
=
binds
[
i
];
bind_address
=
binds
[
transport
];
connect_address
=
connects
[
i
];
connect_address
=
connects
[
transport
];
// SHALL receive incoming messages from its peers using a fair-queuing
// SHALL receive incoming messages from its peers using a fair-queuing
// strategy.
// strategy.
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment