Commit 8d51edb8 authored by jiangrujie's avatar jiangrujie

+ Add more docs for client

Change-Id: I1847547214a107322b69af5e7f8dbcba8bf02968
parent 93d98471
有时为了保证可用性,需要同时访问两路服务,哪个先返回就取哪个。在baidu-rpc中,这有多种做法:
# 当后端server可以挂在一个名字服务内时
Channel开启backup request。这个Channel会先向其中一个server发送请求,如果在ChannelOptions.backup_request_ms后还没回来,再向另一个server发送。之后哪个先回来就取哪个。在设置了合理的backup_request_ms后,大部分时候只会发一个请求,对后端服务只有一倍压力。
示例代码见[example/backup_request_c++](https://svn.baidu.com/public/trunk/baidu-rpc/example/backup_request_c++)。这个例子中,client设定了在2ms后发送backup request,server在碰到偶数位的请求后会故意睡眠20ms以触发backup request。
运行后,client端和server端的日志分别如下,“index”是请求的编号。可以看到server端在收到第一个请求后会故意sleep 20ms,client端之后发送另一个同样index的请求,最终的延时并没有受到故意sleep的影响。
![img](http://wiki.baidu.com/download/attachments/160281427/image2015-12-28%2019%3A48%3A54.png?version=1&modificationDate=1451303334000&api=v2)
![img](http://wiki.baidu.com/download/attachments/160281427/image2015-12-28%2019%3A48%3A2.png?version=1&modificationDate=1451303282000&api=v2)
/rpcz也显示client在2ms后触发了backup超时并发出了第二个请求。
![img](http://wiki.baidu.com/download/attachments/160281427/image2015-12-28%2019%3A54%3A22.png?version=1&modificationDate=1451303662000&api=v2)
## 选择合理的backup_request_ms
可以观察baidu-rpc默认提供的latency_cdf图,或自行添加。cdf图的y轴是延时(默认微秒),x轴是小于y轴延时的请求的比例。在下图中,选择backup_request_ms=2ms可以大约覆盖95.5%的请求,选择backup_request_ms=10ms则可以覆盖99.99%的请求。
![img](http://wiki.baidu.com/download/attachments/160281427/image2015-12-28%2021%3A23%3A48.png?version=1&modificationDate=1451309036000&api=v2)
自行添加的方法:
```c++
#include <bvar/bvar.h>
#include <base/time.h>
...
bvar::LatencyRecorder my_func_latency("my_func");
...
base::Timer tm;
tm.start();
my_func();
tm.stop();
my_func_latency << tm.u_elapsed(); // u代表微秒,还有s_elapsed(), m_elapsed(), n_elapsed()分别对应秒,毫秒,纳秒。
// 好了,在/vars中会显示my_func_qps, my_func_latency, my_func_latency_cdf等很多计数器。
```
# 当后端server不能挂在一个名字服务内时
【推荐】建立一个开启backup request的SelectiveChannel,其中包含两个sub channel。访问这个SelectiveChannel和上面的情况类似,会先访问一个sub channel,如果在ChannelOptions.backup_request_ms后没返回,再访问另一个sub channel。如果一个sub channel对应一个集群,这个方法就是在两个集群间做互备。SelectiveChannel的例子见[example/selective_echo_c++](https://svn.baidu.com/public/trunk/baidu-rpc/example/selective_echo_c++),具体做法请参考上面的过程。
【不推荐】发起两个异步RPC后Join它们,它们的done内是相互取消的逻辑。示例代码见[example/cancel_c++](https://svn.baidu.com/public/trunk/baidu-rpc/example/cancel_c++)。这种方法的问题是总会发两个请求,对后端服务有两倍压力,这个方法怎么算都是不经济的,你应该尽量避免用这个方法。
\ No newline at end of file
如果你的程序只使用了baidu-rpc的client或根本没有使用baidu-rpc,但你也想使用baidu-rpc的内置服务,只要在程序中启动一个空的server就行了,这种server我们称为**dummy server**
# 使用了baidu-rpc的client
只要在程序运行目录建立dummy_server.port文件,填入一个端口号(比如8888),程序会马上在这个端口上启动一个dummy server。在浏览器中访问它的内置服务,便可看到同进程内的所有bvar。
![img](http://wiki.baidu.com/download/attachments/71337189/image2015-12-25%2017%3A46%3A20.png?version=1&modificationDate=1451036781000&api=v2)
![img](http://wiki.baidu.com/download/attachments/71337189/image2015-12-25%2017%3A47%3A30.png?version=1&modificationDate=1451036850000&api=v2)
![img](http://wiki.baidu.com/download/attachments/71337189/image2015-12-25%2017%3A48%3A24.png?version=1&modificationDate=1451036904000&api=v2)
# 没有使用baidu-rpc
你必须手动加入dummy server。你得先查看[Getting Started](http://wiki.baidu.com/display/RPC/Getting+Started)如何下载和编译baidu-rpc,然后在程序入口处加入如下代码片段:
```c++
#include <baidu/rpc/server.h>
...
int main() {
...
baidu::rpc::Server dummy_server;
baidu::rpc::ServerOptions dummy_server_options;
dummy_server_options.num_threads = 0; // 不要改变寄主程序的线程数。
if (dummy_server.Start(8888/*port*/, &dummy_server_options) != 0) {
LOG(FATAL) << "Fail to start dummy server";
return -1;
}
...
}
```
r31803之后加入dummy server更容易了,只要一行:
```c++
#include <baidu/rpc/server.h>
...
int main() {
...
baidu::rpc::StartDummyServerAt(8888/*port*/);
...
}
```
\ No newline at end of file
This diff is collapsed.
[memcached](http://memcached.org/)是常用的缓存服务,为了使用户更快捷地访问memcached并充分利用bthread的并发能力,baidu-rpc直接支持memcache协议。示例程序:<https://svn.baidu.com/public/trunk/baidu-rpc/example/memcache_c++/>
> 注意:baidu-rpc只支持memcache的二进制协议。memcached在1.3前只有文本协议,但在当前看来支持的意义甚微。如果你的memcached早于1.3,升级版本。
相比使用[libmemcached](http://libmemcached.org/libMemcached.html)(官方client)的优势有:
- 线程安全。用户不需要为每个线程建立独立的client。
- 支持同步、异步、批量同步、批量异步等访问方式,能使用ParallelChannel等组合访问方式。
- 有明确的request和response。而libmemcached是没有的,收到的消息不能直接和发出的消息对应上,用户需要自己做维护工作。
- 支持多种[连接方式](http://wiki.baidu.com/pages/viewpage.action?pageId=213828702#id-访问Memcached-连接方式)。支持超时、backup request、取消、tracing、内置服务等一系列RPC基本福利。
当前实现充分利用了RPC的并发机制并尽量避免了拷贝。一个client可以轻松地把一个同机memcached实例([版本1.4.15](https://svn.baidu.com/third-64/tags/memcached/memcached_1-4-15-100_PD_BL/))压到极限:单连接9万,多连接33万。在大部分情况下,baidu-rpc应该能充分发挥memcached的性能。
# 访问单台memcached
创建一个访问memcached的Channel:
```c++
#include <baidu/rpc/memcache.h>
#include <baidu/rpc/channel.h>
ChannelOptions options;
options.protocol = baidu::rpc::PROTOCOL_MEMCACHE;
if (channel.Init("0.0.0.0:11211", &options) != 0) { // 11211是memcached的默认端口
LOG(FATAL) << "Fail to init channel to memcached";
return -1;
}
...
```
往memcached中设置一份数据。
```c++
// 写入key="hello" value="world" flags=0xdeadbeef,10秒失效,无视cas。
baidu::rpc::MemcacheRequest request;
baidu::rpc::MemcacheResponse response;
baidu::rpc::Controller cntl;
if (!request.Set("hello", "world", 0xdeadbeef/*flags*/, 10/*expiring seconds*/, 0/*ignore cas*/)) {
LOG(FATAL) << "Fail to SET request";
return -1;
}
channel.CallMethod(NULL, &cntl, &request, &response, NULL/*done*/);
if (cntl.Failed()) {
LOG(FATAL) << "Fail to access memcached, " << cntl.ErrorText();
return -1;
}
if (!response.PopSet(NULL)) {
LOG(FATAL) << "Fail to SET memcached, " << response.LastError();
return -1;
}
...
```
上述的代码有如下注意点:
- 请求类型必须为MemcacheRequest,回复类型必须为MemcacheResponse,否则CallMethod会失败。不需要stub,直接调用channel.CallMethod,method填NULL。
- 调用request.XXX()增加操作,本例XXX=Set,一个request多次调用不同的操作,这些操作会被同时送到memcached(常被称为pipeline模式)。
- 依次调用response.PopXXX()弹出操作结果,本例XXX=Set,成功返回true,失败返回false,调用response.LastError()可获得错误信息。XXX必须和request的依次对应,否则失败。本例中若用PopGet就会失败,错误信息为“not a GET response"。
- Pop结果独立于RPC结果。即使Set失败,RPC可能还是成功的。RPC失败意味着连接断开,超时之类的。“不能把某个值设入memcached”对于RPC来说还是成功的。如果业务上认为要成功操作才算成功,那么你不仅要判RPC成功,还要判PopXXX是成功的。
目前支持的请求操作有:
```c++
bool Set(const Slice& key, const Slice& value, uint32_t flags, uint32_t exptime, uint64_t cas_value);
bool Add(const Slice& key, const Slice& value, uint32_t flags, uint32_t exptime, uint64_t cas_value);
bool Replace(const Slice& key, const Slice& value, uint32_t flags, uint32_t exptime, uint64_t cas_value);
bool Append(const Slice& key, const Slice& value, uint32_t flags, uint32_t exptime, uint64_t cas_value);
bool Prepend(const Slice& key, const Slice& value, uint32_t flags, uint32_t exptime, uint64_t cas_value);
bool Delete(const Slice& key);
bool Flush(uint32_t timeout);
bool Increment(const Slice& key, uint64_t delta, uint64_t initial_value, uint32_t exptime);
bool Decrement(const Slice& key, uint64_t delta, uint64_t initial_value, uint32_t exptime);
bool Touch(const Slice& key, uint32_t exptime);
bool Version();
```
对应的回复操作:
```c++
// Call LastError() of the response to check the error text when any following operation fails.
bool PopGet(IOBuf* value, uint32_t* flags, uint64_t* cas_value);
bool PopGet(std::string* value, uint32_t* flags, uint64_t* cas_value);
bool PopSet(uint64_t* cas_value);
bool PopAdd(uint64_t* cas_value);
bool PopReplace(uint64_t* cas_value);
bool PopAppend(uint64_t* cas_value);
bool PopPrepend(uint64_t* cas_value);
bool PopDelete();
bool PopFlush();
bool PopIncrement(uint64_t* new_value, uint64_t* cas_value);
bool PopDecrement(uint64_t* new_value, uint64_t* cas_value);
bool PopTouch();
bool PopVersion(std::string* version);
```
# 访问memcached集群
建立一个使用c_md5负载均衡算法的channel,每个MemcacheRequest只包含一个操作或确保所有的操作始终落在同一台server,就能访问挂载在对应名字服务下的memcached集群了。如果request包含了多个操作,在当前实现下这些操作总会送向同一个server。比方说一个request中包含了多个Get操作,而对应的key分布在多个server上,那么结果就肯定不对了,这个情况下你必须把一个request分开为多个。
或者你可以沿用常见的[twemproxy](https://github.com/twitter/twemproxy)方案。这个方案虽然需要额外部署proxy,还增加了延时,但client端仍可以像访问单点一样的访问它。
\ No newline at end of file
This diff is collapsed.
# 概述
在一些应用场景中, client或server需要像对面发送大量数据,这些数据非常大或者持续地在产生以至于无法放在一个RPC的附件中。比如一个分布式系统的不同节点间传递replica或snapshot。client/server之间虽然可以通过多次RPC把数据切分后传输过去,但存在如下问题:
- 如果这些RPC是并行的,无法保证接收端有序地收到数据,拼接数据的逻辑相当复杂。
- 如果这些RPC是串行的,每次传递都得等待一次网络RTT+处理数据的延时,特别是后者的延时可能是难以预估的。
为了让大块数据以流水线的方式在client/server之间传递, 我们提供了Streaming RPC这种交互模型。Streaming RPC让用户能够在client/service之间建立用户态连接,称为Stream, 同一个TCP连接之上能同时存在多个Stream。 Stream的传输数据以消息为基本单位, 输入端可以源源不断的往Stream中写入消息, 接收端会按输入端写入顺序收到消息。
Streaming RPC保证:
- 有消息边界。
- 接收消息的顺序和发送消息的顺序严格一致。
- 全双工。
- 支持流控。
- 提供超时提醒
目前的实现还没有自动切割过大的消息,同一个tcp连接上的多个Stream之间可能有[Head-of-line blocking](https://en.wikipedia.org/wiki/Head-of-line_blocking)问题,请尽量避免过大的单个消息,实现自动切割后我们会告知并更新文档。
例子见[example/streaming_echo_c++](https://svn.baidu.com/public/trunk/baidu-rpc/example/streaming_echo_c++/)
# 建立Stream
目前Stream都由Client端建立。Client先在本地创建一个Stream,再通过一次RPC(必须使用标准协议)与指定的Service建立一个Stream,如果Service在收到请求之后选择接受这个Stream, 那在response返回Client后Stream就会建立成功。过程中的任何错误都把RPC标记为失败,同时也意味着Stream创建失败。用linux下建立连接的过程打比方,Client先创建[socket](http://linux.die.net/man/7/socket)(创建Stream),再调用[connect](http://linux.die.net/man/2/connect)尝试与远端建立连接(通过RPC建立Stream),远端[accept](http://linux.die.net/man/2/accept)后连接就建立了(service接受后创建成功)。
> 如果Client尝试向不支持Streaming RPC的老Server建立Stream,将总是失败。
程序中我们用StreamId代表一个Stream,对Stream的读写,关闭操作都将作用在这个Id上。
```c++
struct StreamOptions
// The max size of unconsumed data allowed at remote side.
// If |max_buf_size| <= 0, there's no limit of buf size
// default: 2097152 (2M)
int max_buf_size;
// Notify user when there's no data for at least |idle_timeout_ms|
// milliseconds since the last time that on_received_messages or on_idle_timeout
// finished.
// default: -1
long idle_timeout_ms;
// How many messages at most passed to handler->on_received_messages
// default: 1
size_t max_messages_size;
// Handle input message, if handler is NULL, the remote side is not allowd to
// write any message, who will get EBADF on writting
// default: NULL
StreamInputHandler* handler;
};
// [Called at the client side]
// Create a Stream at client-side along with the |cntl|, which will be connected
// when receiving the response with a Stream from server-side. If |options| is
// NULL, the Stream will be created with default options
// Return 0 on success, -1 otherwise
int StreamCreate(StreamId* request_stream, Controller &cntl, const StreamOptions* options);
```
# 接受Stream
如果client在RPC上附带了一个Stream, service在收到RPC后可以通过调用StreamAccept接受。接受后Server端对应产生的Stream存放在response_stream中,Server可通过这个Stream向Client发送数据。
```c++
// [Called at the server side]
// Accept the Stream. If client didn't create a Stream with the request
// (cntl.has_remote_stream() returns false), this method would fail.
// Return 0 on success, -1 otherwise.
int StreamAccept(StreamId* response_stream, Controller &cntl, const StreamOptions* options);
```
# 读取Stream
在建立或者接受一个Stream的时候, 用户可以继承StreamInputHandler并把这个handler填入StreamOptions中. 通过这个handler,用户可以处理对端的写入数据,连接关闭以及idle timeout
```c++
class StreamInputHandler {
public:
// 当接收到消息后被调用
virtual int on_received_messages(StreamId id, base::IOBuf *const messages[], size_t size) = 0;
// 当Stream上长时间没有数据交互后被调用
virtual void on_idle_timeout(StreamId id) = 0;
// 当Stream被关闭时被调用
virtual void on_closed(StreamId id) = 0;
};
```
>***第一次收到请求的时间***
>
>在client端,如果建立过程是一次同步RPC, 那在等待的线程被唤醒之后,on_received_message就可能会被调用到。 如果是异步RPC请求, 那等到这次请求的done->Run() 执行完毕之后, on_received_message就可能会被调用。
>
>在server端, 当框架传入的done->Run()被调用完之后, on_received_message就可能会被调用。
# 写入Stream
```c++
// Write |message| into |stream_id|. The remote-side handler will received the
// message by the written order
// Returns 0 on success, errno otherwise
// Errno:
// - EAGAIN: |stream_id| is created with positive |max_buf_size| and buf size
// which the remote side hasn't consumed yet excceeds the number.
// - EINVAL: |stream_id| is invalied or has been closed
int StreamWrite(StreamId stream_id, const base::IOBuf &message);
```
# 流控
当存在较多已发送但未接收的数据时,发送端的Write操作会立即失败(返回EAGAIN), 这时候可以通过同步或异步的方式等待对端消费掉数据。
```c++
// Wait util the pending buffer size is less than |max_buf_size| or error occurs
// Returns 0 on success, errno otherwise
// Errno:
// - ETIMEDOUT: when |due_time| is not NULL and time expired this
// - EINVAL: the Stream was close during waiting
int StreamWait(StreamId stream_id, const timespec* due_time);
// Async wait
void StreamWait(StreamId stream_id, const timespec *due_time,
void (*on_writable)(StreamId stream_id, void* arg, int error_code),
void *arg);
```
# 关闭Stream
```c++
// Close |stream_id|, after this function is called:
// - All the following |StreamWrite| would fail
// - |StreamWait| wakes up immediately.
// - Both sides |on_closed| would be notifed after all the pending buffers have
// been received
// This function could be called multiple times without side-effects
int StreamClose(StreamId stream_id);
```
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment