Unverified Commit 821685f2 authored by Luca Boccassi's avatar Luca Boccassi Committed by GitHub

Merge pull request #3822 from somdoron/GROUPMAXLENGTH

problem: maximum size of group doesn't match the RFC maximum size
parents f17a794d 05194eb5
...@@ -397,7 +397,7 @@ ZMQ_EXPORT const char *zmq_msg_gets (const zmq_msg_t *msg_, ...@@ -397,7 +397,7 @@ ZMQ_EXPORT const char *zmq_msg_gets (const zmq_msg_t *msg_,
#define ZMQ_GSSAPI 3 #define ZMQ_GSSAPI 3
/* RADIO-DISH protocol */ /* RADIO-DISH protocol */
#define ZMQ_GROUP_MAX_LENGTH 15 #define ZMQ_GROUP_MAX_LENGTH 255
/* Deprecated options and aliases */ /* Deprecated options and aliases */
#define ZMQ_IDENTITY ZMQ_ROUTING_ID #define ZMQ_IDENTITY ZMQ_ROUTING_ID
......
...@@ -233,7 +233,6 @@ void zmq::dish_t::send_subscriptions (pipe_t *pipe_) ...@@ -233,7 +233,6 @@ void zmq::dish_t::send_subscriptions (pipe_t *pipe_)
// Send it to the pipe. // Send it to the pipe.
pipe_->write (&msg); pipe_->write (&msg);
msg.close ();
} }
pipe_->flush (); pipe_->flush ();
......
...@@ -178,9 +178,7 @@ void zmq::dist_t::distribute (msg_t *msg_) ...@@ -178,9 +178,7 @@ void zmq::dist_t::distribute (msg_t *msg_)
++i; ++i;
} }
} }
int rc = msg_->close (); int rc = msg_->init ();
errno_assert (rc == 0);
rc = msg_->init ();
errno_assert (rc == 0); errno_assert (rc == 0);
return; return;
} }
......
...@@ -79,7 +79,8 @@ int zmq::msg_t::init () ...@@ -79,7 +79,8 @@ int zmq::msg_t::init ()
_u.vsm.type = type_vsm; _u.vsm.type = type_vsm;
_u.vsm.flags = 0; _u.vsm.flags = 0;
_u.vsm.size = 0; _u.vsm.size = 0;
_u.vsm.group[0] = '\0'; _u.vsm.group.sgroup.group[0] = '\0';
_u.vsm.group.type = group_type_short;
_u.vsm.routing_id = 0; _u.vsm.routing_id = 0;
return 0; return 0;
} }
...@@ -91,13 +92,15 @@ int zmq::msg_t::init_size (size_t size_) ...@@ -91,13 +92,15 @@ int zmq::msg_t::init_size (size_t size_)
_u.vsm.type = type_vsm; _u.vsm.type = type_vsm;
_u.vsm.flags = 0; _u.vsm.flags = 0;
_u.vsm.size = static_cast<unsigned char> (size_); _u.vsm.size = static_cast<unsigned char> (size_);
_u.vsm.group[0] = '\0'; _u.vsm.group.sgroup.group[0] = '\0';
_u.vsm.group.type = group_type_short;
_u.vsm.routing_id = 0; _u.vsm.routing_id = 0;
} else { } else {
_u.lmsg.metadata = NULL; _u.lmsg.metadata = NULL;
_u.lmsg.type = type_lmsg; _u.lmsg.type = type_lmsg;
_u.lmsg.flags = 0; _u.lmsg.flags = 0;
_u.lmsg.group[0] = '\0'; _u.lmsg.group.sgroup.group[0] = '\0';
_u.lmsg.group.type = group_type_short;
_u.lmsg.routing_id = 0; _u.lmsg.routing_id = 0;
_u.lmsg.content = NULL; _u.lmsg.content = NULL;
if (sizeof (content_t) + size_ > size_) if (sizeof (content_t) + size_ > size_)
...@@ -129,7 +132,8 @@ int zmq::msg_t::init_external_storage (content_t *content_, ...@@ -129,7 +132,8 @@ int zmq::msg_t::init_external_storage (content_t *content_,
_u.zclmsg.metadata = NULL; _u.zclmsg.metadata = NULL;
_u.zclmsg.type = type_zclmsg; _u.zclmsg.type = type_zclmsg;
_u.zclmsg.flags = 0; _u.zclmsg.flags = 0;
_u.zclmsg.group[0] = '\0'; _u.zclmsg.group.sgroup.group[0] = '\0';
_u.zclmsg.group.type = group_type_short;
_u.zclmsg.routing_id = 0; _u.zclmsg.routing_id = 0;
_u.zclmsg.content = content_; _u.zclmsg.content = content_;
...@@ -158,13 +162,15 @@ int zmq::msg_t::init_data (void *data_, ...@@ -158,13 +162,15 @@ int zmq::msg_t::init_data (void *data_,
_u.cmsg.flags = 0; _u.cmsg.flags = 0;
_u.cmsg.data = data_; _u.cmsg.data = data_;
_u.cmsg.size = size_; _u.cmsg.size = size_;
_u.cmsg.group[0] = '\0'; _u.cmsg.group.sgroup.group[0] = '\0';
_u.cmsg.group.type = group_type_short;
_u.cmsg.routing_id = 0; _u.cmsg.routing_id = 0;
} else { } else {
_u.lmsg.metadata = NULL; _u.lmsg.metadata = NULL;
_u.lmsg.type = type_lmsg; _u.lmsg.type = type_lmsg;
_u.lmsg.flags = 0; _u.lmsg.flags = 0;
_u.lmsg.group[0] = '\0'; _u.lmsg.group.sgroup.group[0] = '\0';
_u.lmsg.group.type = group_type_short;
_u.lmsg.routing_id = 0; _u.lmsg.routing_id = 0;
_u.lmsg.content = _u.lmsg.content =
static_cast<content_t *> (malloc (sizeof (content_t))); static_cast<content_t *> (malloc (sizeof (content_t)));
...@@ -187,7 +193,8 @@ int zmq::msg_t::init_delimiter () ...@@ -187,7 +193,8 @@ int zmq::msg_t::init_delimiter ()
_u.delimiter.metadata = NULL; _u.delimiter.metadata = NULL;
_u.delimiter.type = type_delimiter; _u.delimiter.type = type_delimiter;
_u.delimiter.flags = 0; _u.delimiter.flags = 0;
_u.delimiter.group[0] = '\0'; _u.delimiter.group.sgroup.group[0] = '\0';
_u.delimiter.group.type = group_type_short;
_u.delimiter.routing_id = 0; _u.delimiter.routing_id = 0;
return 0; return 0;
} }
...@@ -197,7 +204,8 @@ int zmq::msg_t::init_join () ...@@ -197,7 +204,8 @@ int zmq::msg_t::init_join ()
_u.base.metadata = NULL; _u.base.metadata = NULL;
_u.base.type = type_join; _u.base.type = type_join;
_u.base.flags = 0; _u.base.flags = 0;
_u.base.group[0] = '\0'; _u.base.group.sgroup.group[0] = '\0';
_u.base.group.type = group_type_short;
_u.base.routing_id = 0; _u.base.routing_id = 0;
return 0; return 0;
} }
...@@ -207,7 +215,8 @@ int zmq::msg_t::init_leave () ...@@ -207,7 +215,8 @@ int zmq::msg_t::init_leave ()
_u.base.metadata = NULL; _u.base.metadata = NULL;
_u.base.type = type_leave; _u.base.type = type_leave;
_u.base.flags = 0; _u.base.flags = 0;
_u.base.group[0] = '\0'; _u.base.group.sgroup.group[0] = '\0';
_u.base.group.type = group_type_short;
_u.base.routing_id = 0; _u.base.routing_id = 0;
return 0; return 0;
} }
...@@ -289,6 +298,16 @@ int zmq::msg_t::close () ...@@ -289,6 +298,16 @@ int zmq::msg_t::close ()
_u.base.metadata = NULL; _u.base.metadata = NULL;
} }
if (_u.base.group.type == group_type_long) {
if (!_u.base.group.lgroup.content->refcnt.sub (1)) {
// We used "placement new" operator to initialize the reference
// counter so we call the destructor explicitly now.
_u.base.group.lgroup.content->refcnt.~atomic_counter_t ();
free (_u.base.group.lgroup.content);
}
}
// Make the message invalid. // Make the message invalid.
_u.base.type = 0; _u.base.type = 0;
...@@ -346,6 +365,9 @@ int zmq::msg_t::copy (msg_t &src_) ...@@ -346,6 +365,9 @@ int zmq::msg_t::copy (msg_t &src_)
if (src_._u.base.metadata != NULL) if (src_._u.base.metadata != NULL)
src_._u.base.metadata->add_ref (); src_._u.base.metadata->add_ref ();
if (src_._u.base.group.type == group_type_long)
src_._u.base.group.lgroup.content->refcnt.add (1);
*this = src_; *this = src_;
return 0; return 0;
...@@ -638,12 +660,16 @@ int zmq::msg_t::reset_routing_id () ...@@ -638,12 +660,16 @@ int zmq::msg_t::reset_routing_id ()
const char *zmq::msg_t::group () const const char *zmq::msg_t::group () const
{ {
return _u.base.group; if (_u.base.group.type == group_type_long)
return _u.base.group.lgroup.content->group;
return _u.base.group.sgroup.group;
} }
int zmq::msg_t::set_group (const char *group_) int zmq::msg_t::set_group (const char *group_)
{ {
return set_group (group_, ZMQ_GROUP_MAX_LENGTH); size_t length = strnlen (group_, ZMQ_GROUP_MAX_LENGTH);
return set_group (group_, length);
} }
int zmq::msg_t::set_group (const char *group_, size_t length_) int zmq::msg_t::set_group (const char *group_, size_t length_)
...@@ -653,8 +679,19 @@ int zmq::msg_t::set_group (const char *group_, size_t length_) ...@@ -653,8 +679,19 @@ int zmq::msg_t::set_group (const char *group_, size_t length_)
return -1; return -1;
} }
strncpy (_u.base.group, group_, length_); if (length_ > 14) {
_u.base.group[length_] = '\0'; _u.base.group.lgroup.type = group_type_long;
_u.base.group.lgroup.content =
(long_group_t *) malloc (sizeof (long_group_t));
assert (_u.base.group.lgroup.content);
new (&_u.base.group.lgroup.content->refcnt) zmq::atomic_counter_t ();
_u.base.group.lgroup.content->refcnt.set (1);
strncpy (_u.base.group.lgroup.content->group, group_, length_);
_u.base.group.lgroup.content->group[length_] = '\0';
} else {
strncpy (_u.base.group.sgroup.group, group_, length_);
_u.base.group.sgroup.group[length_] = '\0';
}
return 0; return 0;
} }
......
...@@ -46,7 +46,7 @@ ...@@ -46,7 +46,7 @@
// Note that it has to be declared as "C" so that it is the same as // Note that it has to be declared as "C" so that it is the same as
// zmq_free_fn defined in zmq.h. // zmq_free_fn defined in zmq.h.
extern "C" { extern "C" {
typedef void(msg_free_fn) (void *data_, void *hint_); typedef void (msg_free_fn) (void *data_, void *hint_);
} }
namespace zmq namespace zmq
...@@ -215,6 +215,33 @@ class msg_t ...@@ -215,6 +215,33 @@ class msg_t
type_max = 107 type_max = 107
}; };
enum group_type_t
{
group_type_short,
group_type_long
};
struct long_group_t
{
char group[ZMQ_GROUP_MAX_LENGTH + 1];
atomic_counter_t refcnt;
};
union group_t
{
unsigned char type;
struct
{
unsigned char type;
char group[15];
} sgroup;
struct
{
unsigned char type;
long_group_t *content;
} lgroup;
};
// Note that fields shared between different message types are not // Note that fields shared between different message types are not
// moved to the parent class (msg_t). This way we get tighter packing // moved to the parent class (msg_t). This way we get tighter packing
// of the data. Shared fields can be accessed via 'base' member of // of the data. Shared fields can be accessed via 'base' member of
...@@ -224,13 +251,13 @@ class msg_t ...@@ -224,13 +251,13 @@ class msg_t
struct struct
{ {
metadata_t *metadata; metadata_t *metadata;
unsigned char unsigned char unused[msg_t_size
unused[msg_t_size - (sizeof (metadata_t *) + 2
- (sizeof (metadata_t *) + 2 + 16 + sizeof (uint32_t))]; + sizeof (uint32_t) + sizeof (group_t))];
unsigned char type; unsigned char type;
unsigned char flags; unsigned char flags;
char group[16];
uint32_t routing_id; uint32_t routing_id;
group_t group;
} base; } base;
struct struct
{ {
...@@ -239,57 +266,59 @@ class msg_t ...@@ -239,57 +266,59 @@ class msg_t
unsigned char size; unsigned char size;
unsigned char type; unsigned char type;
unsigned char flags; unsigned char flags;
char group[16];
uint32_t routing_id; uint32_t routing_id;
group_t group;
} vsm; } vsm;
struct struct
{ {
metadata_t *metadata; metadata_t *metadata;
content_t *content; content_t *content;
unsigned char unused[msg_t_size unsigned char
- (sizeof (metadata_t *) + sizeof (content_t *) unused[msg_t_size
+ 2 + 16 + sizeof (uint32_t))]; - (sizeof (metadata_t *) + sizeof (content_t *) + 2
+ sizeof (uint32_t) + sizeof (group_t))];
unsigned char type; unsigned char type;
unsigned char flags; unsigned char flags;
char group[16];
uint32_t routing_id; uint32_t routing_id;
group_t group;
} lmsg; } lmsg;
struct struct
{ {
metadata_t *metadata; metadata_t *metadata;
content_t *content; content_t *content;
unsigned char unused[msg_t_size unsigned char
- (sizeof (metadata_t *) + sizeof (content_t *) unused[msg_t_size
+ 2 + 16 + sizeof (uint32_t))]; - (sizeof (metadata_t *) + sizeof (content_t *) + 2
+ sizeof (uint32_t) + sizeof (group_t))];
unsigned char type; unsigned char type;
unsigned char flags; unsigned char flags;
char group[16];
uint32_t routing_id; uint32_t routing_id;
group_t group;
} zclmsg; } zclmsg;
struct struct
{ {
metadata_t *metadata; metadata_t *metadata;
void *data; void *data;
size_t size; size_t size;
unsigned char unsigned char unused[msg_t_size
unused[msg_t_size - (sizeof (metadata_t *) + sizeof (void *)
- (sizeof (metadata_t *) + sizeof (void *) + sizeof (size_t) + 2 + sizeof (uint32_t)
+ sizeof (size_t) + 2 + 16 + sizeof (uint32_t))]; + sizeof (group_t))];
unsigned char type; unsigned char type;
unsigned char flags; unsigned char flags;
char group[16];
uint32_t routing_id; uint32_t routing_id;
group_t group;
} cmsg; } cmsg;
struct struct
{ {
metadata_t *metadata; metadata_t *metadata;
unsigned char unsigned char unused[msg_t_size
unused[msg_t_size - (sizeof (metadata_t *) + 2
- (sizeof (metadata_t *) + 2 + 16 + sizeof (uint32_t))]; + sizeof (uint32_t) + sizeof (group_t))];
unsigned char type; unsigned char type;
unsigned char flags; unsigned char flags;
char group[16];
uint32_t routing_id; uint32_t routing_id;
group_t group;
} delimiter; } delimiter;
} _u; } _u;
}; };
......
...@@ -96,6 +96,35 @@ void test_leave_unjoined_fails () ...@@ -96,6 +96,35 @@ void test_leave_unjoined_fails ()
test_context_socket_close (dish); test_context_socket_close (dish);
} }
void test_long_group ()
{
size_t len = MAX_SOCKET_STRING;
char my_endpoint[MAX_SOCKET_STRING];
void *radio = test_context_socket (ZMQ_RADIO);
bind_loopback (radio, false, my_endpoint, len);
void *dish = test_context_socket (ZMQ_DISH);
// Joining to a long group, over 14 chars
char group[19] = "0123456789ABCDEFGH";
TEST_ASSERT_SUCCESS_ERRNO (zmq_join (dish, group));
// Connecting
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (dish, my_endpoint));
msleep (SETTLE_TIME);
// This is going to be sent to the dish
msg_send_expect_success (radio, group, "HELLO");
// Check the correct message arrived
msg_recv_cmp (dish, group, "HELLO");
test_context_socket_close (dish);
test_context_socket_close (radio);
}
void test_join_too_long_fails () void test_join_too_long_fails ()
{ {
void *dish = test_context_socket (ZMQ_DISH); void *dish = test_context_socket (ZMQ_DISH);
...@@ -505,6 +534,7 @@ int main (void) ...@@ -505,6 +534,7 @@ int main (void)
UNITY_BEGIN (); UNITY_BEGIN ();
RUN_TEST (test_leave_unjoined_fails); RUN_TEST (test_leave_unjoined_fails);
RUN_TEST (test_join_too_long_fails); RUN_TEST (test_join_too_long_fails);
RUN_TEST (test_long_group);
RUN_TEST (test_join_twice_fails); RUN_TEST (test_join_twice_fails);
RUN_TEST (test_radio_bind_fails_ipv4); RUN_TEST (test_radio_bind_fails_ipv4);
RUN_TEST (test_radio_bind_fails_ipv6); RUN_TEST (test_radio_bind_fails_ipv6);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment