// Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors // Licensed under the MIT License: // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. #include "common.h" #include <google/protobuf/io/zero_copy_stream_impl.h> #include <google/protobuf/io/coded_stream.h> #include <thread> #if HAVE_SNAPPY #include <snappy/snappy.h> #include <snappy/snappy-sinksource.h> #endif // HAVE_SNAPPY namespace capnp { namespace benchmark { namespace protobuf { // ======================================================================================= struct SingleUseMessages { template <typename MessageType> struct Message { struct Reusable {}; struct SingleUse: public MessageType { inline SingleUse(Reusable&) {} }; }; struct ReusableString {}; struct SingleUseString: std::string { inline SingleUseString(ReusableString&) {} }; template <typename MessageType> static inline void doneWith(MessageType& message) { // Don't clear -- single-use. } }; struct ReusableMessages { template <typename MessageType> struct Message { struct Reusable: public MessageType {}; typedef MessageType& SingleUse; }; typedef std::string ReusableString; typedef std::string& SingleUseString; template <typename MessageType> static inline void doneWith(MessageType& message) { message.Clear(); } }; // ======================================================================================= // The protobuf Java library defines a format for writing multiple protobufs to a stream, in which // each message is prefixed by a varint size. This was never added to the C++ library. It's easy // to do naively, but tricky to implement without accidentally losing various optimizations. These // two functions should be optimal. struct Uncompressed { typedef google::protobuf::io::FileInputStream InputStream; typedef google::protobuf::io::FileOutputStream OutputStream; static uint64_t write(const google::protobuf::MessageLite& message, google::protobuf::io::FileOutputStream* rawOutput) { google::protobuf::io::CodedOutputStream output(rawOutput); const int size = message.ByteSize(); output.WriteVarint32(size); uint8_t* buffer = output.GetDirectBufferForNBytesAndAdvance(size); if (buffer != NULL) { message.SerializeWithCachedSizesToArray(buffer); } else { message.SerializeWithCachedSizes(&output); if (output.HadError()) { throw OsException(rawOutput->GetErrno()); } } return size; } static void read(google::protobuf::io::ZeroCopyInputStream* rawInput, google::protobuf::MessageLite* message) { google::protobuf::io::CodedInputStream input(rawInput); uint32_t size; GOOGLE_CHECK(input.ReadVarint32(&size)); auto limit = input.PushLimit(size); GOOGLE_CHECK(message->MergePartialFromCodedStream(&input) && input.ConsumedEntireMessage()); input.PopLimit(limit); } static void flush(google::protobuf::io::FileOutputStream* output) { if (!output->Flush()) throw OsException(output->GetErrno()); } }; // ======================================================================================= // The Snappy interface is really obnoxious. I gave up here and am just reading/writing flat // arrays in some static scratch space. This probably gives protobufs an edge that it doesn't // deserve. #if HAVE_SNAPPY static char scratch[1 << 20]; static char scratch2[1 << 20]; struct SnappyCompressed { typedef int InputStream; typedef int OutputStream; static uint64_t write(const google::protobuf::MessageLite& message, int* output) { size_t size = message.ByteSize(); GOOGLE_CHECK_LE(size, sizeof(scratch)); message.SerializeWithCachedSizesToArray(reinterpret_cast<uint8_t*>(scratch)); size_t compressedSize = 0; snappy::RawCompress(scratch, size, scratch2 + sizeof(uint32_t), &compressedSize); uint32_t tag = compressedSize; memcpy(scratch2, &tag, sizeof(tag)); writeAll(*output, scratch2, compressedSize + sizeof(tag)); return compressedSize + sizeof(tag); } static void read(int* input, google::protobuf::MessageLite* message) { uint32_t size; readAll(*input, &size, sizeof(size)); readAll(*input, scratch, size); size_t uncompressedSize; GOOGLE_CHECK(snappy::GetUncompressedLength(scratch, size, &uncompressedSize)); GOOGLE_CHECK(snappy::RawUncompress(scratch, size, scratch2)); GOOGLE_CHECK(message->ParsePartialFromArray(scratch2, uncompressedSize)); } static void flush(OutputStream*) {} }; #endif // HAVE_SNAPPY // ======================================================================================= #define REUSABLE(type) \ typename ReuseStrategy::template Message<typename TestCase::type>::Reusable #define SINGLE_USE(type) \ typename ReuseStrategy::template Message<typename TestCase::type>::SingleUse template <typename TestCase, typename ReuseStrategy, typename Compression> struct BenchmarkMethods { static uint64_t syncClient(int inputFd, int outputFd, uint64_t iters) { uint64_t throughput = 0; typename Compression::OutputStream output(outputFd); typename Compression::InputStream input(inputFd); REUSABLE(Request) reusableRequest; REUSABLE(Response) reusableResponse; for (; iters > 0; --iters) { SINGLE_USE(Request) request(reusableRequest); typename TestCase::Expectation expected = TestCase::setupRequest(&request); throughput += Compression::write(request, &output); Compression::flush(&output); ReuseStrategy::doneWith(request); SINGLE_USE(Response) response(reusableResponse); Compression::read(&input, &response); if (!TestCase::checkResponse(response, expected)) { throw std::logic_error("Incorrect response."); } ReuseStrategy::doneWith(response); } return throughput; } static uint64_t asyncClientSender( int outputFd, ProducerConsumerQueue<typename TestCase::Expectation>* expectations, uint64_t iters) { uint64_t throughput = 0; typename Compression::OutputStream output(outputFd); REUSABLE(Request) reusableRequest; for (; iters > 0; --iters) { SINGLE_USE(Request) request(reusableRequest); expectations->post(TestCase::setupRequest(&request)); throughput += Compression::write(request, &output); Compression::flush(&output); ReuseStrategy::doneWith(request); } return throughput; } static void asyncClientReceiver( int inputFd, ProducerConsumerQueue<typename TestCase::Expectation>* expectations, uint64_t iters) { typename Compression::InputStream input(inputFd); REUSABLE(Response) reusableResponse; for (; iters > 0; --iters) { typename TestCase::Expectation expected = expectations->next(); SINGLE_USE(Response) response(reusableResponse); Compression::read(&input, &response); if (!TestCase::checkResponse(response, expected)) { throw std::logic_error("Incorrect response."); } ReuseStrategy::doneWith(response); } } static uint64_t asyncClient(int inputFd, int outputFd, uint64_t iters) { ProducerConsumerQueue<typename TestCase::Expectation> expectations; std::thread receiverThread(asyncClientReceiver, inputFd, &expectations, iters); uint64_t throughput = asyncClientSender(outputFd, &expectations, iters); receiverThread.join(); return throughput; } static uint64_t server(int inputFd, int outputFd, uint64_t iters) { uint64_t throughput = 0; typename Compression::OutputStream output(outputFd); typename Compression::InputStream input(inputFd); REUSABLE(Request) reusableRequest; REUSABLE(Response) reusableResponse; for (; iters > 0; --iters) { SINGLE_USE(Request) request(reusableRequest); Compression::read(&input, &request); SINGLE_USE(Response) response(reusableResponse); TestCase::handleRequest(request, &response); ReuseStrategy::doneWith(request); throughput += Compression::write(response, &output); Compression::flush(&output); ReuseStrategy::doneWith(response); } return throughput; } static uint64_t passByObject(uint64_t iters, bool countObjectSize) { uint64_t throughput = 0; REUSABLE(Request) reusableRequest; REUSABLE(Response) reusableResponse; for (; iters > 0; --iters) { SINGLE_USE(Request) request(reusableRequest); typename TestCase::Expectation expected = TestCase::setupRequest(&request); SINGLE_USE(Response) response(reusableResponse); TestCase::handleRequest(request, &response); ReuseStrategy::doneWith(request); if (!TestCase::checkResponse(response, expected)) { throw std::logic_error("Incorrect response."); } ReuseStrategy::doneWith(response); if (countObjectSize) { throughput += request.SpaceUsed(); throughput += response.SpaceUsed(); } } return throughput; } static uint64_t passByBytes(uint64_t iters) { uint64_t throughput = 0; REUSABLE(Request) reusableClientRequest; REUSABLE(Request) reusableServerRequest; REUSABLE(Response) reusableServerResponse; REUSABLE(Response) reusableClientResponse; typename ReuseStrategy::ReusableString reusableRequestString, reusableResponseString; for (; iters > 0; --iters) { SINGLE_USE(Request) clientRequest(reusableClientRequest); typename TestCase::Expectation expected = TestCase::setupRequest(&clientRequest); typename ReuseStrategy::SingleUseString requestString(reusableRequestString); clientRequest.SerializePartialToString(&requestString); throughput += requestString.size(); ReuseStrategy::doneWith(clientRequest); SINGLE_USE(Request) serverRequest(reusableServerRequest); serverRequest.ParsePartialFromString(requestString); SINGLE_USE(Response) serverResponse(reusableServerResponse); TestCase::handleRequest(serverRequest, &serverResponse); ReuseStrategy::doneWith(serverRequest); typename ReuseStrategy::SingleUseString responseString(reusableResponseString); serverResponse.SerializePartialToString(&responseString); throughput += responseString.size(); ReuseStrategy::doneWith(serverResponse); SINGLE_USE(Response) clientResponse(reusableClientResponse); clientResponse.ParsePartialFromString(responseString); if (!TestCase::checkResponse(clientResponse, expected)) { throw std::logic_error("Incorrect response."); } ReuseStrategy::doneWith(clientResponse); } return throughput; } }; struct BenchmarkTypes { typedef protobuf::Uncompressed Uncompressed; typedef protobuf::Uncompressed Packed; #if HAVE_SNAPPY typedef protobuf::SnappyCompressed SnappyCompressed; #endif // HAVE_SNAPPY typedef protobuf::ReusableMessages ReusableResources; typedef protobuf::SingleUseMessages SingleUseResources; template <typename TestCase, typename ReuseStrategy, typename Compression> struct BenchmarkMethods : public protobuf::BenchmarkMethods<TestCase, ReuseStrategy, Compression> {}; }; } // namespace protobuf } // namespace benchmark } // namespace capnp