Commit eb7b8a41 authored by Martin Sustrik's avatar Martin Sustrik

REP socket layered on top of XREP socket

parent 3e97c0fe
...@@ -21,175 +21,36 @@ ...@@ -21,175 +21,36 @@
#include "rep.hpp" #include "rep.hpp"
#include "err.hpp" #include "err.hpp"
#include "pipe.hpp"
zmq::rep_t::rep_t (class ctx_t *parent_, uint32_t slot_) : zmq::rep_t::rep_t (class ctx_t *parent_, uint32_t slot_) :
socket_base_t (parent_, slot_), xrep_t (parent_, slot_),
active (0),
current (0),
sending_reply (false), sending_reply (false),
more (false), request_begins (true)
reply_pipe (NULL)
{ {
options.requires_in = true;
options.requires_out = true;
// We don't need immediate connect. We'll be able to send messages
// (replies) only when connection is established and thus requests
// can arrive anyway.
options.immediate_connect = false;
} }
zmq::rep_t::~rep_t () zmq::rep_t::~rep_t ()
{ {
zmq_assert (in_pipes.empty ());
zmq_assert (out_pipes.empty ());
}
void zmq::rep_t::xattach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_, const blob_t &peer_identity_)
{
zmq_assert (inpipe_ && outpipe_);
zmq_assert (in_pipes.size () == out_pipes.size ());
inpipe_->set_event_sink (this);
in_pipes.push_back (inpipe_);
in_pipes.swap (active, in_pipes.size () - 1);
outpipe_->set_event_sink (this);
out_pipes.push_back (outpipe_);
out_pipes.swap (active, out_pipes.size () - 1);
active++;
}
void zmq::rep_t::xterm_pipes ()
{
for (in_pipes_t::size_type i = 0; i != in_pipes.size (); i++)
in_pipes [i]->terminate ();
for (out_pipes_t::size_type i = 0; i != out_pipes.size (); i++)
out_pipes [i]->terminate ();
}
void zmq::rep_t::terminated (reader_t *pipe_)
{
// ???
zmq_assert (sending_reply || !more || in_pipes [current] != pipe_);
zmq_assert (pipe_);
zmq_assert (in_pipes.size () == out_pipes.size ());
in_pipes_t::size_type index = in_pipes.index (pipe_);
if (index < active) {
active--;
if (current == active)
current = 0;
}
in_pipes.erase (index);
// ???
if (!zombie) {
if (out_pipes [index])
out_pipes [index]->terminate ();
out_pipes.erase (index);
}
}
void zmq::rep_t::terminated (writer_t *pipe_)
{
zmq_assert (pipe_);
zmq_assert (in_pipes.size () == out_pipes.size ());
out_pipes_t::size_type index = out_pipes.index (pipe_);
// If the connection we've got the request from disconnects,
// there's nowhere to send the reply. Forget about the reply pipe.
// Once the reply is sent it will be dropped.
if (sending_reply && pipe_ == reply_pipe)
reply_pipe = NULL;
if (out_pipes.index (pipe_) < active) {
active--;
if (current == active)
current = 0;
}
out_pipes.erase (index);
// ???
if (!zombie) {
if (in_pipes [index])
in_pipes [index]->terminate ();
in_pipes.erase (index);
}
}
bool zmq::rep_t::xhas_pipes ()
{
return !in_pipes.empty () || !out_pipes.empty ();
}
void zmq::rep_t::activated (reader_t *pipe_)
{
// Move the pipe to the list of active pipes.
in_pipes_t::size_type index = in_pipes.index (pipe_);
in_pipes.swap (index, active);
out_pipes.swap (index, active);
active++;
}
void zmq::rep_t::activated (writer_t *pipe_)
{
// TODO: What here?
zmq_assert (false);
} }
int zmq::rep_t::xsend (zmq_msg_t *msg_, int flags_) int zmq::rep_t::xsend (zmq_msg_t *msg_, int flags_)
{ {
// If we are in the middle of receiving a request, we cannot send reply.
if (!sending_reply) { if (!sending_reply) {
errno = EFSM; errno = EFSM;
return -1; return -1;
} }
if (reply_pipe) { bool more = (msg_->flags & ZMQ_MSG_MORE);
// Push message to the reply pipe.
bool written = reply_pipe->write (msg_);
zmq_assert (!more || written);
// The pipe is full... // Push message to the reply pipe.
// When this happens, we simply return an error. int rc = xrep_t::xsend (msg_, flags_);
// This makes REP sockets vulnerable to DoS attack when if (rc != 0)
// misbehaving requesters stop collecting replies. return rc;
// TODO: Tear down the underlying connection (?)
if (!written) {
// TODO: The reply socket becomes deactivated here... // If the reply is complete flip the FSM back to request receiving state.
errno = EAGAIN; if (!more)
return -1;
}
more = msg_->flags & ZMQ_MSG_MORE;
}
else {
// If the requester have disconnected in the meantime, drop the reply.
more = msg_->flags & ZMQ_MSG_MORE;
zmq_msg_close (msg_);
}
// Flush the reply to the requester.
if (!more) {
if (reply_pipe)
reply_pipe->flush ();
sending_reply = false; sending_reply = false;
reply_pipe = NULL;
}
// Detach the message from the data buffer.
int rc = zmq_msg_init (msg_);
zmq_assert (rc == 0);
return 0; return 0;
} }
...@@ -202,70 +63,44 @@ int zmq::rep_t::xrecv (zmq_msg_t *msg_, int flags_) ...@@ -202,70 +63,44 @@ int zmq::rep_t::xrecv (zmq_msg_t *msg_, int flags_)
return -1; return -1;
} }
// Deallocate old content of the message. if (request_begins) {
zmq_msg_close (msg_);
// We haven't started reading a request yet... // Copy the backtrace stack to the reply pipe.
if (!more) { bool bottom = false;
while (!bottom) {
// Round-robin over the pipes to get next message. // TODO: What if request can be read but reply pipe is not
int count; // ready for writing?
for (count = active; count != 0; count--) {
if (in_pipes [current]->read (msg_))
break;
// Move the pipe to the list of inactive pipes. // Get next part of the backtrace stack.
active--; int rc = xrep_t::xrecv (msg_, flags_);
in_pipes.swap (current, active); if (rc != 0)
out_pipes.swap (current, active); return rc;
zmq_assert (msg_->flags & ZMQ_MSG_MORE);
// Move to next pipe. // Empty message part delimits the traceback stack.
current++; bottom = (zmq_msg_size (msg_) == 0);
if (current >= active)
current = 0;
}
// No message is available. Initialise the output parameter // Push it to the reply pipe.
// to be a 0-byte message. rc = xrep_t::xsend (msg_, flags_);
if (count == 0) { zmq_assert (rc == 0);
zmq_msg_init (msg_);
errno = EAGAIN;
return -1;
} }
// We are aware of a new message now. Setup the reply pipe. request_begins = false;
reply_pipe = out_pipes [current];
// Copy the routing info to the reply pipe.
while (true) {
// Push message to the reply pipe.
// TODO: What if the pipe is full?
// Tear down the underlying connection?
bool written = reply_pipe->write (msg_);
zmq_assert (written);
// Message part of zero size delimits the traceback stack.
if (zmq_msg_size (msg_) == 0)
break;
// Get next part of the message.
bool fetched = in_pipes [current]->read (msg_);
zmq_assert (fetched);
}
} }
// Now the routing info is processed. Get the first part // Now the routing info is safely stored. Get the first part
// of the message payload and exit. // of the message payload and exit.
bool fetched = in_pipes [current]->read (msg_); int rc = xrep_t::xrecv (msg_, flags_);
zmq_assert (fetched); if (rc != 0)
more = msg_->flags & ZMQ_MSG_MORE; return rc;
if (!more) {
current++; // If whole request is read, flip the FSM to reply-sending state.
if (current >= active) if (!(msg_->flags & ZMQ_MSG_MORE)) {
current = 0;
sending_reply = true; sending_reply = true;
request_begins = true;
} }
return 0; return 0;
} }
...@@ -274,25 +109,7 @@ bool zmq::rep_t::xhas_in () ...@@ -274,25 +109,7 @@ bool zmq::rep_t::xhas_in ()
if (sending_reply) if (sending_reply)
return false; return false;
if (more) return xrep_t::xhas_in ();
return true;
for (int count = active; count != 0; count--) {
if (in_pipes [current]->check_read ())
return !sending_reply;
// Move the pipe to the list of inactive pipes.
active--;
in_pipes.swap (current, active);
out_pipes.swap (current, active);
// Move to the next pipe.
current++;
if (current >= active)
current = 0;
}
return false;
} }
bool zmq::rep_t::xhas_out () bool zmq::rep_t::xhas_out ()
...@@ -300,10 +117,6 @@ bool zmq::rep_t::xhas_out () ...@@ -300,10 +117,6 @@ bool zmq::rep_t::xhas_out ()
if (!sending_reply) if (!sending_reply)
return false; return false;
if (more) return xrep_t::xhas_out ();
return true;
// TODO: No check for write here...
return sending_reply;
} }
...@@ -20,17 +20,12 @@ ...@@ -20,17 +20,12 @@
#ifndef __ZMQ_REP_HPP_INCLUDED__ #ifndef __ZMQ_REP_HPP_INCLUDED__
#define __ZMQ_REP_HPP_INCLUDED__ #define __ZMQ_REP_HPP_INCLUDED__
#include "socket_base.hpp" #include "xrep.hpp"
#include "yarray.hpp"
#include "pipe.hpp"
namespace zmq namespace zmq
{ {
class rep_t : class rep_t : public xrep_t
public socket_base_t,
public i_reader_events,
public i_writer_events
{ {
public: public:
...@@ -38,50 +33,20 @@ namespace zmq ...@@ -38,50 +33,20 @@ namespace zmq
~rep_t (); ~rep_t ();
// Overloads of functions from socket_base_t. // Overloads of functions from socket_base_t.
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
const blob_t &peer_identity_);
void xterm_pipes ();
bool xhas_pipes ();
int xsend (zmq_msg_t *msg_, int flags_); int xsend (zmq_msg_t *msg_, int flags_);
int xrecv (zmq_msg_t *msg_, int flags_); int xrecv (zmq_msg_t *msg_, int flags_);
bool xhas_in (); bool xhas_in ();
bool xhas_out (); bool xhas_out ();
// i_reader_events interface implementation.
void activated (reader_t *pipe_);
void terminated (reader_t *pipe_);
// i_writer_events interface implementation.
void activated (writer_t *pipe_);
void terminated (writer_t *pipe_);
private: private:
// List in outbound and inbound pipes. Note that the two lists are // If true, we are in process of sending the reply. If false we are
// always in sync. I.e. outpipe with index N communicates with the // in process of receiving a request.
// same session as inpipe with index N.
typedef yarray_t <writer_t> out_pipes_t;
out_pipes_t out_pipes;
typedef yarray_t <reader_t> in_pipes_t;
in_pipes_t in_pipes;
// Number of active inpipes. All the active inpipes are located at the
// beginning of the in_pipes array.
in_pipes_t::size_type active;
// Index of the next inbound pipe to read a request from.
in_pipes_t::size_type current;
// If true, request was already received and reply wasn't completely
// sent yet.
bool sending_reply; bool sending_reply;
// True, if message processed at the moment (either sent or received) // If true, we are starting to receive a request. The beginning
// is processed only partially. // of the request is the backtrace stack.
bool more; bool request_begins;
// Pipe we are going to send reply to.
writer_t *reply_pipe;
rep_t (const rep_t&); rep_t (const rep_t&);
void operator = (const rep_t&); void operator = (const rep_t&);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment