Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
08b01a51
Commit
08b01a51
authored
Sep 07, 2017
by
Doron Somech
Committed by
GitHub
Sep 07, 2017
Browse files
Options
Browse Files
Download
Plain Diff
Merge pull request #2743 from sigiesec/rename-identity
Problem: term "identity" is confusing
parents
876d9073
cd55c624
Expand all
Hide whitespace changes
Inline
Side-by-side
Showing
43 changed files
with
320 additions
and
287 deletions
+320
-287
zmq_getsockopt.txt
doc/zmq_getsockopt.txt
+20
-14
zmq_msg_gets.txt
doc/zmq_msg_gets.txt
+4
-2
zmq_setsockopt.txt
doc/zmq_setsockopt.txt
+48
-33
zmq_socket.txt
doc/zmq_socket.txt
+20
-20
zmq.h
include/zmq.h
+7
-5
ctx.cpp
src/ctx.cpp
+8
-8
mechanism.cpp
src/mechanism.cpp
+13
-13
mechanism.hpp
src/mechanism.hpp
+3
-3
metadata.cpp
src/metadata.cpp
+7
-1
msg.cpp
src/msg.cpp
+2
-2
msg.hpp
src/msg.hpp
+2
-2
options.cpp
src/options.cpp
+10
-10
options.hpp
src/options.hpp
+4
-4
pipe.cpp
src/pipe.cpp
+12
-12
pipe.hpp
src/pipe.hpp
+6
-6
req.cpp
src/req.cpp
+1
-1
router.cpp
src/router.cpp
+0
-0
router.hpp
src/router.hpp
+2
-2
server.cpp
src/server.cpp
+6
-6
server.hpp
src/server.hpp
+1
-1
session_base.cpp
src/session_base.cpp
+3
-3
socket_base.cpp
src/socket_base.cpp
+22
-22
socket_base.hpp
src/socket_base.hpp
+1
-1
stream.cpp
src/stream.cpp
+36
-36
stream.hpp
src/stream.hpp
+3
-3
stream_engine.cpp
src/stream_engine.cpp
+25
-25
stream_engine.hpp
src/stream_engine.hpp
+2
-2
zap_client.cpp
src/zap_client.cpp
+3
-3
zmq.cpp
src/zmq.cpp
+3
-3
zmq_draft.h
src/zmq_draft.h
+2
-2
test_connect_rid.cpp
tests/test_connect_rid.cpp
+9
-9
test_issue_566.cpp
tests/test_issue_566.cpp
+1
-1
test_probe_router.cpp
tests/test_probe_router.cpp
+1
-1
test_router_handover.cpp
tests/test_router_handover.cpp
+2
-2
test_router_mandatory.cpp
tests/test_router_mandatory.cpp
+3
-3
test_security_curve.cpp
tests/test_security_curve.cpp
+13
-9
test_security_plain.cpp
tests/test_security_plain.cpp
+1
-1
test_security_zap.cpp
tests/test_security_zap.cpp
+7
-9
test_spec_req.cpp
tests/test_spec_req.cpp
+1
-1
test_spec_router.cpp
tests/test_spec_router.cpp
+2
-2
test_stream.cpp
tests/test_stream.cpp
+1
-1
test_stream_disconnect.cpp
tests/test_stream_disconnect.cpp
+2
-2
testutil_security.hpp
tests/testutil_security.hpp
+1
-1
No files found.
doc/zmq_getsockopt.txt
View file @
08b01a51
...
@@ -289,7 +289,7 @@ ZMQ_HANDSHAKE_IVL: Retrieve maximum handshake interval
...
@@ -289,7 +289,7 @@ ZMQ_HANDSHAKE_IVL: Retrieve maximum handshake interval
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_HANDSHAKE_IVL' option shall retrieve the maximum handshake interval
The 'ZMQ_HANDSHAKE_IVL' option shall retrieve the maximum handshake interval
for the specified 'socket'. Handshaking is the exchange of socket configuration
for the specified 'socket'. Handshaking is the exchange of socket configuration
information (socket type,
identity
, security) that occurs when a connection
information (socket type,
routing id
, security) that occurs when a connection
is first opened, only for connection-oriented transports. If handshaking does
is first opened, only for connection-oriented transports. If handshaking does
not complete within the configured time, the connection shall be closed.
not complete within the configured time, the connection shall be closed.
The value 0 means no handshake time limit.
The value 0 means no handshake time limit.
...
@@ -303,19 +303,8 @@ Applicable socket types:: all but ZMQ_STREAM, only for connection-oriented trans
...
@@ -303,19 +303,8 @@ Applicable socket types:: all but ZMQ_STREAM, only for connection-oriented trans
ZMQ_IDENTITY: Retrieve socket identity
ZMQ_IDENTITY: Retrieve socket identity
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_IDENTITY' option shall retrieve the identity of the specified 'socket'.
This option name is now deprecated. Use ZMQ_ROUTING_ID instead.
Socket identity is used only by request/reply pattern. Namely, it can be used
ZMQ_IDENTITY remains as an alias for now.
in tandem with ROUTER socket to route messages to the peer with specific
identity.
Identity should be at least one byte and at most 255 bytes long. Identities
starting with binary zero are reserved for use by 0MQ infrastructure.
[horizontal]
Option value type:: binary data
Option value unit:: N/A
Default value:: NULL
Applicable socket types:: ZMQ_REP, ZMQ_REQ, ZMQ_ROUTER, ZMQ_DEALER.
ZMQ_IMMEDIATE: Retrieve attach-on-connect value
ZMQ_IMMEDIATE: Retrieve attach-on-connect value
...
@@ -656,6 +645,23 @@ Default value:: 10000
...
@@ -656,6 +645,23 @@ Default value:: 10000
Applicable socket types:: all, when using multicast transports
Applicable socket types:: all, when using multicast transports
ZMQ_ROUTING_ID: Retrieve socket routing id
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_ROUTING_ID' option shall retrieve the routing id of the specified 'socket'.
Routing ids are used only by the request/reply pattern. Specifically, it can be used
in tandem with ROUTER socket to route messages to the peer with a specific
routing id.
A routing id must be at least one byte and at most 255 bytes long. Identities
starting with a zero byte are reserved for use by the 0MQ infrastructure.
[horizontal]
Option value type:: binary data
Option value unit:: N/A
Default value:: NULL
Applicable socket types:: ZMQ_REP, ZMQ_REQ, ZMQ_ROUTER, ZMQ_DEALER.
ZMQ_SNDBUF: Retrieve kernel transmit buffer size
ZMQ_SNDBUF: Retrieve kernel transmit buffer size
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_SNDBUF' option shall retrieve the underlying kernel transmit buffer
The 'ZMQ_SNDBUF' option shall retrieve the underlying kernel transmit buffer
...
...
doc/zmq_msg_gets.txt
View file @
08b01a51
...
@@ -26,14 +26,16 @@ The following ZMTP properties can be retrieved with the _zmq_msg_gets()_
...
@@ -26,14 +26,16 @@ The following ZMTP properties can be retrieved with the _zmq_msg_gets()_
function:
function:
Socket-Type
Socket-Type
Identity
Routing-Id
Note: 'Identity' is a deprecated alias for 'Routing-Id'.
Additionally, when available for the underlying transport, the *Peer-Address*
Additionally, when available for the underlying transport, the *Peer-Address*
property will return the IP address of the remote endpoint as returned by
property will return the IP address of the remote endpoint as returned by
getnameinfo(2).
getnameinfo(2).
The names of these properties are also defined in _zmq.h_ as
The names of these properties are also defined in _zmq.h_ as
_ZMQ_MSG_PROPERTY_SOCKET_TYPE_ _ZMQ_MSG_PROPERTY_
IDENTITY
_, and
_ZMQ_MSG_PROPERTY_SOCKET_TYPE_ _ZMQ_MSG_PROPERTY_
ROUTING_ID
_, and
_ZMQ_MSG_PROPERTY_PEER_ADDRESS_.
_ZMQ_MSG_PROPERTY_PEER_ADDRESS_.
Currently, these definitions are only available as a DRAFT API.
Currently, these definitions are only available as a DRAFT API.
...
...
doc/zmq_setsockopt.txt
View file @
08b01a51
...
@@ -90,22 +90,29 @@ Applicable socket types:: all, when using TCP or UDP transports.
...
@@ -90,22 +90,29 @@ Applicable socket types:: all, when using TCP or UDP transports.
ZMQ_CONNECT_RID: Assign the next outbound connection id
ZMQ_CONNECT_RID: Assign the next outbound connection id
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_CONNECT_RID' option sets the peer id of the next host connected
This option name is now deprecated. Use ZMQ_CONNECT_ROUTING_ID instead.
via the zmq_connect() call, and immediately readies that connection for
ZMQ_CONNECT_RID remains as an alias for now.
data transfer with the named id. This option applies only to the first
subsequent call to zmq_connect(), calls thereafter use default connection
behaviour.
Typical use is to set this socket option ahead of each zmq_connect() attempt
to a new host. Each connection MUST be assigned a unique name. Assigning a
ZMQ_CONNECT_ROUTING_ID: Assign the next outbound routing id
name that is already in use is not allowed.
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_CONNECT_ROUTING_ID' option sets the peer id of the peer connected
via the next zmq_connect() call, such that that connection is immediately ready for
data transfer with the given routing id. This option applies only to the first
subsequent call to zmq_connect(), zmq_connect() calls thereafter use the default
connection behaviour.
Typical use is to set this socket option ahead of each zmq_connect() call.
Each connection MUST be assigned a unique routing id. Assigning a
routing id that is already in use is not allowed.
Useful when connecting ROUTER to ROUTER, or STREAM to STREAM, as it
Useful when connecting ROUTER to ROUTER, or STREAM to STREAM, as it
allows for immediate sending to peers. Outbound id framing requirements
allows for immediate sending to peers. Outbound
routing
id framing requirements
for ROUTER and STREAM sockets apply.
for ROUTER and STREAM sockets apply.
The peer id should be from 1 to 255 bytes long and MAY NOT start with
The routing id must be from 1 to 255 bytes long and MAY NOT start with
binary zero.
a zero byte (such routing ids are reserved for internal use by the 0MQ
infrastructure).
[horizontal]
[horizontal]
Option value type:: binary data
Option value type:: binary data
...
@@ -305,7 +312,7 @@ ZMQ_HANDSHAKE_IVL: Set maximum handshake interval
...
@@ -305,7 +312,7 @@ ZMQ_HANDSHAKE_IVL: Set maximum handshake interval
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_HANDSHAKE_IVL' option shall set the maximum handshake interval for
The 'ZMQ_HANDSHAKE_IVL' option shall set the maximum handshake interval for
the specified 'socket'. Handshaking is the exchange of socket configuration
the specified 'socket'. Handshaking is the exchange of socket configuration
information (socket type,
identity
, security) that occurs when a connection
information (socket type,
routing id
, security) that occurs when a connection
is first opened, only for connection-oriented transports. If handshaking does
is first opened, only for connection-oriented transports. If handshaking does
not complete within the configured time, the connection shall be closed.
not complete within the configured time, the connection shall be closed.
The value 0 means no handshake time limit.
The value 0 means no handshake time limit.
...
@@ -364,22 +371,8 @@ Applicable socket types:: all, when using connection-oriented transports
...
@@ -364,22 +371,8 @@ Applicable socket types:: all, when using connection-oriented transports
ZMQ_IDENTITY: Set socket identity
ZMQ_IDENTITY: Set socket identity
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_IDENTITY' option shall set the identity of the specified 'socket'
This option name is now deprecated. Use ZMQ_ROUTING_ID instead.
when connecting to a ROUTER socket. The identity should be from 1 to 255
ZMQ_IDENTITY remains as an alias for now.
bytes long and may contain any values.
If two clients use the same identity when connecting to a ROUTER, the
results shall depend on the ZMQ_ROUTER_HANDOVER option setting. If that
is not set (or set to the default of zero), the ROUTER socket shall reject
clients trying to connect with an already-used identity. If that option
is set to 1, the ROUTER socket shall hand-over the connection to the new
client and disconnect the existing one.
[horizontal]
Option value type:: binary data
Option value unit:: N/A
Default value:: NULL
Applicable socket types:: ZMQ_REQ, ZMQ_REP, ZMQ_ROUTER, ZMQ_DEALER.
ZMQ_IMMEDIATE: Queue messages only to completed connections
ZMQ_IMMEDIATE: Queue messages only to completed connections
...
@@ -737,12 +730,12 @@ Default value:: 0
...
@@ -737,12 +730,12 @@ Default value:: 0
Applicable socket types:: ZMQ_REQ
Applicable socket types:: ZMQ_REQ
ZMQ_ROUTER_HANDOVER: handle duplicate client
identitie
s on ROUTER sockets
ZMQ_ROUTER_HANDOVER: handle duplicate client
routing id
s on ROUTER sockets
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
~
If two clients use the same
identity
when connecting to a ROUTER, the
If two clients use the same
routing id
when connecting to a ROUTER, the
results shall depend on the ZMQ_ROUTER_HANDOVER option setting. If that
results shall depend on the ZMQ_ROUTER_HANDOVER option setting. If that
is not set (or set to the default of zero), the ROUTER socket shall reject
is not set (or set to the default of zero), the ROUTER socket shall reject
clients trying to connect with an already-used
identity
. If that option
clients trying to connect with an already-used
routing id
. If that option
is set to 1, the ROUTER socket shall hand-over the connection to the new
is set to 1, the ROUTER socket shall hand-over the connection to the new
client and disconnect the existing one.
client and disconnect the existing one.
...
@@ -782,7 +775,7 @@ raw mode, and when using the tcp:// transport, it will read and write TCP data
...
@@ -782,7 +775,7 @@ raw mode, and when using the tcp:// transport, it will read and write TCP data
without 0MQ framing. This lets 0MQ applications talk to non-0MQ applications.
without 0MQ framing. This lets 0MQ applications talk to non-0MQ applications.
When using raw mode, you cannot set explicit identities, and the ZMQ_SNDMORE
When using raw mode, you cannot set explicit identities, and the ZMQ_SNDMORE
flag is ignored when sending data messages. In raw mode you can close a specific
flag is ignored when sending data messages. In raw mode you can close a specific
connection by sending it a zero-length message (following the
identity
frame).
connection by sending it a zero-length message (following the
routing id
frame).
NOTE: This option is deprecated, please use ZMQ_STREAM sockets instead.
NOTE: This option is deprecated, please use ZMQ_STREAM sockets instead.
...
@@ -793,6 +786,28 @@ Default value:: 0
...
@@ -793,6 +786,28 @@ Default value:: 0
Applicable socket types:: ZMQ_ROUTER
Applicable socket types:: ZMQ_ROUTER
ZMQ_ROUTING_ID: Set socket routing id
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_ROUTING_ID' option shall set the routing id of the specified 'socket'
when connecting to a ROUTER socket.
A routing id must be at least one byte and at most 255 bytes long. Identities
starting with a zero byte are reserved for use by the 0MQ infrastructure.
If two clients use the same routing id when connecting to a ROUTER, the
results shall depend on the ZMQ_ROUTER_HANDOVER option setting. If that
is not set (or set to the default of zero), the ROUTER socket shall reject
clients trying to connect with an already-used routing id. If that option
is set to 1, the ROUTER socket shall hand-over the connection to the new
client and disconnect the existing one.
[horizontal]
Option value type:: binary data
Option value unit:: N/A
Default value:: NULL
Applicable socket types:: ZMQ_REQ, ZMQ_REP, ZMQ_ROUTER, ZMQ_DEALER.
ZMQ_SNDBUF: Set kernel transmit buffer size
ZMQ_SNDBUF: Set kernel transmit buffer size
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_SNDBUF' option shall set the underlying kernel transmit buffer size
The 'ZMQ_SNDBUF' option shall set the underlying kernel transmit buffer size
...
...
doc/zmq_socket.txt
View file @
08b01a51
...
@@ -383,26 +383,26 @@ non-0MQ peer, when using the tcp:// transport. A 'ZMQ_STREAM' socket can
...
@@ -383,26 +383,26 @@ non-0MQ peer, when using the tcp:// transport. A 'ZMQ_STREAM' socket can
act as client and/or server, sending and/or receiving TCP data asynchronously.
act as client and/or server, sending and/or receiving TCP data asynchronously.
When receiving TCP data, a 'ZMQ_STREAM' socket shall prepend a message part
When receiving TCP data, a 'ZMQ_STREAM' socket shall prepend a message part
containing the _
identity
_ of the originating peer to the message before passing
containing the _
routing id
_ of the originating peer to the message before passing
it to the application. Messages received are fair-queued from among all
it to the application. Messages received are fair-queued from among all
connected peers.
connected peers.
When sending TCP data, a 'ZMQ_STREAM' socket shall remove the first part of the
When sending TCP data, a 'ZMQ_STREAM' socket shall remove the first part of the
message and use it to determine the _
identity
_ of the peer the message shall be
message and use it to determine the _
routing id
_ of the peer the message shall be
routed to, and unroutable messages shall cause an EHOSTUNREACH or EAGAIN error.
routed to, and unroutable messages shall cause an EHOSTUNREACH or EAGAIN error.
To open a connection to a server, use the zmq_connect call, and then fetch the
To open a connection to a server, use the zmq_connect call, and then fetch the
socket
identity using the ZMQ_IDENTITY zmq_getsockopt call
.
socket
routing id using the zmq_getsockopt call with the ZMQ_ROUTING_ID option
.
To close a specific connection, send the
identity
frame followed by a
To close a specific connection, send the
routing id
frame followed by a
zero-length message (see EXAMPLE section).
zero-length message (see EXAMPLE section).
When a connection is made, a zero-length message will be received by the
When a connection is made, a zero-length message will be received by the
application. Similarly, when the peer disconnects (or the connection is lost),
application. Similarly, when the peer disconnects (or the connection is lost),
a zero-length message will be received by the application.
a zero-length message will be received by the application.
You must send one
identity
frame followed by one data frame. The ZMQ_SNDMORE
You must send one
routing id
frame followed by one data frame. The ZMQ_SNDMORE
flag is required for
identity
frames but is ignored on data frames.
flag is required for
routing id
frames but is ignored on data frames.
[horizontal]
[horizontal]
.Summary of ZMQ_STREAM characteristics
.Summary of ZMQ_STREAM characteristics
...
@@ -492,10 +492,10 @@ ZMQ_ROUTER
...
@@ -492,10 +492,10 @@ ZMQ_ROUTER
^^^^^^^^^^
^^^^^^^^^^
A socket of type 'ZMQ_ROUTER' is an advanced socket type used for extending
A socket of type 'ZMQ_ROUTER' is an advanced socket type used for extending
request/reply sockets. When receiving messages a 'ZMQ_ROUTER' socket shall
request/reply sockets. When receiving messages a 'ZMQ_ROUTER' socket shall
prepend a message part containing the _
identity
_ of the originating peer to the
prepend a message part containing the _
routing id
_ of the originating peer to the
message before passing it to the application. Messages received are fair-queued
message before passing it to the application. Messages received are fair-queued
from among all connected peers. When sending messages a 'ZMQ_ROUTER' socket shall
from among all connected peers. When sending messages a 'ZMQ_ROUTER' socket shall
remove the first part of the message and use it to determine the _
identity
_ of
remove the first part of the message and use it to determine the _
routing id
_ of
the peer the message shall be routed to. If the peer does not exist anymore, or
the peer the message shall be routed to. If the peer does not exist anymore, or
has never existed, the message shall be silently discarded. However, if
has never existed, the message shall be silently discarded. However, if
'ZMQ_ROUTER_MANDATORY' socket option is set to '1', the socket shall fail
'ZMQ_ROUTER_MANDATORY' socket option is set to '1', the socket shall fail
...
@@ -514,9 +514,9 @@ or more peers. Likewise, the socket shall generate 'ZMQ_POLLOUT' events when
...
@@ -514,9 +514,9 @@ or more peers. Likewise, the socket shall generate 'ZMQ_POLLOUT' events when
at least one message can be sent to one or more peers.
at least one message can be sent to one or more peers.
When a 'ZMQ_REQ' socket is connected to a 'ZMQ_ROUTER' socket, in addition to the
When a 'ZMQ_REQ' socket is connected to a 'ZMQ_ROUTER' socket, in addition to the
_
identity
_ of the originating peer each message received shall contain an empty
_
routing id
_ of the originating peer each message received shall contain an empty
_delimiter_ message part. Hence, the entire structure of each received message
_delimiter_ message part. Hence, the entire structure of each received message
as seen by the application becomes: one or more _
identity
_ parts, _delimiter_
as seen by the application becomes: one or more _
routing id
_ parts, _delimiter_
part, one or more _body parts_. When sending replies to a 'ZMQ_REQ' socket the
part, one or more _body parts_. When sending replies to a 'ZMQ_REQ' socket the
application must include the _delimiter_ part.
application must include the _delimiter_ part.
...
@@ -559,16 +559,16 @@ void *socket = zmq_socket (ctx, ZMQ_STREAM);
...
@@ -559,16 +559,16 @@ void *socket = zmq_socket (ctx, ZMQ_STREAM);
assert (socket);
assert (socket);
int rc = zmq_bind (socket, "tcp://*:8080");
int rc = zmq_bind (socket, "tcp://*:8080");
assert (rc == 0);
assert (rc == 0);
/* Data structure to hold the ZMQ_STREAM
ID
*/
/* Data structure to hold the ZMQ_STREAM
routing id
*/
uint8_t id [256];
uint8_t
routing_
id [256];
size_t id_size = 256;
size_t
routing_
id_size = 256;
/* Data structure to hold the ZMQ_STREAM received data */
/* Data structure to hold the ZMQ_STREAM received data */
uint8_t raw [256];
uint8_t raw [256];
size_t raw_size = 256;
size_t raw_size = 256;
while (1) {
while (1) {
/* Get HTTP request;
ID
frame and then request */
/* Get HTTP request;
routing id
frame and then request */
id_size = zmq_recv (socket,
id, 256, 0);
routing_id_size = zmq_recv (socket, routing_
id, 256, 0);
assert (id_size > 0);
assert (
routing_
id_size > 0);
do {
do {
raw_size = zmq_recv (socket, raw, 256, 0);
raw_size = zmq_recv (socket, raw, 256, 0);
assert (raw_size >= 0);
assert (raw_size >= 0);
...
@@ -579,11 +579,11 @@ while (1) {
...
@@ -579,11 +579,11 @@ while (1) {
"Content-Type: text/plain\r\n"
"Content-Type: text/plain\r\n"
"\r\n"
"\r\n"
"Hello, World!";
"Hello, World!";
/* Sends the
ID
frame followed by the response */
/* Sends the
routing id
frame followed by the response */
zmq_send (socket,
id,
id_size, ZMQ_SNDMORE);
zmq_send (socket,
routing_id, routing_
id_size, ZMQ_SNDMORE);
zmq_send (socket, http_response, strlen (http_response), 0);
zmq_send (socket, http_response, strlen (http_response), 0);
/* Closes the connection by sending the
ID
frame followed by a zero response */
/* Closes the connection by sending the
routing id
frame followed by a zero response */
zmq_send (socket,
id,
id_size, ZMQ_SNDMORE);
zmq_send (socket,
routing_id, routing_
id_size, ZMQ_SNDMORE);
zmq_send (socket, 0, 0, 0);
zmq_send (socket, 0, 0, 0);
}
}
zmq_close (socket);
zmq_close (socket);
...
...
include/zmq.h
View file @
08b01a51
...
@@ -299,7 +299,7 @@ ZMQ_EXPORT const char *zmq_msg_gets (const zmq_msg_t *msg, const char *property)
...
@@ -299,7 +299,7 @@ ZMQ_EXPORT const char *zmq_msg_gets (const zmq_msg_t *msg, const char *property)
/* Socket options. */
/* Socket options. */
#define ZMQ_AFFINITY 4
#define ZMQ_AFFINITY 4
#define ZMQ_
IDENTITY
5
#define ZMQ_
ROUTING_ID
5
#define ZMQ_SUBSCRIBE 6
#define ZMQ_SUBSCRIBE 6
#define ZMQ_UNSUBSCRIBE 7
#define ZMQ_UNSUBSCRIBE 7
#define ZMQ_RATE 8
#define ZMQ_RATE 8
...
@@ -345,7 +345,7 @@ ZMQ_EXPORT const char *zmq_msg_gets (const zmq_msg_t *msg, const char *property)
...
@@ -345,7 +345,7 @@ ZMQ_EXPORT const char *zmq_msg_gets (const zmq_msg_t *msg, const char *property)
#define ZMQ_ZAP_DOMAIN 55
#define ZMQ_ZAP_DOMAIN 55
#define ZMQ_ROUTER_HANDOVER 56
#define ZMQ_ROUTER_HANDOVER 56
#define ZMQ_TOS 57
#define ZMQ_TOS 57
#define ZMQ_CONNECT_RID 61
#define ZMQ_CONNECT_R
OUTING_
ID 61
#define ZMQ_GSSAPI_SERVER 62
#define ZMQ_GSSAPI_SERVER 62
#define ZMQ_GSSAPI_PRINCIPAL 63
#define ZMQ_GSSAPI_PRINCIPAL 63
#define ZMQ_GSSAPI_SERVICE_PRINCIPAL 64
#define ZMQ_GSSAPI_SERVICE_PRINCIPAL 64
...
@@ -390,6 +390,8 @@ ZMQ_EXPORT const char *zmq_msg_gets (const zmq_msg_t *msg, const char *property)
...
@@ -390,6 +390,8 @@ ZMQ_EXPORT const char *zmq_msg_gets (const zmq_msg_t *msg, const char *property)
#define ZMQ_GROUP_MAX_LENGTH 15
#define ZMQ_GROUP_MAX_LENGTH 15
/* Deprecated options and aliases */
/* Deprecated options and aliases */
#define ZMQ_IDENTITY ZMQ_ROUTING_ID
#define ZMQ_CONNECT_RID ZMQ_CONNECT_ROUTING_ID
#define ZMQ_TCP_ACCEPT_FILTER 38
#define ZMQ_TCP_ACCEPT_FILTER 38
#define ZMQ_IPC_FILTER_PID 58
#define ZMQ_IPC_FILTER_PID 58
#define ZMQ_IPC_FILTER_UID 59
#define ZMQ_IPC_FILTER_UID 59
...
@@ -620,7 +622,7 @@ ZMQ_EXPORT int zmq_msg_set_group(zmq_msg_t *msg, const char *group);
...
@@ -620,7 +622,7 @@ ZMQ_EXPORT int zmq_msg_set_group(zmq_msg_t *msg, const char *group);
ZMQ_EXPORT
const
char
*
zmq_msg_group
(
zmq_msg_t
*
msg
);
ZMQ_EXPORT
const
char
*
zmq_msg_group
(
zmq_msg_t
*
msg
);
/* DRAFT Msg property names. */
/* DRAFT Msg property names. */
#define ZMQ_MSG_PROPERTY_
IDENTITY "Identity
"
#define ZMQ_MSG_PROPERTY_
ROUTING_ID "Routing-Id
"
#define ZMQ_MSG_PROPERTY_SOCKET_TYPE "Socket-Type"
#define ZMQ_MSG_PROPERTY_SOCKET_TYPE "Socket-Type"
#define ZMQ_MSG_PROPERTY_USER_ID "User-Id"
#define ZMQ_MSG_PROPERTY_USER_ID "User-Id"
#define ZMQ_MSG_PROPERTY_PEER_ADDRESS "Peer-Address"
#define ZMQ_MSG_PROPERTY_PEER_ADDRESS "Peer-Address"
...
@@ -662,8 +664,8 @@ ZMQ_EXPORT int zmq_poller_remove_fd (void *poller, int fd);
...
@@ -662,8 +664,8 @@ ZMQ_EXPORT int zmq_poller_remove_fd (void *poller, int fd);
#endif
#endif
ZMQ_EXPORT
int
zmq_socket_get_peer_state
(
void
*
socket
,
ZMQ_EXPORT
int
zmq_socket_get_peer_state
(
void
*
socket
,
const
void
*
identity
,
const
void
*
routing_id
,
size_t
identity
_size
);
size_t
routing_id
_size
);
/******************************************************************************/
/******************************************************************************/
/* Scheduling timers */
/* Scheduling timers */
...
...
src/ctx.cpp
View file @
08b01a51
...
@@ -529,7 +529,7 @@ void zmq::ctx_t::connect_inproc_sockets (zmq::socket_base_t *bind_socket_,
...
@@ -529,7 +529,7 @@ void zmq::ctx_t::connect_inproc_sockets (zmq::socket_base_t *bind_socket_,
bind_socket_
->
inc_seqnum
();
bind_socket_
->
inc_seqnum
();
pending_connection_
.
bind_pipe
->
set_tid
(
bind_socket_
->
get_tid
());
pending_connection_
.
bind_pipe
->
set_tid
(
bind_socket_
->
get_tid
());
if
(
!
bind_options
.
recv_
identity
)
{
if
(
!
bind_options
.
recv_
routing_id
)
{
msg_t
msg
;
msg_t
msg
;
const
bool
ok
=
pending_connection_
.
bind_pipe
->
read
(
&
msg
);
const
bool
ok
=
pending_connection_
.
bind_pipe
->
read
(
&
msg
);
zmq_assert
(
ok
);
zmq_assert
(
ok
);
...
@@ -569,16 +569,16 @@ void zmq::ctx_t::connect_inproc_sockets (zmq::socket_base_t *bind_socket_,
...
@@ -569,16 +569,16 @@ void zmq::ctx_t::connect_inproc_sockets (zmq::socket_base_t *bind_socket_,
// When a ctx is terminated all pending inproc connection will be
// When a ctx is terminated all pending inproc connection will be
// connected, but the socket will already be closed and the pipe will be
// connected, but the socket will already be closed and the pipe will be
// in waiting_for_delimiter state, which means no more writes can be done
// in waiting_for_delimiter state, which means no more writes can be done
// and the
identity
write fails and causes an assert. Check if the socket
// and the
routing id
write fails and causes an assert. Check if the socket
// is open before sending.
// is open before sending.
if
(
pending_connection_
.
endpoint
.
options
.
recv_
identity
&&
if
(
pending_connection_
.
endpoint
.
options
.
recv_
routing_id
&&
pending_connection_
.
endpoint
.
socket
->
check_tag
())
{
pending_connection_
.
endpoint
.
socket
->
check_tag
())
{
msg_t
id
;
msg_t
routing_
id
;
const
int
rc
=
id
.
init_size
(
bind_options
.
identity
_size
);
const
int
rc
=
routing_id
.
init_size
(
bind_options
.
routing_id
_size
);
errno_assert
(
rc
==
0
);
errno_assert
(
rc
==
0
);
memcpy
(
id
.
data
(),
bind_options
.
identity
,
bind_options
.
identity
_size
);
memcpy
(
routing_id
.
data
(),
bind_options
.
routing_id
,
bind_options
.
routing_id
_size
);
id
.
set_flags
(
msg_t
::
identity
);
routing_id
.
set_flags
(
msg_t
::
routing_id
);
const
bool
written
=
pending_connection_
.
bind_pipe
->
write
(
&
id
);
const
bool
written
=
pending_connection_
.
bind_pipe
->
write
(
&
routing_
id
);
zmq_assert
(
written
);
zmq_assert
(
written
);
pending_connection_
.
bind_pipe
->
flush
();
pending_connection_
.
bind_pipe
->
flush
();
}
}
...
...
src/mechanism.cpp
View file @
08b01a51
...
@@ -46,17 +46,17 @@ zmq::mechanism_t::~mechanism_t ()
...
@@ -46,17 +46,17 @@ zmq::mechanism_t::~mechanism_t ()
{
{
}
}
void
zmq
::
mechanism_t
::
set_peer_
identity
(
const
void
*
id_ptr
,
size_t
id_size
)
void
zmq
::
mechanism_t
::
set_peer_
routing_id
(
const
void
*
id_ptr
,
size_t
id_size
)
{
{
identity
=
blob_t
(
static_cast
<
const
unsigned
char
*>
(
id_ptr
),
id_size
);
routing_id
=
blob_t
(
static_cast
<
const
unsigned
char
*>
(
id_ptr
),
id_size
);
}
}
void
zmq
::
mechanism_t
::
peer_
identity
(
msg_t
*
msg_
)
void
zmq
::
mechanism_t
::
peer_
routing_id
(
msg_t
*
msg_
)
{
{
const
int
rc
=
msg_
->
init_size
(
identity
.
size
());
const
int
rc
=
msg_
->
init_size
(
routing_id
.
size
());
errno_assert
(
rc
==
0
);
errno_assert
(
rc
==
0
);
memcpy
(
msg_
->
data
(),
identity
.
data
(),
identity
.
size
());
memcpy
(
msg_
->
data
(),
routing_id
.
data
(),
routing_id
.
size
());
msg_
->
set_flags
(
msg_t
::
identity
);
msg_
->
set_flags
(
msg_t
::
routing_id
);
}
}
void
zmq
::
mechanism_t
::
set_user_id
(
const
void
*
data_
,
size_t
size_
)
void
zmq
::
mechanism_t
::
set_user_id
(
const
void
*
data_
,
size_t
size_
)
...
@@ -132,12 +132,12 @@ size_t zmq::mechanism_t::add_basic_properties (unsigned char *buf,
...
@@ -132,12 +132,12 @@ size_t zmq::mechanism_t::add_basic_properties (unsigned char *buf,
ZMQ_MSG_PROPERTY_SOCKET_TYPE
,
socket_type
,
ZMQ_MSG_PROPERTY_SOCKET_TYPE
,
socket_type
,
strlen
(
socket_type
));
strlen
(
socket_type
));
// Add
identity
property
// Add
routing id
property
if
(
options
.
type
==
ZMQ_REQ
||
options
.
type
==
ZMQ_DEALER
if
(
options
.
type
==
ZMQ_REQ
||
options
.
type
==
ZMQ_DEALER
||
options
.
type
==
ZMQ_ROUTER
)
||
options
.
type
==
ZMQ_ROUTER
)
ptr
+=
add_property
(
ptr
,
buf_capacity
-
(
ptr
-
buf
),
ptr
+=
add_property
(
ptr
,
buf_capacity
-
(
ptr
-
buf
),
ZMQ_MSG_PROPERTY_
IDENTITY
,
options
.
identity
,
ZMQ_MSG_PROPERTY_
ROUTING_ID
,
options
.
routing_id
,
options
.
identity
_size
);
options
.
routing_id
_size
);
return
ptr
-
buf
;
return
ptr
-
buf
;
}
}
...
@@ -148,8 +148,8 @@ size_t zmq::mechanism_t::basic_properties_len() const
...
@@ -148,8 +148,8 @@ size_t zmq::mechanism_t::basic_properties_len() const
return
property_len
(
ZMQ_MSG_PROPERTY_SOCKET_TYPE
,
strlen
(
socket_type
))
return
property_len
(
ZMQ_MSG_PROPERTY_SOCKET_TYPE
,
strlen
(
socket_type
))
+
((
options
.
type
==
ZMQ_REQ
||
options
.
type
==
ZMQ_DEALER
+
((
options
.
type
==
ZMQ_REQ
||
options
.
type
==
ZMQ_DEALER
||
options
.
type
==
ZMQ_ROUTER
)
||
options
.
type
==
ZMQ_ROUTER
)
?
property_len
(
ZMQ_MSG_PROPERTY_
IDENTITY
,
?
property_len
(
ZMQ_MSG_PROPERTY_
ROUTING_ID
,
options
.
identity
_size
)
options
.
routing_id
_size
)
:
0
);
:
0
);
}
}
...
@@ -199,8 +199,8 @@ int zmq::mechanism_t::parse_metadata (const unsigned char *ptr_,
...
@@ -199,8 +199,8 @@ int zmq::mechanism_t::parse_metadata (const unsigned char *ptr_,
ptr_
+=
value_length
;
ptr_
+=
value_length
;
bytes_left
-=
value_length
;
bytes_left
-=
value_length
;
if
(
name
==
ZMQ_MSG_PROPERTY_
IDENTITY
&&
options
.
recv_identity
)
if
(
name
==
ZMQ_MSG_PROPERTY_
ROUTING_ID
&&
options
.
recv_routing_id
)
set_peer_
identity
(
value
,
value_length
);
set_peer_
routing_id
(
value
,
value_length
);
else
else
if
(
name
==
ZMQ_MSG_PROPERTY_SOCKET_TYPE
)
{
if
(
name
==
ZMQ_MSG_PROPERTY_SOCKET_TYPE
)
{
const
std
::
string
socket_type
((
char
*
)
value
,
value_length
);
const
std
::
string
socket_type
((
char
*
)
value
,
value_length
);
...
...
src/mechanism.hpp
View file @
08b01a51
...
@@ -74,9 +74,9 @@ namespace zmq
...
@@ -74,9 +74,9 @@ namespace zmq
// Returns the status of this mechanism.
// Returns the status of this mechanism.
virtual
status_t
status
()
const
=
0
;
virtual
status_t
status
()
const
=
0
;
void
set_peer_
identity
(
const
void
*
id_ptr
,
size_t
id_size
);
void
set_peer_
routing_id
(
const
void
*
id_ptr
,
size_t
id_size
);
void
peer_
identity
(
msg_t
*
msg_
);
void
peer_
routing_id
(
msg_t
*
msg_
);
void
set_user_id
(
const
void
*
user_id
,
size_t
size
);
void
set_user_id
(
const
void
*
user_id
,
size_t
size
);
...
@@ -138,7 +138,7 @@ namespace zmq
...
@@ -138,7 +138,7 @@ namespace zmq
private
:
private
:
blob_t
identity
;
blob_t
routing_id
;
blob_t
user_id
;
blob_t
user_id
;
...
...
src/metadata.cpp
View file @
08b01a51
...
@@ -39,8 +39,14 @@ zmq::metadata_t::metadata_t (const dict_t &dict) :
...
@@ -39,8 +39,14 @@ zmq::metadata_t::metadata_t (const dict_t &dict) :
const
char
*
zmq
::
metadata_t
::
get
(
const
std
::
string
&
property
)
const
const
char
*
zmq
::
metadata_t
::
get
(
const
std
::
string
&
property
)
const
{
{
dict_t
::
const_iterator
it
=
dict
.
find
(
property
);
dict_t
::
const_iterator
it
=
dict
.
find
(
property
);
if
(
it
==
dict
.
end
())
if
(
it
==
dict
.
end
())
{
/** \todo remove this when support for the deprecated name "Identity" is dropped */
if
(
property
==
"Identity"
)
return
get
(
ZMQ_MSG_PROPERTY_ROUTING_ID
);
return
NULL
;
return
NULL
;
}
else
else
return
it
->
second
.
c_str
();
return
it
->
second
.
c_str
();
}
}
...
...
src/msg.cpp
View file @
08b01a51
...
@@ -413,9 +413,9 @@ void zmq::msg_t::reset_metadata ()
...
@@ -413,9 +413,9 @@ void zmq::msg_t::reset_metadata ()
}
}
}
}
bool
zmq
::
msg_t
::
is_
identity
()
const
bool
zmq
::
msg_t
::
is_
routing_id
()
const
{
{
return
(
u
.
base
.
flags
&
identity
)
==
identity
;
return
(
u
.
base
.
flags
&
routing_id
)
==
routing_id
;
}
}
bool
zmq
::
msg_t
::
is_credential
()
const
bool
zmq
::
msg_t
::
is_credential
()
const
...
...
src/msg.hpp
View file @
08b01a51
...
@@ -79,7 +79,7 @@ namespace zmq
...
@@ -79,7 +79,7 @@ namespace zmq
more
=
1
,
// Followed by more parts
more
=
1
,
// Followed by more parts
command
=
2
,
// Command frame (see ZMTP spec)
command
=
2
,
// Command frame (see ZMTP spec)
credential
=
32
,
credential
=
32
,
identity
=
64
,
routing_id
=
64
,
shared
=
128
shared
=
128
};
};
...
@@ -109,7 +109,7 @@ namespace zmq
...
@@ -109,7 +109,7 @@ namespace zmq
metadata_t
*
metadata
()
const
;
metadata_t
*
metadata
()
const
;
void
set_metadata
(
metadata_t
*
metadata_
);
void
set_metadata
(
metadata_t
*
metadata_
);
void
reset_metadata
();
void
reset_metadata
();
bool
is_
identity
()
const
;
bool
is_
routing_id
()
const
;
bool
is_credential
()
const
;
bool
is_credential
()
const
;
bool
is_delimiter
()
const
;
bool
is_delimiter
()
const
;
bool
is_join
()
const
;
bool
is_join
()
const
;
...
...
src/options.cpp
View file @
08b01a51
...
@@ -48,7 +48,7 @@ zmq::options_t::options_t () :
...
@@ -48,7 +48,7 @@ zmq::options_t::options_t () :
sndhwm
(
1000
),
sndhwm
(
1000
),
rcvhwm
(
1000
),
rcvhwm
(
1000
),
affinity
(
0
),
affinity
(
0
),
identity
_size
(
0
),
routing_id
_size
(
0
),
rate
(
100
),
rate
(
100
),
recovery_ivl
(
10000
),
recovery_ivl
(
10000
),
multicast_hops
(
1
),
multicast_hops
(
1
),
...
@@ -70,7 +70,7 @@ zmq::options_t::options_t () :
...
@@ -70,7 +70,7 @@ zmq::options_t::options_t () :
immediate
(
0
),
immediate
(
0
),
filter
(
false
),
filter
(
false
),
invert_matching
(
false
),
invert_matching
(
false
),
recv_
identity
(
false
),
recv_
routing_id
(
false
),
raw_socket
(
false
),
raw_socket
(
false
),
raw_notify
(
true
),
raw_notify
(
true
),
tcp_keepalive
(
-
1
),
tcp_keepalive
(
-
1
),
...
@@ -166,11 +166,11 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
...
@@ -166,11 +166,11 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
}
}
break
;
break
;
case
ZMQ_
IDENTITY
:
case
ZMQ_
ROUTING_ID
:
//
Identity
is any binary string from 1 to 255 octets
//
Routing id
is any binary string from 1 to 255 octets
if
(
optvallen_
>
0
&&
optvallen_
<
256
)
{
if
(
optvallen_
>
0
&&
optvallen_
<
256
)
{
identity
_size
=
(
unsigned
char
)
optvallen_
;
routing_id
_size
=
(
unsigned
char
)
optvallen_
;
memcpy
(
identity
,
optval_
,
identity
_size
);
memcpy
(
routing_id
,
optval_
,
routing_id
_size
);
return
0
;
return
0
;
}
}
break
;
break
;
...
@@ -679,10 +679,10 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
...
@@ -679,10 +679,10 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
}
}
break
;
break
;
case
ZMQ_
IDENTITY
:
case
ZMQ_
ROUTING_ID
:
if
(
*
optvallen_
>=
identity
_size
)
{
if
(
*
optvallen_
>=
routing_id
_size
)
{
memcpy
(
optval_
,
identity
,
identity
_size
);
memcpy
(
optval_
,
routing_id
,
routing_id
_size
);
*
optvallen_
=
identity
_size
;
*
optvallen_
=
routing_id
_size
;
return
0
;
return
0
;
}
}
break
;
break
;
...
...
src/options.hpp
View file @
08b01a51
...
@@ -70,9 +70,9 @@ namespace zmq
...
@@ -70,9 +70,9 @@ namespace zmq
// I/O thread affinity.
// I/O thread affinity.
uint64_t
affinity
;
uint64_t
affinity
;
// Socket
identity
// Socket
routing id.
unsigned
char
identity
_size
;
unsigned
char
routing_id
_size
;
unsigned
char
identity
[
256
];
unsigned
char
routing_id
[
256
];
// Maximum transfer rate [kb/s]. Default 100kb/s.
// Maximum transfer rate [kb/s]. Default 100kb/s.
int
rate
;
int
rate
;
...
@@ -144,7 +144,7 @@ namespace zmq
...
@@ -144,7 +144,7 @@ namespace zmq
bool
invert_matching
;
bool
invert_matching
;
// If true, the identity message is forwarded to the socket.
// If true, the identity message is forwarded to the socket.
bool
recv_
identity
;
bool
recv_
routing_id
;
// if true, router socket accepts non-zmq tcp connections
// if true, router socket accepts non-zmq tcp connections
bool
raw_socket
;
bool
raw_socket
;
...
...
src/pipe.cpp
View file @
08b01a51
...
@@ -92,7 +92,7 @@ zmq::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,
...
@@ -92,7 +92,7 @@ zmq::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,
sink
(
NULL
),
sink
(
NULL
),
state
(
active
),
state
(
active
),
delay
(
true
),
delay
(
true
),
routing_id
(
0
),
server_socket_routing_id
(
0
),
conflate
(
conflate_
)
conflate
(
conflate_
)
{
{
}
}
...
@@ -115,24 +115,24 @@ void zmq::pipe_t::set_event_sink (i_pipe_events *sink_)
...
@@ -115,24 +115,24 @@ void zmq::pipe_t::set_event_sink (i_pipe_events *sink_)
sink
=
sink_
;
sink
=
sink_
;
}
}
void
zmq
::
pipe_t
::
set_
routing_id
(
uint32_t
routing_id_
)
void
zmq
::
pipe_t
::
set_
server_socket_routing_id
(
uint32_t
server_socket_
routing_id_
)
{
{
routing_id
=
routing_id_
;
server_socket_routing_id
=
server_socket_
routing_id_
;
}
}
uint32_t
zmq
::
pipe_t
::
get_routing_id
()
uint32_t
zmq
::
pipe_t
::
get_
server_socket_
routing_id
()
{
{
return
routing_id
;
return
server_socket_
routing_id
;
}
}
void
zmq
::
pipe_t
::
set_
identity
(
const
blob_t
&
identity
_
)
void
zmq
::
pipe_t
::
set_
router_socket_routing_id
(
const
blob_t
&
router_socket_routing_id
_
)
{
{
identity
=
identity
_
;
router_socket_routing_id
=
router_socket_routing_id
_
;
}
}
zmq
::
blob_t
zmq
::
pipe_t
::
get_
identity
()
zmq
::
blob_t
zmq
::
pipe_t
::
get_
routing_id
()
{
{
return
identity
;
return
router_socket_routing_id
;
}
}
zmq
::
blob_t
zmq
::
pipe_t
::
get_credential
()
const
zmq
::
blob_t
zmq
::
pipe_t
::
get_credential
()
const
...
@@ -194,7 +194,7 @@ read_message:
...
@@ -194,7 +194,7 @@ read_message:
return
false
;
return
false
;
}
}
if
(
!
(
msg_
->
flags
()
&
msg_t
::
more
)
&&
!
msg_
->
is_
identity
())
if
(
!
(
msg_
->
flags
()
&
msg_t
::
more
)
&&
!
msg_
->
is_
routing_id
())
msgs_read
++
;
msgs_read
++
;
if
(
lwm
>
0
&&
msgs_read
%
lwm
==
0
)
if
(
lwm
>
0
&&
msgs_read
%
lwm
==
0
)
...
@@ -224,9 +224,9 @@ bool zmq::pipe_t::write (msg_t *msg_)
...
@@ -224,9 +224,9 @@ bool zmq::pipe_t::write (msg_t *msg_)
return
false
;
return
false
;
bool
more
=
msg_
->
flags
()
&
msg_t
::
more
?
true
:
false
;
bool
more
=
msg_
->
flags
()
&
msg_t
::
more
?
true
:
false
;
const
bool
is_
identity
=
msg_
->
is_identity
();
const
bool
is_
routing_id
=
msg_
->
is_routing_id
();
outpipe
->
write
(
*
msg_
,
more
);
outpipe
->
write
(
*
msg_
,
more
);
if
(
!
more
&&
!
is_
identity
)
if
(
!
more
&&
!
is_
routing_id
)
msgs_written
++
;
msgs_written
++
;
return
true
;
return
true
;
...
...
src/pipe.hpp
View file @
08b01a51
...
@@ -85,12 +85,12 @@ namespace zmq
...
@@ -85,12 +85,12 @@ namespace zmq
void
set_event_sink
(
i_pipe_events
*
sink_
);
void
set_event_sink
(
i_pipe_events
*
sink_
);
// Pipe endpoint can store an routing ID to be used by its clients.
// Pipe endpoint can store an routing ID to be used by its clients.
void
set_routing_id
(
uint32_t
routing_id_
);
void
set_
server_socket_
routing_id
(
uint32_t
routing_id_
);
uint32_t
get_routing_id
();
uint32_t
get_
server_socket_
routing_id
();
// Pipe endpoint can store an opaque ID to be used by its clients.
// Pipe endpoint can store an opaque ID to be used by its clients.
void
set_
identity
(
const
blob_t
&
identity_
);
void
set_
router_socket_routing_id
(
const
blob_t
&
identity_
);
blob_t
get_
identity
();
blob_t
get_
routing_id
();
blob_t
get_credential
()
const
;
blob_t
get_credential
()
const
;
...
@@ -227,10 +227,10 @@ namespace zmq
...
@@ -227,10 +227,10 @@ namespace zmq
bool
delay
;
bool
delay
;
// Identity of the writer. Used uniquely by the reader side.
// Identity of the writer. Used uniquely by the reader side.
blob_t
identity
;
blob_t
router_socket_routing_id
;
// Identity of the writer. Used uniquely by the reader side.
// Identity of the writer. Used uniquely by the reader side.
int
routing_id
;
int
server_socket_
routing_id
;
// Pipe's credential.
// Pipe's credential.
blob_t
credential
;
blob_t
credential
;
...
...
src/req.cpp
View file @
08b01a51
...
@@ -75,7 +75,7 @@ int zmq::req_t::xsend (msg_t *msg_)
...
@@ -75,7 +75,7 @@ int zmq::req_t::xsend (msg_t *msg_)
message_begins
=
true
;
message_begins
=
true
;
}
}
// First part of the request is the request
identity
.
// First part of the request is the request
routing id
.
if
(
message_begins
)
{
if
(
message_begins
)
{
reply_pipe
=
NULL
;
reply_pipe
=
NULL
;
...
...
src/router.cpp
View file @
08b01a51
This diff is collapsed.
Click to expand it.
src/router.hpp
View file @
08b01a51
...
@@ -85,7 +85,7 @@ namespace zmq
...
@@ -85,7 +85,7 @@ namespace zmq
// If true, the receiver got the message part with
// If true, the receiver got the message part with
// the peer's identity.
// the peer's identity.
bool
identity
_sent
;
bool
routing_id
_sent
;
// Holds the prefetched identity.
// Holds the prefetched identity.
msg_t
prefetched_id
;
msg_t
prefetched_id
;
...
@@ -123,7 +123,7 @@ namespace zmq
...
@@ -123,7 +123,7 @@ namespace zmq
// Routing IDs are generated. It's a simple increment and wrap-over
// Routing IDs are generated. It's a simple increment and wrap-over
// algorithm. This value is the next ID to use (if not used already).
// algorithm. This value is the next ID to use (if not used already).
uint32_t
next_
r
id
;
uint32_t
next_
integral_routing_
id
;
// If true, report EAGAIN to the caller instead of silently dropping
// If true, report EAGAIN to the caller instead of silently dropping
// the message targeting an unknown peer.
// the message targeting an unknown peer.
...
...
src/server.cpp
View file @
08b01a51
...
@@ -38,7 +38,7 @@
...
@@ -38,7 +38,7 @@
zmq
::
server_t
::
server_t
(
class
ctx_t
*
parent_
,
uint32_t
tid_
,
int
sid_
)
:
zmq
::
server_t
::
server_t
(
class
ctx_t
*
parent_
,
uint32_t
tid_
,
int
sid_
)
:
socket_base_t
(
parent_
,
tid_
,
sid_
,
true
),
socket_base_t
(
parent_
,
tid_
,
sid_
,
true
),
next_rid
(
generate_random
())
next_r
outing_
id
(
generate_random
())
{
{
options
.
type
=
ZMQ_SERVER
;
options
.
type
=
ZMQ_SERVER
;
}
}
...
@@ -54,11 +54,11 @@ void zmq::server_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
...
@@ -54,11 +54,11 @@ void zmq::server_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
zmq_assert
(
pipe_
);
zmq_assert
(
pipe_
);
uint32_t
routing_id
=
next_rid
++
;
uint32_t
routing_id
=
next_r
outing_
id
++
;
if
(
!
routing_id
)
if
(
!
routing_id
)
routing_id
=
next_r
id
++
;
// Never use R
ID zero
routing_id
=
next_r
outing_id
++
;
// Never use Routing
ID zero
pipe_
->
set_routing_id
(
routing_id
);
pipe_
->
set_
server_socket_
routing_id
(
routing_id
);
// Add the record into output pipes lookup table
// Add the record into output pipes lookup table
outpipe_t
outpipe
=
{
pipe_
,
true
};
outpipe_t
outpipe
=
{
pipe_
,
true
};
bool
ok
=
outpipes
.
insert
(
outpipes_t
::
value_type
(
routing_id
,
outpipe
)).
second
;
bool
ok
=
outpipes
.
insert
(
outpipes_t
::
value_type
(
routing_id
,
outpipe
)).
second
;
...
@@ -69,7 +69,7 @@ void zmq::server_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
...
@@ -69,7 +69,7 @@ void zmq::server_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
void
zmq
::
server_t
::
xpipe_terminated
(
pipe_t
*
pipe_
)
void
zmq
::
server_t
::
xpipe_terminated
(
pipe_t
*
pipe_
)
{
{
outpipes_t
::
iterator
it
=
outpipes
.
find
(
pipe_
->
get_routing_id
());
outpipes_t
::
iterator
it
=
outpipes
.
find
(
pipe_
->
get_
server_socket_
routing_id
());
zmq_assert
(
it
!=
outpipes
.
end
());
zmq_assert
(
it
!=
outpipes
.
end
());
outpipes
.
erase
(
it
);
outpipes
.
erase
(
it
);
fq
.
pipe_terminated
(
pipe_
);
fq
.
pipe_terminated
(
pipe_
);
...
@@ -159,7 +159,7 @@ int zmq::server_t::xrecv (msg_t *msg_)
...
@@ -159,7 +159,7 @@ int zmq::server_t::xrecv (msg_t *msg_)
zmq_assert
(
pipe
!=
NULL
);
zmq_assert
(
pipe
!=
NULL
);
uint32_t
routing_id
=
pipe
->
get_routing_id
();
uint32_t
routing_id
=
pipe
->
get_
server_socket_
routing_id
();
msg_
->
set_routing_id
(
routing_id
);
msg_
->
set_routing_id
(
routing_id
);
return
0
;
return
0
;
...
...
src/server.hpp
View file @
08b01a51
...
@@ -85,7 +85,7 @@ namespace zmq
...
@@ -85,7 +85,7 @@ namespace zmq
// Routing IDs are generated. It's a simple increment and wrap-over
// Routing IDs are generated. It's a simple increment and wrap-over
// algorithm. This value is the next ID to use (if not used already).
// algorithm. This value is the next ID to use (if not used already).
uint32_t
next_rid
;
uint32_t
next_r
outing_
id
;
server_t
(
const
server_t
&
);
server_t
(
const
server_t
&
);
const
server_t
&
operator
=
(
const
server_t
&
);
const
server_t
&
operator
=
(
const
server_t
&
);
...
...
src/session_base.cpp
View file @
08b01a51
...
@@ -356,12 +356,12 @@ int zmq::session_base_t::zap_connect ()
...
@@ -356,12 +356,12 @@ int zmq::session_base_t::zap_connect ()
send_bind
(
peer
.
socket
,
new_pipes
[
1
],
false
);
send_bind
(
peer
.
socket
,
new_pipes
[
1
],
false
);
// Send empty
identity
if required by the peer.
// Send empty
routing id
if required by the peer.
if
(
peer
.
options
.
recv_
identity
)
{
if
(
peer
.
options
.
recv_
routing_id
)
{
msg_t
id
;
msg_t
id
;
rc
=
id
.
init
();
rc
=
id
.
init
();
errno_assert
(
rc
==
0
);
errno_assert
(
rc
==
0
);
id
.
set_flags
(
msg_t
::
identity
);
id
.
set_flags
(
msg_t
::
routing_id
);
bool
ok
=
zap_pipe
->
write
(
&
id
);
bool
ok
=
zap_pipe
->
write
(
&
id
);
zmq_assert
(
ok
);
zmq_assert
(
ok
);
zap_pipe
->
flush
();
zap_pipe
->
flush
();
...
...
src/socket_base.cpp
View file @
08b01a51
...
@@ -221,11 +221,11 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_, int sid_, bool
...
@@ -221,11 +221,11 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_, int sid_, bool
}
}
}
}
int
zmq
::
socket_base_t
::
get_peer_state
(
const
void
*
identity
,
int
zmq
::
socket_base_t
::
get_peer_state
(
const
void
*
routing_id_
,
size_t
identity_size
)
const
size_t
routing_id_size_
)
const
{
{
LIBZMQ_UNUSED
(
identity
);
LIBZMQ_UNUSED
(
routing_id_
);
LIBZMQ_UNUSED
(
identity_size
);
LIBZMQ_UNUSED
(
routing_id_size_
);
// Only ROUTER sockets support this
// Only ROUTER sockets support this
errno
=
ENOTSUP
;
errno
=
ENOTSUP
;
...
@@ -764,14 +764,14 @@ int zmq::socket_base_t::connect (const char *addr_)
...
@@ -764,14 +764,14 @@ int zmq::socket_base_t::connect (const char *addr_)
if
(
!
peer
.
socket
)
{
if
(
!
peer
.
socket
)
{
// The peer doesn't exist yet so we don't know whether
// The peer doesn't exist yet so we don't know whether
// to send the
identity
message or not. To resolve this,
// to send the
routing id
message or not. To resolve this,
// we always send our
identity
and drop it later if
// we always send our
routing id
and drop it later if
// the peer doesn't expect it.
// the peer doesn't expect it.
msg_t
id
;
msg_t
id
;
rc
=
id
.
init_size
(
options
.
identity
_size
);
rc
=
id
.
init_size
(
options
.
routing_id
_size
);
errno_assert
(
rc
==
0
);
errno_assert
(
rc
==
0
);
memcpy
(
id
.
data
(),
options
.
identity
,
options
.
identity
_size
);
memcpy
(
id
.
data
(),
options
.
routing_id
,
options
.
routing_id
_size
);
id
.
set_flags
(
msg_t
::
identity
);
id
.
set_flags
(
msg_t
::
routing_id
);
bool
written
=
new_pipes
[
0
]
->
write
(
&
id
);
bool
written
=
new_pipes
[
0
]
->
write
(
&
id
);
zmq_assert
(
written
);
zmq_assert
(
written
);
new_pipes
[
0
]
->
flush
();
new_pipes
[
0
]
->
flush
();
...
@@ -780,25 +780,25 @@ int zmq::socket_base_t::connect (const char *addr_)
...
@@ -780,25 +780,25 @@ int zmq::socket_base_t::connect (const char *addr_)
pend_connection
(
std
::
string
(
addr_
),
endpoint
,
new_pipes
);
pend_connection
(
std
::
string
(
addr_
),
endpoint
,
new_pipes
);
}
}
else
{
else
{
// If required, send the
identity
of the local socket to the peer.
// If required, send the
routing id
of the local socket to the peer.
if
(
peer
.
options
.
recv_
identity
)
{
if
(
peer
.
options
.
recv_
routing_id
)
{
msg_t
id
;
msg_t
id
;
rc
=
id
.
init_size
(
options
.
identity
_size
);
rc
=
id
.
init_size
(
options
.
routing_id
_size
);
errno_assert
(
rc
==
0
);
errno_assert
(
rc
==
0
);
memcpy
(
id
.
data
(),
options
.
identity
,
options
.
identity
_size
);
memcpy
(
id
.
data
(),
options
.
routing_id
,
options
.
routing_id
_size
);
id
.
set_flags
(
msg_t
::
identity
);
id
.
set_flags
(
msg_t
::
routing_id
);
bool
written
=
new_pipes
[
0
]
->
write
(
&
id
);
bool
written
=
new_pipes
[
0
]
->
write
(
&
id
);
zmq_assert
(
written
);
zmq_assert
(
written
);
new_pipes
[
0
]
->
flush
();
new_pipes
[
0
]
->
flush
();
}
}
// If required, send the
identity
of the peer to the local socket.
// If required, send the
routing id
of the peer to the local socket.
if
(
options
.
recv_
identity
)
{
if
(
options
.
recv_
routing_id
)
{
msg_t
id
;
msg_t
id
;
rc
=
id
.
init_size
(
peer
.
options
.
identity
_size
);
rc
=
id
.
init_size
(
peer
.
options
.
routing_id
_size
);
errno_assert
(
rc
==
0
);
errno_assert
(
rc
==
0
);
memcpy
(
id
.
data
(),
peer
.
options
.
identity
,
peer
.
options
.
identity
_size
);
memcpy
(
id
.
data
(),
peer
.
options
.
routing_id
,
peer
.
options
.
routing_id
_size
);
id
.
set_flags
(
msg_t
::
identity
);
id
.
set_flags
(
msg_t
::
routing_id
);
bool
written
=
new_pipes
[
1
]
->
write
(
&
id
);
bool
written
=
new_pipes
[
1
]
->
write
(
&
id
);
zmq_assert
(
written
);
zmq_assert
(
written
);
new_pipes
[
1
]
->
flush
();
new_pipes
[
1
]
->
flush
();
...
@@ -1591,9 +1591,9 @@ void zmq::socket_base_t::pipe_terminated (pipe_t *pipe_)
...
@@ -1591,9 +1591,9 @@ void zmq::socket_base_t::pipe_terminated (pipe_t *pipe_)
void
zmq
::
socket_base_t
::
extract_flags
(
msg_t
*
msg_
)
void
zmq
::
socket_base_t
::
extract_flags
(
msg_t
*
msg_
)
{
{
// Test whether
IDENTITY
flag is valid for this socket type.
// Test whether
routing_id
flag is valid for this socket type.
if
(
unlikely
(
msg_
->
flags
()
&
msg_t
::
identity
))
if
(
unlikely
(
msg_
->
flags
()
&
msg_t
::
routing_id
))
zmq_assert
(
options
.
recv_
identity
);
zmq_assert
(
options
.
recv_
routing_id
);
// Remove MORE flag.
// Remove MORE flag.
rcvmore
=
msg_
->
flags
()
&
msg_t
::
more
?
true
:
false
;
rcvmore
=
msg_
->
flags
()
&
msg_t
::
more
?
true
:
false
;
...
...
src/socket_base.hpp
View file @
08b01a51
...
@@ -186,7 +186,7 @@ namespace zmq
...
@@ -186,7 +186,7 @@ namespace zmq
void
process_destroy
();
void
process_destroy
();
// Next assigned name on a zmq_connect() call used by ROUTER and STREAM socket types
// Next assigned name on a zmq_connect() call used by ROUTER and STREAM socket types
std
::
string
connect_rid
;
std
::
string
connect_r
outing_
id
;
private
:
private
:
// test if event should be sent and then dispatch it
// test if event should be sent and then dispatch it
...
...
src/stream.cpp
View file @
08b01a51
...
@@ -39,22 +39,22 @@
...
@@ -39,22 +39,22 @@
zmq
::
stream_t
::
stream_t
(
class
ctx_t
*
parent_
,
uint32_t
tid_
,
int
sid_
)
:
zmq
::
stream_t
::
stream_t
(
class
ctx_t
*
parent_
,
uint32_t
tid_
,
int
sid_
)
:
socket_base_t
(
parent_
,
tid_
,
sid_
),
socket_base_t
(
parent_
,
tid_
,
sid_
),
prefetched
(
false
),
prefetched
(
false
),
identity
_sent
(
false
),
routing_id
_sent
(
false
),
current_out
(
NULL
),
current_out
(
NULL
),
more_out
(
false
),
more_out
(
false
),
next_
r
id
(
generate_random
())
next_
integral_routing_
id
(
generate_random
())
{
{
options
.
type
=
ZMQ_STREAM
;
options
.
type
=
ZMQ_STREAM
;
options
.
raw_socket
=
true
;
options
.
raw_socket
=
true
;
prefetched_id
.
init
();
prefetched_
routing_
id
.
init
();
prefetched_msg
.
init
();
prefetched_msg
.
init
();
}
}
zmq
::
stream_t
::~
stream_t
()
zmq
::
stream_t
::~
stream_t
()
{
{
zmq_assert
(
outpipes
.
empty
());
zmq_assert
(
outpipes
.
empty
());
prefetched_id
.
close
();
prefetched_
routing_
id
.
close
();
prefetched_msg
.
close
();
prefetched_msg
.
close
();
}
}
...
@@ -70,7 +70,7 @@ void zmq::stream_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
...
@@ -70,7 +70,7 @@ void zmq::stream_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
void
zmq
::
stream_t
::
xpipe_terminated
(
pipe_t
*
pipe_
)
void
zmq
::
stream_t
::
xpipe_terminated
(
pipe_t
*
pipe_
)
{
{
outpipes_t
::
iterator
it
=
outpipes
.
find
(
pipe_
->
get_
identity
());
outpipes_t
::
iterator
it
=
outpipes
.
find
(
pipe_
->
get_
routing_id
());
zmq_assert
(
it
!=
outpipes
.
end
());
zmq_assert
(
it
!=
outpipes
.
end
());
outpipes
.
erase
(
it
);
outpipes
.
erase
(
it
);
fq
.
pipe_terminated
(
pipe_
);
fq
.
pipe_terminated
(
pipe_
);
...
@@ -107,10 +107,10 @@ int zmq::stream_t::xsend (msg_t *msg_)
...
@@ -107,10 +107,10 @@ int zmq::stream_t::xsend (msg_t *msg_)
// TODO: The connections should be killed instead.
// TODO: The connections should be killed instead.
if
(
msg_
->
flags
()
&
msg_t
::
more
)
{
if
(
msg_
->
flags
()
&
msg_t
::
more
)
{
// Find the pipe associated with the
identity
stored in the prefix.
// Find the pipe associated with the
routing id
stored in the prefix.
// If there's no such pipe return an error
// If there's no such pipe return an error
blob_t
identity
((
unsigned
char
*
)
msg_
->
data
(),
msg_
->
size
());
blob_t
routing_id
((
unsigned
char
*
)
msg_
->
data
(),
msg_
->
size
());
outpipes_t
::
iterator
it
=
outpipes
.
find
(
identity
);
outpipes_t
::
iterator
it
=
outpipes
.
find
(
routing_id
);
if
(
it
!=
outpipes
.
end
())
{
if
(
it
!=
outpipes
.
end
())
{
current_out
=
it
->
second
.
pipe
;
current_out
=
it
->
second
.
pipe
;
...
@@ -183,9 +183,9 @@ int zmq::stream_t::xsetsockopt (int option_, const void *optval_,
...
@@ -183,9 +183,9 @@ int zmq::stream_t::xsetsockopt (int option_, const void *optval_,
if
(
is_int
)
memcpy
(
&
value
,
optval_
,
sizeof
(
int
));
if
(
is_int
)
memcpy
(
&
value
,
optval_
,
sizeof
(
int
));
switch
(
option_
)
{
switch
(
option_
)
{
case
ZMQ_CONNECT_RID
:
case
ZMQ_CONNECT_R
OUTING_
ID
:
if
(
optval_
&&
optvallen_
)
{
if
(
optval_
&&
optvallen_
)
{
connect_rid
.
assign
((
char
*
)
optval_
,
optvallen_
);
connect_r
outing_
id
.
assign
((
char
*
)
optval_
,
optvallen_
);
return
0
;
return
0
;
}
}
break
;
break
;
...
@@ -207,10 +207,10 @@ int zmq::stream_t::xsetsockopt (int option_, const void *optval_,
...
@@ -207,10 +207,10 @@ int zmq::stream_t::xsetsockopt (int option_, const void *optval_,
int
zmq
::
stream_t
::
xrecv
(
msg_t
*
msg_
)
int
zmq
::
stream_t
::
xrecv
(
msg_t
*
msg_
)
{
{
if
(
prefetched
)
{
if
(
prefetched
)
{
if
(
!
identity
_sent
)
{
if
(
!
routing_id
_sent
)
{
int
rc
=
msg_
->
move
(
prefetched_id
);
int
rc
=
msg_
->
move
(
prefetched_
routing_
id
);
errno_assert
(
rc
==
0
);
errno_assert
(
rc
==
0
);
identity
_sent
=
true
;
routing_id
_sent
=
true
;
}
}
else
{
else
{
int
rc
=
msg_
->
move
(
prefetched_msg
);
int
rc
=
msg_
->
move
(
prefetched_msg
);
...
@@ -231,10 +231,10 @@ int zmq::stream_t::xrecv (msg_t *msg_)
...
@@ -231,10 +231,10 @@ int zmq::stream_t::xrecv (msg_t *msg_)
// We have received a frame with TCP data.
// We have received a frame with TCP data.
// Rather than sending this frame, we keep it in prefetched
// Rather than sending this frame, we keep it in prefetched
// buffer and send a frame with peer's ID.
// buffer and send a frame with peer's ID.
blob_t
identity
=
pipe
->
get_identity
();
blob_t
routing_id
=
pipe
->
get_routing_id
();
rc
=
msg_
->
close
();
rc
=
msg_
->
close
();
errno_assert
(
rc
==
0
);
errno_assert
(
rc
==
0
);
rc
=
msg_
->
init_size
(
identity
.
size
());
rc
=
msg_
->
init_size
(
routing_id
.
size
());
errno_assert
(
rc
==
0
);
errno_assert
(
rc
==
0
);
// forward metadata (if any)
// forward metadata (if any)
...
@@ -242,11 +242,11 @@ int zmq::stream_t::xrecv (msg_t *msg_)
...
@@ -242,11 +242,11 @@ int zmq::stream_t::xrecv (msg_t *msg_)
if
(
metadata
)
if
(
metadata
)
msg_
->
set_metadata
(
metadata
);
msg_
->
set_metadata
(
metadata
);
memcpy
(
msg_
->
data
(),
identity
.
data
(),
identity
.
size
());
memcpy
(
msg_
->
data
(),
routing_id
.
data
(),
routing_id
.
size
());
msg_
->
set_flags
(
msg_t
::
more
);
msg_
->
set_flags
(
msg_t
::
more
);
prefetched
=
true
;
prefetched
=
true
;
identity
_sent
=
true
;
routing_id
_sent
=
true
;
return
0
;
return
0
;
}
}
...
@@ -267,20 +267,20 @@ bool zmq::stream_t::xhas_in ()
...
@@ -267,20 +267,20 @@ bool zmq::stream_t::xhas_in ()
zmq_assert
(
pipe
!=
NULL
);
zmq_assert
(
pipe
!=
NULL
);
zmq_assert
((
prefetched_msg
.
flags
()
&
msg_t
::
more
)
==
0
);
zmq_assert
((
prefetched_msg
.
flags
()
&
msg_t
::
more
)
==
0
);
blob_t
identity
=
pipe
->
get_identity
();
blob_t
routing_id
=
pipe
->
get_routing_id
();
rc
=
prefetched_
id
.
init_size
(
identity
.
size
());
rc
=
prefetched_
routing_id
.
init_size
(
routing_id
.
size
());
errno_assert
(
rc
==
0
);
errno_assert
(
rc
==
0
);
// forward metadata (if any)
// forward metadata (if any)
metadata_t
*
metadata
=
prefetched_msg
.
metadata
();
metadata_t
*
metadata
=
prefetched_msg
.
metadata
();
if
(
metadata
)
if
(
metadata
)
prefetched_id
.
set_metadata
(
metadata
);
prefetched_
routing_
id
.
set_metadata
(
metadata
);
memcpy
(
prefetched_
id
.
data
(),
identity
.
data
(),
identity
.
size
());
memcpy
(
prefetched_
routing_id
.
data
(),
routing_id
.
data
(),
routing_id
.
size
());
prefetched_id
.
set_flags
(
msg_t
::
more
);
prefetched_
routing_
id
.
set_flags
(
msg_t
::
more
);
prefetched
=
true
;
prefetched
=
true
;
identity
_sent
=
false
;
routing_id
_sent
=
false
;
return
true
;
return
true
;
}
}
...
@@ -295,27 +295,27 @@ bool zmq::stream_t::xhas_out ()
...
@@ -295,27 +295,27 @@ bool zmq::stream_t::xhas_out ()
void
zmq
::
stream_t
::
identify_peer
(
pipe_t
*
pipe_
)
void
zmq
::
stream_t
::
identify_peer
(
pipe_t
*
pipe_
)
{
{
// Always assign
identity
for raw-socket
// Always assign
routing id
for raw-socket
unsigned
char
buffer
[
5
];
unsigned
char
buffer
[
5
];
buffer
[
0
]
=
0
;
buffer
[
0
]
=
0
;
blob_t
identity
;
blob_t
routing_id
;
if
(
connect_rid
.
length
())
{
if
(
connect_r
outing_
id
.
length
())
{
identity
=
blob_t
((
unsigned
char
*
)
connect_r
id
.
c_str
(),
routing_id
=
blob_t
((
unsigned
char
*
)
connect_routing_
id
.
c_str
(),
connect_rid
.
length
());
connect_r
outing_
id
.
length
());
connect_rid
.
clear
();
connect_r
outing_
id
.
clear
();
outpipes_t
::
iterator
it
=
outpipes
.
find
(
identity
);
outpipes_t
::
iterator
it
=
outpipes
.
find
(
routing_id
);
zmq_assert
(
it
==
outpipes
.
end
());
zmq_assert
(
it
==
outpipes
.
end
());
}
}
else
{
else
{
put_uint32
(
buffer
+
1
,
next_
r
id
++
);
put_uint32
(
buffer
+
1
,
next_
integral_routing_
id
++
);
identity
=
blob_t
(
buffer
,
sizeof
buffer
);
routing_id
=
blob_t
(
buffer
,
sizeof
buffer
);
memcpy
(
options
.
identity
,
identity
.
data
(),
identity
.
size
());
memcpy
(
options
.
routing_id
,
routing_id
.
data
(),
routing_id
.
size
());
options
.
identity_size
=
(
unsigned
char
)
identity
.
size
();
options
.
routing_id_size
=
(
unsigned
char
)
routing_id
.
size
();
}
}
pipe_
->
set_
identity
(
identity
);
pipe_
->
set_
router_socket_routing_id
(
routing_id
);
// Add the record into output pipes lookup table
// Add the record into output pipes lookup table
outpipe_t
outpipe
=
{
pipe_
,
true
};
outpipe_t
outpipe
=
{
pipe_
,
true
};
const
bool
ok
=
outpipes
.
insert
(
const
bool
ok
=
outpipes
.
insert
(
outpipes_t
::
value_type
(
identity
,
outpipe
)).
second
;
outpipes_t
::
value_type
(
routing_id
,
outpipe
)).
second
;
zmq_assert
(
ok
);
zmq_assert
(
ok
);
}
}
src/stream.hpp
View file @
08b01a51
...
@@ -70,10 +70,10 @@ namespace zmq
...
@@ -70,10 +70,10 @@ namespace zmq
// If true, the receiver got the message part with
// If true, the receiver got the message part with
// the peer's identity.
// the peer's identity.
bool
identity
_sent
;
bool
routing_id
_sent
;
// Holds the prefetched identity.
// Holds the prefetched identity.
msg_t
prefetched_id
;
msg_t
prefetched_
routing_
id
;
// Holds the prefetched message.
// Holds the prefetched message.
msg_t
prefetched_msg
;
msg_t
prefetched_msg
;
...
@@ -96,7 +96,7 @@ namespace zmq
...
@@ -96,7 +96,7 @@ namespace zmq
// Routing IDs are generated. It's a simple increment and wrap-over
// Routing IDs are generated. It's a simple increment and wrap-over
// algorithm. This value is the next ID to use (if not used already).
// algorithm. This value is the next ID to use (if not used already).
uint32_t
next_
r
id
;
uint32_t
next_
integral_routing_
id
;
stream_t
(
const
stream_t
&
);
stream_t
(
const
stream_t
&
);
const
stream_t
&
operator
=
(
const
stream_t
&
);
const
stream_t
&
operator
=
(
const
stream_t
&
);
...
...
src/stream_engine.cpp
View file @
08b01a51
...
@@ -81,8 +81,8 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_,
...
@@ -81,8 +81,8 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_,
options
(
options_
),
options
(
options_
),
endpoint
(
endpoint_
),
endpoint
(
endpoint_
),
plugged
(
false
),
plugged
(
false
),
next_msg
(
&
stream_engine_t
::
identity
_msg
),
next_msg
(
&
stream_engine_t
::
routing_id
_msg
),
process_msg
(
&
stream_engine_t
::
process_
identity
_msg
),
process_msg
(
&
stream_engine_t
::
process_
routing_id
_msg
),
io_error
(
false
),
io_error
(
false
),
subscription_required
(
false
),
subscription_required
(
false
),
mechanism
(
NULL
),
mechanism
(
NULL
),
...
@@ -229,11 +229,11 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_,
...
@@ -229,11 +229,11 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_,
// start optional timer, to prevent handshake hanging on no input
// start optional timer, to prevent handshake hanging on no input
set_handshake_timer
();
set_handshake_timer
();
// Send the 'length' and 'flags' fields of the
identity
message.
// Send the 'length' and 'flags' fields of the
routing id
message.
// The 'length' field is encoded in the long format.
// The 'length' field is encoded in the long format.
outpos
=
greeting_send
;
outpos
=
greeting_send
;
outpos
[
outsize
++
]
=
0xff
;
outpos
[
outsize
++
]
=
0xff
;
put_uint64
(
&
outpos
[
outsize
],
options
.
identity
_size
+
1
);
put_uint64
(
&
outpos
[
outsize
],
options
.
routing_id
_size
+
1
);
outsize
+=
8
;
outsize
+=
8
;
outpos
[
outsize
++
]
=
0x7f
;
outpos
[
outsize
++
]
=
0x7f
;
}
}
...
@@ -520,7 +520,7 @@ bool zmq::stream_engine_t::handshake ()
...
@@ -520,7 +520,7 @@ bool zmq::stream_engine_t::handshake ()
// Inspect the right-most bit of the 10th byte (which coincides
// Inspect the right-most bit of the 10th byte (which coincides
// with the 'flags' field if a regular message was sent).
// with the 'flags' field if a regular message was sent).
// Zero indicates this is a header of
identity
message
// Zero indicates this is a header of
a routing id
message
// (i.e. the peer is using the unversioned protocol).
// (i.e. the peer is using the unversioned protocol).
if
(
!
(
greeting_recv
[
9
]
&
0x01
))
if
(
!
(
greeting_recv
[
9
]
&
0x01
))
break
;
break
;
...
@@ -575,7 +575,7 @@ bool zmq::stream_engine_t::handshake ()
...
@@ -575,7 +575,7 @@ bool zmq::stream_engine_t::handshake ()
const
size_t
revision_pos
=
10
;
const
size_t
revision_pos
=
10
;
// Is the peer using ZMTP/1.0 with no revision number?
// Is the peer using ZMTP/1.0 with no revision number?
// If so, we send and receive rest of
identity
message
// If so, we send and receive rest of
routing id
message
if
(
greeting_recv
[
0
]
!=
0xff
||
!
(
greeting_recv
[
9
]
&
0x01
))
{
if
(
greeting_recv
[
0
]
!=
0xff
||
!
(
greeting_recv
[
9
]
&
0x01
))
{
if
(
session
->
zap_enabled
())
{
if
(
session
->
zap_enabled
())
{
// reject ZMTP 1.0 connections if ZAP is enabled
// reject ZMTP 1.0 connections if ZAP is enabled
...
@@ -593,14 +593,14 @@ bool zmq::stream_engine_t::handshake ()
...
@@ -593,14 +593,14 @@ bool zmq::stream_engine_t::handshake ()
// Since there is no way to tell the encoder to
// Since there is no way to tell the encoder to
// skip the message header, we simply throw that
// skip the message header, we simply throw that
// header data away.
// header data away.
const
size_t
header_size
=
options
.
identity
_size
+
1
>=
255
?
10
:
2
;
const
size_t
header_size
=
options
.
routing_id
_size
+
1
>=
255
?
10
:
2
;
unsigned
char
tmp
[
10
],
*
bufferp
=
tmp
;
unsigned
char
tmp
[
10
],
*
bufferp
=
tmp
;
// Prepare the
identity
message and load it into encoder.
// Prepare the
routing id
message and load it into encoder.
// Then consume bytes we have already sent to the peer.
// Then consume bytes we have already sent to the peer.
const
int
rc
=
tx_msg
.
init_size
(
options
.
identity
_size
);
const
int
rc
=
tx_msg
.
init_size
(
options
.
routing_id
_size
);
zmq_assert
(
rc
==
0
);
zmq_assert
(
rc
==
0
);
memcpy
(
tx_msg
.
data
(),
options
.
identity
,
options
.
identity
_size
);
memcpy
(
tx_msg
.
data
(),
options
.
routing_id
,
options
.
routing_id
_size
);
encoder
->
load_msg
(
&
tx_msg
);
encoder
->
load_msg
(
&
tx_msg
);
size_t
buffer_size
=
encoder
->
encode
(
&
bufferp
,
header_size
);
size_t
buffer_size
=
encoder
->
encode
(
&
bufferp
,
header_size
);
zmq_assert
(
buffer_size
==
header_size
);
zmq_assert
(
buffer_size
==
header_size
);
...
@@ -615,12 +615,12 @@ bool zmq::stream_engine_t::handshake ()
...
@@ -615,12 +615,12 @@ bool zmq::stream_engine_t::handshake ()
if
(
options
.
type
==
ZMQ_PUB
||
options
.
type
==
ZMQ_XPUB
)
if
(
options
.
type
==
ZMQ_PUB
||
options
.
type
==
ZMQ_XPUB
)
subscription_required
=
true
;
subscription_required
=
true
;
// We are sending our
identity
now and the next message
// We are sending our
routing id
now and the next message
// will come from the socket.
// will come from the socket.
next_msg
=
&
stream_engine_t
::
pull_msg_from_session
;
next_msg
=
&
stream_engine_t
::
pull_msg_from_session
;
// We are expecting
identity
message.
// We are expecting
routing id
message.
process_msg
=
&
stream_engine_t
::
process_
identity
_msg
;
process_msg
=
&
stream_engine_t
::
process_
routing_id
_msg
;
}
}
else
else
if
(
greeting_recv
[
revision_pos
]
==
ZMTP_1_0
)
{
if
(
greeting_recv
[
revision_pos
]
==
ZMTP_1_0
)
{
...
@@ -729,20 +729,20 @@ bool zmq::stream_engine_t::handshake ()
...
@@ -729,20 +729,20 @@ bool zmq::stream_engine_t::handshake ()
return
true
;
return
true
;
}
}
int
zmq
::
stream_engine_t
::
identity
_msg
(
msg_t
*
msg_
)
int
zmq
::
stream_engine_t
::
routing_id
_msg
(
msg_t
*
msg_
)
{
{
int
rc
=
msg_
->
init_size
(
options
.
identity
_size
);
int
rc
=
msg_
->
init_size
(
options
.
routing_id
_size
);
errno_assert
(
rc
==
0
);
errno_assert
(
rc
==
0
);
if
(
options
.
identity
_size
>
0
)
if
(
options
.
routing_id
_size
>
0
)
memcpy
(
msg_
->
data
(),
options
.
identity
,
options
.
identity
_size
);
memcpy
(
msg_
->
data
(),
options
.
routing_id
,
options
.
routing_id
_size
);
next_msg
=
&
stream_engine_t
::
pull_msg_from_session
;
next_msg
=
&
stream_engine_t
::
pull_msg_from_session
;
return
0
;
return
0
;
}
}
int
zmq
::
stream_engine_t
::
process_
identity
_msg
(
msg_t
*
msg_
)
int
zmq
::
stream_engine_t
::
process_
routing_id
_msg
(
msg_t
*
msg_
)
{
{
if
(
options
.
recv_
identity
)
{
if
(
options
.
recv_
routing_id
)
{
msg_
->
set_flags
(
msg_t
::
identity
);
msg_
->
set_flags
(
msg_t
::
routing_id
);
int
rc
=
session
->
push_msg
(
msg_
);
int
rc
=
session
->
push_msg
(
msg_
);
errno_assert
(
rc
==
0
);
errno_assert
(
rc
==
0
);
}
}
...
@@ -839,14 +839,14 @@ void zmq::stream_engine_t::mechanism_ready ()
...
@@ -839,14 +839,14 @@ void zmq::stream_engine_t::mechanism_ready ()
has_heartbeat_timer
=
true
;
has_heartbeat_timer
=
true
;
}
}
if
(
options
.
recv_
identity
)
{
if
(
options
.
recv_
routing_id
)
{
msg_t
identity
;
msg_t
routing_id
;
mechanism
->
peer_
identity
(
&
identity
);
mechanism
->
peer_
routing_id
(
&
routing_id
);
const
int
rc
=
session
->
push_msg
(
&
identity
);
const
int
rc
=
session
->
push_msg
(
&
routing_id
);
if
(
rc
==
-
1
&&
errno
==
EAGAIN
)
{
if
(
rc
==
-
1
&&
errno
==
EAGAIN
)
{
// If the write is failing at this stage with
// If the write is failing at this stage with
// an EAGAIN the pipe must be being shut down,
// an EAGAIN the pipe must be being shut down,
// so we can just bail out of the
identity
set.
// so we can just bail out of the
routing id
set.
return
;
return
;
}
}
errno_assert
(
rc
==
0
);
errno_assert
(
rc
==
0
);
...
...
src/stream_engine.hpp
View file @
08b01a51
...
@@ -99,8 +99,8 @@ namespace zmq
...
@@ -99,8 +99,8 @@ namespace zmq
// Detects the protocol used by the peer.
// Detects the protocol used by the peer.
bool
handshake
();
bool
handshake
();
int
identity
_msg
(
msg_t
*
msg_
);
int
routing_id
_msg
(
msg_t
*
msg_
);
int
process_
identity
_msg
(
msg_t
*
msg_
);
int
process_
routing_id
_msg
(
msg_t
*
msg_
);
int
next_handshake_command
(
msg_t
*
msg
);
int
next_handshake_command
(
msg_t
*
msg
);
int
process_handshake_command
(
msg_t
*
msg
);
int
process_handshake_command
(
msg_t
*
msg
);
...
...
src/zap_client.cpp
View file @
08b01a51
...
@@ -104,10 +104,10 @@ void zap_client_t::send_zap_request (const char *mechanism,
...
@@ -104,10 +104,10 @@ void zap_client_t::send_zap_request (const char *mechanism,
rc
=
session
->
write_zap_msg
(
&
msg
);
rc
=
session
->
write_zap_msg
(
&
msg
);
errno_assert
(
rc
==
0
);
errno_assert
(
rc
==
0
);
//
Identity
frame
//
Routing id
frame
rc
=
msg
.
init_size
(
options
.
identity
_size
);
rc
=
msg
.
init_size
(
options
.
routing_id
_size
);
errno_assert
(
rc
==
0
);
errno_assert
(
rc
==
0
);
memcpy
(
msg
.
data
(),
options
.
identity
,
options
.
identity
_size
);
memcpy
(
msg
.
data
(),
options
.
routing_id
,
options
.
routing_id
_size
);
msg
.
set_flags
(
msg_t
::
more
);
msg
.
set_flags
(
msg_t
::
more
);
rc
=
session
->
write_zap_msg
(
&
msg
);
rc
=
session
->
write_zap_msg
(
&
msg
);
errno_assert
(
rc
==
0
);
errno_assert
(
rc
==
0
);
...
...
src/zmq.cpp
View file @
08b01a51
...
@@ -1359,14 +1359,14 @@ int zmq_poller_wait_all (void *poller_, zmq_poller_event_t *events_, int n_event
...
@@ -1359,14 +1359,14 @@ int zmq_poller_wait_all (void *poller_, zmq_poller_event_t *events_, int n_event
// Peer-specific state
// Peer-specific state
int
zmq_socket_get_peer_state
(
void
*
s_
,
int
zmq_socket_get_peer_state
(
void
*
s_
,
const
void
*
identity
,
const
void
*
routing_id_
,
size_t
identity_size
)
size_t
routing_id_size_
)
{
{
zmq
::
socket_base_t
*
s
=
as_socket_base_t
(
s_
);
zmq
::
socket_base_t
*
s
=
as_socket_base_t
(
s_
);
if
(
!
s
)
if
(
!
s
)
return
-
1
;
return
-
1
;
return
s
->
get_peer_state
(
identity
,
identity_size
);
return
s
->
get_peer_state
(
routing_id_
,
routing_id_size_
);
}
}
// Timers
// Timers
...
...
src/zmq_draft.h
View file @
08b01a51
...
@@ -142,8 +142,8 @@ int zmq_poller_remove_fd (void *poller, int fd);
...
@@ -142,8 +142,8 @@ int zmq_poller_remove_fd (void *poller, int fd);
#endif
#endif
int
zmq_socket_get_peer_state
(
void
*
socket
,
int
zmq_socket_get_peer_state
(
void
*
socket
,
const
void
*
identity
,
const
void
*
routing_id
,
size_t
identity
_size
);
size_t
routing_id
_size
);
/******************************************************************************/
/******************************************************************************/
/* Scheduling timers */
/* Scheduling timers */
...
...
tests/test_connect_rid.cpp
View file @
08b01a51
...
@@ -61,13 +61,13 @@ void test_stream_2_stream(){
...
@@ -61,13 +61,13 @@ void test_stream_2_stream(){
assert
(
0
==
ret
);
assert
(
0
==
ret
);
// Do the connection.
// Do the connection.
ret
=
zmq_setsockopt
(
rconn1
,
ZMQ_CONNECT_RID
,
"conn1"
,
6
);
ret
=
zmq_setsockopt
(
rconn1
,
ZMQ_CONNECT_R
OUTING_
ID
,
"conn1"
,
6
);
assert
(
0
==
ret
);
assert
(
0
==
ret
);
ret
=
zmq_connect
(
rconn1
,
my_endpoint
);
ret
=
zmq_connect
(
rconn1
,
my_endpoint
);
/* Uncomment to test assert on duplicate rid.
/* Uncomment to test assert on duplicate r
outing
id.
// Test duplicate connect attempt.
// Test duplicate connect attempt.
ret = zmq_setsockopt (rconn1, ZMQ_CONNECT_RID, "conn1", 6);
ret = zmq_setsockopt (rconn1, ZMQ_CONNECT_R
OUTING_
ID, "conn1", 6);
assert (0 == ret);
assert (0 == ret);
ret = zmq_connect (rconn1, bindip);
ret = zmq_connect (rconn1, bindip);
assert (0 == ret);
assert (0 == ret);
...
@@ -126,18 +126,18 @@ void test_router_2_router(bool named){
...
@@ -126,18 +126,18 @@ void test_router_2_router(bool named){
// If we're in named mode, set some identities.
// If we're in named mode, set some identities.
if
(
named
)
{
if
(
named
)
{
ret
=
zmq_setsockopt
(
rbind
,
ZMQ_
IDENTITY
,
"X"
,
1
);
ret
=
zmq_setsockopt
(
rbind
,
ZMQ_
ROUTING_ID
,
"X"
,
1
);
ret
=
zmq_setsockopt
(
rconn1
,
ZMQ_
IDENTITY
,
"Y"
,
1
);
ret
=
zmq_setsockopt
(
rconn1
,
ZMQ_
ROUTING_ID
,
"Y"
,
1
);
}
}
// Make call to connect using a connect_rid.
// Make call to connect using a connect_r
outing_
id.
ret
=
zmq_setsockopt
(
rconn1
,
ZMQ_CONNECT_RID
,
"conn1"
,
6
);
ret
=
zmq_setsockopt
(
rconn1
,
ZMQ_CONNECT_R
OUTING_
ID
,
"conn1"
,
6
);
assert
(
0
==
ret
);
assert
(
0
==
ret
);
ret
=
zmq_connect
(
rconn1
,
my_endpoint
);
ret
=
zmq_connect
(
rconn1
,
my_endpoint
);
assert
(
0
==
ret
);
assert
(
0
==
ret
);
/* Uncomment to test assert on duplicate rid
/* Uncomment to test assert on duplicate r
outing
id
// Test duplicate connect attempt.
// Test duplicate connect attempt.
ret = zmq_setsockopt (rconn1, ZMQ_CONNECT_RID, "conn1", 6);
ret = zmq_setsockopt (rconn1, ZMQ_CONNECT_R
OUTING_
ID, "conn1", 6);
assert (0 == ret);
assert (0 == ret);
ret = zmq_connect (rconn1, bindip);
ret = zmq_connect (rconn1, bindip);
assert (0 == ret);
assert (0 == ret);
...
...
tests/test_issue_566.cpp
View file @
08b01a51
...
@@ -62,7 +62,7 @@ int main (void)
...
@@ -62,7 +62,7 @@ int main (void)
void
*
dealer
=
zmq_socket
(
ctx2
,
ZMQ_DEALER
);
void
*
dealer
=
zmq_socket
(
ctx2
,
ZMQ_DEALER
);
char
identity
[
10
];
char
identity
[
10
];
sprintf
(
identity
,
"%09d"
,
cycle
);
sprintf
(
identity
,
"%09d"
,
cycle
);
rc
=
zmq_setsockopt
(
dealer
,
ZMQ_
IDENTITY
,
identity
,
10
);
rc
=
zmq_setsockopt
(
dealer
,
ZMQ_
ROUTING_ID
,
identity
,
10
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
int
rcvtimeo
=
1000
;
int
rcvtimeo
=
1000
;
rc
=
zmq_setsockopt
(
dealer
,
ZMQ_RCVTIMEO
,
&
rcvtimeo
,
sizeof
(
int
));
rc
=
zmq_setsockopt
(
dealer
,
ZMQ_RCVTIMEO
,
&
rcvtimeo
,
sizeof
(
int
));
...
...
tests/test_probe_router.cpp
View file @
08b01a51
...
@@ -48,7 +48,7 @@ int main (void)
...
@@ -48,7 +48,7 @@ int main (void)
// Create client and connect to server, doing a probe
// Create client and connect to server, doing a probe
void
*
client
=
zmq_socket
(
ctx
,
ZMQ_ROUTER
);
void
*
client
=
zmq_socket
(
ctx
,
ZMQ_ROUTER
);
assert
(
client
);
assert
(
client
);
rc
=
zmq_setsockopt
(
client
,
ZMQ_
IDENTITY
,
"X"
,
1
);
rc
=
zmq_setsockopt
(
client
,
ZMQ_
ROUTING_ID
,
"X"
,
1
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
int
probe
=
1
;
int
probe
=
1
;
rc
=
zmq_setsockopt
(
client
,
ZMQ_PROBE_ROUTER
,
&
probe
,
sizeof
(
probe
));
rc
=
zmq_setsockopt
(
client
,
ZMQ_PROBE_ROUTER
,
&
probe
,
sizeof
(
probe
));
...
...
tests/test_router_handover.cpp
View file @
08b01a51
...
@@ -53,7 +53,7 @@ int main (void)
...
@@ -53,7 +53,7 @@ int main (void)
// Create dealer called "X" and connect it to our router
// Create dealer called "X" and connect it to our router
void
*
dealer_one
=
zmq_socket
(
ctx
,
ZMQ_DEALER
);
void
*
dealer_one
=
zmq_socket
(
ctx
,
ZMQ_DEALER
);
assert
(
dealer_one
);
assert
(
dealer_one
);
rc
=
zmq_setsockopt
(
dealer_one
,
ZMQ_
IDENTITY
,
"X"
,
1
);
rc
=
zmq_setsockopt
(
dealer_one
,
ZMQ_
ROUTING_ID
,
"X"
,
1
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
rc
=
zmq_connect
(
dealer_one
,
my_endpoint
);
rc
=
zmq_connect
(
dealer_one
,
my_endpoint
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
...
@@ -71,7 +71,7 @@ int main (void)
...
@@ -71,7 +71,7 @@ int main (void)
// Now create a second dealer that uses the same identity
// Now create a second dealer that uses the same identity
void
*
dealer_two
=
zmq_socket
(
ctx
,
ZMQ_DEALER
);
void
*
dealer_two
=
zmq_socket
(
ctx
,
ZMQ_DEALER
);
assert
(
dealer_two
);
assert
(
dealer_two
);
rc
=
zmq_setsockopt
(
dealer_two
,
ZMQ_
IDENTITY
,
"X"
,
1
);
rc
=
zmq_setsockopt
(
dealer_two
,
ZMQ_
ROUTING_ID
,
"X"
,
1
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
rc
=
zmq_connect
(
dealer_two
,
my_endpoint
);
rc
=
zmq_connect
(
dealer_two
,
my_endpoint
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
...
...
tests/test_router_mandatory.cpp
View file @
08b01a51
...
@@ -85,13 +85,13 @@ void test_get_peer_state ()
...
@@ -85,13 +85,13 @@ void test_get_peer_state ()
const
char
*
dealer2_identity
=
"Y"
;
const
char
*
dealer2_identity
=
"Y"
;
// Name dealer1 "X" and connect it to our router
// Name dealer1 "X" and connect it to our router
rc
=
zmq_setsockopt
(
dealer1
,
ZMQ_
IDENTITY
,
dealer1_identity
,
1
);
rc
=
zmq_setsockopt
(
dealer1
,
ZMQ_
ROUTING_ID
,
dealer1_identity
,
1
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
rc
=
zmq_connect
(
dealer1
,
my_endpoint
);
rc
=
zmq_connect
(
dealer1
,
my_endpoint
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
// Name dealer2 "Y" and connect it to our router
// Name dealer2 "Y" and connect it to our router
rc
=
zmq_setsockopt
(
dealer2
,
ZMQ_
IDENTITY
,
dealer2_identity
,
1
);
rc
=
zmq_setsockopt
(
dealer2
,
ZMQ_
ROUTING_ID
,
dealer2_identity
,
1
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
rc
=
zmq_connect
(
dealer2
,
my_endpoint
);
rc
=
zmq_connect
(
dealer2
,
my_endpoint
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
...
@@ -250,7 +250,7 @@ void test_basic ()
...
@@ -250,7 +250,7 @@ void test_basic ()
// Create dealer called "X" and connect it to our router
// Create dealer called "X" and connect it to our router
void
*
dealer
=
zmq_socket
(
ctx
,
ZMQ_DEALER
);
void
*
dealer
=
zmq_socket
(
ctx
,
ZMQ_DEALER
);
assert
(
dealer
);
assert
(
dealer
);
rc
=
zmq_setsockopt
(
dealer
,
ZMQ_
IDENTITY
,
"X"
,
1
);
rc
=
zmq_setsockopt
(
dealer
,
ZMQ_
ROUTING_ID
,
"X"
,
1
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
rc
=
zmq_connect
(
dealer
,
my_endpoint
);
rc
=
zmq_connect
(
dealer
,
my_endpoint
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
...
...
tests/test_security_curve.cpp
View file @
08b01a51
...
@@ -658,7 +658,7 @@ int main (void)
...
@@ -658,7 +658,7 @@ int main (void)
setup_context_and_server_side
(
&
ctx
,
&
handler
,
&
zap_thread
,
&
server
,
setup_context_and_server_side
(
&
ctx
,
&
handler
,
&
zap_thread
,
&
server
,
&
server_mon
,
my_endpoint
);
&
server_mon
,
my_endpoint
);
test_null_key
(
ctx
,
server
,
server_mon
,
my_endpoint
,
null_key
,
test_null_key
(
ctx
,
server
,
server_mon
,
my_endpoint
,
null_key
,
valid_client_public
,
valid_client_secret
);
valid_client_public
,
valid_client_secret
);
shutdown_context_and_server_side
(
ctx
,
zap_thread
,
server
,
server_mon
,
shutdown_context_and_server_side
(
ctx
,
zap_thread
,
server
,
server_mon
,
handler
);
handler
);
...
@@ -668,7 +668,7 @@ int main (void)
...
@@ -668,7 +668,7 @@ int main (void)
setup_context_and_server_side
(
&
ctx
,
&
handler
,
&
zap_thread
,
&
server
,
setup_context_and_server_side
(
&
ctx
,
&
handler
,
&
zap_thread
,
&
server
,
&
server_mon
,
my_endpoint
);
&
server_mon
,
my_endpoint
);
test_null_key
(
ctx
,
server
,
server_mon
,
my_endpoint
,
valid_server_public
,
test_null_key
(
ctx
,
server
,
server_mon
,
my_endpoint
,
valid_server_public
,
null_key
,
valid_client_secret
);
null_key
,
valid_client_secret
);
shutdown_context_and_server_side
(
ctx
,
zap_thread
,
server
,
server_mon
,
shutdown_context_and_server_side
(
ctx
,
zap_thread
,
server
,
server_mon
,
handler
);
handler
);
...
@@ -678,7 +678,7 @@ int main (void)
...
@@ -678,7 +678,7 @@ int main (void)
setup_context_and_server_side
(
&
ctx
,
&
handler
,
&
zap_thread
,
&
server
,
setup_context_and_server_side
(
&
ctx
,
&
handler
,
&
zap_thread
,
&
server
,
&
server_mon
,
my_endpoint
);
&
server_mon
,
my_endpoint
);
test_null_key
(
ctx
,
server
,
server_mon
,
my_endpoint
,
valid_server_public
,
test_null_key
(
ctx
,
server
,
server_mon
,
my_endpoint
,
valid_server_public
,
valid_client_public
,
null_key
);
valid_client_public
,
null_key
);
shutdown_context_and_server_side
(
ctx
,
zap_thread
,
server
,
server_mon
,
shutdown_context_and_server_side
(
ctx
,
zap_thread
,
server
,
server_mon
,
handler
);
handler
);
...
@@ -752,7 +752,8 @@ int main (void)
...
@@ -752,7 +752,8 @@ int main (void)
shutdown_context_and_server_side
(
ctx
,
zap_thread
,
server
,
server_mon
,
shutdown_context_and_server_side
(
ctx
,
zap_thread
,
server
,
server_mon
,
handler
);
handler
);
fprintf
(
stderr
,
"test_curve_security_invalid_initiate_command_encrypted_cookie
\n
"
);
fprintf
(
stderr
,
"test_curve_security_invalid_initiate_command_encrypted_cookie
\n
"
);
setup_context_and_server_side
(
&
ctx
,
&
handler
,
&
zap_thread
,
&
server
,
setup_context_and_server_side
(
&
ctx
,
&
handler
,
&
zap_thread
,
&
server
,
&
server_mon
,
my_endpoint
);
&
server_mon
,
my_endpoint
);
test_curve_security_invalid_initiate_command_encrypted_cookie
(
test_curve_security_invalid_initiate_command_encrypted_cookie
(
...
@@ -760,7 +761,9 @@ int main (void)
...
@@ -760,7 +761,9 @@ int main (void)
shutdown_context_and_server_side
(
ctx
,
zap_thread
,
server
,
server_mon
,
shutdown_context_and_server_side
(
ctx
,
zap_thread
,
server
,
server_mon
,
handler
);
handler
);
fprintf
(
stderr
,
"test_curve_security_invalid_initiate_command_encrypted_content
\n
"
);
fprintf
(
stderr
,
"test_curve_security_invalid_initiate_command_encrypted_content
\n
"
);
setup_context_and_server_side
(
&
ctx
,
&
handler
,
&
zap_thread
,
&
server
,
setup_context_and_server_side
(
&
ctx
,
&
handler
,
&
zap_thread
,
&
server
,
&
server_mon
,
my_endpoint
);
&
server_mon
,
my_endpoint
);
test_curve_security_invalid_initiate_command_encrypted_content
(
test_curve_security_invalid_initiate_command_encrypted_content
(
...
@@ -769,15 +772,16 @@ int main (void)
...
@@ -769,15 +772,16 @@ int main (void)
handler
);
handler
);
// test with a large identity (resulting in large metadata)
// test with a large identity (resulting in large metadata)
fprintf
(
stderr
,
"test_curve_security_with_valid_credentials (large identity)
\n
"
);
fprintf
(
stderr
,
"test_curve_security_with_valid_credentials (large identity)
\n
"
);
setup_context_and_server_side
(
setup_context_and_server_side
(
&
ctx
,
&
handler
,
&
zap_thread
,
&
server
,
&
server_mon
,
my_endpoint
,
&
ctx
,
&
handler
,
&
zap_thread
,
&
server
,
&
server_mon
,
my_endpoint
,
&
zap_handler_large_identity
,
&
socket_config_curve_server
,
&
valid_server_secret
,
&
zap_handler_large_identity
,
&
socket_config_curve_server
,
large_identity
);
&
valid_server_secret
,
large_identity
);
test_curve_security_with_valid_credentials
(
ctx
,
my_endpoint
,
server
,
test_curve_security_with_valid_credentials
(
ctx
,
my_endpoint
,
server
,
server_mon
,
timeout
);
server_mon
,
timeout
);
shutdown_context_and_server_side
(
ctx
,
zap_thread
,
server
,
server_mon
,
shutdown_context_and_server_side
(
ctx
,
zap_thread
,
server
,
server_mon
,
handler
);
handler
);
ctx
=
zmq_ctx_new
();
ctx
=
zmq_ctx_new
();
test_curve_security_invalid_keysize
(
ctx
);
test_curve_security_invalid_keysize
(
ctx
);
...
...
tests/test_security_plain.cpp
View file @
08b01a51
...
@@ -108,7 +108,7 @@ int main (void)
...
@@ -108,7 +108,7 @@ int main (void)
// Server socket will accept connections
// Server socket will accept connections
void
*
server
=
zmq_socket
(
ctx
,
ZMQ_DEALER
);
void
*
server
=
zmq_socket
(
ctx
,
ZMQ_DEALER
);
assert
(
server
);
assert
(
server
);
int
rc
=
zmq_setsockopt
(
server
,
ZMQ_
IDENTITY
,
"IDENT"
,
6
);
int
rc
=
zmq_setsockopt
(
server
,
ZMQ_
ROUTING_ID
,
"IDENT"
,
6
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
int
as_server
=
1
;
int
as_server
=
1
;
rc
=
zmq_setsockopt
(
server
,
ZMQ_PLAIN_SERVER
,
&
as_server
,
sizeof
(
int
));
rc
=
zmq_setsockopt
(
server
,
ZMQ_PLAIN_SERVER
,
&
as_server
,
sizeof
(
int
));
...
...
tests/test_security_zap.cpp
View file @
08b01a51
...
@@ -280,13 +280,11 @@ int main (void)
...
@@ -280,13 +280,11 @@ int main (void)
test_zap_errors
(
&
socket_config_plain_server
,
NULL
,
test_zap_errors
(
&
socket_config_plain_server
,
NULL
,
&
socket_config_plain_client
,
NULL
);
&
socket_config_plain_client
,
NULL
);
if
(
zmq_has
(
"curve"
))
{
fprintf
(
stderr
,
"CURVE mechanism
\n
"
);
fprintf
(
stderr
,
"CURVE mechanism
\n
"
);
setup_testutil_security_curve
();
setup_testutil_security_curve
();
curve_client_data_t
curve_client_data
=
{
curve_client_data_t
curve_client_data
=
{
valid_server_public
,
valid_client_public
,
valid_client_secret
};
valid_server_public
,
valid_client_public
,
valid_client_secret
};
test_zap_errors
(
&
socket_config_curve_server
,
valid_server_secret
,
test_zap_errors
(
&
socket_config_curve_server
,
valid_server_secret
,
&
socket_config_curve_client
,
&
curve_client_data
);
&
socket_config_curve_client
,
&
curve_client_data
);
}
}
}
tests/test_spec_req.cpp
View file @
08b01a51
...
@@ -82,7 +82,7 @@ void test_req_only_listens_to_current_peer (void *ctx)
...
@@ -82,7 +82,7 @@ void test_req_only_listens_to_current_peer (void *ctx)
void
*
req
=
zmq_socket
(
ctx
,
ZMQ_REQ
);
void
*
req
=
zmq_socket
(
ctx
,
ZMQ_REQ
);
assert
(
req
);
assert
(
req
);
int
rc
=
zmq_setsockopt
(
req
,
ZMQ_
IDENTITY
,
"A"
,
2
);
int
rc
=
zmq_setsockopt
(
req
,
ZMQ_
ROUTING_ID
,
"A"
,
2
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
rc
=
zmq_bind
(
req
,
bind_address
);
rc
=
zmq_bind
(
req
,
bind_address
);
...
...
tests/test_spec_router.cpp
View file @
08b01a51
...
@@ -58,7 +58,7 @@ void test_fair_queue_in (void *ctx)
...
@@ -58,7 +58,7 @@ void test_fair_queue_in (void *ctx)
char
*
str
=
strdup
(
"A"
);
char
*
str
=
strdup
(
"A"
);
str
[
0
]
+=
peer
;
str
[
0
]
+=
peer
;
rc
=
zmq_setsockopt
(
senders
[
peer
],
ZMQ_
IDENTITY
,
str
,
2
);
rc
=
zmq_setsockopt
(
senders
[
peer
],
ZMQ_
ROUTING_ID
,
str
,
2
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
free
(
str
);
free
(
str
);
...
@@ -130,7 +130,7 @@ void test_destroy_queue_on_disconnect (void *ctx)
...
@@ -130,7 +130,7 @@ void test_destroy_queue_on_disconnect (void *ctx)
void
*
B
=
zmq_socket
(
ctx
,
ZMQ_DEALER
);
void
*
B
=
zmq_socket
(
ctx
,
ZMQ_DEALER
);
assert
(
B
);
assert
(
B
);
rc
=
zmq_setsockopt
(
B
,
ZMQ_
IDENTITY
,
"B"
,
2
);
rc
=
zmq_setsockopt
(
B
,
ZMQ_
ROUTING_ID
,
"B"
,
2
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
rc
=
zmq_connect
(
B
,
connect_address
);
rc
=
zmq_connect
(
B
,
connect_address
);
...
...
tests/test_stream.cpp
View file @
08b01a51
...
@@ -277,7 +277,7 @@ test_stream_to_stream (void)
...
@@ -277,7 +277,7 @@ test_stream_to_stream (void)
// Sent HTTP request on client socket
// Sent HTTP request on client socket
// Get server identity
// Get server identity
rc
=
zmq_getsockopt
(
client
,
ZMQ_
IDENTITY
,
id
,
&
id_size
);
rc
=
zmq_getsockopt
(
client
,
ZMQ_
ROUTING_ID
,
id
,
&
id_size
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
// First frame is server identity
// First frame is server identity
rc
=
zmq_send
(
client
,
id
,
id_size
,
ZMQ_SNDMORE
);
rc
=
zmq_send
(
client
,
id
,
id_size
,
ZMQ_SNDMORE
);
...
...
tests/test_stream_disconnect.cpp
View file @
08b01a51
...
@@ -57,7 +57,7 @@ bool has_more (void* socket)
...
@@ -57,7 +57,7 @@ bool has_more (void* socket)
bool
get_identity
(
void
*
socket
,
char
*
data
,
size_t
*
size
)
bool
get_identity
(
void
*
socket
,
char
*
data
,
size_t
*
size
)
{
{
int
rc
=
zmq_getsockopt
(
socket
,
ZMQ_
IDENTITY
,
data
,
size
);
int
rc
=
zmq_getsockopt
(
socket
,
ZMQ_
ROUTING_ID
,
data
,
size
);
return
rc
==
0
;
return
rc
==
0
;
}
}
...
@@ -140,7 +140,7 @@ int main(int, char**)
...
@@ -140,7 +140,7 @@ int main(int, char**)
// Send initial message.
// Send initial message.
char
blob_data
[
256
];
char
blob_data
[
256
];
size_t
blob_size
=
sizeof
(
blob_data
);
size_t
blob_size
=
sizeof
(
blob_data
);
rc
=
zmq_getsockopt
(
sockets
[
CLIENT
],
ZMQ_
IDENTITY
,
blob_data
,
&
blob_size
);
rc
=
zmq_getsockopt
(
sockets
[
CLIENT
],
ZMQ_
ROUTING_ID
,
blob_data
,
&
blob_size
);
assert
(
rc
!=
-
1
);
assert
(
rc
!=
-
1
);
assert
(
blob_size
>
0
);
assert
(
blob_size
>
0
);
zmq_msg_t
msg
;
zmq_msg_t
msg
;
...
...
tests/testutil_security.hpp
View file @
08b01a51
...
@@ -349,7 +349,7 @@ void setup_context_and_server_side (
...
@@ -349,7 +349,7 @@ void setup_context_and_server_side (
socket_config_
(
*
server
,
socket_config_data_
);
socket_config_
(
*
server
,
socket_config_data_
);
rc
=
zmq_setsockopt
(
*
server
,
ZMQ_
IDENTITY
,
identity
,
strlen
(
identity
));
rc
=
zmq_setsockopt
(
*
server
,
ZMQ_
ROUTING_ID
,
identity
,
strlen
(
identity
));
assert
(
rc
==
0
);
assert
(
rc
==
0
);
rc
=
zmq_bind
(
*
server
,
"tcp://127.0.0.1:*"
);
rc
=
zmq_bind
(
*
server
,
"tcp://127.0.0.1:*"
);
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment