Commit 802639e4 authored by Sumant Tambe's avatar Sumant Tambe Committed by Wouter van Oortmerssen

Efficient Conversion of a FlatBufferBuilder to a MessageBuilder (#4980)

* Efficient conversion of FlatBufferBuilder to grpc::MessageBuilder

* Added a variety of tests to validate correctness of the MessageBuilder move operations.
Disable MessageBuilder half-n-half tests on MacOS.

* Fix failing Android build

* Generalized the MessageBuilder move constructor to accept a deallocator
parent ad8b1e5d
...@@ -23,6 +23,9 @@ ...@@ -23,6 +23,9 @@
#include "test_assert.h" #include "test_assert.h"
using namespace MyGame::Example; using namespace MyGame::Example;
using flatbuffers::grpc::MessageBuilder;
using flatbuffers::FlatBufferBuilder;
void message_builder_tests(); void message_builder_tests();
// The callback implementation of our server, that derives from the generated // The callback implementation of our server, that derives from the generated
...@@ -46,9 +49,9 @@ class ServiceImpl final : public MyGame::Example::MonsterStorage::Service { ...@@ -46,9 +49,9 @@ class ServiceImpl final : public MyGame::Example::MonsterStorage::Service {
const flatbuffers::grpc::Message<Stat> *request, const flatbuffers::grpc::Message<Stat> *request,
::grpc::ServerWriter<flatbuffers::grpc::Message<Monster>> *writer) ::grpc::ServerWriter<flatbuffers::grpc::Message<Monster>> *writer)
override { override {
for (int i = 0; i < 10; i++) { for (int i = 0; i < 5; i++) {
fbb_.Clear(); fbb_.Clear();
// Create 10 monsters for resposne. // Create 5 monsters for resposne.
auto monster_offset = auto monster_offset =
CreateMonster(fbb_, 0, 0, 0, CreateMonster(fbb_, 0, 0, 0,
fbb_.CreateString(request->GetRoot()->id()->str() + fbb_.CreateString(request->GetRoot()->id()->str() +
...@@ -94,26 +97,15 @@ void RunServer() { ...@@ -94,26 +97,15 @@ void RunServer() {
server_instance->Wait(); server_instance->Wait();
} }
int grpc_server_test() { template <class Builder>
// Launch server. void StoreRPC(MonsterStorage::Stub *stub) {
std::thread server_thread(RunServer); Builder fbb;
// wait for server to spin up.
std::unique_lock<std::mutex> lock(wait_for_server);
while (!server_instance) server_instance_cv.wait(lock);
// Now connect the client.
auto channel = grpc::CreateChannel("localhost:50051",
grpc::InsecureChannelCredentials());
auto stub = MyGame::Example::MonsterStorage::NewStub(channel);
flatbuffers::grpc::MessageBuilder fbb;
{
grpc::ClientContext context; grpc::ClientContext context;
// Build a request with the name set. // Build a request with the name set.
auto monster_offset = CreateMonster(fbb, 0, 0, 0, fbb.CreateString("Fred")); auto monster_offset = CreateMonster(fbb, 0, 0, 0, fbb.CreateString("Fred"));
fbb.Finish(monster_offset); MessageBuilder mb(std::move(fbb));
auto request = fbb.ReleaseMessage<Monster>(); mb.Finish(monster_offset);
auto request = mb.ReleaseMessage<Monster>();
flatbuffers::grpc::Message<Stat> response; flatbuffers::grpc::Message<Stat> response;
// The actual RPC. // The actual RPC.
...@@ -125,13 +117,16 @@ int grpc_server_test() { ...@@ -125,13 +117,16 @@ int grpc_server_test() {
} else { } else {
std::cout << "RPC failed" << std::endl; std::cout << "RPC failed" << std::endl;
} }
} }
{
template <class Builder>
void RetrieveRPC(MonsterStorage::Stub *stub) {
Builder fbb;
grpc::ClientContext context; grpc::ClientContext context;
fbb.Clear(); fbb.Clear();
auto stat_offset = CreateStat(fbb, fbb.CreateString("Fred")); auto stat_offset = CreateStat(fbb, fbb.CreateString("Fred"));
fbb.Finish(stat_offset); fbb.Finish(stat_offset);
auto request = fbb.ReleaseMessage<Stat>(); auto request = MessageBuilder(std::move(fbb)).ReleaseMessage<Stat>();
flatbuffers::grpc::Message<Monster> response; flatbuffers::grpc::Message<Monster> response;
auto stream = stub->Retrieve(&context, request); auto stream = stub->Retrieve(&context, request);
...@@ -139,7 +134,27 @@ int grpc_server_test() { ...@@ -139,7 +134,27 @@ int grpc_server_test() {
auto resp = response.GetRoot()->name(); auto resp = response.GetRoot()->name();
std::cout << "RPC Streaming response: " << resp->str() << std::endl; std::cout << "RPC Streaming response: " << resp->str() << std::endl;
} }
} }
int grpc_server_test() {
// Launch server.
std::thread server_thread(RunServer);
// wait for server to spin up.
std::unique_lock<std::mutex> lock(wait_for_server);
while (!server_instance) server_instance_cv.wait(lock);
// Now connect the client.
auto channel = grpc::CreateChannel("localhost:50051",
grpc::InsecureChannelCredentials());
auto stub = MyGame::Example::MonsterStorage::NewStub(channel);
StoreRPC<MessageBuilder>(stub.get());
StoreRPC<FlatBufferBuilder>(stub.get());
RetrieveRPC<MessageBuilder>(stub.get());
RetrieveRPC<FlatBufferBuilder>(stub.get());
#if !FLATBUFFERS_GRPC_DISABLE_AUTO_VERIFICATION #if !FLATBUFFERS_GRPC_DISABLE_AUTO_VERIFICATION
{ {
......
This diff is collapsed.
...@@ -425,6 +425,10 @@ class DefaultAllocator : public Allocator { ...@@ -425,6 +425,10 @@ class DefaultAllocator : public Allocator {
void deallocate(uint8_t *p, size_t) FLATBUFFERS_OVERRIDE { void deallocate(uint8_t *p, size_t) FLATBUFFERS_OVERRIDE {
delete[] p; delete[] p;
} }
static void dealloc(void *p, size_t) {
delete[] static_cast<uint8_t *>(p);
}
}; };
// These functions allow for a null allocator to mean use the default allocator, // These functions allow for a null allocator to mean use the default allocator,
...@@ -549,7 +553,7 @@ class DetachedBuffer { ...@@ -549,7 +553,7 @@ class DetachedBuffer {
#endif // !defined(FLATBUFFERS_CPP98_STL) #endif // !defined(FLATBUFFERS_CPP98_STL)
// clang-format on // clang-format on
protected: protected:
Allocator *allocator_; Allocator *allocator_;
bool own_allocator_; bool own_allocator_;
uint8_t *buf_; uint8_t *buf_;
...@@ -986,7 +990,7 @@ class FlatBufferBuilder { ...@@ -986,7 +990,7 @@ class FlatBufferBuilder {
/// `FlatBuffer` starts. /// `FlatBuffer` starts.
/// @return A raw pointer to the start of the memory block containing /// @return A raw pointer to the start of the memory block containing
/// the serialized `FlatBuffer`. /// the serialized `FlatBuffer`.
/// @remark If the allocator is owned, it gets deleted during this call. /// @remark If the allocator is owned, it gets deleted when the destructor is called..
uint8_t *ReleaseRaw(size_t &size, size_t &offset) { uint8_t *ReleaseRaw(size_t &size, size_t &offset) {
Finished(); Finished();
return buf_.release_raw(size, offset); return buf_.release_raw(size, offset);
...@@ -1759,7 +1763,12 @@ class FlatBufferBuilder { ...@@ -1759,7 +1763,12 @@ class FlatBufferBuilder {
Finish(root.o, file_identifier, true); Finish(root.o, file_identifier, true);
} }
protected: void SwapBufAllocator(FlatBufferBuilder &other) {
buf_.swap_allocator(other.buf_);
}
protected:
// You shouldn't really be copying instances of this class. // You shouldn't really be copying instances of this class.
FlatBufferBuilder(const FlatBufferBuilder &); FlatBufferBuilder(const FlatBufferBuilder &);
FlatBufferBuilder &operator=(const FlatBufferBuilder &); FlatBufferBuilder &operator=(const FlatBufferBuilder &);
......
...@@ -175,6 +175,30 @@ class MessageBuilder : private detail::SliceAllocatorMember, ...@@ -175,6 +175,30 @@ class MessageBuilder : private detail::SliceAllocatorMember,
Swap(other); Swap(other);
} }
/// Create a MessageBuilder from a FlatBufferBuilder.
explicit MessageBuilder(FlatBufferBuilder &&src, void (*dealloc)(void*, size_t) = &DefaultAllocator::dealloc)
: FlatBufferBuilder(1024, &slice_allocator_, false) {
src.Swap(*this);
src.SwapBufAllocator(*this);
if (buf_.capacity()) {
uint8_t *buf = buf_.scratch_data(); // pointer to memory
size_t capacity = buf_.capacity(); // size of memory
slice_allocator_.slice_ = grpc_slice_new_with_len(buf, capacity, dealloc);
}
else {
slice_allocator_.slice_ = grpc_empty_slice();
}
}
/// Move-assign a FlatBufferBuilder to a MessageBuilder.
/// Only FlatBufferBuilder with default allocator (basically, nullptr) is supported.
MessageBuilder &operator=(FlatBufferBuilder &&src) {
// Move construct a temporary and swap
MessageBuilder temp(std::move(src));
Swap(temp);
return *this;
}
MessageBuilder &operator=(MessageBuilder &&other) { MessageBuilder &operator=(MessageBuilder &&other) {
// Move construct a temporary and swap // Move construct a temporary and swap
MessageBuilder temp(std::move(other)); MessageBuilder temp(std::move(other));
......
...@@ -129,7 +129,9 @@ bool release_n_verify(flatbuffers::FlatBufferBuilder &fbb, const std::string &ex ...@@ -129,7 +129,9 @@ bool release_n_verify(flatbuffers::FlatBufferBuilder &fbb, const std::string &ex
} }
void FlatBufferBuilderTest() { void FlatBufferBuilderTest() {
BuilderTests<flatbuffers::FlatBufferBuilder>::all_tests(); using flatbuffers::FlatBufferBuilder;
BuilderTests<FlatBufferBuilder>::all_tests();
BuilderTests<TestHeapBuilder>::all_tests(); BuilderTests<TestHeapBuilder>::all_tests();
BuilderTests<GrpcLikeMessageBuilder>::all_tests(); BuilderTests<GrpcLikeMessageBuilder>::all_tests();
...@@ -140,7 +142,7 @@ void FlatBufferBuilderTest() { ...@@ -140,7 +142,7 @@ void FlatBufferBuilderTest() {
REUSABLE_AFTER_RELEASE_RAW_AND_MOVE_ASSIGN REUSABLE_AFTER_RELEASE_RAW_AND_MOVE_ASSIGN
}; };
BuilderReuseTests<flatbuffers::FlatBufferBuilder>::run_tests(TestSelector(tests, tests+4)); BuilderReuseTests<FlatBufferBuilder, FlatBufferBuilder>::run_tests(TestSelector(tests, tests+4));
BuilderReuseTests<TestHeapBuilder>::run_tests(TestSelector(tests, tests+4)); BuilderReuseTests<TestHeapBuilder, TestHeapBuilder>::run_tests(TestSelector(tests, tests+4));
BuilderReuseTests<GrpcLikeMessageBuilder>::run_tests(TestSelector(tests, tests+4)); BuilderReuseTests<GrpcLikeMessageBuilder, GrpcLikeMessageBuilder>::run_tests(TestSelector(tests, tests+4));
} }
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment