Commit 7942db76 authored by Martin Hurton's avatar Martin Hurton

Refactor code so that messages go through engines

parent c3961442
...@@ -26,8 +26,6 @@ libzmq_la_SOURCES = \ ...@@ -26,8 +26,6 @@ libzmq_la_SOURCES = \
i_encoder.hpp \ i_encoder.hpp \
i_decoder.hpp \ i_decoder.hpp \
i_engine.hpp \ i_engine.hpp \
i_msg_sink.hpp \
i_msg_source.hpp \
i_poll_events.hpp \ i_poll_events.hpp \
io_object.hpp \ io_object.hpp \
io_thread.hpp \ io_thread.hpp \
......
...@@ -32,8 +32,6 @@ ...@@ -32,8 +32,6 @@
namespace zmq namespace zmq
{ {
class i_msg_sink;
// Helper base class for decoders that know the amount of data to read // Helper base class for decoders that know the amount of data to read
// in advance at any moment. Knowing the amount in advance is a property // in advance at any moment. Knowing the amount in advance is a property
// of the protocol used. 0MQ framing protocol is based size-prefixed // of the protocol used. 0MQ framing protocol is based size-prefixed
...@@ -89,106 +87,66 @@ namespace zmq ...@@ -89,106 +87,66 @@ namespace zmq
// Processes the data in the buffer previously allocated using // Processes the data in the buffer previously allocated using
// get_buffer function. size_ argument specifies nemuber of bytes // get_buffer function. size_ argument specifies nemuber of bytes
// actually filled into the buffer. Function returns number of // actually filled into the buffer. Function returns 1 when the
// bytes actually processed. // whole message was decoded or 0 when more data is required.
inline size_t process_buffer (unsigned char *data_, size_t size_) // On error, -1 is returned and errno set accordingly.
// Number of bytes processed is returned in byts_used_.
inline int decode (const unsigned char *data_, size_t size_,
size_t &bytes_used_)
{ {
// Check if we had an error in previous attempt. bytes_used_ = 0;
if (unlikely (!(static_cast <T*> (this)->next)))
return (size_t) -1;
// In case of zero-copy simply adjust the pointers, no copying // In case of zero-copy simply adjust the pointers, no copying
// is required. Also, run the state machine in case all the data // is required. Also, run the state machine in case all the data
// were processed. // were processed.
if (data_ == read_pos) { if (data_ == read_pos) {
zmq_assert (size_ <= to_read);
read_pos += size_; read_pos += size_;
to_read -= size_; to_read -= size_;
bytes_used_ = size_;
while (!to_read) { while (!to_read) {
if (!(static_cast <T*> (this)->*next) ()) { const int rc = (static_cast <T*> (this)->*next) ();
if (unlikely (!(static_cast <T*> (this)->next))) if (rc != 0)
return (size_t) -1; return rc;
return size_;
}
}
return size_;
}
size_t pos = 0;
while (true) {
// Try to get more space in the message to fill in.
// If none is available, return.
while (!to_read) {
if (!(static_cast <T*> (this)->*next) ()) {
if (unlikely (!(static_cast <T*> (this)->next)))
return (size_t) -1;
return pos;
} }
return 0;
} }
// If there are no more data in the buffer, return. while (bytes_used_ < size_) {
if (pos == size_)
return pos;
// Copy the data from buffer to the message. // Copy the data from buffer to the message.
size_t to_copy = std::min (to_read, size_ - pos); const size_t to_copy = std::min (to_read, size_ - bytes_used_);
memcpy (read_pos, data_ + pos, to_copy); memcpy (read_pos, data_ + bytes_used_, to_copy);
read_pos += to_copy; read_pos += to_copy;
pos += to_copy;
to_read -= to_copy; to_read -= to_copy;
bytes_used_ += to_copy;
// Try to get more space in the message to fill in.
// If none is available, return.
while (to_read == 0) {
const int rc = (static_cast <T*> (this)->*next) ();
if (rc != 0)
return rc;
} }
} }
// Returns true if the decoder has been fed all required data return 0;
// but cannot proceed with the next decoding step.
// False is returned if the decoder has encountered an error.
bool stalled ()
{
// Check whether there was decoding error.
if (unlikely (!(static_cast <T*> (this)->next)))
return false;
while (!to_read) {
if (!(static_cast <T*> (this)->*next) ()) {
if (unlikely (!(static_cast <T*> (this)->next)))
return false;
return true;
}
}
return false;
}
inline bool message_ready_size (size_t /* msg_sz */)
{
zmq_assert (false);
return false;
} }
protected: protected:
// Prototype of state machine action. Action should return false if // Prototype of state machine action. Action should return false if
// it is unable to push the data to the system. // it is unable to push the data to the system.
typedef bool (T::*step_t) (); typedef int (T::*step_t) ();
// This function should be called from derived class to read data // This function should be called from derived class to read data
// from the buffer and schedule next state machine action. // from the buffer and schedule next state machine action.
inline void next_step (void *read_pos_, size_t to_read_, inline void next_step (void *read_pos_, size_t to_read_, step_t next_)
step_t next_)
{ {
read_pos = (unsigned char*) read_pos_; read_pos = (unsigned char*) read_pos_;
to_read = to_read_; to_read = to_read_;
next = next_; next = next_;
} }
// This function should be called from the derived class to
// abort decoder state machine.
inline void decoding_error ()
{
next = NULL;
}
private: private:
// Next step. If set to NULL, it means that associated data stream // Next step. If set to NULL, it means that associated data stream
......
...@@ -38,8 +38,6 @@ ...@@ -38,8 +38,6 @@
namespace zmq namespace zmq
{ {
class i_msg_source;
// Helper base class for encoders. It implements the state machine that // Helper base class for encoders. It implements the state machine that
// fills the outgoing buffer. Derived classes should implement individual // fills the outgoing buffer. Derived classes should implement individual
// state machine actions. // state machine actions.
...@@ -49,7 +47,8 @@ namespace zmq ...@@ -49,7 +47,8 @@ namespace zmq
public: public:
inline encoder_base_t (size_t bufsize_) : inline encoder_base_t (size_t bufsize_) :
bufsize (bufsize_) bufsize (bufsize_),
in_progress (NULL)
{ {
buf = (unsigned char*) malloc (bufsize_); buf = (unsigned char*) malloc (bufsize_);
alloc_assert (buf); alloc_assert (buf);
...@@ -65,17 +64,13 @@ namespace zmq ...@@ -65,17 +64,13 @@ namespace zmq
// The function returns a batch of binary data. The data // The function returns a batch of binary data. The data
// are filled to a supplied buffer. If no buffer is supplied (data_ // are filled to a supplied buffer. If no buffer is supplied (data_
// points to NULL) decoder object will provide buffer of its own. // points to NULL) decoder object will provide buffer of its own.
// If offset is not NULL, it is filled by offset of the first message inline size_t encode (unsigned char **data_, size_t size_)
// in the batch.If there's no beginning of a message in the batch,
// offset is set to -1.
inline void get_data (unsigned char **data_, size_t *size_,
int *offset_ = NULL)
{ {
unsigned char *buffer = !*data_ ? buf : *data_; unsigned char *buffer = !*data_ ? buf : *data_;
size_t buffersize = !*data_ ? bufsize : *size_; size_t buffersize = !*data_ ? bufsize : size_;
if (offset_) if (in_progress == NULL)
*offset_ = -1; return 0;
size_t pos = 0; size_t pos = 0;
while (pos < buffersize) { while (pos < buffersize) {
...@@ -84,15 +79,16 @@ namespace zmq ...@@ -84,15 +79,16 @@ namespace zmq
// If there are still no data, return what we already have // If there are still no data, return what we already have
// in the buffer. // in the buffer.
if (!to_write) { if (!to_write) {
// If we are to encode the beginning of a new message, if (new_msg_flag) {
// adjust the message offset. int rc = in_progress->close ();
if (beginning) errno_assert (rc == 0);
if (offset_ && *offset_ == -1) rc = in_progress->init ();
*offset_ = static_cast <int> (pos); errno_assert (rc == 0);
in_progress = NULL;
if (!(static_cast <T*> (this)->*next) ())
break; break;
} }
(static_cast <T*> (this)->*next) ();
}
// If there are no data in the buffer yet and we are able to // If there are no data in the buffer yet and we are able to
// fill whole buffer in a single go, let's use zero-copy. // fill whole buffer in a single go, let's use zero-copy.
...@@ -106,10 +102,10 @@ namespace zmq ...@@ -106,10 +102,10 @@ namespace zmq
// amounts of time. // amounts of time.
if (!pos && !*data_ && to_write >= buffersize) { if (!pos && !*data_ && to_write >= buffersize) {
*data_ = write_pos; *data_ = write_pos;
*size_ = to_write; pos = to_write;
write_pos = NULL; write_pos = NULL;
to_write = 0; to_write = 0;
return; return pos;
} }
// Copy data to the buffer. If the buffer is full, return. // Copy data to the buffer. If the buffer is full, return.
...@@ -121,7 +117,14 @@ namespace zmq ...@@ -121,7 +117,14 @@ namespace zmq
} }
*data_ = buffer; *data_ = buffer;
*size_ = pos; return pos;
}
void load_msg (msg_t *msg_)
{
zmq_assert (in_progress == NULL);
in_progress = msg_;
(static_cast <T*> (this)->*next) ();
} }
inline bool has_data () inline bool has_data ()
...@@ -132,18 +135,17 @@ namespace zmq ...@@ -132,18 +135,17 @@ namespace zmq
protected: protected:
// Prototype of state machine action. // Prototype of state machine action.
typedef bool (T::*step_t) (); typedef void (T::*step_t) ();
// This function should be called from derived class to write the data // This function should be called from derived class to write the data
// to the buffer and schedule next state machine action. Set beginning // to the buffer and schedule next state machine action.
// to true when you are writing first byte of a message.
inline void next_step (void *write_pos_, size_t to_write_, inline void next_step (void *write_pos_, size_t to_write_,
step_t next_, bool beginning_) step_t next_, bool new_msg_flag_)
{ {
write_pos = (unsigned char*) write_pos_; write_pos = (unsigned char*) write_pos_;
to_write = to_write_; to_write = to_write_;
next = next_; next = next_;
beginning = beginning_; new_msg_flag = new_msg_flag_;
} }
private: private:
...@@ -158,8 +160,7 @@ namespace zmq ...@@ -158,8 +160,7 @@ namespace zmq
// is dead. // is dead.
step_t next; step_t next;
// If true, first byte of the message is being written. bool new_msg_flag;
bool beginning;
// The buffer for encoded data. // The buffer for encoded data.
size_t bufsize; size_t bufsize;
...@@ -167,6 +168,11 @@ namespace zmq ...@@ -167,6 +168,11 @@ namespace zmq
encoder_base_t (const encoder_base_t&); encoder_base_t (const encoder_base_t&);
void operator = (const encoder_base_t&); void operator = (const encoder_base_t&);
protected:
msg_t *in_progress;
}; };
} }
......
...@@ -25,7 +25,7 @@ ...@@ -25,7 +25,7 @@
namespace zmq namespace zmq
{ {
class i_msg_sink; class msg_t;
// Interface to be implemented by message decoder. // Interface to be implemented by message decoder.
...@@ -34,15 +34,16 @@ namespace zmq ...@@ -34,15 +34,16 @@ namespace zmq
public: public:
virtual ~i_decoder () {} virtual ~i_decoder () {}
virtual void set_msg_sink (i_msg_sink *msg_sink_) = 0;
virtual void get_buffer (unsigned char **data_, size_t *size_) = 0; virtual void get_buffer (unsigned char **data_, size_t *size_) = 0;
virtual size_t process_buffer (unsigned char *data_, size_t size_) = 0; // Decodes data pointed to by data_.
// When a message is decoded, 1 is returned.
virtual bool stalled () = 0; // When the decoder needs more data, 0 is returnd.
// On error, -1 is returned and errno is set accordingly.
virtual int decode (const unsigned char *data_, size_t size_,
size_t &processed) = 0;
virtual bool message_ready_size (size_t msg_sz) = 0; virtual msg_t *msg () = 0;
}; };
} }
......
...@@ -26,7 +26,7 @@ namespace zmq ...@@ -26,7 +26,7 @@ namespace zmq
{ {
// Forward declaration // Forward declaration
class i_msg_source; class msg_t;
// Interface to be implemented by message encoder. // Interface to be implemented by message encoder.
...@@ -34,17 +34,14 @@ namespace zmq ...@@ -34,17 +34,14 @@ namespace zmq
{ {
virtual ~i_encoder () {} virtual ~i_encoder () {}
// Set message producer.
virtual void set_msg_source (i_msg_source *msg_source_) = 0;
// The function returns a batch of binary data. The data // The function returns a batch of binary data. The data
// are filled to a supplied buffer. If no buffer is supplied (data_ // are filled to a supplied buffer. If no buffer is supplied (data_
// is NULL) encoder will provide buffer of its own. // is NULL) encoder will provide buffer of its own.
// If offset is not NULL, it is filled by offset of the first message // Function returns 0 when a new message is required.
// in the batch.If there's no beginning of a message in the batch, virtual size_t encode (unsigned char **data_, size_t size) = 0;
// offset is set to -1.
virtual void get_data (unsigned char **data_, size_t *size_, // Load a new message into encoder.
int *offset_ = NULL) = 0; virtual void load_msg (msg_t *msg_) = 0;
virtual bool has_data () = 0; virtual bool has_data () = 0;
}; };
......
/*
Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_I_MSG_SINK_HPP_INCLUDED__
#define __ZMQ_I_MSG_SINK_HPP_INCLUDED__
namespace zmq
{
// Forward declaration
class msg_t;
// Interface to be implemented by message sink.
class i_msg_sink
{
public:
virtual ~i_msg_sink () {}
// Delivers a message. Returns 0 if successful; -1 otherwise.
// The function takes ownership of the passed message.
virtual int push_msg (msg_t *msg_) = 0;
};
}
#endif
/*
Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_I_MSG_SOURCE_HPP_INCLUDED__
#define __ZMQ_I_MSG_SOURCE_HPP_INCLUDED__
namespace zmq
{
// Forward declaration
class msg_t;
// Interface to be implemented by message source.
class i_msg_source
{
public:
virtual ~i_msg_source () {}
// Fetch a message. Returns 0 if successful; -1 otherwise.
// The caller is responsible for freeing the message when no
// longer used.
virtual int pull_msg (msg_t *msg_) = 0;
};
}
#endif
...@@ -41,8 +41,8 @@ zmq::pgm_receiver_t::pgm_receiver_t (class io_thread_t *parent_, ...@@ -41,8 +41,8 @@ zmq::pgm_receiver_t::pgm_receiver_t (class io_thread_t *parent_,
pgm_socket (true, options_), pgm_socket (true, options_),
options (options_), options (options_),
session (NULL), session (NULL),
mru_decoder (NULL), active_tsi (NULL),
pending_bytes (0) insize (0)
{ {
} }
...@@ -83,9 +83,7 @@ void zmq::pgm_receiver_t::unplug () ...@@ -83,9 +83,7 @@ void zmq::pgm_receiver_t::unplug ()
delete it->second.decoder; delete it->second.decoder;
} }
peers.clear (); peers.clear ();
active_tsi = NULL;
mru_decoder = NULL;
pending_bytes = 0;
if (has_rx_timer) { if (has_rx_timer) {
cancel_timer (rx_timer_id); cancel_timer (rx_timer_id);
...@@ -111,49 +109,47 @@ void zmq::pgm_receiver_t::activate_out () ...@@ -111,49 +109,47 @@ void zmq::pgm_receiver_t::activate_out ()
void zmq::pgm_receiver_t::activate_in () void zmq::pgm_receiver_t::activate_in ()
{ {
// It is possible that the most recently used decoder zmq_assert (session != NULL);
// processed the whole buffer but failed to write zmq_assert (active_tsi != NULL);
// the last message into the pipe.
if (pending_bytes == 0) { const peers_t::iterator it = peers.find (*active_tsi);
if (mru_decoder != NULL) { zmq_assert (it != peers.end ());
mru_decoder->process_buffer (NULL, 0); zmq_assert (it->second.joined);
// Push the pending message into the session.
int rc = session->push_msg (it->second.decoder->msg ());
errno_assert (rc == 0);
if (insize > 0) {
rc = process_input (it->second.decoder);
if (rc == -1) {
// HWM reached; we will try later.
if (errno == EAGAIN) {
session->flush (); session->flush ();
}
// Resume polling.
set_pollin (pipe_handle);
set_pollin (socket_handle);
return; return;
} }
// Data error. Delete message decoder, mark the
zmq_assert (mru_decoder != NULL); // peer as not joined and drop remaining data.
zmq_assert (pending_ptr != NULL); it->second.joined = false;
delete it->second.decoder;
// Ask the decoder to process remaining data. it->second.decoder = NULL;
size_t n = mru_decoder->process_buffer (pending_ptr, pending_bytes); insize = 0;
pending_bytes -= n; }
session->flush (); }
if (pending_bytes > 0)
return;
// Resume polling. // Resume polling.
set_pollin (pipe_handle); set_pollin (pipe_handle);
set_pollin (socket_handle); set_pollin (socket_handle);
active_tsi = NULL;
in_event (); in_event ();
} }
void zmq::pgm_receiver_t::in_event () void zmq::pgm_receiver_t::in_event ()
{ {
// Read data from the underlying pgm_socket. // Read data from the underlying pgm_socket.
unsigned char *data = NULL;
const pgm_tsi_t *tsi = NULL; const pgm_tsi_t *tsi = NULL;
if (pending_bytes > 0)
return;
if (has_rx_timer) { if (has_rx_timer) {
cancel_timer (rx_timer_id); cancel_timer (rx_timer_id);
has_rx_timer = false; has_rx_timer = false;
...@@ -167,7 +163,7 @@ void zmq::pgm_receiver_t::in_event () ...@@ -167,7 +163,7 @@ void zmq::pgm_receiver_t::in_event ()
// Note the workaround made not to break strict-aliasing rules. // Note the workaround made not to break strict-aliasing rules.
void *tmp = NULL; void *tmp = NULL;
ssize_t received = pgm_socket.receive (&tmp, &tsi); ssize_t received = pgm_socket.receive (&tmp, &tsi);
data = (unsigned char*) tmp; inpos = (unsigned char*) tmp;
// No data to process. This may happen if the packet received is // No data to process. This may happen if the packet received is
// neither ODATA nor ODATA. // neither ODATA nor ODATA.
...@@ -187,8 +183,6 @@ void zmq::pgm_receiver_t::in_event () ...@@ -187,8 +183,6 @@ void zmq::pgm_receiver_t::in_event ()
if (received == -1) { if (received == -1) {
if (it != peers.end ()) { if (it != peers.end ()) {
it->second.joined = false; it->second.joined = false;
if (it->second.decoder == mru_decoder)
mru_decoder = NULL;
if (it->second.decoder != NULL) { if (it->second.decoder != NULL) {
delete it->second.decoder; delete it->second.decoder;
it->second.decoder = NULL; it->second.decoder = NULL;
...@@ -203,11 +197,13 @@ void zmq::pgm_receiver_t::in_event () ...@@ -203,11 +197,13 @@ void zmq::pgm_receiver_t::in_event ()
it = peers.insert (peers_t::value_type (*tsi, peer_info)).first; it = peers.insert (peers_t::value_type (*tsi, peer_info)).first;
} }
insize = static_cast <size_t> (received);
// Read the offset of the fist message in the current packet. // Read the offset of the fist message in the current packet.
zmq_assert ((size_t) received >= sizeof (uint16_t)); zmq_assert (insize >= sizeof (uint16_t));
uint16_t offset = get_uint16 (data); uint16_t offset = get_uint16 (inpos);
data += sizeof (uint16_t); inpos += sizeof (uint16_t);
received -= sizeof (uint16_t); insize -= sizeof (uint16_t);
// Join the stream if needed. // Join the stream if needed.
if (!it->second.joined) { if (!it->second.joined) {
...@@ -217,12 +213,12 @@ void zmq::pgm_receiver_t::in_event () ...@@ -217,12 +213,12 @@ void zmq::pgm_receiver_t::in_event ()
if (offset == 0xffff) if (offset == 0xffff)
continue; continue;
zmq_assert (offset <= received); zmq_assert (offset <= insize);
zmq_assert (it->second.decoder == NULL); zmq_assert (it->second.decoder == NULL);
// We have to move data to the begining of the first message. // We have to move data to the begining of the first message.
data += offset; inpos += offset;
received -= offset; insize -= offset;
// Mark the stream as joined. // Mark the stream as joined.
it->second.joined = true; it->second.joined = true;
...@@ -231,28 +227,24 @@ void zmq::pgm_receiver_t::in_event () ...@@ -231,28 +227,24 @@ void zmq::pgm_receiver_t::in_event ()
it->second.decoder = new (std::nothrow) it->second.decoder = new (std::nothrow)
v1_decoder_t (0, options.maxmsgsize); v1_decoder_t (0, options.maxmsgsize);
alloc_assert (it->second.decoder); alloc_assert (it->second.decoder);
it->second.decoder->set_msg_sink (session);
} }
mru_decoder = it->second.decoder; int rc = process_input (it->second.decoder);
if (rc == -1) {
if (errno == EAGAIN) {
active_tsi = tsi;
// Push all the data to the decoder.
ssize_t processed = it->second.decoder->process_buffer (data, received);
if (processed < received) {
// Save some state so we can resume the decoding process later.
pending_bytes = received - processed;
pending_ptr = data + processed;
// Stop polling. // Stop polling.
reset_pollin (pipe_handle); reset_pollin (pipe_handle);
reset_pollin (socket_handle); reset_pollin (socket_handle);
// Reset outstanding timer. break;
if (has_rx_timer) {
cancel_timer (rx_timer_id);
has_rx_timer = false;
} }
break; it->second.joined = false;
delete it->second.decoder;
it->second.decoder = NULL;
insize = 0;
} }
} }
...@@ -260,6 +252,29 @@ void zmq::pgm_receiver_t::in_event () ...@@ -260,6 +252,29 @@ void zmq::pgm_receiver_t::in_event ()
session->flush (); session->flush ();
} }
int zmq::pgm_receiver_t::process_input (v1_decoder_t *decoder)
{
zmq_assert (session != NULL);
while (insize > 0) {
size_t n = 0;
int rc = decoder->decode (inpos, insize, n);
if (rc == -1)
return -1;
inpos += n;
insize -= n;
if (rc == 0)
break;
rc = session->push_msg (decoder->msg ());
if (rc == -1) {
errno_assert (errno == EAGAIN);
return -1;
}
}
return 0;
}
void zmq::pgm_receiver_t::timer_event (int token) void zmq::pgm_receiver_t::timer_event (int token)
{ {
zmq_assert (token == rx_timer_id); zmq_assert (token == rx_timer_id);
......
...@@ -69,6 +69,10 @@ namespace zmq ...@@ -69,6 +69,10 @@ namespace zmq
// Unplug the engine from the session. // Unplug the engine from the session.
void unplug (); void unplug ();
// Decode received data (inpos, insize) and forward decoded
// messages to the session.
int process_input (v1_decoder_t *decoder);
// PGM is not able to move subscriptions upstream. Thus, drop all // PGM is not able to move subscriptions upstream. Thus, drop all
// the pending subscriptions. // the pending subscriptions.
void drop_subscriptions (); void drop_subscriptions ();
...@@ -112,14 +116,13 @@ namespace zmq ...@@ -112,14 +116,13 @@ namespace zmq
// Associated session. // Associated session.
zmq::session_base_t *session; zmq::session_base_t *session;
// Most recently used decoder. const pgm_tsi_t *active_tsi;
v1_decoder_t *mru_decoder;
// Number of bytes not consumed by the decoder due to pipe overflow. // Number of bytes not consumed by the decoder due to pipe overflow.
size_t pending_bytes; size_t insize;
// Pointer to data still waiting to be processed by the decoder. // Pointer to data still waiting to be processed by the decoder.
unsigned char *pending_ptr; const unsigned char *inpos;
// Poll handle associated with PGM socket. // Poll handle associated with PGM socket.
handle_t socket_handle; handle_t socket_handle;
......
...@@ -39,13 +39,17 @@ zmq::pgm_sender_t::pgm_sender_t (io_thread_t *parent_, ...@@ -39,13 +39,17 @@ zmq::pgm_sender_t::pgm_sender_t (io_thread_t *parent_,
io_object_t (parent_), io_object_t (parent_),
has_tx_timer (false), has_tx_timer (false),
has_rx_timer (false), has_rx_timer (false),
session (NULL),
encoder (0), encoder (0),
more_flag (false),
pgm_socket (false, options_), pgm_socket (false, options_),
options (options_), options (options_),
out_buffer (NULL), out_buffer (NULL),
out_buffer_size (0), out_buffer_size (0),
write_size (0) write_size (0)
{ {
int rc = msg.init ();
errno_assert (rc == 0);
} }
int zmq::pgm_sender_t::init (bool udp_encapsulation_, const char *network_) int zmq::pgm_sender_t::init (bool udp_encapsulation_, const char *network_)
...@@ -69,7 +73,7 @@ void zmq::pgm_sender_t::plug (io_thread_t *io_thread_, session_base_t *session_) ...@@ -69,7 +73,7 @@ void zmq::pgm_sender_t::plug (io_thread_t *io_thread_, session_base_t *session_)
fd_t rdata_notify_fd = retired_fd; fd_t rdata_notify_fd = retired_fd;
fd_t pending_notify_fd = retired_fd; fd_t pending_notify_fd = retired_fd;
encoder.set_msg_source (session_); session = session_;
// Fill fds from PGM transport and add them to the poller. // Fill fds from PGM transport and add them to the poller.
pgm_socket.get_sender_fds (&downlink_socket_fd, &uplink_socket_fd, pgm_socket.get_sender_fds (&downlink_socket_fd, &uplink_socket_fd,
...@@ -106,7 +110,7 @@ void zmq::pgm_sender_t::unplug () ...@@ -106,7 +110,7 @@ void zmq::pgm_sender_t::unplug ()
rm_fd (uplink_handle); rm_fd (uplink_handle);
rm_fd (rdata_notify_handle); rm_fd (rdata_notify_handle);
rm_fd (pending_notify_handle); rm_fd (pending_notify_handle);
encoder.set_msg_source (NULL); session = NULL;
} }
void zmq::pgm_sender_t::terminate () void zmq::pgm_sender_t::terminate ()
...@@ -128,6 +132,9 @@ void zmq::pgm_sender_t::activate_in () ...@@ -128,6 +132,9 @@ void zmq::pgm_sender_t::activate_in ()
zmq::pgm_sender_t::~pgm_sender_t () zmq::pgm_sender_t::~pgm_sender_t ()
{ {
int rc = msg.close ();
errno_assert (rc == 0);
if (out_buffer) { if (out_buffer) {
free (out_buffer); free (out_buffer);
out_buffer = NULL; out_buffer = NULL;
...@@ -161,18 +168,31 @@ void zmq::pgm_sender_t::out_event () ...@@ -161,18 +168,31 @@ void zmq::pgm_sender_t::out_event ()
// the get data function we prevent it from returning its own buffer. // the get data function we prevent it from returning its own buffer.
unsigned char *bf = out_buffer + sizeof (uint16_t); unsigned char *bf = out_buffer + sizeof (uint16_t);
size_t bfsz = out_buffer_size - sizeof (uint16_t); size_t bfsz = out_buffer_size - sizeof (uint16_t);
int offset = -1; uint16_t offset = 0xffff;
encoder.get_data (&bf, &bfsz, &offset);
size_t bytes = encoder.encode (&bf, bfsz);
while (bytes < bfsz) {
if (!more_flag && offset == 0xffff)
offset = static_cast <uint16_t> (bytes);
int rc = session->pull_msg (&msg);
if (rc == -1)
break;
more_flag = msg.flags () & msg_t::more;
encoder.load_msg (&msg);
bf = out_buffer + sizeof (uint16_t) + bytes;
bytes += encoder.encode (&bf, bfsz - bytes);
}
// If there are no data to write stop polling for output. // If there are no data to write stop polling for output.
if (!bfsz) { if (bytes == 0) {
reset_pollout (handle); reset_pollout (handle);
return; return;
} }
write_size = sizeof (uint16_t) + bytes;
// Put offset information in the buffer. // Put offset information in the buffer.
write_size = bfsz + sizeof (uint16_t); put_uint16 (out_buffer, offset);
put_uint16 (out_buffer, offset == -1 ? 0xffff : (uint16_t) offset);
} }
if (has_tx_timer) { if (has_tx_timer) {
......
...@@ -34,6 +34,7 @@ ...@@ -34,6 +34,7 @@
#include "options.hpp" #include "options.hpp"
#include "pgm_socket.hpp" #include "pgm_socket.hpp"
#include "v1_encoder.hpp" #include "v1_encoder.hpp"
#include "msg.hpp"
namespace zmq namespace zmq
{ {
...@@ -75,9 +76,16 @@ namespace zmq ...@@ -75,9 +76,16 @@ namespace zmq
bool has_tx_timer; bool has_tx_timer;
bool has_rx_timer; bool has_rx_timer;
session_base_t *session;
// Message encoder. // Message encoder.
v1_encoder_t encoder; v1_encoder_t encoder;
msg_t msg;
// Keeps track of message boundaries.
bool more_flag;
// PGM socket. // PGM socket.
pgm_socket_t pgm_socket; pgm_socket_t pgm_socket;
......
...@@ -26,72 +26,38 @@ ...@@ -26,72 +26,38 @@
#endif #endif
#include "raw_decoder.hpp" #include "raw_decoder.hpp"
#include "likely.hpp"
#include "wire.hpp"
#include "err.hpp" #include "err.hpp"
zmq::raw_decoder_t::raw_decoder_t (size_t bufsize_, zmq::raw_decoder_t::raw_decoder_t (size_t bufsize_) :
int64_t maxmsgsize_, i_msg_sink *msg_sink_) : bufsize (bufsize_)
decoder_base_t <raw_decoder_t> (bufsize_),
msg_sink (msg_sink_),
maxmsgsize (maxmsgsize_)
{ {
int rc = in_progress.init (); int rc = in_progress.init ();
errno_assert (rc == 0); errno_assert (rc == 0);
buffer = (unsigned char *) malloc (bufsize);
alloc_assert (buffer);
} }
zmq::raw_decoder_t::~raw_decoder_t () zmq::raw_decoder_t::~raw_decoder_t ()
{ {
int rc = in_progress.close (); int rc = in_progress.close ();
errno_assert (rc == 0); errno_assert (rc == 0);
}
void zmq::raw_decoder_t::set_msg_sink (i_msg_sink *msg_sink_) free (buffer);
{
msg_sink = msg_sink_;
} }
bool zmq::raw_decoder_t::stalled () void zmq::raw_decoder_t::get_buffer (unsigned char **data_, size_t *size_)
{ {
return false; *data_ = buffer;
*size_ = bufsize;
} }
bool zmq::raw_decoder_t::message_ready_size (size_t msg_sz) int zmq::raw_decoder_t::decode (const uint8_t *data_, size_t size_,
size_t &bytes_used_)
{ {
int rc = in_progress.init_size (msg_sz); int rc = in_progress.init_size (size_);
if (rc != 0) { errno_assert (rc != -1);
errno_assert (errno == ENOMEM); memcpy (in_progress.data (), data_, size_);
rc = in_progress.init (); bytes_used_ = size_;
errno_assert (rc == 0); return 1;
decoding_error ();
return false;
}
next_step (in_progress.data (), in_progress.size (),
&raw_decoder_t::raw_message_ready);
return true;
}
bool zmq::raw_decoder_t::raw_message_ready ()
{
zmq_assert (in_progress.size ());
// Message is completely read. Push it further and start reading
// new message. (in_progress is a 0-byte message after this point.)
if (unlikely (!msg_sink))
return false;
int rc = msg_sink->push_msg (&in_progress);
if (unlikely (rc != 0)) {
if (errno != EAGAIN)
decoding_error ();
return false;
}
// NOTE: This is just to break out of process_buffer
// raw_message_ready should never get called in state machine w/o
// message_ready_size from stream_engine.
next_step (in_progress.data (), 1,
&raw_decoder_t::raw_message_ready);
return true;
} }
...@@ -22,9 +22,7 @@ ...@@ -22,9 +22,7 @@
#include "err.hpp" #include "err.hpp"
#include "msg.hpp" #include "msg.hpp"
#include "decoder.hpp" #include "i_decoder.hpp"
#include "raw_decoder.hpp"
#include "i_msg_sink.hpp"
#include "stdint.hpp" #include "stdint.hpp"
namespace zmq namespace zmq
...@@ -32,30 +30,31 @@ namespace zmq ...@@ -32,30 +30,31 @@ namespace zmq
// Decoder for 0MQ v1 framing protocol. Converts data stream into messages. // Decoder for 0MQ v1 framing protocol. Converts data stream into messages.
class raw_decoder_t : public decoder_base_t <raw_decoder_t> class raw_decoder_t : public i_decoder
{ {
public: public:
raw_decoder_t (size_t bufsize_, raw_decoder_t (size_t bufsize_);
int64_t maxmsgsize_, i_msg_sink *msg_sink_);
virtual ~raw_decoder_t (); virtual ~raw_decoder_t ();
// i_decoder interface. // i_decoder interface.
virtual void set_msg_sink (i_msg_sink *msg_sink_);
virtual bool stalled (); virtual void get_buffer (unsigned char **data_, size_t *size_);
virtual bool message_ready_size (size_t msg_sz); virtual int decode (const unsigned char *data_, size_t size_,
size_t &processed);
virtual msg_t *msg () { return &in_progress; }
private:
private:
bool raw_message_ready ();
i_msg_sink *msg_sink;
msg_t in_progress; msg_t in_progress;
const int64_t maxmsgsize; const int64_t bufsize;
unsigned char *buffer;
raw_decoder_t (const raw_decoder_t&); raw_decoder_t (const raw_decoder_t&);
void operator = (const raw_decoder_t&); void operator = (const raw_decoder_t&);
......
...@@ -19,65 +19,22 @@ ...@@ -19,65 +19,22 @@
#include "encoder.hpp" #include "encoder.hpp"
#include "raw_encoder.hpp" #include "raw_encoder.hpp"
#include "i_msg_source.hpp"
#include "likely.hpp" #include "likely.hpp"
#include "wire.hpp" #include "wire.hpp"
zmq::raw_encoder_t::raw_encoder_t (size_t bufsize_, i_msg_source *msg_source_) : zmq::raw_encoder_t::raw_encoder_t (size_t bufsize_) :
encoder_base_t <raw_encoder_t> (bufsize_), encoder_base_t <raw_encoder_t> (bufsize_)
msg_source (msg_source_)
{ {
int rc = in_progress.init ();
errno_assert (rc == 0);
// Write 0 bytes to the batch and go to message_ready state. // Write 0 bytes to the batch and go to message_ready state.
next_step (NULL, 0, &raw_encoder_t::raw_message_ready, true); next_step (NULL, 0, &raw_encoder_t::raw_message_ready, true);
} }
zmq::raw_encoder_t::~raw_encoder_t () zmq::raw_encoder_t::~raw_encoder_t ()
{ {
int rc = in_progress.close ();
errno_assert (rc == 0);
}
void zmq::raw_encoder_t::set_msg_source (i_msg_source *msg_source_)
{
msg_source = msg_source_;
}
bool zmq::raw_encoder_t::raw_message_size_ready ()
{
// Write message body into the buffer.
next_step (in_progress.data (), in_progress.size (),
&raw_encoder_t::raw_message_ready, !(in_progress.flags () & msg_t::more));
return true;
} }
bool zmq::raw_encoder_t::raw_message_ready () void zmq::raw_encoder_t::raw_message_ready ()
{ {
// Destroy content of the old message. next_step (in_progress->data (), in_progress->size (),
int rc = in_progress.close (); &raw_encoder_t::raw_message_ready, true);
errno_assert (rc == 0);
// Read new message. If there is none, return false.
// Note that new state is set only if write is successful. That way
// unsuccessful write will cause retry on the next state machine
// invocation.
if (unlikely (!msg_source)) {
rc = in_progress.init ();
errno_assert (rc == 0);
return false;
}
rc = msg_source->pull_msg (&in_progress);
if (unlikely (rc != 0)) {
errno_assert (errno == EAGAIN);
rc = in_progress.init ();
errno_assert (rc == 0);
return false;
}
in_progress.reset_flags(0xff);
next_step (NULL, 0, &raw_encoder_t::raw_message_size_ready, true);
return true;
} }
...@@ -44,19 +44,13 @@ namespace zmq ...@@ -44,19 +44,13 @@ namespace zmq
{ {
public: public:
raw_encoder_t (size_t bufsize_, i_msg_source *msg_source_); raw_encoder_t (size_t bufsize_);
~raw_encoder_t (); ~raw_encoder_t ();
void set_msg_source (i_msg_source *msg_source_);
private: private:
bool raw_message_ready (); void raw_message_ready ();
bool raw_message_size_ready ();
i_msg_source *msg_source;
msg_t in_progress;
unsigned char tmpbuf [4];
raw_encoder_t (const raw_encoder_t&); raw_encoder_t (const raw_encoder_t&);
const raw_encoder_t &operator = (const raw_encoder_t&); const raw_encoder_t &operator = (const raw_encoder_t&);
}; };
......
...@@ -138,7 +138,7 @@ zmq::req_session_t::req_session_t (io_thread_t *io_thread_, bool connect_, ...@@ -138,7 +138,7 @@ zmq::req_session_t::req_session_t (io_thread_t *io_thread_, bool connect_,
socket_base_t *socket_, const options_t &options_, socket_base_t *socket_, const options_t &options_,
const address_t *addr_) : const address_t *addr_) :
dealer_session_t (io_thread_, connect_, socket_, options_, addr_), dealer_session_t (io_thread_, connect_, socket_, options_, addr_),
state (identity) state (bottom)
{ {
} }
...@@ -163,12 +163,6 @@ int zmq::req_session_t::push_msg (msg_t *msg_) ...@@ -163,12 +163,6 @@ int zmq::req_session_t::push_msg (msg_t *msg_)
return dealer_session_t::push_msg (msg_); return dealer_session_t::push_msg (msg_);
} }
break; break;
case identity:
if (msg_->flags () == 0) {
state = bottom;
return dealer_session_t::push_msg (msg_);
}
break;
} }
errno = EFAULT; errno = EFAULT;
return -1; return -1;
...@@ -177,5 +171,5 @@ int zmq::req_session_t::push_msg (msg_t *msg_) ...@@ -177,5 +171,5 @@ int zmq::req_session_t::push_msg (msg_t *msg_)
void zmq::req_session_t::reset () void zmq::req_session_t::reset ()
{ {
session_base_t::reset (); session_base_t::reset ();
state = identity; state = bottom;
} }
...@@ -74,7 +74,6 @@ namespace zmq ...@@ -74,7 +74,6 @@ namespace zmq
private: private:
enum { enum {
identity,
bottom, bottom,
body body
} state; } state;
......
...@@ -111,8 +111,6 @@ zmq::session_base_t::session_base_t (class io_thread_t *io_thread_, ...@@ -111,8 +111,6 @@ zmq::session_base_t::session_base_t (class io_thread_t *io_thread_,
socket (socket_), socket (socket_),
io_thread (io_thread_), io_thread (io_thread_),
has_linger_timer (false), has_linger_timer (false),
identity_sent (false),
identity_received (false),
addr (addr_) addr (addr_)
{ {
} }
...@@ -146,17 +144,6 @@ void zmq::session_base_t::attach_pipe (pipe_t *pipe_) ...@@ -146,17 +144,6 @@ void zmq::session_base_t::attach_pipe (pipe_t *pipe_)
int zmq::session_base_t::pull_msg (msg_t *msg_) int zmq::session_base_t::pull_msg (msg_t *msg_)
{ {
// Unless the socket is in raw mode, the first
// message we send is its identity.
if (unlikely (!identity_sent && !options.raw_sock)) {
int rc = msg_->init_size (options.identity_size);
errno_assert (rc == 0);
memcpy (msg_->data (), options.identity, options.identity_size);
identity_sent = true;
incomplete_in = false;
return 0;
}
if (!pipe || !pipe->read (msg_)) { if (!pipe || !pipe->read (msg_)) {
errno = EAGAIN; errno = EAGAIN;
return -1; return -1;
...@@ -168,20 +155,6 @@ int zmq::session_base_t::pull_msg (msg_t *msg_) ...@@ -168,20 +155,6 @@ int zmq::session_base_t::pull_msg (msg_t *msg_)
int zmq::session_base_t::push_msg (msg_t *msg_) int zmq::session_base_t::push_msg (msg_t *msg_)
{ {
// Unless the socket is in raw mode, the first
// message we receive is its identity.
if (unlikely (!identity_received && !options.raw_sock)) {
msg_->set_flags (msg_t::identity);
identity_received = true;
if (!options.recv_identity) {
int rc = msg_->close ();
errno_assert (rc == 0);
rc = msg_->init ();
errno_assert (rc == 0);
return 0;
}
}
if (pipe && pipe->write (msg_)) { if (pipe && pipe->write (msg_)) {
int rc = msg_->init (); int rc = msg_->init ();
errno_assert (rc == 0); errno_assert (rc == 0);
...@@ -194,9 +167,6 @@ int zmq::session_base_t::push_msg (msg_t *msg_) ...@@ -194,9 +167,6 @@ int zmq::session_base_t::push_msg (msg_t *msg_)
void zmq::session_base_t::reset () void zmq::session_base_t::reset ()
{ {
// Restore identity flags.
identity_sent = false;
identity_received = false;
} }
void zmq::session_base_t::flush () void zmq::session_base_t::flush ()
......
...@@ -26,8 +26,6 @@ ...@@ -26,8 +26,6 @@
#include "own.hpp" #include "own.hpp"
#include "io_object.hpp" #include "io_object.hpp"
#include "pipe.hpp" #include "pipe.hpp"
#include "i_msg_source.hpp"
#include "i_msg_sink.hpp"
#include "socket_base.hpp" #include "socket_base.hpp"
namespace zmq namespace zmq
...@@ -42,9 +40,7 @@ namespace zmq ...@@ -42,9 +40,7 @@ namespace zmq
class session_base_t : class session_base_t :
public own_t, public own_t,
public io_object_t, public io_object_t,
public i_pipe_events, public i_pipe_events
public i_msg_source,
public i_msg_sink
{ {
public: public:
...@@ -56,12 +52,6 @@ namespace zmq ...@@ -56,12 +52,6 @@ namespace zmq
// To be used once only, when creating the session. // To be used once only, when creating the session.
void attach_pipe (zmq::pipe_t *pipe_); void attach_pipe (zmq::pipe_t *pipe_);
// i_msg_source interface implementation.
virtual int pull_msg (msg_t *msg_);
// i_msg_sink interface implementation.
virtual int push_msg (msg_t *msg_);
// Following functions are the interface exposed towards the engine. // Following functions are the interface exposed towards the engine.
virtual void reset (); virtual void reset ();
void flush (); void flush ();
...@@ -73,6 +63,15 @@ namespace zmq ...@@ -73,6 +63,15 @@ namespace zmq
void hiccuped (zmq::pipe_t *pipe_); void hiccuped (zmq::pipe_t *pipe_);
void terminated (zmq::pipe_t *pipe_); void terminated (zmq::pipe_t *pipe_);
// Delivers a message. Returns 0 if successful; -1 otherwise.
// The function takes ownership of the message.
int push_msg (msg_t *msg_);
// Fetches a message. Returns 0 if successful; -1 otherwise.
// The caller is responsible for freeing the message when no
// longer used.
int pull_msg (msg_t *msg_);
socket_base_t *get_socket (); socket_base_t *get_socket ();
protected: protected:
...@@ -137,10 +136,6 @@ namespace zmq ...@@ -137,10 +136,6 @@ namespace zmq
// True is linger timer is running. // True is linger timer is running.
bool has_linger_timer; bool has_linger_timer;
// If true, identity has been sent/received from the network.
bool identity_sent;
bool identity_received;
// Protocol and address to use when connecting. // Protocol and address to use when connecting.
const address_t *addr; const address_t *addr;
......
...@@ -50,7 +50,6 @@ ...@@ -50,7 +50,6 @@
zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_, const std::string &endpoint_) : zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_, const std::string &endpoint_) :
s (fd_), s (fd_),
io_enabled (false),
inpos (NULL), inpos (NULL),
insize (0), insize (0),
decoder (NULL), decoder (NULL),
...@@ -64,13 +63,23 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_, cons ...@@ -64,13 +63,23 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_, cons
endpoint (endpoint_), endpoint (endpoint_),
plugged (false), plugged (false),
terminating (false), terminating (false),
io_error (false),
congested (false),
identity_received (false),
identity_sent (false),
rx_initialized (false),
tx_initialized (false),
subscription_required (false),
socket (NULL) socket (NULL)
{ {
int rc = tx_msg.init ();
errno_assert (rc == 0);
// Put the socket into non-blocking mode. // Put the socket into non-blocking mode.
unblock_socket (s); unblock_socket (s);
// Set the socket buffer limits for the underlying socket. // Set the socket buffer limits for the underlying socket.
if (options.sndbuf) { if (options.sndbuf) {
int rc = setsockopt (s, SOL_SOCKET, SO_SNDBUF, rc = setsockopt (s, SOL_SOCKET, SO_SNDBUF,
(char*) &options.sndbuf, sizeof (int)); (char*) &options.sndbuf, sizeof (int));
#ifdef ZMQ_HAVE_WINDOWS #ifdef ZMQ_HAVE_WINDOWS
wsa_assert (rc != SOCKET_ERROR); wsa_assert (rc != SOCKET_ERROR);
...@@ -79,7 +88,7 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_, cons ...@@ -79,7 +88,7 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_, cons
#endif #endif
} }
if (options.rcvbuf) { if (options.rcvbuf) {
int rc = setsockopt (s, SOL_SOCKET, SO_RCVBUF, rc = setsockopt (s, SOL_SOCKET, SO_RCVBUF,
(char*) &options.rcvbuf, sizeof (int)); (char*) &options.rcvbuf, sizeof (int));
#ifdef ZMQ_HAVE_WINDOWS #ifdef ZMQ_HAVE_WINDOWS
wsa_assert (rc != SOCKET_ERROR); wsa_assert (rc != SOCKET_ERROR);
...@@ -112,6 +121,9 @@ zmq::stream_engine_t::~stream_engine_t () ...@@ -112,6 +121,9 @@ zmq::stream_engine_t::~stream_engine_t ()
s = retired_fd; s = retired_fd;
} }
int rc = tx_msg.close ();
errno_assert (rc == 0);
if (encoder != NULL) if (encoder != NULL)
delete encoder; delete encoder;
if (decoder != NULL) if (decoder != NULL)
...@@ -133,15 +145,14 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_, ...@@ -133,15 +145,14 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_,
// Connect to I/O threads poller object. // Connect to I/O threads poller object.
io_object_t::plug (io_thread_); io_object_t::plug (io_thread_);
handle = add_fd (s); handle = add_fd (s);
io_enabled = true; io_error = false;
if (options.raw_sock) { if (options.raw_sock) {
// no handshaking for raw sock, instantiate raw encoder and decoders // no handshaking for raw sock, instantiate raw encoder and decoders
encoder = new (std::nothrow) raw_encoder_t (out_batch_size, session); encoder = new (std::nothrow) raw_encoder_t (out_batch_size);
alloc_assert (encoder); alloc_assert (encoder);
decoder = new (std::nothrow) decoder = new (std::nothrow) raw_decoder_t (in_batch_size);
raw_decoder_t (in_batch_size, options.maxmsgsize, session);
alloc_assert (decoder); alloc_assert (decoder);
// disable handshaking for raw socket // disable handshaking for raw socket
...@@ -169,19 +180,12 @@ void zmq::stream_engine_t::unplug () ...@@ -169,19 +180,12 @@ void zmq::stream_engine_t::unplug ()
plugged = false; plugged = false;
// Cancel all fd subscriptions. // Cancel all fd subscriptions.
if (io_enabled) { if (!io_error)
rm_fd (handle); rm_fd (handle);
io_enabled = false;
}
// Disconnect from I/O threads poller object. // Disconnect from I/O threads poller object.
io_object_t::unplug (); io_object_t::unplug ();
// Disconnect from session object.
if (encoder)
encoder->set_msg_source (NULL);
if (decoder)
decoder->set_msg_sink (NULL);
session = NULL; session = NULL;
} }
...@@ -198,14 +202,21 @@ void zmq::stream_engine_t::terminate () ...@@ -198,14 +202,21 @@ void zmq::stream_engine_t::terminate ()
void zmq::stream_engine_t::in_event () void zmq::stream_engine_t::in_event ()
{ {
assert (!io_error);
// If still handshaking, receive and process the greeting message. // If still handshaking, receive and process the greeting message.
if (unlikely (handshaking)) if (unlikely (handshaking))
if (!handshake ()) if (!handshake ())
return; return;
zmq_assert (decoder); zmq_assert (decoder);
bool disconnection = false;
size_t processed; // If there has been an I/O error, stop polling.
if (congested) {
rm_fd (handle);
io_error = true;
return;
}
// If there's no data to process in the buffer... // If there's no data to process in the buffer...
if (!insize) { if (!insize) {
...@@ -215,58 +226,51 @@ void zmq::stream_engine_t::in_event () ...@@ -215,58 +226,51 @@ void zmq::stream_engine_t::in_event ()
// the underlying TCP layer has fixed buffer size and thus the // the underlying TCP layer has fixed buffer size and thus the
// number of bytes read will be always limited. // number of bytes read will be always limited.
decoder->get_buffer (&inpos, &insize); decoder->get_buffer (&inpos, &insize);
insize = read (inpos, insize); const int bytes_read = read (inpos, insize);
// Check whether the peer has closed the connection. // Check whether the peer has closed the connection.
if (insize == (size_t) -1) { if (bytes_read == -1) {
insize = 0; error ();
disconnection = true; return;
}
} }
if (options.raw_sock) { // Adjust input size
if (insize == 0 || !decoder->message_ready_size (insize)) insize = static_cast <size_t> (bytes_read);
processed = 0;
else
processed = decoder->process_buffer (inpos, insize);
} }
else
// Push the data to the decoder.
processed = decoder->process_buffer (inpos, insize);
if (unlikely (processed == (size_t) -1))
disconnection = true;
else {
// Stop polling for input if we got stuck. int rc = 0;
if (processed < insize) size_t processed = 0;
reset_pollin (handle);
// Adjust the buffer. while (insize > 0) {
rc = decoder->decode (inpos, insize, processed);
zmq_assert (processed <= insize);
inpos += processed; inpos += processed;
insize -= processed; insize -= processed;
if (rc == 0 || rc == -1)
break;
rc = write_msg (decoder->msg ());
if (rc == -1)
break;
} }
// Flush all messages the decoder may have produced. // Tear down the connection if we have failed to decode input data
session->flush (); // or the session has rejected the message.
if (rc == -1) {
// Input error has occurred. If the last decoded if (errno != EAGAIN) {
// message has already been accepted, we terminate
// the engine immediately. Otherwise, we stop
// waiting for input events and postpone the termination
// until after the session has accepted the message.
if (disconnection) {
if (decoder->stalled ()) {
rm_fd (handle);
io_enabled = false;
}
else
error (); error ();
return;
}
congested = true;
reset_pollin (handle);
} }
session->flush ();
} }
void zmq::stream_engine_t::out_event () void zmq::stream_engine_t::out_event ()
{ {
zmq_assert (!io_error);
// If write buffer is empty, try to read new data from the encoder. // If write buffer is empty, try to read new data from the encoder.
if (!outsize) { if (!outsize) {
...@@ -279,7 +283,19 @@ void zmq::stream_engine_t::out_event () ...@@ -279,7 +283,19 @@ void zmq::stream_engine_t::out_event ()
} }
outpos = NULL; outpos = NULL;
encoder->get_data (&outpos, &outsize); outsize = encoder->encode (&outpos, 0);
while (outsize < out_batch_size) {
if (read_msg (&tx_msg) == -1)
break;
encoder->load_msg (&tx_msg);
unsigned char *bufptr = outpos + outsize;
size_t n = encoder->encode (&bufptr, out_batch_size - outsize);
zmq_assert (n > 0);
if (outpos == NULL)
outpos = bufptr;
outsize += n;
}
// If there is no data to send, stop polling for output. // If there is no data to send, stop polling for output.
if (outsize == 0) { if (outsize == 0) {
...@@ -321,6 +337,9 @@ void zmq::stream_engine_t::out_event () ...@@ -321,6 +337,9 @@ void zmq::stream_engine_t::out_event ()
void zmq::stream_engine_t::activate_out () void zmq::stream_engine_t::activate_out ()
{ {
if (unlikely (io_error))
return;
set_pollout (handle); set_pollout (handle);
// Speculative write: The assumption is that at the moment new message // Speculative write: The assumption is that at the moment new message
...@@ -332,22 +351,45 @@ void zmq::stream_engine_t::activate_out () ...@@ -332,22 +351,45 @@ void zmq::stream_engine_t::activate_out ()
void zmq::stream_engine_t::activate_in () void zmq::stream_engine_t::activate_in ()
{ {
if (unlikely (!io_enabled)) { zmq_assert (congested);
// There was an input error but the engine could not zmq_assert (session != NULL);
// be terminated (due to the stalled decoder). zmq_assert (decoder != NULL);
// Flush the pending message and terminate the engine now.
zmq_assert (decoder); int rc = write_msg (decoder->msg ());
decoder->process_buffer (inpos, 0); if (rc == -1) {
zmq_assert (!decoder->stalled ()); if (errno == EAGAIN)
session->flush (); session->flush ();
else
error (); error ();
return; return;
} }
while (insize > 0) {
size_t processed = 0;
rc = decoder->decode (inpos, insize, processed);
zmq_assert (processed <= insize);
inpos += processed;
insize -= processed;
if (rc == 0 || rc == -1)
break;
rc = write_msg (decoder->msg ());
if (rc == -1)
break;
}
if (rc == -1 && errno == EAGAIN)
session->flush ();
else
if (rc == -1 || io_error)
error ();
else {
congested = false;
set_pollin (handle); set_pollin (handle);
session->flush ();
// Speculative read. // Speculative read.
in_event (); in_event ();
}
} }
bool zmq::stream_engine_t::handshake () bool zmq::stream_engine_t::handshake ()
...@@ -402,11 +444,9 @@ bool zmq::stream_engine_t::handshake () ...@@ -402,11 +444,9 @@ bool zmq::stream_engine_t::handshake ()
if (greeting_recv [0] != 0xff || !(greeting_recv [9] & 0x01)) { if (greeting_recv [0] != 0xff || !(greeting_recv [9] & 0x01)) {
encoder = new (std::nothrow) v1_encoder_t (out_batch_size); encoder = new (std::nothrow) v1_encoder_t (out_batch_size);
alloc_assert (encoder); alloc_assert (encoder);
encoder->set_msg_source (session);
decoder = new (std::nothrow) v1_decoder_t (in_batch_size, options.maxmsgsize); decoder = new (std::nothrow) v1_decoder_t (in_batch_size, options.maxmsgsize);
alloc_assert (decoder); alloc_assert (decoder);
decoder->set_msg_sink (session);
// We have already sent the message header. // We have already sent the message header.
// Since there is no way to tell the encoder to // Since there is no way to tell the encoder to
...@@ -414,8 +454,7 @@ bool zmq::stream_engine_t::handshake () ...@@ -414,8 +454,7 @@ bool zmq::stream_engine_t::handshake ()
// header data away. // header data away.
const size_t header_size = options.identity_size + 1 >= 255 ? 10 : 2; const size_t header_size = options.identity_size + 1 >= 255 ? 10 : 2;
unsigned char tmp [10], *bufferp = tmp; unsigned char tmp [10], *bufferp = tmp;
size_t buffer_size = header_size; size_t buffer_size = encoder->encode (&bufferp, header_size);
encoder->get_data (&bufferp, &buffer_size);
zmq_assert (buffer_size == header_size); zmq_assert (buffer_size == header_size);
// Make sure the decoder sees the data we have already received. // Make sure the decoder sees the data we have already received.
...@@ -424,33 +463,28 @@ bool zmq::stream_engine_t::handshake () ...@@ -424,33 +463,28 @@ bool zmq::stream_engine_t::handshake ()
// To allow for interoperability with peers that do not forward // To allow for interoperability with peers that do not forward
// their subscriptions, we inject a phony subscription // their subscriptions, we inject a phony subscription
// message into the incoming message stream. To put this // message into the incomming message stream.
// message right after the identity message, we temporarily
// divert the message stream from session to ourselves.
if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB)
decoder->set_msg_sink (this); subscription_required = true;
} }
else else
if (greeting_recv [revision_pos] == ZMTP_1_0) { if (greeting_recv [revision_pos] == ZMTP_1_0) {
encoder = new (std::nothrow) v1_encoder_t ( encoder = new (std::nothrow) v1_encoder_t (
out_batch_size); out_batch_size);
alloc_assert (encoder); alloc_assert (encoder);
encoder->set_msg_source (session);
decoder = new (std::nothrow) v1_decoder_t ( decoder = new (std::nothrow) v1_decoder_t (
in_batch_size, options.maxmsgsize); in_batch_size, options.maxmsgsize);
alloc_assert (decoder); alloc_assert (decoder);
decoder->set_msg_sink (session);
} }
else else
if (greeting_recv [revision_pos] == ZMTP_2_0 if (greeting_recv [revision_pos] == ZMTP_2_0
|| greeting_recv [revision_pos] == ZMTP_2_1) { || greeting_recv [revision_pos] == ZMTP_2_1) {
encoder = new (std::nothrow) v2_encoder_t ( encoder = new (std::nothrow) v2_encoder_t (out_batch_size);
out_batch_size, session);
alloc_assert (encoder); alloc_assert (encoder);
decoder = new (std::nothrow) v2_decoder_t ( decoder = new (std::nothrow) v2_decoder_t (
in_batch_size, options.maxmsgsize, session); in_batch_size, options.maxmsgsize);
alloc_assert (decoder); alloc_assert (decoder);
} }
...@@ -465,35 +499,67 @@ bool zmq::stream_engine_t::handshake () ...@@ -465,35 +499,67 @@ bool zmq::stream_engine_t::handshake ()
return true; return true;
} }
int zmq::stream_engine_t::push_msg (msg_t *msg_) int zmq::stream_engine_t::read_msg (msg_t *msg_)
{
if (likely (tx_initialized || options.raw_sock))
return session->pull_msg (msg_);
if (!identity_sent) {
int rc = msg_->init_size (options.identity_size);
errno_assert (rc == 0);
memcpy (msg_->data (), options.identity, options.identity_size);
identity_sent = true;
tx_initialized = true;
return 0;
}
tx_initialized = true;
return 0;
}
int zmq::stream_engine_t::write_msg (msg_t *msg_)
{ {
zmq_assert (options.type == ZMQ_PUB || options.type == ZMQ_XPUB); if (likely (rx_initialized || options.raw_sock))
return session->push_msg (msg_);
// The first message is identity. if (!identity_received) {
// Let the session process it. if (options.recv_identity) {
msg_->set_flags (msg_t::identity);
int rc = session->push_msg (msg_); int rc = session->push_msg (msg_);
if (rc == -1)
return -1;
}
else {
int rc = msg_->close ();
errno_assert (rc == 0); errno_assert (rc == 0);
rc = msg_->init ();
errno_assert (rc == 0);
}
// Inject the subscription message so that the ZMQ 2.x peer identity_received = true;
// receives our messages. }
rc = msg_->init_size (1);
// Inject the subscription message, so that also
// ZMQ 2.x peers receive published messages.
if (subscription_required) {
int rc = msg_->init_size (1);
errno_assert (rc == 0); errno_assert (rc == 0);
*(unsigned char*) msg_->data () = 1; *(unsigned char*) msg_->data () = 1;
rc = session->push_msg (msg_); rc = session->push_msg (msg_);
session->flush (); if (rc == -1)
return -1;
// Once we have injected the subscription message, we can subscription_required = false;
// Divert the message flow back to the session. }
zmq_assert (decoder);
decoder->set_msg_sink (session);
return rc; rx_initialized = true;
return 0;
} }
void zmq::stream_engine_t::error () void zmq::stream_engine_t::error ()
{ {
zmq_assert (session); zmq_assert (session);
socket->event_disconnected (endpoint, s); socket->event_disconnected (endpoint, s);
session->flush ();
session->detach (); session->detach ();
unplug (); unplug ();
delete this; delete this;
......
...@@ -24,7 +24,6 @@ ...@@ -24,7 +24,6 @@
#include "fd.hpp" #include "fd.hpp"
#include "i_engine.hpp" #include "i_engine.hpp"
#include "i_msg_sink.hpp"
#include "io_object.hpp" #include "io_object.hpp"
#include "i_encoder.hpp" #include "i_encoder.hpp"
#include "i_decoder.hpp" #include "i_decoder.hpp"
...@@ -43,12 +42,13 @@ namespace zmq ...@@ -43,12 +42,13 @@ namespace zmq
}; };
class io_thread_t; class io_thread_t;
class msg_t;
class session_base_t; class session_base_t;
// This engine handles any socket with SOCK_STREAM semantics, // This engine handles any socket with SOCK_STREAM semantics,
// e.g. TCP socket or an UNIX domain socket. // e.g. TCP socket or an UNIX domain socket.
class stream_engine_t : public io_object_t, public i_engine, public i_msg_sink class stream_engine_t : public io_object_t, public i_engine
{ {
public: public:
...@@ -62,9 +62,6 @@ namespace zmq ...@@ -62,9 +62,6 @@ namespace zmq
void activate_in (); void activate_in ();
void activate_out (); void activate_out ();
// i_msg_sink interface implementation.
virtual int push_msg (msg_t *msg_);
// i_poll_events interface implementation. // i_poll_events interface implementation.
void in_event (); void in_event ();
void out_event (); void out_event ();
...@@ -94,11 +91,14 @@ namespace zmq ...@@ -94,11 +91,14 @@ namespace zmq
// peer -1 is returned. // peer -1 is returned.
int read (void *data_, size_t size_); int read (void *data_, size_t size_);
int read_msg (msg_t *msg_);
int write_msg (msg_t *msg_);
// Underlying socket. // Underlying socket.
fd_t s; fd_t s;
// True iff we are registered with an I/O poller. msg_t tx_msg;
bool io_enabled;
handle_t handle; handle_t handle;
...@@ -137,6 +137,29 @@ namespace zmq ...@@ -137,6 +137,29 @@ namespace zmq
bool plugged; bool plugged;
bool terminating; bool terminating;
bool io_error;
// True iff the session could not accept more
// messages due to flow control.
bool congested;
// True iff the engine has received identity message.
bool identity_received;
// True iff the engine has sent identity message.
bool identity_sent;
// True iff the engine has received all ZMTP control messages.
bool rx_initialized;
// True iff the engine has sent all ZMTP control messages.
bool tx_initialized;
// Indicates whether the engine is to inject a phony
// subscription message into the incomming stream.
// Needed to support old peers.
bool subscription_required;
// Socket // Socket
zmq::socket_base_t *socket; zmq::socket_base_t *socket;
......
...@@ -28,14 +28,12 @@ ...@@ -28,14 +28,12 @@
#include "decoder.hpp" #include "decoder.hpp"
#include "v1_decoder.hpp" #include "v1_decoder.hpp"
#include "i_msg_sink.hpp"
#include "likely.hpp" #include "likely.hpp"
#include "wire.hpp" #include "wire.hpp"
#include "err.hpp" #include "err.hpp"
zmq::v1_decoder_t::v1_decoder_t (size_t bufsize_, int64_t maxmsgsize_) : zmq::v1_decoder_t::v1_decoder_t (size_t bufsize_, int64_t maxmsgsize_) :
decoder_base_t <v1_decoder_t> (bufsize_), decoder_base_t <v1_decoder_t> (bufsize_),
msg_sink (NULL),
maxmsgsize (maxmsgsize_) maxmsgsize (maxmsgsize_)
{ {
int rc = in_progress.init (); int rc = in_progress.init ();
...@@ -51,12 +49,7 @@ zmq::v1_decoder_t::~v1_decoder_t () ...@@ -51,12 +49,7 @@ zmq::v1_decoder_t::~v1_decoder_t ()
errno_assert (rc == 0); errno_assert (rc == 0);
} }
void zmq::v1_decoder_t::set_msg_sink (i_msg_sink *msg_sink_) int zmq::v1_decoder_t::one_byte_size_ready ()
{
msg_sink = msg_sink_;
}
bool zmq::v1_decoder_t::one_byte_size_ready ()
{ {
// First byte of size is read. If it is 0xff read 8-byte size. // First byte of size is read. If it is 0xff read 8-byte size.
// Otherwise allocate the buffer for message data and read the // Otherwise allocate the buffer for message data and read the
...@@ -67,34 +60,33 @@ bool zmq::v1_decoder_t::one_byte_size_ready () ...@@ -67,34 +60,33 @@ bool zmq::v1_decoder_t::one_byte_size_ready ()
// There has to be at least one byte (the flags) in the message). // There has to be at least one byte (the flags) in the message).
if (!*tmpbuf) { if (!*tmpbuf) {
decoding_error (); errno = EPROTO;
return false; return -1;
}
if (maxmsgsize >= 0 && (int64_t) (*tmpbuf - 1) > maxmsgsize) {
errno = EMSGSIZE;
return -1;
} }
// in_progress is initialised at this point so in theory we should // in_progress is initialised at this point so in theory we should
// close it before calling zmq_msg_init_size, however, it's a 0-byte // close it before calling zmq_msg_init_size, however, it's a 0-byte
// message and thus we can treat it as uninitialised... // message and thus we can treat it as uninitialised...
int rc; int rc = in_progress.init_size (*tmpbuf - 1);
if (maxmsgsize >= 0 && (int64_t) (*tmpbuf - 1) > maxmsgsize) { if (rc != 0) {
rc = -1; errno_assert (errno == ENOMEM);
errno = ENOMEM;
}
else
rc = in_progress.init_size (*tmpbuf - 1);
if (rc != 0 && errno == ENOMEM) {
rc = in_progress.init (); rc = in_progress.init ();
errno_assert (rc == 0); errno_assert (rc == 0);
decoding_error (); errno = ENOMEM;
return false; return -1;
} }
errno_assert (rc == 0);
next_step (tmpbuf, 1, &v1_decoder_t::flags_ready); next_step (tmpbuf, 1, &v1_decoder_t::flags_ready);
} }
return true; return 0;
} }
bool zmq::v1_decoder_t::eight_byte_size_ready () int zmq::v1_decoder_t::eight_byte_size_ready ()
{ {
// 8-byte payload length is read. Allocate the buffer // 8-byte payload length is read. Allocate the buffer
// for message body and read the message data into it. // for message body and read the message data into it.
...@@ -102,20 +94,20 @@ bool zmq::v1_decoder_t::eight_byte_size_ready () ...@@ -102,20 +94,20 @@ bool zmq::v1_decoder_t::eight_byte_size_ready ()
// There has to be at least one byte (the flags) in the message). // There has to be at least one byte (the flags) in the message).
if (payload_length == 0) { if (payload_length == 0) {
decoding_error (); errno = EPROTO;
return false; return -1;
} }
// Message size must not exceed the maximum allowed size. // Message size must not exceed the maximum allowed size.
if (maxmsgsize >= 0 && payload_length - 1 > (uint64_t) maxmsgsize) { if (maxmsgsize >= 0 && payload_length - 1 > (uint64_t) maxmsgsize) {
decoding_error (); errno = EMSGSIZE;
return false; return -1;
} }
// Message size must fit within range of size_t data type. // Message size must fit within range of size_t data type.
if (payload_length - 1 > std::numeric_limits <size_t>::max ()) { if (payload_length - 1 > std::numeric_limits <size_t>::max ()) {
decoding_error (); errno = EMSGSIZE;
return false; return -1;
} }
const size_t msg_size = static_cast <size_t> (payload_length - 1); const size_t msg_size = static_cast <size_t> (payload_length - 1);
...@@ -128,15 +120,15 @@ bool zmq::v1_decoder_t::eight_byte_size_ready () ...@@ -128,15 +120,15 @@ bool zmq::v1_decoder_t::eight_byte_size_ready ()
errno_assert (errno == ENOMEM); errno_assert (errno == ENOMEM);
rc = in_progress.init (); rc = in_progress.init ();
errno_assert (rc == 0); errno_assert (rc == 0);
decoding_error (); errno = ENOMEM;
return false; return -1;
} }
next_step (tmpbuf, 1, &v1_decoder_t::flags_ready); next_step (tmpbuf, 1, &v1_decoder_t::flags_ready);
return true; return 0;
} }
bool zmq::v1_decoder_t::flags_ready () int zmq::v1_decoder_t::flags_ready ()
{ {
// Store the flags from the wire into the message structure. // Store the flags from the wire into the message structure.
in_progress.set_flags (tmpbuf [0] & msg_t::more); in_progress.set_flags (tmpbuf [0] & msg_t::more);
...@@ -144,22 +136,13 @@ bool zmq::v1_decoder_t::flags_ready () ...@@ -144,22 +136,13 @@ bool zmq::v1_decoder_t::flags_ready ()
next_step (in_progress.data (), in_progress.size (), next_step (in_progress.data (), in_progress.size (),
&v1_decoder_t::message_ready); &v1_decoder_t::message_ready);
return true; return 0;
} }
bool zmq::v1_decoder_t::message_ready () int zmq::v1_decoder_t::message_ready ()
{ {
// Message is completely read. Push it further and start reading // Message is completely read. Push it further and start reading
// new message. (in_progress is a 0-byte message after this point.) // new message. (in_progress is a 0-byte message after this point.)
if (unlikely (!msg_sink))
return false;
int rc = msg_sink->push_msg (&in_progress);
if (unlikely (rc != 0)) {
if (errno != EAGAIN)
decoding_error ();
return false;
}
next_step (tmpbuf, 1, &v1_decoder_t::one_byte_size_ready); next_step (tmpbuf, 1, &v1_decoder_t::one_byte_size_ready);
return true; return 1;
} }
...@@ -33,17 +33,15 @@ namespace zmq ...@@ -33,17 +33,15 @@ namespace zmq
v1_decoder_t (size_t bufsize_, int64_t maxmsgsize_); v1_decoder_t (size_t bufsize_, int64_t maxmsgsize_);
~v1_decoder_t (); ~v1_decoder_t ();
// Set the receiver of decoded messages. virtual msg_t *msg () { return &in_progress; }
void set_msg_sink (i_msg_sink *msg_sink_);
private: private:
bool one_byte_size_ready (); int one_byte_size_ready ();
bool eight_byte_size_ready (); int eight_byte_size_ready ();
bool flags_ready (); int flags_ready ();
bool message_ready (); int message_ready ();
i_msg_sink *msg_sink;
unsigned char tmpbuf [8]; unsigned char tmpbuf [8];
msg_t in_progress; msg_t in_progress;
......
...@@ -19,65 +19,31 @@ ...@@ -19,65 +19,31 @@
#include "encoder.hpp" #include "encoder.hpp"
#include "v1_encoder.hpp" #include "v1_encoder.hpp"
#include "i_msg_source.hpp"
#include "likely.hpp" #include "likely.hpp"
#include "wire.hpp" #include "wire.hpp"
zmq::v1_encoder_t::v1_encoder_t (size_t bufsize_) : zmq::v1_encoder_t::v1_encoder_t (size_t bufsize_) :
encoder_base_t <v1_encoder_t> (bufsize_), encoder_base_t <v1_encoder_t> (bufsize_)
msg_source (NULL)
{ {
int rc = in_progress.init ();
errno_assert (rc == 0);
// Write 0 bytes to the batch and go to message_ready state. // Write 0 bytes to the batch and go to message_ready state.
next_step (NULL, 0, &v1_encoder_t::message_ready, true); next_step (NULL, 0, &v1_encoder_t::message_ready, true);
} }
zmq::v1_encoder_t::~v1_encoder_t () zmq::v1_encoder_t::~v1_encoder_t ()
{ {
int rc = in_progress.close ();
errno_assert (rc == 0);
}
void zmq::v1_encoder_t::set_msg_source (i_msg_source *msg_source_)
{
msg_source = msg_source_;
} }
bool zmq::v1_encoder_t::size_ready () void zmq::v1_encoder_t::size_ready ()
{ {
// Write message body into the buffer. // Write message body into the buffer.
next_step (in_progress.data (), in_progress.size (), next_step (in_progress->data (), in_progress->size (),
&v1_encoder_t::message_ready, !(in_progress.flags () & msg_t::more)); &v1_encoder_t::message_ready, true);
return true;
} }
bool zmq::v1_encoder_t::message_ready () void zmq::v1_encoder_t::message_ready ()
{ {
// Destroy content of the old message.
int rc = in_progress.close ();
errno_assert (rc == 0);
// Read new message. If there is none, return false.
// Note that new state is set only if write is successful. That way
// unsuccessful write will cause retry on the next state machine
// invocation.
if (unlikely (!msg_source)) {
rc = in_progress.init ();
errno_assert (rc == 0);
return false;
}
rc = msg_source->pull_msg (&in_progress);
if (unlikely (rc != 0)) {
errno_assert (errno == EAGAIN);
rc = in_progress.init ();
errno_assert (rc == 0);
return false;
}
// Get the message size. // Get the message size.
size_t size = in_progress.size (); size_t size = in_progress->size ();
// Account for the 'flags' byte. // Account for the 'flags' byte.
size++; size++;
...@@ -87,14 +53,13 @@ bool zmq::v1_encoder_t::message_ready () ...@@ -87,14 +53,13 @@ bool zmq::v1_encoder_t::message_ready ()
// message size. In both cases 'flags' field follows. // message size. In both cases 'flags' field follows.
if (size < 255) { if (size < 255) {
tmpbuf [0] = (unsigned char) size; tmpbuf [0] = (unsigned char) size;
tmpbuf [1] = (in_progress.flags () & msg_t::more); tmpbuf [1] = (in_progress->flags () & msg_t::more);
next_step (tmpbuf, 2, &v1_encoder_t::size_ready, false); next_step (tmpbuf, 2, &v1_encoder_t::size_ready, false);
} }
else { else {
tmpbuf [0] = 0xff; tmpbuf [0] = 0xff;
put_uint64 (tmpbuf + 1, size); put_uint64 (tmpbuf + 1, size);
tmpbuf [9] = (in_progress.flags () & msg_t::more); tmpbuf [9] = (in_progress->flags () & msg_t::more);
next_step (tmpbuf, 10, &v1_encoder_t::size_ready, false); next_step (tmpbuf, 10, &v1_encoder_t::size_ready, false);
} }
return true;
} }
...@@ -24,8 +24,6 @@ ...@@ -24,8 +24,6 @@
namespace zmq namespace zmq
{ {
class i_msg_source;
// Encoder for ZMTP/1.0 protocol. Converts messages into data batches. // Encoder for ZMTP/1.0 protocol. Converts messages into data batches.
class v1_encoder_t : public encoder_base_t <v1_encoder_t> class v1_encoder_t : public encoder_base_t <v1_encoder_t>
...@@ -35,15 +33,11 @@ namespace zmq ...@@ -35,15 +33,11 @@ namespace zmq
v1_encoder_t (size_t bufsize_); v1_encoder_t (size_t bufsize_);
~v1_encoder_t (); ~v1_encoder_t ();
void set_msg_source (i_msg_source *msg_source_);
private: private:
bool size_ready (); void size_ready ();
bool message_ready (); void message_ready ();
i_msg_source *msg_source;
msg_t in_progress;
unsigned char tmpbuf [10]; unsigned char tmpbuf [10];
v1_encoder_t (const v1_encoder_t&); v1_encoder_t (const v1_encoder_t&);
......
...@@ -31,10 +31,8 @@ ...@@ -31,10 +31,8 @@
#include "wire.hpp" #include "wire.hpp"
#include "err.hpp" #include "err.hpp"
zmq::v2_decoder_t::v2_decoder_t (size_t bufsize_, zmq::v2_decoder_t::v2_decoder_t (size_t bufsize_, int64_t maxmsgsize_) :
int64_t maxmsgsize_, i_msg_sink *msg_sink_) :
decoder_base_t <v2_decoder_t> (bufsize_), decoder_base_t <v2_decoder_t> (bufsize_),
msg_sink (msg_sink_),
msg_flags (0), msg_flags (0),
maxmsgsize (maxmsgsize_) maxmsgsize (maxmsgsize_)
{ {
...@@ -51,12 +49,7 @@ zmq::v2_decoder_t::~v2_decoder_t () ...@@ -51,12 +49,7 @@ zmq::v2_decoder_t::~v2_decoder_t ()
errno_assert (rc == 0); errno_assert (rc == 0);
} }
void zmq::v2_decoder_t::set_msg_sink (i_msg_sink *msg_sink_) int zmq::v2_decoder_t::flags_ready ()
{
msg_sink = msg_sink_;
}
bool zmq::v2_decoder_t::flags_ready ()
{ {
msg_flags = 0; msg_flags = 0;
if (tmpbuf [0] & v2_protocol_t::more_flag) if (tmpbuf [0] & v2_protocol_t::more_flag)
...@@ -69,92 +62,79 @@ bool zmq::v2_decoder_t::flags_ready () ...@@ -69,92 +62,79 @@ bool zmq::v2_decoder_t::flags_ready ()
else else
next_step (tmpbuf, 1, &v2_decoder_t::one_byte_size_ready); next_step (tmpbuf, 1, &v2_decoder_t::one_byte_size_ready);
return true; return 0;
} }
bool zmq::v2_decoder_t::one_byte_size_ready () int zmq::v2_decoder_t::one_byte_size_ready ()
{ {
int rc = 0;
// Message size must not exceed the maximum allowed size. // Message size must not exceed the maximum allowed size.
if (maxmsgsize >= 0) if (maxmsgsize >= 0)
if (unlikely (tmpbuf [0] > static_cast <uint64_t> (maxmsgsize))) if (unlikely (tmpbuf [0] > static_cast <uint64_t> (maxmsgsize))) {
goto error; errno = EMSGSIZE;
return -1;
}
// in_progress is initialised at this point so in theory we should // in_progress is initialised at this point so in theory we should
// close it before calling zmq_msg_init_size, however, it's a 0-byte // close it before calling zmq_msg_init_size, however, it's a 0-byte
// message and thus we can treat it as uninitialised... // message and thus we can treat it as uninitialised...
rc = in_progress.init_size (tmpbuf [0]); int rc = in_progress.init_size (tmpbuf [0]);
if (unlikely (rc)) { if (unlikely (rc)) {
errno_assert (errno == ENOMEM); errno_assert (errno == ENOMEM);
int rc = in_progress.init (); rc = in_progress.init ();
errno_assert (rc == 0); errno_assert (rc == 0);
goto error; errno = ENOMEM;
return -1;
} }
in_progress.set_flags (msg_flags); in_progress.set_flags (msg_flags);
next_step (in_progress.data (), in_progress.size (), next_step (in_progress.data (), in_progress.size (),
&v2_decoder_t::message_ready); &v2_decoder_t::message_ready);
return true; return 0;
error:
decoding_error ();
return false;
} }
bool zmq::v2_decoder_t::eight_byte_size_ready () int zmq::v2_decoder_t::eight_byte_size_ready ()
{ {
int rc = 0;
// The payload size is encoded as 64-bit unsigned integer. // The payload size is encoded as 64-bit unsigned integer.
// The most significant byte comes first. // The most significant byte comes first.
const uint64_t msg_size = get_uint64 (tmpbuf); const uint64_t msg_size = get_uint64 (tmpbuf);
// Message size must not exceed the maximum allowed size. // Message size must not exceed the maximum allowed size.
if (maxmsgsize >= 0) if (maxmsgsize >= 0)
if (unlikely (msg_size > static_cast <uint64_t> (maxmsgsize))) if (unlikely (msg_size > static_cast <uint64_t> (maxmsgsize))) {
goto error; errno = EMSGSIZE;
return -1;
}
// Message size must fit into size_t data type. // Message size must fit into size_t data type.
if (unlikely (msg_size != static_cast <size_t> (msg_size))) if (unlikely (msg_size != static_cast <size_t> (msg_size))) {
goto error; errno = EMSGSIZE;
return -1;
}
// in_progress is initialised at this point so in theory we should // in_progress is initialised at this point so in theory we should
// close it before calling init_size, however, it's a 0-byte // close it before calling init_size, however, it's a 0-byte
// message and thus we can treat it as uninitialised. // message and thus we can treat it as uninitialised.
rc = in_progress.init_size (static_cast <size_t> (msg_size)); int rc = in_progress.init_size (static_cast <size_t> (msg_size));
if (unlikely (rc)) { if (unlikely (rc)) {
errno_assert (errno == ENOMEM); errno_assert (errno == ENOMEM);
int rc = in_progress.init (); rc = in_progress.init ();
errno_assert (rc == 0); errno_assert (rc == 0);
goto error; errno = ENOMEM;
return -1;
} }
in_progress.set_flags (msg_flags); in_progress.set_flags (msg_flags);
next_step (in_progress.data (), in_progress.size (), next_step (in_progress.data (), in_progress.size (),
&v2_decoder_t::message_ready); &v2_decoder_t::message_ready);
return true; return 0;
error:
decoding_error ();
return false;
} }
bool zmq::v2_decoder_t::message_ready () int zmq::v2_decoder_t::message_ready ()
{ {
// Message is completely read. Push it further and start reading // Message is completely read. Signal this to the caller
// new message. (in_progress is a 0-byte message after this point.) // and prepare to decode next message.
if (unlikely (!msg_sink))
return false;
int rc = msg_sink->push_msg (&in_progress);
if (unlikely (rc != 0)) {
if (errno != EAGAIN)
decoding_error ();
return false;
}
next_step (tmpbuf, 1, &v2_decoder_t::flags_ready); next_step (tmpbuf, 1, &v2_decoder_t::flags_ready);
return true; return 1;
} }
...@@ -21,7 +21,6 @@ ...@@ -21,7 +21,6 @@
#define __ZMQ_V2_DECODER_HPP_INCLUDED__ #define __ZMQ_V2_DECODER_HPP_INCLUDED__
#include "decoder.hpp" #include "decoder.hpp"
#include "i_msg_sink.hpp"
namespace zmq namespace zmq
{ {
...@@ -30,21 +29,19 @@ namespace zmq ...@@ -30,21 +29,19 @@ namespace zmq
{ {
public: public:
v2_decoder_t (size_t bufsize_, v2_decoder_t (size_t bufsize_, int64_t maxmsgsize_);
int64_t maxmsgsize_, i_msg_sink *msg_sink_);
virtual ~v2_decoder_t (); virtual ~v2_decoder_t ();
// i_decoder interface. // i_decoder interface.
virtual void set_msg_sink (i_msg_sink *msg_sink_); virtual msg_t *msg () { return &in_progress; }
private: private:
bool flags_ready (); int flags_ready ();
bool one_byte_size_ready (); int one_byte_size_ready ();
bool eight_byte_size_ready (); int eight_byte_size_ready ();
bool message_ready (); int message_ready ();
i_msg_sink *msg_sink;
unsigned char tmpbuf [8]; unsigned char tmpbuf [8];
unsigned char msg_flags; unsigned char msg_flags;
msg_t in_progress; msg_t in_progress;
......
...@@ -22,64 +22,31 @@ ...@@ -22,64 +22,31 @@
#include "likely.hpp" #include "likely.hpp"
#include "wire.hpp" #include "wire.hpp"
zmq::v2_encoder_t::v2_encoder_t (size_t bufsize_, i_msg_source *msg_source_) : zmq::v2_encoder_t::v2_encoder_t (size_t bufsize_) :
encoder_base_t <v2_encoder_t> (bufsize_), encoder_base_t <v2_encoder_t> (bufsize_)
msg_source (msg_source_)
{ {
int rc = in_progress.init ();
errno_assert (rc == 0);
// Write 0 bytes to the batch and go to message_ready state. // Write 0 bytes to the batch and go to message_ready state.
next_step (NULL, 0, &v2_encoder_t::message_ready, true); next_step (NULL, 0, &v2_encoder_t::message_ready, true);
} }
zmq::v2_encoder_t::~v2_encoder_t () zmq::v2_encoder_t::~v2_encoder_t ()
{ {
int rc = in_progress.close ();
errno_assert (rc == 0);
} }
void zmq::v2_encoder_t::set_msg_source (i_msg_source *msg_source_) void zmq::v2_encoder_t::message_ready ()
{ {
msg_source = msg_source_;
}
bool zmq::v2_encoder_t::message_ready ()
{
// Release the content of the old message.
int rc = in_progress.close ();
errno_assert (rc == 0);
// Read new message. If there is none, return false.
// Note that new state is set only if write is successful. That way
// unsuccessful write will cause retry on the next state machine
// invocation.
if (unlikely (!msg_source)) {
rc = in_progress.init ();
errno_assert (rc == 0);
return false;
}
rc = msg_source->pull_msg (&in_progress);
if (unlikely (rc)) {
errno_assert (errno == EAGAIN);
rc = in_progress.init ();
errno_assert (rc == 0);
return false;
}
// Encode flags. // Encode flags.
unsigned char &protocol_flags = tmpbuf [0]; unsigned char &protocol_flags = tmpbuf [0];
protocol_flags = 0; protocol_flags = 0;
if (in_progress.flags () & msg_t::more) if (in_progress->flags () & msg_t::more)
protocol_flags |= v2_protocol_t::more_flag; protocol_flags |= v2_protocol_t::more_flag;
if (in_progress.size () > 255) if (in_progress->size () > 255)
protocol_flags |= v2_protocol_t::large_flag; protocol_flags |= v2_protocol_t::large_flag;
// Encode the message length. For messages less then 256 bytes, // Encode the message length. For messages less then 256 bytes,
// the length is encoded as 8-bit unsigned integer. For larger // the length is encoded as 8-bit unsigned integer. For larger
// messages, 64-bit unsigned integer in network byte order is used. // messages, 64-bit unsigned integer in network byte order is used.
const size_t size = in_progress.size (); const size_t size = in_progress->size ();
if (unlikely (size > 255)) { if (unlikely (size > 255)) {
put_uint64 (tmpbuf + 1, size); put_uint64 (tmpbuf + 1, size);
next_step (tmpbuf, 9, &v2_encoder_t::size_ready, false); next_step (tmpbuf, 9, &v2_encoder_t::size_ready, false);
...@@ -88,13 +55,11 @@ bool zmq::v2_encoder_t::message_ready () ...@@ -88,13 +55,11 @@ bool zmq::v2_encoder_t::message_ready ()
tmpbuf [1] = static_cast <uint8_t> (size); tmpbuf [1] = static_cast <uint8_t> (size);
next_step (tmpbuf, 2, &v2_encoder_t::size_ready, false); next_step (tmpbuf, 2, &v2_encoder_t::size_ready, false);
} }
return true;
} }
bool zmq::v2_encoder_t::size_ready () void zmq::v2_encoder_t::size_ready ()
{ {
// Write message body into the buffer. // Write message body into the buffer.
next_step (in_progress.data (), in_progress.size (), next_step (in_progress->data (), in_progress->size (),
&v2_encoder_t::message_ready, !(in_progress.flags () & msg_t::more)); &v2_encoder_t::message_ready, true);
return true;
} }
...@@ -21,30 +21,23 @@ ...@@ -21,30 +21,23 @@
#define __ZMQ_V2_ENCODER_HPP_INCLUDED__ #define __ZMQ_V2_ENCODER_HPP_INCLUDED__
#include "encoder.hpp" #include "encoder.hpp"
#include "i_msg_source.hpp"
namespace zmq namespace zmq
{ {
class i_msg_source;
// Encoder for 0MQ framing protocol. Converts messages into data stream. // Encoder for 0MQ framing protocol. Converts messages into data stream.
class v2_encoder_t : public encoder_base_t <v2_encoder_t> class v2_encoder_t : public encoder_base_t <v2_encoder_t>
{ {
public: public:
v2_encoder_t (size_t bufsize_, i_msg_source *msg_source_); v2_encoder_t (size_t bufsize_);
virtual ~v2_encoder_t (); virtual ~v2_encoder_t ();
virtual void set_msg_source (i_msg_source *msg_source_);
private: private:
bool size_ready (); void size_ready ();
bool message_ready (); void message_ready ();
i_msg_source *msg_source;
msg_t in_progress;
unsigned char tmpbuf [9]; unsigned char tmpbuf [9];
v2_encoder_t (const v2_encoder_t&); v2_encoder_t (const v2_encoder_t&);
......
...@@ -33,7 +33,7 @@ namespace zmq ...@@ -33,7 +33,7 @@ namespace zmq
*buffer_ = value; *buffer_ = value;
} }
inline uint8_t get_uint8 (unsigned char *buffer_) inline uint8_t get_uint8 (const unsigned char *buffer_)
{ {
return *buffer_; return *buffer_;
} }
...@@ -44,7 +44,7 @@ namespace zmq ...@@ -44,7 +44,7 @@ namespace zmq
buffer_ [1] = (unsigned char) (value & 0xff); buffer_ [1] = (unsigned char) (value & 0xff);
} }
inline uint16_t get_uint16 (unsigned char *buffer_) inline uint16_t get_uint16 (const unsigned char *buffer_)
{ {
return return
(((uint16_t) buffer_ [0]) << 8) | (((uint16_t) buffer_ [0]) << 8) |
...@@ -59,7 +59,7 @@ namespace zmq ...@@ -59,7 +59,7 @@ namespace zmq
buffer_ [3] = (unsigned char) (value & 0xff); buffer_ [3] = (unsigned char) (value & 0xff);
} }
inline uint32_t get_uint32 (unsigned char *buffer_) inline uint32_t get_uint32 (const unsigned char *buffer_)
{ {
return return
(((uint32_t) buffer_ [0]) << 24) | (((uint32_t) buffer_ [0]) << 24) |
...@@ -80,7 +80,7 @@ namespace zmq ...@@ -80,7 +80,7 @@ namespace zmq
buffer_ [7] = (unsigned char) (value & 0xff); buffer_ [7] = (unsigned char) (value & 0xff);
} }
inline uint64_t get_uint64 (unsigned char *buffer_) inline uint64_t get_uint64 (const unsigned char *buffer_)
{ {
return return
(((uint64_t) buffer_ [0]) << 56) | (((uint64_t) buffer_ [0]) << 56) |
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment