Commit 1b79da0d authored by evoskuil's avatar evoskuil

Problem: proxy leaks one message payload (0..n bytes) on close.

parent 4ea7d018
...@@ -143,7 +143,7 @@ int zmq::proxy ( ...@@ -143,7 +143,7 @@ int zmq::proxy (
// Wait while there are either requests or replies to process. // Wait while there are either requests or replies to process.
rc = zmq_poll (&items [0], qt_poll_items, -1); rc = zmq_poll (&items [0], qt_poll_items, -1);
if (unlikely (rc < 0)) if (unlikely (rc < 0))
return -1; return close_and_return (&msg, -1);
// Get the pollout separately because when combining this with pollin it maxes the CPU // Get the pollout separately because when combining this with pollin it maxes the CPU
// because pollout shall most of the time return directly. // because pollout shall most of the time return directly.
...@@ -151,7 +151,7 @@ int zmq::proxy ( ...@@ -151,7 +151,7 @@ int zmq::proxy (
if (frontend_ != backend_) { if (frontend_ != backend_) {
rc = zmq_poll (&itemsout [0], 2, 0); rc = zmq_poll (&itemsout [0], 2, 0);
if (unlikely (rc < 0)) { if (unlikely (rc < 0)) {
return -1; return close_and_return (&msg, -1);
} }
} }
...@@ -159,17 +159,17 @@ int zmq::proxy ( ...@@ -159,17 +159,17 @@ int zmq::proxy (
if (control_ && items [2].revents & ZMQ_POLLIN) { if (control_ && items [2].revents & ZMQ_POLLIN) {
rc = control_->recv (&msg, 0); rc = control_->recv (&msg, 0);
if (unlikely (rc < 0)) if (unlikely (rc < 0))
return -1; return close_and_return (&msg, -1);
moresz = sizeof more; moresz = sizeof more;
rc = control_->getsockopt (ZMQ_RCVMORE, &more, &moresz); rc = control_->getsockopt (ZMQ_RCVMORE, &more, &moresz);
if (unlikely (rc < 0) || more) if (unlikely (rc < 0) || more)
return -1; return close_and_return (&msg, -1);
// Copy message to capture socket if any // Copy message to capture socket if any
rc = capture(capture_, msg); rc = capture (capture_, msg);
if (unlikely (rc < 0)) if (unlikely (rc < 0))
return -1; return close_and_return (&msg, -1);
if (msg.size () == 5 && memcmp (msg.data (), "PAUSE", 5) == 0) if (msg.size () == 5 && memcmp (msg.data (), "PAUSE", 5) == 0)
state = paused; state = paused;
...@@ -180,7 +180,7 @@ int zmq::proxy ( ...@@ -180,7 +180,7 @@ int zmq::proxy (
if (msg.size () == 9 && memcmp (msg.data (), "TERMINATE", 9) == 0) if (msg.size () == 9 && memcmp (msg.data (), "TERMINATE", 9) == 0)
state = terminated; state = terminated;
else { else {
// This is an API error, we should assert // This is an API error, so we assert
puts ("E: invalid command sent to proxy"); puts ("E: invalid command sent to proxy");
zmq_assert (false); zmq_assert (false);
} }
...@@ -189,19 +189,20 @@ int zmq::proxy ( ...@@ -189,19 +189,20 @@ int zmq::proxy (
if (state == active if (state == active
&& items [0].revents & ZMQ_POLLIN && items [0].revents & ZMQ_POLLIN
&& (frontend_ == backend_ || itemsout [1].revents & ZMQ_POLLOUT)) { && (frontend_ == backend_ || itemsout [1].revents & ZMQ_POLLOUT)) {
rc = forward(frontend_, backend_, capture_,msg); rc = forward (frontend_, backend_, capture_, msg);
if (unlikely (rc < 0)) if (unlikely (rc < 0))
return -1; return close_and_return (&msg, -1);
} }
// Process a reply // Process a reply
if (state == active if (state == active
&& frontend_ != backend_ && frontend_ != backend_
&& items [1].revents & ZMQ_POLLIN && items [1].revents & ZMQ_POLLIN
&& itemsout [0].revents & ZMQ_POLLOUT) { && itemsout [0].revents & ZMQ_POLLOUT) {
rc = forward(backend_, frontend_, capture_,msg); rc = forward (backend_, frontend_, capture_, msg);
if (unlikely (rc < 0)) if (unlikely (rc < 0))
return -1; return close_and_return (&msg, -1);
} }
} }
return 0;
return close_and_return (&msg, 0);
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment