Commit 05a0bd10 authored by jamesge's avatar jamesge Committed by Ge Jun

Renamed ThriftMessage to ThriftFramedMessage, ThriftTemplateMessage to ThriftMessage

Removed thrift_binary_head.h and renamed thrift_binary_head_t to thrift_head_t which is in thrift_message.h
Adapt thrift 0.11 which uses shared_ptr defined in stdcxx.h
Simplify example/thrift_extension_c++ by roundrobin between processing by handler or directly.
Removed libbrpc_thrift.a, the objs are archived into libbrpc.a (and libbrpc.so) directly
parent 18014aed
...@@ -195,7 +195,7 @@ ifeq (ENABLE_THRIFT_FRAMED_PROTOCOL, $(findstring ENABLE_THRIFT_FRAMED_PROTOCOL, ...@@ -195,7 +195,7 @@ ifeq (ENABLE_THRIFT_FRAMED_PROTOCOL, $(findstring ENABLE_THRIFT_FRAMED_PROTOCOL,
THRIFT_OBJS = $(addsuffix .o, $(basename $(THRIFT_SOURCES))) THRIFT_OBJS = $(addsuffix .o, $(basename $(THRIFT_SOURCES)))
endif endif
OBJS=$(BUTIL_OBJS) $(BVAR_OBJS) $(BTHREAD_OBJS) $(JSON2PB_OBJS) $(MCPACK2PB_OBJS) $(BRPC_OBJS) OBJS=$(BUTIL_OBJS) $(BVAR_OBJS) $(BTHREAD_OBJS) $(JSON2PB_OBJS) $(MCPACK2PB_OBJS) $(BRPC_OBJS) $(THRIFT_OBJS)
BVAR_DEBUG_OBJS=$(BUTIL_OBJS:.o=.dbg.o) $(BVAR_OBJS:.o=.dbg.o) BVAR_DEBUG_OBJS=$(BUTIL_OBJS:.o=.dbg.o) $(BVAR_OBJS:.o=.dbg.o)
DEBUG_OBJS = $(OBJS:.o=.dbg.o) DEBUG_OBJS = $(OBJS:.o=.dbg.o)
...@@ -203,7 +203,7 @@ DEBUG_OBJS = $(OBJS:.o=.dbg.o) ...@@ -203,7 +203,7 @@ DEBUG_OBJS = $(OBJS:.o=.dbg.o)
PROTOS=$(BRPC_PROTOS) src/idl_options.proto PROTOS=$(BRPC_PROTOS) src/idl_options.proto
.PHONY:all .PHONY:all
all: protoc-gen-mcpack libbrpc.a $(TARGET_LIB_DY) libbrpc_thrift.a output/include output/lib output/bin all: protoc-gen-mcpack libbrpc.a $(TARGET_LIB_DY) output/include output/lib output/bin
.PHONY:debug .PHONY:debug
debug: test/libbrpc.dbg.a test/libbvar.dbg.a debug: test/libbrpc.dbg.a test/libbvar.dbg.a
...@@ -211,7 +211,7 @@ debug: test/libbrpc.dbg.a test/libbvar.dbg.a ...@@ -211,7 +211,7 @@ debug: test/libbrpc.dbg.a test/libbvar.dbg.a
.PHONY:clean .PHONY:clean
clean: clean:
@echo "Cleaning" @echo "Cleaning"
@rm -rf src/mcpack2pb/generator.o protoc-gen-mcpack libbrpc.a $(TARGET_LIB_DY) libbrpc_thrift.a $(OBJS) output/include output/lib output/bin $(PROTOS:.proto=.pb.h) $(PROTOS:.proto=.pb.cc) @rm -rf src/mcpack2pb/generator.o protoc-gen-mcpack libbrpc.a $(TARGET_LIB_DY) $(OBJS) output/include output/lib output/bin $(PROTOS:.proto=.pb.h) $(PROTOS:.proto=.pb.cc)
.PHONY:clean_debug .PHONY:clean_debug
clean_debug: clean_debug:
...@@ -248,10 +248,6 @@ test/libbrpc.dbg.a:$(BRPC_PROTOS:.proto=.pb.h) $(DEBUG_OBJS) ...@@ -248,10 +248,6 @@ test/libbrpc.dbg.a:$(BRPC_PROTOS:.proto=.pb.h) $(DEBUG_OBJS)
@echo "Packing $@" @echo "Packing $@"
@ar crs $@ $(filter %.o,$^) @ar crs $@ $(filter %.o,$^)
libbrpc_thrift.a:$(THRIFT_OBJS)
@echo "Packing $@"
@ar crs $@ $(filter %.o,$^)
.PHONY:output/include .PHONY:output/include
output/include: output/include:
@echo "Copying to $@" @echo "Copying to $@"
...@@ -260,7 +256,7 @@ output/include: ...@@ -260,7 +256,7 @@ output/include:
@cp src/idl_options.proto src/idl_options.pb.h $@ @cp src/idl_options.proto src/idl_options.pb.h $@
.PHONY:output/lib .PHONY:output/lib
output/lib:libbrpc.a $(TARGET_LIB_DY) libbrpc_thrift.a output/lib:libbrpc.a $(TARGET_LIB_DY)
@echo "Copying to $@" @echo "Copying to $@"
@mkdir -p $@ @mkdir -p $@
@cp $^ $@ @cp $^ $@
......
...@@ -11,7 +11,7 @@ LIBPATHS = $(addprefix -L, $(LIBS)) ...@@ -11,7 +11,7 @@ LIBPATHS = $(addprefix -L, $(LIBS))
COMMA=, COMMA=,
SOPATHS=$(addprefix -Wl$(COMMA)-rpath=, $(LIBS)) SOPATHS=$(addprefix -Wl$(COMMA)-rpath=, $(LIBS))
STATIC_LINKINGS += -lbrpc -lthrift -lgflags -Wl,--whole-archive -lbrpc_thrift -Wl,--no-whole-archive -levent STATIC_LINKINGS += -lbrpc -lthrift -lgflags -Wl,--whole-archive -Wl,--no-whole-archive -levent
CLIENT_SOURCES = client.cpp CLIENT_SOURCES = client.cpp
SERVER_SOURCES = server.cpp SERVER_SOURCES = server.cpp
...@@ -55,7 +55,7 @@ libechothrift.a: ...@@ -55,7 +55,7 @@ libechothrift.a:
@ar -crv libechothrift.a EchoService.o echo_types.o @ar -crv libechothrift.a EchoService.o echo_types.o
native_server: libechothrift.a native_server: libechothrift.a
@$(CXX) native_server.cpp gen-cpp/echo_types.cpp gen-cpp/EchoService.cpp $(HDRPATHS) $(LIBPATHS) $(CXXFLAGS) -lthriftnb -lthrift -levent -lpthread -lgflags -lbrpc -lbrpc_thrift -o native_server @$(CXX) native_server.cpp gen-cpp/echo_types.cpp gen-cpp/EchoService.cpp $(HDRPATHS) $(LIBPATHS) $(SOPATHS) $(CXXFLAGS) -lthriftnb -lthrift -levent -lpthread -lgflags -lbrpc -o native_server
native_client: libechothrift.a native_client: libechothrift.a
@$(CXX) native_client.cpp gen-cpp/echo_types.cpp gen-cpp/EchoService.cpp $(HDRPATHS) $(LIBPATHS) $(CXXFLAGS) -lthriftnb -lthrift -levent -lpthread -lgflags -lbrpc -lbrpc_thrift -o native_client @$(CXX) native_client.cpp gen-cpp/echo_types.cpp gen-cpp/EchoService.cpp $(HDRPATHS) $(LIBPATHS) $(SOPATHS) $(CXXFLAGS) -lthriftnb -lthrift -levent -lpthread -lgflags -lbrpc -o native_client
...@@ -68,8 +68,8 @@ int main(int argc, char* argv[]) { ...@@ -68,8 +68,8 @@ int main(int argc, char* argv[]) {
cntl.set_log_id(log_id ++); // set by user cntl.set_log_id(log_id ++); // set by user
// wrapper thrift raw request into ThriftMessage // wrapper thrift raw request into ThriftMessage
brpc::ThriftTemplateMessage<example::EchoRequest> req; brpc::ThriftMessage<example::EchoRequest> req;
brpc::ThriftTemplateMessage<example::EchoResponse> res; brpc::ThriftMessage<example::EchoResponse> res;
req.raw().data = "hello"; req.raw().data = "hello";
......
...@@ -23,6 +23,15 @@ ...@@ -23,6 +23,15 @@
#include <butil/logging.h> #include <butil/logging.h>
// _THRIFT_STDCXX_H_ is defined by thrift/stdcxx.h which was added since thrift 0.11.0
#ifndef THRIFT_STDCXX
#if defined(_THRIFT_STDCXX_H_)
# define THRIFT_STDCXX apache::thrift::stdcxx
#else
# define THRIFT_STDCXX boost
#endif
#endif
DEFINE_string(server, "0.0.0.0", "IP Address of server"); DEFINE_string(server, "0.0.0.0", "IP Address of server");
DEFINE_int32(port, 8019, "Port of server"); DEFINE_int32(port, 8019, "Port of server");
...@@ -31,11 +40,11 @@ int main(int argc, char **argv) { ...@@ -31,11 +40,11 @@ int main(int argc, char **argv) {
// Parse gflags. We recommend you to use gflags as well. // Parse gflags. We recommend you to use gflags as well.
google::ParseCommandLineFlags(&argc, &argv, true); google::ParseCommandLineFlags(&argc, &argv, true);
boost::shared_ptr<apache::thrift::transport::TSocket> socket( THRIFT_STDCXX::shared_ptr<apache::thrift::transport::TSocket> socket(
new apache::thrift::transport::TSocket(FLAGS_server, FLAGS_port)); new apache::thrift::transport::TSocket(FLAGS_server, FLAGS_port));
boost::shared_ptr<apache::thrift::transport::TTransport> transport( THRIFT_STDCXX::shared_ptr<apache::thrift::transport::TTransport> transport(
new apache::thrift::transport::TFramedTransport(socket)); new apache::thrift::transport::TFramedTransport(socket));
boost::shared_ptr<apache::thrift::protocol::TProtocol> protocol( THRIFT_STDCXX::shared_ptr<apache::thrift::protocol::TProtocol> protocol(
new apache::thrift::protocol::TBinaryProtocol(transport)); new apache::thrift::protocol::TBinaryProtocol(transport));
example::EchoServiceClient client(protocol); example::EchoServiceClient client(protocol);
......
...@@ -26,6 +26,14 @@ ...@@ -26,6 +26,14 @@
#include <thrift/server/TNonblockingServer.h> #include <thrift/server/TNonblockingServer.h>
#include <thrift/concurrency/PosixThreadFactory.h> #include <thrift/concurrency/PosixThreadFactory.h>
// _THRIFT_STDCXX_H_ is defined by thrift/stdcxx.h which was added since thrift 0.11.0
#ifndef THRIFT_STDCXX
#if defined(_THRIFT_STDCXX_H_)
# define THRIFT_STDCXX apache::thrift::stdcxx
#else
# define THRIFT_STDCXX boost
#endif
#endif
DEFINE_int32(port, 8019, "Port of server"); DEFINE_int32(port, 8019, "Port of server");
...@@ -45,19 +53,19 @@ int main(int argc, char *argv[]) { ...@@ -45,19 +53,19 @@ int main(int argc, char *argv[]) {
// Parse gflags. We recommend you to use gflags as well. // Parse gflags. We recommend you to use gflags as well.
google::ParseCommandLineFlags(&argc, &argv, true); google::ParseCommandLineFlags(&argc, &argv, true);
boost::shared_ptr<EchoServiceHandler> handler(new EchoServiceHandler()); THRIFT_STDCXX::shared_ptr<EchoServiceHandler> handler(new EchoServiceHandler());
boost::shared_ptr<apache::thrift::concurrency::PosixThreadFactory> thread_factory( THRIFT_STDCXX::shared_ptr<apache::thrift::concurrency::PosixThreadFactory> thread_factory(
new apache::thrift::concurrency::PosixThreadFactory( new apache::thrift::concurrency::PosixThreadFactory(
apache::thrift::concurrency::PosixThreadFactory::ROUND_ROBIN, apache::thrift::concurrency::PosixThreadFactory::ROUND_ROBIN,
apache::thrift::concurrency::PosixThreadFactory::NORMAL, 1, false)); apache::thrift::concurrency::PosixThreadFactory::NORMAL, 1, false));
boost::shared_ptr<apache::thrift::server::TProcessor> processor( THRIFT_STDCXX::shared_ptr<apache::thrift::server::TProcessor> processor(
new example::EchoServiceProcessor(handler)); new example::EchoServiceProcessor(handler));
boost::shared_ptr<apache::thrift::protocol::TProtocolFactory> protocol_factory( THRIFT_STDCXX::shared_ptr<apache::thrift::protocol::TProtocolFactory> protocol_factory(
new apache::thrift::protocol::TBinaryProtocolFactory()); new apache::thrift::protocol::TBinaryProtocolFactory());
boost::shared_ptr<apache::thrift::transport::TTransportFactory> transport_factory( THRIFT_STDCXX::shared_ptr<apache::thrift::transport::TTransportFactory> transport_factory(
new apache::thrift::transport::TBufferedTransportFactory()); new apache::thrift::transport::TBufferedTransportFactory());
boost::shared_ptr<apache::thrift::concurrency::ThreadManager> thread_mgr( THRIFT_STDCXX::shared_ptr<apache::thrift::concurrency::ThreadManager> thread_mgr(
apache::thrift::concurrency::ThreadManager::newSimpleThreadManager(2)); apache::thrift::concurrency::ThreadManager::newSimpleThreadManager(2));
thread_mgr->threadFactory(thread_factory); thread_mgr->threadFactory(thread_factory);
......
...@@ -26,7 +26,6 @@ ...@@ -26,7 +26,6 @@
#include "gen-cpp/echo_types.h" #include "gen-cpp/echo_types.h"
DEFINE_int32(port, 8019, "TCP Port of this server"); DEFINE_int32(port, 8019, "TCP Port of this server");
DEFINE_int32(port2, 8018, "TCP Port of this server");
DEFINE_int32(idle_timeout_s, -1, "Connection will be closed if there is no " DEFINE_int32(idle_timeout_s, -1, "Connection will be closed if there is no "
"read/write operations during the last `idle_timeout_s'"); "read/write operations during the last `idle_timeout_s'");
DEFINE_int32(max_concurrency, 0, "Limit of request processing in parallel"); DEFINE_int32(max_concurrency, 0, "Limit of request processing in parallel");
...@@ -37,22 +36,23 @@ public: ...@@ -37,22 +36,23 @@ public:
void Echo(example::EchoResponse& res, const example::EchoRequest& req) { void Echo(example::EchoResponse& res, const example::EchoRequest& req) {
// Process request, just attach a simple string. // Process request, just attach a simple string.
res.data = req.data + " world"; res.data = req.data + " (processed by handler)";
LOG(INFO) << "Echo req.data: " << req.data;
return; return;
} }
}; };
static std::atomic<int> g_counter(0);
// Adapt your own thrift-based protocol to use brpc // Adapt your own thrift-based protocol to use brpc
class MyThriftProtocol : public brpc::ThriftService { class MyThriftProtocol : public brpc::ThriftService {
public: public:
MyThriftProtocol(EchoServiceHandler* handler) : _handler(handler) { } explicit MyThriftProtocol(EchoServiceHandler* handler) : _handler(handler) { }
void ProcessThriftFramedRequest(const brpc::Server&, void ProcessThriftFramedRequest(const brpc::Server&,
brpc::Controller* cntl, brpc::Controller* cntl,
brpc::ThriftMessage* request, brpc::ThriftFramedMessage* request,
brpc::ThriftMessage* response, brpc::ThriftFramedMessage* response,
brpc::ThriftClosure* done) { brpc::ThriftClosure* done) {
// This object helps you to call done->Run() in RAII style. If you need // This object helps you to call done->Run() in RAII style. If you need
// to process the request asynchronously, pass done_guard.release(). // to process the request asynchronously, pass done_guard.release().
...@@ -65,20 +65,19 @@ public: ...@@ -65,20 +65,19 @@ public:
return; return;
} }
example::EchoRequest* req = request->cast<example::EchoRequest>(); example::EchoRequest* req = request->Cast<example::EchoRequest>();
example::EchoResponse* res = response->cast<example::EchoResponse>(); example::EchoResponse* res = response->Cast<example::EchoResponse>();
// process with req and res if (g_counter++ % 2 == 0) {
if (_handler) { if (!_handler) {
cntl->CloseConnection("Close connection due to no valid handler");
LOG(ERROR) << "No valid handler";
return;
}
_handler->Echo(*res, *req); _handler->Echo(*res, *req);
} else { } else {
cntl->CloseConnection("Close connection due to no valid handler"); res->data = req->data + " (processed directly)";
LOG(ERROR) << "Fail to process thrift request due to no valid handler";
return;
} }
LOG(INFO) << "success to process thrift request in brpc with handler";
} }
private: private:
...@@ -86,38 +85,6 @@ private: ...@@ -86,38 +85,6 @@ private:
}; };
// Adapt your own thrift-based protocol to use brpc
class MyThriftProtocolPbManner : public brpc::ThriftService {
public:
void ProcessThriftFramedRequest(const brpc::Server&,
brpc::Controller* cntl,
brpc::ThriftMessage* request,
brpc::ThriftMessage* response,
brpc::ThriftClosure* done) {
// This object helps you to call done->Run() in RAII style. If you need
// to process the request asynchronously, pass done_guard.release().
brpc::ClosureGuard done_guard(done);
if (cntl->Failed()) {
// NOTE: You can send back a response containing error information
// back to client instead of closing the connection.
cntl->CloseConnection("Close connection due to previous error");
return;
}
example::EchoRequest* req = request->cast<example::EchoRequest>();
example::EchoResponse* res = response->cast<example::EchoResponse>();
// process with req and res
res->data = req->data + " world another!";
LOG(INFO) << "success to process thrift request in brpc with pb manner";
}
};
int main(int argc, char* argv[]) { int main(int argc, char* argv[]) {
// Parse gflags. We recommend you to use gflags as well. // Parse gflags. We recommend you to use gflags as well.
google::ParseCommandLineFlags(&argc, &argv, true); google::ParseCommandLineFlags(&argc, &argv, true);
...@@ -125,9 +92,8 @@ int main(int argc, char* argv[]) { ...@@ -125,9 +92,8 @@ int main(int argc, char* argv[]) {
brpc::Server server; brpc::Server server;
brpc::ServerOptions options; brpc::ServerOptions options;
auto thrift_service_handler = new EchoServiceHandler(); EchoServiceHandler thrift_service_handler;
options.thrift_service = new MyThriftProtocol(&thrift_service_handler);
options.thrift_service = new MyThriftProtocol(thrift_service_handler);
options.idle_timeout_sec = FLAGS_idle_timeout_s; options.idle_timeout_sec = FLAGS_idle_timeout_s;
options.max_concurrency = FLAGS_max_concurrency; options.max_concurrency = FLAGS_max_concurrency;
...@@ -137,18 +103,6 @@ int main(int argc, char* argv[]) { ...@@ -137,18 +103,6 @@ int main(int argc, char* argv[]) {
return -1; return -1;
} }
brpc::Server server2;
brpc::ServerOptions options2;
options2.thrift_service = new MyThriftProtocolPbManner;
options2.idle_timeout_sec = FLAGS_idle_timeout_s;
options2.max_concurrency = FLAGS_max_concurrency;
// Start the server2.
if (server2.Start(FLAGS_port2, &options2) != 0) {
LOG(ERROR) << "Fail to start EchoServer";
return -1;
}
// Wait until Ctrl-C is pressed, then Stop() and Join() the server. // Wait until Ctrl-C is pressed, then Stop() and Join() the server.
server.RunUntilAskedToQuit(); server.RunUntilAskedToQuit();
return 0; return 0;
......
// Copyright (c) 2015 Baidu, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef BRPC_THRIFT_HEADER_H
#define BRPC_THRIFT_HEADER_H
namespace brpc {
static const int32_t VERSION_MASK = ((int32_t)0xffffff00);
static const int32_t VERSION_1 = ((int32_t)0x80010000);
struct thrift_binary_head_t {
int32_t body_len;
};
} // namespace brpc
#endif // BRPC_THRIFT_HEADER_H
...@@ -12,21 +12,29 @@ ...@@ -12,21 +12,29 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
// utils for serilize/deserilize thrift binary message to brpc protobuf obj. // utils for serialize/parse thrift binary message to brpc protobuf obj.
#ifdef ENABLE_THRIFT_FRAMED_PROTOCOL #ifdef ENABLE_THRIFT_FRAMED_PROTOCOL
#ifndef BRPC_THRIFT_UTILS_H #ifndef BRPC_THRIFT_UTILS_H
#define BRPC_THRIFT_UTILS_H #define BRPC_THRIFT_UTILS_H
#include <boost/make_shared.hpp>
#include "butil/iobuf.h" #include "butil/iobuf.h"
#include <thrift/TDispatchProcessor.h> #include <thrift/TDispatchProcessor.h>
#include <thrift/transport/TBufferTransports.h> #include <thrift/transport/TBufferTransports.h>
#include <thrift/protocol/TBinaryProtocol.h> #include <thrift/protocol/TBinaryProtocol.h>
// _THRIFT_STDCXX_H_ is defined by thrift/stdcxx.h which was added since thrift 0.11.0
// TDispatcherProcessor.h above uses shared_ptr and should include stdcxx.h
#ifndef THRIFT_STDCXX
#if defined(_THRIFT_STDCXX_H_)
# define THRIFT_STDCXX apache::thrift::stdcxx
#else
# define THRIFT_STDCXX boost
#endif
#endif
namespace brpc { namespace brpc {
template <typename T> template <typename T>
...@@ -45,9 +53,9 @@ bool serialize_iobuf_to_thrift_message(butil::IOBuf& body, ...@@ -45,9 +53,9 @@ bool serialize_iobuf_to_thrift_message(butil::IOBuf& body,
void* thrift_raw_instance, std::string* method_name, int32_t* thrift_message_seq_id) { void* thrift_raw_instance, std::string* method_name, int32_t* thrift_message_seq_id) {
auto in_buffer = auto in_buffer =
boost::make_shared<apache::thrift::transport::TMemoryBuffer>(); THRIFT_STDCXX::make_shared<apache::thrift::transport::TMemoryBuffer>();
auto in_portocol = auto in_portocol =
boost::make_shared<apache::thrift::protocol::TBinaryProtocol>(in_buffer); THRIFT_STDCXX::make_shared<apache::thrift::protocol::TBinaryProtocol>(in_buffer);
// Cut the thrift buffer and parse thrift message // Cut the thrift buffer and parse thrift message
size_t body_len = body.size(); size_t body_len = body.size();
......
...@@ -20,8 +20,6 @@ ...@@ -20,8 +20,6 @@
#include <google/protobuf/message.h> // Message #include <google/protobuf/message.h> // Message
#include <gflags/gflags.h> #include <gflags/gflags.h>
#include <boost/make_shared.hpp>
#include "butil/time.h" #include "butil/time.h"
#include "butil/iobuf.h" // butil::IOBuf #include "butil/iobuf.h" // butil::IOBuf
#include "brpc/log.h" #include "brpc/log.h"
...@@ -40,6 +38,16 @@ ...@@ -40,6 +38,16 @@
#include <thrift/transport/TBufferTransports.h> #include <thrift/transport/TBufferTransports.h>
#include <thrift/protocol/TBinaryProtocol.h> #include <thrift/protocol/TBinaryProtocol.h>
// _THRIFT_STDCXX_H_ is defined by thrift/stdcxx.h which was added since thrift 0.11.0
#include <thrift/TProcessor.h> // to include stdcxx.h if present
#ifndef THRIFT_STDCXX
#if defined(_THRIFT_STDCXX_H_)
# define THRIFT_STDCXX apache::thrift::stdcxx
#else
# define THRIFT_STDCXX boost
#endif
#endif
extern "C" { extern "C" {
void bthread_assign_data(void* data) __THROW; void bthread_assign_data(void* data) __THROW;
} }
...@@ -116,9 +124,9 @@ void ThriftClosure::Run() { ...@@ -116,9 +124,9 @@ void ThriftClosure::Run() {
} }
auto out_buffer = auto out_buffer =
boost::make_shared<apache::thrift::transport::TMemoryBuffer>(); THRIFT_STDCXX::make_shared<apache::thrift::transport::TMemoryBuffer>();
auto oprot = auto oprot =
boost::make_shared<apache::thrift::protocol::TBinaryProtocol>(out_buffer); THRIFT_STDCXX::make_shared<apache::thrift::protocol::TBinaryProtocol>(out_buffer);
// The following code was taken and modified from thrift auto generated code // The following code was taken and modified from thrift auto generated code
oprot->writeMessageBegin(method_name, oprot->writeMessageBegin(method_name,
...@@ -158,11 +166,11 @@ void ThriftClosure::Run() { ...@@ -158,11 +166,11 @@ void ThriftClosure::Run() {
_response.head.body_len = htonl(length); _response.head.body_len = htonl(length);
if (span) { if (span) {
int response_size = sizeof(thrift_binary_head_t) + _response.head.body_len; int response_size = sizeof(thrift_head_t) + _response.head.body_len;
span->set_response_size(response_size); span->set_response_size(response_size);
} }
butil::IOBuf write_buf; butil::IOBuf write_buf;
write_buf.append(&_response.head, sizeof(thrift_binary_head_t)); write_buf.append(&_response.head, sizeof(thrift_head_t));
write_buf.append(_response.body.movable()); write_buf.append(_response.body.movable());
// Have the risk of unlimited pending responses, in which case, tell // Have the risk of unlimited pending responses, in which case, tell
// users to set max_concurrency. // users to set max_concurrency.
...@@ -200,23 +208,23 @@ namespace policy { ...@@ -200,23 +208,23 @@ namespace policy {
ParseResult ParseThriftMessage(butil::IOBuf* source, ParseResult ParseThriftMessage(butil::IOBuf* source,
Socket*, bool /*read_eof*/, const void* /*arg*/) { Socket*, bool /*read_eof*/, const void* /*arg*/) {
char header_buf[sizeof(thrift_binary_head_t) + 3]; char header_buf[sizeof(thrift_head_t) + 3];
const size_t n = source->copy_to(header_buf, sizeof(thrift_binary_head_t) + 3); const size_t n = source->copy_to(header_buf, sizeof(thrift_head_t) + 3);
if (n < 7) { if (n < 7) {
return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA); return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA);
} }
const void* dummy = header_buf + sizeof(thrift_binary_head_t); const void* dummy = header_buf + sizeof(thrift_head_t);
const int32_t sz = ntohl(*(int32_t*)dummy); const int32_t sz = ntohl(*(int32_t*)dummy);
int32_t version = sz & VERSION_MASK; int32_t version = sz & THRIFT_HEAD_VERSION_MASK;
if (version != VERSION_1) { if (version != THRIFT_HEAD_VERSION_1) {
RPC_VLOG << "magic_num=" << version RPC_VLOG << "magic_num=" << version
<< " doesn't match THRIFT_MAGIC_NUM=" << VERSION_1; << " doesn't match THRIFT_MAGIC_NUM=" << THRIFT_HEAD_VERSION_1;
return MakeParseError(PARSE_ERROR_TRY_OTHERS); return MakeParseError(PARSE_ERROR_TRY_OTHERS);
} }
thrift_binary_head_t* thrift = (thrift_binary_head_t *)header_buf; thrift_head_t* thrift = (thrift_head_t *)header_buf;
thrift->body_len = ntohl(thrift->body_len); thrift->body_len = ntohl(thrift->body_len);
uint32_t body_len = thrift->body_len; uint32_t body_len = thrift->body_len;
if (body_len > FLAGS_max_body_size) { if (body_len > FLAGS_max_body_size) {
...@@ -226,7 +234,7 @@ ParseResult ParseThriftMessage(butil::IOBuf* source, ...@@ -226,7 +234,7 @@ ParseResult ParseThriftMessage(butil::IOBuf* source,
} }
policy::MostCommonMessage* msg = policy::MostCommonMessage::Get(); policy::MostCommonMessage* msg = policy::MostCommonMessage::Get();
source->cutn(&msg->meta, sizeof(thrift_binary_head_t)); source->cutn(&msg->meta, sizeof(thrift_head_t));
source->cutn(&msg->payload, body_len); source->cutn(&msg->payload, body_len);
return MakeMessage(msg); return MakeMessage(msg);
} }
...@@ -235,8 +243,8 @@ struct CallMethodInBackupThreadArgs { ...@@ -235,8 +243,8 @@ struct CallMethodInBackupThreadArgs {
ThriftService* service; ThriftService* service;
const Server* server; const Server* server;
Controller* controller; Controller* controller;
ThriftMessage* request; ThriftFramedMessage* request;
ThriftMessage* response; ThriftFramedMessage* response;
ThriftClosure* done; ThriftClosure* done;
}; };
...@@ -251,8 +259,8 @@ static void CallMethodInBackupThread(void* void_args) { ...@@ -251,8 +259,8 @@ static void CallMethodInBackupThread(void* void_args) {
static void EndRunningCallMethodInPool(ThriftService* service, static void EndRunningCallMethodInPool(ThriftService* service,
const Server& server, const Server& server,
Controller* controller, Controller* controller,
ThriftMessage* request, ThriftFramedMessage* request,
ThriftMessage* response, ThriftFramedMessage* response,
ThriftClosure* done) { ThriftClosure* done) {
CallMethodInBackupThreadArgs* args = new CallMethodInBackupThreadArgs; CallMethodInBackupThreadArgs* args = new CallMethodInBackupThreadArgs;
args->service = service; args->service = service;
...@@ -273,9 +281,9 @@ void ProcessThriftRequest(InputMessageBase* msg_base) { ...@@ -273,9 +281,9 @@ void ProcessThriftRequest(InputMessageBase* msg_base) {
const Server* server = static_cast<const Server*>(msg_base->arg()); const Server* server = static_cast<const Server*>(msg_base->arg());
ScopedNonServiceError non_service_error(server); ScopedNonServiceError non_service_error(server);
char buf[sizeof(thrift_binary_head_t)]; char buf[sizeof(thrift_head_t)];
const char *p = (const char *)msg->meta.fetch(buf, sizeof(buf)); const char *p = (const char *)msg->meta.fetch(buf, sizeof(buf));
thrift_binary_head_t *req_head = (thrift_binary_head_t *)p; thrift_head_t *req_head = (thrift_head_t *)p;
req_head->body_len = ntohl(req_head->body_len); req_head->body_len = ntohl(req_head->body_len);
ThriftService* service = server->options().thrift_service; ThriftService* service = server->options().thrift_service;
...@@ -307,8 +315,8 @@ void ProcessThriftRequest(InputMessageBase* msg_base) { ...@@ -307,8 +315,8 @@ void ProcessThriftRequest(InputMessageBase* msg_base) {
} }
ThriftClosure* thrift_done = new (space) ThriftClosure(sub_space); ThriftClosure* thrift_done = new (space) ThriftClosure(sub_space);
Controller* cntl = &(thrift_done->_controller); Controller* cntl = &(thrift_done->_controller);
ThriftMessage* req = &(thrift_done->_request); ThriftFramedMessage* req = &(thrift_done->_request);
ThriftMessage* res = &(thrift_done->_response); ThriftFramedMessage* res = &(thrift_done->_response);
req->head = *req_head; req->head = *req_head;
msg->payload.swap(req->body); msg->payload.swap(req->body);
...@@ -345,7 +353,7 @@ void ProcessThriftRequest(InputMessageBase* msg_base) { ...@@ -345,7 +353,7 @@ void ProcessThriftRequest(InputMessageBase* msg_base) {
span->set_protocol(PROTOCOL_THRIFT); span->set_protocol(PROTOCOL_THRIFT);
span->set_received_us(msg->received_us()); span->set_received_us(msg->received_us());
span->set_start_parse_us(start_parse_us); span->set_start_parse_us(start_parse_us);
span->set_request_size(sizeof(thrift_binary_head_t) + req_head->body_len); span->set_request_size(sizeof(thrift_head_t) + req_head->body_len);
} }
do { do {
...@@ -417,11 +425,11 @@ void ProcessThriftResponse(InputMessageBase* msg_base) { ...@@ -417,11 +425,11 @@ void ProcessThriftResponse(InputMessageBase* msg_base) {
span->set_start_parse_us(start_parse_us); span->set_start_parse_us(start_parse_us);
} }
// MUST be ThriftMessage (checked in SerializeThriftRequest) // MUST be ThriftFramedMessage (checked in SerializeThriftRequest)
ThriftMessage* response = (ThriftMessage*)cntl->response(); ThriftFramedMessage* response = (ThriftFramedMessage*)cntl->response();
const int saved_error = cntl->ErrorCode(); const int saved_error = cntl->ErrorCode();
if (response != NULL) { if (response != NULL) {
msg->meta.copy_to(&response->head, sizeof(thrift_binary_head_t)); msg->meta.copy_to(&response->head, sizeof(thrift_head_t));
response->head.body_len = ntohl(response->head.body_len); response->head.body_len = ntohl(response->head.body_len);
msg->payload.swap(response->body); msg->payload.swap(response->body);
...@@ -436,9 +444,9 @@ void ProcessThriftResponse(InputMessageBase* msg_base) { ...@@ -436,9 +444,9 @@ void ProcessThriftResponse(InputMessageBase* msg_base) {
} }
auto in_buffer = auto in_buffer =
boost::make_shared<apache::thrift::transport::TMemoryBuffer>(); THRIFT_STDCXX::make_shared<apache::thrift::transport::TMemoryBuffer>();
auto in_portocol = auto in_portocol =
boost::make_shared<apache::thrift::protocol::TBinaryProtocol>(in_buffer); THRIFT_STDCXX::make_shared<apache::thrift::protocol::TBinaryProtocol>(in_buffer);
in_buffer->resetBuffer(thrift_buffer.get(), body_len); in_buffer->resetBuffer(thrift_buffer.get(), body_len);
...@@ -532,14 +540,14 @@ void SerializeThriftRequest(butil::IOBuf* request_buf, Controller* cntl, ...@@ -532,14 +540,14 @@ void SerializeThriftRequest(butil::IOBuf* request_buf, Controller* cntl,
} }
ControllerPrivateAccessor accessor(cntl); ControllerPrivateAccessor accessor(cntl);
const ThriftMessage* req = (const ThriftMessage*)req_base; const ThriftFramedMessage* req = (const ThriftFramedMessage*)req_base;
thrift_binary_head_t head = req->head; thrift_head_t head = req->head;
auto out_buffer = auto out_buffer =
boost::make_shared<apache::thrift::transport::TMemoryBuffer>(); THRIFT_STDCXX::make_shared<apache::thrift::transport::TMemoryBuffer>();
auto out_portocol = auto out_portocol =
boost::make_shared<apache::thrift::protocol::TBinaryProtocol>(out_buffer); THRIFT_STDCXX::make_shared<apache::thrift::protocol::TBinaryProtocol>(out_buffer);
std::string thrift_method_name = cntl->thrift_method_name(); std::string thrift_method_name = cntl->thrift_method_name();
// we should do more check on the thrift method name, but since it is rare when // we should do more check on the thrift method name, but since it is rare when
...@@ -566,7 +574,7 @@ void SerializeThriftRequest(butil::IOBuf* request_buf, Controller* cntl, ...@@ -566,7 +574,7 @@ void SerializeThriftRequest(butil::IOBuf* request_buf, Controller* cntl,
xfer += out_portocol->writeFieldBegin("request", ::apache::thrift::protocol::T_STRUCT, 1); xfer += out_portocol->writeFieldBegin("request", ::apache::thrift::protocol::T_STRUCT, 1);
// request's write // request's write
ThriftMessage* r = const_cast<ThriftMessage*>(req); ThriftFramedMessage* r = const_cast<ThriftFramedMessage*>(req);
xfer += r->write(out_portocol.get()); xfer += r->write(out_portocol.get());
// end request's write // end request's write
......
This diff is collapsed.
...@@ -29,7 +29,6 @@ ...@@ -29,7 +29,6 @@
#include <google/protobuf/generated_message_reflection.h> #include <google/protobuf/generated_message_reflection.h>
#include "google/protobuf/descriptor.pb.h" #include "google/protobuf/descriptor.pb.h"
#include "brpc/details/thrift_binary_head.h" // thrfit_binary_head_t
#include "brpc/details/thrift_utils.h" #include "brpc/details/thrift_utils.h"
#include "butil/iobuf.h" #include "butil/iobuf.h"
...@@ -38,14 +37,20 @@ ...@@ -38,14 +37,20 @@
namespace brpc { namespace brpc {
// Internal implementation detail -- do not call these. // Internal implementation detail -- do not call these.
void protobuf_AddDesc_baidu_2frpc_2fthrift_binary_5fmessage_2eproto(); void protobuf_AddDesc_baidu_2frpc_2fthrift_framed_5fmessage_2eproto();
void protobuf_AssignDesc_baidu_2frpc_2fthrift_binary_5fmessage_2eproto(); void protobuf_AssignDesc_baidu_2frpc_2fthrift_framed_5fmessage_2eproto();
void protobuf_ShutdownFile_baidu_2frpc_2fthrift_binary_5fmessage_2eproto(); void protobuf_ShutdownFile_baidu_2frpc_2fthrift_framed_5fmessage_2eproto();
static const int32_t THRIFT_HEAD_VERSION_MASK = (int32_t)0xffffff00;
static const int32_t THRIFT_HEAD_VERSION_1 = (int32_t)0x80010000;
struct thrift_head_t {
int32_t body_len;
};
// Representing a thrift_binary request or response. // Representing a thrift framed request or response.
class ThriftMessage : public ::google::protobuf::Message { class ThriftFramedMessage : public ::google::protobuf::Message {
public: public:
thrift_binary_head_t head; thrift_head_t head;
butil::IOBuf body; butil::IOBuf body;
void (*thrift_raw_instance_deleter) (void*); void (*thrift_raw_instance_deleter) (void*);
uint32_t (*thrift_raw_instance_writer) (void*, void*); uint32_t (*thrift_raw_instance_writer) (void*, void*);
...@@ -55,28 +60,28 @@ public: ...@@ -55,28 +60,28 @@ public:
std::string method_name; std::string method_name;
public: public:
ThriftMessage(); ThriftFramedMessage();
virtual ~ThriftMessage(); virtual ~ThriftFramedMessage();
ThriftMessage(const ThriftMessage& from); ThriftFramedMessage(const ThriftFramedMessage& from);
inline ThriftMessage& operator=(const ThriftMessage& from) { inline ThriftFramedMessage& operator=(const ThriftFramedMessage& from) {
CopyFrom(from); CopyFrom(from);
return *this; return *this;
} }
static const ::google::protobuf::Descriptor* descriptor(); static const ::google::protobuf::Descriptor* descriptor();
static const ThriftMessage& default_instance(); static const ThriftFramedMessage& default_instance();
void Swap(ThriftMessage* other); void Swap(ThriftFramedMessage* other);
// implements Message ---------------------------------------------- // implements Message ----------------------------------------------
ThriftMessage* New() const; ThriftFramedMessage* New() const;
void CopyFrom(const ::google::protobuf::Message& from); void CopyFrom(const ::google::protobuf::Message& from);
void MergeFrom(const ::google::protobuf::Message& from); void MergeFrom(const ::google::protobuf::Message& from);
void CopyFrom(const ThriftMessage& from); void CopyFrom(const ThriftFramedMessage& from);
void MergeFrom(const ThriftMessage& from); void MergeFrom(const ThriftFramedMessage& from);
void Clear(); void Clear();
bool IsInitialized() const; bool IsInitialized() const;
...@@ -93,12 +98,11 @@ public: ...@@ -93,12 +98,11 @@ public:
virtual uint32_t read(void* iprot) { return 0;} virtual uint32_t read(void* iprot) { return 0;}
template<typename T> template<typename T>
T* cast() { T* Cast() {
thrift_raw_instance = new T; thrift_raw_instance = new T;
assert(thrift_raw_instance); assert(thrift_raw_instance);
// serilize binary thrift message to thrift struct request // serialize binary thrift message to thrift struct request
// for response, we just return the new instance and deserialize it in Closure // for response, we just return the new instance and deserialize it in Closure
if (body.size() > 0 ) { if (body.size() > 0 ) {
if (serialize_iobuf_to_thrift_message<T>(body, thrift_raw_instance, if (serialize_iobuf_to_thrift_message<T>(body, thrift_raw_instance,
...@@ -118,27 +122,27 @@ private: ...@@ -118,27 +122,27 @@ private:
void SharedCtor(); void SharedCtor();
void SharedDtor(); void SharedDtor();
private: private:
friend void protobuf_AddDesc_baidu_2frpc_2fthrift_binary_5fmessage_2eproto_impl(); friend void protobuf_AddDesc_baidu_2frpc_2fthrift_framed_5fmessage_2eproto_impl();
friend void protobuf_AddDesc_baidu_2frpc_2fthrift_binary_5fmessage_2eproto(); friend void protobuf_AddDesc_baidu_2frpc_2fthrift_framed_5fmessage_2eproto();
friend void protobuf_AssignDesc_baidu_2frpc_2fthrift_binary_5fmessage_2eproto(); friend void protobuf_AssignDesc_baidu_2frpc_2fthrift_framed_5fmessage_2eproto();
friend void protobuf_ShutdownFile_baidu_2frpc_2fthrift_binary_5fmessage_2eproto(); friend void protobuf_ShutdownFile_baidu_2frpc_2fthrift_framed_5fmessage_2eproto();
void InitAsDefaultInstance(); void InitAsDefaultInstance();
static ThriftMessage* default_instance_; static ThriftFramedMessage* default_instance_;
}; };
template <typename T> template <typename T>
class ThriftTemplateMessage : public ThriftMessage { class ThriftMessage : public ThriftFramedMessage {
public: public:
ThriftTemplateMessage() { ThriftMessage() {
thrift_message_ = new T; thrift_message_ = new T;
assert(thrift_message_ != nullptr); assert(thrift_message_ != nullptr);
} }
virtual ~ThriftTemplateMessage() { delete thrift_message_; } virtual ~ThriftMessage() { delete thrift_message_; }
ThriftTemplateMessage<T>& operator= (const ThriftTemplateMessage<T>& other) { ThriftMessage<T>& operator= (const ThriftMessage<T>& other) {
*thrift_message_ = *(other.thrift_message_); *thrift_message_ = *(other.thrift_message_);
return *this; return *this;
} }
......
...@@ -21,8 +21,6 @@ ...@@ -21,8 +21,6 @@
namespace brpc { namespace brpc {
BAIDU_CASSERT(sizeof(thrift_binary_head_t) == 4, sizeof_thrift_must_be_4);
ThriftService::ThriftService() : _additional_space(0) { ThriftService::ThriftService() : _additional_space(0) {
_status = new (std::nothrow) MethodStatus; _status = new (std::nothrow) MethodStatus;
LOG_IF(FATAL, _status == NULL) << "Fail to new MethodStatus"; LOG_IF(FATAL, _status == NULL) << "Fail to new MethodStatus";
......
...@@ -20,7 +20,7 @@ ...@@ -20,7 +20,7 @@
#define BRPC_THRIFT_SERVICE_H #define BRPC_THRIFT_SERVICE_H
#include "brpc/controller.h" // Controller #include "brpc/controller.h" // Controller
#include "brpc/thrift_message.h" // ThriftMessage #include "brpc/thrift_message.h" // ThriftFramedMessage
#include "brpc/describable.h" #include "brpc/describable.h"
...@@ -66,8 +66,8 @@ friend class DeleteThriftClosure; ...@@ -66,8 +66,8 @@ friend class DeleteThriftClosure;
Socket* _socket_ptr; Socket* _socket_ptr;
const Server* _server; const Server* _server;
int64_t _start_parse_us; int64_t _start_parse_us;
ThriftMessage _request; ThriftFramedMessage _request;
ThriftMessage _response; ThriftFramedMessage _response;
bool _do_respond; bool _do_respond;
void* _additional_space; void* _additional_space;
Controller _controller; Controller _controller;
...@@ -102,8 +102,8 @@ public: ...@@ -102,8 +102,8 @@ public:
// done You must call done->Run() to end the processing. // done You must call done->Run() to end the processing.
virtual void ProcessThriftFramedRequest(const Server& server, virtual void ProcessThriftFramedRequest(const Server& server,
Controller* controller, Controller* controller,
ThriftMessage* request, ThriftFramedMessage* request,
ThriftMessage* response, ThriftFramedMessage* response,
ThriftClosure* done) = 0; ThriftClosure* done) = 0;
// Put descriptions into the stream. // Put descriptions into the stream.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment