Commit c7d52ec2 authored by somdoron's avatar somdoron

radio-dish join/leave are ZMTP commands

parent 5054f2eb
...@@ -90,13 +90,15 @@ void zmq::dish_t::xhiccuped (pipe_t *pipe_) ...@@ -90,13 +90,15 @@ void zmq::dish_t::xhiccuped (pipe_t *pipe_)
int zmq::dish_t::xjoin (const char* group_) int zmq::dish_t::xjoin (const char* group_)
{ {
if (strlen (group_) > ZMQ_GROUP_MAX_LENGTH) { std::string group = std::string (group_);
if (group.length () > ZMQ_GROUP_MAX_LENGTH) {
errno = EINVAL; errno = EINVAL;
return -1; return -1;
} }
subscriptions_t::iterator it = subscriptions_t::iterator it =
std::find (subscriptions.begin (), subscriptions.end (), std::string(group_)); std::find (subscriptions.begin (), subscriptions.end (), group);
// User cannot join same group twice // User cannot join same group twice
if (it != subscriptions.end ()) { if (it != subscriptions.end ()) {
...@@ -104,16 +106,14 @@ int zmq::dish_t::xjoin (const char* group_) ...@@ -104,16 +106,14 @@ int zmq::dish_t::xjoin (const char* group_)
return -1; return -1;
} }
subscriptions.push_back (std::string (group_)); subscriptions.push_back (group);
size_t len = strlen (group_);
msg_t msg; msg_t msg;
int rc = msg.init_size (len + 1); int rc = msg.init_join ();
errno_assert (rc == 0); errno_assert (rc == 0);
char *data = (char*) msg.data (); rc = msg.set_group (group_);
data[0] = 'J'; errno_assert (rc == 0);
memcpy (data + 1, group_, len);
int err = 0; int err = 0;
rc = dist.send_to_all (&msg); rc = dist.send_to_all (&msg);
...@@ -128,12 +128,14 @@ int zmq::dish_t::xjoin (const char* group_) ...@@ -128,12 +128,14 @@ int zmq::dish_t::xjoin (const char* group_)
int zmq::dish_t::xleave (const char* group_) int zmq::dish_t::xleave (const char* group_)
{ {
if (strlen (group_) > ZMQ_GROUP_MAX_LENGTH) { std::string group = std::string (group_);
if (group.length () > ZMQ_GROUP_MAX_LENGTH) {
errno = EINVAL; errno = EINVAL;
return -1; return -1;
} }
subscriptions_t::iterator it = std::find (subscriptions.begin (), subscriptions.end (), std::string (group_)); subscriptions_t::iterator it = std::find (subscriptions.begin (), subscriptions.end (), group);
if (it == subscriptions.end ()) { if (it == subscriptions.end ()) {
errno = EINVAL; errno = EINVAL;
...@@ -142,14 +144,12 @@ int zmq::dish_t::xleave (const char* group_) ...@@ -142,14 +144,12 @@ int zmq::dish_t::xleave (const char* group_)
subscriptions.erase (it); subscriptions.erase (it);
size_t len = strlen (group_);
msg_t msg; msg_t msg;
int rc = msg.init_size (len + 1); int rc = msg.init_leave ();
errno_assert (rc == 0); errno_assert (rc == 0);
char *data = (char*) msg.data (); rc = msg.set_group (group_);
data[0] = 'L'; errno_assert (rc == 0);
memcpy (data + 1, group_, len);
int err = 0; int err = 0;
rc = dist.send_to_all (&msg); rc = dist.send_to_all (&msg);
...@@ -226,21 +226,15 @@ void zmq::dish_t::send_subscriptions (pipe_t *pipe_) ...@@ -226,21 +226,15 @@ void zmq::dish_t::send_subscriptions (pipe_t *pipe_)
{ {
for (subscriptions_t::iterator it = subscriptions.begin (); it != subscriptions.end (); ++it) { for (subscriptions_t::iterator it = subscriptions.begin (); it != subscriptions.end (); ++it) {
msg_t msg; msg_t msg;
int rc = msg.init_size (it->length () + 1); int rc = msg.init_join ();
errno_assert (rc == 0);
rc = msg.set_group (it->c_str());
errno_assert (rc == 0); errno_assert (rc == 0);
char *data = (char*) msg.data ();
data [0] = 'J';
it->copy (data + 1, it->length ());
// Send it to the pipe. // Send it to the pipe.
bool sent = pipe_->write (&msg); pipe_->write (&msg);
msg.close ();
// If we reached the SNDHWM, and thus cannot send the subscription, drop
// the subscription message instead. This matches the behaviour of
// zmq_setsockopt(ZMQ_SUBSCRIBE, ...), which also drops subscriptions
// when the SNDHWM is reached.
if (!sent)
msg.close ();
} }
pipe_->flush (); pipe_->flush ();
...@@ -303,6 +297,48 @@ int zmq::dish_session_t::push_msg (msg_t *msg_) ...@@ -303,6 +297,48 @@ int zmq::dish_session_t::push_msg (msg_t *msg_)
} }
} }
int zmq::dish_session_t::pull_msg (msg_t *msg_)
{
int rc = session_base_t::pull_msg (msg_);
if (rc != 0)
return rc;
if (!msg_->is_join () && !msg_->is_leave ())
return rc;
else {
int group_length = strlen (msg_->group ());
msg_t command;
int offset;
if (msg_->is_join ()) {
command.init_size (group_length + 5);
offset = 5;
memcpy (command.data (), "\4JOIN", 5);
}
else {
command.init_size (group_length + 6);
offset = 6;
memcpy (command.data (), "\5LEAVE", 6);
}
command.set_flags (msg_t::command);
char* command_data = (char*)command.data ();
// Copy the group
memcpy (command_data + offset, msg_->group (), group_length);
// Close the join message
int rc = msg_->close ();
errno_assert (rc == 0);
*msg_ = command;
return 0;
}
}
void zmq::dish_session_t::reset () void zmq::dish_session_t::reset ()
{ {
session_base_t::reset (); session_base_t::reset ();
......
...@@ -104,6 +104,7 @@ namespace zmq ...@@ -104,6 +104,7 @@ namespace zmq
// Overrides of the functions from session_base_t. // Overrides of the functions from session_base_t.
int push_msg (msg_t *msg_); int push_msg (msg_t *msg_);
int pull_msg (msg_t *msg_);
void reset (); void reset ();
private: private:
......
...@@ -200,6 +200,28 @@ int zmq::msg_t::init_delimiter () ...@@ -200,6 +200,28 @@ int zmq::msg_t::init_delimiter ()
return 0; return 0;
} }
int zmq::msg_t::init_join ()
{
u.base.metadata = NULL;
u.base.type = type_join;
u.base.flags = 0;
u.base.group[0] = '\0';
u.base.routing_id = 0;
u.base.fd = retired_fd;
return 0;
}
int zmq::msg_t::init_leave ()
{
u.base.metadata = NULL;
u.base.type = type_leave;
u.base.flags = 0;
u.base.group[0] = '\0';
u.base.routing_id = 0;
u.base.fd = retired_fd;
return 0;
}
int zmq::msg_t::close () int zmq::msg_t::close ()
{ {
// Check the validity of the message. // Check the validity of the message.
...@@ -440,6 +462,16 @@ bool zmq::msg_t::is_zcmsg() const ...@@ -440,6 +462,16 @@ bool zmq::msg_t::is_zcmsg() const
return u.base.type == type_zclmsg; return u.base.type == type_zclmsg;
} }
bool zmq::msg_t::is_join() const
{
return u.base.type == type_join;
}
bool zmq::msg_t::is_leave() const
{
return u.base.type == type_leave;
}
void zmq::msg_t::add_refs (int refs_) void zmq::msg_t::add_refs (int refs_)
{ {
zmq_assert (refs_ >= 0); zmq_assert (refs_ >= 0);
......
...@@ -95,6 +95,8 @@ namespace zmq ...@@ -95,6 +95,8 @@ namespace zmq
int init_external_storage(content_t* content_, void *data_, size_t size_, int init_external_storage(content_t* content_, void *data_, size_t size_,
msg_free_fn *ffn_, void *hint_); msg_free_fn *ffn_, void *hint_);
int init_delimiter (); int init_delimiter ();
int init_join ();
int init_leave ();
int close (); int close ();
int move (msg_t &src_); int move (msg_t &src_);
int copy (msg_t &src_); int copy (msg_t &src_);
...@@ -111,6 +113,8 @@ namespace zmq ...@@ -111,6 +113,8 @@ namespace zmq
bool is_identity () const; bool is_identity () const;
bool is_credential () const; bool is_credential () const;
bool is_delimiter () const; bool is_delimiter () const;
bool is_join () const;
bool is_leave () const;
bool is_vsm () const; bool is_vsm () const;
bool is_cmsg () const; bool is_cmsg () const;
bool is_zcmsg() const; bool is_zcmsg() const;
...@@ -137,7 +141,6 @@ namespace zmq ...@@ -137,7 +141,6 @@ namespace zmq
16 + 16 +
sizeof (uint32_t) + sizeof (uint32_t) +
sizeof (fd_t))}; sizeof (fd_t))};
private: private:
zmq::atomic_counter_t* refcnt(); zmq::atomic_counter_t* refcnt();
...@@ -157,7 +160,13 @@ namespace zmq ...@@ -157,7 +160,13 @@ namespace zmq
// zero-copy LMSG message for v2_decoder // zero-copy LMSG message for v2_decoder
type_zclmsg = 105, type_zclmsg = 105,
type_max = 105 // Join message for radio_dish
type_join = 106,
// Leave message for radio_dish
type_leave = 107,
type_max = 107
}; };
// Note that fields shared between different message types are not // Note that fields shared between different message types are not
......
...@@ -65,15 +65,13 @@ void zmq::radio_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_) ...@@ -65,15 +65,13 @@ void zmq::radio_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
void zmq::radio_t::xread_activated (pipe_t *pipe_) void zmq::radio_t::xread_activated (pipe_t *pipe_)
{ {
// There are some subscriptions waiting. Let's process them. // There are some subscriptions waiting. Let's process them.
msg_t sub; msg_t msg;
while (pipe_->read (&sub)) { while (pipe_->read (&msg)) {
// Apply the subscription to the trie // Apply the subscription to the trie
const char * data = (char *) sub.data (); if (msg.is_join () || msg.is_leave ()) {
const size_t size = sub.size (); std::string group = std::string (msg.group ());
if (size > 0 && (*data == 'J' || *data == 'L')) {
std::string group = std::string (data + 1, sub. size() - 1);
if (*data == 'J') if (msg.is_join ())
subscriptions.insert (subscriptions_t::value_type (group, pipe_)); subscriptions.insert (subscriptions_t::value_type (group, pipe_));
else { else {
std::pair<subscriptions_t::iterator, subscriptions_t::iterator> range = std::pair<subscriptions_t::iterator, subscriptions_t::iterator> range =
...@@ -87,7 +85,7 @@ void zmq::radio_t::xread_activated (pipe_t *pipe_) ...@@ -87,7 +85,7 @@ void zmq::radio_t::xread_activated (pipe_t *pipe_)
} }
} }
} }
sub.close (); msg.close ();
} }
} }
...@@ -157,6 +155,52 @@ zmq::radio_session_t::~radio_session_t () ...@@ -157,6 +155,52 @@ zmq::radio_session_t::~radio_session_t ()
{ {
} }
int zmq::radio_session_t::push_msg (msg_t *msg_)
{
if (msg_->flags() & msg_t::command) {
char *command_data =
static_cast <char *> (msg_->data ());
const size_t data_size = msg_->size ();
int group_length;
char * group;
msg_t join_leave_msg;
int rc;
// Set the msg type to either JOIN or LEAVE
if (data_size >= 5 && memcmp (command_data, "\4JOIN", 5) == 0) {
group_length = data_size - 5;
group = command_data + 5;
rc = join_leave_msg.init_join ();
}
else if (data_size >= 6 && memcmp (command_data, "\5LEAVE", 6) == 0) {
group_length = data_size - 6;
group = command_data + 6;
rc = join_leave_msg.init_leave ();
}
// If it is not a JOIN or LEAVE just push the message
else
return session_base_t::push_msg (msg_);
errno_assert (rc == 0);
// Set the group
rc = join_leave_msg.set_group (group, group_length);
errno_assert (rc == 0);
// Close the current command
rc = msg_->close ();
errno_assert (rc == 0);
// Push the join or leave command
*msg_ = join_leave_msg;
return session_base_t::push_msg (msg_);
}
else
return session_base_t::push_msg (msg_);
}
int zmq::radio_session_t::pull_msg (msg_t *msg_) int zmq::radio_session_t::pull_msg (msg_t *msg_)
{ {
if (state == group) { if (state == group) {
......
...@@ -87,6 +87,7 @@ namespace zmq ...@@ -87,6 +87,7 @@ namespace zmq
~radio_session_t (); ~radio_session_t ();
// Overrides of the functions from session_base_t. // Overrides of the functions from session_base_t.
int push_msg (msg_t *msg_);
int pull_msg (msg_t *msg_); int pull_msg (msg_t *msg_);
void reset (); void reset ();
private: private:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment