Commit ad4dbeca authored by wangxuefeng's avatar wangxuefeng

Thrift client side code refactor

parent 47faf695
......@@ -6,7 +6,8 @@ include config.mk
# 2. Added -D__const__= : Avoid over-optimizations of TLS variables by GCC>=4.8
# 3. Removed -Werror: Not block compilation for non-vital warnings, especially when the
# code is tested on newer systems. If the code is used in production, add -Werror back
CPPFLAGS+=-DBTHREAD_USE_FAST_PTHREAD_MUTEX -D__const__= -D_GNU_SOURCE -DUSE_SYMBOLIZE -DNO_TCMALLOC -D__STDC_FORMAT_MACROS -D__STDC_LIMIT_MACROS -D__STDC_CONSTANT_MACROS -DBRPC_REVISION=\"$(shell git rev-parse --short HEAD)\"
#CPPFLAGS+= -DENABLE_THRIFT_FRAMED_PROTOCOL -DBTHREAD_USE_FAST_PTHREAD_MUTEX -D__const__= -D_GNU_SOURCE -DUSE_SYMBOLIZE -DNO_TCMALLOC -D__STDC_FORMAT_MACROS -D__STDC_LIMIT_MACROS -D__STDC_CONSTANT_MACROS -DBRPC_REVISION=\"$(shell git rev-parse --short HEAD)\"
CPPFLAGS+= -DBTHREAD_USE_FAST_PTHREAD_MUTEX -D__const__= -D_GNU_SOURCE -DUSE_SYMBOLIZE -DNO_TCMALLOC -D__STDC_FORMAT_MACROS -D__STDC_LIMIT_MACROS -D__STDC_CONSTANT_MACROS -DBRPC_REVISION=\"$(shell git rev-parse --short HEAD)\"
CXXFLAGS=$(CPPFLAGS) -O2 -pipe -Wall -W -fPIC -fstrict-aliasing -Wno-invalid-offsetof -Wno-unused-parameter -fno-omit-frame-pointer -std=c++0x
CFLAGS=$(CPPFLAGS) -O2 -pipe -Wall -W -fPIC -fstrict-aliasing -Wno-unused-parameter -fno-omit-frame-pointer
DEBUG_CXXFLAGS = $(filter-out -DNDEBUG,$(CXXFLAGS)) -DUNIT_TEST -DBVAR_NOT_LINK_DEFAULT_VARIABLES
......
......@@ -3,7 +3,7 @@ include $(BRPC_PATH)/config.mk
# Notes on the flags:
# 1. Added -fno-omit-frame-pointer: perf/tcmalloc-profiler use frame pointers by default
# 2. Added -D__const__= : Avoid over-optimizations of TLS variables by GCC>=4.8
CXXFLAGS = -std=c++0x -g -DDEBUG -D__const__= -pipe -W -Wall -Werror -Wno-unused-parameter -fPIC -fno-omit-frame-pointer
CXXFLAGS = -std=c++0x -g -DENABLE_THRIFT_FRAMED_PROTOCOL -DDEBUG -D__const__= -pipe -W -Wall -Werror -Wno-unused-parameter -fPIC -fno-omit-frame-pointer
HDRS+=$(BRPC_PATH)/output/include
LIBS+=$(BRPC_PATH)/output/lib
HDRPATHS = $(addprefix -I, $(HDRS))
......@@ -13,8 +13,8 @@ SOPATHS=$(addprefix -Wl$(COMMA)-rpath=, $(LIBS))
STATIC_LINKINGS += -lbrpc -lthrift -lgflags
CLIENT_SOURCES = client.cpp thrift_brpc_helper_transport.cpp
SERVER_SOURCES = server.cpp thrift_brpc_helper_transport.cpp
CLIENT_SOURCES = client.cpp
SERVER_SOURCES = server.cpp
PROTOS = $(wildcard *.proto)
PROTO_OBJS = $(PROTOS:.proto=.pb.o)
......
......@@ -16,8 +16,12 @@
#include <gflags/gflags.h>
#include "gen-cpp/EchoService.h"
#include "gen-cpp/echo_types.h"
#include <butil/logging.h>
#include <butil/time.h>
#include <butil/thrift_utils.h>
#include <butil/strings/string_piece.h>
#include <brpc/channel.h>
#include <brpc/thrift_binary_message.h>
......@@ -26,11 +30,6 @@
#include <thrift/transport/TBufferTransports.h>
#include <thrift/protocol/TBinaryProtocol.h>
#include "gen-cpp/EchoService.h"
#include "gen-cpp/echo_types.h"
#include "thrift_utils.h"
bvar::LatencyRecorder g_latency_recorder("client");
DEFINE_string(server, "0.0.0.0:8019", "IP Address of server");
......@@ -58,22 +57,28 @@ int main(int argc, char* argv[]) {
// Send a request and wait for the response every 1 second.
int log_id = 0;
apache::thrift::transport::TThriftBrpcHelperTransport* transport;
auto client = InitThriftClient<example::EchoServiceClient>(&channel, &transport);
std::string query_string = "hello";
for(auto i = 0; i < 1000000; i++) {
query_string += " test";
}
while (!brpc::IsAskedToQuit()) {
brpc::Controller cntl;
cntl.set_log_id(log_id ++); // set by user
transport->set_controller(&cntl);
// Thrift Req
example::EchoRequest thrift_request;
example::EchoResponse thrift_response;
thrift_request.data = "hello";
client->Echo(thrift_response, thrift_request);
// wrapper thrift raw request into ThriftMessage
brpc::ThriftMessage<example::EchoRequest> req(&thrift_request);
brpc::ThriftMessage<example::EchoResponse> res(&thrift_response);
cntl.set_thrift_method_name("Echo");
channel.CallMethod(NULL, &cntl, &req, &res, NULL);
if (cntl.Failed()) {
LOG(ERROR) << "Fail to send thrift request, " << cntl.ErrorText();
......@@ -88,9 +93,13 @@ int main(int argc, char* argv[]) {
<< "Sending thrift requests at qps=" << g_latency_recorder.qps(1)
<< " latency=" << g_latency_recorder.latency(1);
//sleep(1);
sleep(1);
}
LOG(INFO) << "EchoClient is going to quit";
return 0;
}
template class brpc::ThriftMessage<example::EchoRequest>;
template class brpc::ThriftMessage<example::EchoResponse>;
......@@ -2,14 +2,14 @@
namespace cpp example
struct EchoRequest {
1: string data;
1: string data;
}
struct EchoResponse {
1: string data;
1: string data;
}
service EchoService {
EchoResponse Echo(1:EchoRequest request);
EchoResponse Echo(1:EchoRequest request);
}
......@@ -16,6 +16,7 @@
#include <gflags/gflags.h>
#include <butil/logging.h>
#include <butil/thrift_utils.h>
#include <brpc/server.h>
#include <brpc/thrift_service.h>
......@@ -25,8 +26,6 @@
#include "gen-cpp/EchoService.h"
#include "gen-cpp/echo_types.h"
#include "thrift_utils.h"
DEFINE_int32(port, 8019, "TCP Port of this server");
DEFINE_int32(idle_timeout_s, -1, "Connection will be closed if there is no "
"read/write operations during the last `idle_timeout_s'");
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <cassert>
#include <algorithm>
#include "thrift_brpc_helper_transport.h"
#include <brpc/thrift_binary_message.h>
using std::string;
namespace apache {
namespace thrift {
namespace transport {
void TThriftBrpcHelperTransport::computeRead(uint32_t len, uint8_t** out_start, uint32_t* out_give) {
// Correct rBound_ so we can use the fast path in the future.
rBound_ = wBase_;
// Decide how much to give.
uint32_t give = (std::min)(len, available_read());
*out_start = rBase_;
*out_give = give;
// Preincrement rBase_ so the caller doesn't have to.
rBase_ += give;
}
uint32_t TThriftBrpcHelperTransport::readSlow(uint8_t* buf, uint32_t len) {
uint8_t* start;
uint32_t give;
computeRead(len, &start, &give);
// Copy into the provided buffer.
memcpy(buf, start, give);
return give;
}
uint32_t TThriftBrpcHelperTransport::readAppendToString(std::string& str, uint32_t len) {
// Don't get some stupid assertion failure.
if (buffer_ == NULL) {
return 0;
}
uint8_t* start;
uint32_t give;
computeRead(len, &start, &give);
// Append to the provided string.
str.append((char*)start, give);
return give;
}
void TThriftBrpcHelperTransport::ensureCanWrite(uint32_t len) {
// Check available space
uint32_t avail = available_write();
if (len <= avail) {
return;
}
if (!owner_) {
throw TTransportException("Insufficient space in external TThriftBrpcHelperTransport memory");
}
// Grow the buffer as necessary.
uint32_t new_size = bufferSize_;
while (len > avail) {
new_size = new_size > 0 ? new_size * 2 : 1;
avail = available_write() + (new_size - bufferSize_);
}
// Allocate into a new pointer so we don't bork ours if it fails.
uint8_t* new_buffer = static_cast<uint8_t*>(std::realloc(buffer_, new_size));
if (new_buffer == NULL) {
throw std::bad_alloc();
}
rBase_ = new_buffer + (rBase_ - buffer_);
rBound_ = new_buffer + (rBound_ - buffer_);
wBase_ = new_buffer + (wBase_ - buffer_);
wBound_ = new_buffer + new_size;
buffer_ = new_buffer;
bufferSize_ = new_size;
}
void TThriftBrpcHelperTransport::writeSlow(const uint8_t* buf, uint32_t len) {
ensureCanWrite(len);
// Copy into the buffer and increment wBase_.
memcpy(wBase_, buf, len);
wBase_ += len;
}
void TThriftBrpcHelperTransport::wroteBytes(uint32_t len) {
uint32_t avail = available_write();
if (len > avail) {
throw TTransportException("Client wrote more bytes than size of buffer.");
}
wBase_ += len;
}
const uint8_t* TThriftBrpcHelperTransport::borrowSlow(uint8_t* buf, uint32_t* len) {
(void)buf;
rBound_ = wBase_;
if (available_read() >= *len) {
*len = available_read();
return rBase_;
}
return NULL;
}
// return number of bytes read
uint32_t TThriftBrpcHelperTransport::readEnd() {
// This cast should be safe, because buffer_'s size is a uint32_t
uint32_t bytes = static_cast<uint32_t>(rBase_ - buffer_);
if (rBase_ == wBase_) {
resetBuffer();
}
return bytes;
}
// Return number of bytes written
uint32_t TThriftBrpcHelperTransport::writeEnd() {
// This cast should be safe, because buffer_'s size is a uint32_t
brpc::ThriftBinaryMessage request;
brpc::ThriftBinaryMessage response;
butil::IOBuf buf;
buf.append(this->getBufferAsString());
request.body = buf;
// send the request the server
// Because `done'(last parameter) is NULL, this function waits until
// the response comes back or error occurs(including timedout).
channel_->CallMethod(NULL, cntl_, &request, &response, NULL);
if (!cntl_->Failed()) {
size_t body_len = response.head.body_len;
uint8_t* thrift_buffer = (uint8_t*)malloc(body_len);
const size_t k = response.body.copy_to(thrift_buffer, body_len);
if ( k != body_len) {
cntl_->SetFailed("copy response body to thrift buffer failed!");
free(thrift_buffer);
return 0;
}
this->resetBuffer(thrift_buffer, body_len, TAKE_OWNERSHIP);
}
return static_cast<uint32_t>(wBase_ - buffer_);
}
}
}
} // apache::thrift::transport
......@@ -50,8 +50,8 @@ int main(int argc, char **argv) {
client.Echo(res, req);
LOG(INFO) << "Req: " << req.data
<< "Res: " << res.data;
//sleep(1);
<< " Res: " << res.data;
sleep(1);
}
transport->close();
......
File mode changed from 100644 to 100755
......@@ -248,6 +248,7 @@ void Controller::InternalReset(bool in_constructor) {
_request_stream = INVALID_STREAM_ID;
_response_stream = INVALID_STREAM_ID;
_remote_stream_settings = NULL;
_thrift_method_name = "";
}
Controller::Call::Call(Controller::Call* rhs)
......
......@@ -434,6 +434,11 @@ public:
void set_idl_result(int64_t result) { _idl_result = result; }
int64_t idl_result() const { return _idl_result; }
void set_thrift_method_name(std::string method_name) {
_thrift_method_name = method_name;
}
std::string thrift_method_name() { return _thrift_method_name; }
private:
struct CompletionInfo {
CallId id; // call_id of the corresponding request
......@@ -664,6 +669,9 @@ private:
StreamId _response_stream;
// Defined at both sides
StreamSettings *_remote_stream_settings;
// Thrift method name, only used when thrift protocol enabled
std::string _thrift_method_name;
};
// Advises the RPC system that the caller desires that the RPC call be
......
......@@ -422,6 +422,7 @@ static void GlobalInitializeOrDieImpl() {
exit(1);
}
#ifdef ENABLE_THRIFT_FRAMED_PROTOCOL
Protocol thrift_binary_protocol = { ParseThriftBinaryMessage,
SerializeThriftBinaryRequest, PackThriftBinaryRequest,
ProcessThriftBinaryRequest, ProcessThriftBinaryResponse,
......@@ -430,6 +431,7 @@ static void GlobalInitializeOrDieImpl() {
if (RegisterProtocol(PROTOCOL_THRIFT, thrift_binary_protocol) != 0) {
exit(1);
}
#endif //ENABLE_THRIFT_FRAMED_PROTOCOL
Protocol mc_binary_protocol = { ParseMemcacheMessage,
SerializeMemcacheRequest,
......
......@@ -17,12 +17,15 @@
#include <google/protobuf/descriptor.h> // MethodDescriptor
#include <google/protobuf/message.h> // Message
#include <gflags/gflags.h>
#include <boost/make_shared.hpp>
#include "butil/time.h"
#include "butil/iobuf.h" // butil::IOBuf
#include "butil/iobuf.h" // butil::IOBuf
#include "brpc/log.h"
#include "brpc/controller.h" // Controller
#include "brpc/socket.h" // Socket
#include "brpc/server.h" // Server
#include "brpc/controller.h" // Controller
#include "brpc/socket.h" // Socket
#include "brpc/server.h" // Server
#include "brpc/span.h"
#include "brpc/details/server_private_accessor.h"
#include "brpc/details/controller_private_accessor.h"
......@@ -31,11 +34,13 @@
#include "brpc/policy/thrift_protocol.h"
#include "brpc/details/usercode_backup_pool.h"
#include <thrift/transport/TBufferTransports.h>
#include <thrift/protocol/TBinaryProtocol.h>
extern "C" {
void bthread_assign_data(void* data) __THROW;
}
namespace brpc {
ThriftFramedClosure::ThriftFramedClosure(void* additional_space)
......@@ -358,6 +363,93 @@ void ProcessThriftBinaryResponse(InputMessageBase* msg_base) {
msg->meta.copy_to(&response->head, sizeof(thrift_binary_head_t));
response->head.body_len = ntohl(response->head.body_len);
msg->payload.swap(response->body);
uint32_t body_len = response->head.body_len;
// Deserialize binary message to thrift message
uint8_t* thrift_buffer =
static_cast<uint8_t*>(new uint8_t[body_len]);
const size_t k = response->body.copy_to(thrift_buffer, body_len);
if ( k != body_len) {
cntl->SetFailed("copy response body to thrift buffer failed!");
delete [] thrift_buffer;
return;
}
auto in_buffer =
boost::make_shared<apache::thrift::transport::TMemoryBuffer>();
auto in_portocol =
boost::make_shared<apache::thrift::protocol::TBinaryProtocol>(in_buffer);
in_buffer->resetBuffer(thrift_buffer, body_len,
::apache::thrift::transport::TMemoryBuffer::TAKE_OWNERSHIP);
// The following code was taken from thrift auto generate code
int32_t rseqid = 0;
std::string fname;
::apache::thrift::protocol::TMessageType mtype;
in_portocol->readMessageBegin(fname, mtype, rseqid);
if (mtype == ::apache::thrift::protocol::T_EXCEPTION) {
cntl->SetFailed("thrift process server response exception!");
return;
}
if (mtype != ::apache::thrift::protocol::T_REPLY) {
in_portocol->skip(::apache::thrift::protocol::T_STRUCT);
in_portocol->readMessageEnd();
in_portocol->getTransport()->readEnd();
}
if (fname.compare(cntl->thrift_method_name()) != 0) {
in_portocol->skip(::apache::thrift::protocol::T_STRUCT);
in_portocol->readMessageEnd();
in_portocol->getTransport()->readEnd();
}
// presult section
apache::thrift::protocol::TInputRecursionTracker tracker(*in_portocol);
uint32_t xfer = 0;
::apache::thrift::protocol::TType ftype;
int16_t fid;
xfer += in_portocol->readStructBegin(fname);
using ::apache::thrift::protocol::TProtocolException;
bool success = false;
while (true)
{
xfer += in_portocol->readFieldBegin(fname, ftype, fid);
if (ftype == ::apache::thrift::protocol::T_STOP) {
break;
}
switch (fid)
{
case 0:
if (ftype == ::apache::thrift::protocol::T_STRUCT) {
xfer += response->read(in_portocol.get());
success = true;
} else {
xfer += in_portocol->skip(ftype);
}
break;
default:
xfer += in_portocol->skip(ftype);
break;
}
xfer += in_portocol->readFieldEnd();
}
xfer += in_portocol->readStructEnd();
// end presult section
in_portocol->readMessageEnd();
in_portocol->getTransport()->readEnd();
if (!success) {
cntl->SetFailed("thrift process server response exception!");
return;
}
} // else just ignore the response.
// Unlocks correlation_id inside. Revert controller's
......@@ -383,11 +475,64 @@ void SerializeThriftBinaryRequest(butil::IOBuf* request_buf, Controller* cntl,
ControllerPrivateAccessor accessor(cntl);
const ThriftBinaryMessage* req = (const ThriftBinaryMessage*)req_base;
thrift_binary_head_t head = req->head;
head.body_len = ntohl(req->body.size());
auto out_buffer =
boost::make_shared<apache::thrift::transport::TMemoryBuffer>();
auto out_portocol =
boost::make_shared<apache::thrift::protocol::TBinaryProtocol>(out_buffer);
std::string thrift_method_name = cntl->thrift_method_name();
// we should do more check on the thrift method name, but since it is rare when
// the method_name is just some white space or something else
if (cntl->thrift_method_name() == "" ||
cntl->thrift_method_name().length() < 1 ||
cntl->thrift_method_name()[0] == ' ') {
return cntl->SetFailed(ENOMETHOD,
"invalid thrift method name or method name empty!");
}
// The following code was taken from thrift auto generated code
// send_xxx
int32_t cseqid = 0;
out_portocol->writeMessageBegin(thrift_method_name,
::apache::thrift::protocol::T_CALL, cseqid);
// xxx_pargs write
uint32_t xfer = 0;
apache::thrift::protocol::TOutputRecursionTracker tracker(*out_portocol);
std::string struct_begin_str = "ThriftService_" + thrift_method_name + "_pargs";
xfer += out_portocol->writeStructBegin(struct_begin_str.c_str());
xfer += out_portocol->writeFieldBegin("request", ::apache::thrift::protocol::T_STRUCT, 1);
// request's write
ThriftBinaryMessage* r = const_cast<ThriftBinaryMessage*>(req);
xfer += r->write(out_portocol.get());
// end request's write
xfer += out_portocol->writeFieldEnd();
xfer += out_portocol->writeFieldStop();
xfer += out_portocol->writeStructEnd();
// end xxx_pargs write
out_portocol->writeMessageEnd();
out_portocol->getTransport()->writeEnd();
out_portocol->getTransport()->flush();
// end send_xxx
// end thrift auto generated code
butil::IOBuf buf;
buf.append(out_buffer->getBufferAsString());
head.body_len = ntohl(buf.size());
request_buf->append(&head, sizeof(head));
request_buf->append(req->body);
// end auto generate code
request_buf->append(buf);
}
void PackThriftBinaryRequest(
......
......@@ -26,9 +26,10 @@
#include <google/protobuf/generated_message_reflection.h>
#include "google/protobuf/descriptor.pb.h"
#include "brpc/thrift_binary_head.h" // thrfit_binary_head_t
#include "brpc/thrift_binary_head.h" // thrfit_binary_head_t
#include "butil/iobuf.h" // IOBuf
#include <thrift/protocol/TBinaryProtocol.h>
namespace brpc {
......@@ -42,7 +43,7 @@ class ThriftBinaryMessage : public ::google::protobuf::Message {
public:
thrift_binary_head_t head;
butil::IOBuf body;
public:
ThriftBinaryMessage();
virtual ~ThriftBinaryMessage();
......@@ -78,6 +79,9 @@ public:
int GetCachedSize() const { return ByteSize(); }
::google::protobuf::Metadata GetMetadata() const;
virtual uint32_t write(::apache::thrift::protocol::TProtocol* oprot) { return 0;}
virtual uint32_t read(::apache::thrift::protocol::TProtocol* iprot) { return 0;}
private:
void SharedCtor();
void SharedDtor();
......@@ -91,7 +95,28 @@ friend void protobuf_ShutdownFile_baidu_2frpc_2fthrift_binary_5fmessage_2eproto(
static ThriftBinaryMessage* default_instance_;
};
} // namespace brpc
template <typename T>
class ThriftMessage : public ThriftBinaryMessage {
public:
ThriftMessage(T* thrift_message) {
thrift_message_ = thrift_message;
}
virtual ~ThriftMessage() {}
virtual uint32_t write(::apache::thrift::protocol::TProtocol* oprot) {
return thrift_message_->write(oprot);
}
virtual uint32_t read(::apache::thrift::protocol::TProtocol* iprot) {
return thrift_message_->read(iprot);
}
private:
T* thrift_message_;
};
} // namespace brpc
#endif // BRPC_THRIFT_BINARY_MESSAGE_H
......@@ -12,67 +12,59 @@
// See the License for the specific language governing permissions and
// limitations under the License.
// utils for serilize/deserilize thrift binary message to thrift obj.
// utils for serilize/deserilize thrift binary message to brpc protobuf obj.
#include <brpc/channel.h>
#ifndef BRPC_THRIFT_UTILS_H
#define BRPC_THRIFT_UTILS_H
#include <boost/make_shared.hpp>
#include "thrift_brpc_helper_transport.h"
#include <brpc/channel.h>
#include <brpc/thrift_binary_message.h>
#include <thrift/TDispatchProcessor.h>
#include <thrift/transport/TBufferTransports.h>
#include <thrift/protocol/TBinaryProtocol.h>
namespace brpc {
template <typename T>
boost::shared_ptr<T> InitThriftClient(brpc::Channel* channel,
apache::thrift::transport::TThriftBrpcHelperTransport** transport) {
auto thrift_brpc_transport =
boost::make_shared<apache::thrift::transport::TThriftBrpcHelperTransport>();
thrift_brpc_transport->set_channel(channel);
*transport = thrift_brpc_transport.get();
auto out = boost::make_shared<apache::thrift::protocol::TBinaryProtocol>(thrift_brpc_transport);
auto in = boost::make_shared<apache::thrift::protocol::TBinaryProtocol>(thrift_brpc_transport);
bool brpc_thrift_server_helper(const brpc::ThriftBinaryMessage& request,
brpc::ThriftBinaryMessage* response,
boost::shared_ptr<::apache::thrift::TDispatchProcessor> processor) {
return boost::make_shared<T>(in, out);
}
auto in_buffer =
boost::make_shared<apache::thrift::transport::TMemoryBuffer>();
auto in_portocol =
boost::make_shared<apache::thrift::protocol::TBinaryProtocol>(in_buffer);
bool brpc_thrift_server_helper(const brpc::ThriftBinaryMessage& request,
brpc::ThriftBinaryMessage* response,
boost::shared_ptr<apache::thrift::TDispatchProcessor> processor) {
boost::shared_ptr<apache::thrift::transport::TMemoryBuffer> in_buffer(
new apache::thrift::transport::TMemoryBuffer());
boost::shared_ptr<apache::thrift::protocol::TBinaryProtocol> in(
new apache::thrift::protocol::TBinaryProtocol(in_buffer));
boost::shared_ptr<apache::thrift::transport::TMemoryBuffer> out_buffer(
new apache::thrift::transport::TMemoryBuffer());
boost::shared_ptr<apache::thrift::protocol::TBinaryProtocol> out(
new apache::thrift::protocol::TBinaryProtocol(out_buffer));
auto out_buffer =
boost::make_shared<apache::thrift::transport::TMemoryBuffer>();
auto out_portocol =
boost::make_shared<apache::thrift::protocol::TBinaryProtocol>(out_buffer);
// Cut the thrift buffer and parse thrift message
size_t body_len = request.head.body_len;
//std::shared_ptr<uint8_t> thrift_buffer(new uint8_t[10],std::default_delete<uint8_t[]>());
uint8_t* thrift_buffer = (uint8_t*)malloc(body_len);
auto thrift_buffer = static_cast<uint8_t*>(new uint8_t[body_len]);
const size_t k = request.body.copy_to(thrift_buffer, body_len);
if ( k != body_len) {
free(thrift_buffer);
delete [] thrift_buffer;
return false;
}
in_buffer->resetBuffer(thrift_buffer, body_len);
if (processor->process(in, out, NULL)) {
if (processor->process(in_portocol, out_portocol, NULL)) {
response->body.append(out_buffer->getBufferAsString());
} else {
free(thrift_buffer);
delete [] thrift_buffer;
return false;
}
free(thrift_buffer);
delete [] thrift_buffer;
return true;
}
}
#endif //BRPC_THRIFT_UTILS_H
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment