As the number of business products grows, access patterns to downstream services become increasingly complicated, often involving multiple simultaneous RPCs or subsequent asynchronous ones. Such combinations can easily introduce tricky bugs in multi-threaded environments, of which users may not even be aware, and which are difficult to debug and reproduce. Moreover, existing implementations may not fully support every access pattern, in which case you have to write your own. Take semi-synchronous RPC as an example, which means waiting for multiple asynchronous RPCs to complete. A common implementation for synchronous access issues multiple requests asynchronously and waits for their completion, while the implementation for asynchronous access uses a callback with a counter: each time an asynchronous RPC finishes, the counter is decremented until it reaches zero, at which point the callback is called. Now let's analyze their weaknesses:
- The code is inconsistent between the synchronous pattern and the asynchronous one. It's difficult for users to move from one pattern to the other. From a design point of view, such inconsistency suggests that the essence has been lost.
- Cancellation is generally not supported. It's not easy to cancel even a single RPC correctly and in time, let alone a combination of accesses. Most implementations do not support cancellation of a combo access, yet it's a must for speed-up techniques such as backup request.
- Cascading is not supported, which means it's hard to turn a semi-synchronous access into one part of a "larger" access. The code may meet current needs, but it's not generic.
As a result, we need a better abstraction. If there were a structure whose combinations are still the same structure, the interfaces for synchronous access, asynchronous access, cancellation, and other operations would stay the same for users. In fact, we already have this structure: `Channel`. If we can combine channels into larger and more complex ones in different ways, each with a different access pattern, users will be armed with consistent and modular building blocks. Welcome to this powerful tool.
# ParallelChannel
`ParallelChannel` (referred to as "pchan") sends requests to all of its sub channels at the same time and merges their results. The user can modify requests via `CallMapper` and merge results with `ResponseMerger`. `ParallelChannel` looks like a `Channel`:
- Supports synchronous and asynchronous access.
- Can be destroyed immediately after initiating an asynchronous operation.
- Supports cancellation.
- Supports timeout.
The sample code is shown in [example/parallel_echo_c++](https://github.com/brpc/brpc/tree/master/example/parallel_echo_c++/).
Any subclass of `brpc::ChannelBase` can join a `ParallelChannel`, including `ParallelChannel` itself and other combo channels. The user can set `ParallelChannelOptions.fail_limit` to control the maximum number of acceptable failures. When the number of failed results reaches this limit, the RPC ends immediately without waiting for the timeout.
A sub channel can be added to the same `ParallelChannel` multiple times. This is useful when you need to initiate multiple asynchronous visits to the same service and wait for all of them to complete.
The following picture shows the internal structure of the `ParallelChannel`:
![img](../images/pchan.png)
## Add sub channel
You can add a sub channel into `ParallelChannel` using the following API:
```c++
int AddChannel(brpc::ChannelBase* sub_channel,
               ChannelOwnership ownership,
               CallMapper* call_mapper,
               ResponseMerger* response_merger);
```
When `ownership` is `brpc::OWNS_CHANNEL`, the `sub_channel` is destroyed when the `ParallelChannel` destructs. Since a sub channel can be added to a `ParallelChannel` multiple times, it is deleted (only once) as long as any of the additions used `brpc::OWNS_CHANNEL` as `ownership`.
Calling `AddChannel` while an RPC over the `ParallelChannel` is in progress is **NOT thread safe**.
## CallMapper
This class converts requests of the `ParallelChannel` into requests for its sub channels. If `call_mapper` is NULL, the requests for the sub channels are exactly the same as those for the `ParallelChannel`, and the responses are created by calling `New()` on the `ParallelChannel`'s response. If `call_mapper` is not NULL, it will be deleted when the `ParallelChannel` destructs. Thanks to its internal reference counting, one `call_mapper` can be associated with multiple sub channels.
```c++
class CallMapper {
public:
    virtual ~CallMapper();
    virtual SubCall Map(int channel_index/*starting from 0*/,
                        const google::protobuf::MethodDescriptor* method,
                        const google::protobuf::Message* request,
                        google::protobuf::Message* response) = 0;
};
```
`channel_index`: The position of the sub channel inside `ParallelChannel`, starting from zero.
Return `SubCall` to control the corresponding sub channel. It has two special values:
- `SubCall::Bad()`: the current call to `ParallelChannel` fails immediately with `Controller::ErrorCode()` being `EREQUEST`.
- `SubCall::Skip()`: skips the RPC to this sub channel. If all sub channels are skipped, the request fails immediately with `Controller::ErrorCode()` being `ECANCELED`.
The common implementations of `Map()` are listed below:
- Broadcast request, which is also the behavior when `call_mapper` is NULL:
```c++
class Broadcaster : public CallMapper {
public:
    SubCall Map(int channel_index/*starting from 0*/,
                const google::protobuf::MethodDescriptor* method,
                const google::protobuf::Message* request,
                google::protobuf::Message* response) {
        // Keep method/request the same as those of pchan;
        // response is created by New(). The last flag tells
        // pchan to delete the response after the RPC.
        return SubCall(method, request, response->New(), DELETE_RESPONSE);
    }
};
```
## ResponseMerger
`response_merger` merges the responses of all sub channels into the overall one. When it's NULL, `response->MergeFrom(*sub_response)` is used instead, whose behavior can be summarized as "merge all the repeated fields and overwrite the rest". You can implement `ResponseMerger` to achieve more complex behavior. `response_merger` is applied to sub responses one by one, so you never need to consider merging multiple responses at the same time. It will be deleted when the `ParallelChannel` destructs if it's not NULL. Thanks to its internal reference counting, one `response_merger` can be associated with multiple sub channels.
The accepted values of `Result` are:
- MERGED: the merge succeeded.
- FAIL (also known as IGNORE): counts as one failure of merging. For example, if there are 10 sub channels with `fail_limit=4` and 3 of them have already failed, one more merging failure reaches `fail_limit` and ends this RPC with an error at once.
- FAIL_ALL (also known as CALL_FAILED): fails this RPC immediately.
## Get the controller object of each sub channel
Sometimes users need the details of each sub channel. This can be done via `Controller::sub(i)`, which returns the controller corresponding to a specific sub channel.
```c++
// Get the controllers for accessing sub channels in combo channels.
// Ordinary channel:
// sub_count() is 0 and sub() is always NULL.
// ParallelChannel/PartitionChannel:
// sub_count() is #sub-channels and sub(i) is the controller for
// accessing i-th sub channel inside ParallelChannel, if i is outside
// [0, sub_count() - 1], sub(i) is NULL.
// NOTE: You must test sub() against NULL, ALWAYS. Even if i is inside
// range, sub(i) can still be NULL:
// * the rpc call may fail and terminate before accessing the sub channel
// * the sub channel was skipped
// SelectiveChannel/DynamicPartitionChannel:
// sub_count() is always 1 and sub(0) is the controller of successful
// or last call to sub channels.
int sub_count() const;
const Controller* sub(int index) const;
```
# SelectiveChannel
[SelectiveChannel](https://github.com/brpc/brpc/blob/master/src/brpc/selective_channel.h) (referred to as "schan") wraps multiple `Channel`s into a higher-level `Channel` using a specified load balancing algorithm. Requests are sent to one of the sub channels rather than to a specific server. `SelectiveChannel` is mainly used for load balancing between groups of machines. It has the basic properties of a `Channel`:
- Supports synchronous and asynchronous access.
- Can be destroyed immediately after initiating an asynchronous operation.
- Supports cancellation.
- Supports timeout.
The sample code is shown in [example/selective_echo_c++](https://github.com/brpc/brpc/tree/master/example/selective_echo_c++/).
Any subclass of `brpc::ChannelBase` can join `SelectiveChannel`, including `SelectiveChannel` itself and other combo channels.
The retry mechanism of `SelectiveChannel` is independent of those of its sub channels. When an access from the `SelectiveChannel` to one of its sub channels fails (note that the sub channel may have already retried a couple of times on its own), another sub channel is retried.
Currently `SelectiveChannel` demands that the request remain valid until the end of the RPC, while other channels do not have this requirement. If you plan to use `SelectiveChannel` asynchronously, make sure the request is deleted inside `done`.
## Use SelectiveChannel
The initialization of `SelectiveChannel` is almost the same as that of a regular `Channel`, except that it doesn't need a naming service parameter in `Init`. The reason is that `SelectiveChannel` is oriented toward sub channels, which can be added dynamically via `AddChannel`, while a regular `Channel` is oriented toward servers, which have to be recorded in a naming service.
```c++
#include <brpc/selective_channel.h>
...
brpc::SelectiveChannel schan;
brpc::ChannelOptions schan_options;
schan_options.timeout_ms = ...;
schan_options.backup_request_ms = ...;
schan_options.max_retry = ...;
if (schan.Init(load_balancer, &schan_options) != 0) {
    LOG(ERROR) << "Fail to init SelectiveChannel";
    return -1;
}
```
After a successful initialization, add sub channels using `AddChannel`.
```c++
// The second parameter ChannelHandle is used to delete the sub channel,
// and can be NULL if that is not needed.
int AddChannel(ChannelBase* sub_channel, ChannelHandle* handle);
```
- Unlike `ParallelChannel`, `SelectiveChannel::AddChannel` can be called at any time, even during an ongoing RPC (it takes effect at the next access).
- `SelectiveChannel` always owns its sub channel objects, which is different from `ParallelChannel`'s configurable ownership.
- If the second parameter of `AddChannel` is not NULL, it is filled with a `brpc::SelectiveChannel::ChannelHandle`, which can later be passed to `RemoveAndDestroyChannel` to delete the channel dynamically.
- `SelectiveChannel` overrides sub channels' timeouts with its own. For example, if a sub channel's timeout is 100ms and the `SelectiveChannel`'s is 500ms, the actual request timeout is 500ms rather than 100ms.
The way of using `SelectiveChannel` is exactly the same as that of regular channels.
## Divide requests among multiple DNS nodes
Sometimes we need to divide requests among multiple DNS nodes. The reasons may be:
- Machines of the same service are mounted under different DNS names.
- Machines are split into multiple groups: a request is first sent to one of the groups and then travels inside that group, with different methods of traffic division between groups and inside a single group.
Both can be achieved with `SelectiveChannel`.
The following code creates a `SelectiveChannel` and adds three regular channels accessing different DNS nodes into it.
```c++
// Assuming `schan_options` is set up as in the previous section and
// `dns_node_name[i]` holds the naming-service address of the i-th DNS node.
brpc::SelectiveChannel channel;
if (channel.Init("rr", &schan_options) != 0) {
    LOG(ERROR) << "Fail to init SelectiveChannel";
    return -1;
}
for (int i = 0; i < 3; ++i) {
    brpc::Channel* sub_channel = new brpc::Channel;
    if (sub_channel->Init(dns_node_name[i], "rr", NULL) != 0) {
        LOG(ERROR) << "Fail to init sub channel " << i;
        return -1;
    }
    if (channel.AddChannel(sub_channel, NULL/*handle for removal*/) != 0) {
        LOG(ERROR) << "Fail to add sub_channel to channel";
        return -1;
    }
}
...
XXXService_Stub stub(&channel);
stub.FooMethod(&cntl, &request, &response, NULL);
...
```
# PartitionChannel
[PartitionChannel](https://github.com/brpc/brpc/blob/master/src/brpc/partition_channel.h) is a specialized `ParallelChannel` which can add sub channels automatically based on tag values inside a naming service. As a result, users can group machines together inside one naming service and use tags to partition them. The sample code is shown in [example/partition_echo_c++](https://github.com/brpc/brpc/tree/master/example/partition_echo_c++/).
`PartitionChannel` only supports one partitioning scheme at a time. When you need multiple schemes or want to replace the current one smoothly, try `DynamicPartitionChannel`, which creates corresponding sub `PartitionChannel`s for different partitioning methods and divides traffic among these partition channels. The sample code is shown in [example/dynamic_partition_echo_c++](https://github.com/brpc/brpc/tree/master/example/dynamic_partition_echo_c++/).
If the partitions belong to different naming services, you have to write your own channel, which creates and adds a sub channel for each naming service by means of `ParallelChannel`. Please refer to the previous section for `ParallelChannel`'s usage.
## Use PartitionChannel
First of all, implement your own `PartitionParser`. In this example, the tag format is `N/M`, where N represents the partition index and M the total number of partitions. Thus `0/3` means the first of three partitions.
The RPC interface is the same as that of a regular `Channel`.
## Use DynamicPartitionChannel
`DynamicPartitionChannel` and `PartitionChannel` are used in basically the same way: implement a `PartitionParser` first, then initialize the channel. The difference is that `Init` does not need `num_partition_kinds`, since `DynamicPartitionChannel` creates a sub `PartitionChannel` for each partitioning scheme dynamically.
Now we demonstrate how to use `DynamicPartitionChannel` to migrate from a 3-partition scheme to a 4-partition scheme.
First of all, we start three `Server` objects on ports 8004, 8005 and 8006 respectively.
```
$ ./echo_server -server_num 3
TRACE: 09-06 10:40:39: * 0 server.cpp:159] EchoServer is serving on port=8004
TRACE: 09-06 10:40:39: * 0 server.cpp:159] EchoServer is serving on port=8005
TRACE: 09-06 10:40:39: * 0 server.cpp:159] EchoServer is serving on port=8006
```
Each server prints a flow summary every second, which is all 0 for now. Then we start a client using `DynamicPartitionChannel`, whose initialization code is shown below:
```c++
...
brpc::DynamicPartitionChannel channel;
brpc::PartitionChannelOptions options;
// Allow server_list to be empty when calling DynamicPartitionChannel::Init
options.succeed_without_server = true;
// Failure on any single partition terminates the RPC immediately.
options.fail_limit = 1;
if (channel.Init(new MyPartitionParser(), "file://server_list", "rr", &options) != 0) {
    LOG(ERROR) << "Fail to init DynamicPartitionChannel";
    return -1;
}
...
```
Start the client and watch the traffic at the server side: the flow ratio between 8004 and 8005 is about 3 : 4. The reason is that each request needs 3 accesses to 8004 (the 3-partition scheme) or 4 accesses to 8005 (the 4-partition scheme), and the client issues requests to both partition schemes with the same probability. That probability depends on capacity, which can be calculated recursively:
- The capacity of a regular `Channel` using a `NamingService` equals the number of servers in the naming service, since the capacity of a single-server `Channel` is 1.
- The capacity of a `ParallelChannel` or `PartitionChannel` equals the minimum capacity among its sub channels.
- The capacity of a `SelectiveChannel` equals the sum of the capacities of all its sub channels.
- The capacity of a `DynamicPartitionChannel` equals the sum of the capacities of all its sub `PartitionChannel`s.
In this case, the capacities of the 3-partition channel and the 4-partition one both equal 1 (each partition contains only 1 regular channel, e.g. the one tagged `1/3`). As all 3-partition servers are on 8004 and all 4-partition servers are on 8005, the traffic proportion between the two servers is the capacity ratio of the two partition channels.
We can add more 4-partition instances on 8006 by changing `server_list`:
Watch the traffic on 8006 at the server side. The flow ratio of the three servers becomes about 3 : 4 : 4: the capacity of the 3-partition scheme is still 1, while the capacity of the 4-partition scheme increases to 2 (due to the added regular channel on 8006), so the overall proportion between the two schemes is 3 : 8. Each partition inside the 4-partition scheme has 2 instances, one on 8005 and one on 8006, between which round-robin load balancing splits the traffic equally. The final proportion among the 3 servers is therefore 3 : 4 : 4.
Watch the traffic drop on 8004 at the server side. The reason is that the 3-partition scheme is no longer complete once the last `2/3` partition is removed: its capacity drops to zero, so no requests go to 8004 anymore.
In a production environment, we gradually increase the number of instances in the 4-partition scheme while terminating instances in the 3-partition scheme. `DynamicPartitionChannel` divides the traffic based on the capacities of all partitions dynamically. When the capacity of the 3-partition scheme drops to 0, we've smoothly migrated all servers from the 3-partition scheme to the 4-partition one without changing the client's code.