Commit 1a4489f5 authored by TousakaRin's avatar TousakaRin

Fix bugs in calculating latency and qps frequency inaccuracies

parent 86222574
...@@ -31,9 +31,10 @@ DEFINE_string(cntl_server, "0.0.0.0:9000", "IP Address of server"); ...@@ -31,9 +31,10 @@ DEFINE_string(cntl_server, "0.0.0.0:9000", "IP Address of server");
DEFINE_string(echo_server, "0.0.0.0:9001", "IP Address of server"); DEFINE_string(echo_server, "0.0.0.0:9001", "IP Address of server");
DEFINE_int32(timeout_ms, 3000, "RPC timeout in milliseconds"); DEFINE_int32(timeout_ms, 3000, "RPC timeout in milliseconds");
DEFINE_int32(max_retry, 0, "Max retries(not including the first RPC)"); DEFINE_int32(max_retry, 0, "Max retries(not including the first RPC)");
DEFINE_int32(case_interval, 20, ""); DEFINE_int32(case_interval, 20, "Intervals for different test cases");
DEFINE_int32(client_frequent_interval_us, 10000, ""); DEFINE_int32(client_qps_change_interval_us, 50000,
DEFINE_string(case_file, "", ""); "The interval for client changes the sending speed");
DEFINE_string(case_file, "", "File path for test_cases");
void DisplayStage(const test::Stage& stage) { void DisplayStage(const test::Stage& stage) {
std::string type; std::string type;
...@@ -156,10 +157,9 @@ struct TestCaseContext { ...@@ -156,10 +157,9 @@ struct TestCaseContext {
void RunUpdateTask(void* data) { void RunUpdateTask(void* data) {
TestCaseContext* context = (TestCaseContext*)data; TestCaseContext* context = (TestCaseContext*)data;
bool should_continue = context->Update(); bool should_continue = context->Update();
timespec ts;
ts.tv_nsec = FLAGS_client_frequent_interval_us * 1000;
if (should_continue) { if (should_continue) {
bthread::get_global_timer_thread()->schedule(RunUpdateTask, data, ts); bthread::get_global_timer_thread()->schedule(RunUpdateTask, data,
butil::microseconds_from_now(FLAGS_client_qps_change_interval_us));
} else { } else {
context->running.store(false, butil::memory_order_release); context->running.store(false, butil::memory_order_release);
} }
...@@ -173,6 +173,7 @@ void RunCase(test::ControlService_Stub &cntl_stub, ...@@ -173,6 +173,7 @@ void RunCase(test::ControlService_Stub &cntl_stub,
options.protocol = FLAGS_protocol; options.protocol = FLAGS_protocol;
options.connection_type = FLAGS_connection_type; options.connection_type = FLAGS_connection_type;
options.timeout_ms = FLAGS_timeout_ms; options.timeout_ms = FLAGS_timeout_ms;
options.max_retry = FLAGS_max_retry;
if (channel.Init(FLAGS_echo_server.c_str(), &options) != 0) { if (channel.Init(FLAGS_echo_server.c_str(), &options) != 0) {
LOG(FATAL) << "Fail to initialize channel"; LOG(FATAL) << "Fail to initialize channel";
} }
...@@ -186,9 +187,8 @@ void RunCase(test::ControlService_Stub &cntl_stub, ...@@ -186,9 +187,8 @@ void RunCase(test::ControlService_Stub &cntl_stub,
CHECK(!cntl.Failed()) << "control failed"; CHECK(!cntl.Failed()) << "control failed";
TestCaseContext context(test_case); TestCaseContext context(test_case);
timespec ts; bthread::get_global_timer_thread()->schedule(RunUpdateTask, &context,
ts.tv_nsec = FLAGS_client_frequent_interval_us * 1000; butil::microseconds_from_now(FLAGS_client_qps_change_interval_us));
bthread::get_global_timer_thread()->schedule(RunUpdateTask, &context, ts);
while (context.running.load(butil::memory_order_acquire)) { while (context.running.load(butil::memory_order_acquire)) {
test::NotifyRequest echo_req; test::NotifyRequest echo_req;
......
...@@ -31,14 +31,18 @@ ...@@ -31,14 +31,18 @@
DEFINE_int32(logoff_ms, 2000, "Maximum duration of server's LOGOFF state " DEFINE_int32(logoff_ms, 2000, "Maximum duration of server's LOGOFF state "
"(waiting for client to close connection before server stops)"); "(waiting for client to close connection before server stops)");
DEFINE_int32(server_bthread_concurrency, 4, "For compute max qps"); DEFINE_int32(server_bthread_concurrency, 4, "For compute max qps");
DEFINE_int32(server_sync_sleep_us, 2500, "For compute max qps"); DEFINE_int32(server_sync_sleep_us, 2500, "For compute maximum qps");
// max qps = 1000 / 2.5 * 4 = 1600 // max qps = 1000 / 2.5 * 4 = 1600
DEFINE_int32(control_server_port, 9000, ""); DEFINE_int32(control_server_port, 9000, "");
DEFINE_int32(echo_port, 9001, ""); DEFINE_int32(echo_port, 9001, "TCP Port of echo server");
DEFINE_int32(cntl_port, 9000, "TCP Port of this server"); DEFINE_int32(cntl_port, 9000, "TCP Port of controller server");
DEFINE_string(case_file, "", ""); DEFINE_string(case_file, "", "File path for test_cases");
DEFINE_int32(server_frequent_interval_us, 10000, ""); DEFINE_int32(latency_change_interval_us, 50000, "Intervalt for server side changes the latency");
DEFINE_int32(server_max_concurrency, 0, "Echo Server's max_concurrency");
DEFINE_bool(use_usleep, false,
"EchoServer uses ::usleep or bthread_usleep to simulate latency "
"when processing requests");
bthread::TimerThread g_timer_thread; bthread::TimerThread g_timer_thread;
...@@ -110,9 +114,8 @@ public: ...@@ -110,9 +114,8 @@ public:
return; return;
} }
ComputeLatency(); ComputeLatency();
timespec ts; g_timer_thread.schedule(TimerTask, (void*)this,
ts.tv_nsec = FLAGS_server_frequent_interval_us * 1000; butil::microseconds_from_now(FLAGS_latency_change_interval_us));
g_timer_thread.schedule(TimerTask, (void*)this, ts);
} }
virtual void Echo(google::protobuf::RpcController* cntl_base, virtual void Echo(google::protobuf::RpcController* cntl_base,
...@@ -122,8 +125,12 @@ public: ...@@ -122,8 +125,12 @@ public:
brpc::ClosureGuard done_guard(done); brpc::ClosureGuard done_guard(done);
response->set_message("hello"); response->set_message("hello");
::usleep(FLAGS_server_sync_sleep_us); ::usleep(FLAGS_server_sync_sleep_us);
if (FLAGS_use_usleep) {
::usleep(_latency.load(butil::memory_order_relaxed));
} else {
bthread_usleep(_latency.load(butil::memory_order_relaxed)); bthread_usleep(_latency.load(butil::memory_order_relaxed));
} }
}
void ComputeLatency() { void ComputeLatency() {
if (_stage_index < _test_case.latency_stage_list_size() && if (_stage_index < _test_case.latency_stage_list_size() &&
...@@ -213,9 +220,11 @@ public: ...@@ -213,9 +220,11 @@ public:
CHECK(!_server.IsRunning()) << "Continuous StartCase"; CHECK(!_server.IsRunning()) << "Continuous StartCase";
const test::TestCase& test_case = _case_set.test_case(_case_index++); const test::TestCase& test_case = _case_set.test_case(_case_index++);
_echo_service->SetTestCase(test_case); _echo_service->SetTestCase(test_case);
brpc::ServerOptions options;
// options.max_concurrency = 15;
_server.MaxConcurrencyOf("test.EchoService.Echo") = test_case.max_concurrency(); _server.MaxConcurrencyOf("test.EchoService.Echo") = test_case.max_concurrency();
_server.Start(FLAGS_echo_port, NULL); _server.Start(FLAGS_echo_port, &options);
_echo_service->StartTestCase(); _echo_service->StartTestCase();
response->set_message("CaseStarted"); response->set_message("CaseStarted");
} else if (message == "StopCase") { } else if (message == "StopCase") {
......
--ABTest=true --case_file=test_case_test
--auto_cl_min_reserved_concurrency=20 --client_qps_change_interval_us=50000
--case_file=test_case.json --max_retry=0
--client_frequent_interval_us=5000000 --auto_cl_overload_threshold=0.3
--auto_cl_initial_max_concurrency=16
--server_frequent_interval_us=5000000 --latency_change_interval_us=50000
--server_bthread_concurrency=4
--server_sync_sleep_us=2500
--use_usleep=false
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
{ {
"lower_bound":3000, "lower_bound":3000,
"upper_bound":3000, "upper_bound":3000,
"duration_sec":3, "duration_sec":30,
"type":2 "type":2
} }
], ],
...@@ -17,7 +17,7 @@ ...@@ -17,7 +17,7 @@
{ {
"lower_bound":20000, "lower_bound":20000,
"upper_bound":20000, "upper_bound":20000,
"duration_sec":3, "duration_sec":30,
"type":2 "type":2
} }
] ]
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment