Commit e0246e32 authored by Martin Sustrik's avatar Martin Sustrik

Message-related functionality factored out into msg_t class.

This patch addresses serveral issues:
1. It gathers message related functionality scattered over whole
   codebase into a single class.
2. It makes zmq_msg_t an opaque datatype. Internals of the class
   don't pollute zmq.h header file.
3. zmq_msg_t size decreases from 48 to 32 bytes. That saves ~33%
   of memory in scenarios with large amount of small messages.
Signed-off-by: 's avatarMartin Sustrik <sustrik@250bpm.com>
parent 58169769
...@@ -121,34 +121,7 @@ ZMQ_EXPORT const char *zmq_strerror (int errnum); ...@@ -121,34 +121,7 @@ ZMQ_EXPORT const char *zmq_strerror (int errnum);
/* 0MQ message definition. */ /* 0MQ message definition. */
/******************************************************************************/ /******************************************************************************/
/* Maximal size of "Very Small Message". VSMs are passed by value */ typedef unsigned char zmq_msg_t [32];
/* to avoid excessive memory allocation/deallocation. */
/* If VMSs larger than 255 bytes are required, type of 'vsm_size' */
/* field in zmq_msg_t structure should be modified accordingly. */
#define ZMQ_MAX_VSM_SIZE 30
/* Message types. These integers may be stored in 'content' member of the */
/* message instead of regular pointer to the data. */
#define ZMQ_DELIMITER 31
#define ZMQ_VSM 32
/* Message flags. ZMQ_MSG_SHARED is strictly speaking not a message flag */
/* (it has no equivalent in the wire format), however, making it a flag */
/* allows us to pack the stucture tigher and thus improve performance. */
#define ZMQ_MSG_MORE 1
#define ZMQ_MSG_SHARED 128
#define ZMQ_MSG_MASK 129 /* Merges all the flags */
/* A message. Note that 'content' is not a pointer to the raw data. */
/* Rather it is pointer to zmq::msg_content_t structure */
/* (see src/msg_content.hpp for its definition). */
typedef struct
{
void *content;
unsigned char flags;
unsigned char vsm_size;
unsigned char vsm_data [ZMQ_MAX_VSM_SIZE];
} zmq_msg_t;
typedef void (zmq_free_fn) (void *data, void *hint); typedef void (zmq_free_fn) (void *data, void *hint);
......
...@@ -36,6 +36,10 @@ namespace zmq ...@@ -36,6 +36,10 @@ namespace zmq
// memory allocation by approximately 99.6% // memory allocation by approximately 99.6%
message_pipe_granularity = 256, message_pipe_granularity = 256,
// Size in bytes of the largest message that is still copied around
// rather than being reference-counted.
max_vsm_size = 29,
// Determines how often does socket poll for new commands when it // Determines how often does socket poll for new commands when it
// still has unprocessed messages to handle. Thus, if it is set to 100, // still has unprocessed messages to handle. Thus, if it is set to 100,
// socket will process 100 inbound messages before doing the poll. // socket will process 100 inbound messages before doing the poll.
......
...@@ -26,8 +26,9 @@ ...@@ -26,8 +26,9 @@
#include "io_thread.hpp" #include "io_thread.hpp"
#include "platform.hpp" #include "platform.hpp"
#include "reaper.hpp" #include "reaper.hpp"
#include "err.hpp"
#include "pipe.hpp" #include "pipe.hpp"
#include "err.hpp"
#include "msg.hpp"
#if defined ZMQ_HAVE_WINDOWS #if defined ZMQ_HAVE_WINDOWS
#include "windows.h" #include "windows.h"
...@@ -304,10 +305,10 @@ zmq::endpoint_t zmq::ctx_t::find_endpoint (const char *addr_) ...@@ -304,10 +305,10 @@ zmq::endpoint_t zmq::ctx_t::find_endpoint (const char *addr_)
void zmq::ctx_t::log (const char *format_, va_list args_) void zmq::ctx_t::log (const char *format_, va_list args_)
{ {
// Create the log message. // Create the log message.
zmq_msg_t msg; msg_t msg;
int rc = zmq_msg_init_size (&msg, strlen (format_) + 1); int rc = msg.init_size (strlen (format_) + 1);
zmq_assert (rc == 0); errno_assert (rc == 0);
memcpy (zmq_msg_data (&msg), format_, zmq_msg_size (&msg)); memcpy (msg.data (), format_, msg.size ());
// At this point we migrate the log socket to the current thread. // At this point we migrate the log socket to the current thread.
// We rely on mutex for executing the memory barrier. // We rely on mutex for executing the memory barrier.
...@@ -316,7 +317,8 @@ void zmq::ctx_t::log (const char *format_, va_list args_) ...@@ -316,7 +317,8 @@ void zmq::ctx_t::log (const char *format_, va_list args_)
log_socket->send (&msg, 0); log_socket->send (&msg, 0);
log_sync.unlock (); log_sync.unlock ();
zmq_msg_close (&msg); rc = msg.close ();
errno_assert (rc == 0);
} }
...@@ -26,8 +26,6 @@ ...@@ -26,8 +26,6 @@
#include <string> #include <string>
#include <stdarg.h> #include <stdarg.h>
#include "../include/zmq.h"
#include "mailbox.hpp" #include "mailbox.hpp"
#include "semaphore.hpp" #include "semaphore.hpp"
#include "ypipe.hpp" #include "ypipe.hpp"
......
...@@ -31,7 +31,8 @@ zmq::decoder_t::decoder_t (size_t bufsize_, int64_t maxmsgsize_) : ...@@ -31,7 +31,8 @@ zmq::decoder_t::decoder_t (size_t bufsize_, int64_t maxmsgsize_) :
destination (NULL), destination (NULL),
maxmsgsize (maxmsgsize_) maxmsgsize (maxmsgsize_)
{ {
zmq_msg_init (&in_progress); int rc = in_progress.init ();
errno_assert (rc == 0);
// At the beginning, read one byte and go to one_byte_size_ready state. // At the beginning, read one byte and go to one_byte_size_ready state.
next_step (tmpbuf, 1, &decoder_t::one_byte_size_ready); next_step (tmpbuf, 1, &decoder_t::one_byte_size_ready);
...@@ -39,7 +40,8 @@ zmq::decoder_t::decoder_t (size_t bufsize_, int64_t maxmsgsize_) : ...@@ -39,7 +40,8 @@ zmq::decoder_t::decoder_t (size_t bufsize_, int64_t maxmsgsize_) :
zmq::decoder_t::~decoder_t () zmq::decoder_t::~decoder_t ()
{ {
zmq_msg_close (&in_progress); int rc = in_progress.close ();
errno_assert (rc == 0);
} }
void zmq::decoder_t::set_inout (i_inout *destination_) void zmq::decoder_t::set_inout (i_inout *destination_)
...@@ -71,9 +73,9 @@ bool zmq::decoder_t::one_byte_size_ready () ...@@ -71,9 +73,9 @@ bool zmq::decoder_t::one_byte_size_ready ()
errno = ENOMEM; errno = ENOMEM;
} }
else else
rc = zmq_msg_init_size (&in_progress, *tmpbuf - 1); rc = in_progress.init_size (*tmpbuf - 1);
if (rc != 0 && errno == ENOMEM) { if (rc != 0 && errno == ENOMEM) {
rc = zmq_msg_init (&in_progress); rc = in_progress.init ();
errno_assert (rc == 0); errno_assert (rc == 0);
decoding_error (); decoding_error ();
return false; return false;
...@@ -106,9 +108,9 @@ bool zmq::decoder_t::eight_byte_size_ready () ...@@ -106,9 +108,9 @@ bool zmq::decoder_t::eight_byte_size_ready ()
errno = ENOMEM; errno = ENOMEM;
} }
else else
rc = zmq_msg_init_size (&in_progress, size - 1); rc = in_progress.init_size (size - 1);
if (rc != 0 && errno == ENOMEM) { if (rc != 0 && errno == ENOMEM) {
rc = zmq_msg_init (&in_progress); rc = in_progress.init ();
errno_assert (rc == 0); errno_assert (rc == 0);
decoding_error (); decoding_error ();
return false; return false;
...@@ -122,9 +124,9 @@ bool zmq::decoder_t::eight_byte_size_ready () ...@@ -122,9 +124,9 @@ bool zmq::decoder_t::eight_byte_size_ready ()
bool zmq::decoder_t::flags_ready () bool zmq::decoder_t::flags_ready ()
{ {
// Store the flags from the wire into the message structure. // Store the flags from the wire into the message structure.
in_progress.flags = tmpbuf [0]; in_progress.set_flags (tmpbuf [0]);
next_step (zmq_msg_data (&in_progress), zmq_msg_size (&in_progress), next_step (in_progress.data (), in_progress.size (),
&decoder_t::message_ready); &decoder_t::message_ready);
return true; return true;
......
...@@ -27,10 +27,9 @@ ...@@ -27,10 +27,9 @@
#include <algorithm> #include <algorithm>
#include "err.hpp" #include "err.hpp"
#include "msg.hpp"
#include "stdint.hpp" #include "stdint.hpp"
#include "../include/zmq.h"
namespace zmq namespace zmq
{ {
...@@ -196,7 +195,7 @@ namespace zmq ...@@ -196,7 +195,7 @@ namespace zmq
struct i_inout *destination; struct i_inout *destination;
unsigned char tmpbuf [8]; unsigned char tmpbuf [8];
::zmq_msg_t in_progress; msg_t in_progress;
int64_t maxmsgsize; int64_t maxmsgsize;
......
...@@ -18,8 +18,6 @@ ...@@ -18,8 +18,6 @@
along with this program. If not, see <http://www.gnu.org/licenses/>. along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "../include/zmq.h"
#include "dist.hpp" #include "dist.hpp"
#include "pipe.hpp" #include "pipe.hpp"
#include "err.hpp" #include "err.hpp"
...@@ -89,10 +87,10 @@ void zmq::dist_t::activated (writer_t *pipe_) ...@@ -89,10 +87,10 @@ void zmq::dist_t::activated (writer_t *pipe_)
active++; active++;
} }
int zmq::dist_t::send (zmq_msg_t *msg_, int flags_) int zmq::dist_t::send (msg_t *msg_, int flags_)
{ {
// Is this end of a multipart message? // Is this end of a multipart message?
bool msg_more = msg_->flags & ZMQ_MSG_MORE; bool msg_more = msg_->flags () & msg_t::more;
// Push the message to active pipes. // Push the message to active pipes.
distribute (msg_, flags_); distribute (msg_, flags_);
...@@ -106,63 +104,33 @@ int zmq::dist_t::send (zmq_msg_t *msg_, int flags_) ...@@ -106,63 +104,33 @@ int zmq::dist_t::send (zmq_msg_t *msg_, int flags_)
return 0; return 0;
} }
void zmq::dist_t::distribute (zmq_msg_t *msg_, int flags_) void zmq::dist_t::distribute (msg_t *msg_, int flags_)
{ {
// If there are no active pipes available, simply drop the message. // If there are no active pipes available, simply drop the message.
if (active == 0) { if (active == 0) {
int rc = zmq_msg_close (msg_); int rc = msg_->close ();
zmq_assert (rc == 0); errno_assert (rc == 0);
rc = zmq_msg_init (msg_); rc = msg_->init ();
zmq_assert (rc == 0);
return;
}
msg_content_t *content = (msg_content_t*) msg_->content;
// For VSMs the copying is straighforward.
if (content == (msg_content_t*) ZMQ_VSM) {
for (pipes_t::size_type i = 0; i < active;)
if (write (pipes [i], msg_))
i++;
int rc = zmq_msg_init (msg_);
zmq_assert (rc == 0); zmq_assert (rc == 0);
return; return;
} }
// Optimisation for the case when there's only a single pipe // Add active-1 references to the message. We already hold one reference,
// to send the message to - no refcount adjustment i.e. no atomic // that's why -1.
// operations are needed. msg_->add_refs (active - 1);
if (active == 1) {
if (!write (pipes [0], msg_)) {
int rc = zmq_msg_close (msg_);
zmq_assert (rc == 0);
}
int rc = zmq_msg_init (msg_);
zmq_assert (rc == 0);
return;
}
// There are at least 2 destinations for the message. That means we have
// to deal with reference counting. First add N-1 references to
// the content (we are holding one reference anyway, that's why -1).
if (msg_->flags & ZMQ_MSG_SHARED)
content->refcnt.add (active - 1);
else {
content->refcnt.set (active);
msg_->flags |= ZMQ_MSG_SHARED;
}
// Push the message to all destinations. // Push copy of the message to each active pipe.
for (pipes_t::size_type i = 0; i < active;) { for (pipes_t::size_type i = 0; i < active;) {
if (!write (pipes [i], msg_)) if (!write (pipes [i], msg_))
content->refcnt.sub (1); msg_->rm_refs (1);
else else
i++; i++;
} }
// Detach the original message from the data buffer. // Detach the original message from the data buffer. Note that we don't
int rc = zmq_msg_init (msg_); // close the message. That's because we've already used all the references.
zmq_assert (rc == 0); int rc = msg_->init ();
errno_assert (rc == 0);
} }
bool zmq::dist_t::has_out () bool zmq::dist_t::has_out ()
...@@ -170,14 +138,14 @@ bool zmq::dist_t::has_out () ...@@ -170,14 +138,14 @@ bool zmq::dist_t::has_out ()
return true; return true;
} }
bool zmq::dist_t::write (class writer_t *pipe_, zmq_msg_t *msg_) bool zmq::dist_t::write (class writer_t *pipe_, msg_t *msg_)
{ {
if (!pipe_->write (msg_)) { if (!pipe_->write (msg_)) {
active--; active--;
pipes.swap (pipes.index (pipe_), active); pipes.swap (pipes.index (pipe_), active);
return false; return false;
} }
if (!(msg_->flags & ZMQ_MSG_MORE)) if (!(msg_->flags () & msg_t::more))
pipe_->flush (); pipe_->flush ();
return true; return true;
} }
......
...@@ -40,7 +40,7 @@ namespace zmq ...@@ -40,7 +40,7 @@ namespace zmq
void attach (writer_t *pipe_); void attach (writer_t *pipe_);
void terminate (); void terminate ();
int send (zmq_msg_t *msg_, int flags_); int send (class msg_t *msg_, int flags_);
bool has_out (); bool has_out ();
// i_writer_events interface implementation. // i_writer_events interface implementation.
...@@ -51,10 +51,10 @@ namespace zmq ...@@ -51,10 +51,10 @@ namespace zmq
// Write the message to the pipe. Make the pipe inactive if writing // Write the message to the pipe. Make the pipe inactive if writing
// fails. In such a case false is returned. // fails. In such a case false is returned.
bool write (class writer_t *pipe_, zmq_msg_t *msg_); bool write (class writer_t *pipe_, class msg_t *msg_);
// Put the message to all active pipes. // Put the message to all active pipes.
void distribute (zmq_msg_t *msg_, int flags_); void distribute (class msg_t *msg_, int flags_);
// Plug in all the delayed pipes. // Plug in all the delayed pipes.
void clear_new_pipes (); void clear_new_pipes ();
......
...@@ -26,7 +26,8 @@ zmq::encoder_t::encoder_t (size_t bufsize_) : ...@@ -26,7 +26,8 @@ zmq::encoder_t::encoder_t (size_t bufsize_) :
encoder_base_t <encoder_t> (bufsize_), encoder_base_t <encoder_t> (bufsize_),
source (NULL) source (NULL)
{ {
zmq_msg_init (&in_progress); int rc = in_progress.init ();
errno_assert (rc == 0);
// Write 0 bytes to the batch and go to message_ready state. // Write 0 bytes to the batch and go to message_ready state.
next_step (NULL, 0, &encoder_t::message_ready, true); next_step (NULL, 0, &encoder_t::message_ready, true);
...@@ -34,7 +35,8 @@ zmq::encoder_t::encoder_t (size_t bufsize_) : ...@@ -34,7 +35,8 @@ zmq::encoder_t::encoder_t (size_t bufsize_) :
zmq::encoder_t::~encoder_t () zmq::encoder_t::~encoder_t ()
{ {
zmq_msg_close (&in_progress); int rc = in_progress.close ();
errno_assert (rc == 0);
} }
void zmq::encoder_t::set_inout (i_inout *source_) void zmq::encoder_t::set_inout (i_inout *source_)
...@@ -45,7 +47,7 @@ void zmq::encoder_t::set_inout (i_inout *source_) ...@@ -45,7 +47,7 @@ void zmq::encoder_t::set_inout (i_inout *source_)
bool zmq::encoder_t::size_ready () bool zmq::encoder_t::size_ready ()
{ {
// Write message body into the buffer. // Write message body into the buffer.
next_step (zmq_msg_data (&in_progress), zmq_msg_size (&in_progress), next_step (in_progress.data (), in_progress.size (),
&encoder_t::message_ready, false); &encoder_t::message_ready, false);
return true; return true;
} }
...@@ -53,19 +55,21 @@ bool zmq::encoder_t::size_ready () ...@@ -53,19 +55,21 @@ bool zmq::encoder_t::size_ready ()
bool zmq::encoder_t::message_ready () bool zmq::encoder_t::message_ready ()
{ {
// Destroy content of the old message. // Destroy content of the old message.
zmq_msg_close (&in_progress); int rc = in_progress.close ();
errno_assert (rc == 0);
// Read new message. If there is none, return false. // Read new message. If there is none, return false.
// Note that new state is set only if write is successful. That way // Note that new state is set only if write is successful. That way
// unsuccessful write will cause retry on the next state machine // unsuccessful write will cause retry on the next state machine
// invocation. // invocation.
if (!source || !source->read (&in_progress)) { if (!source || !source->read (&in_progress)) {
zmq_msg_init (&in_progress); rc = in_progress.init ();
errno_assert (rc == 0);
return false; return false;
} }
// Get the message size. // Get the message size.
size_t size = zmq_msg_size (&in_progress); size_t size = in_progress.size ();
// Account for the 'flags' byte. // Account for the 'flags' byte.
size++; size++;
...@@ -75,16 +79,16 @@ bool zmq::encoder_t::message_ready () ...@@ -75,16 +79,16 @@ bool zmq::encoder_t::message_ready ()
// message size. In both cases 'flags' field follows. // message size. In both cases 'flags' field follows.
if (size < 255) { if (size < 255) {
tmpbuf [0] = (unsigned char) size; tmpbuf [0] = (unsigned char) size;
tmpbuf [1] = (in_progress.flags & ~ZMQ_MSG_SHARED); tmpbuf [1] = (in_progress.flags () & ~msg_t::shared);
next_step (tmpbuf, 2, &encoder_t::size_ready, next_step (tmpbuf, 2, &encoder_t::size_ready,
!(in_progress.flags & ZMQ_MSG_MORE)); !(in_progress.flags () & msg_t::more));
} }
else { else {
tmpbuf [0] = 0xff; tmpbuf [0] = 0xff;
put_uint64 (tmpbuf + 1, size); put_uint64 (tmpbuf + 1, size);
tmpbuf [9] = (in_progress.flags & ~ZMQ_MSG_SHARED); tmpbuf [9] = (in_progress.flags () & ~msg_t::shared);
next_step (tmpbuf, 10, &encoder_t::size_ready, next_step (tmpbuf, 10, &encoder_t::size_ready,
!(in_progress.flags & ZMQ_MSG_MORE)); !(in_progress.flags () & msg_t::more));
} }
return true; return true;
} }
...@@ -27,8 +27,7 @@ ...@@ -27,8 +27,7 @@
#include <algorithm> #include <algorithm>
#include "err.hpp" #include "err.hpp"
#include "msg.hpp"
#include "../include/zmq.h"
namespace zmq namespace zmq
{ {
...@@ -172,7 +171,7 @@ namespace zmq ...@@ -172,7 +171,7 @@ namespace zmq
bool message_ready (); bool message_ready ();
struct i_inout *source; struct i_inout *source;
::zmq_msg_t in_progress; msg_t in_progress;
unsigned char tmpbuf [10]; unsigned char tmpbuf [10];
encoder_t (const encoder_t&); encoder_t (const encoder_t&);
......
...@@ -18,8 +18,6 @@ ...@@ -18,8 +18,6 @@
along with this program. If not, see <http://www.gnu.org/licenses/>. along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "../include/zmq.h"
#include "err.hpp" #include "err.hpp"
#include "platform.hpp" #include "platform.hpp"
......
...@@ -21,6 +21,9 @@ ...@@ -21,6 +21,9 @@
#ifndef __ZMQ_ERR_HPP_INCLUDED__ #ifndef __ZMQ_ERR_HPP_INCLUDED__
#define __ZMQ_ERR_HPP_INCLUDED__ #define __ZMQ_ERR_HPP_INCLUDED__
// 0MQ-specific error codes are defined in zmq.h
#include "../include/zmq.h"
#include <assert.h> #include <assert.h>
#include <errno.h> #include <errno.h>
#include <string.h> #include <string.h>
......
...@@ -18,12 +18,11 @@ ...@@ -18,12 +18,11 @@
along with this program. If not, see <http://www.gnu.org/licenses/>. along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "../include/zmq.h"
#include "fq.hpp" #include "fq.hpp"
#include "pipe.hpp" #include "pipe.hpp"
#include "err.hpp" #include "err.hpp"
#include "own.hpp" #include "own.hpp"
#include "msg.hpp"
zmq::fq_t::fq_t (own_t *sink_) : zmq::fq_t::fq_t (own_t *sink_) :
active (0), active (0),
...@@ -95,10 +94,11 @@ void zmq::fq_t::activated (reader_t *pipe_) ...@@ -95,10 +94,11 @@ void zmq::fq_t::activated (reader_t *pipe_)
active++; active++;
} }
int zmq::fq_t::recv (zmq_msg_t *msg_, int flags_) int zmq::fq_t::recv (msg_t *msg_, int flags_)
{ {
// Deallocate old content of the message. // Deallocate old content of the message.
zmq_msg_close (msg_); int rc = msg_->close ();
errno_assert (rc == 0);
// Round-robin over the pipes to get the next message. // Round-robin over the pipes to get the next message.
for (int count = active; count != 0; count--) { for (int count = active; count != 0; count--) {
...@@ -116,7 +116,7 @@ int zmq::fq_t::recv (zmq_msg_t *msg_, int flags_) ...@@ -116,7 +116,7 @@ int zmq::fq_t::recv (zmq_msg_t *msg_, int flags_)
// and replaced by another active pipe. Thus we don't have to increase // and replaced by another active pipe. Thus we don't have to increase
// the 'current' pointer. // the 'current' pointer.
if (fetched) { if (fetched) {
more = msg_->flags & ZMQ_MSG_MORE; more = msg_->flags () & msg_t::more;
if (!more) { if (!more) {
current++; current++;
if (current >= active) if (current >= active)
...@@ -134,7 +134,8 @@ int zmq::fq_t::recv (zmq_msg_t *msg_, int flags_) ...@@ -134,7 +134,8 @@ int zmq::fq_t::recv (zmq_msg_t *msg_, int flags_)
// No message is available. Initialise the output parameter // No message is available. Initialise the output parameter
// to be a 0-byte message. // to be a 0-byte message.
zmq_msg_init (msg_); rc = msg_->init ();
errno_assert (rc == 0);
errno = EAGAIN; errno = EAGAIN;
return -1; return -1;
} }
......
...@@ -23,6 +23,7 @@ ...@@ -23,6 +23,7 @@
#include "array.hpp" #include "array.hpp"
#include "pipe.hpp" #include "pipe.hpp"
#include "msg.hpp"
namespace zmq namespace zmq
{ {
...@@ -40,7 +41,7 @@ namespace zmq ...@@ -40,7 +41,7 @@ namespace zmq
void attach (reader_t *pipe_); void attach (reader_t *pipe_);
void terminate (); void terminate ();
int recv (zmq_msg_t *msg_, int flags_); int recv (msg_t *msg_, int flags_);
bool has_in (); bool has_in ();
// i_reader_events implementation. // i_reader_events implementation.
......
...@@ -21,8 +21,7 @@ ...@@ -21,8 +21,7 @@
#ifndef __ZMQ_I_INOUT_HPP_INCLUDED__ #ifndef __ZMQ_I_INOUT_HPP_INCLUDED__
#define __ZMQ_I_INOUT_HPP_INCLUDED__ #define __ZMQ_I_INOUT_HPP_INCLUDED__
#include "../include/zmq.h" #include "msg.hpp"
#include "stdint.hpp" #include "stdint.hpp"
namespace zmq namespace zmq
...@@ -33,10 +32,10 @@ namespace zmq ...@@ -33,10 +32,10 @@ namespace zmq
virtual ~i_inout () {} virtual ~i_inout () {}
// Engine asks for a message to send to the network. // Engine asks for a message to send to the network.
virtual bool read (::zmq_msg_t *msg_) = 0; virtual bool read (msg_t *msg_) = 0;
// Engine received message from the network and sends it further on. // Engine received message from the network and sends it further on.
virtual bool write (::zmq_msg_t *msg_) = 0; virtual bool write (msg_t *msg_) = 0;
// Flush all the previously written messages. // Flush all the previously written messages.
virtual void flush () = 0; virtual void flush () = 0;
......
...@@ -20,8 +20,6 @@ ...@@ -20,8 +20,6 @@
#include <new> #include <new>
#include "../include/zmq.h"
#include "io_thread.hpp" #include "io_thread.hpp"
#include "platform.hpp" #include "platform.hpp"
#include "err.hpp" #include "err.hpp"
......
...@@ -23,11 +23,9 @@ ...@@ -23,11 +23,9 @@
#include <stdlib.h> #include <stdlib.h>
#include <string> #include <string>
#include "../include/zmq.h"
#include "ip.hpp" #include "ip.hpp"
#include "platform.hpp"
#include "err.hpp" #include "err.hpp"
#include "platform.hpp"
#include "stdint.hpp" #include "stdint.hpp"
#if defined ZMQ_HAVE_SOLARIS #if defined ZMQ_HAVE_SOLARIS
......
...@@ -18,12 +18,11 @@ ...@@ -18,12 +18,11 @@
along with this program. If not, see <http://www.gnu.org/licenses/>. along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "../include/zmq.h"
#include "lb.hpp" #include "lb.hpp"
#include "pipe.hpp" #include "pipe.hpp"
#include "err.hpp" #include "err.hpp"
#include "own.hpp" #include "own.hpp"
#include "msg.hpp"
zmq::lb_t::lb_t (own_t *sink_) : zmq::lb_t::lb_t (own_t *sink_) :
active (0), active (0),
...@@ -93,26 +92,26 @@ void zmq::lb_t::activated (writer_t *pipe_) ...@@ -93,26 +92,26 @@ void zmq::lb_t::activated (writer_t *pipe_)
active++; active++;
} }
int zmq::lb_t::send (zmq_msg_t *msg_, int flags_) int zmq::lb_t::send (msg_t *msg_, int flags_)
{ {
// Drop the message if required. If we are at the end of the message // Drop the message if required. If we are at the end of the message
// switch back to non-dropping mode. // switch back to non-dropping mode.
if (dropping) { if (dropping) {
more = msg_->flags & ZMQ_MSG_MORE; more = msg_->flags () & msg_t::more;
if (!more) if (!more)
dropping = false; dropping = false;
int rc = zmq_msg_close (msg_); int rc = msg_->close ();
errno_assert (rc == 0); errno_assert (rc == 0);
rc = zmq_msg_init (msg_); rc = msg_->init ();
zmq_assert (rc == 0); zmq_assert (rc == 0);
return 0; return 0;
} }
while (active > 0) { while (active > 0) {
if (pipes [current]->write (msg_)) { if (pipes [current]->write (msg_)) {
more = msg_->flags & ZMQ_MSG_MORE; more = msg_->flags () & msg_t::more;
break; break;
} }
...@@ -138,8 +137,8 @@ int zmq::lb_t::send (zmq_msg_t *msg_, int flags_) ...@@ -138,8 +137,8 @@ int zmq::lb_t::send (zmq_msg_t *msg_, int flags_)
} }
// Detach the message from the data buffer. // Detach the message from the data buffer.
int rc = zmq_msg_init (msg_); int rc = msg_->init ();
zmq_assert (rc == 0); errno_assert (rc == 0);
return 0; return 0;
} }
...@@ -154,13 +153,16 @@ bool zmq::lb_t::has_out () ...@@ -154,13 +153,16 @@ bool zmq::lb_t::has_out ()
while (active > 0) { while (active > 0) {
// Check whether zero-sized message can be written to the pipe. // Check whether zero-sized message can be written to the pipe.
zmq_msg_t msg; msg_t msg;
zmq_msg_init (&msg); int rc = msg.init ();
errno_assert (rc == 0);
if (pipes [current]->check_write (&msg)) { if (pipes [current]->check_write (&msg)) {
zmq_msg_close (&msg); rc = msg.close ();
errno_assert (rc == 0);
return true; return true;
} }
zmq_msg_close (&msg); rc = msg.close ();
errno_assert (rc == 0);
// Deactivate the pipe. // Deactivate the pipe.
active--; active--;
......
...@@ -38,7 +38,7 @@ namespace zmq ...@@ -38,7 +38,7 @@ namespace zmq
void attach (writer_t *pipe_); void attach (writer_t *pipe_);
void terminate (); void terminate ();
int send (zmq_msg_t *msg_, int flags_); int send (msg_t *msg_, int flags_);
bool has_out (); bool has_out ();
// i_writer_events interface implementation. // i_writer_events interface implementation.
......
...@@ -29,155 +29,234 @@ ...@@ -29,155 +29,234 @@
#include "likely.hpp" #include "likely.hpp"
#include "err.hpp" #include "err.hpp"
int zmq_msg_init (zmq_msg_t *msg_) bool zmq::msg_t::check ()
{ {
msg_->content = (zmq::msg_content_t*) ZMQ_VSM; return u.base.type >= type_min && u.base.type <= type_max;
msg_->flags = (unsigned char) ~ZMQ_MSG_MASK; }
msg_->vsm_size = 0;
int zmq::msg_t::init ()
{
u.vsm.type = type_vsm;
u.vsm.flags = 0;
u.vsm.size = 0;
return 0; return 0;
} }
int zmq_msg_init_size (zmq_msg_t *msg_, size_t size_) int zmq::msg_t::init_size (size_t size_)
{ {
if (size_ <= ZMQ_MAX_VSM_SIZE) { if (size_ <= max_vsm_size) {
msg_->content = (zmq::msg_content_t*) ZMQ_VSM; u.vsm.type = type_vsm;
msg_->flags = (unsigned char) ~ZMQ_MSG_MASK; u.vsm.flags = 0;
msg_->vsm_size = (uint8_t) size_; u.vsm.size = (unsigned char) size_;
} }
else { else {
msg_->content = u.lmsg.type = type_lmsg;
(zmq::msg_content_t*) malloc (sizeof (zmq::msg_content_t) + size_); u.lmsg.flags = 0;
if (!msg_->content) { u.lmsg.content =
(content_t*) malloc (sizeof (content_t) + size_);
if (!u.lmsg.content) {
errno = ENOMEM; errno = ENOMEM;
return -1; return -1;
} }
msg_->flags = (unsigned char) ~ZMQ_MSG_MASK;
u.lmsg.content->data = u.lmsg.content + 1;
zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content; u.lmsg.content->size = size_;
content->data = (void*) (content + 1); u.lmsg.content->ffn = NULL;
content->size = size_; u.lmsg.content->hint = NULL;
content->ffn = NULL; new (&u.lmsg.content->refcnt) zmq::atomic_counter_t ();
content->hint = NULL;
new (&content->refcnt) zmq::atomic_counter_t ();
} }
return 0; return 0;
} }
int zmq_msg_init_data (zmq_msg_t *msg_, void *data_, size_t size_, int zmq::msg_t::init_data (void *data_, size_t size_, zmq_free_fn *ffn_,
zmq_free_fn *ffn_, void *hint_) void *hint_)
{ {
msg_->content = (zmq::msg_content_t*) malloc (sizeof (zmq::msg_content_t)); u.lmsg.type = type_lmsg;
alloc_assert (msg_->content); u.lmsg.flags = 0;
msg_->flags = (unsigned char) ~ZMQ_MSG_MASK; u.lmsg.content = (content_t*) malloc (sizeof (content_t));
zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content; alloc_assert (u.lmsg.content);
content->data = data_;
content->size = size_; u.lmsg.content->data = data_;
content->ffn = ffn_; u.lmsg.content->size = size_;
content->hint = hint_; u.lmsg.content->ffn = ffn_;
new (&content->refcnt) zmq::atomic_counter_t (); u.lmsg.content->hint = hint_;
new (&u.lmsg.content->refcnt) zmq::atomic_counter_t ();
return 0; return 0;
} }
int zmq_msg_close (zmq_msg_t *msg_) int zmq::msg_t::init_delimiter ()
{ {
// Check the validity tag. u.delimiter.type = type_delimiter;
if (unlikely (msg_->flags | ZMQ_MSG_MASK) != 0xff) { u.delimiter.flags = 0;
return 0;
}
int zmq::msg_t::close ()
{
// Check the validity of the message.
if (unlikely (!check ())) {
errno = EFAULT; errno = EFAULT;
return -1; return -1;
} }
// For VSMs and delimiters there are no resources to free. if (u.base.type == type_lmsg) {
if (msg_->content != (zmq::msg_content_t*) ZMQ_DELIMITER &&
msg_->content != (zmq::msg_content_t*) ZMQ_VSM) {
// If the content is not shared, or if it is shared and the reference. // If the content is not shared, or if it is shared and the reference
// count has dropped to zero, deallocate it. // count has dropped to zero, deallocate it.
zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content; if (!(u.lmsg.flags & msg_t::shared) ||
if (!(msg_->flags & ZMQ_MSG_SHARED) || !content->refcnt.sub (1)) { !u.lmsg.content->refcnt.sub (1)) {
// We used "placement new" operator to initialize the reference. // We used "placement new" operator to initialize the reference
// counter so we call its destructor now. // counter so we call the destructor explicitly now.
content->refcnt.~atomic_counter_t (); u.lmsg.content->refcnt.~atomic_counter_t ();
if (content->ffn) if (u.lmsg.content->ffn)
content->ffn (content->data, content->hint); u.lmsg.content->ffn (u.lmsg.content->data,
free (content); u.lmsg.content->hint);
free (u.lmsg.content);
} }
} }
// Remove the validity tag from the message. // Make the message invalid.
msg_->flags = 0; u.base.type = 0;
return 0; return 0;
} }
int zmq_msg_move (zmq_msg_t *dest_, zmq_msg_t *src_) int zmq::msg_t::move (msg_t &src_)
{ {
// Check the validity tags. // Check the validity of the source.
if (unlikely ((dest_->flags | ZMQ_MSG_MASK) != 0xff || if (unlikely (!src_.check ())) {
(src_->flags | ZMQ_MSG_MASK) != 0xff)) {
errno = EFAULT; errno = EFAULT;
return -1; return -1;
} }
zmq_msg_close (dest_); int rc = close ();
*dest_ = *src_; if (unlikely (rc < 0))
zmq_msg_init (src_); return rc;
*this = src_;
rc = src_.init ();
if (unlikely (rc < 0))
return rc;
return 0; return 0;
} }
int zmq_msg_copy (zmq_msg_t *dest_, zmq_msg_t *src_) int zmq::msg_t::copy (msg_t &src_)
{ {
// Check the validity tags. // Check the validity of the source.
if (unlikely ((dest_->flags | ZMQ_MSG_MASK) != 0xff || if (unlikely (!src_.check ())) {
(src_->flags | ZMQ_MSG_MASK) != 0xff)) {
errno = EFAULT; errno = EFAULT;
return -1; return -1;
} }
zmq_msg_close (dest_); int rc = close ();
if (unlikely (rc < 0))
return rc;
// VSMs and delimiters require no special handling. if (src_.u.base.type == type_lmsg) {
if (src_->content != (zmq::msg_content_t*) ZMQ_DELIMITER &&
src_->content != (zmq::msg_content_t*) ZMQ_VSM) {
// One reference is added to shared messages. Non-shared messages // One reference is added to shared messages. Non-shared messages
// are turned into shared messages and reference count is set to 2. // are turned into shared messages and reference count is set to 2.
zmq::msg_content_t *content = (zmq::msg_content_t*) src_->content; if (src_.u.lmsg.flags & msg_t::shared)
if (src_->flags & ZMQ_MSG_SHARED) src_.u.lmsg.content->refcnt.add (1);
content->refcnt.add (1);
else { else {
src_->flags |= ZMQ_MSG_SHARED; src_.u.lmsg.flags |= msg_t::shared;
content->refcnt.set (2); src_.u.lmsg.content->refcnt.set (2);
} }
} }
*dest_ = *src_; *this = src_;
return 0; return 0;
} }
void *zmq_msg_data (zmq_msg_t *msg_) void *zmq::msg_t::data ()
{ {
// Check the validity tag. // Check the validity of the message.
zmq_assert ((msg_->flags | ZMQ_MSG_MASK) == 0xff); zmq_assert (check ());
if (msg_->content == (zmq::msg_content_t*) ZMQ_VSM) switch (u.base.type) {
return msg_->vsm_data; case type_vsm:
if (msg_->content == (zmq::msg_content_t*) ZMQ_DELIMITER) return u.vsm.data;
return NULL; case type_lmsg:
return u.lmsg.content->data;
default:
zmq_assert (false);
}
}
size_t zmq::msg_t::size ()
{
// Check the validity of the message.
zmq_assert (check ());
return ((zmq::msg_content_t*) msg_->content)->data; switch (u.base.type) {
case type_vsm:
return u.vsm.size;
case type_lmsg:
return u.lmsg.content->size;
default:
zmq_assert (false);
}
} }
size_t zmq_msg_size (zmq_msg_t *msg_) unsigned char zmq::msg_t::flags ()
{ {
// Check the validity tag. return u.base.flags;
zmq_assert ((msg_->flags | ZMQ_MSG_MASK) == 0xff); }
if (msg_->content == (zmq::msg_content_t*) ZMQ_VSM) void zmq::msg_t::set_flags (unsigned char flags_)
return msg_->vsm_size; {
if (msg_->content == (zmq::msg_content_t*) ZMQ_DELIMITER) u.base.flags |= flags_;
return 0; }
void zmq::msg_t::reset_flags (unsigned char flags_)
{
u.base.flags &= ~flags_;
}
bool zmq::msg_t::is_delimiter ()
{
return u.base.type == type_delimiter;
}
void zmq::msg_t::add_refs (int refs_)
{
zmq_assert (refs_ >= 0);
return ((zmq::msg_content_t*) msg_->content)->size; // No copies required.
if (!refs_)
return;
// VSMs and delimiters can be copied straight away. The only message type
// that needs special care are long messages.
if (u.base.type == type_lmsg) {
if (u.lmsg.flags & msg_t::shared)
u.lmsg.content->refcnt.add (refs_);
else {
u.lmsg.content->refcnt.set (refs_ + 1);
u.lmsg.flags |= msg_t::shared;
}
}
}
void zmq::msg_t::rm_refs (int refs_)
{
zmq_assert (refs_ >= 0);
// No copies required.
if (!refs_)
return;
// The only message type that needs special care are long messages.
if (u.base.type == type_lmsg) {
zmq_assert (u.lmsg.flags & msg_t::shared);
u.lmsg.content->refcnt.sub (refs_);
}
} }
...@@ -23,13 +23,54 @@ ...@@ -23,13 +23,54 @@
#include <stddef.h> #include <stddef.h>
#include "../include/zmq.h" #include "config.hpp"
#include "atomic_counter.hpp" #include "atomic_counter.hpp"
namespace zmq namespace zmq
{ {
// Note that this structure needs to be explicitly constructed
// (init functions) and destructed (close function).
class msg_t
{
public:
// Mesage flags.
enum
{
more = 1,
shared = 128
};
// Signature for free function to deallocate the message content.
typedef void (free_fn_t) (void *data, void *hint);
bool check ();
int init ();
int init_size (size_t size_);
int init_data (void *data_, size_t size_, free_fn_t *ffn_,
void *hint_);
int init_delimiter ();
int close ();
int move (msg_t &src_);
int copy (msg_t &src_);
void *data ();
size_t size ();
unsigned char flags ();
void set_flags (unsigned char flags_);
void reset_flags (unsigned char flags_);
bool is_delimiter ();
// After calling this function you can copy the message in POD-style
// refs_ times. No need to call copy.
void add_refs (int refs_);
// Removes references previously added by add_refs.
void rm_refs (int refs_);
private:
// Shared message buffer. Message data are either allocated in one // Shared message buffer. Message data are either allocated in one
// continuous block along with this structure - thus avoiding one // continuous block along with this structure - thus avoiding one
// malloc/free pair or they are stored in used-supplied memory. // malloc/free pair or they are stored in used-supplied memory.
...@@ -37,16 +78,52 @@ namespace zmq ...@@ -37,16 +78,52 @@ namespace zmq
// used to deallocate the data. If the buffer is actually shared (there // used to deallocate the data. If the buffer is actually shared (there
// are at least 2 references to it) refcount member contains number of // are at least 2 references to it) refcount member contains number of
// references. // references.
struct content_t
struct msg_content_t
{ {
void *data; void *data;
size_t size; size_t size;
zmq_free_fn *ffn; free_fn_t *ffn;
void *hint; void *hint;
zmq::atomic_counter_t refcnt; zmq::atomic_counter_t refcnt;
}; };
// Different message types.
enum type_t
{
type_min = 101,
type_vsm = 101,
type_lmsg = 102,
type_delimiter = 103,
type_max = 103
};
// Note that fields shared between different message types are not
// moved to tha parent class (msg_t). This way we ger tighter packing
// of the data. Shared fields can be accessed via 'base' member of
// the union.
union {
struct {
unsigned char type;
unsigned char flags;
} base;
struct {
unsigned char type;
unsigned char flags;
unsigned char size;
unsigned char data [max_vsm_size];
} vsm;
struct {
unsigned char type;
unsigned char flags;
content_t *content;
} lmsg;
struct {
unsigned char type;
unsigned char flags;
} delimiter;
} u;
};
} }
#endif #endif
...@@ -21,8 +21,6 @@ ...@@ -21,8 +21,6 @@
#ifndef __ZMQ_OBJECT_HPP_INCLUDED__ #ifndef __ZMQ_OBJECT_HPP_INCLUDED__
#define __ZMQ_OBJECT_HPP_INCLUDED__ #define __ZMQ_OBJECT_HPP_INCLUDED__
#include "../include/zmq.h"
#include "stdint.hpp" #include "stdint.hpp"
#include "blob.hpp" #include "blob.hpp"
......
...@@ -20,8 +20,6 @@ ...@@ -20,8 +20,6 @@
#include <string.h> #include <string.h>
#include "../include/zmq.h"
#include "options.hpp" #include "options.hpp"
#include "err.hpp" #include "err.hpp"
......
...@@ -18,11 +18,10 @@ ...@@ -18,11 +18,10 @@
along with this program. If not, see <http://www.gnu.org/licenses/>. along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "../include/zmq.h"
#include "pair.hpp" #include "pair.hpp"
#include "err.hpp" #include "err.hpp"
#include "pipe.hpp" #include "pipe.hpp"
#include "msg.hpp"
zmq::pair_t::pair_t (class ctx_t *parent_, uint32_t tid_) : zmq::pair_t::pair_t (class ctx_t *parent_, uint32_t tid_) :
socket_base_t (parent_, tid_), socket_base_t (parent_, tid_),
...@@ -116,7 +115,7 @@ void zmq::pair_t::activated (class writer_t *pipe_) ...@@ -116,7 +115,7 @@ void zmq::pair_t::activated (class writer_t *pipe_)
outpipe_alive = true; outpipe_alive = true;
} }
int zmq::pair_t::xsend (zmq_msg_t *msg_, int flags_) int zmq::pair_t::xsend (msg_t *msg_, int flags_)
{ {
if (outpipe == NULL || !outpipe_alive) { if (outpipe == NULL || !outpipe_alive) {
errno = EAGAIN; errno = EAGAIN;
...@@ -133,16 +132,17 @@ int zmq::pair_t::xsend (zmq_msg_t *msg_, int flags_) ...@@ -133,16 +132,17 @@ int zmq::pair_t::xsend (zmq_msg_t *msg_, int flags_)
outpipe->flush (); outpipe->flush ();
// Detach the original message from the data buffer. // Detach the original message from the data buffer.
int rc = zmq_msg_init (msg_); int rc = msg_->init ();
zmq_assert (rc == 0); errno_assert (rc == 0);
return 0; return 0;
} }
int zmq::pair_t::xrecv (zmq_msg_t *msg_, int flags_) int zmq::pair_t::xrecv (msg_t *msg_, int flags_)
{ {
// Deallocate old content of the message. // Deallocate old content of the message.
zmq_msg_close (msg_); int rc = msg_->close ();
errno_assert (rc == 0);
if (!inpipe_alive || !inpipe || !inpipe->read (msg_)) { if (!inpipe_alive || !inpipe || !inpipe->read (msg_)) {
...@@ -150,7 +150,8 @@ int zmq::pair_t::xrecv (zmq_msg_t *msg_, int flags_) ...@@ -150,7 +150,8 @@ int zmq::pair_t::xrecv (zmq_msg_t *msg_, int flags_)
inpipe_alive = false; inpipe_alive = false;
// Initialise the output parameter to be a 0-byte message. // Initialise the output parameter to be a 0-byte message.
zmq_msg_init (msg_); rc = msg_->init ();
errno_assert (rc == 0);
errno = EAGAIN; errno = EAGAIN;
return -1; return -1;
} }
...@@ -171,10 +172,12 @@ bool zmq::pair_t::xhas_out () ...@@ -171,10 +172,12 @@ bool zmq::pair_t::xhas_out ()
if (!outpipe || !outpipe_alive) if (!outpipe || !outpipe_alive)
return false; return false;
zmq_msg_t msg; msg_t msg;
zmq_msg_init (&msg); int rc = msg.init ();
errno_assert (rc == 0);
outpipe_alive = outpipe->check_write (&msg); outpipe_alive = outpipe->check_write (&msg);
zmq_msg_close (&msg); rc = msg.close ();
errno_assert (rc == 0);
return outpipe_alive; return outpipe_alive;
} }
...@@ -40,8 +40,8 @@ namespace zmq ...@@ -40,8 +40,8 @@ namespace zmq
// Overloads of functions from socket_base_t. // Overloads of functions from socket_base_t.
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
const blob_t &peer_identity_); const blob_t &peer_identity_);
int xsend (zmq_msg_t *msg_, int flags_); int xsend (class msg_t *msg_, int flags_);
int xrecv (zmq_msg_t *msg_, int flags_); int xrecv (class msg_t *msg_, int flags_);
bool xhas_in (); bool xhas_in ();
bool xhas_out (); bool xhas_out ();
......
...@@ -20,8 +20,6 @@ ...@@ -20,8 +20,6 @@
#include <new> #include <new>
#include "../include/zmq.h"
#include "pipe.hpp" #include "pipe.hpp"
#include "likely.hpp" #include "likely.hpp"
...@@ -53,11 +51,12 @@ zmq::reader_t::~reader_t () ...@@ -53,11 +51,12 @@ zmq::reader_t::~reader_t ()
zmq_assert (pipe); zmq_assert (pipe);
// First delete all the unread messages in the pipe. We have to do it by // First delete all the unread messages in the pipe. We have to do it by
// hand because zmq_msg_t is a POD, not a class, so there's no associated // hand because msg_t doesn't have automatic destructor.
// destructor. msg_t msg;
zmq_msg_t msg; while (pipe->read (&msg)) {
while (pipe->read (&msg)) int rc = msg.close ();
zmq_msg_close (&msg); errno_assert (rc == 0);
}
delete pipe; delete pipe;
} }
...@@ -68,11 +67,9 @@ void zmq::reader_t::set_event_sink (i_reader_events *sink_) ...@@ -68,11 +67,9 @@ void zmq::reader_t::set_event_sink (i_reader_events *sink_)
sink = sink_; sink = sink_;
} }
bool zmq::reader_t::is_delimiter (zmq_msg_t &msg_) bool zmq::reader_t::is_delimiter (msg_t &msg_)
{ {
unsigned char *offset = 0; return msg_.is_delimiter ();
return msg_.content == (void*) (offset + ZMQ_DELIMITER);
} }
bool zmq::reader_t::check_read () bool zmq::reader_t::check_read ()
...@@ -89,7 +86,7 @@ bool zmq::reader_t::check_read () ...@@ -89,7 +86,7 @@ bool zmq::reader_t::check_read ()
// If the next item in the pipe is message delimiter, // If the next item in the pipe is message delimiter,
// initiate its termination. // initiate its termination.
if (pipe->probe (is_delimiter)) { if (pipe->probe (is_delimiter)) {
zmq_msg_t msg; msg_t msg;
bool ok = pipe->read (&msg); bool ok = pipe->read (&msg);
zmq_assert (ok); zmq_assert (ok);
if (sink) if (sink)
...@@ -101,7 +98,7 @@ bool zmq::reader_t::check_read () ...@@ -101,7 +98,7 @@ bool zmq::reader_t::check_read ()
return true; return true;
} }
bool zmq::reader_t::read (zmq_msg_t *msg_) bool zmq::reader_t::read (msg_t *msg_)
{ {
if (!active) if (!active)
return false; return false;
...@@ -112,15 +109,14 @@ bool zmq::reader_t::read (zmq_msg_t *msg_) ...@@ -112,15 +109,14 @@ bool zmq::reader_t::read (zmq_msg_t *msg_)
} }
// If delimiter was read, start termination process of the pipe. // If delimiter was read, start termination process of the pipe.
unsigned char *offset = 0; if (msg_->is_delimiter ()) {
if (msg_->content == (void*) (offset + ZMQ_DELIMITER)) {
if (sink) if (sink)
sink->delimited (this); sink->delimited (this);
terminate (); terminate ();
return false; return false;
} }
if (!(msg_->flags & ZMQ_MSG_MORE)) if (!(msg_->flags () & msg_t::more))
msgs_read++; msgs_read++;
if (lwm > 0 && msgs_read % lwm == 0) if (lwm > 0 && msgs_read % lwm == 0)
...@@ -187,7 +183,7 @@ void zmq::writer_t::set_event_sink (i_writer_events *sink_) ...@@ -187,7 +183,7 @@ void zmq::writer_t::set_event_sink (i_writer_events *sink_)
sink = sink_; sink = sink_;
} }
bool zmq::writer_t::check_write (zmq_msg_t *msg_) bool zmq::writer_t::check_write (msg_t *msg_)
{ {
// We've already checked and there's no space free for the new message. // We've already checked and there's no space free for the new message.
// There's no point in checking once again. // There's no point in checking once again.
...@@ -202,13 +198,13 @@ bool zmq::writer_t::check_write (zmq_msg_t *msg_) ...@@ -202,13 +198,13 @@ bool zmq::writer_t::check_write (zmq_msg_t *msg_)
return true; return true;
} }
bool zmq::writer_t::write (zmq_msg_t *msg_) bool zmq::writer_t::write (msg_t *msg_)
{ {
if (unlikely (!check_write (msg_))) if (unlikely (!check_write (msg_)))
return false; return false;
pipe->write (*msg_, msg_->flags & ZMQ_MSG_MORE); pipe->write (*msg_, msg_->flags () & msg_t::more);
if (!(msg_->flags & ZMQ_MSG_MORE)) if (!(msg_->flags () & msg_t::more))
msgs_written++; msgs_written++;
return true; return true;
...@@ -217,10 +213,11 @@ bool zmq::writer_t::write (zmq_msg_t *msg_) ...@@ -217,10 +213,11 @@ bool zmq::writer_t::write (zmq_msg_t *msg_)
void zmq::writer_t::rollback () void zmq::writer_t::rollback ()
{ {
// Remove incomplete message from the pipe. // Remove incomplete message from the pipe.
zmq_msg_t msg; msg_t msg;
while (pipe->unwrite (&msg)) { while (pipe->unwrite (&msg)) {
zmq_assert (msg.flags & ZMQ_MSG_MORE); zmq_assert (msg.flags () & msg_t::more);
zmq_msg_close (&msg); int rc = msg.close ();
errno_assert (rc == 0);
} }
} }
...@@ -246,10 +243,8 @@ void zmq::writer_t::terminate () ...@@ -246,10 +243,8 @@ void zmq::writer_t::terminate ()
// Push delimiter into the pipe. Trick the compiler to belive that // Push delimiter into the pipe. Trick the compiler to belive that
// the tag is a valid pointer. Note that watermarks are not checked // the tag is a valid pointer. Note that watermarks are not checked
// thus the delimiter can be written even though the pipe is full. // thus the delimiter can be written even though the pipe is full.
zmq_msg_t msg; msg_t msg;
const unsigned char *offset = 0; msg.init_delimiter ();
msg.content = (void*) (offset + ZMQ_DELIMITER);
msg.flags = 0;
pipe->write (msg, false); pipe->write (msg, false);
flush (); flush ();
} }
......
...@@ -21,8 +21,7 @@ ...@@ -21,8 +21,7 @@
#ifndef __ZMQ_PIPE_HPP_INCLUDED__ #ifndef __ZMQ_PIPE_HPP_INCLUDED__
#define __ZMQ_PIPE_HPP_INCLUDED__ #define __ZMQ_PIPE_HPP_INCLUDED__
#include "../include/zmq.h" #include "msg.hpp"
#include "array.hpp" #include "array.hpp"
#include "ypipe.hpp" #include "ypipe.hpp"
#include "config.hpp" #include "config.hpp"
...@@ -43,7 +42,7 @@ namespace zmq ...@@ -43,7 +42,7 @@ namespace zmq
// event. When endpoint processes the event and returns, associated // event. When endpoint processes the event and returns, associated
// reader/writer object is deallocated. // reader/writer object is deallocated.
typedef ypipe_t <zmq_msg_t, message_pipe_granularity> pipe_t; typedef ypipe_t <msg_t, message_pipe_granularity> pipe_t;
struct i_reader_events struct i_reader_events
{ {
...@@ -69,7 +68,7 @@ namespace zmq ...@@ -69,7 +68,7 @@ namespace zmq
bool check_read (); bool check_read ();
// Reads a message to the underlying pipe. // Reads a message to the underlying pipe.
bool read (zmq_msg_t *msg_); bool read (msg_t *msg_);
// Ask pipe to terminate. // Ask pipe to terminate.
void terminate (); void terminate ();
...@@ -87,7 +86,7 @@ namespace zmq ...@@ -87,7 +86,7 @@ namespace zmq
void process_pipe_term_ack (); void process_pipe_term_ack ();
// Returns true if the message is delimiter; false otherwise. // Returns true if the message is delimiter; false otherwise.
static bool is_delimiter (zmq_msg_t &msg_); static bool is_delimiter (msg_t &msg_);
// True, if pipe can be read from. // True, if pipe can be read from.
bool active; bool active;
...@@ -136,11 +135,11 @@ namespace zmq ...@@ -136,11 +135,11 @@ namespace zmq
// Checks whether messages can be written to the pipe. // Checks whether messages can be written to the pipe.
// If writing the message would cause high watermark // If writing the message would cause high watermark
// the function returns false. // the function returns false.
bool check_write (zmq_msg_t *msg_); bool check_write (msg_t *msg_);
// Writes a message to the underlying pipe. Returns false if the // Writes a message to the underlying pipe. Returns false if the
// message cannot be written because high watermark was reached. // message cannot be written because high watermark was reached.
bool write (zmq_msg_t *msg_); bool write (msg_t *msg_);
// Remove unfinished part of a message from the pipe. // Remove unfinished part of a message from the pipe.
void rollback (); void rollback ();
......
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
*/ */
#include "pub.hpp" #include "pub.hpp"
#include "msg.hpp"
zmq::pub_t::pub_t (class ctx_t *parent_, uint32_t tid_) : zmq::pub_t::pub_t (class ctx_t *parent_, uint32_t tid_) :
xpub_t (parent_, tid_) xpub_t (parent_, tid_)
......
...@@ -18,10 +18,9 @@ ...@@ -18,10 +18,9 @@
along with this program. If not, see <http://www.gnu.org/licenses/>. along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "../include/zmq.h"
#include "pull.hpp" #include "pull.hpp"
#include "err.hpp" #include "err.hpp"
#include "msg.hpp"
zmq::pull_t::pull_t (class ctx_t *parent_, uint32_t tid_) : zmq::pull_t::pull_t (class ctx_t *parent_, uint32_t tid_) :
socket_base_t (parent_, tid_), socket_base_t (parent_, tid_),
...@@ -49,7 +48,7 @@ void zmq::pull_t::process_term (int linger_) ...@@ -49,7 +48,7 @@ void zmq::pull_t::process_term (int linger_)
socket_base_t::process_term (linger_); socket_base_t::process_term (linger_);
} }
int zmq::pull_t::xrecv (zmq_msg_t *msg_, int flags_) int zmq::pull_t::xrecv (msg_t *msg_, int flags_)
{ {
return fq.recv (msg_, flags_); return fq.recv (msg_, flags_);
} }
......
...@@ -39,7 +39,7 @@ namespace zmq ...@@ -39,7 +39,7 @@ namespace zmq
// Overloads of functions from socket_base_t. // Overloads of functions from socket_base_t.
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
const blob_t &peer_identity_); const blob_t &peer_identity_);
int xrecv (zmq_msg_t *msg_, int flags_); int xrecv (class msg_t *msg_, int flags_);
bool xhas_in (); bool xhas_in ();
private: private:
......
...@@ -18,11 +18,10 @@ ...@@ -18,11 +18,10 @@
along with this program. If not, see <http://www.gnu.org/licenses/>. along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "../include/zmq.h"
#include "push.hpp" #include "push.hpp"
#include "err.hpp"
#include "pipe.hpp" #include "pipe.hpp"
#include "err.hpp"
#include "msg.hpp"
zmq::push_t::push_t (class ctx_t *parent_, uint32_t tid_) : zmq::push_t::push_t (class ctx_t *parent_, uint32_t tid_) :
socket_base_t (parent_, tid_), socket_base_t (parent_, tid_),
...@@ -50,7 +49,7 @@ void zmq::push_t::process_term (int linger_) ...@@ -50,7 +49,7 @@ void zmq::push_t::process_term (int linger_)
socket_base_t::process_term (linger_); socket_base_t::process_term (linger_);
} }
int zmq::push_t::xsend (zmq_msg_t *msg_, int flags_) int zmq::push_t::xsend (msg_t *msg_, int flags_)
{ {
return lb.send (msg_, flags_); return lb.send (msg_, flags_);
} }
......
...@@ -39,7 +39,7 @@ namespace zmq ...@@ -39,7 +39,7 @@ namespace zmq
// Overloads of functions from socket_base_t. // Overloads of functions from socket_base_t.
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
const blob_t &peer_identity_); const blob_t &peer_identity_);
int xsend (zmq_msg_t *msg_, int flags_); int xsend (class msg_t *msg_, int flags_);
bool xhas_out (); bool xhas_out ();
private: private:
......
...@@ -18,10 +18,9 @@ ...@@ -18,10 +18,9 @@
along with this program. If not, see <http://www.gnu.org/licenses/>. along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "../include/zmq.h"
#include "rep.hpp" #include "rep.hpp"
#include "err.hpp" #include "err.hpp"
#include "msg.hpp"
zmq::rep_t::rep_t (class ctx_t *parent_, uint32_t tid_) : zmq::rep_t::rep_t (class ctx_t *parent_, uint32_t tid_) :
xrep_t (parent_, tid_), xrep_t (parent_, tid_),
...@@ -35,7 +34,7 @@ zmq::rep_t::~rep_t () ...@@ -35,7 +34,7 @@ zmq::rep_t::~rep_t ()
{ {
} }
int zmq::rep_t::xsend (zmq_msg_t *msg_, int flags_) int zmq::rep_t::xsend (msg_t *msg_, int flags_)
{ {
// If we are in the middle of receiving a request, we cannot send reply. // If we are in the middle of receiving a request, we cannot send reply.
if (!sending_reply) { if (!sending_reply) {
...@@ -43,7 +42,7 @@ int zmq::rep_t::xsend (zmq_msg_t *msg_, int flags_) ...@@ -43,7 +42,7 @@ int zmq::rep_t::xsend (zmq_msg_t *msg_, int flags_)
return -1; return -1;
} }
bool more = (msg_->flags & ZMQ_MSG_MORE); bool more = (msg_->flags () & msg_t::more);
// Push message to the reply pipe. // Push message to the reply pipe.
int rc = xrep_t::xsend (msg_, flags_); int rc = xrep_t::xsend (msg_, flags_);
...@@ -57,7 +56,7 @@ int zmq::rep_t::xsend (zmq_msg_t *msg_, int flags_) ...@@ -57,7 +56,7 @@ int zmq::rep_t::xsend (zmq_msg_t *msg_, int flags_)
return 0; return 0;
} }
int zmq::rep_t::xrecv (zmq_msg_t *msg_, int flags_) int zmq::rep_t::xrecv (msg_t *msg_, int flags_)
{ {
// If we are in middle of sending a reply, we cannot receive next request. // If we are in middle of sending a reply, we cannot receive next request.
if (sending_reply) { if (sending_reply) {
...@@ -78,10 +77,10 @@ int zmq::rep_t::xrecv (zmq_msg_t *msg_, int flags_) ...@@ -78,10 +77,10 @@ int zmq::rep_t::xrecv (zmq_msg_t *msg_, int flags_)
int rc = xrep_t::xrecv (msg_, flags_); int rc = xrep_t::xrecv (msg_, flags_);
if (rc != 0) if (rc != 0)
return rc; return rc;
zmq_assert (msg_->flags & ZMQ_MSG_MORE); zmq_assert (msg_->flags () & msg_t::more);
// Empty message part delimits the traceback stack. // Empty message part delimits the traceback stack.
bottom = (zmq_msg_size (msg_) == 0); bottom = (msg_->size () == 0);
// Push it to the reply pipe. // Push it to the reply pipe.
rc = xrep_t::xsend (msg_, flags_); rc = xrep_t::xsend (msg_, flags_);
...@@ -98,7 +97,7 @@ int zmq::rep_t::xrecv (zmq_msg_t *msg_, int flags_) ...@@ -98,7 +97,7 @@ int zmq::rep_t::xrecv (zmq_msg_t *msg_, int flags_)
return rc; return rc;
// If whole request is read, flip the FSM to reply-sending state. // If whole request is read, flip the FSM to reply-sending state.
if (!(msg_->flags & ZMQ_MSG_MORE)) { if (!(msg_->flags () & msg_t::more)) {
sending_reply = true; sending_reply = true;
request_begins = true; request_begins = true;
} }
......
...@@ -34,8 +34,8 @@ namespace zmq ...@@ -34,8 +34,8 @@ namespace zmq
~rep_t (); ~rep_t ();
// Overloads of functions from socket_base_t. // Overloads of functions from socket_base_t.
int xsend (zmq_msg_t *msg_, int flags_); int xsend (class msg_t *msg_, int flags_);
int xrecv (zmq_msg_t *msg_, int flags_); int xrecv (class msg_t *msg_, int flags_);
bool xhas_in (); bool xhas_in ();
bool xhas_out (); bool xhas_out ();
......
...@@ -18,10 +18,9 @@ ...@@ -18,10 +18,9 @@
along with this program. If not, see <http://www.gnu.org/licenses/>. along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "../include/zmq.h"
#include "req.hpp" #include "req.hpp"
#include "err.hpp" #include "err.hpp"
#include "msg.hpp"
zmq::req_t::req_t (class ctx_t *parent_, uint32_t tid_) : zmq::req_t::req_t (class ctx_t *parent_, uint32_t tid_) :
xreq_t (parent_, tid_), xreq_t (parent_, tid_),
...@@ -35,7 +34,7 @@ zmq::req_t::~req_t () ...@@ -35,7 +34,7 @@ zmq::req_t::~req_t ()
{ {
} }
int zmq::req_t::xsend (zmq_msg_t *msg_, int flags_) int zmq::req_t::xsend (msg_t *msg_, int flags_)
{ {
// If we've sent a request and we still haven't got the reply, // If we've sent a request and we still haven't got the reply,
// we can't send another request. // we can't send another request.
...@@ -46,17 +45,17 @@ int zmq::req_t::xsend (zmq_msg_t *msg_, int flags_) ...@@ -46,17 +45,17 @@ int zmq::req_t::xsend (zmq_msg_t *msg_, int flags_)
// First part of the request is empty message part (stack bottom). // First part of the request is empty message part (stack bottom).
if (message_begins) { if (message_begins) {
zmq_msg_t prefix; msg_t prefix;
int rc = zmq_msg_init (&prefix); int rc = prefix.init ();
zmq_assert (rc == 0); errno_assert (rc == 0);
prefix.flags |= ZMQ_MSG_MORE; prefix.set_flags (msg_t::more);
rc = xreq_t::xsend (&prefix, flags_); rc = xreq_t::xsend (&prefix, flags_);
if (rc != 0) if (rc != 0)
return rc; return rc;
message_begins = false; message_begins = false;
} }
bool more = msg_->flags & ZMQ_MSG_MORE; bool more = msg_->flags () & msg_t::more;
int rc = xreq_t::xsend (msg_, flags_); int rc = xreq_t::xsend (msg_, flags_);
if (rc != 0) if (rc != 0)
...@@ -71,7 +70,7 @@ int zmq::req_t::xsend (zmq_msg_t *msg_, int flags_) ...@@ -71,7 +70,7 @@ int zmq::req_t::xsend (zmq_msg_t *msg_, int flags_)
return 0; return 0;
} }
int zmq::req_t::xrecv (zmq_msg_t *msg_, int flags_) int zmq::req_t::xrecv (msg_t *msg_, int flags_)
{ {
// If request wasn't send, we can't wait for reply. // If request wasn't send, we can't wait for reply.
if (!receiving_reply) { if (!receiving_reply) {
...@@ -84,8 +83,8 @@ int zmq::req_t::xrecv (zmq_msg_t *msg_, int flags_) ...@@ -84,8 +83,8 @@ int zmq::req_t::xrecv (zmq_msg_t *msg_, int flags_)
int rc = xreq_t::xrecv (msg_, flags_); int rc = xreq_t::xrecv (msg_, flags_);
if (rc != 0) if (rc != 0)
return rc; return rc;
zmq_assert (msg_->flags & ZMQ_MSG_MORE); zmq_assert (msg_->flags () & msg_t::more);
zmq_assert (zmq_msg_size (msg_) == 0); zmq_assert (msg_->size () == 0);
message_begins = false; message_begins = false;
} }
...@@ -94,7 +93,7 @@ int zmq::req_t::xrecv (zmq_msg_t *msg_, int flags_) ...@@ -94,7 +93,7 @@ int zmq::req_t::xrecv (zmq_msg_t *msg_, int flags_)
return rc; return rc;
// If the reply is fully received, flip the FSM into request-sending state. // If the reply is fully received, flip the FSM into request-sending state.
if (!(msg_->flags & ZMQ_MSG_MORE)) { if (!(msg_->flags () & msg_t::more)) {
receiving_reply = false; receiving_reply = false;
message_begins = true; message_begins = true;
} }
......
...@@ -34,8 +34,8 @@ namespace zmq ...@@ -34,8 +34,8 @@ namespace zmq
~req_t (); ~req_t ();
// Overloads of functions from socket_base_t. // Overloads of functions from socket_base_t.
int xsend (zmq_msg_t *msg_, int flags_); int xsend (class msg_t *msg_, int flags_);
int xrecv (zmq_msg_t *msg_, int flags_); int xrecv (class msg_t *msg_, int flags_);
bool xhas_in (); bool xhas_in ();
bool xhas_out (); bool xhas_out ();
......
...@@ -80,7 +80,7 @@ void zmq::session_t::proceed_with_term () ...@@ -80,7 +80,7 @@ void zmq::session_t::proceed_with_term ()
own_t::process_term (0); own_t::process_term (0);
} }
bool zmq::session_t::read (::zmq_msg_t *msg_) bool zmq::session_t::read (msg_t *msg_)
{ {
if (!in_pipe) if (!in_pipe)
return false; return false;
...@@ -88,14 +88,15 @@ bool zmq::session_t::read (::zmq_msg_t *msg_) ...@@ -88,14 +88,15 @@ bool zmq::session_t::read (::zmq_msg_t *msg_)
if (!in_pipe->read (msg_)) if (!in_pipe->read (msg_))
return false; return false;
incomplete_in = msg_->flags & ZMQ_MSG_MORE; incomplete_in = msg_->flags () & msg_t::more;
return true; return true;
} }
bool zmq::session_t::write (::zmq_msg_t *msg_) bool zmq::session_t::write (msg_t *msg_)
{ {
if (out_pipe && out_pipe->write (msg_)) { if (out_pipe && out_pipe->write (msg_)) {
zmq_msg_init (msg_); int rc = msg_->init ();
errno_assert (rc == 0);
return true; return true;
} }
...@@ -120,13 +121,15 @@ void zmq::session_t::clean_pipes () ...@@ -120,13 +121,15 @@ void zmq::session_t::clean_pipes ()
// Remove any half-read message from the in pipe. // Remove any half-read message from the in pipe.
if (in_pipe) { if (in_pipe) {
while (incomplete_in) { while (incomplete_in) {
zmq_msg_t msg; msg_t msg;
zmq_msg_init (&msg); int rc = msg.init ();
errno_assert (rc == 0);
if (!read (&msg)) { if (!read (&msg)) {
zmq_assert (!incomplete_in); zmq_assert (!incomplete_in);
break; break;
} }
zmq_msg_close (&msg); rc = msg.close ();
errno_assert (rc == 0);
} }
} }
} }
......
...@@ -45,8 +45,8 @@ namespace zmq ...@@ -45,8 +45,8 @@ namespace zmq
// i_inout interface implementation. Note that detach method is not // i_inout interface implementation. Note that detach method is not
// implemented by generic session. Different session types may handle // implemented by generic session. Different session types may handle
// engine disconnection in different ways. // engine disconnection in different ways.
bool read (::zmq_msg_t *msg_); bool read (msg_t *msg_);
bool write (::zmq_msg_t *msg_); bool write (msg_t *msg_);
void flush (); void flush ();
void detach (); void detach ();
......
...@@ -22,8 +22,6 @@ ...@@ -22,8 +22,6 @@
#include <string> #include <string>
#include <algorithm> #include <algorithm>
#include "../include/zmq.h"
#include "platform.hpp" #include "platform.hpp"
#if defined ZMQ_HAVE_WINDOWS #if defined ZMQ_HAVE_WINDOWS
...@@ -48,6 +46,7 @@ ...@@ -48,6 +46,7 @@
#include "platform.hpp" #include "platform.hpp"
#include "likely.hpp" #include "likely.hpp"
#include "uuid.hpp" #include "uuid.hpp"
#include "msg.hpp"
#include "pair.hpp" #include "pair.hpp"
#include "pub.hpp" #include "pub.hpp"
...@@ -464,7 +463,7 @@ int zmq::socket_base_t::connect (const char *addr_) ...@@ -464,7 +463,7 @@ int zmq::socket_base_t::connect (const char *addr_)
return 0; return 0;
} }
int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_) int zmq::socket_base_t::send (msg_t *msg_, int flags_)
{ {
// Check whether the library haven't been shut down yet. // Check whether the library haven't been shut down yet.
if (unlikely (ctx_terminated)) { if (unlikely (ctx_terminated)) {
...@@ -473,7 +472,7 @@ int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_) ...@@ -473,7 +472,7 @@ int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_)
} }
// Check whether message passed to the function is valid. // Check whether message passed to the function is valid.
if (unlikely ((msg_->flags | ZMQ_MSG_MASK) != 0xff)) { if (unlikely (!msg_->check ())) {
errno = EFAULT; errno = EFAULT;
return -1; return -1;
} }
...@@ -485,7 +484,7 @@ int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_) ...@@ -485,7 +484,7 @@ int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_)
// At this point we impose the MORE flag on the message. // At this point we impose the MORE flag on the message.
if (flags_ & ZMQ_SNDMORE) if (flags_ & ZMQ_SNDMORE)
msg_->flags |= ZMQ_MSG_MORE; msg_->set_flags (msg_t::more);
// Try to send the message. // Try to send the message.
rc = xsend (msg_, flags_); rc = xsend (msg_, flags_);
...@@ -509,7 +508,7 @@ int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_) ...@@ -509,7 +508,7 @@ int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_)
return 0; return 0;
} }
int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_) int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
{ {
// Check whether the library haven't been shut down yet. // Check whether the library haven't been shut down yet.
if (unlikely (ctx_terminated)) { if (unlikely (ctx_terminated)) {
...@@ -518,7 +517,7 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_) ...@@ -518,7 +517,7 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
} }
// Check whether message passed to the function is valid. // Check whether message passed to the function is valid.
if (unlikely ((msg_->flags | ZMQ_MSG_MASK) != 0xff)) { if (unlikely (!msg_->check ())) {
errno = EFAULT; errno = EFAULT;
return -1; return -1;
} }
...@@ -543,9 +542,9 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_) ...@@ -543,9 +542,9 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
// If we have the message, return immediately. // If we have the message, return immediately.
if (rc == 0) { if (rc == 0) {
rcvmore = msg_->flags & ZMQ_MSG_MORE; rcvmore = msg_->flags () & msg_t::more;
if (rcvmore) if (rcvmore)
msg_->flags &= ~ZMQ_MSG_MORE; msg_->reset_flags (msg_t::more);
return 0; return 0;
} }
...@@ -565,9 +564,9 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_) ...@@ -565,9 +564,9 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
rc = xrecv (msg_, flags_); rc = xrecv (msg_, flags_);
if (rc == 0) { if (rc == 0) {
rcvmore = msg_->flags & ZMQ_MSG_MORE; rcvmore = msg_->flags () & msg_t::more;
if (rcvmore) if (rcvmore)
msg_->flags &= ~ZMQ_MSG_MORE; msg_->reset_flags (msg_t::more);
} }
return rc; return rc;
} }
...@@ -585,9 +584,9 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_) ...@@ -585,9 +584,9 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
block = true; block = true;
} }
rcvmore = msg_->flags & ZMQ_MSG_MORE; rcvmore = msg_->flags () & msg_t::more;
if (rcvmore) if (rcvmore)
msg_->flags &= ~ZMQ_MSG_MORE; msg_->reset_flags (msg_t::more);
return 0; return 0;
} }
...@@ -757,7 +756,7 @@ bool zmq::socket_base_t::xhas_out () ...@@ -757,7 +756,7 @@ bool zmq::socket_base_t::xhas_out ()
return false; return false;
} }
int zmq::socket_base_t::xsend (zmq_msg_t *msg_, int options_) int zmq::socket_base_t::xsend (msg_t *msg_, int options_)
{ {
errno = ENOTSUP; errno = ENOTSUP;
return -1; return -1;
...@@ -768,7 +767,7 @@ bool zmq::socket_base_t::xhas_in () ...@@ -768,7 +767,7 @@ bool zmq::socket_base_t::xhas_in ()
return false; return false;
} }
int zmq::socket_base_t::xrecv (zmq_msg_t *msg_, int options_) int zmq::socket_base_t::xrecv (msg_t *msg_, int options_)
{ {
errno = ENOTSUP; errno = ENOTSUP;
return -1; return -1;
......
...@@ -24,8 +24,6 @@ ...@@ -24,8 +24,6 @@
#include <map> #include <map>
#include <vector> #include <vector>
#include "../include/zmq.h"
#include "own.hpp" #include "own.hpp"
#include "array.hpp" #include "array.hpp"
#include "mutex.hpp" #include "mutex.hpp"
...@@ -69,8 +67,8 @@ namespace zmq ...@@ -69,8 +67,8 @@ namespace zmq
int getsockopt (int option_, void *optval_, size_t *optvallen_); int getsockopt (int option_, void *optval_, size_t *optvallen_);
int bind (const char *addr_); int bind (const char *addr_);
int connect (const char *addr_); int connect (const char *addr_);
int send (zmq_msg_t *msg_, int flags_); int send (class msg_t *msg_, int flags_);
int recv (zmq_msg_t *msg_, int flags_); int recv (class msg_t *msg_, int flags_);
int close (); int close ();
// These functions are used by the polling mechanism to determine // These functions are used by the polling mechanism to determine
...@@ -123,11 +121,11 @@ namespace zmq ...@@ -123,11 +121,11 @@ namespace zmq
// The default implementation assumes that send is not supported. // The default implementation assumes that send is not supported.
virtual bool xhas_out (); virtual bool xhas_out ();
virtual int xsend (zmq_msg_t *msg_, int options_); virtual int xsend (class msg_t *msg_, int options_);
// The default implementation assumes that recv in not supported. // The default implementation assumes that recv in not supported.
virtual bool xhas_in (); virtual bool xhas_in ();
virtual int xrecv (zmq_msg_t *msg_, int options_); virtual int xrecv (class msg_t *msg_, int options_);
// We are declaring termination handler as protected so that // We are declaring termination handler as protected so that
// individual socket types can hook into the termination process // individual socket types can hook into the termination process
......
...@@ -18,9 +18,8 @@ ...@@ -18,9 +18,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>. along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "../include/zmq.h"
#include "sub.hpp" #include "sub.hpp"
#include "msg.hpp"
zmq::sub_t::sub_t (class ctx_t *parent_, uint32_t tid_) : zmq::sub_t::sub_t (class ctx_t *parent_, uint32_t tid_) :
xsub_t (parent_, tid_) xsub_t (parent_, tid_)
...@@ -41,9 +40,10 @@ int zmq::sub_t::xsetsockopt (int option_, const void *optval_, ...@@ -41,9 +40,10 @@ int zmq::sub_t::xsetsockopt (int option_, const void *optval_,
} }
// Create the subscription message. // Create the subscription message.
zmq_msg_t msg; msg_t msg;
zmq_msg_init_size (&msg, optvallen_ + 1); int rc = msg.init_size (optvallen_ + 1);
unsigned char *data = (unsigned char*) zmq_msg_data (&msg); errno_assert (rc == 0);
unsigned char *data = (unsigned char*) msg.data ();
if (option_ == ZMQ_SUBSCRIBE) if (option_ == ZMQ_SUBSCRIBE)
*data = 1; *data = 1;
else if (option_ == ZMQ_UNSUBSCRIBE) else if (option_ == ZMQ_UNSUBSCRIBE)
...@@ -52,16 +52,17 @@ int zmq::sub_t::xsetsockopt (int option_, const void *optval_, ...@@ -52,16 +52,17 @@ int zmq::sub_t::xsetsockopt (int option_, const void *optval_,
// Pass it further on in the stack. // Pass it further on in the stack.
int err = 0; int err = 0;
int rc = xsub_t::xsend (&msg, 0); rc = xsub_t::xsend (&msg, 0);
if (rc != 0) if (rc != 0)
err = errno; err = errno;
zmq_msg_close (&msg); int rc2 = msg.close ();
errno_assert (rc2 == 0);
if (rc != 0) if (rc != 0)
errno = err; errno = err;
return rc; return rc;
} }
int zmq::sub_t::xsend (zmq_msg_t *msg_, int options_) int zmq::sub_t::xsend (msg_t *msg_, int options_)
{ {
// Overload the XSUB's send. // Overload the XSUB's send.
errno = ENOTSUP; errno = ENOTSUP;
......
...@@ -36,7 +36,7 @@ namespace zmq ...@@ -36,7 +36,7 @@ namespace zmq
protected: protected:
int xsetsockopt (int option_, const void *optval_, size_t optvallen_); int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
int xsend (zmq_msg_t *msg_, int options_); int xsend (class msg_t *msg_, int options_);
bool xhas_out (); bool xhas_out ();
private: private:
......
...@@ -22,8 +22,6 @@ ...@@ -22,8 +22,6 @@
#include <string> #include <string>
#include "../include/zmq.h"
#include "tcp_connecter.hpp" #include "tcp_connecter.hpp"
#include "platform.hpp" #include "platform.hpp"
#include "ip.hpp" #include "ip.hpp"
......
...@@ -20,8 +20,6 @@ ...@@ -20,8 +20,6 @@
#include <string.h> #include <string.h>
#include "../include/zmq.h"
#include "tcp_listener.hpp" #include "tcp_listener.hpp"
#include "platform.hpp" #include "platform.hpp"
#include "ip.hpp" #include "ip.hpp"
......
...@@ -18,11 +18,10 @@ ...@@ -18,11 +18,10 @@
along with this program. If not, see <http://www.gnu.org/licenses/>. along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "../include/zmq.h"
#include "xpub.hpp" #include "xpub.hpp"
#include "err.hpp"
#include "pipe.hpp" #include "pipe.hpp"
#include "err.hpp"
#include "msg.hpp"
zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_) : zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_) :
socket_base_t (parent_, tid_), socket_base_t (parent_, tid_),
...@@ -53,7 +52,7 @@ void zmq::xpub_t::process_term (int linger_) ...@@ -53,7 +52,7 @@ void zmq::xpub_t::process_term (int linger_)
socket_base_t::process_term (linger_); socket_base_t::process_term (linger_);
} }
int zmq::xpub_t::xsend (zmq_msg_t *msg_, int flags_) int zmq::xpub_t::xsend (msg_t *msg_, int flags_)
{ {
return dist.send (msg_, flags_); return dist.send (msg_, flags_);
} }
...@@ -63,7 +62,7 @@ bool zmq::xpub_t::xhas_out () ...@@ -63,7 +62,7 @@ bool zmq::xpub_t::xhas_out ()
return dist.has_out (); return dist.has_out ();
} }
int zmq::xpub_t::xrecv (zmq_msg_t *msg_, int flags_) int zmq::xpub_t::xrecv (msg_t *msg_, int flags_)
{ {
errno = EAGAIN; errno = EAGAIN;
return -1; return -1;
......
...@@ -39,9 +39,9 @@ namespace zmq ...@@ -39,9 +39,9 @@ namespace zmq
// Implementations of virtual functions from socket_base_t. // Implementations of virtual functions from socket_base_t.
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
const blob_t &peer_identity_); const blob_t &peer_identity_);
int xsend (zmq_msg_t *msg_, int flags_); int xsend (class msg_t *msg_, int flags_);
bool xhas_out (); bool xhas_out ();
int xrecv (zmq_msg_t *msg_, int flags_); int xrecv (class msg_t *msg_, int flags_);
bool xhas_in (); bool xhas_in ();
private: private:
......
...@@ -18,11 +18,9 @@ ...@@ -18,11 +18,9 @@
along with this program. If not, see <http://www.gnu.org/licenses/>. along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "../include/zmq.h"
#include "xrep.hpp" #include "xrep.hpp"
#include "err.hpp"
#include "pipe.hpp" #include "pipe.hpp"
#include "err.hpp"
zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_) : zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_) :
socket_base_t (parent_, tid_), socket_base_t (parent_, tid_),
...@@ -159,7 +157,7 @@ void zmq::xrep_t::activated (writer_t *pipe_) ...@@ -159,7 +157,7 @@ void zmq::xrep_t::activated (writer_t *pipe_)
zmq_assert (false); zmq_assert (false);
} }
int zmq::xrep_t::xsend (zmq_msg_t *msg_, int flags_) int zmq::xrep_t::xsend (msg_t *msg_, int flags_)
{ {
// If this is the first part of the message it's the identity of the // If this is the first part of the message it's the identity of the
// peer to send the message to. // peer to send the message to.
...@@ -168,44 +166,43 @@ int zmq::xrep_t::xsend (zmq_msg_t *msg_, int flags_) ...@@ -168,44 +166,43 @@ int zmq::xrep_t::xsend (zmq_msg_t *msg_, int flags_)
// If we have malformed message (prefix with no subsequent message) // If we have malformed message (prefix with no subsequent message)
// then just silently ignore it. // then just silently ignore it.
if (msg_->flags & ZMQ_MSG_MORE) { if (msg_->flags () & msg_t::more) {
more_out = true; more_out = true;
// Find the pipe associated with the identity stored in the prefix. // Find the pipe associated with the identity stored in the prefix.
// If there's no such pipe just silently ignore the message. // If there's no such pipe just silently ignore the message.
blob_t identity ((unsigned char*) zmq_msg_data (msg_), blob_t identity ((unsigned char*) msg_->data (), msg_->size ());
zmq_msg_size (msg_));
outpipes_t::iterator it = outpipes.find (identity); outpipes_t::iterator it = outpipes.find (identity);
if (it != outpipes.end ()) { if (it != outpipes.end ()) {
current_out = it->second.writer; current_out = it->second.writer;
zmq_msg_t empty; msg_t empty;
int rc = zmq_msg_init (&empty); int rc = empty.init ();
zmq_assert (rc == 0); errno_assert (rc == 0);
if (!current_out->check_write (&empty)) { if (!current_out->check_write (&empty)) {
it->second.active = false; it->second.active = false;
more_out = false; more_out = false;
current_out = NULL; current_out = NULL;
rc = zmq_msg_close (&empty); rc = empty.close ();
zmq_assert (rc == 0); errno_assert (rc == 0);
errno = EAGAIN; errno = EAGAIN;
return -1; return -1;
} }
rc = zmq_msg_close (&empty); rc = empty.close ();
zmq_assert (rc == 0); errno_assert (rc == 0);
} }
} }
int rc = zmq_msg_close (msg_); int rc = msg_->close ();
zmq_assert (rc == 0); errno_assert (rc == 0);
rc = zmq_msg_init (msg_); rc = msg_->init ();
zmq_assert (rc == 0); errno_assert (rc == 0);
return 0; return 0;
} }
// Check whether this is the last part of the message. // Check whether this is the last part of the message.
more_out = msg_->flags & ZMQ_MSG_MORE; more_out = msg_->flags () & msg_t::more;
// Push the message into the pipe. If there's no out pipe, just drop it. // Push the message into the pipe. If there's no out pipe, just drop it.
if (current_out) { if (current_out) {
...@@ -217,36 +214,38 @@ int zmq::xrep_t::xsend (zmq_msg_t *msg_, int flags_) ...@@ -217,36 +214,38 @@ int zmq::xrep_t::xsend (zmq_msg_t *msg_, int flags_)
} }
} }
else { else {
int rc = zmq_msg_close (msg_); int rc = msg_->close ();
zmq_assert (rc == 0); errno_assert (rc == 0);
} }
// Detach the message from the data buffer. // Detach the message from the data buffer.
int rc = zmq_msg_init (msg_); int rc = msg_->init ();
zmq_assert (rc == 0); errno_assert (rc == 0);
return 0; return 0;
} }
int zmq::xrep_t::xrecv (zmq_msg_t *msg_, int flags_) int zmq::xrep_t::xrecv (msg_t *msg_, int flags_)
{ {
// If there is a prefetched message, return it. // If there is a prefetched message, return it.
if (prefetched) { if (prefetched) {
zmq_msg_move (msg_, &prefetched_msg); int rc = msg_->move (prefetched_msg);
more_in = msg_->flags & ZMQ_MSG_MORE; errno_assert (rc == 0);
more_in = msg_->flags () & msg_t::more;
prefetched = false; prefetched = false;
return 0; return 0;
} }
// Deallocate old content of the message. // Deallocate old content of the message.
zmq_msg_close (msg_); int rc = msg_->close ();
errno_assert (rc == 0);
// If we are in the middle of reading a message, just grab next part of it. // If we are in the middle of reading a message, just grab next part of it.
if (more_in) { if (more_in) {
zmq_assert (inpipes [current_in].active); zmq_assert (inpipes [current_in].active);
bool fetched = inpipes [current_in].reader->read (msg_); bool fetched = inpipes [current_in].reader->read (msg_);
zmq_assert (fetched); zmq_assert (fetched);
more_in = msg_->flags & ZMQ_MSG_MORE; more_in = msg_->flags () & msg_t::more;
if (!more_in) { if (!more_in) {
current_in++; current_in++;
if (current_in >= inpipes.size ()) if (current_in >= inpipes.size ())
...@@ -264,12 +263,11 @@ int zmq::xrep_t::xrecv (zmq_msg_t *msg_, int flags_) ...@@ -264,12 +263,11 @@ int zmq::xrep_t::xrecv (zmq_msg_t *msg_, int flags_)
// If we have a message, create a prefix and return it to the caller. // If we have a message, create a prefix and return it to the caller.
if (prefetched) { if (prefetched) {
int rc = zmq_msg_init_size (msg_, int rc = msg_->init_size (inpipes [current_in].identity.size ());
inpipes [current_in].identity.size ()); errno_assert (rc == 0);
zmq_assert (rc == 0); memcpy (msg_->data (), inpipes [current_in].identity.data (),
memcpy (zmq_msg_data (msg_), inpipes [current_in].identity.data (), msg_->size ());
zmq_msg_size (msg_)); msg_->set_flags (msg_t::more);
msg_->flags |= ZMQ_MSG_MORE;
return 0; return 0;
} }
...@@ -283,7 +281,8 @@ int zmq::xrep_t::xrecv (zmq_msg_t *msg_, int flags_) ...@@ -283,7 +281,8 @@ int zmq::xrep_t::xrecv (zmq_msg_t *msg_, int flags_)
// No message is available. Initialise the output parameter // No message is available. Initialise the output parameter
// to be a 0-byte message. // to be a 0-byte message.
zmq_msg_init (msg_); rc = msg_->init ();
errno_assert (rc == 0);
errno = EAGAIN; errno = EAGAIN;
return -1; return -1;
} }
......
...@@ -27,6 +27,7 @@ ...@@ -27,6 +27,7 @@
#include "socket_base.hpp" #include "socket_base.hpp"
#include "blob.hpp" #include "blob.hpp"
#include "pipe.hpp" #include "pipe.hpp"
#include "msg.hpp"
namespace zmq namespace zmq
{ {
...@@ -45,8 +46,8 @@ namespace zmq ...@@ -45,8 +46,8 @@ namespace zmq
// Overloads of functions from socket_base_t. // Overloads of functions from socket_base_t.
void xattach_pipes (reader_t *inpipe_, writer_t *outpipe_, void xattach_pipes (reader_t *inpipe_, writer_t *outpipe_,
const blob_t &peer_identity_); const blob_t &peer_identity_);
int xsend (zmq_msg_t *msg_, int flags_); int xsend (class msg_t *msg_, int flags_);
int xrecv (zmq_msg_t *msg_, int flags_); int xrecv (class msg_t *msg_, int flags_);
bool xhas_in (); bool xhas_in ();
bool xhas_out (); bool xhas_out ();
...@@ -82,7 +83,7 @@ namespace zmq ...@@ -82,7 +83,7 @@ namespace zmq
bool prefetched; bool prefetched;
// Holds the prefetched message. // Holds the prefetched message.
zmq_msg_t prefetched_msg; msg_t prefetched_msg;
// If true, more incoming message parts are expected. // If true, more incoming message parts are expected.
bool more_in; bool more_in;
......
...@@ -18,10 +18,9 @@ ...@@ -18,10 +18,9 @@
along with this program. If not, see <http://www.gnu.org/licenses/>. along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "../include/zmq.h"
#include "xreq.hpp" #include "xreq.hpp"
#include "err.hpp" #include "err.hpp"
#include "msg.hpp"
zmq::xreq_t::xreq_t (class ctx_t *parent_, uint32_t tid_) : zmq::xreq_t::xreq_t (class ctx_t *parent_, uint32_t tid_) :
socket_base_t (parent_, tid_), socket_base_t (parent_, tid_),
...@@ -52,12 +51,12 @@ void zmq::xreq_t::process_term (int linger_) ...@@ -52,12 +51,12 @@ void zmq::xreq_t::process_term (int linger_)
socket_base_t::process_term (linger_); socket_base_t::process_term (linger_);
} }
int zmq::xreq_t::xsend (zmq_msg_t *msg_, int flags_) int zmq::xreq_t::xsend (msg_t *msg_, int flags_)
{ {
return lb.send (msg_, flags_); return lb.send (msg_, flags_);
} }
int zmq::xreq_t::xrecv (zmq_msg_t *msg_, int flags_) int zmq::xreq_t::xrecv (msg_t *msg_, int flags_)
{ {
return fq.recv (msg_, flags_); return fq.recv (msg_, flags_);
} }
......
...@@ -41,8 +41,8 @@ namespace zmq ...@@ -41,8 +41,8 @@ namespace zmq
// Overloads of functions from socket_base_t. // Overloads of functions from socket_base_t.
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
const blob_t &peer_identity_); const blob_t &peer_identity_);
int xsend (zmq_msg_t *msg_, int flags_); int xsend (class msg_t *msg_, int flags_);
int xrecv (zmq_msg_t *msg_, int flags_); int xrecv (class msg_t *msg_, int flags_);
bool xhas_in (); bool xhas_in ();
bool xhas_out (); bool xhas_out ();
......
...@@ -20,8 +20,6 @@ ...@@ -20,8 +20,6 @@
#include <string.h> #include <string.h>
#include "../include/zmq.h"
#include "xsub.hpp" #include "xsub.hpp"
#include "err.hpp" #include "err.hpp"
...@@ -34,12 +32,14 @@ zmq::xsub_t::xsub_t (class ctx_t *parent_, uint32_t tid_) : ...@@ -34,12 +32,14 @@ zmq::xsub_t::xsub_t (class ctx_t *parent_, uint32_t tid_) :
options.type = ZMQ_XSUB; options.type = ZMQ_XSUB;
options.requires_in = true; options.requires_in = true;
options.requires_out = false; options.requires_out = false;
zmq_msg_init (&message); int rc = message.init ();
errno_assert (rc == 0);
} }
zmq::xsub_t::~xsub_t () zmq::xsub_t::~xsub_t ()
{ {
zmq_msg_close (&message); int rc = message.close ();
errno_assert (rc == 0);
} }
void zmq::xsub_t::xattach_pipes (class reader_t *inpipe_, void zmq::xsub_t::xattach_pipes (class reader_t *inpipe_,
...@@ -55,10 +55,10 @@ void zmq::xsub_t::process_term (int linger_) ...@@ -55,10 +55,10 @@ void zmq::xsub_t::process_term (int linger_)
socket_base_t::process_term (linger_); socket_base_t::process_term (linger_);
} }
int zmq::xsub_t::xsend (zmq_msg_t *msg_, int options_) int zmq::xsub_t::xsend (msg_t *msg_, int options_)
{ {
size_t size = zmq_msg_size (msg_); size_t size = msg_->size ();
unsigned char *data = (unsigned char*) zmq_msg_data (msg_); unsigned char *data = (unsigned char*) msg_->data ();
// Malformed subscriptions are dropped silently. // Malformed subscriptions are dropped silently.
if (size >= 1) { if (size >= 1) {
...@@ -72,10 +72,10 @@ int zmq::xsub_t::xsend (zmq_msg_t *msg_, int options_) ...@@ -72,10 +72,10 @@ int zmq::xsub_t::xsend (zmq_msg_t *msg_, int options_)
subscriptions.rm (data + 1, size - 1); subscriptions.rm (data + 1, size - 1);
} }
int rc = zmq_msg_close (msg_); int rc = msg_->close ();
zmq_assert (rc == 0); errno_assert (rc == 0);
rc = zmq_msg_init (msg_); rc = msg_->init ();
zmq_assert (rc == 0); errno_assert (rc == 0);
return 0; return 0;
} }
...@@ -85,14 +85,15 @@ bool zmq::xsub_t::xhas_out () ...@@ -85,14 +85,15 @@ bool zmq::xsub_t::xhas_out ()
return true; return true;
} }
int zmq::xsub_t::xrecv (zmq_msg_t *msg_, int flags_) int zmq::xsub_t::xrecv (msg_t *msg_, int flags_)
{ {
// If there's already a message prepared by a previous call to zmq_poll, // If there's already a message prepared by a previous call to zmq_poll,
// return it straight ahead. // return it straight ahead.
if (has_message) { if (has_message) {
zmq_msg_move (msg_, &message); int rc = msg_->move (message);
errno_assert (rc == 0);
has_message = false; has_message = false;
more = msg_->flags & ZMQ_MSG_MORE; more = msg_->flags () & msg_t::more;
return 0; return 0;
} }
...@@ -112,13 +113,13 @@ int zmq::xsub_t::xrecv (zmq_msg_t *msg_, int flags_) ...@@ -112,13 +113,13 @@ int zmq::xsub_t::xrecv (zmq_msg_t *msg_, int flags_)
// Check whether the message matches at least one subscription. // Check whether the message matches at least one subscription.
// Non-initial parts of the message are passed // Non-initial parts of the message are passed
if (more || match (msg_)) { if (more || match (msg_)) {
more = msg_->flags & ZMQ_MSG_MORE; more = msg_->flags () & msg_t::more;
return 0; return 0;
} }
// Message doesn't match. Pop any remaining parts of the message // Message doesn't match. Pop any remaining parts of the message
// from the pipe. // from the pipe.
while (msg_->flags & ZMQ_MSG_MORE) { while (msg_->flags () & msg_t::more) {
rc = fq.recv (msg_, ZMQ_DONTWAIT); rc = fq.recv (msg_, ZMQ_DONTWAIT);
zmq_assert (rc == 0); zmq_assert (rc == 0);
} }
...@@ -158,15 +159,15 @@ bool zmq::xsub_t::xhas_in () ...@@ -158,15 +159,15 @@ bool zmq::xsub_t::xhas_in ()
// Message doesn't match. Pop any remaining parts of the message // Message doesn't match. Pop any remaining parts of the message
// from the pipe. // from the pipe.
while (message.flags & ZMQ_MSG_MORE) { while (message.flags () & msg_t::more) {
rc = fq.recv (&message, ZMQ_DONTWAIT); rc = fq.recv (&message, ZMQ_DONTWAIT);
zmq_assert (rc == 0); zmq_assert (rc == 0);
} }
} }
} }
bool zmq::xsub_t::match (zmq_msg_t *msg_) bool zmq::xsub_t::match (msg_t *msg_)
{ {
return subscriptions.check ((unsigned char*) zmq_msg_data (msg_), return subscriptions.check ((unsigned char*) msg_->data (), msg_->size ());
zmq_msg_size (msg_));
} }
...@@ -21,10 +21,9 @@ ...@@ -21,10 +21,9 @@
#ifndef __ZMQ_XSUB_HPP_INCLUDED__ #ifndef __ZMQ_XSUB_HPP_INCLUDED__
#define __ZMQ_XSUB_HPP_INCLUDED__ #define __ZMQ_XSUB_HPP_INCLUDED__
#include "../include/zmq.h"
#include "trie.hpp" #include "trie.hpp"
#include "socket_base.hpp" #include "socket_base.hpp"
#include "msg.hpp"
#include "fq.hpp" #include "fq.hpp"
namespace zmq namespace zmq
...@@ -42,9 +41,9 @@ namespace zmq ...@@ -42,9 +41,9 @@ namespace zmq
// Overloads of functions from socket_base_t. // Overloads of functions from socket_base_t.
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
const blob_t &peer_identity_); const blob_t &peer_identity_);
int xsend (zmq_msg_t *msg_, int options_); int xsend (class msg_t *msg_, int options_);
bool xhas_out (); bool xhas_out ();
int xrecv (zmq_msg_t *msg_, int flags_); int xrecv (class msg_t *msg_, int flags_);
bool xhas_in (); bool xhas_in ();
private: private:
...@@ -53,7 +52,7 @@ namespace zmq ...@@ -53,7 +52,7 @@ namespace zmq
void process_term (int linger_); void process_term (int linger_);
// Check whether the message matches at least one subscription. // Check whether the message matches at least one subscription.
bool match (zmq_msg_t *msg_); bool match (class msg_t *msg_);
// Fair queueing object for inbound pipes. // Fair queueing object for inbound pipes.
fq_t fq; fq_t fq;
...@@ -64,7 +63,7 @@ namespace zmq ...@@ -64,7 +63,7 @@ namespace zmq
// If true, 'message' contains a matching message to return on the // If true, 'message' contains a matching message to return on the
// next recv call. // next recv call.
bool has_message; bool has_message;
zmq_msg_t message; msg_t message;
// If true, part of a multipart message was already received, but // If true, part of a multipart message was already received, but
// there are following parts still waiting. // there are following parts still waiting.
......
...@@ -32,8 +32,6 @@ ...@@ -32,8 +32,6 @@
#include <poll.h> #include <poll.h>
#endif #endif
#include "../include/zmq.h"
#include <string.h> #include <string.h>
#include <errno.h> #include <errno.h>
#include <stdlib.h> #include <stdlib.h>
...@@ -46,6 +44,7 @@ ...@@ -46,6 +44,7 @@
#include "clock.hpp" #include "clock.hpp"
#include "ctx.hpp" #include "ctx.hpp"
#include "err.hpp" #include "err.hpp"
#include "msg.hpp"
#include "fd.hpp" #include "fd.hpp"
#if !defined ZMQ_HAVE_WINDOWS #if !defined ZMQ_HAVE_WINDOWS
...@@ -57,6 +56,10 @@ ...@@ -57,6 +56,10 @@
#include <pgm/pgm.h> #include <pgm/pgm.h>
#endif #endif
// Compile time check whether msg_t fits into zmq_msg_t.
typedef char check_msg_t_size
[sizeof (zmq::msg_t) == sizeof (zmq_msg_t) ? 1 : -1];
void zmq_version (int *major_, int *minor_, int *patch_) void zmq_version (int *major_, int *minor_, int *patch_)
{ {
*major_ = ZMQ_VERSION_MAJOR; *major_ = ZMQ_VERSION_MAJOR;
...@@ -260,7 +263,7 @@ int zmq_sendmsg (void *s_, zmq_msg_t *msg_, int flags_) ...@@ -260,7 +263,7 @@ int zmq_sendmsg (void *s_, zmq_msg_t *msg_, int flags_)
return -1; return -1;
} }
int sz = (int) zmq_msg_size (msg_); int sz = (int) zmq_msg_size (msg_);
int rc = (((zmq::socket_base_t*) s_)->send (msg_, flags_)); int rc = (((zmq::socket_base_t*) s_)->send ((zmq::msg_t*) msg_, flags_));
if (unlikely (rc < 0)) if (unlikely (rc < 0))
return -1; return -1;
return sz; return sz;
...@@ -272,12 +275,53 @@ int zmq_recvmsg (void *s_, zmq_msg_t *msg_, int flags_) ...@@ -272,12 +275,53 @@ int zmq_recvmsg (void *s_, zmq_msg_t *msg_, int flags_)
errno = ENOTSOCK; errno = ENOTSOCK;
return -1; return -1;
} }
int rc = (((zmq::socket_base_t*) s_)->recv (msg_, flags_)); int rc = (((zmq::socket_base_t*) s_)->recv ((zmq::msg_t*) msg_, flags_));
if (unlikely (rc < 0)) if (unlikely (rc < 0))
return -1; return -1;
return (int) zmq_msg_size (msg_); return (int) zmq_msg_size (msg_);
} }
int zmq_msg_init (zmq_msg_t *msg_)
{
return ((zmq::msg_t*) msg_)->init ();
}
int zmq_msg_init_size (zmq_msg_t *msg_, size_t size_)
{
return ((zmq::msg_t*) msg_)->init_size (size_);
}
int zmq_msg_init_data (zmq_msg_t *msg_, void *data_, size_t size_,
zmq_free_fn *ffn_, void *hint_)
{
return ((zmq::msg_t*) msg_)->init_data (data_, size_, ffn_, hint_);
}
int zmq_msg_close (zmq_msg_t *msg_)
{
return ((zmq::msg_t*) msg_)->close ();
}
int zmq_msg_move (zmq_msg_t *dest_, zmq_msg_t *src_)
{
return ((zmq::msg_t*) dest_)->move (*(zmq::msg_t*) src_);
}
int zmq_msg_copy (zmq_msg_t *dest_, zmq_msg_t *src_)
{
return ((zmq::msg_t*) dest_)->copy (*(zmq::msg_t*) src_);
}
void *zmq_msg_data (zmq_msg_t *msg_)
{
return ((zmq::msg_t*) msg_)->data ();
}
size_t zmq_msg_size (zmq_msg_t *msg_)
{
return ((zmq::msg_t*) msg_)->size ();
}
#if defined ZMQ_FORCE_SELECT #if defined ZMQ_FORCE_SELECT
#define ZMQ_POLL_BASED_ON_SELECT #define ZMQ_POLL_BASED_ON_SELECT
#elif defined ZMQ_FORCE_POLL #elif defined ZMQ_FORCE_POLL
......
...@@ -53,28 +53,27 @@ zmq::zmq_init_t::zmq_init_t (io_thread_t *io_thread_, ...@@ -53,28 +53,27 @@ zmq::zmq_init_t::zmq_init_t (io_thread_t *io_thread_,
peer_identity.assign (identity, uuid_t::uuid_blob_len + 1); peer_identity.assign (identity, uuid_t::uuid_blob_len + 1);
// Create a list of props to send. // Create a list of props to send.
msg_t msg;
zmq_msg_t msg; int rc = msg.init_size (4);
int rc = zmq_msg_init_size (&msg, 4);
errno_assert (rc == 0); errno_assert (rc == 0);
unsigned char *data = (unsigned char*) zmq_msg_data (&msg); unsigned char *data = (unsigned char*) msg.data ();
put_uint16 (data, prop_type); put_uint16 (data, prop_type);
put_uint16 (data + 2, options.type); put_uint16 (data + 2, options.type);
msg.flags |= ZMQ_MSG_MORE; msg.set_flags (msg_t::more);
to_send.push_back (msg); to_send.push_back (msg);
if (!options.identity.empty ()) { if (!options.identity.empty ()) {
rc = zmq_msg_init_size (&msg, 2 + options.identity.size ()); rc = msg.init_size (2 + options.identity.size ());
errno_assert (rc == 0); errno_assert (rc == 0);
data = (unsigned char*) zmq_msg_data (&msg); data = (unsigned char*) msg.data ();
put_uint16 (data, prop_identity); put_uint16 (data, prop_identity);
memcpy (data + 2, options.identity.data (), options.identity.size ()); memcpy (data + 2, options.identity.data (), options.identity.size ());
msg.flags |= ZMQ_MSG_MORE; msg.set_flags (msg_t::more);
to_send.push_back (msg); to_send.push_back (msg);
} }
// Remove the MORE flag from the last prop. // Remove the MORE flag from the last prop.
to_send.back ().flags &= ~ZMQ_MSG_MORE; to_send.back ().reset_flags (msg_t::more);
} }
zmq::zmq_init_t::~zmq_init_t () zmq::zmq_init_t::~zmq_init_t ()
...@@ -85,13 +84,13 @@ zmq::zmq_init_t::~zmq_init_t () ...@@ -85,13 +84,13 @@ zmq::zmq_init_t::~zmq_init_t ()
// If there are unsent props still queued deallocate them. // If there are unsent props still queued deallocate them.
for (to_send_t::iterator it = to_send.begin (); it != to_send.end (); for (to_send_t::iterator it = to_send.begin (); it != to_send.end ();
++it) { ++it) {
int rc = zmq_msg_close (&(*it)); int rc = it->close ();
errno_assert (rc == 0); errno_assert (rc == 0);
} }
to_send.clear (); to_send.clear ();
} }
bool zmq::zmq_init_t::read (::zmq_msg_t *msg_) bool zmq::zmq_init_t::read (msg_t *msg_)
{ {
// If the identity was already sent, do nothing. // If the identity was already sent, do nothing.
if (to_send.empty ()) if (to_send.empty ())
...@@ -107,15 +106,15 @@ bool zmq::zmq_init_t::read (::zmq_msg_t *msg_) ...@@ -107,15 +106,15 @@ bool zmq::zmq_init_t::read (::zmq_msg_t *msg_)
return true; return true;
} }
bool zmq::zmq_init_t::write (::zmq_msg_t *msg_) bool zmq::zmq_init_t::write (msg_t *msg_)
{ {
// If identity was already received, we are not interested // If identity was already received, we are not interested
// in subsequent messages. // in subsequent messages.
if (received) if (received)
return false; return false;
size_t size = zmq_msg_size (msg_); size_t size = msg_->size ();
unsigned char *data = (unsigned char*) zmq_msg_data (msg_); unsigned char *data = (unsigned char*) msg_->data ();
// There should be at least property type in the message. // There should be at least property type in the message.
zmq_assert (size >= 2); zmq_assert (size >= 2);
...@@ -139,7 +138,7 @@ bool zmq::zmq_init_t::write (::zmq_msg_t *msg_) ...@@ -139,7 +138,7 @@ bool zmq::zmq_init_t::write (::zmq_msg_t *msg_)
zmq_assert (false); zmq_assert (false);
} }
if (!(msg_->flags & ZMQ_MSG_MORE)) { if (!(msg_->flags () & msg_t::more)) {
received = true; received = true;
finalise_initialisation (); finalise_initialisation ();
} }
......
...@@ -23,14 +23,13 @@ ...@@ -23,14 +23,13 @@
#include <vector> #include <vector>
#include "../include/zmq.h"
#include "i_inout.hpp" #include "i_inout.hpp"
#include "i_engine.hpp" #include "i_engine.hpp"
#include "own.hpp"
#include "fd.hpp"
#include "stdint.hpp" #include "stdint.hpp"
#include "blob.hpp" #include "blob.hpp"
#include "msg.hpp"
#include "own.hpp"
#include "fd.hpp"
namespace zmq namespace zmq
{ {
...@@ -58,8 +57,8 @@ namespace zmq ...@@ -58,8 +57,8 @@ namespace zmq
void dispatch_engine (); void dispatch_engine ();
// i_inout interface implementation. // i_inout interface implementation.
bool read (::zmq_msg_t *msg_); bool read (class msg_t *msg_);
bool write (::zmq_msg_t *msg_); bool write (class msg_t *msg_);
void flush (); void flush ();
void detach (); void detach ();
...@@ -75,7 +74,7 @@ namespace zmq ...@@ -75,7 +74,7 @@ namespace zmq
// List of messages to send to the peer during the connection // List of messages to send to the peer during the connection
// initiation phase. // initiation phase.
typedef std::vector < ::zmq_msg_t> to_send_t; typedef std::vector <msg_t> to_send_t;
to_send_t to_send; to_send_t to_send;
// True if peer's identity was already received. // True if peer's identity was already received.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment