Commit c80e7b80 authored by Martin Sustrik's avatar Martin Sustrik

XPUB and XSUB socket types added.

These are just placeholders. At the moment XPUB behaves th same
as PUB and XSUB as SUB.
Signed-off-by: 's avatarMartin Sustrik <sustrik@250bpm.com>
parent abc8b5e4
...@@ -68,7 +68,7 @@ ZMQ_EXPORT void zmq_version (int *major, int *minor, int *patch); ...@@ -68,7 +68,7 @@ ZMQ_EXPORT void zmq_version (int *major, int *minor, int *patch);
/* 0MQ errors. */ /* 0MQ errors. */
/******************************************************************************/ /******************************************************************************/
/* A number random anough not to collide with different errno ranges on */ /* A number random enough not to collide with different errno ranges on */
/* different OSes. The assumption is that error_t is at least 32-bit type. */ /* different OSes. The assumption is that error_t is at least 32-bit type. */
#define ZMQ_HAUSNUMERO 156384712 #define ZMQ_HAUSNUMERO 156384712
...@@ -178,6 +178,8 @@ ZMQ_EXPORT int zmq_term (void *context); ...@@ -178,6 +178,8 @@ ZMQ_EXPORT int zmq_term (void *context);
#define ZMQ_XREP 6 #define ZMQ_XREP 6
#define ZMQ_PULL 7 #define ZMQ_PULL 7
#define ZMQ_PUSH 8 #define ZMQ_PUSH 8
#define ZMQ_XPUB 9
#define ZMQ_XSUB 10
#define ZMQ_UPSTREAM ZMQ_PULL /* Old alias, remove in 3.x */ #define ZMQ_UPSTREAM ZMQ_PULL /* Old alias, remove in 3.x */
#define ZMQ_DOWNSTREAM ZMQ_PUSH /* Old alias, remove in 3.x */ #define ZMQ_DOWNSTREAM ZMQ_PUSH /* Old alias, remove in 3.x */
......
...@@ -126,8 +126,10 @@ libzmq_la_SOURCES = \ ...@@ -126,8 +126,10 @@ libzmq_la_SOURCES = \
uuid.hpp \ uuid.hpp \
windows.hpp \ windows.hpp \
wire.hpp \ wire.hpp \
xpub.hpp \
xrep.hpp \ xrep.hpp \
xreq.hpp \ xreq.hpp \
xsub.hpp \
ypipe.hpp \ ypipe.hpp \
yqueue.hpp \ yqueue.hpp \
zmq_connecter.hpp \ zmq_connecter.hpp \
...@@ -181,8 +183,10 @@ libzmq_la_SOURCES = \ ...@@ -181,8 +183,10 @@ libzmq_la_SOURCES = \
transient_session.cpp \ transient_session.cpp \
trie.cpp \ trie.cpp \
uuid.cpp \ uuid.cpp \
xpub.cpp \
xrep.cpp \ xrep.cpp \
xreq.cpp \ xreq.cpp \
xsub.cpp \
zmq.cpp \ zmq.cpp \
zmq_connecter.cpp \ zmq_connecter.cpp \
zmq_engine.cpp \ zmq_engine.cpp \
......
...@@ -17,155 +17,13 @@ ...@@ -17,155 +17,13 @@
along with this program. If not, see <http://www.gnu.org/licenses/>. along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "../include/zmq.h"
#include "pub.hpp" #include "pub.hpp"
#include "err.hpp"
#include "msg_content.hpp"
#include "pipe.hpp"
zmq::pub_t::pub_t (class ctx_t *parent_, uint32_t tid_) : zmq::pub_t::pub_t (class ctx_t *parent_, uint32_t tid_) :
socket_base_t (parent_, tid_), xpub_t (parent_, tid_)
active (0),
terminating (false)
{ {
options.type = ZMQ_PUB;
options.requires_in = false;
options.requires_out = true;
} }
zmq::pub_t::~pub_t () zmq::pub_t::~pub_t ()
{ {
zmq_assert (pipes.empty ());
}
void zmq::pub_t::xattach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_, const blob_t &peer_identity_)
{
zmq_assert (!inpipe_);
outpipe_->set_event_sink (this);
pipes.push_back (outpipe_);
pipes.swap (active, pipes.size () - 1);
active++;
if (terminating) {
register_term_acks (1);
outpipe_->terminate ();
}
} }
void zmq::pub_t::process_term (int linger_)
{
terminating = true;
// Start shutdown process for all the pipes.
for (pipes_t::size_type i = 0; i != pipes.size (); i++)
pipes [i]->terminate ();
// Wait for pipes to terminate before terminating yourself.
register_term_acks (pipes.size ());
// Continue with the termination immediately.
socket_base_t::process_term (linger_);
}
void zmq::pub_t::activated (writer_t *pipe_)
{
// Move the pipe to the list of active pipes.
pipes.swap (pipes.index (pipe_), active);
active++;
}
void zmq::pub_t::terminated (writer_t *pipe_)
{
// Remove the pipe from the list; adjust number of active pipes
// accordingly.
if (pipes.index (pipe_) < active)
active--;
pipes.erase (pipe_);
// If we are already terminating, wait for one term ack less.
if (terminating)
unregister_term_ack ();
}
int zmq::pub_t::xsend (zmq_msg_t *msg_, int flags_)
{
// If there are no active pipes available, simply drop the message.
if (active == 0) {
int rc = zmq_msg_close (msg_);
zmq_assert (rc == 0);
rc = zmq_msg_init (msg_);
zmq_assert (rc == 0);
return 0;
}
msg_content_t *content = (msg_content_t*) msg_->content;
// For VSMs the copying is straighforward.
if (content == (msg_content_t*) ZMQ_VSM) {
for (pipes_t::size_type i = 0; i < active;)
if (write (pipes [i], msg_))
i++;
int rc = zmq_msg_init (msg_);
zmq_assert (rc == 0);
return 0;
}
// Optimisation for the case when there's only a single pipe
// to send the message to - no refcount adjustment i.e. no atomic
// operations are needed.
if (active == 1) {
if (!write (pipes [0], msg_)) {
int rc = zmq_msg_close (msg_);
zmq_assert (rc == 0);
}
int rc = zmq_msg_init (msg_);
zmq_assert (rc == 0);
return 0;
}
// There are at least 2 destinations for the message. That means we have
// to deal with reference counting. First add N-1 references to
// the content (we are holding one reference anyway, that's why -1).
if (msg_->flags & ZMQ_MSG_SHARED)
content->refcnt.add (active - 1);
else {
content->refcnt.set (active);
msg_->flags |= ZMQ_MSG_SHARED;
}
// Push the message to all destinations.
for (pipes_t::size_type i = 0; i < active;) {
if (!write (pipes [i], msg_))
content->refcnt.sub (1);
else
i++;
}
// Detach the original message from the data buffer.
int rc = zmq_msg_init (msg_);
zmq_assert (rc == 0);
return 0;
}
bool zmq::pub_t::xhas_out ()
{
return true;
}
bool zmq::pub_t::write (class writer_t *pipe_, zmq_msg_t *msg_)
{
if (!pipe_->write (msg_)) {
active--;
pipes.swap (pipes.index (pipe_), active);
return false;
}
if (!(msg_->flags & ZMQ_MSG_MORE))
pipe_->flush ();
return true;
}
...@@ -20,50 +20,20 @@ ...@@ -20,50 +20,20 @@
#ifndef __ZMQ_PUB_HPP_INCLUDED__ #ifndef __ZMQ_PUB_HPP_INCLUDED__
#define __ZMQ_PUB_HPP_INCLUDED__ #define __ZMQ_PUB_HPP_INCLUDED__
#include "socket_base.hpp" #include "xpub.hpp"
#include "array.hpp"
#include "pipe.hpp"
namespace zmq namespace zmq
{ {
class pub_t : public socket_base_t, public i_writer_events class pub_t : public xpub_t
{ {
public: public:
pub_t (class ctx_t *parent_, uint32_t tid_); pub_t (class ctx_t *parent_, uint32_t tid_);
~pub_t (); ~pub_t ();
// Implementations of virtual functions from socket_base_t.
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
const blob_t &peer_identity_);
int xsend (zmq_msg_t *msg_, int flags_);
bool xhas_out ();
// i_writer_events interface implementation.
void activated (writer_t *pipe_);
void terminated (writer_t *pipe_);
private: private:
// Hook into the termination process.
void process_term (int linger_);
// Write the message to the pipe. Make the pipe inactive if writing
// fails. In such a case false is returned.
bool write (class writer_t *pipe_, zmq_msg_t *msg_);
// Outbound pipes, i.e. those the socket is sending messages to.
typedef array_t <class writer_t> pipes_t;
pipes_t pipes;
// Number of active pipes. All the active pipes are located at the
// beginning of the pipes array.
pipes_t::size_type active;
// True if termination process is already underway.
bool terminating;
pub_t (const pub_t&); pub_t (const pub_t&);
void operator = (const pub_t&); void operator = (const pub_t&);
}; };
......
...@@ -46,6 +46,8 @@ ...@@ -46,6 +46,8 @@
#include "ctx.hpp" #include "ctx.hpp"
#include "platform.hpp" #include "platform.hpp"
#include "likely.hpp" #include "likely.hpp"
#include "uuid.hpp"
#include "pair.hpp" #include "pair.hpp"
#include "pub.hpp" #include "pub.hpp"
#include "sub.hpp" #include "sub.hpp"
...@@ -55,7 +57,8 @@ ...@@ -55,7 +57,8 @@
#include "push.hpp" #include "push.hpp"
#include "xreq.hpp" #include "xreq.hpp"
#include "xrep.hpp" #include "xrep.hpp"
#include "uuid.hpp" #include "xpub.hpp"
#include "xsub.hpp"
zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_, zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_,
uint32_t tid_) uint32_t tid_)
...@@ -90,6 +93,12 @@ zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_, ...@@ -90,6 +93,12 @@ zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_,
case ZMQ_PUSH: case ZMQ_PUSH:
s = new (std::nothrow) push_t (parent_, tid_); s = new (std::nothrow) push_t (parent_, tid_);
break; break;
case ZMQ_XPUB:
s = new (std::nothrow) xpub_t (parent_, tid_);
break;
case ZMQ_XSUB:
s = new (std::nothrow) xsub_t (parent_, tid_);
break;
default: default:
errno = EINVAL; errno = EINVAL;
return NULL; return NULL;
......
...@@ -17,145 +17,13 @@ ...@@ -17,145 +17,13 @@
along with this program. If not, see <http://www.gnu.org/licenses/>. along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include <string.h>
#include "../include/zmq.h"
#include "sub.hpp" #include "sub.hpp"
#include "err.hpp"
zmq::sub_t::sub_t (class ctx_t *parent_, uint32_t tid_) : zmq::sub_t::sub_t (class ctx_t *parent_, uint32_t tid_) :
socket_base_t (parent_, tid_), xsub_t (parent_, tid_)
fq (this),
has_message (false),
more (false)
{ {
options.type = ZMQ_SUB;
options.requires_in = true;
options.requires_out = false;
zmq_msg_init (&message);
} }
zmq::sub_t::~sub_t () zmq::sub_t::~sub_t ()
{ {
zmq_msg_close (&message);
}
void zmq::sub_t::xattach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_, const blob_t &peer_identity_)
{
zmq_assert (inpipe_ && !outpipe_);
fq.attach (inpipe_);
}
void zmq::sub_t::process_term (int linger_)
{
fq.terminate ();
socket_base_t::process_term (linger_);
}
int zmq::sub_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_)
{
if (option_ == ZMQ_SUBSCRIBE) {
subscriptions.add ((unsigned char*) optval_, optvallen_);
return 0;
}
if (option_ == ZMQ_UNSUBSCRIBE) {
if (!subscriptions.rm ((unsigned char*) optval_, optvallen_)) {
errno = EINVAL;
return -1;
}
return 0;
}
errno = EINVAL;
return -1;
}
int zmq::sub_t::xrecv (zmq_msg_t *msg_, int flags_)
{
// If there's already a message prepared by a previous call to zmq_poll,
// return it straight ahead.
if (has_message) {
zmq_msg_move (msg_, &message);
has_message = false;
more = msg_->flags & ZMQ_MSG_MORE;
return 0;
}
// TODO: This can result in infinite loop in the case of continuous
// stream of non-matching messages which breaks the non-blocking recv
// semantics.
while (true) {
// Get a message using fair queueing algorithm.
int rc = fq.recv (msg_, flags_);
// If there's no message available, return immediately.
// The same when error occurs.
if (rc != 0)
return -1;
// Check whether the message matches at least one subscription.
// Non-initial parts of the message are passed
if (more || match (msg_)) {
more = msg_->flags & ZMQ_MSG_MORE;
return 0;
}
// Message doesn't match. Pop any remaining parts of the message
// from the pipe.
while (msg_->flags & ZMQ_MSG_MORE) {
rc = fq.recv (msg_, ZMQ_NOBLOCK);
zmq_assert (rc == 0);
}
}
}
bool zmq::sub_t::xhas_in ()
{
// There are subsequent parts of the partly-read message available.
if (more)
return true;
// If there's already a message prepared by a previous call to zmq_poll,
// return straight ahead.
if (has_message)
return true;
// TODO: This can result in infinite loop in the case of continuous
// stream of non-matching messages.
while (true) {
// Get a message using fair queueing algorithm.
int rc = fq.recv (&message, ZMQ_NOBLOCK);
// If there's no message available, return immediately.
// The same when error occurs.
if (rc != 0) {
zmq_assert (errno == EAGAIN);
return false;
}
// Check whether the message matches at least one subscription.
if (match (&message)) {
has_message = true;
return true;
}
// Message doesn't match. Pop any remaining parts of the message
// from the pipe.
while (message.flags & ZMQ_MSG_MORE) {
rc = fq.recv (&message, ZMQ_NOBLOCK);
zmq_assert (rc == 0);
}
}
}
bool zmq::sub_t::match (zmq_msg_t *msg_)
{
return subscriptions.check ((unsigned char*) zmq_msg_data (msg_),
zmq_msg_size (msg_));
} }
...@@ -20,54 +20,20 @@ ...@@ -20,54 +20,20 @@
#ifndef __ZMQ_SUB_HPP_INCLUDED__ #ifndef __ZMQ_SUB_HPP_INCLUDED__
#define __ZMQ_SUB_HPP_INCLUDED__ #define __ZMQ_SUB_HPP_INCLUDED__
#include "../include/zmq.h" #include "xsub.hpp"
#include "trie.hpp"
#include "socket_base.hpp"
#include "fq.hpp"
namespace zmq namespace zmq
{ {
class sub_t : public socket_base_t class sub_t : public xsub_t
{ {
public: public:
sub_t (class ctx_t *parent_, uint32_t tid_); sub_t (class ctx_t *parent_, uint32_t tid_);
~sub_t (); ~sub_t ();
protected:
// Overloads of functions from socket_base_t.
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
const blob_t &peer_identity_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
int xrecv (zmq_msg_t *msg_, int flags_);
bool xhas_in ();
private: private:
// Hook into the termination process.
void process_term (int linger_);
// Check whether the message matches at least one subscription.
bool match (zmq_msg_t *msg_);
// Fair queueing object for inbound pipes.
fq_t fq;
// The repository of subscriptions.
trie_t subscriptions;
// If true, 'message' contains a matching message to return on the
// next recv call.
bool has_message;
zmq_msg_t message;
// If true, part of a multipart message was already received, but
// there are following parts still waiting.
bool more;
sub_t (const sub_t&); sub_t (const sub_t&);
void operator = (const sub_t&); void operator = (const sub_t&);
}; };
......
/*
Copyright (c) 2007-2010 iMatix Corporation
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "../include/zmq.h"
#include "xpub.hpp"
#include "err.hpp"
#include "msg_content.hpp"
#include "pipe.hpp"
zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_) :
socket_base_t (parent_, tid_),
active (0),
terminating (false)
{
options.type = ZMQ_PUB;
options.requires_in = false;
options.requires_out = true;
}
zmq::xpub_t::~xpub_t ()
{
zmq_assert (pipes.empty ());
}
void zmq::xpub_t::xattach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_, const blob_t &peer_identity_)
{
zmq_assert (!inpipe_);
outpipe_->set_event_sink (this);
pipes.push_back (outpipe_);
pipes.swap (active, pipes.size () - 1);
active++;
if (terminating) {
register_term_acks (1);
outpipe_->terminate ();
}
}
void zmq::xpub_t::process_term (int linger_)
{
terminating = true;
// Start shutdown process for all the pipes.
for (pipes_t::size_type i = 0; i != pipes.size (); i++)
pipes [i]->terminate ();
// Wait for pipes to terminate before terminating yourself.
register_term_acks (pipes.size ());
// Continue with the termination immediately.
socket_base_t::process_term (linger_);
}
void zmq::xpub_t::activated (writer_t *pipe_)
{
// Move the pipe to the list of active pipes.
pipes.swap (pipes.index (pipe_), active);
active++;
}
void zmq::xpub_t::terminated (writer_t *pipe_)
{
// Remove the pipe from the list; adjust number of active pipes
// accordingly.
if (pipes.index (pipe_) < active)
active--;
pipes.erase (pipe_);
// If we are already terminating, wait for one term ack less.
if (terminating)
unregister_term_ack ();
}
int zmq::xpub_t::xsend (zmq_msg_t *msg_, int flags_)
{
// If there are no active pipes available, simply drop the message.
if (active == 0) {
int rc = zmq_msg_close (msg_);
zmq_assert (rc == 0);
rc = zmq_msg_init (msg_);
zmq_assert (rc == 0);
return 0;
}
msg_content_t *content = (msg_content_t*) msg_->content;
// For VSMs the copying is straighforward.
if (content == (msg_content_t*) ZMQ_VSM) {
for (pipes_t::size_type i = 0; i < active;)
if (write (pipes [i], msg_))
i++;
int rc = zmq_msg_init (msg_);
zmq_assert (rc == 0);
return 0;
}
// Optimisation for the case when there's only a single pipe
// to send the message to - no refcount adjustment i.e. no atomic
// operations are needed.
if (active == 1) {
if (!write (pipes [0], msg_)) {
int rc = zmq_msg_close (msg_);
zmq_assert (rc == 0);
}
int rc = zmq_msg_init (msg_);
zmq_assert (rc == 0);
return 0;
}
// There are at least 2 destinations for the message. That means we have
// to deal with reference counting. First add N-1 references to
// the content (we are holding one reference anyway, that's why -1).
if (msg_->flags & ZMQ_MSG_SHARED)
content->refcnt.add (active - 1);
else {
content->refcnt.set (active);
msg_->flags |= ZMQ_MSG_SHARED;
}
// Push the message to all destinations.
for (pipes_t::size_type i = 0; i < active;) {
if (!write (pipes [i], msg_))
content->refcnt.sub (1);
else
i++;
}
// Detach the original message from the data buffer.
int rc = zmq_msg_init (msg_);
zmq_assert (rc == 0);
return 0;
}
bool zmq::xpub_t::xhas_out ()
{
return true;
}
bool zmq::xpub_t::write (class writer_t *pipe_, zmq_msg_t *msg_)
{
if (!pipe_->write (msg_)) {
active--;
pipes.swap (pipes.index (pipe_), active);
return false;
}
if (!(msg_->flags & ZMQ_MSG_MORE))
pipe_->flush ();
return true;
}
/*
Copyright (c) 2007-2010 iMatix Corporation
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_XPUB_HPP_INCLUDED__
#define __ZMQ_XPUB_HPP_INCLUDED__
#include "socket_base.hpp"
#include "array.hpp"
#include "pipe.hpp"
namespace zmq
{
class xpub_t : public socket_base_t, public i_writer_events
{
public:
xpub_t (class ctx_t *parent_, uint32_t tid_);
~xpub_t ();
// Implementations of virtual functions from socket_base_t.
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
const blob_t &peer_identity_);
int xsend (zmq_msg_t *msg_, int flags_);
bool xhas_out ();
// i_writer_events interface implementation.
void activated (writer_t *pipe_);
void terminated (writer_t *pipe_);
private:
// Hook into the termination process.
void process_term (int linger_);
// Write the message to the pipe. Make the pipe inactive if writing
// fails. In such a case false is returned.
bool write (class writer_t *pipe_, zmq_msg_t *msg_);
// Outbound pipes, i.e. those the socket is sending messages to.
typedef array_t <class writer_t> pipes_t;
pipes_t pipes;
// Number of active pipes. All the active pipes are located at the
// beginning of the pipes array.
pipes_t::size_type active;
// True if termination process is already underway.
bool terminating;
xpub_t (const xpub_t&);
void operator = (const xpub_t&);
};
}
#endif
/*
Copyright (c) 2007-2010 iMatix Corporation
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <string.h>
#include "../include/zmq.h"
#include "xsub.hpp"
#include "err.hpp"
zmq::xsub_t::xsub_t (class ctx_t *parent_, uint32_t tid_) :
socket_base_t (parent_, tid_),
fq (this),
has_message (false),
more (false)
{
options.type = ZMQ_SUB;
options.requires_in = true;
options.requires_out = false;
zmq_msg_init (&message);
}
zmq::xsub_t::~xsub_t ()
{
zmq_msg_close (&message);
}
void zmq::xsub_t::xattach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_, const blob_t &peer_identity_)
{
zmq_assert (inpipe_ && !outpipe_);
fq.attach (inpipe_);
}
void zmq::xsub_t::process_term (int linger_)
{
fq.terminate ();
socket_base_t::process_term (linger_);
}
int zmq::xsub_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_)
{
if (option_ == ZMQ_SUBSCRIBE) {
subscriptions.add ((unsigned char*) optval_, optvallen_);
return 0;
}
if (option_ == ZMQ_UNSUBSCRIBE) {
if (!subscriptions.rm ((unsigned char*) optval_, optvallen_)) {
errno = EINVAL;
return -1;
}
return 0;
}
errno = EINVAL;
return -1;
}
int zmq::xsub_t::xrecv (zmq_msg_t *msg_, int flags_)
{
// If there's already a message prepared by a previous call to zmq_poll,
// return it straight ahead.
if (has_message) {
zmq_msg_move (msg_, &message);
has_message = false;
more = msg_->flags & ZMQ_MSG_MORE;
return 0;
}
// TODO: This can result in infinite loop in the case of continuous
// stream of non-matching messages which breaks the non-blocking recv
// semantics.
while (true) {
// Get a message using fair queueing algorithm.
int rc = fq.recv (msg_, flags_);
// If there's no message available, return immediately.
// The same when error occurs.
if (rc != 0)
return -1;
// Check whether the message matches at least one subscription.
// Non-initial parts of the message are passed
if (more || match (msg_)) {
more = msg_->flags & ZMQ_MSG_MORE;
return 0;
}
// Message doesn't match. Pop any remaining parts of the message
// from the pipe.
while (msg_->flags & ZMQ_MSG_MORE) {
rc = fq.recv (msg_, ZMQ_NOBLOCK);
zmq_assert (rc == 0);
}
}
}
bool zmq::xsub_t::xhas_in ()
{
// There are subsequent parts of the partly-read message available.
if (more)
return true;
// If there's already a message prepared by a previous call to zmq_poll,
// return straight ahead.
if (has_message)
return true;
// TODO: This can result in infinite loop in the case of continuous
// stream of non-matching messages.
while (true) {
// Get a message using fair queueing algorithm.
int rc = fq.recv (&message, ZMQ_NOBLOCK);
// If there's no message available, return immediately.
// The same when error occurs.
if (rc != 0) {
zmq_assert (errno == EAGAIN);
return false;
}
// Check whether the message matches at least one subscription.
if (match (&message)) {
has_message = true;
return true;
}
// Message doesn't match. Pop any remaining parts of the message
// from the pipe.
while (message.flags & ZMQ_MSG_MORE) {
rc = fq.recv (&message, ZMQ_NOBLOCK);
zmq_assert (rc == 0);
}
}
}
bool zmq::xsub_t::match (zmq_msg_t *msg_)
{
return subscriptions.check ((unsigned char*) zmq_msg_data (msg_),
zmq_msg_size (msg_));
}
/*
Copyright (c) 2007-2010 iMatix Corporation
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_XSUB_HPP_INCLUDED__
#define __ZMQ_XSUB_HPP_INCLUDED__
#include "../include/zmq.h"
#include "trie.hpp"
#include "socket_base.hpp"
#include "fq.hpp"
namespace zmq
{
class xsub_t : public socket_base_t
{
public:
xsub_t (class ctx_t *parent_, uint32_t tid_);
~xsub_t ();
protected:
// Overloads of functions from socket_base_t.
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
const blob_t &peer_identity_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
int xrecv (zmq_msg_t *msg_, int flags_);
bool xhas_in ();
private:
// Hook into the termination process.
void process_term (int linger_);
// Check whether the message matches at least one subscription.
bool match (zmq_msg_t *msg_);
// Fair queueing object for inbound pipes.
fq_t fq;
// The repository of subscriptions.
trie_t subscriptions;
// If true, 'message' contains a matching message to return on the
// next recv call.
bool has_message;
zmq_msg_t message;
// If true, part of a multipart message was already received, but
// there are following parts still waiting.
bool more;
xsub_t (const xsub_t&);
void operator = (const xsub_t&);
};
}
#endif
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment