Commit 68675e23 authored by somdoron's avatar somdoron

adds group to zmq_msg

parent b2718149
...@@ -239,6 +239,8 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int property, int optval); ...@@ -239,6 +239,8 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int property, int optval);
ZMQ_EXPORT const char *zmq_msg_gets (zmq_msg_t *msg, const char *property); ZMQ_EXPORT const char *zmq_msg_gets (zmq_msg_t *msg, const char *property);
ZMQ_EXPORT int zmq_msg_set_routing_id (zmq_msg_t *msg, uint32_t routing_id); ZMQ_EXPORT int zmq_msg_set_routing_id (zmq_msg_t *msg, uint32_t routing_id);
ZMQ_EXPORT uint32_t zmq_msg_routing_id (zmq_msg_t *msg); ZMQ_EXPORT uint32_t zmq_msg_routing_id (zmq_msg_t *msg);
ZMQ_EXPORT int zmq_msg_set_group (zmq_msg_t *msg, const char *group);
ZMQ_EXPORT const char *zmq_msg_group (zmq_msg_t *msg);
/******************************************************************************/ /******************************************************************************/
...@@ -359,7 +361,7 @@ ZMQ_EXPORT uint32_t zmq_msg_routing_id (zmq_msg_t *msg); ...@@ -359,7 +361,7 @@ ZMQ_EXPORT uint32_t zmq_msg_routing_id (zmq_msg_t *msg);
#define ZMQ_GSSAPI 3 #define ZMQ_GSSAPI 3
/* RADIO-DISH protocol */ /* RADIO-DISH protocol */
#define ZMQ_GROUP_MAX_LENGTH 255 #define ZMQ_GROUP_MAX_LENGTH 15
/* Deprecated options and aliases */ /* Deprecated options and aliases */
#define ZMQ_TCP_ACCEPT_FILTER 38 #define ZMQ_TCP_ACCEPT_FILTER 38
......
...@@ -84,6 +84,7 @@ int zmq::msg_t::init () ...@@ -84,6 +84,7 @@ int zmq::msg_t::init ()
u.vsm.type = type_vsm; u.vsm.type = type_vsm;
u.vsm.flags = 0; u.vsm.flags = 0;
u.vsm.size = 0; u.vsm.size = 0;
u.vsm.group[0] = '\0';
u.vsm.routing_id = 0; u.vsm.routing_id = 0;
u.vsm.fd = retired_fd; u.vsm.fd = retired_fd;
return 0; return 0;
...@@ -96,6 +97,7 @@ int zmq::msg_t::init_size (size_t size_) ...@@ -96,6 +97,7 @@ int zmq::msg_t::init_size (size_t size_)
u.vsm.type = type_vsm; u.vsm.type = type_vsm;
u.vsm.flags = 0; u.vsm.flags = 0;
u.vsm.size = (unsigned char) size_; u.vsm.size = (unsigned char) size_;
u.vsm.group[0] = '\0';
u.vsm.routing_id = 0; u.vsm.routing_id = 0;
u.vsm.fd = retired_fd; u.vsm.fd = retired_fd;
} }
...@@ -103,6 +105,7 @@ int zmq::msg_t::init_size (size_t size_) ...@@ -103,6 +105,7 @@ int zmq::msg_t::init_size (size_t size_)
u.lmsg.metadata = NULL; u.lmsg.metadata = NULL;
u.lmsg.type = type_lmsg; u.lmsg.type = type_lmsg;
u.lmsg.flags = 0; u.lmsg.flags = 0;
u.lmsg.group[0] = '\0';
u.lmsg.routing_id = 0; u.lmsg.routing_id = 0;
u.lmsg.fd = retired_fd; u.lmsg.fd = retired_fd;
u.lmsg.content = NULL; u.lmsg.content = NULL;
...@@ -131,6 +134,7 @@ int zmq::msg_t::init_external_storage(content_t* content_, void* data_, size_t s ...@@ -131,6 +134,7 @@ int zmq::msg_t::init_external_storage(content_t* content_, void* data_, size_t s
u.zclmsg.metadata = NULL; u.zclmsg.metadata = NULL;
u.zclmsg.type = type_zclmsg; u.zclmsg.type = type_zclmsg;
u.zclmsg.flags = 0; u.zclmsg.flags = 0;
u.zclmsg.group[0] = '\0';
u.zclmsg.routing_id = 0; u.zclmsg.routing_id = 0;
u.zclmsg.fd = retired_fd; u.zclmsg.fd = retired_fd;
...@@ -158,6 +162,7 @@ int zmq::msg_t::init_data (void *data_, size_t size_, ...@@ -158,6 +162,7 @@ int zmq::msg_t::init_data (void *data_, size_t size_,
u.cmsg.flags = 0; u.cmsg.flags = 0;
u.cmsg.data = data_; u.cmsg.data = data_;
u.cmsg.size = size_; u.cmsg.size = size_;
u.cmsg.group[0] = '\0';
u.cmsg.routing_id = 0; u.cmsg.routing_id = 0;
u.cmsg.fd = retired_fd; u.cmsg.fd = retired_fd;
} }
...@@ -165,6 +170,7 @@ int zmq::msg_t::init_data (void *data_, size_t size_, ...@@ -165,6 +170,7 @@ int zmq::msg_t::init_data (void *data_, size_t size_,
u.lmsg.metadata = NULL; u.lmsg.metadata = NULL;
u.lmsg.type = type_lmsg; u.lmsg.type = type_lmsg;
u.lmsg.flags = 0; u.lmsg.flags = 0;
u.lmsg.group[0] = '\0';
u.lmsg.routing_id = 0; u.lmsg.routing_id = 0;
u.lmsg.fd = retired_fd; u.lmsg.fd = retired_fd;
u.lmsg.content = (content_t*) malloc (sizeof (content_t)); u.lmsg.content = (content_t*) malloc (sizeof (content_t));
...@@ -188,6 +194,7 @@ int zmq::msg_t::init_delimiter () ...@@ -188,6 +194,7 @@ int zmq::msg_t::init_delimiter ()
u.delimiter.metadata = NULL; u.delimiter.metadata = NULL;
u.delimiter.type = type_delimiter; u.delimiter.type = type_delimiter;
u.delimiter.flags = 0; u.delimiter.flags = 0;
u.delimiter.group[0] = '\0';
u.delimiter.routing_id = 0; u.delimiter.routing_id = 0;
u.delimiter.fd = retired_fd; u.delimiter.fd = retired_fd;
return 0; return 0;
...@@ -519,6 +526,24 @@ int zmq::msg_t::reset_routing_id () ...@@ -519,6 +526,24 @@ int zmq::msg_t::reset_routing_id ()
return 0; return 0;
} }
const char * zmq::msg_t::group ()
{
return u.base.group;
}
int zmq::msg_t::set_group (const char * group_)
{
if (strlen (group_) > ZMQ_GROUP_MAX_LENGTH)
{
errno = EINVAL;
return -1;
}
strcpy (u.base.group, group_);
return 0;
}
zmq::atomic_counter_t *zmq::msg_t::refcnt() zmq::atomic_counter_t *zmq::msg_t::refcnt()
{ {
switch(u.base.type) switch(u.base.type)
......
...@@ -117,6 +117,8 @@ namespace zmq ...@@ -117,6 +117,8 @@ namespace zmq
uint32_t get_routing_id (); uint32_t get_routing_id ();
int set_routing_id (uint32_t routing_id_); int set_routing_id (uint32_t routing_id_);
int reset_routing_id (); int reset_routing_id ();
const char * group ();
int set_group (const char* group_);
// After calling this function you can copy the message in POD-style // After calling this function you can copy the message in POD-style
// refs_ times. No need to call copy. // refs_ times. No need to call copy.
...@@ -131,8 +133,9 @@ namespace zmq ...@@ -131,8 +133,9 @@ namespace zmq
enum { msg_t_size = 64 }; enum { msg_t_size = 64 };
enum { max_vsm_size = msg_t_size - (sizeof (metadata_t *) + enum { max_vsm_size = msg_t_size - (sizeof (metadata_t *) +
3 + 3 +
16 +
sizeof (uint32_t) + sizeof (uint32_t) +
sizeof (fd_t)) }; sizeof (fd_t))};
private: private:
zmq::atomic_counter_t* refcnt(); zmq::atomic_counter_t* refcnt();
...@@ -165,10 +168,12 @@ namespace zmq ...@@ -165,10 +168,12 @@ namespace zmq
metadata_t *metadata; metadata_t *metadata;
unsigned char unused [msg_t_size - (sizeof (metadata_t *) + unsigned char unused [msg_t_size - (sizeof (metadata_t *) +
2 + 2 +
16 +
sizeof (uint32_t) + sizeof (uint32_t) +
sizeof (fd_t))]; sizeof (fd_t))];
unsigned char type; unsigned char type;
unsigned char flags; unsigned char flags;
char group [16];
uint32_t routing_id; uint32_t routing_id;
fd_t fd; fd_t fd;
} base; } base;
...@@ -178,6 +183,7 @@ namespace zmq ...@@ -178,6 +183,7 @@ namespace zmq
unsigned char size; unsigned char size;
unsigned char type; unsigned char type;
unsigned char flags; unsigned char flags;
char group [16];
uint32_t routing_id; uint32_t routing_id;
fd_t fd; fd_t fd;
} vsm; } vsm;
...@@ -187,10 +193,12 @@ namespace zmq ...@@ -187,10 +193,12 @@ namespace zmq
unsigned char unused [msg_t_size - (sizeof (metadata_t *) + unsigned char unused [msg_t_size - (sizeof (metadata_t *) +
sizeof (content_t*) + sizeof (content_t*) +
2 + 2 +
16 +
sizeof (uint32_t) + sizeof (uint32_t) +
sizeof (fd_t))]; sizeof (fd_t))];
unsigned char type; unsigned char type;
unsigned char flags; unsigned char flags;
char group [16];
uint32_t routing_id; uint32_t routing_id;
fd_t fd; fd_t fd;
} lmsg; } lmsg;
...@@ -200,10 +208,12 @@ namespace zmq ...@@ -200,10 +208,12 @@ namespace zmq
unsigned char unused [msg_t_size - (sizeof (metadata_t *) + unsigned char unused [msg_t_size - (sizeof (metadata_t *) +
sizeof (content_t*) + sizeof (content_t*) +
2 + 2 +
16 +
sizeof (uint32_t) + sizeof (uint32_t) +
sizeof (fd_t))]; sizeof (fd_t))];
unsigned char type; unsigned char type;
unsigned char flags; unsigned char flags;
char group [16];
uint32_t routing_id; uint32_t routing_id;
fd_t fd; fd_t fd;
} zclmsg; } zclmsg;
...@@ -215,10 +225,12 @@ namespace zmq ...@@ -215,10 +225,12 @@ namespace zmq
sizeof (void*) + sizeof (void*) +
sizeof (size_t) + sizeof (size_t) +
2 + 2 +
16 +
sizeof (uint32_t) + sizeof (uint32_t) +
sizeof (fd_t))]; sizeof (fd_t))];
unsigned char type; unsigned char type;
unsigned char flags; unsigned char flags;
char group [16];
uint32_t routing_id; uint32_t routing_id;
fd_t fd; fd_t fd;
} cmsg; } cmsg;
...@@ -226,10 +238,12 @@ namespace zmq ...@@ -226,10 +238,12 @@ namespace zmq
metadata_t *metadata; metadata_t *metadata;
unsigned char unused [msg_t_size - (sizeof (metadata_t *) + unsigned char unused [msg_t_size - (sizeof (metadata_t *) +
2 + 2 +
16 +
sizeof (uint32_t) + sizeof (uint32_t) +
sizeof (fd_t))]; sizeof (fd_t))];
unsigned char type; unsigned char type;
unsigned char flags; unsigned char flags;
char group [16];
uint32_t routing_id; uint32_t routing_id;
fd_t fd; fd_t fd;
} delimiter; } delimiter;
......
...@@ -115,33 +115,10 @@ int zmq::radio_t::xsend (msg_t *msg_) ...@@ -115,33 +115,10 @@ int zmq::radio_t::xsend (msg_t *msg_)
return -1; return -1;
} }
size_t size = msg_->size ();
char *group = (char*) msg_->data();
// Maximum allowed group length is 255
if (size > ZMQ_GROUP_MAX_LENGTH)
size = ZMQ_GROUP_MAX_LENGTH;
// Check if NULL terminated
bool terminated = false;
for (size_t index = 0; index < size; index++) {
if (group[index] == '\0') {
terminated = true;
break;
}
}
if (!terminated) {
// User didn't include a group in the message
errno = EINVAL;
return -1;
}
dist.unmatch (); dist.unmatch ();
std::pair<subscriptions_t::iterator, subscriptions_t::iterator> range = std::pair<subscriptions_t::iterator, subscriptions_t::iterator> range =
subscriptions.equal_range (std::string(group)); subscriptions.equal_range (std::string(msg_->group ()));
for (subscriptions_t::iterator it = range.first; it != range.second; ++it) { for (subscriptions_t::iterator it = range.first; it != range.second; ++it) {
dist.match (it-> second); dist.match (it-> second);
......
...@@ -695,6 +695,16 @@ uint32_t zmq_msg_routing_id (zmq_msg_t *msg_) ...@@ -695,6 +695,16 @@ uint32_t zmq_msg_routing_id (zmq_msg_t *msg_)
return ((zmq::msg_t *) msg_)->get_routing_id (); return ((zmq::msg_t *) msg_)->get_routing_id ();
} }
int zmq_msg_set_group (zmq_msg_t *msg_, const char *group_)
{
return ((zmq::msg_t *) msg_)->set_group (group_);
}
const char *zmq_msg_group (zmq_msg_t *msg_)
{
return ((zmq::msg_t *) msg_)->group ();
}
// Get message metadata string // Get message metadata string
const char *zmq_msg_gets (zmq_msg_t *msg_, const char *property_) const char *zmq_msg_gets (zmq_msg_t *msg_, const char *property_)
......
...@@ -29,6 +29,57 @@ ...@@ -29,6 +29,57 @@
#include "testutil.hpp" #include "testutil.hpp"
int msg_send (zmq_msg_t *msg_, void *s_, const char* group_, const char* body_)
{
int rc = zmq_msg_init_size (msg_, strlen (body_));
if (rc != 0)
return rc;
memcpy (zmq_msg_data (msg_), body_, strlen (body_));
rc = zmq_msg_set_group (msg_, group_);
if (rc != 0) {
zmq_msg_close (msg_);
return rc;
}
rc = zmq_msg_send (msg_, s_, 0);
zmq_msg_close (msg_);
return rc;
}
int msg_recv_cmp (zmq_msg_t *msg_, void *s_, const char* group_, const char* body_)
{
int rc = zmq_msg_init (msg_);
if (rc != 0)
return -1;
int recv_rc = zmq_msg_recv (msg_, s_, 0);
if (recv_rc == -1)
return -1;
if (strcmp (zmq_msg_group (msg_), group_) != 0)
{
zmq_msg_close (msg_);
return -1;
}
char * body = (char*) malloc (sizeof(char) * (zmq_msg_size (msg_) + 1));
memcpy (body, zmq_msg_data (msg_), zmq_msg_size (msg_));
body [zmq_msg_size (msg_)] = '\0';
if (strcmp (body, body_) != 0)
{
zmq_msg_close (msg_);
return -1;
}
zmq_msg_close (msg_);
return recv_rc;
}
int main (void) int main (void)
{ {
setup_test_environment (); setup_test_environment ();
...@@ -42,7 +93,7 @@ int main (void) ...@@ -42,7 +93,7 @@ int main (void)
assert (rc == 0); assert (rc == 0);
// Leaving a group which we didn't join // Leaving a group which we didn't join
rc = zmq_leave (dish, "World"); rc = zmq_leave (dish, "Movies");
assert (rc == -1); assert (rc == -1);
// Joining too long group // Joining too long group
...@@ -54,11 +105,11 @@ int main (void) ...@@ -54,11 +105,11 @@ int main (void)
assert (rc == -1); assert (rc == -1);
// Joining // Joining
rc = zmq_join (dish, "World"); rc = zmq_join (dish, "Movies");
assert (rc == 0); assert (rc == 0);
// Duplicate Joining // Duplicate Joining
rc = zmq_join (dish, "World"); rc = zmq_join (dish, "Movies");
assert (rc == -1); assert (rc == -1);
// Connecting // Connecting
...@@ -67,53 +118,51 @@ int main (void) ...@@ -67,53 +118,51 @@ int main (void)
zmq_sleep (1); zmq_sleep (1);
// This is not going to be sent as dish only subscribe to "World" zmq_msg_t msg;
rc = zmq_send (radio, "Hello\0Message", 13, 0);
assert (rc == 13);
// This is going to be sent to the dish // This is not going to be sent as dish only subscribe to "Movies"
rc = zmq_send (radio, "World\0Message", 13, 0); rc = msg_send (&msg, radio, "TV", "Friends");
assert (rc == 13); assert (rc == 7);
char* data = (char*) malloc (sizeof(char) * 13); // This is going to be sent to the dish
rc = msg_send (&msg, radio, "Movies", "Godfather");
assert (rc == 9);
rc = zmq_recv (dish, data, 13, 0); // Check the correct message arrived
assert (rc == 13); rc = msg_recv_cmp (&msg, dish, "Movies", "Godfather");
assert (strcmp (data, "World") == 0); assert (rc == 9);
// Join group during connection optvallen // Join group during connection optvallen
rc = zmq_join (dish, "Hello"); rc = zmq_join (dish, "TV");
assert (rc == 0); assert (rc == 0);
zmq_sleep (1); zmq_sleep (1);
// This should arrive now as we joined the group // This should arrive now as we joined the group
rc = zmq_send (radio, "Hello\0Message", 13, 0); rc = msg_send (&msg, radio, "TV", "Friends");
assert (rc == 13); assert (rc == 7);
rc = zmq_recv (dish, data, 13, 0); // Check the correct message arrived
assert (rc == 13); rc = msg_recv_cmp (&msg, dish, "TV", "Friends");
assert (strcmp (data, "Hello") == 0); assert (rc == 7);
// Leaving group // Leaving groupr
rc = zmq_leave (dish, "Hello"); rc = zmq_leave (dish, "TV");
assert (rc == 0); assert (rc == 0);
zmq_sleep (1); zmq_sleep (1);
// This is not going to be sent as dish only subscribe to "World" // This is not going to be sent as dish only subscribe to "Movies"
rc = zmq_send (radio, "Hello\0Message", 13, 0); rc = msg_send (&msg, radio, "TV", "Friends");
assert (rc == 13); assert (rc == 7);
// This is going to be sent to the dish // This is going to be sent to the dish
rc = zmq_send (radio, "World\0Message", 13, 0); rc = msg_send (&msg, radio, "Movies", "Godfather");
assert (rc == 13); assert (rc == 9);
rc = zmq_recv (dish, data, 13, 0);
assert (rc == 13);
assert (strcmp (data, "World") == 0);
free (data); // Check the correct message arrived
rc = msg_recv_cmp (&msg, dish, "Movies", "Godfather");
assert (rc == 9);
rc = zmq_close (dish); rc = zmq_close (dish);
assert (rc == 0); assert (rc == 0);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment