// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. // A server to receive EchoRequest and send back EchoResponse. #include <gflags/gflags.h> #include <butil/logging.h> #include <brpc/server.h> #include <butil/atomicops.h> #include <butil/time.h> #include <butil/logging.h> #include <json2pb/json_to_pb.h> #include <bthread/timer_thread.h> #include <bthread/bthread.h> #include <cstdlib> #include <fstream> #include "cl_test.pb.h" DEFINE_int32(logoff_ms, 2000, "Maximum duration of server's LOGOFF state " "(waiting for client to close connection before server stops)"); DEFINE_int32(server_bthread_concurrency, 4, "Configuring the value of bthread_concurrency, For compute max qps, "); DEFINE_int32(server_sync_sleep_us, 2500, "Usleep time, each request will be executed once, For compute max qps"); // max qps = 1000 / 2.5 * 4 DEFINE_int32(control_server_port, 9000, ""); DEFINE_int32(echo_port, 9001, "TCP Port of echo server"); DEFINE_int32(cntl_port, 9000, "TCP Port of controller server"); DEFINE_string(case_file, "", "File path for test_cases"); DEFINE_int32(latency_change_interval_us, 50000, "Intervalt for server side changes the latency"); DEFINE_int32(server_max_concurrency, 0, "Echo Server's max_concurrency"); DEFINE_bool(use_usleep, false, "EchoServer uses ::usleep or bthread_usleep to simulate latency " "when processing requests"); bthread::TimerThread g_timer_thread; int cast_func(void* arg) { return *(int*)arg; } void DisplayStage(const test::Stage& stage) { std::string type; switch(stage.type()) { case test::FLUCTUATE: type = "Fluctuate"; break; case test::SMOOTH: type = "Smooth"; break; default: type = "Unknown"; } std::stringstream ss; ss << "Stage:[" << stage.lower_bound() << ':' << stage.upper_bound() << "]" << " , Type:" << type; LOG(INFO) << ss.str(); } butil::atomic<int> cnt(0); butil::atomic<int> atomic_sleep_time(0); bvar::PassiveStatus<int> atomic_sleep_time_bvar(cast_func, &atomic_sleep_time); namespace bthread { DECLARE_int32(bthread_concurrency); } void TimerTask(void* data); class EchoServiceImpl : public test::EchoService { public: EchoServiceImpl() : _stage_index(0) , _running_case(false) { }; virtual ~EchoServiceImpl() {}; void SetTestCase(const test::TestCase& test_case) { _test_case = test_case; _next_stage_start = _test_case.latency_stage_list(0).duration_sec() + butil::gettimeofday_s(); _stage_index = 0; _running_case = false; DisplayStage(_test_case.latency_stage_list(_stage_index)); } void StartTestCase() { CHECK(!_running_case); _running_case = true; UpdateLatency(); } void StopTestCase() { _running_case = false; } void UpdateLatency() { if (!_running_case) { return; } ComputeLatency(); g_timer_thread.schedule(TimerTask, (void*)this, butil::microseconds_from_now(FLAGS_latency_change_interval_us)); } virtual void Echo(google::protobuf::RpcController* cntl_base, const test::NotifyRequest* request, test::NotifyResponse* response, google::protobuf::Closure* done) { brpc::ClosureGuard done_guard(done); response->set_message("hello"); ::usleep(FLAGS_server_sync_sleep_us); if (FLAGS_use_usleep) { ::usleep(_latency.load(butil::memory_order_relaxed)); } else { bthread_usleep(_latency.load(butil::memory_order_relaxed)); } } void ComputeLatency() { if (_stage_index < _test_case.latency_stage_list_size() && butil::gettimeofday_s() > _next_stage_start) { ++_stage_index; if (_stage_index < _test_case.latency_stage_list_size()) { _next_stage_start += _test_case.latency_stage_list(_stage_index).duration_sec(); DisplayStage(_test_case.latency_stage_list(_stage_index)); } } if (_stage_index == _test_case.latency_stage_list_size()) { const test::Stage& latency_stage = _test_case.latency_stage_list(_stage_index - 1); if (latency_stage.type() == test::ChangeType::FLUCTUATE) { _latency.store((latency_stage.lower_bound() + latency_stage.upper_bound()) / 2, butil::memory_order_relaxed); } else if (latency_stage.type() == test::ChangeType::SMOOTH) { _latency.store(latency_stage.upper_bound(), butil::memory_order_relaxed); } return; } const test::Stage& latency_stage = _test_case.latency_stage_list(_stage_index); const int lower_bound = latency_stage.lower_bound(); const int upper_bound = latency_stage.upper_bound(); if (latency_stage.type() == test::FLUCTUATE) { _latency.store(butil::fast_rand_less_than(upper_bound - lower_bound) + lower_bound, butil::memory_order_relaxed); } else if (latency_stage.type() == test::SMOOTH) { int latency = lower_bound + (upper_bound - lower_bound) / double(latency_stage.duration_sec()) * (latency_stage.duration_sec() - _next_stage_start + butil::gettimeofday_s()); _latency.store(latency, butil::memory_order_relaxed); } else { LOG(FATAL) << "Wrong Type:" << latency_stage.type(); } } private: int _stage_index; int _next_stage_start; butil::atomic<int> _latency; test::TestCase _test_case; bool _running_case; }; void TimerTask(void* data) { EchoServiceImpl* echo_service = (EchoServiceImpl*)data; echo_service->UpdateLatency(); } class ControlServiceImpl : public test::ControlService { public: ControlServiceImpl() : _case_index(0) { LoadCaseSet(FLAGS_case_file); _echo_service = new EchoServiceImpl; if (_server.AddService(_echo_service, brpc::SERVER_OWNS_SERVICE) != 0) { LOG(FATAL) << "Fail to add service"; } g_timer_thread.start(NULL); } virtual ~ControlServiceImpl() { _echo_service->StopTestCase(); g_timer_thread.stop_and_join(); }; virtual void Notify(google::protobuf::RpcController* cntl_base, const test::NotifyRequest* request, test::NotifyResponse* response, google::protobuf::Closure* done) { brpc::ClosureGuard done_guard(done); const std::string& message = request->message(); LOG(INFO) << message; if (message == "ResetCaseSet") { _server.Stop(0); _server.Join(); _echo_service->StopTestCase(); LoadCaseSet(FLAGS_case_file); _case_index = 0; response->set_message("CaseSetReset"); } else if (message == "StartCase") { CHECK(!_server.IsRunning()) << "Continuous StartCase"; const test::TestCase& test_case = _case_set.test_case(_case_index++); _echo_service->SetTestCase(test_case); brpc::ServerOptions options; options.max_concurrency = FLAGS_server_max_concurrency; _server.MaxConcurrencyOf("test.EchoService.Echo") = test_case.max_concurrency(); _server.Start(FLAGS_echo_port, &options); _echo_service->StartTestCase(); response->set_message("CaseStarted"); } else if (message == "StopCase") { CHECK(_server.IsRunning()) << "Continuous StopCase"; _server.Stop(0); _server.Join(); _echo_service->StopTestCase(); response->set_message("CaseStopped"); } else { LOG(FATAL) << "Invalid message:" << message; response->set_message("Invalid Cntl Message"); } } private: void LoadCaseSet(const std::string& file_path) { std::ifstream ifs(file_path.c_str(), std::ios::in); if (!ifs) { LOG(FATAL) << "Fail to open case set file: " << file_path; } std::string case_set_json((std::istreambuf_iterator<char>(ifs)), std::istreambuf_iterator<char>()); test::TestCaseSet case_set; std::string err; if (!json2pb::JsonToProtoMessage(case_set_json, &case_set, &err)) { LOG(FATAL) << "Fail to trans case_set from json to protobuf message: " << err; } _case_set = case_set; ifs.close(); } brpc::Server _server; EchoServiceImpl* _echo_service; test::TestCaseSet _case_set; int _case_index; }; int main(int argc, char* argv[]) { // Parse gflags. We recommend you to use gflags as well. GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true); bthread::FLAGS_bthread_concurrency= FLAGS_server_bthread_concurrency; brpc::Server server; ControlServiceImpl control_service_impl; if (server.AddService(&control_service_impl, brpc::SERVER_DOESNT_OWN_SERVICE) != 0) { LOG(ERROR) << "Fail to add service"; return -1; } if (server.Start(FLAGS_cntl_port, NULL) != 0) { LOG(ERROR) << "Fail to start EchoServer"; return -1; } server.RunUntilAskedToQuit(); return 0; }