Unverified Commit 33604cea authored by Ge Jun's avatar Ge Jun Committed by GitHub

Merge pull request #397 from kenshinxf/thrift

Fix thrift method name issue
parents 1a3cb1e7 bab70ab0
...@@ -97,7 +97,7 @@ public: ...@@ -97,7 +97,7 @@ public:
example::EchoRequest* req = request->Cast<example::EchoRequest>(); example::EchoRequest* req = request->Cast<example::EchoRequest>();
example::EchoResponse* res = response->Cast<example::EchoResponse>(); example::EchoResponse* res = response->Cast<example::EchoResponse>();
       // 通过request->method_name()获得被访问的方法名, 必须在Cast()被调用之后        // 通过cntl->thrift_method_name()获得被访问的方法名
       if (_native_handler) {        if (_native_handler) {
_native_handler->Echo(*res, *req); _native_handler->Echo(*res, *req);
} else { } else {
......
...@@ -97,7 +97,7 @@ public: ...@@ -97,7 +97,7 @@ public:
example::EchoRequest* req = request->Cast<example::EchoRequest>(); example::EchoRequest* req = request->Cast<example::EchoRequest>();
example::EchoResponse* res = response->Cast<example::EchoResponse>(); example::EchoResponse* res = response->Cast<example::EchoResponse>();
       // Get method-name for thrift via request->method_name() after Cast() was called        // Get method-name for thrift by cntl->thrift_method_name();
       if (_native_handler) {        if (_native_handler) {
_native_handler->Echo(*res, *req); _native_handler->Echo(*res, *req);
} else { } else {
......
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
#include <butil/logging.h> #include <butil/logging.h>
#include <brpc/server.h> #include <brpc/server.h>
#include <brpc/thrift_service.h> #include <brpc/thrift_service.h>
#include <brpc/details/thrift_utils.h>
#include <thrift/protocol/TBinaryProtocol.h> #include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/transport/TBufferTransports.h> #include <thrift/transport/TBufferTransports.h>
...@@ -65,6 +65,7 @@ public: ...@@ -65,6 +65,7 @@ public:
return; return;
} }
// get method name by cntl->thrift_method_name() if needed
example::EchoRequest* req = request->Cast<example::EchoRequest>(); example::EchoRequest* req = request->Cast<example::EchoRequest>();
example::EchoResponse* res = response->Cast<example::EchoResponse>(); example::EchoResponse* res = response->Cast<example::EchoResponse>();
...@@ -78,6 +79,8 @@ public: ...@@ -78,6 +79,8 @@ public:
} else { } else {
res->data = req->data + " (processed directly)"; res->data = req->data + " (processed directly)";
} }
} }
private: private:
......
...@@ -454,7 +454,7 @@ public: ...@@ -454,7 +454,7 @@ public:
void set_thrift_method_name(const std::string& method_name) { void set_thrift_method_name(const std::string& method_name) {
_thrift_method_name = method_name; _thrift_method_name = method_name;
} }
std::string thrift_method_name() { return _thrift_method_name; } const std::string& thrift_method_name() { return _thrift_method_name; }
private: private:
struct CompletionInfo { struct CompletionInfo {
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
#define BRPC_THRIFT_UTILS_H #define BRPC_THRIFT_UTILS_H
#include "butil/iobuf.h" #include "butil/iobuf.h"
#include "butil/logging.h"
#include <thrift/TDispatchProcessor.h> #include <thrift/TDispatchProcessor.h>
#include <thrift/transport/TBufferTransports.h> #include <thrift/transport/TBufferTransports.h>
...@@ -49,8 +50,8 @@ uint32_t thrift_framed_message_writer(void* p, void* prot) { ...@@ -49,8 +50,8 @@ uint32_t thrift_framed_message_writer(void* p, void* prot) {
} }
template<typename T> template<typename T>
bool serialize_iobuf_to_thrift_message(butil::IOBuf& body, bool serialize_iobuf_to_thrift_message(const butil::IOBuf& body,
void* thrift_raw_instance, std::string* method_name, int32_t* thrift_message_seq_id) { void* thrift_raw_instance, int32_t* thrift_message_seq_id) {
auto in_buffer = auto in_buffer =
THRIFT_STDCXX::make_shared<apache::thrift::transport::TMemoryBuffer>(); THRIFT_STDCXX::make_shared<apache::thrift::transport::TMemoryBuffer>();
...@@ -73,7 +74,7 @@ bool serialize_iobuf_to_thrift_message(butil::IOBuf& body, ...@@ -73,7 +74,7 @@ bool serialize_iobuf_to_thrift_message(butil::IOBuf& body,
std::string fname; std::string fname;
::apache::thrift::protocol::TMessageType mtype; ::apache::thrift::protocol::TMessageType mtype;
in_portocol->readMessageBegin(*method_name, mtype, *thrift_message_seq_id); in_portocol->readMessageBegin(fname, mtype, *thrift_message_seq_id);
apache::thrift::protocol::TInputRecursionTracker tracker(*in_portocol); apache::thrift::protocol::TInputRecursionTracker tracker(*in_portocol);
uint32_t xfer = 0; uint32_t xfer = 0;
......
...@@ -47,11 +47,43 @@ ...@@ -47,11 +47,43 @@
#endif #endif
extern "C" { extern "C" {
void bthread_assign_data(void* data); void bthread_assign_data(void* data) __THROW;
} }
namespace brpc { namespace brpc {
static int32_t parse_thrift_method_name(const butil::IOBuf& body, std::string* method_name) {
// Thrift protocol format:
// Version + Message type + Length + Method + Sequence Id
// | | | | |
// 2 + 2 + 4 + >0 + 4
if (body.size() < 12) {
LOG(ERROR) << "No Enough data to get method name, request body size: " << body.size();
return -1;
}
char version_and_len_buf[8];
size_t k = body.copy_to(version_and_len_buf, sizeof(version_and_len_buf));
if (k != sizeof(version_and_len_buf) ) {
LOG(ERROR) << "copy "<< sizeof(version_and_len_buf) << " bytes from body failed";
return -1;
}
uint32_t method_name_length = ntohl(*(int32_t*)(version_and_len_buf + 4));
char fname[method_name_length];
k = body.copy_to(fname, method_name_length, sizeof(version_and_len_buf));
if ( k != method_name_length) {
LOG(ERROR) << "copy " << method_name_length << " bytes from body failed";
return -1;
}
method_name->assign(fname, method_name_length);
return sizeof(version_and_len_buf) + method_name_length;
}
ThriftClosure::ThriftClosure(void* additional_space) ThriftClosure::ThriftClosure(void* additional_space)
: _socket_ptr(NULL) : _socket_ptr(NULL)
, _server(NULL) , _server(NULL)
...@@ -112,7 +144,7 @@ void ThriftClosure::Run() { ...@@ -112,7 +144,7 @@ void ThriftClosure::Run() {
_response.head = _request.head; _response.head = _request.head;
if (_response.thrift_raw_instance) { if (_response.thrift_raw_instance) {
std::string method_name = _request.method_name; const std::string& method_name = _controller.thrift_method_name();
if (method_name == "" || if (method_name == "" ||
method_name.length() < 1 || method_name.length() < 1 ||
method_name[0] == ' ') { method_name[0] == ' ') {
...@@ -248,6 +280,15 @@ struct CallMethodInBackupThreadArgs { ...@@ -248,6 +280,15 @@ struct CallMethodInBackupThreadArgs {
static void CallMethodInBackupThread(void* void_args) { static void CallMethodInBackupThread(void* void_args) {
CallMethodInBackupThreadArgs* args = (CallMethodInBackupThreadArgs*)void_args; CallMethodInBackupThreadArgs* args = (CallMethodInBackupThreadArgs*)void_args;
std::string method_name;
if (parse_thrift_method_name(args->request->body, &method_name) < 0) {
LOG(ERROR) << "Fail to get thrift method name";
delete args;
return;
}
args->controller->set_thrift_method_name(method_name);
args->service->ProcessThriftFramedRequest(*args->server, args->controller, args->service->ProcessThriftFramedRequest(*args->server, args->controller,
args->request, args->response, args->request, args->response,
args->done); args->done);
...@@ -380,22 +421,37 @@ void ProcessThriftRequest(InputMessageBase* msg_base) { ...@@ -380,22 +421,37 @@ void ProcessThriftRequest(InputMessageBase* msg_base) {
span->AsParent(); span->AsParent();
} }
try { std::string method_name;
if (!FLAGS_usercode_in_pthread) { if (parse_thrift_method_name(req->body, &method_name) < 0) {
cntl->SetFailed(EREQUEST, "Fail to get thrift method name!");
return;
}
cntl->set_thrift_method_name(method_name);
if (!FLAGS_usercode_in_pthread) {
try {
return service->ProcessThriftFramedRequest(*server, cntl, return service->ProcessThriftFramedRequest(*server, cntl,
req, res, thrift_done); req, res, thrift_done);
} catch (::apache::thrift::TException& e) {
cntl->SetFailed(EREQUEST, "Invalid request data, reason: %s", e.what());
} catch (...) {
cntl->SetFailed(EINTERNAL, "Internal server error!");
} }
if (BeginRunningUserCode()) {
}
if (BeginRunningUserCode()) {
try {
service->ProcessThriftFramedRequest(*server, cntl, req, res, thrift_done); service->ProcessThriftFramedRequest(*server, cntl, req, res, thrift_done);
return EndRunningUserCodeInPlace(); } catch (::apache::thrift::TException& e) {
} else { cntl->SetFailed(EREQUEST, "Invalid request data, reason: %s", e.what());
return EndRunningCallMethodInPool( } catch (...) {
service, *server, cntl, req, res, thrift_done); cntl->SetFailed(EINTERNAL, "Internal server error!");
} }
} catch (::apache::thrift::TException& e) { return EndRunningUserCodeInPlace();
cntl->SetFailed(EREQUEST, "Invalid request data, reason: %s", e.what()); } else {
} catch (...) { return EndRunningCallMethodInPool(
cntl->SetFailed(EINTERNAL, "Internal server error!"); service, *server, cntl, req, res, thrift_done);
} }
} }
......
...@@ -125,8 +125,6 @@ void ThriftFramedMessage::SharedCtor() { ...@@ -125,8 +125,6 @@ void ThriftFramedMessage::SharedCtor() {
thrift_raw_instance_deleter = nullptr; thrift_raw_instance_deleter = nullptr;
thrift_raw_instance = nullptr; thrift_raw_instance = nullptr;
thrift_message_seq_id = 0; thrift_message_seq_id = 0;
method_name = "";
//RegisterThriftProtocolDummy dummy;
} }
ThriftFramedMessage::~ThriftFramedMessage() { ThriftFramedMessage::~ThriftFramedMessage() {
......
...@@ -55,7 +55,6 @@ public: ...@@ -55,7 +55,6 @@ public:
void* thrift_raw_instance; void* thrift_raw_instance;
int32_t thrift_message_seq_id; int32_t thrift_message_seq_id;
std::string method_name;
public: public:
ThriftFramedMessage(); ThriftFramedMessage();
...@@ -103,8 +102,7 @@ public: ...@@ -103,8 +102,7 @@ public:
// serialize binary thrift message to thrift struct request // serialize binary thrift message to thrift struct request
// for response, we just return the new instance and deserialize it in Closure // for response, we just return the new instance and deserialize it in Closure
if (body.size() > 0 ) { if (body.size() > 0 ) {
if (serialize_iobuf_to_thrift_message<T>(body, thrift_raw_instance, if (serialize_iobuf_to_thrift_message<T>(body, thrift_raw_instance, &thrift_message_seq_id)) {
&method_name, &thrift_message_seq_id)) {
} else { } else {
delete static_cast<T*>(thrift_raw_instance); delete static_cast<T*>(thrift_raw_instance);
return nullptr; return nullptr;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment