Commit 72161fb0 authored by Martin Sustrik's avatar Martin Sustrik

format of subscriptions changed (no * needed anymore)

parent c97967ed
...@@ -326,17 +326,20 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_) ...@@ -326,17 +326,20 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
if (errno != EAGAIN) if (errno != EAGAIN)
return -1; return -1;
app_thread->process_commands (false, false); app_thread->process_commands (false, false);
ticks = 0;
rc = xrecv (msg_, flags_); rc = xrecv (msg_, flags_);
ticks = 0;
} }
else { else {
while (rc != 0) { while (rc != 0) {
if (errno != EAGAIN) if (errno == EINPROGRESS)
return -1; app_thread->process_commands (false, true);
else if (errno == EAGAIN)
app_thread->process_commands (true, false); app_thread->process_commands (true, false);
ticks = 0; else
return -1;
rc = xrecv (msg_, flags_); rc = xrecv (msg_, flags_);
} }
ticks = 0;
} }
......
...@@ -17,6 +17,8 @@ ...@@ -17,6 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>. along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include <string.h>
#include "../bindings/c/zmq.h" #include "../bindings/c/zmq.h"
#include "sub.hpp" #include "sub.hpp"
...@@ -67,41 +69,30 @@ int zmq::sub_t::xsetsockopt (int option_, const void *optval_, ...@@ -67,41 +69,30 @@ int zmq::sub_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_) size_t optvallen_)
{ {
if (option_ == ZMQ_SUBSCRIBE) { if (option_ == ZMQ_SUBSCRIBE) {
std::string subscription ((const char*) optval_, optvallen_); if (!optvallen_)
if (subscription == "*")
all_count++; all_count++;
else if (subscription [subscription.size () - 1] == '*')
prefixes.insert (subscription.substr (0, subscription.size () - 1));
else else
topics.insert (subscription); subscriptions.insert (std::string ((const char*) optval_,
optvallen_));
return 0; return 0;
} }
if (option_ == ZMQ_UNSUBSCRIBE) { if (option_ == ZMQ_UNSUBSCRIBE) {
std::string subscription ((const char*) optval_, optvallen_); if (!optvallen_) {
if (subscription == "*") {
if (!all_count) { if (!all_count) {
errno = EINVAL; errno = EINVAL;
return -1; return -1;
} }
all_count--; all_count--;
} }
else if (subscription [subscription.size () - 1] == '*') {
subscriptions_t::iterator it = prefixes.find (
subscription.substr (0, subscription.size () - 1));
if (it == prefixes.end ()) {
errno = EINVAL;
return -1;
}
prefixes.erase (it);
}
else { else {
subscriptions_t::iterator it = topics.find (subscription); subscriptions_t::iterator it = subscriptions.find (
if (it == topics.end ()) { std::string ((const char*) optval_, optvallen_));
if (it == subscriptions.end ()) {
errno = EINVAL; errno = EINVAL;
return -1; return -1;
} }
topics.erase (it); subscriptions.erase (it);
} }
return 0; return 0;
} }
...@@ -124,8 +115,6 @@ int zmq::sub_t::xflush () ...@@ -124,8 +115,6 @@ int zmq::sub_t::xflush ()
int zmq::sub_t::xrecv (zmq_msg_t *msg_, int flags_) int zmq::sub_t::xrecv (zmq_msg_t *msg_, int flags_)
{ {
while (true) {
// Get a message using fair queueing algorithm. // Get a message using fair queueing algorithm.
int rc = fq.recv (msg_, flags_); int rc = fq.recv (msg_, flags_);
...@@ -133,35 +122,30 @@ int zmq::sub_t::xrecv (zmq_msg_t *msg_, int flags_) ...@@ -133,35 +122,30 @@ int zmq::sub_t::xrecv (zmq_msg_t *msg_, int flags_)
if (rc != 0 && errno == EAGAIN) if (rc != 0 && errno == EAGAIN)
return -1; return -1;
// If there is no subscription return -1/EAGAIN. // If there is at least one * subscription, the message matches.
if (!all_count && prefixes.empty () && topics.empty ()) {
errno = EAGAIN;
return -1;
}
// If there is at least one "*" subscription, the message matches.
if (all_count) if (all_count)
return 0; return 0;
// Check the message format.
// TODO: We should either ignore the message or drop the connection
// if the message doesn't conform with the expected format.
unsigned char *data = (unsigned char*) zmq_msg_data (msg_);
zmq_assert (*data <= zmq_msg_size (msg_) - 1);
std::string topic ((const char*) (data + 1), *data);
// Check whether the message matches at least one prefix subscription. // Check whether the message matches at least one prefix subscription.
for (subscriptions_t::iterator it = prefixes.begin (); // TODO: Make this efficient - O(log(n)) where n is number of characters in
it != prefixes.end (); it++) // the longest subscription string.
if (it->size () <= topic.size () && for (subscriptions_t::iterator it = subscriptions.begin ();
*it == topic.substr (0, it->size ())) it != subscriptions.end (); it++) {
return 0; size_t msg_size = zmq_msg_size (msg_);
size_t sub_size = it->size ();
// Check whether the message matches an exact match subscription. if (sub_size <= msg_size &&
subscriptions_t::iterator it = topics.find (topic); memcmp (zmq_msg_data (msg_), it->data (), sub_size) == 0)
if (it != topics.end ())
return 0; return 0;
} }
// The message did not pass the filter. Trim it.
// Note that we are returning a different error code so that the caller
// knows there are more messages available. We cannot loop here as
// a stream of non-matching messages would create a DoS situation.
zmq_msg_close (msg_);
zmq_msg_init (msg_);
errno = EINPROGRESS;
return -1;
} }
bool zmq::sub_t::xhas_in () bool zmq::sub_t::xhas_in ()
......
...@@ -56,16 +56,11 @@ namespace zmq ...@@ -56,16 +56,11 @@ namespace zmq
// Fair queueing object for inbound pipes. // Fair queueing object for inbound pipes.
fq_t fq; fq_t fq;
// Number of active "*" subscriptions. // Number of active * subscriptions.
int all_count; int all_count;
typedef std::multiset <std::string> subscriptions_t; typedef std::multiset <std::string> subscriptions_t;
subscriptions_t subscriptions;
// List of all prefix subscriptions.
subscriptions_t prefixes;
// List of all exact match subscriptions.
subscriptions_t topics;
sub_t (const sub_t&); sub_t (const sub_t&);
void operator = (const sub_t&); void operator = (const sub_t&);
......
...@@ -101,8 +101,11 @@ void zmq::zmq_engine_t::in_event () ...@@ -101,8 +101,11 @@ void zmq::zmq_engine_t::in_event ()
insize -= processed; insize -= processed;
// Stop polling for input if we got stuck. // Stop polling for input if we got stuck.
if (processed < insize) if (processed < insize) {
reset_pollin (handle); zmq_assert (false);
// TODO: This may happen is queue limits are implemented.
// reset_pollin (handle);
}
// Flush all messages the decoder may have produced. // Flush all messages the decoder may have produced.
inout->flush (); inout->flush ();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment