// Copyright (c) 2014 Baidu, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // A server to receive EchoRequest and send back EchoResponse. #include <vector> #include <gflags/gflags.h> #include <base/time.h> #include <base/logging.h> #include <base/string_printf.h> #include <base/string_splitter.h> #include <base/rand_util.h> #include <brpc/server.h> #include "echo.pb.h" DEFINE_bool(echo_attachment, true, "Echo attachment as well"); DEFINE_int32(port, 8004, "TCP Port of this server"); DEFINE_int32(idle_timeout_s, -1, "Connection will be closed if there is no " "read/write operations during the last `idle_timeout_s'"); DEFINE_int32(logoff_ms, 2000, "Maximum duration of server's LOGOFF state " "(waiting for client to close connection before server stops)"); DEFINE_int32(max_concurrency, 0, "Limit of request processing in parallel"); DEFINE_int32(server_num, 1, "Number of servers"); DEFINE_string(sleep_us, "", "Sleep so many microseconds before responding"); DEFINE_bool(spin, false, "spin rather than sleep"); DEFINE_double(exception_ratio, 0.1, "Percentage of irregular latencies"); DEFINE_double(min_ratio, 0.2, "min_sleep / sleep_us"); DEFINE_double(max_ratio, 10, "max_sleep / sleep_us"); // Your implementation of example::EchoService class EchoServiceImpl : public example::EchoService { public: EchoServiceImpl() : _index(0) {} virtual ~EchoServiceImpl() {}; void set_index(size_t index, int64_t sleep_us) { _index = index; _sleep_us = sleep_us; } virtual void Echo(google::protobuf::RpcController* cntl_base, const example::EchoRequest* request, example::EchoResponse* response, google::protobuf::Closure* done) { brpc::ClosureGuard done_guard(done); brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base); if (_sleep_us > 0) { double delay = _sleep_us; const double a = FLAGS_exception_ratio * 0.5; if (a >= 0.0001) { double x = base::RandDouble(); if (x < a) { const double min_sleep_us = FLAGS_min_ratio * _sleep_us; delay = min_sleep_us + (_sleep_us - min_sleep_us) * x / a; } else if (x + a > 1) { const double max_sleep_us = FLAGS_max_ratio * _sleep_us; delay = _sleep_us + (max_sleep_us - _sleep_us) * (x + a - 1) / a; } } if (FLAGS_spin) { int64_t end_time = base::gettimeofday_us() + (int64_t)delay; while (base::gettimeofday_us() < end_time) {} } else { bthread_usleep((int64_t)delay); } } // Echo request and its attachment response->set_message(request->message()); if (FLAGS_echo_attachment) { cntl->response_attachment().append(cntl->request_attachment()); } _nreq << 1; } size_t num_requests() const { return _nreq.get_value(); } private: size_t _index; int64_t _sleep_us; bvar::Adder<size_t> _nreq; }; int main(int argc, char* argv[]) { // Parse gflags. We recommend you to use gflags as well. google::ParseCommandLineFlags(&argc, &argv, true); if (FLAGS_server_num <= 0) { LOG(ERROR) << "server_num must be positive"; return -1; } // We need multiple servers in this example. brpc::Server* servers = new brpc::Server[FLAGS_server_num]; // For more options see `brpc/server.h'. brpc::ServerOptions options; options.idle_timeout_sec = FLAGS_idle_timeout_s; options.max_concurrency = FLAGS_max_concurrency; base::StringSplitter sp(FLAGS_sleep_us.c_str(), ','); std::vector<int64_t> sleep_list; for (; sp; ++sp) { sleep_list.push_back(strtoll(sp.field(), NULL, 10)); } if (sleep_list.empty()) { sleep_list.push_back(0); } // Instance of your services. EchoServiceImpl* echo_service_impls = new EchoServiceImpl[FLAGS_server_num]; // Add the service into servers. Notice the second parameter, because the // service is put on stack, we don't want server to delete it, otherwise // use brpc::SERVER_OWNS_SERVICE. for (int i = 0; i < FLAGS_server_num; ++i) { int64_t sleep_us = sleep_list[(size_t)i < sleep_list.size() ? i : (sleep_list.size() - 1)]; echo_service_impls[i].set_index(i, sleep_us); // will be shown on /version page servers[i].set_version(base::string_printf( "example/dynamic_partition_echo_c++[%d]", i)); if (servers[i].AddService(&echo_service_impls[i], brpc::SERVER_DOESNT_OWN_SERVICE) != 0) { LOG(ERROR) << "Fail to add service"; return -1; } // Start the server. int port = FLAGS_port + i; if (servers[i].Start(port, &options) != 0) { LOG(ERROR) << "Fail to start EchoServer"; return -1; } } // Service logic are running in separate worker threads, for main thread, // we don't have much to do, just spinning. std::vector<size_t> last_num_requests(FLAGS_server_num); while (!brpc::IsAskedToQuit()) { sleep(1); size_t cur_total = 0; for (int i = 0; i < FLAGS_server_num; ++i) { const size_t current_num_requests = echo_service_impls[i].num_requests(); size_t diff = current_num_requests - last_num_requests[i]; cur_total += diff; last_num_requests[i] = current_num_requests; LOG(INFO) << "S[" << i << "]=" << diff << ' ' << noflush; } LOG(INFO) << "[total=" << cur_total << ']'; } // Don't forget to stop and join the server otherwise still-running // worker threads may crash your program. Clients will have/ at most // `FLAGS_logoff_ms' to close their connections. If some connections // still remains after `FLAGS_logoff_ms', they will be closed by force. for (int i = 0; i < FLAGS_server_num; ++i) { servers[i].Stop(FLAGS_logoff_ms); } for (int i = 0; i < FLAGS_server_num; ++i) { servers[i].Join(); } delete [] servers; delete [] echo_service_impls; return 0; }