Commit 25b62852 authored by wangxuefeng's avatar wangxuefeng

Seperate libbrpc and librpc_thrift

parent 2cafaf51
......@@ -164,7 +164,9 @@ JSON2PB_SOURCES = $(foreach d,$(JSON2PB_DIRS),$(wildcard $(addprefix $(d)/*,$(SR
JSON2PB_OBJS = $(addsuffix .o, $(basename $(JSON2PB_SOURCES)))
BRPC_DIRS = src/brpc src/brpc/details src/brpc/builtin src/brpc/policy
BRPC_SOURCES = $(foreach d,$(BRPC_DIRS),$(wildcard $(addprefix $(d)/*,$(SRCEXTS))))
THRIFT_SOURCES = $(foreach d,$(BRPC_DIRS),$(wildcard $(addprefix $(d)/thrift*,$(SRCEXTS))))
BRPC_SOURCES_ALL = $(foreach d,$(BRPC_DIRS),$(wildcard $(addprefix $(d)/*,$(SRCEXTS))))
BRPC_SOURCES = $(filter-out $(THRIFT_SOURCES), $(BRPC_SOURCES_ALL))
BRPC_PROTOS = $(filter %.proto,$(BRPC_SOURCES))
BRPC_CFAMILIES = $(filter-out %.proto %.pb.cc,$(BRPC_SOURCES))
BRPC_OBJS = $(BRPC_PROTOS:.proto=.pb.o) $(addsuffix .o, $(basename $(BRPC_CFAMILIES)))
......@@ -176,6 +178,10 @@ MCPACK2PB_SOURCES = \
src/mcpack2pb/serializer.cpp
MCPACK2PB_OBJS = src/idl_options.pb.o $(addsuffix .o, $(basename $(MCPACK2PB_SOURCES)))
ifeq (ENABLE_THRIFT_FRAMED_PROTOCOL, $(findstring ENABLE_THRIFT_FRAMED_PROTOCOL, $(CPPFLAGS)))
THRIFT_OBJS = $(addsuffix .o, $(basename $(THRIFT_SOURCES)))
endif
OBJS=$(BUTIL_OBJS) $(BVAR_OBJS) $(BTHREAD_OBJS) $(JSON2PB_OBJS) $(MCPACK2PB_OBJS) $(BRPC_OBJS)
BVAR_DEBUG_OBJS=$(BUTIL_OBJS:.o=.dbg.o) $(BVAR_OBJS:.o=.dbg.o)
......@@ -184,7 +190,7 @@ DEBUG_OBJS = $(OBJS:.o=.dbg.o)
PROTOS=$(BRPC_PROTOS) src/idl_options.proto
.PHONY:all
all: protoc-gen-mcpack libbrpc.a libbrpc.so output/include output/lib output/bin
all: protoc-gen-mcpack libbrpc.a libbrpc.so libbrpc_thrift.a output/include output/lib output/bin
.PHONY:debug
debug: test/libbrpc.dbg.a test/libbvar.dbg.a
......@@ -192,7 +198,7 @@ debug: test/libbrpc.dbg.a test/libbvar.dbg.a
.PHONY:clean
clean:
@echo "Cleaning"
@rm -rf src/mcpack2pb/generator.o protoc-gen-mcpack libbrpc.a libbrpc.so $(OBJS) output/include output/lib output/bin $(PROTOS:.proto=.pb.h) $(PROTOS:.proto=.pb.cc)
@rm -rf src/mcpack2pb/generator.o protoc-gen-mcpack libbrpc.a libbrpc_thrift.a libbrpc.so $(OBJS) output/include output/lib output/bin $(PROTOS:.proto=.pb.h) $(PROTOS:.proto=.pb.cc)
.PHONY:clean_debug
clean_debug:
......@@ -221,6 +227,10 @@ test/libbrpc.dbg.a:$(BRPC_PROTOS:.proto=.pb.h) $(DEBUG_OBJS)
@echo "Packing $@"
@ar crs $@ $(filter %.o,$^)
libbrpc_thrift.a:$(THRIFT_OBJS)
@echo "Packing $@"
@ar crs $@ $(filter %.o,$^)
.PHONY:output/include
output/include:
@echo "Copying to $@"
......@@ -229,7 +239,7 @@ output/include:
@cp src/idl_options.proto src/idl_options.pb.h $@
.PHONY:output/lib
output/lib:libbrpc.a libbrpc.so
output/lib:libbrpc.a libbrpc.so libbrpc_thrift.a
@echo "Copying to $@"
@mkdir -p $@
@cp $^ $@
......
......@@ -11,7 +11,7 @@ LIBPATHS = $(addprefix -L, $(LIBS))
COMMA=,
SOPATHS=$(addprefix -Wl$(COMMA)-rpath=, $(LIBS))
STATIC_LINKINGS += -lbrpc -lthrift -lgflags
STATIC_LINKINGS += -lbrpc -lthrift -lgflags -Wl,--whole-archive -lbrpc_thrift -Wl,--no-whole-archive -levent
CLIENT_SOURCES = client.cpp
SERVER_SOURCES = server.cpp
......@@ -32,19 +32,11 @@ clean:
echo_client:$(PROTO_OBJS) $(CLIENT_OBJS) libechothrift.a
@echo "Linking $@"
ifneq ("$(LINK_SO)", "")
@$(CXX) $(LIBPATHS) $(SOPATHS) -Xlinker "-(" $^ -Xlinker "-)" $(STATIC_LINKINGS) $(DYNAMIC_LINKINGS) -o $@
else
@$(CXX) $(LIBPATHS) -Xlinker "-(" $^ -Wl,-Bstatic $(STATIC_LINKINGS) -Wl,-Bdynamic -Xlinker "-)" $(DYNAMIC_LINKINGS) -o $@
endif
echo_server:$(PROTO_OBJS) $(SERVER_OBJS) libechothrift.a
@echo "Linking $@"
ifneq ("$(LINK_SO)", "")
@$(CXX) $(LIBPATHS) $(SOPATHS) -Xlinker "-(" $^ libechothrift.a -Xlinker "-)" $(STATIC_LINKINGS) $(DYNAMIC_LINKINGS) -o $@
else
@$(CXX) $(LIBPATHS) -Xlinker "-(" $^ libechothrift.a -Wl,-Bstatic $(STATIC_LINKINGS) -Wl,-Bdynamic -Xlinker "-)" $(DYNAMIC_LINKINGS) -o $@
endif
%.o:%.cpp libechothrift.a
@echo "Compiling $@"
......@@ -63,7 +55,7 @@ libechothrift.a:
@ar -crv libechothrift.a EchoService.o echo_types.o
native_server: libechothrift.a
@$(CXX) native_server.cpp gen-cpp/echo_types.cpp gen-cpp/EchoService.cpp $(HDRPATHS) $(LIBPATHS) $(CXXFLAGS) $(STATIC_LINKINGS) -lthriftnb -lthrift -levent -lpthread -o native_server
@$(CXX) native_server.cpp gen-cpp/echo_types.cpp gen-cpp/EchoService.cpp $(HDRPATHS) $(LIBPATHS) $(CXXFLAGS) -lthriftnb -lthrift -levent -lpthread -lgflags -lbrpc -lbrpc_thrift -o native_server
native_client: libechothrift.a
@$(CXX) native_client.cpp gen-cpp/echo_types.cpp gen-cpp/EchoService.cpp $(HDRPATHS) $(LIBPATHS) $(CXXFLAGS) $(STATIC_LINKINGS) -lthriftnb -lthrift -levent -lpthread -o native_client
@$(CXX) native_client.cpp gen-cpp/echo_types.cpp gen-cpp/EchoService.cpp $(HDRPATHS) $(LIBPATHS) $(CXXFLAGS) -lthriftnb -lthrift -levent -lpthread -lgflags -lbrpc -lbrpc_thrift -o native_client
......@@ -96,6 +96,8 @@ using namespace policy;
const char* const DUMMY_SERVER_PORT_FILE = "dummy_server.port";
void __attribute__((weak)) RegisterThriftFramedProtocol();
struct GlobalExtensions {
GlobalExtensions()
: ch_mh_lb(MurmurHash32)
......@@ -433,17 +435,6 @@ static void GlobalInitializeOrDieImpl() {
exit(1);
}
#ifdef ENABLE_THRIFT_FRAMED_PROTOCOL
Protocol thrift_binary_protocol = { ParseThriftFramedMessage,
SerializeThriftFramedRequest, PackThriftFramedRequest,
ProcessThriftFramedRequest, ProcessThriftFramedResponse,
VerifyThriftFramedRequest, NULL, NULL,
CONNECTION_TYPE_POOLED_AND_SHORT, "thrift" };
if (RegisterProtocol(PROTOCOL_THRIFT, thrift_binary_protocol) != 0) {
exit(1);
}
#endif //ENABLE_THRIFT_FRAMED_PROTOCOL
Protocol mc_binary_protocol = { ParseMemcacheMessage,
SerializeMemcacheRequest,
PackMemcacheRequest,
......@@ -473,6 +464,12 @@ static void GlobalInitializeOrDieImpl() {
exit(1);
}
// Register Thrift framed protocol if linked
if (brpc::RegisterThriftFramedProtocol) {
brpc::RegisterThriftFramedProtocol();
}
// Only valid at client side
Protocol ubrpc_compack_protocol = {
ParseNsheadMessage,
......
......@@ -533,7 +533,7 @@ void SerializeThriftFramedRequest(butil::IOBuf* request_buf, Controller* cntl,
ControllerPrivateAccessor accessor(cntl);
const ThriftFramedMessage* req = (const ThriftFramedMessage*)req_base;
thrift_binary_head_t head = req->head;
auto out_buffer =
......@@ -623,6 +623,19 @@ void PackThriftFramedRequest(
}
} // namespace policy
void RegisterThriftFramedProtocol() {
Protocol thrift_binary_protocol = {policy::ParseThriftFramedMessage,
policy::SerializeThriftFramedRequest, policy::PackThriftFramedRequest,
policy::ProcessThriftFramedRequest, policy::ProcessThriftFramedResponse,
policy::VerifyThriftFramedRequest, NULL, NULL,
CONNECTION_TYPE_POOLED_AND_SHORT, "thrift" };
if (RegisterProtocol(PROTOCOL_THRIFT, thrift_binary_protocol) != 0) {
exit(1);
}
}
} // namespace brpc
#endif
......@@ -20,7 +20,6 @@
#define BRPC_POLICY_THRIFT_PROTOCOL_H
#include "brpc/protocol.h"
#include "brpc/thrift_framed_message.h"
namespace brpc {
namespace policy {
......
......@@ -22,6 +22,9 @@
#include <algorithm>
#include "butil/logging.h"
#include <brpc/protocol.h> // RegisterProtocol
#include <brpc/policy/thrift_protocol.h>
#include <google/protobuf/stubs/once.h>
#include <google/protobuf/io/coded_stream.h>
#include <google/protobuf/wire_format_lite_inl.h>
......@@ -121,9 +124,11 @@ ThriftFramedMessage::ThriftFramedMessage(const ThriftFramedMessage& from)
void ThriftFramedMessage::SharedCtor() {
memset(&head, 0, sizeof(head));
thrift_raw_instance_deleter = nullptr;
thrift_raw_instance = nullptr;
thrift_message_seq_id = 0;
method_name = "";
method_name = "";
//RegisterThriftFramedProtocolDummy dummy;
}
ThriftFramedMessage::~ThriftFramedMessage() {
......
......@@ -22,8 +22,6 @@
#include <functional>
#include <string>
#include <boost/make_shared.hpp>
#include <google/protobuf/stubs/common.h>
#include <google/protobuf/generated_message_util.h>
#include <google/protobuf/repeated_field.h>
......@@ -33,24 +31,12 @@
#include "brpc/thrift_binary_head.h" // thrfit_binary_head_t
#include "butil/iobuf.h" // IOBuf
#include "butil/thrift_utils.h"
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/transport/TBufferTransports.h>
namespace brpc {
template <typename T>
void thrift_framed_message_deleter(void* p) {
delete static_cast<T*>(p);
}
template <typename T>
uint32_t thrift_framed_message_writer(void* p, ::apache::thrift::protocol::TProtocol* prot) {
T* writer = static_cast<T*>(p);
return writer->write(prot);
}
// Internal implementation detail -- do not call these.
void protobuf_AddDesc_baidu_2frpc_2fthrift_binary_5fmessage_2eproto();
void protobuf_AssignDesc_baidu_2frpc_2fthrift_binary_5fmessage_2eproto();
......@@ -62,7 +48,7 @@ public:
thrift_binary_head_t head;
butil::IOBuf body;
void (*thrift_raw_instance_deleter) (void*);
uint32_t (*thrift_raw_instance_writer) (void*, ::apache::thrift::protocol::TProtocol*);
uint32_t (*thrift_raw_instance_writer) (void*, void*);
void* thrift_raw_instance;
int32_t thrift_message_seq_id;
......@@ -103,76 +89,24 @@ public:
int GetCachedSize() const { return ByteSize(); }
::google::protobuf::Metadata GetMetadata() const;
virtual uint32_t write(::apache::thrift::protocol::TProtocol* oprot) { return 0;}
virtual uint32_t read(::apache::thrift::protocol::TProtocol* iprot) { return 0;}
virtual uint32_t write(void* oprot) { return 0;}
virtual uint32_t read(void* iprot) { return 0;}
template<typename T>
T* cast() {
thrift_raw_instance = new T;
assert(thrift_raw_instance);
// serilize binary thrift message to thrift struct request
// for response, we just return the new instance and deserialize it in Closure
if (body.size() > 0) {
auto in_buffer =
boost::make_shared<apache::thrift::transport::TMemoryBuffer>();
auto in_portocol =
boost::make_shared<apache::thrift::protocol::TBinaryProtocol>(in_buffer);
// Cut the thrift buffer and parse thrift message
size_t body_len = head.body_len;
std::unique_ptr<uint8_t[]>thrift_buffer(new uint8_t[body_len]);
const size_t k = body.copy_to(thrift_buffer.get(), body_len);
if ( k != body_len) {
return false;
}
in_buffer->resetBuffer(thrift_buffer.get(), body_len);
// The following code was taken and modified from thrift auto generated code
std::string fname;
::apache::thrift::protocol::TMessageType mtype;
in_portocol->readMessageBegin(method_name, mtype, thrift_message_seq_id);
apache::thrift::protocol::TInputRecursionTracker tracker(*in_portocol);
uint32_t xfer = 0;
::apache::thrift::protocol::TType ftype;
int16_t fid;
xfer += in_portocol->readStructBegin(fname);
using ::apache::thrift::protocol::TProtocolException;
while (true)
{
xfer += in_portocol->readFieldBegin(fname, ftype, fid);
if (ftype == ::apache::thrift::protocol::T_STOP) {
break;
}
switch (fid)
{
case 1:
if (ftype == ::apache::thrift::protocol::T_STRUCT) {
xfer += static_cast<T*>(thrift_raw_instance)->read(in_portocol.get());
} else {
xfer += in_portocol->skip(ftype);
}
break;
default:
xfer += in_portocol->skip(ftype);
break;
}
xfer += in_portocol->readFieldEnd();
if (body.size() > 0 ) {
if (serialize_iobuf_to_thrift_message<T>(body, thrift_raw_instance,
&method_name, &thrift_message_seq_id)) {
} else {
delete static_cast<T*>(thrift_raw_instance);
return nullptr;
}
xfer += in_portocol->readStructEnd();
in_portocol->readMessageEnd();
in_portocol->getTransport()->readEnd();
// End thrfit auto generated code
}
thrift_raw_instance_deleter = &thrift_framed_message_deleter<T>;
......@@ -209,15 +143,15 @@ public:
return *this;
}
virtual uint32_t write(::apache::thrift::protocol::TProtocol* oprot) {
return thrift_message_->write(oprot);
virtual uint32_t write(void* oprot) {
return thrift_message_->write(static_cast<::apache::thrift::protocol::TProtocol*>(oprot));
}
virtual uint32_t read(::apache::thrift::protocol::TProtocol* iprot) {
return thrift_message_->read(iprot);
virtual uint32_t read(void* iprot) {
return thrift_message_->read(static_cast<::apache::thrift::protocol::TProtocol*>(iprot));
}
T& raw(){
T& raw() {
return *thrift_message_;
}
......@@ -227,6 +161,6 @@ private:
} // namespace brpc
#endif // BRPC_THRIFT_FRAMED_MESSAGE_H
#endif // BRPC_THRIFT_FRAMED_MESSAGE_H
#endif
#endif //ENABLE_THRIFT_FRAMED_PROTOCOL
......@@ -19,7 +19,6 @@
#include "brpc/thrift_service.h"
#include "brpc/details/method_status.h"
namespace brpc {
BAIDU_CASSERT(sizeof(thrift_binary_head_t) == 4, sizeof_thrift_must_be_4);
......@@ -34,7 +33,7 @@ ThriftFramedService::ThriftFramedService(const ThriftFramedServiceOptions& optio
if (options.generate_status) {
_status = new (std::nothrow) MethodStatus;
LOG_IF(FATAL, _status == NULL) << "Fail to new MethodStatus";
}
}
}
ThriftFramedService::~ThriftFramedService() {
......
......@@ -14,13 +14,14 @@
// utils for serilize/deserilize thrift binary message to brpc protobuf obj.
#ifdef ENABLE_THRIFT_FRAMED_PROTOCOL
#ifndef BRPC_THRIFT_UTILS_H
#define BRPC_THRIFT_UTILS_H
#include <boost/make_shared.hpp>
#include <brpc/channel.h>
#include <brpc/thrift_framed_message.h>
#include <thrift/TDispatchProcessor.h>
#include <thrift/transport/TBufferTransports.h>
......@@ -28,46 +29,85 @@
namespace brpc {
bool brpc_thrift_server_helper(const brpc::ThriftFramedMessage* request,
brpc::ThriftFramedMessage* response,
boost::shared_ptr<::apache::thrift::TDispatchProcessor> processor) {
template <typename T>
void thrift_framed_message_deleter(void* p) {
delete static_cast<T*>(p);
}
template <typename T>
uint32_t thrift_framed_message_writer(void* p, void* prot) {
T* writer = static_cast<T*>(p);
return writer->write(static_cast<::apache::thrift::protocol::TProtocol*>(prot));
}
template<typename T>
bool serialize_iobuf_to_thrift_message(butil::IOBuf& body,
void* thrift_raw_instance, std::string* method_name, int32_t* thrift_message_seq_id) {
auto in_buffer =
boost::make_shared<apache::thrift::transport::TMemoryBuffer>();
auto in_portocol =
boost::make_shared<apache::thrift::protocol::TBinaryProtocol>(in_buffer);
auto out_buffer =
boost::make_shared<apache::thrift::transport::TMemoryBuffer>();
auto out_portocol =
boost::make_shared<apache::thrift::protocol::TBinaryProtocol>(out_buffer);
// Cut the thrift buffer and parse thrift message
size_t body_len = request->head.body_len;
auto thrift_buffer = static_cast<uint8_t*>(new uint8_t[body_len]);
const size_t k = request->body.copy_to(thrift_buffer, body_len);
size_t body_len = body.size();
std::unique_ptr<uint8_t[]> thrift_buffer(new uint8_t[body_len]);
const size_t k = body.copy_to(thrift_buffer.get(), body_len);
if ( k != body_len) {
delete [] thrift_buffer;
return false;
}
in_buffer->resetBuffer(thrift_buffer, body_len);
if (processor->process(in_portocol, out_portocol, NULL)) {
uint8_t* buf;
uint32_t sz;
out_buffer->getBuffer(&buf, &sz);
response->body.append(buf, sz);
} else {
delete [] thrift_buffer;
return false;
in_buffer->resetBuffer(thrift_buffer.get(), body_len);
// The following code was taken and modified from thrift auto generated code
std::string fname;
::apache::thrift::protocol::TMessageType mtype;
in_portocol->readMessageBegin(*method_name, mtype, *thrift_message_seq_id);
apache::thrift::protocol::TInputRecursionTracker tracker(*in_portocol);
uint32_t xfer = 0;
::apache::thrift::protocol::TType ftype;
int16_t fid;
xfer += in_portocol->readStructBegin(fname);
using ::apache::thrift::protocol::TProtocolException;
while (true)
{
xfer += in_portocol->readFieldBegin(fname, ftype, fid);
if (ftype == ::apache::thrift::protocol::T_STOP) {
break;
}
switch (fid)
{
case 1:
if (ftype == ::apache::thrift::protocol::T_STRUCT) {
xfer += static_cast<T*>(thrift_raw_instance)->read(in_portocol.get());
} else {
xfer += in_portocol->skip(ftype);
}
break;
default:
xfer += in_portocol->skip(ftype);
break;
}
xfer += in_portocol->readFieldEnd();
}
delete [] thrift_buffer;
xfer += in_portocol->readStructEnd();
in_portocol->readMessageEnd();
in_portocol->getTransport()->readEnd();
// End thrift auto generated code
return true;
}
}
#endif //BRPC_THRIFT_UTILS_H
#endif //ENABLE_THRIFT_FRAMED_PROTOCOL
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment