Commit fd42be9d authored by Pieter Hintjens's avatar Pieter Hintjens

Merge pull request #541 from hurtonm/rework_message_flow

Refactor code so that messages go through engines
parents 91f1e131 7942db76
......@@ -26,8 +26,6 @@ libzmq_la_SOURCES = \
i_encoder.hpp \
i_decoder.hpp \
i_engine.hpp \
i_msg_sink.hpp \
i_msg_source.hpp \
i_poll_events.hpp \
io_object.hpp \
io_thread.hpp \
......
......@@ -32,8 +32,6 @@
namespace zmq
{
class i_msg_sink;
// Helper base class for decoders that know the amount of data to read
// in advance at any moment. Knowing the amount in advance is a property
// of the protocol used. 0MQ framing protocol is based size-prefixed
......@@ -89,106 +87,66 @@ namespace zmq
// Processes the data in the buffer previously allocated using
// get_buffer function. size_ argument specifies nemuber of bytes
// actually filled into the buffer. Function returns number of
// bytes actually processed.
inline size_t process_buffer (unsigned char *data_, size_t size_)
// actually filled into the buffer. Function returns 1 when the
// whole message was decoded or 0 when more data is required.
// On error, -1 is returned and errno set accordingly.
// Number of bytes processed is returned in byts_used_.
inline int decode (const unsigned char *data_, size_t size_,
size_t &bytes_used_)
{
// Check if we had an error in previous attempt.
if (unlikely (!(static_cast <T*> (this)->next)))
return (size_t) -1;
bytes_used_ = 0;
// In case of zero-copy simply adjust the pointers, no copying
// is required. Also, run the state machine in case all the data
// were processed.
if (data_ == read_pos) {
zmq_assert (size_ <= to_read);
read_pos += size_;
to_read -= size_;
bytes_used_ = size_;
while (!to_read) {
if (!(static_cast <T*> (this)->*next) ()) {
if (unlikely (!(static_cast <T*> (this)->next)))
return (size_t) -1;
return size_;
}
const int rc = (static_cast <T*> (this)->*next) ();
if (rc != 0)
return rc;
}
return size_;
return 0;
}
size_t pos = 0;
while (true) {
// Try to get more space in the message to fill in.
// If none is available, return.
while (!to_read) {
if (!(static_cast <T*> (this)->*next) ()) {
if (unlikely (!(static_cast <T*> (this)->next)))
return (size_t) -1;
return pos;
}
}
// If there are no more data in the buffer, return.
if (pos == size_)
return pos;
while (bytes_used_ < size_) {
// Copy the data from buffer to the message.
size_t to_copy = std::min (to_read, size_ - pos);
memcpy (read_pos, data_ + pos, to_copy);
const size_t to_copy = std::min (to_read, size_ - bytes_used_);
memcpy (read_pos, data_ + bytes_used_, to_copy);
read_pos += to_copy;
pos += to_copy;
to_read -= to_copy;
}
}
// Returns true if the decoder has been fed all required data
// but cannot proceed with the next decoding step.
// False is returned if the decoder has encountered an error.
bool stalled ()
{
// Check whether there was decoding error.
if (unlikely (!(static_cast <T*> (this)->next)))
return false;
while (!to_read) {
if (!(static_cast <T*> (this)->*next) ()) {
if (unlikely (!(static_cast <T*> (this)->next)))
return false;
return true;
bytes_used_ += to_copy;
// Try to get more space in the message to fill in.
// If none is available, return.
while (to_read == 0) {
const int rc = (static_cast <T*> (this)->*next) ();
if (rc != 0)
return rc;
}
}
return false;
}
inline bool message_ready_size (size_t /* msg_sz */)
{
zmq_assert (false);
return false;
return 0;
}
protected:
// Prototype of state machine action. Action should return false if
// it is unable to push the data to the system.
typedef bool (T::*step_t) ();
typedef int (T::*step_t) ();
// This function should be called from derived class to read data
// from the buffer and schedule next state machine action.
inline void next_step (void *read_pos_, size_t to_read_,
step_t next_)
inline void next_step (void *read_pos_, size_t to_read_, step_t next_)
{
read_pos = (unsigned char*) read_pos_;
to_read = to_read_;
next = next_;
}
// This function should be called from the derived class to
// abort decoder state machine.
inline void decoding_error ()
{
next = NULL;
}
private:
// Next step. If set to NULL, it means that associated data stream
......
......@@ -38,8 +38,6 @@
namespace zmq
{
class i_msg_source;
// Helper base class for encoders. It implements the state machine that
// fills the outgoing buffer. Derived classes should implement individual
// state machine actions.
......@@ -49,7 +47,8 @@ namespace zmq
public:
inline encoder_base_t (size_t bufsize_) :
bufsize (bufsize_)
bufsize (bufsize_),
in_progress (NULL)
{
buf = (unsigned char*) malloc (bufsize_);
alloc_assert (buf);
......@@ -65,17 +64,13 @@ namespace zmq
// The function returns a batch of binary data. The data
// are filled to a supplied buffer. If no buffer is supplied (data_
// points to NULL) decoder object will provide buffer of its own.
// If offset is not NULL, it is filled by offset of the first message
// in the batch.If there's no beginning of a message in the batch,
// offset is set to -1.
inline void get_data (unsigned char **data_, size_t *size_,
int *offset_ = NULL)
inline size_t encode (unsigned char **data_, size_t size_)
{
unsigned char *buffer = !*data_ ? buf : *data_;
size_t buffersize = !*data_ ? bufsize : *size_;
size_t buffersize = !*data_ ? bufsize : size_;
if (offset_)
*offset_ = -1;
if (in_progress == NULL)
return 0;
size_t pos = 0;
while (pos < buffersize) {
......@@ -84,14 +79,15 @@ namespace zmq
// If there are still no data, return what we already have
// in the buffer.
if (!to_write) {
// If we are to encode the beginning of a new message,
// adjust the message offset.
if (beginning)
if (offset_ && *offset_ == -1)
*offset_ = static_cast <int> (pos);
if (!(static_cast <T*> (this)->*next) ())
if (new_msg_flag) {
int rc = in_progress->close ();
errno_assert (rc == 0);
rc = in_progress->init ();
errno_assert (rc == 0);
in_progress = NULL;
break;
}
(static_cast <T*> (this)->*next) ();
}
// If there are no data in the buffer yet and we are able to
......@@ -106,10 +102,10 @@ namespace zmq
// amounts of time.
if (!pos && !*data_ && to_write >= buffersize) {
*data_ = write_pos;
*size_ = to_write;
pos = to_write;
write_pos = NULL;
to_write = 0;
return;
return pos;
}
// Copy data to the buffer. If the buffer is full, return.
......@@ -121,7 +117,14 @@ namespace zmq
}
*data_ = buffer;
*size_ = pos;
return pos;
}
void load_msg (msg_t *msg_)
{
zmq_assert (in_progress == NULL);
in_progress = msg_;
(static_cast <T*> (this)->*next) ();
}
inline bool has_data ()
......@@ -132,18 +135,17 @@ namespace zmq
protected:
// Prototype of state machine action.
typedef bool (T::*step_t) ();
typedef void (T::*step_t) ();
// This function should be called from derived class to write the data
// to the buffer and schedule next state machine action. Set beginning
// to true when you are writing first byte of a message.
// to the buffer and schedule next state machine action.
inline void next_step (void *write_pos_, size_t to_write_,
step_t next_, bool beginning_)
step_t next_, bool new_msg_flag_)
{
write_pos = (unsigned char*) write_pos_;
to_write = to_write_;
next = next_;
beginning = beginning_;
new_msg_flag = new_msg_flag_;
}
private:
......@@ -158,8 +160,7 @@ namespace zmq
// is dead.
step_t next;
// If true, first byte of the message is being written.
bool beginning;
bool new_msg_flag;
// The buffer for encoded data.
size_t bufsize;
......@@ -167,6 +168,11 @@ namespace zmq
encoder_base_t (const encoder_base_t&);
void operator = (const encoder_base_t&);
protected:
msg_t *in_progress;
};
}
......
......@@ -25,7 +25,7 @@
namespace zmq
{
class i_msg_sink;
class msg_t;
// Interface to be implemented by message decoder.
......@@ -34,15 +34,16 @@ namespace zmq
public:
virtual ~i_decoder () {}
virtual void set_msg_sink (i_msg_sink *msg_sink_) = 0;
virtual void get_buffer (unsigned char **data_, size_t *size_) = 0;
virtual size_t process_buffer (unsigned char *data_, size_t size_) = 0;
virtual bool stalled () = 0;
// Decodes data pointed to by data_.
// When a message is decoded, 1 is returned.
// When the decoder needs more data, 0 is returnd.
// On error, -1 is returned and errno is set accordingly.
virtual int decode (const unsigned char *data_, size_t size_,
size_t &processed) = 0;
virtual bool message_ready_size (size_t msg_sz) = 0;
virtual msg_t *msg () = 0;
};
}
......
......@@ -26,7 +26,7 @@ namespace zmq
{
// Forward declaration
class i_msg_source;
class msg_t;
// Interface to be implemented by message encoder.
......@@ -34,17 +34,14 @@ namespace zmq
{
virtual ~i_encoder () {}
// Set message producer.
virtual void set_msg_source (i_msg_source *msg_source_) = 0;
// The function returns a batch of binary data. The data
// are filled to a supplied buffer. If no buffer is supplied (data_
// is NULL) encoder will provide buffer of its own.
// If offset is not NULL, it is filled by offset of the first message
// in the batch.If there's no beginning of a message in the batch,
// offset is set to -1.
virtual void get_data (unsigned char **data_, size_t *size_,
int *offset_ = NULL) = 0;
// Function returns 0 when a new message is required.
virtual size_t encode (unsigned char **data_, size_t size) = 0;
// Load a new message into encoder.
virtual void load_msg (msg_t *msg_) = 0;
virtual bool has_data () = 0;
};
......
/*
Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_I_MSG_SINK_HPP_INCLUDED__
#define __ZMQ_I_MSG_SINK_HPP_INCLUDED__
namespace zmq
{
// Forward declaration
class msg_t;
// Interface to be implemented by message sink.
class i_msg_sink
{
public:
virtual ~i_msg_sink () {}
// Delivers a message. Returns 0 if successful; -1 otherwise.
// The function takes ownership of the passed message.
virtual int push_msg (msg_t *msg_) = 0;
};
}
#endif
/*
Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_I_MSG_SOURCE_HPP_INCLUDED__
#define __ZMQ_I_MSG_SOURCE_HPP_INCLUDED__
namespace zmq
{
// Forward declaration
class msg_t;
// Interface to be implemented by message source.
class i_msg_source
{
public:
virtual ~i_msg_source () {}
// Fetch a message. Returns 0 if successful; -1 otherwise.
// The caller is responsible for freeing the message when no
// longer used.
virtual int pull_msg (msg_t *msg_) = 0;
};
}
#endif
......@@ -41,8 +41,8 @@ zmq::pgm_receiver_t::pgm_receiver_t (class io_thread_t *parent_,
pgm_socket (true, options_),
options (options_),
session (NULL),
mru_decoder (NULL),
pending_bytes (0)
active_tsi (NULL),
insize (0)
{
}
......@@ -83,9 +83,7 @@ void zmq::pgm_receiver_t::unplug ()
delete it->second.decoder;
}
peers.clear ();
mru_decoder = NULL;
pending_bytes = 0;
active_tsi = NULL;
if (has_rx_timer) {
cancel_timer (rx_timer_id);
......@@ -111,49 +109,47 @@ void zmq::pgm_receiver_t::activate_out ()
void zmq::pgm_receiver_t::activate_in ()
{
// It is possible that the most recently used decoder
// processed the whole buffer but failed to write
// the last message into the pipe.
if (pending_bytes == 0) {
if (mru_decoder != NULL) {
mru_decoder->process_buffer (NULL, 0);
session->flush ();
zmq_assert (session != NULL);
zmq_assert (active_tsi != NULL);
const peers_t::iterator it = peers.find (*active_tsi);
zmq_assert (it != peers.end ());
zmq_assert (it->second.joined);
// Push the pending message into the session.
int rc = session->push_msg (it->second.decoder->msg ());
errno_assert (rc == 0);
if (insize > 0) {
rc = process_input (it->second.decoder);
if (rc == -1) {
// HWM reached; we will try later.
if (errno == EAGAIN) {
session->flush ();
return;
}
// Data error. Delete message decoder, mark the
// peer as not joined and drop remaining data.
it->second.joined = false;
delete it->second.decoder;
it->second.decoder = NULL;
insize = 0;
}
// Resume polling.
set_pollin (pipe_handle);
set_pollin (socket_handle);
return;
}
zmq_assert (mru_decoder != NULL);
zmq_assert (pending_ptr != NULL);
// Ask the decoder to process remaining data.
size_t n = mru_decoder->process_buffer (pending_ptr, pending_bytes);
pending_bytes -= n;
session->flush ();
if (pending_bytes > 0)
return;
// Resume polling.
set_pollin (pipe_handle);
set_pollin (socket_handle);
active_tsi = NULL;
in_event ();
}
void zmq::pgm_receiver_t::in_event ()
{
// Read data from the underlying pgm_socket.
unsigned char *data = NULL;
const pgm_tsi_t *tsi = NULL;
if (pending_bytes > 0)
return;
if (has_rx_timer) {
cancel_timer (rx_timer_id);
has_rx_timer = false;
......@@ -167,7 +163,7 @@ void zmq::pgm_receiver_t::in_event ()
// Note the workaround made not to break strict-aliasing rules.
void *tmp = NULL;
ssize_t received = pgm_socket.receive (&tmp, &tsi);
data = (unsigned char*) tmp;
inpos = (unsigned char*) tmp;
// No data to process. This may happen if the packet received is
// neither ODATA nor ODATA.
......@@ -187,8 +183,6 @@ void zmq::pgm_receiver_t::in_event ()
if (received == -1) {
if (it != peers.end ()) {
it->second.joined = false;
if (it->second.decoder == mru_decoder)
mru_decoder = NULL;
if (it->second.decoder != NULL) {
delete it->second.decoder;
it->second.decoder = NULL;
......@@ -203,11 +197,13 @@ void zmq::pgm_receiver_t::in_event ()
it = peers.insert (peers_t::value_type (*tsi, peer_info)).first;
}
insize = static_cast <size_t> (received);
// Read the offset of the fist message in the current packet.
zmq_assert ((size_t) received >= sizeof (uint16_t));
uint16_t offset = get_uint16 (data);
data += sizeof (uint16_t);
received -= sizeof (uint16_t);
zmq_assert (insize >= sizeof (uint16_t));
uint16_t offset = get_uint16 (inpos);
inpos += sizeof (uint16_t);
insize -= sizeof (uint16_t);
// Join the stream if needed.
if (!it->second.joined) {
......@@ -217,12 +213,12 @@ void zmq::pgm_receiver_t::in_event ()
if (offset == 0xffff)
continue;
zmq_assert (offset <= received);
zmq_assert (offset <= insize);
zmq_assert (it->second.decoder == NULL);
// We have to move data to the begining of the first message.
data += offset;
received -= offset;
inpos += offset;
insize -= offset;
// Mark the stream as joined.
it->second.joined = true;
......@@ -231,28 +227,24 @@ void zmq::pgm_receiver_t::in_event ()
it->second.decoder = new (std::nothrow)
v1_decoder_t (0, options.maxmsgsize);
alloc_assert (it->second.decoder);
it->second.decoder->set_msg_sink (session);
}
mru_decoder = it->second.decoder;
// Push all the data to the decoder.
ssize_t processed = it->second.decoder->process_buffer (data, received);
if (processed < received) {
// Save some state so we can resume the decoding process later.
pending_bytes = received - processed;
pending_ptr = data + processed;
// Stop polling.
reset_pollin (pipe_handle);
reset_pollin (socket_handle);
// Reset outstanding timer.
if (has_rx_timer) {
cancel_timer (rx_timer_id);
has_rx_timer = false;
int rc = process_input (it->second.decoder);
if (rc == -1) {
if (errno == EAGAIN) {
active_tsi = tsi;
// Stop polling.
reset_pollin (pipe_handle);
reset_pollin (socket_handle);
break;
}
break;
it->second.joined = false;
delete it->second.decoder;
it->second.decoder = NULL;
insize = 0;
}
}
......@@ -260,6 +252,29 @@ void zmq::pgm_receiver_t::in_event ()
session->flush ();
}
int zmq::pgm_receiver_t::process_input (v1_decoder_t *decoder)
{
zmq_assert (session != NULL);
while (insize > 0) {
size_t n = 0;
int rc = decoder->decode (inpos, insize, n);
if (rc == -1)
return -1;
inpos += n;
insize -= n;
if (rc == 0)
break;
rc = session->push_msg (decoder->msg ());
if (rc == -1) {
errno_assert (errno == EAGAIN);
return -1;
}
}
return 0;
}
void zmq::pgm_receiver_t::timer_event (int token)
{
zmq_assert (token == rx_timer_id);
......
......@@ -69,6 +69,10 @@ namespace zmq
// Unplug the engine from the session.
void unplug ();
// Decode received data (inpos, insize) and forward decoded
// messages to the session.
int process_input (v1_decoder_t *decoder);
// PGM is not able to move subscriptions upstream. Thus, drop all
// the pending subscriptions.
void drop_subscriptions ();
......@@ -112,14 +116,13 @@ namespace zmq
// Associated session.
zmq::session_base_t *session;
// Most recently used decoder.
v1_decoder_t *mru_decoder;
const pgm_tsi_t *active_tsi;
// Number of bytes not consumed by the decoder due to pipe overflow.
size_t pending_bytes;
size_t insize;
// Pointer to data still waiting to be processed by the decoder.
unsigned char *pending_ptr;
const unsigned char *inpos;
// Poll handle associated with PGM socket.
handle_t socket_handle;
......
......@@ -39,13 +39,17 @@ zmq::pgm_sender_t::pgm_sender_t (io_thread_t *parent_,
io_object_t (parent_),
has_tx_timer (false),
has_rx_timer (false),
session (NULL),
encoder (0),
more_flag (false),
pgm_socket (false, options_),
options (options_),
out_buffer (NULL),
out_buffer_size (0),
write_size (0)
{
int rc = msg.init ();
errno_assert (rc == 0);
}
int zmq::pgm_sender_t::init (bool udp_encapsulation_, const char *network_)
......@@ -69,7 +73,7 @@ void zmq::pgm_sender_t::plug (io_thread_t *io_thread_, session_base_t *session_)
fd_t rdata_notify_fd = retired_fd;
fd_t pending_notify_fd = retired_fd;
encoder.set_msg_source (session_);
session = session_;
// Fill fds from PGM transport and add them to the poller.
pgm_socket.get_sender_fds (&downlink_socket_fd, &uplink_socket_fd,
......@@ -106,7 +110,7 @@ void zmq::pgm_sender_t::unplug ()
rm_fd (uplink_handle);
rm_fd (rdata_notify_handle);
rm_fd (pending_notify_handle);
encoder.set_msg_source (NULL);
session = NULL;
}
void zmq::pgm_sender_t::terminate ()
......@@ -128,6 +132,9 @@ void zmq::pgm_sender_t::activate_in ()
zmq::pgm_sender_t::~pgm_sender_t ()
{
int rc = msg.close ();
errno_assert (rc == 0);
if (out_buffer) {
free (out_buffer);
out_buffer = NULL;
......@@ -161,18 +168,31 @@ void zmq::pgm_sender_t::out_event ()
// the get data function we prevent it from returning its own buffer.
unsigned char *bf = out_buffer + sizeof (uint16_t);
size_t bfsz = out_buffer_size - sizeof (uint16_t);
int offset = -1;
encoder.get_data (&bf, &bfsz, &offset);
uint16_t offset = 0xffff;
size_t bytes = encoder.encode (&bf, bfsz);
while (bytes < bfsz) {
if (!more_flag && offset == 0xffff)
offset = static_cast <uint16_t> (bytes);
int rc = session->pull_msg (&msg);
if (rc == -1)
break;
more_flag = msg.flags () & msg_t::more;
encoder.load_msg (&msg);
bf = out_buffer + sizeof (uint16_t) + bytes;
bytes += encoder.encode (&bf, bfsz - bytes);
}
// If there are no data to write stop polling for output.
if (!bfsz) {
if (bytes == 0) {
reset_pollout (handle);
return;
}
write_size = sizeof (uint16_t) + bytes;
// Put offset information in the buffer.
write_size = bfsz + sizeof (uint16_t);
put_uint16 (out_buffer, offset == -1 ? 0xffff : (uint16_t) offset);
put_uint16 (out_buffer, offset);
}
if (has_tx_timer) {
......
......@@ -34,6 +34,7 @@
#include "options.hpp"
#include "pgm_socket.hpp"
#include "v1_encoder.hpp"
#include "msg.hpp"
namespace zmq
{
......@@ -75,9 +76,16 @@ namespace zmq
bool has_tx_timer;
bool has_rx_timer;
session_base_t *session;
// Message encoder.
v1_encoder_t encoder;
msg_t msg;
// Keeps track of message boundaries.
bool more_flag;
// PGM socket.
pgm_socket_t pgm_socket;
......
......@@ -26,72 +26,38 @@
#endif
#include "raw_decoder.hpp"
#include "likely.hpp"
#include "wire.hpp"
#include "err.hpp"
zmq::raw_decoder_t::raw_decoder_t (size_t bufsize_,
int64_t maxmsgsize_, i_msg_sink *msg_sink_) :
decoder_base_t <raw_decoder_t> (bufsize_),
msg_sink (msg_sink_),
maxmsgsize (maxmsgsize_)
zmq::raw_decoder_t::raw_decoder_t (size_t bufsize_) :
bufsize (bufsize_)
{
int rc = in_progress.init ();
errno_assert (rc == 0);
buffer = (unsigned char *) malloc (bufsize);
alloc_assert (buffer);
}
zmq::raw_decoder_t::~raw_decoder_t ()
{
int rc = in_progress.close ();
errno_assert (rc == 0);
}
void zmq::raw_decoder_t::set_msg_sink (i_msg_sink *msg_sink_)
{
msg_sink = msg_sink_;
}
bool zmq::raw_decoder_t::stalled ()
{
return false;
free (buffer);
}
bool zmq::raw_decoder_t::message_ready_size (size_t msg_sz)
void zmq::raw_decoder_t::get_buffer (unsigned char **data_, size_t *size_)
{
int rc = in_progress.init_size (msg_sz);
if (rc != 0) {
errno_assert (errno == ENOMEM);
rc = in_progress.init ();
errno_assert (rc == 0);
decoding_error ();
return false;
}
next_step (in_progress.data (), in_progress.size (),
&raw_decoder_t::raw_message_ready);
return true;
*data_ = buffer;
*size_ = bufsize;
}
bool zmq::raw_decoder_t::raw_message_ready ()
int zmq::raw_decoder_t::decode (const uint8_t *data_, size_t size_,
size_t &bytes_used_)
{
zmq_assert (in_progress.size ());
// Message is completely read. Push it further and start reading
// new message. (in_progress is a 0-byte message after this point.)
if (unlikely (!msg_sink))
return false;
int rc = msg_sink->push_msg (&in_progress);
if (unlikely (rc != 0)) {
if (errno != EAGAIN)
decoding_error ();
return false;
}
// NOTE: This is just to break out of process_buffer
// raw_message_ready should never get called in state machine w/o
// message_ready_size from stream_engine.
next_step (in_progress.data (), 1,
&raw_decoder_t::raw_message_ready);
return true;
int rc = in_progress.init_size (size_);
errno_assert (rc != -1);
memcpy (in_progress.data (), data_, size_);
bytes_used_ = size_;
return 1;
}
......@@ -22,9 +22,7 @@
#include "err.hpp"
#include "msg.hpp"
#include "decoder.hpp"
#include "raw_decoder.hpp"
#include "i_msg_sink.hpp"
#include "i_decoder.hpp"
#include "stdint.hpp"
namespace zmq
......@@ -32,30 +30,31 @@ namespace zmq
// Decoder for 0MQ v1 framing protocol. Converts data stream into messages.
class raw_decoder_t : public decoder_base_t <raw_decoder_t>
class raw_decoder_t : public i_decoder
{
public:
raw_decoder_t (size_t bufsize_,
int64_t maxmsgsize_, i_msg_sink *msg_sink_);
raw_decoder_t (size_t bufsize_);
virtual ~raw_decoder_t ();
// i_decoder interface.
virtual void set_msg_sink (i_msg_sink *msg_sink_);
virtual bool stalled ();
virtual void get_buffer (unsigned char **data_, size_t *size_);
virtual bool message_ready_size (size_t msg_sz);
virtual int decode (const unsigned char *data_, size_t size_,
size_t &processed);
virtual msg_t *msg () { return &in_progress; }
private:
private:
bool raw_message_ready ();
i_msg_sink *msg_sink;
msg_t in_progress;
const int64_t maxmsgsize;
const int64_t bufsize;
unsigned char *buffer;
raw_decoder_t (const raw_decoder_t&);
void operator = (const raw_decoder_t&);
......
......@@ -19,65 +19,22 @@
#include "encoder.hpp"
#include "raw_encoder.hpp"
#include "i_msg_source.hpp"
#include "likely.hpp"
#include "wire.hpp"
zmq::raw_encoder_t::raw_encoder_t (size_t bufsize_, i_msg_source *msg_source_) :
encoder_base_t <raw_encoder_t> (bufsize_),
msg_source (msg_source_)
zmq::raw_encoder_t::raw_encoder_t (size_t bufsize_) :
encoder_base_t <raw_encoder_t> (bufsize_)
{
int rc = in_progress.init ();
errno_assert (rc == 0);
// Write 0 bytes to the batch and go to message_ready state.
next_step (NULL, 0, &raw_encoder_t::raw_message_ready, true);
}
zmq::raw_encoder_t::~raw_encoder_t ()
{
int rc = in_progress.close ();
errno_assert (rc == 0);
}
void zmq::raw_encoder_t::set_msg_source (i_msg_source *msg_source_)
{
msg_source = msg_source_;
}
bool zmq::raw_encoder_t::raw_message_size_ready ()
{
// Write message body into the buffer.
next_step (in_progress.data (), in_progress.size (),
&raw_encoder_t::raw_message_ready, !(in_progress.flags () & msg_t::more));
return true;
}
bool zmq::raw_encoder_t::raw_message_ready ()
void zmq::raw_encoder_t::raw_message_ready ()
{
// Destroy content of the old message.
int rc = in_progress.close ();
errno_assert (rc == 0);
// Read new message. If there is none, return false.
// Note that new state is set only if write is successful. That way
// unsuccessful write will cause retry on the next state machine
// invocation.
if (unlikely (!msg_source)) {
rc = in_progress.init ();
errno_assert (rc == 0);
return false;
}
rc = msg_source->pull_msg (&in_progress);
if (unlikely (rc != 0)) {
errno_assert (errno == EAGAIN);
rc = in_progress.init ();
errno_assert (rc == 0);
return false;
}
in_progress.reset_flags(0xff);
next_step (NULL, 0, &raw_encoder_t::raw_message_size_ready, true);
return true;
next_step (in_progress->data (), in_progress->size (),
&raw_encoder_t::raw_message_ready, true);
}
......@@ -44,19 +44,13 @@ namespace zmq
{
public:
raw_encoder_t (size_t bufsize_, i_msg_source *msg_source_);
raw_encoder_t (size_t bufsize_);
~raw_encoder_t ();
void set_msg_source (i_msg_source *msg_source_);
private:
bool raw_message_ready ();
bool raw_message_size_ready ();
void raw_message_ready ();
i_msg_source *msg_source;
msg_t in_progress;
unsigned char tmpbuf [4];
raw_encoder_t (const raw_encoder_t&);
const raw_encoder_t &operator = (const raw_encoder_t&);
};
......
......@@ -138,7 +138,7 @@ zmq::req_session_t::req_session_t (io_thread_t *io_thread_, bool connect_,
socket_base_t *socket_, const options_t &options_,
const address_t *addr_) :
dealer_session_t (io_thread_, connect_, socket_, options_, addr_),
state (identity)
state (bottom)
{
}
......@@ -163,12 +163,6 @@ int zmq::req_session_t::push_msg (msg_t *msg_)
return dealer_session_t::push_msg (msg_);
}
break;
case identity:
if (msg_->flags () == 0) {
state = bottom;
return dealer_session_t::push_msg (msg_);
}
break;
}
errno = EFAULT;
return -1;
......@@ -177,5 +171,5 @@ int zmq::req_session_t::push_msg (msg_t *msg_)
void zmq::req_session_t::reset ()
{
session_base_t::reset ();
state = identity;
state = bottom;
}
......@@ -74,7 +74,6 @@ namespace zmq
private:
enum {
identity,
bottom,
body
} state;
......
......@@ -111,8 +111,6 @@ zmq::session_base_t::session_base_t (class io_thread_t *io_thread_,
socket (socket_),
io_thread (io_thread_),
has_linger_timer (false),
identity_sent (false),
identity_received (false),
addr (addr_)
{
}
......@@ -146,17 +144,6 @@ void zmq::session_base_t::attach_pipe (pipe_t *pipe_)
int zmq::session_base_t::pull_msg (msg_t *msg_)
{
// Unless the socket is in raw mode, the first
// message we send is its identity.
if (unlikely (!identity_sent && !options.raw_sock)) {
int rc = msg_->init_size (options.identity_size);
errno_assert (rc == 0);
memcpy (msg_->data (), options.identity, options.identity_size);
identity_sent = true;
incomplete_in = false;
return 0;
}
if (!pipe || !pipe->read (msg_)) {
errno = EAGAIN;
return -1;
......@@ -168,20 +155,6 @@ int zmq::session_base_t::pull_msg (msg_t *msg_)
int zmq::session_base_t::push_msg (msg_t *msg_)
{
// Unless the socket is in raw mode, the first
// message we receive is its identity.
if (unlikely (!identity_received && !options.raw_sock)) {
msg_->set_flags (msg_t::identity);
identity_received = true;
if (!options.recv_identity) {
int rc = msg_->close ();
errno_assert (rc == 0);
rc = msg_->init ();
errno_assert (rc == 0);
return 0;
}
}
if (pipe && pipe->write (msg_)) {
int rc = msg_->init ();
errno_assert (rc == 0);
......@@ -194,9 +167,6 @@ int zmq::session_base_t::push_msg (msg_t *msg_)
void zmq::session_base_t::reset ()
{
// Restore identity flags.
identity_sent = false;
identity_received = false;
}
void zmq::session_base_t::flush ()
......
......@@ -26,8 +26,6 @@
#include "own.hpp"
#include "io_object.hpp"
#include "pipe.hpp"
#include "i_msg_source.hpp"
#include "i_msg_sink.hpp"
#include "socket_base.hpp"
namespace zmq
......@@ -42,9 +40,7 @@ namespace zmq
class session_base_t :
public own_t,
public io_object_t,
public i_pipe_events,
public i_msg_source,
public i_msg_sink
public i_pipe_events
{
public:
......@@ -56,12 +52,6 @@ namespace zmq
// To be used once only, when creating the session.
void attach_pipe (zmq::pipe_t *pipe_);
// i_msg_source interface implementation.
virtual int pull_msg (msg_t *msg_);
// i_msg_sink interface implementation.
virtual int push_msg (msg_t *msg_);
// Following functions are the interface exposed towards the engine.
virtual void reset ();
void flush ();
......@@ -73,6 +63,15 @@ namespace zmq
void hiccuped (zmq::pipe_t *pipe_);
void terminated (zmq::pipe_t *pipe_);
// Delivers a message. Returns 0 if successful; -1 otherwise.
// The function takes ownership of the message.
int push_msg (msg_t *msg_);
// Fetches a message. Returns 0 if successful; -1 otherwise.
// The caller is responsible for freeing the message when no
// longer used.
int pull_msg (msg_t *msg_);
socket_base_t *get_socket ();
protected:
......@@ -137,10 +136,6 @@ namespace zmq
// True is linger timer is running.
bool has_linger_timer;
// If true, identity has been sent/received from the network.
bool identity_sent;
bool identity_received;
// Protocol and address to use when connecting.
const address_t *addr;
......
......@@ -50,7 +50,6 @@
zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_, const std::string &endpoint_) :
s (fd_),
io_enabled (false),
inpos (NULL),
insize (0),
decoder (NULL),
......@@ -64,13 +63,23 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_, cons
endpoint (endpoint_),
plugged (false),
terminating (false),
io_error (false),
congested (false),
identity_received (false),
identity_sent (false),
rx_initialized (false),
tx_initialized (false),
subscription_required (false),
socket (NULL)
{
int rc = tx_msg.init ();
errno_assert (rc == 0);
// Put the socket into non-blocking mode.
unblock_socket (s);
// Set the socket buffer limits for the underlying socket.
if (options.sndbuf) {
int rc = setsockopt (s, SOL_SOCKET, SO_SNDBUF,
rc = setsockopt (s, SOL_SOCKET, SO_SNDBUF,
(char*) &options.sndbuf, sizeof (int));
#ifdef ZMQ_HAVE_WINDOWS
wsa_assert (rc != SOCKET_ERROR);
......@@ -79,7 +88,7 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_, cons
#endif
}
if (options.rcvbuf) {
int rc = setsockopt (s, SOL_SOCKET, SO_RCVBUF,
rc = setsockopt (s, SOL_SOCKET, SO_RCVBUF,
(char*) &options.rcvbuf, sizeof (int));
#ifdef ZMQ_HAVE_WINDOWS
wsa_assert (rc != SOCKET_ERROR);
......@@ -112,6 +121,9 @@ zmq::stream_engine_t::~stream_engine_t ()
s = retired_fd;
}
int rc = tx_msg.close ();
errno_assert (rc == 0);
if (encoder != NULL)
delete encoder;
if (decoder != NULL)
......@@ -133,15 +145,14 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_,
// Connect to I/O threads poller object.
io_object_t::plug (io_thread_);
handle = add_fd (s);
io_enabled = true;
io_error = false;
if (options.raw_sock) {
// no handshaking for raw sock, instantiate raw encoder and decoders
encoder = new (std::nothrow) raw_encoder_t (out_batch_size, session);
encoder = new (std::nothrow) raw_encoder_t (out_batch_size);
alloc_assert (encoder);
decoder = new (std::nothrow)
raw_decoder_t (in_batch_size, options.maxmsgsize, session);
decoder = new (std::nothrow) raw_decoder_t (in_batch_size);
alloc_assert (decoder);
// disable handshaking for raw socket
......@@ -169,19 +180,12 @@ void zmq::stream_engine_t::unplug ()
plugged = false;
// Cancel all fd subscriptions.
if (io_enabled) {
if (!io_error)
rm_fd (handle);
io_enabled = false;
}
// Disconnect from I/O threads poller object.
io_object_t::unplug ();
// Disconnect from session object.
if (encoder)
encoder->set_msg_source (NULL);
if (decoder)
decoder->set_msg_sink (NULL);
session = NULL;
}
......@@ -198,14 +202,21 @@ void zmq::stream_engine_t::terminate ()
void zmq::stream_engine_t::in_event ()
{
assert (!io_error);
// If still handshaking, receive and process the greeting message.
if (unlikely (handshaking))
if (!handshake ())
return;
zmq_assert (decoder);
bool disconnection = false;
size_t processed;
// If there has been an I/O error, stop polling.
if (congested) {
rm_fd (handle);
io_error = true;
return;
}
// If there's no data to process in the buffer...
if (!insize) {
......@@ -215,58 +226,51 @@ void zmq::stream_engine_t::in_event ()
// the underlying TCP layer has fixed buffer size and thus the
// number of bytes read will be always limited.
decoder->get_buffer (&inpos, &insize);
insize = read (inpos, insize);
const int bytes_read = read (inpos, insize);
// Check whether the peer has closed the connection.
if (insize == (size_t) -1) {
insize = 0;
disconnection = true;
if (bytes_read == -1) {
error ();
return;
}
}
if (options.raw_sock) {
if (insize == 0 || !decoder->message_ready_size (insize))
processed = 0;
else
processed = decoder->process_buffer (inpos, insize);
// Adjust input size
insize = static_cast <size_t> (bytes_read);
}
else
// Push the data to the decoder.
processed = decoder->process_buffer (inpos, insize);
if (unlikely (processed == (size_t) -1))
disconnection = true;
else {
// Stop polling for input if we got stuck.
if (processed < insize)
reset_pollin (handle);
int rc = 0;
size_t processed = 0;
// Adjust the buffer.
while (insize > 0) {
rc = decoder->decode (inpos, insize, processed);
zmq_assert (processed <= insize);
inpos += processed;
insize -= processed;
if (rc == 0 || rc == -1)
break;
rc = write_msg (decoder->msg ());
if (rc == -1)
break;
}
// Flush all messages the decoder may have produced.
session->flush ();
// Input error has occurred. If the last decoded
// message has already been accepted, we terminate
// the engine immediately. Otherwise, we stop
// waiting for input events and postpone the termination
// until after the session has accepted the message.
if (disconnection) {
if (decoder->stalled ()) {
rm_fd (handle);
io_enabled = false;
}
else
// Tear down the connection if we have failed to decode input data
// or the session has rejected the message.
if (rc == -1) {
if (errno != EAGAIN) {
error ();
return;
}
congested = true;
reset_pollin (handle);
}
session->flush ();
}
void zmq::stream_engine_t::out_event ()
{
zmq_assert (!io_error);
// If write buffer is empty, try to read new data from the encoder.
if (!outsize) {
......@@ -279,7 +283,19 @@ void zmq::stream_engine_t::out_event ()
}
outpos = NULL;
encoder->get_data (&outpos, &outsize);
outsize = encoder->encode (&outpos, 0);
while (outsize < out_batch_size) {
if (read_msg (&tx_msg) == -1)
break;
encoder->load_msg (&tx_msg);
unsigned char *bufptr = outpos + outsize;
size_t n = encoder->encode (&bufptr, out_batch_size - outsize);
zmq_assert (n > 0);
if (outpos == NULL)
outpos = bufptr;
outsize += n;
}
// If there is no data to send, stop polling for output.
if (outsize == 0) {
......@@ -321,6 +337,9 @@ void zmq::stream_engine_t::out_event ()
void zmq::stream_engine_t::activate_out ()
{
if (unlikely (io_error))
return;
set_pollout (handle);
// Speculative write: The assumption is that at the moment new message
......@@ -332,22 +351,45 @@ void zmq::stream_engine_t::activate_out ()
void zmq::stream_engine_t::activate_in ()
{
if (unlikely (!io_enabled)) {
// There was an input error but the engine could not
// be terminated (due to the stalled decoder).
// Flush the pending message and terminate the engine now.
zmq_assert (decoder);
decoder->process_buffer (inpos, 0);
zmq_assert (!decoder->stalled ());
session->flush ();
error ();
zmq_assert (congested);
zmq_assert (session != NULL);
zmq_assert (decoder != NULL);
int rc = write_msg (decoder->msg ());
if (rc == -1) {
if (errno == EAGAIN)
session->flush ();
else
error ();
return;
}
set_pollin (handle);
while (insize > 0) {
size_t processed = 0;
rc = decoder->decode (inpos, insize, processed);
zmq_assert (processed <= insize);
inpos += processed;
insize -= processed;
if (rc == 0 || rc == -1)
break;
rc = write_msg (decoder->msg ());
if (rc == -1)
break;
}
// Speculative read.
in_event ();
if (rc == -1 && errno == EAGAIN)
session->flush ();
else
if (rc == -1 || io_error)
error ();
else {
congested = false;
set_pollin (handle);
session->flush ();
// Speculative read.
in_event ();
}
}
bool zmq::stream_engine_t::handshake ()
......@@ -402,11 +444,9 @@ bool zmq::stream_engine_t::handshake ()
if (greeting_recv [0] != 0xff || !(greeting_recv [9] & 0x01)) {
encoder = new (std::nothrow) v1_encoder_t (out_batch_size);
alloc_assert (encoder);
encoder->set_msg_source (session);
decoder = new (std::nothrow) v1_decoder_t (in_batch_size, options.maxmsgsize);
alloc_assert (decoder);
decoder->set_msg_sink (session);
// We have already sent the message header.
// Since there is no way to tell the encoder to
......@@ -414,8 +454,7 @@ bool zmq::stream_engine_t::handshake ()
// header data away.
const size_t header_size = options.identity_size + 1 >= 255 ? 10 : 2;
unsigned char tmp [10], *bufferp = tmp;
size_t buffer_size = header_size;
encoder->get_data (&bufferp, &buffer_size);
size_t buffer_size = encoder->encode (&bufferp, header_size);
zmq_assert (buffer_size == header_size);
// Make sure the decoder sees the data we have already received.
......@@ -424,33 +463,28 @@ bool zmq::stream_engine_t::handshake ()
// To allow for interoperability with peers that do not forward
// their subscriptions, we inject a phony subscription
// message into the incoming message stream. To put this
// message right after the identity message, we temporarily
// divert the message stream from session to ourselves.
// message into the incomming message stream.
if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB)
decoder->set_msg_sink (this);
subscription_required = true;
}
else
if (greeting_recv [revision_pos] == ZMTP_1_0) {
encoder = new (std::nothrow) v1_encoder_t (
out_batch_size);
alloc_assert (encoder);
encoder->set_msg_source (session);
decoder = new (std::nothrow) v1_decoder_t (
in_batch_size, options.maxmsgsize);
alloc_assert (decoder);
decoder->set_msg_sink (session);
}
else
if (greeting_recv [revision_pos] == ZMTP_2_0
|| greeting_recv [revision_pos] == ZMTP_2_1) {
encoder = new (std::nothrow) v2_encoder_t (
out_batch_size, session);
encoder = new (std::nothrow) v2_encoder_t (out_batch_size);
alloc_assert (encoder);
decoder = new (std::nothrow) v2_decoder_t (
in_batch_size, options.maxmsgsize, session);
in_batch_size, options.maxmsgsize);
alloc_assert (decoder);
}
......@@ -465,35 +499,67 @@ bool zmq::stream_engine_t::handshake ()
return true;
}
int zmq::stream_engine_t::push_msg (msg_t *msg_)
int zmq::stream_engine_t::read_msg (msg_t *msg_)
{
zmq_assert (options.type == ZMQ_PUB || options.type == ZMQ_XPUB);
if (likely (tx_initialized || options.raw_sock))
return session->pull_msg (msg_);
// The first message is identity.
// Let the session process it.
int rc = session->push_msg (msg_);
errno_assert (rc == 0);
if (!identity_sent) {
int rc = msg_->init_size (options.identity_size);
errno_assert (rc == 0);
memcpy (msg_->data (), options.identity, options.identity_size);
identity_sent = true;
tx_initialized = true;
return 0;
}
// Inject the subscription message so that the ZMQ 2.x peer
// receives our messages.
rc = msg_->init_size (1);
errno_assert (rc == 0);
*(unsigned char*) msg_->data () = 1;
rc = session->push_msg (msg_);
session->flush ();
tx_initialized = true;
return 0;
}
// Once we have injected the subscription message, we can
// Divert the message flow back to the session.
zmq_assert (decoder);
decoder->set_msg_sink (session);
int zmq::stream_engine_t::write_msg (msg_t *msg_)
{
if (likely (rx_initialized || options.raw_sock))
return session->push_msg (msg_);
if (!identity_received) {
if (options.recv_identity) {
msg_->set_flags (msg_t::identity);
int rc = session->push_msg (msg_);
if (rc == -1)
return -1;
}
else {
int rc = msg_->close ();
errno_assert (rc == 0);
rc = msg_->init ();
errno_assert (rc == 0);
}
identity_received = true;
}
return rc;
// Inject the subscription message, so that also
// ZMQ 2.x peers receive published messages.
if (subscription_required) {
int rc = msg_->init_size (1);
errno_assert (rc == 0);
*(unsigned char*) msg_->data () = 1;
rc = session->push_msg (msg_);
if (rc == -1)
return -1;
subscription_required = false;
}
rx_initialized = true;
return 0;
}
void zmq::stream_engine_t::error ()
{
zmq_assert (session);
socket->event_disconnected (endpoint, s);
session->flush ();
session->detach ();
unplug ();
delete this;
......
......@@ -24,7 +24,6 @@
#include "fd.hpp"
#include "i_engine.hpp"
#include "i_msg_sink.hpp"
#include "io_object.hpp"
#include "i_encoder.hpp"
#include "i_decoder.hpp"
......@@ -43,12 +42,13 @@ namespace zmq
};
class io_thread_t;
class msg_t;
class session_base_t;
// This engine handles any socket with SOCK_STREAM semantics,
// e.g. TCP socket or an UNIX domain socket.
class stream_engine_t : public io_object_t, public i_engine, public i_msg_sink
class stream_engine_t : public io_object_t, public i_engine
{
public:
......@@ -62,9 +62,6 @@ namespace zmq
void activate_in ();
void activate_out ();
// i_msg_sink interface implementation.
virtual int push_msg (msg_t *msg_);
// i_poll_events interface implementation.
void in_event ();
void out_event ();
......@@ -94,11 +91,14 @@ namespace zmq
// peer -1 is returned.
int read (void *data_, size_t size_);
int read_msg (msg_t *msg_);
int write_msg (msg_t *msg_);
// Underlying socket.
fd_t s;
// True iff we are registered with an I/O poller.
bool io_enabled;
msg_t tx_msg;
handle_t handle;
......@@ -137,6 +137,29 @@ namespace zmq
bool plugged;
bool terminating;
bool io_error;
// True iff the session could not accept more
// messages due to flow control.
bool congested;
// True iff the engine has received identity message.
bool identity_received;
// True iff the engine has sent identity message.
bool identity_sent;
// True iff the engine has received all ZMTP control messages.
bool rx_initialized;
// True iff the engine has sent all ZMTP control messages.
bool tx_initialized;
// Indicates whether the engine is to inject a phony
// subscription message into the incomming stream.
// Needed to support old peers.
bool subscription_required;
// Socket
zmq::socket_base_t *socket;
......
......@@ -28,14 +28,12 @@
#include "decoder.hpp"
#include "v1_decoder.hpp"
#include "i_msg_sink.hpp"
#include "likely.hpp"
#include "wire.hpp"
#include "err.hpp"
zmq::v1_decoder_t::v1_decoder_t (size_t bufsize_, int64_t maxmsgsize_) :
decoder_base_t <v1_decoder_t> (bufsize_),
msg_sink (NULL),
maxmsgsize (maxmsgsize_)
{
int rc = in_progress.init ();
......@@ -51,12 +49,7 @@ zmq::v1_decoder_t::~v1_decoder_t ()
errno_assert (rc == 0);
}
void zmq::v1_decoder_t::set_msg_sink (i_msg_sink *msg_sink_)
{
msg_sink = msg_sink_;
}
bool zmq::v1_decoder_t::one_byte_size_ready ()
int zmq::v1_decoder_t::one_byte_size_ready ()
{
// First byte of size is read. If it is 0xff read 8-byte size.
// Otherwise allocate the buffer for message data and read the
......@@ -67,34 +60,33 @@ bool zmq::v1_decoder_t::one_byte_size_ready ()
// There has to be at least one byte (the flags) in the message).
if (!*tmpbuf) {
decoding_error ();
return false;
errno = EPROTO;
return -1;
}
if (maxmsgsize >= 0 && (int64_t) (*tmpbuf - 1) > maxmsgsize) {
errno = EMSGSIZE;
return -1;
}
// in_progress is initialised at this point so in theory we should
// close it before calling zmq_msg_init_size, however, it's a 0-byte
// message and thus we can treat it as uninitialised...
int rc;
if (maxmsgsize >= 0 && (int64_t) (*tmpbuf - 1) > maxmsgsize) {
rc = -1;
errno = ENOMEM;
}
else
rc = in_progress.init_size (*tmpbuf - 1);
if (rc != 0 && errno == ENOMEM) {
int rc = in_progress.init_size (*tmpbuf - 1);
if (rc != 0) {
errno_assert (errno == ENOMEM);
rc = in_progress.init ();
errno_assert (rc == 0);
decoding_error ();
return false;
errno = ENOMEM;
return -1;
}
errno_assert (rc == 0);
next_step (tmpbuf, 1, &v1_decoder_t::flags_ready);
}
return true;
return 0;
}
bool zmq::v1_decoder_t::eight_byte_size_ready ()
int zmq::v1_decoder_t::eight_byte_size_ready ()
{
// 8-byte payload length is read. Allocate the buffer
// for message body and read the message data into it.
......@@ -102,20 +94,20 @@ bool zmq::v1_decoder_t::eight_byte_size_ready ()
// There has to be at least one byte (the flags) in the message).
if (payload_length == 0) {
decoding_error ();
return false;
errno = EPROTO;
return -1;
}
// Message size must not exceed the maximum allowed size.
if (maxmsgsize >= 0 && payload_length - 1 > (uint64_t) maxmsgsize) {
decoding_error ();
return false;
errno = EMSGSIZE;
return -1;
}
// Message size must fit within range of size_t data type.
if (payload_length - 1 > std::numeric_limits <size_t>::max ()) {
decoding_error ();
return false;
errno = EMSGSIZE;
return -1;
}
const size_t msg_size = static_cast <size_t> (payload_length - 1);
......@@ -128,15 +120,15 @@ bool zmq::v1_decoder_t::eight_byte_size_ready ()
errno_assert (errno == ENOMEM);
rc = in_progress.init ();
errno_assert (rc == 0);
decoding_error ();
return false;
errno = ENOMEM;
return -1;
}
next_step (tmpbuf, 1, &v1_decoder_t::flags_ready);
return true;
return 0;
}
bool zmq::v1_decoder_t::flags_ready ()
int zmq::v1_decoder_t::flags_ready ()
{
// Store the flags from the wire into the message structure.
in_progress.set_flags (tmpbuf [0] & msg_t::more);
......@@ -144,22 +136,13 @@ bool zmq::v1_decoder_t::flags_ready ()
next_step (in_progress.data (), in_progress.size (),
&v1_decoder_t::message_ready);
return true;
return 0;
}
bool zmq::v1_decoder_t::message_ready ()
int zmq::v1_decoder_t::message_ready ()
{
// Message is completely read. Push it further and start reading
// new message. (in_progress is a 0-byte message after this point.)
if (unlikely (!msg_sink))
return false;
int rc = msg_sink->push_msg (&in_progress);
if (unlikely (rc != 0)) {
if (errno != EAGAIN)
decoding_error ();
return false;
}
next_step (tmpbuf, 1, &v1_decoder_t::one_byte_size_ready);
return true;
return 1;
}
......@@ -33,17 +33,15 @@ namespace zmq
v1_decoder_t (size_t bufsize_, int64_t maxmsgsize_);
~v1_decoder_t ();
// Set the receiver of decoded messages.
void set_msg_sink (i_msg_sink *msg_sink_);
virtual msg_t *msg () { return &in_progress; }
private:
bool one_byte_size_ready ();
bool eight_byte_size_ready ();
bool flags_ready ();
bool message_ready ();
int one_byte_size_ready ();
int eight_byte_size_ready ();
int flags_ready ();
int message_ready ();
i_msg_sink *msg_sink;
unsigned char tmpbuf [8];
msg_t in_progress;
......
......@@ -19,65 +19,31 @@
#include "encoder.hpp"
#include "v1_encoder.hpp"
#include "i_msg_source.hpp"
#include "likely.hpp"
#include "wire.hpp"
zmq::v1_encoder_t::v1_encoder_t (size_t bufsize_) :
encoder_base_t <v1_encoder_t> (bufsize_),
msg_source (NULL)
encoder_base_t <v1_encoder_t> (bufsize_)
{
int rc = in_progress.init ();
errno_assert (rc == 0);
// Write 0 bytes to the batch and go to message_ready state.
next_step (NULL, 0, &v1_encoder_t::message_ready, true);
}
zmq::v1_encoder_t::~v1_encoder_t ()
{
int rc = in_progress.close ();
errno_assert (rc == 0);
}
void zmq::v1_encoder_t::set_msg_source (i_msg_source *msg_source_)
{
msg_source = msg_source_;
}
bool zmq::v1_encoder_t::size_ready ()
void zmq::v1_encoder_t::size_ready ()
{
// Write message body into the buffer.
next_step (in_progress.data (), in_progress.size (),
&v1_encoder_t::message_ready, !(in_progress.flags () & msg_t::more));
return true;
next_step (in_progress->data (), in_progress->size (),
&v1_encoder_t::message_ready, true);
}
bool zmq::v1_encoder_t::message_ready ()
void zmq::v1_encoder_t::message_ready ()
{
// Destroy content of the old message.
int rc = in_progress.close ();
errno_assert (rc == 0);
// Read new message. If there is none, return false.
// Note that new state is set only if write is successful. That way
// unsuccessful write will cause retry on the next state machine
// invocation.
if (unlikely (!msg_source)) {
rc = in_progress.init ();
errno_assert (rc == 0);
return false;
}
rc = msg_source->pull_msg (&in_progress);
if (unlikely (rc != 0)) {
errno_assert (errno == EAGAIN);
rc = in_progress.init ();
errno_assert (rc == 0);
return false;
}
// Get the message size.
size_t size = in_progress.size ();
size_t size = in_progress->size ();
// Account for the 'flags' byte.
size++;
......@@ -87,14 +53,13 @@ bool zmq::v1_encoder_t::message_ready ()
// message size. In both cases 'flags' field follows.
if (size < 255) {
tmpbuf [0] = (unsigned char) size;
tmpbuf [1] = (in_progress.flags () & msg_t::more);
tmpbuf [1] = (in_progress->flags () & msg_t::more);
next_step (tmpbuf, 2, &v1_encoder_t::size_ready, false);
}
else {
tmpbuf [0] = 0xff;
put_uint64 (tmpbuf + 1, size);
tmpbuf [9] = (in_progress.flags () & msg_t::more);
tmpbuf [9] = (in_progress->flags () & msg_t::more);
next_step (tmpbuf, 10, &v1_encoder_t::size_ready, false);
}
return true;
}
......@@ -24,8 +24,6 @@
namespace zmq
{
class i_msg_source;
// Encoder for ZMTP/1.0 protocol. Converts messages into data batches.
class v1_encoder_t : public encoder_base_t <v1_encoder_t>
......@@ -35,15 +33,11 @@ namespace zmq
v1_encoder_t (size_t bufsize_);
~v1_encoder_t ();
void set_msg_source (i_msg_source *msg_source_);
private:
bool size_ready ();
bool message_ready ();
void size_ready ();
void message_ready ();
i_msg_source *msg_source;
msg_t in_progress;
unsigned char tmpbuf [10];
v1_encoder_t (const v1_encoder_t&);
......
......@@ -31,10 +31,8 @@
#include "wire.hpp"
#include "err.hpp"
zmq::v2_decoder_t::v2_decoder_t (size_t bufsize_,
int64_t maxmsgsize_, i_msg_sink *msg_sink_) :
zmq::v2_decoder_t::v2_decoder_t (size_t bufsize_, int64_t maxmsgsize_) :
decoder_base_t <v2_decoder_t> (bufsize_),
msg_sink (msg_sink_),
msg_flags (0),
maxmsgsize (maxmsgsize_)
{
......@@ -51,12 +49,7 @@ zmq::v2_decoder_t::~v2_decoder_t ()
errno_assert (rc == 0);
}
void zmq::v2_decoder_t::set_msg_sink (i_msg_sink *msg_sink_)
{
msg_sink = msg_sink_;
}
bool zmq::v2_decoder_t::flags_ready ()
int zmq::v2_decoder_t::flags_ready ()
{
msg_flags = 0;
if (tmpbuf [0] & v2_protocol_t::more_flag)
......@@ -69,92 +62,79 @@ bool zmq::v2_decoder_t::flags_ready ()
else
next_step (tmpbuf, 1, &v2_decoder_t::one_byte_size_ready);
return true;
return 0;
}
bool zmq::v2_decoder_t::one_byte_size_ready ()
int zmq::v2_decoder_t::one_byte_size_ready ()
{
int rc = 0;
// Message size must not exceed the maximum allowed size.
if (maxmsgsize >= 0)
if (unlikely (tmpbuf [0] > static_cast <uint64_t> (maxmsgsize)))
goto error;
if (unlikely (tmpbuf [0] > static_cast <uint64_t> (maxmsgsize))) {
errno = EMSGSIZE;
return -1;
}
// in_progress is initialised at this point so in theory we should
// close it before calling zmq_msg_init_size, however, it's a 0-byte
// message and thus we can treat it as uninitialised...
rc = in_progress.init_size (tmpbuf [0]);
int rc = in_progress.init_size (tmpbuf [0]);
if (unlikely (rc)) {
errno_assert (errno == ENOMEM);
int rc = in_progress.init ();
rc = in_progress.init ();
errno_assert (rc == 0);
goto error;
errno = ENOMEM;
return -1;
}
in_progress.set_flags (msg_flags);
next_step (in_progress.data (), in_progress.size (),
&v2_decoder_t::message_ready);
return true;
error:
decoding_error ();
return false;
return 0;
}
bool zmq::v2_decoder_t::eight_byte_size_ready ()
int zmq::v2_decoder_t::eight_byte_size_ready ()
{
int rc = 0;
// The payload size is encoded as 64-bit unsigned integer.
// The most significant byte comes first.
const uint64_t msg_size = get_uint64 (tmpbuf);
// Message size must not exceed the maximum allowed size.
if (maxmsgsize >= 0)
if (unlikely (msg_size > static_cast <uint64_t> (maxmsgsize)))
goto error;
if (unlikely (msg_size > static_cast <uint64_t> (maxmsgsize))) {
errno = EMSGSIZE;
return -1;
}
// Message size must fit into size_t data type.
if (unlikely (msg_size != static_cast <size_t> (msg_size)))
goto error;
if (unlikely (msg_size != static_cast <size_t> (msg_size))) {
errno = EMSGSIZE;
return -1;
}
// in_progress is initialised at this point so in theory we should
// close it before calling init_size, however, it's a 0-byte
// message and thus we can treat it as uninitialised.
rc = in_progress.init_size (static_cast <size_t> (msg_size));
int rc = in_progress.init_size (static_cast <size_t> (msg_size));
if (unlikely (rc)) {
errno_assert (errno == ENOMEM);
int rc = in_progress.init ();
rc = in_progress.init ();
errno_assert (rc == 0);
goto error;
errno = ENOMEM;
return -1;
}
in_progress.set_flags (msg_flags);
next_step (in_progress.data (), in_progress.size (),
&v2_decoder_t::message_ready);
return true;
error:
decoding_error ();
return false;
return 0;
}
bool zmq::v2_decoder_t::message_ready ()
int zmq::v2_decoder_t::message_ready ()
{
// Message is completely read. Push it further and start reading
// new message. (in_progress is a 0-byte message after this point.)
if (unlikely (!msg_sink))
return false;
int rc = msg_sink->push_msg (&in_progress);
if (unlikely (rc != 0)) {
if (errno != EAGAIN)
decoding_error ();
return false;
}
// Message is completely read. Signal this to the caller
// and prepare to decode next message.
next_step (tmpbuf, 1, &v2_decoder_t::flags_ready);
return true;
return 1;
}
......@@ -21,7 +21,6 @@
#define __ZMQ_V2_DECODER_HPP_INCLUDED__
#include "decoder.hpp"
#include "i_msg_sink.hpp"
namespace zmq
{
......@@ -30,21 +29,19 @@ namespace zmq
{
public:
v2_decoder_t (size_t bufsize_,
int64_t maxmsgsize_, i_msg_sink *msg_sink_);
v2_decoder_t (size_t bufsize_, int64_t maxmsgsize_);
virtual ~v2_decoder_t ();
// i_decoder interface.
virtual void set_msg_sink (i_msg_sink *msg_sink_);
virtual msg_t *msg () { return &in_progress; }
private:
bool flags_ready ();
bool one_byte_size_ready ();
bool eight_byte_size_ready ();
bool message_ready ();
int flags_ready ();
int one_byte_size_ready ();
int eight_byte_size_ready ();
int message_ready ();
i_msg_sink *msg_sink;
unsigned char tmpbuf [8];
unsigned char msg_flags;
msg_t in_progress;
......
......@@ -22,64 +22,31 @@
#include "likely.hpp"
#include "wire.hpp"
zmq::v2_encoder_t::v2_encoder_t (size_t bufsize_, i_msg_source *msg_source_) :
encoder_base_t <v2_encoder_t> (bufsize_),
msg_source (msg_source_)
zmq::v2_encoder_t::v2_encoder_t (size_t bufsize_) :
encoder_base_t <v2_encoder_t> (bufsize_)
{
int rc = in_progress.init ();
errno_assert (rc == 0);
// Write 0 bytes to the batch and go to message_ready state.
next_step (NULL, 0, &v2_encoder_t::message_ready, true);
}
zmq::v2_encoder_t::~v2_encoder_t ()
{
int rc = in_progress.close ();
errno_assert (rc == 0);
}
void zmq::v2_encoder_t::set_msg_source (i_msg_source *msg_source_)
void zmq::v2_encoder_t::message_ready ()
{
msg_source = msg_source_;
}
bool zmq::v2_encoder_t::message_ready ()
{
// Release the content of the old message.
int rc = in_progress.close ();
errno_assert (rc == 0);
// Read new message. If there is none, return false.
// Note that new state is set only if write is successful. That way
// unsuccessful write will cause retry on the next state machine
// invocation.
if (unlikely (!msg_source)) {
rc = in_progress.init ();
errno_assert (rc == 0);
return false;
}
rc = msg_source->pull_msg (&in_progress);
if (unlikely (rc)) {
errno_assert (errno == EAGAIN);
rc = in_progress.init ();
errno_assert (rc == 0);
return false;
}
// Encode flags.
unsigned char &protocol_flags = tmpbuf [0];
protocol_flags = 0;
if (in_progress.flags () & msg_t::more)
if (in_progress->flags () & msg_t::more)
protocol_flags |= v2_protocol_t::more_flag;
if (in_progress.size () > 255)
if (in_progress->size () > 255)
protocol_flags |= v2_protocol_t::large_flag;
// Encode the message length. For messages less then 256 bytes,
// the length is encoded as 8-bit unsigned integer. For larger
// messages, 64-bit unsigned integer in network byte order is used.
const size_t size = in_progress.size ();
const size_t size = in_progress->size ();
if (unlikely (size > 255)) {
put_uint64 (tmpbuf + 1, size);
next_step (tmpbuf, 9, &v2_encoder_t::size_ready, false);
......@@ -88,13 +55,11 @@ bool zmq::v2_encoder_t::message_ready ()
tmpbuf [1] = static_cast <uint8_t> (size);
next_step (tmpbuf, 2, &v2_encoder_t::size_ready, false);
}
return true;
}
bool zmq::v2_encoder_t::size_ready ()
void zmq::v2_encoder_t::size_ready ()
{
// Write message body into the buffer.
next_step (in_progress.data (), in_progress.size (),
&v2_encoder_t::message_ready, !(in_progress.flags () & msg_t::more));
return true;
next_step (in_progress->data (), in_progress->size (),
&v2_encoder_t::message_ready, true);
}
......@@ -21,30 +21,23 @@
#define __ZMQ_V2_ENCODER_HPP_INCLUDED__
#include "encoder.hpp"
#include "i_msg_source.hpp"
namespace zmq
{
class i_msg_source;
// Encoder for 0MQ framing protocol. Converts messages into data stream.
class v2_encoder_t : public encoder_base_t <v2_encoder_t>
{
public:
v2_encoder_t (size_t bufsize_, i_msg_source *msg_source_);
v2_encoder_t (size_t bufsize_);
virtual ~v2_encoder_t ();
virtual void set_msg_source (i_msg_source *msg_source_);
private:
bool size_ready ();
bool message_ready ();
void size_ready ();
void message_ready ();
i_msg_source *msg_source;
msg_t in_progress;
unsigned char tmpbuf [9];
v2_encoder_t (const v2_encoder_t&);
......
......@@ -33,7 +33,7 @@ namespace zmq
*buffer_ = value;
}
inline uint8_t get_uint8 (unsigned char *buffer_)
inline uint8_t get_uint8 (const unsigned char *buffer_)
{
return *buffer_;
}
......@@ -44,7 +44,7 @@ namespace zmq
buffer_ [1] = (unsigned char) (value & 0xff);
}
inline uint16_t get_uint16 (unsigned char *buffer_)
inline uint16_t get_uint16 (const unsigned char *buffer_)
{
return
(((uint16_t) buffer_ [0]) << 8) |
......@@ -59,7 +59,7 @@ namespace zmq
buffer_ [3] = (unsigned char) (value & 0xff);
}
inline uint32_t get_uint32 (unsigned char *buffer_)
inline uint32_t get_uint32 (const unsigned char *buffer_)
{
return
(((uint32_t) buffer_ [0]) << 24) |
......@@ -80,7 +80,7 @@ namespace zmq
buffer_ [7] = (unsigned char) (value & 0xff);
}
inline uint64_t get_uint64 (unsigned char *buffer_)
inline uint64_t get_uint64 (const unsigned char *buffer_)
{
return
(((uint64_t) buffer_ [0]) << 56) |
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment