Commit 4cd5a7fe authored by wangxuefeng's avatar wangxuefeng

Add thrift protocol support for brpc

parent a45df223
BRPC_PATH = ../../
include $(BRPC_PATH)/config.mk
# Notes on the flags:
# 1. Added -fno-omit-frame-pointer: perf/tcmalloc-profiler use frame pointers by default
# 2. Added -D__const__= : Avoid over-optimizations of TLS variables by GCC>=4.8
CXXFLAGS = -std=c++0x -g -DDEBUG -D__const__= -pipe -W -Wall -Werror -Wno-unused-parameter -fPIC -fno-omit-frame-pointer
HDRS+=$(BRPC_PATH)/output/include
LIBS+=$(BRPC_PATH)/output/lib
HDRPATHS = $(addprefix -I, $(HDRS))
LIBPATHS = $(addprefix -L, $(LIBS))
COMMA=,
SOPATHS=$(addprefix -Wl$(COMMA)-rpath=, $(LIBS))
STATIC_LINKINGS += -lbrpc -lits-thrift
CLIENT_SOURCES = client.cpp
SERVER_SOURCES = server.cpp
PROTOS = $(wildcard *.proto)
PROTO_OBJS = $(PROTOS:.proto=.pb.o)
PROTO_GENS = $(PROTOS:.proto=.pb.h) $(PROTOS:.proto=.pb.cc)
CLIENT_OBJS = $(addsuffix .o, $(basename $(CLIENT_SOURCES)))
SERVER_OBJS = $(addsuffix .o, $(basename $(SERVER_SOURCES)))
.PHONY:all
all: echo_client echo_server thrift_server thrift_client libechothrift.a
.PHONY:clean
clean:
@echo "Cleaning"
@rm -rf echo_client echo_server $(PROTO_GENS) $(PROTO_OBJS) $(CLIENT_OBJS) $(SERVER_OBJS) thrift_server thrift_client EchoService.o echo_types.o libechothrift.a
echo_client:$(PROTO_OBJS) $(CLIENT_OBJS) libechothrift.a
@echo "Linking $@"
ifneq ("$(LINK_SO)", "")
@$(CXX) $(LIBPATHS) $(SOPATHS) -Xlinker "-(" $^ -Xlinker "-)" $(STATIC_LINKINGS) $(DYNAMIC_LINKINGS) -o $@
else
@$(CXX) $(LIBPATHS) -Xlinker "-(" $^ -Wl,-Bstatic $(STATIC_LINKINGS) -Wl,-Bdynamic -Xlinker "-)" $(DYNAMIC_LINKINGS) -o $@
endif
echo_server:$(PROTO_OBJS) $(SERVER_OBJS) libechothrift.a
@echo "Linking $@"
ifneq ("$(LINK_SO)", "")
@$(CXX) $(LIBPATHS) $(SOPATHS) -Xlinker "-(" $^ libechothrift.a -Xlinker "-)" $(STATIC_LINKINGS) $(DYNAMIC_LINKINGS) -o $@
else
@$(CXX) $(LIBPATHS) -Xlinker "-(" $^ libechothrift.a -Wl,-Bstatic $(STATIC_LINKINGS) -Wl,-Bdynamic -Xlinker "-)" $(DYNAMIC_LINKINGS) -o $@
endif
%.o:%.cpp
@echo "Compiling $@"
@$(CXX) -c $(HDRPATHS) $(CXXFLAGS) $< -o $@
%.o:%.cc
@echo "Compiling $@"
@$(CXX) -c $(HDRPATHS) $(CXXFLAGS) $< -o $@
libechothrift.a:
@echo "Generating thrift files"
@/home/wangxuefeng/work/third-64/thrift/bin/thrift --gen cpp echo.thrift
@g++ -c gen-cpp/echo_types.cpp -o echo_types.o -I/home/wangxuefeng/work/overall-eta-predictor/script/data/third-64-gcc485/thrift/include
@g++ -c gen-cpp/EchoService.cpp -o EchoService.o -I/home/wangxuefeng/work/overall-eta-predictor/script/data/third-64-gcc485/thrift/include
@ar -crv libechothrift.a EchoService.o echo_types.o
thrift_server: libechothrift.a
@g++ thrift_server.cpp gen-cpp/echo_types.cpp gen-cpp/EchoService.cpp -I/home/wangxuefeng/work/overall-eta-predictor/script/data/third-64-gcc485/thrift/include -L/home/wangxuefeng/work/overall-eta-predictor/script/data/third-64-gcc485/thrift/lib -L/home/wangxuefeng/work/overall-eta-predictor/script/data/third-64-gcc485/libevent/lib -lthriftnb -lthrift -levent -lpthread -o thrift_server
thrift_client: libechothrift.a
@g++ thrift_client.cpp gen-cpp/echo_types.cpp gen-cpp/EchoService.cpp -I/home/wangxuefeng/work/overall-eta-predictor/script/data/third-64-gcc485/thrift/include -L/home/wangxuefeng/work/overall-eta-predictor/script/data/third-64-gcc485/thrift/lib -lthriftnb -lthrift -levent -lpthread -o thrift_client
note:
Only thrift framed transport supported now, in another words, only working on thrift nonblocking mode.
summary:
echo_client/echo_server:
brpc + thrift protocol version
thrift_client/thrift_server:
thrift cpp version
build:
you need to add thrift as dep library in config.mk
// Copyright (c) 2014 Baidu, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// A client sending requests to server every 1 second.
#include <gflags/gflags.h>
#include <butil/logging.h>
#include <butil/time.h>
#include <butil/strings/string_piece.h>
#include <brpc/channel.h>
#include <brpc/thrift_binary_message.h>
#include <bvar/bvar.h>
#include "thrift/transport/TBufferTransports.h"
#include "thrift/protocol/TBinaryProtocol.h"
#include "gen-cpp/EchoService.h"
#include "gen-cpp/echo_types.h"
bvar::LatencyRecorder g_latency_recorder("client");
using apache::thrift::protocol::TBinaryProtocol;
using apache::thrift::transport::TMemoryBuffer;
using namespace std;
using namespace example;
DEFINE_string(server, "0.0.0.0:8019", "IP Address of server");
DEFINE_string(load_balancer, "", "The algorithm for load balancing");
DEFINE_int32(timeout_ms, 100, "RPC timeout in milliseconds");
DEFINE_int32(max_retry, 3, "Max retries(not including the first RPC)");
int main(int argc, char* argv[]) {
// Parse gflags. We recommend you to use gflags as well.
google::ParseCommandLineFlags(&argc, &argv, true);
// A Channel represents a communication line to a Server. Notice that
// Channel is thread-safe and can be shared by all threads in your program.
brpc::Channel channel;
// Initialize the channel, NULL means using default options.
brpc::ChannelOptions options;
options.protocol = brpc::PROTOCOL_THRIFT;
options.timeout_ms = FLAGS_timeout_ms/*milliseconds*/;
options.max_retry = FLAGS_max_retry;
if (channel.Init(FLAGS_server.c_str(), FLAGS_load_balancer.c_str(), &options) != 0) {
LOG(ERROR) << "Fail to initialize channel";
return -1;
}
// Send a request and wait for the response every 1 second.
int log_id = 0;
while (!brpc::IsAskedToQuit()) {
brpc::ThriftBinaryMessage request;
brpc::ThriftBinaryMessage response;
brpc::Controller cntl;
// Append message to `request'
boost::shared_ptr<TMemoryBuffer> o_buffer(new TMemoryBuffer());
boost::shared_ptr<TBinaryProtocol> oprot(new TBinaryProtocol(o_buffer));
// Construct request
EchoRequest t_request;
t_request.data = "hello";
// Serialize thrift request to binary request
oprot->writeMessageBegin("Echo", ::apache::thrift::protocol::T_CALL, 0);
EchoService_Echo_pargs args;
args.request = &t_request;
args.write(oprot.get());
oprot->writeMessageEnd();
oprot->getTransport()->writeEnd();
oprot->getTransport()->flush();
butil::IOBuf buf;
std::string s = o_buffer->getBufferAsString();
buf.append(s);
request.body = buf;
cntl.set_log_id(log_id ++); // set by user
// Because `done'(last parameter) is NULL, this function waits until
// the response comes back or error occurs(including timedout).
channel.CallMethod(NULL, &cntl, &request, &response, NULL);
if (cntl.Failed()) {
LOG(ERROR) << "Fail to send thrift request, " << cntl.ErrorText();
sleep(1); // Remove this sleep in production code.
} else {
g_latency_recorder << cntl.latency_us();
}
// Parse/Desrialize binary response to thrift response
boost::shared_ptr<TMemoryBuffer> buffer(new TMemoryBuffer());
boost::shared_ptr<TBinaryProtocol> iprot(new TBinaryProtocol(buffer));
size_t body_len = response.head.body_len;
uint8_t* thrfit_b = (uint8_t*)malloc(body_len);
const size_t k = response.body.copy_to(thrfit_b, body_len);
if ( k != body_len) {
std::cout << "copy_to error!" << std::endl;
printf("k: %lu, body_len: %lu\n", k, body_len);
return -1;
}
buffer->resetBuffer(thrfit_b, body_len);
int32_t rseqid = 0;
std::string fname; // Thrift function name
::apache::thrift::protocol::TMessageType mtype;
try {
iprot->readMessageBegin(fname, mtype, rseqid);
if (mtype == ::apache::thrift::protocol::T_EXCEPTION) {
::apache::thrift::TApplicationException x;
x.read(iprot.get());
iprot->readMessageEnd();
iprot->getTransport()->readEnd();
throw x;
}
if (mtype != ::apache::thrift::protocol::T_REPLY) {
iprot->skip(::apache::thrift::protocol::T_STRUCT);
iprot->readMessageEnd();
iprot->getTransport()->readEnd();
}
if (fname.compare("Echo") != 0) {
iprot->skip(::apache::thrift::protocol::T_STRUCT);
iprot->readMessageEnd();
iprot->getTransport()->readEnd();
}
EchoResponse t_response;
EchoService_Echo_presult result;
result.success = &t_response;
result.read(iprot.get());
iprot->readMessageEnd();
iprot->getTransport()->readEnd();
if (!result.__isset.success) {
// _return pointer has now been filled
std::cout << "result.success not set!" << std::endl;
return -1;
}
std::cout << "response: " << t_response.data << std::endl;
} catch (...) {
std::cout << "Thrift Exception!" << std::endl;
}
LOG_EVERY_SECOND(INFO)
<< "Sending thrift requests at qps=" << g_latency_recorder.qps(1)
<< " latency=" << g_latency_recorder.latency(1);
}
LOG(INFO) << "EchoClient is going to quit";
return 0;
}
namespace cpp example
struct EchoRequest {
1: string data;
}
struct EchoResponse {
1: string data;
}
service EchoService {
EchoResponse Echo(1:EchoRequest request);
}
// Copyright (c) 2014 Baidu, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// A server to receive EchoRequest and send back EchoResponse.
#include <gflags/gflags.h>
#include <butil/logging.h>
#include <brpc/server.h>
#include <brpc/thrift_service.h>
#include "thrift/transport/TBufferTransports.h"
#include "thrift/protocol/TBinaryProtocol.h"
#include "gen-cpp/EchoService.h"
#include "gen-cpp/echo_types.h"
DEFINE_int32(port, 8019, "TCP Port of this server");
DEFINE_int32(idle_timeout_s, -1, "Connection will be closed if there is no "
"read/write operations during the last `idle_timeout_s'");
DEFINE_int32(max_concurrency, 0, "Limit of request processing in parallel");
using apache::thrift::protocol::TBinaryProtocol;
using apache::thrift::transport::TMemoryBuffer;
using namespace std;
using namespace example;
// Adapt your own thrift-based protocol to use brpc
class MyThriftProtocol : public brpc::ThriftFramedService {
public:
void ProcessThriftBinaryRequest(const brpc::Server&,
brpc::Controller* cntl,
const brpc::ThriftBinaryMessage& request,
brpc::ThriftBinaryMessage* response,
brpc::ThriftFramedClosure* done) {
// This object helps you to call done->Run() in RAII style. If you need
// to process the request asynchronously, pass done_guard.release().
brpc::ClosureGuard done_guard(done);
if (cntl->Failed()) {
// NOTE: You can send back a response containing error information
// back to client instead of closing the connection.
cntl->CloseConnection("Close connection due to previous error");
return;
}
boost::shared_ptr<TMemoryBuffer> buffer(new TMemoryBuffer());
boost::shared_ptr<TBinaryProtocol> iprot(new TBinaryProtocol(buffer));
EchoRequest t_request;
size_t body_len = request.head.body_len;
uint8_t* thrfit_b = (uint8_t*)malloc(body_len);
const size_t k = request.body.copy_to(thrfit_b, body_len);
if ( k != body_len) {
cntl->CloseConnection("Close connection due to copy thrift binary message error");
return;
}
EchoService_Echo_args args;
buffer->resetBuffer(thrfit_b, body_len);
int32_t rseqid = 0;
std::string fname;
::apache::thrift::protocol::TMessageType mtype;
// deserilize thrift message
iprot->readMessageBegin(fname, mtype, rseqid);
args.read(iprot.get());
iprot->readMessageEnd();
iprot->getTransport()->readEnd();
t_request = args.request;
std::cout << "RPC funcname: " << fname << std::endl;
std::cout << "request.data: " << t_request.data << std::endl;
boost::shared_ptr<TMemoryBuffer> o_buffer(new TMemoryBuffer());
boost::shared_ptr<TBinaryProtocol> oprot(new TBinaryProtocol(o_buffer));
// Proc RPC
EchoResponse t_response;
t_response.data = t_request.data + " world";
EchoService_Echo_result result;
result.success = t_response;
result.__isset.success = true;
// serilize response
oprot->writeMessageBegin("Echo", ::apache::thrift::protocol::T_REPLY, 0);
result.write(oprot.get());
oprot->writeMessageEnd();
oprot->getTransport()->writeEnd();
oprot->getTransport()->flush();
butil::IOBuf buf;
std::string s = o_buffer->getBufferAsString();
buf.append(s);
response->body = buf;
printf("process in MyThriftProtocol\n");
}
};
int main(int argc, char* argv[]) {
// Parse gflags. We recommend you to use gflags as well.
google::ParseCommandLineFlags(&argc, &argv, true);
brpc::Server server;
brpc::ServerOptions options;
options.thrift_service = new MyThriftProtocol;
options.idle_timeout_sec = FLAGS_idle_timeout_s;
options.max_concurrency = FLAGS_max_concurrency;
// Start the server.
if (server.Start(FLAGS_port, &options) != 0) {
LOG(ERROR) << "Fail to start EchoServer";
return -1;
}
// Wait until Ctrl-C is pressed, then Stop() and Join() the server.
server.RunUntilAskedToQuit();
return 0;
}
#include "gen-cpp/EchoService.h"
#include "gen-cpp/echo_types.h"
#include <thrift/transport/TSocket.h>
#include <thrift/transport/TBufferTransports.h>
#include <thrift/protocol/TBinaryProtocol.h>
#include <iostream>
using namespace apache::thrift;
using namespace apache::thrift::protocol;
using namespace apache::thrift::transport;
using namespace example;
int main(int argc, char **argv) {
boost::shared_ptr<TSocket> socket(new TSocket("127.0.0.1", 8019));
boost::shared_ptr<TTransport> transport(new TFramedTransport(socket));
boost::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));
EchoServiceClient client(protocol);
transport->open();
EchoRequest req;
req.data = "hello";
EchoResponse res;
client.Echo(res, req);
std::cout << "Res: " << res.data << std::endl;
transport->close();
return 0;
}
#include "gen-cpp/EchoService.h"
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/server/TSimpleServer.h>
#include <thrift/transport/TServerSocket.h>
#include <thrift/transport/TTransportUtils.h>
#include <thrift/server/TNonblockingServer.h>
#include <thrift/concurrency/PosixThreadFactory.h>
using namespace apache::thrift;
using namespace apache::thrift::protocol;
using namespace apache::thrift::transport;
using namespace apache::thrift::server;
using namespace apache::thrift::concurrency;
using boost::shared_ptr;
using namespace example;
class EchoServiceHandler : virtual public EchoServiceIf {
public:
EchoServiceHandler() {}
void Echo(EchoResponse& res, const EchoRequest& req) {
res.data = req.data + " world";
return;
}
};
int main(int argc, char **argv) {
int port = 8019;
shared_ptr<EchoServiceHandler> handler(new EchoServiceHandler());
shared_ptr<PosixThreadFactory> thread_factory(
new PosixThreadFactory(PosixThreadFactory::ROUND_ROBIN,
PosixThreadFactory::NORMAL, 1, false) );
shared_ptr<TProcessor> processor(new EchoServiceProcessor(handler));
shared_ptr<TProtocolFactory> protocol_factory(new TBinaryProtocolFactory());
shared_ptr<TTransportFactory> transport_factory(new TBufferedTransportFactory());
shared_ptr<ThreadManager> thread_mgr(ThreadManager::newSimpleThreadManager(2));
thread_mgr->threadFactory(thread_factory);
thread_mgr->start();
TNonblockingServer server(processor,
transport_factory, transport_factory, protocol_factory,
protocol_factory, port, thread_mgr);
server.serve();
return 0;
}
...@@ -57,30 +57,21 @@ ...@@ -57,30 +57,21 @@
#include "brpc/policy/nshead_mcpack_protocol.h" #include "brpc/policy/nshead_mcpack_protocol.h"
#include "brpc/policy/rtmp_protocol.h" #include "brpc/policy/rtmp_protocol.h"
#include "brpc/policy/esp_protocol.h" #include "brpc/policy/esp_protocol.h"
#include "brpc/policy/thrift_protocol.h"
#include "brpc/input_messenger.h" // get_or_new_client_side_messenger #include "brpc/input_messenger.h" // get_or_new_client_side_messenger
#include "brpc/socket_map.h" // SocketMapList #include "brpc/socket_map.h" // SocketMapList
#include "brpc/server.h" #include "brpc/server.h"
#include "brpc/trackme.h" // TrackMe #include "brpc/trackme.h" // TrackMe
#include "brpc/details/usercode_backup_pool.h"
#include <malloc.h> // malloc_trim #include <malloc.h> // malloc_trim
#include "brpc/details/usercode_backup_pool.h"
#include "butil/fd_guard.h" #include "butil/fd_guard.h"
#include "butil/files/file_watcher.h" #include "butil/files/file_watcher.h"
extern "C" {
// defined in gperftools/malloc_extension_c.h
void BAIDU_WEAK MallocExtension_ReleaseFreeMemory(void);
}
namespace brpc { namespace brpc {
DECLARE_bool(usercode_in_pthread); DECLARE_bool(usercode_in_pthread);
DEFINE_int32(free_memory_to_system_interval, 0,
"Try to return free memory to system every so many seconds, "
"values <= 0 disables this feature");
BRPC_VALIDATE_GFLAG(free_memory_to_system_interval, PassValidate);
namespace policy { namespace policy {
// Defined in http_rpc_protocol.cpp // Defined in http_rpc_protocol.cpp
void InitCommonStrings(); void InitCommonStrings();
...@@ -187,7 +178,7 @@ static void* GlobalUpdate(void*) { ...@@ -187,7 +178,7 @@ static void* GlobalUpdate(void*) {
const int WARN_NOSLEEP_THRESHOLD = 2; const int WARN_NOSLEEP_THRESHOLD = 2;
int64_t last_time_us = start_time_us; int64_t last_time_us = start_time_us;
int consecutive_nosleep = 0; int consecutive_nosleep = 0;
int64_t last_return_free_memory_time = start_time_us; //int64_t last_malloc_trim_time = start_time_us;
while (1) { while (1) {
const int64_t sleep_us = 1000000L + last_time_us - butil::gettimeofday_us(); const int64_t sleep_us = 1000000L + last_time_us - butil::gettimeofday_us();
if (sleep_us > 0) { if (sleep_us > 0) {
...@@ -224,24 +215,11 @@ static void* GlobalUpdate(void*) { ...@@ -224,24 +215,11 @@ static void* GlobalUpdate(void*) {
} }
} }
const int return_mem_interval = // TODO: Add branch for tcmalloc.
FLAGS_free_memory_to_system_interval/*reloadable*/; // if (last_time_us > last_malloc_trim_time + 10*1000000L) {
if (return_mem_interval > 0 && // last_malloc_trim_time = last_time_us;
last_time_us >= last_return_free_memory_time + // malloc_trim(10*1024*1024/*leave 10M pad*/);
return_mem_interval * 1000000L) { // }
last_return_free_memory_time = last_time_us;
// TODO: Calling MallocExtension::instance()->ReleaseFreeMemory may
// crash the program in later calls to malloc, verified on tcmalloc
// 1.7 and 2.5, which means making the static member function weak
// in details/tcmalloc_extension.cpp is probably not correct, however
// it does work for heap profilers.
if (MallocExtension_ReleaseFreeMemory != NULL) {
MallocExtension_ReleaseFreeMemory();
} else {
// GNU specific.
malloc_trim(10 * 1024 * 1024/*leave 10M pad*/);
}
}
} }
return NULL; return NULL;
} }
...@@ -421,6 +399,15 @@ static void GlobalInitializeOrDieImpl() { ...@@ -421,6 +399,15 @@ static void GlobalInitializeOrDieImpl() {
exit(1); exit(1);
} }
Protocol thrift_binary_protocol = { ParseThriftBinaryMessage,
SerializeThriftBinaryRequest, PackThriftBinaryRequest,
ProcessThriftBinaryRequest, ProcessThriftBinaryResponse,
VerifyThriftBinaryRequest, NULL, NULL,
CONNECTION_TYPE_POOLED_AND_SHORT, "thrift" };
if (RegisterProtocol(PROTOCOL_THRIFT, thrift_binary_protocol) != 0) {
exit(1);
}
Protocol mc_binary_protocol = { ParseMemcacheMessage, Protocol mc_binary_protocol = { ParseMemcacheMessage,
SerializeMemcacheRequest, SerializeMemcacheRequest,
PackMemcacheRequest, PackMemcacheRequest,
......
...@@ -45,6 +45,7 @@ enum ProtocolType { ...@@ -45,6 +45,7 @@ enum ProtocolType {
// Reserve special protocol for cds-agent, which depends on FIFO right now // Reserve special protocol for cds-agent, which depends on FIFO right now
PROTOCOL_CDS_AGENT = 23; // Client side only PROTOCOL_CDS_AGENT = 23; // Client side only
PROTOCOL_ESP = 24; // Client side only PROTOCOL_ESP = 24; // Client side only
PROTOCOL_THRIFT = 25; // Server side only
} }
enum CompressType { enum CompressType {
......
// Copyright (c) 2015 Baidu, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Authors: Ge,Jun (gejun@baidu.com)
#include <google/protobuf/descriptor.h> // MethodDescriptor
#include <google/protobuf/message.h> // Message
#include <gflags/gflags.h>
#include "butil/time.h"
#include "butil/iobuf.h" // butil::IOBuf
#include "brpc/log.h"
#include "brpc/controller.h" // Controller
#include "brpc/socket.h" // Socket
#include "brpc/server.h" // Server
#include "brpc/span.h"
#include "brpc/details/server_private_accessor.h"
#include "brpc/details/controller_private_accessor.h"
#include "brpc/thrift_service.h"
#include "brpc/policy/most_common_message.h"
#include "brpc/policy/thrift_protocol.h"
#include "brpc/details/usercode_backup_pool.h"
extern "C" {
void bthread_assign_data(void* data) __THROW;
}
namespace brpc {
ThriftFramedClosure::ThriftFramedClosure(void* additional_space)
: _socket_ptr(NULL)
, _server(NULL)
, _start_parse_us(0)
, _do_respond(true)
, _additional_space(additional_space) {
}
ThriftFramedClosure::~ThriftFramedClosure() {
LogErrorTextAndDelete(false)(&_controller);
}
void ThriftFramedClosure::DoNotRespond() {
_do_respond = false;
}
class DeleteThriftFramedClosure {
public:
void operator()(ThriftFramedClosure* done) const {
done->~ThriftFramedClosure();
free(done);
}
};
void ThriftFramedClosure::Run() {
// Recycle itself after `Run'
std::unique_ptr<ThriftFramedClosure, DeleteThriftFramedClosure> recycle_ctx(this);
SocketUniquePtr sock(_socket_ptr);
ScopedRemoveConcurrency remove_concurrency_dummy(_server, &_controller);
ControllerPrivateAccessor accessor(&_controller);
Span* span = accessor.span();
if (span) {
span->set_start_send_us(butil::cpuwide_time_us());
}
ScopedMethodStatus method_status(_server->options().thrift_service->_status);
if (!method_status) {
// Judge errors belongings.
// may not be accurate, but it does not matter too much.
const int error_code = _controller.ErrorCode();
if (error_code == ENOSERVICE ||
error_code == ENOMETHOD ||
error_code == EREQUEST ||
error_code == ECLOSE ||
error_code == ELOGOFF ||
error_code == ELIMIT) {
ServerPrivateAccessor(_server).AddError();
}
}
if (_controller.IsCloseConnection()) {
sock->SetFailed();
return;
}
if (_do_respond) {
// response uses request's head as default.
_response.head = _request.head;
uint32_t length = _response.body.length();
_response.head.body_len = htonl(length);
if (span) {
int response_size = sizeof(thrift_binary_head_t) + _response.head.body_len;
span->set_response_size(response_size);
}
butil::IOBuf write_buf;
write_buf.append(&_response.head, sizeof(thrift_binary_head_t));
write_buf.append(_response.body.movable());
// Have the risk of unlimited pending responses, in which case, tell
// users to set max_concurrency.
Socket::WriteOptions wopt;
wopt.ignore_eovercrowded = true;
if (sock->Write(&write_buf, &wopt) != 0) {
const int errcode = errno;
PLOG_IF(WARNING, errcode != EPIPE) << "Fail to write into " << *sock;
_controller.SetFailed(errcode, "Fail to write into %s",
sock->description().c_str());
return;
}
}
if (span) {
// TODO: this is not sent
span->set_sent_us(butil::cpuwide_time_us());
}
if (method_status) {
method_status.release()->OnResponded(
!_controller.Failed(), butil::cpuwide_time_us() - cpuwide_start_us());
}
}
void ThriftFramedClosure::SetMethodName(const std::string& full_method_name) {
ControllerPrivateAccessor accessor(&_controller);
Span* span = accessor.span();
if (span) {
span->ResetServerSpanName(full_method_name);
}
}
namespace policy {
ParseResult ParseThriftBinaryMessage(butil::IOBuf* source,
Socket*, bool /*read_eof*/, const void* /*arg*/) {
char header_buf[sizeof(thrift_binary_head_t) + 3];
const size_t n = source->copy_to(header_buf, sizeof(thrift_binary_head_t) + 3);
if (n < 7) {
return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA);
}
const void* dummy = header_buf + sizeof(thrift_binary_head_t);
const int32_t sz = ntohl(*(int32_t*)dummy);
int32_t version = sz & VERSION_MASK;
if (version != VERSION_1) {
RPC_VLOG << "magic_num=" << version
<< " doesn't match THRIFT_MAGIC_NUM=" << VERSION_1;
return MakeParseError(PARSE_ERROR_TRY_OTHERS);
}
thrift_binary_head_t* thrift = (thrift_binary_head_t *)header_buf;
thrift->body_len = ntohl(thrift->body_len);
uint32_t body_len = thrift->body_len;
if (body_len > FLAGS_max_body_size) {
return MakeParseError(PARSE_ERROR_TOO_BIG_DATA);
} else if (source->length() < sizeof(header_buf) + body_len - 3) {
return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA);
}
policy::MostCommonMessage* msg = policy::MostCommonMessage::Get();
source->cutn(&msg->meta, sizeof(thrift_binary_head_t));
source->cutn(&msg->payload, body_len);
return MakeMessage(msg);
}
struct CallMethodInBackupThreadArgs {
ThriftFramedService* service;
const Server* server;
Controller* controller;
const ThriftBinaryMessage* request;
ThriftBinaryMessage* response;
ThriftFramedClosure* done;
};
static void CallMethodInBackupThread(void* void_args) {
CallMethodInBackupThreadArgs* args = (CallMethodInBackupThreadArgs*)void_args;
args->service->ProcessThriftBinaryRequest(*args->server, args->controller,
*args->request, args->response,
args->done);
delete args;
}
static void EndRunningCallMethodInPool(ThriftFramedService* service,
const Server& server,
Controller* controller,
const ThriftBinaryMessage& request,
ThriftBinaryMessage* response,
ThriftFramedClosure* done) {
CallMethodInBackupThreadArgs* args = new CallMethodInBackupThreadArgs;
args->service = service;
args->server = &server;
args->controller = controller;
args->request = &request;
args->response = response;
args->done = done;
return EndRunningUserCodeInPool(CallMethodInBackupThread, args);
};
void ProcessThriftBinaryRequest(InputMessageBase* msg_base) {
const int64_t start_parse_us = butil::cpuwide_time_us();
DestroyingPtr<MostCommonMessage> msg(static_cast<MostCommonMessage*>(msg_base));
SocketUniquePtr socket(msg->ReleaseSocket());
const Server* server = static_cast<const Server*>(msg_base->arg());
ScopedNonServiceError non_service_error(server);
char buf[sizeof(thrift_binary_head_t)];
const char *p = (const char *)msg->meta.fetch(buf, sizeof(buf));
thrift_binary_head_t *req_head = (thrift_binary_head_t *)p;
req_head->body_len = ntohl(req_head->body_len);
ThriftFramedService* service = server->options().thrift_service;
if (service == NULL) {
LOG_EVERY_SECOND(WARNING)
<< "Received thrift request however the server does not set"
" ServerOptions.thrift_service, close the connection.";
socket->SetFailed();
return;
}
void* space = malloc(sizeof(ThriftFramedClosure) + service->_additional_space);
if (!space) {
LOG(FATAL) << "Fail to new ThriftFramedClosure";
socket->SetFailed();
return;
}
// Switch to service-specific error.
non_service_error.release();
MethodStatus* method_status = service->_status;
if (method_status) {
CHECK(method_status->OnRequested());
}
void* sub_space = NULL;
if (service->_additional_space) {
sub_space = (char*)space + sizeof(ThriftFramedClosure);
}
ThriftFramedClosure* thrift_done = new (space) ThriftFramedClosure(sub_space);
Controller* cntl = &(thrift_done->_controller);
ThriftBinaryMessage* req = &(thrift_done->_request);
ThriftBinaryMessage* res = &(thrift_done->_response);
req->head = *req_head;
msg->payload.swap(req->body);
thrift_done->_start_parse_us = start_parse_us;
thrift_done->_socket_ptr = socket.get();
thrift_done->_server = server;
ServerPrivateAccessor server_accessor(server);
ControllerPrivateAccessor accessor(cntl);
const bool security_mode = server->options().security_mode() &&
socket->user() == server_accessor.acceptor();
// Initialize log_id with the log_id in thrift. Notice that the protocols
// on top of ThriftFramedService may pack log_id in meta or user messages and
// overwrite the value.
//cntl->set_log_id(req_head->log_id);
accessor.set_server(server)
.set_security_mode(security_mode)
.set_peer_id(socket->id())
.set_remote_side(socket->remote_side())
.set_local_side(socket->local_side())
.set_request_protocol(PROTOCOL_NSHEAD);
// Tag the bthread with this server's key for thread_local_data().
if (server->thread_local_options().thread_local_data_factory) {
bthread_assign_data((void*)&server->thread_local_options());
}
Span* span = NULL;
if (IsTraceable(false)) {
span = Span::CreateServerSpan(0, 0, 0, msg->base_real_us());
accessor.set_span(span);
//span->set_log_id(req_head->log_id);
span->set_remote_side(cntl->remote_side());
span->set_protocol(PROTOCOL_NSHEAD);
span->set_received_us(msg->received_us());
span->set_start_parse_us(start_parse_us);
span->set_request_size(sizeof(thrift_binary_head_t) + req_head->body_len);
}
do {
if (!server->IsRunning()) {
cntl->SetFailed(ELOGOFF, "Server is stopping");
break;
}
if (!server_accessor.AddConcurrency(cntl)) {
cntl->SetFailed(ELIMIT, "Reached server's max_concurrency=%d",
server->options().max_concurrency);
break;
}
if (FLAGS_usercode_in_pthread && TooManyUserCode()) {
cntl->SetFailed(ELIMIT, "Too many user code to run when"
" -usercode_in_pthread is on");
break;
}
} while (false);
msg.reset(); // optional, just release resourse ASAP
// `socket' will be held until response has been sent
socket.release();
if (span) {
span->ResetServerSpanName(service->_cached_name);
span->set_start_callback_us(butil::cpuwide_time_us());
span->AsParent();
}
if (!FLAGS_usercode_in_pthread) {
return service->ProcessThriftBinaryRequest(*server, cntl, *req, res, thrift_done);
}
if (BeginRunningUserCode()) {
service->ProcessThriftBinaryRequest(*server, cntl, *req, res, thrift_done);
return EndRunningUserCodeInPlace();
} else {
return EndRunningCallMethodInPool(
service, *server, cntl, *req, res, thrift_done);
}
}
void ProcessThriftBinaryResponse(InputMessageBase* msg_base) {
const int64_t start_parse_us = butil::cpuwide_time_us();
DestroyingPtr<MostCommonMessage> msg(static_cast<MostCommonMessage*>(msg_base));
// Fetch correlation id that we saved before in `PacThriftBinaryRequest'
const CallId cid = { static_cast<uint64_t>(msg->socket()->correlation_id()) };
Controller* cntl = NULL;
const int rc = bthread_id_lock(cid, (void**)&cntl);
if (rc != 0) {
LOG_IF(ERROR, rc != EINVAL && rc != EPERM)
<< "Fail to lock correlation_id=" << cid << ": " << berror(rc);
return;
}
ControllerPrivateAccessor accessor(cntl);
Span* span = accessor.span();
if (span) {
span->set_base_real_us(msg->base_real_us());
span->set_received_us(msg->received_us());
span->set_response_size(msg->payload.length());
span->set_start_parse_us(start_parse_us);
}
// MUST be ThriftBinaryMessage (checked in SerializeThriftBinaryRequest)
ThriftBinaryMessage* response = (ThriftBinaryMessage*)cntl->response();
const int saved_error = cntl->ErrorCode();
if (response != NULL) {
msg->meta.copy_to(&response->head, sizeof(thrift_binary_head_t));
response->head.body_len = ntohl(response->head.body_len);
msg->payload.swap(response->body);
} // else just ignore the response.
// Unlocks correlation_id inside. Revert controller's
// error code if it version check of `cid' fails
msg.reset(); // optional, just release resourse ASAP
accessor.OnResponse(cid, saved_error);
}
bool VerifyThriftBinaryRequest(const InputMessageBase* msg_base) {
Server* server = (Server*)msg_base->arg();
if (server->options().auth) {
LOG(WARNING) << "thrift does not support authentication";
return false;
}
return true;
}
void SerializeThriftBinaryRequest(butil::IOBuf* request_buf, Controller* cntl,
const google::protobuf::Message* req_base) {
if (req_base == NULL) {
return cntl->SetFailed(EREQUEST, "request is NULL");
}
ControllerPrivateAccessor accessor(cntl);
const ThriftBinaryMessage* req = (const ThriftBinaryMessage*)req_base;
thrift_binary_head_t head = req->head;
head.body_len = ntohl(req->body.size());
request_buf->append(&head, sizeof(head));
request_buf->append(req->body);
}
void PackThriftBinaryRequest(
butil::IOBuf* packet_buf,
SocketMessage**,
uint64_t correlation_id,
const google::protobuf::MethodDescriptor*,
Controller* cntl,
const butil::IOBuf& request,
const Authenticator*) {
ControllerPrivateAccessor accessor(cntl);
if (accessor.connection_type() == CONNECTION_TYPE_SINGLE) {
return cntl->SetFailed(
EINVAL, "thrift protocol can't work with CONNECTION_TYPE_SINGLE");
}
// Store `correlation_id' into the socket since thrift protocol can't
// pack the field.
accessor.get_sending_socket()->set_correlation_id(correlation_id);
Span* span = accessor.span();
if (span) {
span->set_request_size(request.length());
// TODO: Nowhere to set tracing ids.
// request_meta->set_trace_id(span->trace_id());
// request_meta->set_span_id(span->span_id());
// request_meta->set_parent_span_id(span->parent_span_id());
}
packet_buf->append(request);
}
} // namespace policy
} // namespace brpc
// Copyright (c) 2015 Baidu, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Authors: Ge,Jun (gejun@baidu.com)
#ifndef BRPC_POLICY_THRIFT_PROTOCOL_H
#define BRPC_POLICY_THRIFT_PROTOCOL_H
#include "brpc/protocol.h"
#include "brpc/thrift_binary_message.h"
namespace brpc {
namespace policy {
// Parse binary protocol format of thrift framed
ParseResult ParseThriftBinaryMessage(butil::IOBuf* source, Socket* socket, bool read_eof, const void *arg);
// Actions to a (client) request in thrift binary framed format
void ProcessThriftBinaryRequest(InputMessageBase* msg);
// Actions to a (server) response in thrift binary framed format
void ProcessThriftBinaryResponse(InputMessageBase* msg);
void SerializeThriftBinaryRequest(butil::IOBuf* request_buf, Controller* controller,
const google::protobuf::Message* request);
void PackThriftBinaryRequest(
butil::IOBuf* packet_buf,
SocketMessage**,
uint64_t correlation_id,
const google::protobuf::MethodDescriptor*,
Controller* controller,
const butil::IOBuf&,
const Authenticator*);
// Verify authentication information in thrift binary format
bool VerifyThriftBinaryRequest(const InputMessageBase *msg);
} // namespace policy
} // namespace brpc
#endif // BRPC_POLICY_THRIFT_PROTOCOL_H
...@@ -167,6 +167,10 @@ const ConnectionType CONNECTION_TYPE_ALL = ...@@ -167,6 +167,10 @@ const ConnectionType CONNECTION_TYPE_ALL =
(int)CONNECTION_TYPE_POOLED | (int)CONNECTION_TYPE_POOLED |
(int)CONNECTION_TYPE_SHORT); (int)CONNECTION_TYPE_SHORT);
// DEPRECATED: old names.
const ProtocolType PROTOCOL_BAIDU_RPC = PROTOCOL_BAIDU_STD;
const ProtocolType PROTOCOL_MEMCACHE_BINARY = PROTOCOL_MEMCACHE;
// [thread-safe] // [thread-safe]
// Register `protocol' using key=`type'. // Register `protocol' using key=`type'.
// Returns 0 on success, -1 otherwise // Returns 0 on success, -1 otherwise
......
...@@ -40,6 +40,7 @@ ...@@ -40,6 +40,7 @@
#include "brpc/details/ssl_helper.h" // CreateSSLContext #include "brpc/details/ssl_helper.h" // CreateSSLContext
#include "brpc/protocol.h" // ListProtocols #include "brpc/protocol.h" // ListProtocols
#include "brpc/nshead_service.h" // NsheadService #include "brpc/nshead_service.h" // NsheadService
#include "brpc/thrift_service.h" // ThriftService
#include "brpc/builtin/bad_method_service.h" // BadMethodService #include "brpc/builtin/bad_method_service.h" // BadMethodService
#include "brpc/builtin/get_favicon_service.h" #include "brpc/builtin/get_favicon_service.h"
#include "brpc/builtin/get_js_service.h" #include "brpc/builtin/get_js_service.h"
...@@ -127,6 +128,7 @@ SSLOptions::SSLOptions() ...@@ -127,6 +128,7 @@ SSLOptions::SSLOptions()
ServerOptions::ServerOptions() ServerOptions::ServerOptions()
: idle_timeout_sec(-1) : idle_timeout_sec(-1)
, nshead_service(NULL) , nshead_service(NULL)
, thrift_service(NULL)
, mongo_service_adaptor(NULL) , mongo_service_adaptor(NULL)
, auth(NULL) , auth(NULL)
, server_owns_auth(false) , server_owns_auth(false)
...@@ -208,15 +210,30 @@ static void PrintSupportedCompressions(std::ostream& os, void*) { ...@@ -208,15 +210,30 @@ static void PrintSupportedCompressions(std::ostream& os, void*) {
} }
} }
static bool check_TCMALLOC_SAMPLE_PARAMETER() {
char* str = getenv("TCMALLOC_SAMPLE_PARAMETER");
if (str == NULL) {
return false;
}
char* endptr;
int val = strtol(str, &endptr, 10);
return (*endptr == '\0' && val > 0);
}
static bool has_TCMALLOC_SAMPLE_PARAMETER() {
static bool val = check_TCMALLOC_SAMPLE_PARAMETER();
return val;
}
static void PrintEnabledProfilers(std::ostream& os, void*) { static void PrintEnabledProfilers(std::ostream& os, void*) {
if (cpu_profiler_enabled) { if (cpu_profiler_enabled) {
os << "cpu "; os << "cpu ";
} }
if (IsHeapProfilerEnabled()) { if (IsHeapProfilerEnabled) {
if (has_TCMALLOC_SAMPLE_PARAMETER()) { if (has_TCMALLOC_SAMPLE_PARAMETER()) {
os << "heap "; os << "heap ";
} else { } else {
os << "heap(no TCMALLOC_SAMPLE_PARAMETER in env) "; os << "heap(lack of TCMALLOC_SAMPLE_PARAMETER) ";
} }
} }
os << "contention"; os << "contention";
...@@ -315,6 +332,10 @@ void* Server::UpdateDerivedVars(void* arg) { ...@@ -315,6 +332,10 @@ void* Server::UpdateDerivedVars(void* arg) {
server->options().nshead_service->Expose(prefix); server->options().nshead_service->Expose(prefix);
} }
if (server->options().thrift_service) {
server->options().thrift_service->Expose(prefix);
}
int64_t last_time = butil::gettimeofday_us(); int64_t last_time = butil::gettimeofday_us();
int consecutive_nosleep = 0; int consecutive_nosleep = 0;
while (1) { while (1) {
...@@ -397,6 +418,9 @@ Server::~Server() { ...@@ -397,6 +418,9 @@ Server::~Server() {
delete _options.nshead_service; delete _options.nshead_service;
_options.nshead_service = NULL; _options.nshead_service = NULL;
delete _options.thrift_service;
_options.thrift_service = NULL;
delete _options.http_master_service; delete _options.http_master_service;
_options.http_master_service = NULL; _options.http_master_service = NULL;
...@@ -646,15 +670,6 @@ struct RevertServerStatus { ...@@ -646,15 +670,6 @@ struct RevertServerStatus {
} }
}; };
static int get_port_from_fd(int fd) {
struct sockaddr_in addr;
socklen_t size = sizeof(addr);
if (getsockname(fd, (struct sockaddr*)&addr, &size) < 0) {
return -1;
}
return ntohs(addr.sin_port);
}
int Server::StartInternal(const butil::ip_t& ip, int Server::StartInternal(const butil::ip_t& ip,
const PortRange& port_range, const PortRange& port_range,
const ServerOptions *opt) { const ServerOptions *opt) {
...@@ -886,15 +901,6 @@ int Server::StartInternal(const butil::ip_t& ip, ...@@ -886,15 +901,6 @@ int Server::StartInternal(const butil::ip_t& ip,
} }
return -1; return -1;
} }
if (_listen_addr.port == 0) {
// port=0 makes kernel dynamically select a port from
// https://en.wikipedia.org/wiki/Ephemeral_port
_listen_addr.port = get_port_from_fd(sockfd);
if (_listen_addr.port <= 0) {
LOG(ERROR) << "Fail to get port from fd=" << sockfd;
return -1;
}
}
if (_am == NULL) { if (_am == NULL) {
_am = BuildAcceptor(); _am = BuildAcceptor();
if (NULL == _am) { if (NULL == _am) {
...@@ -924,12 +930,6 @@ int Server::StartInternal(const butil::ip_t& ip, ...@@ -924,12 +930,6 @@ int Server::StartInternal(const butil::ip_t& ip,
<< " is same with port=" << _listen_addr.port << " to Start()"; << " is same with port=" << _listen_addr.port << " to Start()";
return -1; return -1;
} }
if (_options.internal_port == 0) {
LOG(ERROR) << "ServerOptions.internal_port cannot be 0, which"
" allocates a dynamic and probabaly unfiltered port,"
" against the purpose of \"being internal\".";
return -1;
}
butil::EndPoint internal_point = _listen_addr; butil::EndPoint internal_point = _listen_addr;
internal_point.port = _options.internal_port; internal_point.port = _options.internal_port;
butil::fd_guard sockfd(tcp_listen(internal_point, FLAGS_reuse_addr)); butil::fd_guard sockfd(tcp_listen(internal_point, FLAGS_reuse_addr));
...@@ -1519,7 +1519,7 @@ void Server::GenerateVersionIfNeeded() { ...@@ -1519,7 +1519,7 @@ void Server::GenerateVersionIfNeeded() {
if (!_version.empty()) { if (!_version.empty()) {
return; return;
} }
int extra_count = !!_options.nshead_service + !!_options.rtmp_service; int extra_count = !!_options.nshead_service + !!_options.rtmp_service + !!_options.thrift_service;
_version.reserve((extra_count + service_count()) * 20); _version.reserve((extra_count + service_count()) * 20);
for (ServiceMap::const_iterator it = _fullname_service_map.begin(); for (ServiceMap::const_iterator it = _fullname_service_map.begin();
it != _fullname_service_map.end(); ++it) { it != _fullname_service_map.end(); ++it) {
...@@ -1536,6 +1536,13 @@ void Server::GenerateVersionIfNeeded() { ...@@ -1536,6 +1536,13 @@ void Server::GenerateVersionIfNeeded() {
} }
_version.append(butil::class_name_str(*_options.nshead_service)); _version.append(butil::class_name_str(*_options.nshead_service));
} }
if (_options.thrift_service) {
if (!_version.empty()) {
_version.push_back('+');
}
_version.append(butil::class_name_str(*_options.thrift_service));
}
if (_options.rtmp_service) { if (_options.rtmp_service) {
if (!_version.empty()) { if (!_version.empty()) {
_version.push_back('+'); _version.push_back('+');
......
...@@ -45,6 +45,7 @@ namespace brpc { ...@@ -45,6 +45,7 @@ namespace brpc {
class Acceptor; class Acceptor;
class MethodStatus; class MethodStatus;
class NsheadService; class NsheadService;
class ThriftFramedService;
class SimpleDataPool; class SimpleDataPool;
class MongoServiceAdaptor; class MongoServiceAdaptor;
class RestfulMap; class RestfulMap;
...@@ -139,6 +140,11 @@ struct ServerOptions { ...@@ -139,6 +140,11 @@ struct ServerOptions {
// Default: NULL // Default: NULL
NsheadService* nshead_service; NsheadService* nshead_service;
// Process requests in format of thrift_binary_head_t + blob.
// Owned by Server and deleted in server's destructor
// Default: NULL
ThriftFramedService* thrift_service;
// Adaptor for Mongo protocol, check src/brpc/mongo_service_adaptor.h for details // Adaptor for Mongo protocol, check src/brpc/mongo_service_adaptor.h for details
// The adaptor will not be deleted by server // The adaptor will not be deleted by server
// and must remain valid when server is running. // and must remain valid when server is running.
...@@ -241,7 +247,7 @@ struct ServerOptions { ...@@ -241,7 +247,7 @@ struct ServerOptions {
// Provide builtin services at this port rather than the port to Start(). // Provide builtin services at this port rather than the port to Start().
// When your server needs to be accessed from public (including traffic // When your server needs to be accessed from public (including traffic
// redirected by nginx or other http front-end servers), set this port // redirected by nginx or other http front-end servers), set this port
// to a port number that's ONLY accessible from internal network // to a port number that's ONLY accessible from Baidu's internal network
// so that you can check out the builtin services from this port while // so that you can check out the builtin services from this port while
// hiding them from public. Setting this option also enables security // hiding them from public. Setting this option also enables security
// protection code which we may add constantly. // protection code which we may add constantly.
...@@ -409,25 +415,28 @@ public: ...@@ -409,25 +415,28 @@ public:
Server(ProfilerLinker = ProfilerLinker()); Server(ProfilerLinker = ProfilerLinker());
~Server(); ~Server();
// A set of functions to start this server. // Start this server. Use default options if `opt' is NULL.
// Returns 0 on success, -1 otherwise and errno is set appropriately. // This function can be called multiple times if the server is completely
// Notes:
// * Default options are taken if `opt' is NULL.
// * A server can be started more than once if the server is completely
// stopped by Stop() and Join(). // stopped by Stop() and Join().
// * port can be 0, which makes kernel to choose a port dynamically. // Returns 0 on success, -1 otherwise and errno is set appropriately.
// Start on an address in form of "0.0.0.0:8000". // Start on a single address "0.0.0.0:8000".
int Start(const char* ip_port_str, const ServerOptions* opt); int Start(const char* ip_port_str, const ServerOptions* opt);
int Start(const butil::EndPoint& ip_port, const ServerOptions* opt);
// Start on IP_ANY:port. // Start on IP_ANY:port.
int Start(int port, const ServerOptions* opt); int Start(int port, const ServerOptions* opt);
// Start on `ip_str' + any useable port in `range'
int Start(const char* ip_str, PortRange range, const ServerOptions *opt);
// NOTE: Stop() is paired with Join() to stop a server without losing // Start on ip:port enclosed in butil::EndPoint which is defined in
// requests. The point of separating them is that you can Stop() multiple // src/butil/endpoint.h
// servers before Join() them, in which case the total time to Join is int Start(const butil::EndPoint& ip_port, const ServerOptions* opt);
// Start on `ip_str' + any useable port in `port_range'
int Start(const char* ip_str, PortRange port_range,
const ServerOptions *opt);
// NOTE: Stop() is paired with Join() to stop a server with minimum lost
// of requests. The point of separating them is that you can Stop()
// multiple servers before Join()-ing them, the total time to Join is
// time of the slowest Join(). Otherwise you have to Join() them one by // time of the slowest Join(). Otherwise you have to Join() them one by
// one, in which case the total time is sum of all Join(). // one, in which case the total time is sum of all Join().
......
// Copyright (c) 2015 Baidu, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef BRPC_THRIFT_HEADER_H
#define BRPC_THRIFT_HEADER_H
namespace brpc {
static const int32_t VERSION_MASK = ((int32_t)0xffffff00);
static const int32_t VERSION_1 = ((int32_t)0x80010000);
struct thrift_binary_head_t {
int32_t body_len;
};
} // namespace brpc
#endif // BRPC_THRIFT_HEADER_H
// Copyright (c) 2014 Baidu, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Authors: Ge,Jun (gejun@baidu.com)
#define INTERNAL_SUPPRESS_PROTOBUF_FIELD_DEPRECATION
#include "brpc/thrift_binary_message.h"
#include <algorithm>
#include "butil/logging.h"
#include <google/protobuf/stubs/once.h>
#include <google/protobuf/io/coded_stream.h>
#include <google/protobuf/wire_format_lite_inl.h>
#include <google/protobuf/descriptor.h>
#include <google/protobuf/reflection_ops.h>
#include <google/protobuf/wire_format.h>
namespace brpc {
namespace {
const ::google::protobuf::Descriptor* ThriftBinaryMessage_descriptor_ = NULL;
} // namespace
void protobuf_AssignDesc_baidu_2frpc_2fthrift_binary_5fmessage_2eproto() {
protobuf_AddDesc_baidu_2frpc_2fthrift_binary_5fmessage_2eproto();
const ::google::protobuf::FileDescriptor* file =
::google::protobuf::DescriptorPool::generated_pool()->FindFileByName(
"baidu/rpc/thrift_binary_message.proto");
GOOGLE_CHECK(file != NULL);
ThriftBinaryMessage_descriptor_ = file->message_type(0);
}
namespace {
GOOGLE_PROTOBUF_DECLARE_ONCE(protobuf_AssignDescriptors_once_);
inline void protobuf_AssignDescriptorsOnce() {
::google::protobuf::GoogleOnceInit(&protobuf_AssignDescriptors_once_,
&protobuf_AssignDesc_baidu_2frpc_2fthrift_binary_5fmessage_2eproto);
}
void protobuf_RegisterTypes(const ::std::string&) {
protobuf_AssignDescriptorsOnce();
::google::protobuf::MessageFactory::InternalRegisterGeneratedMessage(
ThriftBinaryMessage_descriptor_, &ThriftBinaryMessage::default_instance());
}
} // namespace
void protobuf_ShutdownFile_baidu_2frpc_2fthrift_binary_5fmessage_2eproto() {
delete ThriftBinaryMessage::default_instance_;
}
void protobuf_AddDesc_baidu_2frpc_2fthrift_binary_5fmessage_2eproto_impl() {
GOOGLE_PROTOBUF_VERIFY_VERSION;
#if GOOGLE_PROTOBUF_VERSION >= 3002000
::google::protobuf::internal::InitProtobufDefaults();
#else
::google::protobuf::protobuf_AddDesc_google_2fprotobuf_2fdescriptor_2eproto();
#endif
::google::protobuf::DescriptorPool::InternalAddGeneratedFile(
"\n\033thrift_binary_message.proto\022\004brpc\"\025\n\023T"
"hriftBinaryMessage", 58);
::google::protobuf::MessageFactory::InternalRegisterGeneratedFile(
"thrift_binary_message.proto", &protobuf_RegisterTypes);
ThriftBinaryMessage::default_instance_ = new ThriftBinaryMessage();
ThriftBinaryMessage::default_instance_->InitAsDefaultInstance();
::google::protobuf::internal::OnShutdown(&protobuf_ShutdownFile_baidu_2frpc_2fthrift_binary_5fmessage_2eproto);
}
GOOGLE_PROTOBUF_DECLARE_ONCE(protobuf_AddDesc_baidu_2frpc_2fthrift_binary_5fmessage_2eproto_once);
void protobuf_AddDesc_baidu_2frpc_2fthrift_binary_5fmessage_2eproto() {
::google::protobuf::GoogleOnceInit(
&protobuf_AddDesc_baidu_2frpc_2fthrift_binary_5fmessage_2eproto_once,
&protobuf_AddDesc_baidu_2frpc_2fthrift_binary_5fmessage_2eproto_impl);
}
// Force AddDescriptors() to be called at static initialization time.
struct StaticDescriptorInitializer_baidu_2frpc_2fthrift_binary_5fmessage_2eproto {
StaticDescriptorInitializer_baidu_2frpc_2fthrift_binary_5fmessage_2eproto() {
protobuf_AddDesc_baidu_2frpc_2fthrift_binary_5fmessage_2eproto();
}
} static_descriptor_initializer_baidu_2frpc_2fthrift_binary_5fmessage_2eproto_;
// ===================================================================
#ifndef _MSC_VER
#endif // !_MSC_VER
ThriftBinaryMessage::ThriftBinaryMessage()
: ::google::protobuf::Message() {
SharedCtor();
}
void ThriftBinaryMessage::InitAsDefaultInstance() {
}
ThriftBinaryMessage::ThriftBinaryMessage(const ThriftBinaryMessage& from)
: ::google::protobuf::Message() {
SharedCtor();
MergeFrom(from);
}
void ThriftBinaryMessage::SharedCtor() {
memset(&head, 0, sizeof(head));
}
ThriftBinaryMessage::~ThriftBinaryMessage() {
SharedDtor();
}
void ThriftBinaryMessage::SharedDtor() {
if (this != default_instance_) {
}
}
const ::google::protobuf::Descriptor* ThriftBinaryMessage::descriptor() {
protobuf_AssignDescriptorsOnce();
return ThriftBinaryMessage_descriptor_;
}
const ThriftBinaryMessage& ThriftBinaryMessage::default_instance() {
if (default_instance_ == NULL)
protobuf_AddDesc_baidu_2frpc_2fthrift_binary_5fmessage_2eproto();
return *default_instance_;
}
ThriftBinaryMessage* ThriftBinaryMessage::default_instance_ = NULL;
ThriftBinaryMessage* ThriftBinaryMessage::New() const {
return new ThriftBinaryMessage;
}
void ThriftBinaryMessage::Clear() {
memset(&head, 0, sizeof(head));
body.clear();
}
bool ThriftBinaryMessage::MergePartialFromCodedStream(
::google::protobuf::io::CodedInputStream* input) {
#define DO_(EXPRESSION) if (!(EXPRESSION)) return false
::google::protobuf::uint32 tag;
while ((tag = input->ReadTag()) != 0) {
if (::google::protobuf::internal::WireFormatLite::GetTagWireType(tag) ==
::google::protobuf::internal::WireFormatLite::WIRETYPE_END_GROUP) {
return true;
}
}
return true;
#undef DO_
}
void ThriftBinaryMessage::SerializeWithCachedSizes(
::google::protobuf::io::CodedOutputStream*) const {
}
::google::protobuf::uint8* ThriftBinaryMessage::SerializeWithCachedSizesToArray(
::google::protobuf::uint8* target) const {
return target;
}
int ThriftBinaryMessage::ByteSize() const {
return sizeof(thrift_binary_head_t) + body.size();
}
void ThriftBinaryMessage::MergeFrom(const ::google::protobuf::Message& from) {
GOOGLE_CHECK_NE(&from, this);
const ThriftBinaryMessage* source =
::google::protobuf::internal::dynamic_cast_if_available<const ThriftBinaryMessage*>(
&from);
if (source == NULL) {
LOG(ERROR) << "Can only merge from ThriftBinaryMessage";
return;
} else {
MergeFrom(*source);
}
}
void ThriftBinaryMessage::MergeFrom(const ThriftBinaryMessage& from) {
GOOGLE_CHECK_NE(&from, this);
head = from.head;
body = from.body;
}
void ThriftBinaryMessage::CopyFrom(const ::google::protobuf::Message& from) {
if (&from == this) return;
Clear();
MergeFrom(from);
}
void ThriftBinaryMessage::CopyFrom(const ThriftBinaryMessage& from) {
if (&from == this) return;
Clear();
MergeFrom(from);
}
bool ThriftBinaryMessage::IsInitialized() const {
return true;
}
void ThriftBinaryMessage::Swap(ThriftBinaryMessage* other) {
if (other != this) {
const thrift_binary_head_t tmp = other->head;
other->head = head;
head = tmp;
body.swap(other->body);
}
}
::google::protobuf::Metadata ThriftBinaryMessage::GetMetadata() const {
protobuf_AssignDescriptorsOnce();
::google::protobuf::Metadata metadata;
metadata.descriptor = ThriftBinaryMessage_descriptor_;
metadata.reflection = NULL;
return metadata;
}
} // namespace brpc
// Copyright (c) 2014 Baidu, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Authors: Ge,Jun (gejun@baidu.com)
#ifndef BRPC_THRIFT_BINARY_MESSAGE_H
#define BRPC_THRIFT_BINARY_MESSAGE_H
#include <string>
#include <google/protobuf/stubs/common.h>
#include <google/protobuf/generated_message_util.h>
#include <google/protobuf/repeated_field.h>
#include <google/protobuf/extension_set.h>
#include <google/protobuf/generated_message_reflection.h>
#include "google/protobuf/descriptor.pb.h"
#include "brpc/thrift_binary_head.h" // thrfit_binary_head_t
#include "butil/iobuf.h" // IOBuf
namespace brpc {
// Internal implementation detail -- do not call these.
void protobuf_AddDesc_baidu_2frpc_2fthrift_binary_5fmessage_2eproto();
void protobuf_AssignDesc_baidu_2frpc_2fthrift_binary_5fmessage_2eproto();
void protobuf_ShutdownFile_baidu_2frpc_2fthrift_binary_5fmessage_2eproto();
// Representing a thrift_binary request or response.
class ThriftBinaryMessage : public ::google::protobuf::Message {
public:
thrift_binary_head_t head;
butil::IOBuf body;
public:
ThriftBinaryMessage();
virtual ~ThriftBinaryMessage();
ThriftBinaryMessage(const ThriftBinaryMessage& from);
inline ThriftBinaryMessage& operator=(const ThriftBinaryMessage& from) {
CopyFrom(from);
return *this;
}
static const ::google::protobuf::Descriptor* descriptor();
static const ThriftBinaryMessage& default_instance();
void Swap(ThriftBinaryMessage* other);
// implements Message ----------------------------------------------
ThriftBinaryMessage* New() const;
void CopyFrom(const ::google::protobuf::Message& from);
void MergeFrom(const ::google::protobuf::Message& from);
void CopyFrom(const ThriftBinaryMessage& from);
void MergeFrom(const ThriftBinaryMessage& from);
void Clear();
bool IsInitialized() const;
int ByteSize() const;
bool MergePartialFromCodedStream(
::google::protobuf::io::CodedInputStream* input);
void SerializeWithCachedSizes(
::google::protobuf::io::CodedOutputStream* output) const;
::google::protobuf::uint8* SerializeWithCachedSizesToArray(::google::protobuf::uint8* output) const;
int GetCachedSize() const { return ByteSize(); }
::google::protobuf::Metadata GetMetadata() const;
private:
void SharedCtor();
void SharedDtor();
private:
friend void protobuf_AddDesc_baidu_2frpc_2fthrift_binary_5fmessage_2eproto_impl();
friend void protobuf_AddDesc_baidu_2frpc_2fthrift_binary_5fmessage_2eproto();
friend void protobuf_AssignDesc_baidu_2frpc_2fthrift_binary_5fmessage_2eproto();
friend void protobuf_ShutdownFile_baidu_2frpc_2fthrift_binary_5fmessage_2eproto();
void InitAsDefaultInstance();
static ThriftBinaryMessage* default_instance_;
};
} // namespace brpc
#endif // BRPC_THRIFT_BINARY_MESSAGE_H
// Copyright (c) 2015 Baidu, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Authors: Ge,Jun (gejun@baidu.com)
#include "butil/class_name.h"
#include "brpc/thrift_service.h"
#include "brpc/details/method_status.h"
namespace brpc {
BAIDU_CASSERT(sizeof(thrift_binary_head_t) == 4, sizeof_thrift_must_be_4);
ThriftFramedService::ThriftFramedService() : _additional_space(0) {
_status = new (std::nothrow) MethodStatus;
LOG_IF(FATAL, _status == NULL) << "Fail to new MethodStatus";
}
ThriftFramedService::ThriftFramedService(const ThriftFramedServiceOptions& options)
: _status(NULL), _additional_space(options.additional_space) {
if (options.generate_status) {
_status = new (std::nothrow) MethodStatus;
LOG_IF(FATAL, _status == NULL) << "Fail to new MethodStatus";
}
}
ThriftFramedService::~ThriftFramedService() {
delete _status;
_status = NULL;
}
void ThriftFramedService::Describe(std::ostream &os, const DescribeOptions&) const {
os << butil::class_name_str(*this);
}
void ThriftFramedService::Expose(const butil::StringPiece& prefix) {
_cached_name = butil::class_name_str(*this);
if (_status == NULL) {
return;
}
std::string s;
s.reserve(prefix.size() + 1 + _cached_name.size());
s.append(prefix.data(), prefix.size());
s.push_back('_');
s.append(_cached_name);
_status->Expose(s);
}
} // namespace brpc
// Copyright (c) 2015 Baidu, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Authors: Ge,Jun (gejun@baidu.com)
#ifndef BRPC_THRIFT_SERVICE_H
#define BRPC_THRIFT_SERVICE_H
#include "brpc/controller.h" // Controller
#include "brpc/thrift_binary_message.h" // ThriftBinaryMessage
#include "brpc/describable.h"
namespace brpc {
class Socket;
class Server;
class MethodStatus;
class StatusService;
namespace policy {
void ProcessThriftBinaryRequest(InputMessageBase* msg_base);
}
// The continuation of request processing. Namely send response back to client.
// NOTE: you DON'T need to inherit this class or create instance of this class.
class ThriftFramedClosure : public google::protobuf::Closure {
public:
explicit ThriftFramedClosure(void* additional_space);
// [Required] Call this to send response back to the client.
void Run();
// [Optional] Set the full method name. If unset, use name of the service.
void SetMethodName(const std::string& full_method_name);
// The space required by subclass at ThriftFramedServiceOptions. subclass may
// utilizes this feature to save the cost of allocating closure separately.
// If subclass does not require space, this return value is NULL.
void* additional_space() { return _additional_space; }
// The starting time of the RPC, got from butil::cpuwide_time_us().
int64_t cpuwide_start_us() const { return _start_parse_us; }
// Don't send response back, used by MIMO.
void DoNotRespond();
private:
friend void policy::ProcessThriftBinaryRequest(InputMessageBase* msg_base);
friend class DeleteThriftFramedClosure;
// Only callable by Run().
~ThriftFramedClosure();
Socket* _socket_ptr;
const Server* _server;
int64_t _start_parse_us;
ThriftBinaryMessage _request;
ThriftBinaryMessage _response;
bool _do_respond;
void* _additional_space;
Controller _controller;
};
struct ThriftFramedServiceOptions {
ThriftFramedServiceOptions() : generate_status(true), additional_space(0) {}
ThriftFramedServiceOptions(bool generate_status2, size_t additional_space2)
: generate_status(generate_status2)
, additional_space(additional_space2) {}
bool generate_status;
size_t additional_space;
};
// Inherit this class to let brpc server understands thrift_binary requests.
class ThriftFramedService : public Describable {
public:
ThriftFramedService();
ThriftFramedService(const ThriftFramedServiceOptions&);
virtual ~ThriftFramedService();
// Implement this method to handle thrift_binary requests. Notice that this
// method can be called with a failed Controller(something wrong with the
// request before calling this method), in which case the implemenetation
// shall send specific response with error information back to client.
// Parameters:
// server The server receiving the request.
// controller per-rpc settings.
// request The thrift_binary request received.
// response The thrift_binary response that you should fill in.
// done You must call done->Run() to end the processing.
virtual void ProcessThriftBinaryRequest(const Server& server,
Controller* controller,
const ThriftBinaryMessage& request,
ThriftBinaryMessage* response,
ThriftFramedClosure* done) = 0;
// Put descriptions into the stream.
void Describe(std::ostream &os, const DescribeOptions&) const;
private:
DISALLOW_COPY_AND_ASSIGN(ThriftFramedService);
friend class ThriftFramedClosure;
friend void policy::ProcessThriftBinaryRequest(InputMessageBase* msg_base);
friend class StatusService;
friend class Server;
private:
void Expose(const butil::StringPiece& prefix);
// Tracking status of non ThriftBinaryPbService
MethodStatus* _status;
size_t _additional_space;
std::string _cached_name;
};
} // namespace brpc
#endif // BRPC_THRIFT_SERVICE_H
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment