Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
785ef41f
Commit
785ef41f
authored
Mar 12, 2013
by
Pieter Hintjens
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Refactored codecs to match ZMTP version numbers
parent
f0f16505
Hide whitespace changes
Inline
Side-by-side
Showing
16 changed files
with
386 additions
and
335 deletions
+386
-335
Makefile.am
src/Makefile.am
+6
-4
decoder.hpp
src/decoder.hpp
+0
-31
encoder.hpp
src/encoder.hpp
+0
-24
stream_engine.cpp
src/stream_engine.cpp
+27
-28
tcp_connecter.cpp
src/tcp_connecter.cpp
+1
-1
v1_decoder.cpp
src/v1_decoder.cpp
+73
-68
v1_decoder.hpp
src/v1_decoder.hpp
+7
-14
v1_encoder.cpp
src/v1_encoder.cpp
+30
-30
v1_encoder.hpp
src/v1_encoder.hpp
+6
-9
v2_decoder.cpp
src/v2_decoder.cpp
+75
-79
v2_decoder.hpp
src/v2_decoder.hpp
+60
-0
v2_encoder.cpp
src/v2_encoder.cpp
+36
-35
v2_encoder.hpp
src/v2_encoder.hpp
+56
-0
v2_protocol.hpp
src/v2_protocol.hpp
+4
-7
xsub.cpp
src/xsub.cpp
+4
-4
test_raw_sock.cpp
tests/test_raw_sock.cpp
+1
-1
No files found.
src/Makefile.am
View file @
785ef41f
...
...
@@ -88,10 +88,8 @@ libzmq_la_SOURCES = \
address.cpp
\
clock.cpp
\
ctx.cpp
\
decoder.cpp
\
devpoll.cpp
\
dist.cpp
\
encoder.cpp
\
epoll.cpp
\
err.cpp
\
fq.cpp
\
...
...
@@ -140,10 +138,14 @@ libzmq_la_SOURCES = \
router.cpp
\
dealer.cpp
\
v1_decoder.cpp
\
v1_decoder.hpp
\
v1_encoder.cpp
\
v1_decoder.hpp
\
v1_encoder.hpp
\
v1_protocol.hpp
\
v2_decoder.cpp
\
v2_decoder.hpp
\
v2_encoder.cpp
\
v2_encoder.hpp
\
v2_protocol.hpp
\
xsub.cpp
\
zmq.cpp
\
zmq_utils.cpp
\
...
...
src/decoder.hpp
View file @
785ef41f
...
...
@@ -32,7 +32,6 @@
namespace
zmq
{
class
i_msg_sink
;
// Helper base class for decoders that know the amount of data to read
...
...
@@ -210,36 +209,6 @@ namespace zmq
decoder_base_t
(
const
decoder_base_t
&
);
const
decoder_base_t
&
operator
=
(
const
decoder_base_t
&
);
};
// Decoder for 0MQ framing protocol. Converts data batches into messages.
class
decoder_t
:
public
decoder_base_t
<
decoder_t
>
{
public
:
decoder_t
(
size_t
bufsize_
,
int64_t
maxmsgsize_
);
~
decoder_t
();
// Set the receiver of decoded messages.
void
set_msg_sink
(
i_msg_sink
*
msg_sink_
);
private
:
bool
one_byte_size_ready
();
bool
eight_byte_size_ready
();
bool
flags_ready
();
bool
message_ready
();
i_msg_sink
*
msg_sink
;
unsigned
char
tmpbuf
[
8
];
msg_t
in_progress
;
int64_t
maxmsgsize
;
decoder_t
(
const
decoder_t
&
);
void
operator
=
(
const
decoder_t
&
);
};
}
#endif
...
...
src/encoder.hpp
View file @
785ef41f
...
...
@@ -168,30 +168,6 @@ namespace zmq
encoder_base_t
(
const
encoder_base_t
&
);
void
operator
=
(
const
encoder_base_t
&
);
};
// Encoder for 0MQ framing protocol. Converts messages into data batches.
class
encoder_t
:
public
encoder_base_t
<
encoder_t
>
{
public
:
encoder_t
(
size_t
bufsize_
);
~
encoder_t
();
void
set_msg_source
(
i_msg_source
*
msg_source_
);
private
:
bool
size_ready
();
bool
message_ready
();
i_msg_source
*
msg_source
;
msg_t
in_progress
;
unsigned
char
tmpbuf
[
10
];
encoder_t
(
const
encoder_t
&
);
const
encoder_t
&
operator
=
(
const
encoder_t
&
);
};
}
#endif
...
...
src/stream_engine.cpp
View file @
785ef41f
...
...
@@ -36,10 +36,10 @@
#include "stream_engine.hpp"
#include "io_thread.hpp"
#include "session_base.hpp"
#include "encoder.hpp"
#include "decoder.hpp"
#include "v1_encoder.hpp"
#include "v1_decoder.hpp"
#include "v2_encoder.hpp"
#include "v2_decoder.hpp"
#include "raw_decoder.hpp"
#include "raw_encoder.hpp"
#include "config.hpp"
...
...
@@ -103,13 +103,13 @@ zmq::stream_engine_t::~stream_engine_t ()
if
(
s
!=
retired_fd
)
{
#ifdef ZMQ_HAVE_WINDOWS
int
rc
=
closesocket
(
s
);
wsa_assert
(
rc
!=
SOCKET_ERROR
);
int
rc
=
closesocket
(
s
);
wsa_assert
(
rc
!=
SOCKET_ERROR
);
#else
int
rc
=
close
(
s
);
int
rc
=
close
(
s
);
errno_assert
(
rc
==
0
);
#endif
s
=
retired_fd
;
s
=
retired_fd
;
}
if
(
encoder
!=
NULL
)
...
...
@@ -230,14 +230,12 @@ void zmq::stream_engine_t::in_event ()
else
processed
=
decoder
->
process_buffer
(
inpos
,
insize
);
}
else
{
else
// Push the data to the decoder.
processed
=
decoder
->
process_buffer
(
inpos
,
insize
);
}
if
(
unlikely
(
processed
==
(
size_t
)
-
1
))
{
if
(
unlikely
(
processed
==
(
size_t
)
-
1
))
disconnection
=
true
;
}
else
{
// Stop polling for input if we got stuck.
...
...
@@ -292,14 +290,14 @@ void zmq::stream_engine_t::out_event ()
// If there are any data to write in write buffer, write as much as
// possible to the socket. Note that amount of data to write can be
// arbitra
t
ily large. However, we assume that underlying TCP layer has
// arbitra
r
ily large. However, we assume that underlying TCP layer has
// limited transmission buffer and thus the actual number of bytes
// written should be reasonably modest.
int
nbytes
=
write
(
outpos
,
outsize
);
// IO error has occurred. We stop waiting for output events.
// The engine is not terminated until we detect input error;
// this is necessary to prevent losing incom
m
ing messages.
// this is necessary to prevent losing incoming messages.
if
(
nbytes
==
-
1
)
{
reset_pollout
(
handle
);
if
(
unlikely
(
terminating
))
...
...
@@ -392,22 +390,22 @@ bool zmq::stream_engine_t::handshake ()
if
(
outpos
+
outsize
!=
greeting_output_buffer
+
greeting_size
)
{
if
(
outsize
==
0
)
set_pollout
(
handle
);
outpos
[
outsize
++
]
=
1
;
// Protocol
ver
sion
outpos
[
outsize
++
]
=
1
;
// Protocol
revi
sion
outpos
[
outsize
++
]
=
options
.
type
;
// Socket type
}
}
// Position of the
ver
sion field in the greeting.
const
size_t
ver
sion_pos
=
10
;
// Position of the
revi
sion field in the greeting.
const
size_t
revi
sion_pos
=
10
;
// Is the peer using ZMTP/1.0 with no
ver
sion number?
// If so, we send and receive rest
s of identity messages
// Is the peer using ZMTP/1.0 with no
revi
sion number?
// If so, we send and receive rest
of identity message
if
(
greeting
[
0
]
!=
0xff
||
!
(
greeting
[
9
]
&
0x01
))
{
encoder
=
new
(
std
::
nothrow
)
encoder_t
(
out_batch_size
);
encoder
=
new
(
std
::
nothrow
)
v1_
encoder_t
(
out_batch_size
);
alloc_assert
(
encoder
);
encoder
->
set_msg_source
(
session
);
decoder
=
new
(
std
::
nothrow
)
decoder_t
(
in_batch_size
,
options
.
maxmsgsize
);
decoder
=
new
(
std
::
nothrow
)
v1_
decoder_t
(
in_batch_size
,
options
.
maxmsgsize
);
alloc_assert
(
decoder
);
decoder
->
set_msg_sink
(
session
);
...
...
@@ -434,23 +432,25 @@ bool zmq::stream_engine_t::handshake ()
decoder
->
set_msg_sink
(
this
);
}
else
if
(
greeting
[
ver
sion_pos
]
==
0
)
{
// ZMTP/1.0 framing
.
encoder
=
new
(
std
::
nothrow
)
encoder_t
(
out_batch_size
);
if
(
greeting
[
revi
sion_pos
]
==
0
)
{
// ZMTP/1.0 framing
(revision 0)
encoder
=
new
(
std
::
nothrow
)
v1_
encoder_t
(
out_batch_size
);
alloc_assert
(
encoder
);
encoder
->
set_msg_source
(
session
);
decoder
=
new
(
std
::
nothrow
)
decoder_t
(
in_batch_size
,
options
.
maxmsgsize
);
decoder
=
new
(
std
::
nothrow
)
v1_
decoder_t
(
in_batch_size
,
options
.
maxmsgsize
);
alloc_assert
(
decoder
);
decoder
->
set_msg_sink
(
session
);
}
else
{
// v1 framing protocol.
encoder
=
new
(
std
::
nothrow
)
v1_encoder_t
(
out_batch_size
,
session
);
else
if
(
greeting
[
revision_pos
]
==
1
||
greeting
[
revision_pos
]
==
2
)
{
// ZMTP/2.0 framing (revision 1)
encoder
=
new
(
std
::
nothrow
)
v2_encoder_t
(
out_batch_size
,
session
);
alloc_assert
(
encoder
);
decoder
=
new
(
std
::
nothrow
)
v
1
_decoder_t
(
in_batch_size
,
options
.
maxmsgsize
,
session
);
v
2
_decoder_t
(
in_batch_size
,
options
.
maxmsgsize
,
session
);
alloc_assert
(
decoder
);
}
...
...
@@ -613,4 +613,3 @@ int zmq::stream_engine_t::read (void *data_, size_t size_)
#endif
}
src/tcp_connecter.cpp
View file @
785ef41f
...
...
@@ -102,7 +102,7 @@ void zmq::tcp_connecter_t::process_term (int linger_)
void
zmq
::
tcp_connecter_t
::
in_event
()
{
// We are not polling for incom
m
ing data, so we are actually called
// We are not polling for incoming data, so we are actually called
// because of error here. However, we can get error on out event as well
// on some platforms, so we'll simply handle both events in the same way.
out_event
();
...
...
src/v1_decoder.cpp
View file @
785ef41f
...
...
@@ -19,30 +19,30 @@
#include <stdlib.h>
#include <string.h>
#include <limits>
#include "platform.hpp"
#if
def
ZMQ_HAVE_WINDOWS
#if
defined
ZMQ_HAVE_WINDOWS
#include "windows.hpp"
#endif
#include "
v1_protocol
.hpp"
#include "
decoder
.hpp"
#include "v1_decoder.hpp"
#include "i_msg_sink.hpp"
#include "likely.hpp"
#include "wire.hpp"
#include "err.hpp"
zmq
::
v1_decoder_t
::
v1_decoder_t
(
size_t
bufsize_
,
int64_t
maxmsgsize_
,
i_msg_sink
*
msg_sink_
)
:
zmq
::
v1_decoder_t
::
v1_decoder_t
(
size_t
bufsize_
,
int64_t
maxmsgsize_
)
:
decoder_base_t
<
v1_decoder_t
>
(
bufsize_
),
msg_sink
(
msg_sink_
),
msg_flags
(
0
),
msg_sink
(
NULL
),
maxmsgsize
(
maxmsgsize_
)
{
int
rc
=
in_progress
.
init
();
errno_assert
(
rc
==
0
);
// At the beginning, read one byte and go to
flags
_ready state.
next_step
(
tmpbuf
,
1
,
&
v1_decoder_t
::
flags
_ready
);
// At the beginning, read one byte and go to
one_byte_size
_ready state.
next_step
(
tmpbuf
,
1
,
&
v1_decoder_t
::
one_byte_size
_ready
);
}
zmq
::
v1_decoder_t
::~
v1_decoder_t
()
...
...
@@ -56,90 +56,95 @@ void zmq::v1_decoder_t::set_msg_sink (i_msg_sink *msg_sink_)
msg_sink
=
msg_sink_
;
}
bool
zmq
::
v1_decoder_t
::
flags_ready
()
{
msg_flags
=
0
;
if
(
tmpbuf
[
0
]
&
v1_protocol_t
::
more_flag
)
msg_flags
|=
msg_t
::
more
;
// The payload length is either one or eight bytes,
// depending on whether the 'large' bit is set.
if
(
tmpbuf
[
0
]
&
v1_protocol_t
::
large_flag
)
next_step
(
tmpbuf
,
8
,
&
v1_decoder_t
::
eight_byte_size_ready
);
else
next_step
(
tmpbuf
,
1
,
&
v1_decoder_t
::
one_byte_size_ready
);
return
true
;
}
bool
zmq
::
v1_decoder_t
::
one_byte_size_ready
()
{
int
rc
=
0
;
//
Message size must not exceed the maximum allowed size
.
if
(
maxmsgsize
>=
0
)
if
(
unlikely
(
tmpbuf
[
0
]
>
static_cast
<
uint64_t
>
(
maxmsgsize
)))
goto
error
;
// First byte of size is read. If it is 0xff read 8-byte size.
// Otherwise allocate the buffer for message data and read the
//
message data into it
.
if
(
*
tmpbuf
==
0xff
)
next_step
(
tmpbuf
,
8
,
&
v1_decoder_t
::
eight_byte_size_ready
);
else
{
// in_progress is initialised at this point so in theory we should
// close it before calling zmq_msg_init_size, however, it's a 0-byte
// message and thus we can treat it as uninitialised...
rc
=
in_progress
.
init_size
(
tmpbuf
[
0
]);
if
(
unlikely
(
rc
))
{
errno_assert
(
errno
==
ENOMEM
);
int
rc
=
in_progress
.
init
();
// There has to be at least one byte (the flags) in the message).
if
(
!*
tmpbuf
)
{
decoding_error
();
return
false
;
}
// in_progress is initialised at this point so in theory we should
// close it before calling zmq_msg_init_size, however, it's a 0-byte
// message and thus we can treat it as uninitialised...
int
rc
;
if
(
maxmsgsize
>=
0
&&
(
int64_t
)
(
*
tmpbuf
-
1
)
>
maxmsgsize
)
{
rc
=
-
1
;
errno
=
ENOMEM
;
}
else
rc
=
in_progress
.
init_size
(
*
tmpbuf
-
1
);
if
(
rc
!=
0
&&
errno
==
ENOMEM
)
{
rc
=
in_progress
.
init
();
errno_assert
(
rc
==
0
);
decoding_error
();
return
false
;
}
errno_assert
(
rc
==
0
);
goto
error
;
}
in_progress
.
set_flags
(
msg_flags
);
next_step
(
in_progress
.
data
(),
in_progress
.
size
(),
&
v1_decoder_t
::
message_ready
);
next_step
(
tmpbuf
,
1
,
&
v1_decoder_t
::
flags_ready
);
}
return
true
;
error
:
decoding_error
();
return
false
;
}
bool
zmq
::
v1_decoder_t
::
eight_byte_size_ready
()
{
int
rc
=
0
;
// 8-byte payload length is read. Allocate the buffer
// for message body and read the message data into it.
const
uint64_t
payload_length
=
get_uint64
(
tmpbuf
);
// The payload size is encoded as 64-bit unsigned integer.
// The most significant byte comes first.
const
uint64_t
msg_size
=
get_uint64
(
tmpbuf
);
// There has to be at least one byte (the flags) in the message).
if
(
payload_length
==
0
)
{
decoding_error
();
return
false
;
}
// Message size must not exceed the maximum allowed size.
if
(
maxmsgsize
>=
0
)
if
(
unlikely
(
msg_size
>
static_cast
<
uint64_t
>
(
maxmsgsize
)))
goto
error
;
if
(
maxmsgsize
>=
0
&&
payload_length
-
1
>
(
uint64_t
)
maxmsgsize
)
{
decoding_error
();
return
false
;
}
// Message size must fit into size_t data type.
if
(
unlikely
(
msg_size
!=
static_cast
<
size_t
>
(
msg_size
)))
goto
error
;
// Message size must fit within range of size_t data type.
if
(
payload_length
-
1
>
std
::
numeric_limits
<
size_t
>::
max
())
{
decoding_error
();
return
false
;
}
const
size_t
msg_size
=
static_cast
<
size_t
>
(
payload_length
-
1
);
// in_progress is initialised at this point so in theory we should
// close it before calling init_size, however, it's a 0-byte
// message and thus we can treat it as uninitialised.
rc
=
in_progress
.
init_size
(
static_cast
<
size_t
>
(
msg_size
)
);
if
(
unlikely
(
rc
)
)
{
// message and thus we can treat it as uninitialised.
..
int
rc
=
in_progress
.
init_size
(
msg_size
);
if
(
rc
!=
0
)
{
errno_assert
(
errno
==
ENOMEM
);
int
rc
=
in_progress
.
init
();
rc
=
in_progress
.
init
();
errno_assert
(
rc
==
0
);
goto
error
;
decoding_error
();
return
false
;
}
in_progress
.
set_flags
(
msg_flags
);
next_step
(
tmpbuf
,
1
,
&
v1_decoder_t
::
flags_ready
);
return
true
;
}
bool
zmq
::
v1_decoder_t
::
flags_ready
()
{
// Store the flags from the wire into the message structure.
in_progress
.
set_flags
(
tmpbuf
[
0
]
&
msg_t
::
more
);
next_step
(
in_progress
.
data
(),
in_progress
.
size
(),
&
v1_decoder_t
::
message_ready
);
return
true
;
error
:
decoding_error
();
return
false
;
}
bool
zmq
::
v1_decoder_t
::
message_ready
()
...
...
@@ -155,6 +160,6 @@ bool zmq::v1_decoder_t::message_ready ()
return
false
;
}
next_step
(
tmpbuf
,
1
,
&
v1_decoder_t
::
flags
_ready
);
next_step
(
tmpbuf
,
1
,
&
v1_decoder_t
::
one_byte_size
_ready
);
return
true
;
}
src/v1_decoder.hpp
View file @
785ef41f
...
...
@@ -20,41 +20,34 @@
#ifndef __ZMQ_V1_DECODER_HPP_INCLUDED__
#define __ZMQ_V1_DECODER_HPP_INCLUDED__
#include "err.hpp"
#include "msg.hpp"
#include "decoder.hpp"
#include "i_msg_sink.hpp"
#include "stdint.hpp"
namespace
zmq
{
// Decoder for 0MQ v1 framing protocol. Converts data stream into messages.
// Decoder for ZMTP/1.0 protocol. Converts data batches into messages.
class
v1_decoder_t
:
public
decoder_base_t
<
v1_decoder_t
>
{
public
:
v1_decoder_t
(
size_t
bufsize_
,
int64_t
maxmsgsize_
,
i_msg_sink
*
msg_sink_
);
virtual
~
v1_decoder_t
();
v1_decoder_t
(
size_t
bufsize_
,
int64_t
maxmsgsize_
);
~
v1_decoder_t
();
//
i_decoder interface
.
v
irtual
v
oid
set_msg_sink
(
i_msg_sink
*
msg_sink_
);
//
Set the receiver of decoded messages
.
void
set_msg_sink
(
i_msg_sink
*
msg_sink_
);
private
:
bool
flags_ready
();
bool
one_byte_size_ready
();
bool
eight_byte_size_ready
();
bool
flags_ready
();
bool
message_ready
();
i_msg_sink
*
msg_sink
;
unsigned
char
tmpbuf
[
8
];
unsigned
char
msg_flags
;
msg_t
in_progress
;
const
int64_t
maxmsgsize
;
int64_t
maxmsgsize
;
v1_decoder_t
(
const
v1_decoder_t
&
);
void
operator
=
(
const
v1_decoder_t
&
);
...
...
src/v1_encoder.cpp
View file @
785ef41f
...
...
@@ -17,14 +17,15 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "
v1_protocol
.hpp"
#include "
encoder
.hpp"
#include "v1_encoder.hpp"
#include "i_msg_source.hpp"
#include "likely.hpp"
#include "wire.hpp"
zmq
::
v1_encoder_t
::
v1_encoder_t
(
size_t
bufsize_
,
i_msg_source
*
msg_source_
)
:
zmq
::
v1_encoder_t
::
v1_encoder_t
(
size_t
bufsize_
)
:
encoder_base_t
<
v1_encoder_t
>
(
bufsize_
),
msg_source
(
msg_source_
)
msg_source
(
NULL
)
{
int
rc
=
in_progress
.
init
();
errno_assert
(
rc
==
0
);
...
...
@@ -44,9 +45,17 @@ void zmq::v1_encoder_t::set_msg_source (i_msg_source *msg_source_)
msg_source
=
msg_source_
;
}
bool
zmq
::
v1_encoder_t
::
size_ready
()
{
// Write message body into the buffer.
next_step
(
in_progress
.
data
(),
in_progress
.
size
(),
&
v1_encoder_t
::
message_ready
,
!
(
in_progress
.
flags
()
&
msg_t
::
more
));
return
true
;
}
bool
zmq
::
v1_encoder_t
::
message_ready
()
{
//
Release the
content of the old message.
//
Destroy
content of the old message.
int
rc
=
in_progress
.
close
();
errno_assert
(
rc
==
0
);
...
...
@@ -59,42 +68,33 @@ bool zmq::v1_encoder_t::message_ready ()
errno_assert
(
rc
==
0
);
return
false
;
}
rc
=
msg_source
->
pull_msg
(
&
in_progress
);
if
(
unlikely
(
rc
))
{
if
(
unlikely
(
rc
!=
0
))
{
errno_assert
(
errno
==
EAGAIN
);
rc
=
in_progress
.
init
();
errno_assert
(
rc
==
0
);
return
false
;
}
// Encode flags.
unsigned
char
&
protocol_flags
=
tmpbuf
[
0
];
protocol_flags
=
0
;
if
(
in_progress
.
flags
()
&
msg_t
::
more
)
protocol_flags
|=
v1_protocol_t
::
more_flag
;
if
(
in_progress
.
size
()
>
255
)
protocol_flags
|=
v1_protocol_t
::
large_flag
;
// Get the message size.
size_t
size
=
in_progress
.
size
();
// Encode the message length. For messages less then 256 bytes,
// the length is encoded as 8-bit unsigned integer. For larger
// messages, 64-bit unsigned integer in network byte order is used.
const
size_t
size
=
in_progress
.
size
();
if
(
unlikely
(
size
>
255
))
{
put_uint64
(
tmpbuf
+
1
,
size
);
next_step
(
tmpbuf
,
9
,
&
v1_encoder_t
::
size_ready
,
false
);
// Account for the 'flags' byte.
size
++
;
// For messages less than 255 bytes long, write one byte of message size.
// For longer messages write 0xff escape character followed by 8-byte
// message size. In both cases 'flags' field follows.
if
(
size
<
255
)
{
tmpbuf
[
0
]
=
(
unsigned
char
)
size
;
tmpbuf
[
1
]
=
(
in_progress
.
flags
()
&
msg_t
::
more
);
next_step
(
tmpbuf
,
2
,
&
v1_encoder_t
::
size_ready
,
false
);
}
else
{
tmpbuf
[
1
]
=
static_cast
<
uint8_t
>
(
size
);
next_step
(
tmpbuf
,
2
,
&
v1_encoder_t
::
size_ready
,
false
);
tmpbuf
[
0
]
=
0xff
;
put_uint64
(
tmpbuf
+
1
,
size
);
tmpbuf
[
9
]
=
(
in_progress
.
flags
()
&
msg_t
::
more
);
next_step
(
tmpbuf
,
10
,
&
v1_encoder_t
::
size_ready
,
false
);
}
return
true
;
}
bool
zmq
::
v1_encoder_t
::
size_ready
()
{
// Write message body into the buffer.
next_step
(
in_progress
.
data
(),
in_progress
.
size
(),
&
v1_encoder_t
::
message_ready
,
!
(
in_progress
.
flags
()
&
msg_t
::
more
));
return
true
;
}
src/v1_encoder.hpp
View file @
785ef41f
...
...
@@ -20,25 +20,22 @@
#ifndef __ZMQ_V1_ENCODER_HPP_INCLUDED__
#define __ZMQ_V1_ENCODER_HPP_INCLUDED__
#include "msg.hpp"
#include "i_msg_source.hpp"
#include "encoder.hpp"
namespace
zmq
{
class
i_msg_source
;
// Encoder for
0MQ framing protocol. Converts messages into data stream
.
// Encoder for
ZMTP/1.0 protocol. Converts messages into data batches
.
class
v1_encoder_t
:
public
encoder_base_t
<
v1_encoder_t
>
{
public
:
v1_encoder_t
(
size_t
bufsize_
,
i_msg_source
*
msg_source_
);
virtual
~
v1_encoder_t
();
v1_encoder_t
(
size_t
bufsize_
);
~
v1_encoder_t
();
v
irtual
v
oid
set_msg_source
(
i_msg_source
*
msg_source_
);
void
set_msg_source
(
i_msg_source
*
msg_source_
);
private
:
...
...
@@ -47,7 +44,7 @@ namespace zmq
i_msg_source
*
msg_source
;
msg_t
in_progress
;
unsigned
char
tmpbuf
[
9
];
unsigned
char
tmpbuf
[
10
];
v1_encoder_t
(
const
v1_encoder_t
&
);
const
v1_encoder_t
&
operator
=
(
const
v1_encoder_t
&
);
...
...
src/decoder.cpp
→
src/
v2_
decoder.cpp
View file @
785ef41f
...
...
@@ -19,134 +19,130 @@
#include <stdlib.h>
#include <string.h>
#include <limits>
#include "platform.hpp"
#if
defined
ZMQ_HAVE_WINDOWS
#if
def
ZMQ_HAVE_WINDOWS
#include "windows.hpp"
#endif
#include "
decoder
.hpp"
#include "
i_msg_sink
.hpp"
#include "
v2_protocol
.hpp"
#include "
v2_decoder
.hpp"
#include "likely.hpp"
#include "wire.hpp"
#include "err.hpp"
zmq
::
decoder_t
::
decoder_t
(
size_t
bufsize_
,
int64_t
maxmsgsize_
)
:
decoder_base_t
<
decoder_t
>
(
bufsize_
),
msg_sink
(
NULL
),
zmq
::
v2_decoder_t
::
v2_decoder_t
(
size_t
bufsize_
,
int64_t
maxmsgsize_
,
i_msg_sink
*
msg_sink_
)
:
decoder_base_t
<
v2_decoder_t
>
(
bufsize_
),
msg_sink
(
msg_sink_
),
msg_flags
(
0
),
maxmsgsize
(
maxmsgsize_
)
{
int
rc
=
in_progress
.
init
();
errno_assert
(
rc
==
0
);
// At the beginning, read one byte and go to
one_byte_size
_ready state.
next_step
(
tmpbuf
,
1
,
&
decoder_t
::
one_byte_size
_ready
);
// At the beginning, read one byte and go to
flags
_ready state.
next_step
(
tmpbuf
,
1
,
&
v2_decoder_t
::
flags
_ready
);
}
zmq
::
decoder_t
::~
decoder_t
()
zmq
::
v2_decoder_t
::~
v2_
decoder_t
()
{
int
rc
=
in_progress
.
close
();
errno_assert
(
rc
==
0
);
}
void
zmq
::
decoder_t
::
set_msg_sink
(
i_msg_sink
*
msg_sink_
)
void
zmq
::
v2_
decoder_t
::
set_msg_sink
(
i_msg_sink
*
msg_sink_
)
{
msg_sink
=
msg_sink_
;
}
bool
zmq
::
decoder_t
::
one_byte_size
_ready
()
bool
zmq
::
v2_decoder_t
::
flags
_ready
()
{
// First byte of size is read. If it is 0xff read 8-byte size.
// Otherwise allocate the buffer for message data and read the
// message data into it.
if
(
*
tmpbuf
==
0xff
)
next_step
(
tmpbuf
,
8
,
&
decoder_t
::
eight_byte_size_ready
);
else
{
// There has to be at least one byte (the flags) in the message).
if
(
!*
tmpbuf
)
{
decoding_error
();
return
false
;
}
// in_progress is initialised at this point so in theory we should
// close it before calling zmq_msg_init_size, however, it's a 0-byte
// message and thus we can treat it as uninitialised...
int
rc
;
if
(
maxmsgsize
>=
0
&&
(
int64_t
)
(
*
tmpbuf
-
1
)
>
maxmsgsize
)
{
rc
=
-
1
;
errno
=
ENOMEM
;
}
else
rc
=
in_progress
.
init_size
(
*
tmpbuf
-
1
);
if
(
rc
!=
0
&&
errno
==
ENOMEM
)
{
rc
=
in_progress
.
init
();
errno_assert
(
rc
==
0
);
decoding_error
();
return
false
;
}
errno_assert
(
rc
==
0
);
msg_flags
=
0
;
if
(
tmpbuf
[
0
]
&
v2_protocol_t
::
more_flag
)
msg_flags
|=
msg_t
::
more
;
// The payload length is either one or eight bytes,
// depending on whether the 'large' bit is set.
if
(
tmpbuf
[
0
]
&
v2_protocol_t
::
large_flag
)
next_step
(
tmpbuf
,
8
,
&
v2_decoder_t
::
eight_byte_size_ready
);
else
next_step
(
tmpbuf
,
1
,
&
v2_decoder_t
::
one_byte_size_ready
);
next_step
(
tmpbuf
,
1
,
&
decoder_t
::
flags_ready
);
}
return
true
;
}
bool
zmq
::
decoder_t
::
eight
_byte_size_ready
()
bool
zmq
::
v2_decoder_t
::
one
_byte_size_ready
()
{
// 8-byte payload length is read. Allocate the buffer
// for message body and read the message data into it.
const
uint64_t
payload_length
=
get_uint64
(
tmpbuf
);
// There has to be at least one byte (the flags) in the message).
if
(
payload_length
==
0
)
{
decoding_error
();
return
false
;
}
int
rc
=
0
;
// Message size must not exceed the maximum allowed size.
if
(
maxmsgsize
>=
0
&&
payload_length
-
1
>
(
uint64_t
)
maxmsgsize
)
{
decoding_error
();
return
false
;
}
// Message size must fit within range of size_t data type.
if
(
payload_length
-
1
>
std
::
numeric_limits
<
size_t
>::
max
())
{
decoding_error
();
return
false
;
}
const
size_t
msg_size
=
static_cast
<
size_t
>
(
payload_length
-
1
);
if
(
maxmsgsize
>=
0
)
if
(
unlikely
(
tmpbuf
[
0
]
>
static_cast
<
uint64_t
>
(
maxmsgsize
)))
goto
error
;
// in_progress is initialised at this point so in theory we should
// close it before calling init_size, however, it's a 0-byte
// close it before calling
zmq_msg_
init_size, however, it's a 0-byte
// message and thus we can treat it as uninitialised...
int
rc
=
in_progress
.
init_size
(
msg_size
);
if
(
rc
!=
0
)
{
rc
=
in_progress
.
init_size
(
tmpbuf
[
0
]
);
if
(
unlikely
(
rc
)
)
{
errno_assert
(
errno
==
ENOMEM
);
rc
=
in_progress
.
init
();
int
rc
=
in_progress
.
init
();
errno_assert
(
rc
==
0
);
decoding_error
();
return
false
;
goto
error
;
}
next_step
(
tmpbuf
,
1
,
&
decoder_t
::
flags_ready
);
in_progress
.
set_flags
(
msg_flags
);
next_step
(
in_progress
.
data
(),
in_progress
.
size
(),
&
v2_decoder_t
::
message_ready
);
return
true
;
error
:
decoding_error
();
return
false
;
}
bool
zmq
::
decoder_t
::
flags
_ready
()
bool
zmq
::
v2_decoder_t
::
eight_byte_size
_ready
()
{
// Store the flags from the wire into the message structure.
in_progress
.
set_flags
(
tmpbuf
[
0
]
&
msg_t
::
more
);
int
rc
=
0
;
// The payload size is encoded as 64-bit unsigned integer.
// The most significant byte comes first.
const
uint64_t
msg_size
=
get_uint64
(
tmpbuf
);
// Message size must not exceed the maximum allowed size.
if
(
maxmsgsize
>=
0
)
if
(
unlikely
(
msg_size
>
static_cast
<
uint64_t
>
(
maxmsgsize
)))
goto
error
;
// Message size must fit into size_t data type.
if
(
unlikely
(
msg_size
!=
static_cast
<
size_t
>
(
msg_size
)))
goto
error
;
// in_progress is initialised at this point so in theory we should
// close it before calling init_size, however, it's a 0-byte
// message and thus we can treat it as uninitialised.
rc
=
in_progress
.
init_size
(
static_cast
<
size_t
>
(
msg_size
));
if
(
unlikely
(
rc
))
{
errno_assert
(
errno
==
ENOMEM
);
int
rc
=
in_progress
.
init
();
errno_assert
(
rc
==
0
);
goto
error
;
}
in_progress
.
set_flags
(
msg_flags
);
next_step
(
in_progress
.
data
(),
in_progress
.
size
(),
&
decoder_t
::
message_ready
);
&
v2_
decoder_t
::
message_ready
);
return
true
;
error
:
decoding_error
();
return
false
;
}
bool
zmq
::
decoder_t
::
message_ready
()
bool
zmq
::
v2_
decoder_t
::
message_ready
()
{
// Message is completely read. Push it further and start reading
// new message. (in_progress is a 0-byte message after this point.)
...
...
@@ -159,6 +155,6 @@ bool zmq::decoder_t::message_ready ()
return
false
;
}
next_step
(
tmpbuf
,
1
,
&
decoder_t
::
one_byte_size
_ready
);
next_step
(
tmpbuf
,
1
,
&
v2_decoder_t
::
flags
_ready
);
return
true
;
}
src/v2_decoder.hpp
0 → 100644
View file @
785ef41f
/*
Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_V2_DECODER_HPP_INCLUDED__
#define __ZMQ_V2_DECODER_HPP_INCLUDED__
#include "decoder.hpp"
#include "i_msg_sink.hpp"
namespace
zmq
{
// Decoder for ZMTP/2.x framing protocol. Converts data stream into messages.
class
v2_decoder_t
:
public
decoder_base_t
<
v2_decoder_t
>
{
public
:
v2_decoder_t
(
size_t
bufsize_
,
int64_t
maxmsgsize_
,
i_msg_sink
*
msg_sink_
);
virtual
~
v2_decoder_t
();
// i_decoder interface.
virtual
void
set_msg_sink
(
i_msg_sink
*
msg_sink_
);
private
:
bool
flags_ready
();
bool
one_byte_size_ready
();
bool
eight_byte_size_ready
();
bool
message_ready
();
i_msg_sink
*
msg_sink
;
unsigned
char
tmpbuf
[
8
];
unsigned
char
msg_flags
;
msg_t
in_progress
;
const
int64_t
maxmsgsize
;
v2_decoder_t
(
const
v2_decoder_t
&
);
void
operator
=
(
const
v2_decoder_t
&
);
};
}
#endif
src/encoder.cpp
→
src/
v2_
encoder.cpp
View file @
785ef41f
...
...
@@ -17,44 +17,36 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "
encoder
.hpp"
#include "
i_msg_source
.hpp"
#include "
v2_protocol
.hpp"
#include "
v2_encoder
.hpp"
#include "likely.hpp"
#include "wire.hpp"
zmq
::
encoder_t
::
encoder_t
(
size_t
bufsiz
e_
)
:
encoder_base_t
<
encoder_t
>
(
bufsize_
),
msg_source
(
NULL
)
zmq
::
v2_encoder_t
::
v2_encoder_t
(
size_t
bufsize_
,
i_msg_source
*
msg_sourc
e_
)
:
encoder_base_t
<
v2_
encoder_t
>
(
bufsize_
),
msg_source
(
msg_source_
)
{
int
rc
=
in_progress
.
init
();
errno_assert
(
rc
==
0
);
// Write 0 bytes to the batch and go to message_ready state.
next_step
(
NULL
,
0
,
&
encoder_t
::
message_ready
,
true
);
next_step
(
NULL
,
0
,
&
v2_
encoder_t
::
message_ready
,
true
);
}
zmq
::
encoder_t
::~
encoder_t
()
zmq
::
v2_encoder_t
::~
v2_
encoder_t
()
{
int
rc
=
in_progress
.
close
();
errno_assert
(
rc
==
0
);
}
void
zmq
::
encoder_t
::
set_msg_source
(
i_msg_source
*
msg_source_
)
void
zmq
::
v2_
encoder_t
::
set_msg_source
(
i_msg_source
*
msg_source_
)
{
msg_source
=
msg_source_
;
}
bool
zmq
::
encoder_t
::
siz
e_ready
()
bool
zmq
::
v2_encoder_t
::
messag
e_ready
()
{
// Write message body into the buffer.
next_step
(
in_progress
.
data
(),
in_progress
.
size
(),
&
encoder_t
::
message_ready
,
!
(
in_progress
.
flags
()
&
msg_t
::
more
));
return
true
;
}
bool
zmq
::
encoder_t
::
message_ready
()
{
// Destroy content of the old message.
// Release the content of the old message.
int
rc
=
in_progress
.
close
();
errno_assert
(
rc
==
0
);
...
...
@@ -67,33 +59,42 @@ bool zmq::encoder_t::message_ready ()
errno_assert
(
rc
==
0
);
return
false
;
}
rc
=
msg_source
->
pull_msg
(
&
in_progress
);
if
(
unlikely
(
rc
!=
0
))
{
if
(
unlikely
(
rc
))
{
errno_assert
(
errno
==
EAGAIN
);
rc
=
in_progress
.
init
();
errno_assert
(
rc
==
0
);
return
false
;
}
// Get the message size.
size_t
size
=
in_progress
.
size
();
// Account for the 'flags' byte.
size
++
;
// Encode flags.
unsigned
char
&
protocol_flags
=
tmpbuf
[
0
];
protocol_flags
=
0
;
if
(
in_progress
.
flags
()
&
msg_t
::
more
)
protocol_flags
|=
v2_protocol_t
::
more_flag
;
if
(
in_progress
.
size
()
>
255
)
protocol_flags
|=
v2_protocol_t
::
large_flag
;
//
For messages less than 255 bytes long, write one byte of message size.
//
For longer messages write 0xff escape character followed by 8-byte
// message
size. In both cases 'flags' field follows
.
if
(
size
<
255
)
{
tmpbuf
[
0
]
=
(
unsigned
char
)
size
;
tmpbuf
[
1
]
=
(
in_progress
.
flags
()
&
msg_t
::
mor
e
);
next_step
(
tmpbuf
,
2
,
&
encoder_t
::
size_ready
,
false
);
//
Encode the message length. For messages less then 256 bytes,
//
the length is encoded as 8-bit unsigned integer. For larger
// message
s, 64-bit unsigned integer in network byte order is used
.
const
size_t
size
=
in_progress
.
size
();
if
(
unlikely
(
size
>
255
))
{
put_uint64
(
tmpbuf
+
1
,
siz
e
);
next_step
(
tmpbuf
,
9
,
&
v2_
encoder_t
::
size_ready
,
false
);
}
else
{
tmpbuf
[
0
]
=
0xff
;
put_uint64
(
tmpbuf
+
1
,
size
);
tmpbuf
[
9
]
=
(
in_progress
.
flags
()
&
msg_t
::
more
);
next_step
(
tmpbuf
,
10
,
&
encoder_t
::
size_ready
,
false
);
tmpbuf
[
1
]
=
static_cast
<
uint8_t
>
(
size
);
next_step
(
tmpbuf
,
2
,
&
v2_encoder_t
::
size_ready
,
false
);
}
return
true
;
}
bool
zmq
::
v2_encoder_t
::
size_ready
()
{
// Write message body into the buffer.
next_step
(
in_progress
.
data
(),
in_progress
.
size
(),
&
v2_encoder_t
::
message_ready
,
!
(
in_progress
.
flags
()
&
msg_t
::
more
));
return
true
;
}
src/v2_encoder.hpp
0 → 100644
View file @
785ef41f
/*
Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_V2_ENCODER_HPP_INCLUDED__
#define __ZMQ_V2_ENCODER_HPP_INCLUDED__
#include "encoder.hpp"
#include "i_msg_source.hpp"
namespace
zmq
{
class
i_msg_source
;
// Encoder for 0MQ framing protocol. Converts messages into data stream.
class
v2_encoder_t
:
public
encoder_base_t
<
v2_encoder_t
>
{
public
:
v2_encoder_t
(
size_t
bufsize_
,
i_msg_source
*
msg_source_
);
virtual
~
v2_encoder_t
();
virtual
void
set_msg_source
(
i_msg_source
*
msg_source_
);
private
:
bool
size_ready
();
bool
message_ready
();
i_msg_source
*
msg_source
;
msg_t
in_progress
;
unsigned
char
tmpbuf
[
9
];
v2_encoder_t
(
const
v2_encoder_t
&
);
const
v2_encoder_t
&
operator
=
(
const
v2_encoder_t
&
);
};
}
#endif
src/v
1
_protocol.hpp
→
src/v
2
_protocol.hpp
View file @
785ef41f
...
...
@@ -17,14 +17,13 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_V
1
_PROTOCOL_HPP_INCLUDED__
#define __ZMQ_V
1
_PROTOCOL_HPP_INCLUDED__
#ifndef __ZMQ_V
2
_PROTOCOL_HPP_INCLUDED__
#define __ZMQ_V
2
_PROTOCOL_HPP_INCLUDED__
namespace
zmq
{
// Definition of constans for v1 transport protocol.
class
v1_protocol_t
// Definition of constants for ZMTP/2.0 transport protocol.
class
v2_protocol_t
{
public
:
// Message flags.
...
...
@@ -33,9 +32,7 @@ namespace zmq
more_flag
=
1
,
large_flag
=
2
};
};
}
#endif
...
...
src/xsub.cpp
View file @
785ef41f
...
...
@@ -87,10 +87,10 @@ int zmq::xsub_t::xsend (msg_t *msg_)
if
(
size
>
0
&&
*
data
==
1
)
{
// Process subscribe message
// This used to filter out duplicate subscriptions,
// however this is alread done on the XPUB side and
// doing it here as well breaks ZMQ_XPUB_VERBOSE
// when there are forwarding devices involved.
// This used to filter out duplicate subscriptions,
// however this is alread done on the XPUB side and
// doing it here as well breaks ZMQ_XPUB_VERBOSE
// when there are forwarding devices involved.
subscriptions
.
add
(
data
+
1
,
size
-
1
);
return
dist
.
send_to_all
(
msg_
);
}
...
...
tests/test_raw_sock.cpp
View file @
785ef41f
...
...
@@ -107,7 +107,7 @@ int main (void)
assert
(
rc
==
11
);
// First four bytes are [revision][socktype][identity]
assert
(
buffer
[
0
]
==
1
);
// Revision = 1
assert
(
buffer
[
0
]
==
1
);
assert
(
buffer
[
1
]
==
ZMTP_DEALER
);
// Identity is 2 byte message
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment