Commit 47faf695 authored by wangxuefeng's avatar wangxuefeng

Implement thrift brpc transport.

parent abd72b3b
...@@ -3,7 +3,6 @@ include $(BRPC_PATH)/config.mk ...@@ -3,7 +3,6 @@ include $(BRPC_PATH)/config.mk
# Notes on the flags: # Notes on the flags:
# 1. Added -fno-omit-frame-pointer: perf/tcmalloc-profiler use frame pointers by default # 1. Added -fno-omit-frame-pointer: perf/tcmalloc-profiler use frame pointers by default
# 2. Added -D__const__= : Avoid over-optimizations of TLS variables by GCC>=4.8 # 2. Added -D__const__= : Avoid over-optimizations of TLS variables by GCC>=4.8
#CXXFLAGS = -std=c++0x -g -DDEBUG -D__const__= -DBRPC_ENABLE_CPU_PROFILER -pipe -W -Wall -Werror -Wno-unused-parameter -fPIC -fno-omit-frame-pointer
CXXFLAGS = -std=c++0x -g -DDEBUG -D__const__= -pipe -W -Wall -Werror -Wno-unused-parameter -fPIC -fno-omit-frame-pointer CXXFLAGS = -std=c++0x -g -DDEBUG -D__const__= -pipe -W -Wall -Werror -Wno-unused-parameter -fPIC -fno-omit-frame-pointer
HDRS+=$(BRPC_PATH)/output/include HDRS+=$(BRPC_PATH)/output/include
LIBS+=$(BRPC_PATH)/output/lib LIBS+=$(BRPC_PATH)/output/lib
...@@ -14,8 +13,8 @@ SOPATHS=$(addprefix -Wl$(COMMA)-rpath=, $(LIBS)) ...@@ -14,8 +13,8 @@ SOPATHS=$(addprefix -Wl$(COMMA)-rpath=, $(LIBS))
STATIC_LINKINGS += -lbrpc -lthrift -lgflags STATIC_LINKINGS += -lbrpc -lthrift -lgflags
CLIENT_SOURCES = client.cpp CLIENT_SOURCES = client.cpp thrift_brpc_helper_transport.cpp
SERVER_SOURCES = server.cpp SERVER_SOURCES = server.cpp thrift_brpc_helper_transport.cpp
PROTOS = $(wildcard *.proto) PROTOS = $(wildcard *.proto)
PROTO_OBJS = $(PROTOS:.proto=.pb.o) PROTO_OBJS = $(PROTOS:.proto=.pb.o)
...@@ -24,7 +23,7 @@ CLIENT_OBJS = $(addsuffix .o, $(basename $(CLIENT_SOURCES))) ...@@ -24,7 +23,7 @@ CLIENT_OBJS = $(addsuffix .o, $(basename $(CLIENT_SOURCES)))
SERVER_OBJS = $(addsuffix .o, $(basename $(SERVER_SOURCES))) SERVER_OBJS = $(addsuffix .o, $(basename $(SERVER_SOURCES)))
.PHONY:all .PHONY:all
all: echo_client echo_server thrift_server thrift_client libechothrift.a all: echo_client echo_server thrift_server thrift_client libechothrift.a client.o server.o
.PHONY:clean .PHONY:clean
clean: clean:
......
File mode changed from 100755 to 100644
...@@ -58,27 +58,22 @@ int main(int argc, char* argv[]) { ...@@ -58,27 +58,22 @@ int main(int argc, char* argv[]) {
// Send a request and wait for the response every 1 second. // Send a request and wait for the response every 1 second.
int log_id = 0; int log_id = 0;
boost::shared_ptr<BrpcThriftClient<example::EchoServiceClient>> client = apache::thrift::transport::TThriftBrpcHelperTransport* transport;
boost::make_shared<BrpcThriftClient<example::EchoServiceClient>>(); auto client = InitThriftClient<example::EchoServiceClient>(&channel, &transport);
while (!brpc::IsAskedToQuit()) { while (!brpc::IsAskedToQuit()) {
brpc::Controller cntl; brpc::Controller cntl;
cntl.set_log_id(log_id ++); // set by user cntl.set_log_id(log_id ++); // set by user
transport->set_controller(&cntl);
// Thrift Req // Thrift Req
example::EchoRequest thrift_request; example::EchoRequest thrift_request;
example::EchoResponse thrift_response; example::EchoResponse thrift_response;
thrift_request.data = "hello"; thrift_request.data = "hello";
// util the Thrift client's send_XXX method, actuall do serilize work inside client->Echo(thrift_response, thrift_request);
client->get_thrift_client()->send_Echo(thrift_request);
// do rpc call actually
client->call_method(&channel, &cntl);
// util the Thrift client's recv_XXX method, actuall do deserilize work inside
client->get_thrift_client()->recv_Echo(thrift_response);
if (cntl.Failed()) { if (cntl.Failed()) {
LOG(ERROR) << "Fail to send thrift request, " << cntl.ErrorText(); LOG(ERROR) << "Fail to send thrift request, " << cntl.ErrorText();
......
File mode changed from 100755 to 100644
File mode changed from 100755 to 100644
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <cassert>
#include <algorithm>
#include "thrift_brpc_helper_transport.h"
#include <brpc/thrift_binary_message.h>
using std::string;
namespace apache {
namespace thrift {
namespace transport {
void TThriftBrpcHelperTransport::computeRead(uint32_t len, uint8_t** out_start, uint32_t* out_give) {
// Correct rBound_ so we can use the fast path in the future.
rBound_ = wBase_;
// Decide how much to give.
uint32_t give = (std::min)(len, available_read());
*out_start = rBase_;
*out_give = give;
// Preincrement rBase_ so the caller doesn't have to.
rBase_ += give;
}
uint32_t TThriftBrpcHelperTransport::readSlow(uint8_t* buf, uint32_t len) {
uint8_t* start;
uint32_t give;
computeRead(len, &start, &give);
// Copy into the provided buffer.
memcpy(buf, start, give);
return give;
}
uint32_t TThriftBrpcHelperTransport::readAppendToString(std::string& str, uint32_t len) {
// Don't get some stupid assertion failure.
if (buffer_ == NULL) {
return 0;
}
uint8_t* start;
uint32_t give;
computeRead(len, &start, &give);
// Append to the provided string.
str.append((char*)start, give);
return give;
}
void TThriftBrpcHelperTransport::ensureCanWrite(uint32_t len) {
// Check available space
uint32_t avail = available_write();
if (len <= avail) {
return;
}
if (!owner_) {
throw TTransportException("Insufficient space in external TThriftBrpcHelperTransport memory");
}
// Grow the buffer as necessary.
uint32_t new_size = bufferSize_;
while (len > avail) {
new_size = new_size > 0 ? new_size * 2 : 1;
avail = available_write() + (new_size - bufferSize_);
}
// Allocate into a new pointer so we don't bork ours if it fails.
uint8_t* new_buffer = static_cast<uint8_t*>(std::realloc(buffer_, new_size));
if (new_buffer == NULL) {
throw std::bad_alloc();
}
rBase_ = new_buffer + (rBase_ - buffer_);
rBound_ = new_buffer + (rBound_ - buffer_);
wBase_ = new_buffer + (wBase_ - buffer_);
wBound_ = new_buffer + new_size;
buffer_ = new_buffer;
bufferSize_ = new_size;
}
void TThriftBrpcHelperTransport::writeSlow(const uint8_t* buf, uint32_t len) {
ensureCanWrite(len);
// Copy into the buffer and increment wBase_.
memcpy(wBase_, buf, len);
wBase_ += len;
}
void TThriftBrpcHelperTransport::wroteBytes(uint32_t len) {
uint32_t avail = available_write();
if (len > avail) {
throw TTransportException("Client wrote more bytes than size of buffer.");
}
wBase_ += len;
}
const uint8_t* TThriftBrpcHelperTransport::borrowSlow(uint8_t* buf, uint32_t* len) {
(void)buf;
rBound_ = wBase_;
if (available_read() >= *len) {
*len = available_read();
return rBase_;
}
return NULL;
}
// return number of bytes read
uint32_t TThriftBrpcHelperTransport::readEnd() {
// This cast should be safe, because buffer_'s size is a uint32_t
uint32_t bytes = static_cast<uint32_t>(rBase_ - buffer_);
if (rBase_ == wBase_) {
resetBuffer();
}
return bytes;
}
// Return number of bytes written
uint32_t TThriftBrpcHelperTransport::writeEnd() {
// This cast should be safe, because buffer_'s size is a uint32_t
brpc::ThriftBinaryMessage request;
brpc::ThriftBinaryMessage response;
butil::IOBuf buf;
buf.append(this->getBufferAsString());
request.body = buf;
// send the request the server
// Because `done'(last parameter) is NULL, this function waits until
// the response comes back or error occurs(including timedout).
channel_->CallMethod(NULL, cntl_, &request, &response, NULL);
if (!cntl_->Failed()) {
size_t body_len = response.head.body_len;
uint8_t* thrift_buffer = (uint8_t*)malloc(body_len);
const size_t k = response.body.copy_to(thrift_buffer, body_len);
if ( k != body_len) {
cntl_->SetFailed("copy response body to thrift buffer failed!");
free(thrift_buffer);
return 0;
}
this->resetBuffer(thrift_buffer, body_len, TAKE_OWNERSHIP);
}
return static_cast<uint32_t>(wBase_ - buffer_);
}
}
}
} // apache::thrift::transport
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef _THRIFT_BRPC_HELPER_TRANSPORT_H_
#define _THRIFT_BRPC_HELPER_TRANSPORT_H_ 1
#include <cstdlib>
#include <cstring>
#include <limits>
#include <boost/scoped_array.hpp>
#include <thrift/transport/TTransport.h>
#include <thrift/transport/TVirtualTransport.h>
#include <thrift/transport/TBufferTransports.h>
#include "butil/iobuf.h"
#include <brpc/channel.h>
#ifdef __GNUC__
#define TDB_LIKELY(val) (__builtin_expect((val), 1))
#define TDB_UNLIKELY(val) (__builtin_expect((val), 0))
#else
#define TDB_LIKELY(val) (val)
#define TDB_UNLIKELY(val) (val)
#endif
namespace apache {
namespace thrift {
namespace transport {
/**
* A memory buffer is a tranpsort that simply reads from and writes to an
* in memory buffer. Anytime you call write on it, the data is simply placed
* into a buffer, and anytime you call read, data is read from that buffer.
*
* The buffers are allocated using C constructs malloc,realloc, and the size
* doubles as necessary. We've considered using scoped
*
*/
class TThriftBrpcHelperTransport : public TVirtualTransport<TThriftBrpcHelperTransport, TBufferBase> {
private:
// Common initialization done by all constructors.
void initCommon(uint8_t* buf, uint32_t size, bool owner, uint32_t wPos) {
if (buf == NULL && size != 0) {
assert(owner);
buf = (uint8_t*)std::malloc(size);
if (buf == NULL) {
throw std::bad_alloc();
}
}
buffer_ = buf;
bufferSize_ = size;
rBase_ = buffer_;
rBound_ = buffer_ + wPos;
// TODO(dreiss): Investigate NULL-ing this if !owner.
wBase_ = buffer_ + wPos;
wBound_ = buffer_ + bufferSize_;
owner_ = owner;
// rBound_ is really an artifact. In principle, it should always be
// equal to wBase_. We update it in a few places (computeRead, etc.).
}
public:
static const uint32_t defaultSize = 1024;
/**
* This enum specifies how a TThriftBrpcHelperTransport should treat
* memory passed to it via constructors or resetBuffer.
*
* OBSERVE:
* TThriftBrpcHelperTransport will simply store a pointer to the memory.
* It is the callers responsibility to ensure that the pointer
* remains valid for the lifetime of the TThriftBrpcHelperTransport,
* and that it is properly cleaned up.
* Note that no data can be written to observed buffers.
*
* COPY:
* TThriftBrpcHelperTransport will make an internal copy of the buffer.
* The caller has no responsibilities.
*
* TAKE_OWNERSHIP:
* TThriftBrpcHelperTransport will become the "owner" of the buffer,
* and will be responsible for freeing it.
* The membory must have been allocated with malloc.
*/
enum MemoryPolicy { OBSERVE = 1, COPY = 2, TAKE_OWNERSHIP = 3 };
/**
* Construct a TThriftBrpcHelperTransport with a default-sized buffer,
* owned by the TThriftBrpcHelperTransport object.
*/
TThriftBrpcHelperTransport() { initCommon(NULL, defaultSize, true, 0); }
/**
* Construct a TThriftBrpcHelperTransport with a buffer of a specified size,
* owned by the TThriftBrpcHelperTransport object.
*
* @param sz The initial size of the buffer.
*/
TThriftBrpcHelperTransport(uint32_t sz) { initCommon(NULL, sz, true, 0); }
/**
* Construct a TThriftBrpcHelperTransport with buf as its initial contents.
*
* @param buf The initial contents of the buffer.
* Note that, while buf is a non-const pointer,
* TThriftBrpcHelperTransport will not write to it if policy == OBSERVE,
* so it is safe to const_cast<uint8_t*>(whatever).
* @param sz The size of @c buf.
* @param policy See @link MemoryPolicy @endlink .
*/
TThriftBrpcHelperTransport(uint8_t* buf, uint32_t sz, MemoryPolicy policy = OBSERVE) {
if (buf == NULL && sz != 0) {
throw TTransportException(TTransportException::BAD_ARGS,
"TThriftBrpcHelperTransport given null buffer with non-zero size.");
}
switch (policy) {
case OBSERVE:
case TAKE_OWNERSHIP:
initCommon(buf, sz, policy == TAKE_OWNERSHIP, sz);
break;
case COPY:
initCommon(NULL, sz, true, 0);
this->write(buf, sz);
break;
default:
throw TTransportException(TTransportException::BAD_ARGS,
"Invalid MemoryPolicy for TThriftBrpcHelperTransport");
}
}
~TThriftBrpcHelperTransport() {
if (owner_) {
std::free(buffer_);
}
}
void set_controller(brpc::Controller* cntl) {
cntl_ = cntl;
}
void set_channel(brpc::Channel* channel) {
channel_ = channel;
}
bool isOpen() { return true; }
bool peek() { return (rBase_ < wBase_); }
void open() {}
void close() {}
// TODO(dreiss): Make bufPtr const.
void getBuffer(uint8_t** bufPtr, uint32_t* sz) {
*bufPtr = rBase_;
*sz = static_cast<uint32_t>(wBase_ - rBase_);
}
std::string getBufferAsString() {
if (buffer_ == NULL) {
return "";
}
uint8_t* buf;
uint32_t sz;
getBuffer(&buf, &sz);
return std::string((char*)buf, (std::string::size_type)sz);
}
void appendBufferToString(std::string& str) {
if (buffer_ == NULL) {
return;
}
uint8_t* buf;
uint32_t sz;
getBuffer(&buf, &sz);
str.append((char*)buf, sz);
}
void resetBuffer() {
rBase_ = buffer_;
rBound_ = buffer_;
wBase_ = buffer_;
// It isn't safe to write into a buffer we don't own.
if (!owner_) {
wBound_ = wBase_;
bufferSize_ = 0;
}
}
/// See constructor documentation.
void resetBuffer(uint8_t* buf, uint32_t sz, MemoryPolicy policy = OBSERVE) {
// Use a variant of the copy-and-swap trick for assignment operators.
// This is sub-optimal in terms of performance for two reasons:
// 1/ The constructing and swapping of the (small) values
// in the temporary object takes some time, and is not necessary.
// 2/ If policy == COPY, we allocate the new buffer before
// freeing the old one, precluding the possibility of
// reusing that memory.
// I doubt that either of these problems could be optimized away,
// but the second is probably no a common case, and the first is minor.
// I don't expect resetBuffer to be a common operation, so I'm willing to
// bite the performance bullet to make the method this simple.
// Construct the new buffer.
TThriftBrpcHelperTransport new_buffer(buf, sz, policy);
// Move it into ourself.
this->swap(new_buffer);
// Our old self gets destroyed.
}
/// See constructor documentation.
void resetBuffer(uint32_t sz) {
// Construct the new buffer.
TThriftBrpcHelperTransport new_buffer(sz);
// Move it into ourself.
this->swap(new_buffer);
// Our old self gets destroyed.
}
std::string readAsString(uint32_t len) {
std::string str;
(void)readAppendToString(str, len);
return str;
}
uint32_t readAppendToString(std::string& str, uint32_t len);
// return number of bytes read
uint32_t readEnd();
// Return number of bytes written
uint32_t writeEnd();
uint32_t available_read() const {
// Remember, wBase_ is the real rBound_.
return static_cast<uint32_t>(wBase_ - rBase_);
}
uint32_t available_write() const { return static_cast<uint32_t>(wBound_ - wBase_); }
// Returns a pointer to where the client can write data to append to
// the TThriftBrpcHelperTransport, and ensures the buffer is big enough to accommodate a
// write of the provided length. The returned pointer is very convenient for
// passing to read(), recv(), or similar. You must call wroteBytes() as soon
// as data is written or the buffer will not be aware that data has changed.
uint8_t* getWritePtr(uint32_t len) {
ensureCanWrite(len);
return wBase_;
}
// Informs the buffer that the client has written 'len' bytes into storage
// that had been provided by getWritePtr().
void wroteBytes(uint32_t len);
/*
* TVirtualTransport provides a default implementation of readAll().
* We want to use the TBufferBase version instead.
*/
uint32_t readAll(uint8_t* buf, uint32_t len) { return TBufferBase::readAll(buf, len); }
protected:
void swap(TThriftBrpcHelperTransport& that) {
using std::swap;
swap(buffer_, that.buffer_);
swap(bufferSize_, that.bufferSize_);
swap(rBase_, that.rBase_);
swap(rBound_, that.rBound_);
swap(wBase_, that.wBase_);
swap(wBound_, that.wBound_);
swap(owner_, that.owner_);
}
// Make sure there's at least 'len' bytes available for writing.
void ensureCanWrite(uint32_t len);
// Compute the position and available data for reading.
void computeRead(uint32_t len, uint8_t** out_start, uint32_t* out_give);
uint32_t readSlow(uint8_t* buf, uint32_t len);
void writeSlow(const uint8_t* buf, uint32_t len);
const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len);
// Data buffer
uint8_t* buffer_;
// Allocated buffer size
uint32_t bufferSize_;
// Is this object the owner of the buffer?
bool owner_;
brpc::Controller* cntl_;
brpc::Channel* channel_;
// Don't forget to update constrctors, initCommon, and swap if
// you add new members.
};
}
}
} // apache::thrift::transport
#endif // #ifndef _THRIFT_BRPC_HELPER_TRANSPORT_H_
...@@ -51,7 +51,7 @@ int main(int argc, char **argv) { ...@@ -51,7 +51,7 @@ int main(int argc, char **argv) {
LOG(INFO) << "Req: " << req.data LOG(INFO) << "Req: " << req.data
<< "Res: " << res.data; << "Res: " << res.data;
sleep(1); //sleep(1);
} }
transport->close(); transport->close();
......
File mode changed from 100755 to 100644
...@@ -18,67 +18,26 @@ ...@@ -18,67 +18,26 @@
#include <boost/make_shared.hpp> #include <boost/make_shared.hpp>
#include "thrift_brpc_helper_transport.h"
#include <thrift/transport/TBufferTransports.h> #include <thrift/transport/TBufferTransports.h>
#include <thrift/protocol/TBinaryProtocol.h> #include <thrift/protocol/TBinaryProtocol.h>
template <class T>
class BrpcThriftClient {
public:
BrpcThriftClient() {
out_buffer_ = boost::make_shared<apache::thrift::transport::TMemoryBuffer>();
out_ = boost::make_shared<apache::thrift::protocol::TBinaryProtocol>(out_buffer_);
in_buffer_ = boost::make_shared<apache::thrift::transport::TMemoryBuffer>();
in_ = boost::make_shared<apache::thrift::protocol::TBinaryProtocol>(in_buffer_);
client_ = boost::make_shared<T>(in_, out_);
}
boost::shared_ptr<T> get_thrift_client() {
return client_;
}
void call_method(brpc::Channel* channel, brpc::Controller* cntl) {
brpc::ThriftBinaryMessage request;
brpc::ThriftBinaryMessage response;
in_buffer_->resetBuffer();
butil::IOBuf buf;
buf.append(out_buffer_->getBufferAsString());
request.body = buf;
// send the request the server
// Because `done'(last parameter) is NULL, this function waits until
// the response comes back or error occurs(including timedout).
channel->CallMethod(NULL, cntl, &request, &response, NULL);
if (!cntl->Failed()) {
size_t body_len = response.head.body_len;
uint8_t* thrift_buffer = (uint8_t*)malloc(body_len);
const size_t k = response.body.copy_to(thrift_buffer, body_len);
if ( k != body_len) {
free(thrift_buffer);
cntl->SetFailed("copy response buf failed!");
return;
}
in_buffer_->resetBuffer(thrift_buffer, body_len);
}
free(thrift_buffer);
return;
}
private:
boost::shared_ptr<T> client_;
boost::shared_ptr<apache::thrift::transport::TMemoryBuffer> out_buffer_;
boost::shared_ptr<apache::thrift::transport::TMemoryBuffer> in_buffer_;
boost::shared_ptr<apache::thrift::protocol::TBinaryProtocol> in_;
boost::shared_ptr<apache::thrift::protocol::TBinaryProtocol> out_;
};
template <typename T>
boost::shared_ptr<T> InitThriftClient(brpc::Channel* channel,
apache::thrift::transport::TThriftBrpcHelperTransport** transport) {
auto thrift_brpc_transport =
boost::make_shared<apache::thrift::transport::TThriftBrpcHelperTransport>();
thrift_brpc_transport->set_channel(channel);
*transport = thrift_brpc_transport.get();
auto out = boost::make_shared<apache::thrift::protocol::TBinaryProtocol>(thrift_brpc_transport);
auto in = boost::make_shared<apache::thrift::protocol::TBinaryProtocol>(thrift_brpc_transport);
return boost::make_shared<T>(in, out);
}
bool brpc_thrift_server_helper(const brpc::ThriftBinaryMessage& request, bool brpc_thrift_server_helper(const brpc::ThriftBinaryMessage& request,
brpc::ThriftBinaryMessage* response, brpc::ThriftBinaryMessage* response,
...@@ -95,8 +54,8 @@ bool brpc_thrift_server_helper(const brpc::ThriftBinaryMessage& request, ...@@ -95,8 +54,8 @@ bool brpc_thrift_server_helper(const brpc::ThriftBinaryMessage& request,
// Cut the thrift buffer and parse thrift message // Cut the thrift buffer and parse thrift message
size_t body_len = request.head.body_len; size_t body_len = request.head.body_len;
//std::shared_ptr<uint8_t> thrift_buffer(new uint8_t[10],std::default_delete<uint8_t[]>());
uint8_t* thrift_buffer = (uint8_t*)malloc(body_len); uint8_t* thrift_buffer = (uint8_t*)malloc(body_len);
const size_t k = request.body.copy_to(thrift_buffer, body_len); const size_t k = request.body.copy_to(thrift_buffer, body_len);
if ( k != body_len) { if ( k != body_len) {
free(thrift_buffer); free(thrift_buffer);
...@@ -106,16 +65,14 @@ bool brpc_thrift_server_helper(const brpc::ThriftBinaryMessage& request, ...@@ -106,16 +65,14 @@ bool brpc_thrift_server_helper(const brpc::ThriftBinaryMessage& request,
in_buffer->resetBuffer(thrift_buffer, body_len); in_buffer->resetBuffer(thrift_buffer, body_len);
if (processor->process(in, out, NULL)) { if (processor->process(in, out, NULL)) {
butil::IOBuf buf; response->body.append(out_buffer->getBufferAsString());
std::string s = out_buffer->getBufferAsString();
buf.append(s);
response->body = buf;
} else { } else {
free(thrift_buffer); free(thrift_buffer);
return false; return false;
} }
free(thrift_buffer); free(thrift_buffer);
return true; return true;
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment