Commit 515a8d0b authored by Ge Jun's avatar Ge Jun

1. Add ThriftStub to send and receive native thrift messages & specify…

1. Add ThriftStub to send and receive native thrift messages & specify method-name directly. As a result, ThriftMessage<T> is removed.
2. ThriftFramedMessage (no matter Cast<>-ed or not) can be sent/received as well so that building proxies of thrift is much easier.
3. ThriftFramedMessage::Cast<T> can be called multiple times with reasonable behaviors, even if T is changed.
4. Server-side errors are sent to client as TApplicationException instead of closing the connection.
5. Code in ThriftService::ProcessThriftFramedRequest() can throw exceptions which will be sent to client as errors as well.
6. Simplify ThriftClosure which does not need many stuffs inherited from NsheadClosure.
7. Port protocol-related patches to thrift_protocol.cpp which was changed before the patches.
8. Remove the unnecessary default malloc when constructing TMemoryBuffer.
9. Use TBinaryProtocolT instead of TBinaryProtocol to make read/write non-virtual, and remove the unnecessary shared_ptr on iprot/oprot.
10. request/response must be ThriftFramedRequest when protocol is thrift, which was not checked before.
11. Limit max length of thrift_method_name (to a reasonable large value) so that intermediate buffer can be allocated on stack directly.
12. Make ThriftFramedMessage uncopyable since the TBase* inside does not have a general copy function.
parent 3589e27b
...@@ -8,6 +8,7 @@ ...@@ -8,6 +8,7 @@
- 线程安全。用户不需要为每个线程建立独立的client. - 线程安全。用户不需要为每个线程建立独立的client.
- 支持同步、异步、批量同步、批量异步等访问方式,能使用ParallelChannel等组合访问方式. - 支持同步、异步、批量同步、批量异步等访问方式,能使用ParallelChannel等组合访问方式.
- 支持多种连接方式(连接池, 短连接), 支持超时、backup request、取消、tracing、内置服务等一系列RPC基本福利. - 支持多种连接方式(连接池, 短连接), 支持超时、backup request、取消、tracing、内置服务等一系列RPC基本福利.
- 性能更好.
# 编译 # 编译
为了复用解析代码,brpc对thrift的支持仍需要依赖thrift库以及thrift生成的代码,thrift格式怎么写,代码怎么生成,怎么编译等问题请参考thrift官方文档。 为了复用解析代码,brpc对thrift的支持仍需要依赖thrift库以及thrift生成的代码,thrift格式怎么写,代码怎么生成,怎么编译等问题请参考thrift官方文档。
...@@ -35,13 +36,13 @@ mkdir build && cd build && cmake ../ -DWITH_THRIFT=1 ...@@ -35,13 +36,13 @@ mkdir build && cd build && cmake ../ -DWITH_THRIFT=1
# Client端访问thrift server # Client端访问thrift server
基本步骤: 基本步骤:
- 创建一个协议设置为brpc::PROTOCOL_THRIFT的Channel - 创建一个协议设置为brpc::PROTOCOL_THRIFT的Channel
- 定义brpc::ThriftMessage<原生Request>作为请求,brpc::ThriftMessage<原生Response>作为回复。raw()方法可以操作原生thrift消息。 - 创建brpc::ThriftStub
- 通过Controller::set_thrift_method_name()设置thrift方法名。 - 使用原生Request和原生Response>发起访问
示例代码如下: 示例代码如下:
```c++ ```c++
#include <brpc/channel.h> #include <brpc/channel.h>
#include <brpc/thrift_message.h>         // 定义了ThriftMessage #include <brpc/thrift_message.h>         // 定义了ThriftStub
... ...
DEFINE_string(server, "0.0.0.0:8019", "IP Address of thrift server"); DEFINE_string(server, "0.0.0.0:8019", "IP Address of thrift server");
...@@ -56,16 +57,15 @@ if (thrift_channel.Init(Flags_server.c_str(), FLAGS_load_balancer.c_str(), &opti ...@@ -56,16 +57,15 @@ if (thrift_channel.Init(Flags_server.c_str(), FLAGS_load_balancer.c_str(), &opti
return -1; return -1;
} }
brpc::ThriftStub stub(&thrift_channel);
... ...
// example::[EchoRequest/EchoResponse]是thrift生成的消息 // example::[EchoRequest/EchoResponse]是thrift生成的消息
brpc::ThriftMessage<example::EchoRequest> req; example::EchoRequest req;
brpc::ThriftMessage<example::EchoResponse> res; example::EchoResponse res;
req.data = "hello";
req.raw().data = "hello"; stub.CallMethod("Echo", &cntl, &req, &res, NULL);
cntl.set_thrift_method_name("Echo");
channel.CallMethod(NULL, &cntl, &req, &res, NULL);
if (cntl.Failed()) { if (cntl.Failed()) {
LOG(ERROR) << "Fail to send thrift request, " << cntl.ErrorText(); LOG(ERROR) << "Fail to send thrift request, " << cntl.ErrorText();
...@@ -76,36 +76,33 @@ if (cntl.Failed()) { ...@@ -76,36 +76,33 @@ if (cntl.Failed()) {
# Server端处理thrift请求 # Server端处理thrift请求
用户通过继承brpc::ThriftService实现处理逻辑,既可以调用thrift生成的handler以直接复用原有的函数入口,也可以像protobuf服务那样直接读取request和设置response。 用户通过继承brpc::ThriftService实现处理逻辑,既可以调用thrift生成的handler以直接复用原有的函数入口,也可以像protobuf服务那样直接读取request和设置response。
```c++ ```c++
class MyThriftProtocol : public brpc::ThriftService { class EchoServiceImpl : public brpc::ThriftService {
public: public:
void ProcessThriftFramedRequest(const brpc::Server&, void ProcessThriftFramedRequest(brpc::Controller* cntl,
brpc::Controller* cntl, brpc::ThriftFramedMessage* req,
brpc::ThriftFramedMessage* request, brpc::ThriftFramedMessage* res,
brpc::ThriftFramedMessage* response, google::protobuf::Closure* done) override {
brpc::ThriftClosure* done) { // Dispatch calls to different methods
if (cntl->thrift_method_name() == "Echo") {
return Echo(cntl, req->Cast<example::EchoRequest>(),
res->Cast<example::EchoResponse>(), done);
} else {
cntl->SetFailed(brpc::ENOMETHOD, "Fail to find method=%s",
cntl->thrift_method_name().c_str());
done->Run();
}
}
void Echo(brpc::Controller* cntl,
const example::EchoRequest* req,
example::EchoResponse* res,
google::protobuf::Closure* done) {
// This object helps you to call done->Run() in RAII style. If you need // This object helps you to call done->Run() in RAII style. If you need
// to process the request asynchronously, pass done_guard.release(). // to process the request asynchronously, pass done_guard.release().
brpc::ClosureGuard done_guard(done); brpc::ClosureGuard done_guard(done);
if (cntl->Failed()) { res->data = req->data + " (processed)";
// NOTE: You can send back a response containing error information
// back to client instead of closing the connection.
cntl->CloseConnection("Close connection due to previous error");
return;
}
example::EchoRequest* req = request->Cast<example::EchoRequest>();
example::EchoResponse* res = response->Cast<example::EchoResponse>();
       // 通过cntl->thrift_method_name()获得被访问的方法名
       if (_native_handler) {
_native_handler->Echo(*res, *req);
} else {
res->data = req->data + "user data";
}
} }
private:
EchoServiceHandler* _native_handler;
}; };
``` ```
...@@ -113,7 +110,7 @@ private: ...@@ -113,7 +110,7 @@ private:
```c++ ```c++
brpc::Server server; brpc::Server server;
brpc::ServerOptions options; brpc::ServerOptions options;
options.thrift_service = new MyThriftProtocol; options.thrift_service = new EchoServiceImpl;
options.idle_timeout_sec = FLAGS_idle_timeout_s; options.idle_timeout_sec = FLAGS_idle_timeout_s;
options.max_concurrency = FLAGS_max_concurrency; options.max_concurrency = FLAGS_max_concurrency;
......
...@@ -8,6 +8,7 @@ Advantages compared to the official solution: ...@@ -8,6 +8,7 @@ Advantages compared to the official solution:
- Thread safety. No need to set up separate clients for each thread. - Thread safety. No need to set up separate clients for each thread.
- Supports synchronous, asynchronous, batch synchronous, batch asynchronous, and other access methods. Combination channels such as ParallelChannel are also supported. - Supports synchronous, asynchronous, batch synchronous, batch asynchronous, and other access methods. Combination channels such as ParallelChannel are also supported.
- Support various connection types(short, connection pool). Support timeout, backup request, cancellation, tracing, built-in services, and other benefits offered by brpc. - Support various connection types(short, connection pool). Support timeout, backup request, cancellation, tracing, built-in services, and other benefits offered by brpc.
- Better performance.
# Compile # Compile
brpc depends on the thrift lib and the code generated by thrift tools to reuse the parsing code. Please read official documents to find out how to write thrift files, generate code, compilations etc. brpc depends on the thrift lib and the code generated by thrift tools to reuse the parsing code. Please read official documents to find out how to write thrift files, generate code, compilations etc.
...@@ -35,13 +36,13 @@ mkdir build && cd build && cmake ../ -DWITH_THRIFT=1 ...@@ -35,13 +36,13 @@ mkdir build && cd build && cmake ../ -DWITH_THRIFT=1
# Client accesses thrift server # Client accesses thrift server
Steps: Steps:
- Create a Channel setting protocol to brpc::PROTOCOL_THRIFT - Create a Channel setting protocol to brpc::PROTOCOL_THRIFT
- Define and use brpc::ThriftMessage<Native-Request> as the request, brpc::ThriftMessage<Native-Response> as the response. Call raw() method to get the native thrift message. - Create brpc::ThriftStub
- Set method-name for thrift via Controller::set_thrift_method_name() - Use native request and response to start RPC directly.
Example code: Example code:
```c++ ```c++
#include <brpc/channel.h> #include <brpc/channel.h>
#include <brpc/thrift_message.h>         // Defines ThriftMessage #include <brpc/thrift_message.h>         // Defines ThriftStub
... ...
DEFINE_string(server, "0.0.0.0:8019", "IP Address of thrift server"); DEFINE_string(server, "0.0.0.0:8019", "IP Address of thrift server");
...@@ -56,17 +57,15 @@ if (thrift_channel.Init(Flags_server.c_str(), FLAGS_load_balancer.c_str(), &opti ...@@ -56,17 +57,15 @@ if (thrift_channel.Init(Flags_server.c_str(), FLAGS_load_balancer.c_str(), &opti
return -1; return -1;
} }
brpc::ThriftStub stub(&thrift_channel);
... ...
// example::[EchoRequest/EchoResponse] are generated by thrift // example::[EchoRequest/EchoResponse] are types generated by thrift
brpc::ThriftMessage<example::EchoRequest> req; example::EchoRequest req;
brpc::ThriftMessage<example::EchoResponse> res; example::EchoResponse res;
req.data = "hello";
req.raw().data = "hello";
cntl.set_thrift_method_name("Echo");
channel.CallMethod(NULL, &cntl, &req, &res, NULL);
stub.CallMethod("Echo", &cntl, &req, &res, NULL);
if (cntl.Failed()) { if (cntl.Failed()) {
LOG(ERROR) << "Fail to send thrift request, " << cntl.ErrorText(); LOG(ERROR) << "Fail to send thrift request, " << cntl.ErrorText();
return -1; return -1;
...@@ -76,36 +75,33 @@ if (cntl.Failed()) { ...@@ -76,36 +75,33 @@ if (cntl.Failed()) {
# Server processes thrift requests # Server processes thrift requests
Inherit brpc::ThriftService to implement the processing code, which may call the native handler generated by thrift to re-use existing entry directly, or read the request and set the response directly just as in other protobuf services. Inherit brpc::ThriftService to implement the processing code, which may call the native handler generated by thrift to re-use existing entry directly, or read the request and set the response directly just as in other protobuf services.
```c++ ```c++
class MyThriftProtocol : public brpc::ThriftService { class EchoServiceImpl : public brpc::ThriftService {
public: public:
void ProcessThriftFramedRequest(const brpc::Server&, void ProcessThriftFramedRequest(brpc::Controller* cntl,
brpc::Controller* cntl, brpc::ThriftFramedMessage* req,
brpc::ThriftFramedMessage* request, brpc::ThriftFramedMessage* res,
brpc::ThriftFramedMessage* response, google::protobuf::Closure* done) override {
brpc::ThriftClosure* done) { // Dispatch calls to different methods
if (cntl->thrift_method_name() == "Echo") {
return Echo(cntl, req->Cast<example::EchoRequest>(),
res->Cast<example::EchoResponse>(), done);
} else {
cntl->SetFailed(brpc::ENOMETHOD, "Fail to find method=%s",
cntl->thrift_method_name().c_str());
done->Run();
}
}
void Echo(brpc::Controller* cntl,
const example::EchoRequest* req,
example::EchoResponse* res,
google::protobuf::Closure* done) {
// This object helps you to call done->Run() in RAII style. If you need // This object helps you to call done->Run() in RAII style. If you need
// to process the request asynchronously, pass done_guard.release(). // to process the request asynchronously, pass done_guard.release().
brpc::ClosureGuard done_guard(done); brpc::ClosureGuard done_guard(done);
if (cntl->Failed()) { res->data = req->data + " (processed)";
// NOTE: You can send back a response containing error information
// back to client instead of closing the connection.
cntl->CloseConnection("Close connection due to previous error");
return;
}
example::EchoRequest* req = request->Cast<example::EchoRequest>();
example::EchoResponse* res = response->Cast<example::EchoResponse>();
       // Get method-name for thrift by cntl->thrift_method_name();
       if (_native_handler) {
_native_handler->Echo(*res, *req);
} else {
res->data = req->data + "user data";
}
} }
private:
EchoServiceHandler* _native_handler;
}; };
``` ```
...@@ -113,7 +109,7 @@ Set the implemented service to ServerOptions.thrift_service and start the servic ...@@ -113,7 +109,7 @@ Set the implemented service to ServerOptions.thrift_service and start the servic
```c++ ```c++
brpc::Server server; brpc::Server server;
brpc::ServerOptions options; brpc::ServerOptions options;
options.thrift_service = new MyThriftProtocol; options.thrift_service = new EchoServiceImpl;
options.idle_timeout_sec = FLAGS_idle_timeout_s; options.idle_timeout_sec = FLAGS_idle_timeout_s;
options.max_concurrency = FLAGS_max_concurrency; options.max_concurrency = FLAGS_max_concurrency;
......
...@@ -54,32 +54,26 @@ int main(int argc, char* argv[]) { ...@@ -54,32 +54,26 @@ int main(int argc, char* argv[]) {
return -1; return -1;
} }
// Send a request and wait for the response every 1 second. brpc::ThriftStub stub(&channel);
int log_id = 0;
// Send a request and wait for the response every 1 second.
while (!brpc::IsAskedToQuit()) { while (!brpc::IsAskedToQuit()) {
brpc::Controller cntl; brpc::Controller cntl;
cntl.set_log_id(log_id ++); // set by user example::EchoRequest req;
example::EchoResponse res;
// wrapper thrift raw request into ThriftMessage
brpc::ThriftMessage<example::EchoRequest> req;
brpc::ThriftMessage<example::EchoResponse> res;
req.raw().data = "hello"; req.data = "hello";
cntl.set_thrift_method_name("Echo"); stub.CallMethod("Echo", &cntl, &req, &res, NULL);
channel.CallMethod(NULL, &cntl, &req, &res, NULL);
if (cntl.Failed()) { if (cntl.Failed()) {
LOG(ERROR) << "Fail to send thrift request, " << cntl.ErrorText(); LOG(ERROR) << "Fail to send thrift request, " << cntl.ErrorText();
sleep(1); // Remove this sleep in production code. sleep(1); // Remove this sleep in production code.
} else { } else {
g_latency_recorder << cntl.latency_us(); g_latency_recorder << cntl.latency_us();
LOG(INFO) << "Thrift Response: " << res.data;
} }
LOG(INFO) << "Thrift Res data: " << res.raw().data;
LOG_EVERY_SECOND(INFO) LOG_EVERY_SECOND(INFO)
<< "Sending thrift requests at qps=" << g_latency_recorder.qps(1) << "Sending thrift requests at qps=" << g_latency_recorder.qps(1)
<< " latency=" << g_latency_recorder.latency(1); << " latency=" << g_latency_recorder.latency(1);
......
...@@ -56,10 +56,13 @@ int main(int argc, char **argv) { ...@@ -56,10 +56,13 @@ int main(int argc, char **argv) {
example::EchoResponse res; example::EchoResponse res;
while (1) { while (1) {
client.Echo(res, req); try {
client.Echo(res, req);
LOG(INFO) << "Req: " << req.data LOG(INFO) << "Req: " << req.data
<< " Res: " << res.data; << " Res: " << res.data;
} catch (std::exception& e) {
LOG(ERROR) << "Fail to rpc, " << e.what();
}
sleep(1); sleep(1);
} }
transport->close(); transport->close();
......
...@@ -18,11 +18,6 @@ ...@@ -18,11 +18,6 @@
#include <butil/logging.h> #include <butil/logging.h>
#include <brpc/server.h> #include <brpc/server.h>
#include <brpc/thrift_service.h> #include <brpc/thrift_service.h>
#include <brpc/details/thrift_utils.h>
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/transport/TBufferTransports.h>
#include "gen-cpp/EchoService.h"
#include "gen-cpp/echo_types.h" #include "gen-cpp/echo_types.h"
DEFINE_int32(port, 8019, "TCP Port of this server"); DEFINE_int32(port, 8019, "TCP Port of this server");
...@@ -30,62 +25,34 @@ DEFINE_int32(idle_timeout_s, -1, "Connection will be closed if there is no " ...@@ -30,62 +25,34 @@ DEFINE_int32(idle_timeout_s, -1, "Connection will be closed if there is no "
"read/write operations during the last `idle_timeout_s'"); "read/write operations during the last `idle_timeout_s'");
DEFINE_int32(max_concurrency, 0, "Limit of request processing in parallel"); DEFINE_int32(max_concurrency, 0, "Limit of request processing in parallel");
class EchoServiceHandler : virtual public example::EchoServiceIf {
public:
EchoServiceHandler() {}
void Echo(example::EchoResponse& res, const example::EchoRequest& req) {
// Process request, just attach a simple string.
res.data = req.data + " (processed by handler)";
return;
}
};
static std::atomic<int> g_counter(0);
// Adapt your own thrift-based protocol to use brpc // Adapt your own thrift-based protocol to use brpc
class MyThriftProtocol : public brpc::ThriftService { class EchoServiceImpl : public brpc::ThriftService {
public: public:
explicit MyThriftProtocol(EchoServiceHandler* handler) : _handler(handler) { } void ProcessThriftFramedRequest(brpc::Controller* cntl,
brpc::ThriftFramedMessage* req,
brpc::ThriftFramedMessage* res,
google::protobuf::Closure* done) override {
// Dispatch calls to different methods
if (cntl->thrift_method_name() == "Echo") {
return Echo(cntl, req->Cast<example::EchoRequest>(),
res->Cast<example::EchoResponse>(), done);
} else {
cntl->SetFailed(brpc::ENOMETHOD, "Fail to find method=%s",
cntl->thrift_method_name().c_str());
done->Run();
}
}
void ProcessThriftFramedRequest(const brpc::Server&, void Echo(brpc::Controller* cntl,
brpc::Controller* cntl, const example::EchoRequest* req,
brpc::ThriftFramedMessage* request, example::EchoResponse* res,
brpc::ThriftFramedMessage* response, google::protobuf::Closure* done) {
brpc::ThriftClosure* done) {
// This object helps you to call done->Run() in RAII style. If you need // This object helps you to call done->Run() in RAII style. If you need
// to process the request asynchronously, pass done_guard.release(). // to process the request asynchronously, pass done_guard.release().
brpc::ClosureGuard done_guard(done); brpc::ClosureGuard done_guard(done);
if (cntl->Failed()) { res->data = req->data + " (processed)";
// NOTE: You can send back a response containing error information
// back to client instead of closing the connection.
cntl->CloseConnection("Close connection due to previous error");
return;
}
// get method name by cntl->thrift_method_name() if needed
example::EchoRequest* req = request->Cast<example::EchoRequest>();
example::EchoResponse* res = response->Cast<example::EchoResponse>();
if (g_counter++ % 2 == 0) {
if (!_handler) {
cntl->CloseConnection("Close connection due to no valid handler");
LOG(ERROR) << "No valid handler";
return;
}
_handler->Echo(*res, *req);
} else {
res->data = req->data + " (processed directly)";
}
} }
private:
EchoServiceHandler* _handler;
}; };
int main(int argc, char* argv[]) { int main(int argc, char* argv[]) {
...@@ -95,8 +62,7 @@ int main(int argc, char* argv[]) { ...@@ -95,8 +62,7 @@ int main(int argc, char* argv[]) {
brpc::Server server; brpc::Server server;
brpc::ServerOptions options; brpc::ServerOptions options;
EchoServiceHandler thrift_service_handler; options.thrift_service = new EchoServiceImpl;
options.thrift_service = new MyThriftProtocol(&thrift_service_handler);
options.idle_timeout_sec = FLAGS_idle_timeout_s; options.idle_timeout_sec = FLAGS_idle_timeout_s;
options.max_concurrency = FLAGS_max_concurrency; options.max_concurrency = FLAGS_max_concurrency;
......
...@@ -451,9 +451,6 @@ public: ...@@ -451,9 +451,6 @@ public:
void set_idl_result(int64_t result) { _idl_result = result; } void set_idl_result(int64_t result) { _idl_result = result; }
int64_t idl_result() const { return _idl_result; } int64_t idl_result() const { return _idl_result; }
void set_thrift_method_name(const std::string& method_name) {
_thrift_method_name = method_name;
}
const std::string& thrift_method_name() { return _thrift_method_name; } const std::string& thrift_method_name() { return _thrift_method_name; }
private: private:
...@@ -689,7 +686,6 @@ private: ...@@ -689,7 +686,6 @@ private:
// Thrift method name, only used when thrift protocol enabled // Thrift method name, only used when thrift protocol enabled
std::string _thrift_method_name; std::string _thrift_method_name;
uint32_t _thrift_seq_id;
}; };
// Advises the RPC system that the caller desires that the RPC call be // Advises the RPC system that the caller desires that the RPC call be
......
...@@ -128,6 +128,9 @@ public: ...@@ -128,6 +128,9 @@ public:
void add_with_auth() { void add_with_auth() {
_cntl->add_flag(Controller::FLAGS_REQUEST_WITH_AUTH); _cntl->add_flag(Controller::FLAGS_REQUEST_WITH_AUTH);
} }
std::string* mutable_thrift_method_name() { return &_cntl->_thrift_method_name; }
private: private:
Controller* _cntl; Controller* _cntl;
}; };
......
// Copyright (c) 2017 Baidu, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// utils for serialize/parse thrift binary message to brpc protobuf obj.
#ifndef BRPC_THRIFT_UTILS_H
#define BRPC_THRIFT_UTILS_H
#include "butil/iobuf.h"
#include "butil/logging.h"
#include <thrift/TDispatchProcessor.h>
#include <thrift/transport/TBufferTransports.h>
#include <thrift/protocol/TBinaryProtocol.h>
// _THRIFT_STDCXX_H_ is defined by thrift/stdcxx.h which was added since thrift 0.11.0
// TDispatcherProcessor.h above uses shared_ptr and should include stdcxx.h
#ifndef THRIFT_STDCXX
#if defined(_THRIFT_STDCXX_H_)
# define THRIFT_STDCXX apache::thrift::stdcxx
#else
# include <boost/make_shared.hpp>
# define THRIFT_STDCXX boost
#include <boost/make_shared.hpp>
#endif
#endif
namespace brpc {
template <typename T>
void thrift_framed_message_deleter(void* p) {
delete static_cast<T*>(p);
}
template <typename T>
uint32_t thrift_framed_message_writer(void* p, void* prot) {
T* writer = static_cast<T*>(p);
return writer->write(static_cast<::apache::thrift::protocol::TProtocol*>(prot));
}
template<typename T>
bool serialize_iobuf_to_thrift_message(const butil::IOBuf& body,
void* thrift_raw_instance, int32_t* thrift_message_seq_id) {
auto in_buffer =
THRIFT_STDCXX::make_shared<apache::thrift::transport::TMemoryBuffer>();
auto in_portocol =
THRIFT_STDCXX::make_shared<apache::thrift::protocol::TBinaryProtocol>(in_buffer);
// Cut the thrift buffer and parse thrift message
size_t body_len = body.size();
std::unique_ptr<uint8_t[]> thrift_buffer(new uint8_t[body_len]);
const size_t k = body.copy_to(thrift_buffer.get(), body_len);
if ( k != body_len) {
return false;
}
in_buffer->resetBuffer(thrift_buffer.get(), body_len);
// The following code was taken and modified from thrift auto generated code
std::string fname;
::apache::thrift::protocol::TMessageType mtype;
in_portocol->readMessageBegin(fname, mtype, *thrift_message_seq_id);
apache::thrift::protocol::TInputRecursionTracker tracker(*in_portocol);
uint32_t xfer = 0;
::apache::thrift::protocol::TType ftype;
int16_t fid;
xfer += in_portocol->readStructBegin(fname);
using ::apache::thrift::protocol::TProtocolException;
while (true)
{
xfer += in_portocol->readFieldBegin(fname, ftype, fid);
if (ftype == ::apache::thrift::protocol::T_STOP) {
break;
}
switch (fid)
{
case 1:
if (ftype == ::apache::thrift::protocol::T_STRUCT) {
xfer += static_cast<T*>(thrift_raw_instance)->read(in_portocol.get());
} else {
xfer += in_portocol->skip(ftype);
}
break;
default:
xfer += in_portocol->skip(ftype);
break;
}
xfer += in_portocol->readFieldEnd();
}
xfer += in_portocol->readStructEnd();
in_portocol->readMessageEnd();
in_portocol->getTransport()->readEnd();
// End thrift auto generated code
return true;
}
}
#endif //BRPC_THRIFT_UTILS_H
...@@ -61,7 +61,9 @@ ...@@ -61,7 +61,9 @@
#include "brpc/policy/nshead_mcpack_protocol.h" #include "brpc/policy/nshead_mcpack_protocol.h"
#include "brpc/policy/rtmp_protocol.h" #include "brpc/policy/rtmp_protocol.h"
#include "brpc/policy/esp_protocol.h" #include "brpc/policy/esp_protocol.h"
#include "brpc/policy/thrift_protocol.h" #ifdef ENABLE_THRIFT_FRAMED_PROTOCOL
# include "brpc/policy/thrift_protocol.h"
#endif
#include "brpc/input_messenger.h" // get_or_new_client_side_messenger #include "brpc/input_messenger.h" // get_or_new_client_side_messenger
#include "brpc/socket_map.h" // SocketMapList #include "brpc/socket_map.h" // SocketMapList
...@@ -77,10 +79,6 @@ ...@@ -77,10 +79,6 @@
extern "C" { extern "C" {
// defined in gperftools/malloc_extension_c.h // defined in gperftools/malloc_extension_c.h
void BAIDU_WEAK MallocExtension_ReleaseFreeMemory(void); void BAIDU_WEAK MallocExtension_ReleaseFreeMemory(void);
// Register Thrift Protocol if thrift was enabled
#ifdef ENABLE_THRIFT_FRAMED_PROTOCOL
void RegisterThriftProtocol();
#endif
} }
namespace brpc { namespace brpc {
...@@ -471,7 +469,15 @@ static void GlobalInitializeOrDieImpl() { ...@@ -471,7 +469,15 @@ static void GlobalInitializeOrDieImpl() {
// Use Macro is more straight forward than weak link technology(becasue of static link issue) // Use Macro is more straight forward than weak link technology(becasue of static link issue)
#ifdef ENABLE_THRIFT_FRAMED_PROTOCOL #ifdef ENABLE_THRIFT_FRAMED_PROTOCOL
RegisterThriftProtocol(); Protocol thrift_binary_protocol = {
policy::ParseThriftMessage,
policy::SerializeThriftRequest, policy::PackThriftRequest,
policy::ProcessThriftRequest, policy::ProcessThriftResponse,
policy::VerifyThriftRequest, NULL, NULL,
CONNECTION_TYPE_POOLED_AND_SHORT, "thrift" };
if (RegisterProtocol(PROTOCOL_THRIFT, thrift_binary_protocol) != 0) {
exit(1);
}
#endif #endif
// Only valid at client side // Only valid at client side
......
...@@ -308,7 +308,6 @@ void ProcessNsheadRequest(InputMessageBase* msg_base) { ...@@ -308,7 +308,6 @@ void ProcessNsheadRequest(InputMessageBase* msg_base) {
} while (false); } while (false);
msg.reset(); // optional, just release resourse ASAP msg.reset(); // optional, just release resourse ASAP
// `socket' will be held until response has been sent
if (span) { if (span) {
span->ResetServerSpanName(service->_cached_name); span->ResetServerSpanName(service->_cached_name);
span->set_start_callback_us(butil::cpuwide_time_us()); span->set_start_callback_us(butil::cpuwide_time_us());
...@@ -376,7 +375,6 @@ void SerializeNsheadRequest(butil::IOBuf* request_buf, Controller* cntl, ...@@ -376,7 +375,6 @@ void SerializeNsheadRequest(butil::IOBuf* request_buf, Controller* cntl,
if (req_base == NULL) { if (req_base == NULL) {
return cntl->SetFailed(EREQUEST, "request is NULL"); return cntl->SetFailed(EREQUEST, "request is NULL");
} }
ControllerPrivateAccessor accessor(cntl);
if (req_base->GetDescriptor() != NsheadMessage::descriptor()) { if (req_base->GetDescriptor() != NsheadMessage::descriptor()) {
return cntl->SetFailed(EINVAL, "Type of request must be NsheadMessage"); return cntl->SetFailed(EINVAL, "Type of request must be NsheadMessage");
} }
......
...@@ -35,6 +35,7 @@ ...@@ -35,6 +35,7 @@
#include <thrift/Thrift.h> #include <thrift/Thrift.h>
#include <thrift/transport/TBufferTransports.h> #include <thrift/transport/TBufferTransports.h>
#include <thrift/protocol/TBinaryProtocol.h> #include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/TApplicationException.h>
// _THRIFT_STDCXX_H_ is defined by thrift/stdcxx.h which was added since thrift 0.11.0 // _THRIFT_STDCXX_H_ is defined by thrift/stdcxx.h which was added since thrift 0.11.0
#include <thrift/TProcessor.h> // to include stdcxx.h if present #include <thrift/TProcessor.h> // to include stdcxx.h if present
...@@ -51,75 +52,186 @@ void bthread_assign_data(void* data) __THROW; ...@@ -51,75 +52,186 @@ void bthread_assign_data(void* data) __THROW;
} }
namespace brpc { namespace brpc {
namespace policy {
static int32_t parse_thrift_method_name(const butil::IOBuf& body, std::string* method_name) { static const uint32_t MAX_THRIFT_METHOD_NAME_LENGTH = 256; // reasonably large
static const uint32_t THRIFT_HEAD_VERSION_MASK = (uint32_t)0xffffff00;
static const uint32_t THRIFT_HEAD_VERSION_1 = (uint32_t)0x80010000;
struct thrift_head_t {
uint32_t body_len;
};
// A faster implementation of TProtocol::readMessageBegin without depending
// on thrift stuff.
static butil::Status
ReadThriftMessageBegin(butil::IOBuf* body,
std::string* method_name,
::apache::thrift::protocol::TMessageType* mtype,
uint32_t* seq_id) {
// Thrift protocol format: // Thrift protocol format:
// Version + Message type + Length + Method + Sequence Id // Version + Message type + Length + Method + Sequence Id
// | | | | | // | | | | |
// 2 + 2 + 4 + >0 + 4 // 3 + 1 + 4 + >0 + 4
if (body.size() < 12) {
LOG(ERROR) << "No Enough data to get method name, request body size: " << body.size();
return -1;
}
char version_and_len_buf[8]; char version_and_len_buf[8];
size_t k = body.copy_to(version_and_len_buf, sizeof(version_and_len_buf)); size_t k = body->copy_to(version_and_len_buf, sizeof(version_and_len_buf));
if (k != sizeof(version_and_len_buf) ) { if (k != sizeof(version_and_len_buf) ) {
LOG(ERROR) << "copy "<< sizeof(version_and_len_buf) << " bytes from body failed"; return butil::Status(-1, "Fail to copy %" PRIu64 " bytes from body",
return -1; sizeof(version_and_len_buf));
}
*mtype = (apache::thrift::protocol::TMessageType)
(ntohl(*(uint32_t*)version_and_len_buf) & 0x000000FF);
const uint32_t method_name_length = ntohl(*(uint32_t*)(version_and_len_buf + 4));
if (method_name_length > MAX_THRIFT_METHOD_NAME_LENGTH) {
return butil::Status(-1, "method_name_length=%u is too long",
method_name_length);
} }
uint32_t method_name_length = ntohl(*(int32_t*)(version_and_len_buf + 4)); char buf[sizeof(version_and_len_buf) + method_name_length + 4];
k = body->cutn(buf, sizeof(buf));
if (k != sizeof(buf)) {
return butil::Status(-1, "Fail to cut %" PRIu64 " bytes", sizeof(buf));
}
method_name->assign(buf + sizeof(version_and_len_buf), method_name_length);
*seq_id = ntohl(*(uint32_t*)(buf + sizeof(version_and_len_buf) + method_name_length));
return butil::Status::OK();
}
inline size_t ThriftMessageBeginSize(const std::string& method_name) {
return 12 + method_name.size();
}
static void
WriteThriftMessageBegin(char* buf,
const std::string& method_name,
::apache::thrift::protocol::TMessageType mtype,
uint32_t seq_id) {
char* p = buf;
*(uint32_t*)p = htonl(THRIFT_HEAD_VERSION_1 | (((uint32_t)mtype) & 0x000000FF));
p += 4;
*(uint32_t*)p = htonl(method_name.size());
p += 4;
memcpy(p, method_name.data(), method_name.size());
p += method_name.size();
*p = htonl(seq_id);
}
bool ReadThriftStruct(const butil::IOBuf& body,
::apache::thrift::TBase* raw_msg,
int16_t expected_fid) {
const size_t body_len = body.size();
uint8_t* thrift_buffer = new uint8_t[body_len];
body.copy_to(thrift_buffer, body_len);
auto in_buffer =
THRIFT_STDCXX::make_shared<apache::thrift::transport::TMemoryBuffer>(
thrift_buffer, body_len,
::apache::thrift::transport::TMemoryBuffer::TAKE_OWNERSHIP);
apache::thrift::protocol::TBinaryProtocolT<apache::thrift::transport::TMemoryBuffer> iprot(in_buffer);
// The following code was taken from thrift auto generate code
std::string fname;
char fname[method_name_length]; uint32_t xfer = 0;
k = body.copy_to(fname, method_name_length, sizeof(version_and_len_buf)); ::apache::thrift::protocol::TType ftype;
if ( k != method_name_length) { int16_t fid;
LOG(ERROR) << "copy " << method_name_length << " bytes from body failed";
return -1; xfer += iprot.readStructBegin(fname);
bool success = false;
while (true) {
xfer += iprot.readFieldBegin(fname, ftype, fid);
if (ftype == ::apache::thrift::protocol::T_STOP) {
break;
}
if (fid == expected_fid) {
if (ftype == ::apache::thrift::protocol::T_STRUCT) {
xfer += raw_msg->read(&iprot);
success = true;
} else {
xfer += iprot.skip(ftype);
}
} else {
xfer += iprot.skip(ftype);
}
xfer += iprot.readFieldEnd();
} }
method_name->assign(fname, method_name_length); xfer += iprot.readStructEnd();
return sizeof(version_and_len_buf) + method_name_length; iprot.getTransport()->readEnd();
return success;
}
void ReadThriftException(const butil::IOBuf& body,
::apache::thrift::TApplicationException* x) {
size_t body_len = body.size();
uint8_t* thrift_buffer = new uint8_t[body_len];
body.copy_to(thrift_buffer, body_len);
auto in_buffer =
THRIFT_STDCXX::make_shared<apache::thrift::transport::TMemoryBuffer>(
thrift_buffer, body_len,
::apache::thrift::transport::TMemoryBuffer::TAKE_OWNERSHIP);
apache::thrift::protocol::TBinaryProtocolT<apache::thrift::transport::TMemoryBuffer> iprot(in_buffer);
x->read(&iprot);
iprot.readMessageEnd();
iprot.getTransport()->readEnd();
} }
ThriftClosure::ThriftClosure(void* additional_space) // The continuation of request processing. Namely send response back to client.
: _socket_ptr(NULL) class ThriftClosure : public google::protobuf::Closure {
, _server(NULL) public:
, _start_parse_us(0) explicit ThriftClosure();
, _do_respond(true) ~ThriftClosure();
, _additional_space(additional_space) {
// [Required] Call this to send response back to the client.
void Run() override;
// Run() is suspended before this method is called.
void AllowToRun();
private:
void DoRun();
friend void ProcessThriftRequest(InputMessageBase* msg_base);
butil::atomic<int> _run_counter;
int64_t _start_parse_us;
ThriftFramedMessage _request;
ThriftFramedMessage _response;
Controller _controller;
};
inline ThriftClosure::ThriftClosure()
: _run_counter(0), _start_parse_us(0) {
} }
ThriftClosure::~ThriftClosure() { ThriftClosure::~ThriftClosure() {
LogErrorTextAndDelete(false)(&_controller); LogErrorTextAndDelete(false)(&_controller);
} }
void ThriftClosure::DoNotRespond() { inline void ThriftClosure::AllowToRun() {
_do_respond = false; if (_run_counter.fetch_add(1, butil::memory_order_relaxed) == 1) {
DoRun();
}
} }
class DeleteThriftClosure { void ThriftClosure::Run() {
public: if (_run_counter.fetch_add(1, butil::memory_order_relaxed) == 1) {
void operator()(ThriftClosure* done) const { DoRun();
done->~ThriftClosure();
free(done);
} }
}; }
void ThriftClosure::Run() { void ThriftClosure::DoRun() {
// Recycle itself after `Run' // Recycle itself after `Run'
std::unique_ptr<ThriftClosure, DeleteThriftClosure> recycle_ctx(this); std::unique_ptr<ThriftClosure> recycle_ctx(this);
SocketUniquePtr sock(_socket_ptr); const Server* server = _controller.server();
ScopedRemoveConcurrency remove_concurrency_dummy(_server, &_controller); ScopedRemoveConcurrency remove_concurrency_dummy(server, &_controller);
ControllerPrivateAccessor accessor(&_controller); ControllerPrivateAccessor accessor(&_controller);
Span* span = accessor.span(); Span* span = accessor.span();
if (span) { if (span) {
span->set_start_send_us(butil::cpuwide_time_us()); span->set_start_send_us(butil::cpuwide_time_us());
} }
ScopedMethodStatus method_status(_server->options().thrift_service->_status); Socket* sock = accessor.get_sending_socket();
ScopedMethodStatus method_status(server->options().thrift_service ?
server->options().thrift_service->_status : NULL);
if (!method_status) { if (!method_status) {
// Judge errors belongings. // Judge errors belongings.
// may not be accurate, but it does not matter too much. // may not be accurate, but it does not matter too much.
...@@ -130,89 +242,101 @@ void ThriftClosure::Run() { ...@@ -130,89 +242,101 @@ void ThriftClosure::Run() {
error_code == ECLOSE || error_code == ECLOSE ||
error_code == ELOGOFF || error_code == ELOGOFF ||
error_code == ELIMIT) { error_code == ELIMIT) {
ServerPrivateAccessor(_server).AddError(); ServerPrivateAccessor(server).AddError();
} }
} }
if (_controller.IsCloseConnection()) { if (_controller.IsCloseConnection() ||
// seq_id is not read yet, no valid response can be sent back
!_controller.has_log_id()) {
sock->SetFailed(); sock->SetFailed();
return; return;
} }
if (_do_respond) { const std::string& method_name = _controller.thrift_method_name();
// response uses request's head as default. if (method_name.empty() || method_name[0] == ' ') {
_response.head = _request.head; _controller.SetFailed(ENOMETHOD, "Invalid thrift_method_name!");
}
if (_response.thrift_raw_instance) { if (method_name.size() > MAX_THRIFT_METHOD_NAME_LENGTH) {
const std::string& method_name = _controller.thrift_method_name(); _controller.SetFailed(ENOMETHOD, "thrift_method_name is too long");
if (method_name == "" || }
method_name.length() < 1 || if (_controller.log_id() > (uint64_t)0xffffffff) {
method_name[0] == ' ') { _controller.SetFailed(ERESPONSE, "Invalid thrift seq_id=%" PRIu64,
_controller.SetFailed(ENOMETHOD, _controller.log_id());
"invalid thrift method name or method name empty in server!"); }
return; const uint32_t seq_id = (uint32_t)_controller.log_id();
}
auto out_buffer =
THRIFT_STDCXX::make_shared<apache::thrift::transport::TMemoryBuffer>();
auto oprot =
THRIFT_STDCXX::make_shared<apache::thrift::protocol::TBinaryProtocol>(out_buffer);
// The following code was taken and modified from thrift auto generated code
oprot->writeMessageBegin(method_name,
::apache::thrift::protocol::T_REPLY, _request.thrift_message_seq_id);
uint32_t xfer = 0;
xfer += oprot->writeStructBegin("placeholder");
xfer += oprot->writeFieldBegin("success",
::apache::thrift::protocol::T_STRUCT, 0);
if (_response.thrift_raw_instance && _response.thrift_raw_instance_writer) {
xfer += _response.thrift_raw_instance_writer(
_response.thrift_raw_instance, oprot.get());
} else {
_controller.SetFailed(ERESPONSE, "thrift_raw_instance or"
"thrift_raw_instance_writer is null!");
}
xfer += oprot->writeFieldEnd();
xfer += oprot->writeFieldStop();
xfer += oprot->writeStructEnd();
oprot->writeMessageEnd(); butil::IOBuf write_buf;
oprot->getTransport()->writeEnd();
oprot->getTransport()->flush();
// End thrfit auto generated code
uint8_t* buf; // The following code was taken and modified from thrift auto generated code
uint32_t sz; if (_controller.Failed()) {
out_buffer->getBuffer(&buf, &sz); auto out_buffer =
_response.body.append(buf, sz); THRIFT_STDCXX::make_shared<apache::thrift::transport::TMemoryBuffer>();
} apache::thrift::protocol::TBinaryProtocolT<apache::thrift::transport::TMemoryBuffer> oprot(out_buffer);
::apache::thrift::TApplicationException x(_controller.ErrorText());
oprot.writeMessageBegin(
method_name, ::apache::thrift::protocol::T_EXCEPTION, seq_id);
x.write(&oprot);
oprot.writeMessageEnd();
oprot.getTransport()->writeEnd();
oprot.getTransport()->flush();
uint8_t* buf;
uint32_t sz;
out_buffer->getBuffer(&buf, &sz);
const thrift_head_t head = { htonl(sz) };
write_buf.append(&head, sizeof(head));
write_buf.append(buf, sz);
} else if (_response.raw_instance()) {
auto out_buffer =
THRIFT_STDCXX::make_shared<apache::thrift::transport::TMemoryBuffer>();
apache::thrift::protocol::TBinaryProtocolT<apache::thrift::transport::TMemoryBuffer> oprot(out_buffer);
oprot.writeMessageBegin(
method_name, ::apache::thrift::protocol::T_REPLY, seq_id);
uint32_t length = _response.body.length(); uint32_t xfer = 0;
_response.head.body_len = htonl(length); xfer += oprot.writeStructBegin("rpc_result"); // can be any valid name
xfer += oprot.writeFieldBegin("success",
if (span) { ::apache::thrift::protocol::T_STRUCT,
int response_size = sizeof(thrift_head_t) + _response.head.body_len; THRIFT_RESPONSE_FID);
span->set_response_size(response_size); xfer += _response.raw_instance()->write(&oprot);
} xfer += oprot.writeFieldEnd();
butil::IOBuf write_buf; xfer += oprot.writeFieldStop();
write_buf.append(&_response.head, sizeof(thrift_head_t)); xfer += oprot.writeStructEnd();
oprot.writeMessageEnd();
oprot.getTransport()->writeEnd();
oprot.getTransport()->flush();
uint8_t* buf;
uint32_t sz;
out_buffer->getBuffer(&buf, &sz);
const thrift_head_t head = { htonl(sz) };
write_buf.append(&head, sizeof(head));
write_buf.append(buf, sz);
} else {
const size_t mb_size = ThriftMessageBeginSize(method_name);
char buf[sizeof(thrift_head_t) + mb_size];
((thrift_head_t*)buf)->body_len = htonl(mb_size + _response.body.size());
WriteThriftMessageBegin(buf + sizeof(thrift_head_t), method_name,
::apache::thrift::protocol::T_REPLY, seq_id);
write_buf.append(buf, sizeof(buf));
write_buf.append(_response.body.movable()); write_buf.append(_response.body.movable());
// Have the risk of unlimited pending responses, in which case, tell }
// users to set max_concurrency.
Socket::WriteOptions wopt; if (span) {
wopt.ignore_eovercrowded = true; span->set_response_size(write_buf.size());
if (sock->Write(&write_buf, &wopt) != 0) { }
const int errcode = errno; // Have the risk of unlimited pending responses, in which case, tell
PLOG_IF(WARNING, errcode != EPIPE) << "Fail to write into " << *sock; // users to set max_concurrency.
_controller.SetFailed(errcode, "Fail to write into %s", Socket::WriteOptions wopt;
sock->description().c_str()); wopt.ignore_eovercrowded = true;
return; if (sock->Write(&write_buf, &wopt) != 0) {
} const int errcode = errno;
PLOG_IF(WARNING, errcode != EPIPE) << "Fail to write into " << *sock;
_controller.SetFailed(errcode, "Fail to write into %s",
sock->description().c_str());
return;
} }
if (span) { if (span) {
...@@ -221,57 +345,59 @@ void ThriftClosure::Run() { ...@@ -221,57 +345,59 @@ void ThriftClosure::Run() {
} }
if (method_status) { if (method_status) {
method_status.release()->OnResponded( method_status.release()->OnResponded(
!_controller.Failed(), butil::cpuwide_time_us() - cpuwide_start_us()); !_controller.Failed(), butil::cpuwide_time_us() - _start_parse_us);
} }
} }
void ThriftClosure::SetMethodName(const std::string& full_method_name) {
ControllerPrivateAccessor accessor(&_controller);
Span* span = accessor.span();
if (span) {
span->ResetServerSpanName(full_method_name);
}
}
namespace policy {
ParseResult ParseThriftMessage(butil::IOBuf* source, ParseResult ParseThriftMessage(butil::IOBuf* source,
Socket*, bool /*read_eof*/, const void* /*arg*/) { Socket*, bool /*read_eof*/, const void* /*arg*/) {
char header_buf[sizeof(thrift_head_t) + 3]; char header_buf[sizeof(thrift_head_t) + 4];
const size_t n = source->copy_to(header_buf, sizeof(thrift_head_t) + 3); const size_t n = source->copy_to(header_buf, sizeof(header_buf));
if (n < sizeof(header_buf)) {
if (n < 7) {
return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA); return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA);
} }
const void* dummy = header_buf + sizeof(thrift_head_t); const uint32_t sz = ntohl(*(uint32_t*)(header_buf + sizeof(thrift_head_t)));
const int32_t sz = ntohl(*(int32_t*)dummy); uint32_t version = sz & THRIFT_HEAD_VERSION_MASK;
int32_t version = sz & THRIFT_HEAD_VERSION_MASK;
if (version != THRIFT_HEAD_VERSION_1) { if (version != THRIFT_HEAD_VERSION_1) {
RPC_VLOG << "magic_num=" << version RPC_VLOG << "version=" << version
<< " doesn't match THRIFT_MAGIC_NUM=" << THRIFT_HEAD_VERSION_1; << " doesn't match THRIFT_VERSION=" << THRIFT_HEAD_VERSION_1;
return MakeParseError(PARSE_ERROR_TRY_OTHERS); return MakeParseError(PARSE_ERROR_TRY_OTHERS);
} }
thrift_head_t* thrift = (thrift_head_t *)header_buf; const uint32_t body_len = ntohl(((thrift_head_t*)header_buf)->body_len);
thrift->body_len = ntohl(thrift->body_len);
uint32_t body_len = thrift->body_len;
if (body_len > FLAGS_max_body_size) { if (body_len > FLAGS_max_body_size) {
return MakeParseError(PARSE_ERROR_TOO_BIG_DATA); return MakeParseError(PARSE_ERROR_TOO_BIG_DATA);
} else if (source->length() < sizeof(header_buf) + body_len - 3) { } else if (source->length() < sizeof(thrift_head_t) + body_len) {
return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA); return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA);
} }
policy::MostCommonMessage* msg = policy::MostCommonMessage::Get(); MostCommonMessage* msg = MostCommonMessage::Get();
source->cutn(&msg->meta, sizeof(thrift_head_t)); source->pop_front(sizeof(thrift_head_t));
source->cutn(&msg->payload, body_len); source->cutn(&msg->payload, body_len);
return MakeMessage(msg); return MakeMessage(msg);
} }
inline void ProcessThriftFramedRequestNoExcept(ThriftService* service,
Controller* cntl,
ThriftFramedMessage* req,
ThriftFramedMessage* res,
ThriftClosure* done) {
// NOTE: done is not actually run before AllowToRun() is called so that
// we can still set `cntl' in the catch branch.
try {
service->ProcessThriftFramedRequest(cntl, req, res, done);
} catch (::apache::thrift::TException& e) {
cntl->SetFailed(EINTERNAL, "Catched exception: %s", e.what());
} catch (...) {
cntl->SetFailed(EINTERNAL, "Catched unknown exception");
}
done->AllowToRun();
}
struct CallMethodInBackupThreadArgs { struct CallMethodInBackupThreadArgs {
ThriftService* service; ThriftService* service;
const Server* server;
Controller* controller; Controller* controller;
ThriftFramedMessage* request; ThriftFramedMessage* request;
ThriftFramedMessage* response; ThriftFramedMessage* response;
...@@ -280,30 +406,21 @@ struct CallMethodInBackupThreadArgs { ...@@ -280,30 +406,21 @@ struct CallMethodInBackupThreadArgs {
static void CallMethodInBackupThread(void* void_args) { static void CallMethodInBackupThread(void* void_args) {
CallMethodInBackupThreadArgs* args = (CallMethodInBackupThreadArgs*)void_args; CallMethodInBackupThreadArgs* args = (CallMethodInBackupThreadArgs*)void_args;
ProcessThriftFramedRequestNoExcept(args->service,
std::string method_name; args->controller,
if (parse_thrift_method_name(args->request->body, &method_name) < 0) { args->request,
LOG(ERROR) << "Fail to get thrift method name"; args->response,
delete args; args->done);
return;
}
args->controller->set_thrift_method_name(method_name);
args->service->ProcessThriftFramedRequest(*args->server, args->controller,
args->request, args->response,
args->done);
delete args; delete args;
} }
static void EndRunningCallMethodInPool(ThriftService* service, static void EndRunningCallMethodInPool(ThriftService* service,
const Server& server,
Controller* controller, Controller* controller,
ThriftFramedMessage* request, ThriftFramedMessage* request,
ThriftFramedMessage* response, ThriftFramedMessage* response,
ThriftClosure* done) { ThriftClosure* done) {
CallMethodInBackupThreadArgs* args = new CallMethodInBackupThreadArgs; CallMethodInBackupThreadArgs* args = new CallMethodInBackupThreadArgs;
args->service = service; args->service = service;
args->server = &server;
args->controller = controller; args->controller = controller;
args->request = request; args->request = request;
args->response = response; args->response = response;
...@@ -312,71 +429,64 @@ static void EndRunningCallMethodInPool(ThriftService* service, ...@@ -312,71 +429,64 @@ static void EndRunningCallMethodInPool(ThriftService* service,
}; };
void ProcessThriftRequest(InputMessageBase* msg_base) { void ProcessThriftRequest(InputMessageBase* msg_base) {
const int64_t start_parse_us = butil::cpuwide_time_us(); const int64_t start_parse_us = butil::cpuwide_time_us();
DestroyingPtr<MostCommonMessage> msg(static_cast<MostCommonMessage*>(msg_base)); DestroyingPtr<MostCommonMessage> msg(static_cast<MostCommonMessage*>(msg_base));
SocketUniquePtr socket(msg->ReleaseSocket()); SocketUniquePtr socket_guard(msg->ReleaseSocket());
Socket* socket = socket_guard.get();
const Server* server = static_cast<const Server*>(msg_base->arg()); const Server* server = static_cast<const Server*>(msg_base->arg());
ScopedNonServiceError non_service_error(server); ScopedNonServiceError non_service_error(server);
char buf[sizeof(thrift_head_t)]; ThriftClosure* thrift_done = new ThriftClosure;
const char *p = (const char *)msg->meta.fetch(buf, sizeof(buf));
thrift_head_t *req_head = (thrift_head_t *)p;
req_head->body_len = ntohl(req_head->body_len);
ThriftService* service = server->options().thrift_service;
if (service == NULL) {
LOG_EVERY_SECOND(WARNING)
<< "Received thrift request however the server does not set"
" ServerOptions.thrift_service, close the connection.";
socket->SetFailed();
return;
}
void* space = malloc(sizeof(ThriftClosure) + service->_additional_space);
if (!space) {
LOG(FATAL) << "Fail to new ThriftClosure";
socket->SetFailed();
return;
}
// Switch to service-specific error.
non_service_error.release();
MethodStatus* method_status = service->_status;
if (method_status) {
CHECK(method_status->OnRequested());
}
void* sub_space = NULL;
if (service->_additional_space) {
sub_space = (char*)space + sizeof(ThriftClosure);
}
ThriftClosure* thrift_done = new (space) ThriftClosure(sub_space);
Controller* cntl = &(thrift_done->_controller); Controller* cntl = &(thrift_done->_controller);
ThriftFramedMessage* req = &(thrift_done->_request); ThriftFramedMessage* req = &(thrift_done->_request);
ThriftFramedMessage* res = &(thrift_done->_response); ThriftFramedMessage* res = &(thrift_done->_response);
req->head = *req_head;
msg->payload.swap(req->body);
thrift_done->_start_parse_us = start_parse_us; thrift_done->_start_parse_us = start_parse_us;
thrift_done->_socket_ptr = socket.get();
thrift_done->_server = server;
ServerPrivateAccessor server_accessor(server); ServerPrivateAccessor server_accessor(server);
ControllerPrivateAccessor accessor(cntl);
const bool security_mode = server->options().security_mode() && const bool security_mode = server->options().security_mode() &&
socket->user() == server_accessor.acceptor(); socket->user() == server_accessor.acceptor();
// Initialize log_id with the log_id in thrift. Notice that the protocols ControllerPrivateAccessor accessor(cntl);
// on top of ThriftService may pack log_id in meta or user messages and
// overwrite the value.
//cntl->set_log_id(req_head->log_id);
accessor.set_server(server) accessor.set_server(server)
.set_security_mode(security_mode) .set_security_mode(security_mode)
.set_peer_id(socket->id()) .set_peer_id(socket->id())
.set_remote_side(socket->remote_side()) .set_remote_side(socket->remote_side())
.set_local_side(socket->local_side()) .set_local_side(socket->local_side())
.set_request_protocol(PROTOCOL_THRIFT); .set_request_protocol(PROTOCOL_THRIFT)
.move_in_server_receiving_sock(socket_guard);
uint32_t seq_id;
::apache::thrift::protocol::TMessageType mtype;
butil::Status st = ReadThriftMessageBegin(
&msg->payload, accessor.mutable_thrift_method_name(), &mtype, &seq_id);
if (!st.ok()) {
cntl->SetFailed(EREQUEST, "%s", st.error_cstr());
return thrift_done->Run();
}
msg->payload.swap(req->body);
req->field_id = THRIFT_REQUEST_FID;
cntl->set_log_id(seq_id); // Pass seq_id by log_id
ThriftService* service = server->options().thrift_service;
if (service == NULL) {
LOG_EVERY_SECOND(ERROR)
<< "Received thrift request however the server does not set"
" ServerOptions.thrift_service, close the connection.";
cntl->SetFailed(EINTERNAL, "ServerOptions.thrift_service is NULL");
return thrift_done->Run();
}
// Switch to service-specific error.
non_service_error.release();
MethodStatus* method_status = service->_status;
if (method_status) {
if (!method_status->OnRequested()) {
cntl->SetFailed(ELIMIT, "Reached %s's max_concurrency=%d",
cntl->thrift_method_name().c_str(),
method_status->max_concurrency());
return thrift_done->Run();
}
}
// Tag the bthread with this server's key for thread_local_data(). // Tag the bthread with this server's key for thread_local_data().
if (server->thread_local_options().thread_local_data_factory) { if (server->thread_local_options().thread_local_data_factory) {
...@@ -387,12 +497,12 @@ void ProcessThriftRequest(InputMessageBase* msg_base) { ...@@ -387,12 +497,12 @@ void ProcessThriftRequest(InputMessageBase* msg_base) {
if (IsTraceable(false)) { if (IsTraceable(false)) {
span = Span::CreateServerSpan(0, 0, 0, msg->base_real_us()); span = Span::CreateServerSpan(0, 0, 0, msg->base_real_us());
accessor.set_span(span); accessor.set_span(span);
//span->set_log_id(req_head->log_id); span->set_log_id(seq_id);
span->set_remote_side(cntl->remote_side()); span->set_remote_side(cntl->remote_side());
span->set_protocol(PROTOCOL_THRIFT); span->set_protocol(PROTOCOL_THRIFT);
span->set_received_us(msg->received_us()); span->set_received_us(msg->received_us());
span->set_start_parse_us(start_parse_us); span->set_start_parse_us(start_parse_us);
span->set_request_size(sizeof(thrift_head_t) + req_head->body_len); span->set_request_size(sizeof(thrift_head_t) + req->body.size());
} }
do { do {
...@@ -400,6 +510,11 @@ void ProcessThriftRequest(InputMessageBase* msg_base) { ...@@ -400,6 +510,11 @@ void ProcessThriftRequest(InputMessageBase* msg_base) {
cntl->SetFailed(ELOGOFF, "Server is stopping"); cntl->SetFailed(ELOGOFF, "Server is stopping");
break; break;
} }
if (socket->is_overcrowded()) {
cntl->SetFailed(EOVERCROWDED, "Connection to %s is overcrowded",
butil::endpoint2str(socket->remote_side()).c_str());
break;
}
if (!server_accessor.AddConcurrency(cntl)) { if (!server_accessor.AddConcurrency(cntl)) {
cntl->SetFailed(ELIMIT, "Reached server's max_concurrency=%d", cntl->SetFailed(ELIMIT, "Reached server's max_concurrency=%d",
server->options().max_concurrency); server->options().max_concurrency);
...@@ -413,47 +528,22 @@ void ProcessThriftRequest(InputMessageBase* msg_base) { ...@@ -413,47 +528,22 @@ void ProcessThriftRequest(InputMessageBase* msg_base) {
} while (false); } while (false);
msg.reset(); // optional, just release resourse ASAP msg.reset(); // optional, just release resourse ASAP
// `socket' will be held until response has been sent
socket.release();
if (span) { if (span) {
span->ResetServerSpanName(service->_cached_name); span->ResetServerSpanName(cntl->thrift_method_name());
span->set_start_callback_us(butil::cpuwide_time_us()); span->set_start_callback_us(butil::cpuwide_time_us());
span->AsParent(); span->AsParent();
} }
std::string method_name;
if (parse_thrift_method_name(req->body, &method_name) < 0) {
cntl->SetFailed(EREQUEST, "Fail to get thrift method name!");
return;
}
cntl->set_thrift_method_name(method_name);
if (!FLAGS_usercode_in_pthread) { if (!FLAGS_usercode_in_pthread) {
try { return ProcessThriftFramedRequestNoExcept(service, cntl, req, res, thrift_done);
return service->ProcessThriftFramedRequest(*server, cntl,
req, res, thrift_done);
} catch (::apache::thrift::TException& e) {
cntl->SetFailed(EREQUEST, "Invalid request data, reason: %s", e.what());
} catch (...) {
cntl->SetFailed(EINTERNAL, "Internal server error!");
}
} }
if (BeginRunningUserCode()) { if (BeginRunningUserCode()) {
try { ProcessThriftFramedRequestNoExcept(service, cntl, req, res, thrift_done);
service->ProcessThriftFramedRequest(*server, cntl, req, res, thrift_done);
} catch (::apache::thrift::TException& e) {
cntl->SetFailed(EREQUEST, "Invalid request data, reason: %s", e.what());
} catch (...) {
cntl->SetFailed(EINTERNAL, "Internal server error!");
}
return EndRunningUserCodeInPlace(); return EndRunningUserCodeInPlace();
} else { } else {
return EndRunningCallMethodInPool( return EndRunningCallMethodInPool(service, cntl, req, res, thrift_done);
service, *server, cntl, req, res, thrift_done);
} }
} }
void ProcessThriftResponse(InputMessageBase* msg_base) { void ProcessThriftResponse(InputMessageBase* msg_base) {
...@@ -479,98 +569,53 @@ void ProcessThriftResponse(InputMessageBase* msg_base) { ...@@ -479,98 +569,53 @@ void ProcessThriftResponse(InputMessageBase* msg_base) {
span->set_start_parse_us(start_parse_us); span->set_start_parse_us(start_parse_us);
} }
// MUST be ThriftFramedMessage (checked in SerializeThriftRequest)
ThriftFramedMessage* response = (ThriftFramedMessage*)cntl->response();
const int saved_error = cntl->ErrorCode(); const int saved_error = cntl->ErrorCode();
if (response != NULL) { do {
msg->meta.copy_to(&response->head, sizeof(thrift_head_t));
response->head.body_len = ntohl(response->head.body_len);
msg->payload.swap(response->body);
uint32_t body_len = response->head.body_len;
// Deserialize binary message to thrift message
std::unique_ptr<uint8_t[]>thrift_buffer(new uint8_t[body_len]);
const size_t k = response->body.copy_to(thrift_buffer.get(), body_len);
if ( k != body_len) {
cntl->SetFailed("copy response body to thrift buffer failed!");
return;
}
auto in_buffer =
THRIFT_STDCXX::make_shared<apache::thrift::transport::TMemoryBuffer>();
auto in_portocol =
THRIFT_STDCXX::make_shared<apache::thrift::protocol::TBinaryProtocol>(in_buffer);
in_buffer->resetBuffer(thrift_buffer.get(), body_len);
// The following code was taken from thrift auto generate code // The following code was taken from thrift auto generate code
int32_t rseqid = 0;
std::string fname; std::string fname;
::apache::thrift::protocol::TMessageType mtype; ::apache::thrift::protocol::TMessageType mtype;
uint32_t seq_id = 0; // unchecked
in_portocol->readMessageBegin(fname, mtype, rseqid); butil::Status st = ReadThriftMessageBegin(&msg->payload, &fname, &mtype, &seq_id);
if (!st.ok()) {
cntl->SetFailed(ERESPONSE, "%s", st.error_cstr());
break;
}
if (mtype == ::apache::thrift::protocol::T_EXCEPTION) { if (mtype == ::apache::thrift::protocol::T_EXCEPTION) {
cntl->SetFailed("thrift process server response exception!"); ::apache::thrift::TApplicationException x;
return; ReadThriftException(msg->payload, &x);
// TODO: Convert exception type to brpc errors.
cntl->SetFailed(x.what());
break;
} }
if (mtype != ::apache::thrift::protocol::T_REPLY) { if (mtype != ::apache::thrift::protocol::T_REPLY) {
in_portocol->skip(::apache::thrift::protocol::T_STRUCT); cntl->SetFailed(ERESPONSE, "message_type is not T_REPLY");
in_portocol->readMessageEnd(); break;
in_portocol->getTransport()->readEnd();
}
if (fname.compare(cntl->thrift_method_name()) != 0) {
in_portocol->skip(::apache::thrift::protocol::T_STRUCT);
in_portocol->readMessageEnd();
in_portocol->getTransport()->readEnd();
} }
if (fname != cntl->thrift_method_name()) {
// presult section cntl->SetFailed(ERESPONSE,
apache::thrift::protocol::TInputRecursionTracker tracker(*in_portocol); "response.method_name=%s does not match request.method_name=%s",
uint32_t xfer = 0; fname.c_str(), cntl->thrift_method_name().c_str());
::apache::thrift::protocol::TType ftype;
int16_t fid;
xfer += in_portocol->readStructBegin(fname);
using ::apache::thrift::protocol::TProtocolException;
bool success = false;
while (true)
{
xfer += in_portocol->readFieldBegin(fname, ftype, fid);
if (ftype == ::apache::thrift::protocol::T_STOP) {
break; break;
}
switch (fid)
{
case 0:
if (ftype == ::apache::thrift::protocol::T_STRUCT) {
xfer += response->read(in_portocol.get());
success = true;
} else {
xfer += in_portocol->skip(ftype);
}
break;
default:
xfer += in_portocol->skip(ftype);
break;
}
xfer += in_portocol->readFieldEnd();
} }
xfer += in_portocol->readStructEnd();
// end presult section
in_portocol->readMessageEnd(); // Read presult
in_portocol->getTransport()->readEnd();
if (!success) { // MUST be ThriftFramedMessage (checked in SerializeThriftRequest)
cntl->SetFailed("thrift process server response exception!"); ThriftFramedMessage* response = (ThriftFramedMessage*)cntl->response();
return; if (response) {
} if (response->raw_instance()) {
if (!ReadThriftStruct(msg->payload, response->raw_instance(),
} // else just ignore the response. THRIFT_RESPONSE_FID)) {
cntl->SetFailed(ERESPONSE, "Fail to read presult");
break;
}
} else {
msg->payload.swap(response->body);
response->field_id = THRIFT_RESPONSE_FID;
}
} // else just ignore the response.
} while (false);
// Unlocks correlation_id inside. Revert controller's // Unlocks correlation_id inside. Revert controller's
// error code if it version check of `cid' fails // error code if it version check of `cid' fails
...@@ -592,68 +637,76 @@ void SerializeThriftRequest(butil::IOBuf* request_buf, Controller* cntl, ...@@ -592,68 +637,76 @@ void SerializeThriftRequest(butil::IOBuf* request_buf, Controller* cntl,
if (req_base == NULL) { if (req_base == NULL) {
return cntl->SetFailed(EREQUEST, "request is NULL"); return cntl->SetFailed(EREQUEST, "request is NULL");
} }
ControllerPrivateAccessor accessor(cntl); if (req_base->GetDescriptor() != ThriftFramedMessage::descriptor()) {
return cntl->SetFailed(EINVAL, "Type of request must be ThriftFramedMessage");
const ThriftFramedMessage* req = (const ThriftFramedMessage*)req_base; }
if (cntl->response() != NULL &&
thrift_head_t head = req->head; cntl->response()->GetDescriptor() != ThriftFramedMessage::descriptor()) {
return cntl->SetFailed(EINVAL, "Type of response must be ThriftFramedMessage");
auto out_buffer = }
THRIFT_STDCXX::make_shared<apache::thrift::transport::TMemoryBuffer>();
auto out_portocol =
THRIFT_STDCXX::make_shared<apache::thrift::protocol::TBinaryProtocol>(out_buffer);
std::string thrift_method_name = cntl->thrift_method_name(); const std::string& method_name = cntl->thrift_method_name();
// we should do more check on the thrift method name, but since it is rare when // we should do more check on the thrift method name, but since it is rare when
// the method_name is just some white space or something else // the method_name is just some white space or something else
if (cntl->thrift_method_name() == "" || if (method_name.empty() || method_name[0] == ' ') {
cntl->thrift_method_name().length() < 1 || return cntl->SetFailed(ENOMETHOD, "Invalid thrift_method_name!");
cntl->thrift_method_name()[0] == ' ') { }
return cntl->SetFailed(ENOMETHOD, if (method_name.size() > MAX_THRIFT_METHOD_NAME_LENGTH) {
"invalid thrift method name or method name empty!"); return cntl->SetFailed(ENOMETHOD, "thrift_method_name is too long");
} }
// The following code was taken from thrift auto generated code const ThriftFramedMessage* req = (const ThriftFramedMessage*)req_base;
// send_xxx
int32_t cseqid = 0;
out_portocol->writeMessageBegin(thrift_method_name,
::apache::thrift::protocol::T_CALL, cseqid);
// xxx_pargs write // xxx_pargs write
uint32_t xfer = 0; if (req->raw_instance()) {
apache::thrift::protocol::TOutputRecursionTracker tracker(*out_portocol); auto out_buffer =
THRIFT_STDCXX::make_shared<apache::thrift::transport::TMemoryBuffer>();
std::string struct_begin_str = "ThriftService_" + thrift_method_name + "_pargs"; apache::thrift::protocol::TBinaryProtocolT<apache::thrift::transport::TMemoryBuffer> oprot(out_buffer);
xfer += out_portocol->writeStructBegin(struct_begin_str.c_str());
xfer += out_portocol->writeFieldBegin("request", ::apache::thrift::protocol::T_STRUCT, 1);
// request's write
ThriftFramedMessage* r = const_cast<ThriftFramedMessage*>(req);
xfer += r->write(out_portocol.get());
// end request's write
xfer += out_portocol->writeFieldEnd();
xfer += out_portocol->writeFieldStop();
xfer += out_portocol->writeStructEnd();
// end xxx_pargs write
out_portocol->writeMessageEnd(); oprot.writeMessageBegin(
out_portocol->getTransport()->writeEnd(); method_name, ::apache::thrift::protocol::T_CALL, 0/*seq_id*/);
out_portocol->getTransport()->flush();
// end send_xxx
// end thrift auto generated code
uint8_t* buf; uint32_t xfer = 0;
uint32_t sz; char struct_begin_str[32 + method_name.size()];
out_buffer->getBuffer(&buf, &sz); char* p = struct_begin_str;
memcpy(p, "ThriftService_", 14);
p += 14;
memcpy(p, method_name.data(), method_name.size());
p += method_name.size();
memcpy(p, "_pargs", 6);
p += 6;
*p = '\0';
xfer += oprot.writeStructBegin(struct_begin_str);
xfer += oprot.writeFieldBegin("request", ::apache::thrift::protocol::T_STRUCT,
THRIFT_REQUEST_FID);
// request's write
xfer += req->raw_instance()->write(&oprot);
xfer += oprot.writeFieldEnd();
xfer += oprot.writeFieldStop();
xfer += oprot.writeStructEnd();
head.body_len = ntohl(sz); oprot.writeMessageEnd();
request_buf->append(&head, sizeof(head)); oprot.getTransport()->writeEnd();
// end auto generate code oprot.getTransport()->flush();
request_buf->append(buf, sz); uint8_t* buf;
uint32_t sz;
out_buffer->getBuffer(&buf, &sz);
const thrift_head_t head = { htonl(sz) };
request_buf->append(&head, sizeof(head));
request_buf->append(buf, sz);
} else {
const size_t mb_size = ThriftMessageBeginSize(method_name);
char buf[sizeof(thrift_head_t) + mb_size];
((thrift_head_t*)buf)->body_len = htonl(mb_size + req->body.size());
WriteThriftMessageBegin(buf + sizeof(thrift_head_t), method_name,
::apache::thrift::protocol::T_CALL, 0/*seq_id*/);
request_buf->append(buf, sizeof(buf));
request_buf->append(req->body);
}
} }
void PackThriftRequest( void PackThriftRequest(
...@@ -686,19 +739,3 @@ void PackThriftRequest( ...@@ -686,19 +739,3 @@ void PackThriftRequest(
} // namespace policy } // namespace policy
} // namespace brpc } // namespace brpc
extern "C" {
void RegisterThriftProtocol() {
brpc::Protocol thrift_binary_protocol = {brpc::policy::ParseThriftMessage,
brpc::policy::SerializeThriftRequest, brpc::policy::PackThriftRequest,
brpc::policy::ProcessThriftRequest, brpc::policy::ProcessThriftResponse,
brpc::policy::VerifyThriftRequest, NULL, NULL,
brpc::CONNECTION_TYPE_POOLED_AND_SHORT, "thrift" };
if (brpc::RegisterProtocol(brpc::PROTOCOL_THRIFT, thrift_binary_protocol) != 0) {
exit(1);
}
}
}
...@@ -19,9 +19,7 @@ ...@@ -19,9 +19,7 @@
#include <algorithm> #include <algorithm>
#include "butil/logging.h" #include "butil/logging.h"
#include "brpc/details/controller_private_accessor.h"
#include <brpc/protocol.h> // RegisterProtocol
#include <brpc/policy/thrift_protocol.h>
#include <google/protobuf/stubs/once.h> #include <google/protobuf/stubs/once.h>
#include <google/protobuf/io/coded_stream.h> #include <google/protobuf/io/coded_stream.h>
...@@ -30,9 +28,7 @@ ...@@ -30,9 +28,7 @@
#include <google/protobuf/reflection_ops.h> #include <google/protobuf/reflection_ops.h>
#include <google/protobuf/wire_format.h> #include <google/protobuf/wire_format.h>
namespace brpc { namespace brpc {
BAIDU_CASSERT(sizeof(thrift_head_t) == 4, sizeof_thrift_must_be_4);
namespace { namespace {
const ::google::protobuf::Descriptor* ThriftFramedMessage_descriptor_ = NULL; const ::google::protobuf::Descriptor* ThriftFramedMessage_descriptor_ = NULL;
...@@ -43,7 +39,7 @@ void protobuf_AssignDesc_baidu_2frpc_2fthrift_framed_5fmessage_2eproto() { ...@@ -43,7 +39,7 @@ void protobuf_AssignDesc_baidu_2frpc_2fthrift_framed_5fmessage_2eproto() {
protobuf_AddDesc_baidu_2frpc_2fthrift_framed_5fmessage_2eproto(); protobuf_AddDesc_baidu_2frpc_2fthrift_framed_5fmessage_2eproto();
const ::google::protobuf::FileDescriptor* file = const ::google::protobuf::FileDescriptor* file =
::google::protobuf::DescriptorPool::generated_pool()->FindFileByName( ::google::protobuf::DescriptorPool::generated_pool()->FindFileByName(
"baidu/rpc/thrift_framed_message.proto"); "thrift_framed_message.proto");
GOOGLE_CHECK(file != NULL); GOOGLE_CHECK(file != NULL);
ThriftFramedMessage_descriptor_ = file->message_type(0); ThriftFramedMessage_descriptor_ = file->message_type(0);
} }
...@@ -114,23 +110,16 @@ ThriftFramedMessage::ThriftFramedMessage() ...@@ -114,23 +110,16 @@ ThriftFramedMessage::ThriftFramedMessage()
void ThriftFramedMessage::InitAsDefaultInstance() { void ThriftFramedMessage::InitAsDefaultInstance() {
} }
ThriftFramedMessage::ThriftFramedMessage(const ThriftFramedMessage& from)
: ::google::protobuf::Message() {
SharedCtor();
MergeFrom(from);
}
void ThriftFramedMessage::SharedCtor() { void ThriftFramedMessage::SharedCtor() {
memset(&head, 0, sizeof(head)); field_id = THRIFT_INVALID_FID;
thrift_raw_instance_deleter = nullptr; _own_raw_instance = false;
thrift_raw_instance = nullptr; _raw_instance = nullptr;
thrift_message_seq_id = 0;
} }
ThriftFramedMessage::~ThriftFramedMessage() { ThriftFramedMessage::~ThriftFramedMessage() {
SharedDtor(); SharedDtor();
if (thrift_raw_instance && thrift_raw_instance_deleter) { if (_own_raw_instance) {
thrift_raw_instance_deleter(thrift_raw_instance); delete _raw_instance;
} }
} }
...@@ -157,8 +146,12 @@ ThriftFramedMessage* ThriftFramedMessage::New() const { ...@@ -157,8 +146,12 @@ ThriftFramedMessage* ThriftFramedMessage::New() const {
} }
void ThriftFramedMessage::Clear() { void ThriftFramedMessage::Clear() {
memset(&head, 0, sizeof(head));
body.clear(); body.clear();
if (_own_raw_instance) {
delete _raw_instance;
_own_raw_instance = false;
_raw_instance = NULL;
}
} }
bool ThriftFramedMessage::MergePartialFromCodedStream( bool ThriftFramedMessage::MergePartialFromCodedStream(
...@@ -185,38 +178,31 @@ void ThriftFramedMessage::SerializeWithCachedSizes( ...@@ -185,38 +178,31 @@ void ThriftFramedMessage::SerializeWithCachedSizes(
} }
int ThriftFramedMessage::ByteSize() const { int ThriftFramedMessage::ByteSize() const {
return sizeof(thrift_head_t) + body.size(); if (_raw_instance) {
LOG(ERROR) << "ByteSize() is always 0 when _raw_instance is set";
return 0;
}
return body.size();
} }
void ThriftFramedMessage::MergeFrom(const ::google::protobuf::Message& from) { void ThriftFramedMessage::MergeFrom(const ::google::protobuf::Message& from) {
GOOGLE_CHECK_NE(&from, this); GOOGLE_CHECK_NE(&from, this);
const ThriftFramedMessage* source = LOG(ERROR) << "ThriftFramedMessage does not support MergeFrom";
::google::protobuf::internal::dynamic_cast_if_available<const ThriftFramedMessage*>(
&from);
if (source == NULL) {
LOG(ERROR) << "Can only merge from ThriftFramedMessage";
return;
} else {
MergeFrom(*source);
}
} }
void ThriftFramedMessage::MergeFrom(const ThriftFramedMessage& from) { void ThriftFramedMessage::MergeFrom(const ThriftFramedMessage& from) {
GOOGLE_CHECK_NE(&from, this); GOOGLE_CHECK_NE(&from, this);
head = from.head; LOG(ERROR) << "ThriftFramedMessage does not support MergeFrom";
body = from.body;
} }
void ThriftFramedMessage::CopyFrom(const ::google::protobuf::Message& from) { void ThriftFramedMessage::CopyFrom(const ::google::protobuf::Message& from) {
if (&from == this) return; if (&from == this) return;
Clear(); LOG(ERROR) << "ThriftFramedMessage does not support CopyFrom";
MergeFrom(from);
} }
void ThriftFramedMessage::CopyFrom(const ThriftFramedMessage& from) { void ThriftFramedMessage::CopyFrom(const ThriftFramedMessage& from) {
if (&from == this) return; if (&from == this) return;
Clear(); LOG(ERROR) << "ThriftFramedMessage does not support CopyFrom";
MergeFrom(from);
} }
bool ThriftFramedMessage::IsInitialized() const { bool ThriftFramedMessage::IsInitialized() const {
...@@ -225,10 +211,10 @@ bool ThriftFramedMessage::IsInitialized() const { ...@@ -225,10 +211,10 @@ bool ThriftFramedMessage::IsInitialized() const {
void ThriftFramedMessage::Swap(ThriftFramedMessage* other) { void ThriftFramedMessage::Swap(ThriftFramedMessage* other) {
if (other != this) { if (other != this) {
const thrift_head_t tmp = other->head;
other->head = head;
head = tmp;
body.swap(other->body); body.swap(other->body);
std::swap(field_id, other->field_id);
std::swap(_own_raw_instance, other->_own_raw_instance);
std::swap(_raw_instance, other->_raw_instance);
} }
} }
...@@ -240,5 +226,55 @@ void ThriftFramedMessage::Swap(ThriftFramedMessage* other) { ...@@ -240,5 +226,55 @@ void ThriftFramedMessage::Swap(ThriftFramedMessage* other) {
return metadata; return metadata;
} }
// A wrapper closure to own the additional response required by ThriftStub
class ThriftFramedMessageAndDone : public ::google::protobuf::Closure {
public:
explicit ThriftFramedMessageAndDone(::google::protobuf::Closure* done)
: _done(done) {}
void Run() override { _done->Run(); }
ThriftFramedMessage response;
private:
::google::protobuf::Closure* _done;
};
void ThriftStub::CallMethod(const char* method_name,
Controller* cntl,
const ::apache::thrift::TBase* raw_request,
::apache::thrift::TBase* raw_response,
::google::protobuf::Closure* done) {
ControllerPrivateAccessor(cntl).mutable_thrift_method_name()->assign(method_name);
ThriftFramedMessage request;
request._own_raw_instance = false;
request._raw_instance = const_cast<::apache::thrift::TBase*>(raw_request);
if (done == NULL) {
// response is guaranteed to be unused after a synchronous RPC, no
// need to allocate it on heap.
ThriftFramedMessage response;
response._own_raw_instance = false;
response._raw_instance = raw_response;
_channel->CallMethod(NULL, cntl, &request, &response, NULL);
} else {
// Let the new_done own the response and release it after Run().
ThriftFramedMessageAndDone* new_done = new ThriftFramedMessageAndDone(done);
new_done->response._own_raw_instance = false;
new_done->response._raw_instance = raw_response;
_channel->CallMethod(NULL, cntl, &request, &new_done->response, new_done);
}
}
void ThriftStub::CallMethod(const char* method_name,
Controller* cntl,
const ThriftFramedMessage* req,
ThriftFramedMessage* res,
::google::protobuf::Closure* done) {
ControllerPrivateAccessor(cntl).mutable_thrift_method_name()->assign(method_name);
_channel->CallMethod(NULL, cntl, req, res, done);
}
} // namespace brpc } // namespace brpc
...@@ -27,10 +27,11 @@ ...@@ -27,10 +27,11 @@
#include <google/protobuf/generated_message_reflection.h> #include <google/protobuf/generated_message_reflection.h>
#include "google/protobuf/descriptor.pb.h" #include "google/protobuf/descriptor.pb.h"
#include "brpc/details/thrift_utils.h"
#include "butil/iobuf.h" #include "butil/iobuf.h"
#include "brpc/channel_base.h"
#include "brpc/controller.h"
#include <thrift/protocol/TBinaryProtocol.h> #include <thrift/TBase.h>
namespace brpc { namespace brpc {
...@@ -39,33 +40,35 @@ void protobuf_AddDesc_baidu_2frpc_2fthrift_framed_5fmessage_2eproto(); ...@@ -39,33 +40,35 @@ void protobuf_AddDesc_baidu_2frpc_2fthrift_framed_5fmessage_2eproto();
void protobuf_AssignDesc_baidu_2frpc_2fthrift_framed_5fmessage_2eproto(); void protobuf_AssignDesc_baidu_2frpc_2fthrift_framed_5fmessage_2eproto();
void protobuf_ShutdownFile_baidu_2frpc_2fthrift_framed_5fmessage_2eproto(); void protobuf_ShutdownFile_baidu_2frpc_2fthrift_framed_5fmessage_2eproto();
static const int32_t THRIFT_HEAD_VERSION_MASK = (int32_t)0xffffff00; class ThriftStub;
static const int32_t THRIFT_HEAD_VERSION_1 = (int32_t)0x80010000;
struct thrift_head_t { static const int16_t THRIFT_INVALID_FID = -1;
int32_t body_len; static const int16_t THRIFT_REQUEST_FID = 1;
}; static const int16_t THRIFT_RESPONSE_FID = 0;
// Representing a thrift framed request or response. // Representing a thrift framed request or response.
class ThriftFramedMessage : public ::google::protobuf::Message { class ThriftFramedMessage : public ::google::protobuf::Message {
friend class ThriftStub;
public: public:
thrift_head_t head; butil::IOBuf body; // ~= "{ raw_instance }"
butil::IOBuf body; int16_t field_id; // must be set when body is set.
void (*thrift_raw_instance_deleter) (void*);
uint32_t (*thrift_raw_instance_writer) (void*, void*); private:
void* thrift_raw_instance; bool _own_raw_instance;
::apache::thrift::TBase* _raw_instance;
int32_t thrift_message_seq_id;
public: public:
::apache::thrift::TBase* raw_instance() const { return _raw_instance; }
template <typename T> T* Cast();
ThriftFramedMessage(); ThriftFramedMessage();
virtual ~ThriftFramedMessage(); virtual ~ThriftFramedMessage();
ThriftFramedMessage(const ThriftFramedMessage& from); ThriftFramedMessage(const ThriftFramedMessage& from) = delete;
inline ThriftFramedMessage& operator=(const ThriftFramedMessage& from) { ThriftFramedMessage& operator=(const ThriftFramedMessage& from) = delete;
CopyFrom(from);
return *this;
}
static const ::google::protobuf::Descriptor* descriptor(); static const ::google::protobuf::Descriptor* descriptor();
static const ThriftFramedMessage& default_instance(); static const ThriftFramedMessage& default_instance();
...@@ -91,29 +94,6 @@ public: ...@@ -91,29 +94,6 @@ public:
int GetCachedSize() const { return ByteSize(); } int GetCachedSize() const { return ByteSize(); }
::google::protobuf::Metadata GetMetadata() const; ::google::protobuf::Metadata GetMetadata() const;
virtual uint32_t write(void* /*oprot*/) { return 0;}
virtual uint32_t read(void* /*iprot*/) { return 0;}
template<typename T>
T* Cast() {
thrift_raw_instance = new T;
assert(thrift_raw_instance);
// serialize binary thrift message to thrift struct request
// for response, we just return the new instance and deserialize it in Closure
if (body.size() > 0 ) {
if (serialize_iobuf_to_thrift_message<T>(body, thrift_raw_instance, &thrift_message_seq_id)) {
} else {
delete static_cast<T*>(thrift_raw_instance);
return nullptr;
}
}
thrift_raw_instance_deleter = &thrift_framed_message_deleter<T>;
thrift_raw_instance_writer = &thrift_framed_message_writer<T>;
return static_cast<T*>(thrift_raw_instance);
}
private: private:
void SharedCtor(); void SharedCtor();
void SharedDtor(); void SharedDtor();
...@@ -127,37 +107,53 @@ friend void protobuf_ShutdownFile_baidu_2frpc_2fthrift_framed_5fmessage_2eproto( ...@@ -127,37 +107,53 @@ friend void protobuf_ShutdownFile_baidu_2frpc_2fthrift_framed_5fmessage_2eproto(
static ThriftFramedMessage* default_instance_; static ThriftFramedMessage* default_instance_;
}; };
template <typename T> class ThriftStub {
class ThriftMessage : public ThriftFramedMessage {
public: public:
ThriftMessage() { explicit ThriftStub(ChannelBase* channel) : _channel(channel) {}
thrift_message_ = new T;
assert(thrift_message_ != nullptr);
}
virtual ~ThriftMessage() { delete thrift_message_; } void CallMethod(const char* method_name,
Controller* cntl,
const ::apache::thrift::TBase* raw_request,
::apache::thrift::TBase* raw_response,
::google::protobuf::Closure* done);
ThriftMessage<T>& operator= (const ThriftMessage<T>& other) { void CallMethod(const char* method_name,
*thrift_message_ = *(other.thrift_message_); Controller* cntl,
return *this; const ThriftFramedMessage* req,
} ThriftFramedMessage* res,
::google::protobuf::Closure* done);
virtual uint32_t write(void* oprot) { private:
return thrift_message_->write(static_cast<::apache::thrift::protocol::TProtocol*>(oprot)); ChannelBase* _channel;
} };
virtual uint32_t read(void* iprot) { namespace policy {
return thrift_message_->read(static_cast<::apache::thrift::protocol::TProtocol*>(iprot)); // Implemented in policy/thrift_protocol.cpp
} bool ReadThriftStruct(const butil::IOBuf& body,
::apache::thrift::TBase* raw_msg,
int16_t expected_fid);
}
T& raw() { template <typename T>
return *thrift_message_; T* ThriftFramedMessage::Cast() {
if (_raw_instance) {
T* p = dynamic_cast<T*>(_raw_instance);
if (p) {
return p;
}
delete p;
} }
T* raw_msg = new T;
_raw_instance = raw_msg;
_own_raw_instance = true;
private: if (!body.empty()) {
T* thrift_message_; if (!policy::ReadThriftStruct(body, raw_msg, field_id)) {
}; LOG(ERROR) << "Fail to read xxx";
}
}
return raw_msg;
}
} // namespace brpc } // namespace brpc
......
...@@ -20,19 +20,11 @@ ...@@ -20,19 +20,11 @@
namespace brpc { namespace brpc {
ThriftService::ThriftService() : _additional_space(0) { ThriftService::ThriftService() {
_status = new (std::nothrow) MethodStatus; _status = new (std::nothrow) MethodStatus;
LOG_IF(FATAL, _status == NULL) << "Fail to new MethodStatus"; LOG_IF(FATAL, _status == NULL) << "Fail to new MethodStatus";
} }
ThriftService::ThriftService(const ThriftServiceOptions& options)
: _status(NULL), _additional_space(options.additional_space) {
if (options.generate_status) {
_status = new (std::nothrow) MethodStatus;
LOG_IF(FATAL, _status == NULL) << "Fail to new MethodStatus";
}
}
ThriftService::~ThriftService() { ThriftService::~ThriftService() {
delete _status; delete _status;
_status = NULL; _status = NULL;
...@@ -43,15 +35,15 @@ void ThriftService::Describe(std::ostream &os, const DescribeOptions&) const { ...@@ -43,15 +35,15 @@ void ThriftService::Describe(std::ostream &os, const DescribeOptions&) const {
} }
void ThriftService::Expose(const butil::StringPiece& prefix) { void ThriftService::Expose(const butil::StringPiece& prefix) {
_cached_name = butil::class_name_str(*this);
if (_status == NULL) { if (_status == NULL) {
return; return;
} }
std::string s; std::string s;
s.reserve(prefix.size() + 1 + _cached_name.size()); const std::string& cached_name = butil::class_name_str(*this);
s.reserve(prefix.size() + 1 + cached_name.size());
s.append(prefix.data(), prefix.size()); s.append(prefix.data(), prefix.size());
s.push_back('_'); s.push_back('_');
s.append(_cached_name); s.append(cached_name);
_status->Expose(s); _status->Expose(s);
} }
......
...@@ -21,7 +21,6 @@ ...@@ -21,7 +21,6 @@
#include "brpc/thrift_message.h" // ThriftFramedMessage #include "brpc/thrift_message.h" // ThriftFramedMessage
#include "brpc/describable.h" #include "brpc/describable.h"
namespace brpc { namespace brpc {
class Socket; class Socket;
...@@ -29,63 +28,14 @@ class Server; ...@@ -29,63 +28,14 @@ class Server;
class MethodStatus; class MethodStatus;
class StatusService; class StatusService;
namespace policy { namespace policy {
class ThriftClosure;
void ProcessThriftRequest(InputMessageBase* msg_base); void ProcessThriftRequest(InputMessageBase* msg_base);
} }
// The continuation of request processing. Namely send response back to client.
// NOTE: you DON'T need to inherit this class or create instance of this class.
class ThriftClosure : public google::protobuf::Closure {
public:
explicit ThriftClosure(void* additional_space);
// [Required] Call this to send response back to the client.
void Run();
// [Optional] Set the full method name. If unset, use name of the service.
void SetMethodName(const std::string& full_method_name);
// The space required by subclass at ThriftServiceOptions. subclass may
// utilizes this feature to save the cost of allocating closure separately.
// If subclass does not require space, this return value is NULL.
void* additional_space() { return _additional_space; }
// The starting time of the RPC, got from butil::cpuwide_time_us().
int64_t cpuwide_start_us() const { return _start_parse_us; }
// Don't send response back, used by MIMO.
void DoNotRespond();
private:
friend void policy::ProcessThriftRequest(InputMessageBase* msg_base);
friend class DeleteThriftClosure;
// Only callable by Run().
~ThriftClosure();
Socket* _socket_ptr;
const Server* _server;
int64_t _start_parse_us;
ThriftFramedMessage _request;
ThriftFramedMessage _response;
bool _do_respond;
void* _additional_space;
Controller _controller;
};
struct ThriftServiceOptions {
ThriftServiceOptions() : generate_status(true), additional_space(0) {}
ThriftServiceOptions(bool generate_status2, size_t additional_space2)
: generate_status(generate_status2)
, additional_space(additional_space2) {}
bool generate_status;
size_t additional_space;
};
// Inherit this class to let brpc server understands thrift_binary requests. // Inherit this class to let brpc server understands thrift_binary requests.
class ThriftService : public Describable { class ThriftService : public Describable {
public: public:
ThriftService(); ThriftService();
ThriftService(const ThriftServiceOptions&);
virtual ~ThriftService(); virtual ~ThriftService();
// Implement this method to handle thrift_binary requests. Notice that this // Implement this method to handle thrift_binary requests. Notice that this
...@@ -93,23 +43,22 @@ public: ...@@ -93,23 +43,22 @@ public:
// request before calling this method), in which case the implemenetation // request before calling this method), in which case the implemenetation
// shall send specific response with error information back to client. // shall send specific response with error information back to client.
// Parameters: // Parameters:
// server The server receiving the request.
// controller per-rpc settings. // controller per-rpc settings.
// request The thrift_binary request received. // request The thrift_binary request received.
// response The thrift_binary response that you should fill in. // response The thrift_binary response that you should fill in.
// done You must call done->Run() to end the processing. // done You must call done->Run() to end the processing.
virtual void ProcessThriftFramedRequest(const Server& server, virtual void ProcessThriftFramedRequest(
Controller* controller, Controller* controller,
ThriftFramedMessage* request, ThriftFramedMessage* request,
ThriftFramedMessage* response, ThriftFramedMessage* response,
ThriftClosure* done) = 0; ::google::protobuf::Closure* done) = 0;
// Put descriptions into the stream. // Put descriptions into the stream.
void Describe(std::ostream &os, const DescribeOptions&) const; void Describe(std::ostream &os, const DescribeOptions&) const;
private: private:
DISALLOW_COPY_AND_ASSIGN(ThriftService); DISALLOW_COPY_AND_ASSIGN(ThriftService);
friend class ThriftClosure; friend class policy::ThriftClosure;
friend void policy::ProcessThriftRequest(InputMessageBase* msg_base); friend void policy::ProcessThriftRequest(InputMessageBase* msg_base);
friend class StatusService; friend class StatusService;
friend class Server; friend class Server;
...@@ -117,14 +66,9 @@ friend class Server; ...@@ -117,14 +66,9 @@ friend class Server;
private: private:
void Expose(const butil::StringPiece& prefix); void Expose(const butil::StringPiece& prefix);
// Tracking status of non ThriftPbService
MethodStatus* _status; MethodStatus* _status;
size_t _additional_space;
std::string _cached_name;
}; };
} // namespace brpc } // namespace brpc
#endif // BRPC_THRIFT_SERVICE_H #endif // BRPC_THRIFT_SERVICE_H
...@@ -557,11 +557,12 @@ static bool read_disk_stat(DiskStat* s) { ...@@ -557,11 +557,12 @@ static bool read_disk_stat(DiskStat* s) {
PLOG(WARNING) << "Fail to fscanf"; PLOG(WARNING) << "Fail to fscanf";
return false; return false;
} }
return true;
#elif defined(OS_MACOSX) #elif defined(OS_MACOSX)
// TODO(zhujiashun) // TODO(zhujiashun)
return true; return false;
#else #else
return true; return false;
#endif #endif
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment