Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
fe82c643
Unverified
Commit
fe82c643
authored
Aug 16, 2018
by
Luca Boccassi
Committed by
GitHub
Aug 16, 2018
Browse files
Options
Browse Files
Download
Plain Diff
Merge pull request #3220 from sigiesec/code-improvements
Various refactorings
parents
31f69937
987e75b8
Hide whitespace changes
Inline
Side-by-side
Showing
15 changed files
with
556 additions
and
518 deletions
+556
-518
Makefile.am
Makefile.am
+9
-7
address.hpp
src/address.hpp
+1
-0
ctx.cpp
src/ctx.cpp
+2
-18
options.hpp
src/options.hpp
+9
-0
pipe.cpp
src/pipe.cpp
+12
-0
pipe.hpp
src/pipe.hpp
+3
-3
session_base.cpp
src/session_base.cpp
+178
-134
session_base.hpp
src/session_base.hpp
+27
-0
socket_base.cpp
src/socket_base.cpp
+167
-168
socket_base.hpp
src/socket_base.hpp
+42
-23
stream_engine.cpp
src/stream_engine.cpp
+0
-1
stream_engine.hpp
src/stream_engine.hpp
+0
-3
CMakeLists.txt
tests/CMakeLists.txt
+0
-1
test_connect_delay_tipc.cpp
tests/test_connect_delay_tipc.cpp
+106
-117
test_unbind_inproc.cpp
tests/test_unbind_inproc.cpp
+0
-43
No files found.
Makefile.am
View file @
fe82c643
...
...
@@ -404,7 +404,6 @@ test_apps = \
tests/test_stream_disconnect
\
tests/test_stream_timeout
\
tests/test_disconnect_inproc
\
tests/test_unbind_inproc
\
tests/test_unbind_wildcard
\
tests/test_ctx_options
\
tests/test_ctx_destroy
\
...
...
@@ -556,9 +555,6 @@ tests_test_stream_disconnect_LDADD = src/libzmq.la
tests_test_disconnect_inproc_SOURCES
=
tests/test_disconnect_inproc.cpp
tests_test_disconnect_inproc_LDADD
=
src/libzmq.la
tests_test_unbind_inproc_SOURCES
=
tests/test_unbind_inproc.cpp
tests_test_unbind_inproc_LDADD
=
src/libzmq.la
tests_test_unbind_wildcard_SOURCES
=
tests/test_unbind_wildcard.cpp
tests_test_unbind_wildcard_LDADD
=
src/libzmq.la
...
...
@@ -819,7 +815,8 @@ test_apps += \
tests/test_address_tipc
tests_test_connect_delay_tipc_SOURCES
=
tests/test_connect_delay_tipc.cpp
tests_test_connect_delay_tipc_LDADD
=
src/libzmq.la
tests_test_connect_delay_tipc_LDADD
=
src/libzmq.la
${
UNITY_LIBS
}
tests_test_connect_delay_tipc_CPPFLAGS
=
${
UNITY_CPPFLAGS
}
tests_test_pair_tipc_SOURCES
=
tests/test_pair_tipc.cpp
tests_test_pair_tipc_LDADD
=
src/libzmq.la
...
...
@@ -986,8 +983,13 @@ XFAIL_TESTS += tests/test_abstract_ipc
endif
if
ON_GNU
XFAIL_TESTS
+=
test_ipc_wildcard
\
test_term_endpoint
XFAIL_TESTS
+=
tests/test_ipc_wildcard
\
tests/test_term_endpoint
endif
# TODO remove this again when resolving https://github.com/zeromq/libzmq/issues/3124
if
BUILD_TIPC
XFAIL_TESTS
+=
tests/test_connect_delay_tipc
endif
EXTRA_DIST
=
\
...
...
src/address.hpp
View file @
fe82c643
...
...
@@ -49,6 +49,7 @@ class vmci_address_t;
namespace
protocol_name
{
static
const
char
inproc
[]
=
"inproc"
;
static
const
char
tcp
[]
=
"tcp"
;
static
const
char
udp
[]
=
"udp"
;
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS \
...
...
src/ctx.cpp
View file @
fe82c643
...
...
@@ -617,15 +617,7 @@ void zmq::ctx_t::connect_inproc_sockets (
errno_assert
(
rc
==
0
);
}
bool
conflate
=
pending_connection_
.
endpoint
.
options
.
conflate
&&
(
pending_connection_
.
endpoint
.
options
.
type
==
ZMQ_DEALER
||
pending_connection_
.
endpoint
.
options
.
type
==
ZMQ_PULL
||
pending_connection_
.
endpoint
.
options
.
type
==
ZMQ_PUSH
||
pending_connection_
.
endpoint
.
options
.
type
==
ZMQ_PUB
||
pending_connection_
.
endpoint
.
options
.
type
==
ZMQ_SUB
);
if
(
!
conflate
)
{
if
(
!
get_effective_conflate_option
(
pending_connection_
.
endpoint
.
options
))
{
pending_connection_
.
connect_pipe
->
set_hwms_boost
(
bind_options_
.
sndhwm
,
bind_options_
.
rcvhwm
);
pending_connection_
.
bind_pipe
->
set_hwms_boost
(
...
...
@@ -660,15 +652,7 @@ void zmq::ctx_t::connect_inproc_sockets (
// is open before sending.
if
(
pending_connection_
.
endpoint
.
options
.
recv_routing_id
&&
pending_connection_
.
endpoint
.
socket
->
check_tag
())
{
msg_t
routing_id
;
const
int
rc
=
routing_id
.
init_size
(
bind_options_
.
routing_id_size
);
errno_assert
(
rc
==
0
);
memcpy
(
routing_id
.
data
(),
bind_options_
.
routing_id
,
bind_options_
.
routing_id_size
);
routing_id
.
set_flags
(
msg_t
::
routing_id
);
const
bool
written
=
pending_connection_
.
bind_pipe
->
write
(
&
routing_id
);
zmq_assert
(
written
);
pending_connection_
.
bind_pipe
->
flush
();
send_routing_id
(
pending_connection_
.
bind_pipe
,
bind_options_
);
}
}
...
...
src/options.hpp
View file @
fe82c643
...
...
@@ -268,6 +268,15 @@ struct options_t
std
::
map
<
std
::
string
,
std
::
string
>
app_metadata
;
};
inline
bool
get_effective_conflate_option
(
const
options_t
&
options
)
{
// conflate is only effective for some socket types
return
options
.
conflate
&&
(
options
.
type
==
ZMQ_DEALER
||
options
.
type
==
ZMQ_PULL
||
options
.
type
==
ZMQ_PUSH
||
options
.
type
==
ZMQ_PUB
||
options
.
type
==
ZMQ_SUB
);
}
int
do_getsockopt
(
void
*
const
optval_
,
size_t
*
const
optvallen_
,
const
void
*
value_
,
...
...
src/pipe.cpp
View file @
fe82c643
...
...
@@ -76,6 +76,18 @@ int zmq::pipepair (class object_t *parents_[2],
return
0
;
}
void
zmq
::
send_routing_id
(
pipe_t
*
pipe_
,
const
options_t
&
options_
)
{
zmq
::
msg_t
id
;
const
int
rc
=
id
.
init_size
(
options_
.
routing_id_size
);
errno_assert
(
rc
==
0
);
memcpy
(
id
.
data
(),
options_
.
routing_id
,
options_
.
routing_id_size
);
id
.
set_flags
(
zmq
::
msg_t
::
routing_id
);
const
bool
written
=
pipe_
->
write
(
&
id
);
zmq_assert
(
written
);
pipe_
->
flush
();
}
zmq
::
pipe_t
::
pipe_t
(
object_t
*
parent_
,
upipe_t
*
inpipe_
,
upipe_t
*
outpipe_
,
...
...
src/pipe.hpp
View file @
fe82c643
...
...
@@ -36,6 +36,7 @@
#include "stdint.hpp"
#include "array.hpp"
#include "blob.hpp"
#include "options.hpp"
namespace
zmq
{
...
...
@@ -235,9 +236,6 @@ class pipe_t : public object_t,
// Routing id of the writer. Used uniquely by the reader side.
int
_server_socket_routing_id
;
// Pipe's credential.
blob_t
_credential
;
// Returns true if the message is delimiter; false otherwise.
static
bool
is_delimiter
(
const
msg_t
&
msg_
);
...
...
@@ -250,6 +248,8 @@ class pipe_t : public object_t,
pipe_t
(
const
pipe_t
&
);
const
pipe_t
&
operator
=
(
const
pipe_t
&
);
};
void
send_routing_id
(
pipe_t
*
pipe_
,
const
options_t
&
options_
);
}
#endif
src/session_base.cpp
View file @
fe82c643
...
...
@@ -394,11 +394,7 @@ void zmq::session_base_t::process_attach (i_engine *engine_)
object_t
*
parents
[
2
]
=
{
this
,
_socket
};
pipe_t
*
pipes
[
2
]
=
{
NULL
,
NULL
};
bool
conflate
=
options
.
conflate
&&
(
options
.
type
==
ZMQ_DEALER
||
options
.
type
==
ZMQ_PULL
||
options
.
type
==
ZMQ_PUSH
||
options
.
type
==
ZMQ_PUB
||
options
.
type
==
ZMQ_SUB
);
const
bool
conflate
=
get_effective_conflate_option
(
options
);
int
hwms
[
2
]
=
{
conflate
?
-
1
:
options
.
rcvhwm
,
conflate
?
-
1
:
options
.
sndhwm
};
...
...
@@ -554,6 +550,51 @@ void zmq::session_base_t::reconnect ()
_pipe
->
hiccup
();
}
zmq
::
session_base_t
::
connecter_factory_entry_t
zmq
::
session_base_t
::
_connecter_factories
[]
=
{
std
::
make_pair
(
protocol_name
::
tcp
,
&
zmq
::
session_base_t
::
create_connecter_tcp
),
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS \
&& !defined ZMQ_HAVE_VXWORKS
std
::
make_pair
(
protocol_name
::
ipc
,
&
zmq
::
session_base_t
::
create_connecter_ipc
),
#endif
#if defined ZMQ_HAVE_TIPC
std
::
make_pair
(
protocol_name
::
tipc
,
&
zmq
::
session_base_t
::
create_connecter_tipc
),
#endif
#if defined ZMQ_HAVE_VMCI
std
::
make_pair
(
protocol_name
::
vmci
,
&
zmq
::
session_base_t
::
create_connecter_vmci
),
#endif
};
zmq
::
session_base_t
::
connecter_factory_map_t
zmq
::
session_base_t
::
_connecter_factories_map
(
_connecter_factories
,
_connecter_factories
+
sizeof
(
_connecter_factories
)
/
sizeof
(
_connecter_factories
[
0
]));
zmq
::
session_base_t
::
start_connecting_entry_t
zmq
::
session_base_t
::
_start_connecting_entries
[]
=
{
std
::
make_pair
(
protocol_name
::
udp
,
&
zmq
::
session_base_t
::
start_connecting_udp
),
#if defined ZMQ_HAVE_OPENPGM
std
::
make_pair
(
"pgm"
,
&
zmq
::
session_base_t
::
start_connecting_pgm
),
std
::
make_pair
(
"epgm"
,
&
zmq
::
session_base_t
::
start_connecting_pgm
),
#endif
#if defined ZMQ_HAVE_NORM
std
::
make_pair
(
"norm"
,
&
zmq
::
session_base_t
::
start_connecting_norm
),
#endif
};
zmq
::
session_base_t
::
start_connecting_map_t
zmq
::
session_base_t
::
_start_connecting_map
(
_start_connecting_entries
,
_start_connecting_entries
+
sizeof
(
_start_connecting_entries
)
/
sizeof
(
_start_connecting_entries
[
0
]));
void
zmq
::
session_base_t
::
start_connecting
(
bool
wait_
)
{
zmq_assert
(
_active
);
...
...
@@ -564,157 +605,160 @@ void zmq::session_base_t::start_connecting (bool wait_)
zmq_assert
(
io_thread
);
// Create the connecter object.
const
connecter_factory_map_t
::
const_iterator
connecter_factories_it
=
_connecter_factories_map
.
find
(
_addr
->
protocol
);
if
(
connecter_factories_it
!=
_connecter_factories_map
.
end
())
{
own_t
*
connecter
=
(
this
->*
connecter_factories_it
->
second
)
(
io_thread
,
wait_
);
if
(
_addr
->
protocol
==
protocol_name
::
tcp
)
{
if
(
!
options
.
socks_proxy_address
.
empty
())
{
address_t
*
proxy_address
=
new
(
std
::
nothrow
)
address_t
(
protocol_name
::
tcp
,
options
.
socks_proxy_address
,
this
->
get_ctx
());
alloc_assert
(
proxy_address
);
socks_connecter_t
*
connecter
=
new
(
std
::
nothrow
)
socks_connecter_t
(
io_thread
,
this
,
options
,
_addr
,
proxy_address
,
wait_
);
alloc_assert
(
connecter
);
launch_child
(
connecter
);
}
else
{
tcp_connecter_t
*
connecter
=
new
(
std
::
nothrow
)
tcp_connecter_t
(
io_thread
,
this
,
options
,
_addr
,
wait_
);
alloc_assert
(
connecter
);
launch_child
(
connecter
);
}
return
;
}
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS \
&& !defined ZMQ_HAVE_VXWORKS
if
(
_addr
->
protocol
==
protocol_name
::
ipc
)
{
ipc_connecter_t
*
connecter
=
new
(
std
::
nothrow
)
ipc_connecter_t
(
io_thread
,
this
,
options
,
_addr
,
wait_
);
alloc_assert
(
connecter
);
launch_child
(
connecter
);
return
;
}
#endif
#if defined ZMQ_HAVE_TIPC
if
(
_addr
->
protocol
==
protocol_name
::
tipc
)
{
tipc_connecter_t
*
connecter
=
new
(
std
::
nothrow
)
tipc_connecter_t
(
io_thread
,
this
,
options
,
_addr
,
wait_
);
alloc_assert
(
connecter
);
launch_child
(
connecter
);
const
start_connecting_map_t
::
const_iterator
start_connecting_it
=
_start_connecting_map
.
find
(
_addr
->
protocol
);
if
(
start_connecting_it
!=
_start_connecting_map
.
end
())
{
(
this
->*
start_connecting_it
->
second
)
(
io_thread
);
return
;
}
#endif
if
(
_addr
->
protocol
==
protocol_name
::
udp
)
{
zmq_assert
(
options
.
type
==
ZMQ_DISH
||
options
.
type
==
ZMQ_RADIO
||
options
.
type
==
ZMQ_DGRAM
);
udp_engine_t
*
engine
=
new
(
std
::
nothrow
)
udp_engine_t
(
options
);
alloc_assert
(
engine
);
bool
recv
=
false
;
bool
send
=
false
;
if
(
options
.
type
==
ZMQ_RADIO
)
{
send
=
true
;
recv
=
false
;
}
else
if
(
options
.
type
==
ZMQ_DISH
)
{
send
=
false
;
recv
=
true
;
}
else
if
(
options
.
type
==
ZMQ_DGRAM
)
{
send
=
true
;
recv
=
true
;
}
zmq_assert
(
false
);
}
int
rc
=
engine
->
init
(
_addr
,
send
,
recv
);
errno_assert
(
rc
==
0
);
#if defined ZMQ_HAVE_VMCI
zmq
::
own_t
*
zmq
::
session_base_t
::
create_connecter_vmci
(
io_thread_t
*
io_thread_
,
bool
wait_
)
{
return
new
(
std
::
nothrow
)
vmci_connecter_t
(
io_thread_
,
this
,
options
,
_addr
,
wait_
);
}
#endif
send_attach
(
this
,
engine
);
#if defined ZMQ_HAVE_TIPC
zmq
::
own_t
*
zmq
::
session_base_t
::
create_connecter_tipc
(
io_thread_t
*
io_thread_
,
bool
wait_
)
{
return
new
(
std
::
nothrow
)
tipc_connecter_t
(
io_thread_
,
this
,
options
,
_addr
,
wait_
);
}
#endif
return
;
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS \
&& !defined ZMQ_HAVE_VXWORKS
zmq
::
own_t
*
zmq
::
session_base_t
::
create_connecter_ipc
(
io_thread_t
*
io_thread_
,
bool
wait_
)
{
return
new
(
std
::
nothrow
)
ipc_connecter_t
(
io_thread_
,
this
,
options
,
_addr
,
wait_
);
}
#endif
zmq
::
own_t
*
zmq
::
session_base_t
::
create_connecter_tcp
(
io_thread_t
*
io_thread_
,
bool
wait_
)
{
if
(
!
options
.
socks_proxy_address
.
empty
())
{
address_t
*
proxy_address
=
new
(
std
::
nothrow
)
address_t
(
protocol_name
::
tcp
,
options
.
socks_proxy_address
,
this
->
get_ctx
());
alloc_assert
(
proxy_address
);
return
new
(
std
::
nothrow
)
socks_connecter_t
(
io_thread_
,
this
,
options
,
_addr
,
proxy_address
,
wait_
);
}
return
new
(
std
::
nothrow
)
tcp_connecter_t
(
io_thread_
,
this
,
options
,
_addr
,
wait_
);
}
#ifdef ZMQ_HAVE_OPENPGM
void
zmq
::
session_base_t
::
start_connecting_pgm
(
io_thread_t
*
io_thread_
)
{
zmq_assert
(
options
.
type
==
ZMQ_PUB
||
options
.
type
==
ZMQ_XPUB
||
options
.
type
==
ZMQ_SUB
||
options
.
type
==
ZMQ_XSUB
);
// For EPGM transport with UDP encapsulation of PGM is used.
bool
const
udp_encapsulation
=
_addr
->
protocol
==
"epgm"
;
// At this point we'll create message pipes to the session straight
// away. There's no point in delaying it as no concept of 'connect'
// exists with PGM anyway.
if
(
options
.
type
==
ZMQ_PUB
||
options
.
type
==
ZMQ_XPUB
)
{
// PGM sender.
pgm_sender_t
*
pgm_sender
=
new
(
std
::
nothrow
)
pgm_sender_t
(
io_thread_
,
options
);
alloc_assert
(
pgm_sender
);
int
rc
=
pgm_sender
->
init
(
udp_encapsulation
,
_addr
->
address
.
c_str
());
errno_assert
(
rc
==
0
);
// Both PGM and EPGM transports are using the same infrastructure.
if
(
_addr
->
protocol
==
"pgm"
||
_addr
->
protocol
==
"epgm"
)
{
zmq_assert
(
options
.
type
==
ZMQ_PUB
||
options
.
type
==
ZMQ_XPUB
||
options
.
type
==
ZMQ_SUB
||
options
.
type
==
ZMQ_XSUB
);
// For EPGM transport with UDP encapsulation of PGM is used.
bool
const
udp_encapsulation
=
_addr
->
protocol
==
"epgm"
;
// At this point we'll create message pipes to the session straight
// away. There's no point in delaying it as no concept of 'connect'
// exists with PGM anyway.
if
(
options
.
type
==
ZMQ_PUB
||
options
.
type
==
ZMQ_XPUB
)
{
// PGM sender.
pgm_sender_t
*
pgm_sender
=
new
(
std
::
nothrow
)
pgm_sender_t
(
io_thread
,
options
);
alloc_assert
(
pgm_sender
);
int
rc
=
pgm_sender
->
init
(
udp_encapsulation
,
_addr
->
address
.
c_str
());
errno_assert
(
rc
==
0
);
send_attach
(
this
,
pgm_sender
);
}
else
{
// PGM receiver.
pgm_receiver_t
*
pgm_receiver
=
new
(
std
::
nothrow
)
pgm_receiver_t
(
io_thread
,
options
);
alloc_assert
(
pgm_receiver
);
int
rc
=
pgm_receiver
->
init
(
udp_encapsulation
,
_addr
->
address
.
c_str
());
errno_assert
(
rc
==
0
);
send_attach
(
this
,
pgm_receiver
);
}
send_attach
(
this
,
pgm_sender
);
}
else
{
// PGM receiver.
pgm_receiver_t
*
pgm_receiver
=
new
(
std
::
nothrow
)
pgm_receiver_t
(
io_thread_
,
options
);
alloc_assert
(
pgm_receiver
);
return
;
int
rc
=
pgm_receiver
->
init
(
udp_encapsulation
,
_addr
->
address
.
c_str
());
errno_assert
(
rc
==
0
);
send_attach
(
this
,
pgm_receiver
);
}
}
#endif
#ifdef ZMQ_HAVE_NORM
if
(
_addr
->
protocol
==
"norm"
)
{
// At this point we'll create message pipes to the session straight
// away. There's no point in delaying it as no concept of 'connect'
// exists with NORM anyway.
if
(
options
.
type
==
ZMQ_PUB
||
options
.
type
==
ZMQ_XPUB
)
{
// NORM sender.
norm_engine_t
*
norm_sender
=
new
(
std
::
nothrow
)
norm_engine_t
(
io_thread
,
options
);
alloc_assert
(
norm_sender
);
int
rc
=
norm_sender
->
init
(
_addr
->
address
.
c_str
(),
true
,
false
);
errno_assert
(
rc
==
0
);
send_attach
(
this
,
norm_sender
);
}
else
{
// ZMQ_SUB or ZMQ_XSUB
// NORM receiver.
norm_engine_t
*
norm_receiver
=
new
(
std
::
nothrow
)
norm_engine_t
(
io_thread
,
options
);
alloc_assert
(
norm_receiver
);
int
rc
=
norm_receiver
->
init
(
_addr
->
address
.
c_str
(),
false
,
true
);
errno_assert
(
rc
==
0
);
send_attach
(
this
,
norm_receiver
);
}
return
;
}
#endif // ZMQ_HAVE_NORM
void
zmq
::
session_base_t
::
start_connecting_norm
(
io_thread_t
*
io_thread_
)
{
// At this point we'll create message pipes to the session straight
// away. There's no point in delaying it as no concept of 'connect'
// exists with NORM anyway.
if
(
options
.
type
==
ZMQ_PUB
||
options
.
type
==
ZMQ_XPUB
)
{
// NORM sender.
norm_engine_t
*
norm_sender
=
new
(
std
::
nothrow
)
norm_engine_t
(
io_thread_
,
options
);
alloc_assert
(
norm_sender
);
int
rc
=
norm_sender
->
init
(
_addr
->
address
.
c_str
(),
true
,
false
);
errno_assert
(
rc
==
0
);
#if defined ZMQ_HAVE_VMCI
if
(
_addr
->
protocol
==
protocol_name
::
vmci
)
{
vmci_connecter_t
*
connecter
=
new
(
std
::
nothrow
)
vmci_connecter_t
(
io_thread
,
this
,
options
,
_addr
,
wait_
);
alloc_assert
(
connecter
);
launch_child
(
connecter
);
return
;
send_attach
(
this
,
norm_sender
);
}
else
{
// ZMQ_SUB or ZMQ_XSUB
// NORM receiver.
norm_engine_t
*
norm_receiver
=
new
(
std
::
nothrow
)
norm_engine_t
(
io_thread_
,
options
);
alloc_assert
(
norm_receiver
);
int
rc
=
norm_receiver
->
init
(
_addr
->
address
.
c_str
(),
false
,
true
);
errno_assert
(
rc
==
0
);
send_attach
(
this
,
norm_receiver
);
}
}
#endif
zmq_assert
(
false
);
void
zmq
::
session_base_t
::
start_connecting_udp
(
io_thread_t
*
/*io_thread_*/
)
{
zmq_assert
(
options
.
type
==
ZMQ_DISH
||
options
.
type
==
ZMQ_RADIO
||
options
.
type
==
ZMQ_DGRAM
);
udp_engine_t
*
engine
=
new
(
std
::
nothrow
)
udp_engine_t
(
options
);
alloc_assert
(
engine
);
bool
recv
=
false
;
bool
send
=
false
;
if
(
options
.
type
==
ZMQ_RADIO
)
{
send
=
true
;
recv
=
false
;
}
else
if
(
options
.
type
==
ZMQ_DISH
)
{
send
=
false
;
recv
=
true
;
}
else
if
(
options
.
type
==
ZMQ_DGRAM
)
{
send
=
true
;
recv
=
true
;
}
const
int
rc
=
engine
->
init
(
_addr
,
send
,
recv
);
errno_assert
(
rc
==
0
);
send_attach
(
this
,
engine
);
}
src/session_base.hpp
View file @
fe82c643
...
...
@@ -105,6 +105,33 @@ class session_base_t : public own_t, public io_object_t, public i_pipe_events
private
:
void
start_connecting
(
bool
wait_
);
typedef
own_t
*
(
session_base_t
::*
connecter_factory_fun_t
)
(
io_thread_t
*
io_thread
,
bool
wait_
);
typedef
std
::
pair
<
std
::
string
,
connecter_factory_fun_t
>
connecter_factory_entry_t
;
static
connecter_factory_entry_t
_connecter_factories
[];
typedef
std
::
map
<
std
::
string
,
connecter_factory_fun_t
>
connecter_factory_map_t
;
static
connecter_factory_map_t
_connecter_factories_map
;
own_t
*
create_connecter_vmci
(
io_thread_t
*
io_thread_
,
bool
wait_
);
own_t
*
create_connecter_tipc
(
io_thread_t
*
io_thread_
,
bool
wait_
);
own_t
*
create_connecter_ipc
(
io_thread_t
*
io_thread_
,
bool
wait_
);
own_t
*
create_connecter_tcp
(
io_thread_t
*
io_thread_
,
bool
wait_
);
typedef
void
(
session_base_t
::*
start_connecting_fun_t
)
(
io_thread_t
*
io_thread
);
typedef
std
::
pair
<
std
::
string
,
start_connecting_fun_t
>
start_connecting_entry_t
;
static
start_connecting_entry_t
_start_connecting_entries
[];
typedef
std
::
map
<
std
::
string
,
start_connecting_fun_t
>
start_connecting_map_t
;
static
start_connecting_map_t
_start_connecting_map
;
void
start_connecting_pgm
(
io_thread_t
*
io_thread_
);
void
start_connecting_norm
(
io_thread_t
*
io_thread_
);
void
start_connecting_udp
(
io_thread_t
*
io_thread_
);
void
reconnect
();
// Handlers for incoming commands.
...
...
src/socket_base.cpp
View file @
fe82c643
...
...
@@ -97,6 +97,38 @@
#include "scatter.hpp"
#include "dgram.hpp"
void
zmq
::
socket_base_t
::
inprocs_t
::
emplace
(
const
char
*
endpoint_uri_
,
pipe_t
*
pipe_
)
{
_inprocs
.
ZMQ_MAP_INSERT_OR_EMPLACE
(
std
::
string
(
endpoint_uri_
),
pipe_
);
}
int
zmq
::
socket_base_t
::
inprocs_t
::
erase_pipes
(
const
std
::
string
&
endpoint_uri_str_
)
{
const
std
::
pair
<
map_t
::
iterator
,
map_t
::
iterator
>
range
=
_inprocs
.
equal_range
(
endpoint_uri_str_
);
if
(
range
.
first
==
range
.
second
)
{
errno
=
ENOENT
;
return
-
1
;
}
for
(
map_t
::
iterator
it
=
range
.
first
;
it
!=
range
.
second
;
++
it
)
it
->
second
->
terminate
(
true
);
_inprocs
.
erase
(
range
.
first
,
range
.
second
);
return
0
;
}
void
zmq
::
socket_base_t
::
inprocs_t
::
erase_pipe
(
pipe_t
*
pipe_
)
{
for
(
map_t
::
iterator
it
=
_inprocs
.
begin
(),
end
=
_inprocs
.
end
();
it
!=
end
;
++
it
)
if
(
it
->
second
==
pipe_
)
{
_inprocs
.
erase
(
it
);
break
;
}
}
bool
zmq
::
socket_base_t
::
check_tag
()
const
{
return
_tag
==
0xbaddecaf
;
...
...
@@ -267,9 +299,11 @@ void zmq::socket_base_t::stop ()
send_stop
();
}
// TODO consider renaming protocol_ to scheme_ in conformance with RFC 3986
// terminology, but this requires extensive changes to be consistent
int
zmq
::
socket_base_t
::
parse_uri
(
const
char
*
uri_
,
std
::
string
&
protocol_
,
std
::
string
&
address
_
)
std
::
string
&
path
_
)
{
zmq_assert
(
uri_
!=
NULL
);
...
...
@@ -280,9 +314,9 @@ int zmq::socket_base_t::parse_uri (const char *uri_,
return
-
1
;
}
protocol_
=
uri
.
substr
(
0
,
pos
);
address
_
=
uri
.
substr
(
pos
+
3
);
path
_
=
uri
.
substr
(
pos
+
3
);
if
(
protocol_
.
empty
()
||
address
_
.
empty
())
{
if
(
protocol_
.
empty
()
||
path
_
.
empty
())
{
errno
=
EINVAL
;
return
-
1
;
}
...
...
@@ -292,7 +326,7 @@ int zmq::socket_base_t::parse_uri (const char *uri_,
int
zmq
::
socket_base_t
::
check_protocol
(
const
std
::
string
&
protocol_
)
const
{
// First check out whether the protocol is something we are aware of.
if
(
protocol_
!=
"inproc"
if
(
protocol_
!=
protocol_name
::
inproc
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS \
&& !defined ZMQ_HAVE_VXWORKS
&&
protocol_
!=
protocol_name
::
ipc
...
...
@@ -465,7 +499,7 @@ void zmq::socket_base_t::remove_signaler (signaler_t *s_)
(
static_cast
<
mailbox_safe_t
*>
(
_mailbox
))
->
remove_signaler
(
s_
);
}
int
zmq
::
socket_base_t
::
bind
(
const
char
*
addr
_
)
int
zmq
::
socket_base_t
::
bind
(
const
char
*
endpoint_uri
_
)
{
scoped_optional_lock_t
sync_lock
(
_thread_safe
?
&
_sync
:
NULL
);
...
...
@@ -480,19 +514,20 @@ int zmq::socket_base_t::bind (const char *addr_)
return
-
1
;
}
// Parse
addr
_ string.
// Parse
endpoint_uri
_ string.
std
::
string
protocol
;
std
::
string
address
;
if
(
parse_uri
(
addr_
,
protocol
,
address
)
||
check_protocol
(
protocol
))
{
if
(
parse_uri
(
endpoint_uri_
,
protocol
,
address
)
||
check_protocol
(
protocol
))
{
return
-
1
;
}
if
(
protocol
==
"inproc"
)
{
if
(
protocol
==
protocol_name
::
inproc
)
{
const
endpoint_t
endpoint
=
{
this
,
options
};
rc
=
register_endpoint
(
addr
_
,
endpoint
);
rc
=
register_endpoint
(
endpoint_uri
_
,
endpoint
);
if
(
rc
==
0
)
{
connect_pending
(
addr
_
,
this
);
_last_endpoint
.
assign
(
addr
_
);
connect_pending
(
endpoint_uri
_
,
this
);
_last_endpoint
.
assign
(
endpoint_uri
_
);
options
.
connected
=
true
;
}
return
rc
;
...
...
@@ -501,7 +536,7 @@ int zmq::socket_base_t::bind (const char *addr_)
if
(
protocol
==
"pgm"
||
protocol
==
"epgm"
||
protocol
==
"norm"
)
{
// For convenience's sake, bind can be used interchangeable with
// connect for PGM, EPGM, NORM transports.
rc
=
connect
(
addr
_
);
rc
=
connect
(
endpoint_uri
_
);
if
(
rc
!=
-
1
)
options
.
connected
=
true
;
return
rc
;
...
...
@@ -556,7 +591,7 @@ int zmq::socket_base_t::bind (const char *addr_)
// Save last endpoint URI
paddr
->
to_string
(
_last_endpoint
);
add_endpoint
(
addr
_
,
static_cast
<
own_t
*>
(
session
),
newpipe
);
add_endpoint
(
endpoint_uri
_
,
static_cast
<
own_t
*>
(
session
),
newpipe
);
return
0
;
}
...
...
@@ -626,7 +661,7 @@ int zmq::socket_base_t::bind (const char *addr_)
// Save last endpoint URI
listener
->
get_address
(
_last_endpoint
);
add_endpoint
(
addr
_
,
static_cast
<
own_t
*>
(
listener
),
NULL
);
add_endpoint
(
endpoint_uri
_
,
static_cast
<
own_t
*>
(
listener
),
NULL
);
options
.
connected
=
true
;
return
0
;
}
...
...
@@ -656,7 +691,7 @@ int zmq::socket_base_t::bind (const char *addr_)
return
-
1
;
}
int
zmq
::
socket_base_t
::
connect
(
const
char
*
addr
_
)
int
zmq
::
socket_base_t
::
connect
(
const
char
*
endpoint_uri
_
)
{
scoped_optional_lock_t
sync_lock
(
_thread_safe
?
&
_sync
:
NULL
);
...
...
@@ -671,43 +706,40 @@ int zmq::socket_base_t::connect (const char *addr_)
return
-
1
;
}
// Parse
addr
_ string.
// Parse
endpoint_uri
_ string.
std
::
string
protocol
;
std
::
string
address
;
if
(
parse_uri
(
addr_
,
protocol
,
address
)
||
check_protocol
(
protocol
))
{
if
(
parse_uri
(
endpoint_uri_
,
protocol
,
address
)
||
check_protocol
(
protocol
))
{
return
-
1
;
}
if
(
protocol
==
"inproc"
)
{
if
(
protocol
==
protocol_name
::
inproc
)
{
// TODO: inproc connect is specific with respect to creating pipes
// as there's no 'reconnect' functionality implemented. Once that
// is in place we should follow generic pipe creation algorithm.
// Find the peer endpoint.
const
endpoint_t
peer
=
find_endpoint
(
addr
_
);
const
endpoint_t
peer
=
find_endpoint
(
endpoint_uri
_
);
// The total HWM for an inproc connection should be the sum of
// the binder's HWM and the connector's HWM.
int
sndhwm
=
0
;
if
(
peer
.
socket
==
NULL
)
sndhwm
=
options
.
sndhwm
;
else
if
(
options
.
sndhwm
!=
0
&&
peer
.
options
.
rcvhwm
!=
0
)
sndhwm
=
options
.
sndhwm
+
peer
.
options
.
rcvhwm
;
int
rcvhwm
=
0
;
if
(
peer
.
socket
==
NULL
)
rcvhwm
=
options
.
rcvhwm
;
else
if
(
options
.
rcvhwm
!=
0
&&
peer
.
options
.
sndhwm
!=
0
)
rcvhwm
=
options
.
rcvhwm
+
peer
.
options
.
sndhwm
;
const
int
sndhwm
=
peer
.
socket
==
NULL
?
options
.
sndhwm
:
options
.
sndhwm
!=
0
&&
peer
.
options
.
rcvhwm
!=
0
?
options
.
sndhwm
+
peer
.
options
.
rcvhwm
:
0
;
const
int
rcvhwm
=
peer
.
socket
==
NULL
?
options
.
rcvhwm
:
options
.
rcvhwm
!=
0
&&
peer
.
options
.
sndhwm
!=
0
?
options
.
rcvhwm
+
peer
.
options
.
sndhwm
:
0
;
// Create a bi-directional pipe to connect the peers.
object_t
*
parents
[
2
]
=
{
this
,
peer
.
socket
==
NULL
?
this
:
peer
.
socket
};
pipe_t
*
new_pipes
[
2
]
=
{
NULL
,
NULL
};
const
bool
conflate
=
options
.
conflate
&&
(
options
.
type
==
ZMQ_DEALER
||
options
.
type
==
ZMQ_PULL
||
options
.
type
==
ZMQ_PUSH
||
options
.
type
==
ZMQ_PUB
||
options
.
type
==
ZMQ_SUB
);
const
bool
conflate
=
get_effective_conflate_option
(
options
);
int
hwms
[
2
]
=
{
conflate
?
-
1
:
sndhwm
,
conflate
?
-
1
:
rcvhwm
};
bool
conflates
[
2
]
=
{
conflate
,
conflate
};
...
...
@@ -725,42 +757,19 @@ int zmq::socket_base_t::connect (const char *addr_)
// to send the routing id message or not. To resolve this,
// we always send our routing id and drop it later if
// the peer doesn't expect it.
msg_t
id
;
rc
=
id
.
init_size
(
options
.
routing_id_size
);
errno_assert
(
rc
==
0
);
memcpy
(
id
.
data
(),
options
.
routing_id
,
options
.
routing_id_size
);
id
.
set_flags
(
msg_t
::
routing_id
);
const
bool
written
=
new_pipes
[
0
]
->
write
(
&
id
);
zmq_assert
(
written
);
new_pipes
[
0
]
->
flush
();
send_routing_id
(
new_pipes
[
0
],
options
);
const
endpoint_t
endpoint
=
{
this
,
options
};
pend_connection
(
std
::
string
(
addr
_
),
endpoint
,
new_pipes
);
pend_connection
(
std
::
string
(
endpoint_uri
_
),
endpoint
,
new_pipes
);
}
else
{
// If required, send the routing id of the local socket to the peer.
if
(
peer
.
options
.
recv_routing_id
)
{
msg_t
id
;
rc
=
id
.
init_size
(
options
.
routing_id_size
);
errno_assert
(
rc
==
0
);
memcpy
(
id
.
data
(),
options
.
routing_id
,
options
.
routing_id_size
);
id
.
set_flags
(
msg_t
::
routing_id
);
const
bool
written
=
new_pipes
[
0
]
->
write
(
&
id
);
zmq_assert
(
written
);
new_pipes
[
0
]
->
flush
();
send_routing_id
(
new_pipes
[
0
],
options
);
}
// If required, send the routing id of the peer to the local socket.
if
(
options
.
recv_routing_id
)
{
msg_t
id
;
rc
=
id
.
init_size
(
peer
.
options
.
routing_id_size
);
errno_assert
(
rc
==
0
);
memcpy
(
id
.
data
(),
peer
.
options
.
routing_id
,
peer
.
options
.
routing_id_size
);
id
.
set_flags
(
msg_t
::
routing_id
);
const
bool
written
=
new_pipes
[
1
]
->
write
(
&
id
);
zmq_assert
(
written
);
new_pipes
[
1
]
->
flush
();
send_routing_id
(
new_pipes
[
1
],
peer
.
options
);
}
// Attach remote end of the pipe to the peer socket. Note that peer's
...
...
@@ -773,10 +782,10 @@ int zmq::socket_base_t::connect (const char *addr_)
attach_pipe
(
new_pipes
[
0
],
false
,
true
);
// Save last endpoint URI
_last_endpoint
.
assign
(
addr
_
);
_last_endpoint
.
assign
(
endpoint_uri
_
);
// remember inproc connections for disconnect
_inprocs
.
ZMQ_MAP_INSERT_OR_EMPLACE
(
std
::
string
(
addr_
)
,
new_pipes
[
0
]);
_inprocs
.
emplace
(
endpoint_uri_
,
new_pipes
[
0
]);
options
.
connected
=
true
;
return
0
;
...
...
@@ -785,7 +794,7 @@ int zmq::socket_base_t::connect (const char *addr_)
(
options
.
type
==
ZMQ_DEALER
||
options
.
type
==
ZMQ_SUB
||
options
.
type
==
ZMQ_PUB
||
options
.
type
==
ZMQ_REQ
);
if
(
unlikely
(
is_single_connect
))
{
if
(
0
!=
_endpoints
.
count
(
addr
_
))
{
if
(
0
!=
_endpoints
.
count
(
endpoint_uri
_
))
{
// There is no valid use for multiple connects for SUB-PUB nor
// DEALER-ROUTER nor REQ-REP. Multiple connects produces
// nonsensical results.
...
...
@@ -942,11 +951,7 @@ int zmq::socket_base_t::connect (const char *addr_)
object_t
*
parents
[
2
]
=
{
this
,
session
};
pipe_t
*
new_pipes
[
2
]
=
{
NULL
,
NULL
};
const
bool
conflate
=
options
.
conflate
&&
(
options
.
type
==
ZMQ_DEALER
||
options
.
type
==
ZMQ_PULL
||
options
.
type
==
ZMQ_PUSH
||
options
.
type
==
ZMQ_PUB
||
options
.
type
==
ZMQ_SUB
);
const
bool
conflate
=
get_effective_conflate_option
(
options
);
int
hwms
[
2
]
=
{
conflate
?
-
1
:
options
.
sndhwm
,
conflate
?
-
1
:
options
.
rcvhwm
};
...
...
@@ -965,21 +970,48 @@ int zmq::socket_base_t::connect (const char *addr_)
// Save last endpoint URI
paddr
->
to_string
(
_last_endpoint
);
add_endpoint
(
addr
_
,
static_cast
<
own_t
*>
(
session
),
newpipe
);
add_endpoint
(
endpoint_uri
_
,
static_cast
<
own_t
*>
(
session
),
newpipe
);
return
0
;
}
void
zmq
::
socket_base_t
::
add_endpoint
(
const
char
*
addr_
,
std
::
string
zmq
::
socket_base_t
::
resolve_tcp_addr
(
std
::
string
endpoint_uri_
,
const
char
*
tcp_address_
)
{
// The resolved last_endpoint is used as a key in the endpoints map.
// The address passed by the user might not match in the TCP case due to
// IPv4-in-IPv6 mapping (EG: tcp://[::ffff:127.0.0.1]:9999), so try to
// resolve before giving up. Given at this stage we don't know whether a
// socket is connected or bound, try with both.
if
(
_endpoints
.
find
(
endpoint_uri_
)
==
_endpoints
.
end
())
{
tcp_address_t
*
tcp_addr
=
new
(
std
::
nothrow
)
tcp_address_t
();
alloc_assert
(
tcp_addr
);
int
rc
=
tcp_addr
->
resolve
(
tcp_address_
,
false
,
options
.
ipv6
);
if
(
rc
==
0
)
{
tcp_addr
->
to_string
(
endpoint_uri_
);
if
(
_endpoints
.
find
(
endpoint_uri_
)
==
_endpoints
.
end
())
{
rc
=
tcp_addr
->
resolve
(
tcp_address_
,
true
,
options
.
ipv6
);
if
(
rc
==
0
)
{
tcp_addr
->
to_string
(
endpoint_uri_
);
}
}
}
LIBZMQ_DELETE
(
tcp_addr
);
}
return
endpoint_uri_
;
}
void
zmq
::
socket_base_t
::
add_endpoint
(
const
char
*
endpoint_uri_
,
own_t
*
endpoint_
,
pipe_t
*
pipe_
)
{
// Activate the session. Make it a child of this socket.
launch_child
(
endpoint_
);
_endpoints
.
ZMQ_MAP_INSERT_OR_EMPLACE
(
std
::
string
(
addr
_
),
_endpoints
.
ZMQ_MAP_INSERT_OR_EMPLACE
(
std
::
string
(
endpoint_uri
_
),
endpoint_pipe_t
(
endpoint_
,
pipe_
));
}
int
zmq
::
socket_base_t
::
term_endpoint
(
const
char
*
addr
_
)
int
zmq
::
socket_base_t
::
term_endpoint
(
const
char
*
endpoint_uri
_
)
{
scoped_optional_lock_t
sync_lock
(
_thread_safe
?
&
_sync
:
NULL
);
...
...
@@ -990,75 +1022,43 @@ int zmq::socket_base_t::term_endpoint (const char *addr_)
}
// Check whether endpoint address passed to the function is valid.
if
(
unlikely
(
!
addr
_
))
{
if
(
unlikely
(
!
endpoint_uri
_
))
{
errno
=
EINVAL
;
return
-
1
;
}
// Process pending commands, if any, since there could be pending unprocessed process_own()'s
// (from launch_child() for example) we're asked to terminate now.
int
rc
=
process_commands
(
0
,
false
);
const
int
rc
=
process_commands
(
0
,
false
);
if
(
unlikely
(
rc
!=
0
))
{
return
-
1
;
}
// Parse addr_ string.
std
::
string
protocol
;
std
::
string
address
;
if
(
parse_uri
(
addr_
,
protocol
,
address
)
||
check_protocol
(
protocol
))
{
// Parse endpoint_uri_ string.
std
::
string
uri_protocol
;
std
::
string
uri_path
;
if
(
parse_uri
(
endpoint_uri_
,
uri_protocol
,
uri_path
)
||
check_protocol
(
uri_protocol
))
{
return
-
1
;
}
const
std
::
string
addr_str
=
std
::
string
(
addr
_
);
const
std
::
string
endpoint_uri_str
=
std
::
string
(
endpoint_uri
_
);
// Disconnect an inproc socket
if
(
protocol
==
"inproc"
)
{
if
(
unregister_endpoint
(
addr_str
,
this
)
==
0
)
{
return
0
;
}
const
std
::
pair
<
inprocs_t
::
iterator
,
inprocs_t
::
iterator
>
range
=
_inprocs
.
equal_range
(
addr_str
);
if
(
range
.
first
==
range
.
second
)
{
errno
=
ENOENT
;
return
-
1
;
}
for
(
inprocs_t
::
iterator
it
=
range
.
first
;
it
!=
range
.
second
;
++
it
)
it
->
second
->
terminate
(
true
);
_inprocs
.
erase
(
range
.
first
,
range
.
second
);
return
0
;
if
(
uri_protocol
==
protocol_name
::
inproc
)
{
return
unregister_endpoint
(
endpoint_uri_str
,
this
)
==
0
?
0
:
_inprocs
.
erase_pipes
(
endpoint_uri_str
);
}
std
::
string
resolved_addr
=
addr_
;
const
std
::
string
resolved_endpoint_uri
=
uri_protocol
==
protocol_name
::
tcp
?
resolve_tcp_addr
(
endpoint_uri_str
,
uri_path
.
c_str
())
:
endpoint_uri_str
;
// The resolved last_endpoint is used as a key in the endpoints map.
// The address passed by the user might not match in the TCP case due to
// IPv4-in-IPv6 mapping (EG: tcp://[::ffff:127.0.0.1]:9999), so try to
// resolve before giving up. Given at this stage we don't know whether a
// socket is connected or bound, try with both.
if
(
protocol
==
protocol_name
::
tcp
)
{
if
(
_endpoints
.
find
(
resolved_addr
)
==
_endpoints
.
end
())
{
tcp_address_t
*
tcp_addr
=
new
(
std
::
nothrow
)
tcp_address_t
();
alloc_assert
(
tcp_addr
);
rc
=
tcp_addr
->
resolve
(
address
.
c_str
(),
false
,
options
.
ipv6
);
if
(
rc
==
0
)
{
tcp_addr
->
to_string
(
resolved_addr
);
if
(
_endpoints
.
find
(
resolved_addr
)
==
_endpoints
.
end
())
{
rc
=
tcp_addr
->
resolve
(
address
.
c_str
(),
true
,
options
.
ipv6
);
if
(
rc
==
0
)
{
tcp_addr
->
to_string
(
resolved_addr
);
}
}
}
LIBZMQ_DELETE
(
tcp_addr
);
}
}
// Find the endpoints range (if any) corresponding to the addr_ string.
// Find the endpoints range (if any) corresponding to the endpoint_uri_ string.
const
std
::
pair
<
endpoints_t
::
iterator
,
endpoints_t
::
iterator
>
range
=
_endpoints
.
equal_range
(
resolved_
addr
);
_endpoints
.
equal_range
(
resolved_
endpoint_uri
);
if
(
range
.
first
==
range
.
second
)
{
errno
=
ENOENT
;
return
-
1
;
...
...
@@ -1538,12 +1538,7 @@ void zmq::socket_base_t::pipe_terminated (pipe_t *pipe_)
xpipe_terminated
(
pipe_
);
// Remove pipe from inproc pipes
for
(
inprocs_t
::
iterator
it
=
_inprocs
.
begin
(),
end
=
_inprocs
.
end
();
it
!=
end
;
++
it
)
if
(
it
->
second
==
pipe_
)
{
_inprocs
.
erase
(
it
);
break
;
}
_inprocs
.
erase_pipe
(
pipe_
);
// Remove the pipe from the list of attached pipes and confirm its
// termination if we are already shutting down.
...
...
@@ -1576,14 +1571,14 @@ int zmq::socket_base_t::monitor (const char *endpoint_, int events_)
stop_monitor
();
return
0
;
}
// Parse
addr
_ string.
// Parse
endpoint_uri
_ string.
std
::
string
protocol
;
std
::
string
address
;
if
(
parse_uri
(
endpoint_
,
protocol
,
address
)
||
check_protocol
(
protocol
))
return
-
1
;
// Event notification only supported over inproc://
if
(
protocol
!=
"inproc"
)
{
if
(
protocol
!=
protocol_name
::
inproc
)
{
errno
=
EPROTONOSUPPORT
;
return
-
1
;
}
...
...
@@ -1611,101 +1606,104 @@ int zmq::socket_base_t::monitor (const char *endpoint_, int events_)
return
rc
;
}
void
zmq
::
socket_base_t
::
event_connected
(
const
std
::
string
&
addr
_
,
void
zmq
::
socket_base_t
::
event_connected
(
const
std
::
string
&
endpoint_uri
_
,
zmq
::
fd_t
fd_
)
{
event
(
addr
_
,
fd_
,
ZMQ_EVENT_CONNECTED
);
event
(
endpoint_uri
_
,
fd_
,
ZMQ_EVENT_CONNECTED
);
}
void
zmq
::
socket_base_t
::
event_connect_delayed
(
const
std
::
string
&
addr_
,
int
err_
)
void
zmq
::
socket_base_t
::
event_connect_delayed
(
const
std
::
string
&
endpoint_uri_
,
int
err_
)
{
event
(
addr
_
,
err_
,
ZMQ_EVENT_CONNECT_DELAYED
);
event
(
endpoint_uri
_
,
err_
,
ZMQ_EVENT_CONNECT_DELAYED
);
}
void
zmq
::
socket_base_t
::
event_connect_retried
(
const
std
::
string
&
addr_
,
int
interval_
)
void
zmq
::
socket_base_t
::
event_connect_retried
(
const
std
::
string
&
endpoint_uri_
,
int
interval_
)
{
event
(
addr
_
,
interval_
,
ZMQ_EVENT_CONNECT_RETRIED
);
event
(
endpoint_uri
_
,
interval_
,
ZMQ_EVENT_CONNECT_RETRIED
);
}
void
zmq
::
socket_base_t
::
event_listening
(
const
std
::
string
&
addr
_
,
void
zmq
::
socket_base_t
::
event_listening
(
const
std
::
string
&
endpoint_uri
_
,
zmq
::
fd_t
fd_
)
{
event
(
addr
_
,
fd_
,
ZMQ_EVENT_LISTENING
);
event
(
endpoint_uri
_
,
fd_
,
ZMQ_EVENT_LISTENING
);
}
void
zmq
::
socket_base_t
::
event_bind_failed
(
const
std
::
string
&
addr_
,
int
err_
)
void
zmq
::
socket_base_t
::
event_bind_failed
(
const
std
::
string
&
endpoint_uri_
,
int
err_
)
{
event
(
addr
_
,
err_
,
ZMQ_EVENT_BIND_FAILED
);
event
(
endpoint_uri
_
,
err_
,
ZMQ_EVENT_BIND_FAILED
);
}
void
zmq
::
socket_base_t
::
event_accepted
(
const
std
::
string
&
addr
_
,
void
zmq
::
socket_base_t
::
event_accepted
(
const
std
::
string
&
endpoint_uri
_
,
zmq
::
fd_t
fd_
)
{
event
(
addr
_
,
fd_
,
ZMQ_EVENT_ACCEPTED
);
event
(
endpoint_uri
_
,
fd_
,
ZMQ_EVENT_ACCEPTED
);
}
void
zmq
::
socket_base_t
::
event_accept_failed
(
const
std
::
string
&
addr
_
,
void
zmq
::
socket_base_t
::
event_accept_failed
(
const
std
::
string
&
endpoint_uri
_
,
int
err_
)
{
event
(
addr
_
,
err_
,
ZMQ_EVENT_ACCEPT_FAILED
);
event
(
endpoint_uri
_
,
err_
,
ZMQ_EVENT_ACCEPT_FAILED
);
}
void
zmq
::
socket_base_t
::
event_closed
(
const
std
::
string
&
addr_
,
zmq
::
fd_t
fd_
)
void
zmq
::
socket_base_t
::
event_closed
(
const
std
::
string
&
endpoint_uri_
,
zmq
::
fd_t
fd_
)
{
event
(
addr
_
,
fd_
,
ZMQ_EVENT_CLOSED
);
event
(
endpoint_uri
_
,
fd_
,
ZMQ_EVENT_CLOSED
);
}
void
zmq
::
socket_base_t
::
event_close_failed
(
const
std
::
string
&
addr_
,
int
err_
)
void
zmq
::
socket_base_t
::
event_close_failed
(
const
std
::
string
&
endpoint_uri_
,
int
err_
)
{
event
(
addr
_
,
err_
,
ZMQ_EVENT_CLOSE_FAILED
);
event
(
endpoint_uri
_
,
err_
,
ZMQ_EVENT_CLOSE_FAILED
);
}
void
zmq
::
socket_base_t
::
event_disconnected
(
const
std
::
string
&
addr
_
,
void
zmq
::
socket_base_t
::
event_disconnected
(
const
std
::
string
&
endpoint_uri
_
,
zmq
::
fd_t
fd_
)
{
event
(
addr
_
,
fd_
,
ZMQ_EVENT_DISCONNECTED
);
event
(
endpoint_uri
_
,
fd_
,
ZMQ_EVENT_DISCONNECTED
);
}
void
zmq
::
socket_base_t
::
event_handshake_failed_no_detail
(
const
std
::
string
&
addr
_
,
int
err_
)
const
std
::
string
&
endpoint_uri
_
,
int
err_
)
{
event
(
addr
_
,
err_
,
ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL
);
event
(
endpoint_uri
_
,
err_
,
ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL
);
}
void
zmq
::
socket_base_t
::
event_handshake_failed_protocol
(
const
std
::
string
&
addr
_
,
int
err_
)
const
std
::
string
&
endpoint_uri
_
,
int
err_
)
{
event
(
addr
_
,
err_
,
ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL
);
event
(
endpoint_uri
_
,
err_
,
ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL
);
}
void
zmq
::
socket_base_t
::
event_handshake_failed_auth
(
const
std
::
string
&
addr_
,
int
err_
)
void
zmq
::
socket_base_t
::
event_handshake_failed_auth
(
const
std
::
string
&
endpoint_uri_
,
int
err_
)
{
event
(
addr
_
,
err_
,
ZMQ_EVENT_HANDSHAKE_FAILED_AUTH
);
event
(
endpoint_uri
_
,
err_
,
ZMQ_EVENT_HANDSHAKE_FAILED_AUTH
);
}
void
zmq
::
socket_base_t
::
event_handshake_succeeded
(
const
std
::
string
&
addr_
,
int
err_
)
void
zmq
::
socket_base_t
::
event_handshake_succeeded
(
const
std
::
string
&
endpoint_uri_
,
int
err_
)
{
event
(
addr
_
,
err_
,
ZMQ_EVENT_HANDSHAKE_SUCCEEDED
);
event
(
endpoint_uri
_
,
err_
,
ZMQ_EVENT_HANDSHAKE_SUCCEEDED
);
}
void
zmq
::
socket_base_t
::
event
(
const
std
::
string
&
addr
_
,
void
zmq
::
socket_base_t
::
event
(
const
std
::
string
&
endpoint_uri
_
,
intptr_t
value_
,
int
type_
)
{
scoped_lock_t
lock
(
_monitor_sync
);
if
(
_monitor_events
&
type_
)
{
monitor_event
(
type_
,
value_
,
addr
_
);
monitor_event
(
type_
,
value_
,
endpoint_uri
_
);
}
}
// Send a monitor event
void
zmq
::
socket_base_t
::
monitor_event
(
int
event_
,
intptr_t
value_
,
const
std
::
string
&
addr
_
)
const
const
std
::
string
&
endpoint_uri
_
)
const
{
// this is a private method which is only called from
// contexts where the mutex has been locked before
...
...
@@ -1723,8 +1721,9 @@ void zmq::socket_base_t::monitor_event (int event_,
zmq_sendmsg
(
_monitor_socket
,
&
msg
,
ZMQ_SNDMORE
);
// Send address in second frame
zmq_msg_init_size
(
&
msg
,
addr_
.
size
());
memcpy
(
zmq_msg_data
(
&
msg
),
addr_
.
c_str
(),
addr_
.
size
());
zmq_msg_init_size
(
&
msg
,
endpoint_uri_
.
size
());
memcpy
(
zmq_msg_data
(
&
msg
),
endpoint_uri_
.
c_str
(),
endpoint_uri_
.
size
());
zmq_sendmsg
(
_monitor_socket
,
&
msg
,
0
);
}
}
...
...
src/socket_base.hpp
View file @
fe82c643
...
...
@@ -82,9 +82,9 @@ class socket_base_t : public own_t,
// Interface for communication with the API layer.
int
setsockopt
(
int
option_
,
const
void
*
optval_
,
size_t
optvallen_
);
int
getsockopt
(
int
option_
,
void
*
optval_
,
size_t
*
optvallen_
);
int
bind
(
const
char
*
addr
_
);
int
connect
(
const
char
*
addr
_
);
int
term_endpoint
(
const
char
*
addr
_
);
int
bind
(
const
char
*
endpoint_uri
_
);
int
connect
(
const
char
*
endpoint_uri
_
);
int
term_endpoint
(
const
char
*
endpoint_uri
_
);
int
send
(
zmq
::
msg_t
*
msg_
,
int
flags_
);
int
recv
(
zmq
::
msg_t
*
msg_
,
int
flags_
);
void
add_signaler
(
signaler_t
*
s_
);
...
...
@@ -120,20 +120,24 @@ class socket_base_t : public own_t,
int
monitor
(
const
char
*
endpoint_
,
int
events_
);
void
event_connected
(
const
std
::
string
&
addr_
,
zmq
::
fd_t
fd_
);
void
event_connect_delayed
(
const
std
::
string
&
addr_
,
int
err_
);
void
event_connect_retried
(
const
std
::
string
&
addr_
,
int
interval_
);
void
event_listening
(
const
std
::
string
&
addr_
,
zmq
::
fd_t
fd_
);
void
event_bind_failed
(
const
std
::
string
&
addr_
,
int
err_
);
void
event_accepted
(
const
std
::
string
&
addr_
,
zmq
::
fd_t
fd_
);
void
event_accept_failed
(
const
std
::
string
&
addr_
,
int
err_
);
void
event_closed
(
const
std
::
string
&
addr_
,
zmq
::
fd_t
fd_
);
void
event_close_failed
(
const
std
::
string
&
addr_
,
int
err_
);
void
event_disconnected
(
const
std
::
string
&
addr_
,
zmq
::
fd_t
fd_
);
void
event_handshake_failed_no_detail
(
const
std
::
string
&
addr_
,
int
err_
);
void
event_handshake_failed_protocol
(
const
std
::
string
&
addr_
,
int
err_
);
void
event_handshake_failed_auth
(
const
std
::
string
&
addr_
,
int
err_
);
void
event_handshake_succeeded
(
const
std
::
string
&
addr_
,
int
err_
);
void
event_connected
(
const
std
::
string
&
endpoint_uri_
,
zmq
::
fd_t
fd_
);
void
event_connect_delayed
(
const
std
::
string
&
endpoint_uri_
,
int
err_
);
void
event_connect_retried
(
const
std
::
string
&
endpoint_uri_
,
int
interval_
);
void
event_listening
(
const
std
::
string
&
endpoint_uri_
,
zmq
::
fd_t
fd_
);
void
event_bind_failed
(
const
std
::
string
&
endpoint_uri_
,
int
err_
);
void
event_accepted
(
const
std
::
string
&
endpoint_uri_
,
zmq
::
fd_t
fd_
);
void
event_accept_failed
(
const
std
::
string
&
endpoint_uri_
,
int
err_
);
void
event_closed
(
const
std
::
string
&
endpoint_uri_
,
zmq
::
fd_t
fd_
);
void
event_close_failed
(
const
std
::
string
&
endpoint_uri_
,
int
err_
);
void
event_disconnected
(
const
std
::
string
&
endpoint_uri_
,
zmq
::
fd_t
fd_
);
void
event_handshake_failed_no_detail
(
const
std
::
string
&
endpoint_uri_
,
int
err_
);
void
event_handshake_failed_protocol
(
const
std
::
string
&
endpoint_uri_
,
int
err_
);
void
event_handshake_failed_auth
(
const
std
::
string
&
endpoint_uri_
,
int
err_
);
void
event_handshake_succeeded
(
const
std
::
string
&
endpoint_uri_
,
int
err_
);
// Query the state of a specific peer. The default implementation
// always returns an ENOTSUP error.
...
...
@@ -182,17 +186,19 @@ class socket_base_t : public own_t,
private
:
// test if event should be sent and then dispatch it
void
event
(
const
std
::
string
&
addr
_
,
intptr_t
value_
,
int
type_
);
void
event
(
const
std
::
string
&
endpoint_uri
_
,
intptr_t
value_
,
int
type_
);
// Socket event data dispatch
void
monitor_event
(
int
event_
,
intptr_t
value_
,
const
std
::
string
&
addr_
)
const
;
void
monitor_event
(
int
event_
,
intptr_t
value_
,
const
std
::
string
&
endpoint_uri_
)
const
;
// Monitor socket cleanup
void
stop_monitor
(
bool
send_monitor_stopped_event_
=
true
);
// Creates new endpoint ID and adds the endpoint to the map.
void
add_endpoint
(
const
char
*
addr_
,
own_t
*
endpoint_
,
pipe_t
*
pipe_
);
void
add_endpoint
(
const
char
*
endpoint_uri_
,
own_t
*
endpoint_
,
pipe_t
*
pipe_
);
// Map of open endpoints.
typedef
std
::
pair
<
own_t
*
,
pipe_t
*>
endpoint_pipe_t
;
...
...
@@ -200,7 +206,17 @@ class socket_base_t : public own_t,
endpoints_t
_endpoints
;
// Map of open inproc endpoints.
typedef
std
::
multimap
<
std
::
string
,
pipe_t
*>
inprocs_t
;
class
inprocs_t
{
public
:
void
emplace
(
const
char
*
endpoint_uri_
,
pipe_t
*
pipe_
);
int
erase_pipes
(
const
std
::
string
&
endpoint_uri_str_
);
void
erase_pipe
(
pipe_t
*
pipe_
);
private
:
typedef
std
::
multimap
<
std
::
string
,
pipe_t
*>
map_t
;
map_t
_inprocs
;
};
inprocs_t
_inprocs
;
// To be called after processing commands or invoking any command
...
...
@@ -224,7 +240,7 @@ class socket_base_t : public own_t,
// Parse URI string.
static
int
parse_uri
(
const
char
*
uri_
,
std
::
string
&
protocol_
,
std
::
string
&
address
_
);
parse_uri
(
const
char
*
uri_
,
std
::
string
&
protocol_
,
std
::
string
&
path
_
);
// Check whether transport protocol, as specified in connect or
// bind, is available and compatible with the socket type.
...
...
@@ -249,6 +265,9 @@ class socket_base_t : public own_t,
void
update_pipe_options
(
int
option_
);
std
::
string
resolve_tcp_addr
(
std
::
string
endpoint_uri_
,
const
char
*
tcp_address_
);
// Socket's mailbox object.
i_mailbox
*
_mailbox
;
...
...
src/stream_engine.cpp
View file @
fe82c643
...
...
@@ -67,7 +67,6 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_,
const
options_t
&
options_
,
const
std
::
string
&
endpoint_
)
:
_s
(
fd_
),
_as_server
(
false
),
_handle
(
static_cast
<
handle_t
>
(
NULL
)),
_inpos
(
NULL
),
_insize
(
0
),
...
...
src/stream_engine.hpp
View file @
fe82c643
...
...
@@ -145,9 +145,6 @@ class stream_engine_t : public io_object_t, public i_engine
// Underlying socket.
fd_t
_s
;
// True iff this is server's engine.
bool
_as_server
;
msg_t
_tx_msg
;
// Need to store PING payload for PONG
msg_t
_pong_msg
;
...
...
tests/CMakeLists.txt
View file @
fe82c643
...
...
@@ -28,7 +28,6 @@ set(tests
test_stream_empty
test_stream_disconnect
test_disconnect_inproc
test_unbind_inproc
test_unbind_wildcard
test_ctx_options
test_ctx_destroy
...
...
tests/test_connect_delay_tipc.cpp
View file @
fe82c643
...
...
@@ -28,17 +28,23 @@
*/
#include "testutil.hpp"
#include "testutil_unity.hpp"
int
main
(
void
)
#include <unity.h>
void
setUp
()
{
if
(
!
is_tipc_available
())
{
printf
(
"TIPC environment unavailable, skipping test
\n
"
);
return
77
;
}
setup_test_context
();
}
void
tearDown
()
{
teardown_test_context
();
}
void
test_send_one_connected_one_unconnected
()
{
int
val
;
int
rc
;
char
buffer
[
16
];
// TEST 1.
// First we're going to attempt to send messages to two
// pipes, one connected, the other not. We should see
...
...
@@ -46,61 +52,57 @@ int main (void)
// of the messages getting queued, as connect() creates a
// pipe immediately.
void
*
context
=
zmq_ctx_new
();
assert
(
context
);
void
*
to
=
zmq_socket
(
context
,
ZMQ_PULL
);
assert
(
to
);
void
*
to
=
test_context_socket
(
ZMQ_PULL
);
// Bind the one valid receiver
val
=
0
;
rc
=
zmq_setsockopt
(
to
,
ZMQ_LINGER
,
&
val
,
sizeof
(
val
));
assert
(
rc
==
0
);
rc
=
zmq_bind
(
to
,
"tipc://{6555,0,0}"
);
assert
(
rc
==
0
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_setsockopt
(
to
,
ZMQ_LINGER
,
&
val
,
sizeof
(
val
)));
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_bind
(
to
,
"tipc://{6555,0,0}"
));
// Create a socket pushing to two endpoints - only 1 message should arrive.
void
*
from
=
zmq_socket
(
context
,
ZMQ_PUSH
);
assert
(
from
);
void
*
from
=
test_context_socket
(
ZMQ_PUSH
);
val
=
0
;
zmq_setsockopt
(
from
,
ZMQ_LINGER
,
&
val
,
sizeof
(
val
));
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_setsockopt
(
from
,
ZMQ_LINGER
,
&
val
,
sizeof
(
val
)));
// This pipe will not connect
rc
=
zmq_connect
(
from
,
"tipc://{5556,0}@0.0.0"
);
assert
(
rc
==
0
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_connect
(
from
,
"tipc://{5556,0}@0.0.0"
));
// This pipe will
rc
=
zmq_connect
(
from
,
"tipc://{6555,0}@0.0.0"
);
assert
(
rc
==
0
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_connect
(
from
,
"tipc://{6555,0}@0.0.0"
));
// We send 10 messages, 5 should just get stuck in the queue
// for the not-yet-connected pipe
for
(
int
i
=
0
;
i
<
10
;
++
i
)
{
rc
=
zmq_send
(
from
,
"Hello"
,
5
,
0
);
assert
(
rc
==
5
);
const
int
send_count
=
10
;
for
(
int
i
=
0
;
i
<
send_count
;
++
i
)
{
send_string_expect_success
(
from
,
"Hello"
,
0
);
}
// We now consume from the connected pipe
// - we should see just 5
int
timeout
=
250
;
rc
=
zmq_setsockopt
(
to
,
ZMQ_RCVTIMEO
,
&
timeout
,
sizeof
(
int
));
assert
(
rc
==
0
);
int
timeout
=
SETTLE_TIME
;
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_setsockopt
(
to
,
ZMQ_RCVTIMEO
,
&
timeout
,
sizeof
(
int
))
);
int
seen
=
0
;
while
(
true
)
{
rc
=
zmq_recv
(
to
,
&
buffer
,
sizeof
(
buffer
),
0
);
if
(
rc
==
-
1
)
char
buffer
[
16
];
int
rc
=
zmq_recv
(
to
,
&
buffer
,
sizeof
(
buffer
),
0
);
if
(
rc
==
-
1
)
{
TEST_ASSERT_EQUAL_INT
(
EAGAIN
,
zmq_errno
());
break
;
// Break when we didn't get a message
}
seen
++
;
}
assert
(
seen
==
5
);
TEST_ASSERT_EQUAL_INT
(
send_count
/
2
,
seen
);
rc
=
zmq_close
(
from
);
assert
(
rc
==
0
);
rc
=
zmq_close
(
to
);
assert
(
rc
==
0
);
test_context_socket_close
(
from
);
test_context_socket_close
(
to
);
}
rc
=
zmq_ctx_term
(
context
);
assert
(
rc
==
0
);
void
test_send_one_connected_one_unconnected_with_delay
()
{
int
val
;
// TEST 2
// This time we will do the same thing, connect two pipes,
...
...
@@ -109,135 +111,122 @@ int main (void)
// also set the delay attach on connect flag, which should
// cause the pipe attachment to be delayed until the connection
// succeeds.
context
=
zmq_ctx_new
();
// Bind the valid socket
to
=
zmq_socket
(
context
,
ZMQ_PULL
);
assert
(
to
);
rc
=
zmq_bind
(
to
,
"tipc://{5560,0,0}"
);
assert
(
rc
==
0
);
void
*
to
=
test_context_socket
(
ZMQ_PULL
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_bind
(
to
,
"tipc://{5560,0,0}"
));
val
=
0
;
rc
=
zmq_setsockopt
(
to
,
ZMQ_LINGER
,
&
val
,
sizeof
(
val
));
assert
(
rc
==
0
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_setsockopt
(
to
,
ZMQ_LINGER
,
&
val
,
sizeof
(
val
))
);
// Create a socket pushing to two endpoints - all messages should arrive.
from
=
zmq_socket
(
context
,
ZMQ_PUSH
);
assert
(
from
);
void
*
from
=
test_context_socket
(
ZMQ_PUSH
);
val
=
0
;
rc
=
zmq_setsockopt
(
from
,
ZMQ_LINGER
,
&
val
,
sizeof
(
val
));
assert
(
rc
==
0
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_setsockopt
(
from
,
ZMQ_LINGER
,
&
val
,
sizeof
(
val
))
);
// Set the key flag
val
=
1
;
rc
=
zmq_setsockopt
(
from
,
ZMQ_DELAY_ATTACH_ON_CONNECT
,
&
val
,
sizeof
(
val
));
assert
(
rc
==
0
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_setsockopt
(
from
,
ZMQ_DELAY_ATTACH_ON_CONNECT
,
&
val
,
sizeof
(
val
))
);
// Connect to the invalid socket
rc
=
zmq_connect
(
from
,
"tipc://{5561,0}@0.0.0"
);
assert
(
rc
==
0
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_connect
(
from
,
"tipc://{5561,0}@0.0.0"
));
// Connect to the valid socket
rc
=
zmq_connect
(
from
,
"tipc://{5560,0}@0.0.0"
);
assert
(
rc
==
0
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_connect
(
from
,
"tipc://{5560,0}@0.0.0"
));
// Send 10 messages, all should be routed to the connected pipe
for
(
int
i
=
0
;
i
<
10
;
++
i
)
{
rc
=
zmq_send
(
from
,
"Hello"
,
5
,
0
);
assert
(
rc
==
5
);
const
int
send_count
=
10
;
for
(
int
i
=
0
;
i
<
send_count
;
++
i
)
{
send_string_expect_success
(
from
,
"Hello"
,
0
);
}
rc
=
zmq_setsockopt
(
to
,
ZMQ_RCVTIMEO
,
&
timeout
,
sizeof
(
int
));
assert
(
rc
==
0
);
int
timeout
=
SETTLE_TIME
;
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_setsockopt
(
to
,
ZMQ_RCVTIMEO
,
&
timeout
,
sizeof
(
int
)));
seen
=
0
;
int
seen
=
0
;
while
(
true
)
{
rc
=
zmq_recv
(
to
,
&
buffer
,
sizeof
(
buffer
),
0
);
if
(
rc
==
-
1
)
char
buffer
[
16
];
int
rc
=
zmq_recv
(
to
,
&
buffer
,
sizeof
(
buffer
),
0
);
if
(
rc
==
-
1
)
{
TEST_ASSERT_EQUAL_INT
(
EAGAIN
,
zmq_errno
());
break
;
// Break when we didn't get a message
}
seen
++
;
}
assert
(
seen
==
10
);
rc
=
zmq_close
(
from
);
assert
(
rc
==
0
);
rc
=
zmq_close
(
to
);
assert
(
rc
==
0
);
TEST_ASSERT_EQUAL_INT
(
send_count
,
seen
);
rc
=
zmq_ctx_term
(
context
);
assert
(
rc
==
0
);
test_context_socket_close
(
from
);
test_context_socket_close
(
to
);
}
void
test_send_disconnected_with_delay
()
{
// TEST 3
// This time we want to validate that the same blocking behaviour
// occurs with an existing connection that is broken. We will send
// messages to a connected pipe, disconnect and verify the messages
// block. Then we reconnect and verify messages flow again.
context
=
zmq_ctx_new
();
void
*
backend
=
zmq_socket
(
context
,
ZMQ_DEALER
);
assert
(
backend
);
void
*
frontend
=
zmq_socket
(
context
,
ZMQ_DEALER
);
assert
(
frontend
);
void
*
backend
=
test_context_socket
(
ZMQ_DEALER
);
void
*
frontend
=
test_context_socket
(
ZMQ_DEALER
);
int
zero
=
0
;
rc
=
zmq_setsockopt
(
backend
,
ZMQ_LINGER
,
&
zero
,
sizeof
(
zero
));
assert
(
rc
==
0
);
rc
=
zmq_setsockopt
(
frontend
,
ZMQ_LINGER
,
&
zero
,
sizeof
(
zero
));
assert
(
rc
==
0
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_setsockopt
(
backend
,
ZMQ_LINGER
,
&
zero
,
sizeof
(
zero
))
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_setsockopt
(
frontend
,
ZMQ_LINGER
,
&
zero
,
sizeof
(
zero
))
);
// Frontend connects to backend using DELAY_ATTACH_ON_CONNECT
int
on
=
1
;
rc
=
zmq_setsockopt
(
frontend
,
ZMQ_DELAY_ATTACH_ON_CONNECT
,
&
on
,
sizeof
(
on
));
assert
(
rc
==
0
);
rc
=
zmq_bind
(
backend
,
"tipc://{5560,0,0}"
);
assert
(
rc
==
0
);
rc
=
zmq_connect
(
frontend
,
"tipc://{5560,0}@0.0.0"
);
assert
(
rc
==
0
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_setsockopt
(
frontend
,
ZMQ_DELAY_ATTACH_ON_CONNECT
,
&
on
,
sizeof
(
on
)));
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_bind
(
backend
,
"tipc://{5560,0,0}"
));
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_connect
(
frontend
,
"tipc://{5560,0}@0.0.0"
));
// Ping backend to frontend so we know when the connection is up
rc
=
zmq_send
(
backend
,
"Hello"
,
5
,
0
);
assert
(
rc
==
5
);
rc
=
zmq_recv
(
frontend
,
buffer
,
255
,
0
);
assert
(
rc
==
5
);
send_string_expect_success
(
backend
,
"Hello"
,
0
);
recv_string_expect_success
(
frontend
,
"Hello"
,
0
);
// Send message from frontend to backend
rc
=
zmq_send
(
frontend
,
"Hello"
,
5
,
ZMQ_DONTWAIT
);
assert
(
rc
==
5
);
send_string_expect_success
(
frontend
,
"Hello"
,
ZMQ_DONTWAIT
);
rc
=
zmq_close
(
backend
);
assert
(
rc
==
0
);
test_context_socket_close
(
backend
);
// Give time to process disconnect
msleep
(
SETTLE_TIME
);
// Send a message, should fail
rc
=
zmq_send
(
frontend
,
"Hello"
,
5
,
ZMQ_DONTWAIT
);
assert
(
rc
==
-
1
);
TEST_ASSERT_FAILURE_ERRNO
(
EAGAIN
,
zmq_send
(
frontend
,
"Hello"
,
5
,
ZMQ_DONTWAIT
)
);
// Recreate backend socket
backend
=
zmq_socket
(
context
,
ZMQ_DEALER
);
assert
(
backend
);
rc
=
zmq_setsockopt
(
backend
,
ZMQ_LINGER
,
&
zero
,
sizeof
(
zero
));
assert
(
rc
==
0
);
rc
=
zmq_bind
(
backend
,
"tipc://{5560,0,0}"
);
assert
(
rc
==
0
);
backend
=
test_context_socket
(
ZMQ_DEALER
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_setsockopt
(
backend
,
ZMQ_LINGER
,
&
zero
,
sizeof
(
zero
)));
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_bind
(
backend
,
"tipc://{5560,0,0}"
));
// Ping backend to frontend so we know when the connection is up
rc
=
zmq_send
(
backend
,
"Hello"
,
5
,
0
);
assert
(
rc
==
5
);
rc
=
zmq_recv
(
frontend
,
buffer
,
255
,
0
);
assert
(
rc
==
5
);
send_string_expect_success
(
backend
,
"Hello"
,
0
);
recv_string_expect_success
(
frontend
,
"Hello"
,
0
);
// After the reconnect, should succeed
rc
=
zmq_send
(
frontend
,
"Hello"
,
5
,
ZMQ_DONTWAIT
);
assert
(
rc
==
5
);
send_string_expect_success
(
frontend
,
"Hello"
,
ZMQ_DONTWAIT
);
rc
=
zmq_close
(
backend
);
assert
(
rc
==
0
);
test_context_socket_close
(
backend
);
test_context_socket_close
(
frontend
);
}
rc
=
zmq_close
(
frontend
);
assert
(
rc
==
0
);
int
main
(
void
)
{
if
(
!
is_tipc_available
())
{
printf
(
"TIPC environment unavailable, skipping test
\n
"
);
return
77
;
}
rc
=
zmq_ctx_term
(
context
);
assert
(
rc
==
0
);
UNITY_BEGIN
();
RUN_TEST
(
test_send_one_connected_one_unconnected
);
RUN_TEST
(
test_send_one_connected_one_unconnected_with_delay
);
RUN_TEST
(
test_send_disconnected_with_delay
);
return
UNITY_END
();
}
tests/test_unbind_inproc.cpp
deleted
100644 → 0
View file @
31f69937
/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "testutil.hpp"
int
main
(
void
)
{
setup_test_environment
();
void
*
ctx
=
zmq_ctx_new
();
assert
(
ctx
);
void
*
sb
=
zmq_socket
(
ctx
,
ZMQ_REP
);
assert
(
sb
);
int
rc
=
zmq_bind
(
sb
,
"inproc://a"
);
assert
(
rc
==
0
);
rc
=
zmq_unbind
(
sb
,
"inproc://a"
);
assert
(
rc
==
0
);
rc
=
zmq_close
(
sb
);
assert
(
rc
==
0
);
rc
=
zmq_ctx_term
(
ctx
);
assert
(
rc
==
0
);
return
0
;
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment