Commit 12532c79 authored by Martin Sustrik's avatar Martin Sustrik

O(1) fair-queueing in XREP implemented

Up to now the complexity of fair-queueing in XREP was O(n).
Signed-off-by: 's avatarMartin Sustrik <sustrik@250bpm.com>
parent ec81f8fb
......@@ -63,7 +63,8 @@ zmq::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,
peer (NULL),
sink (NULL),
state (active),
delay (delay_)
delay (delay_),
pipe_id (0)
{
}
......@@ -85,6 +86,16 @@ void zmq::pipe_t::set_event_sink (i_pipe_events *sink_)
sink = sink_;
}
void zmq::pipe_t::set_pipe_id (uint32_t id_)
{
pipe_id = id_;
}
uint32_t zmq::pipe_t::get_pipe_id ()
{
return pipe_id;
}
bool zmq::pipe_t::check_read ()
{
if (unlikely (!in_active || (state != active && state != pending)))
......
......@@ -25,6 +25,7 @@
#include "ypipe.hpp"
#include "config.hpp"
#include "object.hpp"
#include "stdint.hpp"
#include "array.hpp"
namespace zmq
......@@ -67,6 +68,10 @@ namespace zmq
// Specifies the object to send events to.
void set_event_sink (i_pipe_events *sink_);
// Pipe endpoint can store an opaque ID to be used by its clients.
void set_pipe_id (uint32_t id_);
uint32_t get_pipe_id ();
// Returns true if there is at least one message to read in the pipe.
bool check_read ();
......@@ -176,6 +181,9 @@ namespace zmq
// asks us to.
bool delay;
// Opaque ID. To be used by the clients, not the pipe itself.
uint32_t pipe_id;
// Returns true if the message is delimiter; false otherwise.
static bool is_delimiter (msg_t &msg_);
......
......@@ -26,7 +26,6 @@
zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_) :
socket_base_t (parent_, tid_),
current_in (0),
prefetched (false),
more_in (false),
current_out (NULL),
......@@ -34,14 +33,16 @@ zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_) :
{
options.type = ZMQ_XREP;
prefetched_msg.init ();
// Start the peer ID sequence from a random point.
generate_random (&next_peer_id, sizeof (next_peer_id));
}
zmq::xrep_t::~xrep_t ()
{
zmq_assert (inpipes.empty ());
zmq_assert (outpipes.empty ());
prefetched_msg.close ();
}
void zmq::xrep_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_)
......@@ -68,8 +69,8 @@ void zmq::xrep_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_)
zmq_assert (ok);
// Add the pipe to the list of inbound pipes.
inpipe_t inpipe = {pipe_, next_peer_id, true};
inpipes.push_back (inpipe);
pipe_->set_pipe_id (next_peer_id);
fq.attach (pipe_);
// Advance next peer ID so that if new connection is dropped shortly after
// its creation we don't accidentally get two subsequent peers with
......@@ -79,20 +80,8 @@ void zmq::xrep_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_)
void zmq::xrep_t::xterminated (pipe_t *pipe_)
{
for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end ();
++it) {
if (it->pipe == pipe_) {
if ((inpipes_t::size_type) (it - inpipes.begin ()) < current_in)
current_in--;
inpipes.erase (it);
if (current_in >= inpipes.size ())
current_in = 0;
goto clean_outpipes;
}
}
zmq_assert (false);
fq.terminated (pipe_);
clean_outpipes:
for (outpipes_t::iterator it = outpipes.begin ();
it != outpipes.end (); ++it) {
if (it->second.pipe == pipe_) {
......@@ -107,15 +96,7 @@ clean_outpipes:
void zmq::xrep_t::xread_activated (pipe_t *pipe_)
{
for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end ();
++it) {
if (it->pipe == pipe_) {
zmq_assert (!it->active);
it->active = true;
return;
}
}
zmq_assert (false);
fq.activated (pipe_);
}
void zmq::xrep_t::xwrite_activated (pipe_t *pipe_)
......@@ -212,55 +193,30 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_)
return 0;
}
// Deallocate old content of the message.
int rc = msg_->close ();
errno_assert (rc == 0);
// Get next message part.
pipe_t *pipe;
int rc = fq.recvpipe (msg_, flags_, &pipe);
if (rc != 0)
return -1;
// If we are in the middle of reading a message, just grab next part of it.
// If we are in the middle of reading a message, just return the next part.
if (more_in) {
zmq_assert (inpipes [current_in].active);
bool fetched = inpipes [current_in].pipe->read (msg_);
zmq_assert (fetched);
more_in = msg_->flags () & (msg_t::more | msg_t::label);
if (!more_in) {
current_in++;
if (current_in >= inpipes.size ())
current_in = 0;
}
return 0;
}
// Round-robin over the pipes to get the next message.
for (inpipes_t::size_type count = inpipes.size (); count != 0; count--) {
// Try to fetch new message.
if (inpipes [current_in].active)
prefetched = inpipes [current_in].pipe->read (&prefetched_msg);
// If we have a message, create a prefix and return it to the caller.
if (prefetched) {
int rc = msg_->init_size (4);
// We are at the beginning of a new message. Move the message part we
// have to the prefetched and return the ID of the peer instead.
rc = prefetched_msg.move (*msg_);
errno_assert (rc == 0);
put_uint32 ((unsigned char*) msg_->data (),
inpipes [current_in].peer_id);
prefetched = true;
rc = msg_->close ();
errno_assert (rc == 0);
rc = msg_->init_size (4);
errno_assert (rc == 0);
put_uint32 ((unsigned char*) msg_->data (), pipe->get_pipe_id ());
msg_->set_flags (msg_t::label);
return 0;
}
// If me don't have a message, mark the pipe as passive and
// move to next pipe.
inpipes [current_in].active = false;
current_in++;
if (current_in >= inpipes.size ())
current_in = 0;
}
// No message is available. Initialise the output parameter
// to be a 0-byte message.
rc = msg_->init ();
errno_assert (rc == 0);
errno = EAGAIN;
return -1;
}
int zmq::xrep_t::rollback (void)
......@@ -275,28 +231,9 @@ int zmq::xrep_t::rollback (void)
bool zmq::xrep_t::xhas_in ()
{
// There are subsequent parts of the partly-read message available.
if (prefetched || more_in)
return true;
// Note that messing with current doesn't break the fairness of fair
// queueing algorithm. If there are no messages available current will
// get back to its original value. Otherwise it'll point to the first
// pipe holding messages, skipping only pipes with no messages available.
for (inpipes_t::size_type count = inpipes.size (); count != 0; count--) {
if (inpipes [current_in].active &&
inpipes [current_in].pipe->check_read ())
if (prefetched)
return true;
// If me don't have a message, mark the pipe as passive and
// move to next pipe.
inpipes [current_in].active = false;
current_in++;
if (current_in >= inpipes.size ())
current_in = 0;
}
return false;
return fq.has_in ();
}
bool zmq::xrep_t::xhas_out ()
......
......@@ -22,11 +22,11 @@
#define __ZMQ_XREP_HPP_INCLUDED__
#include <map>
#include <vector>
#include "socket_base.hpp"
#include "stdint.hpp"
#include "msg.hpp"
#include "fq.hpp"
namespace zmq
{
......@@ -58,19 +58,8 @@ namespace zmq
private:
struct inpipe_t
{
class pipe_t *pipe;
uint32_t peer_id;
bool active;
};
// Inbound pipes with the names of corresponging peers.
typedef std::vector <inpipe_t> inpipes_t;
inpipes_t inpipes;
// The pipe we are currently reading from.
inpipes_t::size_type current_in;
// Fair queueing object for inbound pipes.
fq_t fq;
// Have we prefetched a message.
bool prefetched;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment