Commit 311fb0d8 authored by Martin Sustrik's avatar Martin Sustrik

Subscription matching moved from XSUB to SUB socket

This patch will prevent duplicate matching in devices in the future.
Instead of matching in both XPUB and XSUB, it'll happen only
in XPUB. Receiver endpoint will still filter messages via SUB
socket.
Signed-off-by: 's avatarMartin Sustrik <sustrik@250bpm.com>
parent 718885fd
...@@ -22,19 +22,39 @@ ...@@ -22,19 +22,39 @@
#include "msg.hpp" #include "msg.hpp"
zmq::sub_t::sub_t (class ctx_t *parent_, uint32_t tid_) : zmq::sub_t::sub_t (class ctx_t *parent_, uint32_t tid_) :
xsub_t (parent_, tid_) xsub_t (parent_, tid_),
has_message (false),
more (false)
{ {
options.type = ZMQ_SUB; options.type = ZMQ_SUB;
int rc = message.init ();
errno_assert (rc == 0);
} }
zmq::sub_t::~sub_t () zmq::sub_t::~sub_t ()
{ {
int rc = message.close ();
errno_assert (rc == 0);
} }
int zmq::sub_t::xsetsockopt (int option_, const void *optval_, int zmq::sub_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_) size_t optvallen_)
{ {
if (option_ != ZMQ_SUBSCRIBE && option_ != ZMQ_UNSUBSCRIBE) { // Process a subscription.
if (option_ == ZMQ_SUBSCRIBE)
subscriptions.add ((unsigned char*) optval_, optvallen_);
// Process an unsubscription. Return error if there is no corresponding
// subscription.
else if (option_ == ZMQ_UNSUBSCRIBE) {
if (!subscriptions.rm ((unsigned char*) optval_, optvallen_)) {
errno = EINVAL;
return -1;
}
}
// Unknow option.
else {
errno = EINVAL; errno = EINVAL;
return -1; return -1;
} }
...@@ -74,3 +94,90 @@ bool zmq::sub_t::xhas_out () ...@@ -74,3 +94,90 @@ bool zmq::sub_t::xhas_out ()
// Overload the XSUB's send. // Overload the XSUB's send.
return false; return false;
} }
int zmq::sub_t::xrecv (msg_t *msg_, int flags_)
{
// If there's already a message prepared by a previous call to zmq_poll,
// return it straight ahead.
if (has_message) {
int rc = msg_->move (message);
errno_assert (rc == 0);
has_message = false;
more = msg_->flags () & msg_t::more;
return 0;
}
// TODO: This can result in infinite loop in the case of continuous
// stream of non-matching messages which breaks the non-blocking recv
// semantics.
while (true) {
// Get a message using fair queueing algorithm.
int rc = xsub_t::xrecv (msg_, flags_);
// If there's no message available, return immediately.
// The same when error occurs.
if (rc != 0)
return -1;
// Check whether the message matches at least one subscription.
// Non-initial parts of the message are passed
if (more || match (msg_)) {
more = msg_->flags () & msg_t::more;
return 0;
}
// Message doesn't match. Pop any remaining parts of the message
// from the pipe.
while (msg_->flags () & msg_t::more) {
rc = xsub_t::xrecv (msg_, ZMQ_DONTWAIT);
zmq_assert (rc == 0);
}
}
}
bool zmq::sub_t::xhas_in ()
{
// There are subsequent parts of the partly-read message available.
if (more)
return true;
// If there's already a message prepared by a previous call to zmq_poll,
// return straight ahead.
if (has_message)
return true;
// TODO: This can result in infinite loop in the case of continuous
// stream of non-matching messages.
while (true) {
// Get a message using fair queueing algorithm.
int rc = xsub_t::xrecv (&message, ZMQ_DONTWAIT);
// If there's no message available, return immediately.
// The same when error occurs.
if (rc != 0) {
zmq_assert (errno == EAGAIN);
return false;
}
// Check whether the message matches at least one subscription.
if (match (&message)) {
has_message = true;
return true;
}
// Message doesn't match. Pop any remaining parts of the message
// from the pipe.
while (message.flags () & msg_t::more) {
rc = xsub_t::xrecv (&message, ZMQ_DONTWAIT);
zmq_assert (rc == 0);
}
}
}
bool zmq::sub_t::match (msg_t *msg_)
{
return subscriptions.check ((unsigned char*) msg_->data (), msg_->size ());
}
...@@ -22,6 +22,8 @@ ...@@ -22,6 +22,8 @@
#define __ZMQ_SUB_HPP_INCLUDED__ #define __ZMQ_SUB_HPP_INCLUDED__
#include "xsub.hpp" #include "xsub.hpp"
#include "trie.hpp"
#include "msg.hpp"
namespace zmq namespace zmq
{ {
...@@ -38,9 +40,26 @@ namespace zmq ...@@ -38,9 +40,26 @@ namespace zmq
int xsetsockopt (int option_, const void *optval_, size_t optvallen_); int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
int xsend (class msg_t *msg_, int options_); int xsend (class msg_t *msg_, int options_);
bool xhas_out (); bool xhas_out ();
int xrecv (class msg_t *msg_, int flags_);
bool xhas_in ();
private: private:
// Check whether the message matches at least one subscription.
bool match (class msg_t *msg_);
// The repository of subscriptions.
trie_t subscriptions;
// If true, 'message' contains a matching message to return on the
// next recv call.
bool has_message;
msg_t message;
// If true, part of a multipart message was already received, but
// there are following parts still waiting.
bool more;
sub_t (const sub_t&); sub_t (const sub_t&);
const sub_t &operator = (const sub_t&); const sub_t &operator = (const sub_t&);
}; };
......
...@@ -24,19 +24,13 @@ ...@@ -24,19 +24,13 @@
#include "err.hpp" #include "err.hpp"
zmq::xsub_t::xsub_t (class ctx_t *parent_, uint32_t tid_) : zmq::xsub_t::xsub_t (class ctx_t *parent_, uint32_t tid_) :
socket_base_t (parent_, tid_), socket_base_t (parent_, tid_)
has_message (false),
more (false)
{ {
options.type = ZMQ_XSUB; options.type = ZMQ_XSUB;
int rc = message.init ();
errno_assert (rc == 0);
} }
zmq::xsub_t::~xsub_t () zmq::xsub_t::~xsub_t ()
{ {
int rc = message.close ();
errno_assert (rc == 0);
} }
void zmq::xsub_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_) void zmq::xsub_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_)
...@@ -57,21 +51,8 @@ void zmq::xsub_t::xterminated (pipe_t *pipe_) ...@@ -57,21 +51,8 @@ void zmq::xsub_t::xterminated (pipe_t *pipe_)
int zmq::xsub_t::xsend (msg_t *msg_, int options_) int zmq::xsub_t::xsend (msg_t *msg_, int options_)
{ {
size_t size = msg_->size (); // TODO: Once we'll send the subscription upstream here. For now
unsigned char *data = (unsigned char*) msg_->data (); // just empty the message.
// Malformed subscriptions are dropped silently.
if (size >= 1) {
// Process a subscription.
if (*data == 1)
subscriptions.add (data + 1, size - 1);
// Process an unsubscription. Invalid unsubscription is ignored.
if (*data == 0)
subscriptions.rm (data + 1, size - 1);
}
int rc = msg_->close (); int rc = msg_->close ();
errno_assert (rc == 0); errno_assert (rc == 0);
rc = msg_->init (); rc = msg_->init ();
...@@ -85,89 +66,13 @@ bool zmq::xsub_t::xhas_out () ...@@ -85,89 +66,13 @@ bool zmq::xsub_t::xhas_out ()
return true; return true;
} }
int zmq::xsub_t::xrecv (msg_t *msg_, int flags_) int zmq::xsub_t::xrecv (class msg_t *msg_, int flags_)
{ {
// If there's already a message prepared by a previous call to zmq_poll, return fq.recv (msg_, flags_);
// return it straight ahead.
if (has_message) {
int rc = msg_->move (message);
errno_assert (rc == 0);
has_message = false;
more = msg_->flags () & msg_t::more;
return 0;
}
// TODO: This can result in infinite loop in the case of continuous
// stream of non-matching messages which breaks the non-blocking recv
// semantics.
while (true) {
// Get a message using fair queueing algorithm.
int rc = fq.recv (msg_, flags_);
// If there's no message available, return immediately.
// The same when error occurs.
if (rc != 0)
return -1;
// Check whether the message matches at least one subscription.
// Non-initial parts of the message are passed
if (more || match (msg_)) {
more = msg_->flags () & msg_t::more;
return 0;
}
// Message doesn't match. Pop any remaining parts of the message
// from the pipe.
while (msg_->flags () & msg_t::more) {
rc = fq.recv (msg_, ZMQ_DONTWAIT);
zmq_assert (rc == 0);
}
}
} }
bool zmq::xsub_t::xhas_in () bool zmq::xsub_t::xhas_in ()
{ {
// There are subsequent parts of the partly-read message available. return fq.has_in ();
if (more)
return true;
// If there's already a message prepared by a previous call to zmq_poll,
// return straight ahead.
if (has_message)
return true;
// TODO: This can result in infinite loop in the case of continuous
// stream of non-matching messages.
while (true) {
// Get a message using fair queueing algorithm.
int rc = fq.recv (&message, ZMQ_DONTWAIT);
// If there's no message available, return immediately.
// The same when error occurs.
if (rc != 0) {
zmq_assert (errno == EAGAIN);
return false;
}
// Check whether the message matches at least one subscription.
if (match (&message)) {
has_message = true;
return true;
}
// Message doesn't match. Pop any remaining parts of the message
// from the pipe.
while (message.flags () & msg_t::more) {
rc = fq.recv (&message, ZMQ_DONTWAIT);
zmq_assert (rc == 0);
}
}
}
bool zmq::xsub_t::match (msg_t *msg_)
{
return subscriptions.check ((unsigned char*) msg_->data (), msg_->size ());
} }
...@@ -21,9 +21,7 @@ ...@@ -21,9 +21,7 @@
#ifndef __ZMQ_XSUB_HPP_INCLUDED__ #ifndef __ZMQ_XSUB_HPP_INCLUDED__
#define __ZMQ_XSUB_HPP_INCLUDED__ #define __ZMQ_XSUB_HPP_INCLUDED__
#include "trie.hpp"
#include "socket_base.hpp" #include "socket_base.hpp"
#include "msg.hpp"
#include "fq.hpp" #include "fq.hpp"
namespace zmq namespace zmq
...@@ -50,24 +48,9 @@ namespace zmq ...@@ -50,24 +48,9 @@ namespace zmq
private: private:
// Check whether the message matches at least one subscription.
bool match (class msg_t *msg_);
// Fair queueing object for inbound pipes. // Fair queueing object for inbound pipes.
fq_t fq; fq_t fq;
// The repository of subscriptions.
trie_t subscriptions;
// If true, 'message' contains a matching message to return on the
// next recv call.
bool has_message;
msg_t message;
// If true, part of a multipart message was already received, but
// there are following parts still waiting.
bool more;
xsub_t (const xsub_t&); xsub_t (const xsub_t&);
const xsub_t &operator = (const xsub_t&); const xsub_t &operator = (const xsub_t&);
}; };
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment