Commit ec901d82 authored by gejun's avatar gejun

convert more docs

parent 75d371e7
# Introduction
COMAKE中增加bvar依赖:`CONFIGS('public/bvar@ci-base')`
源文件中`#include <bvar/bvar.h>`
bvar分为多个具体的类,常用的有:
- bvar::Adder<T> : 计数器,默认0,varname << N相当于varname += N。
- bvar::Maxer<T> : 求最大值,默认std::numeric_limits<T>::min(),varname << N相当于varname = max(varname, N)。
- bvar::Miner<T> : 求最小值,默认std::numeric_limits<T>::max(),varname << N相当于varname = min(varname, N)。
- bvar::IntRecorder : 求自使用以来的平均值。注意这里的定语不是“一段时间内”。一般要通过Window衍生出时间窗口内的平均值。
- bvar::Window<VAR> : 获得某个bvar在一段时间内的累加值。Window衍生于已存在的bvar,会自动更新。
- bvar::PerSecond<VAR> : 或的某个bvar在一段时间内平均每秒的累加值。PerSecond也是会自动更新的衍生变量。
- bvar::LatencyRecorder : 专用于记录延时和qps的变量。输入延时,平均延时/最大延时/qps/总次数 都有了。
例子:
```
// 构造时不带名字,则无法被查询到。并不是所有的bvar都会显示在/vars
bvar::Adder<int> request_count1;
// 构造时带了名字就可以被查询到,现在查询/vars应该可以看到"request_count2: 0"
bvar::Adder<int> request_count2("request_count2");
// 或者可以调用expose,第一个变量也可以被查询了
request_count1.expose("request_count1");
// 换一个名字
request_count1.expose("request_count1_another");
// 让第一个变量重新不能被查询
request_count1.hide();
// 各自加上一些数,两个变量目前的值应当分别是6和-1
request_count1 << 1 << 2 << 3;
request_count2 << -1;
LOG(INFO) << "result1=" << request_count1 << " result2=" << request_count2;
// 统计上一分钟request_count1的变化量
bvar::Window<bvar::Adder<int> > request_count1_minute("request_count1_minute", &request_count1, 60);
// 统计上一分钟request_count1的"qps"
bvar::PerSecond<bvar::Adder<int> > request_count1_per_second("request_count1_per_second", &request_count1, 60);
// 让我们每隔一秒钟给request_count1加上1.
request_count1.reset(); // 清0
for (int i = 0; i < 120; ++i) {
request_count1 << 1;
// 依次看到1, 2, 3 ...直到60后保持不变,其中一些数字可能跳过或重复,最差情况下Window有1秒延时。
LOG(INFO) << "request_count1_minute=" << request_count1_minute;
// 开始可能由0,之后一直看到1, 1, 1 ...,最差情况下PerSecond也有1秒延时。
LOG(INFO) << "request_count1_per_second=" << request_count1_per_second;
sleep(1);
}
```
About thread-safety:
- bvar是线程兼容的。你可以在不同的线程里操作不同的bvar。比如你可以在多个线程中同时expose或hide**不同的**bvar,它们会合理地操作需要共享的全局数据,是安全的。
- **除了读写接口**,bvar的其他函数都是线程不安全的:比如说你不能在多个线程中同时expose或hide**同一个**bvar,这很可能会导致程序crash。一般来说,读写之外的其他接口也没有必要在多个线程中同时操作。
# bvar::Variable
Variable是所有bvar的基类,主要提供全局注册,列举,查询等功能。
用户以默认参数建立一个bvar时,这个bvar并未注册到任何全局结构中,在这种情况下,bvar纯粹是一个更快的计数器。我们称把一个bvar注册到全局表中的行为为”曝光“,可通过**expose**函数曝光:
```
// Expose this variable globally so that it's counted in following functions:
// list_exposed
// count_exposed
// describe_exposed
// find_exposed
// Return 0 on success, -1 otherwise.
int expose(const base::StringPiece& name);
int expose(const base::StringPiece& prefix, const base::StringPiece& name);
```
全局曝光后的bvar名字便为name或prefix + name,可通过以_exposed为后缀的static函数查询。比如Variable::describe_exposed(name)会返回名为name的bvar的描述。
当相同名字的bvar已存在时,expose会打印FATAL日志并返回-1。如果选项**--bvar_abort_on_same_name**设为true (默认是false),程序会直接abort。
下面是一些曝光bvar的例子:
```
bvar::Adder<int> count1;
count1 << 10 << 20 << 30; // values add up to 60.
count1.expose("my_count"); // expose the variable globally
CHECK_EQ("60", bvar::Variable::describe_exposed("count1"));
my_count.expose("another_name_for_count1"); // expose the variable with another name
CHECK_EQ("", bvar::Variable::describe_exposed("count1"));
CHECK_EQ("60", bvar::Variable::describe_exposed("another_name_for_count1"));
bvar::Adder<int> count2("count2"); // exposed in constructor directly
CHECK_EQ("0", bvar::Variable::describe_exposed("count2")); // default value of Adder<int> is 0
bvar::Status<std::string> status1("count2", "hello"); // the name conflicts. if -bvar_abort_on_same_name is true,
// program aborts, otherwise a fatal log is printed.
```
为避免重名,bvar的名字应加上前缀,建议为<namespace>_<module>_<name>。为了方便使用,我们提供了**expose_as**函数,接收一个前缀。
```
// Expose this variable with a prefix.
// Example:
// namespace foo {
// namespace bar {
// class ApplePie {
// ApplePie() {
// // foo_bar_apple_pie_error
// _error.expose_as("foo_bar_apple_pie", "error");
// }
// private:
// bvar::Adder<int> _error;
// };
// } // foo
// } // bar
int expose_as(const base::StringPiece& prefix, const base::StringPiece& name);
```
# Export all variables
我们提供dump_exposed函数导出进程中的所有已曝光的bvar:
```
// Implement this class to write variables into different places.
// If dump() returns false, Variable::dump_exposed() stops and returns -1.
class Dumper {
public:
virtual bool dump(const std::string& name, const base::StringPiece& description) = 0;
};
// Options for Variable::dump_exposed().
struct DumpOptions {
// Contructed with default options.
DumpOptions();
// If this is true, string-type values will be quoted.
bool quote_string;
// The ? in wildcards. Wildcards in URL need to use another character
// because ? is reserved.
char question_mark;
// Separator for white_wildcards and black_wildcards.
char wildcard_separator;
// Name matched by these wildcards (or exact names) are kept.
std::string white_wildcards;
// Name matched by these wildcards (or exact names) are skipped.
std::string black_wildcards;
};
class Variable {
...
...
// Find all exposed variables matching `white_wildcards' but
// `black_wildcards' and send them to `dumper'.
// Use default options when `options' is NULL.
// Return number of dumped variables, -1 on error.
static int dump_exposed(Dumper* dumper, const DumpOptions* options);
};
```
最常见的导出需求是通过HTTP接口查询和写入本地文件。前者在baidu-rpc中通过[/vars](http://wiki.baidu.com/display/RPC/vars)服务提供,后者则已实现在bvar中,由用户选择开启。该功能由5个gflags控制,你的程序需要使用[gflags](http://wiki.baidu.com/pages/viewpage.action?pageId=71698818)
![img](http://wiki.baidu.com/download/attachments/133624370/image2015-8-8%2023%3A18%3A21.png?version=1&modificationDate=1439047101000&api=v2)
用户可在程序启动前加上对应的gflags,在baidu-rpc中也可通过[/flags](http://wiki.baidu.com/display/RPC/flags)服务在启动后动态修改某个gflag。
当bvar_dump_file不为空时,程序会启动一个后台导出线程以bvar_dump_interval指定的间隔更新bvar_dump_file,其中包含了被bvar_dump_include匹配且不被bvar_dump_exclude匹配的所有bvar。
比如我们把所有的gflags修改为下图:
![img](http://wiki.baidu.com/download/attachments/133624370/image2015-8-9%2014%3A38%3A1.png?version=1&modificationDate=1439102281000&api=v2)
导出文件为:
```
$ cat bvar.echo_server.data
rpc_server_8002_builtin_service_count : 20
rpc_server_8002_connection_count : 1
rpc_server_8002_nshead_service_adaptor : baidu::rpc::policy::NovaServiceAdaptor
rpc_server_8002_service_count : 1
rpc_server_8002_start_time : 2015/07/24-21:08:03
rpc_server_8002_uptime_ms : 14740954
```
像”`iobuf_block_count : 8`”被bvar_dump_include过滤了,“`rpc_server_8002_error : 0`”则被bvar_dump_exclude排除了。
如果你的程序没有使用baidu-rpc,仍需要动态修改gflag(一般不需要),可以调用google::SetCommandLineOption(),如下所示:
```
#include <gflags/gflags.h>
...
if (google::SetCommandLineOption("bvar_dump_include", "*service*").empty()) {
LOG(ERROR) << "Fail to set bvar_dump_include";
return -1;
}
LOG(INFO) << "Successfully set bvar_dump_include to *service*";
```
请勿直接设置FLAGS_bvar_dump_file / FLAGS_bvar_dump_include / FLAGS_bvar_dump_exclude。
一方面这些gflag类型都是std::string,直接覆盖是线程不安全的;另一方面不会触发validator(检查正确性的回调),所以也不会启动后台导出线程。
# bvar::Reducer
Reducer用二元运算符把多个值合并为一个值,运算符需满足结合律,交换律,没有副作用。只有满足这三点,我们才能确保合并的结果不受线程私有数据如何分布的影响。像减法就不满足结合律和交换律,它无法作为此处的运算符。
```
// Reduce multiple values into one with `Op': e1 Op e2 Op e3 ...
// `Op' shall satisfy:
// - associative: a Op (b Op c) == (a Op b) Op c
// - commutative: a Op b == b Op a;
// - no side effects: a Op b never changes if a and b are fixed.
// otherwise the result is undefined.
template <typename T, typename Op>
class Reducer : public Variable;
```
reducer << e1 << e2 << e3的作用等价于reducer = e1 op e2 op e3。
常见的Redcuer子类有bvar::Adder, bvar::Maxer, bvar::Miner。
## bvar::Adder
顾名思义,用于累加,Op为+。
```
bvar::Adder<int> value;
value<< 1 << 2 << 3 << -4;
CHECK_EQ(2, value.get_value());
bvar::Adder<double> fp_value; // 可能有warning
fp_value << 1.0 << 2.0 << 3.0 << -4.0;
CHECK_DOUBLE_EQ(2.0, fp_value.get_value());
```
Adder<>可用于非基本类型,对应的类型至少要重载`T operator+(T, T)`。一个已经存在的例子是std::string,下面的代码会把string拼接起来:
```
// This is just proof-of-concept, don't use it for production code because it makes a
// bunch of temporary strings which is not efficient, use std::ostringstream instead.
bvar::Adder<std::string> concater;
std::string str1 = "world";
concater << "hello " << str1;
CHECK_EQ("hello world", concater.get_value());
```
## bvar::Maxer
用于取最大值,运算符为std::max。
```
bvar::Maxer<int> value;
value<< 1 << 2 << 3 << -4;
CHECK_EQ(3, value.get_value());
```
Since Maxer<> use std::numeric_limits<T>::min() as the identity, it cannot be applied to generic types unless you specialized std::numeric_limits<> (and overloaded operator<, yes, not operator>).
## bvar::Miner
用于取最小值,运算符为std::min。
```
bvar::Maxer<int> value;
value<< 1 << 2 << 3 << -4;
CHECK_EQ(-4, value.get_value());
```
Since Miner<> use std::numeric_limits<T>::max() as the identity, it cannot be applied to generic types unless you specialized std::numeric_limits<> (and overloaded operator<).
# bvar::IntRecorder
用于计算平均值。
```
// For calculating average of numbers.
// Example:
// IntRecorder latency;
// latency << 1 << 3 << 5;
// CHECK_EQ(3, latency.average());
class IntRecorder : public Variable;
```
# bvar::LatencyRecorder
专用于计算latency和qps的计数器。只需填入latency数据,就能获得latency / max_latency / qps / count。统计窗口是最后一个参数,不填为bvar_dump_interval(这里没填)。
注意:LatencyRecorder没有继承Variable,而是多个bvar的组合。
```
LatencyRecorder write_latency("table2_my_table_write"); // produces 4 variables:
// table2_my_table_write_latency
// table2_my_table_write_max_latency
// table2_my_table_write_qps
// table2_my_table_write_count
// In your write function
write_latency << the_latency_of_write;
```
# bvar::Window
获得之前一段时间内的统计值。Window不能独立存在,必须依赖于一个已有的计数器。Window会自动更新,不用给它发送数据。出于性能考虑,Window的数据来自于每秒一次对原计数器的采样,在最差情况下,Window的返回值有1秒的延时。
```
// Get data within a time window.
// The time unit is 1 second fixed.
// Window relies on other bvar which should be constructed before this window and destructs after this window.
// R must:
// - have get_sampler() (not require thread-safe)
// - defined value_type and sampler_type
template <typename R>
class Window : public Variable;
```
# bvar::PerSecond
获得之前一段时间内平均每秒的统计值。它和Window基本相同,除了返回值会除以时间窗口之外。
```
bvar::Adder<int> sum;
// sum_per_second.get_value()是sum在之前60秒内*平均每秒*的累加值,省略最后一个时间窗口的话默认为bvar_dump_interval。
bvar::PerSecond<bvar::Adder<int> > sum_per_second(&sum, 60);
```
**PerSecond并不总是有意义**
上面的代码中没有Maxer,因为一段时间内的最大值除以时间窗口是没有意义的。
```
bvar::Maxer<int> max_value;
// 错误!最大值除以时间是没有意义的
bvar::PerSecond<bvar::Maxer<int> > max_value_per_second_wrong(&max_value);
// 正确,把Window的时间窗口设为1秒才是正确的做法
bvar::Window<bvar::Maxer<int> > max_value_per_second(&max_value, 1);
```
## 和Window的差别
比如要统计内存在上一分钟内的变化,用Window<>的话,返回值的含义是”上一分钟内存增加了18M”,用PerSecond<>的话,返回值的含义是“上一分钟平均每秒增加了0.3M”。
Window的优点是精确值,适合一些比较小的量,比如“上一分钟的错误数“,如果这用PerSecond的话,得到可能是”上一分钟平均每秒产生了0.0167个错误",这相比于”上一分钟有1个错误“显然不够清晰。另外一些和时间无关的量也要用Window,比如统计上一分钟cpu占用率的方法是用一个Adder同时累加cpu时间和真实时间,然后用Window获得上一分钟的cpu时间和真实时间,两者相除就得到了上一分钟的cpu占用率,这和时间无关,用PerSecond会产生错误的结果。
# bvar::Status
记录和显示一个值,拥有额外的set_value函数。
```
// Display a rarely or periodically updated value.
// Usage:
// bvar::Status<int> foo_count1(17);
// foo_count1.expose("my_value");
//
// bvar::Status<int> foo_count2;
// foo_count2.set_value(17);
//
// bvar::Status<int> foo_count3("my_value", 17);
//
// Notice that Tp needs to be std::string or acceptable by boost::atomic<Tp>.
template <typename Tp>
class Status : public Variable;
```
# bvar::PassiveStatus
按需显示值。在一些场合中,我们无法set_value或不知道以何种频率set_value,更适合的方式也许是当需要显示时才打印。用户传入打印回调函数实现这个目的。
```
// Display a updated-by-need value. This is done by passing in an user callback
// which is called to produce the value.
// Example:
// int print_number(void* arg) {
// ...
// return 5;
// }
//
// // number1 : 5
// bvar::PassiveStatus status1("number1", print_number, arg);
//
// // foo_number2 : 5
// bvar::PassiveStatus status2(typeid(Foo), "number2", print_number, arg);
template <typename Tp>
class PassiveStatus : public Variable;
```
虽然很简单,但PassiveStatus是最有用的bvar之一,因为很多统计量已经存在,我们不需要再次存储它们,而只要按需获取。比如下面的代码声明了一个在linux下显示进程用户名的bvar:
```
static void get_username(std::ostream& os, void*) {
char buf[32];
if (getlogin_r(buf, sizeof(buf)) == 0) {
buf[sizeof(buf)-1] = '\0';
os << buf;
} else {
os << "unknown";
}
}
PassiveStatus<std::string> g_username("process_username", get_username, NULL);
```
# bvar::GFlag
Expose important gflags as bvar so that they're monitored (in noah).
```
DEFINE_int32(my_flag_that_matters, 8, "...");
// Expose the gflag as *same-named* bvar so that it's monitored (in noah).
static bvar::GFlag s_gflag_my_flag_that_matters("my_flag_that_matters");
// ^
// the gflag name
// Expose the gflag as a bvar named "foo_bar_my_flag_that_matters".
static bvar::GFlag s_gflag_my_flag_that_matters_with_prefix("foo_bar", "my_flag_that_matters");
```
\ No newline at end of file
Client指发起请求的一端,在baidu-rpc中没有对应的实体,取而代之的是[baidu::rpc::Channel](http://websvn.work.baidu.com/repos/public/show/trunk/baidu-rpc/src/baidu/rpc/channel.h?revision=HEAD),它代表和一台或一组服务器的交互通道,Client和Channel在角色上的差别在实践中并不重要,你可以把Channel视作Client。
Channel可以被进程中的所有线程共用,你不需要为每个线程创建独立的Channel,也不需要用锁互斥。不过Channel的创建和析构并不是线程安全的,请确保在Init成功后再被多线程访问,在没有线程访问后再析构。
一些RPC实现中有RpcClient的概念,包含了Client端的配置信息和资源管理。baidu-rpc不需要这些,以往在RpcClient中配置的线程数、长短连接等等要么被加入了Channel,要么可以通过gflags全局配置,这么做的好处:
1. 方便。你不需要在创建Channel时传入RpcClient,也不需要存储RpcClient。以往不少代码需要传递RpcClient,比较麻烦。gflags使你无需写代码就能通过命令行或配置文件改变程序的行为。
2. 共用资源。比如server和channel可以共用后台线程。
3. 生命周期。析构RpcClient的过程很容易出错,现在由框架负责则不会有问题。
就像大部分类那样,Channel必须在**Init**之后才能使用,options为NULL时所有参数取默认值,如果你要使用非默认值,这么做就行了:
```
baidu::rpc::ChannelOptions options; // 包含了默认值
options.xxx = yyy;
...
channel.Init(..., &options);
```
Init函数分为连接一台服务器和连接服务集群。
# 连接一台服务器
**channel.h**
```
// options为NULL时取默认值。注意Channel不会修改options,Init结束后不会再访问options。所以options一般放栈上。
int Init(EndPoint server_addr_and_port, const ChannelOptions* options);
int Init(const char* server_addr_and_port, const ChannelOptions* options);
int Init(const char* server_addr, int port, const ChannelOptions* options);
```
这类Channel连接的服务器往往有固定的ip地址,不需要名字服务和负载均衡,创建起来相对轻量。但是**请勿频繁创建使用域名的Channel**。这需要查询dns,可能最多耗时10秒。
合法的“server_addr_and_port”:
- 127.0.0.1:80
- [cq01-cos-dev00.cq01.baidu.com](http://cq01-cos-dev00.cq01.baidu.com/):8765
- localhost:9000
不合法的"server_addr_and_port":
- 127.0.0.1:90000 # 端口过大
- 10.39.2.300:8000 # 非法的ip
# 连接服务集群
**channel.h**
```
int Init(const char* naming_service_url, const char* load_balancer_name, const ChannelOptions* options);
```
options为NULL时取默认值。注意Channel不会存储options,Init结束后也不会再访问options。所以options随用随建放栈上就行了。channel.options()可以获得channel正在使用的所有选项。
这类Channel需要定期从名字服务中获得服务器列表,并通过负载均衡算法选择出一台或若干台机器发送请求。创建和析构此类Channel牵涉到较多的资源,比如在创建时得访问一次名字服务(否则便不知道有哪些服务器可选)。由于Channel可被多个线程共用,一般也没有必要动态创建。
**不应该**在每次请求前动态地创建此类(连接服务集群的)Channel。
r31806之后当load_balancer_name为NULL或空时,此Init转为连接单台server,naming_service_url应该是server的ip+port 或 域名+port。你可以通过这个Init函数统一Channel的初始化方式。比如你可以把naming_service_url和load_balancer_name放在配置文件中,要连接单台server时把load_balancer_name置空,要连接多台server时则设置一个有效的算法名称。
## 名字服务
名字服务把一个名字映射为可修改的机器列表,在client端的位置如下:
![img](http://wiki.baidu.com/download/attachments/71337222/image2015-7-6%2016%3A11%3A41.png?version=1&modificationDate=1436170301000&api=v2)
有了名字服务后上游记录的是一个名字,而不是每一台下游机器。而当下游机器变化时,就只需要修改名字服务中的列表,而不需要逐台修改每个上游,因为上游会定期请求或被推送最新的列表。这个过程也常被称为“解耦上下游”。通过naming_service_url选择名字服务,一般形式是"**protocol://service_name**"
### bns://<bns-name>
BNS是百度内常用的名字服务,比如bns://rdev.matrix.all,其中"bns"是protocol,"rdev.matrix.all"是service-name。相关一个gflag是[-ns_access_interval](http://brpc.baidu.com:8765/flags/ns_access_interval)
如果bns中显示不为空,但Channel却说找不到服务器,那么有可能bns列表中的机器状态位(status)为非0,含义为机器不可用,所以不会被加入到server候选集中,具体可通过命令行查看:
`get_instance_by_service [bns_node_name] -s`
### file://<path>
服务器列表放在path所在的文件里,比如"file://conf/local_machine_list"中的“conf/local_machine_list”对应一个文件,其中每行应是一台服务器的地址。当文件更新时,框架会重新加载。
### list://<addr1>,<addr2>...
服务器列表直接跟在list://之后,以逗号分隔,比如"list://db-bce-81-3-186.db01:7000,m1-bce-44-67-72.m1:7000,cp01-rd-cos-006.cp01:7000" 中有三个地址。
### http://<url>
连接一个域名下所有的机器, 例如http://www.baidu.com:80 ,注意连接单点的Init(两个参数)虽然也可传入域名,但只会连接域名下的一台机器。
### 名字服务过滤器
当名字服务获得机器列表后,可以自定义一个过滤器进行筛选,最后把结果传递给负载均衡:
![img](http://wiki.baidu.com/download/attachments/71337222/dafe68fd-6ace-444a-8df9-72bf0311298d.JPG?version=1&modificationDate=1463536052000&api=v2)
过滤器的接口如下:
```
// naming_service_filter.h
class NamingServiceFilter {
public:
// Return true to take this `server' as a candidate to issue RPC
// Return false to filter it out
virtual bool Accept(const ServerNode& server) const = 0;
};
// naming_service.h
struct ServerNode {
base::EndPoint addr;
std::string tag;
};
```
常见的业务策略如根据bns中每个server不同tag进行过滤,自定义的过滤器配置在ChannelOptions中,默认为NULL(不过滤):
```
class MyNamingServiceFilter : public baidu::rpc::NamingServiceFilter {
public:
bool Accept(const baidu::rpc::ServerNode& server) const {
return server.tag == "main";
}
};
int main() {
...
MyNamingServiceFilter my_filter;
...
baidu::rpc::ChannelOptions options;
options.ns_filter = &my_filter;
...
}
```
## 负载均衡
当下游机器超过一台时,我们需要分割流量,此过程一般称为负载均衡,在client端的位置如下图所示:
![img](http://wiki.baidu.com/download/attachments/71337222/image2015-7-6%2016%3A7%3A48.png?version=1&modificationDate=1436170068000&api=v2)
理想的算法是每个请求都得到及时的处理,且任意机器crash对全局影响较小。但由于client端无法及时获得server端的拥塞信息,而且负载均衡算法不能耗费太多的cpu,一般来说用户得根据具体的场景选择合适的算法,目前rpc提供的算法有(通过load_balancer_name指定):
### rr
即round robin,总是选择列表中的下一台服务器,结尾的下一台是开头,无需其他设置。比如有3台机器a,b,c,那么框架会依次向a, b, c, a, b, c, ...发送请求。注意这个算法的前提是服务器的配置,网络条件,负载都是类似的。
### random
随机从列表中选择一台服务器,无需其他设置。和round robin类似,这个算法的前提也是服务器都是类似的。
### la
locality-aware,优先选择延时低的下游,直到其延时高于其他机器,无需其他设置。实现原理请查看[Locality-aware load balancing](http://wiki.baidu.com/pages/viewpage.action?pageId=38012521)
### c_murmurhash or c_md5
一致性哈希,与简单hash的不同之处在于增加或删除机器时不会使分桶结果剧烈变化,特别适合cache类服务。
发起RPC前需要设置Controller.set_request_code(),否则RPC会失败。request_code一般是请求中主键部分的32位哈希值,**不需要和负载均衡使用的哈希算法一致**。比如用c_murmurhash算法也可以用md5计算哈希值。
[baidu/rpc/policy/hasher.h](https://svn.baidu.com/public/trunk/baidu-rpc/src/baidu/rpc/policy/hasher.h)中包含了常用的hash函数。如果用std::string key代表请求的主键,controller.set_request_code(baidu::rpc::policy::MurmurHash32(key.data(), key.size()))就正确地设置了request_code。
注意甄别请求中的“主键”部分和“属性”部分,不要为了偷懒或通用,就把请求的所有内容一股脑儿计算出哈希值,属性的变化会使请求的目的地发生剧烈的变化。另外也要注意padding问题,比如struct Foo { int32_t a; int64_t b; }在64位机器上a和b之间有4个字节的空隙,内容未定义,如果像hash(&foo, sizeof(foo))这样计算哈希值,结果就是未定义的,得把内容紧密排列或序列化后再算。
实现原理请查看[Consistent Hashing](http://wiki.baidu.com/pages/viewpage.action?pageId=105311464)
## 健康检查
连接断开的server会被暂时隔离而不会被负载均衡算法选中,baidu-rpc会定期连接被隔离的server(间隔由参数[-health_check_interval](http://brpc.baidu.com:8765/flags/health_check_interval)控制),一旦server被连接上,它会恢复为可用状态。如果在隔离过程中,server从名字服务中删除了,baidu-rpc也会停止连接尝试。
# 发起访问
一般来说,我们不直接调用Channel.CallMethod,而是通过protobuf生成的桩XXX_Stub,过程更像是“调用函数”。stub内没什么成员变量,建议在栈上创建和使用,而不必new,当然你也可以把stub存下来复用。Channel::CallMethod和stub访问都是线程安全的。比如:
```
XXX_Stub stub(&channel);
stub.some_method(controller, request, response, done);
```
甚至
```
XXX_Stub(&channel).some_method(controller, request, response, done);
```
一个例外是http client。访问http服务和protobuf没什么关系,直接调用CallMethod即可,除了Controller和done均为NULL,详见[访问HTTP服务](http://wiki.baidu.com/pages/viewpage.action?pageId=213828697)
## 同步访问
同步访问指的是:CallMethod会阻塞到server端返回response或发生错误(包括超时)。
由于同步访问中CallMethod结束意味着RPC结束,response/controller不再会被框架使用,它们都可以分配在栈上。注意,如果request/response字段特别多字节数特别大的话,还是更适合分配在堆上。
```
MyRequest request;
MyResponse response;
baidu::rpc::Controller cntl;
XXX_Stub stub(&channel);
request.set_foo(...);
cntl.set_timeout_ms(...);
stub.some_method(&cntl, &request, &response, NULL);
if (cntl->Failed()) {
// RPC出错了
} else {
// RPC成功了,response里有我们想要的回复数据。
}
```
## 异步访问
异步访问指的是给CallMethod传递一个额外的回调对象done,CallMethod会在发出request后就结束了,而不是在RPC结束后。当server端返回response或发生错误(包括超时)时,done->Run()会被调用。对RPC的后续处理应该写在done->Run()里,而不是CallMethod后。
由于CallMethod结束不意味着RPC结束,response/controller仍可能被框架及done->Run()使用,它们一般得创建在堆上,并在done->Run()中删除。如果提前删除了它们,那当done->Run()被调用时,将访问到无效内存。
你可以独立地创建这些对象,并使用NewCallback生成done(见下文“使用NewCallback”),也可以把Response和Controller作为done的成员变量,一起new出来(见下文“继承google::protobuf::Closure”),一般使用前一种方法。
**发起异步请求后Request和Channel也可以立刻析构**
这两样和response/controller是不同的。请注意,这是说Channel的析构可以立刻发生在CallMethod**之后**,并不是说析构可以和CallMethod同时发生,删除正被另一个线程使用的Channel是未定义行为(很可能crash)。
### 使用NewCallback
```
static void OnRPCDone(MyResponse* response, baidu::rpc::Controller* cntl) {
// unique_ptr会帮助我们在return时自动删掉response/cntl,防止忘记。gcc 3.4下的unique_ptr是public/common提供的模拟版本。
std::unique_ptr<MyResponse> response_guard(response);
std::unique_ptr<baidu::rpc::Controller> cntl_guard(cntl);
if (cntl->Failed()) {
// RPC出错了. response里的值是未定义的,勿用。
} else {
// RPC成功了,response里有我们想要的数据。开始RPC的后续处理。
}
// NewCallback产生的Closure会在Run结束后删除自己,不用我们做。
}
MyResponse* response = new MyResponse;
baidu::rpc::Controller* cntl = new baidu::rpc::Controller;
MyService_Stub stub(&channel);
MyRequest request; // you don't have to new request, even in an asynchronous call.
request.set_foo(...);
cntl->set_timeout_ms(...);
stub.some_method(cntl, &request, response, google::protobuf::NewCallback(OnRPCDone, response, cntl));
```
由于protobuf 3把NewCallback设置为私有,r32035后baidu-rpc把NewCallback独立于[src/baidu/rpc/callback.h](https://svn.baidu.com/public/trunk/baidu-rpc/src/baidu/rpc/callback.h)。如果你的程序出现NewCallback相关的编译错误,把google::protobuf::NewCallback替换为baidu::rpc::NewCallback就行了。
### 继承google::protobuf::Closure
使用NewCallback的缺点是要分配三次内存:response, controller, done。如果profiler证明这儿的内存分配有瓶颈,可以考虑自己继承Closure,把response/controller作为成员变量,这样可以把三次new合并为一次。但缺点就是代码不够美观,如果内存分配不是瓶颈,别用这种方法。`
```
class OnRPCDone: public google::protobuf::Closure {
public:
void Run() {
// unique_ptr会帮助我们在return时自动delete this,防止忘记。gcc 3.4下的unique_ptr是public/common提供的模拟版本。
std::unique_ptr<OnRPCDone> self_guard(this);
if (cntl->Failed()) {
// RPC出错了. response里的值是未定义的,勿用。
} else {
// RPC成功了,response里有我们想要的数据。开始RPC的后续处理。
}
}
MyResponse response;
baidu::rpc::Controller cntl;
}
OnRPCDone* done = new OnRPCDone;
MyService_Stub stub(&channel);
MyRequest request; // you don't have to new request, even in an asynchronous call.
request.set_foo(...);
done->cntl.set_timeout_ms(...);
stub.some_method(&done->cntl, &request, &done->response, done);
```
### 如果异步访问中的回调函数特别复杂会有什么影响
没有特别的影响,回调会运行在独立的bthread中,不会阻塞其他的逻辑。你可以在回调中做各种阻塞操作。
### rpc发送处的代码和回调函数是在同一个线程里执行吗
一定不在同一个线程里运行,即使该次rpc调用刚进去就失败了,回调也会在另一个bthread中运行。这可以在加锁进行rpc(不推荐)的代码中避免死锁。
## 等待RPC完成
当你需要发起多个并发操作时,可能[ParallelChannel](http://wiki.baidu.com/pages/viewpage.action?pageId=213828709#id-组合访问-ParallelChannel)更方便。
如下代码发起两个异步RPC后等待它们完成。
```
const baidu::rpc::CallId cid1 = controller1->call_id();
const baidu::rpc::CallId cid2 = controller2->call_id();
...
stub.method1(controller1, request1, response1, done1);
stub.method2(controller2, request2, response2, done2);
...
baidu::rpc::Join(cid1);
baidu::rpc::Join(cid2);
```
**在发起RPC前**调用Controller.call_id()获得一个id,发起RPC调用后Join那个id。
Join()的行为是等到**RPC结束且调用了done后**,一些Join的性质如下:
- 如果对应的RPC已经结束,Join将立刻返回。
- 多个线程可以Join同一个id,RPC结束时都会醒来。
- 同步RPC也可以在另一个线程中被Join,但一般不会这么做。
Join()在之前的版本叫做JoinResponse(),如果你在编译时被提示deprecated之类的,请修改为Join()。
在RPC调用后Join(controller->call_id())是**错误**的行为,一定要先把call_id保存下来。因为RPC调用后controller可能被随时开始运行的done删除。
下面代码的Join方式是**错误**的。
```
static void on_rpc_done(Controller* controller, MyResponse* response) {
... Handle response ...
delete controller;
delete response;
}
Controller* controller1 = new Controller;
Controller* controller2 = new Controller;
MyResponse* response1 = new MyResponse;
MyResponse* response2 = new MyResponse;
...
stub.method1(controller1, &request1, response1, google::protobuf::NewCallback(on_rpc_done, controller1, response1));
stub.method2(controller2, &request2, response2, google::protobuf::NewCallback(on_rpc_done, controller2, response2));
...
baidu::rpc::Join(controller1->call_id()); // 错误,controller1可能被on_rpc_done删除了
baidu::rpc::Join(controller2->call_id()); // 错误,controller2可能被on_rpc_done删除了
```
## 半同步
Join可用来实现“半同步”操作:即等待多个异步操作返回。由于调用处的代码会等到多个RPC都结束后再醒来,所以controller和response都可以放栈上。
```
baidu::rpc::Controller cntl1;
baidu::rpc::Controller cntl2;
MyResponse response1;
MyResponse response2;
...
stub1.method1(&cntl1, &request1, &response1, baidu::rpc::DoNothing());
stub2.method2(&cntl2, &request2, &response2, baidu::rpc::DoNothing());
...
baidu::rpc::Join(cntl1.call_id());
baidu::rpc::Join(cntl2.call_id());
```
baidu::rpc::DoNothing()可获得一个什么都不干的done,专门用于半同步访问。它的生命周期由框架管理,用户不用关心。
注意在上面的代码中,我们在RPC结束后又访问了controller.call_id(),这是没有问题的,因为DoNothing中并不会像上面的on_rpc_done中那样删除Controller。
## 取消RPC
baidu::rpc::StartCancel(CallId)可取消任意RPC,CallId必须**在发起RPC前**通过Controller.call_id()获得,其他时刻都可能有race condition。
Icon
是baidu::rpc::StartCancel(CallId),不是controller.StartCancel(),后者被禁用,没有效果。
顾名思义,StartCancel调用完成后RPC并未立刻结束,你不应该碰触Controller的任何字段或删除任何资源,它们自然会在RPC结束时被done中对应逻辑处理。如果你一定要在原地等到RPC结束(一般不需要),则可通过Join(call_id)。
关于StartCancel的一些事实:
- call_id在发起RPC前就可以被取消,RPC会直接结束(done仍会被调用)。
- call_id可以在另一个线程中被取消。
- 取消一个已经取消的call_id不会有任何效果。推论:同一个call_id可以被多个线程同时取消,但最多一次有效果。
- 取消只是指client会忽略对应的RPC结果,**不意味着server端会取消对应的操作**,server cancelation是另一个功能。
## 获取Server的地址和端口
remote_side()方法可知道request被送向了哪个server,返回值类型是[base::EndPoint](https://svn.baidu.com/public/trunk/common/base/endpoint.h),包含一个ip4地址和端口。在RPC结束前调用这个方法都是没有意义的。
打印方式:
```
LOG(INFO) << "remote_side=" << cntl->remote_side();
printf("remote_side=%s\n", base::endpoint2str(cntl->remote_side()).c_str());
```
## 获取Client的地址和端口
r31384后通过local_side()方法可**在RPC结束后**获得发起RPC的地址和端口。
打印方式:
```
LOG(INFO) << "local_side=" << cntl->local_side();
printf("local_side=%s\n", base::endpoint2str(cntl->local_side()).c_str());
```
## 新建baidu::rpc::Controller的代价大吗
不大,不用刻意地重用,但Controller是个大杂烩,可能会包含一些缓存,Reset()可以避免反复地创建这些缓存。
在大部分场景下,构造Controller和重置Controller(通过Reset)的代价差不多,比如下面代码中的snippet1和snippet2性能差异不大。
```
// snippet1
for (int i = 0; i < n; ++i) {
baidu::rpc::Controller controller;
...
stub.CallSomething(..., &controller);
}
// snippet2
baidu::rpc::Controller controller;
for (int i = 0; i < n; ++i) {
controller.Reset();
...
stub.CallSomething(..., &controller);
}
```
但如果snippet1中的Controller是new出来的,那么snippet1就会多出“内存分配”的开销,在一些情况下可能会慢一些。
# 设置
Client端的设置主要由三部分组成:
- baidu::rpc::ChannelOptions: 定义在[src/baidu/rpc/channel.h](https://svn.baidu.com/public/trunk/baidu-rpc/src/baidu/rpc/channel.h)中,用于初始化Channel,一旦初始化成功无法修改。
- baidu::rpc::Controller: 定义在[src/baidu/rpc/controller.h](https://svn.baidu.com/public/trunk/baidu-rpc/src/baidu/rpc/controller.h)中,用于在某次RPC中覆盖ChannelOptions中的选项,可根据上下文每次均不同。
- 全局gflags:常用于调节一些底层代码的行为,一般不用修改。请自行阅读服务/flags页面中的说明。
Controller包含了request中没有的数据和选项。server端和client端的Controller结构体是一样的,但使用的字段可能是不同的,你需要仔细阅读Controller中的注释,明确哪些字段可以在server端使用,哪些可以在client端使用。
一个Controller对应一次RPC。一个Controller可以在Reset()后被另一个RPC复用,但一个Controller不能被多个RPC同时使用(不论是否在同一个线程发起)。
Controller的特点:
1. 一个Controller只能有一个使用者,没有特殊说明的话,Controller中的方法默认线程不安全。
2. 因为不能被共享,所以一般不会用共享指针管理Controller,如果你用共享指针了,很可能意味着出错了。
3. 创建于开始RPC前,析构于RPC结束后,常见几种模式:
- 同步RPC前Controller放栈上,出作用域后自行析构。注意异步RPC的Controller绝对不能放栈上,否则其析构时异步调用很可能还在进行中,从而引发未定义行为。
- 异步RPC前new Controller,done中删除。
- 异步RPC前从某个全局或thread-local的pool中取出Controller,done中Reset()并归还pool。当然Reset()也可发生在取出时,但在归还时能更及时地释放资源。
## 超时
**ChannelOptions.timeout_ms**是对应Channel上一次RPC的超时,Controller.set_timeout_ms()可修改某次RPC的值。单位毫秒,默认值1秒,最大值2^31(约24天),-1表示一直等到回复或错误。
**ChannelOptions.connect_timeout_ms**是对应Channel上一次RPC的连接超时,单位毫秒,默认值1秒。-1表示等到连接建立或出错,此值被限制为不能超过timeout_ms。注意此超时独立于TCP的连接超时,一般来说前者小于后者,反之则可能在connect_timeout_ms未达到前由于TCP连接超时而出错。
注意1:baidu-rpc中的超时是deadline,超过就意味着RPC结束。UB/hulu中的超时既有单次访问的,也有代表deadline的。迁移到baidu-rpc时请仔细区分。
注意2:r31711后超时的错误码为**ERPCTIMEDOUT (1008)**,ETIMEDOUT的意思是连接超时。r31711前,超时的错误码是ETIMEDOUT (110)。原因:RPC内很早就区分了这两者,但考虑到linux下的使用习惯,在RPC结束前把ERPCTIMEDOUT改为了ETIMEDOUT。使用中我们逐渐发现不管是RPC内部实现(比如组合channel)还是一些用户场景都需要区分RPC超时和连接超时,综合考虑后决定不再合并这两个错误。如果你的程序中有诸如cntl->ErrorCode() == ETIMEDOUT的代码,你考虑下这里到底是否用对了,如果其实是在判RPC超时的话,得改成ERPCTIMEDOUT。
## 重试
ChannelOptions.max_retry是该Channel上所有RPC的默认最大重试次数,Controller.set_max_retry()可修改某次RPC的值,默认值3,0表示不重试。
r32111后Controller.retried_count()返回重试次数。r34717后Controller.has_backup_request()获知是否发送过backup_request。
**重试时框架会尽量避开之前尝试过的server。**
重试的触发条件有(条件之间是AND关系):
- 连接出错。如果server一直没有返回,但连接没有问题,这种情况下不会重试。如果你需要在一定时间后发送另一个请求,使用backup request。
- 没到超时。
- 有剩余重试次数。Controller.set_max_retry(0)或ChannelOptions.max_retry = 0可关闭重试。
- 重试对错误可能有效。比如请求有错时(EREQUEST)不会重试,因为server总不会接受,没有意义。
### 连接出错
如果server一直没有返回,但连接没有问题,这种情况下不会重试。如果你需要在一定时间后发送另一个请求,使用backup request,工作机制如下:如果response没有在backup_request_ms内返回,则发送另外一个请求,哪个先回来就取哪个。新请求会被尽量送到不同的server。如果backup_request_ms大于超时,则backup request总不会被发送。backup request会消耗一次重试次数。backup request不意味着server端cancel。
ChannelOptions.backup_request_ms影响该Channel上所有RPC,单位毫秒,默认值-1(表示不开启),Controller.set_backup_request_ms()可修改某次RPC的值。
### 没到超时
超时后RPC会尽快结束。
### 没有超过最大重试次数
Controller.set_max_retry()或ChannelOptions.max_retry设置最大重试次数,设为0关闭重试。
### 错误值得重试
一些错误重试是没有意义的,就不会重试,比如请求有错时(EREQUEST)不会重试,因为server总不会接受。
r32009后用户可以通过继承[baidu::rpc::RetryPolicy](https://svn.baidu.com/public/trunk/baidu-rpc/src/baidu/rpc/retry_policy.h)自定义重试条件。r34642后通过cntl->response()可获得对应RPC的response。对ERPCTIMEDOUT代表的RPC超时总是不重试,即使RetryPolicy中允许。
比如baidu-rpc默认不重试HTTP相关的错误,而你的程序中希望在碰到HTTP_STATUS_FORBIDDEN (403)时重试,可以这么做:
```
#include <baidu/rpc/retry_policy.h>
class MyRetryPolicy : public baidu::rpc::RetryPolicy {
public:
bool DoRetry(const baidu::rpc::Controller* cntl) const {
if (cntl->ErrorCode() == baidu::rpc::EHTTP && // HTTP错误
cntl->http_response().status_code() == baidu::rpc::HTTP_STATUS_FORBIDDEN) {
return true;
}
// 把其他情况丢给框架。
return baidu::rpc::DefaultRetryPolicy()->DoRetry(cntl);
}
};
...
// 给ChannelOptions.retry_policy赋值就行了。
// 注意:retry_policy必须在Channel使用期间保持有效,Channel也不会删除retry_policy,所以大部分情况下RetryPolicy都应以单例模式创建。
baidu::rpc::ChannelOptions options;
static MyRetryPolicy g_my_retry_policy;
options.retry_policy = &g_my_retry_policy;
...
```
### 重试应当保守
由于成本的限制,大部分线上server的冗余度是有限的,更多是满足多机房互备的需求。而激进的重试逻辑很容易导致众多client对server集群造成2-3倍的压力,最终使集群雪崩:由于server来不及处理导致队列越积越长,使所有的请求得经过很长的排队才被处理而最终超时,相当于服务停摆。r32009前重试整体上是安全的,只要连接不断RPC就不会重试,一般不会产生大量的重试请求。而r32009后引入的RetryPolicy一方面使用户可以定制重试条件,另一方面也可能使重试变成一场“风暴”。当你定制RetryPolicy时,你需要仔细考虑client和server的协作关系,并设计对应的异常测试,以确保行为符合预期。
## 协议
Channel的默认协议是标准协议,可通过设置ChannelOptions.protocol换为其他协议,这个字段既接受enum也接受字符串,目前支持的有:
- PROTOCOL_BAIDU_STD 或 “baidu_std",即[标准协议](http://gollum.baidu.com/RPCSpec),默认为单连接。
- PROTOCOL_HULU_PBRPC 或 "hulu_pbrpc",hulu的协议,默认为单连接。
- PROTOCOL_NOVA_PBRPC 或 ”nova_pbrpc“,网盟的协议,默认为连接池。
- PROTOCOL_HTTP 或 ”http", http协议,默认为连接池(Keep-Alive)。具体方法见[访问HTTP服务](http://wiki.baidu.com/pages/viewpage.action?pageId=213828697)。
- PROTOCOL_SOFA_PBRPC 或 "sofa_pbrpc",sofa-pbrpc的协议,默认为单连接。
- PROTOCOL_PUBLIC_PBRPC 或 "public_pbrpc",public/pbrpc的协议,默认为连接池。
- PROTOCOL_UBRPC_COMPACK 或 "ubrpc_compack",public/ubrpc的协议,使用compack打包,默认为连接池。具体方法见[ubrpc (by protobuf)](http://wiki.baidu.com/pages/viewpage.action?pageId=213828700#id-访问UB-ubrpc(byprotobuf))。相关的还有PROTOCOL_UBRPC_MCPACK2或ubrpc_mcpack2,使用mcpack2打包。
- PROTOCOL_NSHEAD_CLIENT 或 "nshead_client",这是发送baidu-rpc-ub中所有UBXXXRequest需要的协议,默认为连接池。具体方法见[访问ub](http://wiki.baidu.com/pages/viewpage.action?pageId=213828700)。
- PROTOCOL_NSHEAD 或 "nshead",这是baidu-rpc中发送NsheadMessage需要的协议,默认为连接池。注意发送NsheadMessage的效果等同于发送baidu-rpc-ub中的UBRawBufferRequest,但更加方便一点。具体方法见[nshead+blob](http://wiki.baidu.com/pages/viewpage.action?pageId=213828700#id-访问UB-nshead+blob) 。
- PROTOCOL_MEMCACHE 或 "memcache",memcached的二进制协议,默认为单连接。具体方法见[访问memcached](http://wiki.baidu.com/pages/viewpage.action?pageId=213828702)。
- PROTOCOL_REDIS 或 "redis",redis 1.2后的协议(也是hiredis支持的协议),默认为单连接。具体方法见[访问Redis](http://wiki.baidu.com/pages/viewpage.action?pageId=213828705)。
- PROTOCOL_ITP 或 "itp", 凤巢的协议,格式为nshead + control idl + user idl,使用mcpack2pb适配,默认为连接池。具体方法见[访问ITP](http://wiki.baidu.com/pages/viewpage.action?pageId=184259578)。
- PROTOCOL_NSHEAD_MCPACK 或 "nshead_mcpack", 顾名思义,格式为nshead + mcpack,使用mcpack2pb适配,默认为连接池。
- PROTOCOL_ESP 或 "esp",访问使用esp协议的服务,默认为连接池。
## 连接方式
baidu-rpc支持以下连接方式:
- 短连接:每次RPC call前建立连接,结束后关闭连接。由于每次调用得有建立连接的开销,这种方式一般用于偶尔发起的操作,而不是持续发起请求的场景。
- 连接池:每次RPC call前取用空闲连接,结束后归还,一个连接上最多只有一个请求,对一台server可能有多条连接。各类使用nshead的协议和http 1.1都是这个方式。
- 单连接:进程内与一台server最多一个连接,一个连接上可能同时有多个请求,回复返回顺序和请求顺序不需要一致,这是标准协议,hulu-pbrpc,sofa-pbrpc的默认选项。
| | 短连接 | 连接池 | 单连接 |
| ---------- | ---------------------------------------- | ------------------- | ----------------- |
| 长连接 | 否 (每次都要建立tcp连接) | 是 | 是 |
| server端连接数 | qps*latency (原理见[little's law](https://en.wikipedia.org/wiki/Little%27s_law)) | qps*latency | 1 |
| 极限qps | 差,且受限于单机端口数 | 中等 | 高 |
| latency | 1.5RTT(connect) + 1RTT + 处理时间 | 1RTT + 处理时间 | 1RTT + 处理时间 |
| cpu占用 | 高每次都要tcp connect | 中等每个请求都要一次sys write | 低合并写出在大流量时减少cpu占用 |
框架会为协议选择默认的连接方式,用户**一般不用修改**。若需要,把ChannelOptions.connection_type设为:
- CONNECTION_TYPE_SINGLE 或 "single" 为单连接
- CONNECTION_TYPE_POOLED 或 "pooled" 为连接池, 与单个远端的最大连接数由[-max_connection_pool_size](http://brpc.baidu.com:8765/flags/max_connection_pool_size)控制。
- CONNECTION_TYPE_SHORT 或 "short" 为短连接
- 设置为“”(空字符串)则让框架选择协议对应的默认连接方式。
r31468之后baidu-rpc支持[Streaming RPC](http://wiki.baidu.com/display/RPC/Streaming+RPC),这是一种应用层的连接,用于传递流式数据。
## 关闭连接池中的闲置连接
当连接池中的某个连接在-[idle_timeout_second](http://brpc.baidu.com:8765/flags/idle_timeout_second)时间内没有读写,则被视作“闲置”,会被自动关闭。打开[-log_idle_connection_close](http://brpc.baidu.com:8765/flags/log_idle_connection_close)后关闭前会打印一条日志。默认值为10秒。此功能只对连接池(pooled)有效。
## 延迟关闭连接
多个channel可能通过引用计数引用同一个连接,当引用某个连接的最后一个channel析构时,该连接将被关闭。但在一些场景中,channel在使用前才被创建,用完立刻析构,这时其中一些连接就会被无谓地关闭再被打开,效果类似短连接。
一个解决办法是用户把所有或常用的channel缓存下来,这样自然能避免channel频繁产生和析构,但目前baidu-rpc没有提供这样一个utility,用户自己(正确)实现有一些工作量。
另一个解决办法是设置全局选项[-defer_close_second](http://brpc.baidu.com:8765/flags/defer_close_second),设置后引用计数清0时连接并不会立刻被关闭,而是会等待这么多秒再关闭,如果在这段时间内又有channel引用了这个连接,它会恢复正常被使用的状态。不管channel创建析构有多频率,这个选项使得关闭连接的频率有上限。这个选项的副作用是一些fd不会被及时关闭,如果延时被误设为一个大数值,程序占据的fd个数可能会很大。
## 连接的缓冲区大小
[-socket_recv_buffer_size](http://brpc.baidu.com:8765/flags/socket_recv_buffer_size)设置所有连接的接收缓冲区大小,默认-1(不修改)
[-socket_send_buffer_size](http://brpc.baidu.com:8765/flags/socket_send_buffer_size)设置所有连接的发送缓冲区大小,默认-1(不修改)
## log_id
通过set_log_id()可设置log_id。这个id会被送到服务器端,一般会被打在日志里,从而把一次检索经过的所有服务串联起来。不同产品线可能有不同的叫法。一些产品线有字符串格式的“s值”,内容也是64位的16进制数,可以转成整型后再设入log_id。
## 附件
标准协议和hulu协议支持附件,这段数据由用户自定义,不经过protobuf的序列化。站在client的角度,设置在Controller::request_attachment()的附件会被server端收到,response_attachment()则包含了server端送回的附件。附件不受压缩选项影响。
在http协议中,附件对应[message body](http://www.w3.org/Protocols/rfc2616/rfc2616-sec4.html),比如要POST的数据就设置在request_attachment()中。
## giano认证
```
// Create a baas::CredentialGenerator using Giano's API
baas::CredentialGenerator generator = CREATE_MOCK_PERSONAL_GENERATOR(
"mock_user", "mock_roles", "mock_group", baas::sdk::BAAS_OK);
// Create a baidu::rpc::policy::GianoAuthenticator using the generator we just created
// and then pass it into baidu::rpc::ChannelOptions
baidu::rpc::policy::GianoAuthenticator auth(&generator, NULL);
baidu::rpc::ChannelOptions option;
option.auth = &auth;
```
首先通过调用Giano API生成验证器baas::CredentialGenerator,具体可参看[Giano快速上手手册.pdf](http://wiki.baidu.com/download/attachments/37774685/Giano%E5%BF%AB%E9%80%9F%E4%B8%8A%E6%89%8B%E6%89%8B%E5%86%8C.pdf?version=1&modificationDate=1421990746000&api=v2)。然后按照如上代码一步步将其设置到baidu::rpc::ChannelOptions里去。
当client设置认证后,任何一个新连接建立后都必须首先发送一段验证信息(通过Giano认证器生成),才能发送后续请求。认证成功后,该连接上的后续请求不会再带有验证消息。
## 重置
调用Reset方法可让Controller回到刚创建时的状态。
别在RPC结束前重置Controller,行为是未定义的。
## 压缩
set_request_compress_type()设置request的压缩方式,默认不压缩。注意:附件不会被压缩。HTTP body的压缩方法见[client压缩request body](http://wiki.baidu.com/pages/viewpage.action?pageId=213828697#id-访问HTTP-压缩requestbody)。
支持的压缩方法有:
- baidu::rpc::CompressTypeSnappy : [snanpy压缩](http://google.github.io/snappy/),压缩和解压显著快于其他压缩方法,但压缩率最低。
- baidu::rpc::CompressTypeGzip : [gzip压缩](http://en.wikipedia.org/wiki/Gzip),显著慢于snappy,但压缩率高
- baidu::rpc::CompressTypeZlib : [zlib压缩](http://en.wikipedia.org/wiki/Zlib),比gzip快10%~20%,压缩率略好于gzip,但速度仍明显慢于snappy。
下表是多种压缩算法应对重复率很高的数据时的性能,仅供参考。
| Compress method | Compress size(B) | Compress time(us) | Decompress time(us) | Compress throughput(MB/s) | Decompress throughput(MB/s) | Compress ratio |
| --------------- | ---------------- | ----------------- | ------------------- | ------------------------- | --------------------------- | -------------- |
| Snappy | 128 | 0.753114 | 0.890815 | 162.0875 | 137.0322 | 37.50% |
| Gzip | 10.85185 | 1.849199 | 11.2488 | 66.01252 | 47.66% | |
| Zlib | 10.71955 | 1.66522 | 11.38763 | 73.30581 | 38.28% | |
| Snappy | 1024 | 1.404812 | 1.374915 | 695.1555 | 710.2713 | 8.79% |
| Gzip | 16.97748 | 3.950946 | 57.52106 | 247.1718 | 6.64% | |
| Zlib | 15.98913 | 3.06195 | 61.07665 | 318.9348 | 5.47% | |
| Snappy | 16384 | 8.822967 | 9.865008 | 1770.946 | 1583.881 | 4.96% |
| Gzip | 160.8642 | 43.85911 | 97.13162 | 356.2544 | 0.78% | |
| Zlib | 147.6828 | 29.06039 | 105.8011 | 537.6734 | 0.71% | |
| Snappy | 32768 | 16.16362 | 19.43596 | 1933.354 | 1607.844 | 4.82% |
| Gzip | 229.7803 | 82.71903 | 135.9995 | 377.7849 | 0.54% | |
| Zlib | 240.7464 | 54.44099 | 129.8046 | 574.0161 | 0.50% | |
下表是多种压缩算法应对重复率很低的数据时的性能,仅供参考。
| Compress method | Compress size(B) | Compress time(us) | Decompress time(us) | Compress throughput(MB/s) | Decompress throughput(MB/s) | Compress ratio |
| --------------- | ---------------- | ----------------- | ------------------- | ------------------------- | --------------------------- | -------------- |
| Snappy | 128 | 0.866002 | 0.718052 | 140.9584 | 170.0021 | 105.47% |
| Gzip | 15.89855 | 4.936242 | 7.678077 | 24.7294 | 116.41% | |
| Zlib | 15.88757 | 4.793953 | 7.683384 | 25.46339 | 107.03% | |
| Snappy | 1024 | 2.087972 | 1.06572 | 467.7087 | 916.3403 | 100.78% |
| Gzip | 32.54279 | 12.27744 | 30.00857 | 79.5412 | 79.79% | |
| Zlib | 31.51397 | 11.2374 | 30.98824 | 86.90288 | 78.61% | |
| Snappy | 16384 | 12.598 | 6.306592 | 1240.276 | 2477.566 | 100.06% |
| Gzip | 537.1803 | 129.7558 | 29.08707 | 120.4185 | 75.32% | |
| Zlib | 519.5705 | 115.1463 | 30.07291 | 135.697 | 75.24% | |
| Snappy | 32768 | 22.68531 | 12.39793 | 1377.543 | 2520.582 | 100.03% |
| Gzip | 1403.974 | 258.9239 | 22.25825 | 120.6919 | 75.25% | |
| Zlib | 1370.201 | 230.3683 | 22.80687 | 135.6524 | 75.21% | |
# FAQ
### Q: baidu-rpc能用unix domain socket吗
不能。因为同机socket并不走网络,相比domain socket性能只会略微下降,替换为domain socket意义不大。以后可能会扩展支持。
### Q: Fail to connect to xx.xx.xx.xx:xxxx, Connection refused是什么意思
一般是对端server没打开端口(很可能挂了)。
### Q: 经常遇到Connection timedout(不在一个机房)
![img](http://wiki.baidu.com/download/attachments/71337200/image2017-2-28%2015%3A48%3A48.png?version=1&modificationDate=1488268128000&api=v2)
这个就是连接超时了,调大连接超时:
![img](http://wiki.baidu.com/download/attachments/71337200/image2017-2-28%2015%3A48%3A9.png?version=1&modificationDate=1488268089000&api=v2)
注意这并不是RPC超时,RPC超时打印的日志是"Reached timeout=..."。
### Q: 为什么同步方式是好的,异步就crash了
重点检查Controller,Response和done的生命周期。在异步访问中,RPC调用结束并不意味着RPC整个过程结束,而是要在done被调用后才会结束。所以这些对象不应在调用RPC后就释放,而是要在done里面释放。所以你一般不能把这些对象分配在栈上,而应该使用NewCallback等方式分配在堆上。详见[异步访问](http://wiki.baidu.com/pages/viewpage.action?pageId=213828685#id-创建和访问Client-异步访问)。
### Q: 我怎么确认server处理了我的请求
不一定能。当response返回且成功时,我们确认这个过程一定成功了。当response返回且失败时,我们确认这个过程一定失败了。但当response没有返回时,它可能失败,也可能成功。如果我们选择重试,那一个成功的过程也可能会被再执行一次。所以一般来说RPC服务都应当考虑[幂等](http://en.wikipedia.org/wiki/Idempotence)问题,否则重试可能会导致多次叠加副作用而产生意向不到的结果。比如以读为主的检索服务大都没有副作用而天然幂等,无需特殊处理。而像写也很多的存储服务则要在设计时就加入版本号或序列号之类的机制以拒绝已经发生的过程,保证幂等。
### Q: BNS中机器列表已经配置了,但是RPC报"Fail to select server, No data available"错误
使用get_instance_by_service -s your_bns_name 来检查一下所有机器的status状态, 只有status为0的机器才能被client访问.
### Q: Invalid address=`bns://group.user-persona.dumi.nj03'是什么意思
FATAL 04-07 20:00:03 7778 public/baidu-rpc/src/baidu/rpc/channel.cpp:123] Invalid address=`bns://group.user-persona.dumi.nj03'. You should use Init(naming_service_name, load_balancer_name, options) to access multiple servers.
访问bns要使用三个参数的Init,它第二个参数是load_balancer_name,而你这里用的是两个参数的Init,框架当你是访问单点,就会报这个错。
\ No newline at end of file
随着产品线规模的增大,对下游的访问流程会越来越复杂,其中往往包含多个同时发起或逐个执行的异步操作。但这类代码的多线程陷阱很多,用户可能写出了bug也不自知,复现和调试也比较困难。而且实现往往只能解决同步的情况,要么不支持全异步要么得重写一套。以半同步为例,它指等待多个异步操作完成。它的同步实现一般是异步地发起多个操作,然后逐个等待各自完成;它的异步实现一般是用一个带计数器的回调,每当一个操作完成时计数器减一,直到0时调用回调。我们可以看到它的缺点:
- 同步和异步代码不一致。用户无法轻易地从一个模式转为另一种模式。从设计的角度,不一致暗示了没有抓住本质。
- 往往不能被取消。正确及时地取消一个操作不是一件易事,何况是组合访问。大部分实现不会支持取消一个组合访问。但这对于backup request这类降低延时的技术是必须的。
- 不能继续组合。比如你很难把一个半同步访问变成“更大"的访问模式的一部分。你只是满足了目前的需求,换个场景还得重写一套。
我们需要更好的抽象。如果有一种结构,它们的组合仍是同一种结构,用户可以便用统一接口完成同步、异步、取消等操作。我们其实已经有这个结构了:*Channel*。如果我们能以不同的方式把一些Channel组合为更大更复杂的Channel,并把不同的访问模式置入其中,那么用户便获得了一个一致且能组合的积木。欢迎使用这个强大的工具。
# ParallelChannel
ParallelChannel (“pchan”)同时访问其包含的sub channel,并合并它们的结果。用户可通过CallMapper修改请求,通过ResponseMerger合并结果。ParallelChannel看起来就像是一个Channel:
- 支持同步和异步访问。
- 发起异步操作后可以立刻删除。
- 可以取消。
- 支持超时。
示例代码见[example/parallel_echo_c++](https://svn.baidu.com/public/trunk/baidu-rpc/example/parallel_echo_c++/)
任何baidu::rpc::ChannelBase的子类都可以加入ParallelChannel,包括ParallelChannel和其他组合Channel。用户可以设置ParallelChannelOptions.fail_limit来控制访问的最大失败次数(r31803前是ParallelChannel::set_fail_limit),当失败的访问达到这个数目时,RPC call会立刻结束而不等待超时。
当baidu-rpc >= 1.0.155.31351时,一个sub channel可多次加入同一个ParallelChannel。当你需要对同一个服务发起多次异步访问并等待它们完成的话,这很有用。
ParallelChannel的内部结构大致如下:
![img](http://wiki.baidu.com/download/attachments/71337222/image2015-7-6%2016%3A1%3A59.png?version=1&modificationDate=1436169719000&api=v2)
## 插入sub channel
可通过如下接口把sub channel插入ParallelChannel:
```c++
int AddChannel(baidu::rpc::ChannelBase* sub_channel,
ChannelOwnership ownership,
CallMapper* call_mapper,
ResponseMerger* response_merger);
```
当ownership为baidu::rpc::OWNS_CHANNEL时,sub_channel会在ParallelChannel析构时被删除。当baidu-rpc >= 1.0.155.31351时,由于一个sub channel可能会多次加入一个ParallelChannel,只要其中一个指明了ownership为baidu::rpc::OWNS_CHANNEL,那个sub channel就会在ParallelChannel析构时被删除(一次)。
访问ParallelChannel时调用AddChannel是线程**不安全**的。
## CallMapper
用于把对ParallelChannel的调用转化为对sub channel的调用。如果call_mapper是NULL,sub channel的请求就是ParallelChannel的请求,而response则New()自ParallelChannel的response。如果call_mapper不为NULL,则会在ParallelChannel析构时被删除。当baidu-rpc >= 1.0.105.30846时,call_mapper内含引用计数,一个call_mapper可与多个sub channel关联。
```c++
class CallMapper {
public:
virtual ~CallMapper();
virtual SubCall Map(int channel_index/*starting from 0*/,
const google::protobuf::MethodDescriptor* method,
const google::protobuf::Message* request,
google::protobuf::Message* response) = 0;
};
```
channel_index:该sub channel在ParallelChannel中的位置,从0开始计数。
method/request/response:ParallelChannel.CallMethod()的参数。
返回的SubCall被用于访问对应sub channel,SubCall有两个特殊值:
- 返回SubCall::Bad()则对ParallelChannel的该次访问立刻失败,Controller.ErrorCode()为EREQUEST。
- 返回SubCall::Skip()则跳过对该sub channel的访问,如果所有的sub channel都被跳过了,该次访问立刻失败,Controller.ErrorCode()为ECANCELED。
常见的Map()实现有:
- 广播request。这也是call_mapper为NULL时的行为:
```c++
class Broadcaster : public CallMapper {
public:
SubCall Map(int channel_index/*starting from 0*/,
const google::protobuf::MethodDescriptor* method,
const google::protobuf::Message* request,
google::protobuf::Message* response) {
// method/request和pchan保持一致,response是new出来的,最后的flag告诉pchan在RPC结束后删除Response。
return SubCall(method, request, response->New(), DELETE_RESPONSE);
}
};
```
- 修改request中的字段后再发。
```c++
class ModifyRequest : public CallMapper {
public:
SubCall Map(int channel_index/*starting from 0*/,
const google::protobuf::MethodDescriptor* method,
const google::protobuf::Message* request,
google::protobuf::Message* response) {
FooRequest* copied_req = baidu::rpc::Clone<FooRequest>(request);
copied_req->set_xxx(...);
// 拷贝并修改request,最后的flag告诉pchan在RPC结束后删除Request和Response。
return SubCall(method, copied_req, response->New(), DELETE_REQUEST | DELETE_RESPONSE);
}
};
```
- request和response已经包含了sub request/response,直接取出来访问对应的sub channel。
```c++
class UseFieldAsSubRequest : public CallMapper {
public:
SubCall Map(int channel_index/*starting from 0*/,
const google::protobuf::MethodDescriptor* method,
const google::protobuf::Message* request,
google::protobuf::Message* response) {
if (channel_index >= request->sub_request_size()) {
// sub_request不够,说明外面准备数据的地方和pchan中sub channel的个数不符,返回Bad()会让该次访问立刻结束并报EREQUEST错误
return SubCall::Bad();
}
// 取出对应的sub request,增加一个sub response,最后的flag为0告诉pchan什么都不用删(因为删除request/response自然一起删了)
return SubCall(sub_method, request->sub_request(channel_index), response->add_sub_response(), 0);
}
};
```
## ResponseMerger
response_merger把sub channel的response合并入总的response,其为NULL时,则使用response->MergeFrom(*sub_response),MergeFrom的行为可概括为“除了合并repeated字段,其余都是覆盖”。如果你需要更复杂的行为,则需实现ResponseMerger。response_merger是一个个执行的,所以你并不需要考虑多个Merge同时运行的情况。如果response_merger不为NULL,则会在ParallelChannel析构时被删除。response_merger内含引用计数,一个response_merger可与多个sub channel关联。
Result的取值有:
- MERGED: 成功合并。
- FAIL (之前叫IGNORE): sub_response没有合并成功,会被记作一次失败。比如10 sub channels & fail_limit=4,3个在合并前已经失败了,1个合并后返回了FAIL。这次RPC会被视作发生了4次错误,由于达到了fail_limit这次RPC会立刻结束。
- FAIL_ALL (之前叫CALL_FAILED): 使本次RPC call立刻结束。
## 获得访问sub channel时的controller
有时访问者需要了解访问sub channel时的细节,通过Controller.sub(i)可获得访问sub channel的controller.
```
// Get the controllers for accessing sub channels in combo channels.
// Ordinary channel:
// sub_count() is 0 and sub() is always NULL.
// ParallelChannel/PartitionChannel:
// sub_count() is #sub-channels and sub(i) is the controller for
// accessing i-th sub channel inside ParallelChannel, if i is outside
// [0, sub_count() - 1], sub(i) is NULL.
// NOTE: You must test sub() against NULL, ALWAYS. Even if i is inside
// range, sub(i) can still be NULL:
// * the rpc call may fail and terminate before accessing the sub channel
// * the sub channel was skipped
// SelectiveChannel/DynamicPartitionChannel:
// sub_count() is always 1 and sub(0) is the controller of successful
// or last call to sub channels.
int sub_count() const;
const Controller* sub(int index) const;
```
# SelectiveChannel
[SelectiveChannel](https://svn.baidu.com/public/trunk/baidu-rpc/src/baidu/rpc/selective_channel.h) (“schan”)按负载均衡算法访问其包含的一个Channel,相比普通Channel它更加高层:把流量分给sub channel,而不是具体的Server。SelectiveChannel主要用来支持机器组之间的负载均衡,它具备Channel的主要属性:
- 支持同步和异步访问。
- 发起异步操作后可以立刻删除。
- 可以取消。
- 支持超时。
示例代码见[example/selective_echo_c++](https://svn.baidu.com/public/trunk/baidu-rpc/example/selective_echo_c++/)
任何baidu::rpc::ChannelBase的子类都可加入SelectiveChannel,包括SelectiveChannel和其他组合Channel。
SelectiveChannel的重试独立于其中的sub channel,当SelectiveChannel访问某个sub channel失败时(可能本身包含了重试),它会重试另外一个sub channel。
目前SelectiveChannel要求request必须在RPC结束前有效,其他channel没有这个要求。如果你使用SelectiveChannel发起异步操作,确保request在done中才被删除。
## 使用SelectiveChannel
SelectiveChannel的初始化和普通Channel基本一样,但Init不需要指定名字服务,因为SelectiveChannel面向sub channel并通过AddChannel动态添加,而普通Channel面向的server才记录在名字服务中。
```c++
#include <baidu/rpc/selective_channel.h>
...
baidu::rpc::SelectiveChannel schan;
baidu::rpc::ChannelOptions schan_options;
schan_options.timeout_ms = ...;
schan_options.backup_request_ms = ...;
schan_options.max_retry = ...;
if (schan.Init(load_balancer, &schan_options) != 0) {
LOG(ERROR) << "Fail to init SelectiveChannel";
return -1;
}
```
初始化完毕后通过AddChannel加入sub channel。
```c++
if (schan.AddChannel(sub_channel, NULL/*ChannelHandle*/) != 0) { // 第二个参数ChannelHandle用于删除sub channel,不用删除可填NULL
LOG(ERROR) << "Fail to add sub_channel";
return -1;
}
```
注意:
- 和ParallelChannel不同,SelectiveChannel的AddChannel可在任意时刻调用,即使该SelectiveChannel正在被访问(下一次访问时生效)
- SelectiveChannel总是own sub channel,这和ParallelChannel可选择ownership是不同的。
- 如果AddChannel第二个参数不为空,会填入一个类型为baidu::rpc::SelectiveChannel::ChannelHandle的值,这个handle可作为RemoveAndDestroyChannel的参数来动态删除一个channel。
- SelectiveChannel会用自身的超时覆盖sub channel初始化时指定的超时。比如某个sub channel的超时为100ms,SelectiveChannel的超时为500ms,实际访问时的超时是500ms,而不是100ms。
访问SelectiveChannel的方式和普通Channel是一样的。
## 以往多个bns分流为例
一些场景中我们需要向多个bns下的机器分流,原因可能有:
- 完成同一个检索功能的机器被挂载到了不同的bns下。
- 机器被拆成了多个组,流量先分流给一个组,再分流到组内机器。组间的分流方式和组内有所不同。
这都可以通过SelectiveChannel完成。
SelectiveChannel的创建和普通Channel类似,但不需要名字服务,而是通过AddChannel方法插入sub channel。下面的代码创建了一个SelectiveChannel,并插入三个访问不同bns的普通Channel。
```c++
baidu::rpc::SelectiveChannel channel;
baidu::rpc::ChannelOptions schan_options;
schan_options.timeout_ms = FLAGS_timeout_ms;
schan_options.backup_request_ms = FLAGS_backup_ms;
schan_options.max_retry = FLAGS_max_retry;
if (channel.Init("c_murmurhash", &schan_options) != 0) {
LOG(ERROR) << "Fail to init SelectiveChannel";
return -1;
}
for (int i = 0; i < 3; ++i) {
baidu::rpc::Channel* sub_channel = new baidu::rpc::Channel;
if (sub_channel->Init(bns_node_name[i], "rr", NULL) != 0) {
LOG(ERROR) << "Fail to init sub channel " << i;
return -1;
}
if (channel.AddChannel(sub_channel, NULL/*handle for removal*/) != 0) {
LOG(ERROR) << "Fail to add sub_channel to channel";
return -1;
}
}
...
XXXService_Stub stub(&channel);
stub.FooMethod(&cntl, &request, &response, NULL);
...
```
# PartitionChannel
[PartitionChannel](https://svn.baidu.com/public/trunk/baidu-rpc/src/baidu/rpc/partition_channel.h)是特殊的ParallelChannel,它会根据名字服务中的tag自动建立对应分库的sub channel。这样用户就可以把所有的分库机器挂在一个名字服务内,通过tag来指定哪台机器对应哪个分库。示例代码见[example/partition_echo_c++](https://svn.baidu.com/public/trunk/baidu-rpc/example/partition_echo_c++/)
ParititonChannel只能处理一种分库方法,当用户需要多种分库方法共存,或从一个分库方法平滑地切换为另一种分库方法时,可以使用DynamicPartitionChannel,它会根据不同的分库方式动态地建立对应的sub PartitionChannel,并根据容量把请求分配给不同的分库。示例代码见[example/dynamic_partition_echo_c++](https://svn.baidu.com/public/trunk/baidu-rpc/example/dynamic_partition_echo_c++/)
如果分库在不同的名字服务内,那么用户得自行用ParallelChannel组装,即每个sub channel对应一个分库(使用不同的名字服务)。ParellelChannel的使用方法请见上一节。
## 使用PartitionChannel
首先定制PartitionParser。这个例子中tag的形式是N/M,N代表分库的index,M是分库的个数。比如0/3代表一共3个分库,这是第一个。
```c++
#include <baidu/rpc/partition_channel.h>
...
class MyPartitionParser : public baidu::rpc::PartitionParser {
public:
bool ParseFromTag(const std::string& tag, baidu::rpc::Partition* out) {
// "N/M" : #N partition of M partitions.
size_t pos = tag.find_first_of('/');
if (pos == std::string::npos) {
LOG(ERROR) << "Invalid tag=" << tag;
return false;
}
char* endptr = NULL;
out->index = strtol(tag.c_str(), &endptr, 10);
if (endptr != tag.data() + pos) {
LOG(ERROR) << "Invalid index=" << base::StringPiece(tag.data(), pos);
return false;
}
out->num_partition_kinds = strtol(tag.c_str() + pos + 1, &endptr, 10);
if (endptr != tag.c_str() + tag.size()) {
LOG(ERROR) << "Invalid num=" << tag.data() + pos + 1;
return false;
}
return true;
}
};
```
然后初始化PartitionChannel。
```c++
#include <baidu/rpc/partition_channel.h>
...
baidu::rpc::PartitionChannel channel;
baidu::rpc::PartitionChannelOptions options;
options.protocol = ...; // PartitionChannelOptions继承了ChannelOptions,后者有的前者也有
options.timeout_ms = ...; // 同上
options.fail_limit = 1; // PartitionChannel自己的选项,意思同ParalellChannel中的fail_limit。这里为1的意思是只要有1个分库访问失败,这次RPC就失败了。
if (channel.Init(num_partition_kinds, new MyPartitionParser(),
server_address, load_balancer, &options) != 0) {
LOG(ERROR) << "Fail to init PartitionChannel";
return -1;
}
// 访问方法和普通Channel是一样的
```
## 使用DynamicPartitionChannel
DynamicPartitionChannel的使用方法和PartitionChannel基本上是一样的,先定制PartitionParser再初始化,但Init时不需要num_partition_kinds,因为DynamicPartitionChannel会为不同的分库方法动态建立不同的sub PartitionChannel。
下面我们演示一下使用DynamicPartitionChannel平滑地从3库变成4库。
首先我们在本地启动三个Server,分别对应8004, 8005, 8006端口。
```
$ ./echo_server -server_num 3
TRACE: 09-06 10:40:39: * 0 server.cpp:159] EchoServer is serving on port=8004
TRACE: 09-06 10:40:39: * 0 server.cpp:159] EchoServer is serving on port=8005
TRACE: 09-06 10:40:39: * 0 server.cpp:159] EchoServer is serving on port=8006
TRACE: 09-06 10:40:40: * 0 server.cpp:192] S[0]=0 S[1]=0 S[2]=0 [total=0]
TRACE: 09-06 10:40:41: * 0 server.cpp:192] S[0]=0 S[1]=0 S[2]=0 [total=0]
TRACE: 09-06 10:40:42: * 0 server.cpp:192] S[0]=0 S[1]=0 S[2]=0 [total=0]
```
启动后每个Server每秒会打印上一秒收到的流量,目前都是0。然后我们在本地启动使用DynamicPartitionChannel的Client,初始化DynamicPartitionChannel的代码如下:
```c++
...
baidu::rpc::DynamicPartitionChannel channel;
baidu::rpc::PartitionChannelOptions options;
options.succeed_without_server = true; // 表示允许server_list在DynamicPartitionChannel.Init启动时为空,否则Init会失败。
options.fail_limit = 1; // 任何访问分库失败都认为RPC失败。调大这个数值可以使访问更宽松,比如等于2的话表示至少两个分库失败才算失败。
if (channel.Init(new MyPartitionParser(), "file://server_list", "rr", &options) != 0) {
LOG(ERROR) << "Fail to init channel";
return -1;
}
...
```
名字服务"file://server_list"的内容是:
```
0.0.0.0:8004 0/3 # 表示3分库中的第一个分库,其他依次类推
0.0.0.0:8004 1/3
0.0.0.0:8004 2/3
```
****
3分库方案的3个库都在8004端口对应的server上。启动Client后Client发现了8004,并向其发送流量。
```
$ ./echo_client
TRACE: 09-06 10:51:10: * 0 src/baidu/rpc/policy/file_naming_service.cpp:83] Got 3 unique addresses from `server_list'
TRACE: 09-06 10:51:10: * 0 src/baidu/rpc/socket.cpp:779] Connected to 0.0.0.0:8004 via fd=3 SocketId=0 self_port=46544
TRACE: 09-06 10:51:11: * 0 client.cpp:226] Sending EchoRequest at qps=132472 latency=371
TRACE: 09-06 10:51:12: * 0 client.cpp:226] Sending EchoRequest at qps=132658 latency=370
TRACE: 09-06 10:51:13: * 0 client.cpp:226] Sending EchoRequest at qps=133208 latency=369
```
同时Server端收到了3倍的流量:因为访问一次Client端要访问三次8004,分别对应每个分库。
```
TRACE: 09-06 10:51:11: * 0 server.cpp:192] S[0]=398866 S[1]=0 S[2]=0 [total=398866]
TRACE: 09-06 10:51:12: * 0 server.cpp:192] S[0]=398117 S[1]=0 S[2]=0 [total=398117]
TRACE: 09-06 10:51:13: * 0 server.cpp:192] S[0]=398873 S[1]=0 S[2]=0 [total=398873]
```
我们开始修改分库,在server_list中加入4分库的8005:
```
0.0.0.0:8004 0/3
0.0.0.0:8004 1/3
0.0.0.0:8004 2/3
0.0.0.0:8005 0/4
0.0.0.0:8005 1/4
0.0.0.0:8005 2/4
0.0.0.0:8005 3/4
```
观察Client和Server的输出变化。Client端发现了server_list的变化并重新载入,但qps并没有什么变化。
```
TRACE: 09-06 10:57:10: * 0 src/baidu/rpc/policy/file_naming_service.cpp:83] Got 7 unique addresses from `server_list'
TRACE: 09-06 10:57:10: * 0 src/baidu/rpc/socket.cpp:779] Connected to 0.0.0.0:8005 via fd=7 SocketId=768 self_port=39171
TRACE: 09-06 10:57:11: * 0 client.cpp:226] Sending EchoRequest at qps=135346 latency=363
TRACE: 09-06 10:57:12: * 0 client.cpp:226] Sending EchoRequest at qps=134201 latency=366
TRACE: 09-06 10:57:13: * 0 client.cpp:226] Sending EchoRequest at qps=137627 latency=356
TRACE: 09-06 10:57:14: * 0 client.cpp:226] Sending EchoRequest at qps=136775 latency=359
TRACE: 09-06 10:57:15: * 0 client.cpp:226] Sending EchoRequest at qps=139043 latency=353
```
server端的变化比较大。8005收到了流量,并且和8004的流量比例关系约为4 : 3。
```
TRACE: 09-06 10:57:09: * 0 server.cpp:192] S[0]=398597 S[1]=0 S[2]=0 [total=398597]
TRACE: 09-06 10:57:10: * 0 server.cpp:192] S[0]=392839 S[1]=0 S[2]=0 [total=392839]
TRACE: 09-06 10:57:11: * 0 server.cpp:192] S[0]=334704 S[1]=83219 S[2]=0 [total=417923]
TRACE: 09-06 10:57:12: * 0 server.cpp:192] S[0]=206215 S[1]=273873 S[2]=0 [total=480088]
TRACE: 09-06 10:57:13: * 0 server.cpp:192] S[0]=204520 S[1]=270483 S[2]=0 [total=475003]
TRACE: 09-06 10:57:14: * 0 server.cpp:192] S[0]=207055 S[1]=273725 S[2]=0 [total=480780]
TRACE: 09-06 10:57:15: * 0 server.cpp:192] S[0]=208453 S[1]=276803 S[2]=0 [total=485256]
```
由于访问一次Client要访问三次8004或四次8005。而8004:8005流量是3:4,说明Client以1:1的比例访问了3分库和4分库。这个比例关系取决于其容量。容量的计算是递归的:
- 普通连接NamingService的Channel的容量等于它其中所有server的容量之和。如果BNS上没有配置权值,单个server的容量为1。
- ParallelChannel或PartitionChannel的容量等于它其中Sub Channel容量的最小值。
- SelectiveChannel的容量等于它其中Sub Channel的容量之和。
- DynamicPartitionChannel的容量等于它其中Sub PartitionChannel的容量之和。
在我们这儿的场景中,3分库和4分库的容量是一样的,都是1。所有的3库都在8004,所有的4库都在8005,所以这两个Server的流量比例就是分库数的比例。
我们可以让4分库方案加入更多机器。修改server_list加入8006:
```
0.0.0.0:8004 0/3
0.0.0.0:8004 1/3
0.0.0.0:8004 2/3
0.0.0.0:8005 0/4
0.0.0.0:8005 1/4
0.0.0.0:8005 2/4
0.0.0.0:8005 3/4
0.0.0.0:8006 0/4
0.0.0.0:8006 1/4
0.0.0.0:8006 2/4
0.0.0.0:8006 3/4
```
Client的变化仍旧不大:
```
TRACE: 09-06 11:11:51: * 0 src/baidu/rpc/policy/file_naming_service.cpp:83] Got 11 unique addresses from `server_list'
TRACE: 09-06 11:11:51: * 0 src/baidu/rpc/socket.cpp:779] Connected to 0.0.0.0:8006 via fd=8 SocketId=1280 self_port=40759
TRACE: 09-06 11:11:51: * 0 client.cpp:226] Sending EchoRequest at qps=131799 latency=372
TRACE: 09-06 11:11:52: * 0 client.cpp:226] Sending EchoRequest at qps=136217 latency=361
TRACE: 09-06 11:11:53: * 0 client.cpp:226] Sending EchoRequest at qps=133531 latency=368
TRACE: 09-06 11:11:54: * 0 client.cpp:226] Sending EchoRequest at qps=136072 latency=361
```
Server端可以看到8006收到了流量。三台server的流量比例约为3:4:4。这是因为3分库的容量仍为1,而4分库由于8006的加入变成了2。3分库和4分库的流量比例是3:8。4分库中的每个分库在8005和8006上都有实例,同一个分库的不同实例使用round robin分流,所以8005和8006平摊了流量。最后的效果就是3:4:4。
```
TRACE: 09-06 11:11:51: * 0 server.cpp:192] S[0]=199625 S[1]=263226 S[2]=0 [total=462851]
TRACE: 09-06 11:11:52: * 0 server.cpp:192] S[0]=143248 S[1]=190717 S[2]=159756 [total=493721]
TRACE: 09-06 11:11:53: * 0 server.cpp:192] S[0]=133003 S[1]=178328 S[2]=178325 [total=489656]
TRACE: 09-06 11:11:54: * 0 server.cpp:192] S[0]=135534 S[1]=180386 S[2]=180333 [total=496253]
```
我们尝试下掉3分库中的一个分库:
```
0.0.0.0:8004 0/3
0.0.0.0:8004 1/3
#0.0.0.0:8004 2/3
0.0.0.0:8005 0/4
0.0.0.0:8005 1/4
0.0.0.0:8005 2/4
0.0.0.0:8005 3/4
0.0.0.0:8006 0/4
0.0.0.0:8006 1/4
0.0.0.0:8006 2/4
0.0.0.0:8006 3/4
```
Client端发现了这点。
```
TRACE: 09-06 11:17:47: * 0 src/baidu/rpc/policy/file_naming_service.cpp:83] Got 10 unique addresses from `server_list'
TRACE: 09-06 11:17:47: * 0 client.cpp:226] Sending EchoRequest at qps=131653 latency=373
TRACE: 09-06 11:17:48: * 0 client.cpp:226] Sending EchoRequest at qps=120560 latency=407
TRACE: 09-06 11:17:49: * 0 client.cpp:226] Sending EchoRequest at qps=124100 latency=395
TRACE: 09-06 11:17:50: * 0 client.cpp:226] Sending EchoRequest at qps=123743 latency=397
```
Server端更明显,8004很快没有了流量。这是因为去掉的2/3分库已经是3分库中最后的2/3分库,一旦被注释,3分库的容量就变为了0,导致8004分不到任何流量了。
```
TRACE: 09-06 11:17:47: * 0 server.cpp:192] S[0]=130864 S[1]=174499 S[2]=174548 [total=479911]
TRACE: 09-06 11:17:48: * 0 server.cpp:192] S[0]=20063 S[1]=230027 S[2]=230098 [total=480188]
TRACE: 09-06 11:17:49: * 0 server.cpp:192] S[0]=0 S[1]=245961 S[2]=245888 [total=491849]
TRACE: 09-06 11:17:50: * 0 server.cpp:192] S[0]=0 S[1]=250198 S[2]=250150 [total=500348]
```
在真实的线上环境中,我们会逐渐地增加4分库的server,同时下掉3分库中的server。DynamicParititonChannel会按照每种分库方式的容量动态切分流量。当某个时刻3分库的容量变为0时,我们便平滑地把Server从3分库变为了4分库,同时并没有修改Client的代码。
\ No newline at end of file
baidu-rpc使用[baidu::rpc::Controller](https://svn.baidu.com/public/trunk/baidu-rpc/src/baidu/rpc/controller.h)设置一次RPC的参数和获取一次RPC的结果,ErrorCode()和ErrorText()是Controller的两个方法,分别是该次RPC的错误码和错误描述,只在RPC结束后才能访问,否则结果未定义。ErrorText()由Controller的基类google::protobuf::RpcController定义,ErrorCode()则是baidu::rpc::Controller定义的。Controller还有个Failed()方法告知该次RPC是否失败,这三者的关系是:
- 当Failed()为true时,ErrorCode()一定不为0,ErrorText()是非空的错误描述
- 当Failed()为false时,ErrorCode()一定为0,ErrorText()是未定义的(目前在baidu-rpc中会为空,但你最好不要依赖这个事实)
# 标记RPC为错误
baidu-rpc的client端和server端都有Controller,都可以通过SetFailed()修改其中的ErrorCode和ErrorText。当多次调用一个Controller的SetFailed时,ErrorCode会被覆盖,ErrorText则是**添加**而不是覆盖,在client端,框架会额外加上第几次重试,在server端,框架会额外加上server的地址信息。
client端Controller的SetFailed()常由框架调用,比如发送request失败,接收到的response不符合要求等等。只有在进行较复杂的访问操作时用户才可能需要设置client端的错误,比如在访问后端前做额外的请求检查,发现有错误时需要把RPC设置为失败。
server端Controller的SetFailed()常由用户在服务回调中调用。当处理过程发生错误时,一般调用SetFailed()并释放资源后就return了。框架会把错误码和错误信息按交互协议填入response,client端的框架收到后会填入它那边的Controller中,从而让用户在RPC结束后取到。需要注意的是,**server端在SetFailed()时一般不需要再打条日志。**打日志是比较慢的,在繁忙的线上磁盘上,很容易出现巨大的lag。一个错误频发的client容易减慢整个server的速度而影响到其他的client,理论上来说这甚至能成为一种攻击手段。对于希望在server端看到错误信息的场景,可以打开**-log_error_text**开关(已上线服务可访问/flags/log_error_text?setvalue=true动态打开),server会在每次失败的RPC后把对应Controller的ErrorText()打印出来。
# baidu-rpc的错误码
baidu-rpc使用的所有ErrorCode都定义在[errno.proto](https://svn.baidu.com/public/trunk/baidu-rpc/protocol/baidu/rpc/errno.proto)中,*SYS_*开头的来自linux系统,与/usr/include/errno.h中定义的精确一致,定义在proto中是为了跨语言。其余的是baidu-rpc自有的。
[berror(error_code)](https://svn.baidu.com/public/trunk/common/base/errno.h)可获得error_code的描述,berror()可获得[system errno](http://www.cplusplus.com/reference/cerrno/errno/)的描述。**ErrorText() != berror(****ErrorCode())**,ErrorText()会包含更具体的错误信息。baidu-rpc默认包含berror所属的[public/common模块](http://wiki.baidu.com/pages/viewpage.action?pageId=38035224),你可以直接使用。
baidu-rpc中常见错误的打印内容列表如下:
| 错误码 | 数值 | 重试 | 说明 | 日志 |
| -------------- | ---- | ---- | ---------------------------------------- | ---------------------------------------- |
| EAGAIN | 11 | 是 | 同时异步发送的请求过多。r32803前为硬限(<1024),出现频率较高;r32803后为软限,很少出现。 | Resource temporarily unavailable |
| ETIMEDOUT | 110 | 是 | 在r31711后代表连接超时。r31711前代表RPC超时,相当于ERPCTIMEDOUT。 | Connection timed out |
| ENOSERVICE | 1001 | 否 | 找不到服务,不太出现,一般会返回ENOMETHOD。 | |
| ENOMETHOD | 1002 | 否 | 找不到方法。 | 形式广泛,常见如"Fail to find method=..." |
| EREQUEST | 1003 | 否 | request格式或序列化错误,client端和server端都可能设置 | 形式广泛:"Missing required fields in request: ...""Fail to parse request message, ...""Bad request" |
| EAUTH | 1004 | 否 | 认证失败 | "Authentication failed" |
| ETOOMANYFAILS | 1005 | 否 | ParallelChannel中太多子channel失败 | "%d/%d channels failed, fail_limit=%d" |
| EBACKUPREQUEST | 1007 | 是 | 触发backup request时设置,用户一般在/rpcz里看到 | “reached backup timeout=%dms" |
| ERPCTIMEDOUT | 1008 | 否 | RPC超时 | "reached timeout=%dms" |
| EFAILEDSOCKET | 1009 | 是 | RPC进行过程中TCP连接出现问题 | "The socket was SetFailed" |
| EHTTP | 1010 | 否 | `r31923后失败的HTTP访问(非2xx状态码)均使用这个错误码。默认不重试,可通过RetryPolicy定制` | Bad http call |
| EOVERCROWDED | 1011 | 是 | 连接上有过多的未发送数据,一般是由于同时发起了过多的异步访问。可通过参数-socket_max_unwritten_bytes控制,默认8MB。 | The server is overcrowded |
| EINTERNAL | 2001 | 否 | Server端Controller.SetFailed没有指定错误码时使用的默认错误码。 | "Internal Server Error" |
| ERESPONSE | 2002 | 否 | response解析或格式错误,client端和server端都可能设置 | 形式广泛"Missing required fields in response: ...""Fail to parse response message, ""Bad response" |
| ELOGOFF | 2003 | 是 | Server已经被Stop了 | "Server is going to quit" |
| ELIMIT | 2004 | 是 | 同时处理的请求数超过ServerOptions.max_concurrency了 | "Reached server's limit=%d on concurrent requests", |
# 自定义错误码
在C++/C中你可以通过宏、常量、protobuf enum等方式定义ErrorCode:
```
#define ESTOP -114 // C/C++
static const int EMYERROR = 30; // C/C++
const int EMYERROR2 = -31; // C++ only
```
如果你需要用berror返回这些新错误码的描述,你可以在.cpp或.c文件的全局域中调用BAIDU_REGISTER_ERRNO(error_code, description)进行注册,比如:
```
BAIDU_REGISTER_ERRNO(ESTOP, "the thread is stopping")
BAIDU_REGISTER_ERRNO(EMYERROR, "my error")
```
strerror/strerror_r不认识使用BAIDU_REGISTER_ERRNO定义的错误码,自然地,printf类的函数中的%m也不能转化为对应的描述,你必须使用%s并配以berror()。
```
errno = ESTOP;
printf("Describe errno: %m\n"); // [Wrong] Describe errno: Unknown error -114
printf("Describe errno: %s\n", strerror_r(errno, NULL, 0)); // [Wrong] Describe errno: Unknown error -114
printf("Describe errno: %s\n", berror()); // [Correct] Describe errno: the thread is stopping
printf("Describe errno: %s\n", berror(errno)); // [Correct] Describe errno: the thread is stopping
```
当同一个error code被重复注册时,如果都是在C++中定义的,那么会出现链接错误:
```
redefinition of `class BaiduErrnoHelper<30>'
```
否则在程序启动时会abort:
```
Fail to define EMYERROR(30) which is already defined as `Read-only file system', abort
```
总的来说这和RPC框架没什么关系,直到你希望通过RPC框架传递ErrorCode。这个需求很自然,不过你得确保不同的模块对ErrorCode的理解是相同的,否则当两个模块把一个错误码理解为不同的错误时,它们之间的交互将出现无法预计的行为。为了防止这种情况出现,你最好这么做:
- 优先使用系统错误码,它们的值和含义是固定不变的。
- 多个交互的模块使用同一份错误码定义,防止后续修改时产生不一致。
- 使用BAIDU_REGISTER_ERRNO描述新错误码,以确保同一个进程内错误码是互斥的。
\ No newline at end of file
# 概述 # 概述
类似于kylin的[ExecMan](http://websvn.work.baidu.com/repos/list#from=/repos/inf_common/show/trunk/kylin/ExecMan.h?revision=HEAD!!handler=loadframe), 类似于kylin的[ExecMan](http://websvn.work.baidu.com/repos/list#from=/repos/inf_common/show/trunk/kylin/ExecMan.h?revision=HEAD!!handler=loadframe), [ExecutionQueue](http://websvn.work.baidu.com/repos/list#from=/repos/public/show/trunk/bthread/bthread/execution_queue.h?revision=HEAD!!handler=loadframe)提供了异步串行执行的功能。ExecutionQueue的相关技术最早使用在RPC中实现[多线程向同一个fd写数据](http://wiki.baidu.com/display/RPC/IO#IO-%E5%8F%91%E6%B6%88%E6%81%AF). 在r31345之后加入到bthread。 ExecutionQueue 提供了如下基本功能:
[ExecutionQueue](http://websvn.work.baidu.com/repos/list#from=/repos/public/show/trunk/bthread/bthread/execution_queue.h?revision=HEAD!!handler=loadframe)
提供了异步串行执行的功能。
ExecutionQueue的相关技术最早使用在RPC中实现[多线程向同一个fd写数据](http://wiki.baidu.com/display/RPC/IO#IO-%E5%8F%91%E6%B6%88%E6%81%AF).
在r31345之后加入到bthread。 ExecutionQueue 提供了如下基本功能:
- 异步有序执行: 任务在另外一个单独的线程中执行, 并且执行顺序严格和提交顺序一致. - 异步有序执行: 任务在另外一个单独的线程中执行, 并且执行顺序严格和提交顺序一致.
- Multi Producer: 多个线程可以同时向一个ExecutionQueue提交任务 - Multi Producer: 多个线程可以同时向一个ExecutionQueue提交任务
...@@ -13,30 +9,18 @@ ExecutionQueue的相关技术最早使用在RPC中实现[多线程向同一个fd ...@@ -13,30 +9,18 @@ ExecutionQueue的相关技术最早使用在RPC中实现[多线程向同一个fd
- 支持高优任务插队 - 支持高优任务插队
和ExecMan的主要区别: 和ExecMan的主要区别:
- ExecutionQueue的任务提交接口是[wait-free](https://en.wikipedia.org/wiki/Non-blocking_algorithm#Wait-freedom)的, ExecMan依赖了lock, 这意味着当机器整体比较繁忙的时候,使用ExecutionQueue不会因为某个进程被系统强制切换导致所有线程都被阻塞。
- ExecutionQueue的任务提交接口是[wait-free](https://en.wikipedia.org/wiki/Non-blocking_algorithm#Wait-freedom)的, - ExecutionQueue支持批量处理: 执行线程可以批量处理提交的任务, 获得更好的locality. ExecMan的某个线程处理完某个AsyncClient的AsyncContext之后下一个任务很可能是属于另外一个AsyncClient的AsyncContex, 这时候cpu cache会在不同AsyncClient依赖的资源间进行不停的切换。
ExecMan依赖了lock, - ExecutionQueue的处理函数不会被绑定到固定的线程中执行, ExecMan中是根据AsyncClient hash到固定的执行线程,不同的ExecutionQueue之间的任务处理完全独立,当线程数足够多的情况下,所有非空闲的ExecutionQueue都能同时得到调度。同时也意味着当线程数不足的时候,ExecutionQueue无法保证公平性, 当发生这种情况的时候需要动态增加bthread的worker线程来增加整体的处理能力.
这意味着当机器整体比较繁忙的时候,使用ExecutionQueue不会因为某个进程被系统强制切换导致所有线程都被阻塞。 - ExecutionQueue运行线程为bthread, 可以随意的使用一些bthread同步原语而不用担心阻塞pthread的执行. 而在ExecMan里面得尽量避免使用较高概率会导致阻塞的同步原语.
- ExecutionQueue支持批量处理: 执行线程可以批量处理提交的任务, 获得更好的locality.
ExecMan的某个线程处理完某个AsyncClient的AsyncContext之后下一个任务很可能是属于另外一个AsyncClient的AsyncContex,
这时候cpu cache会在不同AsyncClient依赖的资源间进行不停的切换。
- ExecutionQueue的处理函数不会被绑定到固定的线程中执行, ExecMan中是根据AsyncClient
hash到固定的执行线程,不同的ExecutionQueue之间的任务处理完全独立,当线程数足够多的情况下,所有非空闲的ExecutionQueue都能同时得到调度。
同时也意味着当线程数不足的时候,ExecutionQueue无法保证公平性,
当发生这种情况的时候需要动态增加bthread的worker线程来增加整体的处理能力.
- ExecutionQueue运行线程为bthread, 可以随意的使用一些bthread同步原语而不用担心阻塞pthread的执行.
而在ExecMan里面得尽量避免使用较高概率会导致阻塞的同步原语.
# 背景 # 背景
在多核并发编程领域, [Message 在多核并发编程领域, [Message passing](https://en.wikipedia.org/wiki/Message_passing)作为一种解决竞争的手段得到了比较广泛的应用,它按照业务依赖的资源将逻辑拆分成若干个独立actor,每个actor负责对应资源的维护工作,当一个流程需要修改某个资源的时候,
passing](https://en.wikipedia.org/wiki/Message_passing)作为一种解决竞争的手段得到了比较广泛的应用,它按照业务依赖的资源将逻辑拆分成若干个独立actor,每个actor负责对应资源的维护工作,当一个流程需要修改某个资源的时候,
就转化为一个消息发送给对应actor,这个actor(通常在另外的上下文中)根据命令内容对这个资源进行相应的修改,之后可以选择唤醒调用者(同步)或者提交到下一个actor(异步)的方式进行后续处理。 就转化为一个消息发送给对应actor,这个actor(通常在另外的上下文中)根据命令内容对这个资源进行相应的修改,之后可以选择唤醒调用者(同步)或者提交到下一个actor(异步)的方式进行后续处理。
![img](http://web.mit.edu/6.005/www/fa14/classes/20-queues-locks/figures/producer-consumer.png) ![img](http://web.mit.edu/6.005/www/fa14/classes/20-queues-locks/figures/producer-consumer.png)
# ExecutionQueue Vs Mutex # ExecutionQueue Vs Mutex
ExecutionQueue和mutex都可以用来在多线程场景中消除竞争. 相比较使用mutex, ExecutionQueue和mutex都可以用来在多线程场景中消除竞争. 相比较使用mutex,
...@@ -50,23 +34,18 @@ ExecutionQueue和mutex都可以用来在多线程场景中消除竞争. 相比 ...@@ -50,23 +34,18 @@ ExecutionQueue和mutex都可以用来在多线程场景中消除竞争. 相比
但是缺点也同样明显: 但是缺点也同样明显:
- 一个流程的代码往往散落在多个地方,代码理解和维护成本高。 - 一个流程的代码往往散落在多个地方,代码理解和维护成本高。
- 为了提高并发度, 一件事情往往会被拆分到多个ExecutionQueue进行流水线处理, - 为了提高并发度, 一件事情往往会被拆分到多个ExecutionQueue进行流水线处理,这样会导致在多核之间不停的进行切换,会付出额外的调度以及同步cache的开销, 尤其是竞争的临界区非常小的情况下, 这些开销不能忽略.
这样会导致在多核之间不停的进行切换,会付出额外的调度以及同步cache的开销, - 同时原子的操作多个资源实现会变得复杂, 使用mutex可以同时锁住多个mutex, 用了ExeuctionQueue就需要依赖额外的dispatch queue了。
尤其是竞争的临界区非常小的情况下, 这些开销不能忽略.
- 同时原子的操作多个资源实现会变得复杂, 使用mutex可以同时锁住多个mutex,
用了ExeuctionQueue就需要依赖额外的dispatch queue了。
- 由于所有操作都是单线程的,某个任务运行慢了就会阻塞同一个ExecutionQueue的其他操作。 - 由于所有操作都是单线程的,某个任务运行慢了就会阻塞同一个ExecutionQueue的其他操作。
- 并发控制变得复杂,ExecutionQueue可能会由于缓存的任务过多占用过多的内存。 - 并发控制变得复杂,ExecutionQueue可能会由于缓存的任务过多占用过多的内存。
不考虑性能和复杂度,理论上任何系统都可以只使用mutex或者ExecutionQueue来消除竞争. 不考虑性能和复杂度,理论上任何系统都可以只使用mutex或者ExecutionQueue来消除竞争.
但是复杂系统的设计上,建议根据不同的场景灵活决定如何使用这两个工具: 但是复杂系统的设计上,建议根据不同的场景灵活决定如何使用这两个工具:
- 如果临界区非常小,竞争又不是很激烈,优先选择使用mutex, 之后可以结合[contention - 如果临界区非常小,竞争又不是很激烈,优先选择使用mutex, 之后可以结合[contention profiler](http://wiki.baidu.com/display/RPC/contention+profiler)来判断mutex是否成为瓶颈。
profiler](http://wiki.baidu.com/display/RPC/contention+profiler)来判断mutex是否成为瓶颈。
- 需要有序执行,或者无法消除的激烈竞争但是可以通过批量执行来提高吞吐, 可以选择使用ExecutionQueue。 - 需要有序执行,或者无法消除的激烈竞争但是可以通过批量执行来提高吞吐, 可以选择使用ExecutionQueue。
总之, 总之,多线程编程没有万能的模型,需要根据具体的场景,结合丰富的profliling工具,最终在复杂度和性能之间找到合适的平衡。
多线程编程没有万能的模型,需要根据具体的场景,结合丰富的profliling工具,最终在复杂度和性能之间找到合适的平衡。
**特别指出一点**,Linux中mutex无竞争的lock/unlock只有需要几条原子指令,在绝大多数场景下的开销都可以忽略不计. **特别指出一点**,Linux中mutex无竞争的lock/unlock只有需要几条原子指令,在绝大多数场景下的开销都可以忽略不计.
...@@ -111,11 +90,8 @@ int execution_queue_start( ...@@ -111,11 +90,8 @@ int execution_queue_start(
void* meta); void* meta);
``` ```
创建的返回值是一个64位的id, , 创建的返回值是一个64位的id, 相当于ExecutionQueue实例的一个[弱引用](https://en.wikipedia.org/wiki/Weak_reference), 可以wait-free的在O(1)时间内定位一个ExecutionQueue, 你可以到处拷贝这个id, 甚至可以放在RPC中,作为远端资源的定位工具。
相当于ExecutionQueue实例的一个[弱引用](https://en.wikipedia.org/wiki/Weak_reference), 你必须保证meta的生命周期,在对应的ExecutionQueue真正停止前不会释放.
可以wait-free的在O(1)时间内定位一个ExecutionQueue, 你可以到处拷贝这个id, 甚至可以放在RPC中,
作为远端资源的定位工具。
你必须保证meta的生命周期,在对应的ExecutionQueue真正停止前不会释放**.**
### 停止一个ExecutionQueue: ### 停止一个ExecutionQueue:
...@@ -137,9 +113,10 @@ int execution_queue_join(ExecutionQueueId<T> id); ...@@ -137,9 +113,10 @@ int execution_queue_join(ExecutionQueueId<T> id);
``` ```
stop和join都可以多次调用, 都会又合理的行为。stop可以随时调用而不用当心线程安全性问题。 stop和join都可以多次调用, 都会又合理的行为。stop可以随时调用而不用当心线程安全性问题。
和fd的close类似,如果stop不被调用, 相应的资源会永久泄露
安全释放meta的时机: 可以在execute函数中收到iter.is_queue_stopped()==true的任务的时候释放, 和fd的close类似,如果stop不被调用, 相应的资源会永久泄露。
也可以等到join返回之后释放. 注意不要double-free
安全释放meta的时机: 可以在execute函数中收到iter.is_queue_stopped()==true的任务的时候释放,也可以等到join返回之后释放. 注意不要double-free
### 提交任务 ### 提交任务
...@@ -192,13 +169,9 @@ int execution_queue_execute(ExecutionQueueId<T> id, ...@@ -192,13 +169,9 @@ int execution_queue_execute(ExecutionQueueId<T> id,
TaskHandle* handle); TaskHandle* handle);
``` ```
high_priority的task之间的执行顺序也会**严格按照提交顺序**, 这点和ExecMan不同, high_priority的task之间的执行顺序也会**严格按照提交顺序**, 这点和ExecMan不同, ExecMan的QueueExecEmergent的AsyncContex执行顺序是undefined. 但是这也意味着你没有办法将任何任务插队到一个high priority的任务之前执行.
ExecMan的QueueExecEmergent的AsyncContex执行顺序是undefined.
但是这也意味着你没有办法将任何任务插队到一个high priority的任务之前执行.
开启inplace_if_possible, 在无竞争的场景中可以省去一次线程调度和cache同步的开销. 开启inplace_if_possible, 在无竞争的场景中可以省去一次线程调度和cache同步的开销. 但是可能会造成死锁或者递归层数过多(比如不停的ping-pong)的问题,开启前请确定你的代码中不存在这些问题。
但是可能会造成死锁或者递归层数过多(比如不停的ping-pong)的问题,
开启前请确定你的代码中不存在这些问题。
### 取消一个已提交任务 ### 取消一个已提交任务
...@@ -211,5 +184,4 @@ ExecMan的QueueExecEmergent的AsyncContex执行顺序是undefined. ...@@ -211,5 +184,4 @@ ExecMan的QueueExecEmergent的AsyncContex执行顺序是undefined.
int execution_queue_cancel(const TaskHandle& h); int execution_queue_cancel(const TaskHandle& h);
``` ```
返回非0仅仅意味着ExecutionQueue已经将对应的task递给过execute, 返回非0仅仅意味着ExecutionQueue已经将对应的task递给过execute, 真实的逻辑中可能将这个task缓存在另外的容器中,所以这并不意味着逻辑上的task已经结束,你需要在自己的业务上保证这一点.
真实的逻辑中可能将这个task缓存在另外的容器中,所以这并不意味着逻辑上的task已经结束,你需要在自己的业务上保证这一点. \ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment