Commit d33f601f authored by wangxuefeng's avatar wangxuefeng

Add reflection to server side to make the user life easy.

parent 56b220b8
...@@ -23,8 +23,8 @@ ...@@ -23,8 +23,8 @@
#include <brpc/thrift_binary_message.h> #include <brpc/thrift_binary_message.h>
#include <bvar/bvar.h> #include <bvar/bvar.h>
#include "thrift/transport/TBufferTransports.h" #include <thrift/transport/TBufferTransports.h>
#include "thrift/protocol/TBinaryProtocol.h" #include <thrift/protocol/TBinaryProtocol.h>
#include "gen-cpp/EchoService.h" #include "gen-cpp/EchoService.h"
#include "gen-cpp/echo_types.h" #include "gen-cpp/echo_types.h"
...@@ -58,28 +58,27 @@ int main(int argc, char* argv[]) { ...@@ -58,28 +58,27 @@ int main(int argc, char* argv[]) {
// Send a request and wait for the response every 1 second. // Send a request and wait for the response every 1 second.
int log_id = 0; int log_id = 0;
boost::shared_ptr<BrpcThriftClient<example::EchoServiceClient>> client =
boost::make_shared<BrpcThriftClient<example::EchoServiceClient>>();
while (!brpc::IsAskedToQuit()) { while (!brpc::IsAskedToQuit()) {
brpc::ThriftBinaryMessage request;
brpc::ThriftBinaryMessage response;
brpc::Controller cntl; brpc::Controller cntl;
cntl.set_log_id(log_id ++); // set by user
// Thrift Req // Thrift Req
example::EchoRequest thrift_request; example::EchoRequest thrift_request;
example::EchoResponse thrift_response;
thrift_request.data = "hello"; thrift_request.data = "hello";
std::string function_name = "Echo"; // util the Thrift client's send_XXX method, actuall do serilize work inside
int32_t seqid = 0; client->get_thrift_client()->send_Echo(thrift_request);
if (!serilize_thrift_server_message<example::EchoService_Echo_pargs>(thrift_request, function_name, seqid, &request)) {
LOG(ERROR) << "serilize_thrift_server_message error!";
continue;
}
cntl.set_log_id(log_id ++); // set by user // do rpc call actually
client->call_method(&channel, &cntl);
// Because `done'(last parameter) is NULL, this function waits until // util the Thrift client's recv_XXX method, actuall do deserilize work inside
// the response comes back or error occurs(including timedout). client->get_thrift_client()->recv_Echo(thrift_response);
channel.CallMethod(NULL, &cntl, &request, &response, NULL);
if (cntl.Failed()) { if (cntl.Failed()) {
LOG(ERROR) << "Fail to send thrift request, " << cntl.ErrorText(); LOG(ERROR) << "Fail to send thrift request, " << cntl.ErrorText();
...@@ -88,14 +87,7 @@ int main(int argc, char* argv[]) { ...@@ -88,14 +87,7 @@ int main(int argc, char* argv[]) {
g_latency_recorder << cntl.latency_us(); g_latency_recorder << cntl.latency_us();
} }
example::EchoResponse thrift_response; LOG(INFO) << "Thrift Res data: " << thrift_response.data;
if (!deserilize_thrift_server_message<example::EchoService_Echo_presult>(response, &function_name, &seqid, &thrift_response)) {
LOG(ERROR) << "deserilize_thrift_server_message error!";
continue;
}
LOG(INFO) << "Thrift function_name: " << function_name
<< "Thrift Res data: " << thrift_response.data;
LOG_EVERY_SECOND(INFO) LOG_EVERY_SECOND(INFO)
<< "Sending thrift requests at qps=" << g_latency_recorder.qps(1) << "Sending thrift requests at qps=" << g_latency_recorder.qps(1)
......
...@@ -19,8 +19,8 @@ ...@@ -19,8 +19,8 @@
#include <brpc/server.h> #include <brpc/server.h>
#include <brpc/thrift_service.h> #include <brpc/thrift_service.h>
#include "thrift/transport/TBufferTransports.h" #include <thrift/protocol/TBinaryProtocol.h>
#include "thrift/protocol/TBinaryProtocol.h" #include <thrift/transport/TBufferTransports.h>
#include "gen-cpp/EchoService.h" #include "gen-cpp/EchoService.h"
#include "gen-cpp/echo_types.h" #include "gen-cpp/echo_types.h"
...@@ -32,6 +32,19 @@ DEFINE_int32(idle_timeout_s, -1, "Connection will be closed if there is no " ...@@ -32,6 +32,19 @@ DEFINE_int32(idle_timeout_s, -1, "Connection will be closed if there is no "
"read/write operations during the last `idle_timeout_s'"); "read/write operations during the last `idle_timeout_s'");
DEFINE_int32(max_concurrency, 0, "Limit of request processing in parallel"); DEFINE_int32(max_concurrency, 0, "Limit of request processing in parallel");
class EchoServiceHandler : virtual public example::EchoServiceIf {
public:
EchoServiceHandler() {}
void Echo(example::EchoResponse& res, const example::EchoRequest& req) {
// Process request, just attach a simple string.
res.data = req.data + " world";
LOG(INFO) << "Echo req.data: " << req.data;
return;
}
};
// Adapt your own thrift-based protocol to use brpc // Adapt your own thrift-based protocol to use brpc
class MyThriftProtocol : public brpc::ThriftFramedService { class MyThriftProtocol : public brpc::ThriftFramedService {
public: public:
...@@ -51,34 +64,18 @@ public: ...@@ -51,34 +64,18 @@ public:
return; return;
} }
example::EchoRequest thrift_request; // Just an example, you don't need to new the processor each time.
std::string function_name; boost::shared_ptr<EchoServiceHandler> service_hander(new EchoServiceHandler());
int32_t seqid; boost::shared_ptr<example::EchoServiceProcessor> processor(
new example::EchoServiceProcessor(service_hander));
// if (brpc_thrift_server_helper(request, response, processor)) {
if (!serilize_thrift_client_message<example::EchoService_Echo_args>(request, LOG(INFO) << "success to process thrift request in brpc";
&thrift_request, &function_name, &seqid)) { } else {
cntl->CloseConnection("Close connection due to serilize thrift client reuqest error!"); LOG(INFO) << "failed to process thrift request in brpc";
LOG(ERROR) << "serilize thrift client reuqest error!";
return;
} }
LOG(INFO) << "RPC funcname: " << function_name
<< "thrift request data: " << thrift_request.data;
example::EchoResponse thrift_response;
// Proc RPC , just append a simple string
thrift_response.data = thrift_request.data + " world";
if (!deserilize_thrift_client_message<example::EchoService_Echo_result>(thrift_response,
function_name, seqid, response)) {
cntl->CloseConnection("Close connection due to deserilize thrift client response error!");
LOG(ERROR) << "deserilize thrift client response error!";
return;
} }
LOG(INFO) << "success process thrift request in brpc";
}
}; };
int main(int argc, char* argv[]) { int main(int argc, char* argv[]) {
......
...@@ -45,12 +45,14 @@ int main(int argc, char **argv) { ...@@ -45,12 +45,14 @@ int main(int argc, char **argv) {
req.data = "hello"; req.data = "hello";
example::EchoResponse res; example::EchoResponse res;
while (1) {
client.Echo(res, req); client.Echo(res, req);
LOG(INFO) LOG(INFO) << "Req: " << req.data
<< "Req: " << req.data
<< "Res: " << res.data; << "Res: " << res.data;
sleep(1);
}
transport->close(); transport->close();
return 0; return 0;
......
...@@ -14,140 +14,104 @@ ...@@ -14,140 +14,104 @@
// utils for serilize/deserilize thrift binary message to thrift obj. // utils for serilize/deserilize thrift binary message to thrift obj.
#include "thrift/transport/TBufferTransports.h" #include <brpc/channel.h>
#include "thrift/protocol/TBinaryProtocol.h"
template <typename THRIFT_ARG, typename THRIFT_REQ> #include <boost/make_shared.hpp>
bool serilize_thrift_client_message(const brpc::ThriftBinaryMessage& request,
THRIFT_REQ* thrift_request, std::string* function_name, int32_t* seqid) {
boost::shared_ptr<apache::thrift::transport::TMemoryBuffer> buffer( #include <thrift/transport/TBufferTransports.h>
new apache::thrift::transport::TMemoryBuffer()); #include <thrift/protocol/TBinaryProtocol.h>
boost::shared_ptr<apache::thrift::protocol::TBinaryProtocol> iprot(
new apache::thrift::protocol::TBinaryProtocol(buffer));
size_t body_len = request.head.body_len; template <class T>
uint8_t* thrift_buffer = (uint8_t*)malloc(body_len); class BrpcThriftClient {
const size_t k = request.body.copy_to(thrift_buffer, body_len);
if ( k != body_len) {
free(thrift_buffer);
return false;
}
THRIFT_ARG args; public:
buffer->resetBuffer(thrift_buffer, body_len); BrpcThriftClient() {
apache::thrift::protocol::TMessageType mtype;
// deserilize thrift message out_buffer_ = boost::make_shared<apache::thrift::transport::TMemoryBuffer>();
iprot->readMessageBegin(*function_name, mtype, *seqid); out_ = boost::make_shared<apache::thrift::protocol::TBinaryProtocol>(out_buffer_);
args.read(iprot.get()); in_buffer_ = boost::make_shared<apache::thrift::transport::TMemoryBuffer>();
iprot->readMessageEnd(); in_ = boost::make_shared<apache::thrift::protocol::TBinaryProtocol>(in_buffer_);
iprot->getTransport()->readEnd();
*thrift_request = args.request; client_ = boost::make_shared<T>(in_, out_);
}
free(thrift_buffer);
return true;
}
template <typename THRIFT_ARG, typename THRIFT_RES> boost::shared_ptr<T> get_thrift_client() {
bool deserilize_thrift_client_message(const THRIFT_RES& thrift_response, return client_;
const std::string& function_name, const int32_t seqid, brpc::ThriftBinaryMessage* response) { }
boost::shared_ptr<apache::thrift::transport::TMemoryBuffer> o_buffer( void call_method(brpc::Channel* channel, brpc::Controller* cntl) {
new apache::thrift::transport::TMemoryBuffer());
boost::shared_ptr<apache::thrift::protocol::TBinaryProtocol> oprot(
new apache::thrift::protocol::TBinaryProtocol(o_buffer));
THRIFT_ARG result; brpc::ThriftBinaryMessage request;
result.success = thrift_response; brpc::ThriftBinaryMessage response;
result.__isset.success = true;
// serilize response in_buffer_->resetBuffer();
oprot->writeMessageBegin(function_name, ::apache::thrift::protocol::T_REPLY, seqid);
result.write(oprot.get());
oprot->writeMessageEnd();
oprot->getTransport()->writeEnd();
oprot->getTransport()->flush();
butil::IOBuf buf; butil::IOBuf buf;
std::string s = o_buffer->getBufferAsString(); buf.append(out_buffer_->getBufferAsString());
buf.append(s); request.body = buf;
// send the request the server
// Because `done'(last parameter) is NULL, this function waits until
// the response comes back or error occurs(including timedout).
channel->CallMethod(NULL, cntl, &request, &response, NULL);
if (!cntl->Failed()) {
size_t body_len = response.head.body_len;
uint8_t* thrift_buffer = (uint8_t*)malloc(body_len);
const size_t k = response.body.copy_to(thrift_buffer, body_len);
if ( k != body_len) {
free(thrift_buffer);
cntl->SetFailed("copy response buf failed!");
return;
}
in_buffer_->resetBuffer(thrift_buffer, body_len);
}
return;
}
response->body = buf; private:
boost::shared_ptr<T> client_;
boost::shared_ptr<apache::thrift::transport::TMemoryBuffer> out_buffer_;
boost::shared_ptr<apache::thrift::transport::TMemoryBuffer> in_buffer_;
boost::shared_ptr<apache::thrift::protocol::TBinaryProtocol> in_;
boost::shared_ptr<apache::thrift::protocol::TBinaryProtocol> out_;
return true; };
}
template <typename THRIFT_ARG, typename THRIFT_REQ>
bool serilize_thrift_server_message(const THRIFT_REQ& thrift_request,
const std::string& function_name, const int32_t seqid, brpc::ThriftBinaryMessage* request) {
boost::shared_ptr<apache::thrift::transport::TMemoryBuffer> o_buffer( bool brpc_thrift_server_helper(const brpc::ThriftBinaryMessage& request,
brpc::ThriftBinaryMessage* response,
boost::shared_ptr<apache::thrift::TDispatchProcessor> processor) {
boost::shared_ptr<apache::thrift::transport::TMemoryBuffer> in_buffer(
new apache::thrift::transport::TMemoryBuffer()); new apache::thrift::transport::TMemoryBuffer());
boost::shared_ptr<apache::thrift::protocol::TBinaryProtocol> oprot( boost::shared_ptr<apache::thrift::protocol::TBinaryProtocol> in(
new apache::thrift::protocol::TBinaryProtocol(o_buffer)); new apache::thrift::protocol::TBinaryProtocol(in_buffer));
oprot->writeMessageBegin(function_name, apache::thrift::protocol::T_CALL, seqid);
THRIFT_ARG args;
args.request = &thrift_request;
args.write(oprot.get());
oprot->writeMessageEnd(); boost::shared_ptr<apache::thrift::transport::TMemoryBuffer> out_buffer(
oprot->getTransport()->writeEnd();
oprot->getTransport()->flush();
butil::IOBuf buf;
std::string s = o_buffer->getBufferAsString();
buf.append(s);
request->body = buf;
return true;
}
template<typename THRIFT_ARG, typename THRIFT_RES>
bool deserilize_thrift_server_message(const brpc::ThriftBinaryMessage& response,
std::string* function_name, int32_t* seqid, THRIFT_RES* thrift_response) {
boost::shared_ptr<apache::thrift::transport::TMemoryBuffer> buffer(
new apache::thrift::transport::TMemoryBuffer()); new apache::thrift::transport::TMemoryBuffer());
boost::shared_ptr<apache::thrift::protocol::TBinaryProtocol> iprot( boost::shared_ptr<apache::thrift::protocol::TBinaryProtocol> out(
new apache::thrift::protocol::TBinaryProtocol(buffer)); new apache::thrift::protocol::TBinaryProtocol(out_buffer));
size_t body_len = response.head.body_len; // Cut the thrift buffer and parse thrift message
size_t body_len = request.head.body_len;
uint8_t* thrift_buffer = (uint8_t*)malloc(body_len); uint8_t* thrift_buffer = (uint8_t*)malloc(body_len);
const size_t k = response.body.copy_to(thrift_buffer, body_len);
const size_t k = request.body.copy_to(thrift_buffer, body_len);
if ( k != body_len) { if ( k != body_len) {
free(thrift_buffer); free(thrift_buffer);
return false; return false;
} }
buffer->resetBuffer(thrift_buffer, body_len); in_buffer->resetBuffer(thrift_buffer, body_len);
apache::thrift::protocol::TMessageType mtype; if (processor->process(in, out, NULL)) {
butil::IOBuf buf;
try { std::string s = out_buffer->getBufferAsString();
iprot->readMessageBegin(*function_name, mtype, *seqid); buf.append(s);
response->body = buf;
THRIFT_ARG result; } else {
result.success = thrift_response;
result.read(iprot.get());
iprot->readMessageEnd();
iprot->getTransport()->readEnd();
if (!result.__isset.success) {
free(thrift_buffer);
return false;
}
} catch (...) {
free(thrift_buffer);
return false; return false;
} }
free(thrift_buffer);
return true; return true;
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment