Commit 1960b4e8 authored by somdoron's avatar somdoron

Filtering messages on dish side

parent c7d52ec2
...@@ -97,8 +97,7 @@ int zmq::dish_t::xjoin (const char* group_) ...@@ -97,8 +97,7 @@ int zmq::dish_t::xjoin (const char* group_)
return -1; return -1;
} }
subscriptions_t::iterator it = subscriptions_t::iterator it = subscriptions.find (group);
std::find (subscriptions.begin (), subscriptions.end (), group);
// User cannot join same group twice // User cannot join same group twice
if (it != subscriptions.end ()) { if (it != subscriptions.end ()) {
...@@ -106,7 +105,7 @@ int zmq::dish_t::xjoin (const char* group_) ...@@ -106,7 +105,7 @@ int zmq::dish_t::xjoin (const char* group_)
return -1; return -1;
} }
subscriptions.push_back (group); subscriptions.insert (group);
msg_t msg; msg_t msg;
int rc = msg.init_join (); int rc = msg.init_join ();
...@@ -185,15 +184,21 @@ int zmq::dish_t::xrecv (msg_t *msg_) ...@@ -185,15 +184,21 @@ int zmq::dish_t::xrecv (msg_t *msg_)
return 0; return 0;
} }
// Get a message using fair queueing algorithm. while (true) {
int rc = fq.recv (msg_);
// If there's no message available, return immediately. // Get a message using fair queueing algorithm.
// The same when error occurs. int rc = fq.recv (msg_);
if (rc != 0)
return -1;
return 0; // If there's no message available, return immediately.
// The same when error occurs.
if (rc != 0)
return -1;
// Filtering non matching messages
subscriptions_t::iterator it = subscriptions.find (std::string(msg_->group ()));
if (it != subscriptions.end ())
return 0;
}
} }
bool zmq::dish_t::xhas_in () bool zmq::dish_t::xhas_in ()
...@@ -203,18 +208,24 @@ bool zmq::dish_t::xhas_in () ...@@ -203,18 +208,24 @@ bool zmq::dish_t::xhas_in ()
if (has_message) if (has_message)
return true; return true;
// Get a message using fair queueing algorithm. while (true) {
int rc = fq.recv (&message); // Get a message using fair queueing algorithm.
int rc = fq.recv (&message);
// If there's no message available, return immediately. // If there's no message available, return immediately.
// The same when error occurs. // The same when error occurs.
if (rc != 0) { if (rc != 0) {
errno_assert (errno == EAGAIN); errno_assert (errno == EAGAIN);
return false; return false;
} }
has_message = true; // Filtering non matching messages
return true; subscriptions_t::iterator it = subscriptions.find (std::string(message.group ()));
if (it != subscriptions.end ()) {
has_message = true;
return true;
}
}
} }
zmq::blob_t zmq::dish_t::get_credential () const zmq::blob_t zmq::dish_t::get_credential () const
......
...@@ -81,7 +81,7 @@ namespace zmq ...@@ -81,7 +81,7 @@ namespace zmq
dist_t dist; dist_t dist;
// The repository of subscriptions. // The repository of subscriptions.
typedef std::vector<std::string> subscriptions_t; typedef std::set<std::string> subscriptions_t;
subscriptions_t subscriptions; subscriptions_t subscriptions;
// If true, 'message' contains a matching message to return on the // If true, 'message' contains a matching message to return on the
......
...@@ -60,7 +60,7 @@ int msg_recv_cmp (zmq_msg_t *msg_, void *s_, const char* group_, const char* bod ...@@ -60,7 +60,7 @@ int msg_recv_cmp (zmq_msg_t *msg_, void *s_, const char* group_, const char* bod
if (recv_rc == -1) if (recv_rc == -1)
return -1; return -1;
if (strcmp (zmq_msg_group (msg_), group_) != 0) if (strcmp (zmq_msg_group (msg_), group_) != 0)
{ {
zmq_msg_close (msg_); zmq_msg_close (msg_);
return -1; return -1;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment