Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
8672f582
Commit
8672f582
authored
Sep 04, 2012
by
Martin Hurton
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Introduce abstract interface for message encoder/decoder
parent
a224c973
Show whitespace changes
Inline
Side-by-side
Showing
6 changed files
with
142 additions
and
21 deletions
+142
-21
decoder.hpp
src/decoder.hpp
+2
-1
encoder.hpp
src/encoder.hpp
+2
-1
i_decoder.hpp
src/i_decoder.hpp
+49
-0
i_encoder.hpp
src/i_encoder.hpp
+54
-0
stream_engine.cpp
src/stream_engine.cpp
+31
-15
stream_engine.hpp
src/stream_engine.hpp
+4
-4
No files found.
src/decoder.hpp
View file @
8672f582
...
@@ -29,6 +29,7 @@
...
@@ -29,6 +29,7 @@
#include "err.hpp"
#include "err.hpp"
#include "msg.hpp"
#include "msg.hpp"
#include "i_decoder.hpp"
#include "stdint.hpp"
#include "stdint.hpp"
namespace
zmq
namespace
zmq
...
@@ -47,7 +48,7 @@ namespace zmq
...
@@ -47,7 +48,7 @@ namespace zmq
// This class implements the state machine that parses the incoming buffer.
// This class implements the state machine that parses the incoming buffer.
// Derived class should implement individual state machine actions.
// Derived class should implement individual state machine actions.
template
<
typename
T
>
class
decoder_base_t
template
<
typename
T
>
class
decoder_base_t
:
public
i_decoder
{
{
public
:
public
:
...
...
src/encoder.hpp
View file @
8672f582
...
@@ -29,6 +29,7 @@
...
@@ -29,6 +29,7 @@
#include "err.hpp"
#include "err.hpp"
#include "msg.hpp"
#include "msg.hpp"
#include "i_encoder.hpp"
namespace
zmq
namespace
zmq
{
{
...
@@ -39,7 +40,7 @@ namespace zmq
...
@@ -39,7 +40,7 @@ namespace zmq
// fills the outgoing buffer. Derived classes should implement individual
// fills the outgoing buffer. Derived classes should implement individual
// state machine actions.
// state machine actions.
template
<
typename
T
>
class
encoder_base_t
template
<
typename
T
>
class
encoder_base_t
:
public
i_encoder
{
{
public
:
public
:
...
...
src/i_decoder.hpp
0 → 100644
View file @
8672f582
/*
Copyright (c) 2007-2012 iMatix Corporation
Copyright (c) 2007-2012 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_I_DECODER_HPP_INCLUDED__
#define __ZMQ_I_DECODER_HPP_INCLUDED__
#include <stdint.h>
namespace
zmq
{
class
i_msg_sink
;
// Interface to be implemented by message decoder.
struct
i_decoder
{
virtual
~
i_decoder
()
{}
virtual
void
set_msg_sink
(
i_msg_sink
*
msg_sink_
)
=
0
;
virtual
void
get_buffer
(
unsigned
char
**
data_
,
size_t
*
size_
)
=
0
;
virtual
size_t
process_buffer
(
unsigned
char
*
data_
,
size_t
size_
)
=
0
;
virtual
bool
stalled
()
const
=
0
;
};
}
#endif
src/i_encoder.hpp
0 → 100644
View file @
8672f582
/*
Copyright (c) 2007-2012 iMatix Corporation
Copyright (c) 2007-2012 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_I_ENCODER_HPP_INCLUDED__
#define __ZMQ_I_ENCODER_HPP_INCLUDED__
#include <stdint.h>
namespace
zmq
{
// Forward declaration
class
i_msg_source
;
// Interface to be implemented by message encoder.
struct
i_encoder
{
virtual
~
i_encoder
()
{}
// Set message producer.
virtual
void
set_msg_source
(
i_msg_source
*
msg_source_
)
=
0
;
// The function returns a batch of binary data. The data
// are filled to a supplied buffer. If no buffer is supplied (data_
// is NULL) encoder will provide buffer of its own.
// If offset is not NULL, it is filled by offset of the first message
// in the batch.If there's no beginning of a message in the batch,
// offset is set to -1.
virtual
void
get_data
(
unsigned
char
**
data_
,
size_t
*
size_
,
int
*
offset_
=
NULL
)
=
0
;
};
}
#endif
src/stream_engine.cpp
View file @
8672f582
...
@@ -38,6 +38,8 @@
...
@@ -38,6 +38,8 @@
#include "stream_engine.hpp"
#include "stream_engine.hpp"
#include "io_thread.hpp"
#include "io_thread.hpp"
#include "session_base.hpp"
#include "session_base.hpp"
#include "encoder.hpp"
#include "decoder.hpp"
#include "config.hpp"
#include "config.hpp"
#include "err.hpp"
#include "err.hpp"
#include "ip.hpp"
#include "ip.hpp"
...
@@ -48,11 +50,11 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_, cons
...
@@ -48,11 +50,11 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_, cons
s
(
fd_
),
s
(
fd_
),
inpos
(
NULL
),
inpos
(
NULL
),
insize
(
0
),
insize
(
0
),
decoder
(
in_batch_size
,
options_
.
maxmsgsize
),
decoder
(
NULL
),
input_error
(
false
),
input_error
(
false
),
outpos
(
NULL
),
outpos
(
NULL
),
outsize
(
0
),
outsize
(
0
),
encoder
(
out_batch_size
),
encoder
(
NULL
),
handshaking
(
true
),
handshaking
(
true
),
greeting_bytes_read
(
0
),
greeting_bytes_read
(
0
),
greeting_size
(
0
),
greeting_size
(
0
),
...
@@ -106,6 +108,11 @@ zmq::stream_engine_t::~stream_engine_t ()
...
@@ -106,6 +108,11 @@ zmq::stream_engine_t::~stream_engine_t ()
#endif
#endif
s
=
retired_fd
;
s
=
retired_fd
;
}
}
if
(
encoder
!=
NULL
)
delete
encoder
;
if
(
decoder
!=
NULL
)
delete
decoder
;
}
}
void
zmq
::
stream_engine_t
::
plug
(
io_thread_t
*
io_thread_
,
void
zmq
::
stream_engine_t
::
plug
(
io_thread_t
*
io_thread_
,
...
@@ -156,8 +163,10 @@ void zmq::stream_engine_t::unplug ()
...
@@ -156,8 +163,10 @@ void zmq::stream_engine_t::unplug ()
io_object_t
::
unplug
();
io_object_t
::
unplug
();
// Disconnect from session object.
// Disconnect from session object.
encoder
.
set_msg_source
(
NULL
);
if
(
encoder
)
decoder
.
set_msg_sink
(
NULL
);
encoder
->
set_msg_source
(
NULL
);
if
(
decoder
)
decoder
->
set_msg_sink
(
NULL
);
session
=
NULL
;
session
=
NULL
;
}
}
...
@@ -174,6 +183,7 @@ void zmq::stream_engine_t::in_event ()
...
@@ -174,6 +183,7 @@ void zmq::stream_engine_t::in_event ()
if
(
!
handshake
())
if
(
!
handshake
())
return
;
return
;
zmq_assert
(
decoder
);
bool
disconnection
=
false
;
bool
disconnection
=
false
;
// If there's no data to process in the buffer...
// If there's no data to process in the buffer...
...
@@ -183,7 +193,7 @@ void zmq::stream_engine_t::in_event ()
...
@@ -183,7 +193,7 @@ void zmq::stream_engine_t::in_event ()
// Note that buffer can be arbitrarily large. However, we assume
// Note that buffer can be arbitrarily large. However, we assume
// the underlying TCP layer has fixed buffer size and thus the
// the underlying TCP layer has fixed buffer size and thus the
// number of bytes read will be always limited.
// number of bytes read will be always limited.
decoder
.
get_buffer
(
&
inpos
,
&
insize
);
decoder
->
get_buffer
(
&
inpos
,
&
insize
);
insize
=
read
(
inpos
,
insize
);
insize
=
read
(
inpos
,
insize
);
// Check whether the peer has closed the connection.
// Check whether the peer has closed the connection.
...
@@ -194,7 +204,7 @@ void zmq::stream_engine_t::in_event ()
...
@@ -194,7 +204,7 @@ void zmq::stream_engine_t::in_event ()
}
}
// Push the data to the decoder.
// Push the data to the decoder.
size_t
processed
=
decoder
.
process_buffer
(
inpos
,
insize
);
size_t
processed
=
decoder
->
process_buffer
(
inpos
,
insize
);
if
(
unlikely
(
processed
==
(
size_t
)
-
1
))
{
if
(
unlikely
(
processed
==
(
size_t
)
-
1
))
{
disconnection
=
true
;
disconnection
=
true
;
...
@@ -220,7 +230,7 @@ void zmq::stream_engine_t::in_event ()
...
@@ -220,7 +230,7 @@ void zmq::stream_engine_t::in_event ()
// until after the session has accepted the message.
// until after the session has accepted the message.
if
(
disconnection
)
{
if
(
disconnection
)
{
input_error
=
true
;
input_error
=
true
;
if
(
decoder
.
stalled
())
if
(
decoder
->
stalled
())
reset_pollin
(
handle
);
reset_pollin
(
handle
);
else
else
error
();
error
();
...
@@ -233,7 +243,8 @@ void zmq::stream_engine_t::out_event ()
...
@@ -233,7 +243,8 @@ void zmq::stream_engine_t::out_event ()
if
(
!
outsize
)
{
if
(
!
outsize
)
{
outpos
=
NULL
;
outpos
=
NULL
;
encoder
.
get_data
(
&
outpos
,
&
outsize
);
zmq_assert
(
encoder
);
encoder
->
get_data
(
&
outpos
,
&
outsize
);
// If there is no data to send, stop polling for output.
// If there is no data to send, stop polling for output.
if
(
outsize
==
0
)
{
if
(
outsize
==
0
)
{
...
@@ -284,8 +295,9 @@ void zmq::stream_engine_t::activate_in ()
...
@@ -284,8 +295,9 @@ void zmq::stream_engine_t::activate_in ()
// There was an input error but the engine could not
// There was an input error but the engine could not
// be terminated (due to the stalled decoder).
// be terminated (due to the stalled decoder).
// Flush the pending message and terminate the engine now.
// Flush the pending message and terminate the engine now.
decoder
.
process_buffer
(
inpos
,
0
);
zmq_assert
(
decoder
);
zmq_assert
(
!
decoder
.
stalled
());
decoder
->
process_buffer
(
inpos
,
0
);
zmq_assert
(
!
decoder
->
stalled
());
session
->
flush
();
session
->
flush
();
error
();
error
();
return
;
return
;
...
@@ -379,8 +391,11 @@ bool zmq::stream_engine_t::handshake ()
...
@@ -379,8 +391,11 @@ bool zmq::stream_engine_t::handshake ()
// We have received either a header of identity message
// We have received either a header of identity message
// or the whole greeting.
// or the whole greeting.
encoder
.
set_msg_source
(
session
);
encoder
=
new
(
std
::
nothrow
)
encoder_t
(
out_batch_size
);
decoder
.
set_msg_sink
(
session
);
decoder
=
new
(
std
::
nothrow
)
decoder_t
(
in_batch_size
,
options
.
maxmsgsize
);
encoder
->
set_msg_source
(
session
);
decoder
->
set_msg_sink
(
session
);
zmq_assert
(
greeting
[
0
]
!=
0xff
||
greeting_bytes_read
>=
10
);
zmq_assert
(
greeting
[
0
]
!=
0xff
||
greeting_bytes_read
>=
10
);
...
@@ -395,7 +410,7 @@ bool zmq::stream_engine_t::handshake ()
...
@@ -395,7 +410,7 @@ bool zmq::stream_engine_t::handshake ()
const
size_t
header_size
=
options
.
identity_size
+
1
>=
255
?
10
:
2
;
const
size_t
header_size
=
options
.
identity_size
+
1
>=
255
?
10
:
2
;
unsigned
char
tmp
[
10
],
*
bufferp
=
tmp
;
unsigned
char
tmp
[
10
],
*
bufferp
=
tmp
;
size_t
buffer_size
=
header_size
;
size_t
buffer_size
=
header_size
;
encoder
.
get_data
(
&
bufferp
,
&
buffer_size
);
encoder
->
get_data
(
&
bufferp
,
&
buffer_size
);
zmq_assert
(
buffer_size
==
header_size
);
zmq_assert
(
buffer_size
==
header_size
);
// Make sure the decoder sees the data we have already received.
// Make sure the decoder sees the data we have already received.
...
@@ -408,7 +423,7 @@ bool zmq::stream_engine_t::handshake ()
...
@@ -408,7 +423,7 @@ bool zmq::stream_engine_t::handshake ()
// message right after the identity message, we temporarily
// message right after the identity message, we temporarily
// divert the message stream from session to ourselves.
// divert the message stream from session to ourselves.
if
(
options
.
type
==
ZMQ_PUB
||
options
.
type
==
ZMQ_XPUB
)
if
(
options
.
type
==
ZMQ_PUB
||
options
.
type
==
ZMQ_XPUB
)
decoder
.
set_msg_sink
(
this
);
decoder
->
set_msg_sink
(
this
);
}
}
// Start polling for output if necessary.
// Start polling for output if necessary.
...
@@ -441,7 +456,8 @@ int zmq::stream_engine_t::push_msg (msg_t *msg_)
...
@@ -441,7 +456,8 @@ int zmq::stream_engine_t::push_msg (msg_t *msg_)
// Once we have injected the subscription message, we can
// Once we have injected the subscription message, we can
// Divert the message flow back to the session.
// Divert the message flow back to the session.
decoder
.
set_msg_sink
(
session
);
zmq_assert
(
decoder
);
decoder
->
set_msg_sink
(
session
);
return
rc
;
return
rc
;
}
}
...
...
src/stream_engine.hpp
View file @
8672f582
...
@@ -28,8 +28,8 @@
...
@@ -28,8 +28,8 @@
#include "i_engine.hpp"
#include "i_engine.hpp"
#include "i_msg_sink.hpp"
#include "i_msg_sink.hpp"
#include "io_object.hpp"
#include "io_object.hpp"
#include "encoder.hpp"
#include "
i_
encoder.hpp"
#include "decoder.hpp"
#include "
i_
decoder.hpp"
#include "options.hpp"
#include "options.hpp"
#include "../include/zmq.h"
#include "../include/zmq.h"
...
@@ -105,12 +105,12 @@ namespace zmq
...
@@ -105,12 +105,12 @@ namespace zmq
unsigned
char
*
inpos
;
unsigned
char
*
inpos
;
size_t
insize
;
size_t
insize
;
decoder_t
decoder
;
i_decoder
*
decoder
;
bool
input_error
;
bool
input_error
;
unsigned
char
*
outpos
;
unsigned
char
*
outpos
;
size_t
outsize
;
size_t
outsize
;
encoder_t
encoder
;
i_encoder
*
encoder
;
// When true, we are still trying to determine whether
// When true, we are still trying to determine whether
// the peer is using versioned protocol, and if so, which
// the peer is using versioned protocol, and if so, which
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment