Unverified Commit 026c5610 authored by Ge Jun's avatar Ge Jun Committed by GitHub

Merge pull request #98 from kenshinxf/kenshinxf

Add thrift protocol for brpc.
parents a3de73f9 bd4cb578
......@@ -17,9 +17,10 @@ before_script:
before_install:
- wget --no-clobber https://github.com/bazelbuild/bazel/releases/download/0.8.1/bazel_0.8.1-linux-x86_64.deb
- sudo dpkg -i bazel_0.8.1-linux-x86_64.deb
- wget http://www.us.apache.org/dist/thrift/0.9.3/thrift-0.9.3.tar.gz && tar -xf thrift-0.9.3.tar.gz && cd thrift-0.9.3/ && ./configure --prefix=/usr --with-ruby=no --with-python=no --with-java=no --with-go=no --with-perl=no --with-php=no --with-csharp=no --with-erlang=no --with-lua=no --with-nodejs=no && make -j 3 -s && sudo make install && cd -
install:
- sudo apt-get install -qq realpath libgflags-dev libprotobuf-dev libprotoc-dev protobuf-compiler libleveldb-dev libgoogle-perftools-dev
- sudo apt-get install -qq realpath libgflags-dev libprotobuf-dev libprotoc-dev protobuf-compiler libleveldb-dev libgoogle-perftools-dev libboost-dev libssl-dev libevent-dev libboost-test-dev
- sudo apt-get install libgtest-dev && cd /usr/src/gtest && sudo env "PATH=$PATH" cmake . && sudo make && sudo mv libgtest* /usr/lib/ && cd -
- sudo apt-get install -y gdb # install gdb
......
......@@ -23,6 +23,7 @@ endif()
option(BRPC_WITH_GLOG "With glog" OFF)
option(DEBUG "Print debug logs" OFF)
option(WITH_DEBUG_SYMBOLS "With debug symbols" ON)
option(BRPC_WITH_THRIFT "With thrift framed protocol supported" OFF)
option(BUILD_UNIT_TESTS "Whether to build unit tests" OFF)
set(WITH_GLOG_VAL "0")
......@@ -34,6 +35,12 @@ if(WITH_DEBUG_SYMBOLS)
set(DEBUG_SYMBOL "-g")
endif()
if(BRPC_WITH_THRIFT)
set(THRIFT_CPP_FLAG "-DENABLE_THRIFT_FRAMED_PROTOCOL")
set(THRIFT_LIB "thriftnb")
message("Enable thrift framed procotol")
endif()
include(GNUInstallDirs)
configure_file(${CMAKE_SOURCE_DIR}/config.h.in ${CMAKE_SOURCE_DIR}/src/butil/config.h @ONLY)
......@@ -73,7 +80,7 @@ endif()
set(CMAKE_CPP_FLAGS "${DEFINE_CLOCK_GETTIME} -DBRPC_WITH_GLOG=${WITH_GLOG_VAL} -DGFLAGS_NS=${GFLAGS_NS}")
set(CMAKE_CPP_FLAGS "${CMAKE_CPP_FLAGS} -DBTHREAD_USE_FAST_PTHREAD_MUTEX -D__const__= -D_GNU_SOURCE -DUSE_SYMBOLIZE -DNO_TCMALLOC -D__STDC_FORMAT_MACROS -D__STDC_LIMIT_MACROS -D__STDC_CONSTANT_MACROS -DBRPC_REVISION=\\\"${BRPC_REVISION}\\\" -D__STRICT_ANSI__")
set(CMAKE_CPP_FLAGS "${CMAKE_CPP_FLAGS} ${DEBUG_SYMBOL}")
set(CMAKE_CPP_FLAGS "${CMAKE_CPP_FLAGS} ${DEBUG_SYMBOL} ${THRIFT_CPP_FLAG}")
set(CMAKE_CXX_FLAGS "${CMAKE_CPP_FLAGS} -O2 -pipe -Wall -W -fPIC -fstrict-aliasing -Wno-invalid-offsetof -Wno-unused-parameter -fno-omit-frame-pointer")
set(CMAKE_C_FLAGS "${CMAKE_CPP_FLAGS} -O2 -pipe -Wall -W -fPIC -fstrict-aliasing -Wno-unused-parameter -fno-omit-frame-pointer")
......@@ -150,6 +157,8 @@ set(DYNAMIC_LIB
${LEVELDB_LIB}
${PROTOC_LIB}
${CMAKE_THREAD_LIBS_INIT}
${THRIFT_LIB}
rt
${SSL_LIB}
${CRYPTO_LIB}
dl
......
......@@ -177,7 +177,9 @@ JSON2PB_SOURCES = $(foreach d,$(JSON2PB_DIRS),$(wildcard $(addprefix $(d)/*,$(SR
JSON2PB_OBJS = $(addsuffix .o, $(basename $(JSON2PB_SOURCES)))
BRPC_DIRS = src/brpc src/brpc/details src/brpc/builtin src/brpc/policy
BRPC_SOURCES = $(foreach d,$(BRPC_DIRS),$(wildcard $(addprefix $(d)/*,$(SRCEXTS))))
THRIFT_SOURCES = $(foreach d,$(BRPC_DIRS),$(wildcard $(addprefix $(d)/thrift*,$(SRCEXTS))))
BRPC_SOURCES_ALL = $(foreach d,$(BRPC_DIRS),$(wildcard $(addprefix $(d)/*,$(SRCEXTS))))
BRPC_SOURCES = $(filter-out $(THRIFT_SOURCES), $(BRPC_SOURCES_ALL))
BRPC_PROTOS = $(filter %.proto,$(BRPC_SOURCES))
BRPC_CFAMILIES = $(filter-out %.proto %.pb.cc,$(BRPC_SOURCES))
BRPC_OBJS = $(BRPC_PROTOS:.proto=.pb.o) $(addsuffix .o, $(basename $(BRPC_CFAMILIES)))
......@@ -189,6 +191,10 @@ MCPACK2PB_SOURCES = \
src/mcpack2pb/serializer.cpp
MCPACK2PB_OBJS = src/idl_options.pb.o $(addsuffix .o, $(basename $(MCPACK2PB_SOURCES)))
ifeq (ENABLE_THRIFT_FRAMED_PROTOCOL, $(findstring ENABLE_THRIFT_FRAMED_PROTOCOL, $(CPPFLAGS)))
THRIFT_OBJS = $(addsuffix .o, $(basename $(THRIFT_SOURCES)))
endif
OBJS=$(BUTIL_OBJS) $(BVAR_OBJS) $(BTHREAD_OBJS) $(JSON2PB_OBJS) $(MCPACK2PB_OBJS) $(BRPC_OBJS)
BVAR_DEBUG_OBJS=$(BUTIL_OBJS:.o=.dbg.o) $(BVAR_OBJS:.o=.dbg.o)
......@@ -197,7 +203,7 @@ DEBUG_OBJS = $(OBJS:.o=.dbg.o)
PROTOS=$(BRPC_PROTOS) src/idl_options.proto
.PHONY:all
all: protoc-gen-mcpack libbrpc.a $(TARGET_LIB_DY) output/include output/lib output/bin
all: protoc-gen-mcpack libbrpc.a $(TARGET_LIB_DY) libbrpc_thrift.a output/include output/lib output/bin
.PHONY:debug
debug: test/libbrpc.dbg.a test/libbvar.dbg.a
......@@ -205,7 +211,7 @@ debug: test/libbrpc.dbg.a test/libbvar.dbg.a
.PHONY:clean
clean:
@echo "Cleaning"
@rm -rf src/mcpack2pb/generator.o protoc-gen-mcpack libbrpc.a $(TARGET_LIB_DY) $(OBJS) output/include output/lib output/bin $(PROTOS:.proto=.pb.h) $(PROTOS:.proto=.pb.cc)
@rm -rf src/mcpack2pb/generator.o protoc-gen-mcpack libbrpc.a $(TARGET_LIB_DY) libbrpc_thrift.a $(OBJS) output/include output/lib output/bin $(PROTOS:.proto=.pb.h) $(PROTOS:.proto=.pb.cc)
.PHONY:clean_debug
clean_debug:
......@@ -242,6 +248,10 @@ test/libbrpc.dbg.a:$(BRPC_PROTOS:.proto=.pb.h) $(DEBUG_OBJS)
@echo "Packing $@"
@ar crs $@ $(filter %.o,$^)
libbrpc_thrift.a:$(THRIFT_OBJS)
@echo "Packing $@"
@ar crs $@ $(filter %.o,$^)
.PHONY:output/include
output/include:
@echo "Copying to $@"
......@@ -250,7 +260,7 @@ output/include:
@cp src/idl_options.proto src/idl_options.pb.h $@
.PHONY:output/lib
output/lib:libbrpc.a $(TARGET_LIB_DY)
output/lib:libbrpc.a $(TARGET_LIB_DY) libbrpc_thrift.a
@echo "Copying to $@"
@mkdir -p $@
@cp $^ $@
......
......@@ -17,8 +17,9 @@ else
LDD=ldd
fi
TEMP=`getopt -o v: --long headers:,libs:,cc:,cxx:,with-glog,nodebugsymbols -n 'config_brpc' -- "$@"`
TEMP=`getopt -o v: --long headers:,libs:,cc:,cxx:,with-glog,with-thrift,nodebugsymbols -n 'config_brpc' -- "$@"`
WITH_GLOG=0
WITH_THRIFT=0
DEBUGSYMBOLS=-g
if [ $? != 0 ] ; then >&2 $ECHO "Terminating..."; exit 1 ; fi
......@@ -34,6 +35,7 @@ while true; do
--cc ) CC=$2; shift 2 ;;
--cxx ) CXX=$2; shift 2 ;;
--with-glog ) WITH_GLOG=1; shift 1 ;;
--with-thrift) WITH_THRIFT=1; shift 1 ;;
--nodebugsymbols ) DEBUGSYMBOLS=; shift 1 ;;
-- ) shift; break ;;
* ) break ;;
......@@ -128,18 +130,18 @@ OPENSSL_HDR=$(find_dir_of_header_or_die openssl/ssl.h)
STATIC_LINKINGS=
DYNAMIC_LINKINGS="-lpthread -lssl -lcrypto -ldl -lz"
if [ "$SYSTEM" = "Linux" ]; then
DYNAMIC_LINKINGS+=" -lrt"
DYNAMIC_LINKINGS="$DYNAMIC_LINKINGS -lrt"
fi
if [ "$SYSTEM" = "Darwin" ]; then
DYNAMIC_LINKINGS+=" -framework CoreFoundation"
DYNAMIC_LINKINGS+=" -framework CoreGraphics"
DYNAMIC_LINKINGS+=" -framework CoreData"
DYNAMIC_LINKINGS+=" -framework CoreText"
DYNAMIC_LINKINGS+=" -framework Security"
DYNAMIC_LINKINGS+=" -framework Foundation"
DYNAMIC_LINKINGS+=" -Wl,-U,_MallocExtension_ReleaseFreeMemory"
DYNAMIC_LINKINGS+=" -Wl,-U,_ProfilerStart"
DYNAMIC_LINKINGS+=" -Wl,-U,_ProfilerStop"
DYNAMIC_LINKINGS="$DYNAMIC_LINKINGS -framework CoreFoundation"
DYNAMIC_LINKINGS="$DYNAMIC_LINKINGS -framework CoreGraphics"
DYNAMIC_LINKINGS="$DYNAMIC_LINKINGS -framework CoreData"
DYNAMIC_LINKINGS="$DYNAMIC_LINKINGS -framework CoreText"
DYNAMIC_LINKINGS="$DYNAMIC_LINKINGS -framework Security"
DYNAMIC_LINKINGS="$DYNAMIC_LINKINGS -framework Foundation"
DYNAMIC_LINKINGS="$DYNAMIC_LINKINGS -Wl,-U,_MallocExtension_ReleaseFreeMemory"
DYNAMIC_LINKINGS="$DYNAMIC_LINKINGS -Wl,-U,_ProfilerStart"
DYNAMIC_LINKINGS="$DYNAMIC_LINKINGS -Wl,-U,_ProfilerStop"
fi
append_linking() {
if [ -f $1/lib${2}.a ]; then
......@@ -275,6 +277,22 @@ if [ "$SYSTEM" = "Darwin" ]; then
CPPFLAGS="${CPPFLAGS} -DNO_CLOCK_GETTIME_IN_MAC"
fi
fi
if [ $WITH_THRIFT != 0 ]; then
THRIFT_LIB=$(find_dir_of_lib_or_die thriftnb)
THRIFT_HDR=$(find_dir_of_header_or_die thrift/Thrift.h)
append_to_output_libs "$THRIFT_LIB"
append_to_output_headers "$THRIFT_HDR"
CPPFLAGS="${CPPFLAGS} -DENABLE_THRIFT_FRAMED_PROTOCOL"
if [ -f "$THRIFT_LIB/libthriftnb.$SO" ]; then
append_to_output "DYNAMIC_LINKINGS+=-lthriftnb -levent -lthrift"
else
append_to_output "STATIC_LINKINGS+=-lthriftnb"
fi
fi
append_to_output "CPPFLAGS=${CPPFLAGS}"
append_to_output "ifeq (\$(NEED_LIBPROTOC), 1)"
......
BRPC_PATH = ../../
include $(BRPC_PATH)/config.mk
# Notes on the flags:
# 1. Added -fno-omit-frame-pointer: perf/tcmalloc-profiler use frame pointers by default
# 2. Added -D__const__= : Avoid over-optimizations of TLS variables by GCC>=4.8
CXXFLAGS = -std=c++0x -g -DENABLE_THRIFT_FRAMED_PROTOCOL -DDEBUG -D__const__= -pipe -W -Wall -Werror -Wno-unused-parameter -fPIC -fno-omit-frame-pointer
HDRS+=$(BRPC_PATH)/output/include
LIBS+=$(BRPC_PATH)/output/lib
HDRPATHS = $(addprefix -I, $(HDRS))
LIBPATHS = $(addprefix -L, $(LIBS))
COMMA=,
SOPATHS=$(addprefix -Wl$(COMMA)-rpath=, $(LIBS))
STATIC_LINKINGS += -lbrpc -lthrift -lgflags -Wl,--whole-archive -lbrpc_thrift -Wl,--no-whole-archive -levent
CLIENT_SOURCES = client.cpp
SERVER_SOURCES = server.cpp
PROTOS = $(wildcard *.proto)
PROTO_OBJS = $(PROTOS:.proto=.pb.o)
PROTO_GENS = $(PROTOS:.proto=.pb.h) $(PROTOS:.proto=.pb.cc)
CLIENT_OBJS = $(addsuffix .o, $(basename $(CLIENT_SOURCES)))
SERVER_OBJS = $(addsuffix .o, $(basename $(SERVER_SOURCES)))
.PHONY:all
all: echo_client echo_server native_server native_client libechothrift.a client.o server.o
.PHONY:clean
clean:
@echo "Cleaning"
@rm -rf echo_client echo_server $(PROTO_GENS) $(PROTO_OBJS) $(CLIENT_OBJS) $(SERVER_OBJS) native_server native_client EchoService.o echo_types.o libechothrift.a gen-cpp gen-py
echo_client:$(PROTO_OBJS) $(CLIENT_OBJS) libechothrift.a
@echo "Linking $@"
@$(CXX) $(LIBPATHS) $(SOPATHS) -Xlinker "-(" $^ -Xlinker "-)" $(STATIC_LINKINGS) $(DYNAMIC_LINKINGS) -o $@
echo_server:$(PROTO_OBJS) $(SERVER_OBJS) libechothrift.a
@echo "Linking $@"
@$(CXX) $(LIBPATHS) $(SOPATHS) -Xlinker "-(" $^ libechothrift.a -Xlinker "-)" $(STATIC_LINKINGS) $(DYNAMIC_LINKINGS) -o $@
%.o:%.cpp libechothrift.a
@echo "Compiling $@"
@$(CXX) -c $(HDRPATHS) $(CXXFLAGS) $< -o $@
%.o:%.cc libechothrift.a
@echo "Compiling $@"
@$(CXX) -c $(HDRPATHS) $(CXXFLAGS) $< -o $@
libechothrift.a:
@echo "Generating thrift files"
@thrift --gen cpp echo.thrift
@thrift --gen py echo.thrift
@$(CXX) -c gen-cpp/echo_types.cpp -o echo_types.o
@$(CXX) -c gen-cpp/EchoService.cpp -o EchoService.o
@ar -crv libechothrift.a EchoService.o echo_types.o
native_server: libechothrift.a
@$(CXX) native_server.cpp gen-cpp/echo_types.cpp gen-cpp/EchoService.cpp $(HDRPATHS) $(LIBPATHS) $(CXXFLAGS) -lthriftnb -lthrift -levent -lpthread -lgflags -lbrpc -lbrpc_thrift -o native_server
native_client: libechothrift.a
@$(CXX) native_client.cpp gen-cpp/echo_types.cpp gen-cpp/EchoService.cpp $(HDRPATHS) $(LIBPATHS) $(CXXFLAGS) -lthriftnb -lthrift -levent -lpthread -lgflags -lbrpc -lbrpc_thrift -o native_client
Note:
Only thrift framed transport supported now, in another words, only working on thrift nonblocking mode.
summary:
echo_client/echo_server:
brpc + thrift protocol version
native_client/native_server:
native thrift cpp version
// Copyright (c) 2017 Baidu, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// A client sending thrift requests to server every 1 second.
#include <gflags/gflags.h>
#include "gen-cpp/EchoService.h"
#include "gen-cpp/echo_types.h"
#include <butil/logging.h>
#include <butil/time.h>
#include <butil/strings/string_piece.h>
#include <brpc/channel.h>
#include <brpc/details/thrift_utils.h>
#include <brpc/thrift_message.h>
#include <bvar/bvar.h>
#include <thrift/transport/TBufferTransports.h>
#include <thrift/protocol/TBinaryProtocol.h>
bvar::LatencyRecorder g_latency_recorder("client");
DEFINE_string(server, "0.0.0.0:8019", "IP Address of server");
DEFINE_string(load_balancer, "", "The algorithm for load balancing");
DEFINE_int32(timeout_ms, 100, "RPC timeout in milliseconds");
DEFINE_int32(max_retry, 3, "Max retries(not including the first RPC)");
int main(int argc, char* argv[]) {
// Parse gflags. We recommend you to use gflags as well.
google::ParseCommandLineFlags(&argc, &argv, true);
// A Channel represents a communication line to a Server. Notice that
// Channel is thread-safe and can be shared by all threads in your program.
brpc::Channel channel;
// Initialize the channel, NULL means using default options.
brpc::ChannelOptions options;
options.protocol = brpc::PROTOCOL_THRIFT;
options.timeout_ms = FLAGS_timeout_ms/*milliseconds*/;
options.max_retry = FLAGS_max_retry;
if (channel.Init(FLAGS_server.c_str(), FLAGS_load_balancer.c_str(), &options) != 0) {
LOG(ERROR) << "Fail to initialize channel";
return -1;
}
// Send a request and wait for the response every 1 second.
int log_id = 0;
std::string query_string = "hello";
for(auto i = 0; i < 1000000; i++) {
query_string += " test";
}
while (!brpc::IsAskedToQuit()) {
brpc::Controller cntl;
cntl.set_log_id(log_id ++); // set by user
// wrapper thrift raw request into ThriftMessage
brpc::ThriftTemplateMessage<example::EchoRequest> req;
brpc::ThriftTemplateMessage<example::EchoResponse> res;
req.raw().data = "hello";
cntl.set_thrift_method_name("Echo");
channel.CallMethod(NULL, &cntl, &req, &res, NULL);
if (cntl.Failed()) {
LOG(ERROR) << "Fail to send thrift request, " << cntl.ErrorText();
sleep(1); // Remove this sleep in production code.
} else {
g_latency_recorder << cntl.latency_us();
}
LOG(INFO) << "Thrift Res data: " << res.raw().data;
LOG_EVERY_SECOND(INFO)
<< "Sending thrift requests at qps=" << g_latency_recorder.qps(1)
<< " latency=" << g_latency_recorder.latency(1);
sleep(1);
}
LOG(INFO) << "EchoClient is going to quit";
return 0;
}
namespace cpp example
struct EchoRequest {
1: required string data;
2: required i32 s;
}
struct EchoResponse {
1: required string data;
}
service EchoService {
EchoResponse Echo(1:EchoRequest request);
}
// Copyright (c) 2017 Baidu, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// A thrift client sending requests to server every 1 second.
#include <gflags/gflags.h>
#include "gen-cpp/EchoService.h"
#include "gen-cpp/echo_types.h"
#include <thrift/transport/TSocket.h>
#include <thrift/transport/TBufferTransports.h>
#include <thrift/protocol/TBinaryProtocol.h>
#include <butil/logging.h>
DEFINE_string(server, "0.0.0.0", "IP Address of server");
DEFINE_int32(port, 8019, "Port of server");
int main(int argc, char **argv) {
// Parse gflags. We recommend you to use gflags as well.
google::ParseCommandLineFlags(&argc, &argv, true);
boost::shared_ptr<apache::thrift::transport::TSocket> socket(
new apache::thrift::transport::TSocket(FLAGS_server, FLAGS_port));
boost::shared_ptr<apache::thrift::transport::TTransport> transport(
new apache::thrift::transport::TFramedTransport(socket));
boost::shared_ptr<apache::thrift::protocol::TProtocol> protocol(
new apache::thrift::protocol::TBinaryProtocol(transport));
example::EchoServiceClient client(protocol);
transport->open();
example::EchoRequest req;
req.data = "hello";
example::EchoResponse res;
while (1) {
client.Echo(res, req);
LOG(INFO) << "Req: " << req.data
<< " Res: " << res.data;
sleep(1);
}
transport->close();
return 0;
}
// Copyright (c) 2017 Baidu, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// A thrift server to receive EchoRequest and send back EchoResponse.
#include <gflags/gflags.h>
#include <butil/logging.h>
#include "gen-cpp/EchoService.h"
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/server/TSimpleServer.h>
#include <thrift/transport/TServerSocket.h>
#include <thrift/transport/TTransportUtils.h>
#include <thrift/server/TNonblockingServer.h>
#include <thrift/concurrency/PosixThreadFactory.h>
DEFINE_int32(port, 8019, "Port of server");
class EchoServiceHandler : virtual public example::EchoServiceIf {
public:
EchoServiceHandler() {}
void Echo(example::EchoResponse& res, const example::EchoRequest& req) {
// Process request, just attach a simple string.
res.data = req.data + " world";
return;
}
};
int main(int argc, char *argv[]) {
// Parse gflags. We recommend you to use gflags as well.
google::ParseCommandLineFlags(&argc, &argv, true);
boost::shared_ptr<EchoServiceHandler> handler(new EchoServiceHandler());
boost::shared_ptr<apache::thrift::concurrency::PosixThreadFactory> thread_factory(
new apache::thrift::concurrency::PosixThreadFactory(
apache::thrift::concurrency::PosixThreadFactory::ROUND_ROBIN,
apache::thrift::concurrency::PosixThreadFactory::NORMAL, 1, false));
boost::shared_ptr<apache::thrift::server::TProcessor> processor(
new example::EchoServiceProcessor(handler));
boost::shared_ptr<apache::thrift::protocol::TProtocolFactory> protocol_factory(
new apache::thrift::protocol::TBinaryProtocolFactory());
boost::shared_ptr<apache::thrift::transport::TTransportFactory> transport_factory(
new apache::thrift::transport::TBufferedTransportFactory());
boost::shared_ptr<apache::thrift::concurrency::ThreadManager> thread_mgr(
apache::thrift::concurrency::ThreadManager::newSimpleThreadManager(2));
thread_mgr->threadFactory(thread_factory);
thread_mgr->start();
apache::thrift::server::TNonblockingServer server(processor,
transport_factory, transport_factory, protocol_factory,
protocol_factory, FLAGS_port, thread_mgr);
server.serve();
return 0;
}
// Copyright (c) 2016 Baidu, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// A server to receive EchoRequest and send back EchoResponse.
#include <gflags/gflags.h>
#include <butil/logging.h>
#include <brpc/server.h>
#include <brpc/thrift_service.h>
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/transport/TBufferTransports.h>
#include "gen-cpp/EchoService.h"
#include "gen-cpp/echo_types.h"
DEFINE_int32(port, 8019, "TCP Port of this server");
DEFINE_int32(port2, 8018, "TCP Port of this server");
DEFINE_int32(idle_timeout_s, -1, "Connection will be closed if there is no "
"read/write operations during the last `idle_timeout_s'");
DEFINE_int32(max_concurrency, 0, "Limit of request processing in parallel");
class EchoServiceHandler : virtual public example::EchoServiceIf {
public:
EchoServiceHandler() {}
void Echo(example::EchoResponse& res, const example::EchoRequest& req) {
// Process request, just attach a simple string.
res.data = req.data + " world";
LOG(INFO) << "Echo req.data: " << req.data;
return;
}
};
// Adapt your own thrift-based protocol to use brpc
class MyThriftProtocol : public brpc::ThriftService {
public:
MyThriftProtocol(EchoServiceHandler* handler) : _handler(handler) { }
void ProcessThriftFramedRequest(const brpc::Server&,
brpc::Controller* cntl,
brpc::ThriftMessage* request,
brpc::ThriftMessage* response,
brpc::ThriftClosure* done) {
// This object helps you to call done->Run() in RAII style. If you need
// to process the request asynchronously, pass done_guard.release().
brpc::ClosureGuard done_guard(done);
if (cntl->Failed()) {
// NOTE: You can send back a response containing error information
// back to client instead of closing the connection.
cntl->CloseConnection("Close connection due to previous error");
return;
}
example::EchoRequest* req = request->cast<example::EchoRequest>();
example::EchoResponse* res = response->cast<example::EchoResponse>();
// process with req and res
if (_handler) {
_handler->Echo(*res, *req);
} else {
cntl->CloseConnection("Close connection due to no valid handler");
LOG(ERROR) << "Fail to process thrift request due to no valid handler";
return;
}
LOG(INFO) << "success to process thrift request in brpc with handler";
}
private:
EchoServiceHandler* _handler;
};
// Adapt your own thrift-based protocol to use brpc
class MyThriftProtocolPbManner : public brpc::ThriftService {
public:
void ProcessThriftFramedRequest(const brpc::Server&,
brpc::Controller* cntl,
brpc::ThriftMessage* request,
brpc::ThriftMessage* response,
brpc::ThriftClosure* done) {
// This object helps you to call done->Run() in RAII style. If you need
// to process the request asynchronously, pass done_guard.release().
brpc::ClosureGuard done_guard(done);
if (cntl->Failed()) {
// NOTE: You can send back a response containing error information
// back to client instead of closing the connection.
cntl->CloseConnection("Close connection due to previous error");
return;
}
example::EchoRequest* req = request->cast<example::EchoRequest>();
example::EchoResponse* res = response->cast<example::EchoResponse>();
// process with req and res
res->data = req->data + " world another!";
LOG(INFO) << "success to process thrift request in brpc with pb manner";
}
};
int main(int argc, char* argv[]) {
// Parse gflags. We recommend you to use gflags as well.
google::ParseCommandLineFlags(&argc, &argv, true);
brpc::Server server;
brpc::ServerOptions options;
auto thrift_service_handler = new EchoServiceHandler();
options.thrift_service = new MyThriftProtocol(thrift_service_handler);
options.idle_timeout_sec = FLAGS_idle_timeout_s;
options.max_concurrency = FLAGS_max_concurrency;
// Start the server.
if (server.Start(FLAGS_port, &options) != 0) {
LOG(ERROR) << "Fail to start EchoServer";
return -1;
}
brpc::Server server2;
brpc::ServerOptions options2;
options2.thrift_service = new MyThriftProtocolPbManner;
options2.idle_timeout_sec = FLAGS_idle_timeout_s;
options2.max_concurrency = FLAGS_max_concurrency;
// Start the server2.
if (server2.Start(FLAGS_port2, &options2) != 0) {
LOG(ERROR) << "Fail to start EchoServer";
return -1;
}
// Wait until Ctrl-C is pressed, then Stop() and Join() the server.
server.RunUntilAskedToQuit();
return 0;
}
......@@ -29,6 +29,11 @@ if(BRPC_WITH_GLOG)
target_link_libraries(brpc-shared ${GLOG_LIB})
endif()
if(BRPC_WITH_THRIFT)
target_link_libraries(brpc-shared thrift)
target_link_libraries(brpc-static thrift)
endif()
SET_TARGET_PROPERTIES(brpc-static PROPERTIES OUTPUT_NAME brpc CLEAN_DIRECT_OUTPUT 1)
SET_TARGET_PROPERTIES(brpc-shared PROPERTIES OUTPUT_NAME brpc CLEAN_DIRECT_OUTPUT 1)
......
File mode changed from 100644 to 100755
......@@ -252,6 +252,7 @@ void Controller::InternalReset(bool in_constructor) {
_request_stream = INVALID_STREAM_ID;
_response_stream = INVALID_STREAM_ID;
_remote_stream_settings = NULL;
_thrift_method_name = "";
}
Controller::Call::Call(Controller::Call* rhs)
......
......@@ -451,6 +451,11 @@ public:
void set_idl_result(int64_t result) { _idl_result = result; }
int64_t idl_result() const { return _idl_result; }
void set_thrift_method_name(const std::string& method_name) {
_thrift_method_name = method_name;
}
std::string thrift_method_name() { return _thrift_method_name; }
private:
struct CompletionInfo {
CallId id; // call_id of the corresponding request
......@@ -681,6 +686,10 @@ private:
StreamId _response_stream;
// Defined at both sides
StreamSettings *_remote_stream_settings;
// Thrift method name, only used when thrift protocol enabled
std::string _thrift_method_name;
uint32_t _thrift_seq_id;
};
// Advises the RPC system that the caller desires that the RPC call be
......
// Copyright (c) 2015 Baidu, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef BRPC_THRIFT_HEADER_H
#define BRPC_THRIFT_HEADER_H
namespace brpc {
static const int32_t VERSION_MASK = ((int32_t)0xffffff00);
static const int32_t VERSION_1 = ((int32_t)0x80010000);
struct thrift_binary_head_t {
int32_t body_len;
};
} // namespace brpc
#endif // BRPC_THRIFT_HEADER_H
// Copyright (c) 2017 Baidu, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// utils for serilize/deserilize thrift binary message to brpc protobuf obj.
#ifdef ENABLE_THRIFT_FRAMED_PROTOCOL
#ifndef BRPC_THRIFT_UTILS_H
#define BRPC_THRIFT_UTILS_H
#include <boost/make_shared.hpp>
#include "butil/iobuf.h"
#include <thrift/TDispatchProcessor.h>
#include <thrift/transport/TBufferTransports.h>
#include <thrift/protocol/TBinaryProtocol.h>
namespace brpc {
template <typename T>
void thrift_framed_message_deleter(void* p) {
delete static_cast<T*>(p);
}
template <typename T>
uint32_t thrift_framed_message_writer(void* p, void* prot) {
T* writer = static_cast<T*>(p);
return writer->write(static_cast<::apache::thrift::protocol::TProtocol*>(prot));
}
template<typename T>
bool serialize_iobuf_to_thrift_message(butil::IOBuf& body,
void* thrift_raw_instance, std::string* method_name, int32_t* thrift_message_seq_id) {
auto in_buffer =
boost::make_shared<apache::thrift::transport::TMemoryBuffer>();
auto in_portocol =
boost::make_shared<apache::thrift::protocol::TBinaryProtocol>(in_buffer);
// Cut the thrift buffer and parse thrift message
size_t body_len = body.size();
std::unique_ptr<uint8_t[]> thrift_buffer(new uint8_t[body_len]);
const size_t k = body.copy_to(thrift_buffer.get(), body_len);
if ( k != body_len) {
return false;
}
in_buffer->resetBuffer(thrift_buffer.get(), body_len);
// The following code was taken and modified from thrift auto generated code
std::string fname;
::apache::thrift::protocol::TMessageType mtype;
in_portocol->readMessageBegin(*method_name, mtype, *thrift_message_seq_id);
apache::thrift::protocol::TInputRecursionTracker tracker(*in_portocol);
uint32_t xfer = 0;
::apache::thrift::protocol::TType ftype;
int16_t fid;
xfer += in_portocol->readStructBegin(fname);
using ::apache::thrift::protocol::TProtocolException;
while (true)
{
xfer += in_portocol->readFieldBegin(fname, ftype, fid);
if (ftype == ::apache::thrift::protocol::T_STOP) {
break;
}
switch (fid)
{
case 1:
if (ftype == ::apache::thrift::protocol::T_STRUCT) {
xfer += static_cast<T*>(thrift_raw_instance)->read(in_portocol.get());
} else {
xfer += in_portocol->skip(ftype);
}
break;
default:
xfer += in_portocol->skip(ftype);
break;
}
xfer += in_portocol->readFieldEnd();
}
xfer += in_portocol->readStructEnd();
in_portocol->readMessageEnd();
in_portocol->getTransport()->readEnd();
// End thrift auto generated code
return true;
}
}
#endif //BRPC_THRIFT_UTILS_H
#endif //ENABLE_THRIFT_FRAMED_PROTOCOL
......@@ -61,6 +61,7 @@
#include "brpc/policy/nshead_mcpack_protocol.h"
#include "brpc/policy/rtmp_protocol.h"
#include "brpc/policy/esp_protocol.h"
#include "brpc/policy/thrift_protocol.h"
#include "brpc/input_messenger.h" // get_or_new_client_side_messenger
#include "brpc/socket_map.h" // SocketMapList
......@@ -96,6 +97,8 @@ using namespace policy;
const char* const DUMMY_SERVER_PORT_FILE = "dummy_server.port";
void __attribute__((weak)) RegisterThriftProtocol();
struct GlobalExtensions {
GlobalExtensions()
: ch_mh_lb(MurmurHash32)
......@@ -463,6 +466,12 @@ static void GlobalInitializeOrDieImpl() {
exit(1);
}
// Register Thrift framed protocol if linked
if (brpc::RegisterThriftProtocol) {
brpc::RegisterThriftProtocol();
}
// Only valid at client side
Protocol ubrpc_compack_protocol = {
ParseNsheadMessage,
......
......@@ -45,6 +45,7 @@ enum ProtocolType {
// Reserve special protocol for cds-agent, which depends on FIFO right now
PROTOCOL_CDS_AGENT = 23; // Client side only
PROTOCOL_ESP = 24; // Client side only
PROTOCOL_THRIFT = 25; // Server side only
}
enum CompressType {
......
// Copyright (c) 2015 Baidu, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Authors: wangxuefeng (wangxuefeng@didichuxing.com)
#ifdef ENABLE_THRIFT_FRAMED_PROTOCOL
#include <google/protobuf/descriptor.h> // MethodDescriptor
#include <google/protobuf/message.h> // Message
#include <gflags/gflags.h>
#include <boost/make_shared.hpp>
#include "butil/time.h"
#include "butil/iobuf.h" // butil::IOBuf
#include "brpc/log.h"
#include "brpc/controller.h" // Controller
#include "brpc/socket.h" // Socket
#include "brpc/server.h" // Server
#include "brpc/span.h"
#include "brpc/details/server_private_accessor.h"
#include "brpc/details/controller_private_accessor.h"
#include "brpc/thrift_service.h"
#include "brpc/policy/most_common_message.h"
#include "brpc/policy/thrift_protocol.h"
#include "brpc/details/usercode_backup_pool.h"
#include <thrift/Thrift.h>
#include <thrift/transport/TBufferTransports.h>
#include <thrift/protocol/TBinaryProtocol.h>
extern "C" {
void bthread_assign_data(void* data) __THROW;
}
namespace brpc {
ThriftClosure::ThriftClosure(void* additional_space)
: _socket_ptr(NULL)
, _server(NULL)
, _start_parse_us(0)
, _do_respond(true)
, _additional_space(additional_space) {
}
ThriftClosure::~ThriftClosure() {
LogErrorTextAndDelete(false)(&_controller);
}
void ThriftClosure::DoNotRespond() {
_do_respond = false;
}
class DeleteThriftClosure {
public:
void operator()(ThriftClosure* done) const {
done->~ThriftClosure();
free(done);
}
};
void ThriftClosure::Run() {
// Recycle itself after `Run'
std::unique_ptr<ThriftClosure, DeleteThriftClosure> recycle_ctx(this);
SocketUniquePtr sock(_socket_ptr);
ScopedRemoveConcurrency remove_concurrency_dummy(_server, &_controller);
ControllerPrivateAccessor accessor(&_controller);
Span* span = accessor.span();
if (span) {
span->set_start_send_us(butil::cpuwide_time_us());
}
ScopedMethodStatus method_status(_server->options().thrift_service->_status);
if (!method_status) {
// Judge errors belongings.
// may not be accurate, but it does not matter too much.
const int error_code = _controller.ErrorCode();
if (error_code == ENOSERVICE ||
error_code == ENOMETHOD ||
error_code == EREQUEST ||
error_code == ECLOSE ||
error_code == ELOGOFF ||
error_code == ELIMIT) {
ServerPrivateAccessor(_server).AddError();
}
}
if (_controller.IsCloseConnection()) {
sock->SetFailed();
return;
}
if (_do_respond) {
// response uses request's head as default.
_response.head = _request.head;
if (_response.thrift_raw_instance) {
std::string method_name = _request.method_name;
if (method_name == "" ||
method_name.length() < 1 ||
method_name[0] == ' ') {
_controller.SetFailed(ENOMETHOD,
"invalid thrift method name or method name empty in server!");
return;
}
auto out_buffer =
boost::make_shared<apache::thrift::transport::TMemoryBuffer>();
auto oprot =
boost::make_shared<apache::thrift::protocol::TBinaryProtocol>(out_buffer);
// The following code was taken and modified from thrift auto generated code
oprot->writeMessageBegin(method_name,
::apache::thrift::protocol::T_REPLY, _request.thrift_message_seq_id);
uint32_t xfer = 0;
xfer += oprot->writeStructBegin("placeholder");
xfer += oprot->writeFieldBegin("success",
::apache::thrift::protocol::T_STRUCT, 0);
if (_response.thrift_raw_instance && _response.thrift_raw_instance_writer) {
xfer += _response.thrift_raw_instance_writer(
_response.thrift_raw_instance, oprot.get());
} else {
_controller.SetFailed(ERESPONSE, "thrift_raw_instance or"
"thrift_raw_instance_writer is null!");
}
xfer += oprot->writeFieldEnd();
xfer += oprot->writeFieldStop();
xfer += oprot->writeStructEnd();
oprot->writeMessageEnd();
oprot->getTransport()->writeEnd();
oprot->getTransport()->flush();
// End thrfit auto generated code
uint8_t* buf;
uint32_t sz;
out_buffer->getBuffer(&buf, &sz);
_response.body.append(buf, sz);
}
uint32_t length = _response.body.length();
_response.head.body_len = htonl(length);
if (span) {
int response_size = sizeof(thrift_binary_head_t) + _response.head.body_len;
span->set_response_size(response_size);
}
butil::IOBuf write_buf;
write_buf.append(&_response.head, sizeof(thrift_binary_head_t));
write_buf.append(_response.body.movable());
// Have the risk of unlimited pending responses, in which case, tell
// users to set max_concurrency.
Socket::WriteOptions wopt;
wopt.ignore_eovercrowded = true;
if (sock->Write(&write_buf, &wopt) != 0) {
const int errcode = errno;
PLOG_IF(WARNING, errcode != EPIPE) << "Fail to write into " << *sock;
_controller.SetFailed(errcode, "Fail to write into %s",
sock->description().c_str());
return;
}
}
if (span) {
// TODO: this is not sent
span->set_sent_us(butil::cpuwide_time_us());
}
if (method_status) {
method_status.release()->OnResponded(
!_controller.Failed(), butil::cpuwide_time_us() - cpuwide_start_us());
}
}
void ThriftClosure::SetMethodName(const std::string& full_method_name) {
ControllerPrivateAccessor accessor(&_controller);
Span* span = accessor.span();
if (span) {
span->ResetServerSpanName(full_method_name);
}
}
namespace policy {
ParseResult ParseThriftMessage(butil::IOBuf* source,
Socket*, bool /*read_eof*/, const void* /*arg*/) {
char header_buf[sizeof(thrift_binary_head_t) + 3];
const size_t n = source->copy_to(header_buf, sizeof(thrift_binary_head_t) + 3);
if (n < 7) {
return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA);
}
const void* dummy = header_buf + sizeof(thrift_binary_head_t);
const int32_t sz = ntohl(*(int32_t*)dummy);
int32_t version = sz & VERSION_MASK;
if (version != VERSION_1) {
RPC_VLOG << "magic_num=" << version
<< " doesn't match THRIFT_MAGIC_NUM=" << VERSION_1;
return MakeParseError(PARSE_ERROR_TRY_OTHERS);
}
thrift_binary_head_t* thrift = (thrift_binary_head_t *)header_buf;
thrift->body_len = ntohl(thrift->body_len);
uint32_t body_len = thrift->body_len;
if (body_len > FLAGS_max_body_size) {
return MakeParseError(PARSE_ERROR_TOO_BIG_DATA);
} else if (source->length() < sizeof(header_buf) + body_len - 3) {
return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA);
}
policy::MostCommonMessage* msg = policy::MostCommonMessage::Get();
source->cutn(&msg->meta, sizeof(thrift_binary_head_t));
source->cutn(&msg->payload, body_len);
return MakeMessage(msg);
}
struct CallMethodInBackupThreadArgs {
ThriftService* service;
const Server* server;
Controller* controller;
ThriftMessage* request;
ThriftMessage* response;
ThriftClosure* done;
};
static void CallMethodInBackupThread(void* void_args) {
CallMethodInBackupThreadArgs* args = (CallMethodInBackupThreadArgs*)void_args;
args->service->ProcessThriftFramedRequest(*args->server, args->controller,
args->request, args->response,
args->done);
delete args;
}
static void EndRunningCallMethodInPool(ThriftService* service,
const Server& server,
Controller* controller,
ThriftMessage* request,
ThriftMessage* response,
ThriftClosure* done) {
CallMethodInBackupThreadArgs* args = new CallMethodInBackupThreadArgs;
args->service = service;
args->server = &server;
args->controller = controller;
args->request = request;
args->response = response;
args->done = done;
return EndRunningUserCodeInPool(CallMethodInBackupThread, args);
};
void ProcessThriftRequest(InputMessageBase* msg_base) {
const int64_t start_parse_us = butil::cpuwide_time_us();
DestroyingPtr<MostCommonMessage> msg(static_cast<MostCommonMessage*>(msg_base));
SocketUniquePtr socket(msg->ReleaseSocket());
const Server* server = static_cast<const Server*>(msg_base->arg());
ScopedNonServiceError non_service_error(server);
char buf[sizeof(thrift_binary_head_t)];
const char *p = (const char *)msg->meta.fetch(buf, sizeof(buf));
thrift_binary_head_t *req_head = (thrift_binary_head_t *)p;
req_head->body_len = ntohl(req_head->body_len);
ThriftService* service = server->options().thrift_service;
if (service == NULL) {
LOG_EVERY_SECOND(WARNING)
<< "Received thrift request however the server does not set"
" ServerOptions.thrift_service, close the connection.";
socket->SetFailed();
return;
}
void* space = malloc(sizeof(ThriftClosure) + service->_additional_space);
if (!space) {
LOG(FATAL) << "Fail to new ThriftClosure";
socket->SetFailed();
return;
}
// Switch to service-specific error.
non_service_error.release();
MethodStatus* method_status = service->_status;
if (method_status) {
CHECK(method_status->OnRequested());
}
void* sub_space = NULL;
if (service->_additional_space) {
sub_space = (char*)space + sizeof(ThriftClosure);
}
ThriftClosure* thrift_done = new (space) ThriftClosure(sub_space);
Controller* cntl = &(thrift_done->_controller);
ThriftMessage* req = &(thrift_done->_request);
ThriftMessage* res = &(thrift_done->_response);
req->head = *req_head;
msg->payload.swap(req->body);
thrift_done->_start_parse_us = start_parse_us;
thrift_done->_socket_ptr = socket.get();
thrift_done->_server = server;
ServerPrivateAccessor server_accessor(server);
ControllerPrivateAccessor accessor(cntl);
const bool security_mode = server->options().security_mode() &&
socket->user() == server_accessor.acceptor();
// Initialize log_id with the log_id in thrift. Notice that the protocols
// on top of ThriftService may pack log_id in meta or user messages and
// overwrite the value.
//cntl->set_log_id(req_head->log_id);
accessor.set_server(server)
.set_security_mode(security_mode)
.set_peer_id(socket->id())
.set_remote_side(socket->remote_side())
.set_local_side(socket->local_side())
.set_request_protocol(PROTOCOL_THRIFT);
// Tag the bthread with this server's key for thread_local_data().
if (server->thread_local_options().thread_local_data_factory) {
bthread_assign_data((void*)&server->thread_local_options());
}
Span* span = NULL;
if (IsTraceable(false)) {
span = Span::CreateServerSpan(0, 0, 0, msg->base_real_us());
accessor.set_span(span);
//span->set_log_id(req_head->log_id);
span->set_remote_side(cntl->remote_side());
span->set_protocol(PROTOCOL_THRIFT);
span->set_received_us(msg->received_us());
span->set_start_parse_us(start_parse_us);
span->set_request_size(sizeof(thrift_binary_head_t) + req_head->body_len);
}
do {
if (!server->IsRunning()) {
cntl->SetFailed(ELOGOFF, "Server is stopping");
break;
}
if (!server_accessor.AddConcurrency(cntl)) {
cntl->SetFailed(ELIMIT, "Reached server's max_concurrency=%d",
server->options().max_concurrency);
break;
}
if (FLAGS_usercode_in_pthread && TooManyUserCode()) {
cntl->SetFailed(ELIMIT, "Too many user code to run when"
" -usercode_in_pthread is on");
break;
}
} while (false);
msg.reset(); // optional, just release resourse ASAP
// `socket' will be held until response has been sent
socket.release();
if (span) {
span->ResetServerSpanName(service->_cached_name);
span->set_start_callback_us(butil::cpuwide_time_us());
span->AsParent();
}
try {
if (!FLAGS_usercode_in_pthread) {
return service->ProcessThriftFramedRequest(*server, cntl,
req, res, thrift_done);
}
if (BeginRunningUserCode()) {
service->ProcessThriftFramedRequest(*server, cntl, req, res, thrift_done);
return EndRunningUserCodeInPlace();
} else {
return EndRunningCallMethodInPool(
service, *server, cntl, req, res, thrift_done);
}
} catch (::apache::thrift::TException& e) {
cntl->SetFailed(EREQUEST, "Invalid request data, reason: %s", e.what());
} catch (...) {
cntl->SetFailed(EINTERNAL, "Internal server error!");
}
}
void ProcessThriftResponse(InputMessageBase* msg_base) {
const int64_t start_parse_us = butil::cpuwide_time_us();
DestroyingPtr<MostCommonMessage> msg(static_cast<MostCommonMessage*>(msg_base));
// Fetch correlation id that we saved before in `PacThriftRequest'
const CallId cid = { static_cast<uint64_t>(msg->socket()->correlation_id()) };
Controller* cntl = NULL;
const int rc = bthread_id_lock(cid, (void**)&cntl);
if (rc != 0) {
LOG_IF(ERROR, rc != EINVAL && rc != EPERM)
<< "Fail to lock correlation_id=" << cid << ": " << berror(rc);
return;
}
ControllerPrivateAccessor accessor(cntl);
Span* span = accessor.span();
if (span) {
span->set_base_real_us(msg->base_real_us());
span->set_received_us(msg->received_us());
span->set_response_size(msg->payload.length());
span->set_start_parse_us(start_parse_us);
}
// MUST be ThriftMessage (checked in SerializeThriftRequest)
ThriftMessage* response = (ThriftMessage*)cntl->response();
const int saved_error = cntl->ErrorCode();
if (response != NULL) {
msg->meta.copy_to(&response->head, sizeof(thrift_binary_head_t));
response->head.body_len = ntohl(response->head.body_len);
msg->payload.swap(response->body);
uint32_t body_len = response->head.body_len;
// Deserialize binary message to thrift message
std::unique_ptr<uint8_t[]>thrift_buffer(new uint8_t[body_len]);
const size_t k = response->body.copy_to(thrift_buffer.get(), body_len);
if ( k != body_len) {
cntl->SetFailed("copy response body to thrift buffer failed!");
return;
}
auto in_buffer =
boost::make_shared<apache::thrift::transport::TMemoryBuffer>();
auto in_portocol =
boost::make_shared<apache::thrift::protocol::TBinaryProtocol>(in_buffer);
in_buffer->resetBuffer(thrift_buffer.get(), body_len);
// The following code was taken from thrift auto generate code
int32_t rseqid = 0;
std::string fname;
::apache::thrift::protocol::TMessageType mtype;
in_portocol->readMessageBegin(fname, mtype, rseqid);
if (mtype == ::apache::thrift::protocol::T_EXCEPTION) {
cntl->SetFailed("thrift process server response exception!");
return;
}
if (mtype != ::apache::thrift::protocol::T_REPLY) {
in_portocol->skip(::apache::thrift::protocol::T_STRUCT);
in_portocol->readMessageEnd();
in_portocol->getTransport()->readEnd();
}
if (fname.compare(cntl->thrift_method_name()) != 0) {
in_portocol->skip(::apache::thrift::protocol::T_STRUCT);
in_portocol->readMessageEnd();
in_portocol->getTransport()->readEnd();
}
// presult section
apache::thrift::protocol::TInputRecursionTracker tracker(*in_portocol);
uint32_t xfer = 0;
::apache::thrift::protocol::TType ftype;
int16_t fid;
xfer += in_portocol->readStructBegin(fname);
using ::apache::thrift::protocol::TProtocolException;
bool success = false;
while (true)
{
xfer += in_portocol->readFieldBegin(fname, ftype, fid);
if (ftype == ::apache::thrift::protocol::T_STOP) {
break;
}
switch (fid)
{
case 0:
if (ftype == ::apache::thrift::protocol::T_STRUCT) {
xfer += response->read(in_portocol.get());
success = true;
} else {
xfer += in_portocol->skip(ftype);
}
break;
default:
xfer += in_portocol->skip(ftype);
break;
}
xfer += in_portocol->readFieldEnd();
}
xfer += in_portocol->readStructEnd();
// end presult section
in_portocol->readMessageEnd();
in_portocol->getTransport()->readEnd();
if (!success) {
cntl->SetFailed("thrift process server response exception!");
return;
}
} // else just ignore the response.
// Unlocks correlation_id inside. Revert controller's
// error code if it version check of `cid' fails
msg.reset(); // optional, just release resourse ASAP
accessor.OnResponse(cid, saved_error);
}
bool VerifyThriftRequest(const InputMessageBase* msg_base) {
Server* server = (Server*)msg_base->arg();
if (server->options().auth) {
LOG(WARNING) << "thrift does not support authentication";
return false;
}
return true;
}
void SerializeThriftRequest(butil::IOBuf* request_buf, Controller* cntl,
const google::protobuf::Message* req_base) {
if (req_base == NULL) {
return cntl->SetFailed(EREQUEST, "request is NULL");
}
ControllerPrivateAccessor accessor(cntl);
const ThriftMessage* req = (const ThriftMessage*)req_base;
thrift_binary_head_t head = req->head;
auto out_buffer =
boost::make_shared<apache::thrift::transport::TMemoryBuffer>();
auto out_portocol =
boost::make_shared<apache::thrift::protocol::TBinaryProtocol>(out_buffer);
std::string thrift_method_name = cntl->thrift_method_name();
// we should do more check on the thrift method name, but since it is rare when
// the method_name is just some white space or something else
if (cntl->thrift_method_name() == "" ||
cntl->thrift_method_name().length() < 1 ||
cntl->thrift_method_name()[0] == ' ') {
return cntl->SetFailed(ENOMETHOD,
"invalid thrift method name or method name empty!");
}
// The following code was taken from thrift auto generated code
// send_xxx
int32_t cseqid = 0;
out_portocol->writeMessageBegin(thrift_method_name,
::apache::thrift::protocol::T_CALL, cseqid);
// xxx_pargs write
uint32_t xfer = 0;
apache::thrift::protocol::TOutputRecursionTracker tracker(*out_portocol);
std::string struct_begin_str = "ThriftService_" + thrift_method_name + "_pargs";
xfer += out_portocol->writeStructBegin(struct_begin_str.c_str());
xfer += out_portocol->writeFieldBegin("request", ::apache::thrift::protocol::T_STRUCT, 1);
// request's write
ThriftMessage* r = const_cast<ThriftMessage*>(req);
xfer += r->write(out_portocol.get());
// end request's write
xfer += out_portocol->writeFieldEnd();
xfer += out_portocol->writeFieldStop();
xfer += out_portocol->writeStructEnd();
// end xxx_pargs write
out_portocol->writeMessageEnd();
out_portocol->getTransport()->writeEnd();
out_portocol->getTransport()->flush();
// end send_xxx
// end thrift auto generated code
uint8_t* buf;
uint32_t sz;
out_buffer->getBuffer(&buf, &sz);
head.body_len = ntohl(sz);
request_buf->append(&head, sizeof(head));
// end auto generate code
request_buf->append(buf, sz);
}
void PackThriftRequest(
butil::IOBuf* packet_buf,
SocketMessage**,
uint64_t correlation_id,
const google::protobuf::MethodDescriptor*,
Controller* cntl,
const butil::IOBuf& request,
const Authenticator*) {
ControllerPrivateAccessor accessor(cntl);
if (accessor.connection_type() == CONNECTION_TYPE_SINGLE) {
return cntl->SetFailed(
EINVAL, "thrift protocol can't work with CONNECTION_TYPE_SINGLE");
}
// Store `correlation_id' into the socket since thrift protocol can't
// pack the field.
accessor.get_sending_socket()->set_correlation_id(correlation_id);
Span* span = accessor.span();
if (span) {
span->set_request_size(request.length());
// TODO: Nowhere to set tracing ids.
// request_meta->set_trace_id(span->trace_id());
// request_meta->set_span_id(span->span_id());
// request_meta->set_parent_span_id(span->parent_span_id());
}
packet_buf->append(request);
}
} // namespace policy
void RegisterThriftProtocol() {
Protocol thrift_binary_protocol = {policy::ParseThriftMessage,
policy::SerializeThriftRequest, policy::PackThriftRequest,
policy::ProcessThriftRequest, policy::ProcessThriftResponse,
policy::VerifyThriftRequest, NULL, NULL,
CONNECTION_TYPE_POOLED_AND_SHORT, "thrift" };
if (RegisterProtocol(PROTOCOL_THRIFT, thrift_binary_protocol) != 0) {
exit(1);
}
}
} // namespace brpc
#endif
// Copyright (c) 2015 Baidu, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Authors: wangxuefeng (wangxuefeng@didichuxing.com)
#ifdef ENABLE_THRIFT_FRAMED_PROTOCOL
#ifndef BRPC_POLICY_THRIFT_PROTOCOL_H
#define BRPC_POLICY_THRIFT_PROTOCOL_H
#include "brpc/protocol.h"
namespace brpc {
namespace policy {
// Parse binary protocol format of thrift framed
ParseResult ParseThriftMessage(butil::IOBuf* source, Socket* socket, bool read_eof, const void *arg);
// Actions to a (client) request in thrift binary framed format
void ProcessThriftRequest(InputMessageBase* msg);
// Actions to a (server) response in thrift binary framed format
void ProcessThriftResponse(InputMessageBase* msg);
void SerializeThriftRequest(butil::IOBuf* request_buf, Controller* controller,
const google::protobuf::Message* request);
void PackThriftRequest(
butil::IOBuf* packet_buf,
SocketMessage**,
uint64_t correlation_id,
const google::protobuf::MethodDescriptor*,
Controller* controller,
const butil::IOBuf&,
const Authenticator*);
// Verify authentication information in thrift binary format
bool VerifyThriftRequest(const InputMessageBase *msg);
} // namespace policy
} // namespace brpc
#endif // BRPC_POLICY_THRIFT_PROTOCOL_H
#endif
File mode changed from 100644 to 100755
......@@ -40,6 +40,7 @@
#include "brpc/details/ssl_helper.h" // CreateServerSSLContext
#include "brpc/protocol.h" // ListProtocols
#include "brpc/nshead_service.h" // NsheadService
#include "brpc/thrift_service.h" // ThriftService
#include "brpc/builtin/bad_method_service.h" // BadMethodService
#include "brpc/builtin/get_favicon_service.h"
#include "brpc/builtin/get_js_service.h"
......@@ -119,6 +120,7 @@ const int s_ncore = sysconf(_SC_NPROCESSORS_ONLN);
ServerOptions::ServerOptions()
: idle_timeout_sec(-1)
, nshead_service(NULL)
, thrift_service(NULL)
, mongo_service_adaptor(NULL)
, auth(NULL)
, server_owns_auth(false)
......@@ -307,6 +309,12 @@ void* Server::UpdateDerivedVars(void* arg) {
server->options().nshead_service->Expose(prefix);
}
#ifdef ENABLE_THRIFT_FRAMED_PROTOCOL
if (server->options().thrift_service) {
server->options().thrift_service->Expose(prefix);
}
#endif
int64_t last_time = butil::gettimeofday_us();
int consecutive_nosleep = 0;
while (1) {
......@@ -389,6 +397,11 @@ Server::~Server() {
delete _options.nshead_service;
_options.nshead_service = NULL;
#ifdef ENABLE_THRIFT_FRAMED_PROTOCOL
delete _options.thrift_service;
_options.thrift_service = NULL;
#endif
delete _options.http_master_service;
_options.http_master_service = NULL;
......@@ -1511,7 +1524,7 @@ void Server::GenerateVersionIfNeeded() {
if (!_version.empty()) {
return;
}
int extra_count = !!_options.nshead_service + !!_options.rtmp_service;
int extra_count = !!_options.nshead_service + !!_options.rtmp_service + !!_options.thrift_service;
_version.reserve((extra_count + service_count()) * 20);
for (ServiceMap::const_iterator it = _fullname_service_map.begin();
it != _fullname_service_map.end(); ++it) {
......@@ -1528,6 +1541,16 @@ void Server::GenerateVersionIfNeeded() {
}
_version.append(butil::class_name_str(*_options.nshead_service));
}
#ifdef ENABLE_THRIFT_FRAMED_PROTOCOL
if (_options.thrift_service) {
if (!_version.empty()) {
_version.push_back('+');
}
_version.append(butil::class_name_str(*_options.thrift_service));
}
#endif
if (_options.rtmp_service) {
if (!_version.empty()) {
_version.push_back('+');
......
......@@ -46,6 +46,7 @@ namespace brpc {
class Acceptor;
class MethodStatus;
class NsheadService;
class ThriftService;
class SimpleDataPool;
class MongoServiceAdaptor;
class RestfulMap;
......@@ -69,6 +70,11 @@ struct ServerOptions {
// Default: NULL
NsheadService* nshead_service;
// Process requests in format of thrift_binary_head_t + blob.
// Owned by Server and deleted in server's destructor
// Default: NULL
ThriftService* thrift_service;
// Adaptor for Mongo protocol, check src/brpc/mongo_service_adaptor.h for details
// The adaptor will not be deleted by server
// and must remain valid when server is running.
......
// Copyright (c) 2014 Baidu, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Authors: wangxuefeng (wangxuefeng@didichuxing.com)
#ifdef ENABLE_THRIFT_FRAMED_PROTOCOL
#define INTERNAL_SUPPRESS_PROTOBUF_FIELD_DEPRECATION
#include "brpc/thrift_message.h"
#include <algorithm>
#include "butil/logging.h"
#include <brpc/protocol.h> // RegisterProtocol
#include <brpc/policy/thrift_protocol.h>
#include <google/protobuf/stubs/once.h>
#include <google/protobuf/io/coded_stream.h>
#include <google/protobuf/wire_format_lite_inl.h>
#include <google/protobuf/descriptor.h>
#include <google/protobuf/reflection_ops.h>
#include <google/protobuf/wire_format.h>
namespace brpc {
namespace {
const ::google::protobuf::Descriptor* ThriftMessage_descriptor_ = NULL;
} // namespace
void protobuf_AssignDesc_baidu_2frpc_2fthrift_binary_5fmessage_2eproto() {
protobuf_AddDesc_baidu_2frpc_2fthrift_binary_5fmessage_2eproto();
const ::google::protobuf::FileDescriptor* file =
::google::protobuf::DescriptorPool::generated_pool()->FindFileByName(
"baidu/rpc/thrift_framed_message.proto");
GOOGLE_CHECK(file != NULL);
ThriftMessage_descriptor_ = file->message_type(0);
}
namespace {
GOOGLE_PROTOBUF_DECLARE_ONCE(protobuf_AssignDescriptors_once_);
inline void protobuf_AssignDescriptorsOnce() {
::google::protobuf::GoogleOnceInit(&protobuf_AssignDescriptors_once_,
&protobuf_AssignDesc_baidu_2frpc_2fthrift_binary_5fmessage_2eproto);
}
void protobuf_RegisterTypes(const ::std::string&) {
protobuf_AssignDescriptorsOnce();
::google::protobuf::MessageFactory::InternalRegisterGeneratedMessage(
ThriftMessage_descriptor_, &ThriftMessage::default_instance());
}
} // namespace
void protobuf_ShutdownFile_baidu_2frpc_2fthrift_binary_5fmessage_2eproto() {
delete ThriftMessage::default_instance_;
}
void protobuf_AddDesc_baidu_2frpc_2fthrift_binary_5fmessage_2eproto_impl() {
GOOGLE_PROTOBUF_VERIFY_VERSION;
#if GOOGLE_PROTOBUF_VERSION >= 3002000
::google::protobuf::internal::InitProtobufDefaults();
#else
::google::protobuf::protobuf_AddDesc_google_2fprotobuf_2fdescriptor_2eproto();
#endif
::google::protobuf::DescriptorPool::InternalAddGeneratedFile(
"\n\033thrift_framed_message.proto\022\004brpc\"\025\n\023T"
"hriftBinaryMessage", 58);
::google::protobuf::MessageFactory::InternalRegisterGeneratedFile(
"thrift_framed_message.proto", &protobuf_RegisterTypes);
ThriftMessage::default_instance_ = new ThriftMessage();
ThriftMessage::default_instance_->InitAsDefaultInstance();
::google::protobuf::internal::OnShutdown(&protobuf_ShutdownFile_baidu_2frpc_2fthrift_binary_5fmessage_2eproto);
}
GOOGLE_PROTOBUF_DECLARE_ONCE(protobuf_AddDesc_baidu_2frpc_2fthrift_binary_5fmessage_2eproto_once);
void protobuf_AddDesc_baidu_2frpc_2fthrift_binary_5fmessage_2eproto() {
::google::protobuf::GoogleOnceInit(
&protobuf_AddDesc_baidu_2frpc_2fthrift_binary_5fmessage_2eproto_once,
&protobuf_AddDesc_baidu_2frpc_2fthrift_binary_5fmessage_2eproto_impl);
}
// Force AddDescriptors() to be called at static initialization time.
struct StaticDescriptorInitializer_baidu_2frpc_2fthrift_binary_5fmessage_2eproto {
StaticDescriptorInitializer_baidu_2frpc_2fthrift_binary_5fmessage_2eproto() {
protobuf_AddDesc_baidu_2frpc_2fthrift_binary_5fmessage_2eproto();
}
} static_descriptor_initializer_baidu_2frpc_2fthrift_binary_5fmessage_2eproto_;
// ===================================================================
#ifndef _MSC_VER
#endif // !_MSC_VER
ThriftMessage::ThriftMessage()
: ::google::protobuf::Message() {
SharedCtor();
}
void ThriftMessage::InitAsDefaultInstance() {
}
ThriftMessage::ThriftMessage(const ThriftMessage& from)
: ::google::protobuf::Message() {
SharedCtor();
MergeFrom(from);
}
void ThriftMessage::SharedCtor() {
memset(&head, 0, sizeof(head));
thrift_raw_instance_deleter = nullptr;
thrift_raw_instance = nullptr;
thrift_message_seq_id = 0;
method_name = "";
//RegisterThriftProtocolDummy dummy;
}
ThriftMessage::~ThriftMessage() {
SharedDtor();
if (thrift_raw_instance && thrift_raw_instance_deleter) {
thrift_raw_instance_deleter(thrift_raw_instance);
}
}
void ThriftMessage::SharedDtor() {
if (this != default_instance_) {
}
}
const ::google::protobuf::Descriptor* ThriftMessage::descriptor() {
protobuf_AssignDescriptorsOnce();
return ThriftMessage_descriptor_;
}
const ThriftMessage& ThriftMessage::default_instance() {
if (default_instance_ == NULL)
protobuf_AddDesc_baidu_2frpc_2fthrift_binary_5fmessage_2eproto();
return *default_instance_;
}
ThriftMessage* ThriftMessage::default_instance_ = NULL;
ThriftMessage* ThriftMessage::New() const {
return new ThriftMessage;
}
void ThriftMessage::Clear() {
memset(&head, 0, sizeof(head));
body.clear();
}
bool ThriftMessage::MergePartialFromCodedStream(
::google::protobuf::io::CodedInputStream* input) {
#define DO_(EXPRESSION) if (!(EXPRESSION)) return false
::google::protobuf::uint32 tag;
while ((tag = input->ReadTag()) != 0) {
if (::google::protobuf::internal::WireFormatLite::GetTagWireType(tag) ==
::google::protobuf::internal::WireFormatLite::WIRETYPE_END_GROUP) {
return true;
}
}
return true;
#undef DO_
}
void ThriftMessage::SerializeWithCachedSizes(
::google::protobuf::io::CodedOutputStream*) const {
}
::google::protobuf::uint8* ThriftMessage::SerializeWithCachedSizesToArray(
::google::protobuf::uint8* target) const {
return target;
}
int ThriftMessage::ByteSize() const {
return sizeof(thrift_binary_head_t) + body.size();
}
void ThriftMessage::MergeFrom(const ::google::protobuf::Message& from) {
GOOGLE_CHECK_NE(&from, this);
const ThriftMessage* source =
::google::protobuf::internal::dynamic_cast_if_available<const ThriftMessage*>(
&from);
if (source == NULL) {
LOG(ERROR) << "Can only merge from ThriftMessage";
return;
} else {
MergeFrom(*source);
}
}
void ThriftMessage::MergeFrom(const ThriftMessage& from) {
GOOGLE_CHECK_NE(&from, this);
head = from.head;
body = from.body;
}
void ThriftMessage::CopyFrom(const ::google::protobuf::Message& from) {
if (&from == this) return;
Clear();
MergeFrom(from);
}
void ThriftMessage::CopyFrom(const ThriftMessage& from) {
if (&from == this) return;
Clear();
MergeFrom(from);
}
bool ThriftMessage::IsInitialized() const {
return true;
}
void ThriftMessage::Swap(ThriftMessage* other) {
if (other != this) {
const thrift_binary_head_t tmp = other->head;
other->head = head;
head = tmp;
body.swap(other->body);
}
}
::google::protobuf::Metadata ThriftMessage::GetMetadata() const {
protobuf_AssignDescriptorsOnce();
::google::protobuf::Metadata metadata;
metadata.descriptor = ThriftMessage_descriptor_;
metadata.reflection = NULL;
return metadata;
}
} // namespace brpc
#endif
// Copyright (c) 2014 Baidu, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Authors: wangxuefeng (wangxuefeng@didichuxing.com)
#ifdef ENABLE_THRIFT_FRAMED_PROTOCOL
#ifndef BRPC_THRIFT_MESSAGE_H
#define BRPC_THRIFT_MESSAGE_H
#include <functional>
#include <string>
#include <google/protobuf/stubs/common.h>
#include <google/protobuf/generated_message_util.h>
#include <google/protobuf/repeated_field.h>
#include <google/protobuf/extension_set.h>
#include <google/protobuf/generated_message_reflection.h>
#include "google/protobuf/descriptor.pb.h"
#include "brpc/details/thrift_binary_head.h" // thrfit_binary_head_t
#include "brpc/details/thrift_utils.h"
#include "butil/iobuf.h"
#include <thrift/protocol/TBinaryProtocol.h>
namespace brpc {
// Internal implementation detail -- do not call these.
void protobuf_AddDesc_baidu_2frpc_2fthrift_binary_5fmessage_2eproto();
void protobuf_AssignDesc_baidu_2frpc_2fthrift_binary_5fmessage_2eproto();
void protobuf_ShutdownFile_baidu_2frpc_2fthrift_binary_5fmessage_2eproto();
// Representing a thrift_binary request or response.
class ThriftMessage : public ::google::protobuf::Message {
public:
thrift_binary_head_t head;
butil::IOBuf body;
void (*thrift_raw_instance_deleter) (void*);
uint32_t (*thrift_raw_instance_writer) (void*, void*);
void* thrift_raw_instance;
int32_t thrift_message_seq_id;
std::string method_name;
public:
ThriftMessage();
virtual ~ThriftMessage();
ThriftMessage(const ThriftMessage& from);
inline ThriftMessage& operator=(const ThriftMessage& from) {
CopyFrom(from);
return *this;
}
static const ::google::protobuf::Descriptor* descriptor();
static const ThriftMessage& default_instance();
void Swap(ThriftMessage* other);
// implements Message ----------------------------------------------
ThriftMessage* New() const;
void CopyFrom(const ::google::protobuf::Message& from);
void MergeFrom(const ::google::protobuf::Message& from);
void CopyFrom(const ThriftMessage& from);
void MergeFrom(const ThriftMessage& from);
void Clear();
bool IsInitialized() const;
int ByteSize() const;
bool MergePartialFromCodedStream(
::google::protobuf::io::CodedInputStream* input);
void SerializeWithCachedSizes(
::google::protobuf::io::CodedOutputStream* output) const;
::google::protobuf::uint8* SerializeWithCachedSizesToArray(::google::protobuf::uint8* output) const;
int GetCachedSize() const { return ByteSize(); }
::google::protobuf::Metadata GetMetadata() const;
virtual uint32_t write(void* oprot) { return 0;}
virtual uint32_t read(void* iprot) { return 0;}
template<typename T>
T* cast() {
thrift_raw_instance = new T;
assert(thrift_raw_instance);
// serilize binary thrift message to thrift struct request
// for response, we just return the new instance and deserialize it in Closure
if (body.size() > 0 ) {
if (serialize_iobuf_to_thrift_message<T>(body, thrift_raw_instance,
&method_name, &thrift_message_seq_id)) {
} else {
delete static_cast<T*>(thrift_raw_instance);
return nullptr;
}
}
thrift_raw_instance_deleter = &thrift_framed_message_deleter<T>;
thrift_raw_instance_writer = &thrift_framed_message_writer<T>;
return static_cast<T*>(thrift_raw_instance);
}
private:
void SharedCtor();
void SharedDtor();
private:
friend void protobuf_AddDesc_baidu_2frpc_2fthrift_binary_5fmessage_2eproto_impl();
friend void protobuf_AddDesc_baidu_2frpc_2fthrift_binary_5fmessage_2eproto();
friend void protobuf_AssignDesc_baidu_2frpc_2fthrift_binary_5fmessage_2eproto();
friend void protobuf_ShutdownFile_baidu_2frpc_2fthrift_binary_5fmessage_2eproto();
void InitAsDefaultInstance();
static ThriftMessage* default_instance_;
};
template <typename T>
class ThriftTemplateMessage : public ThriftMessage {
public:
ThriftTemplateMessage() {
thrift_message_ = new T;
assert(thrift_message_ != nullptr);
}
virtual ~ThriftTemplateMessage() { delete thrift_message_; }
ThriftTemplateMessage<T>& operator= (const ThriftTemplateMessage<T>& other) {
*thrift_message_ = *(other.thrift_message_);
return *this;
}
virtual uint32_t write(void* oprot) {
return thrift_message_->write(static_cast<::apache::thrift::protocol::TProtocol*>(oprot));
}
virtual uint32_t read(void* iprot) {
return thrift_message_->read(static_cast<::apache::thrift::protocol::TProtocol*>(iprot));
}
T& raw() {
return *thrift_message_;
}
private:
T* thrift_message_;
};
} // namespace brpc
#endif // BRPC_THRIFT_MESSAGE_H
#endif //ENABLE_THRIFT_FRAMED_PROTOCOL
// Copyright (c) 2015 Baidu, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Authors: wangxuefeng (wangxuefeng@didichuxing.com)
#ifdef ENABLE_THRIFT_FRAMED_PROTOCOL
#include "butil/class_name.h"
#include "brpc/thrift_service.h"
#include "brpc/details/method_status.h"
namespace brpc {
BAIDU_CASSERT(sizeof(thrift_binary_head_t) == 4, sizeof_thrift_must_be_4);
ThriftService::ThriftService() : _additional_space(0) {
_status = new (std::nothrow) MethodStatus;
LOG_IF(FATAL, _status == NULL) << "Fail to new MethodStatus";
}
ThriftService::ThriftService(const ThriftServiceOptions& options)
: _status(NULL), _additional_space(options.additional_space) {
if (options.generate_status) {
_status = new (std::nothrow) MethodStatus;
LOG_IF(FATAL, _status == NULL) << "Fail to new MethodStatus";
}
}
ThriftService::~ThriftService() {
delete _status;
_status = NULL;
}
void ThriftService::Describe(std::ostream &os, const DescribeOptions&) const {
os << butil::class_name_str(*this);
}
void ThriftService::Expose(const butil::StringPiece& prefix) {
_cached_name = butil::class_name_str(*this);
if (_status == NULL) {
return;
}
std::string s;
s.reserve(prefix.size() + 1 + _cached_name.size());
s.append(prefix.data(), prefix.size());
s.push_back('_');
s.append(_cached_name);
_status->Expose(s);
}
} // namespace brpc
#endif
// Copyright (c) 2015 Baidu, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Authors: wangxuefeng (wangxuefeng@didichuxing.com)
#ifdef ENABLE_THRIFT_FRAMED_PROTOCOL
#ifndef BRPC_THRIFT_SERVICE_H
#define BRPC_THRIFT_SERVICE_H
#include "brpc/controller.h" // Controller
#include "brpc/thrift_message.h" // ThriftMessage
#include "brpc/describable.h"
namespace brpc {
class Socket;
class Server;
class MethodStatus;
class StatusService;
namespace policy {
void ProcessThriftRequest(InputMessageBase* msg_base);
}
// The continuation of request processing. Namely send response back to client.
// NOTE: you DON'T need to inherit this class or create instance of this class.
class ThriftClosure : public google::protobuf::Closure {
public:
explicit ThriftClosure(void* additional_space);
// [Required] Call this to send response back to the client.
void Run();
// [Optional] Set the full method name. If unset, use name of the service.
void SetMethodName(const std::string& full_method_name);
// The space required by subclass at ThriftServiceOptions. subclass may
// utilizes this feature to save the cost of allocating closure separately.
// If subclass does not require space, this return value is NULL.
void* additional_space() { return _additional_space; }
// The starting time of the RPC, got from butil::cpuwide_time_us().
int64_t cpuwide_start_us() const { return _start_parse_us; }
// Don't send response back, used by MIMO.
void DoNotRespond();
private:
friend void policy::ProcessThriftRequest(InputMessageBase* msg_base);
friend class DeleteThriftClosure;
// Only callable by Run().
~ThriftClosure();
Socket* _socket_ptr;
const Server* _server;
int64_t _start_parse_us;
ThriftMessage _request;
ThriftMessage _response;
bool _do_respond;
void* _additional_space;
Controller _controller;
};
struct ThriftServiceOptions {
ThriftServiceOptions() : generate_status(true), additional_space(0) {}
ThriftServiceOptions(bool generate_status2, size_t additional_space2)
: generate_status(generate_status2)
, additional_space(additional_space2) {}
bool generate_status;
size_t additional_space;
};
// Inherit this class to let brpc server understands thrift_binary requests.
class ThriftService : public Describable {
public:
ThriftService();
ThriftService(const ThriftServiceOptions&);
virtual ~ThriftService();
// Implement this method to handle thrift_binary requests. Notice that this
// method can be called with a failed Controller(something wrong with the
// request before calling this method), in which case the implemenetation
// shall send specific response with error information back to client.
// Parameters:
// server The server receiving the request.
// controller per-rpc settings.
// request The thrift_binary request received.
// response The thrift_binary response that you should fill in.
// done You must call done->Run() to end the processing.
virtual void ProcessThriftFramedRequest(const Server& server,
Controller* controller,
ThriftMessage* request,
ThriftMessage* response,
ThriftClosure* done) = 0;
// Put descriptions into the stream.
void Describe(std::ostream &os, const DescribeOptions&) const;
private:
DISALLOW_COPY_AND_ASSIGN(ThriftService);
friend class ThriftClosure;
friend void policy::ProcessThriftRequest(InputMessageBase* msg_base);
friend class StatusService;
friend class Server;
private:
void Expose(const butil::StringPiece& prefix);
// Tracking status of non ThriftPbService
MethodStatus* _status;
size_t _additional_space;
std::string _cached_name;
};
} // namespace brpc
#endif // BRPC_THRIFT_SERVICE_H
#endif
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment