Commit 178b5fdd authored by Kenton Varda's avatar Kenton Varda

Snappy serialization.

parent 2bcb6e69
...@@ -4,13 +4,13 @@ all: ...@@ -4,13 +4,13 @@ all:
echo "You probably accidentally told Eclipse to build. Stopping." echo "You probably accidentally told Eclipse to build. Stopping."
once: once:
CXX=g++-4.7 CXXFLAGS='-std=gnu++0x -O2 -Wall' LIBS='-lz -pthread' ekam -j6 CXX=g++-4.7 CXXFLAGS='-std=gnu++0x -O2 -DNDEBUG -Wall' LIBS='-lz -pthread' ekam -j6
continuous: continuous:
CXX=g++-4.7 CXXFLAGS='-std=gnu++0x -g -Wall' LIBS='-lz -pthread' ekam -j6 -c -n :51315 CXX=g++-4.7 CXXFLAGS='-std=gnu++0x -g -Wall' LIBS='-lz -pthread' ekam -j6 -c -n :51315
continuous-opt: continuous-opt:
CXX=g++-4.7 CXXFLAGS='-std=gnu++0x -O2 -Wall' LIBS='-lz -pthread' ekam -j6 -c -n :51315 CXX=g++-4.7 CXXFLAGS='-std=gnu++0x -O2 -DNDEBUG -Wall' LIBS='-lz -pthread' ekam -j6 -c -n :51315
clean: clean:
rm -rf bin lib tmp rm -rf bin lib tmp
......
...@@ -23,6 +23,7 @@ ...@@ -23,6 +23,7 @@
#include "benchmark.capnp.h" #include "benchmark.capnp.h"
#include <capnproto/serialize.h> #include <capnproto/serialize.h>
#include <capnproto/serialize-snappy.h>
#include <unistd.h> #include <unistd.h>
#include <stdlib.h> #include <stdlib.h>
#include <inttypes.h> #include <inttypes.h>
...@@ -185,13 +186,53 @@ public: ...@@ -185,13 +186,53 @@ public:
// ======================================================================================= // =======================================================================================
class CountingOutputStream: public FdOutputStream {
public:
CountingOutputStream(int fd): FdOutputStream(fd), throughput(0) {}
uint64_t throughput;
void write(const void* buffer, size_t size) override {
FdOutputStream::write(buffer, size);
throughput += size;
}
void write(ArrayPtr<const ArrayPtr<const byte>> pieces) override {
FdOutputStream::write(pieces);
for (auto& piece: pieces) {
throughput += piece.size();
}
}
};
// =======================================================================================
struct Uncompressed {
typedef StreamFdMessageReader MessageReader;
static inline void write(OutputStream& output, MessageBuilder& builder) {
writeMessage(output, builder);
}
};
struct SnappyCompressed {
typedef SnappyFdMessageReader MessageReader;
static inline void write(OutputStream& output, MessageBuilder& builder) {
writeSnappyMessage(output, builder);
}
};
// =======================================================================================
template <typename Compression>
struct NoScratch { struct NoScratch {
struct ScratchSpace {}; struct ScratchSpace {};
class MessageReader: public StreamFdMessageReader { class MessageReader: public Compression::MessageReader {
public: public:
inline MessageReader(int fd, ScratchSpace& scratch) inline MessageReader(int fd, ScratchSpace& scratch)
: StreamFdMessageReader(fd) {} : Compression::MessageReader(fd) {}
}; };
class MessageBuilder: public MallocMessageBuilder { class MessageBuilder: public MallocMessageBuilder {
...@@ -200,16 +241,16 @@ struct NoScratch { ...@@ -200,16 +241,16 @@ struct NoScratch {
}; };
}; };
template <size_t size> template <typename Compression, size_t size>
struct UseScratch { struct UseScratch {
struct ScratchSpace { struct ScratchSpace {
word words[size]; word words[size];
}; };
class MessageReader: public StreamFdMessageReader { class MessageReader: public Compression::MessageReader {
public: public:
inline MessageReader(int fd, ScratchSpace& scratch) inline MessageReader(int fd, ScratchSpace& scratch)
: StreamFdMessageReader(fd, ReaderOptions(), arrayPtr(scratch.words, size)) {} : Compression::MessageReader(fd, ReaderOptions(), arrayPtr(scratch.words, size)) {}
}; };
class MessageBuilder: public MallocMessageBuilder { class MessageBuilder: public MallocMessageBuilder {
...@@ -221,8 +262,9 @@ struct UseScratch { ...@@ -221,8 +262,9 @@ struct UseScratch {
// ======================================================================================= // =======================================================================================
template <typename TestCase, typename ReuseStrategy> template <typename TestCase, typename ReuseStrategy, typename Compression>
void syncClient(int inputFd, int outputFd, uint64_t iters) { uint64_t syncClient(int inputFd, int outputFd, uint64_t iters) {
CountingOutputStream output(outputFd);
typename ReuseStrategy::ScratchSpace scratch; typename ReuseStrategy::ScratchSpace scratch;
for (; iters > 0; --iters) { for (; iters > 0; --iters) {
...@@ -231,7 +273,7 @@ void syncClient(int inputFd, int outputFd, uint64_t iters) { ...@@ -231,7 +273,7 @@ void syncClient(int inputFd, int outputFd, uint64_t iters) {
typename ReuseStrategy::MessageBuilder builder(scratch); typename ReuseStrategy::MessageBuilder builder(scratch);
expected = TestCase::setupRequest( expected = TestCase::setupRequest(
builder.template initRoot<typename TestCase::Request>()); builder.template initRoot<typename TestCase::Request>());
writeMessageToFd(outputFd, builder); Compression::write(output, builder);
} }
{ {
...@@ -242,23 +284,28 @@ void syncClient(int inputFd, int outputFd, uint64_t iters) { ...@@ -242,23 +284,28 @@ void syncClient(int inputFd, int outputFd, uint64_t iters) {
} }
} }
} }
return output.throughput;
} }
template <typename TestCase, typename ReuseStrategy> template <typename TestCase, typename ReuseStrategy, typename Compression>
void asyncClientSender(int outputFd, uint64_t asyncClientSender(int outputFd,
ProducerConsumerQueue<typename TestCase::Expectation>* expectations, ProducerConsumerQueue<typename TestCase::Expectation>* expectations,
uint64_t iters) { uint64_t iters) {
CountingOutputStream output(outputFd);
typename ReuseStrategy::ScratchSpace scratch; typename ReuseStrategy::ScratchSpace scratch;
for (; iters > 0; --iters) { for (; iters > 0; --iters) {
typename ReuseStrategy::MessageBuilder builder(scratch); typename ReuseStrategy::MessageBuilder builder(scratch);
expectations->post(TestCase::setupRequest( expectations->post(TestCase::setupRequest(
builder.template initRoot<typename TestCase::Request>())); builder.template initRoot<typename TestCase::Request>()));
writeMessageToFd(outputFd, builder); Compression::write(output, builder);
} }
return output.throughput;
} }
template <typename TestCase, typename ReuseStrategy> template <typename TestCase, typename ReuseStrategy, typename Compression>
void asyncClientReceiver(int inputFd, void asyncClientReceiver(int inputFd,
ProducerConsumerQueue<typename TestCase::Expectation>* expectations, ProducerConsumerQueue<typename TestCase::Expectation>* expectations,
uint64_t iters) { uint64_t iters) {
...@@ -274,17 +321,20 @@ void asyncClientReceiver(int inputFd, ...@@ -274,17 +321,20 @@ void asyncClientReceiver(int inputFd,
} }
} }
template <typename TestCase, typename ReuseStrategy> template <typename TestCase, typename ReuseStrategy, typename Compression>
void asyncClient(int inputFd, int outputFd, uint64_t iters) { uint64_t asyncClient(int inputFd, int outputFd, uint64_t iters) {
ProducerConsumerQueue<typename TestCase::Expectation> expectations; ProducerConsumerQueue<typename TestCase::Expectation> expectations;
std::thread receiverThread( std::thread receiverThread(
asyncClientReceiver<TestCase, ReuseStrategy>, inputFd, &expectations, iters); asyncClientReceiver<TestCase, ReuseStrategy, Compression>, inputFd, &expectations, iters);
asyncClientSender<TestCase, ReuseStrategy>(outputFd, &expectations, iters); uint64_t throughput =
asyncClientSender<TestCase, ReuseStrategy, Compression>(outputFd, &expectations, iters);
receiverThread.join(); receiverThread.join();
return throughput;
} }
template <typename TestCase, typename ReuseStrategy> template <typename TestCase, typename ReuseStrategy, typename Compression>
void server(int inputFd, int outputFd, uint64_t iters) { uint64_t server(int inputFd, int outputFd, uint64_t iters) {
CountingOutputStream output(outputFd);
typename ReuseStrategy::ScratchSpace builderScratch; typename ReuseStrategy::ScratchSpace builderScratch;
typename ReuseStrategy::ScratchSpace readerScratch; typename ReuseStrategy::ScratchSpace readerScratch;
...@@ -293,12 +343,14 @@ void server(int inputFd, int outputFd, uint64_t iters) { ...@@ -293,12 +343,14 @@ void server(int inputFd, int outputFd, uint64_t iters) {
typename ReuseStrategy::MessageReader reader(inputFd, readerScratch); typename ReuseStrategy::MessageReader reader(inputFd, readerScratch);
TestCase::handleRequest(reader.template getRoot<typename TestCase::Request>(), TestCase::handleRequest(reader.template getRoot<typename TestCase::Request>(),
builder.template initRoot<typename TestCase::Response>()); builder.template initRoot<typename TestCase::Response>());
writeMessageToFd(outputFd, builder); Compression::write(output, builder);
} }
return output.throughput;
} }
template <typename TestCase, typename ReuseStrategy> template <typename TestCase, typename ReuseStrategy, typename Compression>
void passByObject(uint64_t iters) { uint64_t passByObject(uint64_t iters) {
typename ReuseStrategy::ScratchSpace requestScratch; typename ReuseStrategy::ScratchSpace requestScratch;
typename ReuseStrategy::ScratchSpace responseScratch; typename ReuseStrategy::ScratchSpace responseScratch;
...@@ -315,10 +367,13 @@ void passByObject(uint64_t iters) { ...@@ -315,10 +367,13 @@ void passByObject(uint64_t iters) {
throw std::logic_error("Incorrect response."); throw std::logic_error("Incorrect response.");
} }
} }
return 0;
} }
template <typename TestCase, typename ReuseStrategy> template <typename TestCase, typename ReuseStrategy, typename Compression>
void passByBytes(uint64_t iters) { uint64_t passByBytes(uint64_t iters) {
uint64_t throughput = 0;
typename ReuseStrategy::ScratchSpace requestScratch; typename ReuseStrategy::ScratchSpace requestScratch;
typename ReuseStrategy::ScratchSpace responseScratch; typename ReuseStrategy::ScratchSpace responseScratch;
...@@ -328,22 +383,26 @@ void passByBytes(uint64_t iters) { ...@@ -328,22 +383,26 @@ void passByBytes(uint64_t iters) {
requestBuilder.template initRoot<typename TestCase::Request>()); requestBuilder.template initRoot<typename TestCase::Request>());
Array<word> requestBytes = messageToFlatArray(requestBuilder); Array<word> requestBytes = messageToFlatArray(requestBuilder);
throughput += requestBytes.size() * sizeof(word);
FlatArrayMessageReader requestReader(requestBytes.asPtr()); FlatArrayMessageReader requestReader(requestBytes.asPtr());
typename ReuseStrategy::MessageBuilder responseBuilder(responseScratch); typename ReuseStrategy::MessageBuilder responseBuilder(responseScratch);
TestCase::handleRequest(requestReader.template getRoot<typename TestCase::Request>(), TestCase::handleRequest(requestReader.template getRoot<typename TestCase::Request>(),
responseBuilder.template initRoot<typename TestCase::Response>()); responseBuilder.template initRoot<typename TestCase::Response>());
Array<word> responseBytes = messageToFlatArray(responseBuilder); Array<word> responseBytes = messageToFlatArray(responseBuilder);
throughput += responseBytes.size() * sizeof(word);
FlatArrayMessageReader responseReader(responseBytes.asPtr()); FlatArrayMessageReader responseReader(responseBytes.asPtr());
if (!TestCase::checkResponse( if (!TestCase::checkResponse(
responseReader.template getRoot<typename TestCase::Response>(), expected)) { responseReader.template getRoot<typename TestCase::Response>(), expected)) {
throw std::logic_error("Incorrect response."); throw std::logic_error("Incorrect response.");
} }
} }
return throughput;
} }
template <typename TestCase, typename ReuseStrategy, typename Func> template <typename TestCase, typename ReuseStrategy, typename Compression, typename Func>
void passByPipe(Func&& clientFunc, uint64_t iters) { uint64_t passByPipe(Func&& clientFunc, uint64_t iters) {
int clientToServer[2]; int clientToServer[2];
int serverToClient[2]; int serverToClient[2];
if (pipe(clientToServer) < 0) throw OsException(errno); if (pipe(clientToServer) < 0) throw OsException(errno);
...@@ -355,14 +414,22 @@ void passByPipe(Func&& clientFunc, uint64_t iters) { ...@@ -355,14 +414,22 @@ void passByPipe(Func&& clientFunc, uint64_t iters) {
close(clientToServer[0]); close(clientToServer[0]);
close(serverToClient[1]); close(serverToClient[1]);
clientFunc(serverToClient[0], clientToServer[1], iters); uint64_t throughput = clientFunc(serverToClient[0], clientToServer[1], iters);
FdOutputStream(clientToServer[1]).write(&throughput, sizeof(throughput));
exit(0); exit(0);
} else { } else {
// Server. // Server.
close(clientToServer[1]); close(clientToServer[1]);
close(serverToClient[0]); close(serverToClient[0]);
server<TestCase, ReuseStrategy>(clientToServer[0], serverToClient[1], iters); uint64_t throughput =
server<TestCase, ReuseStrategy, Compression>(clientToServer[0], serverToClient[1], iters);
uint64_t clientThroughput = 0;
FdInputStream(clientToServer[0]).InputStream::read(&clientThroughput, sizeof(clientThroughput));
throughput += clientThroughput;
int status; int status;
if (waitpid(child, &status, 0) != child) { if (waitpid(child, &status, 0) != child) {
...@@ -371,52 +438,72 @@ void passByPipe(Func&& clientFunc, uint64_t iters) { ...@@ -371,52 +438,72 @@ void passByPipe(Func&& clientFunc, uint64_t iters) {
if (!WIFEXITED(status) || WEXITSTATUS(status) != 0) { if (!WIFEXITED(status) || WEXITSTATUS(status) != 0) {
throw std::logic_error("Child exited abnormally."); throw std::logic_error("Child exited abnormally.");
} }
return throughput;
} }
} }
template <typename ReuseStrategy> template <typename ReuseStrategy, typename Compression>
void doBenchmark(const std::string& mode, uint64_t iters) { uint64_t doBenchmark(const std::string& mode, uint64_t iters) {
if (mode == "client") { if (mode == "client") {
syncClient<ExpressionTestCase, ReuseStrategy>(STDIN_FILENO, STDOUT_FILENO, iters); return syncClient<ExpressionTestCase, ReuseStrategy, Compression>(
STDIN_FILENO, STDOUT_FILENO, iters);
} else if (mode == "server") { } else if (mode == "server") {
server<ExpressionTestCase, ReuseStrategy>(STDIN_FILENO, STDOUT_FILENO, iters); return server<ExpressionTestCase, ReuseStrategy, Compression>(
STDIN_FILENO, STDOUT_FILENO, iters);
} else if (mode == "object") { } else if (mode == "object") {
passByObject<ExpressionTestCase, ReuseStrategy>(iters); return passByObject<ExpressionTestCase, ReuseStrategy, Compression>(iters);
} else if (mode == "bytes") { } else if (mode == "bytes") {
passByBytes<ExpressionTestCase, ReuseStrategy>(iters); return passByBytes<ExpressionTestCase, ReuseStrategy, Compression>(iters);
} else if (mode == "pipe") { } else if (mode == "pipe") {
passByPipe<ExpressionTestCase, ReuseStrategy>( return passByPipe<ExpressionTestCase, ReuseStrategy, Compression>(
syncClient<ExpressionTestCase, ReuseStrategy>, iters); syncClient<ExpressionTestCase, ReuseStrategy, Compression>, iters);
} else if (mode == "pipe-async") { } else if (mode == "pipe-async") {
passByPipe<ExpressionTestCase, ReuseStrategy>( return passByPipe<ExpressionTestCase, ReuseStrategy, Compression>(
asyncClient<ExpressionTestCase, ReuseStrategy>, iters); asyncClient<ExpressionTestCase, ReuseStrategy, Compression>, iters);
} else { } else {
std::cerr << "Unknown mode: " << mode << std::endl; std::cerr << "Unknown mode: " << mode << std::endl;
exit(1); exit(1);
} }
} }
template <typename Compression>
uint64_t doBenchmark2(const std::string& mode, const std::string& reuse, uint64_t iters) {
if (reuse == "reuse") {
return doBenchmark<UseScratch<Compression, 1024>, Compression>(mode, iters);
} else if (reuse == "no-reuse") {
return doBenchmark<NoScratch<Compression>, Compression>(mode, iters);
} else {
std::cerr << "Unknown reuse mode: " << reuse << std::endl;
exit(1);
}
}
int main(int argc, char* argv[]) { int main(int argc, char* argv[]) {
if (argc != 4) { if (argc != 5) {
std::cerr << "USAGE: " << argv[0] << " MODE REUSE ITERATION_COUNT" << std::endl; std::cerr << "USAGE: " << argv[0] << " MODE REUSE COMPRESSION ITERATION_COUNT" << std::endl;
return 1; return 1;
} }
uint64_t iters = strtoull(argv[3], nullptr, 0); uint64_t iters = strtoull(argv[4], nullptr, 0);
srand(123); srand(123);
std::cerr << "Doing " << iters << " iterations..." << std::endl; std::cerr << "Doing " << iters << " iterations..." << std::endl;
std::string reuse = argv[2]; uint64_t throughput;
if (reuse == "reuse") {
doBenchmark<UseScratch<1024>>(argv[1], iters); std::string compression = argv[3];
} else if (reuse == "no-reuse") { if (compression == "none") {
doBenchmark<NoScratch>(argv[1], iters); throughput = doBenchmark2<Uncompressed>(argv[1], argv[2], iters);
} else if (compression == "snappy") {
throughput = doBenchmark2<SnappyCompressed>(argv[1], argv[2], iters);
} else { } else {
std::cerr << "Unknown reuse mode: " << reuse << std::endl; std::cerr << "Unknown compression mode: " << compression << std::endl;
return 1; return 1;
} }
std::cerr << "Average messages size = " << (throughput / iters) << std::endl;
return 0; return 0;
} }
......
...@@ -35,6 +35,8 @@ ...@@ -35,6 +35,8 @@
#include <sys/types.h> #include <sys/types.h>
#include <sys/wait.h> #include <sys/wait.h>
#include <semaphore.h> #include <semaphore.h>
#include <snappy/snappy.h>
#include <snappy/snappy-sinksource.h>
namespace capnproto { namespace capnproto {
namespace benchmark { namespace benchmark {
...@@ -225,40 +227,109 @@ struct ReusableMessages { ...@@ -225,40 +227,109 @@ struct ReusableMessages {
// to do naively, but tricky to implement without accidentally losing various optimizations. These // to do naively, but tricky to implement without accidentally losing various optimizations. These
// two functions should be optimal. // two functions should be optimal.
void writeDelimited(const google::protobuf::MessageLite& message, struct Uncompressed {
google::protobuf::io::FileOutputStream* rawOutput) { typedef google::protobuf::io::FileInputStream InputStream;
google::protobuf::io::CodedOutputStream output(rawOutput); typedef google::protobuf::io::FileOutputStream OutputStream;
const int size = message.ByteSize();
output.WriteVarint32(size); static uint64_t write(const google::protobuf::MessageLite& message,
uint8_t* buffer = output.GetDirectBufferForNBytesAndAdvance(size); google::protobuf::io::FileOutputStream* rawOutput) {
if (buffer != NULL) { google::protobuf::io::CodedOutputStream output(rawOutput);
message.SerializeWithCachedSizesToArray(buffer); const int size = message.ByteSize();
} else { output.WriteVarint32(size);
message.SerializeWithCachedSizes(&output); uint8_t* buffer = output.GetDirectBufferForNBytesAndAdvance(size);
if (output.HadError()) { if (buffer != NULL) {
throw OsException(rawOutput->GetErrno()); message.SerializeWithCachedSizesToArray(buffer);
} else {
message.SerializeWithCachedSizes(&output);
if (output.HadError()) {
throw OsException(rawOutput->GetErrno());
}
} }
return size;
} }
}
void readDelimited(google::protobuf::io::ZeroCopyInputStream* rawInput, static void read(google::protobuf::io::ZeroCopyInputStream* rawInput,
google::protobuf::MessageLite* message) { google::protobuf::MessageLite* message) {
google::protobuf::io::CodedInputStream input(rawInput); google::protobuf::io::CodedInputStream input(rawInput);
uint32_t size; uint32_t size;
if (!input.ReadVarint32(&size)) { GOOGLE_CHECK(input.ReadVarint32(&size));
throw std::logic_error("Read failed.");
auto limit = input.PushLimit(size);
GOOGLE_CHECK(message->MergePartialFromCodedStream(&input) &&
input.ConsumedEntireMessage());
input.PopLimit(limit);
} }
auto limit = input.PushLimit(size); static void flush(google::protobuf::io::FileOutputStream* output) {
if (!output->Flush()) throw OsException(output->GetErrno());
}
};
if (!message->MergePartialFromCodedStream(&input) || // =======================================================================================
!input.ConsumedEntireMessage()) { // The Snappy interface is really obnoxious. I gave up here and am just reading/writing flat
throw std::logic_error("Read failed."); // arrays in some static scratch space. This probably gives protobufs an edge that it doesn't
// deserve.
void writeAll(int fd, const void* buffer, size_t size) {
const char* pos = reinterpret_cast<const char*>(buffer);
while (size > 0) {
ssize_t n = write(fd, pos, size);
GOOGLE_CHECK_GT(n, 0);
pos += n;
size -= n;
} }
}
input.PopLimit(limit); void readAll(int fd, void* buffer, size_t size) {
char* pos = reinterpret_cast<char*>(buffer);
while (size > 0) {
ssize_t n = read(fd, pos, size);
GOOGLE_CHECK_GT(n, 0);
pos += n;
size -= n;
}
} }
static char scratch[128 << 10];
static char scratch2[128 << 10];
struct SnappyCompressed {
typedef int InputStream;
typedef int OutputStream;
static uint64_t write(const google::protobuf::MessageLite& message, int* output) {
size_t size = message.ByteSize();
GOOGLE_CHECK_LE(size, sizeof(scratch));
message.SerializeWithCachedSizesToArray(reinterpret_cast<uint8_t*>(scratch));
size_t compressedSize = 0;
snappy::RawCompress(scratch, size, scratch2 + sizeof(uint32_t), &compressedSize);
uint32_t tag = compressedSize;
memcpy(scratch2, &tag, sizeof(tag));
writeAll(*output, scratch2, compressedSize + sizeof(tag));
return compressedSize + sizeof(tag);
}
static void read(int* input, google::protobuf::MessageLite* message) {
uint32_t size;
readAll(*input, &size, sizeof(size));
readAll(*input, scratch, size);
size_t uncompressedSize;
GOOGLE_CHECK(snappy::GetUncompressedLength(scratch, size, &uncompressedSize));
GOOGLE_CHECK(snappy::RawUncompress(scratch, size, scratch2));
GOOGLE_CHECK(message->ParsePartialFromArray(scratch2, uncompressedSize));
}
static void flush(OutputStream*) {}
};
// ======================================================================================= // =======================================================================================
#define REUSABLE(type) \ #define REUSABLE(type) \
...@@ -266,10 +337,12 @@ void readDelimited(google::protobuf::io::ZeroCopyInputStream* rawInput, ...@@ -266,10 +337,12 @@ void readDelimited(google::protobuf::io::ZeroCopyInputStream* rawInput,
#define SINGLE_USE(type) \ #define SINGLE_USE(type) \
typename ReuseStrategy::template Message<typename TestCase::type>::SingleUse typename ReuseStrategy::template Message<typename TestCase::type>::SingleUse
template <typename TestCase, typename ReuseStrategy> template <typename TestCase, typename ReuseStrategy, typename Compression>
void syncClient(int inputFd, int outputFd, uint64_t iters) { uint64_t syncClient(int inputFd, int outputFd, uint64_t iters) {
google::protobuf::io::FileOutputStream output(outputFd); uint64_t throughput = 0;
google::protobuf::io::FileInputStream input(inputFd);
typename Compression::OutputStream output(outputFd);
typename Compression::InputStream input(inputFd);
REUSABLE(Request) reusableRequest; REUSABLE(Request) reusableRequest;
REUSABLE(Response) reusableResponse; REUSABLE(Response) reusableResponse;
...@@ -277,47 +350,52 @@ void syncClient(int inputFd, int outputFd, uint64_t iters) { ...@@ -277,47 +350,52 @@ void syncClient(int inputFd, int outputFd, uint64_t iters) {
for (; iters > 0; --iters) { for (; iters > 0; --iters) {
SINGLE_USE(Request) request(reusableRequest); SINGLE_USE(Request) request(reusableRequest);
typename TestCase::Expectation expected = TestCase::setupRequest(&request); typename TestCase::Expectation expected = TestCase::setupRequest(&request);
writeDelimited(request, &output); throughput += Compression::write(request, &output);
if (!output.Flush()) throw OsException(output.GetErrno()); Compression::flush(&output);
ReuseStrategy::doneWith(request); ReuseStrategy::doneWith(request);
SINGLE_USE(Response) response(reusableResponse); SINGLE_USE(Response) response(reusableResponse);
readDelimited(&input, &response); Compression::read(&input, &response);
if (!TestCase::checkResponse(response, expected)) { if (!TestCase::checkResponse(response, expected)) {
throw std::logic_error("Incorrect response."); throw std::logic_error("Incorrect response.");
} }
ReuseStrategy::doneWith(response); ReuseStrategy::doneWith(response);
} }
return throughput;
} }
template <typename TestCase, typename ReuseStrategy> template <typename TestCase, typename ReuseStrategy, typename Compression>
void asyncClientSender(int outputFd, uint64_t asyncClientSender(int outputFd,
ProducerConsumerQueue<typename TestCase::Expectation>* expectations, ProducerConsumerQueue<typename TestCase::Expectation>* expectations,
uint64_t iters) { uint64_t iters) {
google::protobuf::io::FileOutputStream output(outputFd); uint64_t throughput = 0;
typename Compression::OutputStream output(outputFd);
REUSABLE(Request) reusableRequest; REUSABLE(Request) reusableRequest;
for (; iters > 0; --iters) { for (; iters > 0; --iters) {
SINGLE_USE(Request) request(reusableRequest); SINGLE_USE(Request) request(reusableRequest);
expectations->post(TestCase::setupRequest(&request)); expectations->post(TestCase::setupRequest(&request));
writeDelimited(request, &output); throughput += Compression::write(request, &output);
Compression::flush(&output);
ReuseStrategy::doneWith(request); ReuseStrategy::doneWith(request);
} }
if (!output.Flush()) throw OsException(output.GetErrno()); return throughput;
} }
template <typename TestCase, typename ReuseStrategy> template <typename TestCase, typename ReuseStrategy, typename Compression>
void asyncClientReceiver(int inputFd, void asyncClientReceiver(int inputFd,
ProducerConsumerQueue<typename TestCase::Expectation>* expectations, ProducerConsumerQueue<typename TestCase::Expectation>* expectations,
uint64_t iters) { uint64_t iters) {
google::protobuf::io::FileInputStream input(inputFd); typename Compression::InputStream input(inputFd);
REUSABLE(Response) reusableResponse; REUSABLE(Response) reusableResponse;
for (; iters > 0; --iters) { for (; iters > 0; --iters) {
typename TestCase::Expectation expected = expectations->next(); typename TestCase::Expectation expected = expectations->next();
SINGLE_USE(Response) response(reusableResponse); SINGLE_USE(Response) response(reusableResponse);
readDelimited(&input, &response); Compression::read(&input, &response);
if (!TestCase::checkResponse(response, expected)) { if (!TestCase::checkResponse(response, expected)) {
throw std::logic_error("Incorrect response."); throw std::logic_error("Incorrect response.");
} }
...@@ -325,39 +403,46 @@ void asyncClientReceiver(int inputFd, ...@@ -325,39 +403,46 @@ void asyncClientReceiver(int inputFd,
} }
} }
template <typename TestCase, typename ReuseStrategy> template <typename TestCase, typename ReuseStrategy, typename Compression>
void asyncClient(int inputFd, int outputFd, uint64_t iters) { uint64_t asyncClient(int inputFd, int outputFd, uint64_t iters) {
ProducerConsumerQueue<typename TestCase::Expectation> expectations; ProducerConsumerQueue<typename TestCase::Expectation> expectations;
std::thread receiverThread( std::thread receiverThread(
asyncClientReceiver<TestCase, ReuseStrategy>, inputFd, &expectations, iters); asyncClientReceiver<TestCase, ReuseStrategy, Compression>, inputFd, &expectations, iters);
asyncClientSender<TestCase, ReuseStrategy>(outputFd, &expectations, iters); uint64_t throughput =
asyncClientSender<TestCase, ReuseStrategy, Compression>(outputFd, &expectations, iters);
receiverThread.join(); receiverThread.join();
return throughput;
} }
template <typename TestCase, typename ReuseStrategy> template <typename TestCase, typename ReuseStrategy, typename Compression>
void server(int inputFd, int outputFd, uint64_t iters) { uint64_t server(int inputFd, int outputFd, uint64_t iters) {
google::protobuf::io::FileOutputStream output(outputFd); uint64_t throughput = 0;
google::protobuf::io::FileInputStream input(inputFd);
typename Compression::OutputStream output(outputFd);
typename Compression::InputStream input(inputFd);
REUSABLE(Request) reusableRequest; REUSABLE(Request) reusableRequest;
REUSABLE(Response) reusableResponse; REUSABLE(Response) reusableResponse;
for (; iters > 0; --iters) { for (; iters > 0; --iters) {
SINGLE_USE(Request) request(reusableRequest); SINGLE_USE(Request) request(reusableRequest);
readDelimited(&input, &request); Compression::read(&input, &request);
SINGLE_USE(Response) response(reusableResponse); SINGLE_USE(Response) response(reusableResponse);
TestCase::handleRequest(request, &response); TestCase::handleRequest(request, &response);
ReuseStrategy::doneWith(request); ReuseStrategy::doneWith(request);
writeDelimited(response, &output); throughput += Compression::write(response, &output);
if (!output.Flush()) throw std::logic_error("Write failed."); Compression::flush(&output);
ReuseStrategy::doneWith(response); ReuseStrategy::doneWith(response);
} }
return throughput;
} }
template <typename TestCase, typename ReuseStrategy> template <typename TestCase, typename ReuseStrategy, typename Compression>
void passByObject(uint64_t iters) { uint64_t passByObject(uint64_t iters) {
REUSABLE(Request) reusableRequest; REUSABLE(Request) reusableRequest;
REUSABLE(Response) reusableResponse; REUSABLE(Response) reusableResponse;
...@@ -373,10 +458,14 @@ void passByObject(uint64_t iters) { ...@@ -373,10 +458,14 @@ void passByObject(uint64_t iters) {
} }
ReuseStrategy::doneWith(response); ReuseStrategy::doneWith(response);
} }
return 0;
} }
template <typename TestCase, typename ReuseStrategy> template <typename TestCase, typename ReuseStrategy, typename Compression>
void passByBytes(uint64_t iters) { uint64_t passByBytes(uint64_t iters) {
uint64_t throughput = 0;
REUSABLE(Request) reusableClientRequest; REUSABLE(Request) reusableClientRequest;
REUSABLE(Request) reusableServerRequest; REUSABLE(Request) reusableServerRequest;
REUSABLE(Response) reusableServerResponse; REUSABLE(Response) reusableServerResponse;
...@@ -389,6 +478,7 @@ void passByBytes(uint64_t iters) { ...@@ -389,6 +478,7 @@ void passByBytes(uint64_t iters) {
typename ReuseStrategy::SingleUseString requestString(reusableRequestString); typename ReuseStrategy::SingleUseString requestString(reusableRequestString);
clientRequest.SerializePartialToString(&requestString); clientRequest.SerializePartialToString(&requestString);
throughput += requestString.size();
ReuseStrategy::doneWith(clientRequest); ReuseStrategy::doneWith(clientRequest);
SINGLE_USE(Request) serverRequest(reusableServerRequest); SINGLE_USE(Request) serverRequest(reusableServerRequest);
...@@ -400,6 +490,7 @@ void passByBytes(uint64_t iters) { ...@@ -400,6 +490,7 @@ void passByBytes(uint64_t iters) {
typename ReuseStrategy::SingleUseString responseString(reusableResponseString); typename ReuseStrategy::SingleUseString responseString(reusableResponseString);
serverResponse.SerializePartialToString(&responseString); serverResponse.SerializePartialToString(&responseString);
throughput += responseString.size();
ReuseStrategy::doneWith(serverResponse); ReuseStrategy::doneWith(serverResponse);
SINGLE_USE(Response) clientResponse(reusableClientResponse); SINGLE_USE(Response) clientResponse(reusableClientResponse);
...@@ -410,10 +501,12 @@ void passByBytes(uint64_t iters) { ...@@ -410,10 +501,12 @@ void passByBytes(uint64_t iters) {
} }
ReuseStrategy::doneWith(clientResponse); ReuseStrategy::doneWith(clientResponse);
} }
return throughput;
} }
template <typename TestCase, typename ReuseStrategy, typename Func> template <typename TestCase, typename ReuseStrategy, typename Compression, typename Func>
void passByPipe(Func&& clientFunc, uint64_t iters) { uint64_t passByPipe(Func&& clientFunc, uint64_t iters) {
int clientToServer[2]; int clientToServer[2];
int serverToClient[2]; int serverToClient[2];
if (pipe(clientToServer) < 0) throw OsException(errno); if (pipe(clientToServer) < 0) throw OsException(errno);
...@@ -425,14 +518,21 @@ void passByPipe(Func&& clientFunc, uint64_t iters) { ...@@ -425,14 +518,21 @@ void passByPipe(Func&& clientFunc, uint64_t iters) {
close(clientToServer[0]); close(clientToServer[0]);
close(serverToClient[1]); close(serverToClient[1]);
clientFunc(serverToClient[0], clientToServer[1], iters); uint64_t throughput = clientFunc(serverToClient[0], clientToServer[1], iters);
writeAll(clientToServer[1], &throughput, sizeof(throughput));
exit(0); exit(0);
} else { } else {
// Server. // Server.
close(clientToServer[1]); close(clientToServer[1]);
close(serverToClient[0]); close(serverToClient[0]);
server<TestCase, ReuseStrategy>(clientToServer[0], serverToClient[1], iters); uint64_t throughput =
server<TestCase, ReuseStrategy, Compression>(clientToServer[0], serverToClient[1], iters);
uint64_t clientThroughput = 0;
readAll(clientToServer[0], &clientThroughput, sizeof(clientThroughput));
throughput += clientThroughput;
int status; int status;
if (waitpid(child, &status, 0) != child) { if (waitpid(child, &status, 0) != child) {
...@@ -441,52 +541,72 @@ void passByPipe(Func&& clientFunc, uint64_t iters) { ...@@ -441,52 +541,72 @@ void passByPipe(Func&& clientFunc, uint64_t iters) {
if (!WIFEXITED(status) || WEXITSTATUS(status) != 0) { if (!WIFEXITED(status) || WEXITSTATUS(status) != 0) {
throw std::logic_error("Child exited abnormally."); throw std::logic_error("Child exited abnormally.");
} }
return throughput;
} }
} }
template <typename ReuseStrategy> template <typename ReuseStrategy, typename Compression>
void doBenchmark(const std::string& mode, uint64_t iters) { uint64_t doBenchmark(const std::string& mode, uint64_t iters) {
if (mode == "client") { if (mode == "client") {
syncClient<ExpressionTestCase, ReuseStrategy>(STDIN_FILENO, STDOUT_FILENO, iters); return syncClient<ExpressionTestCase, ReuseStrategy, Compression>(
STDIN_FILENO, STDOUT_FILENO, iters);
} else if (mode == "server") { } else if (mode == "server") {
server<ExpressionTestCase, ReuseStrategy>(STDIN_FILENO, STDOUT_FILENO, iters); return server<ExpressionTestCase, ReuseStrategy, Compression>(
STDIN_FILENO, STDOUT_FILENO, iters);
} else if (mode == "object") { } else if (mode == "object") {
passByObject<ExpressionTestCase, ReuseStrategy>(iters); return passByObject<ExpressionTestCase, ReuseStrategy, Compression>(iters);
} else if (mode == "bytes") { } else if (mode == "bytes") {
passByBytes<ExpressionTestCase, ReuseStrategy>(iters); return passByBytes<ExpressionTestCase, ReuseStrategy, Compression>(iters);
} else if (mode == "pipe") { } else if (mode == "pipe") {
passByPipe<ExpressionTestCase, ReuseStrategy>( return passByPipe<ExpressionTestCase, ReuseStrategy, Compression>(
syncClient<ExpressionTestCase, ReuseStrategy>, iters); syncClient<ExpressionTestCase, ReuseStrategy, Compression>, iters);
} else if (mode == "pipe-async") { } else if (mode == "pipe-async") {
passByPipe<ExpressionTestCase, ReuseStrategy>( return passByPipe<ExpressionTestCase, ReuseStrategy, Compression>(
asyncClient<ExpressionTestCase, ReuseStrategy>, iters); asyncClient<ExpressionTestCase, ReuseStrategy, Compression>, iters);
} else { } else {
std::cerr << "Unknown mode: " << mode << std::endl; std::cerr << "Unknown mode: " << mode << std::endl;
exit(1); exit(1);
} }
} }
template <typename Compression>
uint64_t doBenchmark2(const std::string& mode, const std::string& reuse, uint64_t iters) {
if (reuse == "reuse") {
return doBenchmark<ReusableMessages, Compression>(mode, iters);
} else if (reuse == "no-reuse") {
return doBenchmark<SingleUseMessages, Compression>(mode, iters);
} else {
std::cerr << "Unknown reuse mode: " << reuse << std::endl;
exit(1);
}
}
int main(int argc, char* argv[]) { int main(int argc, char* argv[]) {
if (argc != 4) { if (argc != 5) {
std::cerr << "USAGE: " << argv[0] << " MODE REUSE ITERATION_COUNT" << std::endl; std::cerr << "USAGE: " << argv[0] << " MODE REUSE COMPRESSION ITERATION_COUNT" << std::endl;
return 1; return 1;
} }
uint64_t iters = strtoull(argv[3], nullptr, 0); uint64_t iters = strtoull(argv[4], nullptr, 0);
srand(123); srand(123);
std::cerr << "Doing " << iters << " iterations..." << std::endl; std::cerr << "Doing " << iters << " iterations..." << std::endl;
std::string reuse = argv[2]; uint64_t throughput;
if (reuse == "reuse") {
doBenchmark<ReusableMessages>(argv[1], iters); std::string compression = argv[3];
} else if (reuse == "no-reuse") { if (compression == "none") {
doBenchmark<SingleUseMessages>(argv[1], iters); throughput = doBenchmark2<Uncompressed>(argv[1], argv[2], iters);
} else if (compression == "snappy") {
throughput = doBenchmark2<SnappyCompressed>(argv[1], argv[2], iters);
} else { } else {
std::cerr << "Unknown reuse mode: " << reuse << std::endl; std::cerr << "Unknown compression mode: " << compression << std::endl;
return 1; return 1;
} }
std::cerr << "Average messages size = " << (throughput / iters) << std::endl;
return 0; return 0;
} }
......
// Copyright (c) 2013, Kenton Varda <temporal@gmail.com>
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this
// list of conditions and the following disclaimer.
// 2. Redistributions in binary form must reproduce the above copyright notice,
// this list of conditions and the following disclaimer in the documentation
// and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
// ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include "message.h"
#include <gtest/gtest.h>
namespace capnproto {
namespace internal {
namespace {
TEST(Message, MallocBuilderWithFirstSegment) {
word scratch[16];
MallocMessageBuilder builder(arrayPtr(scratch, 16), AllocationStrategy::FIXED_SIZE);
ArrayPtr<word> segment = builder.allocateSegment(1);
EXPECT_EQ(scratch, segment.begin());
EXPECT_EQ(16u, segment.size());
segment = builder.allocateSegment(1);
EXPECT_NE(scratch, segment.begin());
EXPECT_EQ(16u, segment.size());
segment = builder.allocateSegment(1);
EXPECT_NE(scratch, segment.begin());
EXPECT_EQ(16u, segment.size());
}
// TODO: More tests.
} // namespace
} // namespace internal
} // namespace capnproto
// Copyright (c) 2013, Kenton Varda <temporal@gmail.com>
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this
// list of conditions and the following disclaimer.
// 2. Redistributions in binary form must reproduce the above copyright notice,
// this list of conditions and the following disclaimer in the documentation
// and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
// ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include "test.capnp.h"
#include "serialize-snappy.h"
#include <gtest/gtest.h>
#include <string>
#include <stdlib.h>
#include "test-util.h"
namespace capnproto {
namespace internal {
namespace {
class TestMessageBuilder: public MallocMessageBuilder {
// A MessageBuilder that tries to allocate an exact number of total segments, by allocating
// minimum-size segments until it reaches the number, then allocating one large segment to
// finish.
public:
explicit TestMessageBuilder(uint desiredSegmentCount)
: MallocMessageBuilder(0, AllocationStrategy::FIXED_SIZE),
desiredSegmentCount(desiredSegmentCount) {}
~TestMessageBuilder() {
EXPECT_EQ(0u, desiredSegmentCount);
}
ArrayPtr<word> allocateSegment(uint minimumSize) override {
if (desiredSegmentCount <= 1) {
if (desiredSegmentCount < 1) {
ADD_FAILURE() << "Allocated more segments than desired.";
} else {
--desiredSegmentCount;
}
return MallocMessageBuilder::allocateSegment(SUGGESTED_FIRST_SEGMENT_WORDS);
} else {
--desiredSegmentCount;
return MallocMessageBuilder::allocateSegment(minimumSize);
}
}
private:
uint desiredSegmentCount;
};
class TestPipe: public InputStream, public OutputStream {
public:
TestPipe(bool lazy)
: lazy(lazy), readPos(0) {}
~TestPipe() {}
void write(const void* buffer, size_t size) override {
data.append(reinterpret_cast<const char*>(buffer), size);
}
size_t read(void* buffer, size_t minBytes, size_t maxBytes) override {
CAPNPROTO_ASSERT(maxBytes <= data.size() - readPos, "Overran end of stream.");
size_t amount = lazy ? minBytes : maxBytes;
memcpy(buffer, data.data() + readPos, amount);
readPos += amount;
return amount;
}
private:
bool lazy;
std::string data;
std::string::size_type readPos;
};
TEST(Snappy, RoundTrip) {
TestMessageBuilder builder(1);
initTestMessage(builder.initRoot<TestAllTypes>());
TestPipe pipe(false);
writeSnappyMessage(pipe, builder);
SnappyMessageReader reader(pipe);
checkTestMessage(reader.getRoot<TestAllTypes>());
}
TEST(Snappy, RoundTripScratchSpace) {
TestMessageBuilder builder(1);
initTestMessage(builder.initRoot<TestAllTypes>());
TestPipe pipe(false);
writeSnappyMessage(pipe, builder);
word scratch[1024];
SnappyMessageReader reader(pipe, ReaderOptions(), ArrayPtr<word>(scratch, 1024));
checkTestMessage(reader.getRoot<TestAllTypes>());
}
TEST(Snappy, RoundTripLazy) {
TestMessageBuilder builder(1);
initTestMessage(builder.initRoot<TestAllTypes>());
TestPipe pipe(true);
writeSnappyMessage(pipe, builder);
SnappyMessageReader reader(pipe);
checkTestMessage(reader.getRoot<TestAllTypes>());
}
TEST(Snappy, RoundTripOddSegmentCount) {
TestMessageBuilder builder(7);
initTestMessage(builder.initRoot<TestAllTypes>());
TestPipe pipe(false);
writeSnappyMessage(pipe, builder);
SnappyMessageReader reader(pipe);
checkTestMessage(reader.getRoot<TestAllTypes>());
}
TEST(Snappy, RoundTripOddSegmentCountLazy) {
TestMessageBuilder builder(7);
initTestMessage(builder.initRoot<TestAllTypes>());
TestPipe pipe(true);
writeSnappyMessage(pipe, builder);
SnappyMessageReader reader(pipe);
checkTestMessage(reader.getRoot<TestAllTypes>());
}
TEST(Snappy, RoundTripEvenSegmentCount) {
TestMessageBuilder builder(10);
initTestMessage(builder.initRoot<TestAllTypes>());
TestPipe pipe(false);
writeSnappyMessage(pipe, builder);
SnappyMessageReader reader(pipe);
checkTestMessage(reader.getRoot<TestAllTypes>());
}
TEST(Snappy, RoundTripEvenSegmentCountLazy) {
TestMessageBuilder builder(10);
initTestMessage(builder.initRoot<TestAllTypes>());
TestPipe pipe(true);
writeSnappyMessage(pipe, builder);
SnappyMessageReader reader(pipe);
checkTestMessage(reader.getRoot<TestAllTypes>());
}
TEST(Snappy, RoundTripTwoMessages) {
TestMessageBuilder builder(1);
initTestMessage(builder.initRoot<TestAllTypes>());
TestMessageBuilder builder2(1);
builder2.initRoot<TestAllTypes>().setTextField("Second message.");
TestPipe pipe(false);
writeSnappyMessage(pipe, builder);
writeSnappyMessage(pipe, builder2);
{
SnappyMessageReader reader(pipe);
checkTestMessage(reader.getRoot<TestAllTypes>());
}
{
SnappyMessageReader reader(pipe);
EXPECT_EQ("Second message.", reader.getRoot<TestAllTypes>().getTextField());
}
}
// TODO: Test error cases.
} // namespace
} // namespace internal
} // namespace capnproto
// Copyright (c) 2013, Kenton Varda <temporal@gmail.com>
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this
// list of conditions and the following disclaimer.
// 2. Redistributions in binary form must reproduce the above copyright notice,
// this list of conditions and the following disclaimer in the documentation
// and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
// ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include "serialize-snappy.h"
#include "wire-format.h"
#include <snappy/snappy.h>
#include <snappy/snappy-sinksource.h>
#include <vector>
namespace capnproto {
namespace {
class InputStreamSource: public snappy::Source {
public:
inline InputStreamSource(InputStream& inputStream, size_t available)
: inputStream(inputStream), available(available), pos(nullptr), end(nullptr) {
// Read at least 10 bytes (or all available bytes if less than 10), and at most the whole buffer
// (unless less is available).
size_t firstSize = inputStream.read(buffer,
std::min<size_t>(available, 10),
std::min<size_t>(available, sizeof(buffer)));
CAPNPROTO_ASSERT(snappy::GetUncompressedLength(buffer, firstSize, &uncompressedSize),
"Invalid snappy-compressed data.");
pos = buffer;
end = pos + firstSize;
}
inline ~InputStreamSource() {};
inline size_t getUncompressedSize() { return uncompressedSize; }
// implements snappy::Source ---------------------------------------
size_t Available() const override {
return available;
}
const char* Peek(size_t* len) override {
*len = end - pos;
return pos;
}
void Skip(size_t n) override {
pos += n;
available -= n;
if (pos == end && available > 0) {
// Read more from the input.
pos = buffer;
end = pos + inputStream.read(buffer, 1, std::min<size_t>(available, sizeof(buffer)));
}
}
private:
InputStream& inputStream;
size_t available;
size_t uncompressedSize;
char* pos;
char* end;
char buffer[8192];
};
} // namespcae
SnappyMessageReader::SnappyMessageReader(
InputStream& inputStream, ReaderOptions options, ArrayPtr<word> scratchSpace)
: MessageReader(options), inputStream(inputStream) {
internal::WireValue<uint32_t> wireCompressedSize;
inputStream.read(&wireCompressedSize, sizeof(wireCompressedSize));
size_t compressedSize = wireCompressedSize.get();
InputStreamSource source(inputStream, compressedSize);
CAPNPROTO_ASSERT(source.getUncompressedSize() % sizeof(word) == 0,
"Uncompressed size was not a whole number of words.");
size_t uncompressedWords = source.getUncompressedSize() / sizeof(word);
if (scratchSpace.size() < uncompressedWords) {
space = newArray<word>(uncompressedWords);
scratchSpace = space;
}
CAPNPROTO_ASSERT(
snappy::RawUncompress(&source, reinterpret_cast<char*>(scratchSpace.begin())),
"Snappy decompression failed.");
new(&underlyingReader) FlatArrayMessageReader(scratchSpace, options);
}
SnappyMessageReader::~SnappyMessageReader() {
underlyingReader.~FlatArrayMessageReader();
}
ArrayPtr<const word> SnappyMessageReader::getSegment(uint id) {
return underlyingReader.getSegment(id);
}
SnappyFdMessageReader::~SnappyFdMessageReader() {}
// =======================================================================================
namespace {
class SegmentArraySource: public snappy::Source {
public:
SegmentArraySource(ArrayPtr<const ArrayPtr<const byte>> pieces)
: pieces(pieces), offset(0), available(0) {
for (auto& piece: pieces) {
available += piece.size();
}
Skip(0); // Skip leading zero-sized pieces, if any.
}
~SegmentArraySource() {}
// implements snappy::Source ---------------------------------------
size_t Available() const override {
return available;
}
const char* Peek(size_t* len) override {
if (pieces.size() == 0) {
*len = 0;
return nullptr;
} else {
*len = pieces[0].size() - offset;
return reinterpret_cast<const char*>(pieces[0].begin()) + offset;
}
}
void Skip(size_t n) override {
available -= n;
while (pieces.size() > 0 && n >= pieces[0].size() - offset) {
n -= pieces[0].size() - offset;
offset = 0;
pieces = pieces.slice(1, pieces.size());
}
offset += n;
}
private:
ArrayPtr<const ArrayPtr<const byte>> pieces;
size_t offset;
size_t available;
};
class AccumulatingSink: public snappy::Sink {
public:
AccumulatingSink()
: pos(firstPiece + sizeof(compressedSize)),
end(firstPiece + sizeof(firstPiece)), firstEnd(firstPiece) {}
~AccumulatingSink() {}
void writeTo(size_t size, OutputStream& output) {
finalizePiece();
compressedSize.set(size);
ArrayPtr<const byte> pieces[morePieces.size() + 1];
pieces[0] = arrayPtr(reinterpret_cast<byte*>(firstPiece),
reinterpret_cast<byte*>(firstEnd));
for (uint i = 0; i < morePieces.size(); i++) {
auto& piece = morePieces[i];
pieces[i + 1] = arrayPtr(reinterpret_cast<byte*>(piece.start.get()),
reinterpret_cast<byte*>(piece.end));
}
output.write(arrayPtr(pieces, morePieces.size() + 1));
}
// implements snappy::Sink -----------------------------------------
void Append(const char* bytes, size_t n) override {
if (bytes == pos) {
pos += n;
} else {
size_t a = available();
if (n > a) {
memcpy(pos, bytes, a);
bytes += a;
addPiece(n);
}
memcpy(pos, bytes, n);
pos += n;
}
}
char* GetAppendBuffer(size_t length, char* scratch) override {
if (length > available()) {
addPiece(length);
}
return pos;
}
private:
char* pos;
char* end;
// Stand and end point of the current available space.
char* firstEnd;
// End point of the used portion of the first piece.
struct Piece {
std::unique_ptr<char[]> start;
// Start point of the piece.
char* end;
// End point of the used portion of the piece.
Piece() = default;
Piece(std::unique_ptr<char[]> start, char* end): start(std::move(start)), end(end) {}
};
std::vector<Piece> morePieces;
union {
internal::WireValue<uint32_t> compressedSize;
char firstPiece[8192];
};
inline size_t available() {
return end - pos;
}
inline void finalizePiece() {
if (morePieces.empty()) {
firstEnd = pos;
} else {
morePieces.back().end = pos;
}
}
void addPiece(size_t minSize) {
finalizePiece();
std::unique_ptr<char[]> newPiece(new char[minSize]);
pos = newPiece.get();
end = pos + minSize;
morePieces.emplace_back(std::move(newPiece), pos);
}
};
class SnappyOutputStream: public OutputStream {
public:
SnappyOutputStream(OutputStream& output): output(output), sawWrite(false) {}
// implements OutputStream -----------------------------------------
void write(const void* buffer, size_t size) override {
CAPNPROTO_ASSERT(false, "writeMessage() was not expected to call this.");
}
void write(ArrayPtr<const ArrayPtr<const byte>> pieces) {
CAPNPROTO_ASSERT(!sawWrite, "writeMessage() was expected to issue exactly one write.");
sawWrite = true;
SegmentArraySource source(pieces);
AccumulatingSink sink;
size_t size = snappy::Compress(&source, &sink);
sink.writeTo(size, output);
}
private:
OutputStream& output;
bool sawWrite;
};
} // namespace
void writeSnappyMessage(OutputStream& output, ArrayPtr<const ArrayPtr<const word>> segments) {
SnappyOutputStream snappyOutput(output);
writeMessage(snappyOutput, segments);
}
void writeSnappyMessageToFd(int fd, ArrayPtr<const ArrayPtr<const word>> segments) {
FdOutputStream output(fd);
writeSnappyMessage(output, segments);
}
} // namespace capnproto
// Copyright (c) 2013, Kenton Varda <temporal@gmail.com>
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this
// list of conditions and the following disclaimer.
// 2. Redistributions in binary form must reproduce the above copyright notice,
// this list of conditions and the following disclaimer in the documentation
// and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
// ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#ifndef CAPNPROTO_SERIALIZE_SNAPPY_H_
#define CAPNPROTO_SERIALIZE_SNAPPY_H_
#include "serialize.h"
namespace capnproto {
class SnappyMessageReader: public MessageReader {
public:
SnappyMessageReader(InputStream& inputStream, ReaderOptions options = ReaderOptions(),
ArrayPtr<word> scratchSpace = nullptr);
~SnappyMessageReader();
ArrayPtr<const word> getSegment(uint id) override;
private:
InputStream& inputStream;
Array<word> space;
union {
FlatArrayMessageReader underlyingReader;
};
};
class SnappyFdMessageReader: private FdInputStream, public SnappyMessageReader {
public:
SnappyFdMessageReader(int fd, ReaderOptions options = ReaderOptions(),
ArrayPtr<word> scratchSpace = nullptr)
: FdInputStream(fd), SnappyMessageReader(*this, options, scratchSpace) {}
// Read message from a file descriptor, without taking ownership of the descriptor.
SnappyFdMessageReader(AutoCloseFd fd, ReaderOptions options = ReaderOptions(),
ArrayPtr<word> scratchSpace = nullptr)
: FdInputStream(move(fd)), SnappyMessageReader(*this, options, scratchSpace) {}
// Read a message from a file descriptor, taking ownership of the descriptor.
~SnappyFdMessageReader();
};
void writeSnappyMessage(OutputStream& output, MessageBuilder& builder);
void writeSnappyMessage(OutputStream& output, ArrayPtr<const ArrayPtr<const word>> segments);
void writeSnappyMessageToFd(int fd, MessageBuilder& builder);
void writeSnappyMessageToFd(int fd, ArrayPtr<const ArrayPtr<const word>> segments);
// =======================================================================================
// inline stuff
inline void writeSnappyMessage(OutputStream& output, MessageBuilder& builder) {
writeSnappyMessage(output, builder.getSegmentsForOutput());
}
inline void writeSnappyMessageToFd(int fd, MessageBuilder& builder) {
writeSnappyMessageToFd(fd, builder.getSegmentsForOutput());
}
} // namespace capnproto
#endif // CAPNPROTO_SERIALIZE_SNAPPY_H_
...@@ -152,7 +152,7 @@ InputStreamMessageReader::InputStreamMessageReader( ...@@ -152,7 +152,7 @@ InputStreamMessageReader::InputStreamMessageReader(
: MessageReader(options), inputStream(inputStream), readPos(nullptr) { : MessageReader(options), inputStream(inputStream), readPos(nullptr) {
internal::WireValue<uint32_t> firstWord[2]; internal::WireValue<uint32_t> firstWord[2];
inputStream.read(firstWord, sizeof(firstWord), sizeof(firstWord)); inputStream.read(firstWord, sizeof(firstWord));
uint segmentCount = firstWord[0].get(); uint segmentCount = firstWord[0].get();
uint segment0Size = segmentCount == 0 ? 0 : firstWord[1].get(); uint segment0Size = segmentCount == 0 ? 0 : firstWord[1].get();
...@@ -162,7 +162,7 @@ InputStreamMessageReader::InputStreamMessageReader( ...@@ -162,7 +162,7 @@ InputStreamMessageReader::InputStreamMessageReader(
// Read sizes for all segments except the first. Include padding if necessary. // Read sizes for all segments except the first. Include padding if necessary.
internal::WireValue<uint32_t> moreSizes[segmentCount & ~1]; internal::WireValue<uint32_t> moreSizes[segmentCount & ~1];
if (segmentCount > 1) { if (segmentCount > 1) {
inputStream.read(moreSizes, sizeof(moreSizes), sizeof(moreSizes)); inputStream.read(moreSizes, sizeof(moreSizes));
for (uint i = 0; i < segmentCount - 1; i++) { for (uint i = 0; i < segmentCount - 1; i++) {
totalWords += moreSizes[i].get(); totalWords += moreSizes[i].get();
} }
...@@ -188,7 +188,7 @@ InputStreamMessageReader::InputStreamMessageReader( ...@@ -188,7 +188,7 @@ InputStreamMessageReader::InputStreamMessageReader(
} }
if (segmentCount == 1) { if (segmentCount == 1) {
inputStream.read(scratchSpace.begin(), totalWords * sizeof(word), totalWords * sizeof(word)); inputStream.read(scratchSpace.begin(), totalWords * sizeof(word));
} else if (segmentCount > 1) { } else if (segmentCount > 1) {
readPos = reinterpret_cast<byte*>(scratchSpace.begin()); readPos = reinterpret_cast<byte*>(scratchSpace.begin());
readPos += inputStream.read(readPos, segment0Size * sizeof(word), totalWords * sizeof(word)); readPos += inputStream.read(readPos, segment0Size * sizeof(word), totalWords * sizeof(word));
...@@ -200,7 +200,16 @@ InputStreamMessageReader::~InputStreamMessageReader() { ...@@ -200,7 +200,16 @@ InputStreamMessageReader::~InputStreamMessageReader() {
// Note that lazy reads only happen when we have multiple segments, so moreSegments.back() is // Note that lazy reads only happen when we have multiple segments, so moreSegments.back() is
// valid. // valid.
const byte* allEnd = reinterpret_cast<const byte*>(moreSegments.back().end()); const byte* allEnd = reinterpret_cast<const byte*>(moreSegments.back().end());
inputStream.skip(allEnd - readPos);
if (std::uncaught_exception()) {
try {
inputStream.skip(allEnd - readPos);
} catch (...) {
// TODO: Devise some way to report secondary errors during unwind.
}
} else {
inputStream.skip(allEnd - readPos);
}
} }
} }
...@@ -328,7 +337,12 @@ void FdOutputStream::write(const void* buffer, size_t size) { ...@@ -328,7 +337,12 @@ void FdOutputStream::write(const void* buffer, size_t size) {
ssize_t n = ::write(fd, pos, size); ssize_t n = ::write(fd, pos, size);
if (n <= 0) { if (n <= 0) {
CAPNPROTO_ASSERT(n < 0, "write() returned zero."); CAPNPROTO_ASSERT(n < 0, "write() returned zero.");
throw OsException("write", errno); int error = errno;
if (error == EINTR) {
continue;
} else {
throw OsException("write", error);
}
} }
pos += n; pos += n;
size -= n; size -= n;
......
...@@ -91,6 +91,9 @@ public: ...@@ -91,6 +91,9 @@ public:
// message. If it can't even reach minBytes, it MUST throw an exception, as the caller is not // message. If it can't even reach minBytes, it MUST throw an exception, as the caller is not
// expected to understand how to deal with partial reads. // expected to understand how to deal with partial reads.
inline void read(void* buffer, size_t bytes) { read(buffer, bytes, bytes); }
// Convenience method for reading an exact number of bytes.
virtual void skip(size_t bytes); virtual void skip(size_t bytes);
// Skips past the given number of bytes, discarding them. The default implementation read()s // Skips past the given number of bytes, discarding them. The default implementation read()s
// into a scratch buffer. // into a scratch buffer.
...@@ -210,18 +213,11 @@ public: ...@@ -210,18 +213,11 @@ public:
ArrayPtr<word> scratchSpace = nullptr) ArrayPtr<word> scratchSpace = nullptr)
: FdInputStream(fd), InputStreamMessageReader(*this, options, scratchSpace) {} : FdInputStream(fd), InputStreamMessageReader(*this, options, scratchSpace) {}
// Read message from a file descriptor, without taking ownership of the descriptor. // Read message from a file descriptor, without taking ownership of the descriptor.
//
// Since this version implies that the caller intends to read more data from the fd later on, the
// default is to read the entire message eagerly in the constructor, so that the fd will be
// deterministically positioned just past the end of the message.
StreamFdMessageReader(AutoCloseFd fd, ReaderOptions options = ReaderOptions(), StreamFdMessageReader(AutoCloseFd fd, ReaderOptions options = ReaderOptions(),
ArrayPtr<word> scratchSpace = nullptr) ArrayPtr<word> scratchSpace = nullptr)
: FdInputStream(move(fd)), InputStreamMessageReader(*this, options, scratchSpace) {} : FdInputStream(move(fd)), InputStreamMessageReader(*this, options, scratchSpace) {}
// Read a message from a file descriptor, taking ownership of the descriptor. // Read a message from a file descriptor, taking ownership of the descriptor.
//
// Since this version implies that the caller does not intend to read any more data from the fd,
// the default is to read the message lazily as needed.
~StreamFdMessageReader(); ~StreamFdMessageReader();
}; };
......
...@@ -116,7 +116,7 @@ class Array { ...@@ -116,7 +116,7 @@ class Array {
public: public:
inline Array(): ptr(nullptr), size_(0) {} inline Array(): ptr(nullptr), size_(0) {}
inline Array(std::nullptr_t): ptr(nullptr), size_(0) {} inline Array(std::nullptr_t): ptr(nullptr), size_(0) {}
inline Array(Array&& other): ptr(other.ptr), size_(other.size_) { inline Array(Array&& other) noexcept: ptr(other.ptr), size_(other.size_) {
other.ptr = nullptr; other.ptr = nullptr;
other.size_ = 0; other.size_ = 0;
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment