Commit d5670f34 authored by Martin Sustrik's avatar Martin Sustrik

ZMQII-26: Use zero-copy for large messages (rx side)

parent 77017872
......@@ -22,8 +22,11 @@
#include <stddef.h>
#include <string.h>
#include <stdlib.h>
#include <algorithm>
#include "err.hpp"
namespace zmq
{
......@@ -42,31 +45,80 @@ namespace zmq
{
public:
inline decoder_t () :
read_ptr (NULL),
inline decoder_t (size_t bufsize_) :
read_pos (NULL),
to_read (0),
next (NULL)
next (NULL),
bufsize (bufsize_)
{
buf = (unsigned char*) malloc (bufsize_);
zmq_assert (buf);
}
inline ~decoder_t ()
{
free (buf);
}
// Returns a buffer to be filled with binary data.
inline void get_buffer (unsigned char **data_, size_t *size_)
{
// If we are expected to read large message, we'll opt for zero-
// copy, i.e. we'll ask caller to fill the data directly to the
// message. Note that subsequent read(s) are non-blocking, thus
// each single read reads at most SO_RCVBUF bytes at once not
// depending on how large is the chunk returned from here.
// As a consequence, large messages being received won't block
// other engines running in the same I/O thread for excessive
// amounts of time.
if (to_read >= bufsize) {
*data_ = read_pos;
*size_ = to_read;
return;
}
*data_ = buf;
*size_ = bufsize;
}
// Push the binary data to the decoder. Returns number of bytes
// actually parsed.
inline size_t write (unsigned char *data_, size_t size_)
// Processes the data in the buffer previously allocated using
// get_buffer function. size_ argument specifies nemuber of bytes
// actually filled into the buffer. Function returns number of
// bytes actually processed.
inline size_t process_buffer (unsigned char *data_, size_t size_)
{
// In case of zero-copy simply adjust the pointers, no copying
// is required. Also, run the state machine in case all the data
// were processed.
if (data_ == read_pos) {
read_pos += size_;
to_read -= size_;
while (!to_read)
if (!(static_cast <T*> (this)->*next) ())
return size_;
return size_;
}
size_t pos = 0;
while (true) {
size_t to_copy = std::min (to_read, size_ - pos);
if (read_ptr) {
memcpy (read_ptr, data_ + pos, to_copy);
read_ptr += to_copy;
}
pos += to_copy;
to_read -= to_copy;
// Try to get more space in the message to fill in.
// If none is available, return.
while (!to_read)
if (!(static_cast <T*> (this)->*next) ())
return pos;
// If there are no more data in the buffer, return.
if (pos == size_)
return pos;
// Copy the data from buffer to the message.
size_t to_copy = std::min (to_read, size_ - pos);
memcpy (read_pos, data_ + pos, to_copy);
read_pos += to_copy;
pos += to_copy;
to_read -= to_copy;
}
}
......@@ -78,20 +130,23 @@ namespace zmq
// This function should be called from derived class to read data
// from the buffer and schedule next state machine action.
inline void next_step (void *read_ptr_, size_t to_read_,
inline void next_step (void *read_pos_, size_t to_read_,
step_t next_)
{
read_ptr = (unsigned char*) read_ptr_;
read_pos = (unsigned char*) read_pos_;
to_read = to_read_;
next = next_;
}
private:
unsigned char *read_ptr;
unsigned char *read_pos;
size_t to_read;
step_t next;
size_t bufsize;
unsigned char *buf;
decoder_t (const decoder_t&);
void operator = (const decoder_t&);
};
......
......@@ -22,8 +22,11 @@
#include <stddef.h>
#include <string.h>
#include <stdlib.h>
#include <algorithm>
#include "err.hpp"
namespace zmq
{
......@@ -35,68 +38,81 @@ namespace zmq
{
public:
inline encoder_t ()
inline encoder_t (size_t bufsize_) :
bufsize (bufsize_)
{
buf = (unsigned char*) malloc (bufsize_);
zmq_assert (buf);
}
// The function tries to fill the supplied chunk by binary data.
// If offset is not NULL, it is filled by offset of the first message
// in the batch. If there's no beginning of a message in the batch,
// offset is set to -1. Both data_ and size_ are in/out parameters.
// Upon exit, data_ contains actual position of the data read (may
// be different from the position requested) and size_ contains number
// of bytes actually provided.
inline void read (unsigned char **data_, size_t *size_,
inline ~encoder_t ()
{
free (buf);
}
// The function returns a batch of binary data. If offset is not NULL,
// it is filled by offset of the first message in the batch. If there's
// no beginning of a message in the batch, offset is set to -1.
inline void get_buffer (unsigned char **data_, size_t *size_,
int *offset_ = NULL)
{
int offset = -1;
size_t pos = 0;
if (offset_)
*offset_ = -1;
while (true) {
// If there are no more data to return, run the state machine.
// If there are still no data, return what we already have
// in the buffer.
if (!to_write) {
if (!(static_cast <T*> (this)->*next) ()) {
*data_ = buf;
*size_ = pos;
return;
}
while (pos < *size_) {
// If beginning of the message was processed, adjust the
// first-message-offset.
if (beginning) {
if (offset_ && *offset_ == -1)
*offset_ = pos;
beginning = false;
}
}
// If we are able to fill whole buffer in a single go, let's
// use zero-copy. There's no disadvantage to it as we cannot
// stuck multiple messages into the buffer anyway. Note that
// subsequent write(s) are non-blocking, thus each single
// write writes at most SO_SNDBUF bytes at once not depending
// on how large is the chunk returned from here.
// If there are no data in the buffer yet and we are able to
// fill whole buffer in a single go, let's use zero-copy.
// There's no disadvantage to it as we cannot stuck multiple
// messages into the buffer anyway. Note that subsequent
// write(s) are non-blocking, thus each single write writes
// at most SO_SNDBUF bytes at once not depending on how large
// is the chunk returned from here.
// As a consequence, large messages being sent won't block
// other engines running in the same I/O thread for excessive
// amounts of time.
if (pos == 0 && to_write >= *size_) {
if (!pos && to_write >= bufsize) {
*data_ = write_pos;
write_pos += to_write;
pos = to_write;
*size_ = to_write;
write_pos = NULL;
to_write = 0;
break;
return;
}
if (to_write) {
size_t to_copy = std::min (to_write, *size_ - pos);
memcpy (*data_ + pos, write_pos, to_copy);
pos += to_copy;
write_pos += to_copy;
to_write -= to_copy;
}
else {
bool more = (static_cast <T*> (this)->*next) ();
if (beginning && offset == -1) {
offset = pos;
beginning = false;
}
if (!more)
break;
// Copy data to the buffer. If the buffer is full, return.
size_t to_copy = std::min (to_write, bufsize - pos);
memcpy (buf + pos, write_pos, to_copy);
pos += to_copy;
write_pos += to_copy;
to_write -= to_copy;
if (pos == bufsize) {
*data_ = buf;
*size_ = pos;
return;
}
}
// Return offset of the first message in the buffer.
if (offset_)
*offset_ = offset;
// Return the size of the filled-in portion of the buffer.
*size_ = pos;
}
protected:
// Prototype of state machine action.
......@@ -121,6 +137,9 @@ namespace zmq
step_t next;
bool beginning;
size_t bufsize;
unsigned char *buf;
encoder_t (const encoder_t&);
void operator = (const encoder_t&);
};
......
......@@ -22,7 +22,8 @@
#include "wire.hpp"
#include "err.hpp"
zmq::zmq_decoder_t::zmq_decoder_t () :
zmq::zmq_decoder_t::zmq_decoder_t (size_t bufsize_) :
decoder_t <zmq_decoder_t> (bufsize_),
destination (NULL)
{
zmq_msg_init (&in_progress);
......
......@@ -32,7 +32,7 @@ namespace zmq
{
public:
zmq_decoder_t ();
zmq_decoder_t (size_t bufsize_);
~zmq_decoder_t ();
void set_inout (struct i_inout *destination_);
......
......@@ -21,7 +21,8 @@
#include "i_inout.hpp"
#include "wire.hpp"
zmq::zmq_encoder_t::zmq_encoder_t () :
zmq::zmq_encoder_t::zmq_encoder_t (size_t bufsize_) :
encoder_t <zmq_encoder_t> (bufsize_),
source (NULL)
{
zmq_msg_init (&in_progress);
......
......@@ -32,7 +32,7 @@ namespace zmq
{
public:
zmq_encoder_t ();
zmq_encoder_t (size_t bufsize_);
~zmq_encoder_t ();
void set_inout (struct i_inout *source_);
......
......@@ -27,21 +27,15 @@
zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, fd_t fd_,
const options_t &options_) :
io_object_t (parent_),
inbuf (NULL),
inpos (NULL),
insize (0),
inpos (0),
outbuf (NULL),
decoder (in_batch_size),
outpos (NULL),
outsize (0),
outpos (0),
encoder (out_batch_size),
inout (NULL),
options (options_)
{
// Allocate read & write buffer.
inbuf_storage = (unsigned char*) malloc (in_batch_size);
zmq_assert (inbuf_storage);
outbuf_storage = (unsigned char*) malloc (out_batch_size);
zmq_assert (outbuf_storage);
// Initialise the underlying socket.
int rc = tcp_socket.open (fd_, options.sndbuf, options.rcvbuf);
zmq_assert (rc == 0);
......@@ -49,8 +43,6 @@ zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, fd_t fd_,
zmq::zmq_engine_t::~zmq_engine_t ()
{
free (outbuf_storage);
free (inbuf_storage);
}
void zmq::zmq_engine_t::plug (i_inout *inout_)
......@@ -80,13 +72,12 @@ void zmq::zmq_engine_t::unplug ()
void zmq::zmq_engine_t::in_event ()
{
// If there's no data to process in the buffer, read new data.
if (inpos == insize) {
// If there's no data to process in the buffer...
if (!insize) {
// Read as much data as possible to the read buffer.
inbuf = inbuf_storage;
insize = tcp_socket.read (inbuf, in_batch_size);
inpos = 0;
// Retrieve the buffer and read as much data as possible.
decoder.get_buffer (&inpos, &insize);
insize = tcp_socket.read (inpos, insize);
// Check whether the peer has closed the connection.
if (insize == (size_t) -1) {
......@@ -96,15 +87,15 @@ void zmq::zmq_engine_t::in_event ()
}
}
// Following code should be executed even if there's not a single byte in
// the buffer. There still can be a decoded messages stored in the decoder.
// Push the data to the decoder.
int nbytes = decoder.write (inbuf + inpos, insize - inpos);
size_t processed = decoder.process_buffer (inpos, insize);
// Adjust the buffer.
inpos += processed;
insize -= processed;
// Adjust read position. Stop polling for input if we got stuck.
inpos += nbytes;
if (inpos < insize)
// Stop polling for input if we got stuck.
if (processed < insize)
reset_pollin (handle);
// Flush all messages the decoder may have produced.
......@@ -114,31 +105,28 @@ void zmq::zmq_engine_t::in_event ()
void zmq::zmq_engine_t::out_event ()
{
// If write buffer is empty, try to read new data from the encoder.
if (outpos == outsize) {
outbuf = outbuf_storage;
outsize = out_batch_size;
encoder.read (&outbuf, &outsize);
outpos = 0;
if (!outsize) {
encoder.get_buffer (&outpos, &outsize);
// If there is no data to send, stop polling for output.
if (outsize == 0)
if (outsize == 0) {
reset_pollout (handle);
return;
}
}
// If there are any data to write in write buffer, write as much as
// possible to the socket.
if (outpos < outsize) {
int nbytes = tcp_socket.write (outbuf + outpos, outsize - outpos);
// Handle problems with the connection.
if (nbytes == -1) {
error ();
return;
}
int nbytes = tcp_socket.write (outpos, outsize);
outpos += nbytes;
// Handle problems with the connection.
if (nbytes == -1) {
error ();
return;
}
outpos += nbytes;
outsize -= nbytes;
}
void zmq::zmq_engine_t::revive ()
......
......@@ -57,21 +57,16 @@ namespace zmq
tcp_socket_t tcp_socket;
handle_t handle;
unsigned char *inbuf_storage;
unsigned char *inbuf;
unsigned char *inpos;
size_t insize;
size_t inpos;
zmq_decoder_t decoder;
unsigned char *outbuf_storage;
unsigned char *outbuf;
unsigned char *outpos;
size_t outsize;
size_t outpos;
zmq_encoder_t encoder;
i_inout *inout;
zmq_encoder_t encoder;
zmq_decoder_t decoder;
options_t options;
zmq_engine_t (const zmq_engine_t&);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment