Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
12c7db8c
Commit
12c7db8c
authored
Mar 12, 2013
by
Pieter Hintjens
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Code name clean up
parent
d826c53b
Show whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
18 additions
and
23 deletions
+18
-23
stream_engine.cpp
src/stream_engine.cpp
+10
-10
stream_engine.hpp
src/stream_engine.hpp
+8
-13
No files found.
src/stream_engine.cpp
View file @
12c7db8c
...
@@ -150,7 +150,7 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_,
...
@@ -150,7 +150,7 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_,
else
{
else
{
// Send the 'length' and 'flags' fields of the identity message.
// Send the 'length' and 'flags' fields of the identity message.
// The 'length' field is encoded in the long format.
// The 'length' field is encoded in the long format.
outpos
=
greeting_
output_buffer
;
outpos
=
greeting_
send
;
outpos
[
outsize
++
]
=
0xff
;
outpos
[
outsize
++
]
=
0xff
;
put_uint64
(
&
outpos
[
outsize
],
options
.
identity_size
+
1
);
put_uint64
(
&
outpos
[
outsize
],
options
.
identity_size
+
1
);
outsize
+=
8
;
outsize
+=
8
;
...
@@ -357,7 +357,7 @@ bool zmq::stream_engine_t::handshake ()
...
@@ -357,7 +357,7 @@ bool zmq::stream_engine_t::handshake ()
// Receive the greeting.
// Receive the greeting.
while
(
greeting_bytes_read
<
greeting_size
)
{
while
(
greeting_bytes_read
<
greeting_size
)
{
const
int
n
=
read
(
greeting
+
greeting_bytes_read
,
const
int
n
=
read
(
greeting
_recv
+
greeting_bytes_read
,
greeting_size
-
greeting_bytes_read
);
greeting_size
-
greeting_bytes_read
);
if
(
n
==
-
1
)
{
if
(
n
==
-
1
)
{
error
();
error
();
...
@@ -371,7 +371,7 @@ bool zmq::stream_engine_t::handshake ()
...
@@ -371,7 +371,7 @@ bool zmq::stream_engine_t::handshake ()
// We have received at least one byte from the peer.
// We have received at least one byte from the peer.
// If the first byte is not 0xff, we know that the
// If the first byte is not 0xff, we know that the
// peer is using unversioned protocol.
// peer is using unversioned protocol.
if
(
greeting
[
0
]
!=
0xff
)
if
(
greeting
_recv
[
0
]
!=
0xff
)
break
;
break
;
if
(
greeting_bytes_read
<
10
)
if
(
greeting_bytes_read
<
10
)
...
@@ -381,12 +381,12 @@ bool zmq::stream_engine_t::handshake ()
...
@@ -381,12 +381,12 @@ bool zmq::stream_engine_t::handshake ()
// with the 'flags' field if a regular message was sent).
// with the 'flags' field if a regular message was sent).
// Zero indicates this is a header of identity message
// Zero indicates this is a header of identity message
// (i.e. the peer is using the unversioned protocol).
// (i.e. the peer is using the unversioned protocol).
if
(
!
(
greeting
[
9
]
&
0x01
))
if
(
!
(
greeting
_recv
[
9
]
&
0x01
))
break
;
break
;
// The peer is using versioned protocol.
// The peer is using versioned protocol.
// Send the rest of the greeting, if necessary.
// Send the rest of the greeting, if necessary.
if
(
outpos
+
outsize
!=
greeting_
output_buffer
+
greeting_size
)
{
if
(
outpos
+
outsize
!=
greeting_
send
+
greeting_size
)
{
if
(
outsize
==
0
)
if
(
outsize
==
0
)
set_pollout
(
handle
);
set_pollout
(
handle
);
outpos
[
outsize
++
]
=
ZMTP_2_1
;
// Protocol revision
outpos
[
outsize
++
]
=
ZMTP_2_1
;
// Protocol revision
...
@@ -399,7 +399,7 @@ bool zmq::stream_engine_t::handshake ()
...
@@ -399,7 +399,7 @@ bool zmq::stream_engine_t::handshake ()
// Is the peer using ZMTP/1.0 with no revision number?
// Is the peer using ZMTP/1.0 with no revision number?
// If so, we send and receive rest of identity message
// If so, we send and receive rest of identity message
if
(
greeting
[
0
]
!=
0xff
||
!
(
greeting
[
9
]
&
0x01
))
{
if
(
greeting
_recv
[
0
]
!=
0xff
||
!
(
greeting_recv
[
9
]
&
0x01
))
{
encoder
=
new
(
std
::
nothrow
)
v1_encoder_t
(
out_batch_size
);
encoder
=
new
(
std
::
nothrow
)
v1_encoder_t
(
out_batch_size
);
alloc_assert
(
encoder
);
alloc_assert
(
encoder
);
encoder
->
set_msg_source
(
session
);
encoder
->
set_msg_source
(
session
);
...
@@ -419,7 +419,7 @@ bool zmq::stream_engine_t::handshake ()
...
@@ -419,7 +419,7 @@ bool zmq::stream_engine_t::handshake ()
zmq_assert
(
buffer_size
==
header_size
);
zmq_assert
(
buffer_size
==
header_size
);
// Make sure the decoder sees the data we have already received.
// Make sure the decoder sees the data we have already received.
inpos
=
greeting
;
inpos
=
greeting
_recv
;
insize
=
greeting_bytes_read
;
insize
=
greeting_bytes_read
;
// To allow for interoperability with peers that do not forward
// To allow for interoperability with peers that do not forward
...
@@ -431,7 +431,7 @@ bool zmq::stream_engine_t::handshake ()
...
@@ -431,7 +431,7 @@ bool zmq::stream_engine_t::handshake ()
decoder
->
set_msg_sink
(
this
);
decoder
->
set_msg_sink
(
this
);
}
}
else
else
if
(
greeting
[
revision_pos
]
==
ZMTP_1_0
)
{
if
(
greeting
_recv
[
revision_pos
]
==
ZMTP_1_0
)
{
encoder
=
new
(
std
::
nothrow
)
v1_encoder_t
(
encoder
=
new
(
std
::
nothrow
)
v1_encoder_t
(
out_batch_size
);
out_batch_size
);
alloc_assert
(
encoder
);
alloc_assert
(
encoder
);
...
@@ -443,8 +443,8 @@ bool zmq::stream_engine_t::handshake ()
...
@@ -443,8 +443,8 @@ bool zmq::stream_engine_t::handshake ()
decoder
->
set_msg_sink
(
session
);
decoder
->
set_msg_sink
(
session
);
}
}
else
else
if
(
greeting
[
revision_pos
]
==
ZMTP_2_0
if
(
greeting
_recv
[
revision_pos
]
==
ZMTP_2_0
||
greeting
[
revision_pos
]
==
ZMTP_2_1
)
{
||
greeting
_recv
[
revision_pos
]
==
ZMTP_2_1
)
{
encoder
=
new
(
std
::
nothrow
)
v2_encoder_t
(
encoder
=
new
(
std
::
nothrow
)
v2_encoder_t
(
out_batch_size
,
session
);
out_batch_size
,
session
);
alloc_assert
(
encoder
);
alloc_assert
(
encoder
);
...
...
src/stream_engine.hpp
View file @
12c7db8c
...
@@ -97,10 +97,6 @@ namespace zmq
...
@@ -97,10 +97,6 @@ namespace zmq
// Underlying socket.
// Underlying socket.
fd_t
s
;
fd_t
s
;
// Size of the greeting message:
// Preamble (10 bytes) + version (1 byte) + socket type (1 byte).
static
const
size_t
greeting_size
=
12
;
// True iff we are registered with an I/O poller.
// True iff we are registered with an I/O poller.
bool
io_enabled
;
bool
io_enabled
;
...
@@ -119,17 +115,16 @@ namespace zmq
...
@@ -119,17 +115,16 @@ namespace zmq
// version. When false, normal message flow has started.
// version. When false, normal message flow has started.
bool
handshaking
;
bool
handshaking
;
//
The receive buffer holding the greeting message
//
Size of the greeting message:
//
that we are receiving from the peer
.
//
Preamble (10 bytes) + version (1 byte) + socket type (1 byte)
.
unsigned
char
greeting
[
greeting_size
]
;
static
const
size_t
greeting_size
=
12
;
//
The number of bytes of the greeting message that
//
Greeting received from, and sent to peer
// we have already received.
unsigned
char
greeting_recv
[
greeting_size
];
unsigned
int
greeting_bytes_read
;
unsigned
char
greeting_send
[
greeting_size
]
;
// The send buffer holding the greeting message
// Size of greeting received so far
// that we are sending to the peer.
unsigned
int
greeting_bytes_read
;
unsigned
char
greeting_output_buffer
[
greeting_size
];
// The session this engine is attached to.
// The session this engine is attached to.
zmq
::
session_base_t
*
session
;
zmq
::
session_base_t
*
session
;
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment