Commit d66c2508 authored by Luca Boccassi's avatar Luca Boccassi Committed by GitHub

Merge pull request #2519 from bjovke/master

Case found not covered in latest zmq::proxy() code.
parents 64807214 b6fb1f64
......@@ -66,13 +66,17 @@
delete poller_receive_blocked;\
delete poller_send_blocked;\
delete poller_both_blocked;\
delete poller_frontend_only;\
delete poller_backend_only\
#define CHECK_RC_EXIT_ON_FAILURE()\
if (rc < 0) {\
PROXY_CLEANUP();\
return close_and_return (&msg, -1);\
}
do {\
if (rc < 0) {\
PROXY_CLEANUP();\
return close_and_return (&msg, -1);\
}\
} while(0)
#endif // ZMQ_HAVE_POLLER
......@@ -173,14 +177,18 @@ int zmq::proxy (
// If frontend_==backend_ 'poller_send_blocked' and 'poller_receive_blocked' are the same, 'ZMQ_POLLIN' is ignored.
// In that case 'poller_send_blocked' is not used. We need only 'poller_receive_blocked'.
// We also don't need 'poller_both_blocked', no need to initialize it.
// We also don't need 'poller_both_blocked', 'poller_backend_only' nor 'poller_frontend_only' no need to initialize it.
// We save some RAM and time for initialization.
zmq::socket_poller_t *poller_send_blocked = NULL; // All except 'ZMQ_POLLIN' on 'backend_'.
zmq::socket_poller_t *poller_both_blocked = NULL; // All except 'ZMQ_POLLIN' on both 'frontend_' and 'backend_'.
zmq::socket_poller_t *poller_frontend_only = NULL; // Only 'ZMQ_POLLIN' and 'ZMQ_POLLOUT' on 'frontend_'.
zmq::socket_poller_t *poller_backend_only = NULL; // Only 'ZMQ_POLLIN' and 'ZMQ_POLLOUT' on 'backend_'.
if (frontend_ != backend_) {
poller_send_blocked = new (std::nothrow) zmq::socket_poller_t; // All except 'ZMQ_POLLIN' on 'backend_'.
poller_both_blocked = new (std::nothrow) zmq::socket_poller_t; // All except 'ZMQ_POLLIN' on both 'frontend_' and 'backend_'.
poller_frontend_only = new (std::nothrow) zmq::socket_poller_t; // Only 'ZMQ_POLLIN' and 'ZMQ_POLLOUT' on 'frontend_'.
poller_backend_only = new (std::nothrow) zmq::socket_poller_t; // Only 'ZMQ_POLLIN' and 'ZMQ_POLLOUT' on 'backend_'.
frontend_equal_to_backend = false;
} else
frontend_equal_to_backend = true;
......@@ -222,6 +230,10 @@ int zmq::proxy (
CHECK_RC_EXIT_ON_FAILURE ();
rc = poller_receive_blocked->add (backend_, NULL, ZMQ_POLLIN | ZMQ_POLLOUT); // All except 'ZMQ_POLLIN' on 'frontend_'.
CHECK_RC_EXIT_ON_FAILURE ();
rc = poller_frontend_only->add (frontend_, NULL, ZMQ_POLLIN | ZMQ_POLLOUT);
CHECK_RC_EXIT_ON_FAILURE ();
rc = poller_backend_only->add (backend_, NULL, ZMQ_POLLIN | ZMQ_POLLOUT);
CHECK_RC_EXIT_ON_FAILURE ();
}
// Register 'control_' with pollers.
......@@ -239,6 +251,10 @@ int zmq::proxy (
CHECK_RC_EXIT_ON_FAILURE ();
rc = poller_both_blocked->add (control_, NULL, ZMQ_POLLIN);
CHECK_RC_EXIT_ON_FAILURE ();
rc = poller_frontend_only->add (control_, NULL, ZMQ_POLLIN);
CHECK_RC_EXIT_ON_FAILURE ();
rc = poller_backend_only->add (control_, NULL, ZMQ_POLLIN);
CHECK_RC_EXIT_ON_FAILURE ();
}
}
......@@ -342,34 +358,50 @@ int zmq::proxy (
if (poller_wait == poller_both_blocked)
poller_wait = poller_send_blocked;
else
if (poller_wait == poller_receive_blocked)
if (poller_wait == poller_receive_blocked || poller_wait == poller_frontend_only)
poller_wait = poller_in;
}
if (reply_processed) { // 'backend_' -> 'frontend_'
if (poller_wait == poller_both_blocked)
poller_wait = poller_receive_blocked;
else
if (poller_wait == poller_send_blocked)
if (poller_wait == poller_send_blocked || poller_wait == poller_backend_only)
poller_wait = poller_in;
}
}
} else {
// No requests have been processed, there were no 'ZMQ_POLLOUT' events.
// That means that receiving end queue(s) is/are full.
// Disable receiving 'ZMQ_POLLIN' for sockets for which there's no 'ZMQ_POLLOUT'.
// No requests have been processed, there were no 'ZMQ_POLLIN' with corresponding 'ZMQ_POLLOUT' events.
// That means that out queue(s) is/are full or one out queue is full and second one has no messages to process.
// Disable receiving 'ZMQ_POLLIN' for sockets for which there's no 'ZMQ_POLLOUT',
// or wait only on both 'backend_''s or 'frontend_''s 'ZMQ_POLLIN' and 'ZMQ_POLLOUT'.
if (frontend_in) {
if (poller_wait == poller_send_blocked)
poller_wait = poller_both_blocked;
else
poller_wait = poller_receive_blocked;
if (frontend_out)
// If frontend_in and frontend_out are true, obviously backend_in and backend_out are both false.
// In that case we need to wait for both 'ZMQ_POLLIN' and 'ZMQ_POLLOUT' only on 'backend_'.
// We'll never get here in case of frontend_==backend_ because then frontend_out will always be false.
poller_wait = poller_backend_only;
else {
if (poller_wait == poller_send_blocked)
poller_wait = poller_both_blocked;
else
if (poller_wait == poller_in)
poller_wait = poller_receive_blocked;
}
}
if (backend_in) {
// Will never be reached if frontend_==backend_, 'backend_in' will
// always be false due to design in 'for' event processing loop.
if (poller_wait == poller_receive_blocked)
poller_wait = poller_both_blocked;
else
poller_wait = poller_send_blocked;
if (backend_out)
// If backend_in and backend_out are true, obviously frontend_in and frontend_out are both false.
// In that case we need to wait for both 'ZMQ_POLLIN' and 'ZMQ_POLLOUT' only on 'frontend_'.
poller_wait = poller_frontend_only;
else {
if (poller_wait == poller_receive_blocked)
poller_wait = poller_both_blocked;
else
if (poller_wait == poller_in)
poller_wait = poller_send_blocked;
}
}
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment