Commit a97e3d2a authored by wangxuefeng's avatar wangxuefeng

Update according to comments by Gejun.

parent e018977c
......@@ -17,8 +17,9 @@ else
LDD=ldd
fi
TEMP=`getopt -o v: --long headers:,libs:,cc:,cxx:,with-glog,nodebugsymbols -n 'config_brpc' -- "$@"`
TEMP=`getopt -o v: --long headers:,libs:,cc:,cxx:,with-glog,with-thrift,nodebugsymbols -n 'config_brpc' -- "$@"`
WITH_GLOG=0
WITH_THRIFT=0
DEBUGSYMBOLS=-g
if [ $? != 0 ] ; then >&2 $ECHO "Terminating..."; exit 1 ; fi
......@@ -34,6 +35,7 @@ while true; do
--cc ) CC=$2; shift 2 ;;
--cxx ) CXX=$2; shift 2 ;;
--with-glog ) WITH_GLOG=1; shift 1 ;;
--with-thrift) WITH_THRIFT=1; shift 1 ;;
--nodebugsymbols ) DEBUGSYMBOLS=; shift 1 ;;
-- ) shift; break ;;
* ) break ;;
......@@ -235,6 +237,22 @@ fi
if [ "$SYSTEM" = "Darwin" ]; then
CPPFLAGS="${CPPFLAGS} -Wno-deprecated-declarations"
fi
if [ $WITH_THRIFT != 0 ]; then
THRIFT_LIB=$(find_dir_of_lib_or_die thriftnb)
THRIFT_HDR=$(find_dir_of_header_or_die thrift/Thrift.h)
append_to_output_libs "$THRIFT_LIB"
append_to_output_headers "$THRIFT_HDR"
CPPFLAGS="${CPPFLAGS} -DENABLE_THRIFT_FRAMED_PROTOCOL"
if [ -f "$THRIFT_LIB/libthriftnb.$SO" ]; then
append_to_output "DYNAMIC_LINKINGS+=-lthriftnb"
else
append_to_output "STATIC_LINKINGS+=-lthriftnb"
fi
fi
append_to_output "CPPFLAGS=${CPPFLAGS}"
append_to_output "ifeq (\$(NEED_LIBPROTOC), 1)"
......
......@@ -23,12 +23,12 @@ CLIENT_OBJS = $(addsuffix .o, $(basename $(CLIENT_SOURCES)))
SERVER_OBJS = $(addsuffix .o, $(basename $(SERVER_SOURCES)))
.PHONY:all
all: echo_client echo_server thrift_server thrift_client libechothrift.a client.o server.o
all: echo_client echo_server native_server native_client libechothrift.a client.o server.o
.PHONY:clean
clean:
@echo "Cleaning"
@rm -rf echo_client echo_server $(PROTO_GENS) $(PROTO_OBJS) $(CLIENT_OBJS) $(SERVER_OBJS) thrift_server thrift_client EchoService.o echo_types.o libechothrift.a gen-cpp
@rm -rf echo_client echo_server $(PROTO_GENS) $(PROTO_OBJS) $(CLIENT_OBJS) $(SERVER_OBJS) native_server native_client EchoService.o echo_types.o libechothrift.a gen-cpp gen-py
echo_client:$(PROTO_OBJS) $(CLIENT_OBJS) libechothrift.a
@echo "Linking $@"
......@@ -57,12 +57,13 @@ endif
libechothrift.a:
@echo "Generating thrift files"
@thrift --gen cpp echo.thrift
@thrift --gen py echo.thrift
@$(CXX) -c gen-cpp/echo_types.cpp -o echo_types.o
@$(CXX) -c gen-cpp/EchoService.cpp -o EchoService.o
@ar -crv libechothrift.a EchoService.o echo_types.o
thrift_server: libechothrift.a
@$(CXX) thrift_server.cpp gen-cpp/echo_types.cpp gen-cpp/EchoService.cpp $(HDRPATHS) $(LIBPATHS) $(CXXFLAGS) $(STATIC_LINKINGS) -lthriftnb -lthrift -levent -lpthread -o thrift_server
native_server: libechothrift.a
@$(CXX) native_server.cpp gen-cpp/echo_types.cpp gen-cpp/EchoService.cpp $(HDRPATHS) $(LIBPATHS) $(CXXFLAGS) $(STATIC_LINKINGS) -lthriftnb -lthrift -levent -lpthread -o native_server
thrift_client: libechothrift.a
@$(CXX) thrift_client.cpp gen-cpp/echo_types.cpp gen-cpp/EchoService.cpp $(HDRPATHS) $(LIBPATHS) $(CXXFLAGS) $(STATIC_LINKINGS) -lthriftnb -lthrift -levent -lpthread -o thrift_client
native_client: libechothrift.a
@$(CXX) native_client.cpp gen-cpp/echo_types.cpp gen-cpp/EchoService.cpp $(HDRPATHS) $(LIBPATHS) $(CXXFLAGS) $(STATIC_LINKINGS) -lthriftnb -lthrift -levent -lpthread -o native_client
......@@ -90,7 +90,7 @@ int main(int argc, char* argv[]) {
<< "Sending thrift requests at qps=" << g_latency_recorder.qps(1)
<< " latency=" << g_latency_recorder.latency(1);
//sleep(1);
sleep(1);
}
......
......@@ -2,11 +2,12 @@
namespace cpp example
struct EchoRequest {
1: string data;
1: required string data;
2: required i32 s;
}
struct EchoResponse {
1: string data;
1: required string data;
}
service EchoService {
......
......@@ -30,7 +30,7 @@ DEFINE_int32(port, 8019, "TCP Port of this server");
DEFINE_int32(port2, 8018, "TCP Port of this server");
DEFINE_int32(idle_timeout_s, -1, "Connection will be closed if there is no "
"read/write operations during the last `idle_timeout_s'");
DEFINE_int32(max_concurrency, 0, "Limit of request processing in parallel");
DEFINE_int32(max_concurrency, 1, "Limit of request processing in parallel");
class EchoServiceHandler : virtual public example::EchoServiceIf {
public:
......@@ -39,7 +39,7 @@ public:
void Echo(example::EchoResponse& res, const example::EchoRequest& req) {
// Process request, just attach a simple string.
res.data = req.data + " world";
//LOG(INFO) << "Echo req.data: " << req.data;
LOG(INFO) << "Echo req.data: " << req.data;
return;
}
......@@ -50,7 +50,7 @@ class MyThriftProtocol : public brpc::ThriftFramedService {
public:
void ProcessThriftFramedRequest(const brpc::Server&,
brpc::Controller* cntl,
const brpc::ThriftFramedMessage& request,
brpc::ThriftFramedMessage* request,
brpc::ThriftFramedMessage* response,
brpc::ThriftFramedClosure* done) {
// This object helps you to call done->Run() in RAII style. If you need
......@@ -79,11 +79,11 @@ public:
};
// Adapt your own thrift-based protocol to use brpc
class MyThriftProtocolAnother : public brpc::ThriftFramedService {
class MyThriftProtocolPbManner : public brpc::ThriftFramedService {
public:
void ProcessThriftFramedRequest(const brpc::Server&,
brpc::Controller* cntl,
const brpc::ThriftFramedMessage& request,
brpc::ThriftFramedMessage* request,
brpc::ThriftFramedMessage* response,
brpc::ThriftFramedClosure* done) {
// This object helps you to call done->Run() in RAII style. If you need
......@@ -97,14 +97,9 @@ public:
return;
}
brpc::ThriftFramedMessage request_ref = request;
example::EchoRequest* req = request_ref.cast<example::EchoRequest>();
example::EchoRequest* req = request->cast<example::EchoRequest>();
example::EchoResponse* res = response->cast<example::EchoResponse>();
// MUST set the thrift method name, we need this info when serializing response.
cntl->set_thrift_method_name("Echo");
// process with req and res
res->data = req->data + " world another!";
......@@ -133,7 +128,7 @@ int main(int argc, char* argv[]) {
brpc::Server server2;
brpc::ServerOptions options2;
options2.thrift_service = new MyThriftProtocolAnother;
options2.thrift_service = new MyThriftProtocolPbManner;
options2.idle_timeout_sec = FLAGS_idle_timeout_s;
options2.max_concurrency = FLAGS_max_concurrency;
......
......@@ -454,11 +454,6 @@ public:
std::string thrift_method_name() { return _thrift_method_name; }
void set_thrift_seq_id(uint32_t seq_id) {
_thrift_seq_id = seq_id;
}
uint32_t thrift_seq_id() { return _thrift_seq_id; }
private:
struct CompletionInfo {
CallId id; // call_id of the corresponding request
......
......@@ -36,6 +36,7 @@
#include "brpc/policy/thrift_protocol.h"
#include "brpc/details/usercode_backup_pool.h"
#include <thrift/Thrift.h>
#include <thrift/transport/TBufferTransports.h>
#include <thrift/protocol/TBinaryProtocol.h>
......@@ -105,11 +106,12 @@ void ThriftFramedClosure::Run() {
_response.head = _request.head;
if (_response.thrift_raw_instance) {
if (_controller.thrift_method_name() == "" ||
_controller.thrift_method_name().length() < 1 ||
_controller.thrift_method_name()[0] == ' ') {
std::string method_name = _request.method_name;
if (method_name == "" ||
method_name.length() < 1 ||
method_name[0] == ' ') {
_controller.SetFailed(ENOMETHOD,
"invalid thrift method name or method name empty!");
"invalid thrift method name or method name empty in server!");
return;
}
......@@ -119,8 +121,8 @@ void ThriftFramedClosure::Run() {
boost::make_shared<apache::thrift::protocol::TBinaryProtocol>(out_buffer);
// The following code was taken and modified from thrift auto generated code
oprot->writeMessageBegin(_controller.thrift_method_name(),
::apache::thrift::protocol::T_REPLY, _controller.thrift_seq_id());
oprot->writeMessageBegin(method_name,
::apache::thrift::protocol::T_REPLY, _request.thrift_message_seq_id);
uint32_t xfer = 0;
......@@ -146,9 +148,10 @@ void ThriftFramedClosure::Run() {
oprot->getTransport()->flush();
// End thrfit auto generated code
butil::IOBuf buf;
buf.append(out_buffer->getBufferAsString());
_response.body =buf;
uint8_t* buf;
uint32_t sz;
out_buffer->getBuffer(&buf, &sz);
_response.body.append(buf, sz);
}
uint32_t length = _response.body.length();
......@@ -232,7 +235,7 @@ struct CallMethodInBackupThreadArgs {
ThriftFramedService* service;
const Server* server;
Controller* controller;
const ThriftFramedMessage* request;
ThriftFramedMessage* request;
ThriftFramedMessage* response;
ThriftFramedClosure* done;
};
......@@ -240,7 +243,7 @@ struct CallMethodInBackupThreadArgs {
static void CallMethodInBackupThread(void* void_args) {
CallMethodInBackupThreadArgs* args = (CallMethodInBackupThreadArgs*)void_args;
args->service->ProcessThriftFramedRequest(*args->server, args->controller,
*args->request, args->response,
args->request, args->response,
args->done);
delete args;
}
......@@ -248,14 +251,14 @@ static void CallMethodInBackupThread(void* void_args) {
static void EndRunningCallMethodInPool(ThriftFramedService* service,
const Server& server,
Controller* controller,
const ThriftFramedMessage& request,
ThriftFramedMessage* request,
ThriftFramedMessage* response,
ThriftFramedClosure* done) {
CallMethodInBackupThreadArgs* args = new CallMethodInBackupThreadArgs;
args->service = service;
args->server = &server;
args->controller = controller;
args->request = &request;
args->request = request;
args->response = response;
args->done = done;
return EndRunningUserCodeInPool(CallMethodInBackupThread, args);
......@@ -370,15 +373,23 @@ void ProcessThriftFramedRequest(InputMessageBase* msg_base) {
span->set_start_callback_us(butil::cpuwide_time_us());
span->AsParent();
}
try {
if (!FLAGS_usercode_in_pthread) {
return service->ProcessThriftFramedRequest(*server, cntl, *req, res, thrift_done);
return service->ProcessThriftFramedRequest(*server, cntl,
req, res, thrift_done);
}
if (BeginRunningUserCode()) {
service->ProcessThriftFramedRequest(*server, cntl, *req, res, thrift_done);
service->ProcessThriftFramedRequest(*server, cntl, req, res, thrift_done);
return EndRunningUserCodeInPlace();
} else {
return EndRunningCallMethodInPool(
service, *server, cntl, *req, res, thrift_done);
service, *server, cntl, req, res, thrift_done);
}
} catch (::apache::thrift::TException& e) {
cntl->SetFailed(EREQUEST, "Invalid request data, reason: %s", e.what());
} catch (...) {
cntl->SetFailed(EINTERNAL, "Internal server error!");
}
}
......@@ -574,14 +585,15 @@ void SerializeThriftFramedRequest(butil::IOBuf* request_buf, Controller* cntl,
// end send_xxx
// end thrift auto generated code
butil::IOBuf buf;
buf.append(out_buffer->getBufferAsString());
uint8_t* buf;
uint32_t sz;
out_buffer->getBuffer(&buf, &sz);
head.body_len = ntohl(buf.size());
head.body_len = ntohl(sz);
request_buf->append(&head, sizeof(head));
// end auto generate code
request_buf->append(buf);
request_buf->append(buf, sz);
}
......
......@@ -121,6 +121,9 @@ ThriftFramedMessage::ThriftFramedMessage(const ThriftFramedMessage& from)
void ThriftFramedMessage::SharedCtor() {
memset(&head, 0, sizeof(head));
thrift_raw_instance = nullptr;
thrift_message_seq_id = 0;
method_name = "";
}
ThriftFramedMessage::~ThriftFramedMessage() {
......
......@@ -61,10 +61,13 @@ class ThriftFramedMessage : public ::google::protobuf::Message {
public:
thrift_binary_head_t head;
butil::IOBuf body;
std::function< void (void*) > thrift_raw_instance_deleter;
std::function<uint32_t (void*, ::apache::thrift::protocol::TProtocol*) > thrift_raw_instance_writer;
void (*thrift_raw_instance_deleter) (void*);
uint32_t (*thrift_raw_instance_writer) (void*, ::apache::thrift::protocol::TProtocol*);
void* thrift_raw_instance;
int32_t thrift_message_seq_id;
std::string method_name;
public:
ThriftFramedMessage();
virtual ~ThriftFramedMessage();
......@@ -130,11 +133,10 @@ public:
// The following code was taken and modified from thrift auto generated code
int32_t rseqid = 0;
std::string fname;
::apache::thrift::protocol::TMessageType mtype;
in_portocol->readMessageBegin(fname, mtype, rseqid);
in_portocol->readMessageBegin(method_name, mtype, thrift_message_seq_id);
apache::thrift::protocol::TInputRecursionTracker tracker(*in_portocol);
uint32_t xfer = 0;
......
......@@ -102,7 +102,7 @@ public:
// done You must call done->Run() to end the processing.
virtual void ProcessThriftFramedRequest(const Server& server,
Controller* controller,
const ThriftFramedMessage& request,
ThriftFramedMessage* request,
ThriftFramedMessage* response,
ThriftFramedClosure* done) = 0;
......
......@@ -28,7 +28,7 @@
namespace brpc {
bool brpc_thrift_server_helper(const brpc::ThriftFramedMessage& request,
bool brpc_thrift_server_helper(const brpc::ThriftFramedMessage* request,
brpc::ThriftFramedMessage* response,
boost::shared_ptr<::apache::thrift::TDispatchProcessor> processor) {
......@@ -43,10 +43,10 @@ bool brpc_thrift_server_helper(const brpc::ThriftFramedMessage& request,
boost::make_shared<apache::thrift::protocol::TBinaryProtocol>(out_buffer);
// Cut the thrift buffer and parse thrift message
size_t body_len = request.head.body_len;
size_t body_len = request->head.body_len;
auto thrift_buffer = static_cast<uint8_t*>(new uint8_t[body_len]);
const size_t k = request.body.copy_to(thrift_buffer, body_len);
const size_t k = request->body.copy_to(thrift_buffer, body_len);
if ( k != body_len) {
delete [] thrift_buffer;
return false;
......@@ -55,7 +55,10 @@ bool brpc_thrift_server_helper(const brpc::ThriftFramedMessage& request,
in_buffer->resetBuffer(thrift_buffer, body_len);
if (processor->process(in_portocol, out_portocol, NULL)) {
response->body.append(out_buffer->getBufferAsString());
uint8_t* buf;
uint32_t sz;
out_buffer->getBuffer(&buf, &sz);
response->body.append(buf, sz);
} else {
delete [] thrift_buffer;
return false;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment