Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
B
brpc
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
brpc
Commits
75cfdf6a
Commit
75cfdf6a
authored
Aug 01, 2018
by
TousakaRin
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Add example for auto_concurrency_limiter
parent
af218f5b
Hide whitespace changes
Inline
Side-by-side
Showing
7 changed files
with
977 additions
and
0 deletions
+977
-0
CMakeLists.txt
example/auto_concurrency_limiter/CMakeLists.txt
+116
-0
cl_test.proto
example/auto_concurrency_limiter/cl_test.proto
+44
-0
client.cpp
example/auto_concurrency_limiter/client.cpp
+244
-0
dummy_server.port
example/auto_concurrency_limiter/dummy_server.port
+1
-0
server.cpp
example/auto_concurrency_limiter/server.cpp
+283
-0
settings.flags
example/auto_concurrency_limiter/settings.flags
+7
-0
test_case.json
example/auto_concurrency_limiter/test_case.json
+282
-0
No files found.
example/auto_concurrency_limiter/CMakeLists.txt
0 → 100644
View file @
75cfdf6a
cmake_minimum_required
(
VERSION 2.8.10
)
project
(
asynchronous_echo_c++ C CXX
)
option
(
EXAMPLE_LINK_SO
"Whether examples are linked dynamically"
OFF
)
execute_process
(
COMMAND bash -c
"find
${
CMAKE_SOURCE_DIR
}
/../.. -type d -regex
\"
.*output/include$
\"
| head -n1 | xargs dirname | tr -d '
\n
'"
OUTPUT_VARIABLE OUTPUT_PATH
)
set
(
CMAKE_PREFIX_PATH
${
OUTPUT_PATH
}
)
include
(
FindThreads
)
include
(
FindProtobuf
)
protobuf_generate_cpp
(
PROTO_SRC PROTO_HEADER echo.proto
)
# include PROTO_HEADER
include_directories
(
${
CMAKE_CURRENT_BINARY_DIR
}
)
find_path
(
BRPC_INCLUDE_PATH NAMES brpc/server.h
)
if
(
EXAMPLE_LINK_SO
)
find_library
(
BRPC_LIB NAMES brpc
)
else
()
find_library
(
BRPC_LIB NAMES libbrpc.a brpc
)
endif
()
if
((
NOT BRPC_INCLUDE_PATH
)
OR
(
NOT BRPC_LIB
))
message
(
FATAL_ERROR
"Fail to find brpc"
)
endif
()
include_directories
(
${
BRPC_INCLUDE_PATH
}
)
find_path
(
GFLAGS_INCLUDE_PATH gflags/gflags.h
)
find_library
(
GFLAGS_LIBRARY NAMES gflags libgflags
)
if
((
NOT GFLAGS_INCLUDE_PATH
)
OR
(
NOT GFLAGS_LIBRARY
))
message
(
FATAL_ERROR
"Fail to find gflags"
)
endif
()
include_directories
(
${
GFLAGS_INCLUDE_PATH
}
)
execute_process
(
COMMAND bash -c
"grep
\"
namespace [_A-Za-z0-9]
\\
+ {
\"
${
GFLAGS_INCLUDE_PATH
}
/gflags/gflags_declare.h | head -1 | awk '{print $2}' | tr -d '
\n
'"
OUTPUT_VARIABLE GFLAGS_NS
)
if
(
${
GFLAGS_NS
}
STREQUAL
"GFLAGS_NAMESPACE"
)
execute_process
(
COMMAND bash -c
"grep
\"
#define GFLAGS_NAMESPACE [_A-Za-z0-9]
\\
+
\"
${
GFLAGS_INCLUDE_PATH
}
/gflags/gflags_declare.h | head -1 | awk '{print $3}' | tr -d '
\n
'"
OUTPUT_VARIABLE GFLAGS_NS
)
endif
()
if
(
CMAKE_SYSTEM_NAME STREQUAL
"Darwin"
)
include
(
CheckFunctionExists
)
CHECK_FUNCTION_EXISTS
(
clock_gettime HAVE_CLOCK_GETTIME
)
if
(
NOT HAVE_CLOCK_GETTIME
)
set
(
DEFINE_CLOCK_GETTIME
"-DNO_CLOCK_GETTIME_IN_MAC"
)
endif
()
endif
()
set
(
CMAKE_CPP_FLAGS
"
${
DEFINE_CLOCK_GETTIME
}
-DGFLAGS_NS=
${
GFLAGS_NS
}
"
)
set
(
CMAKE_CXX_FLAGS
"
${
CMAKE_CPP_FLAGS
}
-DNDEBUG -O2 -D__const__= -pipe -W -Wall -Wno-unused-parameter -fPIC -fno-omit-frame-pointer"
)
if
(
CMAKE_VERSION VERSION_LESS
"3.1.3"
)
if
(
CMAKE_CXX_COMPILER_ID STREQUAL
"GNU"
)
set
(
CMAKE_CXX_FLAGS
"
${
CMAKE_CXX_FLAGS
}
-std=c++11"
)
endif
()
if
(
CMAKE_CXX_COMPILER_ID STREQUAL
"Clang"
)
set
(
CMAKE_CXX_FLAGS
"
${
CMAKE_CXX_FLAGS
}
-std=c++11"
)
endif
()
else
()
set
(
CMAKE_CXX_STANDARD 11
)
set
(
CMAKE_CXX_STANDARD_REQUIRED ON
)
endif
()
find_path
(
LEVELDB_INCLUDE_PATH NAMES leveldb/db.h
)
find_library
(
LEVELDB_LIB NAMES leveldb
)
if
((
NOT LEVELDB_INCLUDE_PATH
)
OR
(
NOT LEVELDB_LIB
))
message
(
FATAL_ERROR
"Fail to find leveldb"
)
endif
()
include_directories
(
${
LEVELDB_INCLUDE_PATH
}
)
find_library
(
SSL_LIB NAMES ssl
)
if
(
NOT SSL_LIB
)
message
(
FATAL_ERROR
"Fail to find ssl"
)
endif
()
find_library
(
CRYPTO_LIB NAMES crypto
)
if
(
NOT CRYPTO_LIB
)
message
(
FATAL_ERROR
"Fail to find crypto"
)
endif
()
set
(
DYNAMIC_LIB
${
CMAKE_THREAD_LIBS_INIT
}
${
GFLAGS_LIBRARY
}
${
PROTOBUF_LIBRARIES
}
${
LEVELDB_LIB
}
${
SSL_LIB
}
${
CRYPTO_LIB
}
dl
)
if
(
CMAKE_SYSTEM_NAME STREQUAL
"Darwin"
)
set
(
DYNAMIC_LIB
${
DYNAMIC_LIB
}
pthread
"-framework CoreFoundation"
"-framework CoreGraphics"
"-framework CoreData"
"-framework CoreText"
"-framework Security"
"-framework Foundation"
"-Wl,-U,_MallocExtension_ReleaseFreeMemory"
"-Wl,-U,_ProfilerStart"
"-Wl,-U,_ProfilerStop"
"-Wl,-U,_RegisterThriftProtocol"
)
endif
()
add_executable
(
asynchronous_echo_client client.cpp
${
PROTO_SRC
}
)
add_executable
(
asynchronous_echo_server server.cpp
${
PROTO_SRC
}
)
target_link_libraries
(
asynchronous_echo_client
${
BRPC_LIB
}
${
DYNAMIC_LIB
}
)
target_link_libraries
(
asynchronous_echo_server
${
BRPC_LIB
}
${
DYNAMIC_LIB
}
)
example/auto_concurrency_limiter/cl_test.proto
0 → 100644
View file @
75cfdf6a
syntax
=
"proto2"
;
package
test
;
option
cc_generic_services
=
true
;
message
NotifyRequest
{
required
string
message
=
1
;
};
message
NotifyResponse
{
required
string
message
=
1
;
};
enum
ChangeType
{
FLUCTUATE
=
1
;
// Fluctuating between upper and lower bound
SMOOTH
=
2
;
// Smoothly rising from the lower bound to the upper bound
}
message
Stage
{
required
int32
lower_bound
=
1
;
required
int32
upper_bound
=
2
;
required
int32
duration_sec
=
3
;
required
ChangeType
type
=
4
;
}
message
TestCase
{
required
string
case_name
=
1
;
required
string
max_concurrency
=
2
;
repeated
Stage
qps_stage_list
=
3
;
repeated
Stage
latency_stage_list
=
4
;
}
message
TestCaseSet
{
repeated
TestCase
test_case
=
1
;
}
service
ControlService
{
rpc
Notify
(
NotifyRequest
)
returns
(
NotifyResponse
);
}
service
EchoService
{
rpc
Echo
(
NotifyRequest
)
returns
(
NotifyResponse
);
};
example/auto_concurrency_limiter/client.cpp
0 → 100644
View file @
75cfdf6a
// Copyright (c) 2014 Baidu, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// A client sending requests to server asynchronously every 1 second.
#include <gflags/gflags.h>
#include <butil/logging.h>
#include <butil/time.h>
#include <brpc/channel.h>
#include <bvar/bvar.h>
#include <bthread/timer_thread.h>
#include <json2pb/json_to_pb.h>
#include <fstream>
#include "cl_test.pb.h"
DEFINE_string
(
protocol
,
"baidu_std"
,
"Protocol type. Defined in src/brpc/options.proto"
);
DEFINE_string
(
connection_type
,
""
,
"Connection type. Available values: single, pooled, short"
);
DEFINE_string
(
cntl_server
,
"0.0.0.0:9000"
,
"IP Address of server"
);
DEFINE_string
(
echo_server
,
"0.0.0.0:9001"
,
"IP Address of server"
);
DEFINE_int32
(
timeout_ms
,
3000
,
"RPC timeout in milliseconds"
);
DEFINE_int32
(
max_retry
,
0
,
"Max retries(not including the first RPC)"
);
DEFINE_int32
(
case_interval
,
20
,
""
);
DEFINE_int32
(
client_frequent_interval_us
,
10000
,
""
);
DEFINE_string
(
case_file
,
""
,
""
);
void
DisplayStage
(
const
test
::
Stage
&
stage
)
{
std
::
string
type
;
switch
(
stage
.
type
())
{
case
test
:
:
FLUCTUATE
:
type
=
"Fluctuate"
;
break
;
case
test
:
:
SMOOTH
:
type
=
"Smooth"
;
break
;
default:
type
=
"Unknown"
;
}
std
::
stringstream
ss
;
ss
<<
"Stage:["
<<
stage
.
lower_bound
()
<<
':'
<<
stage
.
upper_bound
()
<<
"]"
<<
" , Type:"
<<
type
;
LOG
(
INFO
)
<<
ss
.
str
();
}
uint32_t
cast_func
(
void
*
arg
)
{
return
*
(
uint32_t
*
)
arg
;
}
butil
::
atomic
<
uint32_t
>
g_timeout
(
0
);
butil
::
atomic
<
uint32_t
>
g_error
(
0
);
butil
::
atomic
<
uint32_t
>
g_succ
(
0
);
bvar
::
PassiveStatus
<
uint32_t
>
g_timeout_bvar
(
cast_func
,
&
g_timeout
);
bvar
::
PassiveStatus
<
uint32_t
>
g_error_bvar
(
cast_func
,
&
g_error
);
bvar
::
PassiveStatus
<
uint32_t
>
g_succ_bvar
(
cast_func
,
&
g_succ
);
bvar
::
LatencyRecorder
g_latency_rec
;
void
LoadCaseSet
(
test
::
TestCaseSet
*
case_set
,
const
std
::
string
&
file_path
)
{
std
::
ifstream
ifs
(
file_path
.
c_str
(),
std
::
ios
::
in
);
if
(
!
ifs
)
{
LOG
(
FATAL
)
<<
"Fail to open case set file: "
<<
file_path
;
}
std
::
string
case_set_json
((
std
::
istreambuf_iterator
<
char
>
(
ifs
)),
std
::
istreambuf_iterator
<
char
>
());
std
::
string
err
;
if
(
!
json2pb
::
JsonToProtoMessage
(
case_set_json
,
case_set
,
&
err
))
{
LOG
(
FATAL
)
<<
"Fail to trans case_set from json to protobuf message: "
<<
err
;
}
}
void
HandleEchoResponse
(
brpc
::
Controller
*
cntl
,
test
::
NotifyResponse
*
response
)
{
// std::unique_ptr makes sure cntl/response will be deleted before returning.
std
::
unique_ptr
<
brpc
::
Controller
>
cntl_guard
(
cntl
);
std
::
unique_ptr
<
test
::
NotifyResponse
>
response_guard
(
response
);
if
(
cntl
->
Failed
()
&&
cntl
->
ErrorCode
()
==
brpc
::
ERPCTIMEDOUT
)
{
g_timeout
.
fetch_add
(
1
,
butil
::
memory_order_relaxed
);
LOG_EVERY_N
(
INFO
,
1000
)
<<
cntl
->
ErrorText
();
}
else
if
(
cntl
->
Failed
())
{
g_error
.
fetch_add
(
1
,
butil
::
memory_order_relaxed
);
LOG_EVERY_N
(
INFO
,
1000
)
<<
cntl
->
ErrorText
();
}
else
{
g_succ
.
fetch_add
(
1
,
butil
::
memory_order_relaxed
);
g_latency_rec
<<
cntl
->
latency_us
();
}
}
void
Expose
()
{
g_timeout_bvar
.
expose_as
(
"cl"
,
"timeout"
);
g_error_bvar
.
expose_as
(
"cl"
,
"failed"
);
g_succ_bvar
.
expose_as
(
"cl"
,
"succ"
);
g_latency_rec
.
expose
(
"cl"
);
}
struct
TestCaseContext
{
TestCaseContext
(
const
test
::
TestCase
&
tc
)
:
running
(
true
)
,
stage_index
(
0
)
,
test_case
(
tc
)
,
next_stage_sec
(
test_case
.
qps_stage_list
(
0
).
duration_sec
()
+
butil
::
gettimeofday_s
())
{
DisplayStage
(
test_case
.
qps_stage_list
(
stage_index
));
Update
();
}
bool
Update
()
{
if
(
butil
::
gettimeofday_s
()
>=
next_stage_sec
)
{
++
stage_index
;
if
(
stage_index
<
test_case
.
qps_stage_list_size
())
{
next_stage_sec
+=
test_case
.
qps_stage_list
(
stage_index
).
duration_sec
();
DisplayStage
(
test_case
.
qps_stage_list
(
stage_index
));
}
else
{
return
false
;
}
}
int
qps
=
0
;
const
test
::
Stage
&
qps_stage
=
test_case
.
qps_stage_list
(
stage_index
);
const
int
lower_bound
=
qps_stage
.
lower_bound
();
const
int
upper_bound
=
qps_stage
.
upper_bound
();
if
(
qps_stage
.
type
()
==
test
::
FLUCTUATE
)
{
qps
=
butil
::
fast_rand_less_than
(
upper_bound
-
lower_bound
)
+
lower_bound
;
}
else
if
(
qps_stage
.
type
()
==
test
::
SMOOTH
)
{
qps
=
lower_bound
+
(
upper_bound
-
lower_bound
)
/
double
(
qps_stage
.
duration_sec
())
*
(
qps_stage
.
duration_sec
()
-
next_stage_sec
+
butil
::
gettimeofday_s
());
}
interval_us
.
store
(
1.0
/
qps
*
1000000
,
butil
::
memory_order_relaxed
);
return
true
;
}
butil
::
atomic
<
bool
>
running
;
butil
::
atomic
<
int64_t
>
interval_us
;
int
stage_index
;
const
test
::
TestCase
test_case
;
int
next_stage_sec
;
};
void
RunUpdateTask
(
void
*
data
)
{
TestCaseContext
*
context
=
(
TestCaseContext
*
)
data
;
bool
should_continue
=
context
->
Update
();
timespec
ts
;
ts
.
tv_nsec
=
FLAGS_client_frequent_interval_us
*
1000
;
if
(
should_continue
)
{
bthread
::
get_global_timer_thread
()
->
schedule
(
RunUpdateTask
,
data
,
ts
);
}
else
{
context
->
running
.
store
(
false
,
butil
::
memory_order_release
);
}
}
void
RunCase
(
test
::
ControlService_Stub
&
cntl_stub
,
const
test
::
TestCase
&
test_case
)
{
LOG
(
INFO
)
<<
"Running case:`"
<<
test_case
.
case_name
()
<<
'\''
;
brpc
::
Channel
channel
;
brpc
::
ChannelOptions
options
;
options
.
protocol
=
FLAGS_protocol
;
options
.
connection_type
=
FLAGS_connection_type
;
options
.
timeout_ms
=
FLAGS_timeout_ms
;
if
(
channel
.
Init
(
FLAGS_echo_server
.
c_str
(),
&
options
)
!=
0
)
{
LOG
(
FATAL
)
<<
"Fail to initialize channel"
;
}
test
::
EchoService_Stub
echo_stub
(
&
channel
);
test
::
NotifyRequest
cntl_req
;
test
::
NotifyResponse
cntl_rsp
;
brpc
::
Controller
cntl
;
cntl_req
.
set_message
(
"StartCase"
);
cntl_stub
.
Notify
(
&
cntl
,
&
cntl_req
,
&
cntl_rsp
,
NULL
);
CHECK
(
!
cntl
.
Failed
())
<<
"control failed"
;
TestCaseContext
context
(
test_case
);
timespec
ts
;
ts
.
tv_nsec
=
FLAGS_client_frequent_interval_us
*
1000
;
bthread
::
get_global_timer_thread
()
->
schedule
(
RunUpdateTask
,
&
context
,
ts
);
while
(
context
.
running
.
load
(
butil
::
memory_order_acquire
))
{
test
::
NotifyRequest
echo_req
;
echo_req
.
set_message
(
"hello"
);
brpc
::
Controller
*
echo_cntl
=
new
brpc
::
Controller
;
test
::
NotifyResponse
*
echo_rsp
=
new
test
::
NotifyResponse
;
google
::
protobuf
::
Closure
*
done
=
brpc
::
NewCallback
(
&
HandleEchoResponse
,
echo_cntl
,
echo_rsp
);
echo_stub
.
Echo
(
echo_cntl
,
&
echo_req
,
echo_rsp
,
done
);
::
usleep
(
context
.
interval_us
.
load
(
butil
::
memory_order_relaxed
));
}
LOG
(
INFO
)
<<
"Waiting to stop case: `"
<<
test_case
.
case_name
()
<<
'\''
;
::
sleep
(
FLAGS_case_interval
);
cntl
.
Reset
();
cntl_req
.
set_message
(
"StopCase"
);
cntl_stub
.
Notify
(
&
cntl
,
&
cntl_req
,
&
cntl_rsp
,
NULL
);
CHECK
(
!
cntl
.
Failed
())
<<
"control failed"
;
LOG
(
INFO
)
<<
"Case `"
<<
test_case
.
case_name
()
<<
"' finshed:"
;
}
int
main
(
int
argc
,
char
*
argv
[])
{
// Parse gflags. We recommend you to use gflags as well.
GFLAGS_NS
::
ParseCommandLineFlags
(
&
argc
,
&
argv
,
true
);
Expose
();
brpc
::
Channel
channel
;
brpc
::
ChannelOptions
options
;
options
.
protocol
=
FLAGS_protocol
;
options
.
connection_type
=
FLAGS_connection_type
;
options
.
timeout_ms
=
FLAGS_timeout_ms
;
if
(
channel
.
Init
(
FLAGS_cntl_server
.
c_str
(),
&
options
)
!=
0
)
{
LOG
(
ERROR
)
<<
"Fail to initialize channel"
;
return
-
1
;
}
test
::
ControlService_Stub
cntl_stub
(
&
channel
);
test
::
TestCaseSet
case_set
;
LoadCaseSet
(
&
case_set
,
FLAGS_case_file
);
brpc
::
Controller
cntl
;
test
::
NotifyRequest
cntl_req
;
test
::
NotifyResponse
cntl_rsp
;
cntl_req
.
set_message
(
"ResetCaseSet"
);
cntl_stub
.
Notify
(
&
cntl
,
&
cntl_req
,
&
cntl_rsp
,
NULL
);
CHECK
(
!
cntl
.
Failed
())
<<
"Cntl Failed"
;
for
(
int
i
=
0
;
i
<
case_set
.
test_case_size
();
++
i
)
{
RunCase
(
cntl_stub
,
case_set
.
test_case
(
i
));
}
LOG
(
INFO
)
<<
"EchoClient is going to quit"
;
return
0
;
}
example/auto_concurrency_limiter/dummy_server.port
0 → 100644
View file @
75cfdf6a
9999
example/auto_concurrency_limiter/server.cpp
0 → 100644
View file @
75cfdf6a
// Copyright (c) 2014 Baidu, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// A server to receive EchoRequest and send back EchoResponse.
#include <gflags/gflags.h>
#include <butil/logging.h>
#include <brpc/server.h>
#include <butil/atomicops.h>
#include <butil/time.h>
#include <butil/logging.h>
#include <json2pb/json_to_pb.h>
#include <bthread/timer_thread.h>
#include <bthread/bthread.h>
#include <cstdlib>
#include <fstream>
#include "cl_test.pb.h"
DEFINE_int32
(
logoff_ms
,
2000
,
"Maximum duration of server's LOGOFF state "
"(waiting for client to close connection before server stops)"
);
DEFINE_int32
(
server_bthread_concurrency
,
4
,
"For compute max qps"
);
DEFINE_int32
(
server_sync_sleep_us
,
2500
,
"For compute max qps"
);
// max qps = 1000 / 2.5 * 4 = 1600
DEFINE_int32
(
control_server_port
,
9000
,
""
);
DEFINE_int32
(
echo_port
,
9001
,
""
);
DEFINE_int32
(
cntl_port
,
9000
,
"TCP Port of this server"
);
DEFINE_string
(
case_file
,
""
,
""
);
DEFINE_int32
(
server_frequent_interval_us
,
10000
,
""
);
bthread
::
TimerThread
g_timer_thread
;
int
cast_func
(
void
*
arg
)
{
return
*
(
int
*
)
arg
;
}
void
DisplayStage
(
const
test
::
Stage
&
stage
)
{
std
::
string
type
;
switch
(
stage
.
type
())
{
case
test
:
:
FLUCTUATE
:
type
=
"Fluctuate"
;
break
;
case
test
:
:
SMOOTH
:
type
=
"Smooth"
;
break
;
default:
type
=
"Unknown"
;
}
std
::
stringstream
ss
;
ss
<<
"Stage:["
<<
stage
.
lower_bound
()
<<
':'
<<
stage
.
upper_bound
()
<<
"]"
<<
" , Type:"
<<
type
;
LOG
(
INFO
)
<<
ss
.
str
();
}
butil
::
atomic
<
int
>
cnt
(
0
);
butil
::
atomic
<
int
>
atomic_sleep_time
(
0
);
bvar
::
PassiveStatus
<
int
>
atomic_sleep_time_bvar
(
cast_func
,
&
atomic_sleep_time
);
namespace
bthread
{
DECLARE_int32
(
bthread_concurrency
);
}
void
TimerTask
(
void
*
data
);
class
EchoServiceImpl
:
public
test
::
EchoService
{
public
:
EchoServiceImpl
()
:
_stage_index
(
0
)
,
_running_case
(
false
)
{
};
virtual
~
EchoServiceImpl
()
{};
void
SetTestCase
(
const
test
::
TestCase
&
test_case
)
{
_test_case
=
test_case
;
_next_stage_start
=
_test_case
.
latency_stage_list
(
0
).
duration_sec
()
+
butil
::
gettimeofday_s
();
_stage_index
=
0
;
_running_case
=
false
;
DisplayStage
(
_test_case
.
latency_stage_list
(
_stage_index
));
}
void
StartTestCase
()
{
CHECK
(
!
_running_case
);
_running_case
=
true
;
UpdateLatency
();
}
void
StopTestCase
()
{
_running_case
=
false
;
}
void
UpdateLatency
()
{
if
(
!
_running_case
)
{
return
;
}
ComputeLatency
();
timespec
ts
;
ts
.
tv_nsec
=
FLAGS_server_frequent_interval_us
*
1000
;
g_timer_thread
.
schedule
(
TimerTask
,
(
void
*
)
this
,
ts
);
}
virtual
void
Echo
(
google
::
protobuf
::
RpcController
*
cntl_base
,
const
test
::
NotifyRequest
*
request
,
test
::
NotifyResponse
*
response
,
google
::
protobuf
::
Closure
*
done
)
{
brpc
::
ClosureGuard
done_guard
(
done
);
response
->
set_message
(
"hello"
);
::
usleep
(
FLAGS_server_sync_sleep_us
);
bthread_usleep
(
_latency
.
load
(
butil
::
memory_order_relaxed
));
}
void
ComputeLatency
()
{
if
(
_stage_index
<
_test_case
.
latency_stage_list_size
()
&&
butil
::
gettimeofday_s
()
>
_next_stage_start
)
{
++
_stage_index
;
if
(
_stage_index
<
_test_case
.
latency_stage_list_size
())
{
_next_stage_start
+=
_test_case
.
latency_stage_list
(
_stage_index
).
duration_sec
();
DisplayStage
(
_test_case
.
latency_stage_list
(
_stage_index
));
}
}
if
(
_stage_index
==
_test_case
.
latency_stage_list_size
())
{
const
test
::
Stage
&
latency_stage
=
_test_case
.
latency_stage_list
(
_stage_index
-
1
);
if
(
latency_stage
.
type
()
==
test
::
ChangeType
::
FLUCTUATE
)
{
_latency
.
store
((
latency_stage
.
lower_bound
()
+
latency_stage
.
upper_bound
())
/
2
,
butil
::
memory_order_relaxed
);
}
else
if
(
latency_stage
.
type
()
==
test
::
ChangeType
::
SMOOTH
)
{
_latency
.
store
(
latency_stage
.
upper_bound
(),
butil
::
memory_order_relaxed
);
}
return
;
}
const
test
::
Stage
&
latency_stage
=
_test_case
.
latency_stage_list
(
_stage_index
);
const
int
lower_bound
=
latency_stage
.
lower_bound
();
const
int
upper_bound
=
latency_stage
.
upper_bound
();
if
(
latency_stage
.
type
()
==
test
::
FLUCTUATE
)
{
_latency
.
store
(
butil
::
fast_rand_less_than
(
upper_bound
-
lower_bound
)
+
lower_bound
,
butil
::
memory_order_relaxed
);
}
else
if
(
latency_stage
.
type
()
==
test
::
SMOOTH
)
{
int
latency
=
lower_bound
+
(
upper_bound
-
lower_bound
)
/
double
(
latency_stage
.
duration_sec
())
*
(
latency_stage
.
duration_sec
()
-
_next_stage_start
+
butil
::
gettimeofday_s
());
_latency
.
store
(
latency
,
butil
::
memory_order_relaxed
);
}
else
{
LOG
(
FATAL
)
<<
"Wrong Type:"
<<
latency_stage
.
type
();
}
}
private
:
int
_stage_index
;
int
_next_stage_start
;
butil
::
atomic
<
int
>
_latency
;
test
::
TestCase
_test_case
;
bool
_running_case
;
};
void
TimerTask
(
void
*
data
)
{
EchoServiceImpl
*
echo_service
=
(
EchoServiceImpl
*
)
data
;
echo_service
->
UpdateLatency
();
}
class
ControlServiceImpl
:
public
test
::
ControlService
{
public
:
ControlServiceImpl
()
:
_case_index
(
0
)
{
LoadCaseSet
(
FLAGS_case_file
);
_echo_service
=
new
EchoServiceImpl
;
if
(
_server
.
AddService
(
_echo_service
,
brpc
::
SERVER_OWNS_SERVICE
)
!=
0
)
{
LOG
(
FATAL
)
<<
"Fail to add service"
;
}
g_timer_thread
.
start
(
NULL
);
}
virtual
~
ControlServiceImpl
()
{
_echo_service
->
StopTestCase
();
g_timer_thread
.
stop_and_join
();
};
virtual
void
Notify
(
google
::
protobuf
::
RpcController
*
cntl_base
,
const
test
::
NotifyRequest
*
request
,
test
::
NotifyResponse
*
response
,
google
::
protobuf
::
Closure
*
done
)
{
brpc
::
ClosureGuard
done_guard
(
done
);
const
std
::
string
&
message
=
request
->
message
();
LOG
(
INFO
)
<<
message
;
if
(
message
==
"ResetCaseSet"
)
{
_server
.
Stop
(
0
);
_server
.
Join
();
_echo_service
->
StopTestCase
();
LoadCaseSet
(
FLAGS_case_file
);
_case_index
=
0
;
response
->
set_message
(
"CaseSetReset"
);
}
else
if
(
message
==
"StartCase"
)
{
CHECK
(
!
_server
.
IsRunning
())
<<
"Continuous StartCase"
;
const
test
::
TestCase
&
test_case
=
_case_set
.
test_case
(
_case_index
++
);
_echo_service
->
SetTestCase
(
test_case
);
_server
.
MaxConcurrencyOf
(
"test.EchoService.Echo"
)
=
test_case
.
max_concurrency
();
_server
.
Start
(
FLAGS_echo_port
,
NULL
);
_echo_service
->
StartTestCase
();
response
->
set_message
(
"CaseStarted"
);
}
else
if
(
message
==
"StopCase"
)
{
CHECK
(
_server
.
IsRunning
())
<<
"Continuous StopCase"
;
_server
.
Stop
(
0
);
_server
.
Join
();
_echo_service
->
StopTestCase
();
response
->
set_message
(
"CaseStopped"
);
}
else
{
LOG
(
FATAL
)
<<
"Invalid message:"
<<
message
;
response
->
set_message
(
"Invalid Cntl Message"
);
}
}
private
:
void
LoadCaseSet
(
const
std
::
string
&
file_path
)
{
std
::
ifstream
ifs
(
file_path
.
c_str
(),
std
::
ios
::
in
);
if
(
!
ifs
)
{
LOG
(
FATAL
)
<<
"Fail to open case set file: "
<<
file_path
;
}
std
::
string
case_set_json
((
std
::
istreambuf_iterator
<
char
>
(
ifs
)),
std
::
istreambuf_iterator
<
char
>
());
test
::
TestCaseSet
case_set
;
std
::
string
err
;
if
(
!
json2pb
::
JsonToProtoMessage
(
case_set_json
,
&
case_set
,
&
err
))
{
LOG
(
FATAL
)
<<
"Fail to trans case_set from json to protobuf message: "
<<
err
;
}
_case_set
=
case_set
;
ifs
.
close
();
}
brpc
::
Server
_server
;
EchoServiceImpl
*
_echo_service
;
test
::
TestCaseSet
_case_set
;
int
_case_index
;
};
int
main
(
int
argc
,
char
*
argv
[])
{
// Parse gflags. We recommend you to use gflags as well.
GFLAGS_NS
::
ParseCommandLineFlags
(
&
argc
,
&
argv
,
true
);
bthread
::
FLAGS_bthread_concurrency
=
FLAGS_server_bthread_concurrency
;
brpc
::
Server
server
;
ControlServiceImpl
control_service_impl
;
if
(
server
.
AddService
(
&
control_service_impl
,
brpc
::
SERVER_DOESNT_OWN_SERVICE
)
!=
0
)
{
LOG
(
ERROR
)
<<
"Fail to add service"
;
return
-
1
;
}
if
(
server
.
Start
(
FLAGS_cntl_port
,
NULL
)
!=
0
)
{
LOG
(
ERROR
)
<<
"Fail to start EchoServer"
;
return
-
1
;
}
server
.
RunUntilAskedToQuit
();
return
0
;
}
example/auto_concurrency_limiter/settings.flags
0 → 100644
View file @
75cfdf6a
--ABTest=true
--auto_cl_min_reserved_concurrency=20
--case_file=test_case.json
--client_frequent_interval_us=5000000
--server_frequent_interval_us=5000000
example/auto_concurrency_limiter/test_case.json
0 → 100644
View file @
75cfdf6a
{
"test_case"
:[
{
"case_name"
:
"CheckPeakQps"
,
"max_concurrency"
:
"140"
,
"qps_stage_list"
:
[
{
"lower_bound"
:
3000
,
"upper_bound"
:
3000
,
"duration_sec"
:
3
,
"type"
:
2
}
],
"latency_stage_list"
:
[
{
"lower_bound"
:
20000
,
"upper_bound"
:
20000
,
"duration_sec"
:
3
,
"type"
:
2
}
]
},
{
"case_name"
:
"qps_stable_noload, latency_raise_smooth"
,
"max_concurrency"
:
"auto"
,
"qps_stage_list"
:
[
{
"lower_bound"
:
100
,
"upper_bound"
:
1500
,
"duration_sec"
:
10
,
"type"
:
2
},
{
"lower_bound"
:
1500
,
"upper_bound"
:
1500
,
"duration_sec"
:
190
,
"type"
:
2
}
],
"latency_stage_list"
:
[
{
"lower_bound"
:
2000
,
"upper_bound"
:
90000
,
"duration_sec"
:
200
,
"type"
:
2
}
]
},
{
"case_name"
:
"qps_fluctuate_noload, latency_stable"
,
"max_concurrency"
:
"auto"
,
"qps_stage_list"
:
[
{
"lower_bound"
:
100
,
"upper_bound"
:
300
,
"duration_sec"
:
10
,
"type"
:
2
},
{
"lower_bound"
:
300
,
"upper_bound"
:
1800
,
"duration_sec"
:
290
,
"type"
:
1
}
],
"latency_stage_list"
:
[
{
"lower_bound"
:
40000
,
"upper_bound"
:
40000
,
"duration_sec"
:
300
,
"type"
:
1
}
]
},
{
"case_name"
:
"qps_stable_overload, latency_stable"
,
"max_concurrency"
:
"auto"
,
"qps_stage_list"
:
[
{
"lower_bound"
:
200
,
"upper_bound"
:
3000
,
"duration_sec"
:
20
,
"type"
:
2
},
{
"lower_bound"
:
3000
,
"upper_bound"
:
3000
,
"duration_sec"
:
180
,
"type"
:
2
}
],
"latency_stage_list"
:
[
{
"lower_bound"
:
40000
,
"upper_bound"
:
40000
,
"duration_sec"
:
200
,
"type"
:
2
}
]
},
{
"case_name"
:
"qps_stable_overload, latency_raise_smooth"
,
"max_concurrency"
:
"auto"
,
"qps_stage_list"
:
[
{
"lower_bound"
:
200
,
"upper_bound"
:
3000
,
"duration_sec"
:
20
,
"type"
:
2
},
{
"lower_bound"
:
3000
,
"upper_bound"
:
3000
,
"duration_sec"
:
180
,
"type"
:
2
}
],
"latency_stage_list"
:
[
{
"lower_bound"
:
30000
,
"upper_bound"
:
80000
,
"duration_sec"
:
200
,
"type"
:
2
}
]
},
{
"case_name"
:
"qps_overload_then_noload, latency_stable"
,
"max_concurrency"
:
"auto"
,
"qps_stage_list"
:
[
{
"lower_bound"
:
200
,
"upper_bound"
:
2500
,
"duration_sec"
:
20
,
"type"
:
2
},
{
"lower_bound"
:
2500
,
"upper_bound"
:
2500
,
"duration_sec"
:
150
,
"type"
:
2
},
{
"lower_bound"
:
2500
,
"upper_bound"
:
1000
,
"duration_sec"
:
20
,
"type"
:
2
},
{
"lower_bound"
:
1000
,
"upper_bound"
:
1000
,
"duration_sec"
:
150
,
"type"
:
2
}
],
"latency_stage_list"
:
[
{
"lower_bound"
:
30000
,
"upper_bound"
:
30000
,
"duration_sec"
:
200
,
"type"
:
2
}
]
},
{
"case_name"
:
"qps_noload_to_overload, latency_stable"
,
"max_concurrency"
:
"auto"
,
"qps_stage_list"
:
[
{
"lower_bound"
:
200
,
"upper_bound"
:
3000
,
"duration_sec"
:
150
,
"type"
:
2
},
{
"lower_bound"
:
3000
,
"upper_bound"
:
3000
,
"duration_sec"
:
150
,
"type"
:
2
}
],
"latency_stage_list"
:
[
{
"lower_bound"
:
30000
,
"upper_bound"
:
30000
,
"duration_sec"
:
200
,
"type"
:
2
}
]
},
{
"case_name"
:
"qps_fluctuate_noload, latency_fluctuate_noload"
,
"max_concurrency"
:
"auto"
,
"qps_stage_list"
:
[
{
"lower_bound"
:
100
,
"upper_bound"
:
300
,
"duration_sec"
:
10
,
"type"
:
2
},
{
"lower_bound"
:
300
,
"upper_bound"
:
1800
,
"duration_sec"
:
190
,
"type"
:
1
}
],
"latency_stage_list"
:
[
{
"lower_bound"
:
30000
,
"upper_bound"
:
50000
,
"duration_sec"
:
200
,
"type"
:
1
}
]
},
{
"case_name"
:
"qps_stable_noload, latency_leap_raise"
,
"max_concurrency"
:
"auto"
,
"qps_stage_list"
:
[
{
"lower_bound"
:
300
,
"upper_bound"
:
1800
,
"duration_sec"
:
20
,
"type"
:
2
},
{
"lower_bound"
:
1800
,
"upper_bound"
:
1800
,
"duration_sec"
:
220
,
"type"
:
2
}
],
"latency_stage_list"
:
[
{
"lower_bound"
:
30000
,
"upper_bound"
:
30000
,
"duration_sec"
:
100
,
"type"
:
2
},
{
"lower_bound"
:
50000
,
"upper_bound"
:
50000
,
"duration_sec"
:
100
,
"type"
:
2
}
]
}
]}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment