Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
5b4b8a06
Commit
5b4b8a06
authored
Jul 05, 2015
by
Thomas Köppe
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
[decoder*] Style fixes for consistency
parent
e83bad14
Hide whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
111 additions
and
116 deletions
+111
-116
decoder.hpp
src/decoder.hpp
+33
-35
decoder_allocators.cpp
src/decoder_allocators.cpp
+45
-48
decoder_allocators.hpp
src/decoder_allocators.hpp
+33
-33
No files found.
src/decoder.hpp
View file @
5b4b8a06
...
...
@@ -30,16 +30,15 @@
#ifndef __ZMQ_DECODER_HPP_INCLUDED__
#define __ZMQ_DECODER_HPP_INCLUDED__
#include <stddef.h>
#include <string.h>
#include <stdlib.h>
#include <algorithm>
#include <cstddef>
#include <cstdlib>
#include "decoder_allocators.hpp"
#include "err.hpp"
#include "msg.hpp"
#include "i_decoder.hpp"
#include "msg.hpp"
#include "stdint.hpp"
#include "decoder_allocators.hpp"
namespace
zmq
{
...
...
@@ -60,26 +59,26 @@ namespace zmq
{
public
:
inline
decoder_base_t
(
A
*
allocator_
)
:
next
(
NULL
),
read_pos
(
NULL
),
to_read
(
0
),
allocator
(
allocator_
)
explicit
decoder_base_t
(
A
*
allocator_
)
:
next
(
NULL
),
read_pos
(
NULL
),
to_read
(
0
),
allocator
(
allocator_
)
{
buf
=
allocator
->
allocate
();
buf
=
allocator
->
allocate
();
}
// The destructor doesn't have to be virtual. It is mad virtual
// The destructor doesn't have to be virtual. It is mad
e
virtual
// just to keep ICC and code checking tools from complaining.
inline
virtual
~
decoder_base_t
()
virtual
~
decoder_base_t
()
{
allocator
->
deallocate
();
allocator
->
deallocate
();
}
// Returns a buffer to be filled with binary data.
inline
void
get_buffer
(
unsigned
char
**
data_
,
size_t
*
size_
)
void
get_buffer
(
unsigned
char
**
data_
,
std
::
size_t
*
size_
)
{
buf
=
allocator
->
allocate
();
buf
=
allocator
->
allocate
();
// If we are expected to read large message, we'll opt for zero-
// copy, i.e. we'll ask caller to fill the data directly to the
...
...
@@ -89,14 +88,14 @@ namespace zmq
// As a consequence, large messages being received won't block
// other engines running in the same I/O thread for excessive
// amounts of time.
if
(
to_read
>=
allocator
->
size
())
{
if
(
to_read
>=
allocator
->
size
())
{
*
data_
=
read_pos
;
*
size_
=
to_read
;
return
;
}
*
data_
=
buf
;
*
size_
=
allocator
->
size
();
*
size_
=
allocator
->
size
();
}
// Processes the data in the buffer previously allocated using
...
...
@@ -105,8 +104,8 @@ namespace zmq
// whole message was decoded or 0 when more data is required.
// On error, -1 is returned and errno set accordingly.
// Number of bytes processed is returned in byts_used_.
in
line
int
decode
(
const
unsigned
char
*
data_
,
size_t
size_
,
size_t
&
bytes_used_
)
in
t
decode
(
const
unsigned
char
*
data_
,
std
::
size_t
size_
,
std
::
size_t
&
bytes_used_
)
{
bytes_used_
=
0
;
...
...
@@ -120,7 +119,7 @@ namespace zmq
bytes_used_
=
size_
;
while
(
!
to_read
)
{
const
int
rc
=
(
static_cast
<
T
*>
(
this
)
->*
next
)
(
data_
+
bytes_used_
);
const
int
rc
=
(
static_cast
<
T
*>
(
this
)
->*
next
)
(
data_
+
bytes_used_
);
if
(
rc
!=
0
)
return
rc
;
}
...
...
@@ -129,11 +128,11 @@ namespace zmq
while
(
bytes_used_
<
size_
)
{
// Copy the data from buffer to the message.
const
size_t
to_copy
=
std
::
min
(
to_read
,
size_
-
bytes_used_
);
const
s
td
::
s
ize_t
to_copy
=
std
::
min
(
to_read
,
size_
-
bytes_used_
);
// only copy when the destination address is different from the
// current address in the buffer
if
(
read_pos
!=
data_
+
bytes_used_
)
{
memcpy
(
read_pos
,
data_
+
bytes_used_
,
to_copy
);
std
::
memcpy
(
read_pos
,
data_
+
bytes_used_
,
to_copy
);
}
read_pos
+=
to_copy
;
...
...
@@ -143,7 +142,7 @@ namespace zmq
// If none is available, return.
while
(
to_read
==
0
)
{
// pass current address in the buffer
const
int
rc
=
(
static_cast
<
T
*>
(
this
)
->*
next
)
(
data_
+
bytes_used_
);
const
int
rc
=
(
static_cast
<
T
*>
(
this
)
->*
next
)
(
data_
+
bytes_used_
);
if
(
rc
!=
0
)
return
rc
;
}
...
...
@@ -152,22 +151,22 @@ namespace zmq
return
0
;
}
virtual
void
resize_buffer
(
size_t
new_size
)
virtual
void
resize_buffer
(
std
::
size_t
new_size
)
{
allocator
->
resize
(
new_size
);
allocator
->
resize
(
new_size
);
}
protected
:
// Prototype of state machine action. Action should return false if
// it is unable to push the data to the system.
typedef
int
(
T
::
*
step_t
)
(
unsigned
char
const
*
);
typedef
int
(
T
::
*
step_t
)
(
unsigned
char
const
*
);
// This function should be called from derived class to read data
// from the buffer and schedule next state machine action.
inline
void
next_step
(
void
*
read_pos_
,
size_t
to_read_
,
step_t
next_
)
void
next_step
(
void
*
read_pos_
,
std
::
size_t
to_read_
,
step_t
next_
)
{
read_pos
=
(
unsigned
char
*
)
read_pos_
;
read_pos
=
static_cast
<
unsigned
char
*>
(
read_pos_
)
;
to_read
=
to_read_
;
next
=
next_
;
}
...
...
@@ -183,16 +182,15 @@ namespace zmq
unsigned
char
*
read_pos
;
// How much data to read before taking next step.
size_t
to_read
;
s
td
::
s
ize_t
to_read
;
// The duffer for data to decode.
A
*
allocator
;
unsigned
char
*
buf
;
A
*
allocator
;
unsigned
char
*
buf
;
decoder_base_t
(
const
decoder_base_t
&
);
const
decoder_base_t
&
operator
=
(
const
decoder_base_t
&
);
decoder_base_t
(
const
decoder_base_t
&
);
const
decoder_base_t
&
operator
=
(
const
decoder_base_t
&
);
};
}
#endif
src/decoder_allocators.cpp
View file @
5b4b8a06
...
...
@@ -33,79 +33,76 @@
#include "msg.hpp"
zmq
::
shared_message_memory_allocator
::
shared_message_memory_allocator
(
size_t
bufsize_
)
:
buf
(
NULL
),
bufsize
(
0
),
max_size
(
bufsize_
),
msg_refcnt
(
NULL
),
maxCounters
(
std
::
ceil
(
static_cast
<
double
>
(
max_size
)
/
static_cast
<
double
>
(
msg_t
::
max_vsm_size
))
)
zmq
::
shared_message_memory_allocator
::
shared_message_memory_allocator
(
std
::
size_t
bufsize_
)
:
buf
(
NULL
),
bufsize
(
0
),
max_size
(
bufsize_
),
msg_refcnt
(
NULL
),
maxCounters
(
std
::
ceil
(
static_cast
<
double
>
(
max_size
)
/
static_cast
<
double
>
(
msg_t
::
max_vsm_size
))
)
{
}
zmq
::
shared_message_memory_allocator
::
shared_message_memory_allocator
(
size_t
bufsize_
,
size_t
maxMessages
)
:
buf
(
NULL
),
bufsize
(
0
),
max_size
(
bufsize_
),
msg_refcnt
(
NULL
),
maxCounters
(
maxMessages
)
zmq
::
shared_message_memory_allocator
::
shared_message_memory_allocator
(
std
::
size_t
bufsize_
,
std
::
size_t
maxMessages
)
:
buf
(
NULL
),
bufsize
(
0
),
max_size
(
bufsize_
),
msg_refcnt
(
NULL
),
maxCounters
(
maxMessages
)
{
}
zmq
::
shared_message_memory_allocator
::~
shared_message_memory_allocator
()
zmq
::
shared_message_memory_allocator
::~
shared_message_memory_allocator
()
{
deallocate
();
}
unsigned
char
*
zmq
::
shared_message_memory_allocator
::
allocate
()
unsigned
char
*
zmq
::
shared_message_memory_allocator
::
allocate
()
{
if
(
buf
)
{
if
(
buf
)
{
// release reference count to couple lifetime to messages
zmq
::
atomic_counter_t
*
c
=
reinterpret_cast
<
zmq
::
atomic_counter_t
*
>
(
buf
);
zmq
::
atomic_counter_t
*
c
=
reinterpret_cast
<
zmq
::
atomic_counter_t
*
>
(
buf
);
// if refcnt drops to 0, there are no message using the buffer
// because either all messages have been closed or only vsm-messages
// were created
if
(
c
->
sub
(
1
))
{
if
(
c
->
sub
(
1
))
{
// buffer is still in use as message data. "Release" it and create a new one
// release pointer because we are going to create a new buffer
release
();
release
();
}
}
// if buf != NULL it is not used by any message so we can re-use it for the next run
if
(
!
buf
)
{
// allocate memory for reference counters together with reception buffer
size_t
const
allocationsize
=
max_size
+
sizeof
(
zmq
::
atomic_counter_t
)
+
maxCounters
*
sizeof
(
zmq
::
atomic_counter_t
);
std
::
size_t
const
allocationsize
=
max_size
+
sizeof
(
zmq
::
atomic_counter_t
)
+
maxCounters
*
sizeof
(
zmq
::
atomic_counter_t
);
buf
=
static_cast
<
unsigned
char
*>
(
malloc
(
allocationsize
)
);
buf
=
static_cast
<
unsigned
char
*>
(
std
::
malloc
(
allocationsize
)
);
alloc_assert
(
buf
);
new
(
buf
)
atomic_counter_t
(
1
);
}
else
{
new
(
buf
)
atomic_counter_t
(
1
);
}
else
{
// release reference count to couple lifetime to messages
zmq
::
atomic_counter_t
*
c
=
reinterpret_cast
<
zmq
::
atomic_counter_t
*>
(
buf
);
c
->
set
(
1
);
zmq
::
atomic_counter_t
*
c
=
reinterpret_cast
<
zmq
::
atomic_counter_t
*>
(
buf
);
c
->
set
(
1
);
}
bufsize
=
max_size
;
msg_refcnt
=
reinterpret_cast
<
zmq
::
atomic_counter_t
*>
(
buf
+
sizeof
(
atomic_counter_t
)
+
max_size
);
return
buf
+
sizeof
(
zmq
::
atomic_counter_t
);
msg_refcnt
=
reinterpret_cast
<
zmq
::
atomic_counter_t
*>
(
buf
+
sizeof
(
atomic_counter_t
)
+
max_size
);
return
buf
+
sizeof
(
zmq
::
atomic_counter_t
);
}
void
zmq
::
shared_message_memory_allocator
::
deallocate
()
void
zmq
::
shared_message_memory_allocator
::
deallocate
()
{
std
::
free
(
buf
);
std
::
free
(
buf
);
buf
=
NULL
;
bufsize
=
0
;
msg_refcnt
=
NULL
;
}
unsigned
char
*
zmq
::
shared_message_memory_allocator
::
release
()
unsigned
char
*
zmq
::
shared_message_memory_allocator
::
release
()
{
unsigned
char
*
b
=
buf
;
buf
=
NULL
;
...
...
@@ -115,30 +112,31 @@ unsigned char* zmq::shared_message_memory_allocator::release()
return
b
;
}
void
zmq
::
shared_message_memory_allocator
::
inc_ref
()
void
zmq
::
shared_message_memory_allocator
::
inc_ref
()
{
(
reinterpret_cast
<
zmq
::
atomic_counter_t
*>
(
buf
))
->
add
(
1
);
(
reinterpret_cast
<
zmq
::
atomic_counter_t
*>
(
buf
))
->
add
(
1
);
}
void
zmq
::
shared_message_memory_allocator
::
call_dec_ref
(
void
*
,
void
*
hint
)
{
zmq_assert
(
hint
);
unsigned
char
*
buf
=
static_cast
<
unsigned
char
*>
(
hint
);
zmq
::
atomic_counter_t
*
c
=
reinterpret_cast
<
zmq
::
atomic_counter_t
*>
(
buf
);
void
zmq
::
shared_message_memory_allocator
::
call_dec_ref
(
void
*
,
void
*
hint
)
{
zmq_assert
(
hint
);
unsigned
char
*
buf
=
static_cast
<
unsigned
char
*>
(
hint
);
zmq
::
atomic_counter_t
*
c
=
reinterpret_cast
<
zmq
::
atomic_counter_t
*>
(
buf
);
if
(
!
c
->
sub
(
1
))
{
c
->~
atomic_counter_t
();
free
(
buf
);
if
(
!
c
->
sub
(
1
))
{
c
->~
atomic_counter_t
();
std
::
free
(
buf
);
buf
=
NULL
;
}
}
s
ize_t
zmq
::
shared_message_memory_allocator
::
size
()
const
s
td
::
size_t
zmq
::
shared_message_memory_allocator
::
size
()
const
{
return
bufsize
;
}
unsigned
char
*
zmq
::
shared_message_memory_allocator
::
data
()
unsigned
char
*
zmq
::
shared_message_memory_allocator
::
data
()
{
return
buf
+
sizeof
(
zmq
::
atomic_counter_t
);
}
\ No newline at end of file
return
buf
+
sizeof
(
zmq
::
atomic_counter_t
);
}
src/decoder_allocators.hpp
View file @
5b4b8a06
...
...
@@ -27,13 +27,14 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef
ZEROMQ_DECODER_ALLOCATORS_HPP
#define
ZEROMQ_DECODER_ALLOCATORS_HPP
#ifndef
__ZMQ_DECODER_ALLOCATORS_HPP_INCLUDED__
#define
__ZMQ_DECODER_ALLOCATORS_HPP_INCLUDED__
#include <cstddef>
#include <cstdlib>
#include "err.hpp"
#include "atomic_counter.hpp"
#include "err.hpp"
namespace
zmq
{
...
...
@@ -41,43 +42,42 @@ namespace zmq
class
c_single_allocator
{
public
:
explicit
c_single_allocator
(
size_t
bufsize_
)
:
explicit
c_single_allocator
(
std
::
size_t
bufsize_
)
:
bufsize
(
bufsize_
),
buf
(
static_cast
<
unsigned
char
*>
(
malloc
(
bufsize
)
))
buf
(
static_cast
<
unsigned
char
*>
(
std
::
malloc
(
bufsize
)
))
{
alloc_assert
(
buf
);
}
~
c_single_allocator
()
~
c_single_allocator
()
{
std
::
free
(
buf
);
std
::
free
(
buf
);
}
unsigned
char
*
allocate
()
unsigned
char
*
allocate
()
{
return
buf
;
}
void
deallocate
()
void
deallocate
()
{
}
s
ize_t
size
()
const
s
td
::
size_t
size
()
const
{
return
bufsize
;
}
void
resize
(
size_t
new_size
)
void
resize
(
std
::
size_t
new_size
)
{
bufsize
=
new_size
;
}
private
:
size_t
bufsize
;
s
td
::
s
ize_t
bufsize
;
unsigned
char
*
buf
;
c_single_allocator
(
c_single_allocator
const
&
);
c_single_allocator
&
operator
=
(
c_single_allocator
const
&
);
c_single_allocator
(
c_single_allocator
const
&
);
c_single_allocator
&
operator
=
(
c_single_allocator
const
&
);
};
// This allocater allocates a reference counted buffer which is used by v2_decoder_t
...
...
@@ -92,58 +92,58 @@ namespace zmq
class
shared_message_memory_allocator
{
public
:
explicit
shared_message_memory_allocator
(
size_t
bufsize_
);
explicit
shared_message_memory_allocator
(
std
::
size_t
bufsize_
);
// Create an allocator for a maximum number of messages
shared_message_memory_allocator
(
size_t
bufsize_
,
size_t
maxMessages
);
shared_message_memory_allocator
(
std
::
size_t
bufsize_
,
std
::
size_t
maxMessages
);
~
shared_message_memory_allocator
();
~
shared_message_memory_allocator
();
// Allocate a new buffer
//
// This releases the current buffer to be bound to the lifetime of the messages
// created on this bufer.
unsigned
char
*
allocate
();
unsigned
char
*
allocate
();
// force deallocation of buffer.
void
deallocate
();
void
deallocate
();
// Give up ownership of the buffer. The buffer's lifetime is now coupled to
// the messages constructed on top of it.
unsigned
char
*
release
();
unsigned
char
*
release
();
void
inc_ref
();
void
inc_ref
();
static
void
call_dec_ref
(
void
*
,
void
*
buffer
);
static
void
call_dec_ref
(
void
*
,
void
*
buffer
);
s
ize_t
size
()
const
;
s
td
::
size_t
size
()
const
;
// Return pointer to the first message data byte.
unsigned
char
*
data
();
unsigned
char
*
data
();
// Return pointer to the first byte of the buffer.
unsigned
char
*
buffer
()
unsigned
char
*
buffer
()
{
return
buf
;
}
void
resize
(
size_t
new_size
)
void
resize
(
std
::
size_t
new_size
)
{
bufsize
=
new_size
;
}
//
zmq
::
atomic_counter_t
*
create_refcnt
()
zmq
::
atomic_counter_t
*
create_refcnt
()
{
return
msg_refcnt
++
;
}
private
:
unsigned
char
*
buf
;
size_t
bufsize
;
size_t
max_size
;
s
td
::
s
ize_t
bufsize
;
s
td
::
s
ize_t
max_size
;
zmq
::
atomic_counter_t
*
msg_refcnt
;
size_t
maxCounters
;
s
td
::
s
ize_t
maxCounters
;
};
}
#endif //ZEROMQ_DECODER_ALLOCATORS_HPP
#endif
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment