Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
7bbccdea
Commit
7bbccdea
authored
Jun 17, 2012
by
Ian Barber
Browse files
Options
Browse Files
Download
Plain Diff
Merge pull request #383 from Kobolog/master
Reworked ZMQ_FAIL_UNROUTABLE to actually work as it was intended.
parents
21eb8c8f
08749c8e
Show whitespace changes
Inline
Side-by-side
Showing
7 changed files
with
92 additions
and
27 deletions
+92
-27
zmq_setsockopt.txt
doc/zmq_setsockopt.txt
+8
-8
zmq_socket.txt
doc/zmq_socket.txt
+5
-5
zmq.h
include/zmq.h
+1
-1
router.cpp
src/router.cpp
+9
-10
router.hpp
src/router.hpp
+3
-2
Makefile.am
tests/Makefile.am
+3
-1
test_router_behavior.cpp
tests/test_router_behavior.cpp
+63
-0
No files found.
doc/zmq_setsockopt.txt
View file @
7bbccdea
...
...
@@ -366,18 +366,18 @@ Default value:: 0 (false)
Applicable socket types:: all, primarily when using TCP/IPC transports.
ZMQ_
FAIL_UNROUTABLE: Set unroutable message
behavior
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
~
ZMQ_
ROUTER_BEHAVIOR: Set the ROUTER socket
behavior
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Sets the
behavior when an unroutable message is encountered in a 'ZMQ_ROUTER'
socket. A value of `0` is the default behavior when the message is silently
dropped, while a value of `1` forces the sending to fail with a 'EHOSTUNREACH'
error code
.
Sets the
'ROUTER' socket behavior when an unroutable message is encountered. A value
of `0` is the default when the message is silently discarded, while a value of `1`
forces the sending to fail with an 'EAGAIN' error code, effectively enabling sending
messages in a blocking fashion
.
[horizontal]
Option value type:: int
Option value unit::
boolean
Default value:: 0
(false)
Option value unit::
0, 1
Default value:: 0
Applicable socket types:: ZMQ_ROUTER
...
...
doc/zmq_socket.txt
View file @
7bbccdea
...
...
@@ -147,13 +147,13 @@ message before passing it to the application. Messages received are fair-queued
from among all connected peers. When sending messages a 'ZMQ_ROUTER' socket shall
remove the first part of the message and use it to determine the _identity_ of
the peer the message shall be routed to. If the peer does not exist anymore
the message shall be silently discarded.
the message shall be silently discarded by default, unless 'ZMQ_ROUTER_BEHAVIOR'
socket option is set to '1'.
When a 'ZMQ_ROUTER' socket enters an exceptional state due to having reached the
high water mark for all peers, or if there are no peers at all, then any
messages sent to the socket shall be dropped until the exceptional state ends.
Likewise, any messages routed to a non-existent peer or a peer for which the
individual high water mark has been reached shall also be dropped.
high water mark for all peers, then any messages sent to the socket shall be dropped
until the exceptional state ends. Likewise, any messages routed to a peer for which
the individual high water mark has been reached shall also be dropped.
When a 'ZMQ_REQ' socket is connected to a 'ZMQ_ROUTER' socket, in addition to the
_identity_ of the originating peer each message received shall contain an empty
...
...
include/zmq.h
View file @
7bbccdea
...
...
@@ -221,7 +221,7 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval);
#define ZMQ_SNDTIMEO 28
#define ZMQ_IPV4ONLY 31
#define ZMQ_LAST_ENDPOINT 32
#define ZMQ_
FAIL_UNROUTABLE
33
#define ZMQ_
ROUTER_BEHAVIOR
33
#define ZMQ_TCP_KEEPALIVE 34
#define ZMQ_TCP_KEEPALIVE_CNT 35
#define ZMQ_TCP_KEEPALIVE_IDLE 36
...
...
src/router.cpp
View file @
7bbccdea
...
...
@@ -35,7 +35,7 @@ zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
current_out
(
NULL
),
more_out
(
false
),
next_peer_id
(
generate_random
()),
fail
_unroutable
(
false
)
report
_unroutable
(
false
)
{
options
.
type
=
ZMQ_ROUTER
;
...
...
@@ -74,7 +74,7 @@ void zmq::router_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
int
zmq
::
router_t
::
xsetsockopt
(
int
option_
,
const
void
*
optval_
,
size_t
optvallen_
)
{
if
(
option_
!=
ZMQ_
FAIL_UNROUTABLE
)
{
if
(
option_
!=
ZMQ_
ROUTER_BEHAVIOR
)
{
errno
=
EINVAL
;
return
-
1
;
}
...
...
@@ -82,7 +82,7 @@ int zmq::router_t::xsetsockopt (int option_, const void *optval_,
errno
=
EINVAL
;
return
-
1
;
}
fail
_unroutable
=
*
static_cast
<
const
int
*>
(
optval_
);
report
_unroutable
=
*
static_cast
<
const
int
*>
(
optval_
);
return
0
;
}
...
...
@@ -135,8 +135,6 @@ int zmq::router_t::xsend (msg_t *msg_, int flags_)
if
(
!
more_out
)
{
zmq_assert
(
!
current_out
);
int
retval
=
0
;
// If we have malformed message (prefix with no subsequent message)
// then just silently ignore it.
// TODO: The connections should be killed instead.
...
...
@@ -146,7 +144,7 @@ int zmq::router_t::xsend (msg_t *msg_, int flags_)
// Find the pipe associated with the identity stored in the prefix.
// If there's no such pipe just silently ignore the message, unless
//
fail
_unreachable is set.
//
report
_unreachable is set.
blob_t
identity
((
unsigned
char
*
)
msg_
->
data
(),
msg_
->
size
());
outpipes_t
::
iterator
it
=
outpipes
.
find
(
identity
);
...
...
@@ -156,9 +154,10 @@ int zmq::router_t::xsend (msg_t *msg_, int flags_)
it
->
second
.
active
=
false
;
current_out
=
NULL
;
}
}
else
if
(
fail_unroutable
)
{
errno
=
EHOSTUNREACH
;
retval
=
-
1
;
}
else
if
(
report_unroutable
)
{
more_out
=
false
;
errno
=
EAGAIN
;
return
-
1
;
}
}
...
...
@@ -166,7 +165,7 @@ int zmq::router_t::xsend (msg_t *msg_, int flags_)
errno_assert
(
rc
==
0
);
rc
=
msg_
->
init
();
errno_assert
(
rc
==
0
);
return
retval
;
return
0
;
}
// Check whether this is the last part of the message.
...
...
src/router.hpp
View file @
7bbccdea
...
...
@@ -110,8 +110,9 @@ namespace zmq
// algorithm. This value is the next ID to use (if not used already).
uint32_t
next_peer_id
;
// If true, fail on unroutable messages instead of silently dropping them.
bool
fail_unroutable
;
// If true, report EAGAIN to the caller instead of silently dropping
// the message targeting an unknown peer.
bool
report_unroutable
;
router_t
(
const
router_t
&
);
const
router_t
&
operator
=
(
const
router_t
&
);
...
...
tests/Makefile.am
View file @
7bbccdea
...
...
@@ -16,7 +16,8 @@ noinst_PROGRAMS = test_pair_inproc \
test_connect_delay
\
test_last_endpoint
\
test_term_endpoint
\
test_monitor
test_monitor
\
test_router_behavior
if
!ON_MINGW
noinst_PROGRAMS
+=
test_shutdown_stress
\
...
...
@@ -39,6 +40,7 @@ test_connect_delay_SOURCES = test_connect_delay.cpp
test_last_endpoint_SOURCES
=
test_last_endpoint.cpp
test_term_endpoint_SOURCES
=
test_term_endpoint.cpp
test_monitor_SOURCES
=
test_monitor.cpp
test_router_behavior_SOURCES
=
test_router_behavior.cpp
if
!ON_MINGW
test_shutdown_stress_SOURCES
=
test_shutdown_stress.cpp
...
...
tests/test_router_behavior.cpp
0 → 100644
View file @
7bbccdea
/*
Copyright (c) 2010-2011 250bpm s.r.o.
Copyright (c) 2011 iMatix Corporation
Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <assert.h>
#include <stdio.h>
#include "testutil.hpp"
int
main
(
int
argc
,
char
*
argv
[])
{
fprintf
(
stderr
,
"test_router_behavior running...
\n
"
);
void
*
ctx
=
zmq_init
(
1
);
assert
(
ctx
);
// Creating the first socket.
void
*
sa
=
zmq_socket
(
ctx
,
ZMQ_ROUTER
);
assert
(
sa
);
int
rc
=
zmq_bind
(
sa
,
"tcp://127.0.0.1:15560"
);
assert
(
rc
==
0
);
// Sending a message to an unknown peer with the default behavior.
rc
=
zmq_send
(
sa
,
"UNKNOWN"
,
7
,
ZMQ_SNDMORE
);
assert
(
rc
==
7
);
rc
=
zmq_send
(
sa
,
"DATA"
,
4
,
0
);
assert
(
rc
==
4
);
int
behavior
=
1
;
// Setting the socket behavior to a new mode.
rc
=
zmq_setsockopt
(
sa
,
ZMQ_ROUTER_BEHAVIOR
,
&
behavior
,
sizeof
(
behavior
));
assert
(
rc
==
0
);
// Sending a message to an unknown peer with verbose behavior.
rc
=
zmq_send
(
sa
,
"UNKNOWN"
,
7
,
ZMQ_SNDMORE
|
ZMQ_DONTWAIT
);
assert
(
rc
==
-
1
&&
errno
==
EAGAIN
);
rc
=
zmq_close
(
sa
);
assert
(
rc
==
0
);
rc
=
zmq_term
(
ctx
);
assert
(
rc
==
0
);
return
0
;
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment