Commit f0945c85 authored by Luca Boccassi's avatar Luca Boccassi Committed by GitHub

Merge pull request #2110 from c-rack/remove-tabs

Problem: source conatins tabs and trailing spaces
parents e8d665c4 8cd33339
...@@ -91,7 +91,7 @@ zmq::ctx_t::ctx_t () : ...@@ -91,7 +91,7 @@ zmq::ctx_t::ctx_t () :
vmci_family = -1; vmci_family = -1;
#endif #endif
scoped_lock_t locker(crypto_sync); scoped_lock_t locker(crypto_sync);
#if defined (ZMQ_USE_TWEETNACL) #if defined (ZMQ_USE_TWEETNACL)
// allow opening of /dev/urandom // allow opening of /dev/urandom
unsigned char tmpbytes[4]; unsigned char tmpbytes[4];
...@@ -510,13 +510,13 @@ void zmq::ctx_t::pend_connection (const std::string &addr_, ...@@ -510,13 +510,13 @@ void zmq::ctx_t::pend_connection (const std::string &addr_,
endpoints_t::iterator it = endpoints.find (addr_); endpoints_t::iterator it = endpoints.find (addr_);
if (it == endpoints.end ()) { if (it == endpoints.end ()) {
// Still no bind. // Still no bind.
endpoint_.socket->inc_seqnum (); endpoint_.socket->inc_seqnum ();
pending_connections.insert (pending_connections_t::value_type (addr_, pending_connection)); pending_connections.insert (pending_connections_t::value_type (addr_, pending_connection));
} else { } else {
// Bind has happened in the mean time, connect directly // Bind has happened in the mean time, connect directly
connect_inproc_sockets(it->second.socket, it->second.options, pending_connection, connect_side); connect_inproc_sockets(it->second.socket, it->second.options, pending_connection, connect_side);
} }
} }
void zmq::ctx_t::connect_pending (const char *addr_, zmq::socket_base_t *bind_socket_) void zmq::ctx_t::connect_pending (const char *addr_, zmq::socket_base_t *bind_socket_)
......
...@@ -119,7 +119,7 @@ namespace zmq ...@@ -119,7 +119,7 @@ namespace zmq
bytes_used_ = size_; bytes_used_ = size_;
while (!to_read) { while (!to_read) {
const int rc = const int rc =
(static_cast <T *> (this)->*next) (data_ + bytes_used_); (static_cast <T *> (this)->*next) (data_ + bytes_used_);
if (rc != 0) if (rc != 0)
return rc; return rc;
......
...@@ -53,7 +53,7 @@ zmq::dgram_t::~dgram_t () ...@@ -53,7 +53,7 @@ zmq::dgram_t::~dgram_t ()
void zmq::dgram_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_) void zmq::dgram_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
{ {
LIBZMQ_UNUSED(subscribe_to_all_); LIBZMQ_UNUSED(subscribe_to_all_);
zmq_assert (pipe_); zmq_assert (pipe_);
......
...@@ -130,7 +130,7 @@ int zmq::get_peer_ip_address (fd_t sockfd_, std::string &ip_addr_) ...@@ -130,7 +130,7 @@ int zmq::get_peer_ip_address (fd_t sockfd_, std::string &ip_addr_)
rc = getpeername (sockfd_, (struct sockaddr*) &ss, &addrlen); rc = getpeername (sockfd_, (struct sockaddr*) &ss, &addrlen);
#ifdef ZMQ_HAVE_WINDOWS #ifdef ZMQ_HAVE_WINDOWS
if (rc == SOCKET_ERROR) { if (rc == SOCKET_ERROR) {
const int last_error = WSAGetLastError(); const int last_error = WSAGetLastError();
wsa_assert (last_error != WSANOTINITIALISED && wsa_assert (last_error != WSANOTINITIALISED &&
last_error != WSAEFAULT && last_error != WSAEFAULT &&
last_error != WSAEINPROGRESS && last_error != WSAEINPROGRESS &&
......
...@@ -9,14 +9,14 @@ ...@@ -9,14 +9,14 @@
zmq::norm_engine_t::norm_engine_t(io_thread_t* parent_, zmq::norm_engine_t::norm_engine_t(io_thread_t* parent_,
const options_t& options_) const options_t& options_)
: io_object_t(parent_), zmq_session(NULL), options(options_), : io_object_t(parent_), zmq_session(NULL), options(options_),
norm_instance(NORM_INSTANCE_INVALID), norm_session(NORM_SESSION_INVALID), norm_instance(NORM_INSTANCE_INVALID), norm_session(NORM_SESSION_INVALID),
is_sender(false), is_receiver(false), is_sender(false), is_receiver(false),
zmq_encoder(0), norm_tx_stream(NORM_OBJECT_INVALID), zmq_encoder(0), norm_tx_stream(NORM_OBJECT_INVALID),
tx_first_msg(true), tx_more_bit(false), tx_first_msg(true), tx_more_bit(false),
zmq_output_ready(false), norm_tx_ready(false), zmq_output_ready(false), norm_tx_ready(false),
tx_index(0), tx_len(0), tx_index(0), tx_len(0),
zmq_input_ready(false) zmq_input_ready(false)
{ {
int rc = tx_msg.init(); int rc = tx_msg.init();
errno_assert(0 == rc); errno_assert(0 == rc);
...@@ -50,7 +50,7 @@ int zmq::norm_engine_t::init(const char* network_, bool send, bool recv) ...@@ -50,7 +50,7 @@ int zmq::norm_engine_t::init(const char* network_, bool send, bool recv)
{ {
ifacePtr = network_; ifacePtr = network_;
} }
// Second, look for optional multicast ifaceName // Second, look for optional multicast ifaceName
char ifaceName[256]; char ifaceName[256];
const char* addrPtr = strchr(ifacePtr, ';'); const char* addrPtr = strchr(ifacePtr, ';');
...@@ -68,7 +68,7 @@ int zmq::norm_engine_t::init(const char* network_, bool send, bool recv) ...@@ -68,7 +68,7 @@ int zmq::norm_engine_t::init(const char* network_, bool send, bool recv)
addrPtr = ifacePtr; addrPtr = ifacePtr;
ifacePtr = NULL; ifacePtr = NULL;
} }
// Finally, parse IP address and port number // Finally, parse IP address and port number
const char* portPtr = strrchr(addrPtr, ':'); const char* portPtr = strrchr(addrPtr, ':');
if (NULL == portPtr) if (NULL == portPtr)
...@@ -76,7 +76,7 @@ int zmq::norm_engine_t::init(const char* network_, bool send, bool recv) ...@@ -76,7 +76,7 @@ int zmq::norm_engine_t::init(const char* network_, bool send, bool recv)
errno = EINVAL; errno = EINVAL;
return -1; return -1;
} }
char addr[256]; char addr[256];
size_t addrLen = portPtr - addrPtr; size_t addrLen = portPtr - addrPtr;
if (addrLen > 255) addrLen = 255; if (addrLen > 255) addrLen = 255;
...@@ -84,7 +84,7 @@ int zmq::norm_engine_t::init(const char* network_, bool send, bool recv) ...@@ -84,7 +84,7 @@ int zmq::norm_engine_t::init(const char* network_, bool send, bool recv)
addr[addrLen] = '\0'; addr[addrLen] = '\0';
portPtr++; portPtr++;
unsigned short portNumber = atoi(portPtr); unsigned short portNumber = atoi(portPtr);
if (NORM_INSTANCE_INVALID == norm_instance) if (NORM_INSTANCE_INVALID == norm_instance)
{ {
if (NORM_INSTANCE_INVALID == (norm_instance = NormCreateInstance())) if (NORM_INSTANCE_INVALID == (norm_instance = NormCreateInstance()))
...@@ -93,14 +93,14 @@ int zmq::norm_engine_t::init(const char* network_, bool send, bool recv) ...@@ -93,14 +93,14 @@ int zmq::norm_engine_t::init(const char* network_, bool send, bool recv)
return -1; return -1;
} }
} }
// TBD - What do we use for our local NormNodeId? // TBD - What do we use for our local NormNodeId?
// (for now we use automatic, IP addr based assignment or passed in 'id') // (for now we use automatic, IP addr based assignment or passed in 'id')
// a) Use ZMQ Identity somehow? // a) Use ZMQ Identity somehow?
// b) Add function to use iface addr // b) Add function to use iface addr
// c) Randomize and implement a NORM session layer // c) Randomize and implement a NORM session layer
// conflict detection/resolution protocol // conflict detection/resolution protocol
norm_session = NormCreateSession(norm_instance, addr, portNumber, localId); norm_session = NormCreateSession(norm_instance, addr, portNumber, localId);
if (NORM_SESSION_INVALID == norm_session) if (NORM_SESSION_INVALID == norm_session)
{ {
...@@ -129,7 +129,7 @@ int zmq::norm_engine_t::init(const char* network_, bool send, bool recv) ...@@ -129,7 +129,7 @@ int zmq::norm_engine_t::init(const char* network_, bool send, bool recv)
NormSetMulticastInterface(norm_session, ifacePtr); NormSetMulticastInterface(norm_session, ifacePtr);
} }
} }
if (recv) if (recv)
{ {
// The alternative NORM_SYNC_CURRENT here would provide "instant" // The alternative NORM_SYNC_CURRENT here would provide "instant"
...@@ -148,7 +148,7 @@ int zmq::norm_engine_t::init(const char* network_, bool send, bool recv) ...@@ -148,7 +148,7 @@ int zmq::norm_engine_t::init(const char* network_, bool send, bool recv)
} }
is_receiver = true; is_receiver = true;
} }
if (send) if (send)
{ {
// Pick a random sender instance id (aka norm sender session id) // Pick a random sender instance id (aka norm sender session id)
...@@ -163,10 +163,10 @@ int zmq::norm_engine_t::init(const char* network_, bool send, bool recv) ...@@ -163,10 +163,10 @@ int zmq::norm_engine_t::init(const char* network_, bool send, bool recv)
norm_instance = NORM_INSTANCE_INVALID; norm_instance = NORM_INSTANCE_INVALID;
errno = savedErrno; errno = savedErrno;
return -1; return -1;
} }
NormSetCongestionControl(norm_session, true); NormSetCongestionControl(norm_session, true);
norm_tx_ready = true; norm_tx_ready = true;
is_sender = true; is_sender = true;
if (NORM_OBJECT_INVALID == (norm_tx_stream = NormStreamOpen(norm_session, 2*1024*1024))) if (NORM_OBJECT_INVALID == (norm_tx_stream = NormStreamOpen(norm_session, 2*1024*1024)))
{ {
// errno set by whatever failed // errno set by whatever failed
...@@ -178,11 +178,11 @@ int zmq::norm_engine_t::init(const char* network_, bool send, bool recv) ...@@ -178,11 +178,11 @@ int zmq::norm_engine_t::init(const char* network_, bool send, bool recv)
return -1; return -1;
} }
} }
//NormSetMessageTrace(norm_session, true); //NormSetMessageTrace(norm_session, true);
//NormSetDebugLevel(3); //NormSetDebugLevel(3);
//NormOpenDebugLog(norm_instance, "normLog.txt"); //NormOpenDebugLog(norm_instance, "normLog.txt");
return 0; // no error return 0; // no error
} // end zmq::norm_engine_t::init() } // end zmq::norm_engine_t::init()
...@@ -192,12 +192,12 @@ void zmq::norm_engine_t::shutdown() ...@@ -192,12 +192,12 @@ void zmq::norm_engine_t::shutdown()
if (is_receiver) if (is_receiver)
{ {
NormStopReceiver(norm_session); NormStopReceiver(norm_session);
// delete any active NormRxStreamState // delete any active NormRxStreamState
rx_pending_list.Destroy(); rx_pending_list.Destroy();
rx_ready_list.Destroy(); rx_ready_list.Destroy();
msg_ready_list.Destroy(); msg_ready_list.Destroy();
is_receiver = false; is_receiver = false;
} }
if (is_sender) if (is_sender)
...@@ -224,20 +224,20 @@ void zmq::norm_engine_t::plug (io_thread_t* io_thread_, session_base_t *session_ ...@@ -224,20 +224,20 @@ void zmq::norm_engine_t::plug (io_thread_t* io_thread_, session_base_t *session_
zmq_session = session_; zmq_session = session_;
if (is_sender) zmq_output_ready = true; if (is_sender) zmq_output_ready = true;
if (is_receiver) zmq_input_ready = true; if (is_receiver) zmq_input_ready = true;
fd_t normDescriptor = NormGetDescriptor(norm_instance); fd_t normDescriptor = NormGetDescriptor(norm_instance);
norm_descriptor_handle = add_fd(normDescriptor); norm_descriptor_handle = add_fd(normDescriptor);
// Set POLLIN for notification of pending NormEvents // Set POLLIN for notification of pending NormEvents
set_pollin(norm_descriptor_handle); set_pollin(norm_descriptor_handle);
if (is_sender) send_data(); if (is_sender) send_data();
} // end zmq::norm_engine_t::init() } // end zmq::norm_engine_t::init()
void zmq::norm_engine_t::unplug() void zmq::norm_engine_t::unplug()
{ {
rm_fd(norm_descriptor_handle); rm_fd(norm_descriptor_handle);
zmq_session = NULL; zmq_session = NULL;
} // end zmq::norm_engine_t::unplug() } // end zmq::norm_engine_t::unplug()
...@@ -253,7 +253,7 @@ void zmq::norm_engine_t::restart_output() ...@@ -253,7 +253,7 @@ void zmq::norm_engine_t::restart_output()
// There's new message data available from the session // There's new message data available from the session
zmq_output_ready = true; zmq_output_ready = true;
if (norm_tx_ready) send_data(); if (norm_tx_ready) send_data();
} // end zmq::norm_engine_t::restart_output() } // end zmq::norm_engine_t::restart_output()
void zmq::norm_engine_t::send_data() void zmq::norm_engine_t::send_data()
...@@ -289,7 +289,7 @@ void zmq::norm_engine_t::send_data() ...@@ -289,7 +289,7 @@ void zmq::norm_engine_t::send_data()
// Need to pull and load a new message to send // Need to pull and load a new message to send
if (-1 == zmq_session->pull_msg(&tx_msg)) if (-1 == zmq_session->pull_msg(&tx_msg))
{ {
// We need to wait for "restart_output()" to be called by ZMQ // We need to wait for "restart_output()" to be called by ZMQ
zmq_output_ready = false; zmq_output_ready = false;
break; break;
} }
...@@ -301,7 +301,7 @@ void zmq::norm_engine_t::send_data() ...@@ -301,7 +301,7 @@ void zmq::norm_engine_t::send_data()
// 'syncs' mid-stream. We key off the the state of the 'more_flag' // 'syncs' mid-stream. We key off the the state of the 'more_flag'
// I.e.,If more_flag _was_ false previously, this is the first // I.e.,If more_flag _was_ false previously, this is the first
// frame of a ZMQ message. // frame of a ZMQ message.
if (tx_more_bit) if (tx_more_bit)
tx_buffer[0] = (char)0xff; // this is not first frame of message tx_buffer[0] = (char)0xff; // this is not first frame of message
else else
tx_buffer[0] = 0x00; // this is first frame of message tx_buffer[0] = 0x00; // this is first frame of message
...@@ -339,7 +339,7 @@ void zmq::norm_engine_t::in_event() ...@@ -339,7 +339,7 @@ void zmq::norm_engine_t::in_event()
zmq_assert(false); zmq_assert(false);
return; return;
} }
switch(event.type) switch(event.type)
{ {
case NORM_TX_QUEUE_VACANCY: case NORM_TX_QUEUE_VACANCY:
...@@ -350,13 +350,13 @@ void zmq::norm_engine_t::in_event() ...@@ -350,13 +350,13 @@ void zmq::norm_engine_t::in_event()
send_data(); send_data();
} }
break; break;
case NORM_RX_OBJECT_NEW: case NORM_RX_OBJECT_NEW:
//break; //break;
case NORM_RX_OBJECT_UPDATED: case NORM_RX_OBJECT_UPDATED:
recv_data(event.object); recv_data(event.object);
break; break;
case NORM_RX_OBJECT_ABORTED: case NORM_RX_OBJECT_ABORTED:
{ {
NormRxStreamState* rxState = (NormRxStreamState*)NormObjectGetUserData(event.object); NormRxStreamState* rxState = (NormRxStreamState*)NormObjectGetUserData(event.object);
...@@ -370,7 +370,7 @@ void zmq::norm_engine_t::in_event() ...@@ -370,7 +370,7 @@ void zmq::norm_engine_t::in_event()
} }
delete rxState; delete rxState;
break; break;
} }
case NORM_REMOTE_SENDER_INACTIVE: case NORM_REMOTE_SENDER_INACTIVE:
// Here we free resources used for this formerly active sender. // Here we free resources used for this formerly active sender.
// Note w/ NORM_SYNC_STREAM, if sender reactivates, we may // Note w/ NORM_SYNC_STREAM, if sender reactivates, we may
...@@ -380,11 +380,11 @@ void zmq::norm_engine_t::in_event() ...@@ -380,11 +380,11 @@ void zmq::norm_engine_t::in_event()
// user configurable timeout here to wait some amount of time // user configurable timeout here to wait some amount of time
// after this event to declare the remote sender truly dead // after this event to declare the remote sender truly dead
// and delete its state??? // and delete its state???
NormNodeDelete(event.sender); NormNodeDelete(event.sender);
break; break;
default: default:
// We ignore some NORM events // We ignore some NORM events
break; break;
} }
} // zmq::norm_engine_t::in_event() } // zmq::norm_engine_t::in_event()
...@@ -396,7 +396,7 @@ void zmq::norm_engine_t::restart_input() ...@@ -396,7 +396,7 @@ void zmq::norm_engine_t::restart_input()
// Process any pending received messages // Process any pending received messages
if (!msg_ready_list.IsEmpty()) if (!msg_ready_list.IsEmpty())
recv_data(NORM_OBJECT_INVALID); recv_data(NORM_OBJECT_INVALID);
} // end zmq::norm_engine_t::restart_input() } // end zmq::norm_engine_t::restart_input()
void zmq::norm_engine_t::recv_data(NormObjectHandle object) void zmq::norm_engine_t::recv_data(NormObjectHandle object)
...@@ -447,19 +447,19 @@ void zmq::norm_engine_t::recv_data(NormObjectHandle object) ...@@ -447,19 +447,19 @@ void zmq::norm_engine_t::recv_data(NormObjectHandle object)
{ {
switch(rxState->Decode()) switch(rxState->Decode())
{ {
case 1: // msg completed case 1: // msg completed
// Complete message decoded, move this stream to msg_ready_list // Complete message decoded, move this stream to msg_ready_list
// to push the message up to the session below. Note the stream // to push the message up to the session below. Note the stream
// will be returned to the "rx_ready_list" after that's done // will be returned to the "rx_ready_list" after that's done
rx_ready_list.Remove(*rxState); rx_ready_list.Remove(*rxState);
msg_ready_list.Append(*rxState); msg_ready_list.Append(*rxState);
continue; continue;
case -1: // decoding error (shouldn't happen w/ NORM, but ...) case -1: // decoding error (shouldn't happen w/ NORM, but ...)
// We need to re-sync this stream (decoder buffer was reset) // We need to re-sync this stream (decoder buffer was reset)
rxState->SetSync(false); rxState->SetSync(false);
break; break;
default: // 0 - need more data default: // 0 - need more data
break; break;
} }
...@@ -524,12 +524,12 @@ void zmq::norm_engine_t::recv_data(NormObjectHandle object) ...@@ -524,12 +524,12 @@ void zmq::norm_engine_t::recv_data(NormObjectHandle object)
rx_pending_list.Append(*rxState); rx_pending_list.Append(*rxState);
} }
} // end while(NULL != (rxState = iterator.GetNextItem())) } // end while(NULL != (rxState = iterator.GetNextItem()))
if (zmq_input_ready) if (zmq_input_ready)
{ {
// At this point, we've made a pass through the "rx_ready" stream list // At this point, we've made a pass through the "rx_ready" stream list
// Now make a pass through the "msg_pending" list (if the zmq session // Now make a pass through the "msg_pending" list (if the zmq session
// ready for more input). This may possibly return streams back to // ready for more input). This may possibly return streams back to
// the "rx ready" stream list after their pending message is handled // the "rx ready" stream list after their pending message is handled
NormRxStreamState::List::Iterator iterator(msg_ready_list); NormRxStreamState::List::Iterator iterator(msg_ready_list);
NormRxStreamState* rxState; NormRxStreamState* rxState;
...@@ -549,7 +549,7 @@ void zmq::norm_engine_t::recv_data(NormObjectHandle object) ...@@ -549,7 +549,7 @@ void zmq::norm_engine_t::recv_data(NormObjectHandle object)
{ {
// session rejected message? // session rejected message?
// TBD - handle this better // TBD - handle this better
zmq_assert(false); zmq_assert(false);
} }
} }
// else message was accepted. // else message was accepted.
...@@ -561,15 +561,15 @@ void zmq::norm_engine_t::recv_data(NormObjectHandle object) ...@@ -561,15 +561,15 @@ void zmq::norm_engine_t::recv_data(NormObjectHandle object)
} // end while(NULL != (rxState = iterator.GetNextItem())) } // end while(NULL != (rxState = iterator.GetNextItem()))
} // end if (zmq_input_ready) } // end if (zmq_input_ready)
} // end while ((!rx_ready_list.empty() || (zmq_input_ready && !msg_ready_list.empty())) } // end while ((!rx_ready_list.empty() || (zmq_input_ready && !msg_ready_list.empty()))
// Alert zmq of the messages we have pushed up // Alert zmq of the messages we have pushed up
zmq_session->flush(); zmq_session->flush();
} // end zmq::norm_engine_t::recv_data() } // end zmq::norm_engine_t::recv_data()
zmq::norm_engine_t::NormRxStreamState::NormRxStreamState(NormObjectHandle normStream, zmq::norm_engine_t::NormRxStreamState::NormRxStreamState(NormObjectHandle normStream,
int64_t maxMsgSize) int64_t maxMsgSize)
: norm_stream(normStream), max_msg_size(maxMsgSize), : norm_stream(normStream), max_msg_size(maxMsgSize),
in_sync(false), rx_ready(false), zmq_decoder(NULL), skip_norm_sync(false), in_sync(false), rx_ready(false), zmq_decoder(NULL), skip_norm_sync(false),
buffer_ptr(NULL), buffer_size(0), buffer_count(0), buffer_ptr(NULL), buffer_size(0), buffer_count(0),
prev(NULL), next(NULL), list(NULL) prev(NULL), next(NULL), list(NULL)
...@@ -620,17 +620,17 @@ int zmq::norm_engine_t::NormRxStreamState::Decode() ...@@ -620,17 +620,17 @@ int zmq::norm_engine_t::NormRxStreamState::Decode()
{ {
// There's pending data for the decoder to decode // There's pending data for the decoder to decode
size_t processed = 0; size_t processed = 0;
// This a bit of a kludgy approach used to weed // This a bit of a kludgy approach used to weed
// out the NORM ZMQ message transport "syncFlag" byte // out the NORM ZMQ message transport "syncFlag" byte
// from the ZMQ message stream being decoded (but it works!) // from the ZMQ message stream being decoded (but it works!)
if (skip_norm_sync) if (skip_norm_sync)
{ {
buffer_ptr++; buffer_ptr++;
buffer_count--; buffer_count--;
skip_norm_sync = false; skip_norm_sync = false;
} }
int rc = zmq_decoder->decode(buffer_ptr, buffer_count, processed); int rc = zmq_decoder->decode(buffer_ptr, buffer_count, processed);
buffer_ptr += processed; buffer_ptr += processed;
buffer_count -= processed; buffer_count -= processed;
...@@ -651,7 +651,7 @@ int zmq::norm_engine_t::NormRxStreamState::Decode() ...@@ -651,7 +651,7 @@ int zmq::norm_engine_t::NormRxStreamState::Decode()
skip_norm_sync = false; // will get consumed by norm sync check skip_norm_sync = false; // will get consumed by norm sync check
Init(); Init();
break; break;
case 0: case 0:
// need more data, keep decoding until buffer exhausted // need more data, keep decoding until buffer exhausted
break; break;
...@@ -662,7 +662,7 @@ int zmq::norm_engine_t::NormRxStreamState::Decode() ...@@ -662,7 +662,7 @@ int zmq::norm_engine_t::NormRxStreamState::Decode()
buffer_size = 0; buffer_size = 0;
zmq_decoder->get_buffer(&buffer_ptr, &buffer_size); zmq_decoder->get_buffer(&buffer_ptr, &buffer_size);
return 0; // need more data return 0; // need more data
} // end zmq::norm_engine_t::NormRxStreamState::Decode() } // end zmq::norm_engine_t::NormRxStreamState::Decode()
zmq::norm_engine_t::NormRxStreamState::List::List() zmq::norm_engine_t::NormRxStreamState::List::List()
...@@ -723,6 +723,6 @@ zmq::norm_engine_t::NormRxStreamState* zmq::norm_engine_t::NormRxStreamState::Li ...@@ -723,6 +723,6 @@ zmq::norm_engine_t::NormRxStreamState* zmq::norm_engine_t::NormRxStreamState::Li
if (NULL != nextItem) next_item = nextItem->next; if (NULL != nextItem) next_item = nextItem->next;
return nextItem; return nextItem;
} // end zmq::norm_engine_t::NormRxStreamState::List::Iterator::GetNextItem() } // end zmq::norm_engine_t::NormRxStreamState::List::Iterator::GetNextItem()
#endif // ZMQ_HAVE_NORM #endif // ZMQ_HAVE_NORM
...@@ -16,17 +16,17 @@ namespace zmq ...@@ -16,17 +16,17 @@ namespace zmq
{ {
class io_thread_t; class io_thread_t;
class session_base_t; class session_base_t;
class norm_engine_t : public io_object_t, public i_engine class norm_engine_t : public io_object_t, public i_engine
{ {
public: public:
norm_engine_t (zmq::io_thread_t *parent_, const options_t &options_); norm_engine_t (zmq::io_thread_t *parent_, const options_t &options_);
~norm_engine_t (); ~norm_engine_t ();
// create NORM instance, session, etc // create NORM instance, session, etc
int init(const char* network_, bool send, bool recv); int init(const char* network_, bool send, bool recv);
void shutdown(); void shutdown();
// i_engine interface implementation. // i_engine interface implementation.
// Plug the engine to the session. // Plug the engine to the session.
virtual void plug (zmq::io_thread_t *io_thread_, virtual void plug (zmq::io_thread_t *io_thread_,
...@@ -45,43 +45,43 @@ namespace zmq ...@@ -45,43 +45,43 @@ namespace zmq
virtual void restart_output (); virtual void restart_output ();
virtual void zap_msg_available () {}; virtual void zap_msg_available () {};
// i_poll_events interface implementation. // i_poll_events interface implementation.
// (we only need in_event() for NormEvent notification) // (we only need in_event() for NormEvent notification)
// (i.e., don't have any output events or timers (yet)) // (i.e., don't have any output events or timers (yet))
void in_event (); void in_event ();
private: private:
void unplug(); void unplug();
void send_data(); void send_data();
void recv_data(NormObjectHandle stream); void recv_data(NormObjectHandle stream);
enum {BUFFER_SIZE = 2048}; enum {BUFFER_SIZE = 2048};
// Used to keep track of streams from multiple senders // Used to keep track of streams from multiple senders
class NormRxStreamState class NormRxStreamState
{ {
public: public:
NormRxStreamState(NormObjectHandle normStream, NormRxStreamState(NormObjectHandle normStream,
int64_t maxMsgSize); int64_t maxMsgSize);
~NormRxStreamState(); ~NormRxStreamState();
NormObjectHandle GetStreamHandle() const NormObjectHandle GetStreamHandle() const
{return norm_stream;} {return norm_stream;}
bool Init(); bool Init();
void SetRxReady(bool state) void SetRxReady(bool state)
{rx_ready = state;} {rx_ready = state;}
bool IsRxReady() const bool IsRxReady() const
{return rx_ready;} {return rx_ready;}
void SetSync(bool state) void SetSync(bool state)
{in_sync = state;} {in_sync = state;}
bool InSync() const bool InSync() const
{return in_sync;} {return in_sync;}
// These are used to feed data to decoder // These are used to feed data to decoder
// and its underlying "msg" buffer // and its underlying "msg" buffer
char* AccessBuffer() char* AccessBuffer()
...@@ -98,21 +98,21 @@ namespace zmq ...@@ -98,21 +98,21 @@ namespace zmq
// occurs the 'sync' is dropped and the // occurs the 'sync' is dropped and the
// decoder re-initialized // decoder re-initialized
int Decode(); int Decode();
class List class List
{ {
public: public:
List(); List();
~List(); ~List();
void Append(NormRxStreamState& item); void Append(NormRxStreamState& item);
void Remove(NormRxStreamState& item); void Remove(NormRxStreamState& item);
bool IsEmpty() const bool IsEmpty() const
{return (NULL == head);} {return (NULL == head);}
void Destroy(); void Destroy();
class Iterator class Iterator
{ {
public: public:
...@@ -122,36 +122,36 @@ namespace zmq ...@@ -122,36 +122,36 @@ namespace zmq
NormRxStreamState* next_item; NormRxStreamState* next_item;
}; };
friend class Iterator; friend class Iterator;
private: private:
NormRxStreamState* head; NormRxStreamState* head;
NormRxStreamState* tail; NormRxStreamState* tail;
}; // end class zmq::norm_engine_t::NormRxStreamState::List }; // end class zmq::norm_engine_t::NormRxStreamState::List
friend class List; friend class List;
List* AccessList() List* AccessList()
{return list;} {return list;}
private: private:
NormObjectHandle norm_stream; NormObjectHandle norm_stream;
int64_t max_msg_size; int64_t max_msg_size;
bool in_sync; bool in_sync;
bool rx_ready; bool rx_ready;
v2_decoder_t* zmq_decoder; v2_decoder_t* zmq_decoder;
bool skip_norm_sync; bool skip_norm_sync;
unsigned char* buffer_ptr; unsigned char* buffer_ptr;
size_t buffer_size; size_t buffer_size;
size_t buffer_count; size_t buffer_count;
NormRxStreamState* prev; NormRxStreamState* prev;
NormRxStreamState* next; NormRxStreamState* next;
NormRxStreamState::List* list; NormRxStreamState::List* list;
}; // end class zmq::norm_engine_t::NormRxStreamState }; // end class zmq::norm_engine_t::NormRxStreamState
session_base_t* zmq_session; session_base_t* zmq_session;
options_t options; options_t options;
NormInstanceHandle norm_instance; NormInstanceHandle norm_instance;
...@@ -161,25 +161,25 @@ namespace zmq ...@@ -161,25 +161,25 @@ namespace zmq
bool is_receiver; bool is_receiver;
// Sender state // Sender state
msg_t tx_msg; msg_t tx_msg;
v2_encoder_t zmq_encoder; // for tx messages (we use v2 for now) v2_encoder_t zmq_encoder; // for tx messages (we use v2 for now)
NormObjectHandle norm_tx_stream; NormObjectHandle norm_tx_stream;
bool tx_first_msg; bool tx_first_msg;
bool tx_more_bit; bool tx_more_bit;
bool zmq_output_ready; // zmq has msg(s) to send bool zmq_output_ready; // zmq has msg(s) to send
bool norm_tx_ready; // norm has tx queue vacancy bool norm_tx_ready; // norm has tx queue vacancy
// TBD - maybe don't need buffer if can access zmq message buffer directly? // TBD - maybe don't need buffer if can access zmq message buffer directly?
char tx_buffer[BUFFER_SIZE]; char tx_buffer[BUFFER_SIZE];
unsigned int tx_index; unsigned int tx_index;
unsigned int tx_len; unsigned int tx_len;
// Receiver state // Receiver state
// Lists of norm rx streams from remote senders // Lists of norm rx streams from remote senders
bool zmq_input_ready; // zmq ready to receive msg(s) bool zmq_input_ready; // zmq ready to receive msg(s)
NormRxStreamState::List rx_pending_list; // rx streams waiting for data reception NormRxStreamState::List rx_pending_list; // rx streams waiting for data reception
NormRxStreamState::List rx_ready_list; // rx streams ready for NormStreamRead() NormRxStreamState::List rx_ready_list; // rx streams ready for NormStreamRead()
NormRxStreamState::List msg_ready_list; // rx streams w/ msg ready for push to zmq NormRxStreamState::List msg_ready_list; // rx streams w/ msg ready for push to zmq
}; // end class norm_engine_t }; // end class norm_engine_t
} }
......
...@@ -390,12 +390,12 @@ void zmq::pipe_t::terminate (bool delay_) ...@@ -390,12 +390,12 @@ void zmq::pipe_t::terminate (bool delay_)
// If terminate was already called, we can ignore the duplicate invocation. // If terminate was already called, we can ignore the duplicate invocation.
if (state == term_req_sent1 || state == term_req_sent2) { if (state == term_req_sent1 || state == term_req_sent2) {
return; return;
} }
// If the pipe is in the final phase of async termination, it's going to // If the pipe is in the final phase of async termination, it's going to
// closed anyway. No need to do anything special here. // closed anyway. No need to do anything special here.
else if (state == term_ack_sent) { else if (state == term_ack_sent) {
return; return;
} }
// The simple sync termination case. Ask the peer to terminate and wait // The simple sync termination case. Ask the peer to terminate and wait
// for the ack. // for the ack.
else if (state == active) { else if (state == active) {
...@@ -422,7 +422,7 @@ void zmq::pipe_t::terminate (bool delay_) ...@@ -422,7 +422,7 @@ void zmq::pipe_t::terminate (bool delay_)
// There are no other states. // There are no other states.
else { else {
zmq_assert (false); zmq_assert (false);
} }
// Stop outbound flow of messages. // Stop outbound flow of messages.
out_active = false; out_active = false;
......
...@@ -432,9 +432,9 @@ bool zmq::select_t::is_retired_fd (const fd_entry_t &entry) ...@@ -432,9 +432,9 @@ bool zmq::select_t::is_retired_fd (const fd_entry_t &entry)
#if defined ZMQ_HAVE_WINDOWS #if defined ZMQ_HAVE_WINDOWS
u_short zmq::select_t::get_fd_family (fd_t fd_) u_short zmq::select_t::get_fd_family (fd_t fd_)
{ {
// Use sockaddr_storage instead of sockaddr to accomodate differect structure sizes // Use sockaddr_storage instead of sockaddr to accomodate differect structure sizes
sockaddr_storage addr = { 0 }; sockaddr_storage addr = { 0 };
int addr_size = sizeof addr; int addr_size = sizeof addr;
int type; int type;
int type_length = sizeof(int); int type_length = sizeof(int);
......
...@@ -338,11 +338,11 @@ int zmq::signaler_t::recv_failable () ...@@ -338,11 +338,11 @@ int zmq::signaler_t::recv_failable ()
#if defined ZMQ_HAVE_WINDOWS #if defined ZMQ_HAVE_WINDOWS
int nbytes = ::recv (r, (char *) &dummy, sizeof (dummy), 0); int nbytes = ::recv (r, (char *) &dummy, sizeof (dummy), 0);
if (nbytes == SOCKET_ERROR) { if (nbytes == SOCKET_ERROR) {
const int last_error = WSAGetLastError(); const int last_error = WSAGetLastError();
if (last_error == WSAEWOULDBLOCK) { if (last_error == WSAEWOULDBLOCK) {
errno = EAGAIN; errno = EAGAIN;
return -1; return -1;
} }
wsa_assert (last_error == WSAEWOULDBLOCK); wsa_assert (last_error == WSAEWOULDBLOCK);
} }
#else #else
...@@ -388,7 +388,7 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_) ...@@ -388,7 +388,7 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
#elif defined ZMQ_HAVE_WINDOWS #elif defined ZMQ_HAVE_WINDOWS
# if !defined _WIN32_WCE # if !defined _WIN32_WCE
// Windows CE does not manage security attributes // Windows CE does not manage security attributes
SECURITY_DESCRIPTOR sd; SECURITY_DESCRIPTOR sd;
SECURITY_ATTRIBUTES sa; SECURITY_ATTRIBUTES sa;
memset (&sd, 0, sizeof sd); memset (&sd, 0, sizeof sd);
......
...@@ -432,7 +432,7 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_, ...@@ -432,7 +432,7 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_,
return -1; return -1;
} }
memset(optval_, 0, *optvallen_); memset(optval_, 0, *optvallen_);
*((int*) optval_) = thread_safe ? 1 : 0; *((int*) optval_) = thread_safe ? 1 : 0;
*optvallen_ = sizeof (int); *optvallen_ = sizeof (int);
return 0; return 0;
} }
......
...@@ -60,7 +60,7 @@ zmq::stream_t::~stream_t () ...@@ -60,7 +60,7 @@ zmq::stream_t::~stream_t ()
void zmq::stream_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_) void zmq::stream_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
{ {
LIBZMQ_UNUSED(subscribe_to_all_); LIBZMQ_UNUSED(subscribe_to_all_);
zmq_assert (pipe_); zmq_assert (pipe_);
......
...@@ -189,7 +189,7 @@ void zmq::tune_tcp_maxrt (fd_t sockfd_, int timeout_) ...@@ -189,7 +189,7 @@ void zmq::tune_tcp_maxrt (fd_t sockfd_, int timeout_)
// If not a single byte can be written to the socket in non-blocking mode // If not a single byte can be written to the socket in non-blocking mode
// we'll get an error (this may happen during the speculative write). // we'll get an error (this may happen during the speculative write).
const int last_error = WSAGetLastError(); const int last_error = WSAGetLastError();
if (nbytes == SOCKET_ERROR && last_error == WSAEWOULDBLOCK) if (nbytes == SOCKET_ERROR && last_error == WSAEWOULDBLOCK)
return 0; return 0;
...@@ -201,7 +201,7 @@ void zmq::tune_tcp_maxrt (fd_t sockfd_, int timeout_) ...@@ -201,7 +201,7 @@ void zmq::tune_tcp_maxrt (fd_t sockfd_, int timeout_)
last_error == WSAECONNABORTED || last_error == WSAECONNABORTED ||
last_error == WSAETIMEDOUT || last_error == WSAETIMEDOUT ||
last_error == WSAECONNRESET last_error == WSAECONNRESET
)) ))
return -1; return -1;
// Circumvent a Windows bug: // Circumvent a Windows bug:
...@@ -256,12 +256,12 @@ int zmq::tcp_read (fd_t s_, void *data_, size_t size_) ...@@ -256,12 +256,12 @@ int zmq::tcp_read (fd_t s_, void *data_, size_t size_)
errno = EAGAIN; errno = EAGAIN;
} }
else { else {
wsa_assert (last_error == WSAENETDOWN || wsa_assert (last_error == WSAENETDOWN ||
last_error == WSAENETRESET || last_error == WSAENETRESET ||
last_error == WSAECONNABORTED || last_error == WSAECONNABORTED ||
last_error == WSAETIMEDOUT || last_error == WSAETIMEDOUT ||
last_error == WSAECONNRESET || last_error == WSAECONNRESET ||
last_error == WSAECONNREFUSED || last_error == WSAECONNREFUSED ||
last_error == WSAENOTCONN); last_error == WSAENOTCONN);
errno = wsa_error_to_errno (last_error); errno = wsa_error_to_errno (last_error);
} }
......
...@@ -281,7 +281,7 @@ zmq::fd_t zmq::tcp_listener_t::accept () ...@@ -281,7 +281,7 @@ zmq::fd_t zmq::tcp_listener_t::accept ()
#ifdef ZMQ_HAVE_WINDOWS #ifdef ZMQ_HAVE_WINDOWS
if (sock == INVALID_SOCKET) { if (sock == INVALID_SOCKET) {
const int last_error = WSAGetLastError(); const int last_error = WSAGetLastError();
wsa_assert (last_error == WSAEWOULDBLOCK || wsa_assert (last_error == WSAEWOULDBLOCK ||
last_error == WSAECONNRESET || last_error == WSAECONNRESET ||
last_error == WSAEMFILE || last_error == WSAEMFILE ||
......
...@@ -573,8 +573,8 @@ int crypto_hashblocks(u8 *x,const u8 *m,u64 n) ...@@ -573,8 +573,8 @@ int crypto_hashblocks(u8 *x,const u8 *m,u64 n)
b[3] += t; b[3] += t;
FOR(j,8) a[(j+1)%8] = b[j]; FOR(j,8) a[(j+1)%8] = b[j];
if (i%16 == 15) if (i%16 == 15)
FOR(j,16) FOR(j,16)
w[j] += w[(j+9)%16] + sigma0(w[(j+1)%16]) + sigma1(w[(j+14)%16]); w[j] += w[(j+9)%16] + sigma0(w[(j+1)%16]) + sigma1(w[(j+14)%16]);
} }
FOR(i,8) { a[i] += z[i]; z[i] = a[i]; } FOR(i,8) { a[i] += z[i]; z[i] = a[i]; }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment