Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
9e80f07a
Commit
9e80f07a
authored
Jun 03, 2015
by
Pieter Hintjens
Browse files
Options
Browse Files
Download
Plain Diff
Merge pull request #1417 from jens-auer/zero_copy
Zero copy message receive in v2_decoder
parents
03d6a734
e9b403a7
Hide whitespace changes
Inline
Side-by-side
Showing
8 changed files
with
365 additions
and
126 deletions
+365
-126
atomic_counter.hpp
src/atomic_counter.hpp
+1
-1
decoder.hpp
src/decoder.hpp
+66
-18
msg.cpp
src/msg.cpp
+62
-39
msg.hpp
src/msg.hpp
+30
-20
v1_decoder.cpp
src/v1_decoder.cpp
+6
-5
v1_decoder.hpp
src/v1_decoder.hpp
+7
-5
v2_decoder.cpp
src/v2_decoder.cpp
+130
-32
v2_decoder.hpp
src/v2_decoder.hpp
+63
-6
No files found.
src/atomic_counter.hpp
View file @
9e80f07a
...
...
@@ -183,7 +183,7 @@ namespace zmq
#endif
}
inline
integer_t
get
()
inline
integer_t
get
()
const
{
return
value
;
}
...
...
src/decoder.hpp
View file @
9e80f07a
...
...
@@ -42,6 +42,45 @@
namespace
zmq
{
// Static buffer policy.
class
c_single_allocator
{
public
:
c_single_allocator
(
size_t
bufsize_
)
:
buf
((
unsigned
char
*
)
malloc
(
bufsize
)
),
bufsize
(
bufsize_
)
{
alloc_assert
(
buf
);
}
~
c_single_allocator
()
{
free
(
buf
);
}
unsigned
char
*
allocate
()
{
return
buf
;
}
void
deallocate
()
{
}
size_t
size
()
const
{
return
bufsize
;
}
private
:
unsigned
char
*
buf
;
size_t
bufsize
;
c_single_allocator
(
c_single_allocator
const
&
);
c_single_allocator
&
operator
=
(
c_single_allocator
const
&
);
};
// Helper base class for decoders that know the amount of data to read
// in advance at any moment. Knowing the amount in advance is a property
// of the protocol used. 0MQ framing protocol is based size-prefixed
...
...
@@ -52,31 +91,34 @@ namespace zmq
//
// This class implements the state machine that parses the incoming buffer.
// Derived class should implement individual state machine actions.
template
<
typename
T
>
class
decoder_base_t
:
public
i_decoder
//
// Buffer managment is done by an allocator policy.
template
<
typename
T
,
typename
A
=
c_single_allocator
>
class
decoder_base_t
:
public
i_decoder
{
public
:
inline
decoder_base_t
(
size_t
bufsize
_
)
:
next
(
NULL
),
read_pos
(
NULL
),
to_read
(
0
),
bufsize
(
bufsize_
)
inline
decoder_base_t
(
A
*
allocator
_
)
:
next
(
NULL
),
read_pos
(
NULL
),
to_read
(
0
),
allocator
(
allocator_
)
{
buf
=
(
unsigned
char
*
)
malloc
(
bufsize_
);
alloc_assert
(
buf
);
buf
=
allocator
->
allocate
();
}
// The destructor doesn't have to be virtual. It is mad virtual
// just to keep ICC and code checking tools from complaining.
inline
virtual
~
decoder_base_t
()
{
free
(
buf
);
allocator
->
deallocate
(
);
}
// Returns a buffer to be filled with binary data.
inline
void
get_buffer
(
unsigned
char
**
data_
,
size_t
*
size_
)
{
buf
=
allocator
->
allocate
();
// If we are expected to read large message, we'll opt for zero-
// copy, i.e. we'll ask caller to fill the data directly to the
// message. Note that subsequent read(s) are non-blocking, thus
...
...
@@ -85,14 +127,14 @@ namespace zmq
// As a consequence, large messages being received won't block
// other engines running in the same I/O thread for excessive
// amounts of time.
if
(
to_read
>=
bufsize
)
{
if
(
to_read
>=
allocator
->
size
()
)
{
*
data_
=
read_pos
;
*
size_
=
to_read
;
return
;
}
*
data_
=
buf
;
*
size_
=
bufsize
;
*
size_
=
allocator
->
size
()
;
}
// Processes the data in the buffer previously allocated using
...
...
@@ -116,7 +158,7 @@ namespace zmq
bytes_used_
=
size_
;
while
(
!
to_read
)
{
const
int
rc
=
(
static_cast
<
T
*>
(
this
)
->*
next
)
();
const
int
rc
=
(
static_cast
<
T
*>
(
this
)
->*
next
)
(
data_
+
bytes_used_
);
if
(
rc
!=
0
)
return
rc
;
}
...
...
@@ -126,14 +168,20 @@ namespace zmq
while
(
bytes_used_
<
size_
)
{
// Copy the data from buffer to the message.
const
size_t
to_copy
=
std
::
min
(
to_read
,
size_
-
bytes_used_
);
memcpy
(
read_pos
,
data_
+
bytes_used_
,
to_copy
);
// only copy when the destination address is different from the
// current address in the buffer
if
(
read_pos
!=
data_
+
bytes_used_
)
{
memcpy
(
read_pos
,
data_
+
bytes_used_
,
to_copy
);
}
read_pos
+=
to_copy
;
to_read
-=
to_copy
;
bytes_used_
+=
to_copy
;
// Try to get more space in the message to fill in.
// If none is available, return.
while
(
to_read
==
0
)
{
const
int
rc
=
(
static_cast
<
T
*>
(
this
)
->*
next
)
();
// pass current address in the buffer
const
int
rc
=
(
static_cast
<
T
*>
(
this
)
->*
next
)
(
data_
+
bytes_used_
);
if
(
rc
!=
0
)
return
rc
;
}
...
...
@@ -146,7 +194,7 @@ namespace zmq
// Prototype of state machine action. Action should return false if
// it is unable to push the data to the system.
typedef
int
(
T
::*
step_t
)
();
typedef
int
(
T
::*
step_t
)
(
unsigned
char
const
*
);
// This function should be called from derived class to read data
// from the buffer and schedule next state machine action.
...
...
@@ -171,8 +219,8 @@ namespace zmq
size_t
to_read
;
// The duffer for data to decode.
size_t
bufsize
;
unsigned
char
*
buf
;
A
*
allocator
;
unsigned
char
*
buf
;
decoder_base_t
(
const
decoder_base_t
&
);
const
decoder_base_t
&
operator
=
(
const
decoder_base_t
&
);
...
...
src/msg.cpp
View file @
9e80f07a
...
...
@@ -45,11 +45,30 @@
typedef
char
zmq_msg_size_check
[
2
*
((
sizeof
(
zmq
::
msg_t
)
==
sizeof
(
zmq_msg_t
))
!=
0
)
-
1
];
// check whether the size of atomic_counter_t matches the size of the wrapped integer
// to ensure that the lsmg union is correctly aligned
typedef
char
zmq_msg_size_check
[
2
*
((
sizeof
(
zmq
::
atomic_counter_t
)
==
sizeof
(
zmq
::
atomic_counter_t
::
integer_t
))
!=
0
)
-
1
];
bool
zmq
::
msg_t
::
check
()
{
return
u
.
base
.
type
>=
type_min
&&
u
.
base
.
type
<=
type_max
;
}
int
zmq
::
msg_t
::
init
(
void
*
data_
,
size_t
size_
,
msg_free_fn
*
ffn_
,
void
*
hint_
)
{
if
(
size_
<=
max_vsm_size
)
{
int
rc
=
init_size
(
size_
);
memcpy
(
data
(),
data_
,
size_
);
return
rc
;
}
else
{
return
init_data
(
data_
,
size_
,
ffn_
,
hint_
);
}
}
int
zmq
::
msg_t
::
init
()
{
u
.
vsm
.
metadata
=
NULL
;
...
...
@@ -76,18 +95,16 @@ int zmq::msg_t::init_size (size_t size_)
u
.
lmsg
.
type
=
type_lmsg
;
u
.
lmsg
.
flags
=
0
;
u
.
lmsg
.
routing_id
=
0
;
u
.
lmsg
.
content
=
(
content_t
*
)
malloc
(
sizeof
(
content_t
)
+
size_
);
if
(
unlikely
(
!
u
.
lmsg
.
content
))
{
u
.
lmsg
.
data
=
malloc
(
size_
);
if
(
unlikely
(
!
u
.
lmsg
.
data
))
{
errno
=
ENOMEM
;
return
-
1
;
}
u
.
lmsg
.
content
->
data
=
u
.
lmsg
.
content
+
1
;
u
.
lmsg
.
content
->
size
=
size_
;
u
.
lmsg
.
content
->
ffn
=
NULL
;
u
.
lmsg
.
content
->
hint
=
NULL
;
new
(
&
u
.
lmsg
.
content
->
refcnt
)
zmq
::
atomic_counter_t
();
u
.
lmsg
.
size
=
size_
;
u
.
lmsg
.
ffn
=
NULL
;
u
.
lmsg
.
hint
=
NULL
;
new
(
&
u
.
lmsg
.
refcnt
.
counter
)
zmq
::
atomic_counter_t
();
}
return
0
;
}
...
...
@@ -115,17 +132,12 @@ int zmq::msg_t::init_data (void *data_, size_t size_, msg_free_fn *ffn_,
u
.
lmsg
.
type
=
type_lmsg
;
u
.
lmsg
.
flags
=
0
;
u
.
lmsg
.
routing_id
=
0
;
u
.
lmsg
.
content
=
(
content_t
*
)
malloc
(
sizeof
(
content_t
));
if
(
!
u
.
lmsg
.
content
)
{
errno
=
ENOMEM
;
return
-
1
;
}
u
.
lmsg
.
content
->
data
=
data_
;
u
.
lmsg
.
content
->
size
=
size_
;
u
.
lmsg
.
content
->
ffn
=
ffn_
;
u
.
lmsg
.
content
->
hint
=
hint_
;
new
(
&
u
.
lmsg
.
content
->
refcnt
)
zmq
::
atomic_counter_t
();
u
.
lmsg
.
data
=
data_
;
u
.
lmsg
.
size
=
size_
;
u
.
lmsg
.
ffn
=
ffn_
;
u
.
lmsg
.
hint
=
hint_
;
new
(
&
u
.
lmsg
.
refcnt
.
counter
)
zmq
::
atomic_counter_t
();
}
return
0
;
...
...
@@ -140,6 +152,13 @@ int zmq::msg_t::init_delimiter ()
return
0
;
}
zmq
::
atomic_counter_t
&
zmq
::
msg_t
::
msg_counter
()
{
zmq_assert
(
is_lmsg
()
);
void
*
ptr
=
static_cast
<
void
*>
(
&
u
.
lmsg
.
refcnt
.
counter
);
return
*
static_cast
<
atomic_counter_t
*>
(
ptr
);
}
int
zmq
::
msg_t
::
close
()
{
// Check the validity of the message.
...
...
@@ -153,16 +172,14 @@ int zmq::msg_t::close ()
// If the content is not shared, or if it is shared and the reference
// count has dropped to zero, deallocate it.
if
(
!
(
u
.
lmsg
.
flags
&
msg_t
::
shared
)
||
!
u
.
lmsg
.
content
->
refcnt
.
sub
(
1
))
{
// We used "placement new" operator to initialize the reference
// counter so we call the destructor explicitly now.
u
.
lmsg
.
content
->
refcnt
.
~
atomic_counter_t
();
if
(
u
.
lmsg
.
content
->
ffn
)
u
.
lmsg
.
content
->
ffn
(
u
.
lmsg
.
content
->
data
,
u
.
lmsg
.
content
->
hint
);
free
(
u
.
lmsg
.
content
);
!
msg_counter
().
sub
(
1
))
{
if
(
u
.
lmsg
.
ffn
)
{
u
.
lmsg
.
ffn
(
u
.
lmsg
.
data
,
u
.
lmsg
.
hint
);
}
else
{
free
(
u
.
lmsg
.
data
);
}
}
}
...
...
@@ -214,10 +231,10 @@ int zmq::msg_t::copy (msg_t &src_)
// One reference is added to shared messages. Non-shared messages
// are turned into shared messages and reference count is set to 2.
if
(
src_
.
u
.
lmsg
.
flags
&
msg_t
::
shared
)
src_
.
u
.
lmsg
.
content
->
refcnt
.
add
(
1
);
src_
.
msg_counter
()
.
add
(
1
);
else
{
src_
.
u
.
lmsg
.
flags
|=
msg_t
::
shared
;
src_
.
u
.
lmsg
.
content
->
refcnt
.
set
(
2
);
src_
.
msg_counter
()
.
set
(
2
);
}
}
...
...
@@ -239,7 +256,7 @@ void *zmq::msg_t::data ()
case
type_vsm
:
return
u
.
vsm
.
data
;
case
type_lmsg
:
return
u
.
lmsg
.
content
->
data
;
return
u
.
lmsg
.
data
;
case
type_cmsg
:
return
u
.
cmsg
.
data
;
default
:
...
...
@@ -257,7 +274,7 @@ size_t zmq::msg_t::size ()
case
type_vsm
:
return
u
.
vsm
.
size
;
case
type_lmsg
:
return
u
.
lmsg
.
content
->
size
;
return
u
.
lmsg
.
size
;
case
type_cmsg
:
return
u
.
cmsg
.
size
;
default
:
...
...
@@ -333,6 +350,11 @@ bool zmq::msg_t::is_vsm ()
return
u
.
base
.
type
==
type_vsm
;
}
bool
zmq
::
msg_t
::
is_lmsg
()
const
{
return
u
.
base
.
type
==
type_lmsg
;
}
bool
zmq
::
msg_t
::
is_cmsg
()
{
return
u
.
base
.
type
==
type_cmsg
;
...
...
@@ -353,9 +375,9 @@ void zmq::msg_t::add_refs (int refs_)
// message type that needs special care are long messages.
if
(
u
.
base
.
type
==
type_lmsg
)
{
if
(
u
.
lmsg
.
flags
&
msg_t
::
shared
)
u
.
lmsg
.
content
->
refcnt
.
add
(
refs_
);
msg_counter
()
.
add
(
refs_
);
else
{
u
.
lmsg
.
content
->
refcnt
.
set
(
refs_
+
1
);
msg_counter
()
.
set
(
refs_
+
1
);
u
.
lmsg
.
flags
|=
msg_t
::
shared
;
}
}
...
...
@@ -379,14 +401,14 @@ bool zmq::msg_t::rm_refs (int refs_)
}
// The only message type that needs special care are long messages.
if
(
!
u
.
lmsg
.
content
->
refcnt
.
sub
(
refs_
))
{
if
(
!
msg_counter
()
.
sub
(
refs_
))
{
// We used "placement new" operator to initialize the reference
// counter so we call the destructor explicitly now.
u
.
lmsg
.
content
->
refcnt
.
~
atomic_counter_t
();
msg_counter
()
.
~
atomic_counter_t
();
if
(
u
.
lmsg
.
content
->
ffn
)
u
.
lmsg
.
content
->
ffn
(
u
.
lmsg
.
content
->
data
,
u
.
lmsg
.
content
->
hint
);
free
(
u
.
lmsg
.
content
);
if
(
u
.
lmsg
.
ffn
)
u
.
lmsg
.
ffn
(
u
.
lmsg
.
data
,
u
.
lmsg
.
hint
);
free
(
u
.
lmsg
.
data
);
return
false
;
}
...
...
@@ -404,3 +426,4 @@ int zmq::msg_t::set_routing_id(uint32_t routing_id_)
u
.
base
.
routing_id
=
routing_id_
;
return
0
;
}
src/msg.hpp
View file @
9e80f07a
...
...
@@ -54,7 +54,6 @@ namespace zmq
class
msg_t
{
public
:
// Message flags.
enum
{
...
...
@@ -66,6 +65,8 @@ namespace zmq
};
bool
check
();
int
init
(
void
*
data_
,
size_t
size_
,
msg_free_fn
*
ffn_
,
void
*
hint_
);
int
init
();
int
init_size
(
size_t
size_
);
int
init_data
(
void
*
data_
,
size_t
size_
,
msg_free_fn
*
ffn_
,
...
...
@@ -88,6 +89,7 @@ namespace zmq
bool
is_credential
()
const
;
bool
is_delimiter
()
const
;
bool
is_vsm
();
bool
is_lmsg
()
const
;
bool
is_cmsg
();
uint32_t
get_routing_id
();
int
set_routing_id
(
uint32_t
routing_id_
);
...
...
@@ -107,22 +109,6 @@ namespace zmq
enum
{
msg_t_size
=
64
};
enum
{
max_vsm_size
=
msg_t_size
-
(
8
+
sizeof
(
metadata_t
*
)
+
3
+
sizeof
(
uint32_t
))
};
// Shared message buffer. Message data are either allocated in one
// continuous block along with this structure - thus avoiding one
// malloc/free pair or they are stored in used-supplied memory.
// In the latter case, ffn member stores pointer to the function to be
// used to deallocate the data. If the buffer is actually shared (there
// are at least 2 references to it) refcount member contains number of
// references.
struct
content_t
{
void
*
data
;
size_t
size
;
msg_free_fn
*
ffn
;
void
*
hint
;
zmq
::
atomic_counter_t
refcnt
;
};
// Different message types.
enum
type_t
{
...
...
@@ -138,6 +124,8 @@ namespace zmq
type_max
=
104
};
atomic_counter_t
&
msg_counter
();
// the file descriptor where this message originated, needs to be 64bit due to alignment
int64_t
file_desc
;
...
...
@@ -161,10 +149,32 @@ namespace zmq
unsigned
char
flags
;
uint32_t
routing_id
;
}
vsm
;
struct
{
struct
lmsg_t
{
metadata_t
*
metadata
;
content_t
*
content
;
unsigned
char
unused
[
msg_t_size
-
(
8
+
sizeof
(
metadata_t
*
)
+
sizeof
(
content_t
*
)
+
2
+
sizeof
(
uint32_t
))];
// Shared message buffer. Message data are either allocated in one
// continuous block along with this structure - thus avoiding one
// malloc/free pair or they are stored in used-supplied memory.
// In the latter case, ffn member stores pointer to the function to be
// used to deallocate the data. If the buffer is actually shared (there
// are at least 2 references to it) refcount member contains number of
// references.
void
*
data
;
size_t
size
;
msg_free_fn
*
ffn
;
void
*
hint
;
// create an aligned block for an atomic_counter_t object
union
aligned_atomic_counter_storage
{
zmq
::
atomic_counter_t
::
integer_t
maxAlign
;
unsigned
char
counter
[
sizeof
(
zmq
::
atomic_counter_t
)
];
}
refcnt
;
unsigned
char
unused
[
msg_t_size
-
(
8
+
sizeof
(
metadata_t
*
)
+
sizeof
(
void
*
)
+
sizeof
(
size_t
)
+
sizeof
(
msg_free_fn
*
)
+
sizeof
(
void
*
)
+
sizeof
(
aligned_atomic_counter_storage
)
+
2
+
sizeof
(
uint32_t
))];
unsigned
char
type
;
unsigned
char
flags
;
uint32_t
routing_id
;
...
...
src/v1_decoder.cpp
View file @
9e80f07a
...
...
@@ -43,7 +43,8 @@
#include "err.hpp"
zmq
::
v1_decoder_t
::
v1_decoder_t
(
size_t
bufsize_
,
int64_t
maxmsgsize_
)
:
decoder_base_t
<
v1_decoder_t
>
(
bufsize_
),
c_single_allocator
(
bufsize_
),
decoder_base_t
<
v1_decoder_t
>
(
this
),
maxmsgsize
(
maxmsgsize_
)
{
int
rc
=
in_progress
.
init
();
...
...
@@ -59,7 +60,7 @@ zmq::v1_decoder_t::~v1_decoder_t ()
errno_assert
(
rc
==
0
);
}
int
zmq
::
v1_decoder_t
::
one_byte_size_ready
()
int
zmq
::
v1_decoder_t
::
one_byte_size_ready
(
unsigned
char
const
*
)
{
// First byte of size is read. If it is 0xff read 8-byte size.
// Otherwise allocate the buffer for message data and read the
...
...
@@ -96,7 +97,7 @@ int zmq::v1_decoder_t::one_byte_size_ready ()
return
0
;
}
int
zmq
::
v1_decoder_t
::
eight_byte_size_ready
()
int
zmq
::
v1_decoder_t
::
eight_byte_size_ready
(
unsigned
char
const
*
)
{
// 8-byte payload length is read. Allocate the buffer
// for message body and read the message data into it.
...
...
@@ -138,7 +139,7 @@ int zmq::v1_decoder_t::eight_byte_size_ready ()
return
0
;
}
int
zmq
::
v1_decoder_t
::
flags_ready
()
int
zmq
::
v1_decoder_t
::
flags_ready
(
unsigned
char
const
*
)
{
// Store the flags from the wire into the message structure.
in_progress
.
set_flags
(
tmpbuf
[
0
]
&
msg_t
::
more
);
...
...
@@ -149,7 +150,7 @@ int zmq::v1_decoder_t::flags_ready ()
return
0
;
}
int
zmq
::
v1_decoder_t
::
message_ready
()
int
zmq
::
v1_decoder_t
::
message_ready
(
unsigned
char
const
*
)
{
// Message is completely read. Push it further and start reading
// new message. (in_progress is a 0-byte message after this point.)
...
...
src/v1_decoder.hpp
View file @
9e80f07a
...
...
@@ -36,7 +36,9 @@ namespace zmq
{
// Decoder for ZMTP/1.0 protocol. Converts data batches into messages.
class
v1_decoder_t
:
public
decoder_base_t
<
v1_decoder_t
>
class
v1_decoder_t
:
public
zmq
::
c_single_allocator
,
public
decoder_base_t
<
v1_decoder_t
>
{
public
:
...
...
@@ -47,10 +49,10 @@ namespace zmq
private
:
int
one_byte_size_ready
();
int
eight_byte_size_ready
();
int
flags_ready
();
int
message_ready
();
int
one_byte_size_ready
(
unsigned
char
const
*
);
int
eight_byte_size_ready
(
unsigned
char
const
*
);
int
flags_ready
(
unsigned
char
const
*
);
int
message_ready
(
unsigned
char
const
*
);
unsigned
char
tmpbuf
[
8
];
msg_t
in_progress
;
...
...
src/v2_decoder.cpp
View file @
9e80f07a
...
...
@@ -41,8 +41,96 @@
#include "wire.hpp"
#include "err.hpp"
zmq
::
shared_message_memory_allocator
::
shared_message_memory_allocator
(
size_t
bufsize_
)
:
buf
(
NULL
),
bufsize
(
bufsize_
)
{
}
zmq
::
shared_message_memory_allocator
::~
shared_message_memory_allocator
()
{
deallocate
();
}
unsigned
char
*
zmq
::
shared_message_memory_allocator
::
allocate
()
{
if
(
buf
)
{
// release reference count to couple lifetime to messages
call_dec_ref
(
NULL
,
buf
);
// release pointer because we are going to create a new buffer
release
();
}
// @todo aligmnet padding may be needed
if
(
!
buf
)
{
buf
=
(
unsigned
char
*
)
malloc
(
bufsize
+
sizeof
(
zmq
::
atomic_counter_t
));
alloc_assert
(
buf
);
new
(
buf
)
atomic_counter_t
(
1
);
}
return
buf
+
sizeof
(
zmq
::
atomic_counter_t
);
}
void
zmq
::
shared_message_memory_allocator
::
deallocate
()
{
free
(
buf
);
buf
=
NULL
;
}
unsigned
char
*
zmq
::
shared_message_memory_allocator
::
release
()
{
unsigned
char
*
b
=
buf
;
buf
=
NULL
;
return
b
;
}
void
zmq
::
shared_message_memory_allocator
::
reset
(
unsigned
char
*
b
)
{
deallocate
();
buf
=
b
;
}
void
zmq
::
shared_message_memory_allocator
::
inc_ref
()
{
((
zmq
::
atomic_counter_t
*
)
buf
)
->
add
(
1
);
}
void
zmq
::
shared_message_memory_allocator
::
call_dec_ref
(
void
*
,
void
*
hint
)
{
zmq_assert
(
hint
);
zmq
::
atomic_counter_t
*
c
=
reinterpret_cast
<
zmq
::
atomic_counter_t
*>
(
hint
);
if
(
!
c
->
sub
(
1
))
{
c
->~
atomic_counter_t
();
free
(
hint
);
}
}
size_t
zmq
::
shared_message_memory_allocator
::
size
()
const
{
if
(
buf
)
{
return
bufsize
;
}
else
{
return
0
;
}
}
unsigned
char
*
zmq
::
shared_message_memory_allocator
::
data
()
{
zmq_assert
(
buf
);
return
buf
+
sizeof
(
zmq
::
atomic_counter_t
);
}
zmq
::
v2_decoder_t
::
v2_decoder_t
(
size_t
bufsize_
,
int64_t
maxmsgsize_
)
:
decoder_base_t
<
v2_decoder_t
>
(
bufsize_
),
shared_message_memory_allocator
(
bufsize_
),
decoder_base_t
<
v2_decoder_t
,
shared_message_memory_allocator
>
(
this
),
msg_flags
(
0
),
maxmsgsize
(
maxmsgsize_
)
{
...
...
@@ -59,7 +147,7 @@ zmq::v2_decoder_t::~v2_decoder_t ()
errno_assert
(
rc
==
0
);
}
int
zmq
::
v2_decoder_t
::
flags_ready
()
int
zmq
::
v2_decoder_t
::
flags_ready
(
unsigned
char
const
*
)
{
msg_flags
=
0
;
if
(
tmpbuf
[
0
]
&
v2_protocol_t
::
more_flag
)
...
...
@@ -77,40 +165,20 @@ int zmq::v2_decoder_t::flags_ready ()
return
0
;
}
int
zmq
::
v2_decoder_t
::
one_byte_size_ready
()
int
zmq
::
v2_decoder_t
::
one_byte_size_ready
(
unsigned
char
const
*
read_from
)
{
// Message size must not exceed the maximum allowed size.
if
(
maxmsgsize
>=
0
)
if
(
unlikely
(
tmpbuf
[
0
]
>
static_cast
<
uint64_t
>
(
maxmsgsize
)))
{
errno
=
EMSGSIZE
;
return
-
1
;
}
// in_progress is initialised at this point so in theory we should
// close it before calling zmq_msg_init_size, however, it's a 0-byte
// message and thus we can treat it as uninitialised...
int
rc
=
in_progress
.
init_size
(
tmpbuf
[
0
]);
if
(
unlikely
(
rc
))
{
errno_assert
(
errno
==
ENOMEM
);
rc
=
in_progress
.
init
();
errno_assert
(
rc
==
0
);
errno
=
ENOMEM
;
return
-
1
;
}
in_progress
.
set_flags
(
msg_flags
);
next_step
(
in_progress
.
data
(),
in_progress
.
size
(),
&
v2_decoder_t
::
message_ready
);
return
0
;
return
size_ready
(
tmpbuf
[
0
],
read_from
);
}
int
zmq
::
v2_decoder_t
::
eight_byte_size_ready
()
{
int
zmq
::
v2_decoder_t
::
eight_byte_size_ready
(
unsigned
char
const
*
read_from
)
{
// The payload size is encoded as 64-bit unsigned integer.
// The most significant byte comes first.
const
uint64_t
msg_size
=
get_uint64
(
tmpbuf
);
const
uint64_t
msg_size
=
get_uint64
(
tmpbuf
);
return
size_ready
(
msg_size
,
read_from
);
}
int
zmq
::
v2_decoder_t
::
size_ready
(
uint64_t
msg_size
,
unsigned
char
const
*
read_pos
)
{
// Message size must not exceed the maximum allowed size.
if
(
maxmsgsize
>=
0
)
if
(
unlikely
(
msg_size
>
static_cast
<
uint64_t
>
(
maxmsgsize
)))
{
...
...
@@ -127,7 +195,31 @@ int zmq::v2_decoder_t::eight_byte_size_ready ()
// in_progress is initialised at this point so in theory we should
// close it before calling init_size, however, it's a 0-byte
// message and thus we can treat it as uninitialised.
int
rc
=
in_progress
.
init_size
(
static_cast
<
size_t
>
(
msg_size
));
int
rc
=
-
1
;
// the current message can exceed the current buffer. We have to copy the buffer
// data into a new message and complete it in the next receive.
if
(
unlikely
((
unsigned
char
*
)
read_pos
+
msg_size
>
(
data
()
+
size
())))
{
// a new message has started, but the size would exceed the pre-allocated arena
// this happens everytime when a message does not fit completely into the buffer
rc
=
in_progress
.
init_size
(
static_cast
<
size_t
>
(
msg_size
));
}
else
{
// construct message using n bytes from the buffer as storage
// increase buffer ref count
rc
=
in_progress
.
init
(
(
unsigned
char
*
)
read_pos
,
msg_size
,
shared_message_memory_allocator
::
call_dec_ref
,
buffer
()
);
// For small messages, data has been copied and refcount does not have to be increased
if
(
in_progress
.
is_lmsg
())
{
inc_ref
();
}
}
if
(
unlikely
(
rc
))
{
errno_assert
(
errno
==
ENOMEM
);
rc
=
in_progress
.
init
();
...
...
@@ -137,13 +229,19 @@ int zmq::v2_decoder_t::eight_byte_size_ready ()
}
in_progress
.
set_flags
(
msg_flags
);
// this sets read_pos to
// the message data address if the data needs to be copied
// for small message / messages exceeding the current buffer
// or
// to the current start address in the buffer because the message
// was constructed to use n bytes from the address passed as argument
next_step
(
in_progress
.
data
(),
in_progress
.
size
(),
&
v2_decoder_t
::
message_ready
);
return
0
;
}
int
zmq
::
v2_decoder_t
::
message_ready
()
int
zmq
::
v2_decoder_t
::
message_ready
(
unsigned
char
const
*
)
{
// Message is completely read. Signal this to the caller
// and prepare to decode next message.
...
...
src/v2_decoder.hpp
View file @
9e80f07a
...
...
@@ -34,11 +34,66 @@
namespace
zmq
{
// Decoder for ZMTP/2.x framing protocol. Converts data stream into messages.
class
v2_decoder_t
:
public
decoder_base_t
<
v2_decoder_t
>
// This allocater allocates a reference counted buffer which is used by v2_decoder_t
// to use zero-copy msg::init_data to create messages with memory from this buffer as
// data storage.
//
// The buffer is allocated with a reference count of 1 to make sure that is is alive while
// decoding messages. Otherwise, it is possible that e.g. the first message increases the count
// from zero to one, gets passed to the user application, processed in the user thread and deleted
// which would then deallocate the buffer. The drawback is that the buffer may be allocated longer
// than necessary because it is only deleted when allocate is called the next time.
class
shared_message_memory_allocator
{
public
:
shared_message_memory_allocator
(
size_t
bufsize_
);
~
shared_message_memory_allocator
();
// Allocate a new buffer
//
// This releases the current buffer to be bound to the lifetime of the messages
// created on this bufer.
unsigned
char
*
allocate
();
// force deallocation of buffer.
void
deallocate
();
// Give up ownership of the buffer. The buffer's lifetime is now coupled to
// the messages constructed on top of it.
unsigned
char
*
release
();
void
reset
(
unsigned
char
*
b
);
void
inc_ref
();
static
void
call_dec_ref
(
void
*
,
void
*
buffer
);
size_t
size
()
const
;
// Return pointer to the first message data byte.
unsigned
char
*
data
();
// Return pointer to the first byte of the buffer.
unsigned
char
*
buffer
()
{
return
buf
;
}
private
:
unsigned
char
*
buf
;
size_t
bufsize
;
};
// Decoder for ZMTP/2.x framing protocol. Converts data stream into messages.
// The class has to inherit from shared_message_memory_allocator because
// the base class calls allocate in its constructor.
class
v2_decoder_t
:
// inherit first from allocator to ensure that it is constructed before decoder_base_t
public
shared_message_memory_allocator
,
public
decoder_base_t
<
v2_decoder_t
,
shared_message_memory_allocator
>
{
public
:
v2_decoder_t
(
size_t
bufsize_
,
int64_t
maxmsgsize_
);
virtual
~
v2_decoder_t
();
...
...
@@ -47,10 +102,12 @@ namespace zmq
private
:
int
flags_ready
();
int
one_byte_size_ready
();
int
eight_byte_size_ready
();
int
message_ready
();
int
flags_ready
(
unsigned
char
const
*
);
int
one_byte_size_ready
(
unsigned
char
const
*
);
int
eight_byte_size_ready
(
unsigned
char
const
*
);
int
message_ready
(
unsigned
char
const
*
);
int
size_ready
(
uint64_t
size_
,
unsigned
char
const
*
);
unsigned
char
tmpbuf
[
8
];
unsigned
char
msg_flags
;
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment