Unverified Commit 6ba83745 authored by Ge Jun's avatar Ge Jun Committed by GitHub

Merge pull request #380 from TousakaRin/auto_concurrency_limiter

Auto concurrency limiter
parents d84ba761 6c220435
cmake_minimum_required(VERSION 2.8.10)
project(asynchronous_echo_c++ C CXX)
option(EXAMPLE_LINK_SO "Whether examples are linked dynamically" OFF)
execute_process(
COMMAND bash -c "find ${CMAKE_SOURCE_DIR}/../.. -type d -regex \".*output/include$\" | head -n1 | xargs dirname | tr -d '\n'"
OUTPUT_VARIABLE OUTPUT_PATH
)
set(CMAKE_PREFIX_PATH ${OUTPUT_PATH})
include(FindThreads)
include(FindProtobuf)
protobuf_generate_cpp(PROTO_SRC PROTO_HEADER echo.proto)
# include PROTO_HEADER
include_directories(${CMAKE_CURRENT_BINARY_DIR})
find_path(BRPC_INCLUDE_PATH NAMES brpc/server.h)
if(EXAMPLE_LINK_SO)
find_library(BRPC_LIB NAMES brpc)
else()
find_library(BRPC_LIB NAMES libbrpc.a brpc)
endif()
if((NOT BRPC_INCLUDE_PATH) OR (NOT BRPC_LIB))
message(FATAL_ERROR "Fail to find brpc")
endif()
include_directories(${BRPC_INCLUDE_PATH})
find_path(GFLAGS_INCLUDE_PATH gflags/gflags.h)
find_library(GFLAGS_LIBRARY NAMES gflags libgflags)
if((NOT GFLAGS_INCLUDE_PATH) OR (NOT GFLAGS_LIBRARY))
message(FATAL_ERROR "Fail to find gflags")
endif()
include_directories(${GFLAGS_INCLUDE_PATH})
execute_process(
COMMAND bash -c "grep \"namespace [_A-Za-z0-9]\\+ {\" ${GFLAGS_INCLUDE_PATH}/gflags/gflags_declare.h | head -1 | awk '{print $2}' | tr -d '\n'"
OUTPUT_VARIABLE GFLAGS_NS
)
if(${GFLAGS_NS} STREQUAL "GFLAGS_NAMESPACE")
execute_process(
COMMAND bash -c "grep \"#define GFLAGS_NAMESPACE [_A-Za-z0-9]\\+\" ${GFLAGS_INCLUDE_PATH}/gflags/gflags_declare.h | head -1 | awk '{print $3}' | tr -d '\n'"
OUTPUT_VARIABLE GFLAGS_NS
)
endif()
if(CMAKE_SYSTEM_NAME STREQUAL "Darwin")
include(CheckFunctionExists)
CHECK_FUNCTION_EXISTS(clock_gettime HAVE_CLOCK_GETTIME)
if(NOT HAVE_CLOCK_GETTIME)
set(DEFINE_CLOCK_GETTIME "-DNO_CLOCK_GETTIME_IN_MAC")
endif()
endif()
set(CMAKE_CPP_FLAGS "${DEFINE_CLOCK_GETTIME} -DGFLAGS_NS=${GFLAGS_NS}")
set(CMAKE_CXX_FLAGS "${CMAKE_CPP_FLAGS} -DNDEBUG -O2 -D__const__= -pipe -W -Wall -Wno-unused-parameter -fPIC -fno-omit-frame-pointer")
if(CMAKE_VERSION VERSION_LESS "3.1.3")
if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")
endif()
if(CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")
endif()
else()
set(CMAKE_CXX_STANDARD 11)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
endif()
find_path(LEVELDB_INCLUDE_PATH NAMES leveldb/db.h)
find_library(LEVELDB_LIB NAMES leveldb)
if ((NOT LEVELDB_INCLUDE_PATH) OR (NOT LEVELDB_LIB))
message(FATAL_ERROR "Fail to find leveldb")
endif()
include_directories(${LEVELDB_INCLUDE_PATH})
find_library(SSL_LIB NAMES ssl)
if (NOT SSL_LIB)
message(FATAL_ERROR "Fail to find ssl")
endif()
find_library(CRYPTO_LIB NAMES crypto)
if (NOT CRYPTO_LIB)
message(FATAL_ERROR "Fail to find crypto")
endif()
set(DYNAMIC_LIB
${CMAKE_THREAD_LIBS_INIT}
${GFLAGS_LIBRARY}
${PROTOBUF_LIBRARIES}
${LEVELDB_LIB}
${SSL_LIB}
${CRYPTO_LIB}
dl
)
if(CMAKE_SYSTEM_NAME STREQUAL "Darwin")
set(DYNAMIC_LIB ${DYNAMIC_LIB}
pthread
"-framework CoreFoundation"
"-framework CoreGraphics"
"-framework CoreData"
"-framework CoreText"
"-framework Security"
"-framework Foundation"
"-Wl,-U,_MallocExtension_ReleaseFreeMemory"
"-Wl,-U,_ProfilerStart"
"-Wl,-U,_ProfilerStop"
"-Wl,-U,_RegisterThriftProtocol")
endif()
add_executable(asynchronous_echo_client client.cpp ${PROTO_SRC})
add_executable(asynchronous_echo_server server.cpp ${PROTO_SRC})
target_link_libraries(asynchronous_echo_client ${BRPC_LIB} ${DYNAMIC_LIB})
target_link_libraries(asynchronous_echo_server ${BRPC_LIB} ${DYNAMIC_LIB})
syntax="proto2";
package test;
option cc_generic_services = true;
message NotifyRequest {
required string message = 1;
};
message NotifyResponse {
required string message = 1;
};
enum ChangeType {
FLUCTUATE = 1; // Fluctuating between upper and lower bound
SMOOTH = 2; // Smoothly rising from the lower bound to the upper bound
}
message Stage {
required int32 lower_bound = 1;
required int32 upper_bound = 2;
required int32 duration_sec = 3;
required ChangeType type = 4;
}
message TestCase {
required string case_name = 1;
required string max_concurrency = 2;
repeated Stage qps_stage_list = 3;
repeated Stage latency_stage_list = 4;
}
message TestCaseSet {
repeated TestCase test_case = 1;
}
service ControlService {
rpc Notify(NotifyRequest) returns (NotifyResponse);
}
service EchoService {
rpc Echo(NotifyRequest) returns (NotifyResponse);
};
// Copyright (c) 2014 Baidu, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// A client sending requests to server asynchronously every 1 second.
#include <gflags/gflags.h>
#include <butil/logging.h>
#include <butil/time.h>
#include <brpc/channel.h>
#include <bvar/bvar.h>
#include <bthread/timer_thread.h>
#include <json2pb/json_to_pb.h>
#include <fstream>
#include "cl_test.pb.h"
DEFINE_string(protocol, "baidu_std", "Protocol type. Defined in src/brpc/options.proto");
DEFINE_string(connection_type, "", "Connection type. Available values: single, pooled, short");
DEFINE_string(cntl_server, "0.0.0.0:9000", "IP Address of server");
DEFINE_string(echo_server, "0.0.0.0:9001", "IP Address of server");
DEFINE_int32(timeout_ms, 3000, "RPC timeout in milliseconds");
DEFINE_int32(max_retry, 0, "Max retries(not including the first RPC)");
DEFINE_int32(case_interval, 20, "Intervals for different test cases");
DEFINE_int32(client_qps_change_interval_us, 50000,
"The interval for client changes the sending speed");
DEFINE_string(case_file, "", "File path for test_cases");
void DisplayStage(const test::Stage& stage) {
std::string type;
switch(stage.type()) {
case test::FLUCTUATE:
type = "Fluctuate";
break;
case test::SMOOTH:
type = "Smooth";
break;
default:
type = "Unknown";
}
std::stringstream ss;
ss
<< "Stage:[" << stage.lower_bound() << ':'
<< stage.upper_bound() << "]"
<< " , Type:" << type;
LOG(INFO) << ss.str();
}
uint32_t cast_func(void* arg) {
return *(uint32_t*)arg;
}
butil::atomic<uint32_t> g_timeout(0);
butil::atomic<uint32_t> g_error(0);
butil::atomic<uint32_t> g_succ(0);
bvar::PassiveStatus<uint32_t> g_timeout_bvar(cast_func, &g_timeout);
bvar::PassiveStatus<uint32_t> g_error_bvar(cast_func, &g_error);
bvar::PassiveStatus<uint32_t> g_succ_bvar(cast_func, &g_succ);
bvar::LatencyRecorder g_latency_rec;
void LoadCaseSet(test::TestCaseSet* case_set, const std::string& file_path) {
std::ifstream ifs(file_path.c_str(), std::ios::in);
if (!ifs) {
LOG(FATAL) << "Fail to open case set file: " << file_path;
}
std::string case_set_json((std::istreambuf_iterator<char>(ifs)),
std::istreambuf_iterator<char>());
std::string err;
if (!json2pb::JsonToProtoMessage(case_set_json, case_set, &err)) {
LOG(FATAL)
<< "Fail to trans case_set from json to protobuf message: "
<< err;
}
}
void HandleEchoResponse(
brpc::Controller* cntl,
test::NotifyResponse* response) {
// std::unique_ptr makes sure cntl/response will be deleted before returning.
std::unique_ptr<brpc::Controller> cntl_guard(cntl);
std::unique_ptr<test::NotifyResponse> response_guard(response);
if (cntl->Failed() && cntl->ErrorCode() == brpc::ERPCTIMEDOUT) {
g_timeout.fetch_add(1, butil::memory_order_relaxed);
LOG_EVERY_N(INFO, 1000) << cntl->ErrorText();
} else if (cntl->Failed()) {
g_error.fetch_add(1, butil::memory_order_relaxed);
LOG_EVERY_N(INFO, 1000) << cntl->ErrorText();
} else {
g_succ.fetch_add(1, butil::memory_order_relaxed);
g_latency_rec << cntl->latency_us();
}
}
void Expose() {
g_timeout_bvar.expose_as("cl", "timeout");
g_error_bvar.expose_as("cl", "failed");
g_succ_bvar.expose_as("cl", "succ");
g_latency_rec.expose("cl");
}
struct TestCaseContext {
TestCaseContext(const test::TestCase& tc)
: running(true)
, stage_index(0)
, test_case(tc)
, next_stage_sec(test_case.qps_stage_list(0).duration_sec() +
butil::gettimeofday_s()) {
DisplayStage(test_case.qps_stage_list(stage_index));
Update();
}
bool Update() {
if (butil::gettimeofday_s() >= next_stage_sec) {
++stage_index;
if (stage_index < test_case.qps_stage_list_size()) {
next_stage_sec += test_case.qps_stage_list(stage_index).duration_sec();
DisplayStage(test_case.qps_stage_list(stage_index));
} else {
return false;
}
}
int qps = 0;
const test::Stage& qps_stage = test_case.qps_stage_list(stage_index);
const int lower_bound = qps_stage.lower_bound();
const int upper_bound = qps_stage.upper_bound();
if (qps_stage.type() == test::FLUCTUATE) {
qps = butil::fast_rand_less_than(upper_bound - lower_bound) + lower_bound;
} else if (qps_stage.type() == test::SMOOTH) {
qps = lower_bound + (upper_bound - lower_bound) /
double(qps_stage.duration_sec()) * (qps_stage.duration_sec() - next_stage_sec
+ butil::gettimeofday_s());
}
interval_us.store(1.0 / qps * 1000000, butil::memory_order_relaxed);
return true;
}
butil::atomic<bool> running;
butil::atomic<int64_t> interval_us;
int stage_index;
const test::TestCase test_case;
int next_stage_sec;
};
void RunUpdateTask(void* data) {
TestCaseContext* context = (TestCaseContext*)data;
bool should_continue = context->Update();
if (should_continue) {
bthread::get_global_timer_thread()->schedule(RunUpdateTask, data,
butil::microseconds_from_now(FLAGS_client_qps_change_interval_us));
} else {
context->running.store(false, butil::memory_order_release);
}
}
void RunCase(test::ControlService_Stub &cntl_stub,
const test::TestCase& test_case) {
LOG(INFO) << "Running case:`" << test_case.case_name() << '\'';
brpc::Channel channel;
brpc::ChannelOptions options;
options.protocol = FLAGS_protocol;
options.connection_type = FLAGS_connection_type;
options.timeout_ms = FLAGS_timeout_ms;
options.max_retry = FLAGS_max_retry;
if (channel.Init(FLAGS_echo_server.c_str(), &options) != 0) {
LOG(FATAL) << "Fail to initialize channel";
}
test::EchoService_Stub echo_stub(&channel);
test::NotifyRequest cntl_req;
test::NotifyResponse cntl_rsp;
brpc::Controller cntl;
cntl_req.set_message("StartCase");
cntl_stub.Notify(&cntl, &cntl_req, &cntl_rsp, NULL);
CHECK(!cntl.Failed()) << "control failed";
TestCaseContext context(test_case);
bthread::get_global_timer_thread()->schedule(RunUpdateTask, &context,
butil::microseconds_from_now(FLAGS_client_qps_change_interval_us));
while (context.running.load(butil::memory_order_acquire)) {
test::NotifyRequest echo_req;
echo_req.set_message("hello");
brpc::Controller* echo_cntl = new brpc::Controller;
test::NotifyResponse* echo_rsp = new test::NotifyResponse;
google::protobuf::Closure* done = brpc::NewCallback(
&HandleEchoResponse, echo_cntl, echo_rsp);
echo_stub.Echo(echo_cntl, &echo_req, echo_rsp, done);
::usleep(context.interval_us.load(butil::memory_order_relaxed));
}
LOG(INFO) << "Waiting to stop case: `" << test_case.case_name() << '\'';
::sleep(FLAGS_case_interval);
cntl.Reset();
cntl_req.set_message("StopCase");
cntl_stub.Notify(&cntl, &cntl_req, &cntl_rsp, NULL);
CHECK(!cntl.Failed()) << "control failed";
LOG(INFO) << "Case `" << test_case.case_name() << "' finshed:";
}
int main(int argc, char* argv[]) {
// Parse gflags. We recommend you to use gflags as well.
GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);
Expose();
brpc::Channel channel;
brpc::ChannelOptions options;
options.protocol = FLAGS_protocol;
options.connection_type = FLAGS_connection_type;
options.timeout_ms = FLAGS_timeout_ms;
if (channel.Init(FLAGS_cntl_server.c_str(), &options) != 0) {
LOG(ERROR) << "Fail to initialize channel";
return -1;
}
test::ControlService_Stub cntl_stub(&channel);
test::TestCaseSet case_set;
LoadCaseSet(&case_set, FLAGS_case_file);
brpc::Controller cntl;
test::NotifyRequest cntl_req;
test::NotifyResponse cntl_rsp;
cntl_req.set_message("ResetCaseSet");
cntl_stub.Notify(&cntl, &cntl_req, &cntl_rsp, NULL);
CHECK(!cntl.Failed()) << "Cntl Failed";
for (int i = 0; i < case_set.test_case_size(); ++i) {
RunCase(cntl_stub, case_set.test_case(i));
}
LOG(INFO) << "EchoClient is going to quit";
return 0;
}
This diff is collapsed.
--case_file=test_case_test
--client_qps_change_interval_us=50000
--max_retry=0
--auto_cl_overload_threshold=0.3
--auto_cl_initial_max_concurrency=16
--latency_change_interval_us=50000
--server_bthread_concurrency=4
--server_sync_sleep_us=2500
--use_usleep=false
--case_file=test_case_test
--client_qps_change_interval_us=50000
--max_retry=0
--auto_cl_overload_threshold=0.3
--auto_cl_initial_max_concurrency=16
--latency_change_interval_us=50000
--server_bthread_concurrency=16
--server_max_concurrency=15
--server_sync_sleep_us=2500
--use_usleep=true
{"test_case":[
{
"case_name":"CheckPeakQps",
"max_concurrency":"140",
"qps_stage_list":
[
{
"lower_bound":3000,
"upper_bound":3000,
"duration_sec":30,
"type":2
}
],
"latency_stage_list":
[
{
"lower_bound":20000,
"upper_bound":20000,
"duration_sec":30,
"type":2
}
]
},
{
"case_name":"qps_stable_noload, latency_raise_smooth",
"max_concurrency":"auto",
"qps_stage_list":
[
{
"lower_bound":100,
"upper_bound":1500,
"duration_sec":10,
"type":2
},
{
"lower_bound":1500,
"upper_bound":1500,
"duration_sec":190,
"type":2
}
],
"latency_stage_list":
[
{
"lower_bound":2000,
"upper_bound":90000,
"duration_sec":200,
"type":2
}
]
},
{
"case_name":"qps_fluctuate_noload, latency_stable",
"max_concurrency":"auto",
"qps_stage_list":
[
{
"lower_bound":100,
"upper_bound":300,
"duration_sec":10,
"type":2
},
{
"lower_bound":300,
"upper_bound":1800,
"duration_sec":290,
"type":1
}
],
"latency_stage_list":
[
{
"lower_bound":40000,
"upper_bound":40000,
"duration_sec":300,
"type":1
}
]
},
{
"case_name":"qps_stable_overload, latency_stable",
"max_concurrency":"auto",
"qps_stage_list":
[
{
"lower_bound":200,
"upper_bound":3000,
"duration_sec":20,
"type":2
},
{
"lower_bound":3000,
"upper_bound":3000,
"duration_sec":180,
"type":2
}
],
"latency_stage_list":
[
{
"lower_bound":40000,
"upper_bound":40000,
"duration_sec":200,
"type":2
}
]
},
{
"case_name":"qps_stable_overload, latency_raise_smooth",
"max_concurrency":"auto",
"qps_stage_list":
[
{
"lower_bound":200,
"upper_bound":3000,
"duration_sec":20,
"type":2
},
{
"lower_bound":3000,
"upper_bound":3000,
"duration_sec":180,
"type":2
}
],
"latency_stage_list":
[
{
"lower_bound":30000,
"upper_bound":80000,
"duration_sec":200,
"type":2
}
]
},
{
"case_name":"qps_overload_then_noload, latency_stable",
"max_concurrency":"auto",
"qps_stage_list":
[
{
"lower_bound":200,
"upper_bound":2500,
"duration_sec":20,
"type":2
},
{
"lower_bound":2500,
"upper_bound":2500,
"duration_sec":150,
"type":2
},
{
"lower_bound":2500,
"upper_bound":1000,
"duration_sec":20,
"type":2
},
{
"lower_bound":1000,
"upper_bound":1000,
"duration_sec":150,
"type":2
}
],
"latency_stage_list":
[
{
"lower_bound":30000,
"upper_bound":30000,
"duration_sec":200,
"type":2
}
]
},
{
"case_name":"qps_noload_to_overload, latency_stable",
"max_concurrency":"auto",
"qps_stage_list":
[
{
"lower_bound":200,
"upper_bound":3000,
"duration_sec":150,
"type":2
},
{
"lower_bound":3000,
"upper_bound":3000,
"duration_sec":150,
"type":2
}
],
"latency_stage_list":
[
{
"lower_bound":30000,
"upper_bound":30000,
"duration_sec":200,
"type":2
}
]
},
{
"case_name":"qps_fluctuate_noload, latency_fluctuate_noload",
"max_concurrency":"auto",
"qps_stage_list":
[
{
"lower_bound":100,
"upper_bound":300,
"duration_sec":10,
"type":2
},
{
"lower_bound":300,
"upper_bound":1800,
"duration_sec":190,
"type":1
}
],
"latency_stage_list":
[
{
"lower_bound":30000,
"upper_bound":50000,
"duration_sec":200,
"type":1
}
]
},
{
"case_name":"qps_stable_noload, latency_leap_raise",
"max_concurrency":"auto",
"qps_stage_list":
[
{
"lower_bound":300,
"upper_bound":1800,
"duration_sec":20,
"type":2
},
{
"lower_bound":1800,
"upper_bound":1800,
"duration_sec":220,
"type":2
}
],
"latency_stage_list":
[
{
"lower_bound":30000,
"upper_bound":30000,
"duration_sec":100,
"type":2
},
{
"lower_bound":50000,
"upper_bound":50000,
"duration_sec":100,
"type":2
}
]
}
]}
// Copyright (c) 2014 Baidu, Inc.G
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Authors: Lei He (helei@qiyi.com)
#include <cstring>
#include <strings.h>
#include "butil/logging.h"
#include "butil/strings/string_number_conversions.h"
#include "brpc/adaptive_max_concurrency.h"
namespace brpc {
inline bool CompareStringPieceWithoutCase(
const butil::StringPiece& s1, const char* s2) {
DCHECK(s2 != NULL);
if (std::strlen(s2) != s1.size()) {
return false;
}
return ::strncasecmp(s1.data(), s2, s1.size()) == 0;
}
AdaptiveMaxConcurrency::AdaptiveMaxConcurrency(const butil::StringPiece& name) {
if (butil::StringToInt(name, &_max_concurrency) && _max_concurrency >= 0) {
_name = "constant";
} else if (_max_concurrency < 0) {
LOG(FATAL) << "Invalid max_concurrency: " << name;
} else {
_name.assign(name.begin(), name.end());
_max_concurrency = 0;
}
}
void AdaptiveMaxConcurrency::operator=(const butil::StringPiece& name) {
int max_concurrency = 0;
if (butil::StringToInt(name, &max_concurrency) && max_concurrency >= 0) {
_name = "constant";
_max_concurrency = max_concurrency;
} else if (max_concurrency < 0) {
LOG(ERROR) << "Fail to set max_concurrency, invalid value:" << name;
} else if (CompareStringPieceWithoutCase(name, "constant")) {
LOG(WARNING)
<< "If you want to use a constant maximum concurrency, assign "
<< "an integer value directly to ServerOptions.max_concurrency "
<< "like: `server_options.max_concurrency = 1000`";
_name.assign(name.begin(), name.end());
_max_concurrency = 0;
} else {
_name.assign(name.begin(), name.end());
_max_concurrency = 0;
}
}
bool operator==(const AdaptiveMaxConcurrency& adaptive_concurrency,
const butil::StringPiece& concurrency) {
return CompareStringPieceWithoutCase(concurrency,
adaptive_concurrency.name().c_str());
}
} // namespace brpc
// Copyright (c) 2014 Baidu, Inc.G
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Authors: Lei He (helei@qiyi.com)
#ifndef BRPC_ADAPTIVE_MAX_CONCURRENCY_H
#define BRPC_ADAPTIVE_MAX_CONCURRENCY_H
// To brpc developers: This is a header included by user, don't depend
// on internal structures, use opaque pointers instead.
#include "butil/strings/string_piece.h"
#include "brpc/options.pb.h"
namespace brpc {
class AdaptiveMaxConcurrency{
public:
AdaptiveMaxConcurrency()
: _name("constant")
, _max_concurrency(0) {}
AdaptiveMaxConcurrency(int max_concurrency)
: _name("constant")
, _max_concurrency(max_concurrency) {}
// Non-trivial destructor to prevent AdaptiveMaxConcurrency from being
// passed to variadic arguments without explicit type conversion.
// eg:
// printf("%d", options.max_concurrency) // compile error
// printf("%d", static_cast<int>(options.max_concurrency) // ok
~AdaptiveMaxConcurrency() {}
AdaptiveMaxConcurrency(const butil::StringPiece& name);
void operator=(int max_concurrency) {
_name = "constant";
_max_concurrency = max_concurrency;
}
void operator=(const butil::StringPiece& name);
operator int() const { return _max_concurrency; }
const std::string& name() const { return _name; }
private:
std::string _name;
int _max_concurrency;
};
bool operator==(const AdaptiveMaxConcurrency& adaptive_concurrency,
const butil::StringPiece& concurrency);
inline bool operator==(const butil::StringPiece& concurrency,
const AdaptiveMaxConcurrency& adaptive_concurrency) {
return adaptive_concurrency == concurrency;
}
inline bool operator!=(const AdaptiveMaxConcurrency& adaptive_concurrency,
const butil::StringPiece& concurrency) {
return !(adaptive_concurrency == concurrency);
}
inline bool operator!=(const butil::StringPiece& concurrency,
const AdaptiveMaxConcurrency& adaptive_concurrency) {
return !(adaptive_concurrency == concurrency);
}
} // namespace brpc
#endif // BRPC_ADAPTIVE_MAX_CONCURRENCY_H
...@@ -95,9 +95,12 @@ void StatusService::default_method(::google::protobuf::RpcController* cntl_base, ...@@ -95,9 +95,12 @@ void StatusService::default_method(::google::protobuf::RpcController* cntl_base,
<< "_connection_count\" class=\"flot-placeholder\"></div></div>"; << "_connection_count\" class=\"flot-placeholder\"></div></div>";
} }
os << '\n'; os << '\n';
const int max_concurrency = server->options().max_concurrency; const AdaptiveMaxConcurrency& max_concurrency =
if (max_concurrency > 0) { server->options().max_concurrency;
os << "max_concurrency: " << max_concurrency << '\n'; if (max_concurrency == "constant") {
os << "max_concurrency: " << static_cast<int>(max_concurrency) << '\n';
} else {
os << "concurrency limiter: " << max_concurrency.name() << '\n';
} }
os << '\n'; os << '\n';
......
// Copyright (c) 2014 Baidu, Inc.G
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Authors: Lei He (helei@qiyi.com)
#include "brpc/concurrency_limiter.h"
namespace brpc {
ConcurrencyLimiter* ConcurrencyLimiter::CreateConcurrencyLimiterOrDie(
const AdaptiveMaxConcurrency& max_concurrency) {
const ConcurrencyLimiter* cl =
ConcurrencyLimiterExtension()->Find(max_concurrency.name().c_str());
CHECK(cl != NULL)
<< "Fail to find ConcurrencyLimiter by `"
<< max_concurrency.name() << "'";
ConcurrencyLimiter* cl_copy = cl->New();
CHECK(cl_copy != NULL) << "Fail to new ConcurrencyLimiter";
if (max_concurrency == "constant") {
cl_copy->_max_concurrency = max_concurrency;
}
return cl_copy;
}
} // namespace brpc
// Copyright (c) 2014 Baidu, Inc.G
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Authors: Lei He (helei@qiyi.com)
#ifndef BRPC_CONCURRENCY_LIMITER_H
#define BRPC_CONCURRENCY_LIMITER_H
#include "brpc/describable.h"
#include "brpc/destroyable.h"
#include "brpc/extension.h" // Extension<T>
#include "brpc/adaptive_max_concurrency.h" // AdaptiveMaxConcurrency
namespace brpc {
class ConcurrencyLimiter : public Destroyable {
public:
ConcurrencyLimiter() : _max_concurrency(0) {}
// This method should be called each time a request comes in. It returns
// false when the concurrency reaches the upper limit, otherwise it
// returns true. Normally, when OnRequested returns false, you should
// return an ELIMIT error directly.
virtual bool OnRequested() = 0;
// Each request should call this method before responding.
// `error_code' : Error code obtained from the controller, 0 means success.
// `latency' : Microseconds taken by RPC.
// NOTE: Even if OnRequested returns false, after sending ELIMIT, you
// still need to call OnResponded.
virtual void OnResponded(int error_code, int64_t latency_us) = 0;
// Returns the current maximum concurrency. Note that the maximum
// concurrency of some ConcurrencyLimiters(eg: `auto', `gradient')
// is dynamically changing.
int max_concurrency() { return _max_concurrency; };
// Expose internal vars. NOT thread-safe.
// Return 0 on success, -1 otherwise.
virtual int Expose(const butil::StringPiece& prefix) = 0;
// Create/destroy an instance.
// Caller is responsible for Destroy() the instance after usage.
virtual ConcurrencyLimiter* New() const = 0;
virtual ~ConcurrencyLimiter() {}
// Create ConcurrencyLimiter* and coredump if it fails.
// Caller is responsible for Destroy() the instance after usage.
static ConcurrencyLimiter* CreateConcurrencyLimiterOrDie(
const AdaptiveMaxConcurrency& max_concurrency);
protected:
// Assume int32_t is atomic in x86
int32_t _max_concurrency;
};
inline Extension<const ConcurrencyLimiter>* ConcurrencyLimiterExtension() {
return Extension<const ConcurrencyLimiter>::instance();
}
} // namespace brpc
#endif // BRPC_CONCURRENCY_LIMITER_H
...@@ -115,8 +115,8 @@ friend void policy::ProcessThriftRequest(InputMessageBase*); ...@@ -115,8 +115,8 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
// << Flags >> // << Flags >>
static const uint32_t FLAGS_IGNORE_EOVERCROWDED = 1; static const uint32_t FLAGS_IGNORE_EOVERCROWDED = 1;
static const uint32_t FLAGS_SECURITY_MODE = (1 << 1); static const uint32_t FLAGS_SECURITY_MODE = (1 << 1);
// Incremented Server._concurrency // Called Server._cl->OnRequested()
static const uint32_t FLAGS_ADDED_CONCURRENCY = (1 << 2); static const uint32_t FLAGS_CONCURRENCY_LIMITER_REQUESTED = (1 << 2);
static const uint32_t FLAGS_READ_PROGRESSIVELY = (1 << 3); static const uint32_t FLAGS_READ_PROGRESSIVELY = (1 << 3);
static const uint32_t FLAGS_PROGRESSIVE_READER = (1 << 4); static const uint32_t FLAGS_PROGRESSIVE_READER = (1 << 4);
static const uint32_t FLAGS_BACKUP_REQUEST = (1 << 5); static const uint32_t FLAGS_BACKUP_REQUEST = (1 << 5);
......
...@@ -16,6 +16,8 @@ ...@@ -16,6 +16,8 @@
#include <limits> #include <limits>
#include "butil/macros.h" #include "butil/macros.h"
#include "brpc/controller.h"
#include "brpc/details/server_private_accessor.h"
#include "brpc/details/method_status.h" #include "brpc/details/method_status.h"
namespace brpc { namespace brpc {
...@@ -25,24 +27,35 @@ static int cast_nprocessing(void* arg) { ...@@ -25,24 +27,35 @@ static int cast_nprocessing(void* arg) {
} }
MethodStatus::MethodStatus() MethodStatus::MethodStatus()
: _max_concurrency(0) : _cl(NULL)
, _nprocessing_bvar(cast_nprocessing, &_nprocessing) , _nprocessing_bvar(cast_nprocessing, &_nprocessing)
, _nrefused_per_second(&_nrefused_bvar, 1)
, _nprocessing(0) { , _nprocessing(0) {
} }
MethodStatus::~MethodStatus() { MethodStatus::~MethodStatus() {
if (NULL != _cl) {
_cl->Destroy();
_cl = NULL;
}
} }
int MethodStatus::Expose(const butil::StringPiece& prefix) { int MethodStatus::Expose(const butil::StringPiece& prefix) {
if (_nprocessing_bvar.expose_as(prefix, "processing") != 0) { if (_nprocessing_bvar.expose_as(prefix, "processing") != 0) {
return -1; return -1;
} }
if (_nrefused_per_second.expose_as(prefix, "refused_per_second") != 0) {
return -1;
}
if (_nerror.expose_as(prefix, "error") != 0) { if (_nerror.expose_as(prefix, "error") != 0) {
return -1; return -1;
} }
if (_latency_rec.expose(prefix) != 0) { if (_latency_rec.expose(prefix) != 0) {
return -1; return -1;
} }
if (NULL != _cl && _cl->Expose(prefix) != 0) {
return -1;
}
return 0; return 0;
} }
...@@ -114,4 +127,19 @@ void MethodStatus::Describe( ...@@ -114,4 +127,19 @@ void MethodStatus::Describe(
_nprocessing, options, false); _nprocessing, options, false);
} }
void MethodStatus::SetConcurrencyLimiter(ConcurrencyLimiter* cl) {
if (NULL != _cl) {
_cl->Destroy();
}
_cl = cl;
}
ScopedMethodStatus::~ScopedMethodStatus() {
if (_status) {
_status->OnResponded(_c->ErrorCode(), butil::cpuwide_time_us() - _received_us);
_status = NULL;
}
ServerPrivateAccessor(_c->server()).RemoveConcurrency(_c);
}
} // namespace brpc } // namespace brpc
...@@ -20,10 +20,13 @@ ...@@ -20,10 +20,13 @@
#include "butil/macros.h" // DISALLOW_COPY_AND_ASSIGN #include "butil/macros.h" // DISALLOW_COPY_AND_ASSIGN
#include "bvar/bvar.h" // vars #include "bvar/bvar.h" // vars
#include "brpc/describable.h" #include "brpc/describable.h"
#include "brpc/concurrency_limiter.h"
namespace brpc { namespace brpc {
class Controller;
class Server;
// Record accessing stats of a method. // Record accessing stats of a method.
class MethodStatus : public Describable { class MethodStatus : public Describable {
public: public:
...@@ -36,12 +39,12 @@ public: ...@@ -36,12 +39,12 @@ public:
bool OnRequested(); bool OnRequested();
// Call this when the method just finished. // Call this when the method just finished.
// `success' : successful call or not. // `error_code' : The error code obtained from the controller. Equal to
// 0 when the call is successful.
// `latency_us' : microseconds taken by a successful call. Latency can // `latency_us' : microseconds taken by a successful call. Latency can
// be measured in this utility class as well, but the callsite often // be measured in this utility class as well, but the callsite often
// did the time keeping and the cost is better saved. If `success' is // did the time keeping and the cost is better saved.
// false, `latency_us' is not used. void OnResponded(int error_code, int64_t latency_us);
void OnResponded(bool success, int64_t latency_us);
// Expose internal vars. // Expose internal vars.
// Return 0 on success, -1 otherwise. // Return 0 on success, -1 otherwise.
...@@ -50,65 +53,72 @@ public: ...@@ -50,65 +53,72 @@ public:
// Describe internal vars, used by /status // Describe internal vars, used by /status
void Describe(std::ostream &os, const DescribeOptions&) const; void Describe(std::ostream &os, const DescribeOptions&) const;
int max_concurrency() const { return _max_concurrency; } // Current maximum concurrency of method.
int& max_concurrency() { return _max_concurrency; } // Return 0 if the maximum concurrency is not restricted.
int max_concurrency() const {
if (NULL == _cl) {
return 0;
} else {
return _cl->max_concurrency();
}
}
private: private:
friend class ScopedMethodStatus; friend class ScopedMethodStatus;
friend class Server;
DISALLOW_COPY_AND_ASSIGN(MethodStatus); DISALLOW_COPY_AND_ASSIGN(MethodStatus);
void OnError();
int _max_concurrency; // Note: SetConcurrencyLimiter() is not thread safe and can only be called
bvar::Adder<int64_t> _nerror; // before the server is started.
bvar::LatencyRecorder _latency_rec; void SetConcurrencyLimiter(ConcurrencyLimiter* cl);
bvar::PassiveStatus<int> _nprocessing_bvar;
ConcurrencyLimiter* _cl;
bvar::Adder<int64_t> _nerror;
bvar::LatencyRecorder _latency_rec;
bvar::PassiveStatus<int> _nprocessing_bvar;
bvar::Adder<uint32_t> _nrefused_bvar;
bvar::Window<bvar::Adder<uint32_t>> _nrefused_per_second;
butil::atomic<int> BAIDU_CACHELINE_ALIGNMENT _nprocessing; butil::atomic<int> BAIDU_CACHELINE_ALIGNMENT _nprocessing;
}; };
// If release() is not called before destruction of this object,
// an error will be counted.
class ScopedMethodStatus { class ScopedMethodStatus {
public: public:
ScopedMethodStatus(MethodStatus* status) : _status(status) {} ScopedMethodStatus(MethodStatus* status,
~ScopedMethodStatus() { Controller* c,
if (_status) { int64_t received_us)
_status->OnError(); : _status(status)
_status = NULL; , _c(c)
} , _received_us(received_us) {}
} ~ScopedMethodStatus();
MethodStatus* release() {
MethodStatus* tmp = _status;
_status = NULL;
return tmp;
}
operator MethodStatus* () const { return _status; } operator MethodStatus* () const { return _status; }
private: private:
DISALLOW_COPY_AND_ASSIGN(ScopedMethodStatus); DISALLOW_COPY_AND_ASSIGN(ScopedMethodStatus);
MethodStatus* _status; MethodStatus* _status;
Controller* _c;
uint64_t _received_us;
}; };
inline bool MethodStatus::OnRequested() { inline bool MethodStatus::OnRequested() {
const int last_nproc = _nprocessing.fetch_add(1, butil::memory_order_relaxed); _nprocessing.fetch_add(1, butil::memory_order_relaxed);
// _max_concurrency may be changed by user at any time. if (NULL == _cl || _cl->OnRequested()) {
const int saved_max_concurrency = _max_concurrency; return true;
return (saved_max_concurrency <= 0 || last_nproc < saved_max_concurrency); }
_nrefused_bvar << 1;
return false;
} }
inline void MethodStatus::OnResponded(bool success, int64_t latency) { inline void MethodStatus::OnResponded(int error_code, int64_t latency) {
if (success) { _nprocessing.fetch_sub(1, butil::memory_order_relaxed);
if (0 == error_code) {
_latency_rec << latency; _latency_rec << latency;
_nprocessing.fetch_sub(1, butil::memory_order_relaxed);
} else { } else {
OnError(); _nerror << 1;
}
if (NULL != _cl) {
_cl->OnResponded(error_code, latency);
} }
}
inline void MethodStatus::OnError() {
_nerror << 1;
_nprocessing.fetch_sub(1, butil::memory_order_relaxed);
} }
} // namespace brpc } // namespace brpc
#endif //BRPC_METHOD_STATUS_H #endif //BRPC_METHOD_STATUS_H
...@@ -38,25 +38,19 @@ public: ...@@ -38,25 +38,19 @@ public:
_server->_nerror << 1; _server->_nerror << 1;
} }
// Returns true iff the `max_concurrency' limit is not reached. // Returns true if the `max_concurrency' limit is not reached.
bool AddConcurrency(Controller* c) { bool AddConcurrency(Controller* c) {
if (_server->options().max_concurrency <= 0) { if (NULL != _server->_cl) {
return true; c->add_flag(Controller::FLAGS_CONCURRENCY_LIMITER_REQUESTED);
} return _server->_cl->OnRequested();
if (butil::subtle::NoBarrier_AtomicIncrement(&_server->_concurrency, 1) }
<= _server->options().max_concurrency) { return true;
c->add_flag(Controller::FLAGS_ADDED_CONCURRENCY);
return true;
}
butil::subtle::NoBarrier_AtomicIncrement(&_server->_concurrency, -1);
return false;
} }
// Remove the increment of AddConcurrency(). Must not be called when
// AddConcurrency() returned false.
void RemoveConcurrency(const Controller* c) { void RemoveConcurrency(const Controller* c) {
if (c->has_flag(Controller::FLAGS_ADDED_CONCURRENCY)) { if (c->has_flag(Controller::FLAGS_CONCURRENCY_LIMITER_REQUESTED)){
butil::subtle::NoBarrier_AtomicIncrement(&_server->_concurrency, -1); CHECK(_server->_cl != NULL);
_server->_cl->OnResponded(c->ErrorCode(), c->latency_us());
} }
} }
...@@ -128,20 +122,6 @@ private: ...@@ -128,20 +122,6 @@ private:
const Server* _server; const Server* _server;
}; };
class ScopedRemoveConcurrency {
public:
ScopedRemoveConcurrency(const Server* server, const Controller* c)
: _server(server), _cntl(c) {}
~ScopedRemoveConcurrency() {
ServerPrivateAccessor(_server).RemoveConcurrency(_cntl);
}
private:
DISALLOW_COPY_AND_ASSIGN(ScopedRemoveConcurrency);
const Server* _server;
const Controller* _cntl;
};
} // namespace brpc } // namespace brpc
......
...@@ -65,6 +65,11 @@ ...@@ -65,6 +65,11 @@
# include "brpc/policy/thrift_protocol.h" # include "brpc/policy/thrift_protocol.h"
#endif #endif
// Concurrency Limiters
#include "brpc/concurrency_limiter.h"
#include "brpc/policy/auto_concurrency_limiter.h"
#include "brpc/policy/constant_concurrency_limiter.h"
#include "brpc/input_messenger.h" // get_or_new_client_side_messenger #include "brpc/input_messenger.h" // get_or_new_client_side_messenger
#include "brpc/socket_map.h" // SocketMapList #include "brpc/socket_map.h" // SocketMapList
#include "brpc/server.h" #include "brpc/server.h"
...@@ -99,7 +104,6 @@ using namespace policy; ...@@ -99,7 +104,6 @@ using namespace policy;
const char* const DUMMY_SERVER_PORT_FILE = "dummy_server.port"; const char* const DUMMY_SERVER_PORT_FILE = "dummy_server.port";
struct GlobalExtensions { struct GlobalExtensions {
GlobalExtensions() GlobalExtensions()
: ch_mh_lb(MurmurHash32) : ch_mh_lb(MurmurHash32)
...@@ -120,6 +124,9 @@ struct GlobalExtensions { ...@@ -120,6 +124,9 @@ struct GlobalExtensions {
ConsistentHashingLoadBalancer ch_mh_lb; ConsistentHashingLoadBalancer ch_mh_lb;
ConsistentHashingLoadBalancer ch_md5_lb; ConsistentHashingLoadBalancer ch_md5_lb;
DynPartLoadBalancer dynpart_lb; DynPartLoadBalancer dynpart_lb;
AutoConcurrencyLimiter auto_cl;
ConstantConcurrencyLimiter constant_cl;
}; };
static pthread_once_t register_extensions_once = PTHREAD_ONCE_INIT; static pthread_once_t register_extensions_once = PTHREAD_ONCE_INIT;
...@@ -550,6 +557,10 @@ static void GlobalInitializeOrDieImpl() { ...@@ -550,6 +557,10 @@ static void GlobalInitializeOrDieImpl() {
} }
} }
// Concurrency Limiters
ConcurrencyLimiterExtension()->RegisterOrDie("auto", &g_ext->auto_cl);
ConcurrencyLimiterExtension()->RegisterOrDie("constant", &g_ext->constant_cl);
if (FLAGS_usercode_in_pthread) { if (FLAGS_usercode_in_pthread) {
// Optional. If channel/server are initialized before main(), this // Optional. If channel/server are initialized before main(), this
// flag may be false at here even if it will be set to true after // flag may be false at here even if it will be set to true after
......
// Copyright (c) 2014 Baidu, Inc.G
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Authors: Lei He (helei@qiyi.com)
#include <cmath>
#include <gflags/gflags.h>
#include "brpc/errno.pb.h"
#include "brpc/policy/auto_concurrency_limiter.h"
namespace brpc {
namespace policy {
DEFINE_int32(auto_cl_sample_window_size_ms, 1000, "Duration of the sampling window.");
DEFINE_int32(auto_cl_min_sample_count, 100,
"During the duration of the sampling window, if the number of "
"requests collected is less than this value, the sampling window "
"will be discarded.");
DEFINE_int32(auto_cl_max_sample_count, 200,
"During the duration of the sampling window, once the number of "
"requests collected is greater than this value, even if the "
"duration of the window has not ended, the max_concurrency will "
"be updated and a new sampling window will be started.");
DEFINE_double(auto_cl_sampling_interval_ms, 0.1,
"Interval for sampling request in auto concurrency limiter");
DEFINE_int32(auto_cl_initial_max_concurrency, 40,
"Initial max concurrency for grandient concurrency limiter");
DEFINE_int32(auto_cl_noload_latency_remeasure_interval_ms, 50000,
"Interval for remeasurement of noload_latency. In the period of "
"remeasurement of noload_latency will halve max_concurrency.");
DEFINE_double(auto_cl_alpha_factor_for_ema, 0.1,
"The smoothing coefficient used in the calculation of ema, "
"the value range is 0-1. The smaller the value, the smaller "
"the effect of a single sample_window on max_concurrency.");
DEFINE_double(auto_cl_overload_threshold, 0.3,
"Expected ratio of latency fluctuations");
DEFINE_bool(auto_cl_enable_error_punish, true,
"Whether to consider failed requests when calculating maximum concurrency");
DEFINE_double(auto_cl_fail_punish_ratio, 1.0,
"Use the failed requests to punish normal requests. The larger "
"the configuration item, the more aggressive the penalty strategy.");
static int32_t cast_max_concurrency(void* arg) {
return *(int32_t*) arg;
}
AutoConcurrencyLimiter::AutoConcurrencyLimiter()
: _remeasure_start_us(NextResetTime(butil::gettimeofday_us()))
, _reset_latency_us(0)
, _min_latency_us(-1)
, _ema_peak_qps(-1)
, _ema_factor(FLAGS_auto_cl_alpha_factor_for_ema)
, _overload_threshold(FLAGS_auto_cl_overload_threshold)
, _max_concurrency_bvar(cast_max_concurrency, &_max_concurrency)
, _last_sampling_time_us(0)
, _total_succ_req(0)
, _current_concurrency(0) {
_max_concurrency = FLAGS_auto_cl_initial_max_concurrency;
}
int AutoConcurrencyLimiter::Expose(const butil::StringPiece& prefix) {
if (_max_concurrency_bvar.expose_as(prefix, "auto_cl_max_concurrency") != 0) {
return -1;
}
return 0;
}
AutoConcurrencyLimiter* AutoConcurrencyLimiter::New() const {
return new (std::nothrow) AutoConcurrencyLimiter;
}
void AutoConcurrencyLimiter::Destroy() {
delete this;
}
bool AutoConcurrencyLimiter::OnRequested() {
const int32_t current_concurrency =
_current_concurrency.fetch_add(1, butil::memory_order_relaxed);
if (current_concurrency >= _max_concurrency) {
return false;
}
return true;
}
void AutoConcurrencyLimiter::OnResponded(int error_code, int64_t latency_us) {
_current_concurrency.fetch_sub(1, butil::memory_order_relaxed);
if (0 == error_code) {
_total_succ_req.fetch_add(1, butil::memory_order_relaxed);
} else if (ELIMIT == error_code) {
return;
}
int64_t now_time_us = butil::gettimeofday_us();
int64_t last_sampling_time_us =
_last_sampling_time_us.load(butil::memory_order_relaxed);
if (last_sampling_time_us == 0 ||
now_time_us - last_sampling_time_us >=
FLAGS_auto_cl_sampling_interval_ms * 1000) {
bool sample_this_call = _last_sampling_time_us.compare_exchange_strong(
last_sampling_time_us, now_time_us, butil::memory_order_relaxed);
if (sample_this_call) {
int32_t max_concurrency = AddSample(error_code, latency_us, now_time_us);
if (max_concurrency != 0) {
LOG_EVERY_N(INFO, 60)
<< "MaxConcurrency updated by auto limiter,"
<< "current_max_concurrency:" << max_concurrency
<< ", min_latency_us: " << _min_latency_us;
}
}
}
}
int64_t AutoConcurrencyLimiter::NextResetTime(int64_t sampling_time_us) {
int64_t reset_start_us = sampling_time_us +
(FLAGS_auto_cl_noload_latency_remeasure_interval_ms / 2 +
butil::fast_rand_less_than(FLAGS_auto_cl_noload_latency_remeasure_interval_ms / 2)) * 1000;
return reset_start_us;
}
int32_t AutoConcurrencyLimiter::AddSample(int error_code,
int64_t latency_us,
int64_t sampling_time_us) {
std::unique_lock<butil::Mutex> lock_guard(_sw_mutex);
// Waiting for the current concurrent decline
if (_reset_latency_us > sampling_time_us) {
return 0;
}
// Remeasure min_latency when concurrency has dropped to low load
if (_reset_latency_us > 0) {
_min_latency_us = -1;
_reset_latency_us = 0;
_remeasure_start_us = NextResetTime(sampling_time_us);
ResetSampleWindow(sampling_time_us);
}
if (_sw.start_time_us == 0) {
_sw.start_time_us = sampling_time_us;
}
if (error_code != 0 && FLAGS_auto_cl_enable_error_punish) {
++_sw.failed_count;
_sw.total_failed_us += latency_us;
} else if (error_code == 0) {
++_sw.succ_count;
_sw.total_succ_us += latency_us;
}
if (_sw.succ_count + _sw.failed_count < FLAGS_auto_cl_min_sample_count) {
if (sampling_time_us - _sw.start_time_us >=
FLAGS_auto_cl_sample_window_size_ms * 1000) {
// If the sample size is insufficient at the end of the sampling
// window, discard the entire sampling window
ResetSampleWindow(sampling_time_us);
}
return 0;
}
if (sampling_time_us - _sw.start_time_us <
FLAGS_auto_cl_sample_window_size_ms * 1000 &&
_sw.succ_count + _sw.failed_count < FLAGS_auto_cl_max_sample_count) {
return 0;
}
if(_sw.succ_count > 0) {
int max_concurrency = UpdateMaxConcurrency(sampling_time_us);
ResetSampleWindow(sampling_time_us);
return max_concurrency;
} else {
// All request failed
_max_concurrency /= 2;
ResetSampleWindow(sampling_time_us);
return 0;
}
}
void AutoConcurrencyLimiter::ResetSampleWindow(int64_t sampling_time_us) {
_sw.start_time_us = sampling_time_us;
_sw.succ_count = 0;
_sw.failed_count = 0;
_sw.total_failed_us = 0;
_sw.total_succ_us = 0;
}
void AutoConcurrencyLimiter::UpdateMinLatency(int64_t latency_us) {
if (_min_latency_us <= 0) {
_min_latency_us = latency_us;
} else if (latency_us < _min_latency_us) {
_min_latency_us = latency_us * _ema_factor + _min_latency_us * (1 - _ema_factor);
}
}
void AutoConcurrencyLimiter::UpdateQps(int32_t succ_count,
int64_t sampling_time_us) {
double qps = 1000000.0 * succ_count / (sampling_time_us - _sw.start_time_us);
if (qps >= _ema_peak_qps) {
_ema_peak_qps = qps;
} else {
_ema_peak_qps =
qps * (_ema_factor / 10) + _ema_peak_qps * (1 - _ema_factor / 10);
}
}
int32_t AutoConcurrencyLimiter::UpdateMaxConcurrency(int64_t sampling_time_us) {
int32_t total_succ_req =
_total_succ_req.exchange(0, butil::memory_order_relaxed);
double failed_punish =
_sw.total_failed_us * FLAGS_auto_cl_fail_punish_ratio;
int64_t avg_latency =
std::ceil((failed_punish + _sw.total_succ_us) / _sw.succ_count);
UpdateMinLatency(avg_latency);
UpdateQps(total_succ_req, sampling_time_us);
int next_max_concurrency = 0;
// Remeasure min_latency at regular intervals
if (_remeasure_start_us <= sampling_time_us) {
_reset_latency_us = sampling_time_us + avg_latency * 2;
next_max_concurrency = _max_concurrency * 0.75;
} else {
int32_t noload_concurrency =
std::ceil(_min_latency_us * _ema_peak_qps / 1000000);
if (avg_latency < (1.0 + _overload_threshold) * _min_latency_us) {
next_max_concurrency = std::ceil(noload_concurrency *
(2.0 + _overload_threshold - double(avg_latency) / _min_latency_us));
} else {
next_max_concurrency = noload_concurrency;
}
}
if (next_max_concurrency != _max_concurrency) {
_max_concurrency = next_max_concurrency;
}
return next_max_concurrency;
}
} // namespace policy
} // namespace brpc
// Copyright (c) 2014 Baidu, Inc.G
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Authors: Lei He (helei@qiyi.com)
#ifndef BRPC_POLICY_AUTO_CONCURRENCY_LIMITER_H
#define BRPC_POLICY_AUTO_CONCURRENCY_LIMITER_H
#include "bvar/bvar.h"
#include "butil/containers/bounded_queue.h"
#include "brpc/concurrency_limiter.h"
namespace brpc {
namespace policy {
class AutoConcurrencyLimiter : public ConcurrencyLimiter {
public:
AutoConcurrencyLimiter();
~AutoConcurrencyLimiter() {}
bool OnRequested() override;
void OnResponded(int error_code, int64_t latency_us) override;
int Expose(const butil::StringPiece& prefix) override;
AutoConcurrencyLimiter* New() const override;
void Destroy() override;
private:
struct SampleWindow {
SampleWindow()
: start_time_us(0)
, succ_count(0)
, failed_count(0)
, total_failed_us(0)
, total_succ_us(0) {}
int64_t start_time_us;
int32_t succ_count;
int32_t failed_count;
int64_t total_failed_us;
int64_t total_succ_us;
};
int32_t AddSample(int error_code, int64_t latency_us, int64_t sampling_time_us);
int64_t NextResetTime(int64_t sampling_time_us);
// The following methods are not thread safe and can only be called
// in AppSample()
int32_t UpdateMaxConcurrency(int64_t sampling_time_us);
void ResetSampleWindow(int64_t sampling_time_us);
void UpdateMinLatency(int64_t latency_us);
void UpdateQps(int32_t succ_count, int64_t sampling_time_us);
double peak_qps();
SampleWindow _sw;
int64_t _remeasure_start_us;
int64_t _reset_latency_us;
int64_t _min_latency_us;
double _ema_peak_qps;
const double _ema_factor;
const double _overload_threshold;
butil::Mutex _sw_mutex;
bvar::PassiveStatus<int32_t> _max_concurrency_bvar;
butil::atomic<int64_t> BAIDU_CACHELINE_ALIGNMENT _last_sampling_time_us;
butil::atomic<int32_t> _total_succ_req;
butil::atomic<int32_t> _current_concurrency;
};
} // namespace policy
} // namespace brpc
#endif // BRPC_POLICY_AUTO_CONCURRENCY_LIMITER_H
...@@ -146,11 +146,10 @@ void SendRpcResponse(int64_t correlation_id, ...@@ -146,11 +146,10 @@ void SendRpcResponse(int64_t correlation_id,
span->set_start_send_us(butil::cpuwide_time_us()); span->set_start_send_us(butil::cpuwide_time_us());
} }
Socket* sock = accessor.get_sending_socket(); Socket* sock = accessor.get_sending_socket();
ScopedMethodStatus method_status(method_status_raw);
std::unique_ptr<Controller, LogErrorTextAndDelete> recycle_cntl(cntl); std::unique_ptr<Controller, LogErrorTextAndDelete> recycle_cntl(cntl);
ScopedMethodStatus method_status(method_status_raw, cntl, received_us);
std::unique_ptr<const google::protobuf::Message> recycle_req(req); std::unique_ptr<const google::protobuf::Message> recycle_req(req);
std::unique_ptr<const google::protobuf::Message> recycle_res(res); std::unique_ptr<const google::protobuf::Message> recycle_res(res);
ScopedRemoveConcurrency remove_concurrency_dummy(server, cntl);
StreamId response_stream_id = accessor.response_stream(); StreamId response_stream_id = accessor.response_stream();
...@@ -264,10 +263,6 @@ void SendRpcResponse(int64_t correlation_id, ...@@ -264,10 +263,6 @@ void SendRpcResponse(int64_t correlation_id,
// TODO: this is not sent // TODO: this is not sent
span->set_sent_us(butil::cpuwide_time_us()); span->set_sent_us(butil::cpuwide_time_us());
} }
if (method_status) {
method_status.release()->OnResponded(
!cntl->Failed(), butil::cpuwide_time_us() - received_us);
}
} }
struct CallMethodInBackupThreadArgs { struct CallMethodInBackupThreadArgs {
...@@ -395,8 +390,9 @@ void ProcessRpcRequest(InputMessageBase* msg_base) { ...@@ -395,8 +390,9 @@ void ProcessRpcRequest(InputMessageBase* msg_base) {
} }
if (!server_accessor.AddConcurrency(cntl.get())) { if (!server_accessor.AddConcurrency(cntl.get())) {
cntl->SetFailed(ELIMIT, "Reached server's max_concurrency=%d", cntl->SetFailed(
server->options().max_concurrency); ELIMIT, "Reached server's max_concurrency=%d",
server->max_concurrency());
break; break;
} }
......
// Copyright (c) 2014 Baidu, Inc.G
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Authors: Lei He (helei@qiyi.com)
#include "brpc/policy/constant_concurrency_limiter.h"
namespace brpc {
namespace policy {
bool ConstantConcurrencyLimiter::OnRequested() {
const int32_t current_concurrency =
_current_concurrency.fetch_add(1, butil::memory_order_relaxed);
if (_max_concurrency != 0 && current_concurrency >= _max_concurrency) {
return false;
}
return true;
}
void ConstantConcurrencyLimiter::OnResponded(int error_code, int64_t latency) {
_current_concurrency.fetch_sub(1, butil::memory_order_relaxed);
}
int ConstantConcurrencyLimiter::Expose(const butil::StringPiece& prefix) {
return 0;
}
ConstantConcurrencyLimiter* ConstantConcurrencyLimiter::New() const {
return new (std::nothrow) ConstantConcurrencyLimiter;
}
void ConstantConcurrencyLimiter::Destroy() {
delete this;
}
} // namespace policy
} // namespace brpc
// Copyright (c) 2014 Baidu, Inc.G
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Authors: Lei He (helei@qiyi.com)
#ifndef BRPC_POLICY_CONSTANT_CONCURRENCY_LIMITER_H
#define BRPC_POLICY_CONSTANT_CONCURRENCY_LIMITER_H
#include "brpc/concurrency_limiter.h"
namespace brpc {
namespace policy {
class ConstantConcurrencyLimiter : public ConcurrencyLimiter {
public:
ConstantConcurrencyLimiter() : _current_concurrency(0) {}
~ConstantConcurrencyLimiter() {}
bool OnRequested() override;
void OnResponded(int error_code, int64_t latency_us) override;
int Expose(const butil::StringPiece& prefix) override;
ConstantConcurrencyLimiter* New() const override;
void Destroy() override;
private:
butil::atomic<int32_t> _current_concurrency;
};
} // namespace policy
} // namespace brpc
#endif // BRPC_POLICY_CONSTANT_CONCURRENCY_LIMITER_H
...@@ -557,12 +557,11 @@ static void SendHttpResponse(Controller *cntl, ...@@ -557,12 +557,11 @@ static void SendHttpResponse(Controller *cntl,
if (span) { if (span) {
span->set_start_send_us(butil::cpuwide_time_us()); span->set_start_send_us(butil::cpuwide_time_us());
} }
ScopedMethodStatus method_status(method_status_raw);
std::unique_ptr<Controller, LogErrorTextAndDelete> recycle_cntl(cntl); std::unique_ptr<Controller, LogErrorTextAndDelete> recycle_cntl(cntl);
ScopedMethodStatus method_status(method_status_raw,cntl, received_us);
std::unique_ptr<const google::protobuf::Message> recycle_req(req); std::unique_ptr<const google::protobuf::Message> recycle_req(req);
std::unique_ptr<const google::protobuf::Message> recycle_res(res); std::unique_ptr<const google::protobuf::Message> recycle_res(res);
Socket* socket = accessor.get_sending_socket(); Socket* socket = accessor.get_sending_socket();
ScopedRemoveConcurrency remove_concurrency_dummy(server, cntl);
if (cntl->IsCloseConnection()) { if (cntl->IsCloseConnection()) {
socket->SetFailed(); socket->SetFailed();
...@@ -727,10 +726,6 @@ static void SendHttpResponse(Controller *cntl, ...@@ -727,10 +726,6 @@ static void SendHttpResponse(Controller *cntl,
// TODO: this is not sent // TODO: this is not sent
span->set_sent_us(butil::cpuwide_time_us()); span->set_sent_us(butil::cpuwide_time_us());
} }
if (method_status) {
method_status.release()->OnResponded(
!cntl->Failed(), butil::cpuwide_time_us() - received_us);
}
} }
inline void SendHttpResponse(Controller *cntl, const Server* svr, inline void SendHttpResponse(Controller *cntl, const Server* svr,
...@@ -1192,7 +1187,7 @@ void ProcessHttpRequest(InputMessageBase *msg) { ...@@ -1192,7 +1187,7 @@ void ProcessHttpRequest(InputMessageBase *msg) {
} }
if (!server_accessor.AddConcurrency(cntl.get())) { if (!server_accessor.AddConcurrency(cntl.get())) {
cntl->SetFailed(ELIMIT, "Reached server's max_concurrency=%d", cntl->SetFailed(ELIMIT, "Reached server's max_concurrency=%d",
server->options().max_concurrency); server->max_concurrency());
return SendHttpResponse(cntl.release(), server, method_status, msg->received_us()); return SendHttpResponse(cntl.release(), server, method_status, msg->received_us());
} }
if (FLAGS_usercode_in_pthread && TooManyUserCode()) { if (FLAGS_usercode_in_pthread && TooManyUserCode()) {
......
...@@ -231,12 +231,11 @@ static void SendHuluResponse(int64_t correlation_id, ...@@ -231,12 +231,11 @@ static void SendHuluResponse(int64_t correlation_id,
if (span) { if (span) {
span->set_start_send_us(butil::cpuwide_time_us()); span->set_start_send_us(butil::cpuwide_time_us());
} }
ScopedMethodStatus method_status(method_status_raw);
Socket* sock = accessor.get_sending_socket(); Socket* sock = accessor.get_sending_socket();
std::unique_ptr<HuluController, LogErrorTextAndDelete> recycle_cntl(cntl); std::unique_ptr<HuluController, LogErrorTextAndDelete> recycle_cntl(cntl);
ScopedMethodStatus method_status(method_status_raw, cntl, received_us);
std::unique_ptr<const google::protobuf::Message> recycle_req(req); std::unique_ptr<const google::protobuf::Message> recycle_req(req);
std::unique_ptr<const google::protobuf::Message> recycle_res(res); std::unique_ptr<const google::protobuf::Message> recycle_res(res);
ScopedRemoveConcurrency remove_concurrency_dummy(server, cntl);
if (cntl->IsCloseConnection()) { if (cntl->IsCloseConnection()) {
sock->SetFailed(); sock->SetFailed();
...@@ -318,10 +317,6 @@ static void SendHuluResponse(int64_t correlation_id, ...@@ -318,10 +317,6 @@ static void SendHuluResponse(int64_t correlation_id,
// TODO: this is not sent // TODO: this is not sent
span->set_sent_us(butil::cpuwide_time_us()); span->set_sent_us(butil::cpuwide_time_us());
} }
if (method_status) {
method_status.release()->OnResponded(
!cntl->Failed(), butil::cpuwide_time_us() - received_us);
}
} }
// Defined in baidu_rpc_protocol.cpp // Defined in baidu_rpc_protocol.cpp
...@@ -429,7 +424,7 @@ void ProcessHuluRequest(InputMessageBase* msg_base) { ...@@ -429,7 +424,7 @@ void ProcessHuluRequest(InputMessageBase* msg_base) {
if (!server_accessor.AddConcurrency(cntl.get())) { if (!server_accessor.AddConcurrency(cntl.get())) {
cntl->SetFailed(ELIMIT, "Reached server's max_concurrency=%d", cntl->SetFailed(ELIMIT, "Reached server's max_concurrency=%d",
server->options().max_concurrency); server->max_concurrency());
break; break;
} }
if (FLAGS_usercode_in_pthread && TooManyUserCode()) { if (FLAGS_usercode_in_pthread && TooManyUserCode()) {
......
...@@ -60,7 +60,7 @@ SendMongoResponse::~SendMongoResponse() { ...@@ -60,7 +60,7 @@ SendMongoResponse::~SendMongoResponse() {
void SendMongoResponse::Run() { void SendMongoResponse::Run() {
std::unique_ptr<SendMongoResponse> delete_self(this); std::unique_ptr<SendMongoResponse> delete_self(this);
ScopedMethodStatus method_status(status); ScopedMethodStatus method_status(status, &cntl, received_us);
Socket* socket = ControllerPrivateAccessor(&cntl).get_sending_socket(); Socket* socket = ControllerPrivateAccessor(&cntl).get_sending_socket();
if (cntl.IsCloseConnection()) { if (cntl.IsCloseConnection()) {
...@@ -102,10 +102,6 @@ void SendMongoResponse::Run() { ...@@ -102,10 +102,6 @@ void SendMongoResponse::Run() {
return; return;
} }
} }
if (method_status) {
method_status.release()->OnResponded(
!cntl.Failed(), butil::cpuwide_time_us() - received_us);
}
} }
ParseResult ParseMongoMessage(butil::IOBuf* source, ParseResult ParseMongoMessage(butil::IOBuf* source,
...@@ -224,8 +220,9 @@ void ProcessMongoRequest(InputMessageBase* msg_base) { ...@@ -224,8 +220,9 @@ void ProcessMongoRequest(InputMessageBase* msg_base) {
} }
if (!ServerPrivateAccessor(server).AddConcurrency(&(mongo_done->cntl))) { if (!ServerPrivateAccessor(server).AddConcurrency(&(mongo_done->cntl))) {
mongo_done->cntl.SetFailed(ELIMIT, "Reached server's max_concurrency=%d", mongo_done->cntl.SetFailed(
server->options().max_concurrency); ELIMIT, "Reached server's max_concurrency=%d",
server->max_concurrency());
break; break;
} }
if (FLAGS_usercode_in_pthread && TooManyUserCode()) { if (FLAGS_usercode_in_pthread && TooManyUserCode()) {
......
...@@ -64,7 +64,6 @@ public: ...@@ -64,7 +64,6 @@ public:
void NsheadClosure::Run() { void NsheadClosure::Run() {
// Recycle itself after `Run' // Recycle itself after `Run'
std::unique_ptr<NsheadClosure, DeleteNsheadClosure> recycle_ctx(this); std::unique_ptr<NsheadClosure, DeleteNsheadClosure> recycle_ctx(this);
ScopedRemoveConcurrency remove_concurrency_dummy(_server, &_controller);
ControllerPrivateAccessor accessor(&_controller); ControllerPrivateAccessor accessor(&_controller);
Span* span = accessor.span(); Span* span = accessor.span();
...@@ -72,7 +71,8 @@ void NsheadClosure::Run() { ...@@ -72,7 +71,8 @@ void NsheadClosure::Run() {
span->set_start_send_us(butil::cpuwide_time_us()); span->set_start_send_us(butil::cpuwide_time_us());
} }
Socket* sock = accessor.get_sending_socket(); Socket* sock = accessor.get_sending_socket();
ScopedMethodStatus method_status(_server->options().nshead_service->_status); ScopedMethodStatus method_status(_server->options().nshead_service->_status,
&_controller, _received_us);
if (!method_status) { if (!method_status) {
// Judge errors belongings. // Judge errors belongings.
// may not be accurate, but it does not matter too much. // may not be accurate, but it does not matter too much.
...@@ -123,10 +123,6 @@ void NsheadClosure::Run() { ...@@ -123,10 +123,6 @@ void NsheadClosure::Run() {
// TODO: this is not sent // TODO: this is not sent
span->set_sent_us(butil::cpuwide_time_us()); span->set_sent_us(butil::cpuwide_time_us());
} }
if (method_status) {
method_status.release()->OnResponded(
!_controller.Failed(), butil::cpuwide_time_us() - _received_us);
}
} }
void NsheadClosure::SetMethodName(const std::string& full_method_name) { void NsheadClosure::SetMethodName(const std::string& full_method_name) {
...@@ -296,8 +292,9 @@ void ProcessNsheadRequest(InputMessageBase* msg_base) { ...@@ -296,8 +292,9 @@ void ProcessNsheadRequest(InputMessageBase* msg_base) {
break; break;
} }
if (!server_accessor.AddConcurrency(cntl)) { if (!server_accessor.AddConcurrency(cntl)) {
cntl->SetFailed(ELIMIT, "Reached server's max_concurrency=%d", cntl->SetFailed(
server->options().max_concurrency); ELIMIT, "Reached server's max_concurrency=%d",
server->max_concurrency());
break; break;
} }
if (FLAGS_usercode_in_pthread && TooManyUserCode()) { if (FLAGS_usercode_in_pthread && TooManyUserCode()) {
......
...@@ -215,12 +215,11 @@ static void SendSofaResponse(int64_t correlation_id, ...@@ -215,12 +215,11 @@ static void SendSofaResponse(int64_t correlation_id,
if (span) { if (span) {
span->set_start_send_us(butil::cpuwide_time_us()); span->set_start_send_us(butil::cpuwide_time_us());
} }
ScopedMethodStatus method_status(method_status_raw);
Socket* sock = accessor.get_sending_socket(); Socket* sock = accessor.get_sending_socket();
std::unique_ptr<Controller, LogErrorTextAndDelete> recycle_cntl(cntl); std::unique_ptr<Controller, LogErrorTextAndDelete> recycle_cntl(cntl);
ScopedMethodStatus method_status(method_status_raw, cntl, received_us);
std::unique_ptr<const google::protobuf::Message> recycle_req(req); std::unique_ptr<const google::protobuf::Message> recycle_req(req);
std::unique_ptr<const google::protobuf::Message> recycle_res(res); std::unique_ptr<const google::protobuf::Message> recycle_res(res);
ScopedRemoveConcurrency remove_concurrency_dummy(server, cntl);
if (cntl->IsCloseConnection()) { if (cntl->IsCloseConnection()) {
sock->SetFailed(); sock->SetFailed();
...@@ -294,10 +293,6 @@ static void SendSofaResponse(int64_t correlation_id, ...@@ -294,10 +293,6 @@ static void SendSofaResponse(int64_t correlation_id,
// TODO: this is not sent // TODO: this is not sent
span->set_sent_us(butil::cpuwide_time_us()); span->set_sent_us(butil::cpuwide_time_us());
} }
if (method_status) {
method_status.release()->OnResponded(
!cntl->Failed(), butil::cpuwide_time_us() - received_us);
}
} }
// Defined in baidu_rpc_protocol.cpp // Defined in baidu_rpc_protocol.cpp
...@@ -391,8 +386,9 @@ void ProcessSofaRequest(InputMessageBase* msg_base) { ...@@ -391,8 +386,9 @@ void ProcessSofaRequest(InputMessageBase* msg_base) {
} }
if (!server_accessor.AddConcurrency(cntl.get())) { if (!server_accessor.AddConcurrency(cntl.get())) {
cntl->SetFailed(ELIMIT, "Reached server's max_concurrency=%d", cntl->SetFailed(
server->options().max_concurrency); ELIMIT, "Reached server's max_concurrency=%d",
server->max_concurrency());
break; break;
} }
if (FLAGS_usercode_in_pthread && TooManyUserCode()) { if (FLAGS_usercode_in_pthread && TooManyUserCode()) {
......
...@@ -225,7 +225,6 @@ void ThriftClosure::DoRun() { ...@@ -225,7 +225,6 @@ void ThriftClosure::DoRun() {
// Recycle itself after `Run' // Recycle itself after `Run'
std::unique_ptr<ThriftClosure> recycle_ctx(this); std::unique_ptr<ThriftClosure> recycle_ctx(this);
const Server* server = _controller.server(); const Server* server = _controller.server();
ScopedRemoveConcurrency remove_concurrency_dummy(server, &_controller);
ControllerPrivateAccessor accessor(&_controller); ControllerPrivateAccessor accessor(&_controller);
Span* span = accessor.span(); Span* span = accessor.span();
...@@ -233,8 +232,10 @@ void ThriftClosure::DoRun() { ...@@ -233,8 +232,10 @@ void ThriftClosure::DoRun() {
span->set_start_send_us(butil::cpuwide_time_us()); span->set_start_send_us(butil::cpuwide_time_us());
} }
Socket* sock = accessor.get_sending_socket(); Socket* sock = accessor.get_sending_socket();
ScopedMethodStatus method_status(server->options().thrift_service ? ScopedMethodStatus method_status(
server->options().thrift_service->_status : NULL); server->options().thrift_service ?
server->options().thrift_service->_status : NULL,
&_controller, _received_us);
if (!method_status) { if (!method_status) {
// Judge errors belongings. // Judge errors belongings.
// may not be accurate, but it does not matter too much. // may not be accurate, but it does not matter too much.
...@@ -348,10 +349,6 @@ void ThriftClosure::DoRun() { ...@@ -348,10 +349,6 @@ void ThriftClosure::DoRun() {
// TODO: this is not sent // TODO: this is not sent
span->set_sent_us(butil::cpuwide_time_us()); span->set_sent_us(butil::cpuwide_time_us());
} }
if (method_status) {
method_status.release()->OnResponded(
!_controller.Failed(), butil::cpuwide_time_us() - _received_us);
}
} }
ParseResult ParseThriftMessage(butil::IOBuf* source, ParseResult ParseThriftMessage(butil::IOBuf* source,
...@@ -526,7 +523,7 @@ void ProcessThriftRequest(InputMessageBase* msg_base) { ...@@ -526,7 +523,7 @@ void ProcessThriftRequest(InputMessageBase* msg_base) {
} }
if (!server_accessor.AddConcurrency(cntl)) { if (!server_accessor.AddConcurrency(cntl)) {
cntl->SetFailed(ELIMIT, "Reached server's max_concurrency=%d", cntl->SetFailed(ELIMIT, "Reached server's max_concurrency=%d",
server->options().max_concurrency); server->max_concurrency());
break; break;
} }
if (FLAGS_usercode_in_pthread && TooManyUserCode()) { if (FLAGS_usercode_in_pthread && TooManyUserCode()) {
......
...@@ -382,9 +382,7 @@ Server::Server(ProfilerLinker) ...@@ -382,9 +382,7 @@ Server::Server(ProfilerLinker)
, _last_start_time(0) , _last_start_time(0)
, _derivative_thread(INVALID_BTHREAD) , _derivative_thread(INVALID_BTHREAD)
, _keytable_pool(NULL) , _keytable_pool(NULL)
, _concurrency(0) { , _cl(NULL) {
BAIDU_CASSERT(offsetof(Server, _concurrency) % 64 == 0,
Server_concurrency_must_be_aligned_by_cacheline);
} }
Server::~Server() { Server::~Server() {
...@@ -425,6 +423,10 @@ Server::~Server() { ...@@ -425,6 +423,10 @@ Server::~Server() {
delete _options.auth; delete _options.auth;
_options.auth = NULL; _options.auth = NULL;
} }
if (_cl) {
_cl->Destroy();
_cl = NULL;
}
} }
int Server::AddBuiltinServices() { int Server::AddBuiltinServices() {
...@@ -662,6 +664,8 @@ static int get_port_from_fd(int fd) { ...@@ -662,6 +664,8 @@ static int get_port_from_fd(int fd) {
return ntohs(addr.sin_port); return ntohs(addr.sin_port);
} }
static AdaptiveMaxConcurrency g_default_max_concurrency_of_method = 0;
int Server::StartInternal(const butil::ip_t& ip, int Server::StartInternal(const butil::ip_t& ip,
const PortRange& port_range, const PortRange& port_range,
const ServerOptions *opt) { const ServerOptions *opt) {
...@@ -832,8 +836,6 @@ int Server::StartInternal(const butil::ip_t& ip, ...@@ -832,8 +836,6 @@ int Server::StartInternal(const butil::ip_t& ip,
} }
} }
_concurrency = 0;
if (_options.has_builtin_services && if (_options.has_builtin_services &&
_builtin_service_count <= 0 && _builtin_service_count <= 0 &&
AddBuiltinServices() != 0) { AddBuiltinServices() != 0) {
...@@ -870,6 +872,31 @@ int Server::StartInternal(const butil::ip_t& ip, ...@@ -870,6 +872,31 @@ int Server::StartInternal(const butil::ip_t& ip,
bthread_setconcurrency(_options.num_threads); bthread_setconcurrency(_options.num_threads);
} }
if (NULL != _cl) {
_cl->Destroy();
_cl = NULL;
}
if (_options.max_concurrency != "constant" ||
static_cast<int>(_options.max_concurrency) != 0) {
_cl = ConcurrencyLimiter::CreateConcurrencyLimiterOrDie(
_options.max_concurrency);
_cl->Expose("Server_Concurrency_Limiter");
}
for (MethodMap::iterator it = _method_map.begin();
it != _method_map.end(); ++it) {
if (it->second.is_builtin_service) {
it->second.status->SetConcurrencyLimiter(NULL);
} else if (it->second.max_concurrency == "constant" &&
static_cast<int>(it->second.max_concurrency) == 0) {
it->second.status->SetConcurrencyLimiter(NULL);
} else {
it->second.status->SetConcurrencyLimiter(
ConcurrencyLimiter::CreateConcurrencyLimiterOrDie(
it->second.max_concurrency));
}
}
// Create listening ports // Create listening ports
if (port_range.min_port > port_range.max_port) { if (port_range.min_port > port_range.max_port) {
LOG(ERROR) << "Invalid port_range=[" << port_range.min_port << '-' LOG(ERROR) << "Invalid port_range=[" << port_range.min_port << '-'
...@@ -1959,35 +1986,44 @@ bool Server::ClearCertMapping(CertMaps& bg) { ...@@ -1959,35 +1986,44 @@ bool Server::ClearCertMapping(CertMaps& bg) {
} }
int Server::ResetMaxConcurrency(int max_concurrency) { int Server::ResetMaxConcurrency(int max_concurrency) {
if (!IsRunning()) { LOG(WARNING) << "ResetMaxConcurrency is already deprecated";
LOG(WARNING) << "ResetMaxConcurrency is only allowd for a Running Server";
return -1;
}
// Assume that modifying int32 is atomical in X86
_options.max_concurrency = max_concurrency;
return 0; return 0;
} }
static int g_default_max_concurrency_of_method = 0; int Server::max_concurrency() const {
if (NULL != _cl) {
return _cl->max_concurrency();
} else {
return g_default_max_concurrency_of_method;
}
}
int& Server::MaxConcurrencyOf(MethodProperty* mp) { AdaptiveMaxConcurrency& Server::MaxConcurrencyOf(MethodProperty* mp) {
if (IsRunning()) {
LOG(WARNING) << "MaxConcurrencyOf is only allowd before Server started";
return g_default_max_concurrency_of_method;
}
if (mp->status == NULL) { if (mp->status == NULL) {
LOG(ERROR) << "method=" << mp->method->full_name() LOG(ERROR) << "method=" << mp->method->full_name()
<< " does not support max_concurrency"; << " does not support max_concurrency";
_failed_to_set_max_concurrency_of_method = true; _failed_to_set_max_concurrency_of_method = true;
return g_default_max_concurrency_of_method; return g_default_max_concurrency_of_method;
} }
return mp->status->max_concurrency(); return mp->max_concurrency;
} }
int Server::MaxConcurrencyOf(const MethodProperty* mp) const { int Server::MaxConcurrencyOf(const MethodProperty* mp) const {
if (IsRunning()) {
LOG(WARNING) << "MaxConcurrencyOf is only allowd before Server started";
return g_default_max_concurrency_of_method;
}
if (mp == NULL || mp->status == NULL) { if (mp == NULL || mp->status == NULL) {
return 0; return 0;
} }
return mp->status->max_concurrency(); return mp->max_concurrency;
} }
int& Server::MaxConcurrencyOf(const butil::StringPiece& full_method_name) { AdaptiveMaxConcurrency& Server::MaxConcurrencyOf(const butil::StringPiece& full_method_name) {
MethodProperty* mp = _method_map.seek(full_method_name); MethodProperty* mp = _method_map.seek(full_method_name);
if (mp == NULL) { if (mp == NULL) {
LOG(ERROR) << "Fail to find method=" << full_method_name; LOG(ERROR) << "Fail to find method=" << full_method_name;
...@@ -2001,7 +2037,7 @@ int Server::MaxConcurrencyOf(const butil::StringPiece& full_method_name) const { ...@@ -2001,7 +2037,7 @@ int Server::MaxConcurrencyOf(const butil::StringPiece& full_method_name) const {
return MaxConcurrencyOf(_method_map.seek(full_method_name)); return MaxConcurrencyOf(_method_map.seek(full_method_name));
} }
int& Server::MaxConcurrencyOf(const butil::StringPiece& full_service_name, AdaptiveMaxConcurrency& Server::MaxConcurrencyOf(const butil::StringPiece& full_service_name,
const butil::StringPiece& method_name) { const butil::StringPiece& method_name) {
MethodProperty* mp = const_cast<MethodProperty*>( MethodProperty* mp = const_cast<MethodProperty*>(
FindMethodPropertyByFullName(full_service_name, method_name)); FindMethodPropertyByFullName(full_service_name, method_name));
...@@ -2020,7 +2056,7 @@ int Server::MaxConcurrencyOf(const butil::StringPiece& full_service_name, ...@@ -2020,7 +2056,7 @@ int Server::MaxConcurrencyOf(const butil::StringPiece& full_service_name,
full_service_name, method_name)); full_service_name, method_name));
} }
int& Server::MaxConcurrencyOf(google::protobuf::Service* service, AdaptiveMaxConcurrency& Server::MaxConcurrencyOf(google::protobuf::Service* service,
const butil::StringPiece& method_name) { const butil::StringPiece& method_name) {
return MaxConcurrencyOf(service->GetDescriptor()->full_name(), method_name); return MaxConcurrencyOf(service->GetDescriptor()->full_name(), method_name);
} }
...@@ -2072,4 +2108,4 @@ int Server::SSLSwitchCTXByHostname(struct ssl_st* ssl, ...@@ -2072,4 +2108,4 @@ int Server::SSLSwitchCTXByHostname(struct ssl_st* ssl,
} }
#endif // SSL_CTRL_SET_TLSEXT_HOSTNAME #endif // SSL_CTRL_SET_TLSEXT_HOSTNAME
} // namespace brpc } // namespace brpc
...@@ -36,6 +36,8 @@ ...@@ -36,6 +36,8 @@
#include "brpc/builtin/tabbed.h" #include "brpc/builtin/tabbed.h"
#include "brpc/details/profiler_linker.h" #include "brpc/details/profiler_linker.h"
#include "brpc/health_reporter.h" #include "brpc/health_reporter.h"
#include "brpc/concurrency_limiter.h"
#include "brpc/adaptive_max_concurrency.h"
extern "C" { extern "C" {
struct ssl_ctx_st; struct ssl_ctx_st;
...@@ -114,7 +116,12 @@ struct ServerOptions { ...@@ -114,7 +116,12 @@ struct ServerOptions {
// shall try another server. // shall try another server.
// NOTE: accesses to builtin services are not limited by this option. // NOTE: accesses to builtin services are not limited by this option.
// Default: 0 (unlimited) // Default: 0 (unlimited)
int max_concurrency; // NOTE: Once you have chosen the automatic concurrency limit strategy, brpc
// ONLY limits concurrency at the method level, And each method will use
// the strategy you set in ServerOptions to limit the maximum concurrency,
// even if you have set a maximum concurrency through `MaxConcurrencyOf`.
AdaptiveMaxConcurrency max_concurrency;
// ------------------------------------------------------- // -------------------------------------------------------
// Differences between session-local and thread-local data // Differences between session-local and thread-local data
...@@ -327,6 +334,7 @@ public: ...@@ -327,6 +334,7 @@ public:
google::protobuf::Service* service; google::protobuf::Service* service;
const google::protobuf::MethodDescriptor* method; const google::protobuf::MethodDescriptor* method;
MethodStatus* status; MethodStatus* status;
AdaptiveMaxConcurrency max_concurrency;
MethodProperty(); MethodProperty();
}; };
...@@ -476,27 +484,32 @@ public: ...@@ -476,27 +484,32 @@ public:
// current_tab_name is the tab highlighted. // current_tab_name is the tab highlighted.
void PrintTabsBody(std::ostream& os, const char* current_tab_name) const; void PrintTabsBody(std::ostream& os, const char* current_tab_name) const;
// Reset the max_concurrency set by ServerOptions.max_concurrency after
// Server is started. // This method is already deprecated.You should NOT call it anymore.
// The concurrency will be limited by the new value if this function is
// successfully returned.
// Returns 0 on success, -1 otherwise.
int ResetMaxConcurrency(int max_concurrency); int ResetMaxConcurrency(int max_concurrency);
// Server's current max concurrency
int max_concurrency() const;
// Get/set max_concurrency associated with a method. // Get/set max_concurrency associated with a method.
// Example: // Example:
// server.MaxConcurrencyOf("example.EchoService.Echo") = 10; // server.MaxConcurrencyOf("example.EchoService.Echo") = 10;
// or server.MaxConcurrencyOf("example.EchoService", "Echo") = 10; // or server.MaxConcurrencyOf("example.EchoService", "Echo") = 10;
// or server.MaxConcurrencyOf(&service, "Echo") = 10; // or server.MaxConcurrencyOf(&service, "Echo") = 10;
int& MaxConcurrencyOf(const butil::StringPiece& full_method_name); // Note: These interfaces can ONLY be called before the server is started.
// And you should NOT set the max_concurrency when you are going to choose
// an auto concurrency limiter, eg `options.max_concurrency = "auto"`.If you
// still called non-const version of the interface, your changes to the
// maximum concurrency will not take effect.
AdaptiveMaxConcurrency& MaxConcurrencyOf(const butil::StringPiece& full_method_name);
int MaxConcurrencyOf(const butil::StringPiece& full_method_name) const; int MaxConcurrencyOf(const butil::StringPiece& full_method_name) const;
int& MaxConcurrencyOf(const butil::StringPiece& full_service_name, AdaptiveMaxConcurrency& MaxConcurrencyOf(const butil::StringPiece& full_service_name,
const butil::StringPiece& method_name); const butil::StringPiece& method_name);
int MaxConcurrencyOf(const butil::StringPiece& full_service_name, int MaxConcurrencyOf(const butil::StringPiece& full_service_name,
const butil::StringPiece& method_name) const; const butil::StringPiece& method_name) const;
int& MaxConcurrencyOf(google::protobuf::Service* service, AdaptiveMaxConcurrency& MaxConcurrencyOf(google::protobuf::Service* service,
const butil::StringPiece& method_name); const butil::StringPiece& method_name);
int MaxConcurrencyOf(google::protobuf::Service* service, int MaxConcurrencyOf(google::protobuf::Service* service,
const butil::StringPiece& method_name) const; const butil::StringPiece& method_name) const;
...@@ -590,7 +603,7 @@ friend class Controller; ...@@ -590,7 +603,7 @@ friend class Controller;
static bool ResetCertMappings(CertMaps& bg, const SSLContextMap& ctx_map); static bool ResetCertMappings(CertMaps& bg, const SSLContextMap& ctx_map);
static bool ClearCertMapping(CertMaps& bg); static bool ClearCertMapping(CertMaps& bg);
int& MaxConcurrencyOf(MethodProperty*); AdaptiveMaxConcurrency& MaxConcurrencyOf(MethodProperty*);
int MaxConcurrencyOf(const MethodProperty*) const; int MaxConcurrencyOf(const MethodProperty*) const;
DISALLOW_COPY_AND_ASSIGN(Server); DISALLOW_COPY_AND_ASSIGN(Server);
...@@ -650,7 +663,7 @@ friend class Controller; ...@@ -650,7 +663,7 @@ friend class Controller;
// Replace `ServerPrivateAccessor' with other private-access // Replace `ServerPrivateAccessor' with other private-access
// mechanism // mechanism
mutable bvar::Adder<int64_t> _nerror; mutable bvar::Adder<int64_t> _nerror;
mutable int32_t BAIDU_CACHELINE_ALIGNMENT _concurrency; ConcurrencyLimiter* _cl;
}; };
// Get the data attached to current searching thread. The data is created by // Get the data attached to current searching thread. The data is created by
......
...@@ -220,6 +220,7 @@ protected: ...@@ -220,6 +220,7 @@ protected:
brpc::Controller* cntl = new brpc::Controller(); brpc::Controller* cntl = new brpc::Controller();
cntl->_current_call.peer_id = ptr->id(); cntl->_current_call.peer_id = ptr->id();
cntl->_current_call.sending_sock.reset(ptr.release()); cntl->_current_call.sending_sock.reset(ptr.release());
cntl->_server = &ts->_dummy;
google::protobuf::Message* res = google::protobuf::Message* res =
ts->_svc.GetResponsePrototype(method).New(); ts->_svc.GetResponsePrototype(method).New();
...@@ -701,7 +702,7 @@ protected: ...@@ -701,7 +702,7 @@ protected:
CallMethod(&subchans[0], &cntl, &req, &res, false); CallMethod(&subchans[0], &cntl, &req, &res, false);
ASSERT_TRUE(cntl.Failed()); ASSERT_TRUE(cntl.Failed());
ASSERT_EQ(brpc::EINTERNAL, cntl.ErrorCode()) << cntl.ErrorText(); ASSERT_EQ(brpc::EINTERNAL, cntl.ErrorCode()) << cntl.ErrorText();
ASSERT_EQ("[E2001]Method ComboEcho() not implemented.", cntl.ErrorText()); ASSERT_EQ("[E2001][127.0.1.1:0]Method ComboEcho() not implemented.", cntl.ErrorText());
// do the rpc call. // do the rpc call.
cntl.Reset(); cntl.Reset();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment