Commit c56d797b authored by Christian Kamm's avatar Christian Kamm

REQ sockets drop replies from unasked peers.

* Add lb_t::sendpipe() that returns the pipe that was used for sending,
  similar to fq_t::recvpipe().
* Add forwarder functions to dealer_t to access these two.
* Add logic to req_t to ignore replies on pipes that are not the one
  where the request was sent.
* Enable test in test_spec_req.
parent 524bd7ac
...@@ -28,6 +28,7 @@ Chia-liang Kao <clkao@clkao.org> ...@@ -28,6 +28,7 @@ Chia-liang Kao <clkao@clkao.org>
Chris Rempel <csrl@gmx.com> Chris Rempel <csrl@gmx.com>
Chris Wong <chris@chriswongstudio.com> Chris Wong <chris@chriswongstudio.com>
Christian Gudrian <christian.gudrian@fluidon.com> Christian Gudrian <christian.gudrian@fluidon.com>
Christian Kamm <kamm@incasoftware.de>
Chuck Remes <cremes@mac.com> Chuck Remes <cremes@mac.com>
Conrad D. Steenberg <conrad.steenberg@caltech.edu> Conrad D. Steenberg <conrad.steenberg@caltech.edu>
Dhammika Pathirana <dhammika@gmail.com> Dhammika Pathirana <dhammika@gmail.com>
......
...@@ -80,12 +80,12 @@ int zmq::dealer_t::xsetsockopt (int option_, const void *optval_, ...@@ -80,12 +80,12 @@ int zmq::dealer_t::xsetsockopt (int option_, const void *optval_,
int zmq::dealer_t::xsend (msg_t *msg_) int zmq::dealer_t::xsend (msg_t *msg_)
{ {
return lb.send (msg_); return sendpipe (msg_, NULL);
} }
int zmq::dealer_t::xrecv (msg_t *msg_) int zmq::dealer_t::xrecv (msg_t *msg_)
{ {
return fq.recv (msg_); return recvpipe (msg_, NULL);
} }
bool zmq::dealer_t::xhas_in () bool zmq::dealer_t::xhas_in ()
...@@ -113,3 +113,13 @@ void zmq::dealer_t::xpipe_terminated (pipe_t *pipe_) ...@@ -113,3 +113,13 @@ void zmq::dealer_t::xpipe_terminated (pipe_t *pipe_)
fq.pipe_terminated (pipe_); fq.pipe_terminated (pipe_);
lb.pipe_terminated (pipe_); lb.pipe_terminated (pipe_);
} }
int zmq::dealer_t::sendpipe (msg_t *msg_, pipe_t **pipe_)
{
return lb.sendpipe (msg_, pipe_);
}
int zmq::dealer_t::recvpipe (msg_t *msg_, pipe_t **pipe_)
{
return fq.recvpipe (msg_, pipe_);
}
...@@ -55,6 +55,10 @@ namespace zmq ...@@ -55,6 +55,10 @@ namespace zmq
void xwrite_activated (zmq::pipe_t *pipe_); void xwrite_activated (zmq::pipe_t *pipe_);
void xpipe_terminated (zmq::pipe_t *pipe_); void xpipe_terminated (zmq::pipe_t *pipe_);
// Send and recv - knowing which pipe was used.
int sendpipe (zmq::msg_t *msg_, zmq::pipe_t **pipe_);
int recvpipe (zmq::msg_t *msg_, zmq::pipe_t **pipe_);
private: private:
// Messages are fair-queued from inbound pipes. And load-balanced to // Messages are fair-queued from inbound pipes. And load-balanced to
......
...@@ -69,6 +69,11 @@ void zmq::lb_t::activated (pipe_t *pipe_) ...@@ -69,6 +69,11 @@ void zmq::lb_t::activated (pipe_t *pipe_)
} }
int zmq::lb_t::send (msg_t *msg_) int zmq::lb_t::send (msg_t *msg_)
{
return sendpipe (msg_, NULL);
}
int zmq::lb_t::sendpipe (msg_t *msg_, pipe_t **pipe_)
{ {
// Drop the message if required. If we are at the end of the message // Drop the message if required. If we are at the end of the message
// switch back to non-dropping mode. // switch back to non-dropping mode.
...@@ -86,7 +91,11 @@ int zmq::lb_t::send (msg_t *msg_) ...@@ -86,7 +91,11 @@ int zmq::lb_t::send (msg_t *msg_)
while (active > 0) { while (active > 0) {
if (pipes [current]->write (msg_)) if (pipes [current]->write (msg_))
{
if (pipe_)
*pipe_ = pipes [current];
break; break;
}
zmq_assert (!more); zmq_assert (!more);
active--; active--;
...@@ -139,4 +148,3 @@ bool zmq::lb_t::has_out () ...@@ -139,4 +148,3 @@ bool zmq::lb_t::has_out ()
return false; return false;
} }
...@@ -41,6 +41,13 @@ namespace zmq ...@@ -41,6 +41,13 @@ namespace zmq
void pipe_terminated (pipe_t *pipe_); void pipe_terminated (pipe_t *pipe_);
int send (msg_t *msg_); int send (msg_t *msg_);
// Sends a message and stores the pipe that was used in pipe_.
// It is possible for this function to return success but keep pipe_
// unset if the rest of a multipart message to a terminated pipe is
// being dropped. For the first frame, this will never happen.
int sendpipe (msg_t *msg_, pipe_t **pipe_);
bool has_out (); bool has_out ();
private: private:
......
...@@ -27,7 +27,8 @@ ...@@ -27,7 +27,8 @@
zmq::req_t::req_t (class ctx_t *parent_, uint32_t tid_, int sid_) : zmq::req_t::req_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
dealer_t (parent_, tid_, sid_), dealer_t (parent_, tid_, sid_),
receiving_reply (false), receiving_reply (false),
message_begins (true) message_begins (true),
reply_pipe (NULL)
{ {
options.type = ZMQ_REQ; options.type = ZMQ_REQ;
} }
...@@ -51,10 +52,28 @@ int zmq::req_t::xsend (msg_t *msg_) ...@@ -51,10 +52,28 @@ int zmq::req_t::xsend (msg_t *msg_)
int rc = bottom.init (); int rc = bottom.init ();
errno_assert (rc == 0); errno_assert (rc == 0);
bottom.set_flags (msg_t::more); bottom.set_flags (msg_t::more);
rc = dealer_t::xsend (&bottom);
reply_pipe = NULL;
rc = dealer_t::sendpipe (&bottom, &reply_pipe);
if (rc != 0) if (rc != 0)
return -1; return -1;
assert (reply_pipe);
message_begins = false; message_begins = false;
// Eat all currently avaliable messages before the request is fully
// sent. This is done to avoid:
// REQ sends request to A, A replies, B replies too.
// A's reply was first and matches, that is used.
// An hour later REQ sends a request to B. B's old reply is used.
msg_t drop;
while (true) {
rc = drop.init ();
errno_assert (rc == 0);
rc = dealer_t::xrecv (&drop);
if (rc != 0)
break;
drop.close ();
}
} }
bool more = msg_->flags () & msg_t::more ? true : false; bool more = msg_->flags () & msg_t::more ? true : false;
...@@ -82,7 +101,7 @@ int zmq::req_t::xrecv (msg_t *msg_) ...@@ -82,7 +101,7 @@ int zmq::req_t::xrecv (msg_t *msg_)
// First part of the reply should be the original request ID. // First part of the reply should be the original request ID.
if (message_begins) { if (message_begins) {
int rc = dealer_t::xrecv (msg_); int rc = recv_reply_pipe (msg_);
if (rc != 0) if (rc != 0)
return rc; return rc;
...@@ -103,7 +122,7 @@ int zmq::req_t::xrecv (msg_t *msg_) ...@@ -103,7 +122,7 @@ int zmq::req_t::xrecv (msg_t *msg_)
message_begins = false; message_begins = false;
} }
int rc = dealer_t::xrecv (msg_); int rc = recv_reply_pipe (msg_);
if (rc != 0) if (rc != 0)
return rc; return rc;
...@@ -134,6 +153,18 @@ bool zmq::req_t::xhas_out () ...@@ -134,6 +153,18 @@ bool zmq::req_t::xhas_out ()
return dealer_t::xhas_out (); return dealer_t::xhas_out ();
} }
int zmq::req_t::recv_reply_pipe (msg_t *msg_)
{
while (true) {
pipe_t *pipe = NULL;
int rc = dealer_t::recvpipe(msg_, &pipe);
if (rc != 0)
return rc;
if (pipe == reply_pipe)
return 0;
}
}
zmq::req_session_t::req_session_t (io_thread_t *io_thread_, bool connect_, zmq::req_session_t::req_session_t (io_thread_t *io_thread_, bool connect_,
socket_base_t *socket_, const options_t &options_, socket_base_t *socket_, const options_t &options_,
const address_t *addr_) : const address_t *addr_) :
......
...@@ -44,6 +44,12 @@ namespace zmq ...@@ -44,6 +44,12 @@ namespace zmq
bool xhas_in (); bool xhas_in ();
bool xhas_out (); bool xhas_out ();
protected:
// Receive only from the pipe the request was sent to, discarding
// frames from other pipes.
int recv_reply_pipe (zmq::msg_t *msg_);
private: private:
// If true, request was already sent and reply wasn't received yet or // If true, request was already sent and reply wasn't received yet or
...@@ -54,6 +60,9 @@ namespace zmq ...@@ -54,6 +60,9 @@ namespace zmq
// of the message must be empty message part (backtrace stack bottom). // of the message must be empty message part (backtrace stack bottom).
bool message_begins; bool message_begins;
// The pipe the request was sent to and where the reply is expected.
zmq::pipe_t *reply_pipe;
req_t (const req_t&); req_t (const req_t&);
const req_t &operator = (const req_t&); const req_t &operator = (const req_t&);
}; };
......
...@@ -234,8 +234,7 @@ int main (void) ...@@ -234,8 +234,7 @@ int main (void)
// SHALL accept an incoming message only from the last peer that it sent a // SHALL accept an incoming message only from the last peer that it sent a
// request to. // request to.
// SHALL discard silently any messages received from other peers. // SHALL discard silently any messages received from other peers.
// *** Test disabled until libzmq does this properly *** test_req_only_listens_to_current_peer (ctx);
// test_req_only_listens_to_current_peer (ctx);
} }
int rc = zmq_ctx_term (ctx); int rc = zmq_ctx_term (ctx);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment