Commit 8463b4d5 authored by Martin Sustrik's avatar Martin Sustrik

SWAP functionality removed

On-disk storage should be implemented in devices rather than
in 0MQ core. 0MQ is a networking library and there's no point
in storing network buffers on disk.
Signed-off-by: 's avatarMartin Sustrik <sustrik@250bpm.com>
parent fb27a000
......@@ -79,22 +79,6 @@ Default value:: 0
Applicable socket types:: all
ZMQ_SWAP: Retrieve disk offload size
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_SWAP' option shall retrieve the disk offload (swap) size for the
specified 'socket'. A socket which has 'ZMQ_SWAP' set to a non-zero value may
exceed it's high water mark; in this case outstanding messages shall be
offloaded to storage on disk rather than held in memory.
The value of 'ZMQ_SWAP' defines the maximum size of the swap space in bytes.
[horizontal]
Option value type:: int64_t
Option value unit:: bytes
Default value:: 0
Applicable socket types:: all
ZMQ_AFFINITY: Retrieve I/O thread affinity
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_AFFINITY' option shall retrieve the I/O thread affinity for newly
......
......@@ -47,22 +47,6 @@ Default value:: 0
Applicable socket types:: all
ZMQ_SWAP: Set disk offload size
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_SWAP' option shall set the disk offload (swap) size for the specified
'socket'. A socket which has 'ZMQ_SWAP' set to a non-zero value may exceed it's
high water mark; in this case outstanding messages shall be offloaded to
storage on disk rather than held in memory.
The value of 'ZMQ_SWAP' defines the maximum size of the swap space in bytes.
[horizontal]
Option value type:: int64_t
Option value unit:: bytes
Default value:: 0
Applicable socket types:: all
ZMQ_AFFINITY: Set I/O thread affinity
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_AFFINITY' option shall set the I/O thread affinity for newly created
......
......@@ -184,7 +184,6 @@ ZMQ_EXPORT int zmq_term (void *context);
/* Socket options. */
#define ZMQ_HWM 1
#define ZMQ_SWAP 3
#define ZMQ_AFFINITY 4
#define ZMQ_IDENTITY 5
#define ZMQ_SUBSCRIBE 6
......
......@@ -115,7 +115,6 @@ libzmq_la_SOURCES = \
socket_base.hpp \
stdint.hpp \
sub.hpp \
swap.hpp \
tcp_connecter.hpp \
tcp_listener.hpp \
tcp_socket.hpp \
......@@ -173,7 +172,6 @@ libzmq_la_SOURCES = \
session.cpp \
socket_base.cpp \
sub.cpp \
swap.cpp \
tcp_connecter.cpp \
tcp_listener.cpp \
tcp_socket.cpp \
......
......@@ -59,10 +59,6 @@ namespace zmq
// Maximal delta between high and low watermark.
max_wm_delta = 1024,
// Swap inteligently batches data for writing to disk. The size of
// the batch in bytes is specified by this option.
swap_block_size = 8192,
// Maximum number of events the I/O thread can process in one go.
max_io_events = 256,
......
......@@ -27,7 +27,6 @@
zmq::options_t::options_t () :
hwm (0),
swap (0),
affinity (0),
rate (100),
recovery_ivl (10),
......@@ -59,14 +58,6 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
hwm = *((uint64_t*) optval_);
return 0;
case ZMQ_SWAP:
if (optvallen_ != sizeof (int64_t) || *((int64_t*) optval_) < 0) {
errno = EINVAL;
return -1;
}
swap = *((int64_t*) optval_);
return 0;
case ZMQ_AFFINITY:
if (optvallen_ != sizeof (uint64_t)) {
errno = EINVAL;
......@@ -195,15 +186,6 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
*optvallen_ = sizeof (uint64_t);
return 0;
case ZMQ_SWAP:
if (*optvallen_ < sizeof (int64_t)) {
errno = EINVAL;
return -1;
}
*((int64_t*) optval_) = swap;
*optvallen_ = sizeof (int64_t);
return 0;
case ZMQ_AFFINITY:
if (*optvallen_ < sizeof (uint64_t)) {
errno = EINVAL;
......
......@@ -36,7 +36,6 @@ namespace zmq
int getsockopt (int option_, void *optval_, size_t *optvallen_);
uint64_t hwm;
int64_t swap;
uint64_t affinity;
blob_t identity;
......
......@@ -163,7 +163,7 @@ void zmq::reader_t::process_pipe_term_ack ()
}
zmq::writer_t::writer_t (object_t *parent_, pipe_t *pipe_, reader_t *reader_,
uint64_t hwm_, int64_t swap_size_) :
uint64_t hwm_) :
object_t (parent_),
active (true),
pipe (pipe_),
......@@ -171,28 +171,15 @@ zmq::writer_t::writer_t (object_t *parent_, pipe_t *pipe_, reader_t *reader_,
hwm (hwm_),
msgs_read (0),
msgs_written (0),
swap (NULL),
sink (NULL),
swapping (false),
pending_delimiter (false),
terminating (false)
{
// Inform reader about the writer.
reader->set_writer (this);
// Open the swap file, if required.
if (swap_size_ > 0) {
swap = new (std::nothrow) swap_t (swap_size_);
alloc_assert (swap);
int rc = swap->init ();
zmq_assert (rc == 0);
}
}
zmq::writer_t::~writer_t ()
{
if (swap)
delete swap;
}
void zmq::writer_t::set_event_sink (i_writer_events *sink_)
......@@ -208,21 +195,9 @@ bool zmq::writer_t::check_write (zmq_msg_t *msg_)
if (unlikely (!active))
return false;
if (unlikely (swapping)) {
if (unlikely (!swap->fits (msg_))) {
active = false;
return false;
}
}
else {
if (unlikely (pipe_full ())) {
if (swap)
swapping = true;
else {
active = false;
return false;
}
}
if (unlikely (pipe_full ())) {
active = false;
return false;
}
return true;
......@@ -233,14 +208,6 @@ bool zmq::writer_t::write (zmq_msg_t *msg_)
if (unlikely (!check_write (msg_)))
return false;
if (unlikely (swapping)) {
bool stored = swap->store (msg_);
zmq_assert (stored);
if (!(msg_->flags & ZMQ_MSG_MORE))
swap->commit ();
return true;
}
pipe->write (*msg_, msg_->flags & ZMQ_MSG_MORE);
if (!(msg_->flags & ZMQ_MSG_MORE))
msgs_written++;
......@@ -250,12 +217,6 @@ bool zmq::writer_t::write (zmq_msg_t *msg_)
void zmq::writer_t::rollback ()
{
// Remove incomplete message from the swap.
if (unlikely (swapping)) {
swap->rollback ();
return;
}
// Remove incomplete message from the pipe.
zmq_msg_t msg;
while (pipe->unwrite (&msg)) {
......@@ -266,8 +227,7 @@ void zmq::writer_t::rollback ()
void zmq::writer_t::flush ()
{
// In the swapping mode, flushing is automatically handled by swap object.
if (!swapping && !pipe->flush ())
if (!pipe->flush ())
send_activate_reader (reader);
}
......@@ -284,11 +244,6 @@ void zmq::writer_t::terminate ()
// Rollback any unfinished messages.
rollback ();
if (swapping) {
pending_delimiter = true;
return;
}
// Push delimiter into the pipe. Trick the compiler to belive that
// the tag is a valid pointer. Note that watermarks are not checked
// thus the delimiter can be written even though the pipe is full.
......@@ -305,40 +260,6 @@ void zmq::writer_t::process_activate_writer (uint64_t msgs_read_)
// Store the reader's message sequence number.
msgs_read = msgs_read_;
// If we are in the swapping mode, we have some messages in the swap.
// Given that pipe is now ready for writing we can move part of the
// swap into the pipe.
if (swapping) {
zmq_msg_t msg;
while (!pipe_full () && !swap->empty ()) {
swap->fetch(&msg);
pipe->write (msg, msg.flags & ZMQ_MSG_MORE);
if (!(msg.flags & ZMQ_MSG_MORE))
msgs_written++;
}
if (!pipe->flush ())
send_activate_reader (reader);
// There are no more messages in the swap. We can switch into
// standard in-memory mode.
if (swap->empty ()) {
swapping = false;
// Push delimiter into the pipe. Trick the compiler to belive that
// the tag is a valid pointer. Note that watermarks are not checked
// thus the delimiter can be written even though the pipe is full.
if (pending_delimiter) {
zmq_msg_t msg;
const unsigned char *offset = 0;
msg.content = (void*) (offset + ZMQ_DELIMITER);
msg.flags = 0;
pipe->write (msg, false);
flush ();
return;
}
}
}
// If the writer was non-active before, let's make it active
// (available for writing messages to).
if (!active && !terminating) {
......@@ -371,7 +292,7 @@ bool zmq::writer_t::pipe_full ()
}
void zmq::create_pipe (object_t *reader_parent_, object_t *writer_parent_,
uint64_t hwm_, int64_t swap_size_, reader_t **reader_, writer_t **writer_)
uint64_t hwm_, reader_t **reader_, writer_t **writer_)
{
// First compute the low water mark. Following point should be taken
// into consideration:
......@@ -404,6 +325,6 @@ void zmq::create_pipe (object_t *reader_parent_, object_t *writer_parent_,
*reader_ = new (std::nothrow) reader_t (reader_parent_, pipe, lwm);
alloc_assert (*reader_);
*writer_ = new (std::nothrow) writer_t (writer_parent_, pipe, *reader_,
hwm_, swap_size_);
hwm_);
alloc_assert (*writer_);
}
......@@ -26,7 +26,6 @@
#include "stdint.hpp"
#include "array.hpp"
#include "ypipe.hpp"
#include "swap.hpp"
#include "config.hpp"
#include "object.hpp"
......@@ -35,7 +34,7 @@ namespace zmq
// Creates a pipe. Returns pointer to reader and writer objects.
void create_pipe (object_t *reader_parent_, object_t *writer_parent_,
uint64_t hwm_, int64_t swap_size_, class reader_t **reader_,
uint64_t hwm_, class reader_t **reader_,
class writer_t **writer_);
// The shutdown mechanism for pipe works as follows: Either endpoint
......@@ -59,7 +58,7 @@ namespace zmq
class reader_t : public object_t, public array_item_t
{
friend void create_pipe (object_t*, object_t*, uint64_t,
int64_t, reader_t**, writer_t**);
reader_t**, writer_t**);
friend class writer_t;
public:
......@@ -128,7 +127,7 @@ namespace zmq
class writer_t : public object_t, public array_item_t
{
friend void create_pipe (object_t*, object_t*, uint64_t,
int64_t, reader_t**, writer_t**);
reader_t**, writer_t**);
public:
......@@ -136,8 +135,8 @@ namespace zmq
void set_event_sink (i_writer_events *endpoint_);
// Checks whether messages can be written to the pipe.
// If writing the message would cause high watermark and (optionally)
// if the swap is full, the function returns false.
// If writing the message would cause high watermark
// the function returns false.
bool check_write (zmq_msg_t *msg_);
// Writes a message to the underlying pipe. Returns false if the
......@@ -156,19 +155,17 @@ namespace zmq
private:
writer_t (class object_t *parent_, pipe_t *pipe_, reader_t *reader_,
uint64_t hwm_, int64_t swap_size_);
uint64_t hwm_);
~writer_t ();
// Command handlers.
void process_activate_writer (uint64_t msgs_read_);
void process_pipe_term ();
// Tests whether underlying pipe is already full. The swap is not
// taken into account.
// Tests whether underlying pipe is already full.
bool pipe_full ();
// True, if this object can be written to. Undelying ypipe may be full
// but as long as there's swap space available, this flag is true.
// True, if this object can be written to.
bool active;
// The underlying pipe.
......@@ -187,20 +184,9 @@ namespace zmq
// Number of messages we have written so far.
uint64_t msgs_written;
// Pointer to the message swap. If NULL, messages are always
// kept in main memory.
swap_t *swap;
// Sink for the events (either the socket or the session).
i_writer_events *sink;
// If true, swap is active. New messages are to be written to the swap.
bool swapping;
// If true, there's a delimiter to be written to the pipe after the
// swap is empied.
bool pending_delimiter;
// True is 'terminate' method was called of 'pipe_term' command
// arrived from the reader.
bool terminating;
......
......@@ -247,13 +247,11 @@ void zmq::session_t::process_attach (i_engine *engine_,
// Create the pipes, as required.
if (options.requires_in) {
create_pipe (socket, this, options.hwm, options.swap,
&socket_reader, &out_pipe);
create_pipe (socket, this, options.hwm, &socket_reader, &out_pipe);
out_pipe->set_event_sink (this);
}
if (options.requires_out) {
create_pipe (this, socket, options.hwm, options.swap, &in_pipe,
&socket_writer);
create_pipe (this, socket, options.hwm, &in_pipe, &socket_writer);
in_pipe->set_event_sink (this);
}
......
......@@ -376,28 +376,22 @@ int zmq::socket_base_t::connect (const char *addr_)
writer_t *outpipe_writer = NULL;
// The total HWM for an inproc connection should be the sum of
// the binder's HWM and the connector's HWM. (Similarly for the
// SWAP.)
// the binder's HWM and the connector's HWM.
int64_t hwm;
if (options.hwm == 0 || peer.options.hwm == 0)
hwm = 0;
else
hwm = options.hwm + peer.options.hwm;
int64_t swap;
if (options.swap == 0 && peer.options.swap == 0)
swap = 0;
else
swap = options.swap + peer.options.swap;
// Create inbound pipe, if required.
if (options.requires_in)
create_pipe (this, peer.socket, hwm, swap,
&inpipe_reader, &inpipe_writer);
create_pipe (this, peer.socket, hwm, &inpipe_reader,
&inpipe_writer);
// Create outbound pipe, if required.
if (options.requires_out)
create_pipe (peer.socket, this, hwm, swap,
&outpipe_reader, &outpipe_writer);
create_pipe (peer.socket, this, hwm, &outpipe_reader,
&outpipe_writer);
// Attach the pipes to this socket object.
attach_pipes (inpipe_reader, outpipe_writer, peer.options.identity);
......@@ -435,12 +429,12 @@ int zmq::socket_base_t::connect (const char *addr_)
// Create inbound pipe, if required.
if (options.requires_in)
create_pipe (this, session, options.hwm, options.swap,
create_pipe (this, session, options.hwm,
&inpipe_reader, &inpipe_writer);
// Create outbound pipe, if required.
if (options.requires_out)
create_pipe (session, this, options.hwm, options.swap,
create_pipe (session, this, options.hwm,
&outpipe_reader, &outpipe_writer);
// Attach the pipes to the socket object.
......
/*
Copyright (c) 2007-2011 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "platform.hpp"
#ifdef ZMQ_HAVE_WINDOWS
#include "windows.hpp"
#include <io.h>
#else
#include <unistd.h>
#endif
#include "../include/zmq.h"
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <string.h>
#include <sstream>
#include <algorithm>
#include "swap.hpp"
#include "config.hpp"
#include "atomic_counter.hpp"
#include "err.hpp"
zmq::swap_t::swap_t (int64_t filesize_) :
fd (-1),
filesize (filesize_),
file_pos (0),
write_pos (0),
read_pos (0),
block_size (swap_block_size),
write_buf_start_addr (0)
{
zmq_assert (filesize > 0);
zmq_assert (block_size > 0);
buf1 = new (std::nothrow) char [block_size];
alloc_assert (buf1);
buf2 = new (std::nothrow) char [block_size];
alloc_assert (buf2);
read_buf = write_buf = buf1;
}
zmq::swap_t::~swap_t ()
{
delete [] buf1;
delete [] buf2;
if (fd == -1)
return;
#ifdef ZMQ_HAVE_WINDOWS
int rc = _close (fd);
#else
int rc = close (fd);
#endif
errno_assert (rc == 0);
#ifdef ZMQ_HAVE_WINDOWS
rc = _unlink (filename.c_str ());
#else
rc = unlink (filename.c_str ());
#endif
errno_assert (rc == 0);
}
int zmq::swap_t::init ()
{
static zmq::atomic_counter_t seqnum (0);
// Get process ID.
#ifdef ZMQ_HAVE_WINDOWS
int pid = GetCurrentThreadId ();
#else
pid_t pid = getpid ();
#endif
std::ostringstream outs;
outs << "zmq_" << pid << '_' << seqnum.get () << ".swap";
filename = outs.str ();
seqnum.add (1);
// Open the backing file.
#ifdef ZMQ_HAVE_WINDOWS
fd = _open (filename.c_str (), _O_RDWR | _O_CREAT, 0600);
#else
fd = open (filename.c_str (), O_RDWR | O_CREAT, 0600);
#endif
if (fd == -1)
return -1;
#ifdef ZMQ_HAVE_LINUX
// Enable more aggresive read-ahead optimization.
posix_fadvise (fd, 0, filesize, POSIX_FADV_SEQUENTIAL);
#endif
return 0;
}
bool zmq::swap_t::store (zmq_msg_t *msg_)
{
size_t msg_size = zmq_msg_size (msg_);
// Check buffer space availability.
// NOTE: We always keep one byte open.
if (buffer_space () <= (int64_t) (sizeof msg_size + 1 + msg_size))
return false;
// Don't store the ZMQ_MSG_SHARED flag.
uint8_t msg_flags = msg_->flags & ~ZMQ_MSG_SHARED;
// Write message length, flags, and message body.
copy_to_file (&msg_size, sizeof msg_size);
copy_to_file (&msg_flags, sizeof msg_flags);
copy_to_file (zmq_msg_data (msg_), msg_size);
zmq_msg_close (msg_);
return true;
}
void zmq::swap_t::fetch (zmq_msg_t *msg_)
{
// There must be at least one message available.
zmq_assert (read_pos != write_pos);
// Retrieve the message size.
size_t msg_size;
copy_from_file (&msg_size, sizeof msg_size);
// Initialize the message.
zmq_msg_init_size (msg_, msg_size);
// Retrieve the message flags.
copy_from_file (&msg_->flags, sizeof msg_->flags);
// Retrieve the message payload.
copy_from_file (zmq_msg_data (msg_), msg_size);
}
void zmq::swap_t::commit ()
{
commit_pos = write_pos;
}
void zmq::swap_t::rollback ()
{
if (commit_pos == write_pos || read_pos == write_pos)
return;
if (write_pos > read_pos)
zmq_assert (read_pos <= commit_pos && commit_pos <= write_pos);
else
zmq_assert (read_pos <= commit_pos || commit_pos <= write_pos);
if (commit_pos / block_size == read_pos / block_size) {
write_buf_start_addr = commit_pos % block_size;
write_buf = read_buf;
}
else if (commit_pos / block_size != write_pos / block_size) {
write_buf_start_addr = commit_pos % block_size;
fill_buf (write_buf, write_buf_start_addr);
}
write_pos = commit_pos;
}
bool zmq::swap_t::empty ()
{
return read_pos == write_pos;
}
/*
bool zmq::swap_t::full ()
{
// Check that at least the message size can be written to the swap.
return buffer_space () < (int64_t) (sizeof (size_t) + 1);
}
*/
bool zmq::swap_t::fits (zmq_msg_t *msg_)
{
// Check whether whole binary representation of the message
// fits into the swap.
size_t msg_size = zmq_msg_size (msg_);
if (buffer_space () <= (int64_t) (sizeof msg_size + 1 + msg_size))
return false;
return true;
}
void zmq::swap_t::copy_from_file (void *buffer_, size_t count_)
{
char *dest_ptr = (char *) buffer_;
size_t chunk_size, remainder = count_;
while (remainder > 0) {
chunk_size = std::min (remainder,
std::min ((size_t) (filesize - read_pos),
(size_t) (block_size - read_pos % block_size)));
memcpy (dest_ptr, &read_buf [read_pos % block_size], chunk_size);
dest_ptr += chunk_size;
read_pos = (read_pos + chunk_size) % filesize;
if (read_pos % block_size == 0) {
if (read_pos / block_size == write_pos / block_size)
read_buf = write_buf;
else
fill_buf (read_buf, read_pos);
}
remainder -= chunk_size;
}
}
void zmq::swap_t::copy_to_file (const void *buffer_, size_t count_)
{
char *source_ptr = (char *) buffer_;
size_t chunk_size, remainder = count_;
while (remainder > 0) {
chunk_size = std::min (remainder,
std::min ((size_t) (filesize - write_pos),
(size_t) (block_size - write_pos % block_size)));
memcpy (&write_buf [write_pos % block_size], source_ptr, chunk_size);
source_ptr += chunk_size;
write_pos = (write_pos + chunk_size) % filesize;
if (write_pos % block_size == 0) {
save_write_buf ();
write_buf_start_addr = write_pos;
if (write_buf == read_buf) {
if (read_buf == buf2)
write_buf = buf1;
else
write_buf = buf2;
}
}
remainder -= chunk_size;
}
}
void zmq::swap_t::fill_buf (char *buf, int64_t pos)
{
if (file_pos != pos) {
#ifdef ZMQ_HAVE_WINDOWS
__int64 offset = _lseeki64 (fd, pos, SEEK_SET);
#else
off_t offset = lseek (fd, (off_t) pos, SEEK_SET);
#endif
errno_assert (offset == pos);
file_pos = pos;
}
size_t octets_stored = 0;
size_t octets_total = std::min (block_size, (size_t) (filesize - file_pos));
while (octets_stored < octets_total) {
#ifdef ZMQ_HAVE_WINDOWS
int rc = _read (fd, &buf [octets_stored], octets_total - octets_stored);
#else
ssize_t rc = read (fd, &buf [octets_stored],
octets_total - octets_stored);
#endif
errno_assert (rc > 0);
octets_stored += rc;
}
file_pos += octets_total;
}
void zmq::swap_t::save_write_buf ()
{
if (file_pos != write_buf_start_addr) {
#ifdef ZMQ_HAVE_WINDOWS
__int64 offset = _lseeki64 (fd, write_buf_start_addr, SEEK_SET);
#else
off_t offset = lseek (fd, (off_t) write_buf_start_addr, SEEK_SET);
#endif
errno_assert (offset == write_buf_start_addr);
file_pos = write_buf_start_addr;
}
size_t octets_stored = 0;
size_t octets_total = std::min (block_size, (size_t) (filesize - file_pos));
while (octets_stored < octets_total) {
#ifdef ZMQ_HAVE_WINDOWS
int rc = _write (fd, &write_buf [octets_stored],
octets_total - octets_stored);
#else
ssize_t rc = write (fd, &write_buf [octets_stored],
octets_total - octets_stored);
#endif
errno_assert (rc > 0);
octets_stored += rc;
}
file_pos += octets_total;
}
int64_t zmq::swap_t::buffer_space ()
{
if (write_pos < read_pos)
return read_pos - write_pos;
return filesize - (write_pos - read_pos);
}
/*
Copyright (c) 2007-2011 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_SWAP_HPP_INCLUDED__
#define __ZMQ_SWAP_HPP_INCLUDED__
#include "../include/zmq.h"
#include <string>
#include "stdint.hpp"
namespace zmq
{
// This class implements a message swap. Messages are retrieved from
// the swap in the same order as they entered it.
class swap_t
{
public:
enum { default_block_size = 8192 };
// Creates the swap.
swap_t (int64_t filesize_);
~swap_t ();
int init ();
// Stores the message into the swap. The function
// returns false if the swap is full; true otherwise.
bool store (zmq_msg_t *msg_);
// Fetches the oldest message from the swap. It is an error
// to call this function when the swap is empty.
void fetch (zmq_msg_t *msg_);
void commit ();
void rollback ();
// Returns true if the swap is empty; false otherwise.
bool empty ();
// // Returns true if and only if the swap is full.
// bool full ();
// Returns true if the message fits into swap.
bool fits (zmq_msg_t *msg_);
private:
// Copies data from a memory buffer to the backing file.
// Wraps around when reaching maximum file size.
void copy_from_file (void *buffer_, size_t count_);
// Copies data from the backing file to the memory buffer.
// Wraps around when reaching end-of-file.
void copy_to_file (const void *buffer_, size_t count_);
// Returns the buffer space available.
int64_t buffer_space ();
void fill_buf (char *buf, int64_t pos);
void save_write_buf ();
// File descriptor to the backing file.
int fd;
// Name of the backing file.
std::string filename;
// Maximum size of the backing file.
int64_t filesize;
// File offset associated with the fd file descriptor.
int64_t file_pos;
// File offset the next message will be stored at.
int64_t write_pos;
// File offset the next message will be read from.
int64_t read_pos;
int64_t commit_pos;
size_t block_size;
char *buf1;
char *buf2;
char *read_buf;
char *write_buf;
int64_t write_buf_start_addr;
// Disable copying of the swap object.
swap_t (const swap_t&);
const swap_t &operator = (const swap_t&);
};
}
#endif
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment