Commit 0542ba7f authored by Ge Jun's avatar Ge Jun

thrift code does not depend on TBase which is absent in thrift 0.9.3

parent 58551f87
...@@ -61,9 +61,11 @@ class RpcDumpMeta; ...@@ -61,9 +61,11 @@ class RpcDumpMeta;
class MongoContext; class MongoContext;
class RetryPolicy; class RetryPolicy;
class InputMessageBase; class InputMessageBase;
class ThriftStub;
namespace policy { namespace policy {
class OnServerStreamCreated; class OnServerStreamCreated;
void ProcessMongoRequest(InputMessageBase*); void ProcessMongoRequest(InputMessageBase*);
void ProcessThriftRequest(InputMessageBase*);
} }
namespace schan { namespace schan {
class Sender; class Sender;
...@@ -102,12 +104,14 @@ friend class ParallelChannelDone; ...@@ -102,12 +104,14 @@ friend class ParallelChannelDone;
friend class ControllerPrivateAccessor; friend class ControllerPrivateAccessor;
friend class ServerPrivateAccessor; friend class ServerPrivateAccessor;
friend class SelectiveChannel; friend class SelectiveChannel;
friend class ThriftStub;
friend class schan::Sender; friend class schan::Sender;
friend class schan::SubDone; friend class schan::SubDone;
friend class policy::OnServerStreamCreated; friend class policy::OnServerStreamCreated;
friend int StreamCreate(StreamId*, Controller&, const StreamOptions*); friend int StreamCreate(StreamId*, Controller&, const StreamOptions*);
friend int StreamAccept(StreamId*, Controller&, const StreamOptions*); friend int StreamAccept(StreamId*, Controller&, const StreamOptions*);
friend void policy::ProcessMongoRequest(InputMessageBase*); friend void policy::ProcessMongoRequest(InputMessageBase*);
friend void policy::ProcessThriftRequest(InputMessageBase*);
// << Flags >> // << Flags >>
static const uint32_t FLAGS_IGNORE_EOVERCROWDED = 1; static const uint32_t FLAGS_IGNORE_EOVERCROWDED = 1;
static const uint32_t FLAGS_SECURITY_MODE = (1 << 1); static const uint32_t FLAGS_SECURITY_MODE = (1 << 1);
......
...@@ -129,8 +129,6 @@ public: ...@@ -129,8 +129,6 @@ public:
_cntl->add_flag(Controller::FLAGS_REQUEST_WITH_AUTH); _cntl->add_flag(Controller::FLAGS_REQUEST_WITH_AUTH);
} }
std::string* mutable_thrift_method_name() { return &_cntl->_thrift_method_name; }
private: private:
Controller* _cntl; Controller* _cntl;
}; };
......
...@@ -116,7 +116,7 @@ WriteThriftMessageBegin(char* buf, ...@@ -116,7 +116,7 @@ WriteThriftMessageBegin(char* buf,
} }
bool ReadThriftStruct(const butil::IOBuf& body, bool ReadThriftStruct(const butil::IOBuf& body,
::apache::thrift::TBase* raw_msg, ThriftMessageBase* raw_msg,
int16_t expected_fid) { int16_t expected_fid) {
const size_t body_len = body.size(); const size_t body_len = body.size();
uint8_t* thrift_buffer = new uint8_t[body_len]; uint8_t* thrift_buffer = new uint8_t[body_len];
...@@ -143,7 +143,7 @@ bool ReadThriftStruct(const butil::IOBuf& body, ...@@ -143,7 +143,7 @@ bool ReadThriftStruct(const butil::IOBuf& body,
} }
if (fid == expected_fid) { if (fid == expected_fid) {
if (ftype == ::apache::thrift::protocol::T_STRUCT) { if (ftype == ::apache::thrift::protocol::T_STRUCT) {
xfer += raw_msg->read(&iprot); xfer += raw_msg->Read(&iprot);
success = true; success = true;
} else { } else {
xfer += iprot.skip(ftype); xfer += iprot.skip(ftype);
...@@ -299,7 +299,7 @@ void ThriftClosure::DoRun() { ...@@ -299,7 +299,7 @@ void ThriftClosure::DoRun() {
xfer += oprot.writeFieldBegin("success", xfer += oprot.writeFieldBegin("success",
::apache::thrift::protocol::T_STRUCT, ::apache::thrift::protocol::T_STRUCT,
THRIFT_RESPONSE_FID); THRIFT_RESPONSE_FID);
xfer += _response.raw_instance()->write(&oprot); xfer += _response.raw_instance()->Write(&oprot);
xfer += oprot.writeFieldEnd(); xfer += oprot.writeFieldEnd();
xfer += oprot.writeFieldStop(); xfer += oprot.writeFieldStop();
xfer += oprot.writeStructEnd(); xfer += oprot.writeStructEnd();
...@@ -457,7 +457,7 @@ void ProcessThriftRequest(InputMessageBase* msg_base) { ...@@ -457,7 +457,7 @@ void ProcessThriftRequest(InputMessageBase* msg_base) {
uint32_t seq_id; uint32_t seq_id;
::apache::thrift::protocol::TMessageType mtype; ::apache::thrift::protocol::TMessageType mtype;
butil::Status st = ReadThriftMessageBegin( butil::Status st = ReadThriftMessageBegin(
&msg->payload, accessor.mutable_thrift_method_name(), &mtype, &seq_id); &msg->payload, &cntl->_thrift_method_name, &mtype, &seq_id);
if (!st.ok()) { if (!st.ok()) {
cntl->SetFailed(EREQUEST, "%s", st.error_cstr()); cntl->SetFailed(EREQUEST, "%s", st.error_cstr());
return thrift_done->Run(); return thrift_done->Run();
...@@ -680,7 +680,7 @@ void SerializeThriftRequest(butil::IOBuf* request_buf, Controller* cntl, ...@@ -680,7 +680,7 @@ void SerializeThriftRequest(butil::IOBuf* request_buf, Controller* cntl,
THRIFT_REQUEST_FID); THRIFT_REQUEST_FID);
// request's write // request's write
xfer += req->raw_instance()->write(&oprot); xfer += req->raw_instance()->Write(&oprot);
xfer += oprot.writeFieldEnd(); xfer += oprot.writeFieldEnd();
xfer += oprot.writeFieldStop(); xfer += oprot.writeFieldStop();
......
...@@ -226,53 +226,12 @@ void ThriftFramedMessage::Swap(ThriftFramedMessage* other) { ...@@ -226,53 +226,12 @@ void ThriftFramedMessage::Swap(ThriftFramedMessage* other) {
return metadata; return metadata;
} }
// A wrapper closure to own the additional response required by ThriftStub
class ThriftFramedMessageAndDone : public ::google::protobuf::Closure {
public:
explicit ThriftFramedMessageAndDone(::google::protobuf::Closure* done)
: _done(done) {}
void Run() override { _done->Run(); }
ThriftFramedMessage response;
private:
::google::protobuf::Closure* _done;
};
void ThriftStub::CallMethod(const char* method_name,
Controller* cntl,
const ::apache::thrift::TBase* raw_request,
::apache::thrift::TBase* raw_response,
::google::protobuf::Closure* done) {
ControllerPrivateAccessor(cntl).mutable_thrift_method_name()->assign(method_name);
ThriftFramedMessage request;
request._own_raw_instance = false;
request._raw_instance = const_cast<::apache::thrift::TBase*>(raw_request);
if (done == NULL) {
// response is guaranteed to be unused after a synchronous RPC, no
// need to allocate it on heap.
ThriftFramedMessage response;
response._own_raw_instance = false;
response._raw_instance = raw_response;
_channel->CallMethod(NULL, cntl, &request, &response, NULL);
} else {
// Let the new_done own the response and release it after Run().
ThriftFramedMessageAndDone* new_done = new ThriftFramedMessageAndDone(done);
new_done->response._own_raw_instance = false;
new_done->response._raw_instance = raw_response;
_channel->CallMethod(NULL, cntl, &request, &new_done->response, new_done);
}
}
void ThriftStub::CallMethod(const char* method_name, void ThriftStub::CallMethod(const char* method_name,
Controller* cntl, Controller* cntl,
const ThriftFramedMessage* req, const ThriftFramedMessage* req,
ThriftFramedMessage* res, ThriftFramedMessage* res,
::google::protobuf::Closure* done) { ::google::protobuf::Closure* done) {
ControllerPrivateAccessor(cntl).mutable_thrift_method_name()->assign(method_name); cntl->_thrift_method_name.assign(method_name);
_channel->CallMethod(NULL, cntl, req, res, done); _channel->CallMethod(NULL, cntl, req, res, done);
} }
......
...@@ -28,10 +28,18 @@ ...@@ -28,10 +28,18 @@
#include "google/protobuf/descriptor.pb.h" #include "google/protobuf/descriptor.pb.h"
#include "butil/iobuf.h" #include "butil/iobuf.h"
#include "butil/class_name.h"
#include "brpc/channel_base.h" #include "brpc/channel_base.h"
#include "brpc/controller.h" #include "brpc/controller.h"
#include <thrift/TBase.h> namespace apache {
namespace thrift {
class TBase;
namespace protocol {
class TProtocol;
}
}
}
namespace brpc { namespace brpc {
...@@ -46,6 +54,16 @@ static const int16_t THRIFT_INVALID_FID = -1; ...@@ -46,6 +54,16 @@ static const int16_t THRIFT_INVALID_FID = -1;
static const int16_t THRIFT_REQUEST_FID = 1; static const int16_t THRIFT_REQUEST_FID = 1;
static const int16_t THRIFT_RESPONSE_FID = 0; static const int16_t THRIFT_RESPONSE_FID = 0;
// Problem: TBase is absent in thrift 0.9.3
// Solution: Wrap native messages with templates into instances inheriting
// from ThriftMessageBase which can be stored and handled uniformly.
class ThriftMessageBase {
public:
virtual ~ThriftMessageBase() {};
virtual uint32_t Read(::apache::thrift::protocol::TProtocol* iprot) = 0;
virtual uint32_t Write(::apache::thrift::protocol::TProtocol* oprot) const = 0;
};
// Representing a thrift framed request or response. // Representing a thrift framed request or response.
class ThriftFramedMessage : public ::google::protobuf::Message { class ThriftFramedMessage : public ::google::protobuf::Message {
friend class ThriftStub; friend class ThriftStub;
...@@ -55,10 +73,10 @@ public: ...@@ -55,10 +73,10 @@ public:
private: private:
bool _own_raw_instance; bool _own_raw_instance;
::apache::thrift::TBase* _raw_instance; ThriftMessageBase* _raw_instance;
public: public:
::apache::thrift::TBase* raw_instance() const { return _raw_instance; } ThriftMessageBase* raw_instance() const { return _raw_instance; }
template <typename T> T* Cast(); template <typename T> T* Cast();
...@@ -111,10 +129,11 @@ class ThriftStub { ...@@ -111,10 +129,11 @@ class ThriftStub {
public: public:
explicit ThriftStub(ChannelBase* channel) : _channel(channel) {} explicit ThriftStub(ChannelBase* channel) : _channel(channel) {}
template <typename REQUEST, typename RESPONSE>
void CallMethod(const char* method_name, void CallMethod(const char* method_name,
Controller* cntl, Controller* cntl,
const ::apache::thrift::TBase* raw_request, const REQUEST* raw_request,
::apache::thrift::TBase* raw_response, RESPONSE* raw_response,
::google::protobuf::Closure* done); ::google::protobuf::Closure* done);
void CallMethod(const char* method_name, void CallMethod(const char* method_name,
...@@ -130,32 +149,106 @@ private: ...@@ -130,32 +149,106 @@ private:
namespace policy { namespace policy {
// Implemented in policy/thrift_protocol.cpp // Implemented in policy/thrift_protocol.cpp
bool ReadThriftStruct(const butil::IOBuf& body, bool ReadThriftStruct(const butil::IOBuf& body,
::apache::thrift::TBase* raw_msg, ThriftMessageBase* raw_msg,
int16_t expected_fid); int16_t expected_fid);
} }
namespace details {
template <typename T>
class ThriftMessageWrapper final : public ThriftMessageBase {
public:
ThriftMessageWrapper() : msg_ptr(NULL) {}
ThriftMessageWrapper(T* msg2) : msg_ptr(msg2) {}
virtual ~ThriftMessageWrapper() {}
// NOTE: "T::" makes the function call work around vtable
uint32_t Read(::apache::thrift::protocol::TProtocol* iprot) override final
{ return msg_ptr->T::read(iprot); }
uint32_t Write(::apache::thrift::protocol::TProtocol* oprot) const override final
{ return msg_ptr->T::write(oprot); }
T* msg_ptr;
};
template <typename T>
class ThriftMessageHolder final : public ThriftMessageBase {
public:
virtual ~ThriftMessageHolder() {}
// NOTE: "T::" makes the function call work around vtable
uint32_t Read(::apache::thrift::protocol::TProtocol* iprot) override final
{ return msg.T::read(iprot); }
uint32_t Write(::apache::thrift::protocol::TProtocol* oprot) const override final
{ return msg.T::write(oprot); }
T msg;
};
// A wrapper closure to own additional stuffs required by ThriftStub
template <typename RESPONSE>
class ThriftDoneWrapper : public ::google::protobuf::Closure {
public:
explicit ThriftDoneWrapper(::google::protobuf::Closure* done)
: _done(done) {}
void Run() override { _done->Run(); }
private:
::google::protobuf::Closure* _done;
public:
ThriftMessageWrapper<RESPONSE> raw_response_wrapper;
ThriftFramedMessage response;
};
} // namespace details
template <typename T> template <typename T>
T* ThriftFramedMessage::Cast() { T* ThriftFramedMessage::Cast() {
if (_raw_instance) { if (_raw_instance) {
T* p = dynamic_cast<T*>(_raw_instance); auto p = dynamic_cast<details::ThriftMessageHolder<T>*>(_raw_instance);
if (p) { if (p) {
return p; return &p->msg;
} }
delete p; delete _raw_instance;
} }
T* raw_msg = new T; auto raw_msg_wrapper = new details::ThriftMessageHolder<T>;
_raw_instance = raw_msg; T* raw_msg = &raw_msg_wrapper->msg;
_raw_instance = raw_msg_wrapper;
_own_raw_instance = true; _own_raw_instance = true;
if (!body.empty()) { if (!body.empty()) {
if (!policy::ReadThriftStruct(body, raw_msg, field_id)) { if (!policy::ReadThriftStruct(body, _raw_instance, field_id)) {
LOG(ERROR) << "Fail to read xxx"; LOG(ERROR) << "Fail to parse " << butil::class_name<T>();
} }
} }
return raw_msg; return raw_msg;
} }
template <typename REQUEST, typename RESPONSE>
void ThriftStub::CallMethod(const char* method_name,
Controller* cntl,
const REQUEST* raw_request,
RESPONSE* raw_response,
::google::protobuf::Closure* done) {
cntl->_thrift_method_name.assign(method_name);
details::ThriftMessageWrapper<REQUEST>
raw_request_wrapper(const_cast<REQUEST*>(raw_request));
ThriftFramedMessage request;
request._raw_instance = &raw_request_wrapper;
if (done == NULL) {
// response is guaranteed to be unused after a synchronous RPC, no
// need to allocate it on heap.
ThriftFramedMessage response;
details::ThriftMessageWrapper<RESPONSE> raw_response_wrapper(raw_response);
response._raw_instance = &raw_response_wrapper;
_channel->CallMethod(NULL, cntl, &request, &response, NULL);
} else {
// Let the new_done own the response and release it after Run().
details::ThriftDoneWrapper<RESPONSE>* new_done =
new details::ThriftDoneWrapper<RESPONSE>(done);
new_done->raw_response_wrapper.msg_ptr = raw_response;
new_done->response._raw_instance = &new_done->raw_response_wrapper;
_channel->CallMethod(NULL, cntl, &request, &new_done->response, new_done);
}
}
} // namespace brpc } // namespace brpc
#endif // BRPC_THRIFT_MESSAGE_H #endif // BRPC_THRIFT_MESSAGE_H
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment