Commit 1bcb0a73 authored by Constantin Rack's avatar Constantin Rack

Merge pull request #1734 from somdoron/radio-dish

problem: radio-dish is still hacky
parents e4243886 1960b4e8
...@@ -239,6 +239,8 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int property, int optval); ...@@ -239,6 +239,8 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int property, int optval);
ZMQ_EXPORT const char *zmq_msg_gets (zmq_msg_t *msg, const char *property); ZMQ_EXPORT const char *zmq_msg_gets (zmq_msg_t *msg, const char *property);
ZMQ_EXPORT int zmq_msg_set_routing_id (zmq_msg_t *msg, uint32_t routing_id); ZMQ_EXPORT int zmq_msg_set_routing_id (zmq_msg_t *msg, uint32_t routing_id);
ZMQ_EXPORT uint32_t zmq_msg_routing_id (zmq_msg_t *msg); ZMQ_EXPORT uint32_t zmq_msg_routing_id (zmq_msg_t *msg);
ZMQ_EXPORT int zmq_msg_set_group (zmq_msg_t *msg, const char *group);
ZMQ_EXPORT const char *zmq_msg_group (zmq_msg_t *msg);
/******************************************************************************/ /******************************************************************************/
...@@ -359,7 +361,7 @@ ZMQ_EXPORT uint32_t zmq_msg_routing_id (zmq_msg_t *msg); ...@@ -359,7 +361,7 @@ ZMQ_EXPORT uint32_t zmq_msg_routing_id (zmq_msg_t *msg);
#define ZMQ_GSSAPI 3 #define ZMQ_GSSAPI 3
/* RADIO-DISH protocol */ /* RADIO-DISH protocol */
#define ZMQ_GROUP_MAX_LENGTH 255 #define ZMQ_GROUP_MAX_LENGTH 15
/* Deprecated options and aliases */ /* Deprecated options and aliases */
#define ZMQ_TCP_ACCEPT_FILTER 38 #define ZMQ_TCP_ACCEPT_FILTER 38
......
...@@ -37,7 +37,7 @@ zmq::shared_message_memory_allocator::shared_message_memory_allocator (std::size ...@@ -37,7 +37,7 @@ zmq::shared_message_memory_allocator::shared_message_memory_allocator (std::size
buf(NULL), buf(NULL),
bufsize(0), bufsize(0),
max_size(bufsize_), max_size(bufsize_),
msg_refcnt(NULL), msg_content(NULL),
maxCounters (static_cast <size_t> (std::ceil (static_cast <double> (max_size) / static_cast <double> (msg_t::max_vsm_size)))) maxCounters (static_cast <size_t> (std::ceil (static_cast <double> (max_size) / static_cast <double> (msg_t::max_vsm_size))))
{ {
} }
...@@ -46,7 +46,7 @@ zmq::shared_message_memory_allocator::shared_message_memory_allocator (std::size ...@@ -46,7 +46,7 @@ zmq::shared_message_memory_allocator::shared_message_memory_allocator (std::size
buf(NULL), buf(NULL),
bufsize(0), bufsize(0),
max_size(bufsize_), max_size(bufsize_),
msg_refcnt(NULL), msg_content(NULL),
maxCounters(maxMessages) maxCounters(maxMessages)
{ {
} }
...@@ -77,7 +77,7 @@ unsigned char* zmq::shared_message_memory_allocator::allocate () ...@@ -77,7 +77,7 @@ unsigned char* zmq::shared_message_memory_allocator::allocate ()
// allocate memory for reference counters together with reception buffer // allocate memory for reference counters together with reception buffer
std::size_t const allocationsize = std::size_t const allocationsize =
max_size + sizeof (zmq::atomic_counter_t) + max_size + sizeof (zmq::atomic_counter_t) +
maxCounters * sizeof (zmq::atomic_counter_t); maxCounters * sizeof (zmq::msg_t::content_t);
buf = static_cast <unsigned char *> (std::malloc (allocationsize)); buf = static_cast <unsigned char *> (std::malloc (allocationsize));
alloc_assert (buf); alloc_assert (buf);
...@@ -90,7 +90,7 @@ unsigned char* zmq::shared_message_memory_allocator::allocate () ...@@ -90,7 +90,7 @@ unsigned char* zmq::shared_message_memory_allocator::allocate ()
} }
bufsize = max_size; bufsize = max_size;
msg_refcnt = reinterpret_cast <zmq::atomic_counter_t*> (buf + sizeof (atomic_counter_t) + max_size); msg_content = reinterpret_cast <zmq::msg_t::content_t*> (buf + sizeof (atomic_counter_t) + max_size);
return buf + sizeof (zmq::atomic_counter_t); return buf + sizeof (zmq::atomic_counter_t);
} }
...@@ -108,7 +108,7 @@ unsigned char* zmq::shared_message_memory_allocator::release () ...@@ -108,7 +108,7 @@ unsigned char* zmq::shared_message_memory_allocator::release ()
unsigned char* b = buf; unsigned char* b = buf;
buf = NULL; buf = NULL;
bufsize = 0; bufsize = 0;
msg_refcnt = NULL; msg_content = NULL;
return b; return b;
} }
......
...@@ -34,6 +34,7 @@ ...@@ -34,6 +34,7 @@
#include <cstdlib> #include <cstdlib>
#include "atomic_counter.hpp" #include "atomic_counter.hpp"
#include "msg.hpp"
#include "err.hpp" #include "err.hpp"
namespace zmq namespace zmq
...@@ -132,21 +133,21 @@ namespace zmq ...@@ -132,21 +133,21 @@ namespace zmq
bufsize = new_size; bufsize = new_size;
} }
zmq::atomic_counter_t* provide_refcnt () zmq::msg_t::content_t* provide_content ()
{ {
return msg_refcnt; return msg_content;
} }
void advance_refcnt () void advance_content ()
{ {
msg_refcnt++; msg_content++;
} }
private: private:
unsigned char* buf; unsigned char* buf;
std::size_t bufsize; std::size_t bufsize;
std::size_t max_size; std::size_t max_size;
zmq::atomic_counter_t* msg_refcnt; zmq::msg_t::content_t* msg_content;
std::size_t maxCounters; std::size_t maxCounters;
}; };
} }
......
...@@ -29,6 +29,7 @@ ...@@ -29,6 +29,7 @@
#include <string.h> #include <string.h>
#include "../include/zmq.h"
#include "macros.hpp" #include "macros.hpp"
#include "dish.hpp" #include "dish.hpp"
#include "err.hpp" #include "err.hpp"
...@@ -89,13 +90,14 @@ void zmq::dish_t::xhiccuped (pipe_t *pipe_) ...@@ -89,13 +90,14 @@ void zmq::dish_t::xhiccuped (pipe_t *pipe_)
int zmq::dish_t::xjoin (const char* group_) int zmq::dish_t::xjoin (const char* group_)
{ {
if (strlen (group_) > ZMQ_GROUP_MAX_LENGTH) { std::string group = std::string (group_);
if (group.length () > ZMQ_GROUP_MAX_LENGTH) {
errno = EINVAL; errno = EINVAL;
return -1; return -1;
} }
subscriptions_t::iterator it = subscriptions_t::iterator it = subscriptions.find (group);
std::find (subscriptions.begin (), subscriptions.end (), std::string(group_));
// User cannot join same group twice // User cannot join same group twice
if (it != subscriptions.end ()) { if (it != subscriptions.end ()) {
...@@ -103,16 +105,14 @@ int zmq::dish_t::xjoin (const char* group_) ...@@ -103,16 +105,14 @@ int zmq::dish_t::xjoin (const char* group_)
return -1; return -1;
} }
subscriptions.push_back (std::string (group_)); subscriptions.insert (group);
size_t len = strlen (group_);
msg_t msg; msg_t msg;
int rc = msg.init_size (len + 1); int rc = msg.init_join ();
errno_assert (rc == 0); errno_assert (rc == 0);
char *data = (char*) msg.data (); rc = msg.set_group (group_);
data[0] = 'J'; errno_assert (rc == 0);
memcpy (data + 1, group_, len);
int err = 0; int err = 0;
rc = dist.send_to_all (&msg); rc = dist.send_to_all (&msg);
...@@ -127,12 +127,14 @@ int zmq::dish_t::xjoin (const char* group_) ...@@ -127,12 +127,14 @@ int zmq::dish_t::xjoin (const char* group_)
int zmq::dish_t::xleave (const char* group_) int zmq::dish_t::xleave (const char* group_)
{ {
if (strlen (group_) > ZMQ_GROUP_MAX_LENGTH) { std::string group = std::string (group_);
if (group.length () > ZMQ_GROUP_MAX_LENGTH) {
errno = EINVAL; errno = EINVAL;
return -1; return -1;
} }
subscriptions_t::iterator it = std::find (subscriptions.begin (), subscriptions.end (), std::string (group_)); subscriptions_t::iterator it = std::find (subscriptions.begin (), subscriptions.end (), group);
if (it == subscriptions.end ()) { if (it == subscriptions.end ()) {
errno = EINVAL; errno = EINVAL;
...@@ -141,14 +143,12 @@ int zmq::dish_t::xleave (const char* group_) ...@@ -141,14 +143,12 @@ int zmq::dish_t::xleave (const char* group_)
subscriptions.erase (it); subscriptions.erase (it);
size_t len = strlen (group_);
msg_t msg; msg_t msg;
int rc = msg.init_size (len + 1); int rc = msg.init_leave ();
errno_assert (rc == 0); errno_assert (rc == 0);
char *data = (char*) msg.data (); rc = msg.set_group (group_);
data[0] = 'L'; errno_assert (rc == 0);
memcpy (data + 1, group_, len);
int err = 0; int err = 0;
rc = dist.send_to_all (&msg); rc = dist.send_to_all (&msg);
...@@ -184,15 +184,21 @@ int zmq::dish_t::xrecv (msg_t *msg_) ...@@ -184,15 +184,21 @@ int zmq::dish_t::xrecv (msg_t *msg_)
return 0; return 0;
} }
// Get a message using fair queueing algorithm. while (true) {
int rc = fq.recv (msg_);
// If there's no message available, return immediately. // Get a message using fair queueing algorithm.
// The same when error occurs. int rc = fq.recv (msg_);
if (rc != 0)
return -1; // If there's no message available, return immediately.
// The same when error occurs.
if (rc != 0)
return -1;
return 0; // Filtering non matching messages
subscriptions_t::iterator it = subscriptions.find (std::string(msg_->group ()));
if (it != subscriptions.end ())
return 0;
}
} }
bool zmq::dish_t::xhas_in () bool zmq::dish_t::xhas_in ()
...@@ -202,18 +208,24 @@ bool zmq::dish_t::xhas_in () ...@@ -202,18 +208,24 @@ bool zmq::dish_t::xhas_in ()
if (has_message) if (has_message)
return true; return true;
// Get a message using fair queueing algorithm. while (true) {
int rc = fq.recv (&message); // Get a message using fair queueing algorithm.
int rc = fq.recv (&message);
// If there's no message available, return immediately.
// The same when error occurs. // If there's no message available, return immediately.
if (rc != 0) { // The same when error occurs.
errno_assert (errno == EAGAIN); if (rc != 0) {
return false; errno_assert (errno == EAGAIN);
return false;
}
// Filtering non matching messages
subscriptions_t::iterator it = subscriptions.find (std::string(message.group ()));
if (it != subscriptions.end ()) {
has_message = true;
return true;
}
} }
has_message = true;
return true;
} }
zmq::blob_t zmq::dish_t::get_credential () const zmq::blob_t zmq::dish_t::get_credential () const
...@@ -225,22 +237,121 @@ void zmq::dish_t::send_subscriptions (pipe_t *pipe_) ...@@ -225,22 +237,121 @@ void zmq::dish_t::send_subscriptions (pipe_t *pipe_)
{ {
for (subscriptions_t::iterator it = subscriptions.begin (); it != subscriptions.end (); ++it) { for (subscriptions_t::iterator it = subscriptions.begin (); it != subscriptions.end (); ++it) {
msg_t msg; msg_t msg;
int rc = msg.init_size (it->length () + 1); int rc = msg.init_join ();
errno_assert (rc == 0);
rc = msg.set_group (it->c_str());
errno_assert (rc == 0); errno_assert (rc == 0);
char *data = (char*) msg.data ();
data [0] = 'J';
it->copy (data + 1, it->length ());
// Send it to the pipe. // Send it to the pipe.
bool sent = pipe_->write (&msg); pipe_->write (&msg);
msg.close ();
// If we reached the SNDHWM, and thus cannot send the subscription, drop
// the subscription message instead. This matches the behaviour of
// zmq_setsockopt(ZMQ_SUBSCRIBE, ...), which also drops subscriptions
// when the SNDHWM is reached.
if (!sent)
msg.close ();
} }
pipe_->flush (); pipe_->flush ();
} }
zmq::dish_session_t::dish_session_t (io_thread_t *io_thread_, bool connect_,
socket_base_t *socket_, const options_t &options_,
address_t *addr_) :
session_base_t (io_thread_, connect_, socket_, options_, addr_),
state (group)
{
}
zmq::dish_session_t::~dish_session_t ()
{
}
int zmq::dish_session_t::push_msg (msg_t *msg_)
{
if (state == group) {
if ((msg_->flags() & msg_t::more) != msg_t::more) {
errno = EFAULT;
return -1;
}
if (msg_->size() > ZMQ_GROUP_MAX_LENGTH) {
errno = EFAULT;
return -1;
}
group_msg = *msg_;
state = body;
int rc = msg_->init ();
errno_assert (rc == 0);
return 0;
}
else {
// Set the message group
int rc = msg_->set_group ((char*)group_msg.data (), group_msg. size());
errno_assert (rc == 0);
// We set the group, so we don't need the group_msg anymore
rc = group_msg.close ();
errno_assert (rc == 0);
// Thread safe socket doesn't support multipart messages
if ((msg_->flags() & msg_t::more) == msg_t::more) {
errno = EFAULT;
return -1;
}
// Push message to dish socket
rc = session_base_t::push_msg (msg_);
if (rc == 0)
state = group;
return rc;
}
}
int zmq::dish_session_t::pull_msg (msg_t *msg_)
{
int rc = session_base_t::pull_msg (msg_);
if (rc != 0)
return rc;
if (!msg_->is_join () && !msg_->is_leave ())
return rc;
else {
int group_length = strlen (msg_->group ());
msg_t command;
int offset;
if (msg_->is_join ()) {
command.init_size (group_length + 5);
offset = 5;
memcpy (command.data (), "\4JOIN", 5);
}
else {
command.init_size (group_length + 6);
offset = 6;
memcpy (command.data (), "\5LEAVE", 6);
}
command.set_flags (msg_t::command);
char* command_data = (char*)command.data ();
// Copy the group
memcpy (command_data + offset, msg_->group (), group_length);
// Close the join message
int rc = msg_->close ();
errno_assert (rc == 0);
*msg_ = command;
return 0;
}
}
void zmq::dish_session_t::reset ()
{
session_base_t::reset ();
state = group;
}
...@@ -71,7 +71,7 @@ namespace zmq ...@@ -71,7 +71,7 @@ namespace zmq
int xleave (const char *group_); int xleave (const char *group_);
private: private:
// Send subscriptions to a pipe // Send subscriptions to a pipe
void send_subscriptions (pipe_t *pipe_); void send_subscriptions (pipe_t *pipe_);
// Fair queueing object for inbound pipes. // Fair queueing object for inbound pipes.
...@@ -81,7 +81,7 @@ namespace zmq ...@@ -81,7 +81,7 @@ namespace zmq
dist_t dist; dist_t dist;
// The repository of subscriptions. // The repository of subscriptions.
typedef std::vector<std::string> subscriptions_t; typedef std::set<std::string> subscriptions_t;
subscriptions_t subscriptions; subscriptions_t subscriptions;
// If true, 'message' contains a matching message to return on the // If true, 'message' contains a matching message to return on the
...@@ -93,6 +93,33 @@ namespace zmq ...@@ -93,6 +93,33 @@ namespace zmq
const dish_t &operator = (const dish_t&); const dish_t &operator = (const dish_t&);
}; };
class dish_session_t : public session_base_t
{
public:
dish_session_t (zmq::io_thread_t *io_thread_, bool connect_,
zmq::socket_base_t *socket_, const options_t &options_,
address_t *addr_);
~dish_session_t ();
// Overrides of the functions from session_base_t.
int push_msg (msg_t *msg_);
int pull_msg (msg_t *msg_);
void reset ();
private:
enum {
group,
body
} state;
msg_t group_msg;
dish_session_t (const dish_session_t&);
const dish_session_t &operator = (const dish_session_t&);
};
} }
#endif #endif
...@@ -53,7 +53,7 @@ bool zmq::msg_t::check () ...@@ -53,7 +53,7 @@ bool zmq::msg_t::check ()
int zmq::msg_t::init (void* data_, size_t size_, int zmq::msg_t::init (void* data_, size_t size_,
msg_free_fn* ffn_, void* hint, msg_free_fn* ffn_, void* hint,
zmq::atomic_counter_t* refcnt_) content_t* content_)
{ {
if (size_ < max_vsm_size) { if (size_ < max_vsm_size) {
int const rc = init_size(size_); int const rc = init_size(size_);
...@@ -68,9 +68,9 @@ int zmq::msg_t::init (void* data_, size_t size_, ...@@ -68,9 +68,9 @@ int zmq::msg_t::init (void* data_, size_t size_,
return -1; return -1;
} }
} }
else if(refcnt_) else if(content_)
{ {
return init_external_storage(data_, size_, refcnt_, ffn_, hint); return init_external_storage(content_, data_, size_, ffn_, hint);
} }
else else
{ {
...@@ -84,26 +84,30 @@ int zmq::msg_t::init () ...@@ -84,26 +84,30 @@ int zmq::msg_t::init ()
u.vsm.type = type_vsm; u.vsm.type = type_vsm;
u.vsm.flags = 0; u.vsm.flags = 0;
u.vsm.size = 0; u.vsm.size = 0;
u.vsm.group[0] = '\0';
u.vsm.routing_id = 0; u.vsm.routing_id = 0;
file_desc = -1; u.vsm.fd = retired_fd;
return 0; return 0;
} }
int zmq::msg_t::init_size (size_t size_) int zmq::msg_t::init_size (size_t size_)
{ {
file_desc = -1;
if (size_ <= max_vsm_size) { if (size_ <= max_vsm_size) {
u.vsm.metadata = NULL; u.vsm.metadata = NULL;
u.vsm.type = type_vsm; u.vsm.type = type_vsm;
u.vsm.flags = 0; u.vsm.flags = 0;
u.vsm.size = (unsigned char) size_; u.vsm.size = (unsigned char) size_;
u.vsm.group[0] = '\0';
u.vsm.routing_id = 0; u.vsm.routing_id = 0;
u.vsm.fd = retired_fd;
} }
else { else {
u.lmsg.metadata = NULL; u.lmsg.metadata = NULL;
u.lmsg.type = type_lmsg; u.lmsg.type = type_lmsg;
u.lmsg.flags = 0; u.lmsg.flags = 0;
u.lmsg.group[0] = '\0';
u.lmsg.routing_id = 0; u.lmsg.routing_id = 0;
u.lmsg.fd = retired_fd;
u.lmsg.content = NULL; u.lmsg.content = NULL;
if (sizeof (content_t) + size_ > size_) if (sizeof (content_t) + size_ > size_)
u.lmsg.content = (content_t*) malloc (sizeof (content_t) + size_); u.lmsg.content = (content_t*) malloc (sizeof (content_t) + size_);
...@@ -121,25 +125,25 @@ int zmq::msg_t::init_size (size_t size_) ...@@ -121,25 +125,25 @@ int zmq::msg_t::init_size (size_t size_)
return 0; return 0;
} }
int zmq::msg_t::init_external_storage(void *data_, size_t size_, zmq::atomic_counter_t* ctr, int zmq::msg_t::init_external_storage(content_t* content_, void* data_, size_t size_,
msg_free_fn *ffn_, void *hint_) msg_free_fn *ffn_, void* hint_)
{ {
zmq_assert(NULL != data_); zmq_assert(NULL != data_);
zmq_assert(NULL != ctr); zmq_assert(NULL != content_);
file_desc = -1;
u.zclmsg.metadata = NULL; u.zclmsg.metadata = NULL;
u.zclmsg.type = type_zclmsg; u.zclmsg.type = type_zclmsg;
u.zclmsg.flags = 0; u.zclmsg.flags = 0;
u.zclmsg.group[0] = '\0';
u.zclmsg.routing_id = 0; u.zclmsg.routing_id = 0;
u.zclmsg.fd = retired_fd;
u.zclmsg.data = data_; u.zclmsg.content = content_;
u.zclmsg.size = size_; u.zclmsg.content->data = data_;
u.zclmsg.ffn = ffn_; u.zclmsg.content->size = size_;
u.zclmsg.hint = hint_; u.zclmsg.content->ffn = ffn_;
u.zclmsg.refcnt = ctr; u.zclmsg.content->hint = hint_;
new (u.zclmsg.refcnt) zmq::atomic_counter_t(); new (&u.zclmsg.content->refcnt) zmq::atomic_counter_t();
return 0; return 0;
} }
...@@ -151,8 +155,6 @@ int zmq::msg_t::init_data (void *data_, size_t size_, ...@@ -151,8 +155,6 @@ int zmq::msg_t::init_data (void *data_, size_t size_,
// would occur once the data is accessed // would occur once the data is accessed
zmq_assert (data_ != NULL || size_ == 0); zmq_assert (data_ != NULL || size_ == 0);
file_desc = -1;
// Initialize constant message if there's no need to deallocate // Initialize constant message if there's no need to deallocate
if (ffn_ == NULL) { if (ffn_ == NULL) {
u.cmsg.metadata = NULL; u.cmsg.metadata = NULL;
...@@ -160,13 +162,17 @@ int zmq::msg_t::init_data (void *data_, size_t size_, ...@@ -160,13 +162,17 @@ int zmq::msg_t::init_data (void *data_, size_t size_,
u.cmsg.flags = 0; u.cmsg.flags = 0;
u.cmsg.data = data_; u.cmsg.data = data_;
u.cmsg.size = size_; u.cmsg.size = size_;
u.cmsg.group[0] = '\0';
u.cmsg.routing_id = 0; u.cmsg.routing_id = 0;
u.cmsg.fd = retired_fd;
} }
else { else {
u.lmsg.metadata = NULL; u.lmsg.metadata = NULL;
u.lmsg.type = type_lmsg; u.lmsg.type = type_lmsg;
u.lmsg.flags = 0; u.lmsg.flags = 0;
u.lmsg.group[0] = '\0';
u.lmsg.routing_id = 0; u.lmsg.routing_id = 0;
u.lmsg.fd = retired_fd;
u.lmsg.content = (content_t*) malloc (sizeof (content_t)); u.lmsg.content = (content_t*) malloc (sizeof (content_t));
if (!u.lmsg.content) { if (!u.lmsg.content) {
errno = ENOMEM; errno = ENOMEM;
...@@ -188,7 +194,31 @@ int zmq::msg_t::init_delimiter () ...@@ -188,7 +194,31 @@ int zmq::msg_t::init_delimiter ()
u.delimiter.metadata = NULL; u.delimiter.metadata = NULL;
u.delimiter.type = type_delimiter; u.delimiter.type = type_delimiter;
u.delimiter.flags = 0; u.delimiter.flags = 0;
u.delimiter.group[0] = '\0';
u.delimiter.routing_id = 0; u.delimiter.routing_id = 0;
u.delimiter.fd = retired_fd;
return 0;
}
int zmq::msg_t::init_join ()
{
u.base.metadata = NULL;
u.base.type = type_join;
u.base.flags = 0;
u.base.group[0] = '\0';
u.base.routing_id = 0;
u.base.fd = retired_fd;
return 0;
}
int zmq::msg_t::init_leave ()
{
u.base.metadata = NULL;
u.base.type = type_leave;
u.base.flags = 0;
u.base.group[0] = '\0';
u.base.routing_id = 0;
u.base.fd = retired_fd;
return 0; return 0;
} }
...@@ -220,19 +250,19 @@ int zmq::msg_t::close () ...@@ -220,19 +250,19 @@ int zmq::msg_t::close ()
if (is_zcmsg()) if (is_zcmsg())
{ {
zmq_assert( u.zclmsg.ffn ); zmq_assert( u.zclmsg.content->ffn );
// If the content is not shared, or if it is shared and the reference // If the content is not shared, or if it is shared and the reference
// count has dropped to zero, deallocate it. // count has dropped to zero, deallocate it.
if (!(u.zclmsg.flags & msg_t::shared) || if (!(u.zclmsg.flags & msg_t::shared) ||
!u.zclmsg.refcnt->sub (1)) { !u.zclmsg.content->refcnt.sub (1)) {
// We used "placement new" operator to initialize the reference // We used "placement new" operator to initialize the reference
// counter so we call the destructor explicitly now. // counter so we call the destructor explicitly now.
u.zclmsg.refcnt->~atomic_counter_t (); u.zclmsg.content->refcnt.~atomic_counter_t ();
u.zclmsg.ffn (u.zclmsg.data, u.zclmsg.content->ffn (u.zclmsg.content->data,
u.zclmsg.hint); u.zclmsg.content->hint);
} }
} }
...@@ -327,7 +357,7 @@ void *zmq::msg_t::data () ...@@ -327,7 +357,7 @@ void *zmq::msg_t::data ()
case type_cmsg: case type_cmsg:
return u.cmsg.data; return u.cmsg.data;
case type_zclmsg: case type_zclmsg:
return u.zclmsg.data; return u.zclmsg.content->data;
default: default:
zmq_assert (false); zmq_assert (false);
return NULL; return NULL;
...@@ -345,7 +375,7 @@ size_t zmq::msg_t::size () ...@@ -345,7 +375,7 @@ size_t zmq::msg_t::size ()
case type_lmsg: case type_lmsg:
return u.lmsg.content->size; return u.lmsg.content->size;
case type_zclmsg: case type_zclmsg:
return u.zclmsg.size; return u.zclmsg.content->size;
case type_cmsg: case type_cmsg:
return u.cmsg.size; return u.cmsg.size;
default: default:
...@@ -369,14 +399,14 @@ void zmq::msg_t::reset_flags (unsigned char flags_) ...@@ -369,14 +399,14 @@ void zmq::msg_t::reset_flags (unsigned char flags_)
u.base.flags &= ~flags_; u.base.flags &= ~flags_;
} }
int64_t zmq::msg_t::fd () zmq::fd_t zmq::msg_t::fd ()
{ {
return file_desc; return u.base.fd;
} }
void zmq::msg_t::set_fd (int64_t fd_) void zmq::msg_t::set_fd (fd_t fd_)
{ {
file_desc = fd_; u.base.fd = fd_;
} }
zmq::metadata_t *zmq::msg_t::metadata () const zmq::metadata_t *zmq::msg_t::metadata () const
...@@ -432,6 +462,16 @@ bool zmq::msg_t::is_zcmsg() const ...@@ -432,6 +462,16 @@ bool zmq::msg_t::is_zcmsg() const
return u.base.type == type_zclmsg; return u.base.type == type_zclmsg;
} }
bool zmq::msg_t::is_join() const
{
return u.base.type == type_join;
}
bool zmq::msg_t::is_leave() const
{
return u.base.type == type_leave;
}
void zmq::msg_t::add_refs (int refs_) void zmq::msg_t::add_refs (int refs_)
{ {
zmq_assert (refs_ >= 0); zmq_assert (refs_ >= 0);
...@@ -485,10 +525,10 @@ bool zmq::msg_t::rm_refs (int refs_) ...@@ -485,10 +525,10 @@ bool zmq::msg_t::rm_refs (int refs_)
return false; return false;
} }
if (is_zcmsg() && !u.zclmsg.refcnt->sub(refs_)) { if (is_zcmsg() && !u.zclmsg.content->refcnt.sub(refs_)) {
// storage for rfcnt is provided externally // storage for rfcnt is provided externally
if (u.zclmsg.ffn) { if (u.zclmsg.content->ffn) {
u.zclmsg.ffn(u.zclmsg.data, u.zclmsg.hint); u.zclmsg.content->ffn(u.zclmsg.content->data, u.zclmsg.content->hint);
} }
return false; return false;
...@@ -518,6 +558,30 @@ int zmq::msg_t::reset_routing_id () ...@@ -518,6 +558,30 @@ int zmq::msg_t::reset_routing_id ()
return 0; return 0;
} }
const char * zmq::msg_t::group ()
{
return u.base.group;
}
int zmq::msg_t::set_group (const char * group_)
{
return set_group (group_, strlen (group_));
}
int zmq::msg_t::set_group (const char * group_, size_t length_)
{
if (length_> ZMQ_GROUP_MAX_LENGTH)
{
errno = EINVAL;
return -1;
}
strncpy (u.base.group, group_, length_);
u.base.group[length_] = '\0';
return 0;
}
zmq::atomic_counter_t *zmq::msg_t::refcnt() zmq::atomic_counter_t *zmq::msg_t::refcnt()
{ {
switch(u.base.type) switch(u.base.type)
...@@ -525,7 +589,7 @@ zmq::atomic_counter_t *zmq::msg_t::refcnt() ...@@ -525,7 +589,7 @@ zmq::atomic_counter_t *zmq::msg_t::refcnt()
case type_lmsg: case type_lmsg:
return &u.lmsg.content->refcnt; return &u.lmsg.content->refcnt;
case type_zclmsg: case type_zclmsg:
return u.zclmsg.refcnt; return &u.zclmsg.content->refcnt;
default: default:
zmq_assert(false); zmq_assert(false);
return NULL; return NULL;
......
...@@ -34,6 +34,7 @@ ...@@ -34,6 +34,7 @@
#include <stdio.h> #include <stdio.h>
#include "config.hpp" #include "config.hpp"
#include "fd.hpp"
#include "atomic_counter.hpp" #include "atomic_counter.hpp"
#include "metadata.hpp" #include "metadata.hpp"
...@@ -55,6 +56,22 @@ namespace zmq ...@@ -55,6 +56,22 @@ namespace zmq
{ {
public: public:
// Shared message buffer. Message data are either allocated in one
// continuous block along with this structure - thus avoiding one
// malloc/free pair or they are stored in used-supplied memory.
// In the latter case, ffn member stores pointer to the function to be
// used to deallocate the data. If the buffer is actually shared (there
// are at least 2 references to it) refcount member contains number of
// references.
struct content_t
{
void *data;
size_t size;
msg_free_fn *ffn;
void *hint;
zmq::atomic_counter_t refcnt;
};
// Message flags. // Message flags.
enum enum
{ {
...@@ -70,14 +87,16 @@ namespace zmq ...@@ -70,14 +87,16 @@ namespace zmq
int init (void* data, size_t size_, int init (void* data, size_t size_,
msg_free_fn* ffn_, void* hint, msg_free_fn* ffn_, void* hint,
zmq::atomic_counter_t* refcnt_ = NULL); content_t* content_ = NULL);
int init_size (size_t size_); int init_size (size_t size_);
int init_data (void *data_, size_t size_, msg_free_fn *ffn_, int init_data (void *data_, size_t size_, msg_free_fn *ffn_,
void *hint_); void *hint_);
int init_external_storage(void *data_, size_t size_, zmq::atomic_counter_t* ctr, int init_external_storage(content_t* content_, void *data_, size_t size_,
msg_free_fn *ffn_, void *hint_); msg_free_fn *ffn_, void *hint_);
int init_delimiter (); int init_delimiter ();
int init_join ();
int init_leave ();
int close (); int close ();
int move (msg_t &src_); int move (msg_t &src_);
int copy (msg_t &src_); int copy (msg_t &src_);
...@@ -86,20 +105,25 @@ namespace zmq ...@@ -86,20 +105,25 @@ namespace zmq
unsigned char flags (); unsigned char flags ();
void set_flags (unsigned char flags_); void set_flags (unsigned char flags_);
void reset_flags (unsigned char flags_); void reset_flags (unsigned char flags_);
int64_t fd (); fd_t fd ();
void set_fd (int64_t fd_); void set_fd (fd_t fd_);
metadata_t *metadata () const; metadata_t *metadata () const;
void set_metadata (metadata_t *metadata_); void set_metadata (metadata_t *metadata_);
void reset_metadata (); void reset_metadata ();
bool is_identity () const; bool is_identity () const;
bool is_credential () const; bool is_credential () const;
bool is_delimiter () const; bool is_delimiter () const;
bool is_join () const;
bool is_leave () const;
bool is_vsm () const; bool is_vsm () const;
bool is_cmsg () const; bool is_cmsg () const;
bool is_zcmsg() const; bool is_zcmsg() const;
uint32_t get_routing_id (); uint32_t get_routing_id ();
int set_routing_id (uint32_t routing_id_); int set_routing_id (uint32_t routing_id_);
int reset_routing_id (); int reset_routing_id ();
const char * group ();
int set_group (const char* group_);
int set_group (const char*, size_t length);
// After calling this function you can copy the message in POD-style // After calling this function you can copy the message in POD-style
// refs_ times. No need to call copy. // refs_ times. No need to call copy.
...@@ -112,27 +136,14 @@ namespace zmq ...@@ -112,27 +136,14 @@ namespace zmq
// Size in bytes of the largest message that is still copied around // Size in bytes of the largest message that is still copied around
// rather than being reference-counted. // rather than being reference-counted.
enum { msg_t_size = 64 }; enum { msg_t_size = 64 };
enum { max_vsm_size = msg_t_size - (8 + sizeof (metadata_t *) + 3 + sizeof(uint32_t)) }; enum { max_vsm_size = msg_t_size - (sizeof (metadata_t *) +
3 +
16 +
sizeof (uint32_t) +
sizeof (fd_t))};
private: private:
zmq::atomic_counter_t* refcnt(); zmq::atomic_counter_t* refcnt();
// Shared message buffer. Message data are either allocated in one
// continuous block along with this structure - thus avoiding one
// malloc/free pair or they are stored in used-supplied memory.
// In the latter case, ffn member stores pointer to the function to be
// used to deallocate the data. If the buffer is actually shared (there
// are at least 2 references to it) refcount member contains number of
// references.
struct content_t
{
void *data;
size_t size;
msg_free_fn *ffn;
void *hint;
zmq::atomic_counter_t refcnt;
};
// Different message types. // Different message types.
enum type_t enum type_t
{ {
...@@ -149,11 +160,14 @@ namespace zmq ...@@ -149,11 +160,14 @@ namespace zmq
// zero-copy LMSG message for v2_decoder // zero-copy LMSG message for v2_decoder
type_zclmsg = 105, type_zclmsg = 105,
type_max = 105 // Join message for radio_dish
}; type_join = 106,
// the file descriptor where this message originated, needs to be 64bit due to alignment // Leave message for radio_dish
int64_t file_desc; type_leave = 107,
type_max = 107
};
// Note that fields shared between different message types are not // Note that fields shared between different message types are not
// moved to the parent class (msg_t). This way we get tighter packing // moved to the parent class (msg_t). This way we get tighter packing
...@@ -162,10 +176,16 @@ namespace zmq ...@@ -162,10 +176,16 @@ namespace zmq
union { union {
struct { struct {
metadata_t *metadata; metadata_t *metadata;
unsigned char unused [msg_t_size - (8 + sizeof (metadata_t *) + 2 + sizeof(uint32_t))]; unsigned char unused [msg_t_size - (sizeof (metadata_t *) +
2 +
16 +
sizeof (uint32_t) +
sizeof (fd_t))];
unsigned char type; unsigned char type;
unsigned char flags; unsigned char flags;
char group [16];
uint32_t routing_id; uint32_t routing_id;
fd_t fd;
} base; } base;
struct { struct {
metadata_t *metadata; metadata_t *metadata;
...@@ -173,54 +193,69 @@ namespace zmq ...@@ -173,54 +193,69 @@ namespace zmq
unsigned char size; unsigned char size;
unsigned char type; unsigned char type;
unsigned char flags; unsigned char flags;
char group [16];
uint32_t routing_id; uint32_t routing_id;
fd_t fd;
} vsm; } vsm;
struct { struct {
metadata_t *metadata; metadata_t *metadata;
content_t *content; content_t *content;
unsigned char unused [msg_t_size - (8 + sizeof (metadata_t *) unsigned char unused [msg_t_size - (sizeof (metadata_t *) +
+ sizeof (content_t*) sizeof (content_t*) +
+ 2 2 +
+ sizeof(uint32_t))]; 16 +
sizeof (uint32_t) +
sizeof (fd_t))];
unsigned char type; unsigned char type;
unsigned char flags; unsigned char flags;
char group [16];
uint32_t routing_id; uint32_t routing_id;
fd_t fd;
} lmsg; } lmsg;
struct { struct {
metadata_t *metadata; metadata_t *metadata;
void *data; content_t *content;
size_t size; unsigned char unused [msg_t_size - (sizeof (metadata_t *) +
msg_free_fn *ffn; sizeof (content_t*) +
void *hint; 2 +
zmq::atomic_counter_t* refcnt; 16 +
unsigned char unused [msg_t_size - (8 + sizeof (metadata_t *) sizeof (uint32_t) +
+ sizeof (void*) sizeof (fd_t))];
+ sizeof (size_t)
+ sizeof (msg_free_fn*)
+ sizeof (void*)
+ sizeof (zmq::atomic_counter_t*)
+ 2
+ sizeof(uint32_t))];
unsigned char type; unsigned char type;
unsigned char flags; unsigned char flags;
char group [16];
uint32_t routing_id; uint32_t routing_id;
fd_t fd;
} zclmsg; } zclmsg;
struct { struct {
metadata_t *metadata; metadata_t *metadata;
void* data; void* data;
size_t size; size_t size;
unsigned char unused unsigned char unused [msg_t_size - (sizeof (metadata_t *) +
[msg_t_size - (8 + sizeof (metadata_t *) + sizeof (void*) + sizeof (size_t) + 2 + sizeof(uint32_t))]; sizeof (void*) +
sizeof (size_t) +
2 +
16 +
sizeof (uint32_t) +
sizeof (fd_t))];
unsigned char type; unsigned char type;
unsigned char flags; unsigned char flags;
char group [16];
uint32_t routing_id; uint32_t routing_id;
fd_t fd;
} cmsg; } cmsg;
struct { struct {
metadata_t *metadata; metadata_t *metadata;
unsigned char unused [msg_t_size - (8 + sizeof (metadata_t *) + 2 + sizeof(uint32_t))]; unsigned char unused [msg_t_size - (sizeof (metadata_t *) +
2 +
16 +
sizeof (uint32_t) +
sizeof (fd_t))];
unsigned char type; unsigned char type;
unsigned char flags; unsigned char flags;
char group [16];
uint32_t routing_id; uint32_t routing_id;
fd_t fd;
} delimiter; } delimiter;
} u; } u;
}; };
......
...@@ -65,15 +65,13 @@ void zmq::radio_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_) ...@@ -65,15 +65,13 @@ void zmq::radio_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
void zmq::radio_t::xread_activated (pipe_t *pipe_) void zmq::radio_t::xread_activated (pipe_t *pipe_)
{ {
// There are some subscriptions waiting. Let's process them. // There are some subscriptions waiting. Let's process them.
msg_t sub; msg_t msg;
while (pipe_->read (&sub)) { while (pipe_->read (&msg)) {
// Apply the subscription to the trie // Apply the subscription to the trie
const char * data = (char *) sub.data (); if (msg.is_join () || msg.is_leave ()) {
const size_t size = sub.size (); std::string group = std::string (msg.group ());
if (size > 0 && (*data == 'J' || *data == 'L')) {
std::string group = std::string (data + 1, sub. size() - 1);
if (*data == 'J') if (msg.is_join ())
subscriptions.insert (subscriptions_t::value_type (group, pipe_)); subscriptions.insert (subscriptions_t::value_type (group, pipe_));
else { else {
std::pair<subscriptions_t::iterator, subscriptions_t::iterator> range = std::pair<subscriptions_t::iterator, subscriptions_t::iterator> range =
...@@ -87,7 +85,7 @@ void zmq::radio_t::xread_activated (pipe_t *pipe_) ...@@ -87,7 +85,7 @@ void zmq::radio_t::xread_activated (pipe_t *pipe_)
} }
} }
} }
sub.close (); msg.close ();
} }
} }
...@@ -115,37 +113,13 @@ int zmq::radio_t::xsend (msg_t *msg_) ...@@ -115,37 +113,13 @@ int zmq::radio_t::xsend (msg_t *msg_)
return -1; return -1;
} }
size_t size = msg_->size ();
char *group = (char*) msg_->data();
// Maximum allowed group length is 255
if (size > ZMQ_GROUP_MAX_LENGTH)
size = ZMQ_GROUP_MAX_LENGTH;
// Check if NULL terminated
bool terminated = false;
for (size_t index = 0; index < size; index++) {
if (group[index] == '\0') {
terminated = true;
break;
}
}
if (!terminated) {
// User didn't include a group in the message
errno = EINVAL;
return -1;
}
dist.unmatch (); dist.unmatch ();
std::pair<subscriptions_t::iterator, subscriptions_t::iterator> range = std::pair<subscriptions_t::iterator, subscriptions_t::iterator> range =
subscriptions.equal_range (std::string(group)); subscriptions.equal_range (std::string(msg_->group ()));
for (subscriptions_t::iterator it = range.first; it != range.second; ++it) { for (subscriptions_t::iterator it = range.first; it != range.second; ++it)
dist.match (it-> second); dist.match (it-> second);
}
int rc = dist.send_to_matching (msg_); int rc = dist.send_to_matching (msg_);
...@@ -168,3 +142,93 @@ bool zmq::radio_t::xhas_in () ...@@ -168,3 +142,93 @@ bool zmq::radio_t::xhas_in ()
{ {
return false; return false;
} }
zmq::radio_session_t::radio_session_t (io_thread_t *io_thread_, bool connect_,
socket_base_t *socket_, const options_t &options_,
address_t *addr_) :
session_base_t (io_thread_, connect_, socket_, options_, addr_),
state (group)
{
}
zmq::radio_session_t::~radio_session_t ()
{
}
int zmq::radio_session_t::push_msg (msg_t *msg_)
{
if (msg_->flags() & msg_t::command) {
char *command_data =
static_cast <char *> (msg_->data ());
const size_t data_size = msg_->size ();
int group_length;
char * group;
msg_t join_leave_msg;
int rc;
// Set the msg type to either JOIN or LEAVE
if (data_size >= 5 && memcmp (command_data, "\4JOIN", 5) == 0) {
group_length = data_size - 5;
group = command_data + 5;
rc = join_leave_msg.init_join ();
}
else if (data_size >= 6 && memcmp (command_data, "\5LEAVE", 6) == 0) {
group_length = data_size - 6;
group = command_data + 6;
rc = join_leave_msg.init_leave ();
}
// If it is not a JOIN or LEAVE just push the message
else
return session_base_t::push_msg (msg_);
errno_assert (rc == 0);
// Set the group
rc = join_leave_msg.set_group (group, group_length);
errno_assert (rc == 0);
// Close the current command
rc = msg_->close ();
errno_assert (rc == 0);
// Push the join or leave command
*msg_ = join_leave_msg;
return session_base_t::push_msg (msg_);
}
else
return session_base_t::push_msg (msg_);
}
int zmq::radio_session_t::pull_msg (msg_t *msg_)
{
if (state == group) {
int rc = session_base_t::pull_msg (&pending_msg);
if (rc != 0)
return rc;
const char *group = pending_msg.group ();
int length = strlen (group);
// First frame is the group
msg_->init_size (length);
msg_->set_flags (msg_t::more);
memcpy (msg_->data (), group, length);
// Next status is the body
state = body;
return 0;
}
else {
*msg_ = pending_msg;
state = group;
return 0;
}
}
void zmq::radio_session_t::reset ()
{
session_base_t::reset ();
state = group;
}
...@@ -77,6 +77,31 @@ namespace zmq ...@@ -77,6 +77,31 @@ namespace zmq
const radio_t &operator = (const radio_t&); const radio_t &operator = (const radio_t&);
}; };
class radio_session_t : public session_base_t
{
public:
radio_session_t (zmq::io_thread_t *io_thread_, bool connect_,
zmq::socket_base_t *socket_, const options_t &options_,
address_t *addr_);
~radio_session_t ();
// Overrides of the functions from session_base_t.
int push_msg (msg_t *msg_);
int pull_msg (msg_t *msg_);
void reset ();
private:
enum {
group,
body
} state;
msg_t pending_msg;
radio_session_t (const radio_session_t&);
const radio_session_t &operator = (const radio_session_t&);
};
} }
#endif #endif
...@@ -62,13 +62,13 @@ int zmq::raw_decoder_t::decode (const uint8_t *data_, size_t size_, ...@@ -62,13 +62,13 @@ int zmq::raw_decoder_t::decode (const uint8_t *data_, size_t size_,
{ {
int rc = in_progress.init ((unsigned char*)data_, size_, int rc = in_progress.init ((unsigned char*)data_, size_,
shared_message_memory_allocator::call_dec_ref, shared_message_memory_allocator::call_dec_ref,
allocator.buffer(), allocator.buffer (),
allocator.provide_refcnt() ); allocator.provide_content ());
// if the buffer serves as memory for a zero-copy message, release it // if the buffer serves as memory for a zero-copy message, release it
// and allocate a new buffer in get_buffer for the next decode // and allocate a new buffer in get_buffer for the next decode
if (in_progress.is_zcmsg()) { if (in_progress.is_zcmsg ()) {
allocator.advance_refcnt(); allocator.advance_content();
allocator.release(); allocator.release();
} }
......
...@@ -45,6 +45,8 @@ ...@@ -45,6 +45,8 @@
#include "ctx.hpp" #include "ctx.hpp"
#include "req.hpp" #include "req.hpp"
#include "radio.hpp"
#include "dish.hpp"
zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_, zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_,
bool active_, class socket_base_t *socket_, const options_t &options_, bool active_, class socket_base_t *socket_, const options_t &options_,
...@@ -56,6 +58,14 @@ zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_, ...@@ -56,6 +58,14 @@ zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_,
s = new (std::nothrow) req_session_t (io_thread_, active_, s = new (std::nothrow) req_session_t (io_thread_, active_,
socket_, options_, addr_); socket_, options_, addr_);
break; break;
case ZMQ_RADIO:
s = new (std::nothrow) radio_session_t (io_thread_, active_,
socket_, options_, addr_);
break;
case ZMQ_DISH:
s = new (std::nothrow) dish_session_t (io_thread_, active_,
socket_, options_, addr_);
break;
case ZMQ_DEALER: case ZMQ_DEALER:
case ZMQ_REP: case ZMQ_REP:
case ZMQ_ROUTER: case ZMQ_ROUTER:
...@@ -69,8 +79,6 @@ zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_, ...@@ -69,8 +79,6 @@ zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_,
case ZMQ_STREAM: case ZMQ_STREAM:
case ZMQ_SERVER: case ZMQ_SERVER:
case ZMQ_CLIENT: case ZMQ_CLIENT:
case ZMQ_RADIO:
case ZMQ_DISH:
s = new (std::nothrow) session_base_t (io_thread_, active_, s = new (std::nothrow) session_base_t (io_thread_, active_,
socket_, options_, addr_); socket_, options_, addr_);
break; break;
......
...@@ -127,12 +127,12 @@ int zmq::v2_decoder_t::size_ready(uint64_t msg_size, unsigned char const* read_p ...@@ -127,12 +127,12 @@ int zmq::v2_decoder_t::size_ready(uint64_t msg_size, unsigned char const* read_p
// if the message will be a large message, pass a valid refcnt memory location as well // if the message will be a large message, pass a valid refcnt memory location as well
rc = in_progress.init ((unsigned char *) read_pos, static_cast <size_t> (msg_size), rc = in_progress.init ((unsigned char *) read_pos, static_cast <size_t> (msg_size),
shared_message_memory_allocator::call_dec_ref, buffer(), shared_message_memory_allocator::call_dec_ref, buffer(),
provide_refcnt ()); provide_content ());
// For small messages, data has been copied and refcount does not have to be increased // For small messages, data has been copied and refcount does not have to be increased
if (in_progress.is_zcmsg()) if (in_progress.is_zcmsg())
{ {
advance_refcnt(); advance_content();
inc_ref(); inc_ref();
} }
} }
......
...@@ -695,6 +695,16 @@ uint32_t zmq_msg_routing_id (zmq_msg_t *msg_) ...@@ -695,6 +695,16 @@ uint32_t zmq_msg_routing_id (zmq_msg_t *msg_)
return ((zmq::msg_t *) msg_)->get_routing_id (); return ((zmq::msg_t *) msg_)->get_routing_id ();
} }
int zmq_msg_set_group (zmq_msg_t *msg_, const char *group_)
{
return ((zmq::msg_t *) msg_)->set_group (group_);
}
const char *zmq_msg_group (zmq_msg_t *msg_)
{
return ((zmq::msg_t *) msg_)->group ();
}
// Get message metadata string // Get message metadata string
const char *zmq_msg_gets (zmq_msg_t *msg_, const char *property_) const char *zmq_msg_gets (zmq_msg_t *msg_, const char *property_)
......
...@@ -29,6 +29,57 @@ ...@@ -29,6 +29,57 @@
#include "testutil.hpp" #include "testutil.hpp"
int msg_send (zmq_msg_t *msg_, void *s_, const char* group_, const char* body_)
{
int rc = zmq_msg_init_size (msg_, strlen (body_));
if (rc != 0)
return rc;
memcpy (zmq_msg_data (msg_), body_, strlen (body_));
rc = zmq_msg_set_group (msg_, group_);
if (rc != 0) {
zmq_msg_close (msg_);
return rc;
}
rc = zmq_msg_send (msg_, s_, 0);
zmq_msg_close (msg_);
return rc;
}
int msg_recv_cmp (zmq_msg_t *msg_, void *s_, const char* group_, const char* body_)
{
int rc = zmq_msg_init (msg_);
if (rc != 0)
return -1;
int recv_rc = zmq_msg_recv (msg_, s_, 0);
if (recv_rc == -1)
return -1;
if (strcmp (zmq_msg_group (msg_), group_) != 0)
{
zmq_msg_close (msg_);
return -1;
}
char * body = (char*) malloc (sizeof(char) * (zmq_msg_size (msg_) + 1));
memcpy (body, zmq_msg_data (msg_), zmq_msg_size (msg_));
body [zmq_msg_size (msg_)] = '\0';
if (strcmp (body, body_) != 0)
{
zmq_msg_close (msg_);
return -1;
}
zmq_msg_close (msg_);
return recv_rc;
}
int main (void) int main (void)
{ {
setup_test_environment (); setup_test_environment ();
...@@ -38,11 +89,11 @@ int main (void) ...@@ -38,11 +89,11 @@ int main (void)
void *radio = zmq_socket (ctx, ZMQ_RADIO); void *radio = zmq_socket (ctx, ZMQ_RADIO);
void *dish = zmq_socket (ctx, ZMQ_DISH); void *dish = zmq_socket (ctx, ZMQ_DISH);
int rc = zmq_bind (radio, "inproc://test-radio-dish"); int rc = zmq_bind (radio, "tcp://127.0.0.1:5556");
assert (rc == 0); assert (rc == 0);
// Leaving a group which we didn't join // Leaving a group which we didn't join
rc = zmq_leave (dish, "World"); rc = zmq_leave (dish, "Movies");
assert (rc == -1); assert (rc == -1);
// Joining too long group // Joining too long group
...@@ -54,66 +105,64 @@ int main (void) ...@@ -54,66 +105,64 @@ int main (void)
assert (rc == -1); assert (rc == -1);
// Joining // Joining
rc = zmq_join (dish, "World"); rc = zmq_join (dish, "Movies");
assert (rc == 0); assert (rc == 0);
// Duplicate Joining // Duplicate Joining
rc = zmq_join (dish, "World"); rc = zmq_join (dish, "Movies");
assert (rc == -1); assert (rc == -1);
// Connecting // Connecting
rc = zmq_connect (dish, "inproc://test-radio-dish"); rc = zmq_connect (dish, "tcp://127.0.0.1:5556");
assert (rc == 0); assert (rc == 0);
zmq_sleep (1); zmq_sleep (1);
// This is not going to be sent as dish only subscribe to "World" zmq_msg_t msg;
rc = zmq_send (radio, "Hello\0Message", 13, 0);
assert (rc == 13);
// This is going to be sent to the dish // This is not going to be sent as dish only subscribe to "Movies"
rc = zmq_send (radio, "World\0Message", 13, 0); rc = msg_send (&msg, radio, "TV", "Friends");
assert (rc == 13); assert (rc == 7);
char* data = (char*) malloc (sizeof(char) * 13); // This is going to be sent to the dish
rc = msg_send (&msg, radio, "Movies", "Godfather");
assert (rc == 9);
rc = zmq_recv (dish, data, 13, 0); // Check the correct message arrived
assert (rc == 13); rc = msg_recv_cmp (&msg, dish, "Movies", "Godfather");
assert (strcmp (data, "World") == 0); assert (rc == 9);
// Join group during connection optvallen // Join group during connection optvallen
rc = zmq_join (dish, "Hello"); rc = zmq_join (dish, "TV");
assert (rc == 0); assert (rc == 0);
zmq_sleep (1); zmq_sleep (1);
// This should arrive now as we joined the group // This should arrive now as we joined the group
rc = zmq_send (radio, "Hello\0Message", 13, 0); rc = msg_send (&msg, radio, "TV", "Friends");
assert (rc == 13); assert (rc == 7);
rc = zmq_recv (dish, data, 13, 0); // Check the correct message arrived
assert (rc == 13); rc = msg_recv_cmp (&msg, dish, "TV", "Friends");
assert (strcmp (data, "Hello") == 0); assert (rc == 7);
// Leaving group // Leaving groupr
rc = zmq_leave (dish, "Hello"); rc = zmq_leave (dish, "TV");
assert (rc == 0); assert (rc == 0);
zmq_sleep (1); zmq_sleep (1);
// This is not going to be sent as dish only subscribe to "World" // This is not going to be sent as dish only subscribe to "Movies"
rc = zmq_send (radio, "Hello\0Message", 13, 0); rc = msg_send (&msg, radio, "TV", "Friends");
assert (rc == 13); assert (rc == 7);
// This is going to be sent to the dish // This is going to be sent to the dish
rc = zmq_send (radio, "World\0Message", 13, 0); rc = msg_send (&msg, radio, "Movies", "Godfather");
assert (rc == 13); assert (rc == 9);
rc = zmq_recv (dish, data, 13, 0);
assert (rc == 13);
assert (strcmp (data, "World") == 0);
free (data); // Check the correct message arrived
rc = msg_recv_cmp (&msg, dish, "Movies", "Godfather");
assert (rc == 9);
rc = zmq_close (dish); rc = zmq_close (dish);
assert (rc == 0); assert (rc == 0);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment