Commit e4911522 authored by Martin Sustrik's avatar Martin Sustrik

zmq_encoder/decoder are able to add/trim prefixes from messages; fair queueing…

zmq_encoder/decoder are able to add/trim prefixes from messages; fair queueing and load balancing algorithms factorised into separate classes
parent bd792faa
...@@ -66,6 +66,7 @@ libzmq_la_SOURCES = app_thread.hpp \ ...@@ -66,6 +66,7 @@ libzmq_la_SOURCES = app_thread.hpp \
err.hpp \ err.hpp \
fd.hpp \ fd.hpp \
fd_signaler.hpp \ fd_signaler.hpp \
fq.hpp \
i_inout.hpp \ i_inout.hpp \
io_object.hpp \ io_object.hpp \
io_thread.hpp \ io_thread.hpp \
...@@ -75,6 +76,7 @@ libzmq_la_SOURCES = app_thread.hpp \ ...@@ -75,6 +76,7 @@ libzmq_la_SOURCES = app_thread.hpp \
i_poll_events.hpp \ i_poll_events.hpp \
i_signaler.hpp \ i_signaler.hpp \
kqueue.hpp \ kqueue.hpp \
lb.hpp \
msg_content.hpp \ msg_content.hpp \
mutex.hpp \ mutex.hpp \
object.hpp \ object.hpp \
...@@ -126,10 +128,12 @@ libzmq_la_SOURCES = app_thread.hpp \ ...@@ -126,10 +128,12 @@ libzmq_la_SOURCES = app_thread.hpp \
epoll.cpp \ epoll.cpp \
err.cpp \ err.cpp \
fd_signaler.cpp \ fd_signaler.cpp \
fq.cpp \
io_object.cpp \ io_object.cpp \
io_thread.cpp \ io_thread.cpp \
ip.cpp \ ip.cpp \
kqueue.cpp \ kqueue.cpp \
lb.cpp \
object.cpp \ object.cpp \
options.cpp \ options.cpp \
owned.cpp \ owned.cpp \
......
...@@ -24,8 +24,7 @@ ...@@ -24,8 +24,7 @@
#include "pipe.hpp" #include "pipe.hpp"
zmq::downstream_t::downstream_t (class app_thread_t *parent_) : zmq::downstream_t::downstream_t (class app_thread_t *parent_) :
socket_base_t (parent_), socket_base_t (parent_)
current (0)
{ {
options.requires_in = false; options.requires_in = false;
options.requires_out = true; options.requires_out = true;
...@@ -39,7 +38,7 @@ void zmq::downstream_t::xattach_pipes (class reader_t *inpipe_, ...@@ -39,7 +38,7 @@ void zmq::downstream_t::xattach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_) class writer_t *outpipe_)
{ {
zmq_assert (!inpipe_ && outpipe_); zmq_assert (!inpipe_ && outpipe_);
pipes.push_back (outpipe_); lb.attach (outpipe_);
} }
void zmq::downstream_t::xdetach_inpipe (class reader_t *pipe_) void zmq::downstream_t::xdetach_inpipe (class reader_t *pipe_)
...@@ -51,7 +50,7 @@ void zmq::downstream_t::xdetach_inpipe (class reader_t *pipe_) ...@@ -51,7 +50,7 @@ void zmq::downstream_t::xdetach_inpipe (class reader_t *pipe_)
void zmq::downstream_t::xdetach_outpipe (class writer_t *pipe_) void zmq::downstream_t::xdetach_outpipe (class writer_t *pipe_)
{ {
zmq_assert (pipe_); zmq_assert (pipe_);
pipes.erase (pipes.index (pipe_)); lb.detach (pipe_);
} }
void zmq::downstream_t::xkill (class reader_t *pipe_) void zmq::downstream_t::xkill (class reader_t *pipe_)
...@@ -76,29 +75,7 @@ int zmq::downstream_t::xsetsockopt (int option_, const void *optval_, ...@@ -76,29 +75,7 @@ int zmq::downstream_t::xsetsockopt (int option_, const void *optval_,
int zmq::downstream_t::xsend (zmq_msg_t *msg_, int flags_) int zmq::downstream_t::xsend (zmq_msg_t *msg_, int flags_)
{ {
// If there are no pipes we cannot send the message. return lb.send (msg_, flags_);
if (pipes.empty ()) {
errno = EAGAIN;
return -1;
}
// Move to the next pipe (load-balancing).
current++;
if (current >= pipes.size ())
current = 0;
// TODO: Implement this once queue limits are in-place.
zmq_assert (pipes [current]->check_write (zmq_msg_size (msg_)));
// Push message to the selected pipe.
pipes [current]->write (msg_);
pipes [current]->flush ();
// Detach the message from the data buffer.
int rc = zmq_msg_init (msg_);
zmq_assert (rc == 0);
return 0;
} }
int zmq::downstream_t::xflush () int zmq::downstream_t::xflush ()
...@@ -124,8 +101,7 @@ bool zmq::downstream_t::xhas_in () ...@@ -124,8 +101,7 @@ bool zmq::downstream_t::xhas_in ()
bool zmq::downstream_t::xhas_out () bool zmq::downstream_t::xhas_out ()
{ {
// TODO: Modify this code once pipe limits are in place. return lb.has_out ();
return true;
} }
...@@ -21,7 +21,7 @@ ...@@ -21,7 +21,7 @@
#define __ZMQ_DOWNSTREAM_HPP_INCLUDED__ #define __ZMQ_DOWNSTREAM_HPP_INCLUDED__
#include "socket_base.hpp" #include "socket_base.hpp"
#include "yarray.hpp" #include "lb.hpp"
namespace zmq namespace zmq
{ {
...@@ -48,12 +48,8 @@ namespace zmq ...@@ -48,12 +48,8 @@ namespace zmq
private: private:
// List of outbound pipes. // Load balancer managing the outbound pipes.
typedef yarray_t <class writer_t> pipes_t; lb_t lb;
pipes_t pipes;
// Points to the last pipe that the most recent message was sent to.
pipes_t::size_type current;
downstream_t (const downstream_t&); downstream_t (const downstream_t&);
void operator = (const downstream_t&); void operator = (const downstream_t&);
......
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "../bindings/c/zmq.h"
#include "fq.hpp"
#include "pipe.hpp"
#include "err.hpp"
zmq::fq_t::fq_t () :
active (0),
current (0)
{
}
zmq::fq_t::~fq_t ()
{
for (pipes_t::size_type i = 0; i != pipes.size (); i++)
pipes [i]->term ();
}
void zmq::fq_t::attach (reader_t *pipe_)
{
pipes.push_back (pipe_);
pipes.swap (active, pipes.size () - 1);
active++;
}
void zmq::fq_t::detach (reader_t *pipe_)
{
// Remove the pipe from the list; adjust number of active pipes
// accordingly.
if (pipes.index (pipe_) < active)
active--;
pipes.erase (pipe_);
}
void zmq::fq_t::kill (reader_t *pipe_)
{
// Move the pipe to the list of inactive pipes.
active--;
pipes.swap (pipes.index (pipe_), active);
}
void zmq::fq_t::revive (reader_t *pipe_)
{
// Move the pipe to the list of active pipes.
pipes.swap (pipes.index (pipe_), active);
active++;
}
int zmq::fq_t::recv (zmq_msg_t *msg_, int flags_)
{
// Deallocate old content of the message.
zmq_msg_close (msg_);
// Round-robin over the pipes to get next message.
for (int count = active; count != 0; count--) {
bool fetched = pipes [current]->read (msg_);
current++;
if (current >= active)
current = 0;
if (fetched)
return 0;
}
// No message is available. Initialise the output parameter
// to be a 0-byte message.
zmq_msg_init (msg_);
errno = EAGAIN;
return -1;
}
bool zmq::fq_t::has_in ()
{
// Note that messing with current doesn't break the fairness of fair
// queueing algorithm. If there are no messages available current will
// get back to its original value. Otherwise it'll point to the first
// pipe holding messages, skipping only pipes with no messages available.
for (int count = active; count != 0; count--) {
if (pipes [current]->check_read ())
return true;
current++;
if (current >= active)
current = 0;
}
return false;
}
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_FQ_HPP_INCLUDED__
#define __ZMQ_FQ_HPP_INCLUDED__
#include "yarray.hpp"
namespace zmq
{
// Class manages a set of inbound pipes. On receive it performs fair
// queueing (RFC970) so that senders gone berserk won't cause denial of
// service for decent senders.
class fq_t
{
public:
fq_t ();
~fq_t ();
void attach (class reader_t *pipe_);
void detach (class reader_t *pipe_);
void kill (class reader_t *pipe_);
void revive (class reader_t *pipe_);
int recv (zmq_msg_t *msg_, int flags_);
bool has_in ();
private:
// Inbound pipes.
typedef yarray_t <class reader_t> pipes_t;
pipes_t pipes;
// Number of active pipes. All the active pipes are located at the
// beginning of the pipes array.
pipes_t::size_type active;
// Index of the next bound pipe to read a message from.
pipes_t::size_type current;
fq_t (const fq_t&);
void operator = (const fq_t&);
};
}
#endif
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "../bindings/c/zmq.h"
#include "lb.hpp"
#include "pipe.hpp"
#include "err.hpp"
zmq::lb_t::lb_t () :
active (0),
current (0)
{
}
zmq::lb_t::~lb_t ()
{
for (pipes_t::size_type i = 0; i != pipes.size (); i++)
pipes [i]->term ();
}
void zmq::lb_t::attach (writer_t *pipe_)
{
pipes.push_back (pipe_);
pipes.swap (active, pipes.size () - 1);
active++;
}
void zmq::lb_t::detach (writer_t *pipe_)
{
// Remove the pipe from the list; adjust number of active pipes
// accordingly.
if (pipes.index (pipe_) < active)
active--;
pipes.erase (pipe_);
}
void zmq::lb_t::kill (writer_t *pipe_)
{
// Move the pipe to the list of inactive pipes.
active--;
pipes.swap (pipes.index (pipe_), active);
}
void zmq::lb_t::revive (writer_t *pipe_)
{
// Move the pipe to the list of active pipes.
pipes.swap (pipes.index (pipe_), active);
active++;
}
int zmq::lb_t::send (zmq_msg_t *msg_, int flags_)
{
// If there are no pipes we cannot send the message.
if (pipes.empty ()) {
errno = EAGAIN;
return -1;
}
// Move to the next pipe (load-balancing).
current++;
if (current >= active)
current = 0;
// TODO: Implement this once queue limits are in-place.
zmq_assert (pipes [current]->check_write (zmq_msg_size (msg_)));
// Push message to the selected pipe.
pipes [current]->write (msg_);
pipes [current]->flush ();
// Detach the message from the data buffer.
int rc = zmq_msg_init (msg_);
zmq_assert (rc == 0);
return 0;
}
bool zmq::lb_t::has_out ()
{
for (int count = active; count != 0; count--) {
// We should be able to write at least 1-byte message to interrupt
// polling for POLLOUT.
// TODO: Shouldn't we use a saner value here?
if (pipes [current]->check_write (1))
return true;
current++;
if (current >= active)
current = 0;
}
return false;
}
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_LB_HPP_INCLUDED__
#define __ZMQ_LB_HPP_INCLUDED__
#include "yarray.hpp"
namespace zmq
{
// Class manages a set of outbound pipes. On send it load balances
// messages fairly among the pipes.
class lb_t
{
public:
lb_t ();
~lb_t ();
void attach (class writer_t *pipe_);
void detach (class writer_t *pipe_);
void kill (class writer_t *pipe_);
void revive (class writer_t *pipe_);
int send (zmq_msg_t *msg_, int flags_);
bool has_out ();
private:
// List of outbound pipes.
typedef yarray_t <class writer_t> pipes_t;
pipes_t pipes;
// Number of active pipes. All the active pipes are located at the
// beginning of the pipes array.
pipes_t::size_type active;
// Points to the last pipe that the most recent message was sent to.
pipes_t::size_type current;
lb_t (const lb_t&);
void operator = (const lb_t&);
};
}
#endif
...@@ -171,7 +171,7 @@ void zmq::pgm_receiver_t::in_event () ...@@ -171,7 +171,7 @@ void zmq::pgm_receiver_t::in_event ()
it->second.joined = true; it->second.joined = true;
// Create and connect decoder for joined peer. // Create and connect decoder for joined peer.
it->second.decoder = new zmq_decoder_t (0); it->second.decoder = new zmq_decoder_t (0, NULL, 0);
it->second.decoder->set_inout (inout); it->second.decoder->set_inout (inout);
} }
......
...@@ -35,7 +35,7 @@ ...@@ -35,7 +35,7 @@
zmq::pgm_sender_t::pgm_sender_t (io_thread_t *parent_, zmq::pgm_sender_t::pgm_sender_t (io_thread_t *parent_,
const options_t &options_, const char *session_name_) : const options_t &options_, const char *session_name_) :
io_object_t (parent_), io_object_t (parent_),
encoder (0), encoder (0, false),
pgm_socket (false, options_), pgm_socket (false, options_),
options (options_), options (options_),
session_name (session_name_), session_name (session_name_),
......
...@@ -21,12 +21,9 @@ ...@@ -21,12 +21,9 @@
#include "sub.hpp" #include "sub.hpp"
#include "err.hpp" #include "err.hpp"
#include "pipe.hpp"
zmq::sub_t::sub_t (class app_thread_t *parent_) : zmq::sub_t::sub_t (class app_thread_t *parent_) :
socket_base_t (parent_), socket_base_t (parent_),
active (0),
current (0),
all_count (0) all_count (0)
{ {
options.requires_in = true; options.requires_in = true;
...@@ -35,44 +32,35 @@ zmq::sub_t::sub_t (class app_thread_t *parent_) : ...@@ -35,44 +32,35 @@ zmq::sub_t::sub_t (class app_thread_t *parent_) :
zmq::sub_t::~sub_t () zmq::sub_t::~sub_t ()
{ {
for (in_pipes_t::size_type i = 0; i != in_pipes.size (); i++)
in_pipes [i]->term ();
in_pipes.clear ();
} }
void zmq::sub_t::xattach_pipes (class reader_t *inpipe_, void zmq::sub_t::xattach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_) class writer_t *outpipe_)
{ {
zmq_assert (!outpipe_); zmq_assert (inpipe_ && !outpipe_);
in_pipes.push_back (inpipe_); fq.attach (inpipe_);
in_pipes.swap (active, in_pipes.size () - 1);
active++;
} }
void zmq::sub_t::xdetach_inpipe (class reader_t *pipe_) void zmq::sub_t::xdetach_inpipe (class reader_t *pipe_)
{ {
if (in_pipes.index (pipe_) < active) zmq_assert (pipe_);
active--; fq.detach (pipe_);
in_pipes.erase (pipe_);
} }
void zmq::sub_t::xdetach_outpipe (class writer_t *pipe_) void zmq::sub_t::xdetach_outpipe (class writer_t *pipe_)
{ {
// SUB socket is read-only thus there should be no outpipes.
zmq_assert (false); zmq_assert (false);
} }
void zmq::sub_t::xkill (class reader_t *pipe_) void zmq::sub_t::xkill (class reader_t *pipe_)
{ {
// Move the pipe to the list of inactive pipes. fq.kill (pipe_);
in_pipes.swap (in_pipes.index (pipe_), active - 1);
active--;
} }
void zmq::sub_t::xrevive (class reader_t *pipe_) void zmq::sub_t::xrevive (class reader_t *pipe_)
{ {
// Move the pipe to the list of active pipes. fq.revive (pipe_);
in_pipes.swap (in_pipes.index (pipe_), active);
active++;
} }
int zmq::sub_t::xsetsockopt (int option_, const void *optval_, int zmq::sub_t::xsetsockopt (int option_, const void *optval_,
...@@ -139,7 +127,7 @@ int zmq::sub_t::xrecv (zmq_msg_t *msg_, int flags_) ...@@ -139,7 +127,7 @@ int zmq::sub_t::xrecv (zmq_msg_t *msg_, int flags_)
while (true) { while (true) {
// Get a message using fair queueing algorithm. // Get a message using fair queueing algorithm.
int rc = fq (msg_, flags_); int rc = fq.recv (msg_, flags_);
// If there's no message available, return immediately. // If there's no message available, return immediately.
if (rc != 0 && errno == EAGAIN) if (rc != 0 && errno == EAGAIN)
...@@ -176,28 +164,6 @@ int zmq::sub_t::xrecv (zmq_msg_t *msg_, int flags_) ...@@ -176,28 +164,6 @@ int zmq::sub_t::xrecv (zmq_msg_t *msg_, int flags_)
} }
} }
int zmq::sub_t::fq (zmq_msg_t *msg_, int flags_)
{
// Deallocate old content of the message.
zmq_msg_close (msg_);
// Round-robin over the pipes to get next message.
for (int count = active; count != 0; count--) {
bool fetched = in_pipes [current]->read (msg_);
current++;
if (current >= active)
current = 0;
if (fetched)
return 0;
}
// No message is available. Initialise the output parameter
// to be a 0-byte message.
zmq_msg_init (msg_);
errno = EAGAIN;
return -1;
}
bool zmq::sub_t::xhas_in () bool zmq::sub_t::xhas_in ()
{ {
// TODO: This is more complex as we have to ignore all the messages that // TODO: This is more complex as we have to ignore all the messages that
......
...@@ -24,7 +24,7 @@ ...@@ -24,7 +24,7 @@
#include <string> #include <string>
#include "socket_base.hpp" #include "socket_base.hpp"
#include "yarray.hpp" #include "fq.hpp"
namespace zmq namespace zmq
{ {
...@@ -53,26 +53,15 @@ namespace zmq ...@@ -53,26 +53,15 @@ namespace zmq
private: private:
// Helper function to return one message choosed using // Fair queueing object for inbound pipes.
// fair queueing algorithm. fq_t fq;
int fq (zmq_msg_t *msg_, int flags_);
// Inbound pipes, i.e. those the socket is getting messages from.
typedef yarray_t <class reader_t> in_pipes_t;
in_pipes_t in_pipes;
// Number of active inbound pipes. Active pipes are stored in the
// initial section of the in_pipes array.
in_pipes_t::size_type active;
// Index of the next inbound pipe to read messages from.
in_pipes_t::size_type current;
// Number of active "*" subscriptions. // Number of active "*" subscriptions.
int all_count; int all_count;
// List of all prefix subscriptions.
typedef std::multiset <std::string> subscriptions_t; typedef std::multiset <std::string> subscriptions_t;
// List of all prefix subscriptions.
subscriptions_t prefixes; subscriptions_t prefixes;
// List of all exact match subscriptions. // List of all exact match subscriptions.
......
...@@ -21,12 +21,9 @@ ...@@ -21,12 +21,9 @@
#include "upstream.hpp" #include "upstream.hpp"
#include "err.hpp" #include "err.hpp"
#include "pipe.hpp"
zmq::upstream_t::upstream_t (class app_thread_t *parent_) : zmq::upstream_t::upstream_t (class app_thread_t *parent_) :
socket_base_t (parent_), socket_base_t (parent_)
active (0),
current (0)
{ {
options.requires_in = true; options.requires_in = true;
options.requires_out = false; options.requires_out = false;
...@@ -40,21 +37,13 @@ void zmq::upstream_t::xattach_pipes (class reader_t *inpipe_, ...@@ -40,21 +37,13 @@ void zmq::upstream_t::xattach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_) class writer_t *outpipe_)
{ {
zmq_assert (inpipe_ && !outpipe_); zmq_assert (inpipe_ && !outpipe_);
fq.attach (inpipe_);
pipes.push_back (inpipe_);
pipes.swap (active, pipes.size () - 1);
active++;
} }
void zmq::upstream_t::xdetach_inpipe (class reader_t *pipe_) void zmq::upstream_t::xdetach_inpipe (class reader_t *pipe_)
{ {
// Remove the pipe from the list; adjust number of active pipes
// accordingly.
zmq_assert (pipe_); zmq_assert (pipe_);
pipes_t::size_type index = pipes.index (pipe_); fq.detach (pipe_);
if (index < active)
active--;
pipes.erase (index);
} }
void zmq::upstream_t::xdetach_outpipe (class writer_t *pipe_) void zmq::upstream_t::xdetach_outpipe (class writer_t *pipe_)
...@@ -65,16 +54,12 @@ void zmq::upstream_t::xdetach_outpipe (class writer_t *pipe_) ...@@ -65,16 +54,12 @@ void zmq::upstream_t::xdetach_outpipe (class writer_t *pipe_)
void zmq::upstream_t::xkill (class reader_t *pipe_) void zmq::upstream_t::xkill (class reader_t *pipe_)
{ {
// Move the pipe to the list of inactive pipes. fq.kill (pipe_);
active--;
pipes.swap (pipes.index (pipe_), active);
} }
void zmq::upstream_t::xrevive (class reader_t *pipe_) void zmq::upstream_t::xrevive (class reader_t *pipe_)
{ {
// Move the pipe to the list of active pipes. fq.revive (pipe_);
pipes.swap (pipes.index (pipe_), active);
active++;
} }
int zmq::upstream_t::xsetsockopt (int option_, const void *optval_, int zmq::upstream_t::xsetsockopt (int option_, const void *optval_,
...@@ -99,41 +84,12 @@ int zmq::upstream_t::xflush () ...@@ -99,41 +84,12 @@ int zmq::upstream_t::xflush ()
int zmq::upstream_t::xrecv (zmq_msg_t *msg_, int flags_) int zmq::upstream_t::xrecv (zmq_msg_t *msg_, int flags_)
{ {
// Deallocate old content of the message. return fq.recv (msg_, flags_);
zmq_msg_close (msg_);
// Round-robin over the pipes to get next message.
for (int count = active; count != 0; count--) {
bool fetched = pipes [current]->read (msg_);
current++;
if (current >= active)
current = 0;
if (fetched)
return 0;
}
// No message is available. Initialise the output parameter
// to be a 0-byte message.
zmq_msg_init (msg_);
errno = EAGAIN;
return -1;
} }
bool zmq::upstream_t::xhas_in () bool zmq::upstream_t::xhas_in ()
{ {
// Note that messing with current doesn't break the fairness of fair return fq.has_in ();
// queueing algorithm. If there are no messages available current will
// get back to its original value. Otherwise it'll point to the first
// pipe holding messages, skipping only pipes with no messages available.
for (int count = active; count != 0; count--) {
if (pipes [current]->check_read ())
return true;
current++;
if (current >= active)
current = 0;
}
return false;
} }
bool zmq::upstream_t::xhas_out () bool zmq::upstream_t::xhas_out ()
......
...@@ -21,7 +21,7 @@ ...@@ -21,7 +21,7 @@
#define __ZMQ_UPSTREAM_HPP_INCLUDED__ #define __ZMQ_UPSTREAM_HPP_INCLUDED__
#include "socket_base.hpp" #include "socket_base.hpp"
#include "yarray.hpp" #include "fq.hpp"
namespace zmq namespace zmq
{ {
...@@ -48,16 +48,8 @@ namespace zmq ...@@ -48,16 +48,8 @@ namespace zmq
private: private:
// Inbound pipes. // Fair queueing object for inbound pipes.
typedef yarray_t <class reader_t> pipes_t; fq_t fq;
pipes_t pipes;
// Number of active pipes. All the active pipes are located at the
// beginning of the pipes array.
pipes_t::size_type active;
// Index of the next bound pipe to read a message from.
pipes_t::size_type current;
upstream_t (const upstream_t&); upstream_t (const upstream_t&);
void operator = (const upstream_t&); void operator = (const upstream_t&);
......
...@@ -21,7 +21,6 @@ ...@@ -21,7 +21,6 @@
#include "xrep.hpp" #include "xrep.hpp"
#include "err.hpp" #include "err.hpp"
#include "pipe.hpp"
zmq::xrep_t::xrep_t (class app_thread_t *parent_) : zmq::xrep_t::xrep_t (class app_thread_t *parent_) :
socket_base_t (parent_) socket_base_t (parent_)
...@@ -37,12 +36,16 @@ zmq::xrep_t::~xrep_t () ...@@ -37,12 +36,16 @@ zmq::xrep_t::~xrep_t ()
void zmq::xrep_t::xattach_pipes (class reader_t *inpipe_, void zmq::xrep_t::xattach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_) class writer_t *outpipe_)
{ {
zmq_assert (inpipe_ && outpipe_);
fq.attach (inpipe_);
zmq_assert (false); zmq_assert (false);
} }
void zmq::xrep_t::xdetach_inpipe (class reader_t *pipe_) void zmq::xrep_t::xdetach_inpipe (class reader_t *pipe_)
{ {
zmq_assert (false); zmq_assert (pipe_);
fq.detach (pipe_);
} }
void zmq::xrep_t::xdetach_outpipe (class writer_t *pipe_) void zmq::xrep_t::xdetach_outpipe (class writer_t *pipe_)
...@@ -52,12 +55,12 @@ void zmq::xrep_t::xdetach_outpipe (class writer_t *pipe_) ...@@ -52,12 +55,12 @@ void zmq::xrep_t::xdetach_outpipe (class writer_t *pipe_)
void zmq::xrep_t::xkill (class reader_t *pipe_) void zmq::xrep_t::xkill (class reader_t *pipe_)
{ {
zmq_assert (false); fq.kill (pipe_);
} }
void zmq::xrep_t::xrevive (class reader_t *pipe_) void zmq::xrep_t::xrevive (class reader_t *pipe_)
{ {
zmq_assert (false); fq.revive (pipe_);
} }
int zmq::xrep_t::xsetsockopt (int option_, const void *optval_, int zmq::xrep_t::xsetsockopt (int option_, const void *optval_,
...@@ -81,14 +84,12 @@ int zmq::xrep_t::xflush () ...@@ -81,14 +84,12 @@ int zmq::xrep_t::xflush ()
int zmq::xrep_t::xrecv (zmq_msg_t *msg_, int flags_) int zmq::xrep_t::xrecv (zmq_msg_t *msg_, int flags_)
{ {
zmq_assert (false); return fq.recv (msg_, flags_);
return -1;
} }
bool zmq::xrep_t::xhas_in () bool zmq::xrep_t::xhas_in ()
{ {
zmq_assert (false); return fq.has_in ();
return false;
} }
bool zmq::xrep_t::xhas_out () bool zmq::xrep_t::xhas_out ()
......
...@@ -21,7 +21,7 @@ ...@@ -21,7 +21,7 @@
#define __ZMQ_XREP_HPP_INCLUDED__ #define __ZMQ_XREP_HPP_INCLUDED__
#include "socket_base.hpp" #include "socket_base.hpp"
#include "yarray.hpp" #include "fq.hpp"
namespace zmq namespace zmq
{ {
...@@ -48,6 +48,9 @@ namespace zmq ...@@ -48,6 +48,9 @@ namespace zmq
private: private:
// Inbound messages are fair-queued.
fq_t fq;
xrep_t (const xrep_t&); xrep_t (const xrep_t&);
void operator = (const xrep_t&); void operator = (const xrep_t&);
}; };
......
...@@ -21,7 +21,6 @@ ...@@ -21,7 +21,6 @@
#include "xreq.hpp" #include "xreq.hpp"
#include "err.hpp" #include "err.hpp"
#include "pipe.hpp"
zmq::xreq_t::xreq_t (class app_thread_t *parent_) : zmq::xreq_t::xreq_t (class app_thread_t *parent_) :
socket_base_t (parent_) socket_base_t (parent_)
...@@ -37,27 +36,31 @@ zmq::xreq_t::~xreq_t () ...@@ -37,27 +36,31 @@ zmq::xreq_t::~xreq_t ()
void zmq::xreq_t::xattach_pipes (class reader_t *inpipe_, void zmq::xreq_t::xattach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_) class writer_t *outpipe_)
{ {
zmq_assert (false); zmq_assert (inpipe_ && outpipe_);
fq.attach (inpipe_);
lb.attach (outpipe_);
} }
void zmq::xreq_t::xdetach_inpipe (class reader_t *pipe_) void zmq::xreq_t::xdetach_inpipe (class reader_t *pipe_)
{ {
zmq_assert (false); zmq_assert (pipe_);
fq.detach (pipe_);
} }
void zmq::xreq_t::xdetach_outpipe (class writer_t *pipe_) void zmq::xreq_t::xdetach_outpipe (class writer_t *pipe_)
{ {
zmq_assert (false); zmq_assert (pipe_);
lb.detach (pipe_);
} }
void zmq::xreq_t::xkill (class reader_t *pipe_) void zmq::xreq_t::xkill (class reader_t *pipe_)
{ {
zmq_assert (false); fq.kill (pipe_);
} }
void zmq::xreq_t::xrevive (class reader_t *pipe_) void zmq::xreq_t::xrevive (class reader_t *pipe_)
{ {
zmq_assert (false); fq.revive (pipe_);
} }
int zmq::xreq_t::xsetsockopt (int option_, const void *optval_, int zmq::xreq_t::xsetsockopt (int option_, const void *optval_,
...@@ -69,32 +72,29 @@ int zmq::xreq_t::xsetsockopt (int option_, const void *optval_, ...@@ -69,32 +72,29 @@ int zmq::xreq_t::xsetsockopt (int option_, const void *optval_,
int zmq::xreq_t::xsend (zmq_msg_t *msg_, int flags_) int zmq::xreq_t::xsend (zmq_msg_t *msg_, int flags_)
{ {
zmq_assert (false); return lb.send (msg_, flags_);
return -1;
} }
int zmq::xreq_t::xflush () int zmq::xreq_t::xflush ()
{ {
// TODO: Implement flushing.
zmq_assert (false); zmq_assert (false);
return -1; return -1;
} }
int zmq::xreq_t::xrecv (zmq_msg_t *msg_, int flags_) int zmq::xreq_t::xrecv (zmq_msg_t *msg_, int flags_)
{ {
zmq_assert (false); return fq.recv (msg_, flags_);
return -1;
} }
bool zmq::xreq_t::xhas_in () bool zmq::xreq_t::xhas_in ()
{ {
zmq_assert (false); return fq.has_in ();
return false;
} }
bool zmq::xreq_t::xhas_out () bool zmq::xreq_t::xhas_out ()
{ {
zmq_assert (false); return lb.has_out ();
return false;
} }
...@@ -21,7 +21,8 @@ ...@@ -21,7 +21,8 @@
#define __ZMQ_XREQ_HPP_INCLUDED__ #define __ZMQ_XREQ_HPP_INCLUDED__
#include "socket_base.hpp" #include "socket_base.hpp"
#include "yarray.hpp" #include "fq.hpp"
#include "lb.hpp"
namespace zmq namespace zmq
{ {
...@@ -48,6 +49,11 @@ namespace zmq ...@@ -48,6 +49,11 @@ namespace zmq
private: private:
// Messages are fair-queued from inbound pipes. And load-balanced to
// the outbound pipes.
fq_t fq;
lb_t lb;
xreq_t (const xreq_t&); xreq_t (const xreq_t&);
void operator = (const xreq_t&); void operator = (const xreq_t&);
}; };
......
...@@ -17,23 +17,41 @@ ...@@ -17,23 +17,41 @@
along with this program. If not, see <http://www.gnu.org/licenses/>. along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include <stdlib.h>
#include <string.h>
#include "zmq_decoder.hpp" #include "zmq_decoder.hpp"
#include "i_inout.hpp" #include "i_inout.hpp"
#include "wire.hpp" #include "wire.hpp"
#include "err.hpp" #include "err.hpp"
zmq::zmq_decoder_t::zmq_decoder_t (size_t bufsize_) : zmq::zmq_decoder_t::zmq_decoder_t (size_t bufsize_,
void *prefix_, size_t prefix_size_) :
decoder_t <zmq_decoder_t> (bufsize_), decoder_t <zmq_decoder_t> (bufsize_),
destination (NULL) destination (NULL)
{ {
zmq_msg_init (&in_progress); zmq_msg_init (&in_progress);
if (!prefix_) {
prefix = NULL;
prefix_size = 0;
}
else {
prefix = malloc (prefix_size_);
zmq_assert (prefix);
memcpy (prefix, prefix_, prefix_size_);
prefix_size = prefix_size_;
}
// At the beginning, read one byte and go to one_byte_size_ready state. // At the beginning, read one byte and go to one_byte_size_ready state.
next_step (tmpbuf, 1, &zmq_decoder_t::one_byte_size_ready); next_step (tmpbuf, 1, &zmq_decoder_t::one_byte_size_ready);
} }
zmq::zmq_decoder_t::~zmq_decoder_t () zmq::zmq_decoder_t::~zmq_decoder_t ()
{ {
if (prefix)
free (prefix);
zmq_msg_close (&in_progress); zmq_msg_close (&in_progress);
} }
...@@ -55,11 +73,15 @@ bool zmq::zmq_decoder_t::one_byte_size_ready () ...@@ -55,11 +73,15 @@ bool zmq::zmq_decoder_t::one_byte_size_ready ()
// in_progress is initialised at this point so in theory we should // in_progress is initialised at this point so in theory we should
// close it before calling zmq_msg_init_size, however, it's a 0-byte // close it before calling zmq_msg_init_size, however, it's a 0-byte
// message and thus we can treat it as uninitialised... // message and thus we can treat it as uninitialised...
int rc = zmq_msg_init_size (&in_progress, *tmpbuf); int rc = zmq_msg_init_size (&in_progress, prefix_size + *tmpbuf);
errno_assert (rc == 0); errno_assert (rc == 0);
next_step (zmq_msg_data (&in_progress), *tmpbuf, // Fill in the message prefix if any.
&zmq_decoder_t::message_ready); if (prefix)
memcpy (zmq_msg_data (&in_progress), prefix, prefix_size);
next_step ((unsigned char*) zmq_msg_data (&in_progress) + prefix_size,
*tmpbuf, &zmq_decoder_t::message_ready);
} }
return true; return true;
} }
...@@ -74,11 +96,15 @@ bool zmq::zmq_decoder_t::eight_byte_size_ready () ...@@ -74,11 +96,15 @@ bool zmq::zmq_decoder_t::eight_byte_size_ready ()
// in_progress is initialised at this point so in theory we should // in_progress is initialised at this point so in theory we should
// close it before calling zmq_msg_init_size, however, it's a 0-byte // close it before calling zmq_msg_init_size, however, it's a 0-byte
// message and thus we can treat it as uninitialised... // message and thus we can treat it as uninitialised...
int rc = zmq_msg_init_size (&in_progress, size); int rc = zmq_msg_init_size (&in_progress, prefix_size + size);
errno_assert (rc == 0); errno_assert (rc == 0);
next_step (zmq_msg_data (&in_progress), size, // Fill in the message prefix if any.
&zmq_decoder_t::message_ready); if (prefix)
memcpy (zmq_msg_data (&in_progress), prefix, prefix_size);
next_step ((unsigned char*) zmq_msg_data (&in_progress) + prefix_size ,
size, &zmq_decoder_t::message_ready);
return true; return true;
} }
......
...@@ -32,7 +32,9 @@ namespace zmq ...@@ -32,7 +32,9 @@ namespace zmq
{ {
public: public:
zmq_decoder_t (size_t bufsize_); // If prefix is not NULL, it will be glued to the beginning of every
// decoded message.
zmq_decoder_t (size_t bufsize_, void *prefix_, size_t prefix_size_);
~zmq_decoder_t (); ~zmq_decoder_t ();
void set_inout (struct i_inout *destination_); void set_inout (struct i_inout *destination_);
...@@ -47,6 +49,9 @@ namespace zmq ...@@ -47,6 +49,9 @@ namespace zmq
unsigned char tmpbuf [8]; unsigned char tmpbuf [8];
::zmq_msg_t in_progress; ::zmq_msg_t in_progress;
void *prefix;
size_t prefix_size;
zmq_decoder_t (const zmq_decoder_t&); zmq_decoder_t (const zmq_decoder_t&);
void operator = (const zmq_decoder_t&); void operator = (const zmq_decoder_t&);
}; };
......
...@@ -21,9 +21,10 @@ ...@@ -21,9 +21,10 @@
#include "i_inout.hpp" #include "i_inout.hpp"
#include "wire.hpp" #include "wire.hpp"
zmq::zmq_encoder_t::zmq_encoder_t (size_t bufsize_) : zmq::zmq_encoder_t::zmq_encoder_t (size_t bufsize_, bool trim_prefix_) :
encoder_t <zmq_encoder_t> (bufsize_), encoder_t <zmq_encoder_t> (bufsize_),
source (NULL) source (NULL),
trim_prefix (trim_prefix_)
{ {
zmq_msg_init (&in_progress); zmq_msg_init (&in_progress);
...@@ -44,8 +45,16 @@ void zmq::zmq_encoder_t::set_inout (i_inout *source_) ...@@ -44,8 +45,16 @@ void zmq::zmq_encoder_t::set_inout (i_inout *source_)
bool zmq::zmq_encoder_t::size_ready () bool zmq::zmq_encoder_t::size_ready ()
{ {
// Write message body into the buffer. // Write message body into the buffer.
if (!trim_prefix) {
next_step (zmq_msg_data (&in_progress), zmq_msg_size (&in_progress), next_step (zmq_msg_data (&in_progress), zmq_msg_size (&in_progress),
&zmq_encoder_t::message_ready, false); &zmq_encoder_t::message_ready, false);
}
else {
size_t prefix_size = *(unsigned char*) zmq_msg_data (&in_progress);
next_step ((unsigned char*) zmq_msg_data (&in_progress) + prefix_size,
zmq_msg_size (&in_progress) - prefix_size,
&zmq_encoder_t::message_ready, false);
}
return true; return true;
} }
...@@ -63,7 +72,11 @@ bool zmq::zmq_encoder_t::message_ready () ...@@ -63,7 +72,11 @@ bool zmq::zmq_encoder_t::message_ready ()
return false; return false;
} }
// Get the message size. If the prefix is not to be sent, adjust the
// size accordingly.
size_t size = zmq_msg_size (&in_progress); size_t size = zmq_msg_size (&in_progress);
if (trim_prefix)
size -= *(unsigned char*) zmq_msg_data (&in_progress);
// For messages less than 255 bytes long, write one byte of message size. // For messages less than 255 bytes long, write one byte of message size.
// For longer messages write 0xff escape character followed by 8-byte // For longer messages write 0xff escape character followed by 8-byte
......
...@@ -32,7 +32,7 @@ namespace zmq ...@@ -32,7 +32,7 @@ namespace zmq
{ {
public: public:
zmq_encoder_t (size_t bufsize_); zmq_encoder_t (size_t bufsize_, bool trim_prefix_);
~zmq_encoder_t (); ~zmq_encoder_t ();
void set_inout (struct i_inout *source_); void set_inout (struct i_inout *source_);
...@@ -46,6 +46,8 @@ namespace zmq ...@@ -46,6 +46,8 @@ namespace zmq
::zmq_msg_t in_progress; ::zmq_msg_t in_progress;
unsigned char tmpbuf [9]; unsigned char tmpbuf [9];
bool trim_prefix;
zmq_encoder_t (const zmq_encoder_t&); zmq_encoder_t (const zmq_encoder_t&);
void operator = (const zmq_encoder_t&); void operator = (const zmq_encoder_t&);
}; };
......
...@@ -29,10 +29,10 @@ zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, fd_t fd_, ...@@ -29,10 +29,10 @@ zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, fd_t fd_,
io_object_t (parent_), io_object_t (parent_),
inpos (NULL), inpos (NULL),
insize (0), insize (0),
decoder (in_batch_size), decoder (in_batch_size, NULL, 0),
outpos (NULL), outpos (NULL),
outsize (0), outsize (0),
encoder (out_batch_size), encoder (out_batch_size, false),
inout (NULL), inout (NULL),
options (options_) options (options_)
{ {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment