Commit c332d839 authored by wangxuefeng's avatar wangxuefeng

refactor server side code, support raw pb manner way.

parent f4f9dd4b
......@@ -6,7 +6,7 @@ include config.mk
# 2. Added -D__const__= : Avoid over-optimizations of TLS variables by GCC>=4.8
# 3. Removed -Werror: Not block compilation for non-vital warnings, especially when the
# code is tested on newer systems. If the code is used in production, add -Werror back
CPPFLAGS+=-DBTHREAD_USE_FAST_PTHREAD_MUTEX -D__const__= -D_GNU_SOURCE -DUSE_SYMBOLIZE -DNO_TCMALLOC -D__STDC_FORMAT_MACROS -D__STDC_LIMIT_MACROS -D__STDC_CONSTANT_MACROS -DNDEBUG -DBRPC_REVISION=\"$(shell git rev-parse --short HEAD)\"
CPPFLAGS+=-DENABLE_THRIFT_FRAMED_PROTOCOL -DBTHREAD_USE_FAST_PTHREAD_MUTEX -D__const__= -D_GNU_SOURCE -DUSE_SYMBOLIZE -DNO_TCMALLOC -D__STDC_FORMAT_MACROS -D__STDC_LIMIT_MACROS -D__STDC_CONSTANT_MACROS -DNDEBUG -DBRPC_REVISION=\"$(shell git rev-parse --short HEAD)\"
CXXFLAGS=$(CPPFLAGS) -O2 -pipe -Wall -W -fPIC -fstrict-aliasing -Wno-invalid-offsetof -Wno-unused-parameter -fno-omit-frame-pointer -std=c++0x
CFLAGS=$(CPPFLAGS) -O2 -pipe -Wall -W -fPIC -fstrict-aliasing -Wno-unused-parameter -fno-omit-frame-pointer
DEBUG_CXXFLAGS = $(filter-out -DNDEBUG,$(CXXFLAGS)) -DUNIT_TEST -DBVAR_NOT_LINK_DEFAULT_VARIABLES
......
......@@ -67,14 +67,11 @@ int main(int argc, char* argv[]) {
brpc::Controller cntl;
cntl.set_log_id(log_id ++); // set by user
// Thrift Req
example::EchoRequest thrift_request;
example::EchoResponse thrift_response;
thrift_request.data = "hello";
// wrapper thrift raw request into ThriftMessage
brpc::ThriftMessage<example::EchoRequest> req(&thrift_request);
brpc::ThriftMessage<example::EchoResponse> res(&thrift_response);
brpc::ThriftMessage<example::EchoRequest> req;
brpc::ThriftMessage<example::EchoResponse> res;
req.raw().data = "hello";
cntl.set_thrift_method_name("Echo");
......@@ -87,13 +84,14 @@ int main(int argc, char* argv[]) {
g_latency_recorder << cntl.latency_us();
}
LOG(INFO) << "Thrift Res data: " << thrift_response.data;
LOG(INFO) << "Thrift Res data: " << res.raw().data;
LOG_EVERY_SECOND(INFO)
<< "Sending thrift requests at qps=" << g_latency_recorder.qps(1)
<< " latency=" << g_latency_recorder.latency(1);
sleep(1);
}
LOG(INFO) << "EchoClient is going to quit";
......
......@@ -27,6 +27,7 @@
#include "gen-cpp/echo_types.h"
DEFINE_int32(port, 8019, "TCP Port of this server");
DEFINE_int32(port2, 8018, "TCP Port of this server");
DEFINE_int32(idle_timeout_s, -1, "Connection will be closed if there is no "
"read/write operations during the last `idle_timeout_s'");
DEFINE_int32(max_concurrency, 0, "Limit of request processing in parallel");
......@@ -38,7 +39,7 @@ public:
void Echo(example::EchoResponse& res, const example::EchoRequest& req) {
// Process request, just attach a simple string.
res.data = req.data + " world";
LOG(INFO) << "Echo req.data: " << req.data;
//LOG(INFO) << "Echo req.data: " << req.data;
return;
}
......@@ -77,6 +78,43 @@ public:
};
// Adapt your own thrift-based protocol to use brpc
class MyThriftProtocolAnother : public brpc::ThriftFramedService {
public:
void ProcessThriftBinaryRequest(const brpc::Server&,
brpc::Controller* cntl,
const brpc::ThriftBinaryMessage& request,
brpc::ThriftBinaryMessage* response,
brpc::ThriftFramedClosure* done) {
// This object helps you to call done->Run() in RAII style. If you need
// to process the request asynchronously, pass done_guard.release().
brpc::ClosureGuard done_guard(done);
if (cntl->Failed()) {
// NOTE: You can send back a response containing error information
// back to client instead of closing the connection.
cntl->CloseConnection("Close connection due to previous error");
return;
}
brpc::ThriftBinaryMessage request_ref = request;
example::EchoRequest* req = request_ref.cast<example::EchoRequest>();
example::EchoResponse* res = response->cast<example::EchoResponse>();
// MUST set the thrift method name, we need this info when serializing response.
cntl->set_thrift_method_name("Echo");
// process with req and res
res->data = req->data + " world another!";
LOG(INFO) << "success to process thrift request in brpc with pb manner";
}
};
int main(int argc, char* argv[]) {
// Parse gflags. We recommend you to use gflags as well.
google::ParseCommandLineFlags(&argc, &argv, true);
......@@ -93,6 +131,18 @@ int main(int argc, char* argv[]) {
return -1;
}
brpc::Server server2;
brpc::ServerOptions options2;
options2.thrift_service = new MyThriftProtocolAnother;
options2.idle_timeout_sec = FLAGS_idle_timeout_s;
options2.max_concurrency = FLAGS_max_concurrency;
// Start the server2.
if (server2.Start(FLAGS_port2, &options2) != 0) {
LOG(ERROR) << "Fail to start EchoServer";
return -1;
}
// Wait until Ctrl-C is pressed, then Stop() and Join() the server.
server.RunUntilAskedToQuit();
return 0;
......
......@@ -434,11 +434,21 @@ public:
void set_idl_result(int64_t result) { _idl_result = result; }
int64_t idl_result() const { return _idl_result; }
void set_thrift_method_name(std::string& method_name) {
_thrift_method_name = method_name;
}
void set_thrift_method_name(std::string method_name) {
_thrift_method_name = method_name;
}
std::string thrift_method_name() { return _thrift_method_name; }
void set_thrift_seq_id(uint32_t seq_id) {
_thrift_seq_id = seq_id;
}
uint32_t thrift_seq_id() { return _thrift_seq_id; }
private:
struct CompletionInfo {
CallId id; // call_id of the corresponding request
......@@ -672,6 +682,7 @@ private:
// Thrift method name, only used when thrift protocol enabled
std::string _thrift_method_name;
uint32_t _thrift_seq_id;
};
// Advises the RPC system that the caller desires that the RPC call be
......
......@@ -101,6 +101,54 @@ void ThriftFramedClosure::Run() {
if (_do_respond) {
// response uses request's head as default.
_response.head = _request.head;
if (_response.thrift_raw_instance) {
if (_controller.thrift_method_name() == "" ||
_controller.thrift_method_name().length() < 1 ||
_controller.thrift_method_name()[0] == ' ') {
_controller.SetFailed(ENOMETHOD,
"invalid thrift method name or method name empty!");
return;
}
auto out_buffer =
boost::make_shared<apache::thrift::transport::TMemoryBuffer>();
auto oprot =
boost::make_shared<apache::thrift::protocol::TBinaryProtocol>(out_buffer);
// The following code was taken and modified from thrift auto generated code
oprot->writeMessageBegin(_controller.thrift_method_name(),
::apache::thrift::protocol::T_REPLY, _controller.thrift_seq_id());
uint32_t xfer = 0;
xfer += oprot->writeStructBegin("placeholder");
xfer += oprot->writeFieldBegin("success",
::apache::thrift::protocol::T_STRUCT, 0);
if (_response.thrift_raw_instance && _response.thrift_raw_instance_writer) {
xfer += _response.thrift_raw_instance_writer(
_response.thrift_raw_instance, oprot.get());
} else {
_controller.SetFailed(ERESPONSE, "thrift_raw_instance or"
"thrift_raw_instance_writer is null!");
}
xfer += oprot->writeFieldEnd();
xfer += oprot->writeFieldStop();
xfer += oprot->writeStructEnd();
oprot->writeMessageEnd();
oprot->getTransport()->writeEnd();
oprot->getTransport()->flush();
// End thrfit auto generated code
butil::IOBuf buf;
buf.append(out_buffer->getBufferAsString());
_response.body =buf;
}
uint32_t length = _response.body.length();
_response.head.body_len = htonl(length);
......
......@@ -123,6 +123,9 @@ void ThriftBinaryMessage::SharedCtor() {
ThriftBinaryMessage::~ThriftBinaryMessage() {
SharedDtor();
if (thrift_raw_instance && thrift_raw_instance_deleter) {
thrift_raw_instance_deleter(thrift_raw_instance);
}
}
void ThriftBinaryMessage::SharedDtor() {
......
......@@ -17,6 +17,7 @@
#ifndef BRPC_THRIFT_BINARY_MESSAGE_H
#define BRPC_THRIFT_BINARY_MESSAGE_H
#include <functional>
#include <string>
#include <google/protobuf/stubs/common.h>
......@@ -30,9 +31,22 @@
#include "butil/iobuf.h" // IOBuf
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/transport/TBufferTransports.h>
namespace brpc {
template <typename T>
void thrift_framed_message_deleter(void* p) {
delete static_cast<T*>(p);
}
template <typename T>
uint32_t thrift_framed_message_writer(void* p, ::apache::thrift::protocol::TProtocol* prot) {
T* writer = static_cast<T*>(p);
return writer->write(prot);
}
// Internal implementation detail -- do not call these.
void protobuf_AddDesc_baidu_2frpc_2fthrift_binary_5fmessage_2eproto();
void protobuf_AssignDesc_baidu_2frpc_2fthrift_binary_5fmessage_2eproto();
......@@ -43,6 +57,9 @@ class ThriftBinaryMessage : public ::google::protobuf::Message {
public:
thrift_binary_head_t head;
butil::IOBuf body;
std::function< void (void*) > thrift_raw_instance_deleter;
std::function<uint32_t (void*, ::apache::thrift::protocol::TProtocol*) > thrift_raw_instance_writer;
void* thrift_raw_instance;
public:
ThriftBinaryMessage();
......@@ -82,6 +99,85 @@ public:
virtual uint32_t write(::apache::thrift::protocol::TProtocol* oprot) { return 0;}
virtual uint32_t read(::apache::thrift::protocol::TProtocol* iprot) { return 0;}
template<typename T>
T* cast() {
thrift_raw_instance = new T;
// serilize binary thrift message to thrift struct request
// for response, we just return the new instance and deserialize it in Closure
if (body.size() > 0) {
auto in_buffer =
boost::make_shared<apache::thrift::transport::TMemoryBuffer>();
auto in_portocol =
boost::make_shared<apache::thrift::protocol::TBinaryProtocol>(in_buffer);
// Cut the thrift buffer and parse thrift message
size_t body_len = head.body_len;
auto thrift_buffer = static_cast<uint8_t*>(new uint8_t[body_len]);
const size_t k = body.copy_to(thrift_buffer, body_len);
if ( k != body_len) {
delete [] thrift_buffer;
return false;
}
in_buffer->resetBuffer(thrift_buffer, body_len);
// The following code was taken and modified from thrift auto generated code
int32_t rseqid = 0;
std::string fname;
::apache::thrift::protocol::TMessageType mtype;
in_portocol->readMessageBegin(fname, mtype, rseqid);
apache::thrift::protocol::TInputRecursionTracker tracker(*in_portocol);
uint32_t xfer = 0;
::apache::thrift::protocol::TType ftype;
int16_t fid;
xfer += in_portocol->readStructBegin(fname);
using ::apache::thrift::protocol::TProtocolException;
while (true)
{
xfer += in_portocol->readFieldBegin(fname, ftype, fid);
if (ftype == ::apache::thrift::protocol::T_STOP) {
break;
}
switch (fid)
{
case 1:
if (ftype == ::apache::thrift::protocol::T_STRUCT) {
xfer += static_cast<T*>(thrift_raw_instance)->read(in_portocol.get());
} else {
xfer += in_portocol->skip(ftype);
}
break;
default:
xfer += in_portocol->skip(ftype);
break;
}
xfer += in_portocol->readFieldEnd();
}
xfer += in_portocol->readStructEnd();
in_portocol->readMessageEnd();
in_portocol->getTransport()->readEnd();
// End thrfit auto generated code
delete [] thrift_buffer;
}
thrift_raw_instance_deleter = &thrift_framed_message_deleter<T>;
thrift_raw_instance_writer = &thrift_framed_message_writer<T>;
return static_cast<T*>(thrift_raw_instance);
}
private:
void SharedCtor();
void SharedDtor();
......@@ -90,7 +186,7 @@ friend void protobuf_AddDesc_baidu_2frpc_2fthrift_binary_5fmessage_2eproto_impl(
friend void protobuf_AddDesc_baidu_2frpc_2fthrift_binary_5fmessage_2eproto();
friend void protobuf_AssignDesc_baidu_2frpc_2fthrift_binary_5fmessage_2eproto();
friend void protobuf_ShutdownFile_baidu_2frpc_2fthrift_binary_5fmessage_2eproto();
void InitAsDefaultInstance();
static ThriftBinaryMessage* default_instance_;
};
......@@ -99,11 +195,17 @@ template <typename T>
class ThriftMessage : public ThriftBinaryMessage {
public:
ThriftMessage(T* thrift_message) {
thrift_message_ = thrift_message;
ThriftMessage() {
thrift_message_ = new T;
assert(thrift_message_ != nullptr);
}
virtual ~ThriftMessage() {}
virtual ~ThriftMessage() { delete thrift_message_; }
ThriftMessage<T>& operator= (const ThriftMessage<T>& other) {
*thrift_message_ = *(other.thrift_message_);
return *this;
}
virtual uint32_t write(::apache::thrift::protocol::TProtocol* oprot) {
return thrift_message_->write(oprot);
......@@ -113,6 +215,10 @@ public:
return thrift_message_->read(iprot);
}
T& raw(){
return *thrift_message_;
}
private:
T* thrift_message_;
};
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment