Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
3ef3b940
Commit
3ef3b940
authored
Apr 28, 2013
by
Pieter Hintjens
Browse files
Options
Browse Files
Download
Plain Diff
Merge pull request #552 from hurtonm/master
Implement ZMTP/3.0 NULL mechanism
parents
df012358
a7032e9c
Hide whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
265 additions
and
28 deletions
+265
-28
stream_engine.cpp
src/stream_engine.cpp
+198
-5
stream_engine.hpp
src/stream_engine.hpp
+26
-7
test_raw_sock.cpp
tests/test_raw_sock.cpp
+41
-16
No files found.
src/stream_engine.cpp
View file @
3ef3b940
...
@@ -57,6 +57,7 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_, cons
...
@@ -57,6 +57,7 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_, cons
outsize
(
0
),
outsize
(
0
),
encoder
(
NULL
),
encoder
(
NULL
),
handshaking
(
true
),
handshaking
(
true
),
greeting_size
(
v2_greeting_size
),
greeting_bytes_read
(
0
),
greeting_bytes_read
(
0
),
session
(
NULL
),
session
(
NULL
),
options
(
options_
),
options
(
options_
),
...
@@ -68,6 +69,8 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_, cons
...
@@ -68,6 +69,8 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_, cons
io_error
(
false
),
io_error
(
false
),
congested
(
false
),
congested
(
false
),
subscription_required
(
false
),
subscription_required
(
false
),
output_paused
(
false
),
ready_command_received
(
false
),
socket
(
NULL
)
socket
(
NULL
)
{
{
int
rc
=
tx_msg
.
init
();
int
rc
=
tx_msg
.
init
();
...
@@ -417,7 +420,7 @@ bool zmq::stream_engine_t::handshake ()
...
@@ -417,7 +420,7 @@ bool zmq::stream_engine_t::handshake ()
if
(
greeting_recv
[
0
]
!=
0xff
)
if
(
greeting_recv
[
0
]
!=
0xff
)
break
;
break
;
if
(
greeting_bytes_read
<
10
)
if
(
greeting_bytes_read
<
signature_size
)
continue
;
continue
;
// Inspect the right-most bit of the 10th byte (which coincides
// Inspect the right-most bit of the 10th byte (which coincides
...
@@ -428,12 +431,32 @@ bool zmq::stream_engine_t::handshake ()
...
@@ -428,12 +431,32 @@ bool zmq::stream_engine_t::handshake ()
break
;
break
;
// The peer is using versioned protocol.
// The peer is using versioned protocol.
// Send the
rest of the greeting, if necessary
.
// Send the
major version number
.
if
(
outpos
+
outsize
!=
greeting_send
+
greeting
_size
)
{
if
(
outpos
+
outsize
==
greeting_send
+
signature
_size
)
{
if
(
outsize
==
0
)
if
(
outsize
==
0
)
set_pollout
(
handle
);
set_pollout
(
handle
);
outpos
[
outsize
++
]
=
ZMTP_2_1
;
// Protocol revision
outpos
[
outsize
++
]
=
3
;
// Major version number
outpos
[
outsize
++
]
=
options
.
type
;
// Socket type
}
if
(
greeting_bytes_read
>
signature_size
)
{
if
(
outpos
+
outsize
==
greeting_send
+
signature_size
+
1
)
{
if
(
outsize
==
0
)
set_pollout
(
handle
);
// Use ZMTP/2.0 to talk to older peers.
if
(
greeting_recv
[
10
]
==
ZMTP_1_0
||
greeting_recv
[
10
]
==
ZMTP_2_0
)
outpos
[
outsize
++
]
=
options
.
type
;
else
{
outpos
[
outsize
++
]
=
0
;
// Minor version number
memset
(
outpos
+
outsize
,
0
,
20
);
memcpy
(
outpos
+
outsize
,
"NULL"
,
4
);
outsize
+=
20
;
memset
(
outpos
+
outsize
,
0
,
32
);
outsize
+=
32
;
greeting_size
=
v3_greeting_size
;
}
}
}
}
}
}
...
@@ -478,6 +501,15 @@ bool zmq::stream_engine_t::handshake ()
...
@@ -478,6 +501,15 @@ bool zmq::stream_engine_t::handshake ()
in_batch_size
,
options
.
maxmsgsize
);
in_batch_size
,
options
.
maxmsgsize
);
alloc_assert
(
decoder
);
alloc_assert
(
decoder
);
}
}
else
if
(
greeting_recv
[
revision_pos
]
==
ZMTP_2_0
)
{
encoder
=
new
(
std
::
nothrow
)
v2_encoder_t
(
out_batch_size
);
alloc_assert
(
encoder
);
decoder
=
new
(
std
::
nothrow
)
v2_decoder_t
(
in_batch_size
,
options
.
maxmsgsize
);
alloc_assert
(
decoder
);
}
else
{
else
{
encoder
=
new
(
std
::
nothrow
)
v2_encoder_t
(
out_batch_size
);
encoder
=
new
(
std
::
nothrow
)
v2_encoder_t
(
out_batch_size
);
alloc_assert
(
encoder
);
alloc_assert
(
encoder
);
...
@@ -485,6 +517,15 @@ bool zmq::stream_engine_t::handshake ()
...
@@ -485,6 +517,15 @@ bool zmq::stream_engine_t::handshake ()
decoder
=
new
(
std
::
nothrow
)
v2_decoder_t
(
decoder
=
new
(
std
::
nothrow
)
v2_decoder_t
(
in_batch_size
,
options
.
maxmsgsize
);
in_batch_size
,
options
.
maxmsgsize
);
alloc_assert
(
decoder
);
alloc_assert
(
decoder
);
if
(
memcmp
(
greeting_recv
+
12
,
"NULL
\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0
"
,
20
)
==
0
)
{
read_msg
=
&
stream_engine_t
::
send_ready_command
;
write_msg
=
&
stream_engine_t
::
receive_ready_command
;
}
else
{
error
();
return
false
;
}
}
}
// Start polling for output if necessary.
// Start polling for output if necessary.
...
@@ -530,6 +571,135 @@ int zmq::stream_engine_t::write_identity (msg_t *msg_)
...
@@ -530,6 +571,135 @@ int zmq::stream_engine_t::write_identity (msg_t *msg_)
return
0
;
return
0
;
}
}
int
zmq
::
stream_engine_t
::
send_ready_command
(
msg_t
*
msg_
)
{
unsigned
char
*
const
command_buffer
=
(
unsigned
char
*
)
malloc
(
512
);
alloc_assert
(
command_buffer
);
unsigned
char
*
ptr
=
command_buffer
;
// Add mechanism string
memcpy
(
ptr
,
"READY "
,
8
);
ptr
+=
8
;
// Add socket type property
const
char
*
socket_type
=
socket_type_string
(
options
.
type
);
ptr
+=
add_property
(
ptr
,
"Socket-Type"
,
socket_type
,
strlen
(
socket_type
));
// Add identity property
if
(
options
.
type
==
ZMQ_REQ
||
options
.
type
==
ZMQ_DEALER
||
options
.
type
==
ZMQ_ROUTER
)
{
ptr
+=
add_property
(
ptr
,
"Identity"
,
options
.
identity
,
options
.
identity_size
);
}
const
size_t
command_size
=
ptr
-
command_buffer
;
const
int
rc
=
msg_
->
init_size
(
command_size
);
errno_assert
(
rc
==
0
);
memcpy
(
msg_
->
data
(),
command_buffer
,
command_size
);
free
(
command_buffer
);
if
(
ready_command_received
)
read_msg
=
&
stream_engine_t
::
pull_msg_from_session
;
else
read_msg
=
&
stream_engine_t
::
wait
;
return
0
;
}
int
zmq
::
stream_engine_t
::
receive_ready_command
(
msg_t
*
msg_
)
{
const
unsigned
char
*
const
command_buffer
=
static_cast
<
unsigned
char
*>
(
msg_
->
data
());
const
size_t
command_size
=
msg_
->
size
();
const
unsigned
char
*
ptr
=
command_buffer
;
size_t
bytes_left
=
command_size
;
if
(
bytes_left
<
8
||
memcmp
(
ptr
,
"READY "
,
8
))
{
errno
=
EPROTO
;
return
-
1
;
}
ptr
+=
8
;
bytes_left
-=
8
;
// Parse the property list
while
(
bytes_left
>
1
)
{
const
size_t
name_length
=
static_cast
<
size_t
>
(
*
ptr
);
ptr
+=
1
;
bytes_left
-=
1
;
if
(
bytes_left
<
name_length
)
break
;
const
std
::
string
name
=
std
::
string
((
const
char
*
)
ptr
,
name_length
);
ptr
+=
name_length
;
bytes_left
-=
name_length
;
if
(
bytes_left
<
4
)
break
;
const
size_t
value_length
=
static_cast
<
size_t
>
(
get_uint32
(
ptr
));
ptr
+=
4
;
bytes_left
-=
4
;
if
(
bytes_left
<
value_length
)
break
;
const
unsigned
char
*
const
value
=
ptr
;
ptr
+=
value_length
;
bytes_left
-=
value_length
;
if
(
name
==
"Socket-Type"
)
{
// Implement socket type checking
}
else
if
(
name
==
"Identity"
)
{
if
(
options
.
recv_identity
)
{
msg_t
identity
;
int
rc
=
identity
.
init_size
(
value_length
);
errno_assert
(
rc
==
0
);
memcpy
(
identity
.
data
(),
value
,
value_length
);
identity
.
set_flags
(
msg_t
::
identity
);
rc
=
session
->
push_msg
(
&
identity
);
errno_assert
(
rc
==
0
);
}
}
}
if
(
bytes_left
>
0
)
{
errno
=
EPROTO
;
return
-
1
;
}
int
rc
=
msg_
->
close
();
errno_assert
(
rc
==
0
);
rc
=
msg_
->
init
();
errno_assert
(
rc
==
0
);
write_msg
=
&
stream_engine_t
::
push_msg_to_session
;
ready_command_received
=
true
;
if
(
output_paused
)
{
activate_out
();
output_paused
=
false
;
}
return
0
;
}
int
zmq
::
stream_engine_t
::
wait
(
msg_t
*
msg_
)
{
if
(
ready_command_received
)
{
read_msg
=
&
stream_engine_t
::
pull_msg_from_session
;
return
pull_msg_from_session
(
msg_
);
}
else
{
output_paused
=
true
;
errno
=
EAGAIN
;
return
-
1
;
}
}
int
zmq
::
stream_engine_t
::
pull_msg_from_session
(
msg_t
*
msg_
)
int
zmq
::
stream_engine_t
::
pull_msg_from_session
(
msg_t
*
msg_
)
{
{
return
session
->
pull_msg
(
msg_
);
return
session
->
pull_msg
(
msg_
);
...
@@ -557,6 +727,29 @@ int zmq::stream_engine_t::write_subscription_msg (msg_t *msg_)
...
@@ -557,6 +727,29 @@ int zmq::stream_engine_t::write_subscription_msg (msg_t *msg_)
return
push_msg_to_session
(
msg_
);
return
push_msg_to_session
(
msg_
);
}
}
size_t
zmq
::
stream_engine_t
::
add_property
(
unsigned
char
*
ptr
,
const
char
*
name
,
const
void
*
value
,
size_t
value_len
)
{
const
size_t
name_len
=
strlen
(
name
);
zmq_assert
(
name_len
<=
255
);
*
ptr
++
=
static_cast
<
unsigned
char
>
(
name_len
);
memcpy
(
ptr
,
name
,
name_len
);
ptr
+=
name_len
;
zmq_assert
(
value_len
<=
(
2
^
31
)
-
1
);
put_uint32
(
ptr
,
static_cast
<
uint32_t
>
(
value_len
));
ptr
+=
4
;
memcpy
(
ptr
,
value
,
value_len
);
return
1
+
name_len
+
4
+
value_len
;
}
const
char
*
zmq
::
stream_engine_t
::
socket_type_string
(
int
socket_type
)
{
const
char
*
names
[]
=
{
"PAIR"
,
"PUB"
,
"SUB"
,
"REQ"
,
"REP"
,
"DEALER"
,
"ROUTER"
,
"PULL"
,
"PUSH"
,
"XPUB"
,
"XSUB"
};
zmq_assert
(
socket_type
>=
0
&&
socket_type
<=
10
);
return
names
[
socket_type
];
}
void
zmq
::
stream_engine_t
::
error
()
void
zmq
::
stream_engine_t
::
error
()
{
{
zmq_assert
(
session
);
zmq_assert
(
session
);
...
...
src/stream_engine.hpp
View file @
3ef3b940
...
@@ -37,8 +37,7 @@ namespace zmq
...
@@ -37,8 +37,7 @@ namespace zmq
enum
enum
{
{
ZMTP_1_0
=
0
,
ZMTP_1_0
=
0
,
ZMTP_2_0
=
1
,
ZMTP_2_0
=
1
ZMTP_2_1
=
2
};
};
class
io_thread_t
;
class
io_thread_t
;
...
@@ -94,11 +93,21 @@ namespace zmq
...
@@ -94,11 +93,21 @@ namespace zmq
int
read_identity
(
msg_t
*
msg_
);
int
read_identity
(
msg_t
*
msg_
);
int
write_identity
(
msg_t
*
msg_
);
int
write_identity
(
msg_t
*
msg_
);
int
send_ready_command
(
msg_t
*
msg
);
int
receive_ready_command
(
msg_t
*
msg
);
int
pull_msg_from_session
(
msg_t
*
msg_
);
int
pull_msg_from_session
(
msg_t
*
msg_
);
int
push_msg_to_session
(
msg_t
*
msg
);
int
push_msg_to_session
(
msg_t
*
msg
);
int
wait
(
msg_t
*
msg_
);
int
write_subscription_msg
(
msg_t
*
msg_
);
int
write_subscription_msg
(
msg_t
*
msg_
);
size_t
add_property
(
unsigned
char
*
ptr
,
const
char
*
name
,
const
void
*
value
,
size_t
value_len
);
const
char
*
socket_type_string
(
int
socket_type
);
// Underlying socket.
// Underlying socket.
fd_t
s
;
fd_t
s
;
...
@@ -119,13 +128,20 @@ namespace zmq
...
@@ -119,13 +128,20 @@ namespace zmq
// version. When false, normal message flow has started.
// version. When false, normal message flow has started.
bool
handshaking
;
bool
handshaking
;
// Size of the greeting message:
static
const
size_t
signature_size
=
10
;
// Preamble (10 bytes) + version (1 byte) + socket type (1 byte).
static
const
size_t
greeting_size
=
12
;
// Size of ZMTP/1.0 and ZMTP/2.0 greeting message
static
const
size_t
v2_greeting_size
=
12
;
// Size of ZMTP/3.0 greeting message
static
const
size_t
v3_greeting_size
=
64
;
// Expected greeting size.
size_t
greeting_size
;
// Greeting received from, and sent to peer
// Greeting received from, and sent to peer
unsigned
char
greeting_recv
[
greeting_size
];
unsigned
char
greeting_recv
[
v3_
greeting_size
];
unsigned
char
greeting_send
[
greeting_size
];
unsigned
char
greeting_send
[
v3_
greeting_size
];
// Size of greeting received so far
// Size of greeting received so far
unsigned
int
greeting_bytes_read
;
unsigned
int
greeting_bytes_read
;
...
@@ -156,6 +172,9 @@ namespace zmq
...
@@ -156,6 +172,9 @@ namespace zmq
// Needed to support old peers.
// Needed to support old peers.
bool
subscription_required
;
bool
subscription_required
;
bool
output_paused
;
bool
ready_command_received
;
// Socket
// Socket
zmq
::
socket_base_t
*
socket
;
zmq
::
socket_base_t
*
socket
;
...
...
tests/test_raw_sock.cpp
View file @
3ef3b940
...
@@ -28,9 +28,10 @@
...
@@ -28,9 +28,10 @@
typedef
unsigned
char
byte
;
typedef
unsigned
char
byte
;
typedef
struct
{
typedef
struct
{
byte
signature
[
10
];
// 0xFF 8*0x00 0x7F
byte
signature
[
10
];
// 0xFF 8*0x00 0x7F
byte
revision
;
// 2 = ZMTP/2.1
byte
version
[
2
];
// 0x03 0x00 for ZMTP/3.0
byte
socktype
;
// Defined in ZMTP spec
byte
mechanism
[
20
];
// "NULL"
byte
identity
[
2
];
// Empty message
byte
as_server
;
byte
filler
[
31
];
}
zmtp_greeting_t
;
}
zmtp_greeting_t
;
#define ZMTP_DEALER 5 // Socket type constants
#define ZMTP_DEALER 5 // Socket type constants
...
@@ -40,7 +41,7 @@ typedef struct {
...
@@ -40,7 +41,7 @@ typedef struct {
// 8-byte size is set to 1 for backwards compatibility
// 8-byte size is set to 1 for backwards compatibility
static
zmtp_greeting_t
greeting
static
zmtp_greeting_t
greeting
=
{
{
0xFF
,
0
,
0
,
0
,
0
,
0
,
0
,
0
,
1
,
0x7F
},
2
,
0
,
{
0
,
0
}
};
=
{
{
0xFF
,
0
,
0
,
0
,
0
,
0
,
0
,
0
,
1
,
0x7F
},
{
3
,
0
},
{
'N'
,
'U'
,
'L'
,
'L'
}
};
int
main
(
void
)
int
main
(
void
)
{
{
...
@@ -91,7 +92,6 @@ int main (void)
...
@@ -91,7 +92,6 @@ int main (void)
// Send our own protocol greeting
// Send our own protocol greeting
rc
=
zmq_msg_send
(
&
identity
,
router
,
ZMQ_SNDMORE
);
rc
=
zmq_msg_send
(
&
identity
,
router
,
ZMQ_SNDMORE
);
assert
(
rc
>
0
);
assert
(
rc
>
0
);
greeting
.
socktype
=
ZMTP_ROUTER
;
rc
=
zmq_send
(
router
,
&
greeting
,
sizeof
(
greeting
),
0
);
rc
=
zmq_send
(
router
,
&
greeting
,
sizeof
(
greeting
),
0
);
assert
(
rc
==
sizeof
(
greeting
));
assert
(
rc
==
sizeof
(
greeting
));
...
@@ -101,22 +101,47 @@ int main (void)
...
@@ -101,22 +101,47 @@ int main (void)
assert
(
rc
>
0
);
assert
(
rc
>
0
);
assert
(
zmq_msg_more
(
&
identity
));
assert
(
zmq_msg_more
(
&
identity
));
// Second frame contains all remaining data from DEALER
// Second frame contains the rest of greeting along with
// the Ready command
rc
=
zmq_recv
(
router
,
buffer
,
255
,
0
);
rc
=
zmq_recv
(
router
,
buffer
,
255
,
0
);
assert
(
rc
==
11
);
assert
(
rc
==
99
);
// First
four bytes are [revision][socktype][identity]
// First
two bytes are major and minor version numbers.
assert
(
buffer
[
0
]
==
2
);
// ZMTP/2.1
assert
(
buffer
[
0
]
==
3
);
// ZMTP/3.0
assert
(
buffer
[
1
]
==
ZMTP_DEALER
);
assert
(
buffer
[
1
]
==
0
);
// Identity is 2 byte message
// Mechanism is "NULL"
assert
(
buffer
[
2
]
==
0
);
// Flags = 0
assert
(
memcmp
(
buffer
+
2
,
"NULL
\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0
"
,
22
)
==
0
);
assert
(
buffer
[
3
]
==
0
);
// Size = 0
assert
(
memcmp
(
buffer
+
54
,
"
\0\53
READY "
,
10
)
==
0
);
assert
(
memcmp
(
buffer
+
64
,
"
\13
Socket-Type
\0\0\0\6
DEALER"
,
22
)
==
0
);
assert
(
memcmp
(
buffer
+
86
,
"
\10
Identity
\0\0\0\0
"
,
13
)
==
0
);
// Announce we are ready
memcpy
(
buffer
,
"
\0\53
"
,
2
);
memcpy
(
buffer
+
2
,
"READY "
,
8
);
memcpy
(
buffer
+
10
,
"
\13
Socket-Type
\0\0\0\6
ROUTER"
,
22
);
memcpy
(
buffer
+
32
,
"
\10
Identity
\0\0\0\0
"
,
13
);
// Send Ready command
rc
=
zmq_msg_send
(
&
identity
,
router
,
ZMQ_SNDMORE
);
assert
(
rc
>
0
);
rc
=
zmq_send
(
router
,
buffer
,
45
,
0
);
assert
(
rc
==
45
);
// Now we expect the data from the DEALER socket
// First frame is, again, the identity of the connection
rc
=
zmq_msg_recv
(
&
identity
,
router
,
0
);
assert
(
rc
>
0
);
assert
(
zmq_msg_more
(
&
identity
));
// Third frame contains Hello message from DEALER
rc
=
zmq_recv
(
router
,
buffer
,
sizeof
buffer
,
0
);
assert
(
rc
==
7
);
// Then we have a 5-byte message "Hello"
// Then we have a 5-byte message "Hello"
assert
(
buffer
[
4
]
==
0
);
// Flags = 0
assert
(
buffer
[
0
]
==
0
);
// Flags = 0
assert
(
buffer
[
5
]
==
5
);
// Size = 5
assert
(
buffer
[
1
]
==
5
);
// Size = 5
assert
(
memcmp
(
buffer
+
6
,
"Hello"
,
5
)
==
0
);
assert
(
memcmp
(
buffer
+
2
,
"Hello"
,
5
)
==
0
);
// Send "World" back to DEALER
// Send "World" back to DEALER
rc
=
zmq_msg_send
(
&
identity
,
router
,
ZMQ_SNDMORE
);
rc
=
zmq_msg_send
(
&
identity
,
router
,
ZMQ_SNDMORE
);
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment