Commit 6eb3e5a1 authored by TousakaRin's avatar TousakaRin

review

parent e69935d8
...@@ -17,10 +17,8 @@ ...@@ -17,10 +17,8 @@
#ifndef BRPC_CONCURRENCY_LIMITER_H #ifndef BRPC_CONCURRENCY_LIMITER_H
#define BRPC_CONCURRENCY_LIMITER_H #define BRPC_CONCURRENCY_LIMITER_H
#include "bvar/passive_status.h"
#include "brpc/describable.h" #include "brpc/describable.h"
#include "brpc/destroyable.h" #include "brpc/destroyable.h"
#include "brpc/adaptive_max_concurrency.h"
#include "brpc/extension.h" // Extension<T> #include "brpc/extension.h" // Extension<T>
namespace brpc { namespace brpc {
......
...@@ -107,7 +107,7 @@ inline bool MethodStatus::OnRequested() { ...@@ -107,7 +107,7 @@ inline bool MethodStatus::OnRequested() {
} }
inline void MethodStatus::OnResponded(int error_code, int64_t latency) { inline void MethodStatus::OnResponded(int error_code, int64_t latency) {
if (error_code == 0) { if (0 == error_code) {
_latency_rec << latency; _latency_rec << latency;
_nprocessing.fetch_sub(1, butil::memory_order_relaxed); _nprocessing.fetch_sub(1, butil::memory_order_relaxed);
} else { } else {
......
...@@ -84,6 +84,7 @@ ...@@ -84,6 +84,7 @@
extern "C" { extern "C" {
// defined in gperftools/malloc_extension_c.h // defined in gperftools/malloc_extension_c.h
void BAIDU_WEAK MallocExtension_ReleaseFreeMemory(void); void BAIDU_WEAK MallocExtension_ReleaseFreeMemory(void);
void BAIDU_WEAK RegisterThriftProtocol();
} }
namespace brpc { namespace brpc {
...@@ -104,8 +105,6 @@ using namespace policy; ...@@ -104,8 +105,6 @@ using namespace policy;
const char* const DUMMY_SERVER_PORT_FILE = "dummy_server.port"; const char* const DUMMY_SERVER_PORT_FILE = "dummy_server.port";
void __attribute__((weak)) RegisterThriftProtocol();
struct GlobalExtensions { struct GlobalExtensions {
GlobalExtensions() GlobalExtensions()
: ch_mh_lb(MurmurHash32) : ch_mh_lb(MurmurHash32)
...@@ -476,16 +475,10 @@ static void GlobalInitializeOrDieImpl() { ...@@ -476,16 +475,10 @@ static void GlobalInitializeOrDieImpl() {
exit(1); exit(1);
} }
// Use Macro is more straight forward than weak link technology(becasue of static link issue) // Register Thrift framed protocol if linked
#ifdef ENABLE_THRIFT_FRAMED_PROTOCOL
Protocol thrift_binary_protocol = { if (RegisterThriftProtocol) {
policy::ParseThriftMessage, RegisterThriftProtocol();
policy::SerializeThriftRequest, policy::PackThriftRequest,
policy::ProcessThriftRequest, policy::ProcessThriftResponse,
policy::VerifyThriftRequest, NULL, NULL,
CONNECTION_TYPE_POOLED_AND_SHORT, "thrift" };
if (RegisterProtocol(PROTOCOL_THRIFT, thrift_binary_protocol) != 0) {
exit(1);
} }
#endif #endif
......
...@@ -31,10 +31,10 @@ namespace policy { ...@@ -31,10 +31,10 @@ namespace policy {
DEFINE_int32(gradient_cl_sampling_interval_us, 1000, DEFINE_int32(gradient_cl_sampling_interval_us, 1000,
"Interval for sampling request in gradient concurrency limiter"); "Interval for sampling request in gradient concurrency limiter");
DEFINE_int32(gradient_cl_sample_window_size_ms, 1000, DEFINE_int32(gradient_cl_sample_window_size_ms, 1000,
"Sample window size for update concurrency in grandient " "Sample window size for update max concurrency in grandient "
"concurrency limiter"); "concurrency limiter");
DEFINE_int32(gradient_cl_min_sample_count, 100, DEFINE_int32(gradient_cl_min_sample_count, 100,
"Minium sample count for update concurrency"); "Minium sample count for update max concurrency");
DEFINE_int32(gradient_cl_adjust_smooth, 50, DEFINE_int32(gradient_cl_adjust_smooth, 50,
"Smooth coefficient for adjust the max concurrency, the value is 0-99," "Smooth coefficient for adjust the max concurrency, the value is 0-99,"
"the larger the value, the smaller the amount of each change"); "the larger the value, the smaller the amount of each change");
...@@ -45,10 +45,10 @@ DEFINE_bool(gradient_cl_enable_error_punish, true, ...@@ -45,10 +45,10 @@ DEFINE_bool(gradient_cl_enable_error_punish, true,
DEFINE_int32(gradient_cl_max_error_punish_ms, 3000, DEFINE_int32(gradient_cl_max_error_punish_ms, 3000,
"The maximum time wasted for a single failed request"); "The maximum time wasted for a single failed request");
DEFINE_double(gradient_cl_fail_punish_aggressive, 1.0, DEFINE_double(gradient_cl_fail_punish_aggressive, 1.0,
"Use the failed request to punish minRTT. The larger the configuration" "Use the failed requests to punish normal requests. The larger the "
"item, the more aggressive the penalty strategy."); "configuration item, the more aggressive the penalty strategy.");
DEFINE_int32(gradient_cl_window_count, 30, DEFINE_int32(gradient_cl_window_count, 20,
"Sample windows count for compute history min average rtt"); "Sample windows count for compute history min average latency");
DEFINE_int32(gradient_cl_task_per_req, 3, DEFINE_int32(gradient_cl_task_per_req, 3,
"How many tasks will be generated for each request, calculate the maximum " "How many tasks will be generated for each request, calculate the maximum "
"concurrency of a system by calculating the maximum possible concurrent " "concurrency of a system by calculating the maximum possible concurrent "
...@@ -164,11 +164,6 @@ void GradientConcurrencyLimiter::AddSample(int error_code, int64_t latency_us, ...@@ -164,11 +164,6 @@ void GradientConcurrencyLimiter::AddSample(int error_code, int64_t latency_us,
_sw.total_failed_us += latency_us; _sw.total_failed_us += latency_us;
} else if (error_code == 0) { } else if (error_code == 0) {
++_sw.succ_count; ++_sw.succ_count;
if (_sw.min_latency_us < 0) {
_sw.min_latency_us = latency_us;
} else {
_sw.min_latency_us = std::min(latency_us, _sw.min_latency_us);
}
_sw.total_succ_us += latency_us; _sw.total_succ_us += latency_us;
} }
...@@ -195,7 +190,6 @@ void GradientConcurrencyLimiter::ResetSampleWindow(int64_t sampling_time_us) { ...@@ -195,7 +190,6 @@ void GradientConcurrencyLimiter::ResetSampleWindow(int64_t sampling_time_us) {
_sw.failed_count = 0; _sw.failed_count = 0;
_sw.total_failed_us = 0; _sw.total_failed_us = 0;
_sw.total_succ_us = 0; _sw.total_succ_us = 0;
_sw.min_latency_us = -1;
} }
void GradientConcurrencyLimiter::UpdateConcurrency() { void GradientConcurrencyLimiter::UpdateConcurrency() {
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
#define BRPC_POLICY_GRANDIENT_CONCURRENCY_LIMITER_H #define BRPC_POLICY_GRANDIENT_CONCURRENCY_LIMITER_H
#include "brpc/concurrency_limiter.h" #include "brpc/concurrency_limiter.h"
#include "bvar/bvar.h"
namespace brpc { namespace brpc {
namespace policy { namespace policy {
...@@ -49,13 +50,11 @@ private: ...@@ -49,13 +50,11 @@ private:
: start_time_us(0) : start_time_us(0)
, succ_count(0) , succ_count(0)
, failed_count(0) , failed_count(0)
, min_latency_us(-1)
, total_failed_us(0) , total_failed_us(0)
, total_succ_us(0) {} , total_succ_us(0) {}
int64_t start_time_us; int64_t start_time_us;
int32_t succ_count; int32_t succ_count;
int32_t failed_count; int32_t failed_count;
int64_t min_latency_us;
int64_t total_failed_us; int64_t total_failed_us;
int64_t total_succ_us; int64_t total_succ_us;
}; };
......
...@@ -37,6 +37,7 @@ ...@@ -37,6 +37,7 @@
#include "brpc/details/profiler_linker.h" #include "brpc/details/profiler_linker.h"
#include "brpc/health_reporter.h" #include "brpc/health_reporter.h"
#include "brpc/concurrency_limiter.h" #include "brpc/concurrency_limiter.h"
#include "brpc/adaptive_max_concurrency.h"
extern "C" { extern "C" {
struct ssl_ctx_st; struct ssl_ctx_st;
...@@ -485,7 +486,7 @@ public: ...@@ -485,7 +486,7 @@ public:
// Reset the max_concurrency set by ServerOptions.max_concurrency after // Reset the max_concurrency set by ServerOptions.max_concurrency after
// Server is started. // Server is started.
// The concurrency will be limited by the new value if this function is // The concurrency will be limited by the new value if this function is
// successfully returned. // successfully returned.
// Note: You may call this interface ONLY if you use the CONSTANT // Note: You may call this interface ONLY if you use the CONSTANT
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment