Commit 0a97e054 authored by bebopagogo's avatar bebopagogo

removed dependency on deprecated encoder_base::has_data() method

parent 43071bc7
...@@ -12,8 +12,9 @@ zmq::norm_engine_t::norm_engine_t(io_thread_t* parent_, ...@@ -12,8 +12,9 @@ zmq::norm_engine_t::norm_engine_t(io_thread_t* parent_,
: io_object_t(parent_), zmq_session(NULL), options(options_), : io_object_t(parent_), zmq_session(NULL), options(options_),
norm_instance(NORM_INSTANCE_INVALID), norm_session(NORM_SESSION_INVALID), norm_instance(NORM_INSTANCE_INVALID), norm_session(NORM_SESSION_INVALID),
is_sender(false), is_receiver(false), is_sender(false), is_receiver(false),
zmq_encoder(0), tx_more_bit(false), zmq_output_ready(false), zmq_encoder(0), norm_tx_stream(NORM_OBJECT_INVALID),
norm_tx_stream(NORM_OBJECT_INVALID), norm_tx_ready(false), tx_first_msg(true), tx_more_bit(false),
zmq_output_ready(false), norm_tx_ready(false),
tx_index(0), tx_len(0), tx_index(0), tx_len(0),
zmq_input_ready(false) zmq_input_ready(false)
{ {
...@@ -94,8 +95,9 @@ int zmq::norm_engine_t::init(const char* network_, bool send, bool recv) ...@@ -94,8 +95,9 @@ int zmq::norm_engine_t::init(const char* network_, bool send, bool recv)
// TBD - What do we use for our local NormNodeId? // TBD - What do we use for our local NormNodeId?
// (for now we use automatic, IP addr based assignment or passed in 'id') // (for now we use automatic, IP addr based assignment or passed in 'id')
// a) Add function to use iface addr // a) Use ZMQ Identity somehow?
// b) Randomize and implement a NORM session layer // b) Add function to use iface addr
// c) Randomize and implement a NORM session layer
// conflict detection/resolution protocol // conflict detection/resolution protocol
norm_session = NormCreateSession(norm_instance, addr, portNumber, localId); norm_session = NormCreateSession(norm_instance, addr, portNumber, localId);
...@@ -107,6 +109,7 @@ int zmq::norm_engine_t::init(const char* network_, bool send, bool recv) ...@@ -107,6 +109,7 @@ int zmq::norm_engine_t::init(const char* network_, bool send, bool recv)
errno = savedErrno; errno = savedErrno;
return -1; return -1;
} }
// There's many other useful NORM options that could be applied here
if (NormIsUnicastAddress(addr)) if (NormIsUnicastAddress(addr))
{ {
NormSetDefaultUnicastNack(norm_session, true); NormSetDefaultUnicastNack(norm_session, true);
...@@ -128,6 +131,9 @@ int zmq::norm_engine_t::init(const char* network_, bool send, bool recv) ...@@ -128,6 +131,9 @@ int zmq::norm_engine_t::init(const char* network_, bool send, bool recv)
if (recv) if (recv)
{ {
// The alternative NORM_SYNC_CURRENT here would provide "instant"
// receiver sync to the sender's _current_ message transmission.
// NORM_SYNC_STREAM tries to get everything the sender has cached/buffered
NormSetDefaultSyncPolicy(norm_session, NORM_SYNC_STREAM); NormSetDefaultSyncPolicy(norm_session, NORM_SYNC_STREAM);
if (!NormStartReceiver(norm_session, 2*1024*1024)) if (!NormStartReceiver(norm_session, 2*1024*1024))
{ {
...@@ -159,7 +165,7 @@ int zmq::norm_engine_t::init(const char* network_, bool send, bool recv) ...@@ -159,7 +165,7 @@ int zmq::norm_engine_t::init(const char* network_, bool send, bool recv)
} }
NormSetCongestionControl(norm_session, true); NormSetCongestionControl(norm_session, true);
norm_tx_ready = true; norm_tx_ready = true;
is_sender = true; is_sender = true;
if (NORM_OBJECT_INVALID == (norm_tx_stream = NormStreamOpen(norm_session, 2*1024*1024))) if (NORM_OBJECT_INVALID == (norm_tx_stream = NormStreamOpen(norm_session, 2*1024*1024)))
{ {
// errno set by whatever failed // errno set by whatever failed
...@@ -172,6 +178,10 @@ int zmq::norm_engine_t::init(const char* network_, bool send, bool recv) ...@@ -172,6 +178,10 @@ int zmq::norm_engine_t::init(const char* network_, bool send, bool recv)
} }
} }
//NormSetMessageTrace(norm_session, true);
//NormSetDebugLevel(3);
//NormOpenDebugLog(norm_instance, "normLog.txt");
return 0; // no error return 0; // no error
} // end zmq::norm_engine_t::init() } // end zmq::norm_engine_t::init()
...@@ -250,9 +260,62 @@ void zmq::norm_engine_t::send_data() ...@@ -250,9 +260,62 @@ void zmq::norm_engine_t::send_data()
// Here we write as much as is available or we can // Here we write as much as is available or we can
while (zmq_output_ready && norm_tx_ready) while (zmq_output_ready && norm_tx_ready)
{ {
if (0 == tx_len)
{
// Our tx_buffer needs data to send
// Get more data from encoder
size_t space = BUFFER_SIZE;
unsigned char* bufPtr = (unsigned char*)tx_buffer;
tx_len = zmq_encoder.encode(&bufPtr, space);
if (0 == tx_len)
{
if (tx_first_msg)
{
// We don't need to mark eom/flush until a message is sent
tx_first_msg = false;
}
else
{
// A prior message was completely written to stream, so
// mark end-of-message and possibly flush (to force packet transmission,
// even if it's not a full segment so message gets delivered quickly)
// NormStreamMarkEom(norm_tx_stream); // the flush below marks eom
// Note NORM_FLUSH_ACTIVE makes NORM fairly chatty for low duty cycle messaging
// but makes sure content is delivered quickly. Positive acknowledgements
// with flush override would make NORM more succinct here
NormStreamFlush(norm_tx_stream, true, NORM_FLUSH_ACTIVE);
}
// Need to pull and load a new message to send
if (-1 == zmq_session->pull_msg(&tx_msg))
{
// We need to wait for "restart_output()" to be called by ZMQ
zmq_output_ready = false;
break;
}
zmq_encoder.load_msg(&tx_msg);
// Should we write message size header for NORM to use? Or expect NORM
// receiver to decode ZMQ message framing format(s)?
// OK - we need to use a byte to denote when the ZMQ frame is the _first_
// frame of a message so it can be decoded properly when a receiver
// 'syncs' mid-stream. We key off the the state of the 'more_flag'
// I.e.,If more_flag _was_ false previously, this is the first
// frame of a ZMQ message.
if (tx_more_bit)
tx_buffer[0] = (char)0xff; // this is not first frame of message
else
tx_buffer[0] = 0x00; // this is first frame of message
tx_more_bit = (0 != (tx_msg.flags() & msg_t::more));
// Go ahead an get a first chunk of the message
bufPtr++;
space--;
tx_len = 1 + zmq_encoder.encode(&bufPtr, space);
tx_index = 0;
}
}
// Do we have data in our tx_buffer pending // Do we have data in our tx_buffer pending
if (tx_index < tx_len) if (tx_index < tx_len)
{ {
// We have data in our tx_buffer to send, so write it to the stream
tx_index += NormStreamWrite(norm_tx_stream, tx_buffer + tx_index, tx_len - tx_index); tx_index += NormStreamWrite(norm_tx_stream, tx_buffer + tx_index, tx_len - tx_index);
if (tx_index < tx_len) if (tx_index < tx_len)
{ {
...@@ -260,48 +323,9 @@ void zmq::norm_engine_t::send_data() ...@@ -260,48 +323,9 @@ void zmq::norm_engine_t::send_data()
norm_tx_ready = false; norm_tx_ready = false;
break; break;
} }
else if (!zmq_encoder.has_data()) tx_len = 0; // all buffered data was written
{
// Buffer contained end of message (should we flush?)
//NormStreamMarkEom(norm_tx_stream);
// Note this makes NORM fairly chatty for low duty cycle messaging
// but makes sure content is delivered quickly. Positive acknowledgements
// with flush override would make NORM more succinct here
NormStreamFlush(norm_tx_stream, true, NORM_FLUSH_ACTIVE);
}
tx_index = tx_len = 0; // all buffered data was written
}
// Still norm_tx_ready, so ask for more data from zmq_session
if (!zmq_encoder.has_data())
{
// Existing message had no more data to encode
if (-1 == zmq_session->pull_msg(&tx_msg))
{
// We need to wait for "restart_output()" to be called by ZMQ
zmq_output_ready = false;
break;
}
zmq_encoder.load_msg(&tx_msg);
// Should we write message size header for NORM to use? Or expect NORM
// receiver to decode ZMQ message framing format(s)?
// OK - we need to use a byte to denote when the ZMQ frame is the _first_
// frame of a message so it can be decoded properly when a receiver
// 'syncs' mid-stream. We key off the the state of the 'more_flag'
// I.e.,If more_flag _was_ false previously, this is the first
// frame of a ZMQ message.
if (tx_more_bit)
tx_buffer[0] = (char)0xff; // this is not first frame of message
else
tx_buffer[0] = 0x00; // this is first frame of message
tx_more_bit = (0 != (tx_msg.flags() & v2_protocol_t::more_flag));
tx_len = 1;
} }
// Get more data from encoder } // end while (zmq_output_ready && norm_tx_ready)
size_t space = BUFFER_SIZE - tx_index;
unsigned char* bufPtr = (unsigned char*)(tx_buffer + tx_len);
size_t bytes = zmq_encoder.encode(&bufPtr, space);
tx_len += bytes;
}
} // end zmq::norm_engine_t::send_data() } // end zmq::norm_engine_t::send_data()
void zmq::norm_engine_t::in_event() void zmq::norm_engine_t::in_event()
...@@ -327,7 +351,7 @@ void zmq::norm_engine_t::in_event() ...@@ -327,7 +351,7 @@ void zmq::norm_engine_t::in_event()
break; break;
case NORM_RX_OBJECT_NEW: case NORM_RX_OBJECT_NEW:
break; //break;
case NORM_RX_OBJECT_UPDATED: case NORM_RX_OBJECT_UPDATED:
recv_data(event.object); recv_data(event.object);
break; break;
......
...@@ -162,11 +162,12 @@ namespace zmq ...@@ -162,11 +162,12 @@ namespace zmq
// Sender state // Sender state
msg_t tx_msg; msg_t tx_msg;
v2_encoder_t zmq_encoder; // for tx messages (we use v2 for now) v2_encoder_t zmq_encoder; // for tx messages (we use v2 for now)
NormObjectHandle norm_tx_stream;
bool tx_first_msg;
bool tx_more_bit; bool tx_more_bit;
bool zmq_output_ready; // zmq has msg(s) to send bool zmq_output_ready; // zmq has msg(s) to send
NormObjectHandle norm_tx_stream;
bool norm_tx_ready; // norm has tx queue vacancy bool norm_tx_ready; // norm has tx queue vacancy
// tbd - maybe don't need buffer if can access encoder buffer directly? // tbd - maybe don't need buffer if can access zmq message buffer directly?
char tx_buffer[BUFFER_SIZE]; char tx_buffer[BUFFER_SIZE];
unsigned int tx_index; unsigned int tx_index;
unsigned int tx_len; unsigned int tx_len;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment