Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
b9298823
Commit
b9298823
authored
Sep 15, 2013
by
Richard Newton
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Set hwm on connect socket before bind has happend to just that of connects hwm.
parent
7841b0dd
Hide whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
163 additions
and
5 deletions
+163
-5
pipe.cpp
src/pipe.cpp
+2
-2
socket_base.cpp
src/socket_base.cpp
+6
-2
test_hwm.cpp
tests/test_hwm.cpp
+155
-1
No files found.
src/pipe.cpp
View file @
b9298823
...
...
@@ -480,6 +480,6 @@ void zmq::pipe_t::hiccup ()
void
zmq
::
pipe_t
::
set_hwms
(
int
inhwm_
,
int
outhwm_
)
{
lwm
=
compute_lwm
(
inhwm_
);
hwm
=
outhwm_
;
lwm
=
compute_lwm
(
inhwm_
);
hwm
=
outhwm_
;
}
src/socket_base.cpp
View file @
b9298823
...
...
@@ -439,10 +439,14 @@ int zmq::socket_base_t::connect (const char *addr_)
// The total HWM for an inproc connection should be the sum of
// the binder's HWM and the connector's HWM.
int
sndhwm
=
0
;
if
(
options
.
sndhwm
!=
0
&&
peer
.
options
.
rcvhwm
!=
0
)
if
(
peer
.
socket
==
NULL
)
sndhwm
=
options
.
sndhwm
;
else
if
(
options
.
sndhwm
!=
0
&&
peer
.
options
.
rcvhwm
!=
0
)
sndhwm
=
options
.
sndhwm
+
peer
.
options
.
rcvhwm
;
int
rcvhwm
=
0
;
if
(
options
.
rcvhwm
!=
0
&&
peer
.
options
.
sndhwm
!=
0
)
if
(
peer
.
socket
==
NULL
)
rcvhwm
=
options
.
rcvhwm
;
else
if
(
options
.
rcvhwm
!=
0
&&
peer
.
options
.
sndhwm
!=
0
)
rcvhwm
=
options
.
rcvhwm
+
peer
.
options
.
sndhwm
;
// Create a bi-directional pipe to connect the peers.
...
...
tests/test_hwm.cpp
View file @
b9298823
...
...
@@ -26,6 +26,49 @@ const int MAX_SENDS = 10000;
enum
TestType
{
BIND_FIRST
,
CONNECT_FIRST
};
int
test_defaults
()
{
void
*
ctx
=
zmq_ctx_new
();
assert
(
ctx
);
int
rc
;
// Set up bind socket
void
*
bind_socket
=
zmq_socket
(
ctx
,
ZMQ_PULL
);
assert
(
bind_socket
);
rc
=
zmq_bind
(
bind_socket
,
"inproc://a"
);
assert
(
rc
==
0
);
// Set up connect socket
void
*
connect_socket
=
zmq_socket
(
ctx
,
ZMQ_PUSH
);
assert
(
connect_socket
);
rc
=
zmq_connect
(
connect_socket
,
"inproc://a"
);
assert
(
rc
==
0
);
// Send until we block
int
send_count
=
0
;
while
(
send_count
<
MAX_SENDS
&&
zmq_send
(
connect_socket
,
NULL
,
0
,
ZMQ_DONTWAIT
)
==
0
)
++
send_count
;
// Now receive all sent messages
int
recv_count
=
0
;
while
(
zmq_recv
(
bind_socket
,
NULL
,
0
,
ZMQ_DONTWAIT
)
==
0
)
++
recv_count
;
assert
(
send_count
==
recv_count
);
// Clean up
rc
=
zmq_close
(
connect_socket
);
assert
(
rc
==
0
);
rc
=
zmq_close
(
bind_socket
);
assert
(
rc
==
0
);
rc
=
zmq_ctx_term
(
ctx
);
assert
(
rc
==
0
);
return
send_count
;
}
int
count_msg
(
int
send_hwm
,
int
recv_hwm
,
TestType
testType
)
{
void
*
ctx
=
zmq_ctx_new
();
...
...
@@ -114,12 +157,114 @@ int test_inproc_connect_first (int send_hwm, int recv_hwm)
return
count_msg
(
send_hwm
,
recv_hwm
,
CONNECT_FIRST
);
}
int
test_inproc_connect_and_close_first
(
int
send_hwm
,
int
recv_hwm
)
{
void
*
ctx
=
zmq_ctx_new
();
assert
(
ctx
);
int
rc
;
// Set up connect socket
void
*
connect_socket
=
zmq_socket
(
ctx
,
ZMQ_PUSH
);
assert
(
connect_socket
);
rc
=
zmq_setsockopt
(
connect_socket
,
ZMQ_SNDHWM
,
&
send_hwm
,
sizeof
(
send_hwm
));
assert
(
rc
==
0
);
rc
=
zmq_connect
(
connect_socket
,
"inproc://a"
);
assert
(
rc
==
0
);
// Send until we block
int
send_count
=
0
;
while
(
send_count
<
MAX_SENDS
&&
zmq_send
(
connect_socket
,
NULL
,
0
,
ZMQ_DONTWAIT
)
==
0
)
++
send_count
;
// Close connect
rc
=
zmq_close
(
connect_socket
);
assert
(
rc
==
0
);
// Set up bind socket
void
*
bind_socket
=
zmq_socket
(
ctx
,
ZMQ_PULL
);
assert
(
bind_socket
);
rc
=
zmq_setsockopt
(
bind_socket
,
ZMQ_RCVHWM
,
&
recv_hwm
,
sizeof
(
recv_hwm
));
assert
(
rc
==
0
);
rc
=
zmq_bind
(
bind_socket
,
"inproc://a"
);
assert
(
rc
==
0
);
// Now receive all sent messages
int
recv_count
=
0
;
while
(
zmq_recv
(
bind_socket
,
NULL
,
0
,
ZMQ_DONTWAIT
)
==
0
)
++
recv_count
;
assert
(
send_count
==
recv_count
);
// Clean up
rc
=
zmq_close
(
bind_socket
);
assert
(
rc
==
0
);
rc
=
zmq_ctx_term
(
ctx
);
assert
(
rc
==
0
);
return
send_count
;
}
int
test_inproc_bind_and_close_first
(
int
send_hwm
,
int
recv_hwm
)
{
void
*
ctx
=
zmq_ctx_new
();
assert
(
ctx
);
int
rc
;
// Set up bind socket
void
*
bind_socket
=
zmq_socket
(
ctx
,
ZMQ_PUSH
);
assert
(
bind_socket
);
rc
=
zmq_setsockopt
(
bind_socket
,
ZMQ_SNDHWM
,
&
send_hwm
,
sizeof
(
send_hwm
));
assert
(
rc
==
0
);
rc
=
zmq_bind
(
bind_socket
,
"inproc://a"
);
assert
(
rc
==
0
);
// Send until we block
int
send_count
=
0
;
while
(
send_count
<
MAX_SENDS
&&
zmq_send
(
bind_socket
,
NULL
,
0
,
ZMQ_DONTWAIT
)
==
0
)
++
send_count
;
// Close bind
rc
=
zmq_close
(
bind_socket
);
assert
(
rc
==
0
);
/* Can't currently do connect without then wiring up a bind as things hang, this needs top be fixed.
// Set up connect socket
void *connect_socket = zmq_socket (ctx, ZMQ_PULL);
assert (connect_socket);
rc = zmq_setsockopt (connect_socket, ZMQ_RCVHWM, &recv_hwm, sizeof (recv_hwm));
assert (rc == 0);
rc = zmq_connect (connect_socket, "inproc://a");
assert (rc == 0);
// Now receive all sent messages
int recv_count = 0;
while (zmq_recv (connect_socket, NULL, 0, ZMQ_DONTWAIT) == 0)
++recv_count;
assert (send_count == recv_count);
*/
// Clean up
//rc = zmq_close (connect_socket);
//assert (rc == 0);
rc
=
zmq_ctx_term
(
ctx
);
assert
(
rc
==
0
);
return
send_count
;
}
int
main
(
void
)
{
setup_test_environment
();
int
count
;
// Default values are 1000 on send and 1000 one receive, so 2000 total
count
=
test_defaults
();
assert
(
count
==
2000
);
// Infinite send and receive buffer
count
=
test_inproc_bind_first
(
0
,
0
);
assert
(
count
==
MAX_SENDS
);
...
...
@@ -138,11 +283,20 @@ int main (void)
count
=
test_inproc_connect_first
(
0
,
1
);
assert
(
count
==
MAX_SENDS
);
// Send and recv buffers 1, so total that can be queued is 2
// Send and recv buffers
hwm
1, so total that can be queued is 2
count
=
test_inproc_bind_first
(
1
,
1
);
assert
(
count
==
2
);
count
=
test_inproc_connect_first
(
1
,
1
);
assert
(
count
==
2
);
// Send hwm of 1, send before bind so total that can be queued is 1
count
=
test_inproc_connect_and_close_first
(
1
,
0
);
assert
(
count
==
1
);
// Send hwm of 1, send from bind side before connect so total that can be queued should be 1,
// however currently all messages get thrown away before the connect. BUG?
count
=
test_inproc_bind_and_close_first
(
1
,
0
);
//assert (count == 1);
return
0
;
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment