Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
cd4d8bb1
Commit
cd4d8bb1
authored
Apr 28, 2013
by
Martin Hurton
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Implement ZMTP/3.0 NULL mechanism
parent
df012358
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
224 additions
and
12 deletions
+224
-12
stream_engine.cpp
src/stream_engine.cpp
+198
-5
stream_engine.hpp
src/stream_engine.hpp
+26
-7
No files found.
src/stream_engine.cpp
View file @
cd4d8bb1
...
@@ -57,6 +57,7 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_, cons
...
@@ -57,6 +57,7 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_, cons
outsize
(
0
),
outsize
(
0
),
encoder
(
NULL
),
encoder
(
NULL
),
handshaking
(
true
),
handshaking
(
true
),
greeting_size
(
v2_greeting_size
),
greeting_bytes_read
(
0
),
greeting_bytes_read
(
0
),
session
(
NULL
),
session
(
NULL
),
options
(
options_
),
options
(
options_
),
...
@@ -68,6 +69,8 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_, cons
...
@@ -68,6 +69,8 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_, cons
io_error
(
false
),
io_error
(
false
),
congested
(
false
),
congested
(
false
),
subscription_required
(
false
),
subscription_required
(
false
),
output_paused
(
false
),
ready_command_received
(
false
),
socket
(
NULL
)
socket
(
NULL
)
{
{
int
rc
=
tx_msg
.
init
();
int
rc
=
tx_msg
.
init
();
...
@@ -417,7 +420,7 @@ bool zmq::stream_engine_t::handshake ()
...
@@ -417,7 +420,7 @@ bool zmq::stream_engine_t::handshake ()
if
(
greeting_recv
[
0
]
!=
0xff
)
if
(
greeting_recv
[
0
]
!=
0xff
)
break
;
break
;
if
(
greeting_bytes_read
<
10
)
if
(
greeting_bytes_read
<
signature_size
)
continue
;
continue
;
// Inspect the right-most bit of the 10th byte (which coincides
// Inspect the right-most bit of the 10th byte (which coincides
...
@@ -428,12 +431,32 @@ bool zmq::stream_engine_t::handshake ()
...
@@ -428,12 +431,32 @@ bool zmq::stream_engine_t::handshake ()
break
;
break
;
// The peer is using versioned protocol.
// The peer is using versioned protocol.
// Send the
rest of the greeting, if necessary
.
// Send the
major version number
.
if
(
outpos
+
outsize
!=
greeting_send
+
greeting
_size
)
{
if
(
outpos
+
outsize
==
greeting_send
+
signature
_size
)
{
if
(
outsize
==
0
)
if
(
outsize
==
0
)
set_pollout
(
handle
);
set_pollout
(
handle
);
outpos
[
outsize
++
]
=
ZMTP_2_1
;
// Protocol revision
outpos
[
outsize
++
]
=
3
;
// Major version number
outpos
[
outsize
++
]
=
options
.
type
;
// Socket type
}
if
(
greeting_bytes_read
>
signature_size
)
{
if
(
outpos
+
outsize
==
greeting_send
+
signature_size
+
1
)
{
if
(
outsize
==
0
)
set_pollout
(
handle
);
// Use ZMTP/2.0 to talk to older peers.
if
(
greeting_recv
[
10
]
==
ZMTP_1_0
||
greeting_recv
[
10
]
==
ZMTP_2_0
)
outpos
[
outsize
++
]
=
options
.
type
;
else
{
outpos
[
outsize
++
]
=
0
;
// Minor version number
memset
(
outpos
+
outsize
,
0
,
20
);
memcpy
(
outpos
+
outsize
,
"NULL"
,
4
);
outsize
+=
20
;
memset
(
outpos
+
outsize
,
0
,
32
);
outsize
+=
32
;
greeting_size
=
v3_greeting_size
;
}
}
}
}
}
}
...
@@ -478,6 +501,15 @@ bool zmq::stream_engine_t::handshake ()
...
@@ -478,6 +501,15 @@ bool zmq::stream_engine_t::handshake ()
in_batch_size
,
options
.
maxmsgsize
);
in_batch_size
,
options
.
maxmsgsize
);
alloc_assert
(
decoder
);
alloc_assert
(
decoder
);
}
}
else
if
(
greeting_recv
[
revision_pos
]
==
ZMTP_2_0
)
{
encoder
=
new
(
std
::
nothrow
)
v2_encoder_t
(
out_batch_size
);
alloc_assert
(
encoder
);
decoder
=
new
(
std
::
nothrow
)
v2_decoder_t
(
in_batch_size
,
options
.
maxmsgsize
);
alloc_assert
(
decoder
);
}
else
{
else
{
encoder
=
new
(
std
::
nothrow
)
v2_encoder_t
(
out_batch_size
);
encoder
=
new
(
std
::
nothrow
)
v2_encoder_t
(
out_batch_size
);
alloc_assert
(
encoder
);
alloc_assert
(
encoder
);
...
@@ -485,6 +517,15 @@ bool zmq::stream_engine_t::handshake ()
...
@@ -485,6 +517,15 @@ bool zmq::stream_engine_t::handshake ()
decoder
=
new
(
std
::
nothrow
)
v2_decoder_t
(
decoder
=
new
(
std
::
nothrow
)
v2_decoder_t
(
in_batch_size
,
options
.
maxmsgsize
);
in_batch_size
,
options
.
maxmsgsize
);
alloc_assert
(
decoder
);
alloc_assert
(
decoder
);
if
(
memcmp
(
greeting_recv
+
12
,
"NULL
\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0
"
,
20
)
==
0
)
{
read_msg
=
&
stream_engine_t
::
send_ready_command
;
write_msg
=
&
stream_engine_t
::
receive_ready_command
;
}
else
{
error
();
return
false
;
}
}
}
// Start polling for output if necessary.
// Start polling for output if necessary.
...
@@ -530,6 +571,135 @@ int zmq::stream_engine_t::write_identity (msg_t *msg_)
...
@@ -530,6 +571,135 @@ int zmq::stream_engine_t::write_identity (msg_t *msg_)
return
0
;
return
0
;
}
}
int
zmq
::
stream_engine_t
::
send_ready_command
(
msg_t
*
msg_
)
{
unsigned
char
*
const
command_buffer
=
(
unsigned
char
*
)
malloc
(
512
);
alloc_assert
(
command_buffer
);
unsigned
char
*
ptr
=
command_buffer
;
// Add mechanism string
memcpy
(
ptr
,
"READY "
,
8
);
ptr
+=
8
;
// Add socket type property
const
char
*
socket_type
=
socket_type_string
(
options
.
type
);
ptr
+=
add_property
(
ptr
,
"Socket-Type"
,
socket_type
,
strlen
(
socket_type
));
// Add identity property
if
(
options
.
type
==
ZMQ_REQ
||
options
.
type
==
ZMQ_DEALER
||
options
.
type
==
ZMQ_ROUTER
)
{
ptr
+=
add_property
(
ptr
,
"Identity"
,
options
.
identity
,
options
.
identity_size
);
}
const
size_t
command_size
=
ptr
-
command_buffer
;
const
int
rc
=
msg_
->
init_size
(
command_size
);
errno_assert
(
rc
==
0
);
memcpy
(
msg_
->
data
(),
command_buffer
,
command_size
);
free
(
command_buffer
);
if
(
ready_command_received
)
read_msg
=
&
stream_engine_t
::
pull_msg_from_session
;
else
read_msg
=
&
stream_engine_t
::
wait
;
return
0
;
}
int
zmq
::
stream_engine_t
::
receive_ready_command
(
msg_t
*
msg_
)
{
const
unsigned
char
*
const
command_buffer
=
static_cast
<
unsigned
char
*>
(
msg_
->
data
());
const
size_t
command_size
=
msg_
->
size
();
const
unsigned
char
*
ptr
=
command_buffer
;
size_t
bytes_left
=
command_size
;
if
(
bytes_left
<
8
||
memcmp
(
ptr
,
"READY "
,
8
))
{
errno
=
EPROTO
;
return
-
1
;
}
ptr
+=
8
;
bytes_left
-=
8
;
// Parse the property list
while
(
bytes_left
>
1
)
{
const
size_t
name_length
=
static_cast
<
size_t
>
(
*
ptr
);
ptr
+=
1
;
bytes_left
-=
1
;
if
(
bytes_left
<
name_length
)
break
;
const
std
::
string
name
=
std
::
string
((
const
char
*
)
ptr
,
name_length
);
ptr
+=
name_length
;
bytes_left
-=
name_length
;
if
(
bytes_left
<
4
)
break
;
const
size_t
value_length
=
static_cast
<
size_t
>
(
get_uint32
(
ptr
));
ptr
+=
4
;
bytes_left
-=
4
;
if
(
bytes_left
<
value_length
)
break
;
const
unsigned
char
*
const
value
=
ptr
;
ptr
+=
value_length
;
bytes_left
-=
value_length
;
if
(
name
==
"Socket-Type"
)
{
// Implement socket type checking
}
else
if
(
name
==
"Identity"
)
{
if
(
options
.
recv_identity
)
{
msg_t
identity
;
int
rc
=
identity
.
init_size
(
value_length
);
errno_assert
(
rc
==
0
);
memcpy
(
identity
.
data
(),
value
,
value_length
);
identity
.
set_flags
(
msg_t
::
identity
);
rc
=
session
->
push_msg
(
&
identity
);
errno_assert
(
rc
==
0
);
}
}
}
if
(
bytes_left
>
0
)
{
errno
=
EPROTO
;
return
-
1
;
}
int
rc
=
msg_
->
close
();
errno_assert
(
rc
==
0
);
rc
=
msg_
->
init
();
errno_assert
(
rc
==
0
);
write_msg
=
&
stream_engine_t
::
push_msg_to_session
;
ready_command_received
=
true
;
if
(
output_paused
)
{
activate_out
();
output_paused
=
false
;
}
return
0
;
}
int
zmq
::
stream_engine_t
::
wait
(
msg_t
*
msg_
)
{
if
(
ready_command_received
)
{
read_msg
=
&
stream_engine_t
::
pull_msg_from_session
;
return
pull_msg_from_session
(
msg_
);
}
else
{
output_paused
=
true
;
errno
=
EAGAIN
;
return
-
1
;
}
}
int
zmq
::
stream_engine_t
::
pull_msg_from_session
(
msg_t
*
msg_
)
int
zmq
::
stream_engine_t
::
pull_msg_from_session
(
msg_t
*
msg_
)
{
{
return
session
->
pull_msg
(
msg_
);
return
session
->
pull_msg
(
msg_
);
...
@@ -557,6 +727,29 @@ int zmq::stream_engine_t::write_subscription_msg (msg_t *msg_)
...
@@ -557,6 +727,29 @@ int zmq::stream_engine_t::write_subscription_msg (msg_t *msg_)
return
push_msg_to_session
(
msg_
);
return
push_msg_to_session
(
msg_
);
}
}
size_t
zmq
::
stream_engine_t
::
add_property
(
unsigned
char
*
ptr
,
const
char
*
name
,
const
void
*
value
,
size_t
value_len
)
{
const
size_t
name_len
=
strlen
(
name
);
zmq_assert
(
name_len
<=
255
);
*
ptr
++
=
static_cast
<
unsigned
char
>
(
name_len
);
memcpy
(
ptr
,
name
,
name_len
);
ptr
+=
name_len
;
zmq_assert
(
value_len
<=
(
2
^
31
)
-
1
);
put_uint32
(
ptr
,
static_cast
<
uint32_t
>
(
value_len
));
ptr
+=
4
;
memcpy
(
ptr
,
value
,
value_len
);
return
1
+
name_len
+
4
+
value_len
;
}
const
char
*
zmq
::
stream_engine_t
::
socket_type_string
(
int
socket_type
)
{
const
char
*
names
[]
=
{
"PAIR"
,
"PUB"
,
"SUB"
,
"REQ"
,
"REP"
,
"DEALER"
,
"ROUTER"
,
"PULL"
,
"PUSH"
,
"XPUB"
,
"XSUB"
};
zmq_assert
(
socket_type
>=
0
&&
socket_type
<=
10
);
return
names
[
socket_type
];
}
void
zmq
::
stream_engine_t
::
error
()
void
zmq
::
stream_engine_t
::
error
()
{
{
zmq_assert
(
session
);
zmq_assert
(
session
);
...
...
src/stream_engine.hpp
View file @
cd4d8bb1
...
@@ -37,8 +37,7 @@ namespace zmq
...
@@ -37,8 +37,7 @@ namespace zmq
enum
enum
{
{
ZMTP_1_0
=
0
,
ZMTP_1_0
=
0
,
ZMTP_2_0
=
1
,
ZMTP_2_0
=
1
ZMTP_2_1
=
2
};
};
class
io_thread_t
;
class
io_thread_t
;
...
@@ -94,11 +93,21 @@ namespace zmq
...
@@ -94,11 +93,21 @@ namespace zmq
int
read_identity
(
msg_t
*
msg_
);
int
read_identity
(
msg_t
*
msg_
);
int
write_identity
(
msg_t
*
msg_
);
int
write_identity
(
msg_t
*
msg_
);
int
send_ready_command
(
msg_t
*
msg
);
int
receive_ready_command
(
msg_t
*
msg
);
int
pull_msg_from_session
(
msg_t
*
msg_
);
int
pull_msg_from_session
(
msg_t
*
msg_
);
int
push_msg_to_session
(
msg_t
*
msg
);
int
push_msg_to_session
(
msg_t
*
msg
);
int
wait
(
msg_t
*
msg_
);
int
write_subscription_msg
(
msg_t
*
msg_
);
int
write_subscription_msg
(
msg_t
*
msg_
);
size_t
add_property
(
unsigned
char
*
ptr
,
const
char
*
name
,
const
void
*
value
,
size_t
value_len
);
const
char
*
socket_type_string
(
int
socket_type
);
// Underlying socket.
// Underlying socket.
fd_t
s
;
fd_t
s
;
...
@@ -119,13 +128,20 @@ namespace zmq
...
@@ -119,13 +128,20 @@ namespace zmq
// version. When false, normal message flow has started.
// version. When false, normal message flow has started.
bool
handshaking
;
bool
handshaking
;
// Size of the greeting message:
static
const
size_t
signature_size
=
10
;
// Preamble (10 bytes) + version (1 byte) + socket type (1 byte).
static
const
size_t
greeting_size
=
12
;
// Size of ZMTP/1.0 and ZMTP/2.0 greeting message
static
const
size_t
v2_greeting_size
=
12
;
// Size of ZMTP/3.0 greeting message
static
const
size_t
v3_greeting_size
=
64
;
// Expected greeting size.
size_t
greeting_size
;
// Greeting received from, and sent to peer
// Greeting received from, and sent to peer
unsigned
char
greeting_recv
[
greeting_size
];
unsigned
char
greeting_recv
[
v3_
greeting_size
];
unsigned
char
greeting_send
[
greeting_size
];
unsigned
char
greeting_send
[
v3_
greeting_size
];
// Size of greeting received so far
// Size of greeting received so far
unsigned
int
greeting_bytes_read
;
unsigned
int
greeting_bytes_read
;
...
@@ -156,6 +172,9 @@ namespace zmq
...
@@ -156,6 +172,9 @@ namespace zmq
// Needed to support old peers.
// Needed to support old peers.
bool
subscription_required
;
bool
subscription_required
;
bool
output_paused
;
bool
ready_command_received
;
// Socket
// Socket
zmq
::
socket_base_t
*
socket
;
zmq
::
socket_base_t
*
socket
;
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment