Commit 06d7a447 authored by Martin Hurton's avatar Martin Hurton

Implement flow control for ZMQ_PUB sockets

parent f9c84a1a
...@@ -25,7 +25,8 @@ ...@@ -25,7 +25,8 @@
#include "pipe.hpp" #include "pipe.hpp"
zmq::pub_t::pub_t (class app_thread_t *parent_) : zmq::pub_t::pub_t (class app_thread_t *parent_) :
socket_base_t (parent_) socket_base_t (parent_),
stalled_pipe (NULL)
{ {
options.requires_in = false; options.requires_in = false;
options.requires_out = true; options.requires_out = true;
...@@ -53,6 +54,8 @@ void zmq::pub_t::xdetach_inpipe (class reader_t *pipe_) ...@@ -53,6 +54,8 @@ void zmq::pub_t::xdetach_inpipe (class reader_t *pipe_)
void zmq::pub_t::xdetach_outpipe (class writer_t *pipe_) void zmq::pub_t::xdetach_outpipe (class writer_t *pipe_)
{ {
out_pipes.erase (pipe_); out_pipes.erase (pipe_);
if (pipe_ == stalled_pipe)
stalled_pipe = NULL;
} }
void zmq::pub_t::xkill (class reader_t *pipe_) void zmq::pub_t::xkill (class reader_t *pipe_)
...@@ -67,7 +70,8 @@ void zmq::pub_t::xrevive (class reader_t *pipe_) ...@@ -67,7 +70,8 @@ void zmq::pub_t::xrevive (class reader_t *pipe_)
void zmq::pub_t::xrevive (class writer_t *pipe_) void zmq::pub_t::xrevive (class writer_t *pipe_)
{ {
zmq_not_implemented (); zmq_assert (stalled_pipe = pipe_);
stalled_pipe = NULL;
} }
int zmq::pub_t::xsetsockopt (int option_, const void *optval_, int zmq::pub_t::xsetsockopt (int option_, const void *optval_,
...@@ -91,11 +95,10 @@ int zmq::pub_t::xsend (zmq_msg_t *msg_, int flags_) ...@@ -91,11 +95,10 @@ int zmq::pub_t::xsend (zmq_msg_t *msg_, int flags_)
} }
// First check whether all pipes are available for writing. // First check whether all pipes are available for writing.
for (out_pipes_t::size_type i = 0; i != pipes_count; i++) if (!check_write ()) {
if (!out_pipes [i]->check_write ()) { errno = EAGAIN;
errno = EAGAIN; return -1;
return -1; }
}
msg_content_t *content = (msg_content_t*) msg_->content; msg_content_t *content = (msg_content_t*) msg_->content;
...@@ -171,7 +174,22 @@ bool zmq::pub_t::xhas_in () ...@@ -171,7 +174,22 @@ bool zmq::pub_t::xhas_in ()
bool zmq::pub_t::xhas_out () bool zmq::pub_t::xhas_out ()
{ {
// TODO: Reimplement when queue limits are added. return check_write ();
}
bool zmq::pub_t::check_write ()
{
if (stalled_pipe != NULL)
return false;
out_pipes_t::size_type pipes_num = out_pipes.size ();
for (out_pipes_t::size_type i = 0; i < pipes_num; i++) {
if (!out_pipes [i]->check_write ()) {
stalled_pipe = out_pipes [i];
return false;
}
}
return true; return true;
} }
...@@ -54,6 +54,13 @@ namespace zmq ...@@ -54,6 +54,13 @@ namespace zmq
typedef yarray_t <class writer_t> out_pipes_t; typedef yarray_t <class writer_t> out_pipes_t;
out_pipes_t out_pipes; out_pipes_t out_pipes;
// Pointer to the pipe we are waiting for to became writable
// again; NULL if tha last send operation was successful.
class writer_t *stalled_pipe;
// Check whether we can write a message to all pipes.
bool check_write ();
pub_t (const pub_t&); pub_t (const pub_t&);
void operator = (const pub_t&); void operator = (const pub_t&);
}; };
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment