Commit 7be7962f authored by Martin Sustrik's avatar Martin Sustrik

prefix-style message filtering added

parent 42ad2aa0
...@@ -23,7 +23,8 @@ ...@@ -23,7 +23,8 @@
#include "err.hpp" #include "err.hpp"
zmq::sub_t::sub_t (class app_thread_t *parent_) : zmq::sub_t::sub_t (class app_thread_t *parent_) :
socket_base_t (parent_) socket_base_t (parent_),
all_count (0)
{ {
} }
...@@ -36,18 +37,41 @@ int zmq::sub_t::setsockopt (int option_, const void *optval_, ...@@ -36,18 +37,41 @@ int zmq::sub_t::setsockopt (int option_, const void *optval_,
{ {
if (option_ == ZMQ_SUBSCRIBE) { if (option_ == ZMQ_SUBSCRIBE) {
std::string subscription ((const char*) optval_, optvallen_); std::string subscription ((const char*) optval_, optvallen_);
subscriptions.insert (subscription); if (subscription == "*")
all_count++;
else if (subscription [subscription.size () - 1] == '*')
prefixes.insert (subscription.substr (0, subscription.size () - 1));
else
topics.insert (subscription);
return 0; return 0;
} }
if (option_ == ZMQ_UNSUBSCRIBE) { if (option_ == ZMQ_UNSUBSCRIBE) {
std::string subscription ((const char*) optval_, optvallen_); std::string subscription ((const char*) optval_, optvallen_);
subscriptions_t::iterator it = subscriptions.find (subscription); if (subscription == "*") {
if (it == subscriptions.end ()) { if (!all_count) {
errno = EINVAL; errno = EINVAL;
return -1; return -1;
}
all_count--;
}
else if (subscription [subscription.size () - 1] == '*') {
subscriptions_t::iterator it = prefixes.find (
subscription.substr (0, subscription.size () - 1));
if (it == prefixes.end ()) {
errno = EINVAL;
return -1;
}
prefixes.erase (it);
}
else {
subscriptions_t::iterator it = topics.find (subscription);
if (it == topics.end ()) {
errno = EINVAL;
return -1;
}
topics.erase (it);
} }
subscriptions.erase (it);
return 0; return 0;
} }
...@@ -65,18 +89,27 @@ int zmq::sub_t::recv (struct zmq_msg_t *msg_, int flags_) ...@@ -65,18 +89,27 @@ int zmq::sub_t::recv (struct zmq_msg_t *msg_, int flags_)
if (rc != 0 && errno == EAGAIN) if (rc != 0 && errno == EAGAIN)
return -1; return -1;
// If there is at least one "*" subscription, the message matches.
if (all_count)
return 0;
// Check the message format. // Check the message format.
// TODO: We should either ignore the message or drop the connection // TODO: We should either ignore the message or drop the connection
// if the message doesn't conform with the expected format. // if the message doesn't conform with the expected format.
unsigned char *data = (unsigned char*) zmq_msg_data (msg_); unsigned char *data = (unsigned char*) zmq_msg_data (msg_);
zmq_assert (*data <= zmq_msg_size (msg_) - 1); zmq_assert (*data <= zmq_msg_size (msg_) - 1);
// Check whether the message matches at least one subscription.
std::string topic ((const char*) (data + 1), *data); std::string topic ((const char*) (data + 1), *data);
subscriptions_t::iterator it = subscriptions.find (topic);
if (it != subscriptions.end ())
break;
}
return 0; // Check whether the message matches at least one prefix subscription.
for (subscriptions_t::iterator it = prefixes.begin ();
it != prefixes.end (); it++)
if (it->size () <= topic.size () &&
*it == topic.substr (0, it->size ()))
return 0;
// Check whether the message matches an exact match subscription.
subscriptions_t::iterator it = topics.find (topic);
if (it != topics.end ())
return 0;
}
} }
...@@ -41,9 +41,15 @@ namespace zmq ...@@ -41,9 +41,15 @@ namespace zmq
private: private:
// List of all the active subscriptions. // Number of active "*" subscriptions.
int all_count;
// List of all prefix subscriptions.
typedef std::multiset <std::string> subscriptions_t; typedef std::multiset <std::string> subscriptions_t;
subscriptions_t subscriptions; subscriptions_t prefixes;
// List of all exact match subscriptions.
subscriptions_t topics;
}; };
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment