Commit f520738a authored by Doron Somech's avatar Doron Somech Committed by GitHub

Merge pull request #2740 from bluca/proxy_stats_frames

Problem: proxy_steerable STATISTICS returns conflated buffers
parents a89d79aa b8695a47
...@@ -161,34 +161,44 @@ int forward ( ...@@ -161,34 +161,44 @@ int forward (
return 0; return 0;
} }
static int loop_and_send_multipart_stat (zmq::socket_base_t *control_,
uint64_t stat, bool first, bool more)
{
int rc;
zmq::msg_t msg;
// VSM of 8 bytes can't fail to init
msg.init_size (sizeof (uint64_t));
memcpy (msg.data (), (const void *)&stat, sizeof (uint64_t));
// if the first message is handled to the pipe successfully then the HWM
// is not full, which means failures are due to interrupts (on Windows pipes
// are TCP sockets), so keep retrying
do {
rc = control_->send (&msg, more ? ZMQ_SNDMORE : 0);
} while (!first && rc != 0 && errno == EAGAIN);
return rc;
}
int reply_stats( int reply_stats(
class zmq::socket_base_t *control_, class zmq::socket_base_t *control_,
zmq_socket_stats_t* frontend_stats, zmq_socket_stats_t* frontend_stats,
zmq_socket_stats_t* backend_stats) zmq_socket_stats_t* backend_stats)
{ {
// first part: frontend stats // first part: frontend stats - the first send might fail due to HWM
if (loop_and_send_multipart_stat (control_, frontend_stats->msg_in, true, true) != 0)
zmq::msg_t stats_msg1, stats_msg2; return -1;
int rc = stats_msg1.init_size (sizeof(zmq_socket_stats_t));
if (unlikely (rc < 0))
return close_and_return (&stats_msg1, -1);
memcpy (stats_msg1.data(), (const void*) frontend_stats, sizeof(zmq_socket_stats_t));
rc = control_->send (&stats_msg1, ZMQ_SNDMORE); loop_and_send_multipart_stat (control_, frontend_stats->bytes_in, false, true);
if (unlikely (rc < 0)) loop_and_send_multipart_stat (control_, frontend_stats->msg_out, false, true);
return close_and_return (&stats_msg1, -1); loop_and_send_multipart_stat (control_, frontend_stats->bytes_out, false, true);
// second part: backend stats // second part: backend stats
loop_and_send_multipart_stat (control_, backend_stats->msg_in, true, true);
rc = stats_msg2.init_size (sizeof(zmq_socket_stats_t)); loop_and_send_multipart_stat (control_, backend_stats->bytes_in, false, true);
if (unlikely (rc < 0)) loop_and_send_multipart_stat (control_, backend_stats->msg_out, false, true);
return close_and_return (&stats_msg2, -1); loop_and_send_multipart_stat (control_, backend_stats->bytes_out, false, false);
memcpy (stats_msg2.data(), (const void*) backend_stats, sizeof(zmq_socket_stats_t));
rc = control_->send (&stats_msg2, 0);
if (unlikely (rc < 0))
return close_and_return (&stats_msg2, -1);
return 0; return 0;
} }
......
...@@ -335,6 +335,28 @@ server_worker (void *ctx) ...@@ -335,6 +335,28 @@ server_worker (void *ctx)
assert (rc == 0); assert (rc == 0);
} }
uint64_t recv_stat (void *sock, bool last)
{
uint64_t res;
zmq_msg_t stats_msg;
int rc = zmq_msg_init (&stats_msg);
assert (rc == 0);
rc = zmq_recvmsg (sock, &stats_msg, 0);
assert (rc == sizeof(uint64_t));
memcpy(&res, zmq_msg_data(&stats_msg), zmq_msg_size(&stats_msg));
rc = zmq_msg_close (&stats_msg);
assert (rc == 0);
int more;
size_t moresz = sizeof more;
rc = zmq_getsockopt (sock, ZMQ_RCVMORE, &more, &moresz);
assert (rc == 0);
assert ((last && !more) || (!last && more));
return res;
}
// Utility function to interrogate the proxy: // Utility function to interrogate the proxy:
void check_proxy_stats(void *control_proxy) void check_proxy_stats(void *control_proxy)
...@@ -346,31 +368,16 @@ void check_proxy_stats(void *control_proxy) ...@@ -346,31 +368,16 @@ void check_proxy_stats(void *control_proxy)
assert (rc == 10); assert (rc == 10);
// first frame of the reply contains FRONTEND stats: // first frame of the reply contains FRONTEND stats:
total_stats.frontend.msg_in = recv_stat (control_proxy, false);
zmq_msg_t stats_msg; total_stats.frontend.bytes_in = recv_stat (control_proxy, false);
rc = zmq_msg_init (&stats_msg); total_stats.frontend.msg_out = recv_stat (control_proxy, false);
assert (rc == 0); total_stats.frontend.bytes_out = recv_stat (control_proxy, false);
rc = zmq_recvmsg (control_proxy, &stats_msg, 0);
assert (rc == sizeof(zmq_socket_stats_t));
memcpy(&total_stats.frontend, zmq_msg_data(&stats_msg), zmq_msg_size(&stats_msg));
// second frame of the reply contains BACKEND stats: // second frame of the reply contains BACKEND stats:
total_stats.backend.msg_in = recv_stat (control_proxy, false);
int more; total_stats.backend.bytes_in = recv_stat (control_proxy, false);
size_t moresz = sizeof more; total_stats.backend.msg_out = recv_stat (control_proxy, false);
rc = zmq_getsockopt (control_proxy, ZMQ_RCVMORE, &more, &moresz); total_stats.backend.bytes_out = recv_stat (control_proxy, true);
assert (rc == 0 && more == 1);
rc = zmq_recvmsg (control_proxy, &stats_msg, 0);
assert (rc == sizeof(zmq_socket_stats_t));
memcpy(&total_stats.backend, zmq_msg_data(&stats_msg), zmq_msg_size(&stats_msg));
rc = zmq_getsockopt (control_proxy, ZMQ_RCVMORE, &more, &moresz);
assert (rc == 0 && more == 0);
// check stats // check stats
...@@ -394,9 +401,6 @@ void check_proxy_stats(void *control_proxy) ...@@ -394,9 +401,6 @@ void check_proxy_stats(void *control_proxy)
assert( total_stats.frontend.msg_out == (unsigned)zmq_atomic_counter_value(g_workers_pkts_out) ); assert( total_stats.frontend.msg_out == (unsigned)zmq_atomic_counter_value(g_workers_pkts_out) );
assert( total_stats.backend.msg_in == (unsigned)zmq_atomic_counter_value(g_workers_pkts_out) ); assert( total_stats.backend.msg_in == (unsigned)zmq_atomic_counter_value(g_workers_pkts_out) );
assert( total_stats.backend.msg_out == (unsigned)zmq_atomic_counter_value(g_clients_pkts_out) ); assert( total_stats.backend.msg_out == (unsigned)zmq_atomic_counter_value(g_clients_pkts_out) );
rc = zmq_msg_close (&stats_msg);
assert (rc == 0);
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment