Commit d3e0be15 authored by bebopagogo's avatar bebopagogo

added uncommitted norm_engine changes

parent 33f22d0d
...@@ -60,7 +60,7 @@ namespace zmq ...@@ -60,7 +60,7 @@ namespace zmq
{ {
free (buf); free (buf);
} }
// The function returns a batch of binary data. The data // The function returns a batch of binary data. The data
// are filled to a supplied buffer. If no buffer is supplied (data_ // are filled to a supplied buffer. If no buffer is supplied (data_
// points to NULL) decoder object will provide buffer of its own. // points to NULL) decoder object will provide buffer of its own.
......
...@@ -15,7 +15,7 @@ zmq::norm_engine_t::norm_engine_t(io_thread_t* parent_, ...@@ -15,7 +15,7 @@ zmq::norm_engine_t::norm_engine_t(io_thread_t* parent_,
norm_instance(NORM_INSTANCE_INVALID), norm_session(NORM_SESSION_INVALID), norm_instance(NORM_INSTANCE_INVALID), norm_session(NORM_SESSION_INVALID),
is_sender(false), is_receiver(false), is_sender(false), is_receiver(false),
zmq_encoder(0), tx_more_bit(false), zmq_output_ready(false), zmq_encoder(0), tx_more_bit(false), zmq_output_ready(false),
norm_tx_stream(NORM_OBJECT_INVALID), norm_tx_ready(false), norm_tx_stream(NORM_OBJECT_INVALID), norm_tx_ready(false),
tx_index(0), tx_len(0), tx_index(0), tx_len(0),
zmq_input_ready(false) zmq_input_ready(false)
{ {
...@@ -267,12 +267,14 @@ void zmq::norm_engine_t::send_data() ...@@ -267,12 +267,14 @@ void zmq::norm_engine_t::send_data()
// Buffer contained end of message (should we flush?) // Buffer contained end of message (should we flush?)
//NormStreamMarkEom(norm_tx_stream); //NormStreamMarkEom(norm_tx_stream);
// Note this makes NORM fairly chatty for low duty cycle messaging // Note this makes NORM fairly chatty for low duty cycle messaging
// but makes sure content is delivered quickly. Positive acknowledgements
// with flush override would make NORM more succinct here
NormStreamFlush(norm_tx_stream, true, NORM_FLUSH_ACTIVE); NormStreamFlush(norm_tx_stream, true, NORM_FLUSH_ACTIVE);
} }
tx_index = tx_len = 0; // all buffered data was written tx_index = tx_len = 0; // all buffered data was written
} }
// Still norm_tx_ready, so ask for more data from zmq_session // Still norm_tx_ready, so ask for more data from zmq_session
if (!zmq_encoder.has_data()) if (!zmq_encoder.has_data())
{ {
// Existing message had no more data to encode // Existing message had no more data to encode
if (-1 == zmq_session->pull_msg(&tx_msg)) if (-1 == zmq_session->pull_msg(&tx_msg))
...@@ -333,20 +335,19 @@ void zmq::norm_engine_t::in_event() ...@@ -333,20 +335,19 @@ void zmq::norm_engine_t::in_event()
break; break;
case NORM_RX_OBJECT_ABORTED: case NORM_RX_OBJECT_ABORTED:
{
NormRxStreamState* rxState = (NormRxStreamState*)NormObjectGetUserData(event.object);
if (NULL != rxState)
{ {
NormRxStreamState* rxState = (NormRxStreamState*)NormObjectGetUserData(event.object); // Remove the state from the list it's in
if (NULL != rxState) // This is now unnecessary since deletion takes care of list removal
{ // but in the interest of being clear ...
// Remove the state from the list it's in NormRxStreamState::List* list = rxState->AccessList();
// This is now unnecessary since deletion takes care of list removal if (NULL != list) list->Remove(*rxState);
// but in the interest of being clear ...
NormRxStreamState::List* list = rxState->AccessList();
if (NULL != list) list->Remove(*rxState);
}
delete rxState;
} }
delete rxState;
break; break;
}
case NORM_REMOTE_SENDER_INACTIVE: case NORM_REMOTE_SENDER_INACTIVE:
// Here we free resources used for this formerly active sender. // Here we free resources used for this formerly active sender.
// Note w/ NORM_SYNC_STREAM, if sender reactivates, we may // Note w/ NORM_SYNC_STREAM, if sender reactivates, we may
......
...@@ -2,8 +2,6 @@ ...@@ -2,8 +2,6 @@
#ifndef __ZMQ_NORM_ENGINE_HPP_INCLUDED__ #ifndef __ZMQ_NORM_ENGINE_HPP_INCLUDED__
#define __ZMQ_NORM_ENGINE_HPP_INCLUDED__ #define __ZMQ_NORM_ENGINE_HPP_INCLUDED__
#define ZMQ_HAVE_NORM 1
#if defined ZMQ_HAVE_NORM #if defined ZMQ_HAVE_NORM
#include "io_object.hpp" #include "io_object.hpp"
...@@ -12,7 +10,7 @@ ...@@ -12,7 +10,7 @@
#include "v2_decoder.hpp" #include "v2_decoder.hpp"
#include "v2_encoder.hpp" #include "v2_encoder.hpp"
#include <norm/include/normApi.h> #include <normApi.h>
namespace zmq namespace zmq
{ {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment