Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
c0cda2e6
Commit
c0cda2e6
authored
Jun 05, 2015
by
Constantin Rack
Browse files
Options
Browse Files
Download
Plain Diff
Merge pull request #1426 from ricnewton/master
Allow changing of high water marks after connection is established
parents
a3b8f80f
dc949624
Hide whitespace changes
Inline
Side-by-side
Showing
8 changed files
with
181 additions
and
82 deletions
+181
-82
ctx.cpp
src/ctx.cpp
+11
-12
options.cpp
src/options.cpp
+1
-14
pipe.cpp
src/pipe.cpp
+20
-2
pipe.hpp
src/pipe.hpp
+7
-0
socket_base.cpp
src/socket_base.cpp
+18
-0
socket_base.hpp
src/socket_base.hpp
+2
-0
test_hwm.cpp
tests/test_hwm.cpp
+2
-2
test_sockopt_hwm.cpp
tests/test_sockopt_hwm.cpp
+120
-52
No files found.
src/ctx.cpp
View file @
c0cda2e6
...
...
@@ -529,15 +529,6 @@ void zmq::ctx_t::connect_inproc_sockets (zmq::socket_base_t *bind_socket_,
errno_assert
(
rc
==
0
);
}
int
sndhwm
=
0
;
if
(
pending_connection_
.
endpoint
.
options
.
sndhwm
!=
0
&&
bind_options
.
rcvhwm
!=
0
)
sndhwm
=
pending_connection_
.
endpoint
.
options
.
sndhwm
+
bind_options
.
rcvhwm
;
int
rcvhwm
=
0
;
if
(
pending_connection_
.
endpoint
.
options
.
rcvhwm
!=
0
&&
bind_options
.
sndhwm
!=
0
)
rcvhwm
=
pending_connection_
.
endpoint
.
options
.
rcvhwm
+
bind_options
.
sndhwm
;
bool
conflate
=
pending_connection_
.
endpoint
.
options
.
conflate
&&
(
pending_connection_
.
endpoint
.
options
.
type
==
ZMQ_DEALER
||
pending_connection_
.
endpoint
.
options
.
type
==
ZMQ_PULL
||
...
...
@@ -545,9 +536,17 @@ void zmq::ctx_t::connect_inproc_sockets (zmq::socket_base_t *bind_socket_,
pending_connection_
.
endpoint
.
options
.
type
==
ZMQ_PUB
||
pending_connection_
.
endpoint
.
options
.
type
==
ZMQ_SUB
);
int
hwms
[
2
]
=
{
conflate
?
-
1
:
sndhwm
,
conflate
?
-
1
:
rcvhwm
};
pending_connection_
.
connect_pipe
->
set_hwms
(
hwms
[
1
],
hwms
[
0
]);
pending_connection_
.
bind_pipe
->
set_hwms
(
hwms
[
0
],
hwms
[
1
]);
if
(
!
conflate
)
{
pending_connection_
.
connect_pipe
->
set_hwms_boost
(
bind_options
.
sndhwm
,
bind_options
.
rcvhwm
);
pending_connection_
.
bind_pipe
->
set_hwms_boost
(
pending_connection_
.
endpoint
.
options
.
sndhwm
,
pending_connection_
.
endpoint
.
options
.
rcvhwm
);
pending_connection_
.
connect_pipe
->
set_hwms
(
pending_connection_
.
endpoint
.
options
.
rcvhwm
,
pending_connection_
.
endpoint
.
options
.
sndhwm
);
pending_connection_
.
bind_pipe
->
set_hwms
(
bind_options
.
rcvhwm
,
bind_options
.
sndhwm
);
}
else
{
pending_connection_
.
connect_pipe
->
set_hwms
(
-
1
,
-
1
);
pending_connection_
.
bind_pipe
->
set_hwms
(
-
1
,
-
1
);
}
if
(
side_
==
bind_side
)
{
command_t
cmd
;
...
...
src/options.cpp
View file @
c0cda2e6
...
...
@@ -888,18 +888,5 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
bool
zmq
::
options_t
::
is_valid
(
int
option_
)
const
{
bool
valid
=
true
;
if
(
connected
)
{
switch
(
option_
)
{
case
ZMQ_SNDHWM
:
case
ZMQ_RCVHWM
:
valid
=
false
;
break
;
default
:
break
;
}
}
return
valid
;
return
true
;
}
src/pipe.cpp
View file @
c0cda2e6
...
...
@@ -81,6 +81,8 @@ zmq::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,
out_active
(
true
),
hwm
(
outhwm_
),
lwm
(
compute_lwm
(
inhwm_
)),
inhwmboost
(
0
),
outhwmboost
(
0
),
msgs_read
(
0
),
msgs_written
(
0
),
peers_msgs_read
(
0
),
...
...
@@ -518,8 +520,24 @@ void zmq::pipe_t::hiccup ()
void
zmq
::
pipe_t
::
set_hwms
(
int
inhwm_
,
int
outhwm_
)
{
lwm
=
compute_lwm
(
inhwm_
);
hwm
=
outhwm_
;
int
in
=
inhwm_
+
inhwmboost
;
int
out
=
outhwm_
+
outhwmboost
;
// if either send or recv side has hwm <= 0 it means infinite so we should set hwms infinite
if
(
inhwm_
<=
0
||
inhwmboost
<=
0
)
in
=
0
;
if
(
outhwm_
<=
0
||
outhwmboost
<=
0
)
out
=
0
;
lwm
=
compute_lwm
(
in
);
hwm
=
out
;
}
void
zmq
::
pipe_t
::
set_hwms_boost
(
int
inhwmboost_
,
int
outhwmboost_
)
{
inhwmboost
=
inhwmboost_
;
outhwmboost
=
outhwmboost_
;
}
bool
zmq
::
pipe_t
::
check_hwm
()
const
...
...
src/pipe.hpp
View file @
c0cda2e6
...
...
@@ -133,6 +133,9 @@ namespace zmq
// set the high water marks.
void
set_hwms
(
int
inhwm_
,
int
outhwm_
);
// set the boost to high water marks, used by inproc sockets so total hwm are sum of connect and bind sockets watermarks
void
set_hwms_boost
(
int
inhwmboost_
,
int
outhwmboost_
);
// check HWM
bool
check_hwm
()
const
;
private
:
...
...
@@ -176,6 +179,10 @@ namespace zmq
// Low watermark for the inbound pipe.
int
lwm
;
// boosts for high and low watermarks, used with inproc sockets so hwm are sum of send and recv hmws on each side of pipe
int
inhwmboost
;
int
outhwmboost
;
// Number of messages read and written so far.
uint64_t
msgs_read
;
uint64_t
msgs_written
;
...
...
src/socket_base.cpp
View file @
c0cda2e6
...
...
@@ -335,6 +335,7 @@ int zmq::socket_base_t::setsockopt (int option_, const void *optval_,
// If the socket type doesn't support the option, pass it to
// the generic option parser.
rc
=
options
.
setsockopt
(
option_
,
optval_
,
optvallen_
);
update_pipe_options
(
option_
);
EXIT_MUTEX
();
return
rc
;
...
...
@@ -612,6 +613,11 @@ int zmq::socket_base_t::connect (const char *addr_)
int
hwms
[
2
]
=
{
conflate
?
-
1
:
sndhwm
,
conflate
?
-
1
:
rcvhwm
};
bool
conflates
[
2
]
=
{
conflate
,
conflate
};
int
rc
=
pipepair
(
parents
,
new_pipes
,
hwms
,
conflates
);
if
(
!
conflate
)
{
new_pipes
[
0
]
->
set_hwms_boost
(
peer
.
options
.
sndhwm
,
peer
.
options
.
rcvhwm
);
new_pipes
[
1
]
->
set_hwms_boost
(
options
.
sndhwm
,
options
.
rcvhwm
);
}
errno_assert
(
rc
==
0
);
if
(
!
peer
.
socket
)
{
...
...
@@ -1249,6 +1255,18 @@ void zmq::socket_base_t::process_term (int linger_)
own_t
::
process_term
(
linger_
);
}
void
zmq
::
socket_base_t
::
update_pipe_options
(
int
option_
)
{
if
(
option_
==
ZMQ_SNDHWM
||
option_
==
ZMQ_RCVHWM
)
{
for
(
pipes_t
::
size_type
i
=
0
;
i
!=
pipes
.
size
();
++
i
)
{
pipes
[
i
]
->
set_hwms
(
options
.
rcvhwm
,
options
.
sndhwm
);
}
}
}
void
zmq
::
socket_base_t
::
process_destroy
()
{
destroyed
=
true
;
...
...
src/socket_base.hpp
View file @
c0cda2e6
...
...
@@ -232,6 +232,8 @@ namespace zmq
void
process_bind
(
zmq
::
pipe_t
*
pipe_
);
void
process_term
(
int
linger_
);
void
update_pipe_options
(
int
option_
);
// Socket's mailbox object.
i_mailbox
*
mailbox
;
...
...
tests/test_hwm.cpp
View file @
c0cda2e6
...
...
@@ -278,13 +278,13 @@ int main (void)
count
=
test_inproc_connect_first
(
0
,
0
);
assert
(
count
==
MAX_SENDS
);
// Infinite
send
buffer
// Infinite
receive
buffer
count
=
test_inproc_bind_first
(
1
,
0
);
assert
(
count
==
MAX_SENDS
);
count
=
test_inproc_connect_first
(
1
,
0
);
assert
(
count
==
MAX_SENDS
);
// Infinite
receive
buffer
// Infinite
send
buffer
count
=
test_inproc_bind_first
(
0
,
1
);
assert
(
count
==
MAX_SENDS
);
count
=
test_inproc_connect_first
(
0
,
1
);
...
...
tests/test_sockopt_hwm.cpp
View file @
c0cda2e6
#include "testutil.hpp"
void
test_valid_hwm_change
()
const
int
MAX_SENDS
=
10000
;
void
test_change_before_connected
()
{
void
*
ctx
=
zmq_ctx_new
();
assert
(
ctx
);
int
rc
;
int
rc
;
void
*
ctx
=
zmq_ctx_new
();
void
*
bind_socket
=
zmq_socket
(
ctx
,
ZMQ_SUB
);
assert
(
bind_socket
);
void
*
bind_socket
=
zmq_socket
(
ctx
,
ZMQ_PUSH
);
void
*
connect_socket
=
zmq_socket
(
ctx
,
ZMQ_PULL
);
int
val
=
500
;
rc
=
zmq_setsockopt
(
bind_socket
,
ZMQ_RCVHWM
,
&
val
,
sizeof
(
val
));
assert
(
rc
==
0
);
int
val
=
2
;
rc
=
zmq_setsockopt
(
connect_socket
,
ZMQ_RCVHWM
,
&
val
,
sizeof
(
val
));
assert
(
rc
==
0
);
rc
=
zmq_setsockopt
(
bind_socket
,
ZMQ_SNDHWM
,
&
val
,
sizeof
(
val
));
assert
(
rc
==
0
);
rc
=
zmq_bind
(
bind
_socket
,
"inproc://a"
);
assert
(
rc
==
0
);
zmq_connect
(
connect
_socket
,
"inproc://a"
);
zmq_bind
(
bind_socket
,
"inproc://a"
);
size_t
placeholder
=
sizeof
(
val
);
val
=
0
;
rc
=
zmq_getsockopt
(
bind_socket
,
ZMQ_RCVHWM
,
&
val
,
&
placeholder
);
assert
(
rc
==
0
);
assert
(
val
==
500
);
}
size_t
placeholder
=
sizeof
(
val
);
val
=
0
;
rc
=
zmq_getsockopt
(
bind_socket
,
ZMQ_SNDHWM
,
&
val
,
&
placeholder
);
assert
(
rc
==
0
);
assert
(
val
==
2
);
int
send_count
=
0
;
while
(
send_count
<
MAX_SENDS
&&
zmq_send
(
bind_socket
,
NULL
,
0
,
ZMQ_DONTWAIT
)
==
0
)
++
send_count
;
assert
(
send_count
==
4
);
zmq_close
(
bind_socket
);
zmq_close
(
connect_socket
);
zmq_ctx_term
(
ctx
);
}
/**
* Test that zmq_setsockopt() fails to change the RCVHWM when called
* after a call to zmq_bind().
*/
void
test_invalid_hwm_change_bind
()
void
test_change_after_connected
()
{
void
*
ctx
=
zmq_ctx_new
();
assert
(
ctx
);
int
rc
;
int
rc
;
void
*
ctx
=
zmq_ctx_new
();
void
*
bind_socket
=
zmq_socket
(
ctx
,
ZMQ_SUB
);
assert
(
bind_socket
);
void
*
bind_socket
=
zmq_socket
(
ctx
,
ZMQ_PUSH
);
void
*
connect_socket
=
zmq_socket
(
ctx
,
ZMQ_PULL
);
rc
=
zmq_bind
(
bind_socket
,
"inproc://a"
);
assert
(
rc
==
0
);
int
val
=
1
;
rc
=
zmq_setsockopt
(
connect_socket
,
ZMQ_RCVHWM
,
&
val
,
sizeof
(
val
));
assert
(
rc
==
0
);
rc
=
zmq_setsockopt
(
bind_socket
,
ZMQ_SNDHWM
,
&
val
,
sizeof
(
val
));
assert
(
rc
==
0
);
int
val
=
500
;
rc
=
zmq_setsockopt
(
bind_socket
,
ZMQ_RCVHWM
,
&
val
,
sizeof
(
val
));
assert
(
rc
==
-
1
);
zmq_connect
(
connect_socket
,
"inproc://a"
);
zmq_bind
(
bind_socket
,
"inproc://a"
);
zmq_close
(
bind_socket
)
;
zmq_ctx_term
(
ctx
);
}
val
=
5
;
rc
=
zmq_setsockopt
(
bind_socket
,
ZMQ_SNDHWM
,
&
val
,
sizeof
(
val
)
);
assert
(
rc
==
0
);
void
test_invalid_hwm_change_connect
()
{
void
*
ctx
=
zmq_ctx_new
(
);
assert
(
ctx
);
int
rc
;
size_t
placeholder
=
sizeof
(
val
);
val
=
0
;
rc
=
zmq_getsockopt
(
bind_socket
,
ZMQ_SNDHWM
,
&
val
,
&
placeholder
);
assert
(
rc
==
0
);
assert
(
val
==
5
)
;
void
*
connect_socket
=
zmq_socket
(
ctx
,
ZMQ_SUB
);
assert
(
connect_socket
);
int
send_count
=
0
;
while
(
send_count
<
MAX_SENDS
&&
zmq_send
(
bind_socket
,
NULL
,
0
,
ZMQ_DONTWAIT
)
==
0
)
++
send_count
;
rc
=
zmq_connect
(
connect_socket
,
"inproc://a"
);
assert
(
rc
==
0
);
assert
(
send_count
==
6
);
int
val
=
500
;
rc
=
zmq_setsockopt
(
connect_socket
,
ZMQ_RCVHWM
,
&
val
,
sizeof
(
val
));
assert
(
rc
==
-
1
);
zmq_close
(
bind_socket
);
zmq_close
(
connect_socket
);
zmq_ctx_term
(
ctx
);
}
zmq_close
(
connect_socket
);
zmq_ctx_term
(
ctx
);
void
test_decrease_when_full
()
{
int
rc
;
void
*
ctx
=
zmq_ctx_new
();
void
*
bind_socket
=
zmq_socket
(
ctx
,
ZMQ_PUSH
);
void
*
connect_socket
=
zmq_socket
(
ctx
,
ZMQ_PULL
);
int
val
=
1
;
rc
=
zmq_setsockopt
(
connect_socket
,
ZMQ_RCVHWM
,
&
val
,
sizeof
(
val
));
assert
(
rc
==
0
);
val
=
100
;
rc
=
zmq_setsockopt
(
bind_socket
,
ZMQ_SNDHWM
,
&
val
,
sizeof
(
val
));
assert
(
rc
==
0
);
zmq_bind
(
bind_socket
,
"inproc://a"
);
zmq_connect
(
connect_socket
,
"inproc://a"
);
// Fill up to hwm
int
send_count
=
0
;
while
(
send_count
<
MAX_SENDS
&&
zmq_send
(
bind_socket
,
&
send_count
,
sizeof
(
send_count
),
ZMQ_DONTWAIT
)
==
sizeof
(
send_count
))
++
send_count
;
assert
(
send_count
==
101
);
// Descrease snd hwm
val
=
70
;
rc
=
zmq_setsockopt
(
bind_socket
,
ZMQ_SNDHWM
,
&
val
,
sizeof
(
val
));
assert
(
rc
==
0
);
size_t
placeholder
=
sizeof
(
val
);
val
=
0
;
rc
=
zmq_getsockopt
(
bind_socket
,
ZMQ_SNDHWM
,
&
val
,
&
placeholder
);
assert
(
rc
==
0
);
assert
(
val
==
70
);
// Read out all data (should get up to previous hwm worth so none were dropped)
int
read_count
=
0
;
int
read_data
=
0
;
while
(
read_count
<
MAX_SENDS
&&
zmq_recv
(
connect_socket
,
&
read_data
,
sizeof
(
read_data
),
ZMQ_DONTWAIT
)
==
sizeof
(
read_data
))
{
assert
(
read_count
==
read_data
);
++
read_count
;
}
assert
(
read_count
==
101
);
// Give io thread some time to catch up
msleep
(
10
);
// Fill up to new hwm
send_count
=
0
;
while
(
send_count
<
MAX_SENDS
&&
zmq_send
(
bind_socket
,
&
send_count
,
sizeof
(
send_count
),
ZMQ_DONTWAIT
)
==
sizeof
(
send_count
))
++
send_count
;
// Really this should be 71, but the lwm stuff kicks in doesn't seem quite right
assert
(
send_count
>
0
);
zmq_close
(
bind_socket
);
zmq_close
(
connect_socket
);
zmq_ctx_term
(
ctx
);
}
int
main
()
{
test_valid_hwm_change
();
test_invalid_hwm_change_bin
d
();
test_invalid_hwm_change_connect
();
test_change_before_connected
();
test_change_after_connecte
d
();
test_decrease_when_full
();
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment