同步和异步代码不一致。用户无法轻易地从一个模式转为另一种模式。从设计的角度,不一致暗示了没有抓住本质。
往往不能被取消。正确及时地取消一个操作不是一件易事,何况是组合访问。但取消对于终结无意义的等待是很必要的。
不能继续组合。比如你很难把一个上述实现变成"更大"的访问模式的一部分。换个场景还得重写一套。
- 往往不能被取消。正确及时地取消一个操作不是一件易事,何况是组合访问。但取消对于终结无意义的等待是很必要的。
- 不能继续组合。比如你很难把一个上述实现变成“更大"的访问模式的一部分。换个场景还得重写一套。
# ParallelChannel
ParallelChannel (“pchan”)同时访问其包含的sub channel,并合并它们的结果。用户可通过CallMapper修改请求,通过ResponseMerger合并结果。ParallelChannel看起来就像是一个Channel:
ParallelChannel (有时被称为"pchan")同时访问其包含的sub channel,并合并它们的结果。用户可通过CallMapper修改请求,通过ResponseMerger合并结果。ParallelChannel看起来就像是一个Channel:
- 支持同步和异步访问。
- 发起异步操作后可以立刻删除。
任何brpc::ChannelBase的子类都可以加入ParallelChannel,包括ParallelChannel和其他组合Channel。用户可以设置ParallelChannelOptions.fail_limit来控制访问的最大失败次数,当失败的访问达到这个数目时,RPC call会立刻结束而不等待超时。
一个sub channel可多次加入同一个ParallelChannel。当你需要对同一个服务发起多次异步访问并等待它们完成的话,这很有用。
ResponseMerger* response_merger);
当ownership为brpc::OWNS_CHANNEL时,sub_channel会在ParallelChannel析构时被删除。由于一个sub channel可能会多次加入一个ParallelChannel,只要其中一个指明了ownership为brpc::OWNS_CHANNEL,那个sub channel就会在ParallelChannel析构时被删除(一次)
当ownership为brpc::OWNS_CHANNEL时,sub_channel会在ParallelChannel析构时被删除。一个sub channel可能会多次加入一个ParallelChannel,如果其中一个指明了ownership为brpc::OWNS_CHANNEL,那个sub channel会在ParallelChannel析构时被最多删除一次
## CallMapper
const google::protobuf::MethodDescriptor* method,
const google::protobuf::Message* request,
google::protobuf::Message* response) {
// method/request和pchan保持一致,response是new出来的,最后的flag告诉pchan在RPC结束后删除Response。
// method/request和pchan保持一致.
// response是new出来的,最后的flag告诉pchan在RPC结束后删除Response。
return SubCall(method, request, response->New(), DELETE_RESPONSE);
- request和response已经包含了sub request/response,直接取出来访问对应的sub channel
request和response已经包含了sub request/response,直接取出来。
class UseFieldAsSubRequest : public CallMapper {
const google::protobuf::Message* request,
google::protobuf::Message* response) {
if (channel_index >= request->sub_request_size()) {
// sub_request不够,说明外面准备数据的地方和pchan中sub channel的个数不符,返回Bad()会让该次访问立刻结束并报EREQUEST错误
// sub_request不够,说明外面准备数据的地方和pchan中sub channel的个数不符.
// 返回Bad()让该次访问立刻失败
// 返回Bad()让该次访问立刻失败
return SubCall::Bad();
// 取出对应的sub request,增加一个sub response,最后的flag为0告诉pchan什么都不用删(因为删除request/response自然一起删了)
// 取出对应的sub request,增加一个sub response,最后的flag为0告诉pchan什么都不用删
return SubCall(sub_method, request->sub_request(channel_index), response->add_sub_response(), 0);
## ResponseMerger
response_merger把sub channel的response合并入总的response,其为NULL时,则使用response->MergeFrom(*sub_response),MergeFrom的行为可概括为“除了合并repeated字段,其余都是覆盖”。如果你需要更复杂的行为,则需实现ResponseMerger。response_merger是一个个执行的,所以你并不需要考虑多个Merge同时运行的情况。如果response_merger不为NULL,则会在ParallelChannel析构时被删除。response_merger内含引用计数,一个response_merger可与多个sub channel关联。
response_merger把sub channel的response合并入总的response,其为NULL时,则使用response->MergeFrom(*sub_response),MergeFrom的行为可概括为"除了合并repeated字段,其余都是覆盖"。如果你需要更复杂的行为,则需实现ResponseMerger。response_merger是一个个执行的,所以你并不需要考虑多个Merge同时运行的情况。response_merger在ParallelChannel析构时被删除。response_merger内含引用计数,一个response_merger可与多个sub channel关联。
- MERGED: 成功合并。
- FAIL (之前叫IGNORE): sub_response没有合并成功,会被记作一次失败。比如10 sub channels & fail_limit=4,3个在合并前已经失败了,1个合并后返回了FAIL。这次RPC会被视作发生了4次错误,由于达到了fail_limit这次RPC会立刻结束。
- FAIL_ALL (之前叫CALL_FAILED): 使本次RPC call立刻结束。
- FAIL: sub_response没有合并成功,会被记作一次失败。比如有10个sub channels且fail_limit为4,只要有4个合并结果返回了FAIL,这次RPC就会达到fail_limit并立刻结束。
- FAIL_ALL: 使本次RPC直接结束。
- FAIL_ALL: 使本次RPC直接结束。
## 获得访问sub channel时的controller
......@@ -148,7 +152,7 @@ const Controller* sub(int index) const;
# SelectiveChannel
[SelectiveChannel]( (“schan”)按负载均衡算法访问其包含的一个Channel,相比普通Channel它更加高层:把流量分给sub channel,而不是具体的Server。SelectiveChannel主要用来支持机器组之间的负载均衡,它具备Channel的主要属性:
[SelectiveChannel]( (有时被称为"schan")按负载均衡算法访问其包含的Channel,相比普通Channel它更加高层:把流量分给sub channel,而不是具体的Server。SelectiveChannel主要用来支持机器组之间的负载均衡,它具备Channel的主要属性:
- 支持同步和异步访问。
- 发起异步操作后可以立刻删除。
SelectiveChannel的重试独立于其中的sub channel,当SelectiveChannel访问某个sub channel失败时(可能本身包含了重试),它会重试另外一个sub channel。
SelectiveChannel的重试独立于其中的sub channel,当SelectiveChannel访问某个sub channel失败后(本身可能重试),它会重试另外一个sub channel。
## 使用SelectiveChannel
SelectiveChannel的初始化和普通Channel基本一样,但Init不需要指定名字服务,因为SelectiveChannel面向sub channel并通过AddChannel动态添加,而普通Channel面向的server才记录在名字服务中
SelectiveChannel的初始化和普通Channel基本一样,但Init不需要指定名字服务,因为SelectiveChannel通过AddChannel动态添加sub channel,而普通Channel通过名字服务动态管理server
#include <brpc/selective_channel.h>
......@@ -195,26 +199,25 @@ if (schan.AddChannel(sub_channel, NULL/*ChannelHandle*/) != 0) { // 第二个
- 和ParallelChannel不同,SelectiveChannel的AddChannel可在任意时刻调用,即使该SelectiveChannel正在被访问(下一次访问时生效)
- SelectiveChannel总是own sub channel,这和ParallelChannel可选择ownership是不同的。
- 如果AddChannel第二个参数不为空,会填入一个类型为brpc::SelectiveChannel::ChannelHandle的值,这个handle可作为RemoveAndDestroyChannel的参数来动态删除一个channel。
- SelectiveChannel会用自身的超时覆盖sub channel初始化时指定的超时。比如某个sub channel的超时为100ms,SelectiveChannel的超时为500ms,实际访问时的超时是500ms,而不是100ms
SelectiveChannel会用自身的超时覆盖sub channel初始化时指定的超时。比如某个sub channel的超时为100ms,SelectiveChannel的超时为500ms,实际访问时的超时是500ms。
## 以往多个bns分流为例
## 例子: 往多个名字服务分流
- 完成同一个检索功能的机器被挂载到了不同的bns下。
完成同一个检索功能的机器被挂载到了不同的名字服务下。
- 机器被拆成了多个组,流量先分流给一个组,再分流到组内机器。组间的分流方式和组内有所不同。
SelectiveChannel的创建和普通Channel类似,但不需要名字服务,而是通过AddChannel方法插入sub channel。下面的代码创建了一个SelectiveChannel,并插入三个访问不同bns的普通Channel。
brpc::SelectiveChannel channel;
brpc::ChannelOptions schan_options;
schan_options.timeout_ms = FLAGS_timeout_ms;
schan_options.backup_request_ms = FLAGS_backup_ms;
schan_options.max_retry = FLAGS_max_retry;
if (channel.Init("c_murmurhash", &schan_options) != 0) {
LOG(ERROR) << "Fail to init SelectiveChannel";
for (int i = 0; i < 3; ++i) {
brpc::Channel* sub_channel = new brpc::Channel;
if (sub_channel->Init(bns_node_name[i], "rr", NULL) != 0) {
if (sub_channel->Init(ns_node_name[i], "rr", NULL) != 0) {
LOG(ERROR) << "Fail to init sub channel " << i;
return -1;
......@@ -244,7 +247,7 @@ stub.FooMethod(&cntl, &request, &response, NULL);
ParititonChannel只能处理一种分库方法,当用户需要多种分库方法共存,或从一个分库方法平滑地切换为另一种分库方法时,可以使用DynamicPartitionChannel,它会根据不同的分库方式动态地建立对应的sub PartitionChannel,并根据容量把请求分配给不同的分库。示例代码见[example/dynamic_partition_echo_c++](
如果分库在不同的名字服务内,那么用户得自行用ParallelChannel组装,即每个sub channel对应一个分库(使用不同的名字服务)。ParellelChannel的使用方法请见上一节
如果分库在不同的名字服务内,那么用户得自行用ParallelChannel组装,即每个sub channel对应一个分库(使用不同的名字服务)。ParellelChannel的使用方法[上面](#ParallelChannel)
## 使用PartitionChannel
......@@ -302,9 +305,9 @@ if (channel.Init(num_partition_kinds, new MyPartitionParser(),
DynamicPartitionChannel的使用方法和PartitionChannel基本上是一样的,先定制PartitionParser再初始化,但Init时不需要num_partition_kinds,因为DynamicPartitionChannel会为不同的分库方法动态建立不同的sub PartitionChannel。
首先分别在8004, 8005, 8006端口启动三个server
$ ./echo_server -server_num 3
......@@ -316,14 +319,16 @@ TRACE: 09-06 10:40:41: * 0 server.cpp:192] S[0]=0 S[1]=0 S[2]=0 [total=0]
TRACE: 09-06 10:40:42: * 0 server.cpp:192] S[0]=0 S[1]=0 S[2]=0 [total=0]
brpc::DynamicPartitionChannel channel;
brpc::PartitionChannelOptions options;
options.succeed_without_server = true; // 表示允许server_list在DynamicPartitionChannel.Init启动时为空,否则Init会失败。
options.fail_limit = 1; // 任何访问分库失败都认为RPC失败。调大这个数值可以使访问更宽松,比如等于2的话表示至少两个分库失败才算失败。
// 访问任何分库失败都认为RPC失败。调大这个数值可以使访问更宽松,比如等于2的话表示至少两个分库失败才算失败。
options.fail_limit = 1;
options.fail_limit = 1;
if (channel.Init(new MyPartitionParser(), "file://server_list", "rr", &options) != 0) {
LOG(ERROR) << "Fail to init channel";
return -1;
......@@ -357,7 +362,7 @@ TRACE: 09-06 10:51:12: * 0 server.cpp:192] S[0]=398117 S[1]=0 S[2]=0 [total=39
TRACE: 09-06 10:51:13: * 0 server.cpp:192] S[0]=398873 S[1]=0 S[2]=0 [total=398873]
......@@ -382,7 +387,7 @@ TRACE: 09-06 10:57:14: * 0 client.cpp:226] Sending EchoRequest at qps=136775 l
TRACE: 09-06 10:57:15: * 0 client.cpp:226] Sending EchoRequest at qps=139043 latency=353
server端的变化比较大。8005收到了流量,并且和8004的流量比例关系约为4 : 3。
TRACE: 09-06 10:57:09: * 0 server.cpp:192] S[0]=398597 S[1]=0 S[2]=0 [total=398597]
......@@ -394,16 +399,16 @@ TRACE: 09-06 10:57:14: * 0 server.cpp:192] S[0]=207055 S[1]=273725 S[2]=0 [tot
TRACE: 09-06 10:57:15: * 0 server.cpp:192] S[0]=208453 S[1]=276803 S[2]=0 [total=485256]
- 普通连接NamingService的Channel的容量等于它其中所有server的容量之和。如果BNS上没有配置权值,单个server的容量为1。
普通Channel的容量等于它其中所有server的容量之和。如果名字服务没有配置权值,单个server的容量为1。
- ParallelChannel或PartitionChannel的容量等于它其中Sub Channel容量的最小值。
- SelectiveChannel的容量等于它其中Sub Channel的容量之和。
- DynamicPartitionChannel的容量等于它其中Sub PartitionChannel的容量之和。
......@@ -441,7 +446,7 @@ TRACE: 09-06 11:11:53: * 0 server.cpp:192] S[0]=133003 S[1]=178328 S[2]=178325
TRACE: 09-06 11:11:54: * 0 server.cpp:192] S[0]=135534 S[1]=180386 S[2]=180333 [total=496253]
尝试去掉3分库中的一个分库: (你可以在file://server_list中使用#注释一行)
......@@ -469,7 +474,7 @@ TRACE: 09-06 11:17:49: * 0 client.cpp:226] Sending EchoRequest at qps=124100 l
TRACE: 09-06 11:17:50: * 0 client.cpp:226] Sending EchoRequest at qps=123743 latency=397
TRACE: 09-06 11:17:47: * 0 server.cpp:192] S[0]=130864 S[1]=174499 S[2]=174548 [total=479911]
With the growth of the number of business products, the access pattern to downstream becomes increasingly complicate, which often contains multiple simultaneous RPCs or subsequent asynchronous ones. However, these could easily introduce very tricky bugs under multi-threaded environment, of which users may not even aware, and it's also difficult to debug and reproduce. Moreover, implementations may not provide full support for various access patterns, in which case you have to write your own. Take semi-synchronous RPC as an example, which means waiting for multiple asynchronous RPCs to complete. A common implementation for synchronous access would be issuing multiple requests asynchronously and waiting for their completion, while the implementation for asynchronous access makes use of a callback with a counter. Each time an asynchronous RPC finishes, the counter decrement itself until zero in which case the callback is called. Now let's analyze their weakness:
- The code is inconsistent between synchronous pattern and asynchronous one. It's difficult for users to move from one pattern to another. From the design point of view, inconsistencies suggest lose of essence.
- Cancellation is not supported in general. It's not easy to cancel an RPC in time correctly, let alone a combination of access. Most implementations do not support cancellation of a combo access. However, it's a must for some speed up technique such as backup request.
- Cascading is not supported, which means it's hard to turn a semi-synchronous access into one part of a "larger" access. Code may meet the current needs, but it's not generic.
With the growth of services, access patterns to downstream servers become increasingly complicated and often contain multiple RPCs in parallel or layered accesses. The complications could easily introduce tricky bugs around multi-threaded programming, which may not even be aware of by users and difficult to debug and reproduce. Moreover, implementations either only support synchronous patterns, or need to rewrite code for asynchronous patterns. Take running some code after completions of multiple asynchronous RPCs as an example, the synchronous pattern is often implemented as issuing multiple RPCs asynchronously and waiting for the completions respectively, while the asynchronous pattern is often implemented by a callback plus a referential count, which is decreased by one when one RPC completes. The callback is called when the count hits zero. Let's see drawbacks of the solution:
As a result, we need a better abstraction. If there is a structure whose combination is still the same structure, the interface to synchronous access, asynchronous one, cancellation and other operations would be the same for users. In fact, we already have this structure `Channel`. If we can combine some channels into larger and more complex ones in different ways along with different access patterns, then users will be armed with a consistent and modular building block. Welcome to this powerful tool.
- The code is inconsistent between synchronous and asynchronous pattern and it's not trivial for users to move from one pattern to another. From the designing point of view, inconsistencies implies that the essence is probably not grasped yet.
- Cancellation is often unsupported. It's not easy to cancel a single RPC correctly, let alone combinations of RPC. However cancellations are necessary to end pointless waiting.
- Not composable. It's hard to enclose the implementations above as one part of a "larger" pattern. The code can hardly be reused in a different scenario.
We need a better abstraction. If several channels are combined into a larger one with different access patterns enclosed, users would be able to do synchronous, asynchronous, cancelling operations with consistent and unified interfaces. The kind of channels are called combo channels in brpc.
# ParallelChannel
`ParallelChannel` (referred as "pchan") sends requests to all the sub channels inside at the same time and merges their results. The user can modify the request via `CallMapper` and merge the results with `ResponseMerger`. `ParallelChannel` looks like a `Channel`:
`ParallelChannel` (referred to as "pchan" sometimes) sends requests to all internal sub channels in parallel and merges the responses. Users can modify requests via `CallMapper` and merge responses with `ResponseMerger`. `ParallelChannel` looks like a `Channel`:
- Support synchronous and asynchronous access.
- Support synchronous and asynchronous accesses.
- Can be destroyed immediately after initiating an asynchronous operation.
- Support cancellation.
- Support timeout.
The sample code is shown in [example/parallel_echo_c++](
Check [example/parallel_echo_c++]( for an example.
Any subclasses of `brpc::ChannelBase` can join `ParallelChannel`, including `ParallelChannel` and other combo channels. The user can set `ParallelChannelOptions.fail_limit` to control the maximum number of acceptable failure. When the failed results reach this number, RPC will end immediately without waiting for timeout.
Any subclasses of `brpc::ChannelBase` can be added into `ParallelChannel`, including `ParallelChannel` and other combo channels. Set `ParallelChannelOptions.fail_limit` to control maximum number of failures. When number of failed responses reaches the limit, the RPC is ended immediately rather than waiting for timeout.
A sub channel can be added to the same `ParallelChannel` for multiple times. This is useful when you need to initiate multiple asynchronous visits to the same service and wait for them to complete.
A sub channel can be added to the same `ParallelChannel` more than once, which is useful when you need to initiate multiple asynchronous RPC to the same service and wait for their completions.
The following picture shows the internal structure of the `ParallelChannel`:
Following picture shows internal structure of `ParallelChannel` (Chinese in red: can be different from request/response respectively)
## Add sub channel
You can add a sub channel into `ParallelChannel` using the following API:
A sub channel can be added into `ParallelChannel` by following API:
int AddChannel(brpc::ChannelBase* sub_channel,
ResponseMerger* response_merger);
When `ownership` is `brpc::OWNS_CHANNEL`, the `sub_channel` will be destroyed when the `ParallelChannel` destructs. Since a sub channel can be added to a `ParallelChannel` multiple times, it will be deleted (only once) as long as one of the parameter `ownership` is `brpc::OWNS_CHANNEL`.
When `ownership` is `brpc::OWNS_CHANNEL`, `sub_channel` is destroyed when the `ParallelChannel` destructs. Although a sub channel may be added into a `ParallelChannel` multiple times, it's deleted for at most once when `ownership` in one of the additions is `brpc::OWNS_CHANNEL`.
Calling ` AddChannel` during a `ParallelChannel` RPC is **NOT thread safe**.
Calling ` AddChannel` during a RPC over `ParallelChannel` is **NOT thread safe**.
## CallMapper
This class converts `ParallelChannel` requests to `sub channel` ones. If `call_mapper` is NULL, the request for the sub channel is exactly the same as that for `ParallelChannel`, and the response is created by calling `New()` on `ParallelChannel`'s response. If `call_mapper` is not NULL, it will be deleted when `ParallelChannel` destructs. Due to the reference count inside, `call_mapper` can be associated with multiple sub channels.
This class converts RPCs to `ParallelChannel` to the ones to `sub channel`. If `call_mapper` is NULL, requests to the sub channel is just the ones to `ParallelChannel`, and responses are created by calling `New()` on the responses to `ParallelChannel`. `call_mapper` is deleted when `ParallelChannel` destructs. Due to the reference counting inside, one `call_mapper` can be associated with multiple sub channels.
class CallMapper {
......@@ -58,16 +60,16 @@ public:
`channel_index`: The position of the sub channel inside `ParallelChannel`, starting from zero.
`method/request/response`: Parameters fro `ParallelChannel::CallMethod()`.
method/request/response`: Parameters to `ParallelChannel::CallMethod()`.
Return `SubCall` to control the corresponding sub channel. It has two special values:
The returned `SubCall` configures the calls to the corresponding sub channel and has two special values:
- `SubCall::Bad()`: The current visit to `ParallelChannel` fails immediately with `Controller::ErrorCode()` being `EREQUEST`.
- `SubCall::Skip()`: To skip RPC to this sub channel. If all sub channels have been skipped, the request fails immediately with `Controller::ErrorCode()` being `ECANCELED`.
- `SubCall::Bad()`: The call to ParallelChannel fails immediately and `Controller::ErrorCode()` is set to `EREQUEST`.
- `SubCall::Skip()`: Skip the call to this sub channel. If all sub channels are skipped, the call to ParallelChannel fails immediately and `Controller::ErrorCode()` is set to `ECANCELED`.
The common implementations of `Map()` are listed below:
Common implementations of `Map()` are listed below:
- Broadcast request, which is also the behavior when `call_mapper` is NULL:
- Broadcast the request. This is also the behavior when `call_mapper` is NULL:
class Broadcaster : public CallMapper {
......@@ -76,15 +78,14 @@ The common implementations of `Map()` are listed below:
const google::protobuf::MethodDescriptor* method,
const google::protobuf::Message* request,
google::protobuf::Message* response) {
// Keep method/request to be same as those of pchan
// response is created by `new`
// The last flag tells pchan to delete response after RPC
// Use the method/request to pchan.
// response is created by `new` and the last flag tells pchan to delete response after completion of the RPC
return SubCall(method, request, response->New(), DELETE_RESPONSE);
- Modify some fields in request before sending:
- Modify some fields in the request before sending:
class ModifyRequest : public CallMapper {
......@@ -96,13 +97,13 @@ The common implementations of `Map()` are listed below:
FooRequest* copied_req = brpc::Clone<FooRequest>(request);
// Copy and modify the request
// The last flag tells pchan to delete request and response after RPC
// The last flag tells pchan to delete the request and response after completion of the RPC
return SubCall(method, copied_req, response->New(), DELETE_REQUEST | DELETE_RESPONSE);
- request/response already contains sub request/response. Use them to access sub channel directly.
- request/response already contains sub requests/responses, use them directly.
class UseFieldAsSubRequest : public CallMapper {
......@@ -112,16 +113,12 @@ The common implementations of `Map()` are listed below:
const google::protobuf::Message* request,
google::protobuf::Message* response) {
if (channel_index >= request->sub_request_size()) {
// Not enough sub_request
// The caller doesn't provide the same number of requests
// as number of sub channels in pchan
// Return Bad() to end this RPC immediately with EREQUEST
// Not enough sub_request. The caller doesn't provide same number of requests as number of sub channels in pchan
// Return Bad() to end this RPC immediately
// Return Bad() to end this RPC immediately
return SubCall::Bad();
// Fetch the corresponding sub request
// Add a new sub response
// The last flag tells pchan there is no need to delete anything
// since sub request/response will be destroyed with request/response
// Fetch the sub request and add a new sub response.
// The last flag(0) tells pchan that there is nothing to delete.
return SubCall(sub_method, request->sub_request(channel_index), response->add_sub_response(), 0);
## ResponseMerger
`response_merger` merges the response of all sub channels into the overall one. When it's NULL, `response->MergeFrom(*sub_response)` will be used instead, whose behavior can be summarized as "merge all the repeated fields and overwrite the rest". Your can implement `ResponseMerger` to achieve more complex behavior. `response_merger` will be used to merge sub response one by one so that you do not need to consider merging multiple response at the same time. It will be deleted when `ParallelChannel ` destructs if it's not NULL. Due to the reference count inside, `response_merger ` can be associated with multiple sub channels.
`response_merger` merges responses from all sub channels into one for the `ParallelChannel`. When it's NULL, `response->MergeFrom(*sub_response)` is used instead, whose behavior can be summarized as "merge repeated fields and overwrite the rest". If you need more complex behavior, implement `ResponseMerger`. Multiple `response_merger` are called one by one to merge sub responses so that you do not need to consider the race conditions between merging multiple responses simultaneously. The object is deleted when `ParallelChannel ` destructs. Due to the reference counting inside, `response_merger ` can be associated with multiple sub channels.
The accepted values of `Result` are:
Possible values of `Result` are:
- MERGED: Successful merged.
- FAIL (known as IGNORE): Count as one failure of merging. For example, if there are 10 sub channels & the `fail_limit=4` while 3 of which has already failed, a final merging failure will end this RPC with error at once due to the `fail_limit`.
- FAIL_ALL (known as CALL_FAILED): Immediately fails this RPC.
- MERGED: Successfully merged.
- FAIL: The `sub_response` was not merged successfully, counted as one failure. For example, there are 10 sub channels and `fail_limit` is 4, if 4 merges return FAIL, the RPC would reach fail_limit and end soon.
- FAIL_ALL: Directly fail the RPC.
## Get the controller object of each sub channel
## Get the controller to each sub channel
Sometimes users may need the details of each sub channel. This can be done by `Controller.sub(i)` to get the controller corresponding to a specific sub channel.
Sometimes users may need to know the details around sub calls. `Controller.sub(i)` gets the controller corresponding to a sub channel.
// Get the controllers for accessing sub channels in combo channels.
......@@ -162,24 +159,24 @@ const Controller* sub(int index) const;
# SelectiveChannel
[SelectiveChannel]( ("referred as schan") wraps multiple `Channel` using a specific load balancing algorithm to achieve a higher level of `Channel`. The requests will be sent to the sub channel rather than the specific Server. `SelectiveChannel` is mainly used to do load balancing between groups of machines. It has some basic properties of `Channel`:
[SelectiveChannel]( (referred to as "schan" sometimes) accesses one of the internal sub channels with a load balancing algorithm. It's more high-level compared to ordinary channels: The requests are sent to sub channels instead of servers directly. `SelectiveChannel` is mainly for load balancing between groups of machines and shares basic properties of `Channel`:
- Support synchronous and asynchronous access.
- Support synchronous and asynchronous accesses.
- Can be destroyed immediately after initiating an asynchronous operation.
- Support cancellation.
- Support timeout.
The sample code is shown in [example/selective_echo_c++](
Check [example/selective_echo_c++]( for an example.
Any subclasses of `brpc::ChannelBase` can join `SelectiveChannel`, including `SelectiveChannel` and other combo channels.
Any subclasses of `brpc::ChannelBase` can be added into `SelectiveChannel`, including `SelectiveChannel` and other combo channels.
The retry mechanism of `SelectiveChannel` is independent of its sub channels. When the access between `SelectiveChannel ` and one of its sub channel fails (Note that the sub channel may already retried for a couple of times), it will retry another sub channel.
Retries done by `SelectiveChannel` are independent from the ones in its sub channels. When a call to one of the sub channels fails(which may have been retried), other sub channels are retried.
Currently `SelectiveChannel` demands all requests remain valid until the end of RPC, while other channels do not have this requirement. If you plan to use `SelectiveChannel` asynchronously, make sure that the request is deleted inside `done`.
Currently `SelectiveChannel` requires **the request remains valid before completion of the RPC**, while other combo or regular channels do not. If you plan to use `SelectiveChannel` asynchronously, make sure that the request is deleted inside `done`.
## Use SelectiveChannel
## Using SelectiveChannel
The initialization of `SelectiveChannel` is almost the same as regular `Channel`, while it doesn't need a naming service parameter in `Init`. The reason is that `SelectiveChannel` is sub channel oriented and sub channels can be added into by `AddChannel` dynamically, but regular `Channel` is server oriented which has to be recorded in naming service.
The initialization of `SelectiveChannel` is almost the same as regular `Channel`, except that it doesn't need a naming service in `Init`, because `SelectiveChannel` adds sub channels dynamically by `AddChannel`, while regular `Channel` adds servers in the naming service.
......@@ -195,7 +192,7 @@ if (schan.Init(load_balancer, &schan_options) != 0) {
After a successful initialization, add sub channel using `AddChannel`.
After successful initialization, add sub channels with `AddChannel`.
// The second parameter ChannelHandle is used to delete sub channel,
......@@ -208,29 +205,28 @@ if (schan.AddChannel(sub_channel, NULL/*ChannelHandle*/) != 0) {
Note that:
- Unlike `ParallelChannel`, `SelectiveChannel::AddChannel` can be called at any time, even if the it's being used during RPC (which takes effect at the next access).
- `SelectiveChannel` always owns the sub channel objects, which is different from `ParallelChannel`'s configurable ownership.
- If the second parameter of `AddChannel` is not NULL, it will be filled using `brpc::SelectiveChannel::ChannelHandle`, which can be used as a parameter to `RemoveAndDestroyChannel` to delete a channel dynamically.
- `SelectiveChannel` overrides the timeout value of sub channel's using its own one. For example, having timeout set to 100ms for a sub channel and 500ms for `SelectiveChannel`, the actual request timeout is 500ms rather than 100ms.
- Unlike `ParallelChannel`, `SelectiveChannel::AddChannel` can be called at any time, even if a RPC over the SelectiveChannel is going on. (newly added channels take effects at the next RPC).
- `SelectiveChannel` always owns sub channels, which is different from `ParallelChannel`'s configurable ownership.
- If the second parameter to `AddChannel` is not NULL, it's filled with a value typed `brpc::SelectiveChannel::ChannelHandle`, which can be used as the parameter to `RemoveAndDestroyChannel` to remove and destroy a channel dynamically.
- `SelectiveChannel` overrides timeouts in sub channels. For example, having timeout set to 100ms for a sub channel and 500ms for `SelectiveChannel`, the actual timeout is 500ms.
The way of using `SelectiveChannel` is exactly the same as that of regular channels.
`SelectiveChannel`s are accessed same as regular channels.
## Divide requests into multiple DNS
## Example: divide traffic to multiple naming services
Sometimes we need to divide requests into multiple DNS node. The reasons may be:
Sometimes we need to divide traffic to multiple naming services, because:
- Machines of the same service are mounted under different DNS.
- Machines are split into multiple groups. Requests will be sent to one of the groups first and then travel inside that group. There is a difference in the way of traffic division between groups or inside a single group.
- Machines for one service are listed in multiple naming services.
- Machines are split into multiple groups. Requests are sent to one of the groups and then routed to one of the machines inside the group, and traffic are divided differently between groups and machines in a group.
The above can be achieved through `SelectiveChannel`.
Above requirements can be achieved by `SelectiveChannel`.
The following code creates a `SelectiveChannel` and inserts three regular channels which access different DNS nodes.
Following code creates a `SelectiveChannel` and inserts 3 regular channels for different naming services respectively.
brpc::SelectiveChannel channel;
brpc::ChannelOptions schan_options;
schan_options.timeout_ms = FLAGS_timeout_ms;
schan_options.backup_request_ms = FLAGS_backup_ms;
schan_options.max_retry = FLAGS_max_retry;
if (channel.Init("c_murmurhash", &schan_options) != 0) {
LOG(ERROR) << "Fail to init SelectiveChannel";
......@@ -239,7 +235,7 @@ if (channel.Init("c_murmurhash", &schan_options) != 0) {
for (int i = 0; i < 3; ++i) {
brpc::Channel* sub_channel = new brpc::Channel;
if (sub_channel->Init(dns_node_name[i], "rr", NULL) != 0) {
if (sub_channel->Init(ns_node_name[i], "rr", NULL) != 0) {
LOG(ERROR) << "Fail to init sub channel " << i;
return -1;
......@@ -256,15 +252,15 @@ stub.FooMethod(&cntl, &request, &response, NULL);
# PartitionChannel
[PartitionChannel]( is a specialized `ParallelChannel`, in which it can add sub channels automatically based on the tag value inside a naming service. As a result, users can group machines together inside one naming service and use tags to partition them apart. The sample code is shown in [example/partition_echo_c++](
[PartitionChannel]( is a specialized `ParallelChannel` to add sub channels automatically based on tags defined in the naming service. As a result, users can list all machines in one naming service and partition them by tags. Check [example/partition_echo_c++]( for an example.
`ParititonChannel` only supports one way to partition channels. When you need multiple scheme or replace the current one smoothly, you should try `DynamicPartitionChannel`. It will create the corresponding sub `PartitionChannel` based on different partition methods, and divide traffic into these partition channels. The sample code is shown in [example/dynamic_partition_echo_c++](
`ParititonChannel` only supports one kind to partitioning method. When multiple methods need to coexist, or one method needs to be changed to another smoothly, try `DynamicPartitionChannel`, which creates corresponding sub `PartitionChannel` for different partitioning methods, and divide traffic to partitions according to capacities of servers. Check [example/dynamic_partition_echo_c++]( for an example.
If partitions belong to different name services, you have to write your own channel, which should create and add a sub channel for each different naming service by means of `ParallelChannel`. Please refer to the previous section for `ParellelChannel`'s usage.
If partitions are listed in different naming services, users have to implement the partitioning by `ParallelChannel` and include sub channels to corresponding naming services respectively. Refer to [the previous section](#ParallelChannel) for usages of `ParellelChannel`.
## Use PartitionChannel
## Using PartitionChannel
First of all, implement your own `PartitionParser`. For this example, the tag format is `N/M`, where N represents the partition index and M for the total number of partitions. As a result, `0/3` means it's the first partition of the three.
First of all, implement your own `PartitionParser`. In this example, the tag's format is `N/M`, where N is index of the partition and M is total number of partitions. `0/3` means that there're 3 partitions and this is the first one of them.
#include <brpc/partition_channel.h>
Then initialize the `PartitionChannel`
Then initialize the `PartitionChannel`.
#include <brpc/partition_channel.h>
......@@ -316,13 +312,13 @@ if (channel.Init(num_partition_kinds, new MyPartitionParser(),
// The RPC interface is the same as regular Channel
## Use DynamicPartitionChannel
## Using DynamicPartitionChannel
`DynamicPartitionChannel` and `PartitionChannel` are basically the same in usage. Implementing `PartitionParser` first followed by initialization, where the `Init` does not need `num_partition_kinds` since `DynamicPartitionChannel` dynamically creates sub `PartitionChannel` for each partitions.
`DynamicPartitionChannel` and `PartitionChannel` are basically same on usages. Implement `PartitionParser` first then initialize the channel, which does not need `num_partition_kinds` since `DynamicPartitionChannel` dynamically creates sub `PartitionChannel` for each partition.
Now we demonstrate how to use `DynamicPartitionChannel` to migrate from 3-partition scheme to 4-partition scheme.
Following sections demonstrate how to use `DynamicPartitionChannel` to migrate from 3 partitions to 4 partitions smoothly.
First of all we start three `Server` objects on port 8004, 8005, 8006 respectively.
First of all, start 3 servers serving on port 8004, 8005, 8006 respectively.
$ ./echo_server -server_num 3
......@@ -334,16 +330,15 @@ TRACE: 09-06 10:40:41: * 0 server.cpp:192] S[0]=0 S[1]=0 S[2]=0 [total=0]
TRACE: 09-06 10:40:42: * 0 server.cpp:192] S[0]=0 S[1]=0 S[2]=0 [total=0]
Note that each server will print a flow summary every second, which is all 0 now. Then we start a client using `DynamicPartitionChannel`, whose initialization code is shown below:
Note that each server prints summaries on traffic received in last second, which is all 0 now.
Start a client using `DynamicPartitionChannel`, which is initialized as follows:
brpc::DynamicPartitionChannel channel;
brpc::PartitionChannelOptions options;
// Allow server_list to be empty when calling DynamicPartitionChannel::Init
options.succeed_without_server = true;
// Failure on any single partition terminates the RPC immediately.
// You can use a more relaxed value
// Failure on any single partition fails the RPC immediately. You can use a more relaxed value
options.fail_limit = 1;
if (channel.Init(new MyPartitionParser(), "file://server_list", "rr", &options) != 0) {
LOG(ERROR) << "Fail to init channel";
......@@ -360,7 +355,7 @@ The content inside the naming service `file://server_list` is: 2/3
Now all 3 partitions correspond to the same `Server` on port 8004, so the client begins to send requests to 8004 once started.
All 3 partitions are put on the server on port 8004, so the client begins to send requests to 8004 once started.
$ ./echo_client
......@@ -371,7 +366,7 @@ TRACE: 09-06 10:51:12: * 0 client.cpp:226] Sending EchoRequest at qps=132658 l
TRACE: 09-06 10:51:13: * 0 client.cpp:226] Sending EchoRequest at qps=133208 latency=369
At the same time, the server received triple flow due to the access of three partition for each request.
At the same time, the server on 8004 received tripled traffic due to the 3 partitions.
TRACE: 09-06 10:51:11: * 0 server.cpp:192] S[0]=398866 S[1]=0 S[2]=0 [total=398866]
......@@ -379,7 +374,7 @@ TRACE: 09-06 10:51:12: * 0 server.cpp:192] S[0]=398117 S[1]=0 S[2]=0 [total=39
TRACE: 09-06 10:51:13: * 0 server.cpp:192] S[0]=398873 S[1]=0 S[2]=0 [total=398873]
Now we change the partition: adding the new 4-partition scheme on port 8005 in `server_list`:
Add new 4 partitions on the server on port 8005.
......@@ -392,7 +387,7 @@ Now we change the partition: adding the new 4-partition scheme on port 8005 in ` 3/4
Notice the changes in the summary. The client found the modification of `server_list` and reloaded it, while it's QPS doesn't change.
Notice how summaries change. The client is aware of the modification to `server_list` and reloads it, but the QPS hardly changes.
TRACE: 09-06 10:57:10: * 0 src/brpc/policy/file_naming_service.cpp:83] Got 7 unique addresses from `server_list'
......@@ -404,7 +399,7 @@ TRACE: 09-06 10:57:14: * 0 client.cpp:226] Sending EchoRequest at qps=136775 l
TRACE: 09-06 10:57:15: * 0 client.cpp:226] Sending EchoRequest at qps=139043 latency=353
Change on the server's side is much bigger. Traffic appeared on port 8005 and its proportion against 8004 is roughly 4 : 3.
The server-side summary changes more obviously. The server on port 8005 has received requests and the proportion between traffic to 8004 and 8005 is roughly 3:4.
TRACE: 09-06 10:57:09: * 0 server.cpp:192] S[0]=398597 S[1]=0 S[2]=0 [total=398597]
......@@ -416,16 +411,16 @@ TRACE: 09-06 10:57:14: * 0 server.cpp:192] S[0]=207055 S[1]=273725 S[2]=0 [tot
TRACE: 09-06 10:57:15: * 0 server.cpp:192] S[0]=208453 S[1]=276803 S[2]=0 [total=485256]
The reason is that each request needs 3 access to 8004 or 4 access to 8005. Note that the flow ratio between 8004 and 8005 is 3 : 4, so the client issues requests to both partition schemes with the same probability. This flow ratio depends on capacity, which can be calculated recursively:
The traffic proportion between 8004 and 8005 is 3:4, considering that each RPC needs 3 calls to 8004 or 4 calls to 8005, the client issues requests to both partitioning methods in 1:1 manner, which depends on capacities calculated recursively:
- The capacity of a regular `Channel` using `NamingService` equals to the number of servers in the naming service, as the capacity of a single-server `Channel` is 1.
- The capacity of `ParallelChannel` or `PartitionChannel` equals to the minimum value of its sub channel's.
- The capacity of `SelectiveChannel` equals to the sum of all its sub channel's.
- The capacity of `DynamicPartitionChannel` equals to the sum of all its sub `PartitionChannel`'s.
- The capacity of a regular `Channel` is sum of capacities of servers that it addresses. Capacity of a server is 1 by default if the naming services does not configure weights.
- The capacity of `ParallelChannel` or `PartitionChannel` is the minimum of all its sub channel's.
- The capacity of `SelectiveChannel` is the sum of all its sub channel's.
- The capacity of `DynamicPartitionChannel` is the sum of all its sub `PartitionChannel`'s.
In this case, the capacity of the 3-partition channel and the 4-partition one both equal to 1 (only 1 regular channel in each partition such as 1/3). As all 3-partitions are on 8004 and all 4-partitions are on 8005, the traffic proportion between the two servers is the capacity ratio of the two partition channels.
In this example, capacities of the 3-partition method and the 4-partition method are both 1, since the 3 partitions are all on the server on 8004 and the 4 partitions are all on the server on 8005.
We can add more partitions on 8006 to 4-partition scheme by changing `server_list`:
Add the server on 8006 to the 4-partition method:
......@@ -443,7 +438,7 @@ We can add more partitions on 8006 to 4-partition scheme by changing `server_lis 3/4
The client still remains unchanged.
The client still hardly changes.
TRACE: 09-06 11:11:51: * 0 src/brpc/policy/file_naming_service.cpp:83] Got 11 unique addresses from `server_list'
......@@ -454,7 +449,7 @@ TRACE: 09-06 11:11:53: * 0 client.cpp:226] Sending EchoRequest at qps=133531 l
TRACE: 09-06 11:11:54: * 0 client.cpp:226] Sending EchoRequest at qps=136072 latency=361
Notice the traffic on 8006 at the server side. The flow ratio of the three servers is about 3 : 4 : 4, as the capacity of the 3-partition scheme is still 1 while capacity of 4-partition scheme increases to 2 (due to the addition of a regular channel on 8006). As a result, the overall proportion between the two schemes is 3 : 8. Each partition inside the 4-partition scheme has 2 instances on 8005 and 8006, between which the round-robin load balancing is applied to split the traffic equally. Finally, the proportion among the 3 servers is 3 : 4 : 4.
Notice the traffic on 8006 at the server side. The capacity of the 3-partition method is still 1 while capacity of the 4-partition method increases to 2 due to the addition of the server on 8006, thus the overall proportion between the methods becomes 3:8. Each partition inside the 4-partition method has 2 instances on 8005 and 8006 respectively, between which the round-robin load balancing is applied to split the traffic. As a result, the traffic proportion between the 3 servers becomes 3:4:4.
TRACE: 09-06 11:11:51: * 0 server.cpp:192] S[0]=199625 S[1]=263226 S[2]=0 [total=462851]
......@@ -463,7 +458,7 @@ TRACE: 09-06 11:11:53: * 0 server.cpp:192] S[0]=133003 S[1]=178328 S[2]=178325
TRACE: 09-06 11:11:54: * 0 server.cpp:192] S[0]=135534 S[1]=180386 S[2]=180333 [total=496253]
Let's see what happens if we remove one partition of the 3-partition scheme:
See what happens if one partition in the 3-partition method is removed: (You can comment one line in file://server_list by #)
......@@ -481,7 +476,7 @@ Let's see what happens if we remove one partition of the 3-partition scheme: 3/4
The client noticed the changes in the `server_list`:
The client senses the change in `server_list`:
TRACE: 09-06 11:17:47: * 0 src/brpc/policy/file_naming_service.cpp:83] Got 10 unique addresses from `server_list'
......@@ -491,7 +486,7 @@ TRACE: 09-06 11:17:49: * 0 client.cpp:226] Sending EchoRequest at qps=124100 l
TRACE: 09-06 11:17:50: * 0 client.cpp:226] Sending EchoRequest at qps=123743 latency=397
Notice the traffic drop on 8004 at the server side. The reason is that the 3-partition scheme is not complete anymore once the last 2/3 partition has been removed. The capacity of this scheme dropped down to zero so that there was no requests on 8004 anymore.
The traffic on 8004 drops to zero quickly at the server side. The reason is that the 3-partition method is not complete anymore once the last 2/3 partition has been removed. The capacity becomes zero and no requests are sent to the server on 8004 anymore.
TRACE: 09-06 11:17:47: * 0 server.cpp:192] S[0]=130864 S[1]=174499 S[2]=174548 [total=479911]
......@@ -500,4 +495,4 @@ TRACE: 09-06 11:17:49: * 0 server.cpp:192] S[0]=0 S[1]=245961 S[2]=245888 [tot
TRACE: 09-06 11:17:50: * 0 server.cpp:192] S[0]=0 S[1]=250198 S[2]=250150 [total=500348]
Under the production environment, we will gradually increase the number of instance on 4-partition scheme while terminating instance on 3-partition scheme. `DynamicParititonChannel` can divide the traffic based on the capacity of all partitions dynamically. When the capacity of 3-partition scheme drops down to 0, then we've smoothly migrated all the servers from 3-partition scheme to 4-partition one without changing the client's code.
In real online environments, we gradually increase the number of instances on the 4-partition method and removes instances on the 3-partition method. `DynamicParititonChannel` divides the traffic based on capacities of all partitions dynamically. When capacity of the 3-partition method drops to 0, we've smoothly migrated all servers from 3 partitions to 4 partitions without changing the client-side code.
