Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
d9a3cc48
Commit
d9a3cc48
authored
Aug 08, 2014
by
kreuzberger
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
do not silently drop messages in publisher if hwm is reached
parent
446e8efb
Hide whitespace changes
Inline
Side-by-side
Showing
13 changed files
with
345 additions
and
4 deletions
+345
-4
zmq.h
include/zmq.h
+1
-0
dist.cpp
src/dist.cpp
+20
-0
dist.hpp
src/dist.hpp
+3
-0
options.cpp
src/options.cpp
+13
-1
options.hpp
src/options.hpp
+3
-0
pipe.cpp
src/pipe.cpp
+6
-0
pipe.hpp
src/pipe.hpp
+2
-0
xpub.cpp
src/xpub.cpp
+15
-2
xpub.hpp
src/xpub.hpp
+3
-0
CMakeLists.txt
tests/CMakeLists.txt
+2
-0
Makefile.am
tests/Makefile.am
+5
-1
test_hwm_pubsub.cpp
tests/test_hwm_pubsub.cpp
+161
-0
test_xpub_wait_inproc.cpp
tests/test_xpub_wait_inproc.cpp
+111
-0
No files found.
include/zmq.h
View file @
d9a3cc48
...
@@ -307,6 +307,7 @@ ZMQ_EXPORT const char *zmq_msg_gets (zmq_msg_t *msg, const char *property);
...
@@ -307,6 +307,7 @@ ZMQ_EXPORT const char *zmq_msg_gets (zmq_msg_t *msg, const char *property);
#define ZMQ_HANDSHAKE_IVL 66
#define ZMQ_HANDSHAKE_IVL 66
#define ZMQ_IDENTITY_FD 67
#define ZMQ_IDENTITY_FD 67
#define ZMQ_SOCKS_PROXY 68
#define ZMQ_SOCKS_PROXY 68
#define ZMQ_XPUB_WAIT 69
/* Message options */
/* Message options */
#define ZMQ_MORE 1
#define ZMQ_MORE 1
...
...
src/dist.cpp
View file @
d9a3cc48
...
@@ -194,3 +194,23 @@ bool zmq::dist_t::write (pipe_t *pipe_, msg_t *msg_)
...
@@ -194,3 +194,23 @@ bool zmq::dist_t::write (pipe_t *pipe_, msg_t *msg_)
return
true
;
return
true
;
}
}
bool
zmq
::
dist_t
::
check_hwm
()
{
// If there are no matching pipes available, there is nothing to write.
bool
pipes_hwm_ok
=
true
;
if
(
matching
==
0
)
{
return
true
;
}
for
(
pipes_t
::
size_type
i
=
0
;
i
<
matching
;
++
i
)
{
if
(
!
pipes
[
i
]
->
check_hwm
())
{
pipes_hwm_ok
=
false
;
break
;
}
}
return
pipes_hwm_ok
;
}
src/dist.hpp
View file @
d9a3cc48
...
@@ -64,6 +64,9 @@ namespace zmq
...
@@ -64,6 +64,9 @@ namespace zmq
bool
has_out
();
bool
has_out
();
// check HWM of all pipes matching
bool
check_hwm
();
private
:
private
:
// Write the message to the pipe. Make the pipe inactive if writing
// Write the message to the pipe. Make the pipe inactive if writing
...
...
src/options.cpp
View file @
d9a3cc48
...
@@ -458,6 +458,12 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
...
@@ -458,6 +458,12 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
return
0
;
return
0
;
}
}
break
;
break
;
case
ZMQ_XPUB_WAIT
:
{
pubWait
=
true
;
return
0
;
}
break
;
default
:
default
:
#if defined (ZMQ_ACT_MILITANT)
#if defined (ZMQ_ACT_MILITANT)
...
@@ -804,7 +810,13 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
...
@@ -804,7 +810,13 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
return
0
;
return
0
;
}
}
break
;
break
;
case
ZMQ_XPUB_WAIT
:
if
(
is_int
)
{
*
value
=
pubWait
;
return
0
;
}
break
;
default
:
default
:
#if defined (ZMQ_ACT_MILITANT)
#if defined (ZMQ_ACT_MILITANT)
malformed
=
false
;
malformed
=
false
;
...
...
src/options.hpp
View file @
d9a3cc48
...
@@ -179,6 +179,9 @@ namespace zmq
...
@@ -179,6 +179,9 @@ namespace zmq
// close socket. Default is 30 secs. 0 means no handshake timeout.
// close socket. Default is 30 secs. 0 means no handshake timeout.
int
handshake_ivl
;
int
handshake_ivl
;
// flag if PUB socket should block if reaching HWM
bool
pubWait
;
};
};
}
}
...
...
src/pipe.cpp
View file @
d9a3cc48
...
@@ -500,3 +500,9 @@ void zmq::pipe_t::set_hwms (int inhwm_, int outhwm_)
...
@@ -500,3 +500,9 @@ void zmq::pipe_t::set_hwms (int inhwm_, int outhwm_)
lwm
=
compute_lwm
(
inhwm_
);
lwm
=
compute_lwm
(
inhwm_
);
hwm
=
outhwm_
;
hwm
=
outhwm_
;
}
}
bool
zmq
::
pipe_t
::
check_hwm
()
{
bool
full
=
hwm
>
0
&&
msgs_written
-
peers_msgs_read
>=
uint64_t
(
hwm
-
1
);
return
(
!
full
);
}
src/pipe.hpp
View file @
d9a3cc48
...
@@ -118,6 +118,8 @@ namespace zmq
...
@@ -118,6 +118,8 @@ namespace zmq
// set the high water marks.
// set the high water marks.
void
set_hwms
(
int
inhwm_
,
int
outhwm_
);
void
set_hwms
(
int
inhwm_
,
int
outhwm_
);
// check HWM
bool
check_hwm
();
// provide a way to link pipe to engine fd. Set on session initialization
// provide a way to link pipe to engine fd. Set on session initialization
fd_t
assoc_fd
;
//=retired_fd
fd_t
assoc_fd
;
//=retired_fd
private
:
private
:
...
...
src/xpub.cpp
View file @
d9a3cc48
...
@@ -90,7 +90,7 @@ void zmq::xpub_t::xwrite_activated (pipe_t *pipe_)
...
@@ -90,7 +90,7 @@ void zmq::xpub_t::xwrite_activated (pipe_t *pipe_)
int
zmq
::
xpub_t
::
xsetsockopt
(
int
option_
,
const
void
*
optval_
,
int
zmq
::
xpub_t
::
xsetsockopt
(
int
option_
,
const
void
*
optval_
,
size_t
optvallen_
)
size_t
optvallen_
)
{
{
if
(
option_
!=
ZMQ_XPUB_VERBOSE
)
{
if
(
option_
!=
ZMQ_XPUB_VERBOSE
&&
option_
!=
ZMQ_XPUB_WAIT
)
{
errno
=
EINVAL
;
errno
=
EINVAL
;
return
-
1
;
return
-
1
;
}
}
...
@@ -98,7 +98,15 @@ int zmq::xpub_t::xsetsockopt (int option_, const void *optval_,
...
@@ -98,7 +98,15 @@ int zmq::xpub_t::xsetsockopt (int option_, const void *optval_,
errno
=
EINVAL
;
errno
=
EINVAL
;
return
-
1
;
return
-
1
;
}
}
verbose
=
(
*
static_cast
<
const
int
*>
(
optval_
)
!=
0
);
if
(
option_
==
ZMQ_XPUB_VERBOSE
)
{
verbose
=
(
*
static_cast
<
const
int
*>
(
optval_
)
!=
0
);
}
else
if
(
option_
==
ZMQ_XPUB_WAIT
)
{
wait
=
(
*
static_cast
<
const
int
*>
(
optval_
)
!=
0
);
}
else
{
return
-
1
;
}
return
0
;
return
0
;
}
}
...
@@ -127,6 +135,11 @@ int zmq::xpub_t::xsend (msg_t *msg_)
...
@@ -127,6 +135,11 @@ int zmq::xpub_t::xsend (msg_t *msg_)
subscriptions
.
match
((
unsigned
char
*
)
msg_
->
data
(),
msg_
->
size
(),
subscriptions
.
match
((
unsigned
char
*
)
msg_
->
data
(),
msg_
->
size
(),
mark_as_matching
,
this
);
mark_as_matching
,
this
);
if
(
wait
&&
!
dist
.
check_hwm
())
{
return
EAGAIN
;
}
// Send the message to all the pipes that were marked as matching
// Send the message to all the pipes that were marked as matching
// in the previous step.
// in the previous step.
int
rc
=
dist
.
send_to_matching
(
msg_
);
int
rc
=
dist
.
send_to_matching
(
msg_
);
...
...
src/xpub.hpp
View file @
d9a3cc48
...
@@ -79,6 +79,9 @@ namespace zmq
...
@@ -79,6 +79,9 @@ namespace zmq
// True if we are in the middle of sending a multi-part message.
// True if we are in the middle of sending a multi-part message.
bool
more
;
bool
more
;
// wait for reaching LWM if HWM is reached
bool
wait
;
// List of pending (un)subscriptions, ie. those that were already
// List of pending (un)subscriptions, ie. those that were already
// applied to the trie, but not yet received by the user.
// applied to the trie, but not yet received by the user.
typedef
std
::
basic_string
<
unsigned
char
>
blob_t
;
typedef
std
::
basic_string
<
unsigned
char
>
blob_t
;
...
...
tests/CMakeLists.txt
View file @
d9a3cc48
...
@@ -7,6 +7,7 @@ set(tests
...
@@ -7,6 +7,7 @@ set(tests
test_reqrep_inproc
test_reqrep_inproc
test_reqrep_tcp
test_reqrep_tcp
test_hwm
test_hwm
test_hwm_pubsub
test_reqrep_device
test_reqrep_device
test_sub_forward
test_sub_forward
test_invalid_rep
test_invalid_rep
...
@@ -42,6 +43,7 @@ set(tests
...
@@ -42,6 +43,7 @@ set(tests
test_many_sockets
test_many_sockets
test_diffserv
test_diffserv
test_connect_rid
test_connect_rid
test_xpub_wait_inproc
)
)
if
(
NOT WIN32
)
if
(
NOT WIN32
)
list
(
APPEND tests
list
(
APPEND tests
...
...
tests/Makefile.am
View file @
d9a3cc48
...
@@ -9,6 +9,7 @@ noinst_PROGRAMS = test_system \
...
@@ -9,6 +9,7 @@ noinst_PROGRAMS = test_system \
test_reqrep_inproc
\
test_reqrep_inproc
\
test_reqrep_tcp
\
test_reqrep_tcp
\
test_hwm
\
test_hwm
\
test_hwm_pubsub
\
test_reqrep_device
\
test_reqrep_device
\
test_sub_forward
\
test_sub_forward
\
test_invalid_rep
\
test_invalid_rep
\
...
@@ -52,7 +53,8 @@ noinst_PROGRAMS = test_system \
...
@@ -52,7 +53,8 @@ noinst_PROGRAMS = test_system \
test_bind_src_address
\
test_bind_src_address
\
test_metadata
\
test_metadata
\
test_id2fd
\
test_id2fd
\
test_capabilities
test_capabilities
\
test_xpub_wait_inproc
if
!ON_MINGW
if
!ON_MINGW
noinst_PROGRAMS
+=
test_shutdown_stress
\
noinst_PROGRAMS
+=
test_shutdown_stress
\
...
@@ -86,6 +88,7 @@ test_pair_tcp_SOURCES = test_pair_tcp.cpp testutil.hpp
...
@@ -86,6 +88,7 @@ test_pair_tcp_SOURCES = test_pair_tcp.cpp testutil.hpp
test_reqrep_inproc_SOURCES
=
test_reqrep_inproc.cpp testutil.hpp
test_reqrep_inproc_SOURCES
=
test_reqrep_inproc.cpp testutil.hpp
test_reqrep_tcp_SOURCES
=
test_reqrep_tcp.cpp testutil.hpp
test_reqrep_tcp_SOURCES
=
test_reqrep_tcp.cpp testutil.hpp
test_hwm_SOURCES
=
test_hwm.cpp
test_hwm_SOURCES
=
test_hwm.cpp
test_hwm_pubsub_SOURCES
=
test_hwm_pubsub.cpp
test_reqrep_device_SOURCES
=
test_reqrep_device.cpp
test_reqrep_device_SOURCES
=
test_reqrep_device.cpp
test_sub_forward_SOURCES
=
test_sub_forward.cpp
test_sub_forward_SOURCES
=
test_sub_forward.cpp
test_invalid_rep_SOURCES
=
test_invalid_rep.cpp
test_invalid_rep_SOURCES
=
test_invalid_rep.cpp
...
@@ -131,6 +134,7 @@ test_bind_src_address_SOURCES = test_bind_src_address.cpp
...
@@ -131,6 +134,7 @@ test_bind_src_address_SOURCES = test_bind_src_address.cpp
test_metadata_SOURCES
=
test_metadata.cpp
test_metadata_SOURCES
=
test_metadata.cpp
test_id2fd_SOURCES
=
test_id2fd.cpp
test_id2fd_SOURCES
=
test_id2fd.cpp
test_capabilities_SOURCES
=
test_capabilities.cpp
test_capabilities_SOURCES
=
test_capabilities.cpp
test_xpub_wait_inproc_SOURCES
=
test_xpub_wait_inproc.cpp
if
!ON_MINGW
if
!ON_MINGW
test_shutdown_stress_SOURCES
=
test_shutdown_stress.cpp
test_shutdown_stress_SOURCES
=
test_shutdown_stress.cpp
test_pair_ipc_SOURCES
=
test_pair_ipc.cpp testutil.hpp
test_pair_ipc_SOURCES
=
test_pair_ipc.cpp testutil.hpp
...
...
tests/test_hwm_pubsub.cpp
0 → 100644
View file @
d9a3cc48
/*
Copyright (c) 2007-2014 Contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "testutil.hpp"
const
int
MAX_SENDS
=
10000
;
int
test_defaults
(
int
send_hwm
,
int
msgCnt
)
{
void
*
ctx
=
zmq_ctx_new
();
assert
(
ctx
);
int
rc
;
// Set up bind socket
void
*
pub_socket
=
zmq_socket
(
ctx
,
ZMQ_PUB
);
assert
(
pub_socket
);
rc
=
zmq_bind
(
pub_socket
,
"inproc://a"
);
assert
(
rc
==
0
);
// Set up connect socket
void
*
sub_socket
=
zmq_socket
(
ctx
,
ZMQ_SUB
);
assert
(
sub_socket
);
rc
=
zmq_connect
(
sub_socket
,
"inproc://a"
);
assert
(
rc
==
0
);
//set a hwm on publisher
rc
=
zmq_setsockopt
(
pub_socket
,
ZMQ_SNDHWM
,
&
send_hwm
,
sizeof
(
send_hwm
));
rc
=
zmq_setsockopt
(
sub_socket
,
ZMQ_SUBSCRIBE
,
0
,
0
);
// Send until we block
int
send_count
=
0
;
while
(
send_count
<
msgCnt
&&
zmq_send
(
pub_socket
,
NULL
,
0
,
ZMQ_DONTWAIT
)
==
0
)
++
send_count
;
// Now receive all sent messages
int
recv_count
=
0
;
while
(
0
==
zmq_recv
(
sub_socket
,
NULL
,
0
,
ZMQ_DONTWAIT
))
{
++
recv_count
;
}
assert
(
send_hwm
==
recv_count
);
// Clean up
rc
=
zmq_close
(
sub_socket
);
assert
(
rc
==
0
);
rc
=
zmq_close
(
pub_socket
);
assert
(
rc
==
0
);
rc
=
zmq_ctx_term
(
ctx
);
assert
(
rc
==
0
);
return
recv_count
;
}
int
receive
(
void
*
socket
)
{
int
recv_count
=
0
;
// Now receive all sent messages
while
(
0
==
zmq_recv
(
socket
,
NULL
,
0
,
ZMQ_DONTWAIT
))
{
++
recv_count
;
}
return
recv_count
;
}
int
test_blocking
(
int
send_hwm
,
int
msgCnt
)
{
void
*
ctx
=
zmq_ctx_new
();
assert
(
ctx
);
int
rc
;
// Set up bind socket
void
*
pub_socket
=
zmq_socket
(
ctx
,
ZMQ_PUB
);
assert
(
pub_socket
);
rc
=
zmq_bind
(
pub_socket
,
"inproc://a"
);
assert
(
rc
==
0
);
// Set up connect socket
void
*
sub_socket
=
zmq_socket
(
ctx
,
ZMQ_SUB
);
assert
(
sub_socket
);
rc
=
zmq_connect
(
sub_socket
,
"inproc://a"
);
assert
(
rc
==
0
);
//set a hwm on publisher
rc
=
zmq_setsockopt
(
pub_socket
,
ZMQ_SNDHWM
,
&
send_hwm
,
sizeof
(
send_hwm
));
int
wait
=
1
;
rc
=
zmq_setsockopt
(
pub_socket
,
ZMQ_XPUB_WAIT
,
&
wait
,
sizeof
(
wait
));
rc
=
zmq_setsockopt
(
sub_socket
,
ZMQ_SUBSCRIBE
,
0
,
0
);
// Send until we block
int
send_count
=
0
;
int
recv_count
=
0
;
while
(
send_count
<
msgCnt
)
{
rc
=
zmq_send
(
pub_socket
,
NULL
,
0
,
ZMQ_DONTWAIT
);
if
(
rc
==
0
)
{
++
send_count
;
}
else
if
(
-
1
==
rc
)
{
assert
(
EAGAIN
==
errno
);
recv_count
+=
receive
(
sub_socket
);
assert
(
recv_count
==
send_count
);
}
}
recv_count
+=
receive
(
sub_socket
);
// Clean up
rc
=
zmq_close
(
sub_socket
);
assert
(
rc
==
0
);
rc
=
zmq_close
(
pub_socket
);
assert
(
rc
==
0
);
rc
=
zmq_ctx_term
(
ctx
);
assert
(
rc
==
0
);
return
recv_count
;
}
int
main
(
void
)
{
setup_test_environment
();
int
count
;
// send 1000 msg on hwm 1000, receive 1000
count
=
test_defaults
(
1000
,
1000
);
assert
(
count
==
1000
);
// send 6000 msg on hwm 2000, drops above hwm, only receive hwm
count
=
test_blocking
(
2000
,
6000
);
assert
(
count
==
6000
);
return
0
;
}
tests/test_xpub_wait_inproc.cpp
0 → 100644
View file @
d9a3cc48
/*
Copyright (c) 2007-2014 Contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "testutil.hpp"
int
main
(
void
)
{
setup_test_environment
();
void
*
ctx
=
zmq_ctx_new
();
assert
(
ctx
);
// Create a publisher
void
*
pub
=
zmq_socket
(
ctx
,
ZMQ_PUB
);
assert
(
pub
);
int
rc
=
zmq_bind
(
pub
,
"inproc://soname"
);
assert
(
rc
==
0
);
// set pub socket options
int
wait
=
1
;
rc
=
zmq_setsockopt
(
pub
,
ZMQ_XPUB_WAIT
,
&
wait
,
4
);
assert
(
rc
==
0
);
int
hwm
=
2000
;
rc
=
zmq_setsockopt
(
pub
,
ZMQ_SNDHWM
,
&
hwm
,
4
);
assert
(
rc
==
0
);
// Create a subscriber
void
*
sub
=
zmq_socket
(
ctx
,
ZMQ_SUB
);
assert
(
sub
);
rc
=
zmq_connect
(
sub
,
"inproc://soname"
);
assert
(
rc
==
0
);
// Subscribe for all messages.
rc
=
zmq_setsockopt
(
sub
,
ZMQ_SUBSCRIBE
,
""
,
0
);
assert
(
rc
==
0
);
int
hwmlimit
=
hwm
-
1
;
int
send_count
=
0
;
// Send an empty message
for
(
int
i
=
0
;
i
<
hwmlimit
;
i
++
)
{
rc
=
zmq_send
(
pub
,
NULL
,
0
,
0
);
assert
(
rc
==
0
);
send_count
++
;
}
int
recv_count
=
0
;
do
{
// Receive the message in the subscriber
// rc = zmq_recv (sub, buff, sizeof (buff), ZMQ_DONTWAIT);
rc
=
zmq_recv
(
sub
,
NULL
,
0
,
ZMQ_DONTWAIT
);
if
(
-
1
==
rc
)
{
assert
(
EAGAIN
==
errno
);
}
else
{
assert
(
0
==
rc
);
recv_count
++
;
}
}
while
(
0
==
rc
);
assert
(
send_count
==
recv_count
);
// now test real blocking behavior
// set a timeout, default is infinite
int
timeout
=
0
;
rc
=
zmq_setsockopt
(
pub
,
ZMQ_SNDTIMEO
,
&
timeout
,
4
);
assert
(
rc
==
0
);
send_count
=
0
;
recv_count
=
0
;
hwmlimit
=
hwm
;
// Send an empty message
while
(
0
==
zmq_send
(
pub
,
NULL
,
0
,
0
)
)
{
send_count
++
;
}
assert
(
EAGAIN
==
errno
);
while
(
0
==
zmq_recv
(
sub
,
NULL
,
0
,
ZMQ_DONTWAIT
))
{
recv_count
++
;
}
assert
(
send_count
==
recv_count
);
// Clean up.
rc
=
zmq_close
(
pub
);
assert
(
rc
==
0
);
rc
=
zmq_close
(
sub
);
assert
(
rc
==
0
);
rc
=
zmq_ctx_term
(
ctx
);
assert
(
rc
==
0
);
return
0
;
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment