Commit 9be877c6 authored by Martin Sustrik's avatar Martin Sustrik

ZMQII-26: Use zero-copy for large messages

parent bfef2fcd
...@@ -24,6 +24,8 @@ ...@@ -24,6 +24,8 @@
#include <string.h> #include <string.h>
#include <algorithm> #include <algorithm>
#include <stdio.h>
namespace zmq namespace zmq
{ {
...@@ -44,17 +46,31 @@ namespace zmq ...@@ -44,17 +46,31 @@ namespace zmq
// NULL, it is filled by offset of the first message in the batch. // NULL, it is filled by offset of the first message in the batch.
// If there's no beginning of a message in the batch, offset is // If there's no beginning of a message in the batch, offset is
// set to -1. // set to -1.
inline size_t read (unsigned char *data_, size_t size_, inline void read (unsigned char **data_, size_t *size_,
int *offset_ = NULL) int *offset_ = NULL)
{ {
int offset = -1; int offset = -1;
size_t pos = 0; size_t pos = 0;
while (pos < size_) { while (pos < *size_) {
// If we are able to fill whole buffer in a single go, let's
// use zero-copy. There's no disadvantage to it as we cannot
// stuck multiple messages into the buffer anyway.
if (pos == 0 && to_write >= *size_) {
*data_ = write_pos;
write_pos += *size_;
to_write -= *size_;
// TODO: manage beginning & offset here.
return;
}
if (to_write) { if (to_write) {
size_t to_copy = std::min (to_write, size_ - pos); size_t to_copy = std::min (to_write, *size_ - pos);
memcpy (data_ + pos, write_pos, to_copy); memcpy (*data_ + pos, write_pos, to_copy);
pos += to_copy; pos += to_copy;
write_pos += to_copy; write_pos += to_copy;
to_write -= to_copy; to_write -= to_copy;
...@@ -70,10 +86,12 @@ namespace zmq ...@@ -70,10 +86,12 @@ namespace zmq
} }
} }
// Return offset of the first message in the buffer.
if (offset_) if (offset_)
*offset_ = offset; *offset_ = offset;
return pos; // Return the size of the filled-in portion of the buffer.
*size_ = pos;
} }
protected: protected:
......
...@@ -26,17 +26,19 @@ ...@@ -26,17 +26,19 @@
zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, fd_t fd_) : zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, fd_t fd_) :
io_object_t (parent_), io_object_t (parent_),
inbuf (NULL),
insize (0), insize (0),
inpos (0), inpos (0),
outbuf (NULL),
outsize (0), outsize (0),
outpos (0), outpos (0),
inout (NULL) inout (NULL)
{ {
// Allocate read & write buffer. // Allocate read & write buffer.
inbuf = (unsigned char*) malloc (in_batch_size); inbuf_storage = (unsigned char*) malloc (in_batch_size);
zmq_assert (inbuf); zmq_assert (inbuf_storage);
outbuf = (unsigned char*) malloc (out_batch_size); outbuf_storage = (unsigned char*) malloc (out_batch_size);
zmq_assert (outbuf); zmq_assert (outbuf_storage);
// Initialise the underlying socket. // Initialise the underlying socket.
int rc = tcp_socket.open (fd_); int rc = tcp_socket.open (fd_);
...@@ -45,8 +47,8 @@ zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, fd_t fd_) : ...@@ -45,8 +47,8 @@ zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, fd_t fd_) :
zmq::zmq_engine_t::~zmq_engine_t () zmq::zmq_engine_t::~zmq_engine_t ()
{ {
free (outbuf); free (outbuf_storage);
free (inbuf); free (inbuf_storage);
} }
void zmq::zmq_engine_t::plug (i_inout *inout_) void zmq::zmq_engine_t::plug (i_inout *inout_)
...@@ -80,11 +82,12 @@ void zmq::zmq_engine_t::in_event () ...@@ -80,11 +82,12 @@ void zmq::zmq_engine_t::in_event ()
if (inpos == insize) { if (inpos == insize) {
// Read as much data as possible to the read buffer. // Read as much data as possible to the read buffer.
inbuf = inbuf_storage;
insize = tcp_socket.read (inbuf, in_batch_size); insize = tcp_socket.read (inbuf, in_batch_size);
inpos = 0; inpos = 0;
// Check whether the peer has closed the connection. // Check whether the peer has closed the connection.
if (insize == -1) { if (insize == (size_t) -1) {
insize = 0; insize = 0;
error (); error ();
return; return;
...@@ -111,7 +114,9 @@ void zmq::zmq_engine_t::out_event () ...@@ -111,7 +114,9 @@ void zmq::zmq_engine_t::out_event ()
// If write buffer is empty, try to read new data from the encoder. // If write buffer is empty, try to read new data from the encoder.
if (outpos == outsize) { if (outpos == outsize) {
outsize = encoder.read (outbuf, out_batch_size); outbuf = outbuf_storage;
outsize = out_batch_size;
encoder.read (&outbuf, &outsize);
outpos = 0; outpos = 0;
// If there is no data to send, stop polling for output. // If there is no data to send, stop polling for output.
......
...@@ -20,6 +20,8 @@ ...@@ -20,6 +20,8 @@
#ifndef __ZMQ_ZMQ_ENGINE_HPP_INCLUDED__ #ifndef __ZMQ_ZMQ_ENGINE_HPP_INCLUDED__
#define __ZMQ_ZMQ_ENGINE_HPP_INCLUDED__ #define __ZMQ_ZMQ_ENGINE_HPP_INCLUDED__
#include <stddef.h>
#include "i_engine.hpp" #include "i_engine.hpp"
#include "io_object.hpp" #include "io_object.hpp"
#include "tcp_socket.hpp" #include "tcp_socket.hpp"
...@@ -54,13 +56,15 @@ namespace zmq ...@@ -54,13 +56,15 @@ namespace zmq
tcp_socket_t tcp_socket; tcp_socket_t tcp_socket;
handle_t handle; handle_t handle;
unsigned char *inbuf_storage;
unsigned char *inbuf; unsigned char *inbuf;
int insize; size_t insize;
int inpos; size_t inpos;
unsigned char *outbuf_storage;
unsigned char *outbuf; unsigned char *outbuf;
int outsize; size_t outsize;
int outpos; size_t outpos;
i_inout *inout; i_inout *inout;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment