Commit 0b59866a authored by Martin Sustrik's avatar Martin Sustrik

Patches from sub-forward branch incorporated

Signed-off-by: 's avatarMartin Sustrik <sustrik@250bpm.com>
parent 311fb0d8
...@@ -34,6 +34,7 @@ libzmq_la_SOURCES = \ ...@@ -34,6 +34,7 @@ libzmq_la_SOURCES = \
likely.hpp \ likely.hpp \
mailbox.hpp \ mailbox.hpp \
msg.hpp \ msg.hpp \
mtrie.hpp \
mutex.hpp \ mutex.hpp \
named_session.hpp \ named_session.hpp \
object.hpp \ object.hpp \
...@@ -97,6 +98,7 @@ libzmq_la_SOURCES = \ ...@@ -97,6 +98,7 @@ libzmq_la_SOURCES = \
lb.cpp \ lb.cpp \
mailbox.cpp \ mailbox.cpp \
msg.cpp \ msg.cpp \
mtrie.cpp \
named_session.cpp \ named_session.cpp \
object.cpp \ object.cpp \
options.cpp \ options.cpp \
......
...@@ -42,6 +42,7 @@ namespace zmq ...@@ -42,6 +42,7 @@ namespace zmq
bind, bind,
activate_read, activate_read,
activate_write, activate_write,
hiccup,
pipe_term, pipe_term,
pipe_term_ack, pipe_term_ack,
term_req, term_req,
...@@ -95,6 +96,13 @@ namespace zmq ...@@ -95,6 +96,13 @@ namespace zmq
uint64_t msgs_read; uint64_t msgs_read;
} activate_write; } activate_write;
// Sent by pipe reader to writer after creating a new inpipe.
// The parameter is actually of type pipe_t::upipe_t, however,
// its definition is private so we'll have to do with void*.
struct {
void *pipe;
} hiccup;
// Sent by pipe reader to pipe writer to ask it to terminate // Sent by pipe reader to pipe writer to ask it to terminate
// its end of the pipe. // its end of the pipe.
struct { struct {
......
...@@ -111,7 +111,7 @@ void zmq::connect_session_t::start_connecting (bool wait_) ...@@ -111,7 +111,7 @@ void zmq::connect_session_t::start_connecting (bool wait_)
zmq_assert (false); zmq_assert (false);
} }
bool zmq::connect_session_t::attached (const blob_t &peer_identity_) bool zmq::connect_session_t::xattached (const blob_t &peer_identity_)
{ {
// If there was no previous connection... // If there was no previous connection...
if (!connected) { if (!connected) {
...@@ -153,9 +153,12 @@ bool zmq::connect_session_t::attached (const blob_t &peer_identity_) ...@@ -153,9 +153,12 @@ bool zmq::connect_session_t::attached (const blob_t &peer_identity_)
return true; return true;
} }
void zmq::connect_session_t::detached () bool zmq::connect_session_t::xdetached ()
{ {
// Reconnect. // Reconnect.
start_connecting (true); start_connecting (true);
// Don't tear the session down.
return true;
} }
...@@ -44,8 +44,8 @@ namespace zmq ...@@ -44,8 +44,8 @@ namespace zmq
private: private:
// Handlers for events from session base class. // Handlers for events from session base class.
bool attached (const blob_t &peer_identity_); bool xattached (const blob_t &peer_identity_);
void detached (); bool xdetached ();
// Start the connection process. // Start the connection process.
void start_connecting (bool wait_); void start_connecting (bool wait_);
......
...@@ -62,6 +62,11 @@ void zmq::fq_t::activated (pipe_t *pipe_) ...@@ -62,6 +62,11 @@ void zmq::fq_t::activated (pipe_t *pipe_)
} }
int zmq::fq_t::recv (msg_t *msg_, int flags_) int zmq::fq_t::recv (msg_t *msg_, int flags_)
{
return recvpipe (msg_, flags_, NULL);
}
int zmq::fq_t::recvpipe (msg_t *msg_, int flags_, pipe_t **pipe_)
{ {
// Deallocate old content of the message. // Deallocate old content of the message.
int rc = msg_->close (); int rc = msg_->close ();
...@@ -83,6 +88,8 @@ int zmq::fq_t::recv (msg_t *msg_, int flags_) ...@@ -83,6 +88,8 @@ int zmq::fq_t::recv (msg_t *msg_, int flags_)
// and replaced by another active pipe. Thus we don't have to increase // and replaced by another active pipe. Thus we don't have to increase
// the 'current' pointer. // the 'current' pointer.
if (fetched) { if (fetched) {
if (pipe_)
*pipe_ = pipes [current];
more = msg_->flags () & msg_t::more; more = msg_->flags () & msg_t::more;
if (!more) { if (!more) {
current++; current++;
......
...@@ -29,8 +29,9 @@ namespace zmq ...@@ -29,8 +29,9 @@ namespace zmq
{ {
// Class manages a set of inbound pipes. On receive it performs fair // Class manages a set of inbound pipes. On receive it performs fair
// queueing (RFC970) so that senders gone berserk won't cause denial of // queueing so that senders gone berserk won't cause denial of
// service for decent senders. // service for decent senders.
class fq_t class fq_t
{ {
public: public:
...@@ -43,6 +44,7 @@ namespace zmq ...@@ -43,6 +44,7 @@ namespace zmq
void terminated (pipe_t *pipe_); void terminated (pipe_t *pipe_);
int recv (msg_t *msg_, int flags_); int recv (msg_t *msg_, int flags_);
int recvpipe (msg_t *msg_, int flags_, pipe_t **pipe_);
bool has_in (); bool has_in ();
private: private:
......
/*
Copyright (c) 2007-2011 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdlib.h>
#include <new>
#include <algorithm>
#include "platform.hpp"
#if defined ZMQ_HAVE_WINDOWS
#include "windows.hpp"
#endif
#include "err.hpp"
#include "pipe.hpp"
#include "mtrie.hpp"
zmq::mtrie_t::mtrie_t () :
min (0),
count (0)
{
}
zmq::mtrie_t::~mtrie_t ()
{
if (count == 1)
delete next.node;
else if (count > 1) {
for (unsigned short i = 0; i != count; ++i)
if (next.table [i])
delete next.table [i];
free (next.table);
}
}
bool zmq::mtrie_t::add (unsigned char *prefix_, size_t size_, pipe_t *pipe_)
{
// We are at the node corresponding to the prefix. We are done.
if (!size_) {
bool result = pipes.empty ();
pipes.insert (pipe_);
return result;
}
unsigned char c = *prefix_;
if (c < min || c >= min + count) {
// The character is out of range of currently handled
// charcters. We have to extend the table.
if (!count) {
min = c;
count = 1;
next.node = NULL;
}
else if (count == 1) {
unsigned char oldc = min;
mtrie_t *oldp = next.node;
count = (min < c ? c - min : min - c) + 1;
next.table = (mtrie_t**)
malloc (sizeof (mtrie_t*) * count);
zmq_assert (next.table);
for (unsigned short i = 0; i != count; ++i)
next.table [i] = 0;
min = std::min (min, c);
next.table [oldc - min] = oldp;
}
else if (min < c) {
// The new character is above the current character range.
unsigned short old_count = count;
count = c - min + 1;
next.table = (mtrie_t**) realloc ((void*) next.table,
sizeof (mtrie_t*) * count);
zmq_assert (next.table);
for (unsigned short i = old_count; i != count; i++)
next.table [i] = NULL;
}
else {
// The new character is below the current character range.
unsigned short old_count = count;
count = (min + old_count) - c;
next.table = (mtrie_t**) realloc ((void*) next.table,
sizeof (mtrie_t*) * count);
zmq_assert (next.table);
memmove (next.table + min - c, next.table,
old_count * sizeof (mtrie_t*));
for (unsigned short i = 0; i != min - c; i++)
next.table [i] = NULL;
min = c;
}
}
// If next node does not exist, create one.
if (count == 1) {
if (!next.node) {
next.node = new (std::nothrow) mtrie_t;
zmq_assert (next.node);
}
return next.node->add (prefix_ + 1, size_ - 1, pipe_);
}
else {
if (!next.table [c - min]) {
next.table [c - min] = new (std::nothrow) mtrie_t;
zmq_assert (next.table [c - min]);
}
return next.table [c - min]->add (prefix_ + 1, size_ - 1, pipe_);
}
}
void zmq::mtrie_t::rm (pipe_t *pipe_,
void (*func_) (unsigned char *data_, size_t size_, void *arg_),
void *arg_)
{
unsigned char *buff = NULL;
rm_helper (pipe_, &buff, 0, 0, func_, arg_);
free (buff);
}
void zmq::mtrie_t::rm_helper (pipe_t *pipe_, unsigned char **buff_,
size_t buffsize_, size_t maxbuffsize_,
void (*func_) (unsigned char *data_, size_t size_, void *arg_),
void *arg_)
{
// Remove the subscription from this node.
if (pipes.erase (pipe_) && pipes.empty ())
func_ (*buff_, buffsize_, arg_);
// Adjust the buffer.
if (buffsize_ >= maxbuffsize_) {
maxbuffsize_ = buffsize_ + 256;
*buff_ = (unsigned char*) realloc (*buff_, maxbuffsize_);
alloc_assert (*buff_);
}
// If there are no subnodes in the trie, return.
if (count == 0)
return;
// If there's one subnode (optimisation).
if (count == 1) {
(*buff_) [buffsize_] = min;
buffsize_++;
next.node->rm_helper (pipe_, buff_, buffsize_, maxbuffsize_,
func_, arg_);
return;
}
// If there are multiple subnodes.
for (unsigned char c = 0; c != count; c++) {
(*buff_) [buffsize_] = min + c;
if (next.table [c])
next.table [c]->rm_helper (pipe_, buff_, buffsize_ + 1,
maxbuffsize_, func_, arg_);
}
}
bool zmq::mtrie_t::rm (unsigned char *prefix_, size_t size_, pipe_t *pipe_)
{
if (!size_) {
pipes_t::size_type erased = pipes.erase (pipe_);
zmq_assert (erased == 1);
return pipes.empty ();
}
unsigned char c = *prefix_;
if (!count || c < min || c >= min + count)
return false;
mtrie_t *next_node =
count == 1 ? next.node : next.table [c - min];
if (!next_node)
return false;
return next_node->rm (prefix_ + 1, size_ - 1, pipe_);
}
void zmq::mtrie_t::match (unsigned char *data_, size_t size_, pipes_t &pipes_)
{
// Merge the subscriptions from this node to the resultset.
pipes_.insert (pipes.begin (), pipes.end ());
// If there are no subnodes in the trie, return.
if (count == 0)
return;
// If there's one subnode (optimisation).
if (count == 1) {
next.node->match (data_ + 1, size_ - 1, pipes_);
return;
}
// If there are multiple subnodes.
for (unsigned char c = 0; c != count; c++) {
if (next.table [c])
next.table [c]->match (data_ + 1, size_ - 1, pipes_);
}
}
/*
Copyright (c) 2007-2011 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_MTRIE_HPP_INCLUDED__
#define __ZMQ_MTRIE_HPP_INCLUDED__
#include <stddef.h>
#include <set>
#include "stdint.hpp"
namespace zmq
{
// Multi-trie. Each node in the trie is a set of pointers to pipes.
class mtrie_t
{
public:
typedef std::set <class pipe_t*> pipes_t;
mtrie_t ();
~mtrie_t ();
// Add key to the trie. Returns true if it's a new subscription
// rather than a duplicate.
bool add (unsigned char *prefix_, size_t size_, class pipe_t *pipe_);
// Remove all subscriptions for a specific peer from the trie.
// If there are no subscriptions left on some topics, invoke the
// supplied callback function.
void rm (class pipe_t *pipe_,
void (*func_) (unsigned char *data_, size_t size_, void *arg_),
void *arg_);
// Remove specific subscription from the trie. Return true is it was
// actually removed rather than de-duplicated.
bool rm (unsigned char *prefix_, size_t size_, class pipe_t *pipe_);
// Get all matching pipes.
void match (unsigned char *data_, size_t size_, pipes_t &pipes_);
private:
void rm_helper (class pipe_t *pipe_, unsigned char **buff_,
size_t buffsize_, size_t maxbuffsize_,
void (*func_) (unsigned char *data_, size_t size_, void *arg_),
void *arg_);
pipes_t pipes;
unsigned char min;
unsigned short count;
union {
class mtrie_t *node;
class mtrie_t **table;
} next;
mtrie_t (const mtrie_t&);
const mtrie_t &operator = (const mtrie_t&);
};
}
#endif
...@@ -45,7 +45,7 @@ zmq::named_session_t::~named_session_t () ...@@ -45,7 +45,7 @@ zmq::named_session_t::~named_session_t ()
unregister_session (peer_identity); unregister_session (peer_identity);
} }
bool zmq::named_session_t::attached (const blob_t &peer_identity_) bool zmq::named_session_t::xattached (const blob_t &peer_identity_)
{ {
// Double check that identities match. // Double check that identities match.
zmq_assert (peer_identity == peer_identity_); zmq_assert (peer_identity == peer_identity_);
...@@ -58,9 +58,10 @@ bool zmq::named_session_t::attached (const blob_t &peer_identity_) ...@@ -58,9 +58,10 @@ bool zmq::named_session_t::attached (const blob_t &peer_identity_)
return true; return true;
} }
void zmq::named_session_t::detached () bool zmq::named_session_t::xdetached ()
{ {
// Do nothing. Named sessions are never destroyed because of disconnection. // Do nothing. Named sessions are never destroyed because of disconnection.
// Neither they have to actively reconnect. // Neither they have to actively reconnect.
return true;
} }
...@@ -40,8 +40,8 @@ namespace zmq ...@@ -40,8 +40,8 @@ namespace zmq
~named_session_t (); ~named_session_t ();
// Handlers for events from session base class. // Handlers for events from session base class.
bool attached (const blob_t &peer_identity_); bool xattached (const blob_t &peer_identity_);
void detached (); bool xdetached ();
private: private:
......
...@@ -74,7 +74,7 @@ void zmq::object_t::process_command (command_t &cmd_) ...@@ -74,7 +74,7 @@ void zmq::object_t::process_command (command_t &cmd_)
case command_t::plug: case command_t::plug:
process_plug (); process_plug ();
process_seqnum (); process_seqnum ();
return; break;
case command_t::own: case command_t::own:
process_own (cmd_.args.own.object); process_own (cmd_.args.own.object);
...@@ -96,9 +96,13 @@ void zmq::object_t::process_command (command_t &cmd_) ...@@ -96,9 +96,13 @@ void zmq::object_t::process_command (command_t &cmd_)
process_seqnum (); process_seqnum ();
break; break;
case command_t::hiccup:
process_hiccup (cmd_.args.hiccup.pipe);
break;
case command_t::pipe_term: case command_t::pipe_term:
process_pipe_term (); process_pipe_term ();
return; break;
case command_t::pipe_term_ack: case command_t::pipe_term_ack:
process_pipe_term_ack (); process_pipe_term_ack ();
...@@ -290,6 +294,18 @@ void zmq::object_t::send_activate_write (pipe_t *destination_, ...@@ -290,6 +294,18 @@ void zmq::object_t::send_activate_write (pipe_t *destination_,
send_command (cmd); send_command (cmd);
} }
void zmq::object_t::send_hiccup (pipe_t *destination_, void *pipe_)
{
command_t cmd;
#if defined ZMQ_MAKE_VALGRIND_HAPPY
memset (&cmd, 0, sizeof (cmd));
#endif
cmd.destination = destination_;
cmd.type = command_t::hiccup;
cmd.args.hiccup.pipe = pipe_;
send_command (cmd);
}
void zmq::object_t::send_pipe_term (pipe_t *destination_) void zmq::object_t::send_pipe_term (pipe_t *destination_)
{ {
command_t cmd; command_t cmd;
...@@ -418,6 +434,11 @@ void zmq::object_t::process_activate_write (uint64_t msgs_read_) ...@@ -418,6 +434,11 @@ void zmq::object_t::process_activate_write (uint64_t msgs_read_)
zmq_assert (false); zmq_assert (false);
} }
void zmq::object_t::process_hiccup (void *pipe_)
{
zmq_assert (false);
}
void zmq::object_t::process_pipe_term () void zmq::object_t::process_pipe_term ()
{ {
zmq_assert (false); zmq_assert (false);
......
...@@ -71,6 +71,7 @@ namespace zmq ...@@ -71,6 +71,7 @@ namespace zmq
void send_activate_read (class pipe_t *destination_); void send_activate_read (class pipe_t *destination_);
void send_activate_write (class pipe_t *destination_, void send_activate_write (class pipe_t *destination_,
uint64_t msgs_read_); uint64_t msgs_read_);
void send_hiccup (class pipe_t *destination_, void *pipe_);
void send_pipe_term (class pipe_t *destination_); void send_pipe_term (class pipe_t *destination_);
void send_pipe_term_ack (class pipe_t *destination_); void send_pipe_term_ack (class pipe_t *destination_);
void send_term_req (class own_t *destination_, void send_term_req (class own_t *destination_,
...@@ -92,6 +93,7 @@ namespace zmq ...@@ -92,6 +93,7 @@ namespace zmq
const blob_t &peer_identity_); const blob_t &peer_identity_);
virtual void process_activate_read (); virtual void process_activate_read ();
virtual void process_activate_write (uint64_t msgs_read_); virtual void process_activate_write (uint64_t msgs_read_);
virtual void process_hiccup (void *pipe_);
virtual void process_pipe_term (); virtual void process_pipe_term ();
virtual void process_pipe_term_ack (); virtual void process_pipe_term_ack ();
virtual void process_term_req (class own_t *object_); virtual void process_term_req (class own_t *object_);
......
...@@ -205,6 +205,29 @@ void zmq::pipe_t::process_activate_write (uint64_t msgs_read_) ...@@ -205,6 +205,29 @@ void zmq::pipe_t::process_activate_write (uint64_t msgs_read_)
} }
} }
void zmq::pipe_t::process_hiccup (void *pipe_)
{
// Destroy old outpipe. Note that the read end of the pipe was already
// migrated to this thread.
zmq_assert (outpipe);
outpipe->flush ();
msg_t msg;
while (outpipe->read (&msg)) {
int rc = msg.close ();
errno_assert (rc == 0);
}
delete outpipe;
// Plug in the new outpipe.
zmq_assert (pipe_);
outpipe = (upipe_t*) pipe_;
out_active = true;
// If appropriate, notify the user about the hiccup.
if (state == active)
sink->hiccuped (this);
}
void zmq::pipe_t::process_pipe_term () void zmq::pipe_t::process_pipe_term ()
{ {
// This is the simple case of peer-induced termination. If there are no // This is the simple case of peer-induced termination. If there are no
...@@ -379,3 +402,23 @@ void zmq::pipe_t::delimit () ...@@ -379,3 +402,23 @@ void zmq::pipe_t::delimit ()
// Delimiter in any other state is invalid. // Delimiter in any other state is invalid.
zmq_assert (false); zmq_assert (false);
} }
void zmq::pipe_t::hiccup ()
{
// If termination is already under way do nothing.
if (state != active)
return;
// We'll drop the pointer to the inpipe. From now on, the peer is
// responsible for deallocating it.
inpipe = NULL;
// Create new inpipe.
inpipe = new (std::nothrow) pipe_t::upipe_t ();
alloc_assert (inpipe);
in_active = true;
// Notify the peer about the hiccup.
send_hiccup (peer, (void*) inpipe);
}
...@@ -44,6 +44,7 @@ namespace zmq ...@@ -44,6 +44,7 @@ namespace zmq
virtual void read_activated (class pipe_t *pipe_) = 0; virtual void read_activated (class pipe_t *pipe_) = 0;
virtual void write_activated (class pipe_t *pipe_) = 0; virtual void write_activated (class pipe_t *pipe_) = 0;
virtual void hiccuped (class pipe_t *pipe_) = 0;
virtual void terminated (class pipe_t *pipe_) = 0; virtual void terminated (class pipe_t *pipe_) = 0;
}; };
...@@ -86,6 +87,11 @@ namespace zmq ...@@ -86,6 +87,11 @@ namespace zmq
// Flush the messages downsteam. // Flush the messages downsteam.
void flush (); void flush ();
// Temporaraily disconnects the inbound message stream and drops
// all the messages on the fly. Causes 'hiccuped' event to be generated
// in the peer.
void hiccup ();
// Ask pipe to terminate. The termination will happen asynchronously // Ask pipe to terminate. The termination will happen asynchronously
// and user will be notified about actual deallocation by 'terminated' // and user will be notified about actual deallocation by 'terminated'
// event. // event.
...@@ -93,18 +99,19 @@ namespace zmq ...@@ -93,18 +99,19 @@ namespace zmq
private: private:
// Type of the underlying lock-free pipe.
typedef ypipe_t <msg_t, message_pipe_granularity> upipe_t;
// Command handlers. // Command handlers.
void process_activate_read (); void process_activate_read ();
void process_activate_write (uint64_t msgs_read_); void process_activate_write (uint64_t msgs_read_);
void process_hiccup (void *pipe_);
void process_pipe_term (); void process_pipe_term ();
void process_pipe_term_ack (); void process_pipe_term_ack ();
// Handler for delimiter read from the pipe. // Handler for delimiter read from the pipe.
void delimit (); void delimit ();
// Type of the underlying lock-free pipe.
typedef ypipe_t <msg_t, message_pipe_granularity> upipe_t;
// Constructor is private. Pipe can only be created using // Constructor is private. Pipe can only be created using
// pipepair function. // pipepair function.
pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_, pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,
......
...@@ -30,3 +30,16 @@ zmq::pub_t::pub_t (class ctx_t *parent_, uint32_t tid_) : ...@@ -30,3 +30,16 @@ zmq::pub_t::pub_t (class ctx_t *parent_, uint32_t tid_) :
zmq::pub_t::~pub_t () zmq::pub_t::~pub_t ()
{ {
} }
int zmq::pub_t::xrecv (class msg_t *msg_, int flags_)
{
// Messages cannot be received from PUB socket.
errno = ENOTSUP;
return -1;
}
bool zmq::pub_t::xhas_in ()
{
return false;
}
...@@ -33,6 +33,10 @@ namespace zmq ...@@ -33,6 +33,10 @@ namespace zmq
pub_t (class ctx_t *parent_, uint32_t tid_); pub_t (class ctx_t *parent_, uint32_t tid_);
~pub_t (); ~pub_t ();
// Implementations of virtual functions from socket_base_t.
int xrecv (class msg_t *msg_, int flags_);
bool xhas_in ();
private: private:
pub_t (const pub_t&); pub_t (const pub_t&);
......
...@@ -146,6 +146,13 @@ void zmq::session_t::write_activated (pipe_t *pipe_) ...@@ -146,6 +146,13 @@ void zmq::session_t::write_activated (pipe_t *pipe_)
engine->activate_in (); engine->activate_in ();
} }
void zmq::session_t::hiccuped (pipe_t *pipe_)
{
// Hiccups are always sent from session to socket, not the other
// way round.
zmq_assert (false);
}
void zmq::session_t::process_plug () void zmq::session_t::process_plug ()
{ {
} }
...@@ -287,4 +294,25 @@ void zmq::session_t::unregister_session (const blob_t &name_) ...@@ -287,4 +294,25 @@ void zmq::session_t::unregister_session (const blob_t &name_)
socket->unregister_session (name_); socket->unregister_session (name_);
} }
bool zmq::session_t::attached (const blob_t &peer_identity_)
{
return xattached (peer_identity_);
}
void zmq::session_t::detached ()
{
if (!xdetached ()) {
// Derived session type have asked for session termination.
terminate ();
return;
}
// For subscriber sockets we hiccup the inbound pipe, which will cause
// the socket object to resend all the subscriptions.
if (pipe && (options.type == ZMQ_SUB || options.type == ZMQ_XSUB))
pipe->hiccup ();
}
...@@ -44,9 +44,7 @@ namespace zmq ...@@ -44,9 +44,7 @@ namespace zmq
// To be used once only, when creating the session. // To be used once only, when creating the session.
void attach_pipe (class pipe_t *pipe_); void attach_pipe (class pipe_t *pipe_);
// i_inout interface implementation. Note that detach method is not // i_inout interface implementation.
// implemented by generic session. Different session types may handle
// engine disconnection in different ways.
bool read (msg_t *msg_); bool read (msg_t *msg_);
bool write (msg_t *msg_); bool write (msg_t *msg_);
void flush (); void flush ();
...@@ -55,17 +53,19 @@ namespace zmq ...@@ -55,17 +53,19 @@ namespace zmq
// i_pipe_events interface implementation. // i_pipe_events interface implementation.
void read_activated (class pipe_t *pipe_); void read_activated (class pipe_t *pipe_);
void write_activated (class pipe_t *pipe_); void write_activated (class pipe_t *pipe_);
void hiccuped (class pipe_t *pipe_);
void terminated (class pipe_t *pipe_); void terminated (class pipe_t *pipe_);
protected: protected:
// Two events for the derived session type. Attached is triggered // Events from the engine. Attached is triggered when session is
// when session is attached to a peer. The function can reject the new // attached to a peer. The function can reject the new peer by
// peer by returning false. Detached is triggered at the beginning of // returning false. Detached is triggered at the beginning of
// the termination process when session is about to be detached from // the termination process when session is about to be detached from
// the peer. // the peer. If it returns false, session will be terminated.
virtual bool attached (const blob_t &peer_identity_) = 0; // To be overloaded by the derived session type.
virtual void detached () = 0; virtual bool xattached (const blob_t &peer_identity_) = 0;
virtual bool xdetached () = 0;
// Returns true if there is an engine attached to the session. // Returns true if there is an engine attached to the session.
bool has_engine (); bool has_engine ();
...@@ -78,6 +78,9 @@ namespace zmq ...@@ -78,6 +78,9 @@ namespace zmq
private: private:
bool attached (const blob_t &peer_identity_);
void detached ();
// Handlers for incoming commands. // Handlers for incoming commands.
void process_plug (); void process_plug ();
void process_attach (struct i_engine *engine_, void process_attach (struct i_engine *engine_,
......
...@@ -764,7 +764,7 @@ bool zmq::socket_base_t::xhas_out () ...@@ -764,7 +764,7 @@ bool zmq::socket_base_t::xhas_out ()
return false; return false;
} }
int zmq::socket_base_t::xsend (msg_t *msg_, int options_) int zmq::socket_base_t::xsend (msg_t *msg_, int flags_)
{ {
errno = ENOTSUP; errno = ENOTSUP;
return -1; return -1;
...@@ -775,7 +775,7 @@ bool zmq::socket_base_t::xhas_in () ...@@ -775,7 +775,7 @@ bool zmq::socket_base_t::xhas_in ()
return false; return false;
} }
int zmq::socket_base_t::xrecv (msg_t *msg_, int options_) int zmq::socket_base_t::xrecv (msg_t *msg_, int flags_)
{ {
errno = ENOTSUP; errno = ENOTSUP;
return -1; return -1;
...@@ -790,6 +790,11 @@ void zmq::socket_base_t::xwrite_activated (pipe_t *pipe_) ...@@ -790,6 +790,11 @@ void zmq::socket_base_t::xwrite_activated (pipe_t *pipe_)
zmq_assert (false); zmq_assert (false);
} }
void zmq::socket_base_t::xhiccuped (pipe_t *pipe_)
{
zmq_assert (false);
}
void zmq::socket_base_t::in_event () void zmq::socket_base_t::in_event ()
{ {
// Process any commands from other threads/sockets that may be available // Process any commands from other threads/sockets that may be available
...@@ -837,6 +842,11 @@ void zmq::socket_base_t::write_activated (pipe_t *pipe_) ...@@ -837,6 +842,11 @@ void zmq::socket_base_t::write_activated (pipe_t *pipe_)
xwrite_activated (pipe_); xwrite_activated (pipe_);
} }
void zmq::socket_base_t::hiccuped (pipe_t *pipe_)
{
xhiccuped (pipe_);
}
void zmq::socket_base_t::terminated (pipe_t *pipe_) void zmq::socket_base_t::terminated (pipe_t *pipe_)
{ {
// Notify the specific socket type about the pipe termination. // Notify the specific socket type about the pipe termination.
......
...@@ -96,6 +96,7 @@ namespace zmq ...@@ -96,6 +96,7 @@ namespace zmq
// i_pipe_events interface implementation. // i_pipe_events interface implementation.
void read_activated (pipe_t *pipe_); void read_activated (pipe_t *pipe_);
void write_activated (pipe_t *pipe_); void write_activated (pipe_t *pipe_);
void hiccuped (pipe_t *pipe_);
void terminated (pipe_t *pipe_); void terminated (pipe_t *pipe_);
protected: protected:
...@@ -116,15 +117,16 @@ namespace zmq ...@@ -116,15 +117,16 @@ namespace zmq
// The default implementation assumes that send is not supported. // The default implementation assumes that send is not supported.
virtual bool xhas_out (); virtual bool xhas_out ();
virtual int xsend (class msg_t *msg_, int options_); virtual int xsend (class msg_t *msg_, int flags_);
// The default implementation assumes that recv in not supported. // The default implementation assumes that recv in not supported.
virtual bool xhas_in (); virtual bool xhas_in ();
virtual int xrecv (class msg_t *msg_, int options_); virtual int xrecv (class msg_t *msg_, int flags_);
// i_pipe_events will be forwarded to these functions. // i_pipe_events will be forwarded to these functions.
virtual void xread_activated (pipe_t *pipe_); virtual void xread_activated (pipe_t *pipe_);
virtual void xwrite_activated (pipe_t *pipe_); virtual void xwrite_activated (pipe_t *pipe_);
virtual void xhiccuped (pipe_t *pipe_);
virtual void xterminated (pipe_t *pipe_) = 0; virtual void xterminated (pipe_t *pipe_) = 0;
// Delay actual destruction of the socket. // Delay actual destruction of the socket.
......
...@@ -22,43 +22,18 @@ ...@@ -22,43 +22,18 @@
#include "msg.hpp" #include "msg.hpp"
zmq::sub_t::sub_t (class ctx_t *parent_, uint32_t tid_) : zmq::sub_t::sub_t (class ctx_t *parent_, uint32_t tid_) :
xsub_t (parent_, tid_), xsub_t (parent_, tid_)
has_message (false),
more (false)
{ {
options.type = ZMQ_SUB; options.type = ZMQ_SUB;
int rc = message.init ();
errno_assert (rc == 0);
} }
zmq::sub_t::~sub_t () zmq::sub_t::~sub_t ()
{ {
int rc = message.close ();
errno_assert (rc == 0);
} }
int zmq::sub_t::xsetsockopt (int option_, const void *optval_, int zmq::sub_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_) size_t optvallen_)
{ {
// Process a subscription.
if (option_ == ZMQ_SUBSCRIBE)
subscriptions.add ((unsigned char*) optval_, optvallen_);
// Process an unsubscription. Return error if there is no corresponding
// subscription.
else if (option_ == ZMQ_UNSUBSCRIBE) {
if (!subscriptions.rm ((unsigned char*) optval_, optvallen_)) {
errno = EINVAL;
return -1;
}
}
// Unknow option.
else {
errno = EINVAL;
return -1;
}
// Create the subscription message. // Create the subscription message.
msg_t msg; msg_t msg;
int rc = msg.init_size (optvallen_ + 1); int rc = msg.init_size (optvallen_ + 1);
...@@ -82,7 +57,7 @@ int zmq::sub_t::xsetsockopt (int option_, const void *optval_, ...@@ -82,7 +57,7 @@ int zmq::sub_t::xsetsockopt (int option_, const void *optval_,
return rc; return rc;
} }
int zmq::sub_t::xsend (msg_t *msg_, int options_) int zmq::sub_t::xsend (msg_t *msg_, int flags_)
{ {
// Overload the XSUB's send. // Overload the XSUB's send.
errno = ENOTSUP; errno = ENOTSUP;
...@@ -95,89 +70,3 @@ bool zmq::sub_t::xhas_out () ...@@ -95,89 +70,3 @@ bool zmq::sub_t::xhas_out ()
return false; return false;
} }
int zmq::sub_t::xrecv (msg_t *msg_, int flags_)
{
// If there's already a message prepared by a previous call to zmq_poll,
// return it straight ahead.
if (has_message) {
int rc = msg_->move (message);
errno_assert (rc == 0);
has_message = false;
more = msg_->flags () & msg_t::more;
return 0;
}
// TODO: This can result in infinite loop in the case of continuous
// stream of non-matching messages which breaks the non-blocking recv
// semantics.
while (true) {
// Get a message using fair queueing algorithm.
int rc = xsub_t::xrecv (msg_, flags_);
// If there's no message available, return immediately.
// The same when error occurs.
if (rc != 0)
return -1;
// Check whether the message matches at least one subscription.
// Non-initial parts of the message are passed
if (more || match (msg_)) {
more = msg_->flags () & msg_t::more;
return 0;
}
// Message doesn't match. Pop any remaining parts of the message
// from the pipe.
while (msg_->flags () & msg_t::more) {
rc = xsub_t::xrecv (msg_, ZMQ_DONTWAIT);
zmq_assert (rc == 0);
}
}
}
bool zmq::sub_t::xhas_in ()
{
// There are subsequent parts of the partly-read message available.
if (more)
return true;
// If there's already a message prepared by a previous call to zmq_poll,
// return straight ahead.
if (has_message)
return true;
// TODO: This can result in infinite loop in the case of continuous
// stream of non-matching messages.
while (true) {
// Get a message using fair queueing algorithm.
int rc = xsub_t::xrecv (&message, ZMQ_DONTWAIT);
// If there's no message available, return immediately.
// The same when error occurs.
if (rc != 0) {
zmq_assert (errno == EAGAIN);
return false;
}
// Check whether the message matches at least one subscription.
if (match (&message)) {
has_message = true;
return true;
}
// Message doesn't match. Pop any remaining parts of the message
// from the pipe.
while (message.flags () & msg_t::more) {
rc = xsub_t::xrecv (&message, ZMQ_DONTWAIT);
zmq_assert (rc == 0);
}
}
}
bool zmq::sub_t::match (msg_t *msg_)
{
return subscriptions.check ((unsigned char*) msg_->data (), msg_->size ());
}
...@@ -22,8 +22,6 @@ ...@@ -22,8 +22,6 @@
#define __ZMQ_SUB_HPP_INCLUDED__ #define __ZMQ_SUB_HPP_INCLUDED__
#include "xsub.hpp" #include "xsub.hpp"
#include "trie.hpp"
#include "msg.hpp"
namespace zmq namespace zmq
{ {
...@@ -38,28 +36,11 @@ namespace zmq ...@@ -38,28 +36,11 @@ namespace zmq
protected: protected:
int xsetsockopt (int option_, const void *optval_, size_t optvallen_); int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
int xsend (class msg_t *msg_, int options_); int xsend (class msg_t *msg_, int flags_);
bool xhas_out (); bool xhas_out ();
int xrecv (class msg_t *msg_, int flags_);
bool xhas_in ();
private: private:
// Check whether the message matches at least one subscription.
bool match (class msg_t *msg_);
// The repository of subscriptions.
trie_t subscriptions;
// If true, 'message' contains a matching message to return on the
// next recv call.
bool has_message;
msg_t message;
// If true, part of a multipart message was already received, but
// there are following parts still waiting.
bool more;
sub_t (const sub_t&); sub_t (const sub_t&);
const sub_t &operator = (const sub_t&); const sub_t &operator = (const sub_t&);
}; };
......
...@@ -30,14 +30,14 @@ zmq::transient_session_t::~transient_session_t () ...@@ -30,14 +30,14 @@ zmq::transient_session_t::~transient_session_t ()
{ {
} }
bool zmq::transient_session_t::attached (const blob_t &peer_identity_) bool zmq::transient_session_t::xattached (const blob_t &peer_identity_)
{ {
// Transient session is always valid. // Transient session is always valid.
return true; return true;
} }
void zmq::transient_session_t::detached () bool zmq::transient_session_t::xdetached ()
{ {
// There's no way to reestablish a transient session. Tear it down. // There's no way to reestablish a transient session. Tear it down.
terminate (); return false;
} }
...@@ -40,8 +40,8 @@ namespace zmq ...@@ -40,8 +40,8 @@ namespace zmq
private: private:
// Handlers for events from session base class. // Handlers for events from session base class.
bool attached (const blob_t &peer_identity_); bool xattached (const blob_t &peer_identity_);
void detached (); bool xdetached ();
transient_session_t (const transient_session_t&); transient_session_t (const transient_session_t&);
const transient_session_t &operator = (const transient_session_t&); const transient_session_t &operator = (const transient_session_t&);
......
...@@ -50,12 +50,12 @@ zmq::trie_t::~trie_t () ...@@ -50,12 +50,12 @@ zmq::trie_t::~trie_t ()
} }
} }
void zmq::trie_t::add (unsigned char *prefix_, size_t size_) bool zmq::trie_t::add (unsigned char *prefix_, size_t size_)
{ {
// We are at the node corresponding to the prefix. We are done. // We are at the node corresponding to the prefix. We are done.
if (!size_) { if (!size_) {
++refcnt; ++refcnt;
return; return refcnt == 1;
} }
unsigned char c = *prefix_; unsigned char c = *prefix_;
...@@ -74,7 +74,7 @@ void zmq::trie_t::add (unsigned char *prefix_, size_t size_) ...@@ -74,7 +74,7 @@ void zmq::trie_t::add (unsigned char *prefix_, size_t size_)
count = (min < c ? c - min : min - c) + 1; count = (min < c ? c - min : min - c) + 1;
next.table = (trie_t**) next.table = (trie_t**)
malloc (sizeof (trie_t*) * count); malloc (sizeof (trie_t*) * count);
alloc_assert (next.table); zmq_assert (next.table);
for (unsigned short i = 0; i != count; ++i) for (unsigned short i = 0; i != count; ++i)
next.table [i] = 0; next.table [i] = 0;
min = std::min (min, c); min = std::min (min, c);
...@@ -111,26 +111,28 @@ void zmq::trie_t::add (unsigned char *prefix_, size_t size_) ...@@ -111,26 +111,28 @@ void zmq::trie_t::add (unsigned char *prefix_, size_t size_)
if (count == 1) { if (count == 1) {
if (!next.node) { if (!next.node) {
next.node = new (std::nothrow) trie_t; next.node = new (std::nothrow) trie_t;
alloc_assert (next.node); zmq_assert (next.node);
} }
next.node->add (prefix_ + 1, size_ - 1); return next.node->add (prefix_ + 1, size_ - 1);
} }
else { else {
if (!next.table [c - min]) { if (!next.table [c - min]) {
next.table [c - min] = new (std::nothrow) trie_t; next.table [c - min] = new (std::nothrow) trie_t;
alloc_assert (next.table [c - min]); zmq_assert (next.table [c - min]);
} }
next.table [c - min]->add (prefix_ + 1, size_ - 1); return next.table [c - min]->add (prefix_ + 1, size_ - 1);
} }
} }
bool zmq::trie_t::rm (unsigned char *prefix_, size_t size_) bool zmq::trie_t::rm (unsigned char *prefix_, size_t size_)
{ {
// TODO: Shouldn't an error be reported if the key does not exist?
if (!size_) { if (!size_) {
if (!refcnt) if (!refcnt)
return false; return false;
refcnt--; refcnt--;
return true; return refcnt == 0;
} }
unsigned char c = *prefix_; unsigned char c = *prefix_;
...@@ -179,3 +181,48 @@ bool zmq::trie_t::check (unsigned char *data_, size_t size_) ...@@ -179,3 +181,48 @@ bool zmq::trie_t::check (unsigned char *data_, size_t size_)
size_--; size_--;
} }
} }
void zmq::trie_t::apply (void (*func_) (unsigned char *data_, size_t size_,
void *arg_), void *arg_)
{
unsigned char *buff = NULL;
apply_helper (&buff, 0, 0, func_, arg_);
free (buff);
}
void zmq::trie_t::apply_helper (
unsigned char **buff_, size_t buffsize_, size_t maxbuffsize_,
void (*func_) (unsigned char *data_, size_t size_, void *arg_), void *arg_)
{
// If this node is a subscription, apply the function.
if (refcnt)
func_ (*buff_, buffsize_, arg_);
// Adjust the buffer.
if (buffsize_ >= maxbuffsize_) {
maxbuffsize_ = buffsize_ + 256;
*buff_ = (unsigned char*) realloc (*buff_, maxbuffsize_);
zmq_assert (*buff_);
}
// If there are no subnodes in the trie, return.
if (count == 0)
return;
// If there's one subnode (optimisation).
if (count == 1) {
(*buff_) [buffsize_] = min;
buffsize_++;
next.node->apply_helper (buff_, buffsize_, maxbuffsize_, func_, arg_);
return;
}
// If there are multiple subnodes.
for (unsigned char c = 0; c != count; c++) {
(*buff_) [buffsize_] = min + c;
if (next.table [c])
next.table [c]->apply_helper (buff_, buffsize_ + 1, maxbuffsize_,
func_, arg_);
}
}
...@@ -35,12 +35,28 @@ namespace zmq ...@@ -35,12 +35,28 @@ namespace zmq
trie_t (); trie_t ();
~trie_t (); ~trie_t ();
void add (unsigned char *prefix_, size_t size_); // Add key to the trie. Returns true if this is a new item in the trie
// rather than a duplicate.
bool add (unsigned char *prefix_, size_t size_);
// Remove key from the trie. Returns true if the item is actually
// removed from the trie.
bool rm (unsigned char *prefix_, size_t size_); bool rm (unsigned char *prefix_, size_t size_);
// Check whether particular key is in the trie.
bool check (unsigned char *data_, size_t size_); bool check (unsigned char *data_, size_t size_);
// Apply the function supplied to each subscription in the trie.
void apply (void (*func_) (unsigned char *data_, size_t size_,
void *arg_), void *arg_);
private: private:
void apply_helper (
unsigned char **buff_, size_t buffsize_, size_t maxbuffsize_,
void (*func_) (unsigned char *data_, size_t size_, void *arg_),
void *arg_);
uint32_t refcnt; uint32_t refcnt;
unsigned char min; unsigned char min;
unsigned short count; unsigned short count;
......
...@@ -18,6 +18,8 @@ ...@@ -18,6 +18,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>. along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include <string.h>
#include "xpub.hpp" #include "xpub.hpp"
#include "pipe.hpp" #include "pipe.hpp"
#include "err.hpp" #include "err.hpp"
...@@ -37,6 +39,12 @@ void zmq::xpub_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_) ...@@ -37,6 +39,12 @@ void zmq::xpub_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_)
{ {
zmq_assert (pipe_); zmq_assert (pipe_);
dist.attach (pipe_); dist.attach (pipe_);
fq.attach (pipe_);
}
void zmq::xpub_t::xread_activated (pipe_t *pipe_)
{
fq.activated (pipe_);
} }
void zmq::xpub_t::xwrite_activated (pipe_t *pipe_) void zmq::xpub_t::xwrite_activated (pipe_t *pipe_)
...@@ -46,11 +54,37 @@ void zmq::xpub_t::xwrite_activated (pipe_t *pipe_) ...@@ -46,11 +54,37 @@ void zmq::xpub_t::xwrite_activated (pipe_t *pipe_)
void zmq::xpub_t::xterminated (pipe_t *pipe_) void zmq::xpub_t::xterminated (pipe_t *pipe_)
{ {
// Remove the pipe from the trie. If there are topics that nobody
// is interested in anymore, send corresponding unsubscriptions
// upstream.
subscriptions.rm (pipe_, send_unsubscription, this);
dist.terminated (pipe_); dist.terminated (pipe_);
fq.terminated (pipe_);
} }
int zmq::xpub_t::xsend (msg_t *msg_, int flags_) int zmq::xpub_t::xsend (msg_t *msg_, int flags_)
{ {
// First, process any (un)subscriptions from downstream.
msg_t sub;
sub.init ();
while (true) {
// Grab next subscription.
pipe_t *pipe;
int rc = fq.recvpipe (&sub, 0, &pipe);
if (rc != 0 && errno == EAGAIN)
break;
errno_assert (rc == 0);
// Apply the subscription to the trie. If it's not a duplicate,
// store it so that it can be passed to used on next recv call.
if (apply_subscription (&sub, pipe) && options.type != ZMQ_PUB)
pending.push_back (blob_t ((unsigned char*) sub.data (),
sub.size ()));
}
sub.close ();
return dist.send (msg_, flags_); return dist.send (msg_, flags_);
} }
...@@ -61,12 +95,92 @@ bool zmq::xpub_t::xhas_out () ...@@ -61,12 +95,92 @@ bool zmq::xpub_t::xhas_out ()
int zmq::xpub_t::xrecv (msg_t *msg_, int flags_) int zmq::xpub_t::xrecv (msg_t *msg_, int flags_)
{ {
// If there is at least one
if (!pending.empty ()) {
int rc = msg_->close ();
errno_assert (rc == 0);
rc = msg_->init_size (pending.front ().size ());
errno_assert (rc == 0);
memcpy (msg_->data (), pending.front ().data (),
pending.front ().size ());
pending.pop_front ();
return 0;
}
// Grab and apply next subscription.
pipe_t *pipe;
int rc = fq.recvpipe (msg_, 0, &pipe);
if (rc != 0)
return -1;
if (!apply_subscription (msg_, pipe)) {
// TODO: This should be a loop rather!
msg_->close ();
msg_->init ();
errno = EAGAIN; errno = EAGAIN;
return -1; return -1;
}
return 0;
} }
bool zmq::xpub_t::xhas_in () bool zmq::xpub_t::xhas_in ()
{ {
if (!pending.empty ())
return true;
// Even if there are subscriptions in the fair-queuer they may be
// duplicates. Thus, we have to check by hand wheter there is any
// subscription available to pass upstream.
// First, process any (un)subscriptions from downstream.
msg_t sub;
sub.init ();
while (true) {
// Grab next subscription.
pipe_t *pipe;
int rc = fq.recvpipe (&sub, 0, &pipe);
if (rc != 0 && errno == EAGAIN) {
sub.close ();
return false; return false;
}
errno_assert (rc == 0);
// Apply the subscription to the trie. If it's not a duplicate store
// it so that it can be passed to used on next recv call.
if (apply_subscription (&sub, pipe) && options.type != ZMQ_PUB) {
pending.push_back (blob_t ((unsigned char*) sub.data (),
sub.size ()));
sub.close ();
return true;
}
}
}
bool zmq::xpub_t::apply_subscription (msg_t *sub_, pipe_t *pipe_)
{
unsigned char *data = (unsigned char*) sub_->data ();
size_t size = sub_->size ();
zmq_assert (size > 0 && (*data == 0 || *data == 1));
if (*data == 0)
return subscriptions.rm (data + 1, size - 1, pipe_);
else
return subscriptions.add (data + 1, size - 1, pipe_);
}
void zmq::xpub_t::send_unsubscription (unsigned char *data_, size_t size_,
void *arg_)
{
xpub_t *self = (xpub_t*) arg_;
if (self->options.type != ZMQ_PUB) {
// Place the unsubscription to the queue of pending (un)sunscriptions
// to be retrived by the user later on.
xpub_t *self = (xpub_t*) arg_;
blob_t unsub (size_ + 1, 0);
unsub [0] = 0;
memcpy (&unsub [1], data_, size_);
self->pending.push_back (unsub);
}
} }
...@@ -21,9 +21,14 @@ ...@@ -21,9 +21,14 @@
#ifndef __ZMQ_XPUB_HPP_INCLUDED__ #ifndef __ZMQ_XPUB_HPP_INCLUDED__
#define __ZMQ_XPUB_HPP_INCLUDED__ #define __ZMQ_XPUB_HPP_INCLUDED__
#include <deque>
#include "socket_base.hpp" #include "socket_base.hpp"
#include "mtrie.hpp"
#include "array.hpp" #include "array.hpp"
#include "blob.hpp"
#include "dist.hpp" #include "dist.hpp"
#include "fq.hpp"
namespace zmq namespace zmq
{ {
...@@ -42,14 +47,35 @@ namespace zmq ...@@ -42,14 +47,35 @@ namespace zmq
bool xhas_out (); bool xhas_out ();
int xrecv (class msg_t *msg_, int flags_); int xrecv (class msg_t *msg_, int flags_);
bool xhas_in (); bool xhas_in ();
void xread_activated (class pipe_t *pipe_);
void xwrite_activated (class pipe_t *pipe_); void xwrite_activated (class pipe_t *pipe_);
void xterminated (class pipe_t *pipe_); void xterminated (class pipe_t *pipe_);
private: private:
// Applies the subscription to the trie. Return false if it is a
// duplicate.
bool apply_subscription (class msg_t *sub_, class pipe_t *pipe_);
// Function to be applied to the trie to send all the subsciptions
// upstream.
static void send_unsubscription (unsigned char *data_, size_t size_,
void *arg_);
// List of all subscriptions mapped to corresponding pipes.
mtrie_t subscriptions;
// Distributor of messages holding the list of outbound pipes. // Distributor of messages holding the list of outbound pipes.
dist_t dist; dist_t dist;
// Object to fair-queue the subscription requests.
fq_t fq;
// List of pending (un)subscriptions, ie. those that were already
// applied to the trie, but not yet received by the user.
typedef std::deque <blob_t> pending_t;
pending_t pending;
xpub_t (const xpub_t&); xpub_t (const xpub_t&);
const xpub_t &operator = (const xpub_t&); const xpub_t &operator = (const xpub_t&);
}; };
......
...@@ -24,19 +24,30 @@ ...@@ -24,19 +24,30 @@
#include "err.hpp" #include "err.hpp"
zmq::xsub_t::xsub_t (class ctx_t *parent_, uint32_t tid_) : zmq::xsub_t::xsub_t (class ctx_t *parent_, uint32_t tid_) :
socket_base_t (parent_, tid_) socket_base_t (parent_, tid_),
has_message (false),
more (false)
{ {
options.type = ZMQ_XSUB; options.type = ZMQ_XSUB;
int rc = message.init ();
errno_assert (rc == 0);
} }
zmq::xsub_t::~xsub_t () zmq::xsub_t::~xsub_t ()
{ {
int rc = message.close ();
errno_assert (rc == 0);
} }
void zmq::xsub_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_) void zmq::xsub_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_)
{ {
zmq_assert (pipe_); zmq_assert (pipe_);
fq.attach (pipe_); fq.attach (pipe_);
dist.attach (pipe_);
// Send all the cached subscriptions to the new upstream peer.
subscriptions.apply (send_subscription, pipe_);
pipe_->flush ();
} }
void zmq::xsub_t::xread_activated (pipe_t *pipe_) void zmq::xsub_t::xread_activated (pipe_t *pipe_)
...@@ -44,20 +55,49 @@ void zmq::xsub_t::xread_activated (pipe_t *pipe_) ...@@ -44,20 +55,49 @@ void zmq::xsub_t::xread_activated (pipe_t *pipe_)
fq.activated (pipe_); fq.activated (pipe_);
} }
void zmq::xsub_t::xwrite_activated (pipe_t *pipe_)
{
dist.activated (pipe_);
}
void zmq::xsub_t::xterminated (pipe_t *pipe_) void zmq::xsub_t::xterminated (pipe_t *pipe_)
{ {
fq.terminated (pipe_); fq.terminated (pipe_);
dist.terminated (pipe_);
} }
int zmq::xsub_t::xsend (msg_t *msg_, int options_) void zmq::xsub_t::xhiccuped (pipe_t *pipe_)
{ {
// TODO: Once we'll send the subscription upstream here. For now // Send all the cached subscriptions to the hiccuped pipe.
// just empty the message. subscriptions.apply (send_subscription, pipe_);
int rc = msg_->close (); pipe_->flush ();
errno_assert (rc == 0); }
rc = msg_->init ();
errno_assert (rc == 0); int zmq::xsub_t::xsend (msg_t *msg_, int flags_)
{
size_t size = msg_->size ();
unsigned char *data = (unsigned char*) msg_->data ();
// Malformed subscriptions.
if (size < 1 || (*data != 0 && *data != 1)) {
errno = EINVAL;
return -1;
}
// Process the subscription.
if (*data == 1) {
if (subscriptions.add (data + 1, size - 1))
return dist.send (msg_, flags_);
else
return 0; return 0;
}
else if (*data == 0) {
if (subscriptions.rm (data + 1, size - 1))
return dist.send (msg_, flags_);
else
return 0;
}
zmq_assert (false);
} }
bool zmq::xsub_t::xhas_out () bool zmq::xsub_t::xhas_out ()
...@@ -66,13 +106,109 @@ bool zmq::xsub_t::xhas_out () ...@@ -66,13 +106,109 @@ bool zmq::xsub_t::xhas_out ()
return true; return true;
} }
int zmq::xsub_t::xrecv (class msg_t *msg_, int flags_) int zmq::xsub_t::xrecv (msg_t *msg_, int flags_)
{ {
return fq.recv (msg_, flags_); // If there's already a message prepared by a previous call to zmq_poll,
// return it straight ahead.
if (has_message) {
int rc = msg_->move (message);
errno_assert (rc == 0);
has_message = false;
more = msg_->flags () & msg_t::more;
return 0;
}
// TODO: This can result in infinite loop in the case of continuous
// stream of non-matching messages which breaks the non-blocking recv
// semantics.
while (true) {
// Get a message using fair queueing algorithm.
int rc = fq.recv (msg_, flags_);
// If there's no message available, return immediately.
// The same when error occurs.
if (rc != 0)
return -1;
// Check whether the message matches at least one subscription.
// Non-initial parts of the message are passed
if (more || match (msg_)) {
more = msg_->flags () & msg_t::more;
return 0;
}
// Message doesn't match. Pop any remaining parts of the message
// from the pipe.
while (msg_->flags () & msg_t::more) {
rc = fq.recv (msg_, ZMQ_DONTWAIT);
zmq_assert (rc == 0);
}
}
} }
bool zmq::xsub_t::xhas_in () bool zmq::xsub_t::xhas_in ()
{ {
return fq.has_in (); // There are subsequent parts of the partly-read message available.
if (more)
return true;
// If there's already a message prepared by a previous call to zmq_poll,
// return straight ahead.
if (has_message)
return true;
// TODO: This can result in infinite loop in the case of continuous
// stream of non-matching messages.
while (true) {
// Get a message using fair queueing algorithm.
int rc = fq.recv (&message, ZMQ_DONTWAIT);
// If there's no message available, return immediately.
// The same when error occurs.
if (rc != 0) {
zmq_assert (errno == EAGAIN);
return false;
}
// Check whether the message matches at least one subscription.
if (match (&message)) {
has_message = true;
return true;
}
// Message doesn't match. Pop any remaining parts of the message
// from the pipe.
while (message.flags () & msg_t::more) {
rc = fq.recv (&message, ZMQ_DONTWAIT);
zmq_assert (rc == 0);
}
}
}
bool zmq::xsub_t::match (msg_t *msg_)
{
return subscriptions.check ((unsigned char*) msg_->data (), msg_->size ());
} }
void zmq::xsub_t::send_subscription (unsigned char *data_, size_t size_,
void *arg_)
{
pipe_t *pipe = (pipe_t*) arg_;
// Create the subsctription message.
msg_t msg;
int rc = msg.init_size (size_ + 1);
zmq_assert (rc == 0);
unsigned char *data = (unsigned char*) msg.data ();
data [0] = 1;
memcpy (data + 1, data_, size_);
// Send it to the pipe.
bool sent = pipe->write (&msg);
zmq_assert (sent);
msg.close ();
}
...@@ -22,7 +22,10 @@ ...@@ -22,7 +22,10 @@
#define __ZMQ_XSUB_HPP_INCLUDED__ #define __ZMQ_XSUB_HPP_INCLUDED__
#include "socket_base.hpp" #include "socket_base.hpp"
#include "dist.hpp"
#include "fq.hpp" #include "fq.hpp"
#include "trie.hpp"
#include "msg.hpp"
namespace zmq namespace zmq
{ {
...@@ -39,18 +42,43 @@ namespace zmq ...@@ -39,18 +42,43 @@ namespace zmq
// Overloads of functions from socket_base_t. // Overloads of functions from socket_base_t.
void xattach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_); void xattach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_);
int xsend (class msg_t *msg_, int options_); int xsend (class msg_t *msg_, int flags_);
bool xhas_out (); bool xhas_out ();
int xrecv (class msg_t *msg_, int flags_); int xrecv (class msg_t *msg_, int flags_);
bool xhas_in (); bool xhas_in ();
void xread_activated (class pipe_t *pipe_); void xread_activated (class pipe_t *pipe_);
void xwrite_activated (class pipe_t *pipe_);
void xhiccuped (pipe_t *pipe_);
void xterminated (class pipe_t *pipe_); void xterminated (class pipe_t *pipe_);
private: private:
// Check whether the message matches at least one subscription.
bool match (class msg_t *msg_);
// Function to be applied to the trie to send all the subsciptions
// upstream.
static void send_subscription (unsigned char *data_, size_t size_,
void *arg_);
// Fair queueing object for inbound pipes. // Fair queueing object for inbound pipes.
fq_t fq; fq_t fq;
// Object for distributing the subscriptions upstream.
dist_t dist;
// The repository of subscriptions.
trie_t subscriptions;
// If true, 'message' contains a matching message to return on the
// next recv call.
bool has_message;
msg_t message;
// If true, part of a multipart message was already received, but
// there are following parts still waiting.
bool more;
xsub_t (const xsub_t&); xsub_t (const xsub_t&);
const xsub_t &operator = (const xsub_t&); const xsub_t &operator = (const xsub_t&);
}; };
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment