Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
c91a638a
Commit
c91a638a
authored
Mar 15, 2014
by
bebopagogo
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
added norm_engine
parent
49e035fb
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
893 additions
and
0 deletions
+893
-0
norm_engine.cpp
src/norm_engine.cpp
+704
-0
norm_engine.hpp
src/norm_engine.hpp
+189
-0
No files found.
src/norm_engine.cpp
0 → 100644
View file @
c91a638a
#include "platform.hpp"
#define ZMQ_HAVE_NORM 1
#if defined ZMQ_HAVE_NORM
#include "norm_engine.hpp"
#include "session_base.hpp"
#include "v2_protocol.hpp"
zmq
::
norm_engine_t
::
norm_engine_t
(
io_thread_t
*
parent_
,
const
options_t
&
options_
)
:
io_object_t
(
parent_
),
zmq_session
(
NULL
),
options
(
options_
),
norm_instance
(
NORM_INSTANCE_INVALID
),
norm_session
(
NORM_SESSION_INVALID
),
is_sender
(
false
),
is_receiver
(
false
),
zmq_encoder
(
0
),
tx_more_bit
(
false
),
zmq_output_ready
(
false
),
norm_tx_stream
(
NORM_OBJECT_INVALID
),
norm_tx_ready
(
false
),
tx_index
(
0
),
tx_len
(
0
),
zmq_input_ready
(
false
)
{
int
rc
=
tx_msg
.
init
();
errno_assert
(
0
==
rc
);
}
zmq
::
norm_engine_t
::~
norm_engine_t
()
{
shutdown
();
// in case it was not already called
}
int
zmq
::
norm_engine_t
::
init
(
const
char
*
network_
,
bool
send
,
bool
recv
)
{
// Parse the "network_" address int "iface", "addr", and "port"
// norm endpoint format: [id,][<iface>;]<addr>:<port>
// First, look for optional local NormNodeId
NormNodeId
localId
=
NORM_NODE_ANY
;
const
char
*
ifacePtr
=
strchr
(
network_
,
','
);
if
(
NULL
!=
ifacePtr
)
{
size_t
idLen
=
ifacePtr
-
network_
;
if
(
idLen
>
31
)
idLen
=
31
;
char
idText
[
32
];
strncpy
(
idText
,
network_
,
idLen
);
idText
[
idLen
]
=
'\0'
;
localId
=
(
NormNodeId
)
atoi
(
idText
);
ifacePtr
++
;
}
else
{
ifacePtr
=
network_
;
}
// Second, look for optional multicast ifaceName
char
ifaceName
[
256
];
const
char
*
addrPtr
=
strchr
(
ifacePtr
,
';'
);
if
(
NULL
!=
addrPtr
)
{
size_t
ifaceLen
=
addrPtr
-
ifacePtr
;
if
(
ifaceLen
>
255
)
ifaceLen
=
255
;
// return error instead?
strncpy
(
ifaceName
,
ifacePtr
,
ifaceLen
);
ifaceName
[
ifaceLen
]
=
'\0'
;
ifacePtr
=
ifaceName
;
addrPtr
++
;
}
else
{
addrPtr
=
ifacePtr
;
ifacePtr
=
NULL
;
}
// Finally, parse IP address and port number
const
char
*
portPtr
=
strrchr
(
addrPtr
,
':'
);
if
(
NULL
==
portPtr
)
{
errno
=
EINVAL
;
return
-
1
;
}
char
addr
[
256
];
size_t
addrLen
=
portPtr
-
addrPtr
;
if
(
addrLen
>
255
)
addrLen
=
255
;
strncpy
(
addr
,
addrPtr
,
addrLen
);
addr
[
addrLen
]
=
'\0'
;
portPtr
++
;
unsigned
short
portNumber
=
atoi
(
portPtr
);
if
(
NORM_INSTANCE_INVALID
==
norm_instance
)
{
if
(
NORM_INSTANCE_INVALID
==
(
norm_instance
=
NormCreateInstance
()))
{
// errno set by whatever caused NormCreateInstance() to fail
return
-
1
;
}
}
// TBD - What do we use for our local NormNodeId?
// (for now we use automatic, IP addr based assignment or passed in 'id')
// a) Add function to use iface addr
// b) Randomize and implement a NORM session layer
// conflict detection/resolution protocol
norm_session
=
NormCreateSession
(
norm_instance
,
addr
,
portNumber
,
localId
);
if
(
NORM_SESSION_INVALID
==
norm_session
)
{
int
savedErrno
=
errno
;
NormDestroyInstance
(
norm_instance
);
norm_instance
=
NORM_INSTANCE_INVALID
;
errno
=
savedErrno
;
return
-
1
;
}
if
(
NormIsUnicastAddress
(
addr
))
{
NormSetDefaultUnicastNack
(
norm_session
,
true
);
}
else
{
// These only apply for multicast sessions
//NormSetTTL(norm_session, options.multicast_hops); // ZMQ default is 1
NormSetTTL
(
norm_session
,
255
);
// since the ZMQ_MULTICAST_HOPS socket option isn't well-supported
NormSetRxPortReuse
(
norm_session
,
true
);
// port reuse doesn't work for non-connected unicast
NormSetLoopback
(
norm_session
,
true
);
// needed when multicast users on same machine
if
(
NULL
!=
ifacePtr
)
{
// Note a bad interface may not be caught until sender or receiver start
// (Since sender/receiver is not yet started, this always succeeds here)
NormSetMulticastInterface
(
norm_session
,
ifacePtr
);
}
}
if
(
recv
)
{
NormSetDefaultSyncPolicy
(
norm_session
,
NORM_SYNC_STREAM
);
if
(
!
NormStartReceiver
(
norm_session
,
2
*
1024
*
1024
))
{
// errno set by whatever failed
int
savedErrno
=
errno
;
NormDestroyInstance
(
norm_instance
);
// session gets closed, too
norm_session
=
NORM_SESSION_INVALID
;
norm_instance
=
NORM_INSTANCE_INVALID
;
errno
=
savedErrno
;
return
-
1
;
}
is_receiver
=
true
;
}
if
(
send
)
{
// Pick a random sender instance id (aka norm sender session id)
NormSessionId
instanceId
=
NormGetRandomSessionId
();
// TBD - provide "options" for some NORM sender parameters
if
(
!
NormStartSender
(
norm_session
,
instanceId
,
2
*
1024
*
1024
,
1400
,
16
,
4
))
{
// errno set by whatever failed
int
savedErrno
=
errno
;
NormDestroyInstance
(
norm_instance
);
// session gets closed, too
norm_session
=
NORM_SESSION_INVALID
;
norm_instance
=
NORM_INSTANCE_INVALID
;
errno
=
savedErrno
;
return
-
1
;
}
NormSetCongestionControl
(
norm_session
,
true
);
norm_tx_ready
=
true
;
is_sender
=
true
;
if
(
NORM_OBJECT_INVALID
==
(
norm_tx_stream
=
NormStreamOpen
(
norm_session
,
2
*
1024
*
1024
)))
{
// errno set by whatever failed
int
savedErrno
=
errno
;
NormDestroyInstance
(
norm_instance
);
// session gets closed, too
norm_session
=
NORM_SESSION_INVALID
;
norm_instance
=
NORM_INSTANCE_INVALID
;
errno
=
savedErrno
;
return
-
1
;
}
}
return
0
;
// no error
}
// end zmq::norm_engine_t::init()
void
zmq
::
norm_engine_t
::
shutdown
()
{
// TBD - implement a more graceful shutdown option
if
(
is_receiver
)
{
NormStopReceiver
(
norm_session
);
// delete any active NormRxStreamState
rx_pending_list
.
Destroy
();
rx_ready_list
.
Destroy
();
msg_ready_list
.
Destroy
();
is_receiver
=
false
;
}
if
(
is_sender
)
{
NormStopSender
(
norm_session
);
is_sender
=
false
;
}
if
(
NORM_SESSION_INVALID
!=
norm_session
)
{
NormDestroySession
(
norm_session
);
norm_session
=
NORM_SESSION_INVALID
;
}
if
(
NORM_INSTANCE_INVALID
!=
norm_instance
)
{
NormStopInstance
(
norm_instance
);
NormDestroyInstance
(
norm_instance
);
norm_instance
=
NORM_INSTANCE_INVALID
;
}
}
// end zmq::norm_engine_t::shutdown()
void
zmq
::
norm_engine_t
::
plug
(
io_thread_t
*
io_thread_
,
session_base_t
*
session_
)
{
// TBD - we may assign the NORM engine to an io_thread in the future???
zmq_session
=
session_
;
if
(
is_sender
)
zmq_output_ready
=
true
;
if
(
is_receiver
)
zmq_input_ready
=
true
;
fd_t
normDescriptor
=
NormGetDescriptor
(
norm_instance
);
norm_descriptor_handle
=
add_fd
(
normDescriptor
);
// Set POLLIN for notification of pending NormEvents
set_pollin
(
norm_descriptor_handle
);
if
(
is_sender
)
send_data
();
}
// end zmq::norm_engine_t::init()
void
zmq
::
norm_engine_t
::
unplug
()
{
rm_fd
(
norm_descriptor_handle
);
zmq_session
=
NULL
;
}
// end zmq::norm_engine_t::unplug()
void
zmq
::
norm_engine_t
::
terminate
()
{
unplug
();
shutdown
();
delete
this
;
}
void
zmq
::
norm_engine_t
::
restart_output
()
{
// There's new message data available from the session
zmq_output_ready
=
true
;
if
(
norm_tx_ready
)
send_data
();
}
// end zmq::norm_engine_t::restart_output()
void
zmq
::
norm_engine_t
::
send_data
()
{
// Here we write as much as is available or we can
while
(
zmq_output_ready
&&
norm_tx_ready
)
{
// Do we have data in our tx_buffer pending
if
(
tx_index
<
tx_len
)
{
tx_index
+=
NormStreamWrite
(
norm_tx_stream
,
tx_buffer
+
tx_index
,
tx_len
-
tx_index
);
if
(
tx_index
<
tx_len
)
{
// NORM stream buffer full, wait for NORM_TX_QUEUE_VACANCY
norm_tx_ready
=
false
;
break
;
}
else
if
(
!
zmq_encoder
.
has_data
())
{
// Buffer contained end of message (should we flush?)
//NormStreamMarkEom(norm_tx_stream);
// Note this makes NORM fairly chatty for low duty cycle messaging
NormStreamFlush
(
norm_tx_stream
,
true
,
NORM_FLUSH_ACTIVE
);
}
tx_index
=
tx_len
=
0
;
// all buffered data was written
}
// Still norm_tx_ready, so ask for more data from zmq_session
if
(
!
zmq_encoder
.
has_data
())
{
// Existing message had no more data to encode
if
(
-
1
==
zmq_session
->
pull_msg
(
&
tx_msg
))
{
// We need to wait for "restart_output()" to be called by ZMQ
zmq_output_ready
=
false
;
break
;
}
zmq_encoder
.
load_msg
(
&
tx_msg
);
// Should we write message size header for NORM to use? Or expect NORM
// receiver to decode ZMQ message framing format(s)?
// OK - we need to use a byte to denote when the ZMQ frame is the _first_
// frame of a message so it can be decoded properly when a receiver
// 'syncs' mid-stream. We key off the the state of the 'more_flag'
// I.e.,If more_flag _was_ false previously, this is the first
// frame of a ZMQ message.
if
(
tx_more_bit
)
tx_buffer
[
0
]
=
(
char
)
0xff
;
// this is not first frame of message
else
tx_buffer
[
0
]
=
0x00
;
// this is first frame of message
tx_more_bit
=
(
0
!=
(
tx_msg
.
flags
()
&
v2_protocol_t
::
more_flag
));
tx_len
=
1
;
}
// Get more data from encoder
size_t
space
=
BUFFER_SIZE
-
tx_index
;
unsigned
char
*
bufPtr
=
(
unsigned
char
*
)(
tx_buffer
+
tx_len
);
size_t
bytes
=
zmq_encoder
.
encode
(
&
bufPtr
,
space
);
tx_len
+=
bytes
;
}
}
// end zmq::norm_engine_t::send_data()
void
zmq
::
norm_engine_t
::
in_event
()
{
// This means a NormEvent is pending, so call NormGetNextEvent() and handle
NormEvent
event
;
if
(
!
NormGetNextEvent
(
norm_instance
,
&
event
))
{
// NORM has died before we unplugged?!
zmq_assert
(
false
);
return
;
}
switch
(
event
.
type
)
{
case
NORM_TX_QUEUE_VACANCY
:
case
NORM_TX_QUEUE_EMPTY
:
if
(
!
norm_tx_ready
)
{
norm_tx_ready
=
true
;
send_data
();
}
break
;
case
NORM_RX_OBJECT_NEW
:
break
;
case
NORM_RX_OBJECT_UPDATED
:
recv_data
(
event
.
object
);
break
;
case
NORM_RX_OBJECT_ABORTED
:
{
NormRxStreamState
*
rxState
=
(
NormRxStreamState
*
)
NormObjectGetUserData
(
event
.
object
);
if
(
NULL
!=
rxState
)
{
// Remove the state from the list it's in
// This is now unnecessary since deletion takes care of list removal
// but in the interest of being clear ...
NormRxStreamState
::
List
*
list
=
rxState
->
AccessList
();
if
(
NULL
!=
list
)
list
->
Remove
(
*
rxState
);
}
delete
rxState
;
}
break
;
case
NORM_REMOTE_SENDER_INACTIVE
:
// Here we free resources used for this formerly active sender.
// Note w/ NORM_SYNC_STREAM, if sender reactivates, we may
// get some messages delivered twice. NORM_SYNC_CURRENT would
// mitigate that but might miss data at startup. Always tradeoffs.
// Instead of immediately deleting, we could instead initiate a
// user configurable timeout here to wait some amount of time
// after this event to declare the remote sender truly dead
// and delete its state???
NormNodeDelete
(
event
.
sender
);
break
;
default
:
// We ignore some NORM events
break
;
}
}
// zmq::norm_engine_t::in_event()
void
zmq
::
norm_engine_t
::
restart_input
()
{
// TBD - should we check/assert that zmq_input_ready was false???
zmq_input_ready
=
true
;
// Process any pending received messages
if
(
!
msg_ready_list
.
IsEmpty
())
recv_data
(
NORM_OBJECT_INVALID
);
}
// end zmq::norm_engine_t::restart_input()
void
zmq
::
norm_engine_t
::
recv_data
(
NormObjectHandle
object
)
{
if
(
NORM_OBJECT_INVALID
!=
object
)
{
// Call result of NORM_RX_OBJECT_UPDATED notification
// This is a rx_ready indication for a new or existing rx stream
// First, determine if this is a stream we already know
zmq_assert
(
NORM_OBJECT_STREAM
==
NormObjectGetType
(
object
));
// Since there can be multiple senders (publishers), we keep
// state for each separate rx stream.
NormRxStreamState
*
rxState
=
(
NormRxStreamState
*
)
NormObjectGetUserData
(
object
);
if
(
NULL
==
rxState
)
{
// This is a new stream, so create rxState with zmq decoder, etc
rxState
=
new
NormRxStreamState
(
object
,
options
.
maxmsgsize
);
if
(
!
rxState
->
Init
())
{
errno_assert
(
false
);
delete
rxState
;
return
;
}
NormObjectSetUserData
(
object
,
rxState
);
}
else
if
(
!
rxState
->
IsRxReady
())
{
// Existing non-ready stream, so remove from pending
// list to be promoted to rx_ready_list ...
rx_pending_list
.
Remove
(
*
rxState
);
}
if
(
!
rxState
->
IsRxReady
())
{
// TBD - prepend up front for immediate service?
rxState
->
SetRxReady
(
true
);
rx_ready_list
.
Append
(
*
rxState
);
}
}
// This loop repeats until we've read all data available from "rx ready" inbound streams
// and pushed any accumulated messages we can up to the zmq session.
while
(
!
rx_ready_list
.
IsEmpty
()
||
(
zmq_input_ready
&&
!
msg_ready_list
.
IsEmpty
()))
{
// Iterate through our rx_ready streams, reading data into the decoder
// (This services incoming "rx ready" streams in a round-robin fashion)
NormRxStreamState
::
List
::
Iterator
iterator
(
rx_ready_list
);
NormRxStreamState
*
rxState
;
while
(
NULL
!=
(
rxState
=
iterator
.
GetNextItem
()))
{
switch
(
rxState
->
Decode
())
{
case
1
:
// msg completed
// Complete message decoded, move this stream to msg_ready_list
// to push the message up to the session below. Note the stream
// will be returned to the "rx_ready_list" after that's done
rx_ready_list
.
Remove
(
*
rxState
);
msg_ready_list
.
Append
(
*
rxState
);
continue
;
case
-
1
:
// decoding error (shouldn't happen w/ NORM, but ...)
// We need to re-sync this stream (decoder buffer was reset)
rxState
->
SetSync
(
false
);
break
;
default
:
// 0 - need more data
break
;
}
// Get more data from this stream
NormObjectHandle
stream
=
rxState
->
GetStreamHandle
();
// First, make sure we're in sync ...
while
(
!
rxState
->
InSync
())
{
// seek NORM message start
if
(
!
NormStreamSeekMsgStart
(
stream
))
{
// Need to wait for more data
break
;
}
// read message 'flag' byte to see if this it's a 'final' frame
char
syncFlag
;
unsigned
int
numBytes
=
1
;
if
(
!
NormStreamRead
(
stream
,
&
syncFlag
,
&
numBytes
))
{
// broken stream (shouldn't happen after seek msg start?)
zmq_assert
(
false
);
continue
;
}
if
(
0
==
numBytes
)
{
// This probably shouldn't happen either since we found msg start
// Need to wait for more data
break
;
}
if
(
0
==
syncFlag
)
rxState
->
SetSync
(
true
);
// else keep seeking ...
}
// end while(!rxState->InSync())
if
(
!
rxState
->
InSync
())
{
// Need more data for this stream, so remove from "rx ready"
// list and iterate to next "rx ready" stream
rxState
->
SetRxReady
(
false
);
// Move from rx_ready_list to rx_pending_list
rx_ready_list
.
Remove
(
*
rxState
);
rx_pending_list
.
Append
(
*
rxState
);
continue
;
}
// Now we're actually ready to read data from the NORM stream to the zmq_decoder
// the underlying zmq_decoder->get_buffer() call sets how much is needed.
unsigned
int
numBytes
=
rxState
->
GetBytesNeeded
();
if
(
!
NormStreamRead
(
stream
,
rxState
->
AccessBuffer
(),
&
numBytes
))
{
// broken NORM stream, so re-sync
rxState
->
Init
();
// TBD - check result
// This will retry syncing, and getting data from this stream
// since we don't increment the "it" iterator
continue
;
}
rxState
->
IncrementBufferCount
(
numBytes
);
if
(
0
==
numBytes
)
{
// All the data available has been read
// Need to wait for NORM_RX_OBJECT_UPDATED for this stream
rxState
->
SetRxReady
(
false
);
// Move from rx_ready_list to rx_pending_list
rx_ready_list
.
Remove
(
*
rxState
);
rx_pending_list
.
Append
(
*
rxState
);
}
}
// end while(NULL != (rxState = iterator.GetNextItem()))
if
(
zmq_input_ready
)
{
// At this point, we've made a pass through the "rx_ready" stream list
// Now make a pass through the "msg_pending" list (if the zmq session
// ready for more input). This may possibly return streams back to
// the "rx ready" stream list after their pending message is handled
NormRxStreamState
::
List
::
Iterator
iterator
(
msg_ready_list
);
NormRxStreamState
*
rxState
;
while
(
NULL
!=
(
rxState
=
iterator
.
GetNextItem
()))
{
msg_t
*
msg
=
rxState
->
AccessMsg
();
int
rc
=
zmq_session
->
push_msg
(
msg
);
if
(
-
1
==
rc
)
{
if
(
EAGAIN
==
errno
)
{
// need to wait until session calls "restart_input()"
zmq_input_ready
=
false
;
break
;
}
else
{
// session rejected message?
// TBD - handle this better
zmq_assert
(
false
);
}
}
// else message was accepted.
msg_ready_list
.
Remove
(
*
rxState
);
if
(
rxState
->
IsRxReady
())
// Move back to "rx_ready" list to read more data
rx_ready_list
.
Append
(
*
rxState
);
else
// Move back to "rx_pending" list until NORM_RX_OBJECT_UPDATED
msg_ready_list
.
Append
(
*
rxState
);
}
// end while(NULL != (rxState = iterator.GetNextItem()))
}
// end if (zmq_input_ready)
}
// end while ((!rx_ready_list.empty() || (zmq_input_ready && !msg_ready_list.empty()))
// Alert zmq of the messages we have pushed up
zmq_session
->
flush
();
}
// end zmq::norm_engine_t::recv_data()
zmq
::
norm_engine_t
::
NormRxStreamState
::
NormRxStreamState
(
NormObjectHandle
normStream
,
int64_t
maxMsgSize
)
:
norm_stream
(
normStream
),
max_msg_size
(
maxMsgSize
),
in_sync
(
false
),
rx_ready
(
false
),
zmq_decoder
(
NULL
),
skip_norm_sync
(
false
),
buffer_ptr
(
NULL
),
buffer_size
(
0
),
buffer_count
(
0
),
prev
(
NULL
),
next
(
NULL
),
list
(
NULL
)
{
}
zmq
::
norm_engine_t
::
NormRxStreamState
::~
NormRxStreamState
()
{
if
(
NULL
!=
zmq_decoder
)
{
delete
zmq_decoder
;
zmq_decoder
=
NULL
;
}
if
(
NULL
!=
list
)
{
list
->
Remove
(
*
this
);
list
=
NULL
;
}
}
bool
zmq
::
norm_engine_t
::
NormRxStreamState
::
Init
()
{
in_sync
=
false
;
skip_norm_sync
=
false
;
if
(
NULL
!=
zmq_decoder
)
delete
zmq_decoder
;
// Note "in_batch_size" comes from config.h
zmq_decoder
=
new
(
std
::
nothrow
)
v2_decoder_t
(
in_batch_size
,
max_msg_size
);
alloc_assert
(
zmq_decoder
);
if
(
NULL
!=
zmq_decoder
)
{
buffer_count
=
0
;
buffer_size
=
0
;
zmq_decoder
->
get_buffer
(
&
buffer_ptr
,
&
buffer_size
);
return
true
;
}
else
{
return
false
;
}
}
// end zmq::norm_engine_t::NormRxStreamState::Init()
// This decodes any pending data sitting in our stream decoder buffer
// It returns 1 upon message completion, -1 on error, 1 on msg completion
int
zmq
::
norm_engine_t
::
NormRxStreamState
::
Decode
()
{
// If we have pending bytes to decode, process those first
while
(
buffer_count
>
0
)
{
// There's pending data for the decoder to decode
size_t
processed
=
0
;
// This a bit of a kludgy approach used to weed
// out the NORM ZMQ message transport "syncFlag" byte
// from the ZMQ message stream being decoded (but it works!)
if
(
skip_norm_sync
)
{
buffer_ptr
++
;
buffer_count
--
;
skip_norm_sync
=
false
;
}
int
rc
=
zmq_decoder
->
decode
(
buffer_ptr
,
buffer_count
,
processed
);
buffer_ptr
+=
processed
;
buffer_count
-=
processed
;
switch
(
rc
)
{
case
1
:
// msg completed
if
(
0
==
buffer_count
)
{
buffer_size
=
0
;
zmq_decoder
->
get_buffer
(
&
buffer_ptr
,
&
buffer_size
);
}
skip_norm_sync
=
true
;
return
1
;
case
-
1
:
// decoder error (reset decoder and state variables)
in_sync
=
false
;
skip_norm_sync
=
false
;
// will get consumed by norm sync check
Init
();
break
;
case
0
:
// need more data, keep decoding until buffer exhausted
break
;
}
}
// Reset buffer pointer/count for next read
buffer_count
=
0
;
buffer_size
=
0
;
zmq_decoder
->
get_buffer
(
&
buffer_ptr
,
&
buffer_size
);
return
0
;
// need more data
}
// end zmq::norm_engine_t::NormRxStreamState::Decode()
zmq
::
norm_engine_t
::
NormRxStreamState
::
List
::
List
()
:
head
(
NULL
),
tail
(
NULL
)
{
}
zmq
::
norm_engine_t
::
NormRxStreamState
::
List
::~
List
()
{
Destroy
();
}
void
zmq
::
norm_engine_t
::
NormRxStreamState
::
List
::
Destroy
()
{
NormRxStreamState
*
item
=
head
;
while
(
NULL
!=
item
)
{
Remove
(
*
item
);
delete
item
;
item
=
head
;
}
}
// end zmq::norm_engine_t::NormRxStreamState::List::Destroy()
void
zmq
::
norm_engine_t
::
NormRxStreamState
::
List
::
Append
(
NormRxStreamState
&
item
)
{
item
.
prev
=
tail
;
if
(
NULL
!=
tail
)
tail
->
next
=
&
item
;
else
head
=
&
item
;
item
.
next
=
NULL
;
tail
=
&
item
;
item
.
list
=
this
;
}
// end zmq::norm_engine_t::NormRxStreamState::List::Append()
void
zmq
::
norm_engine_t
::
NormRxStreamState
::
List
::
Remove
(
NormRxStreamState
&
item
)
{
if
(
NULL
!=
item
.
prev
)
item
.
prev
->
next
=
item
.
next
;
else
head
=
item
.
next
;
if
(
NULL
!=
item
.
next
)
item
.
next
->
prev
=
item
.
prev
;
else
tail
=
item
.
prev
;
item
.
prev
=
item
.
next
=
NULL
;
item
.
list
=
NULL
;
}
// end zmq::norm_engine_t::NormRxStreamState::List::Remove()
zmq
::
norm_engine_t
::
NormRxStreamState
::
List
::
Iterator
::
Iterator
(
const
List
&
list
)
:
next_item
(
list
.
head
)
{
}
zmq
::
norm_engine_t
::
NormRxStreamState
*
zmq
::
norm_engine_t
::
NormRxStreamState
::
List
::
Iterator
::
GetNextItem
()
{
NormRxStreamState
*
nextItem
=
next_item
;
if
(
NULL
!=
nextItem
)
next_item
=
nextItem
->
next
;
return
nextItem
;
}
// end zmq::norm_engine_t::NormRxStreamState::List::Iterator::GetNextItem()
#endif // ZMQ_HAVE_NORM
src/norm_engine.hpp
0 → 100644
View file @
c91a638a
#ifndef __ZMQ_NORM_ENGINE_HPP_INCLUDED__
#define __ZMQ_NORM_ENGINE_HPP_INCLUDED__
#define ZMQ_HAVE_NORM 1
#if defined ZMQ_HAVE_NORM
#include "io_object.hpp"
#include "i_engine.hpp"
#include "options.hpp"
#include "v2_decoder.hpp"
#include "v2_encoder.hpp"
#include <norm/include/normApi.h>
namespace
zmq
{
class
io_thread_t
;
class
session_base_t
;
class
norm_engine_t
:
public
io_object_t
,
public
i_engine
{
public
:
norm_engine_t
(
zmq
::
io_thread_t
*
parent_
,
const
options_t
&
options_
);
~
norm_engine_t
();
// create NORM instance, session, etc
int
init
(
const
char
*
network_
,
bool
send
,
bool
recv
);
void
shutdown
();
// i_engine interface implementation.
// Plug the engine to the session.
virtual
void
plug
(
zmq
::
io_thread_t
*
io_thread_
,
class
session_base_t
*
session_
);
// Terminate and deallocate the engine. Note that 'detached'
// events are not fired on termination.
virtual
void
terminate
();
// This method is called by the session to signalise that more
// messages can be written to the pipe.
virtual
void
restart_input
();
// This method is called by the session to signalise that there
// are messages to send available.
virtual
void
restart_output
();
virtual
void
zap_msg_available
()
{};
// i_poll_events interface implementation.
// (we only need in_event() for NormEvent notification)
// (i.e., don't have any output events or timers (yet))
void
in_event
();
private
:
void
unplug
();
void
send_data
();
void
recv_data
(
NormObjectHandle
stream
);
enum
{
BUFFER_SIZE
=
2048
};
// Used to keep track of streams from multiple senders
class
NormRxStreamState
{
public
:
NormRxStreamState
(
NormObjectHandle
normStream
,
int64_t
maxMsgSize
);
~
NormRxStreamState
();
NormObjectHandle
GetStreamHandle
()
const
{
return
norm_stream
;}
bool
Init
();
void
SetRxReady
(
bool
state
)
{
rx_ready
=
state
;}
bool
IsRxReady
()
const
{
return
rx_ready
;}
void
SetSync
(
bool
state
)
{
in_sync
=
state
;}
bool
InSync
()
const
{
return
in_sync
;}
// These are used to feed data to decoder
// and its underlying "msg" buffer
char
*
AccessBuffer
()
{
return
(
char
*
)(
buffer_ptr
+
buffer_count
);}
size_t
GetBytesNeeded
()
const
{
return
(
buffer_size
-
buffer_count
);}
void
IncrementBufferCount
(
size_t
count
)
{
buffer_count
+=
count
;}
msg_t
*
AccessMsg
()
{
return
zmq_decoder
->
msg
();}
// This invokes the decoder "decode" method
// returning 0 if more data is needed,
// 1 if the message is complete, If an error
// occurs the 'sync' is dropped and the
// decoder re-initialized
int
Decode
();
class
List
{
public
:
List
();
~
List
();
void
Append
(
NormRxStreamState
&
item
);
void
Remove
(
NormRxStreamState
&
item
);
bool
IsEmpty
()
const
{
return
(
NULL
==
head
);}
void
Destroy
();
class
Iterator
{
public
:
Iterator
(
const
List
&
list
);
NormRxStreamState
*
GetNextItem
();
private
:
NormRxStreamState
*
next_item
;
};
friend
class
Iterator
;
private
:
NormRxStreamState
*
head
;
NormRxStreamState
*
tail
;
};
// end class zmq::norm_engine_t::NormRxStreamState::List
friend
class
List
;
List
*
AccessList
()
{
return
list
;}
private
:
NormObjectHandle
norm_stream
;
int64_t
max_msg_size
;
bool
in_sync
;
bool
rx_ready
;
v2_decoder_t
*
zmq_decoder
;
bool
skip_norm_sync
;
unsigned
char
*
buffer_ptr
;
size_t
buffer_size
;
size_t
buffer_count
;
NormRxStreamState
*
prev
;
NormRxStreamState
*
next
;
NormRxStreamState
::
List
*
list
;
};
// end class zmq::norm_engine_t::NormRxStreamState
session_base_t
*
zmq_session
;
options_t
options
;
NormInstanceHandle
norm_instance
;
handle_t
norm_descriptor_handle
;
NormSessionHandle
norm_session
;
bool
is_sender
;
bool
is_receiver
;
// Sender state
msg_t
tx_msg
;
v2_encoder_t
zmq_encoder
;
// for tx messages (we use v2 for now)
bool
tx_more_bit
;
bool
zmq_output_ready
;
// zmq has msg(s) to send
NormObjectHandle
norm_tx_stream
;
bool
norm_tx_ready
;
// norm has tx queue vacancy
// tbd - maybe don't need buffer if can access encoder buffer directly?
char
tx_buffer
[
BUFFER_SIZE
];
unsigned
int
tx_index
;
unsigned
int
tx_len
;
// Receiver state
// Lists of norm rx streams from remote senders
bool
zmq_input_ready
;
// zmq ready to receive msg(s)
NormRxStreamState
::
List
rx_pending_list
;
// rx streams waiting for data reception
NormRxStreamState
::
List
rx_ready_list
;
// rx streams ready for NormStreamRead()
NormRxStreamState
::
List
msg_ready_list
;
// rx streams w/ msg ready for push to zmq
};
// end class norm_engine_t
}
#endif // ZMQ_HAVE_NORM
#endif // !__ZMQ_NORM_ENGINE_HPP_INCLUDED__
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment