Commit d396f854 authored by wangxuefeng's avatar wangxuefeng

Fix as comments by gejun.

parent d7bbec1d
......@@ -11,7 +11,7 @@ LIBPATHS = $(addprefix -L, $(LIBS))
COMMA=,
SOPATHS=$(addprefix -Wl$(COMMA)-rpath=, $(LIBS))
STATIC_LINKINGS += -lbrpc -lthrift
STATIC_LINKINGS += -lbrpc -lthrift -lgflags
CLIENT_SOURCES = client.cpp
SERVER_SOURCES = server.cpp
......@@ -28,7 +28,7 @@ all: echo_client echo_server thrift_server thrift_client libechothrift.a
.PHONY:clean
clean:
@echo "Cleaning"
@rm -rf echo_client echo_server $(PROTO_GENS) $(PROTO_OBJS) $(CLIENT_OBJS) $(SERVER_OBJS) thrift_server thrift_client EchoService.o echo_types.o libechothrift.a
@rm -rf echo_client echo_server $(PROTO_GENS) $(PROTO_OBJS) $(CLIENT_OBJS) $(SERVER_OBJS) thrift_server thrift_client EchoService.o echo_types.o libechothrift.a gen-cpp
echo_client:$(PROTO_OBJS) $(CLIENT_OBJS) libechothrift.a
@echo "Linking $@"
......@@ -46,23 +46,23 @@ else
@$(CXX) $(LIBPATHS) -Xlinker "-(" $^ libechothrift.a -Wl,-Bstatic $(STATIC_LINKINGS) -Wl,-Bdynamic -Xlinker "-)" $(DYNAMIC_LINKINGS) -o $@
endif
%.o:%.cpp
%.o:%.cpp libechothrift.a
@echo "Compiling $@"
@$(CXX) -c $(HDRPATHS) $(CXXFLAGS) $< -o $@
%.o:%.cc
%.o:%.cc libechothrift.a
@echo "Compiling $@"
@$(CXX) -c $(HDRPATHS) $(CXXFLAGS) $< -o $@
libechothrift.a:
@echo "Generating thrift files"
@thrift --gen cpp echo.thrift
@g++ -c gen-cpp/echo_types.cpp -o echo_types.o
@g++ -c gen-cpp/EchoService.cpp -o EchoService.o
@$(CXX) -c gen-cpp/echo_types.cpp -o echo_types.o
@$(CXX) -c gen-cpp/EchoService.cpp -o EchoService.o
@ar -crv libechothrift.a EchoService.o echo_types.o
thrift_server: libechothrift.a
@g++ thrift_server.cpp gen-cpp/echo_types.cpp gen-cpp/EchoService.cpp -lthriftnb -lthrift -levent -lpthread -o thrift_server
@$(CXX) thrift_server.cpp gen-cpp/echo_types.cpp gen-cpp/EchoService.cpp $(HDRPATHS) $(LIBPATHS) $(CXXFLAGS) $(STATIC_LINKINGS) -lthriftnb -lthrift -levent -lpthread -o thrift_server
thrift_client: libechothrift.a
@g++ thrift_client.cpp gen-cpp/echo_types.cpp gen-cpp/EchoService.cpp -lthriftnb -lthrift -levent -lpthread -o thrift_client
@$(CXX) thrift_client.cpp gen-cpp/echo_types.cpp gen-cpp/EchoService.cpp $(HDRPATHS) $(LIBPATHS) $(CXXFLAGS) $(STATIC_LINKINGS) -lthriftnb -lthrift -levent -lpthread -o thrift_client
......@@ -7,6 +7,4 @@ summary:
thrift_client/thrift_server:
thrift cpp version
build:
you need to add thrift as dep library in config.mk
// Copyright (c) 2014 Baidu, Inc.
// Copyright (c) 2017 Baidu, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
......@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
// A client sending requests to server every 1 second.
// A client sending thrift requests to server every 1 second.
#include <gflags/gflags.h>
......@@ -29,12 +29,9 @@
#include "gen-cpp/EchoService.h"
#include "gen-cpp/echo_types.h"
bvar::LatencyRecorder g_latency_recorder("client");
using apache::thrift::transport::TMemoryBuffer;
#include "thrift_utils.h"
using namespace std;
using namespace example;
bvar::LatencyRecorder g_latency_recorder("client");
DEFINE_string(server, "0.0.0.0:8019", "IP Address of server");
DEFINE_string(load_balancer, "", "The algorithm for load balancing");
......@@ -66,31 +63,17 @@ int main(int argc, char* argv[]) {
brpc::ThriftBinaryMessage response;
brpc::Controller cntl;
// Append message to `request'
boost::shared_ptr<apache::thrift::transport::TMemoryBuffer> o_buffer(new apache::thrift::transport::TMemoryBuffer());
boost::shared_ptr<apache::thrift::protocol::TBinaryProtocol> oprot(new apache::thrift::protocol::TBinaryProtocol(o_buffer));
// Construct request
EchoRequest t_request;
t_request.data = "hello";
// Serialize thrift request to binary request
oprot->writeMessageBegin("Echo", ::apache::thrift::protocol::T_CALL, 0);
EchoService_Echo_pargs args;
args.request = &t_request;
args.write(oprot.get());
oprot->writeMessageEnd();
oprot->getTransport()->writeEnd();
oprot->getTransport()->flush();
// Thrift Req
example::EchoRequest thrift_request;
thrift_request.data = "hello";
butil::IOBuf buf;
std::string s = o_buffer->getBufferAsString();
buf.append(s);
request.body = buf;
std::string function_name = "Echo";
int32_t seqid = 0;
if (!serilize_thrift_server_message<example::EchoService_Echo_pargs>(thrift_request, function_name, seqid, &request)) {
LOG(ERROR) << "serilize_thrift_server_message error!";
continue;
}
cntl.set_log_id(log_id ++); // set by user
......@@ -105,71 +88,20 @@ int main(int argc, char* argv[]) {
g_latency_recorder << cntl.latency_us();
}
example::EchoResponse thrift_response;
if (!deserilize_thrift_server_message<example::EchoService_Echo_presult>(response, &function_name, &seqid, &thrift_response)) {
LOG(ERROR) << "deserilize_thrift_server_message error!";
continue;
}
// Parse/Desrialize binary response to thrift response
boost::shared_ptr<apache::thrift::transport::TMemoryBuffer> buffer(new apache::thrift::transport::TMemoryBuffer());
boost::shared_ptr<apache::thrift::protocol::TBinaryProtocol> iprot(new apache::thrift::protocol::TBinaryProtocol(buffer));
size_t body_len = response.head.body_len;
uint8_t* thrfit_b = (uint8_t*)malloc(body_len);
const size_t k = response.body.copy_to(thrfit_b, body_len);
if ( k != body_len) {
std::cout << "copy_to error!" << std::endl;
printf("k: %lu, body_len: %lu\n", k, body_len);
return -1;
}
buffer->resetBuffer(thrfit_b, body_len);
int32_t rseqid = 0;
std::string fname; // Thrift function name
::apache::thrift::protocol::TMessageType mtype;
try {
iprot->readMessageBegin(fname, mtype, rseqid);
if (mtype == ::apache::thrift::protocol::T_EXCEPTION) {
::apache::thrift::TApplicationException x;
x.read(iprot.get());
iprot->readMessageEnd();
iprot->getTransport()->readEnd();
throw x;
}
if (mtype != ::apache::thrift::protocol::T_REPLY) {
iprot->skip(::apache::thrift::protocol::T_STRUCT);
iprot->readMessageEnd();
iprot->getTransport()->readEnd();
}
if (fname.compare("Echo") != 0) {
iprot->skip(::apache::thrift::protocol::T_STRUCT);
iprot->readMessageEnd();
iprot->getTransport()->readEnd();
}
EchoResponse t_response;
EchoService_Echo_presult result;
result.success = &t_response;
result.read(iprot.get());
iprot->readMessageEnd();
iprot->getTransport()->readEnd();
if (!result.__isset.success) {
// _return pointer has now been filled
std::cout << "result.success not set!" << std::endl;
return -1;
}
std::cout << "response: " << t_response.data << std::endl;
} catch (...) {
std::cout << "Thrift Exception!" << std::endl;
}
LOG(INFO) << "Thrift function_name: " << function_name
<< "Thrift Res data: " << thrift_response.data;
LOG_EVERY_SECOND(INFO)
<< "Sending thrift requests at qps=" << g_latency_recorder.qps(1)
<< " latency=" << g_latency_recorder.latency(1);
sleep(1);
}
LOG(INFO) << "EchoClient is going to quit";
......
// Copyright (c) 2014 Baidu, Inc.
// Copyright (c) 2016 Baidu, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
......@@ -25,17 +25,13 @@
#include "gen-cpp/EchoService.h"
#include "gen-cpp/echo_types.h"
#include "thrift_utils.h"
DEFINE_int32(port, 8019, "TCP Port of this server");
DEFINE_int32(idle_timeout_s, -1, "Connection will be closed if there is no "
"read/write operations during the last `idle_timeout_s'");
DEFINE_int32(max_concurrency, 0, "Limit of request processing in parallel");
using apache::thrift::protocol::TBinaryProtocol;
using apache::thrift::transport::TMemoryBuffer;
using namespace std;
using namespace example;
// Adapt your own thrift-based protocol to use brpc
class MyThriftProtocol : public brpc::ThriftFramedService {
public:
......@@ -55,63 +51,33 @@ public:
return;
}
boost::shared_ptr<TMemoryBuffer> buffer(new TMemoryBuffer());
boost::shared_ptr<TBinaryProtocol> iprot(new TBinaryProtocol(buffer));
EchoRequest t_request;
size_t body_len = request.head.body_len;
uint8_t* thrfit_b = (uint8_t*)malloc(body_len);
const size_t k = request.body.copy_to(thrfit_b, body_len);
if ( k != body_len) {
cntl->CloseConnection("Close connection due to copy thrift binary message error");
example::EchoRequest thrift_request;
std::string function_name;
int32_t seqid;
//
if (!serilize_thrift_client_message<example::EchoService_Echo_args>(request,
&thrift_request, &function_name, &seqid)) {
cntl->CloseConnection("Close connection due to serilize thrift client reuqest error!");
LOG(ERROR) << "serilize thrift client reuqest error!";
return;
}
}
EchoService_Echo_args args;
buffer->resetBuffer(thrfit_b, body_len);
int32_t rseqid = 0;
std::string fname;
::apache::thrift::protocol::TMessageType mtype;
// deserilize thrift message
iprot->readMessageBegin(fname, mtype, rseqid);
LOG(INFO) << "RPC funcname: " << function_name
<< "thrift request data: " << thrift_request.data;
args.read(iprot.get());
iprot->readMessageEnd();
iprot->getTransport()->readEnd();
example::EchoResponse thrift_response;
// Proc RPC , just append a simple string
thrift_response.data = thrift_request.data + " world";
t_request = args.request;
std::cout << "RPC funcname: " << fname << std::endl;
std::cout << "request.data: " << t_request.data << std::endl;
boost::shared_ptr<TMemoryBuffer> o_buffer(new TMemoryBuffer());
boost::shared_ptr<TBinaryProtocol> oprot(new TBinaryProtocol(o_buffer));
// Proc RPC
EchoResponse t_response;
t_response.data = t_request.data + " world";
EchoService_Echo_result result;
result.success = t_response;
result.__isset.success = true;
// serilize response
oprot->writeMessageBegin("Echo", ::apache::thrift::protocol::T_REPLY, 0);
result.write(oprot.get());
oprot->writeMessageEnd();
oprot->getTransport()->writeEnd();
oprot->getTransport()->flush();
butil::IOBuf buf;
std::string s = o_buffer->getBufferAsString();
buf.append(s);
response->body = buf;
if (!deserilize_thrift_client_message<example::EchoService_Echo_result>(thrift_response,
function_name, seqid, response)) {
cntl->CloseConnection("Close connection due to deserilize thrift client response error!");
LOG(ERROR) << "deserilize thrift client response error!";
return;
}
printf("process in MyThriftProtocol\n");
LOG(INFO) << "success process thrift request in brpc";
}
};
......
// Copyright (c) 2017 Baidu, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// A thrift client sending requests to server every 1 second.
#include <gflags/gflags.h>
#include "gen-cpp/EchoService.h"
#include "gen-cpp/echo_types.h"
#include <thrift/transport/TSocket.h>
#include <thrift/transport/TBufferTransports.h>
#include <thrift/protocol/TBinaryProtocol.h>
#include <iostream>
#include <butil/logging.h>
DEFINE_string(server, "0.0.0.0", "IP Address of server");
DEFINE_int32(port, 8019, "Port of server");
int main(int argc, char **argv) {
boost::shared_ptr<apache::thrift::transport::TSocket> socket(new apache::thrift::transport::TSocket("127.0.0.1", 8019));
boost::shared_ptr<apache::thrift::transport::TTransport> transport(new apache::thrift::transport::TFramedTransport(socket));
boost::shared_ptr<apache::thrift::protocol::TProtocol> protocol(new apache::thrift::protocol::TBinaryProtocol(transport));
example::EchoServiceClient client(protocol);
transport->open();
example::EchoRequest req;
req.data = "hello";
// Parse gflags. We recommend you to use gflags as well.
google::ParseCommandLineFlags(&argc, &argv, true);
boost::shared_ptr<apache::thrift::transport::TSocket> socket(
new apache::thrift::transport::TSocket(FLAGS_server, FLAGS_port));
boost::shared_ptr<apache::thrift::transport::TTransport> transport(
new apache::thrift::transport::TFramedTransport(socket));
boost::shared_ptr<apache::thrift::protocol::TProtocol> protocol(
new apache::thrift::protocol::TBinaryProtocol(transport));
example::EchoServiceClient client(protocol);
transport->open();
example::EchoRequest req;
req.data = "hello";
example::EchoResponse res;
client.Echo(res, req);
example::EchoResponse res;
client.Echo(res, req);
std::cout << "Res: " << res.data << std::endl;
LOG(INFO)
<< "Req: " << req.data
<< "Res: " << res.data;
transport->close();
transport->close();
return 0;
return 0;
}
// Copyright (c) 2017 Baidu, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// A thrift server to receive EchoRequest and send back EchoResponse.
#include <gflags/gflags.h>
#include <butil/logging.h>
#include "gen-cpp/EchoService.h"
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/server/TSimpleServer.h>
......@@ -6,46 +26,46 @@
#include <thrift/server/TNonblockingServer.h>
#include <thrift/concurrency/PosixThreadFactory.h>
using namespace apache::thrift;
using namespace apache::thrift::protocol;
using namespace apache::thrift::transport;
using namespace apache::thrift::server;
using namespace apache::thrift::concurrency;
using boost::shared_ptr;
using namespace example;
DEFINE_int32(port, 8019, "Port of server");
class EchoServiceHandler : virtual public EchoServiceIf {
class EchoServiceHandler : virtual public example::EchoServiceIf {
public:
EchoServiceHandler() {}
void Echo(EchoResponse& res, const EchoRequest& req) {
void Echo(example::EchoResponse& res, const example::EchoRequest& req) {
// Process request, just attach a simple string.
res.data = req.data + " world";
return;
}
};
int main(int argc, char **argv) {
int port = 8019;
int main(int argc, char *argv[]) {
// Parse gflags. We recommend you to use gflags as well.
google::ParseCommandLineFlags(&argc, &argv, true);
shared_ptr<EchoServiceHandler> handler(new EchoServiceHandler());
shared_ptr<PosixThreadFactory> thread_factory(
new PosixThreadFactory(PosixThreadFactory::ROUND_ROBIN,
PosixThreadFactory::NORMAL, 1, false) );
boost::shared_ptr<EchoServiceHandler> handler(new EchoServiceHandler());
boost::shared_ptr<apache::thrift::concurrency::PosixThreadFactory> thread_factory(
new apache::thrift::concurrency::PosixThreadFactory(
apache::thrift::concurrency::PosixThreadFactory::ROUND_ROBIN,
apache::thrift::concurrency::PosixThreadFactory::NORMAL, 1, false));
shared_ptr<TProcessor> processor(new EchoServiceProcessor(handler));
shared_ptr<TProtocolFactory> protocol_factory(new TBinaryProtocolFactory());
shared_ptr<TTransportFactory> transport_factory(new TBufferedTransportFactory());
shared_ptr<ThreadManager> thread_mgr(ThreadManager::newSimpleThreadManager(2));
boost::shared_ptr<apache::thrift::server::TProcessor> processor(
new example::EchoServiceProcessor(handler));
boost::shared_ptr<apache::thrift::protocol::TProtocolFactory> protocol_factory(
new apache::thrift::protocol::TBinaryProtocolFactory());
boost::shared_ptr<apache::thrift::transport::TTransportFactory> transport_factory(
new apache::thrift::transport::TBufferedTransportFactory());
boost::shared_ptr<apache::thrift::concurrency::ThreadManager> thread_mgr(
apache::thrift::concurrency::ThreadManager::newSimpleThreadManager(2));
thread_mgr->threadFactory(thread_factory);
thread_mgr->start();
TNonblockingServer server(processor,
apache::thrift::server::TNonblockingServer server(processor,
transport_factory, transport_factory, protocol_factory,
protocol_factory, port, thread_mgr);
protocol_factory, FLAGS_port, thread_mgr);
server.serve();
return 0;
......
// Copyright (c) 2017 Baidu, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// utils for serilize/deserilize thrift binary message to thrift obj.
#include "thrift/transport/TBufferTransports.h"
#include "thrift/protocol/TBinaryProtocol.h"
template <typename THRIFT_ARG, typename THRIFT_REQ>
bool serilize_thrift_client_message(const brpc::ThriftBinaryMessage& request,
THRIFT_REQ* thrift_request, std::string* function_name, int32_t* seqid) {
boost::shared_ptr<apache::thrift::transport::TMemoryBuffer> buffer(
new apache::thrift::transport::TMemoryBuffer());
boost::shared_ptr<apache::thrift::protocol::TBinaryProtocol> iprot(
new apache::thrift::protocol::TBinaryProtocol(buffer));
size_t body_len = request.head.body_len;
uint8_t* thrift_buffer = (uint8_t*)malloc(body_len);
const size_t k = request.body.copy_to(thrift_buffer, body_len);
if ( k != body_len) {
free(thrift_buffer);
return false;
}
THRIFT_ARG args;
buffer->resetBuffer(thrift_buffer, body_len);
apache::thrift::protocol::TMessageType mtype;
// deserilize thrift message
iprot->readMessageBegin(*function_name, mtype, *seqid);
args.read(iprot.get());
iprot->readMessageEnd();
iprot->getTransport()->readEnd();
*thrift_request = args.request;
free(thrift_buffer);
return true;
}
template <typename THRIFT_ARG, typename THRIFT_RES>
bool deserilize_thrift_client_message(const THRIFT_RES& thrift_response,
const std::string& function_name, const int32_t seqid, brpc::ThriftBinaryMessage* response) {
boost::shared_ptr<apache::thrift::transport::TMemoryBuffer> o_buffer(
new apache::thrift::transport::TMemoryBuffer());
boost::shared_ptr<apache::thrift::protocol::TBinaryProtocol> oprot(
new apache::thrift::protocol::TBinaryProtocol(o_buffer));
THRIFT_ARG result;
result.success = thrift_response;
result.__isset.success = true;
// serilize response
oprot->writeMessageBegin(function_name, ::apache::thrift::protocol::T_REPLY, seqid);
result.write(oprot.get());
oprot->writeMessageEnd();
oprot->getTransport()->writeEnd();
oprot->getTransport()->flush();
butil::IOBuf buf;
std::string s = o_buffer->getBufferAsString();
buf.append(s);
response->body = buf;
return true;
}
template <typename THRIFT_ARG, typename THRIFT_REQ>
bool serilize_thrift_server_message(const THRIFT_REQ& thrift_request,
const std::string& function_name, const int32_t seqid, brpc::ThriftBinaryMessage* request) {
boost::shared_ptr<apache::thrift::transport::TMemoryBuffer> o_buffer(
new apache::thrift::transport::TMemoryBuffer());
boost::shared_ptr<apache::thrift::protocol::TBinaryProtocol> oprot(
new apache::thrift::protocol::TBinaryProtocol(o_buffer));
oprot->writeMessageBegin(function_name, apache::thrift::protocol::T_CALL, seqid);
THRIFT_ARG args;
args.request = &thrift_request;
args.write(oprot.get());
oprot->writeMessageEnd();
oprot->getTransport()->writeEnd();
oprot->getTransport()->flush();
butil::IOBuf buf;
std::string s = o_buffer->getBufferAsString();
buf.append(s);
request->body = buf;
return true;
}
template<typename THRIFT_ARG, typename THRIFT_RES>
bool deserilize_thrift_server_message(const brpc::ThriftBinaryMessage& response,
std::string* function_name, int32_t* seqid, THRIFT_RES* thrift_response) {
boost::shared_ptr<apache::thrift::transport::TMemoryBuffer> buffer(
new apache::thrift::transport::TMemoryBuffer());
boost::shared_ptr<apache::thrift::protocol::TBinaryProtocol> iprot(
new apache::thrift::protocol::TBinaryProtocol(buffer));
size_t body_len = response.head.body_len;
uint8_t* thrift_buffer = (uint8_t*)malloc(body_len);
const size_t k = response.body.copy_to(thrift_buffer, body_len);
if ( k != body_len) {
free(thrift_buffer);
return false;
}
buffer->resetBuffer(thrift_buffer, body_len);
apache::thrift::protocol::TMessageType mtype;
try {
iprot->readMessageBegin(*function_name, mtype, *seqid);
THRIFT_ARG result;
result.success = thrift_response;
result.read(iprot.get());
iprot->readMessageEnd();
iprot->getTransport()->readEnd();
if (!result.__isset.success) {
free(thrift_buffer);
return false;
}
} catch (...) {
free(thrift_buffer);
return false;
}
free(thrift_buffer);
return true;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment