Commit d8b975f4 authored by Martin Sustrik's avatar Martin Sustrik

msg_store_t renamed to swap_t

parent d90b4071
...@@ -76,7 +76,6 @@ libzmq_la_SOURCES = \ ...@@ -76,7 +76,6 @@ libzmq_la_SOURCES = \
lb.hpp \ lb.hpp \
likely.hpp \ likely.hpp \
msg_content.hpp \ msg_content.hpp \
msg_store.hpp \
mutex.hpp \ mutex.hpp \
named_session.hpp \ named_session.hpp \
object.hpp \ object.hpp \
...@@ -105,6 +104,7 @@ libzmq_la_SOURCES = \ ...@@ -105,6 +104,7 @@ libzmq_la_SOURCES = \
stdint.hpp \ stdint.hpp \
streamer.hpp \ streamer.hpp \
sub.hpp \ sub.hpp \
swap.hpp \
tcp_connecter.hpp \ tcp_connecter.hpp \
tcp_listener.hpp \ tcp_listener.hpp \
tcp_socket.hpp \ tcp_socket.hpp \
...@@ -138,7 +138,6 @@ libzmq_la_SOURCES = \ ...@@ -138,7 +138,6 @@ libzmq_la_SOURCES = \
ip.cpp \ ip.cpp \
kqueue.cpp \ kqueue.cpp \
lb.cpp \ lb.cpp \
msg_store.cpp \
named_session.cpp \ named_session.cpp \
object.cpp \ object.cpp \
options.cpp \ options.cpp \
...@@ -162,6 +161,7 @@ libzmq_la_SOURCES = \ ...@@ -162,6 +161,7 @@ libzmq_la_SOURCES = \
socket_base.cpp \ socket_base.cpp \
streamer.cpp \ streamer.cpp \
sub.cpp \ sub.cpp \
swap.cpp \
tcp_connecter.cpp \ tcp_connecter.cpp \
tcp_listener.cpp \ tcp_listener.cpp \
tcp_socket.cpp \ tcp_socket.cpp \
......
...@@ -174,7 +174,7 @@ zmq::writer_t::writer_t (object_t *parent_, pipe_t *pipe_, reader_t *reader_, ...@@ -174,7 +174,7 @@ zmq::writer_t::writer_t (object_t *parent_, pipe_t *pipe_, reader_t *reader_,
// Open the swap file, if required. // Open the swap file, if required.
if (swap_size_ > 0) { if (swap_size_ > 0) {
swap = new (std::nothrow) msg_store_t (swap_size_); swap = new (std::nothrow) swap_t (swap_size_);
zmq_assert (swap); zmq_assert (swap);
int rc = swap->init (); int rc = swap->init ();
zmq_assert (rc == 0); zmq_assert (rc == 0);
......
...@@ -25,7 +25,7 @@ ...@@ -25,7 +25,7 @@
#include "stdint.hpp" #include "stdint.hpp"
#include "yarray_item.hpp" #include "yarray_item.hpp"
#include "ypipe.hpp" #include "ypipe.hpp"
#include "msg_store.hpp" #include "swap.hpp"
#include "config.hpp" #include "config.hpp"
#include "object.hpp" #include "object.hpp"
...@@ -183,7 +183,7 @@ namespace zmq ...@@ -183,7 +183,7 @@ namespace zmq
// Pointer to the message swap. If NULL, messages are always // Pointer to the message swap. If NULL, messages are always
// kept in main memory. // kept in main memory.
msg_store_t *swap; swap_t *swap;
// Sink for the events (either the socket or the session). // Sink for the events (either the socket or the session).
i_writer_events *sink; i_writer_events *sink;
......
...@@ -35,11 +35,11 @@ ...@@ -35,11 +35,11 @@
#include <sstream> #include <sstream>
#include <algorithm> #include <algorithm>
#include "swap.hpp"
#include "atomic_counter.hpp" #include "atomic_counter.hpp"
#include "msg_store.hpp"
#include "err.hpp" #include "err.hpp"
zmq::msg_store_t::msg_store_t (int64_t filesize_, size_t block_size_) : zmq::swap_t::swap_t (int64_t filesize_, size_t block_size_) :
fd (-1), fd (-1),
filesize (filesize_), filesize (filesize_),
file_pos (0), file_pos (0),
...@@ -60,7 +60,7 @@ zmq::msg_store_t::msg_store_t (int64_t filesize_, size_t block_size_) : ...@@ -60,7 +60,7 @@ zmq::msg_store_t::msg_store_t (int64_t filesize_, size_t block_size_) :
read_buf = write_buf = buf1; read_buf = write_buf = buf1;
} }
zmq::msg_store_t::~msg_store_t () zmq::swap_t::~swap_t ()
{ {
delete [] buf1; delete [] buf1;
delete [] buf2; delete [] buf2;
...@@ -83,7 +83,7 @@ zmq::msg_store_t::~msg_store_t () ...@@ -83,7 +83,7 @@ zmq::msg_store_t::~msg_store_t ()
errno_assert (rc == 0); errno_assert (rc == 0);
} }
int zmq::msg_store_t::init () int zmq::swap_t::init ()
{ {
static zmq::atomic_counter_t seqnum (0); static zmq::atomic_counter_t seqnum (0);
...@@ -116,7 +116,7 @@ int zmq::msg_store_t::init () ...@@ -116,7 +116,7 @@ int zmq::msg_store_t::init ()
return 0; return 0;
} }
bool zmq::msg_store_t::store (zmq_msg_t *msg_) bool zmq::swap_t::store (zmq_msg_t *msg_)
{ {
size_t msg_size = zmq_msg_size (msg_); size_t msg_size = zmq_msg_size (msg_);
...@@ -138,7 +138,7 @@ bool zmq::msg_store_t::store (zmq_msg_t *msg_) ...@@ -138,7 +138,7 @@ bool zmq::msg_store_t::store (zmq_msg_t *msg_)
return true; return true;
} }
void zmq::msg_store_t::fetch (zmq_msg_t *msg_) void zmq::swap_t::fetch (zmq_msg_t *msg_)
{ {
// There must be at least one message available. // There must be at least one message available.
zmq_assert (read_pos != write_pos); zmq_assert (read_pos != write_pos);
...@@ -157,12 +157,12 @@ void zmq::msg_store_t::fetch (zmq_msg_t *msg_) ...@@ -157,12 +157,12 @@ void zmq::msg_store_t::fetch (zmq_msg_t *msg_)
copy_from_file (zmq_msg_data (msg_), msg_size); copy_from_file (zmq_msg_data (msg_), msg_size);
} }
void zmq::msg_store_t::commit () void zmq::swap_t::commit ()
{ {
commit_pos = write_pos; commit_pos = write_pos;
} }
void zmq::msg_store_t::rollback () void zmq::swap_t::rollback ()
{ {
if (commit_pos == write_pos || read_pos == write_pos) if (commit_pos == write_pos || read_pos == write_pos)
return; return;
...@@ -183,17 +183,17 @@ void zmq::msg_store_t::rollback () ...@@ -183,17 +183,17 @@ void zmq::msg_store_t::rollback ()
write_pos = commit_pos; write_pos = commit_pos;
} }
bool zmq::msg_store_t::empty () bool zmq::swap_t::empty ()
{ {
return read_pos == write_pos; return read_pos == write_pos;
} }
bool zmq::msg_store_t::full () bool zmq::swap_t::full ()
{ {
return buffer_space () == 1; return buffer_space () == 1;
} }
void zmq::msg_store_t::copy_from_file (void *buffer_, size_t count_) void zmq::swap_t::copy_from_file (void *buffer_, size_t count_)
{ {
char *dest_ptr = (char *) buffer_; char *dest_ptr = (char *) buffer_;
size_t chunk_size, remainder = count_; size_t chunk_size, remainder = count_;
...@@ -217,7 +217,7 @@ void zmq::msg_store_t::copy_from_file (void *buffer_, size_t count_) ...@@ -217,7 +217,7 @@ void zmq::msg_store_t::copy_from_file (void *buffer_, size_t count_)
} }
} }
void zmq::msg_store_t::copy_to_file (const void *buffer_, size_t count_) void zmq::swap_t::copy_to_file (const void *buffer_, size_t count_)
{ {
char *source_ptr = (char *) buffer_; char *source_ptr = (char *) buffer_;
size_t chunk_size, remainder = count_; size_t chunk_size, remainder = count_;
...@@ -246,7 +246,7 @@ void zmq::msg_store_t::copy_to_file (const void *buffer_, size_t count_) ...@@ -246,7 +246,7 @@ void zmq::msg_store_t::copy_to_file (const void *buffer_, size_t count_)
} }
} }
void zmq::msg_store_t::fill_buf (char *buf, int64_t pos) void zmq::swap_t::fill_buf (char *buf, int64_t pos)
{ {
if (file_pos != pos) { if (file_pos != pos) {
#ifdef ZMQ_HAVE_WINDOWS #ifdef ZMQ_HAVE_WINDOWS
...@@ -272,7 +272,7 @@ void zmq::msg_store_t::fill_buf (char *buf, int64_t pos) ...@@ -272,7 +272,7 @@ void zmq::msg_store_t::fill_buf (char *buf, int64_t pos)
file_pos += octets_total; file_pos += octets_total;
} }
void zmq::msg_store_t::save_write_buf () void zmq::swap_t::save_write_buf ()
{ {
if (file_pos != write_buf_start_addr) { if (file_pos != write_buf_start_addr) {
#ifdef ZMQ_HAVE_WINDOWS #ifdef ZMQ_HAVE_WINDOWS
...@@ -298,7 +298,7 @@ void zmq::msg_store_t::save_write_buf () ...@@ -298,7 +298,7 @@ void zmq::msg_store_t::save_write_buf ()
file_pos += octets_total; file_pos += octets_total;
} }
int64_t zmq::msg_store_t::buffer_space () int64_t zmq::swap_t::buffer_space ()
{ {
if (write_pos < read_pos) if (write_pos < read_pos)
return read_pos - write_pos; return read_pos - write_pos;
......
...@@ -17,8 +17,8 @@ ...@@ -17,8 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>. along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifndef __ZMQ_MSG_STORE_HPP_INCLUDED__ #ifndef __ZMQ_SWAP_HPP_INCLUDED__
#define __ZMQ_MSG_STORE_HPP_INCLUDED__ #define __ZMQ_SWAP_HPP_INCLUDED__
#include "../include/zmq.h" #include "../include/zmq.h"
...@@ -28,38 +28,38 @@ ...@@ -28,38 +28,38 @@
namespace zmq namespace zmq
{ {
// This class implements a message store. Messages are retrieved from // This class implements a message swap. Messages are retrieved from
// the store in the same order as they entered it. // the swap in the same order as they entered it.
class msg_store_t class swap_t
{ {
public: public:
enum { default_block_size = 8192 }; enum { default_block_size = 8192 };
// Creates message store. // Creates the swap.
msg_store_t (int64_t filesize_, size_t block_size_ = default_block_size); swap_t (int64_t filesize_, size_t block_size_ = default_block_size);
~msg_store_t (); ~swap_t ();
int init (); int init ();
// Stores the message into the message store. The function // Stores the message into the swap. The function
// returns false if the message store is full; true otherwise. // returns false if the swap is full; true otherwise.
bool store (zmq_msg_t *msg_); bool store (zmq_msg_t *msg_);
// Fetches the oldest message from the message store. It is an error // Fetches the oldest message from the swap. It is an error
// to call this function when the message store is empty. // to call this function when the swap is empty.
void fetch (zmq_msg_t *msg_); void fetch (zmq_msg_t *msg_);
void commit (); void commit ();
void rollback (); void rollback ();
// Returns true if the message store is empty; false otherwise. // Returns true if the swap is empty; false otherwise.
bool empty (); bool empty ();
// Returns true if and only if the store is full. // Returns true if and only if the swap is full.
bool full (); bool full ();
private: private:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment