Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
17b95683
Commit
17b95683
authored
Sep 01, 2017
by
Luca Boccassi
Committed by
GitHub
Sep 01, 2017
Browse files
Options
Browse Files
Download
Plain Diff
Merge pull request #2730 from sigiesec/fix-issue-2623
Problem: unable to query state of a router for a particular peer
parents
4691714d
79e28af4
Hide whitespace changes
Inline
Side-by-side
Showing
8 changed files
with
313 additions
and
95 deletions
+313
-95
zmq.h
include/zmq.h
+4
-0
router.cpp
src/router.cpp
+21
-0
router.hpp
src/router.hpp
+1
-0
socket_base.cpp
src/socket_base.cpp
+8
-0
socket_base.hpp
src/socket_base.hpp
+5
-1
zmq.cpp
src/zmq.cpp
+67
-87
zmq_draft.h
src/zmq_draft.h
+4
-0
test_router_mandatory.cpp
tests/test_router_mandatory.cpp
+203
-7
No files found.
include/zmq.h
View file @
17b95683
...
@@ -661,6 +661,10 @@ ZMQ_EXPORT int zmq_poller_modify_fd (void *poller, int fd, short events);
...
@@ -661,6 +661,10 @@ ZMQ_EXPORT int zmq_poller_modify_fd (void *poller, int fd, short events);
ZMQ_EXPORT
int
zmq_poller_remove_fd
(
void
*
poller
,
int
fd
);
ZMQ_EXPORT
int
zmq_poller_remove_fd
(
void
*
poller
,
int
fd
);
#endif
#endif
ZMQ_EXPORT
int
zmq_socket_get_peer_state
(
void
*
socket
,
const
void
*
identity
,
size_t
identity_size
);
/******************************************************************************/
/******************************************************************************/
/* Scheduling timers */
/* Scheduling timers */
/******************************************************************************/
/******************************************************************************/
...
...
src/router.cpp
View file @
17b95683
...
@@ -443,6 +443,27 @@ zmq::blob_t zmq::router_t::get_credential () const
...
@@ -443,6 +443,27 @@ zmq::blob_t zmq::router_t::get_credential () const
return
fq
.
get_credential
();
return
fq
.
get_credential
();
}
}
int
zmq
::
router_t
::
get_peer_state
(
const
void
*
identity
,
size_t
identity_size
)
const
{
int
res
=
0
;
blob_t
identity_blob
((
unsigned
char
*
)
identity
,
identity_size
);
outpipes_t
::
const_iterator
it
=
outpipes
.
find
(
identity_blob
);
if
(
it
==
outpipes
.
end
())
{
errno
=
EHOSTUNREACH
;
return
-
1
;
}
const
outpipe_t
&
outpipe
=
it
->
second
;
if
(
outpipe
.
pipe
->
check_hwm
())
res
|=
ZMQ_POLLOUT
;
/** \todo does it make any sense to check the inpipe as well? */
return
res
;
}
bool
zmq
::
router_t
::
identify_peer
(
pipe_t
*
pipe_
)
bool
zmq
::
router_t
::
identify_peer
(
pipe_t
*
pipe_
)
{
{
msg_t
msg
;
msg_t
msg
;
...
...
src/router.hpp
View file @
17b95683
...
@@ -64,6 +64,7 @@ namespace zmq
...
@@ -64,6 +64,7 @@ namespace zmq
void
xread_activated
(
zmq
::
pipe_t
*
pipe_
);
void
xread_activated
(
zmq
::
pipe_t
*
pipe_
);
void
xwrite_activated
(
zmq
::
pipe_t
*
pipe_
);
void
xwrite_activated
(
zmq
::
pipe_t
*
pipe_
);
void
xpipe_terminated
(
zmq
::
pipe_t
*
pipe_
);
void
xpipe_terminated
(
zmq
::
pipe_t
*
pipe_
);
int
get_peer_state
(
const
void
*
identity
,
size_t
identity_size
)
const
;
protected
:
protected
:
...
...
src/socket_base.cpp
View file @
17b95683
...
@@ -221,6 +221,14 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_, int sid_, bool
...
@@ -221,6 +221,14 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_, int sid_, bool
}
}
}
}
int
zmq
::
socket_base_t
::
get_peer_state
(
const
void
*
identity
,
size_t
identity_size
)
const
{
// Only ROUTER sockets support this
errno
=
ENOTSUP
;
return
-
1
;
}
zmq
::
socket_base_t
::~
socket_base_t
()
zmq
::
socket_base_t
::~
socket_base_t
()
{
{
if
(
mailbox
)
if
(
mailbox
)
...
...
src/socket_base.hpp
View file @
17b95683
...
@@ -138,6 +138,11 @@ namespace zmq
...
@@ -138,6 +138,11 @@ namespace zmq
void
event_handshake_failed_auth
(
const
std
::
string
&
addr_
,
int
err_
);
void
event_handshake_failed_auth
(
const
std
::
string
&
addr_
,
int
err_
);
void
event_handshake_succeeded
(
const
std
::
string
&
addr_
,
int
err_
);
void
event_handshake_succeeded
(
const
std
::
string
&
addr_
,
int
err_
);
// Query the state of a specific peer. The default implementation
// always returns an ENOTSUP error.
virtual
int
get_peer_state
(
const
void
*
identity
,
size_t
identity_size
)
const
;
protected
:
protected
:
socket_base_t
(
zmq
::
ctx_t
*
parent_
,
uint32_t
tid_
,
int
sid_
,
bool
thread_safe_
=
false
);
socket_base_t
(
zmq
::
ctx_t
*
parent_
,
uint32_t
tid_
,
int
sid_
,
bool
thread_safe_
=
false
);
...
@@ -180,7 +185,6 @@ namespace zmq
...
@@ -180,7 +185,6 @@ namespace zmq
// Delay actual destruction of the socket.
// Delay actual destruction of the socket.
void
process_destroy
();
void
process_destroy
();
// Next assigned name on a zmq_connect() call used by ROUTER and STREAM socket types
// Next assigned name on a zmq_connect() call used by ROUTER and STREAM socket types
std
::
string
connect_rid
;
std
::
string
connect_rid
;
...
...
src/zmq.cpp
View file @
17b95683
...
@@ -247,6 +247,16 @@ int zmq_ctx_destroy (void *ctx_)
...
@@ -247,6 +247,16 @@ int zmq_ctx_destroy (void *ctx_)
// Sockets
// Sockets
static
zmq
::
socket_base_t
*
as_socket_base_t
(
void
*
s_
)
{
zmq
::
socket_base_t
*
s
=
static_cast
<
zmq
::
socket_base_t
*>
(
s_
);
if
(
!
s_
||
!
s
->
check_tag
())
{
errno
=
ENOTSOCK
;
return
NULL
;
}
return
s
;
}
void
*
zmq_socket
(
void
*
ctx_
,
int
type_
)
void
*
zmq_socket
(
void
*
ctx_
,
int
type_
)
{
{
if
(
!
ctx_
||
!
((
zmq
::
ctx_t
*
)
ctx_
)
->
check_tag
())
{
if
(
!
ctx_
||
!
((
zmq
::
ctx_t
*
)
ctx_
)
->
check_tag
())
{
...
@@ -260,109 +270,83 @@ void *zmq_socket (void *ctx_, int type_)
...
@@ -260,109 +270,83 @@ void *zmq_socket (void *ctx_, int type_)
int
zmq_close
(
void
*
s_
)
int
zmq_close
(
void
*
s_
)
{
{
if
(
!
s_
||
!
((
zmq
::
socket_base_t
*
)
s_
)
->
check_tag
())
{
zmq
::
socket_base_t
*
s
=
as_socket_base_t
(
s_
);
errno
=
ENOTSOCK
;
if
(
!
s
)
return
-
1
;
return
-
1
;
}
s
->
close
();
((
zmq
::
socket_base_t
*
)
s_
)
->
close
();
return
0
;
return
0
;
}
}
int
zmq_setsockopt
(
void
*
s_
,
int
option_
,
const
void
*
optval_
,
int
zmq_setsockopt
(
void
*
s_
,
int
option_
,
const
void
*
optval_
,
size_t
optvallen_
)
size_t
optvallen_
)
{
{
if
(
!
s_
||
!
((
zmq
::
socket_base_t
*
)
s_
)
->
check_tag
())
{
zmq
::
socket_base_t
*
s
=
as_socket_base_t
(
s_
);
errno
=
ENOTSOCK
;
if
(
!
s
)
return
-
1
;
return
-
1
;
}
return
s
->
setsockopt
(
option_
,
optval_
,
optvallen_
);
zmq
::
socket_base_t
*
s
=
(
zmq
::
socket_base_t
*
)
s_
;
int
result
=
s
->
setsockopt
(
option_
,
optval_
,
optvallen_
);
return
result
;
}
}
int
zmq_getsockopt
(
void
*
s_
,
int
option_
,
void
*
optval_
,
size_t
*
optvallen_
)
int
zmq_getsockopt
(
void
*
s_
,
int
option_
,
void
*
optval_
,
size_t
*
optvallen_
)
{
{
if
(
!
s_
||
!
((
zmq
::
socket_base_t
*
)
s_
)
->
check_tag
())
{
zmq
::
socket_base_t
*
s
=
as_socket_base_t
(
s_
);
errno
=
ENOTSOCK
;
if
(
!
s
)
return
-
1
;
return
-
1
;
}
return
s
->
getsockopt
(
option_
,
optval_
,
optvallen_
);
zmq
::
socket_base_t
*
s
=
(
zmq
::
socket_base_t
*
)
s_
;
int
result
=
s
->
getsockopt
(
option_
,
optval_
,
optvallen_
);
return
result
;
}
}
int
zmq_socket_monitor
(
void
*
s_
,
const
char
*
addr_
,
int
events_
)
int
zmq_socket_monitor
(
void
*
s_
,
const
char
*
addr_
,
int
events_
)
{
{
if
(
!
s_
||
!
((
zmq
::
socket_base_t
*
)
s_
)
->
check_tag
())
{
zmq
::
socket_base_t
*
s
=
as_socket_base_t
(
s_
);
errno
=
ENOTSOCK
;
if
(
!
s
)
return
-
1
;
return
-
1
;
}
return
s
->
monitor
(
addr_
,
events_
);
zmq
::
socket_base_t
*
s
=
(
zmq
::
socket_base_t
*
)
s_
;
int
result
=
s
->
monitor
(
addr_
,
events_
);
return
result
;
}
}
int
zmq_join
(
void
*
s_
,
const
char
*
group_
)
int
zmq_join
(
void
*
s_
,
const
char
*
group_
)
{
{
if
(
!
s_
||
!
((
zmq
::
socket_base_t
*
)
s_
)
->
check_tag
())
{
zmq
::
socket_base_t
*
s
=
as_socket_base_t
(
s_
);
errno
=
ENOTSOCK
;
if
(
!
s
)
return
-
1
;
return
-
1
;
}
return
s
->
join
(
group_
);
zmq
::
socket_base_t
*
s
=
(
zmq
::
socket_base_t
*
)
s_
;
int
result
=
s
->
join
(
group_
);
return
result
;
}
}
int
zmq_leave
(
void
*
s_
,
const
char
*
group_
)
int
zmq_leave
(
void
*
s_
,
const
char
*
group_
)
{
{
if
(
!
s_
||
!
((
zmq
::
socket_base_t
*
)
s_
)
->
check_tag
())
{
zmq
::
socket_base_t
*
s
=
as_socket_base_t
(
s_
);
errno
=
ENOTSOCK
;
if
(
!
s
)
return
-
1
;
return
-
1
;
}
return
s
->
leave
(
group_
);
zmq
::
socket_base_t
*
s
=
(
zmq
::
socket_base_t
*
)
s_
;
int
result
=
s
->
leave
(
group_
);
return
result
;
}
}
int
zmq_bind
(
void
*
s_
,
const
char
*
addr_
)
int
zmq_bind
(
void
*
s_
,
const
char
*
addr_
)
{
{
if
(
!
s_
||
!
((
zmq
::
socket_base_t
*
)
s_
)
->
check_tag
())
{
zmq
::
socket_base_t
*
s
=
as_socket_base_t
(
s_
);
errno
=
ENOTSOCK
;
if
(
!
s
)
return
-
1
;
return
-
1
;
}
return
s
->
bind
(
addr_
);
zmq
::
socket_base_t
*
s
=
(
zmq
::
socket_base_t
*
)
s_
;
int
result
=
s
->
bind
(
addr_
);
return
result
;
}
}
int
zmq_connect
(
void
*
s_
,
const
char
*
addr_
)
int
zmq_connect
(
void
*
s_
,
const
char
*
addr_
)
{
{
if
(
!
s_
||
!
((
zmq
::
socket_base_t
*
)
s_
)
->
check_tag
())
{
zmq
::
socket_base_t
*
s
=
as_socket_base_t
(
s_
);
errno
=
ENOTSOCK
;
if
(
!
s
)
return
-
1
;
return
-
1
;
}
return
s
->
connect
(
addr_
);
zmq
::
socket_base_t
*
s
=
(
zmq
::
socket_base_t
*
)
s_
;
int
result
=
s
->
connect
(
addr_
);
return
result
;
}
}
int
zmq_unbind
(
void
*
s_
,
const
char
*
addr_
)
int
zmq_unbind
(
void
*
s_
,
const
char
*
addr_
)
{
{
if
(
!
s_
||
!
((
zmq
::
socket_base_t
*
)
s_
)
->
check_tag
())
{
zmq
::
socket_base_t
*
s
=
as_socket_base_t
(
s_
);
errno
=
ENOTSOCK
;
if
(
!
s
)
return
-
1
;
return
-
1
;
}
zmq
::
socket_base_t
*
s
=
(
zmq
::
socket_base_t
*
)
s_
;
return
s
->
term_endpoint
(
addr_
);
return
s
->
term_endpoint
(
addr_
);
}
}
int
zmq_disconnect
(
void
*
s_
,
const
char
*
addr_
)
int
zmq_disconnect
(
void
*
s_
,
const
char
*
addr_
)
{
{
if
(
!
s_
||
!
((
zmq
::
socket_base_t
*
)
s_
)
->
check_tag
())
{
zmq
::
socket_base_t
*
s
=
as_socket_base_t
(
s_
);
errno
=
ENOTSOCK
;
if
(
!
s
)
return
-
1
;
return
-
1
;
}
zmq
::
socket_base_t
*
s
=
(
zmq
::
socket_base_t
*
)
s_
;
return
s
->
term_endpoint
(
addr_
);
return
s
->
term_endpoint
(
addr_
);
}
}
...
@@ -392,10 +376,9 @@ int zmq_sendmsg (void *s_, zmq_msg_t *msg_, int flags_)
...
@@ -392,10 +376,9 @@ int zmq_sendmsg (void *s_, zmq_msg_t *msg_, int flags_)
int
zmq_send
(
void
*
s_
,
const
void
*
buf_
,
size_t
len_
,
int
flags_
)
int
zmq_send
(
void
*
s_
,
const
void
*
buf_
,
size_t
len_
,
int
flags_
)
{
{
if
(
!
s_
||
!
((
zmq
::
socket_base_t
*
)
s_
)
->
check_tag
())
{
zmq
::
socket_base_t
*
s
=
as_socket_base_t
(
s_
);
errno
=
ENOTSOCK
;
if
(
!
s
)
return
-
1
;
return
-
1
;
}
zmq_msg_t
msg
;
zmq_msg_t
msg
;
if
(
zmq_msg_init_size
(
&
msg
,
len_
))
if
(
zmq_msg_init_size
(
&
msg
,
len_
))
return
-
1
;
return
-
1
;
...
@@ -405,7 +388,6 @@ int zmq_send (void *s_, const void *buf_, size_t len_, int flags_)
...
@@ -405,7 +388,6 @@ int zmq_send (void *s_, const void *buf_, size_t len_, int flags_)
assert
(
buf_
);
assert
(
buf_
);
memcpy
(
zmq_msg_data
(
&
msg
),
buf_
,
len_
);
memcpy
(
zmq_msg_data
(
&
msg
),
buf_
,
len_
);
}
}
zmq
::
socket_base_t
*
s
=
(
zmq
::
socket_base_t
*
)
s_
;
int
rc
=
s_sendmsg
(
s
,
&
msg
,
flags_
);
int
rc
=
s_sendmsg
(
s
,
&
msg
,
flags_
);
if
(
unlikely
(
rc
<
0
))
{
if
(
unlikely
(
rc
<
0
))
{
int
err
=
errno
;
int
err
=
errno
;
...
@@ -421,16 +403,14 @@ int zmq_send (void *s_, const void *buf_, size_t len_, int flags_)
...
@@ -421,16 +403,14 @@ int zmq_send (void *s_, const void *buf_, size_t len_, int flags_)
int
zmq_send_const
(
void
*
s_
,
const
void
*
buf_
,
size_t
len_
,
int
flags_
)
int
zmq_send_const
(
void
*
s_
,
const
void
*
buf_
,
size_t
len_
,
int
flags_
)
{
{
if
(
!
s_
||
!
((
zmq
::
socket_base_t
*
)
s_
)
->
check_tag
())
{
zmq
::
socket_base_t
*
s
=
as_socket_base_t
(
s_
);
errno
=
ENOTSOCK
;
if
(
!
s
)
return
-
1
;
return
-
1
;
}
zmq_msg_t
msg
;
zmq_msg_t
msg
;
int
rc
=
zmq_msg_init_data
(
&
msg
,
(
void
*
)
buf_
,
len_
,
NULL
,
NULL
);
int
rc
=
zmq_msg_init_data
(
&
msg
,
(
void
*
)
buf_
,
len_
,
NULL
,
NULL
);
if
(
rc
!=
0
)
if
(
rc
!=
0
)
return
-
1
;
return
-
1
;
zmq
::
socket_base_t
*
s
=
(
zmq
::
socket_base_t
*
)
s_
;
rc
=
s_sendmsg
(
s
,
&
msg
,
flags_
);
rc
=
s_sendmsg
(
s
,
&
msg
,
flags_
);
if
(
unlikely
(
rc
<
0
))
{
if
(
unlikely
(
rc
<
0
))
{
int
err
=
errno
;
int
err
=
errno
;
...
@@ -454,10 +434,9 @@ int zmq_send_const (void *s_, const void *buf_, size_t len_, int flags_)
...
@@ -454,10 +434,9 @@ int zmq_send_const (void *s_, const void *buf_, size_t len_, int flags_)
//
//
int
zmq_sendiov
(
void
*
s_
,
iovec
*
a_
,
size_t
count_
,
int
flags_
)
int
zmq_sendiov
(
void
*
s_
,
iovec
*
a_
,
size_t
count_
,
int
flags_
)
{
{
if
(
!
s_
||
!
((
zmq
::
socket_base_t
*
)
s_
)
->
check_tag
())
{
zmq
::
socket_base_t
*
s
=
as_socket_base_t
(
s_
);
errno
=
ENOTSOCK
;
if
(
!
s
)
return
-
1
;
return
-
1
;
}
if
(
unlikely
(
count_
<=
0
||
!
a_
))
{
if
(
unlikely
(
count_
<=
0
||
!
a_
))
{
errno
=
EINVAL
;
errno
=
EINVAL
;
return
-
1
;
return
-
1
;
...
@@ -465,7 +444,6 @@ int zmq_sendiov (void *s_, iovec *a_, size_t count_, int flags_)
...
@@ -465,7 +444,6 @@ int zmq_sendiov (void *s_, iovec *a_, size_t count_, int flags_)
int
rc
=
0
;
int
rc
=
0
;
zmq_msg_t
msg
;
zmq_msg_t
msg
;
zmq
::
socket_base_t
*
s
=
(
zmq
::
socket_base_t
*
)
s_
;
for
(
size_t
i
=
0
;
i
<
count_
;
++
i
)
{
for
(
size_t
i
=
0
;
i
<
count_
;
++
i
)
{
rc
=
zmq_msg_init_size
(
&
msg
,
a_
[
i
].
iov_len
);
rc
=
zmq_msg_init_size
(
&
msg
,
a_
[
i
].
iov_len
);
...
@@ -512,15 +490,13 @@ int zmq_recvmsg (void *s_, zmq_msg_t *msg_, int flags_)
...
@@ -512,15 +490,13 @@ int zmq_recvmsg (void *s_, zmq_msg_t *msg_, int flags_)
int
zmq_recv
(
void
*
s_
,
void
*
buf_
,
size_t
len_
,
int
flags_
)
int
zmq_recv
(
void
*
s_
,
void
*
buf_
,
size_t
len_
,
int
flags_
)
{
{
if
(
!
s_
||
!
((
zmq
::
socket_base_t
*
)
s_
)
->
check_tag
())
{
zmq
::
socket_base_t
*
s
=
as_socket_base_t
(
s_
);
errno
=
ENOTSOCK
;
if
(
!
s
)
return
-
1
;
return
-
1
;
}
zmq_msg_t
msg
;
zmq_msg_t
msg
;
int
rc
=
zmq_msg_init
(
&
msg
);
int
rc
=
zmq_msg_init
(
&
msg
);
errno_assert
(
rc
==
0
);
errno_assert
(
rc
==
0
);
zmq
::
socket_base_t
*
s
=
(
zmq
::
socket_base_t
*
)
s_
;
int
nbytes
=
s_recvmsg
(
s
,
&
msg
,
flags_
);
int
nbytes
=
s_recvmsg
(
s
,
&
msg
,
flags_
);
if
(
unlikely
(
nbytes
<
0
))
{
if
(
unlikely
(
nbytes
<
0
))
{
int
err
=
errno
;
int
err
=
errno
;
...
@@ -562,17 +538,14 @@ int zmq_recv (void *s_, void *buf_, size_t len_, int flags_)
...
@@ -562,17 +538,14 @@ int zmq_recv (void *s_, void *buf_, size_t len_, int flags_)
//
//
int
zmq_recviov
(
void
*
s_
,
iovec
*
a_
,
size_t
*
count_
,
int
flags_
)
int
zmq_recviov
(
void
*
s_
,
iovec
*
a_
,
size_t
*
count_
,
int
flags_
)
{
{
if
(
!
s_
||
!
((
zmq
::
socket_base_t
*
)
s_
)
->
check_tag
())
{
zmq
::
socket_base_t
*
s
=
as_socket_base_t
(
s_
);
errno
=
ENOTSOCK
;
if
(
!
s
)
return
-
1
;
return
-
1
;
}
if
(
unlikely
(
!
count_
||
*
count_
<=
0
||
!
a_
))
{
if
(
unlikely
(
!
count_
||
*
count_
<=
0
||
!
a_
))
{
errno
=
EINVAL
;
errno
=
EINVAL
;
return
-
1
;
return
-
1
;
}
}
zmq
::
socket_base_t
*
s
=
(
zmq
::
socket_base_t
*
)
s_
;
size_t
count
=
*
count_
;
size_t
count
=
*
count_
;
int
nread
=
0
;
int
nread
=
0
;
bool
recvmore
=
true
;
bool
recvmore
=
true
;
...
@@ -634,24 +607,18 @@ int zmq_msg_init_data (zmq_msg_t *msg_, void *data_, size_t size_,
...
@@ -634,24 +607,18 @@ int zmq_msg_init_data (zmq_msg_t *msg_, void *data_, size_t size_,
int
zmq_msg_send
(
zmq_msg_t
*
msg_
,
void
*
s_
,
int
flags_
)
int
zmq_msg_send
(
zmq_msg_t
*
msg_
,
void
*
s_
,
int
flags_
)
{
{
if
(
!
s_
||
!
((
zmq
::
socket_base_t
*
)
s_
)
->
check_tag
())
{
zmq
::
socket_base_t
*
s
=
as_socket_base_t
(
s_
);
errno
=
ENOTSOCK
;
if
(
!
s
)
return
-
1
;
return
-
1
;
}
return
s_sendmsg
(
s
,
msg_
,
flags_
);
zmq
::
socket_base_t
*
s
=
(
zmq
::
socket_base_t
*
)
s_
;
int
result
=
s_sendmsg
(
s
,
msg_
,
flags_
);
return
result
;
}
}
int
zmq_msg_recv
(
zmq_msg_t
*
msg_
,
void
*
s_
,
int
flags_
)
int
zmq_msg_recv
(
zmq_msg_t
*
msg_
,
void
*
s_
,
int
flags_
)
{
{
if
(
!
s_
||
!
((
zmq
::
socket_base_t
*
)
s_
)
->
check_tag
())
{
zmq
::
socket_base_t
*
s
=
as_socket_base_t
(
s_
);
errno
=
ENOTSOCK
;
if
(
!
s
)
return
-
1
;
return
-
1
;
}
return
s_recvmsg
(
s
,
msg_
,
flags_
);
zmq
::
socket_base_t
*
s
=
(
zmq
::
socket_base_t
*
)
s_
;
int
result
=
s_recvmsg
(
s
,
msg_
,
flags_
);
return
result
;
}
}
int
zmq_msg_close
(
zmq_msg_t
*
msg_
)
int
zmq_msg_close
(
zmq_msg_t
*
msg_
)
...
@@ -1389,6 +1356,19 @@ int zmq_poller_wait_all (void *poller_, zmq_poller_event_t *events_, int n_event
...
@@ -1389,6 +1356,19 @@ int zmq_poller_wait_all (void *poller_, zmq_poller_event_t *events_, int n_event
return
rc
;
return
rc
;
}
}
// Peer-specific state
int
zmq_socket_get_peer_state
(
void
*
s_
,
const
void
*
identity
,
size_t
identity_size
)
{
zmq
::
socket_base_t
*
s
=
as_socket_base_t
(
s_
);
if
(
!
s
)
return
-
1
;
return
s
->
get_peer_state
(
identity
,
identity_size
);
}
// Timers
// Timers
void
*
zmq_timers_new
(
void
)
void
*
zmq_timers_new
(
void
)
...
...
src/zmq_draft.h
View file @
17b95683
...
@@ -141,6 +141,10 @@ int zmq_poller_modify_fd (void *poller, int fd, short events);
...
@@ -141,6 +141,10 @@ int zmq_poller_modify_fd (void *poller, int fd, short events);
int
zmq_poller_remove_fd
(
void
*
poller
,
int
fd
);
int
zmq_poller_remove_fd
(
void
*
poller
,
int
fd
);
#endif
#endif
int
zmq_socket_get_peer_state
(
void
*
socket
,
const
void
*
identity
,
size_t
identity_size
);
/******************************************************************************/
/******************************************************************************/
/* Scheduling timers */
/* Scheduling timers */
/******************************************************************************/
/******************************************************************************/
...
...
tests/test_router_mandatory.cpp
View file @
17b95683
...
@@ -29,9 +29,195 @@
...
@@ -29,9 +29,195 @@
#include "testutil.hpp"
#include "testutil.hpp"
int
main
(
void
)
#ifdef ZMQ_BUILD_DRAFT_API
bool
send_msg_to_peer_if_ready
(
void
*
router
,
const
char
*
peer_identity
)
{
int
rc
=
zmq_socket_get_peer_state
(
router
,
peer_identity
,
1
);
if
(
rc
==
-
1
)
printf
(
"zmq_socket_get_peer_state failed for %s: %i
\n
"
,
peer_identity
,
errno
);
assert
(
rc
!=
-
1
);
if
(
rc
&
ZMQ_POLLOUT
)
{
rc
=
zmq_send
(
router
,
peer_identity
,
1
,
ZMQ_SNDMORE
|
ZMQ_DONTWAIT
);
assert
(
rc
==
1
);
rc
=
zmq_send
(
router
,
"Hello"
,
5
,
ZMQ_DONTWAIT
);
assert
(
rc
==
5
);
return
true
;
}
return
false
;
}
#endif
void
test_get_peer_state
()
{
#ifdef ZMQ_BUILD_DRAFT_API
void
*
ctx
=
zmq_ctx_new
();
assert
(
ctx
);
void
*
router
=
zmq_socket
(
ctx
,
ZMQ_ROUTER
);
assert
(
router
);
int
rc
;
int
mandatory
=
1
;
rc
=
zmq_setsockopt
(
router
,
ZMQ_ROUTER_MANDATORY
,
&
mandatory
,
sizeof
(
mandatory
));
const
char
*
my_endpoint
=
"inproc://test_get_peer_state"
;
rc
=
zmq_bind
(
router
,
my_endpoint
);
assert
(
rc
==
0
);
void
*
dealer1
=
zmq_socket
(
ctx
,
ZMQ_DEALER
);
assert
(
dealer1
);
void
*
dealer2
=
zmq_socket
(
ctx
,
ZMQ_DEALER
);
assert
(
dealer2
);
// Lower HWMs to allow doing the test with fewer messages
int
hwm
=
100
;
rc
=
zmq_setsockopt
(
router
,
ZMQ_SNDHWM
,
&
hwm
,
sizeof
(
int
));
assert
(
rc
==
0
);
rc
=
zmq_setsockopt
(
dealer1
,
ZMQ_RCVHWM
,
&
hwm
,
sizeof
(
int
));
assert
(
rc
==
0
);
rc
=
zmq_setsockopt
(
dealer2
,
ZMQ_RCVHWM
,
&
hwm
,
sizeof
(
int
));
assert
(
rc
==
0
);
const
char
*
dealer1_identity
=
"X"
;
const
char
*
dealer2_identity
=
"Y"
;
// Name dealer1 "X" and connect it to our router
rc
=
zmq_setsockopt
(
dealer1
,
ZMQ_IDENTITY
,
dealer1_identity
,
1
);
assert
(
rc
==
0
);
rc
=
zmq_connect
(
dealer1
,
my_endpoint
);
assert
(
rc
==
0
);
// Name dealer2 "Y" and connect it to our router
rc
=
zmq_setsockopt
(
dealer2
,
ZMQ_IDENTITY
,
dealer2_identity
,
1
);
assert
(
rc
==
0
);
rc
=
zmq_connect
(
dealer2
,
my_endpoint
);
assert
(
rc
==
0
);
// Get message from both dealers to know when connection is ready
char
buffer
[
255
];
rc
=
zmq_send
(
dealer1
,
"Hello"
,
5
,
0
);
assert
(
rc
==
5
);
rc
=
zmq_recv
(
router
,
buffer
,
255
,
0
);
assert
(
rc
==
1
);
assert
(
0
==
memcmp
(
buffer
,
dealer1_identity
,
rc
));
rc
=
zmq_recv
(
router
,
buffer
,
255
,
0
);
assert
(
rc
==
5
);
rc
=
zmq_send
(
dealer2
,
"Hello"
,
5
,
0
);
assert
(
rc
==
5
);
rc
=
zmq_recv
(
router
,
buffer
,
255
,
0
);
assert
(
rc
==
1
);
assert
(
0
==
memcmp
(
buffer
,
dealer2_identity
,
rc
));
rc
=
zmq_recv
(
router
,
buffer
,
255
,
0
);
assert
(
rc
==
5
);
void
*
poller
=
zmq_poller_new
();
assert
(
poller
);
// Poll on router and dealer1, but not on dealer2
rc
=
zmq_poller_add
(
poller
,
router
,
NULL
,
ZMQ_POLLOUT
);
assert
(
rc
==
0
);
rc
=
zmq_poller_add
(
poller
,
dealer1
,
NULL
,
ZMQ_POLLIN
);
assert
(
rc
==
0
);
const
size_t
count
=
10000
;
const
size_t
event_size
=
2
;
bool
dealer2_blocked
=
false
;
size_t
dealer1_sent
=
0
,
dealer2_sent
=
0
,
dealer1_received
=
0
;
zmq_poller_event_t
events
[
event_size
];
for
(
size_t
iteration
=
0
;
iteration
<
count
;
++
iteration
)
{
rc
=
zmq_poller_wait_all
(
poller
,
events
,
event_size
,
-
1
);
assert
(
rc
!=
-
1
);
for
(
size_t
event_no
=
0
;
event_no
<
event_size
;
++
event_no
)
{
const
zmq_poller_event_t
&
current_event
=
events
[
event_no
];
if
(
current_event
.
socket
==
router
&&
current_event
.
events
&
ZMQ_POLLOUT
)
{
if
(
send_msg_to_peer_if_ready
(
router
,
dealer1_identity
))
++
dealer1_sent
;
if
(
send_msg_to_peer_if_ready
(
router
,
dealer2_identity
))
++
dealer2_sent
;
else
dealer2_blocked
=
true
;
}
if
(
current_event
.
socket
==
dealer1
&&
current_event
.
events
&
ZMQ_POLLIN
)
{
rc
=
zmq_recv
(
dealer1
,
buffer
,
255
,
ZMQ_DONTWAIT
);
assert
(
rc
==
5
);
int
more
;
size_t
more_size
=
sizeof
(
more
);
rc
=
zmq_getsockopt
(
dealer1
,
ZMQ_RCVMORE
,
&
more
,
&
more_size
);
assert
(
rc
==
0
);
assert
(
!
more
);
++
dealer1_received
;
}
// never read from dealer2, so its pipe becomes full eventually
}
}
printf
(
"dealer1_sent = %zu, dealer2_sent = %zu, dealer1_received = %zu
\n
"
,
dealer1_sent
,
dealer2_sent
,
dealer1_received
);
assert
(
dealer2_blocked
);
zmq_poller_destroy
(
&
poller
);
rc
=
zmq_close
(
router
);
assert
(
rc
==
0
);
rc
=
zmq_close
(
dealer1
);
assert
(
rc
==
0
);
rc
=
zmq_close
(
dealer2
);
assert
(
rc
==
0
);
rc
=
zmq_ctx_term
(
ctx
);
assert
(
rc
==
0
);
#endif
}
void
test_get_peer_state_corner_cases
()
{
#ifdef ZMQ_BUILD_DRAFT_API
const
char
peer_identity
[]
=
"foo"
;
// call get_peer_state with NULL socket
int
rc
=
zmq_socket_get_peer_state
(
NULL
,
peer_identity
,
strlen
(
peer_identity
));
assert
(
rc
==
-
1
&&
errno
==
ENOTSOCK
);
void
*
ctx
=
zmq_ctx_new
();
assert
(
ctx
);
void
*
dealer
=
zmq_socket
(
ctx
,
ZMQ_DEALER
);
assert
(
dealer
);
void
*
router
=
zmq_socket
(
ctx
,
ZMQ_ROUTER
);
assert
(
router
);
// call get_peer_state with a non-ROUTER socket
rc
=
zmq_socket_get_peer_state
(
dealer
,
peer_identity
,
strlen
(
peer_identity
));
assert
(
rc
==
-
1
&&
errno
==
ENOTSUP
);
// call get_peer_state for an unknown identity
rc
=
zmq_socket_get_peer_state
(
router
,
peer_identity
,
strlen
(
peer_identity
));
assert
(
rc
==
-
1
&&
errno
==
EHOSTUNREACH
);
rc
=
zmq_close
(
router
);
assert
(
rc
==
0
);
rc
=
zmq_close
(
dealer
);
assert
(
rc
==
0
);
rc
=
zmq_ctx_term
(
ctx
);
assert
(
rc
==
0
);
#endif
}
void
test_basic
()
{
{
setup_test_environment
();
size_t
len
=
MAX_SOCKET_STRING
;
size_t
len
=
MAX_SOCKET_STRING
;
char
my_endpoint
[
MAX_SOCKET_STRING
];
char
my_endpoint
[
MAX_SOCKET_STRING
];
void
*
ctx
=
zmq_ctx_new
();
void
*
ctx
=
zmq_ctx_new
();
...
@@ -55,7 +241,8 @@ int main (void)
...
@@ -55,7 +241,8 @@ int main (void)
// Send a message to an unknown peer with mandatory routing
// Send a message to an unknown peer with mandatory routing
// This will fail
// This will fail
int
mandatory
=
1
;
int
mandatory
=
1
;
rc
=
zmq_setsockopt
(
router
,
ZMQ_ROUTER_MANDATORY
,
&
mandatory
,
sizeof
(
mandatory
));
rc
=
zmq_setsockopt
(
router
,
ZMQ_ROUTER_MANDATORY
,
&
mandatory
,
sizeof
(
mandatory
));
assert
(
rc
==
0
);
assert
(
rc
==
0
);
rc
=
zmq_send
(
router
,
"UNKNOWN"
,
7
,
ZMQ_SNDMORE
);
rc
=
zmq_send
(
router
,
"UNKNOWN"
,
7
,
ZMQ_SNDMORE
);
assert
(
rc
==
-
1
&&
errno
==
EHOSTUNREACH
);
assert
(
rc
==
-
1
&&
errno
==
EHOSTUNREACH
);
...
@@ -69,12 +256,12 @@ int main (void)
...
@@ -69,12 +256,12 @@ int main (void)
assert
(
rc
==
0
);
assert
(
rc
==
0
);
// Get message from dealer to know when connection is ready
// Get message from dealer to know when connection is ready
char
buffer
[
255
];
char
buffer
[
255
];
rc
=
zmq_send
(
dealer
,
"Hello"
,
5
,
0
);
rc
=
zmq_send
(
dealer
,
"Hello"
,
5
,
0
);
assert
(
rc
==
5
);
assert
(
rc
==
5
);
rc
=
zmq_recv
(
router
,
buffer
,
255
,
0
);
rc
=
zmq_recv
(
router
,
buffer
,
255
,
0
);
assert
(
rc
==
1
);
assert
(
rc
==
1
);
assert
(
buffer
[
0
]
==
'X'
);
assert
(
buffer
[
0
]
==
'X'
);
// Send a message to connected dealer now
// Send a message to connected dealer now
// It should work
// It should work
...
@@ -82,7 +269,7 @@ int main (void)
...
@@ -82,7 +269,7 @@ int main (void)
assert
(
rc
==
1
);
assert
(
rc
==
1
);
rc
=
zmq_send
(
router
,
"Hello"
,
5
,
0
);
rc
=
zmq_send
(
router
,
"Hello"
,
5
,
0
);
assert
(
rc
==
5
);
assert
(
rc
==
5
);
rc
=
zmq_close
(
router
);
rc
=
zmq_close
(
router
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
...
@@ -91,6 +278,15 @@ int main (void)
...
@@ -91,6 +278,15 @@ int main (void)
rc
=
zmq_ctx_term
(
ctx
);
rc
=
zmq_ctx_term
(
ctx
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
}
int
main
(
void
)
{
setup_test_environment
();
test_basic
();
test_get_peer_state
();
test_get_peer_state_corner_cases
();
return
0
;
return
0
;
}
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment