Commit 5d4f6b18 authored by Martin Hurton's avatar Martin Hurton

Implement flow control for ZMQ_P2P sockets

parent f9521c6b
...@@ -47,6 +47,7 @@ void zmq::p2p_t::xattach_pipes (class reader_t *inpipe_, ...@@ -47,6 +47,7 @@ void zmq::p2p_t::xattach_pipes (class reader_t *inpipe_,
zmq_assert (!inpipe && !outpipe); zmq_assert (!inpipe && !outpipe);
inpipe = inpipe_; inpipe = inpipe_;
outpipe = outpipe_; outpipe = outpipe_;
outpipe_alive = true;
} }
void zmq::p2p_t::xdetach_inpipe (class reader_t *pipe_) void zmq::p2p_t::xdetach_inpipe (class reader_t *pipe_)
...@@ -75,7 +76,8 @@ void zmq::p2p_t::xrevive (class reader_t *pipe_) ...@@ -75,7 +76,8 @@ void zmq::p2p_t::xrevive (class reader_t *pipe_)
void zmq::p2p_t::xrevive (class writer_t *pipe_) void zmq::p2p_t::xrevive (class writer_t *pipe_)
{ {
zmq_not_implemented (); zmq_assert (!outpipe_alive);
outpipe_alive = true;
} }
int zmq::p2p_t::xsetsockopt (int option_, const void *optval_, int zmq::p2p_t::xsetsockopt (int option_, const void *optval_,
...@@ -87,13 +89,17 @@ int zmq::p2p_t::xsetsockopt (int option_, const void *optval_, ...@@ -87,13 +89,17 @@ int zmq::p2p_t::xsetsockopt (int option_, const void *optval_,
int zmq::p2p_t::xsend (zmq_msg_t *msg_, int flags_) int zmq::p2p_t::xsend (zmq_msg_t *msg_, int flags_)
{ {
if (!outpipe) { if (outpipe == NULL || !outpipe_alive) {
errno = EAGAIN;
return -1;
}
if (!outpipe->write (msg_)) {
outpipe_alive = false;
errno = EAGAIN; errno = EAGAIN;
return -1; return -1;
} }
bool written = outpipe->write (msg_);
zmq_assert (written);
if (!(flags_ & ZMQ_NOFLUSH)) if (!(flags_ & ZMQ_NOFLUSH))
outpipe->flush (); outpipe->flush ();
...@@ -132,7 +138,10 @@ bool zmq::p2p_t::xhas_in () ...@@ -132,7 +138,10 @@ bool zmq::p2p_t::xhas_in ()
bool zmq::p2p_t::xhas_out () bool zmq::p2p_t::xhas_out ()
{ {
// TODO: Implement this once queue limits are in-place. if (outpipe == NULL || !outpipe_alive)
return true; return false;
outpipe_alive = outpipe->check_write ();
return outpipe_alive;
} }
...@@ -53,6 +53,7 @@ namespace zmq ...@@ -53,6 +53,7 @@ namespace zmq
class writer_t *outpipe; class writer_t *outpipe;
bool alive; bool alive;
bool outpipe_alive;
p2p_t (const p2p_t&); p2p_t (const p2p_t&);
void operator = (const p2p_t&); void operator = (const p2p_t&);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment