Commit 4be95134 authored by f18m's avatar f18m Committed by Luca Boccassi

Add "STATISTICS" command to zmq_proxy_steerable() (#2737)

* Issue #2736: Add STATISTICS command to zmq_proxy_steerable()
parent f4b32aa7
...@@ -82,6 +82,21 @@ ...@@ -82,6 +82,21 @@
#endif // ZMQ_HAVE_POLLER #endif // ZMQ_HAVE_POLLER
// Control socket messages
typedef struct
{
uint64_t msg_in;
uint64_t bytes_in;
uint64_t msg_out;
uint64_t bytes_out;
} zmq_socket_stats_t;
// Utility functions
int capture ( int capture (
class zmq::socket_base_t *capture_, class zmq::socket_base_t *capture_,
zmq::msg_t& msg_, zmq::msg_t& msg_,
...@@ -104,18 +119,21 @@ int capture ( ...@@ -104,18 +119,21 @@ int capture (
} }
int forward ( int forward (
class zmq::socket_base_t *from_, class zmq::socket_base_t *from_, zmq_socket_stats_t* from_stats,
class zmq::socket_base_t *to_, class zmq::socket_base_t *to_, zmq_socket_stats_t* to_stats,
class zmq::socket_base_t *capture_, class zmq::socket_base_t *capture_,
zmq::msg_t& msg_) zmq::msg_t& msg_)
{ {
int more; int more;
size_t moresz; size_t moresz;
size_t complete_msg_size = 0;
while (true) { while (true) {
int rc = from_->recv (&msg_, 0); int rc = from_->recv (&msg_, 0);
if (unlikely (rc < 0)) if (unlikely (rc < 0))
return -1; return -1;
complete_msg_size += msg_.size();
moresz = sizeof more; moresz = sizeof more;
rc = from_->getsockopt (ZMQ_RCVMORE, &more, &moresz); rc = from_->getsockopt (ZMQ_RCVMORE, &more, &moresz);
if (unlikely (rc < 0)) if (unlikely (rc < 0))
...@@ -129,12 +147,53 @@ int forward ( ...@@ -129,12 +147,53 @@ int forward (
rc = to_->send (&msg_, more ? ZMQ_SNDMORE : 0); rc = to_->send (&msg_, more ? ZMQ_SNDMORE : 0);
if (unlikely (rc < 0)) if (unlikely (rc < 0))
return -1; return -1;
if (more == 0) if (more == 0)
break; break;
} }
// A multipart message counts as 1 packet:
from_stats->msg_in++;
from_stats->bytes_in += complete_msg_size;
to_stats->msg_out++;
to_stats->bytes_out += complete_msg_size;
return 0;
}
int reply_stats(
class zmq::socket_base_t *control_,
zmq_socket_stats_t* frontend_stats,
zmq_socket_stats_t* backend_stats)
{
// first part: frontend stats
zmq::msg_t stats_msg1, stats_msg2;
int rc = stats_msg1.init_size (sizeof(zmq_socket_stats_t));
if (unlikely (rc < 0))
return close_and_return (&stats_msg1, -1);
memcpy (stats_msg1.data(), (const void*) frontend_stats, sizeof(zmq_socket_stats_t));
rc = control_->send (&stats_msg1, ZMQ_SNDMORE);
if (unlikely (rc < 0))
return close_and_return (&stats_msg1, -1);
// second part: backend stats
rc = stats_msg2.init_size (sizeof(zmq_socket_stats_t));
if (unlikely (rc < 0))
return close_and_return (&stats_msg2, -1);
memcpy (stats_msg2.data(), (const void*) backend_stats, sizeof(zmq_socket_stats_t));
rc = control_->send (&stats_msg2, 0);
if (unlikely (rc < 0))
return close_and_return (&stats_msg2, -1);
return 0; return 0;
} }
#ifdef ZMQ_HAVE_POLLER #ifdef ZMQ_HAVE_POLLER
int zmq::proxy ( int zmq::proxy (
...@@ -168,6 +227,10 @@ int zmq::proxy ( ...@@ -168,6 +227,10 @@ int zmq::proxy (
bool backend_out = false; bool backend_out = false;
bool control_in = false; bool control_in = false;
zmq::socket_poller_t::event_t events [3]; zmq::socket_poller_t::event_t events [3];
zmq_socket_stats_t frontend_stats;
zmq_socket_stats_t backend_stats;
memset(&frontend_stats, 0, sizeof(frontend_stats));
memset(&backend_stats, 0, sizeof(backend_stats));
// Don't allocate these pollers from stack because they will take more than 900 kB of stack! // Don't allocate these pollers from stack because they will take more than 900 kB of stack!
// On Windows this blows up default stack of 1 MB and aborts the program. // On Windows this blows up default stack of 1 MB and aborts the program.
...@@ -322,12 +385,19 @@ int zmq::proxy ( ...@@ -322,12 +385,19 @@ int zmq::proxy (
} else { } else {
if (msg.size () == 9 && memcmp (msg.data (), "TERMINATE", 9) == 0) if (msg.size () == 9 && memcmp (msg.data (), "TERMINATE", 9) == 0)
state = terminated; state = terminated;
else {
if (msg.size () == 10 && memcmp (msg.data (), "STATISTICS", 10) == 0)
{
rc = reply_stats(control_, &frontend_stats, &backend_stats);
CHECK_RC_EXIT_ON_FAILURE ();
}
else { else {
// This is an API error, we assert // This is an API error, we assert
puts ("E: invalid command sent to proxy"); puts ("E: invalid command sent to proxy");
zmq_assert (false); zmq_assert (false);
} }
} }
}
control_in = false; control_in = false;
} }
...@@ -336,7 +406,7 @@ int zmq::proxy ( ...@@ -336,7 +406,7 @@ int zmq::proxy (
// Process a request, 'ZMQ_POLLIN' on 'frontend_' and 'ZMQ_POLLOUT' on 'backend_'. // Process a request, 'ZMQ_POLLIN' on 'frontend_' and 'ZMQ_POLLOUT' on 'backend_'.
// In case of frontend_==backend_ there's no 'ZMQ_POLLOUT' event. // In case of frontend_==backend_ there's no 'ZMQ_POLLOUT' event.
if (frontend_in && (backend_out || frontend_equal_to_backend)) { if (frontend_in && (backend_out || frontend_equal_to_backend)) {
rc = forward (frontend_, backend_, capture_, msg); rc = forward (frontend_, &frontend_stats, backend_, &backend_stats, capture_, msg);
CHECK_RC_EXIT_ON_FAILURE (); CHECK_RC_EXIT_ON_FAILURE ();
request_processed = true; request_processed = true;
frontend_in = backend_out = false; frontend_in = backend_out = false;
...@@ -347,7 +417,7 @@ int zmq::proxy ( ...@@ -347,7 +417,7 @@ int zmq::proxy (
// covers all of the cases. 'backend_in' is always false if frontend_==backend_ due to // covers all of the cases. 'backend_in' is always false if frontend_==backend_ due to
// design in 'for' event processing loop. // design in 'for' event processing loop.
if (backend_in && frontend_out) { if (backend_in && frontend_out) {
rc = forward (backend_, frontend_, capture_, msg); rc = forward (backend_, &backend_stats, frontend_, &frontend_stats, capture_, msg);
CHECK_RC_EXIT_ON_FAILURE (); CHECK_RC_EXIT_ON_FAILURE ();
reply_processed = true; reply_processed = true;
backend_in = frontend_out = false; backend_in = frontend_out = false;
...@@ -443,6 +513,11 @@ int zmq::proxy ( ...@@ -443,6 +513,11 @@ int zmq::proxy (
{ backend_, 0, ZMQ_POLLOUT, 0 } { backend_, 0, ZMQ_POLLOUT, 0 }
}; };
zmq_socket_stats_t frontend_stats;
memset(&frontend_stats, 0, sizeof(frontend_stats));
zmq_socket_stats_t backend_stats;
memset(&backend_stats, 0, sizeof(backend_stats));
// Proxy can be in these three states // Proxy can be in these three states
enum { enum {
active, active,
...@@ -491,16 +566,24 @@ int zmq::proxy ( ...@@ -491,16 +566,24 @@ int zmq::proxy (
if (msg.size () == 9 && memcmp (msg.data (), "TERMINATE", 9) == 0) if (msg.size () == 9 && memcmp (msg.data (), "TERMINATE", 9) == 0)
state = terminated; state = terminated;
else { else {
// This is an API error, so we assert if (msg.size () == 10 && memcmp (msg.data (), "STATISTICS", 10) == 0)
{
rc = reply_stats(control_, &frontend_stats, &backend_stats);
if (unlikely (rc < 0))
return close_and_return (&msg, -1);
}
else {
// This is an API error, we assert
puts ("E: invalid command sent to proxy"); puts ("E: invalid command sent to proxy");
zmq_assert (false); zmq_assert (false);
} }
} }
}
// Process a request // Process a request
if (state == active if (state == active
&& items [0].revents & ZMQ_POLLIN && items [0].revents & ZMQ_POLLIN
&& (frontend_ == backend_ || itemsout [1].revents & ZMQ_POLLOUT)) { && (frontend_ == backend_ || itemsout [1].revents & ZMQ_POLLOUT)) {
rc = forward (frontend_, backend_, capture_, msg); rc = forward (frontend_, &frontend_stats, backend_, &backend_stats, capture_, msg);
if (unlikely (rc < 0)) if (unlikely (rc < 0))
return close_and_return (&msg, -1); return close_and_return (&msg, -1);
} }
...@@ -509,7 +592,7 @@ int zmq::proxy ( ...@@ -509,7 +592,7 @@ int zmq::proxy (
&& frontend_ != backend_ && frontend_ != backend_
&& items [1].revents & ZMQ_POLLIN && items [1].revents & ZMQ_POLLIN
&& itemsout [0].revents & ZMQ_POLLOUT) { && itemsout [0].revents & ZMQ_POLLOUT) {
rc = forward (backend_, frontend_, capture_, msg); rc = forward (backend_, &backend_stats, frontend_, &frontend_stats, capture_, msg);
if (unlikely (rc < 0)) if (unlikely (rc < 0))
return close_and_return (&msg, -1); return close_and_return (&msg, -1);
} }
......
...@@ -55,6 +55,24 @@ struct thread_data { ...@@ -55,6 +55,24 @@ struct thread_data {
int id; int id;
}; };
typedef struct
{
uint64_t msg_in;
uint64_t bytes_in;
uint64_t msg_out;
uint64_t bytes_out;
} zmq_socket_stats_t;
typedef struct
{
zmq_socket_stats_t frontend;
zmq_socket_stats_t backend;
} zmq_proxy_stats_t;
void *g_clients_pkts_out = NULL;
void *g_workers_pkts_out = NULL;
static void static void
client_task (void *db) client_task (void *db)
{ {
...@@ -100,6 +118,7 @@ client_task (void *db) ...@@ -100,6 +118,7 @@ client_task (void *db)
zmq_pollitem_t items [] = { { client, 0, ZMQ_POLLIN, 0 }, { control, 0, ZMQ_POLLIN, 0 } }; zmq_pollitem_t items [] = { { client, 0, ZMQ_POLLIN, 0 }, { control, 0, ZMQ_POLLIN, 0 } };
int request_nbr = 0; int request_nbr = 0;
bool run = true; bool run = true;
bool keep_sending = true;
while (run) { while (run) {
// Tick once per 200 ms, pulling in arriving messages // Tick once per 200 ms, pulling in arriving messages
int centitick; int centitick;
...@@ -119,17 +138,33 @@ client_task (void *db) ...@@ -119,17 +138,33 @@ client_task (void *db)
} }
if (items [1].revents & ZMQ_POLLIN) { if (items [1].revents & ZMQ_POLLIN) {
rc = zmq_recv (control, content, CONTENT_SIZE_MAX, 0); rc = zmq_recv (control, content, CONTENT_SIZE_MAX, 0);
if (rc > 0)
{
content[rc] = 0; // NULL-terminate the command string
if (is_verbose) printf("client receive - identity = %s command = %s\n", identity, content); if (is_verbose) printf("client receive - identity = %s command = %s\n", identity, content);
if (memcmp (content, "TERMINATE", 9) == 0) { if (memcmp (content, "TERMINATE", 9) == 0) {
run = false; run = false;
break; break;
} }
if (memcmp (content, "STOP", 4) == 0) {
keep_sending = false;
break;
}
}
} }
} }
if (keep_sending)
{
sprintf(content, "request #%03d", ++request_nbr); // CONTENT_SIZE sprintf(content, "request #%03d", ++request_nbr); // CONTENT_SIZE
if (is_verbose) printf("client send - identity = %s request #%03d\n", identity, request_nbr);
zmq_atomic_counter_inc(g_clients_pkts_out);
rc = zmq_send (client, content, CONTENT_SIZE, 0); rc = zmq_send (client, content, CONTENT_SIZE, 0);
assert (rc == CONTENT_SIZE); assert (rc == CONTENT_SIZE);
} }
}
rc = zmq_close (client); rc = zmq_close (client);
assert (rc == 0); assert (rc == 0);
...@@ -173,13 +208,11 @@ server_task (void *ctx) ...@@ -173,13 +208,11 @@ server_task (void *ctx)
assert (rc == 0); assert (rc == 0);
// Control socket receives terminate command from main over inproc // Control socket receives terminate command from main over inproc
void *control = zmq_socket (ctx, ZMQ_SUB); void *control = zmq_socket (ctx, ZMQ_REP);
assert (control); assert (control);
rc = zmq_setsockopt (control, ZMQ_SUBSCRIBE, "", 0);
assert (rc == 0);
rc = zmq_setsockopt (control, ZMQ_LINGER, &linger, sizeof (linger)); rc = zmq_setsockopt (control, ZMQ_LINGER, &linger, sizeof (linger));
assert (rc == 0); assert (rc == 0);
rc = zmq_connect (control, "inproc://control"); rc = zmq_connect (control, "inproc://control_proxy");
assert (rc == 0); assert (rc == 0);
// Launch pool of worker threads, precise number is not critical // Launch pool of worker threads, precise number is not critical
...@@ -255,13 +288,17 @@ server_worker (void *ctx) ...@@ -255,13 +288,17 @@ server_worker (void *ctx)
char identity [ID_SIZE_MAX]; // the size received is the size sent char identity [ID_SIZE_MAX]; // the size received is the size sent
bool run = true; bool run = true;
bool keep_sending = true;
while (run) { while (run) {
rc = zmq_recv (control, content, CONTENT_SIZE_MAX, ZMQ_DONTWAIT); // usually, rc == -1 (no message) rc = zmq_recv (control, content, CONTENT_SIZE_MAX, ZMQ_DONTWAIT); // usually, rc == -1 (no message)
if (rc > 0) { if (rc > 0) {
content[rc] = 0; // NULL-terminate the command string
if (is_verbose) if (is_verbose)
printf("server_worker receives command = %s\n", content); printf("server_worker receives command = %s\n", content);
if (memcmp (content, "TERMINATE", 9) == 0) if (memcmp (content, "TERMINATE", 9) == 0)
run = false; run = false;
if (memcmp (content, "STOP", 4) == 0)
keep_sending = false;
} }
// The DEALER socket gives us the reply envelope and message // The DEALER socket gives us the reply envelope and message
// if we don't poll, we have to use ZMQ_DONTWAIT, if we poll, we can block-receive with 0 // if we don't poll, we have to use ZMQ_DONTWAIT, if we poll, we can block-receive with 0
...@@ -273,11 +310,17 @@ server_worker (void *ctx) ...@@ -273,11 +310,17 @@ server_worker (void *ctx)
printf ("server receive - identity = %s content = %s\n", identity, content); printf ("server receive - identity = %s content = %s\n", identity, content);
// Send 0..4 replies back // Send 0..4 replies back
if (keep_sending)
{
int reply, replies = rand() % 5; int reply, replies = rand() % 5;
for (reply = 0; reply < replies; reply++) { for (reply = 0; reply < replies; reply++) {
// Sleep for some fraction of a second // Sleep for some fraction of a second
msleep (rand () % 10 + 1); msleep (rand () % 10 + 1);
// Send message from server to client // Send message from server to client
if (is_verbose) printf("server send - identity = %s reply\n", identity);
zmq_atomic_counter_inc(g_workers_pkts_out);
rc = zmq_send (worker, identity, ID_SIZE, ZMQ_SNDMORE); rc = zmq_send (worker, identity, ID_SIZE, ZMQ_SNDMORE);
assert (rc == ID_SIZE); assert (rc == ID_SIZE);
rc = zmq_send (worker, content, CONTENT_SIZE, 0); rc = zmq_send (worker, content, CONTENT_SIZE, 0);
...@@ -285,12 +328,75 @@ server_worker (void *ctx) ...@@ -285,12 +328,75 @@ server_worker (void *ctx)
} }
} }
} }
}
rc = zmq_close (worker); rc = zmq_close (worker);
assert (rc == 0); assert (rc == 0);
rc = zmq_close (control); rc = zmq_close (control);
assert (rc == 0); assert (rc == 0);
} }
// Utility function to interrogate the proxy:
void check_proxy_stats(void *control_proxy)
{
zmq_proxy_stats_t total_stats;
int rc;
rc = zmq_send (control_proxy, "STATISTICS", 10, 0);
assert (rc == 10);
// first frame of the reply contains FRONTEND stats:
zmq_msg_t stats_msg;
rc = zmq_msg_init (&stats_msg);
assert (rc == 0);
rc = zmq_recvmsg (control_proxy, &stats_msg, 0);
assert (rc == sizeof(zmq_socket_stats_t));
memcpy(&total_stats.frontend, zmq_msg_data(&stats_msg), zmq_msg_size(&stats_msg));
// second frame of the reply contains BACKEND stats:
int more;
size_t moresz = sizeof more;
rc = zmq_getsockopt (control_proxy, ZMQ_RCVMORE, &more, &moresz);
assert (rc == 0 && more == 1);
rc = zmq_recvmsg (control_proxy, &stats_msg, 0);
assert (rc == sizeof(zmq_socket_stats_t));
memcpy(&total_stats.backend, zmq_msg_data(&stats_msg), zmq_msg_size(&stats_msg));
rc = zmq_getsockopt (control_proxy, ZMQ_RCVMORE, &more, &moresz);
assert (rc == 0 && more == 0);
// check stats
if (is_verbose)
{
printf ("frontend: pkts_in=%lu bytes_in=%lu pkts_out=%lu bytes_out=%lu\n",
total_stats.frontend.msg_in, total_stats.frontend.bytes_in,
total_stats.frontend.msg_out, total_stats.frontend.bytes_out);
printf ("backend: pkts_in=%lu bytes_in=%lu pkts_out=%lu bytes_out=%lu\n",
total_stats.backend.msg_in, total_stats.backend.bytes_in,
total_stats.backend.msg_out, total_stats.backend.bytes_out);
printf ("clients sent out %d requests\n", zmq_atomic_counter_value(g_clients_pkts_out));
printf ("workers sent out %d replies\n", zmq_atomic_counter_value(g_workers_pkts_out));
}
assert( total_stats.frontend.msg_in == (unsigned)zmq_atomic_counter_value(g_clients_pkts_out) );
assert( total_stats.frontend.msg_out == (unsigned)zmq_atomic_counter_value(g_workers_pkts_out) );
assert( total_stats.backend.msg_in == (unsigned)zmq_atomic_counter_value(g_workers_pkts_out) );
assert( total_stats.backend.msg_out == (unsigned)zmq_atomic_counter_value(g_clients_pkts_out) );
rc = zmq_msg_close (&stats_msg);
assert (rc == 0);
}
// The main thread simply starts several clients and a server, and then // The main thread simply starts several clients and a server, and then
// waits for the server to finish. // waits for the server to finish.
...@@ -300,6 +406,11 @@ int main (void) ...@@ -300,6 +406,11 @@ int main (void)
void *ctx = zmq_ctx_new (); void *ctx = zmq_ctx_new ();
assert (ctx); assert (ctx);
g_clients_pkts_out = zmq_atomic_counter_new ();
g_workers_pkts_out = zmq_atomic_counter_new ();
// Control socket receives terminate command from main over inproc // Control socket receives terminate command from main over inproc
void *control = zmq_socket (ctx, ZMQ_PUB); void *control = zmq_socket (ctx, ZMQ_PUB);
assert (control); assert (control);
...@@ -309,6 +420,14 @@ int main (void) ...@@ -309,6 +420,14 @@ int main (void)
rc = zmq_bind (control, "inproc://control"); rc = zmq_bind (control, "inproc://control");
assert (rc == 0); assert (rc == 0);
// Control socket receives terminate command from main over inproc
void *control_proxy = zmq_socket (ctx, ZMQ_REQ);
assert (control_proxy);
rc = zmq_setsockopt (control_proxy, ZMQ_LINGER, &linger, sizeof (linger));
assert (rc == 0);
rc = zmq_bind (control_proxy, "inproc://control_proxy");
assert (rc == 0);
void *threads [QT_CLIENTS + 1]; void *threads [QT_CLIENTS + 1];
struct thread_data databags [QT_CLIENTS + 1]; struct thread_data databags [QT_CLIENTS + 1];
for (int i = 0; i < QT_CLIENTS; i++) { for (int i = 0; i < QT_CLIENTS; i++) {
...@@ -319,11 +438,34 @@ int main (void) ...@@ -319,11 +438,34 @@ int main (void)
threads[QT_CLIENTS] = zmq_threadstart (&server_task, ctx); threads[QT_CLIENTS] = zmq_threadstart (&server_task, ctx);
msleep (500); // Run for 500 ms then quit msleep (500); // Run for 500 ms then quit
if (is_verbose)
printf ("stopping all clients and server workers\n");
rc = zmq_send (control, "STOP", 4, 0);
assert (rc == 4);
msleep(500); // Wait for all clients and workers to STOP
if (is_verbose)
printf ("retrieving stats from the proxy\n");
check_proxy_stats(control_proxy);
if (is_verbose)
printf ("shutting down all clients and server workers\n");
rc = zmq_send (control, "TERMINATE", 9, 0); rc = zmq_send (control, "TERMINATE", 9, 0);
assert (rc == 9); assert (rc == 9);
if (is_verbose)
printf ("shutting down the proxy\n");
rc = zmq_send (control_proxy, "TERMINATE", 9, 0);
assert (rc == 9);
rc = zmq_close (control); rc = zmq_close (control);
assert (rc == 0); assert (rc == 0);
rc = zmq_close (control_proxy);
assert (rc == 0);
for (int i = 0; i < QT_CLIENTS + 1; i++) for (int i = 0; i < QT_CLIENTS + 1; i++)
zmq_threadclose (threads[i]); zmq_threadclose (threads[i]);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment