Commit 53cc6ec8 authored by Kenton Varda's avatar Kenton Varda

Packed serialization -- removes the zeros.

parent 7b330237
......@@ -26,6 +26,7 @@
#include "common.h"
#include <capnproto/serialize.h>
#include <capnproto/serialize-packed.h>
#include <capnproto/serialize-snappy.h>
#include <thread>
......@@ -55,19 +56,66 @@ public:
// =======================================================================================
struct Uncompressed {
typedef StreamFdMessageReader MessageReader;
typedef FdInputStream& BufferedInput;
typedef InputStreamMessageReader MessageReader;
class ArrayMessageReader: public FlatArrayMessageReader {
public:
ArrayMessageReader(ArrayPtr<const byte> array,
ReaderOptions options = ReaderOptions(),
ArrayPtr<word> scratchSpace = nullptr)
: FlatArrayMessageReader(arrayPtr(
reinterpret_cast<const word*>(array.begin()),
reinterpret_cast<const word*>(array.end())), options) {}
};
static inline void write(OutputStream& output, MessageBuilder& builder) {
writeMessage(output, builder);
}
};
struct Packed {
typedef BufferedInputStreamWrapper BufferedInput;
typedef PackedMessageReader MessageReader;
class ArrayMessageReader: private ArrayInputStream, public PackedMessageReader {
public:
ArrayMessageReader(ArrayPtr<const byte> array,
ReaderOptions options = ReaderOptions(),
ArrayPtr<word> scratchSpace = nullptr)
: ArrayInputStream(array),
PackedMessageReader(*this, options, scratchSpace) {}
};
static inline void write(OutputStream& output, MessageBuilder& builder) {
writePackedMessage(output, builder);
}
static inline void write(BufferedOutputStream& output, MessageBuilder& builder) {
writePackedMessage(output, builder);
}
};
struct SnappyCompressed {
typedef SnappyFdMessageReader MessageReader;
typedef FdInputStream& BufferedInput;
typedef SnappyMessageReader MessageReader;
class ArrayMessageReader: private ArrayInputStream, public SnappyMessageReader {
public:
ArrayMessageReader(ArrayPtr<const byte> array,
ReaderOptions options = ReaderOptions(),
ArrayPtr<word> scratchSpace = nullptr)
: ArrayInputStream(array),
SnappyMessageReader(*this, options, scratchSpace) {}
};
static inline void write(OutputStream& output, MessageBuilder& builder) {
writeSnappyMessage(output, builder);
}
static inline void write(BufferedOutputStream& output, MessageBuilder& builder) {
writeSnappyMessage(output, builder);
}
};
// =======================================================================================
......@@ -78,8 +126,15 @@ struct NoScratch {
template <typename Compression>
class MessageReader: public Compression::MessageReader {
public:
inline MessageReader(int fd, ScratchSpace& scratch)
: Compression::MessageReader(fd) {}
inline MessageReader(typename Compression::BufferedInput& input, ScratchSpace& scratch)
: Compression::MessageReader(input) {}
};
template <typename Compression>
class ArrayMessageReader: public Compression::ArrayMessageReader {
public:
inline ArrayMessageReader(ArrayPtr<const byte> input, ScratchSpace& scratch)
: Compression::ArrayMessageReader(input) {}
};
class MessageBuilder: public MallocMessageBuilder {
......@@ -109,7 +164,7 @@ struct NoScratch {
};
constexpr size_t SCRATCH_SIZE = 128 * 1024;
word scratchSpace[4 * SCRATCH_SIZE];
word scratchSpace[6 * SCRATCH_SIZE];
int scratchCounter = 0;
struct UseScratch {
......@@ -117,7 +172,7 @@ struct UseScratch {
word* words;
ScratchSpace() {
CAPNPROTO_ASSERT(scratchCounter < 4, "Too many scratch spaces needed at once.");
CAPNPROTO_ASSERT(scratchCounter < 6, "Too many scratch spaces needed at once.");
words = scratchSpace + scratchCounter++ * SCRATCH_SIZE;
}
~ScratchSpace() {
......@@ -128,8 +183,17 @@ struct UseScratch {
template <typename Compression>
class MessageReader: public Compression::MessageReader {
public:
inline MessageReader(int fd, ScratchSpace& scratch)
: Compression::MessageReader(fd, ReaderOptions(), arrayPtr(scratch.words, SCRATCH_SIZE)) {}
inline MessageReader(typename Compression::BufferedInput& input, ScratchSpace& scratch)
: Compression::MessageReader(
input, ReaderOptions(), arrayPtr(scratch.words, SCRATCH_SIZE)) {}
};
template <typename Compression>
class ArrayMessageReader: public Compression::ArrayMessageReader {
public:
inline ArrayMessageReader(ArrayPtr<const byte> input, ScratchSpace& scratch)
: Compression::ArrayMessageReader(
input, ReaderOptions(), arrayPtr(scratch.words, SCRATCH_SIZE)) {}
};
class MessageBuilder: public MallocMessageBuilder {
......@@ -167,6 +231,9 @@ struct UseScratch {
template <typename TestCase, typename ReuseStrategy, typename Compression>
struct BenchmarkMethods {
static uint64_t syncClient(int inputFd, int outputFd, uint64_t iters) {
FdInputStream inputStream(inputFd);
typename Compression::BufferedInput bufferedInput(inputStream);
CountingOutputStream output(outputFd);
typename ReuseStrategy::ScratchSpace scratch;
......@@ -180,7 +247,7 @@ struct BenchmarkMethods {
}
{
typename ReuseStrategy::template MessageReader<Compression> reader(inputFd, scratch);
typename ReuseStrategy::template MessageReader<Compression> reader(bufferedInput, scratch);
if (!TestCase::checkResponse(
reader.template getRoot<typename TestCase::Response>(), expected)) {
throw std::logic_error("Incorrect response.");
......@@ -210,11 +277,14 @@ struct BenchmarkMethods {
static void asyncClientReceiver(
int inputFd, ProducerConsumerQueue<typename TestCase::Expectation>* expectations,
uint64_t iters) {
FdInputStream inputStream(inputFd);
typename Compression::BufferedInput bufferedInput(inputStream);
typename ReuseStrategy::ScratchSpace scratch;
for (; iters > 0; --iters) {
typename TestCase::Expectation expected = expectations->next();
typename ReuseStrategy::template MessageReader<Compression> reader(inputFd, scratch);
typename ReuseStrategy::template MessageReader<Compression> reader(bufferedInput, scratch);
if (!TestCase::checkResponse(
reader.template getRoot<typename TestCase::Response>(), expected)) {
throw std::logic_error("Incorrect response.");
......@@ -231,13 +301,17 @@ struct BenchmarkMethods {
}
static uint64_t server(int inputFd, int outputFd, uint64_t iters) {
FdInputStream inputStream(inputFd);
typename Compression::BufferedInput bufferedInput(inputStream);
CountingOutputStream output(outputFd);
typename ReuseStrategy::ScratchSpace builderScratch;
typename ReuseStrategy::ScratchSpace readerScratch;
for (; iters > 0; --iters) {
typename ReuseStrategy::MessageBuilder builder(builderScratch);
typename ReuseStrategy::template MessageReader<Compression> reader(inputFd, readerScratch);
typename ReuseStrategy::template MessageReader<Compression> reader(
bufferedInput, readerScratch);
TestCase::handleRequest(reader.template getRoot<typename TestCase::Request>(),
builder.template initRoot<typename TestCase::Response>());
Compression::write(output, builder);
......@@ -275,24 +349,36 @@ struct BenchmarkMethods {
static uint64_t passByBytes(uint64_t iters) {
uint64_t throughput = 0;
typename ReuseStrategy::ScratchSpace requestScratch;
typename ReuseStrategy::ScratchSpace responseScratch;
typename ReuseStrategy::ScratchSpace clientRequestScratch;
UseScratch::ScratchSpace requestBytesScratch;
typename ReuseStrategy::ScratchSpace serverRequestScratch;
typename ReuseStrategy::ScratchSpace serverResponseScratch;
UseScratch::ScratchSpace responseBytesScratch;
typename ReuseStrategy::ScratchSpace clientResponseScratch;
for (; iters > 0; --iters) {
typename ReuseStrategy::MessageBuilder requestBuilder(requestScratch);
typename ReuseStrategy::MessageBuilder requestBuilder(clientRequestScratch);
typename TestCase::Expectation expected = TestCase::setupRequest(
requestBuilder.template initRoot<typename TestCase::Request>());
Array<word> requestBytes = messageToFlatArray(requestBuilder);
throughput += requestBytes.size() * sizeof(word);
FlatArrayMessageReader requestReader(requestBytes.asPtr());
typename ReuseStrategy::MessageBuilder responseBuilder(responseScratch);
ArrayOutputStream requestOutput(arrayPtr(reinterpret_cast<byte*>(requestBytesScratch.words),
SCRATCH_SIZE * sizeof(word)));
Compression::write(requestOutput, requestBuilder);
throughput += requestOutput.getArray().size();
typename ReuseStrategy::template ArrayMessageReader<Compression> requestReader(
requestOutput.getArray(), serverRequestScratch);
typename ReuseStrategy::MessageBuilder responseBuilder(serverResponseScratch);
TestCase::handleRequest(requestReader.template getRoot<typename TestCase::Request>(),
responseBuilder.template initRoot<typename TestCase::Response>());
Array<word> responseBytes = messageToFlatArray(responseBuilder);
throughput += responseBytes.size() * sizeof(word);
FlatArrayMessageReader responseReader(responseBytes.asPtr());
ArrayOutputStream responseOutput(arrayPtr(reinterpret_cast<byte*>(responseBytesScratch.words),
SCRATCH_SIZE * sizeof(word)));
Compression::write(responseOutput, responseBuilder);
throughput += responseOutput.getArray().size();
typename ReuseStrategy::template ArrayMessageReader<Compression> responseReader(
responseOutput.getArray(), clientResponseScratch);
if (!TestCase::checkResponse(
responseReader.template getRoot<typename TestCase::Response>(), expected)) {
throw std::logic_error("Incorrect response.");
......@@ -304,8 +390,9 @@ struct BenchmarkMethods {
};
struct BenchmarkTypes {
typedef capnp::SnappyCompressed SnappyCompressed;
typedef capnp::Uncompressed Uncompressed;
typedef capnp::Packed Packed;
typedef capnp::SnappyCompressed SnappyCompressed;
typedef capnp::UseScratch ReusableResources;
typedef capnp::NoScratch SingleUseResources;
......
......@@ -251,6 +251,9 @@ uint64_t doBenchmark3(const std::string& mode, const std::string& reuse,
if (compression == "none") {
return doBenchmark2<BenchmarkTypes, TestCase, typename BenchmarkTypes::Uncompressed>(
mode, reuse, iters);
} else if (compression == "packed") {
return doBenchmark2<BenchmarkTypes, TestCase, typename BenchmarkTypes::Packed>(
mode, reuse, iters);
} else if (compression == "snappy") {
return doBenchmark2<BenchmarkTypes, TestCase, typename BenchmarkTypes::SnappyCompressed>(
mode, reuse, iters);
......
......@@ -147,8 +147,9 @@ struct BenchmarkMethods {
};
struct BenchmarkTypes {
typedef void SnappyCompressed;
typedef void Uncompressed;
typedef void Packed;
typedef void SnappyCompressed;
typedef void ReusableResources;
typedef void SingleUseResources;
......
......@@ -335,8 +335,9 @@ struct BenchmarkMethods {
};
struct BenchmarkTypes {
typedef protobuf::SnappyCompressed SnappyCompressed;
typedef protobuf::Uncompressed Uncompressed;
typedef protobuf::Uncompressed Packed;
typedef protobuf::SnappyCompressed SnappyCompressed;
typedef protobuf::ReusableMessages ReusableResources;
typedef protobuf::SingleUseMessages SingleUseResources;
......
......@@ -121,8 +121,9 @@ enum class Reuse {
};
enum class Compression {
SNAPPY,
NONE
NONE,
PACKED,
SNAPPY
};
TestResult runTest(Product product, TestCase testCase, Mode mode, Reuse reuse,
......@@ -174,12 +175,15 @@ TestResult runTest(Product product, TestCase testCase, Mode mode, Reuse reuse,
}
switch (compression) {
case Compression::SNAPPY:
argv[3] = strdup("snappy");
break;
case Compression::NONE:
argv[3] = strdup("none");
break;
case Compression::PACKED:
argv[3] = strdup("packed");
break;
case Compression::SNAPPY:
argv[3] = strdup("snappy");
break;
}
char itersStr[64];
......@@ -295,7 +299,7 @@ ostream& operator<<(ostream& os, Gain gain) {
}
void reportComparison(const char* name, double base, double protobuf, double capnproto,
double iters) {
uint64_t iters) {
cout << setw(35) << left << name
<< setw(14) << right << Gain(base, protobuf)
<< setw(14) << right << Gain(base, capnproto);
......@@ -305,7 +309,7 @@ void reportComparison(const char* name, double base, double protobuf, double cap
}
void reportComparison(const char* name, const char* unit, double protobuf, double capnproto,
double iters) {
uint64_t iters) {
cout << setw(35) << left << name
<< setw(15-strlen(unit)) << right << setprecision(2) << (protobuf / iters) << unit
<< setw(15-strlen(unit)) << right << setprecision(2) << (capnproto / iters) << unit;
......@@ -356,6 +360,8 @@ int main(int argc, char* argv[]) {
testCase = TestCase::CARSALES;
} else if (arg == "no-reuse") {
reuse = Reuse::NO;
} else if (arg == "packed") {
compression = Compression::PACKED;
} else if (arg == "snappy") {
compression = Compression::SNAPPY;
} else {
......@@ -417,12 +423,16 @@ int main(int argc, char* argv[]) {
break;
}
switch (compression) {
case Compression::SNAPPY:
cout << "* Snappy compression" << endl;
break;
case Compression::NONE:
cout << "* no compression" << endl;
break;
case Compression::PACKED:
cout << "* de-zero packing for Cap'n Proto" << endl;
cout << "* standard packing for Protobuf" << endl;
break;
case Compression::SNAPPY:
cout << "* Snappy compression" << endl;
break;
}
reportTableHeader();
......@@ -458,10 +468,12 @@ int main(int argc, char* argv[]) {
nullCase.throughput, protobufBase.throughput, capnpBase.throughput, iters);
reportComparison("object manipulation",
nullCase.time.cpu(), protobufBase.time.cpu(), capnpBase.time.cpu(), iters);
reportComparison("I/O", "us",
reportComparison("I/O time", "us",
((int64_t)protobuf.time.cpu() - (int64_t)protobufBase.time.cpu()) / 1000.0,
((int64_t)capnp.time.cpu() - (int64_t)capnpBase.time.cpu()) / 1000.0, iters);
reportComparison("bandwidth", "B", protobuf.throughput, capnp.throughput, iters);
reportComparison("binary size", "kB",
fileSize("protobuf-" + std::string(testCaseName(testCase))) / 1024.0,
fileSize("capnproto-" + std::string(testCaseName(testCase))) / 1024.0, 1);
......
// Copyright (c) 2013, Kenton Varda <temporal@gmail.com>
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this
// list of conditions and the following disclaimer.
// 2. Redistributions in binary form must reproduce the above copyright notice,
// this list of conditions and the following disclaimer in the documentation
// and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
// ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include "io.h"
#include <errno.h>
#include <unistd.h>
#include <sys/uio.h>
#include <string>
#include <string.h>
namespace capnproto {
class PrematureEofException: public std::exception {
public:
PrematureEofException() {}
~PrematureEofException() noexcept {}
const char* what() const noexcept override {
return "Stream ended prematurely.";
}
};
InputStream::~InputStream() {}
OutputStream::~OutputStream() {}
BufferedInputStream::~BufferedInputStream() {}
BufferedOutputStream::~BufferedOutputStream() {}
void InputStream::skip(size_t bytes) {
char scratch[8192];
while (bytes > 0) {
size_t amount = std::min(bytes, sizeof(scratch));
read(scratch, amount);
bytes -= amount;
}
}
void OutputStream::write(ArrayPtr<const ArrayPtr<const byte>> pieces) {
for (auto piece: pieces) {
write(piece.begin(), piece.size());
}
}
// =======================================================================================
BufferedInputStreamWrapper::BufferedInputStreamWrapper(InputStream& inner, ArrayPtr<byte> buffer)
: inner(inner), ownedBuffer(buffer == nullptr ? newArray<byte>(8192) : nullptr),
buffer(buffer == nullptr ? ownedBuffer : buffer) {}
BufferedInputStreamWrapper::~BufferedInputStreamWrapper() {}
ArrayPtr<const byte> BufferedInputStreamWrapper::getReadBuffer() {
if (bufferAvailable.size() == 0) {
size_t n = inner.read(buffer.begin(), 1, buffer.size());
bufferAvailable = buffer.slice(0, n);
}
return bufferAvailable;
}
size_t BufferedInputStreamWrapper::read(void* dst, size_t minBytes, size_t maxBytes) {
if (minBytes <= bufferAvailable.size()) {
// Serve from current buffer.
size_t n = std::min(bufferAvailable.size(), maxBytes);
memcpy(dst, bufferAvailable.begin(), n);
bufferAvailable = bufferAvailable.slice(n, bufferAvailable.size());
return n;
} else {
// Copy current available into destination.
memcpy(dst, bufferAvailable.begin(), bufferAvailable.size());
size_t fromFirstBuffer = bufferAvailable.size();
dst = reinterpret_cast<byte*>(dst) + fromFirstBuffer;
minBytes -= fromFirstBuffer;
maxBytes -= fromFirstBuffer;
if (maxBytes <= buffer.size()) {
// Read the next buffer-full.
size_t n = inner.read(buffer.begin(), minBytes, buffer.size());
size_t fromSecondBuffer = std::min(n, maxBytes);
memcpy(dst, buffer.begin(), fromSecondBuffer);
bufferAvailable = buffer.slice(fromSecondBuffer, n);
return fromFirstBuffer + fromSecondBuffer;
} else {
// Forward large read to the underlying stream.
bufferAvailable = nullptr;
return fromFirstBuffer + inner.read(dst, minBytes, maxBytes);
}
}
}
void BufferedInputStreamWrapper::skip(size_t bytes) {
if (bytes <= bufferAvailable.size()) {
bufferAvailable = bufferAvailable.slice(bytes, bufferAvailable.size());
} else {
bytes -= bufferAvailable.size();
if (bytes <= buffer.size()) {
// Read the next buffer-full.
size_t n = inner.read(buffer.begin(), bytes, buffer.size());
bufferAvailable = buffer.slice(bytes, n);
} else {
// Forward large skip to the underlying stream.
bufferAvailable = nullptr;
inner.skip(bytes - bufferAvailable.size());
}
}
}
// -------------------------------------------------------------------
BufferedOutputStreamWrapper::BufferedOutputStreamWrapper(OutputStream& inner, ArrayPtr<byte> buffer)
: inner(inner),
ownedBuffer(buffer == nullptr ? newArray<byte>(8192) : nullptr),
buffer(buffer == nullptr ? ownedBuffer : buffer),
bufferPos(this->buffer.begin()) {}
BufferedOutputStreamWrapper::~BufferedOutputStreamWrapper() {
if (bufferPos > buffer.begin()) {
if (std::uncaught_exception()) {
try {
inner.write(buffer.begin(), bufferPos - buffer.begin());
} catch (...) {
// TODO: Report secondary faults.
}
} else {
flush();
}
}
}
void BufferedOutputStreamWrapper::flush() {
if (bufferPos > buffer.begin()) {
inner.write(buffer.begin(), bufferPos - buffer.begin());
bufferPos = buffer.begin();
}
}
ArrayPtr<byte> BufferedOutputStreamWrapper::getWriteBuffer() {
return arrayPtr(bufferPos, buffer.end());
}
void BufferedOutputStreamWrapper::write(const void* src, size_t size) {
if (src == bufferPos) {
// Oh goody, the caller wrote directly into our buffer.
bufferPos += size;
} else {
size_t available = buffer.end() - bufferPos;
if (size <= available) {
memcpy(bufferPos, src, size);
bufferPos += size;
} else if (size <= buffer.size()) {
// Too much for this buffer, but not a full buffer's worth, so we'll go ahead and copy.
memcpy(bufferPos, src, available);
inner.write(buffer.begin(), buffer.size());
size -= available;
src = reinterpret_cast<const byte*>(src) + available;
memcpy(buffer.begin(), src, size);
bufferPos = buffer.begin() + size;
} else {
// Writing so much data that we might as well write directly to avoid a copy.
inner.write(buffer.begin(), bufferPos - buffer.begin());
bufferPos = buffer.begin();
inner.write(src, size);
}
}
}
// =======================================================================================
ArrayInputStream::ArrayInputStream(ArrayPtr<const byte> array): array(array) {}
ArrayInputStream::~ArrayInputStream() {}
ArrayPtr<const byte> ArrayInputStream::getReadBuffer() {
return array;
}
size_t ArrayInputStream::read(void* dst, size_t minBytes, size_t maxBytes) {
size_t n = std::min(maxBytes, array.size());
if (n < minBytes) {
throw PrematureEofException();
}
memcpy(dst, array.begin(), n);
array = array.slice(n, array.size());
return n;
}
void ArrayInputStream::skip(size_t bytes) {
if (array.size() < bytes) {
throw PrematureEofException();
}
array = array.slice(bytes, array.size());
}
// -------------------------------------------------------------------
ArrayOutputStream::ArrayOutputStream(ArrayPtr<byte> array): array(array), fillPos(array.begin()) {}
ArrayOutputStream::~ArrayOutputStream() {}
ArrayPtr<byte> ArrayOutputStream::getWriteBuffer() {
return arrayPtr(fillPos, array.end());
}
void ArrayOutputStream::write(const void* src, size_t size) {
if (src == fillPos) {
// Oh goody, the caller wrote directly into our buffer.
fillPos += size;
} else {
CAPNPROTO_ASSERT(size <= (size_t)(array.end() - fillPos),
"ArrayOutputStream's array is not big enough.");
memcpy(fillPos, src, size);
fillPos += size;
}
}
// =======================================================================================
class OsException: public std::exception {
public:
OsException(const char* function, int error) {
char buffer[256];
message = function;
message += ": ";
message.append(strerror_r(error, buffer, sizeof(buffer)));
}
~OsException() noexcept {}
const char* what() const noexcept override {
return message.c_str();
}
private:
std::string message;
};
AutoCloseFd::~AutoCloseFd() {
if (fd >= 0 && close(fd) < 0) {
if (std::uncaught_exception()) {
// TODO: Devise some way to report secondary errors during unwind.
} else {
throw OsException("close", errno);
}
}
}
FdInputStream::~FdInputStream() {}
size_t FdInputStream::read(void* buffer, size_t minBytes, size_t maxBytes) {
byte* pos = reinterpret_cast<byte*>(buffer);
byte* min = pos + minBytes;
byte* max = pos + maxBytes;
while (pos < min) {
ssize_t n = ::read(fd, pos, max - pos);
if (n <= 0) {
if (n < 0) {
int error = errno;
if (error == EINTR) {
continue;
} else {
throw OsException("read", error);
}
} else if (n == 0) {
throw PrematureEofException();
}
return false;
}
pos += n;
}
return pos - reinterpret_cast<byte*>(buffer);
}
FdOutputStream::~FdOutputStream() {}
void FdOutputStream::write(const void* buffer, size_t size) {
const char* pos = reinterpret_cast<const char*>(buffer);
while (size > 0) {
ssize_t n = ::write(fd, pos, size);
if (n <= 0) {
CAPNPROTO_ASSERT(n < 0, "write() returned zero.");
int error = errno;
if (error == EINTR) {
continue;
} else {
throw OsException("write", error);
}
}
pos += n;
size -= n;
}
}
void FdOutputStream::write(ArrayPtr<const ArrayPtr<const byte>> pieces) {
CAPNPROTO_STACK_ARRAY(struct iovec, iov, pieces.size(), 128);
for (uint i = 0; i < pieces.size(); i++) {
// writev() interface is not const-correct. :(
iov[i].iov_base = const_cast<byte*>(pieces[i].begin());
iov[i].iov_len = pieces[i].size();
}
struct iovec* current = iov.begin();
// Make sure we don't do anything on an empty write.
while (current < iov.end() && current->iov_len == 0) {
++current;
}
while (current < iov.end()) {
ssize_t n = ::writev(fd, current, iov.end() - current);
if (n <= 0) {
if (n <= 0) {
CAPNPROTO_ASSERT(n < 0, "write() returned zero.");
throw OsException("writev", errno);
}
}
while (static_cast<size_t>(n) >= current->iov_len) {
n -= current->iov_len;
++current;
}
if (n > 0) {
current->iov_base = reinterpret_cast<byte*>(current->iov_base) + n;
current->iov_len -= n;
}
}
}
} // namespace capnproto
// Copyright (c) 2013, Kenton Varda <temporal@gmail.com>
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this
// list of conditions and the following disclaimer.
// 2. Redistributions in binary form must reproduce the above copyright notice,
// this list of conditions and the following disclaimer in the documentation
// and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
// ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#ifndef CAPNPROTO_IO_H_
#define CAPNPROTO_IO_H_
#include <cstddef>
#include "macros.h"
#include "type-safety.h"
#include <utility> // for std::forward; TODO: pulls in too much stuff just for std::forward.
namespace capnproto {
// =======================================================================================
// Abstract interfaces
class InputStream {
public:
virtual ~InputStream();
virtual size_t read(void* buffer, size_t minBytes, size_t maxBytes) = 0;
// Reads at least minBytes and at most maxBytes, copying them into the given buffer. Returns
// the size read. Throws an exception on errors.
//
// maxBytes is the number of bytes the caller really wants, but minBytes is the minimum amount
// needed by the caller before it can start doing useful processing. If the stream returns less
// than maxBytes, the caller will usually call read() again later to get the rest. Returning
// less than maxBytes is useful when it makes sense for the caller to parallelize processing
// with I/O.
//
// Cap'n Proto never asks for more bytes than it knows are part of the message. Therefore, if
// the InputStream happens to know that the stream will never reach maxBytes -- even if it has
// reached minBytes -- it should throw an exception to avoid wasting time processing an incomplete
// message. If it can't even reach minBytes, it MUST throw an exception, as the caller is not
// expected to understand how to deal with partial reads.
inline void read(void* buffer, size_t bytes) { read(buffer, bytes, bytes); }
// Convenience method for reading an exact number of bytes.
virtual void skip(size_t bytes);
// Skips past the given number of bytes, discarding them. The default implementation read()s
// into a scratch buffer.
};
class OutputStream {
public:
virtual ~OutputStream();
virtual void write(const void* buffer, size_t size) = 0;
// Always writes the full size. Throws exception on error.
virtual void write(ArrayPtr<const ArrayPtr<const byte>> pieces);
// Equivalent to write()ing each byte array in sequence, which is what the default implementation
// does. Override if you can do something better, e.g. use writev() to do the write in a single
// syscall.
};
class BufferedInputStream: public InputStream {
// An input stream which buffers some bytes in memory to reduce system call overhead.
// - OR -
// An input stream that actually reads from some in-memory data structure and wants to give its
// caller a direct pointer to that memory to potentially avoid a copy.
public:
virtual ~BufferedInputStream();
virtual ArrayPtr<const byte> getReadBuffer() = 0;
// Get a direct pointer into the read buffer, which contains the next bytes in the input. If the
// caller consumes any bytes, it should then call skip() to indicate this. This always returns a
// non-empty buffer unless at EOF.
};
class BufferedOutputStream: public OutputStream {
// An output stream which buffers some bytes in memory to reduce system call overhead.
// - OR -
// An output stream that actually writes into some in-memory data structure and wants to give its
// caller a direct pointer to that memory to potentially avoid a copy.
public:
virtual ~BufferedOutputStream();
virtual ArrayPtr<byte> getWriteBuffer() = 0;
// Get a direct pointer into the write buffer. The caller may choose to fill in some prefix of
// this buffer and then pass it to write(), in which case write() may avoid a copy. It is
// incorrect to pass to write any slice of this buffer which is not a prefix.
};
// =======================================================================================
// Buffered streams implemented as wrappers around regular streams
class BufferedInputStreamWrapper: public BufferedInputStream {
// Implements BufferedInputStream in terms of an InputStream.
//
// Note that the underlying stream's position is unpredictable once the wrapper is destroyed,
// unless the entire stream was consumed. To read a predictable number of bytes in a buffered
// way without going over, you'd need this wrapper to wrap some other wrapper which itself
// implements an artificial EOF at the desired point. Such a stream should be trivial to write
// but is not provided by the library at this time.
public:
explicit BufferedInputStreamWrapper(InputStream& inner, ArrayPtr<byte> buffer = nullptr);
// Creates a buffered stream wrapping the given non-buffered stream. No guarantee is made about
// the position of the inner stream after a buffered wrapper has been created unless the entire
// input is read.
//
// If the second parameter is non-null, the stream uses the given buffer instead of allocating
// its own. This may improve performance if the buffer can be reused.
CAPNPROTO_DISALLOW_COPY(BufferedInputStreamWrapper);
~BufferedInputStreamWrapper();
// implements BufferedInputStream ----------------------------------
ArrayPtr<const byte> getReadBuffer() override;
size_t read(void* buffer, size_t minBytes, size_t maxBytes) override;
void skip(size_t bytes) override;
private:
InputStream& inner;
Array<byte> ownedBuffer;
ArrayPtr<byte> buffer;
ArrayPtr<byte> bufferAvailable;
};
class BufferedOutputStreamWrapper: public BufferedOutputStream {
// Implements BufferedOutputStream in terms of an OutputStream. Note that writes to the
// underlying stream may be delayed until flush() is called or the wrapper is destroyed.
public:
explicit BufferedOutputStreamWrapper(OutputStream& inner, ArrayPtr<byte> buffer = nullptr);
// Creates a buffered stream wrapping the given non-buffered stream.
//
// If the second parameter is non-null, the stream uses the given buffer instead of allocating
// its own. This may improve performance if the buffer can be reused.
CAPNPROTO_DISALLOW_COPY(BufferedOutputStreamWrapper);
~BufferedOutputStreamWrapper();
void flush();
// Force the wrapper to write any remaining bytes in its buffer to the inner stream. Note that
// this only flushes this object's buffer; this object has no idea how to flush any other buffers
// that may be present in the underlying stream.
// implements BufferedOutputStream ---------------------------------
ArrayPtr<byte> getWriteBuffer() override;
void write(const void* buffer, size_t size) override;
private:
OutputStream& inner;
Array<byte> ownedBuffer;
ArrayPtr<byte> buffer;
byte* bufferPos;
};
// =======================================================================================
// Array I/O
class ArrayInputStream: public BufferedInputStream {
public:
explicit ArrayInputStream(ArrayPtr<const byte> array);
CAPNPROTO_DISALLOW_COPY(ArrayInputStream);
~ArrayInputStream();
// implements BufferedInputStream ----------------------------------
ArrayPtr<const byte> getReadBuffer() override;
size_t read(void* buffer, size_t minBytes, size_t maxBytes) override;
void skip(size_t bytes) override;
private:
ArrayPtr<const byte> array;
};
class ArrayOutputStream: public BufferedOutputStream {
public:
explicit ArrayOutputStream(ArrayPtr<byte> array);
CAPNPROTO_DISALLOW_COPY(ArrayOutputStream);
~ArrayOutputStream();
ArrayPtr<byte> getArray() {
// Get the portion of the array which has been filled in.
return arrayPtr(array.begin(), fillPos);
}
// implements BufferedInputStream ----------------------------------
ArrayPtr<byte> getWriteBuffer() override;
void write(const void* buffer, size_t size) override;
private:
ArrayPtr<byte> array;
byte* fillPos;
};
// =======================================================================================
// File descriptor I/O
class AutoCloseFd {
// A wrapper around a file descriptor which automatically closes the descriptor when destroyed.
// The wrapper supports move construction for transferring ownership of the descriptor. If
// close() returns an error, the destructor throws an exception, UNLESS the destructor is being
// called during unwind from another exception, in which case the close error is ignored.
//
// If your code is not exception-safe, you should not use AutoCloseFd. In this case you will
// have to call close() yourself and handle errors appropriately.
//
// TODO: Create a general helper library for reporting/detecting secondary exceptions that
// occurred during unwind of some primary exception.
public:
inline AutoCloseFd(): fd(-1) {}
inline AutoCloseFd(std::nullptr_t): fd(-1) {}
inline explicit AutoCloseFd(int fd): fd(fd) {}
inline AutoCloseFd(AutoCloseFd&& other): fd(other.fd) { other.fd = -1; }
CAPNPROTO_DISALLOW_COPY(AutoCloseFd);
~AutoCloseFd();
inline operator int() { return fd; }
inline int get() { return fd; }
inline bool operator==(std::nullptr_t) { return fd < 0; }
inline bool operator!=(std::nullptr_t) { return fd >= 0; }
private:
int fd;
};
class FdInputStream: public InputStream {
// An InputStream wrapping a file descriptor.
public:
explicit FdInputStream(int fd): fd(fd) {};
explicit FdInputStream(AutoCloseFd fd): fd(fd), autoclose(move(fd)) {}
CAPNPROTO_DISALLOW_COPY(FdInputStream);
~FdInputStream();
size_t read(void* buffer, size_t minBytes, size_t maxBytes) override;
private:
int fd;
AutoCloseFd autoclose;
};
class FdOutputStream: public OutputStream {
// An OutputStream wrapping a file descriptor.
public:
explicit FdOutputStream(int fd): fd(fd) {};
explicit FdOutputStream(AutoCloseFd fd): fd(fd), autoclose(move(fd)) {}
CAPNPROTO_DISALLOW_COPY(FdOutputStream);
~FdOutputStream();
void write(const void* buffer, size_t size) override;
void write(ArrayPtr<const ArrayPtr<const byte>> pieces) override;
private:
int fd;
AutoCloseFd autoclose;
};
} // namespace capnproto
#endif // CAPNPROTO_IO_H_
......@@ -60,6 +60,17 @@ void assertionFailure(const char* file, int line, const char* expectation, const
#define CAPNPROTO_DEBUG_ASSERT(condition, message) CAPNPROTO_ASSERT(condition, message)
#endif
// Allocate an array, preferably on the stack, unless it is too big. On GCC this will use
// variable-sized arrays. For other compilers we could just use a fixed-size array.
#define CAPNPROTO_STACK_ARRAY(type, name, size, maxStack) \
size_t name##_size = (size); \
bool name##_isOnStack = name##_size <= (maxStack); \
type name##_stack[name##_isOnStack ? size : 0]; \
::capnproto::Array<type> name##_heap = name##_isOnStack ? \
nullptr : newArray<type>(name##_size); \
::capnproto::ArrayPtr<type> name = name##_isOnStack ? \
arrayPtr(name##_stack, name##_size) : name##_heap
} // namespace internal
template <typename T, typename U>
......
......@@ -50,7 +50,7 @@ internal::StructReader MessageReader::getRoot(const word* defaultValue) {
internal::SegmentReader* segment = arena()->tryGetSegment(SegmentId(0));
if (segment == nullptr ||
!segment->containsInterval(segment->getStartPtr(), segment->getStartPtr() + 1)) {
segment->getArena()->reportInvalidData("Message did not contain a root pointer.");
arena()->reportInvalidData("Message did not contain a root pointer.");
return internal::StructReader::readRootTrusted(defaultValue, defaultValue);
} else {
return internal::StructReader::readRoot(
......
// Copyright (c) 2013, Kenton Varda <temporal@gmail.com>
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this
// list of conditions and the following disclaimer.
// 2. Redistributions in binary form must reproduce the above copyright notice,
// this list of conditions and the following disclaimer in the documentation
// and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
// ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include "test.capnp.h"
#include "serialize-packed.h"
#include <gtest/gtest.h>
#include <string>
#include <stdlib.h>
#include "test-util.h"
namespace capnproto {
namespace internal {
namespace {
class TestPipe: public BufferedInputStream, public OutputStream {
public:
TestPipe()
: preferredReadSize(std::numeric_limits<size_t>::max()), readPos(0) {}
explicit TestPipe(size_t preferredReadSize)
: preferredReadSize(preferredReadSize), readPos(0) {}
~TestPipe() {}
const std::string& getData() { return data; }
void resetRead(size_t preferredReadSize = std::numeric_limits<size_t>::max()) {
readPos = 0;
this->preferredReadSize = preferredReadSize;
}
bool allRead() {
return readPos == data.size();
}
void clear(size_t preferredReadSize = std::numeric_limits<size_t>::max()) {
resetRead(preferredReadSize);
data.clear();
}
void write(const void* buffer, size_t size) override {
data.append(reinterpret_cast<const char*>(buffer), size);
}
size_t read(void* buffer, size_t minBytes, size_t maxBytes) override {
CAPNPROTO_ASSERT(maxBytes <= data.size() - readPos, "Overran end of stream.");
size_t amount = std::min(maxBytes, std::max(minBytes, preferredReadSize));
memcpy(buffer, data.data() + readPos, amount);
readPos += amount;
return amount;
}
void skip(size_t bytes) override {
CAPNPROTO_ASSERT(bytes <= data.size() - readPos, "Overran end of stream.");
readPos += bytes;
}
ArrayPtr<const byte> getReadBuffer() override {
size_t amount = std::min(data.size() - readPos, preferredReadSize);
return arrayPtr(reinterpret_cast<const byte*>(data.data() + readPos), amount);
}
private:
size_t preferredReadSize;
std::string data;
std::string::size_type readPos;
};
struct DisplayByteArray {
DisplayByteArray(const std::string& str)
: data(reinterpret_cast<const uint8_t*>(str.data())), size(str.size()) {}
DisplayByteArray(const std::initializer_list<uint8_t>& list)
: data(list.begin()), size(list.size()) {}
const uint8_t* data;
size_t size;
};
std::ostream& operator<<(std::ostream& os, const DisplayByteArray& bytes) {
os << "{ ";
for (size_t i = 0; i < bytes.size; i++) {
if (i > 0) {
os << ", ";
}
os << (uint)bytes.data[i];
}
os << " }";
return os;
}
void expectPacksTo(std::initializer_list<uint8_t> unpacked,
std::initializer_list<uint8_t> packed) {
TestPipe pipe;
// -----------------------------------------------------------------
// write
{
BufferedOutputStreamWrapper bufferedOut(pipe);
PackedOutputStream packedOut(bufferedOut);
packedOut.write(unpacked.begin(), unpacked.size());
}
if (pipe.getData() != std::string(reinterpret_cast<const char*>(packed.begin()), packed.size())) {
ADD_FAILURE()
<< "Tried to pack: " << DisplayByteArray(unpacked) << "\n"
<< "Expected: " << DisplayByteArray(packed) << "\n"
<< "Actual: " << DisplayByteArray(pipe.getData());
return;
}
// -----------------------------------------------------------------
// read
std::string roundTrip;
roundTrip.resize(unpacked.size());
{
PackedInputStream packedIn(pipe);
packedIn.InputStream::read(&*roundTrip.begin(), roundTrip.size());
EXPECT_TRUE(pipe.allRead());
}
if (roundTrip != std::string(reinterpret_cast<const char*>(unpacked.begin()), unpacked.size())) {
ADD_FAILURE()
<< "Tried to unpack: " << DisplayByteArray(packed) << "\n"
<< "Expected: " << DisplayByteArray(unpacked) << "\n"
<< "Actual: " << DisplayByteArray(roundTrip);
return;
}
for (uint blockSize = 1; blockSize < packed.size(); blockSize <<= 1) {
pipe.resetRead(blockSize);
{
PackedInputStream packedIn(pipe);
packedIn.InputStream::read(&*roundTrip.begin(), roundTrip.size());
EXPECT_TRUE(pipe.allRead());
}
if (roundTrip !=
std::string(reinterpret_cast<const char*>(unpacked.begin()), unpacked.size())) {
ADD_FAILURE()
<< "Tried to unpack: " << DisplayByteArray(packed) << "\n"
<< " Block size: " << blockSize << "\n"
<< "Expected: " << DisplayByteArray(unpacked) << "\n"
<< "Actual: " << DisplayByteArray(roundTrip);
}
}
// -----------------------------------------------------------------
// skip
pipe.resetRead();
{
PackedInputStream packedIn(pipe);
packedIn.skip(unpacked.size());
EXPECT_TRUE(pipe.allRead());
}
for (uint blockSize = 1; blockSize < packed.size(); blockSize <<= 1) {
pipe.resetRead(blockSize);
{
PackedInputStream packedIn(pipe);
packedIn.skip(unpacked.size());
EXPECT_TRUE(pipe.allRead());
}
}
pipe.clear();
// -----------------------------------------------------------------
// write / read multiple
{
BufferedOutputStreamWrapper bufferedOut(pipe);
PackedOutputStream packedOut(bufferedOut);
for (uint i = 0; i < 5; i++) {
packedOut.write(unpacked.begin(), unpacked.size());
}
}
for (uint i = 0; i < 5; i++) {
PackedInputStream packedIn(pipe);
packedIn.InputStream::read(&*roundTrip.begin(), roundTrip.size());
if (roundTrip !=
std::string(reinterpret_cast<const char*>(unpacked.begin()), unpacked.size())) {
ADD_FAILURE()
<< "Tried to unpack: " << DisplayByteArray(packed) << "\n"
<< " Index: " << i << "\n"
<< "Expected: " << DisplayByteArray(unpacked) << "\n"
<< "Actual: " << DisplayByteArray(roundTrip);
}
}
EXPECT_TRUE(pipe.allRead());
}
#ifdef __CDT_PARSER__
// CDT doesn't seem to understand these initializer lists.
#define expectPacksTo(...)
#endif
TEST(Packed, SimplePacking) {
expectPacksTo({}, {});
expectPacksTo({0,0,0,0,0,0,0,0}, {0,0});
expectPacksTo({0,0,12,0,0,34,0,0}, {0x24,12,34});
expectPacksTo({1,3,2,4,5,7,6,8}, {0xff,1,3,2,4,5,7,6,8,0});
expectPacksTo({0,0,0,0,0,0,0,0,1,3,2,4,5,7,6,8}, {0,0,0xff,1,3,2,4,5,7,6,8,0});
expectPacksTo({0,0,12,0,0,34,0,0,1,3,2,4,5,7,6,8}, {0x24,12,34,0xff,1,3,2,4,5,7,6,8,0});
expectPacksTo({1,3,2,4,5,7,6,8,8,6,7,4,5,2,3,1}, {0xff,1,3,2,4,5,7,6,8,1,8,6,7,4,5,2,3,1});
expectPacksTo(
{1,2,3,4,5,6,7,8, 1,2,3,4,5,6,7,8, 1,2,3,4,5,6,7,8, 1,2,3,4,5,6,7,8, 0,2,4,0,9,0,5,1},
{0xff,1,2,3,4,5,6,7,8, 3, 1,2,3,4,5,6,7,8, 1,2,3,4,5,6,7,8, 1,2,3,4,5,6,7,8, 0xd6,2,4,9,5,1});
expectPacksTo(
{1,2,3,4,5,6,7,8, 1,2,3,4,5,6,7,8, 6,2,4,3,9,0,5,1, 1,2,3,4,5,6,7,8, 0,2,4,0,9,0,5,1},
{0xff,1,2,3,4,5,6,7,8, 3, 1,2,3,4,5,6,7,8, 6,2,4,3,9,0,5,1, 1,2,3,4,5,6,7,8, 0xd6,2,4,9,5,1});
expectPacksTo(
{8,0,100,6,0,1,1,2, 0,0,0,0,0,0,0,0, 0,0,0,0,0,0,0,0, 0,0,0,0,0,0,0,0, 0,0,1,0,2,0,3,1},
{0xed,8,100,6,1,1,2, 0,2, 0xd4,1,2,3,1});
}
// =======================================================================================
class TestMessageBuilder: public MallocMessageBuilder {
// A MessageBuilder that tries to allocate an exact number of total segments, by allocating
// minimum-size segments until it reaches the number, then allocating one large segment to
// finish.
public:
explicit TestMessageBuilder(uint desiredSegmentCount)
: MallocMessageBuilder(0, AllocationStrategy::FIXED_SIZE),
desiredSegmentCount(desiredSegmentCount) {}
~TestMessageBuilder() {
EXPECT_EQ(0u, desiredSegmentCount);
}
ArrayPtr<word> allocateSegment(uint minimumSize) override {
if (desiredSegmentCount <= 1) {
if (desiredSegmentCount < 1) {
ADD_FAILURE() << "Allocated more segments than desired.";
} else {
--desiredSegmentCount;
}
return MallocMessageBuilder::allocateSegment(SUGGESTED_FIRST_SEGMENT_WORDS);
} else {
--desiredSegmentCount;
return MallocMessageBuilder::allocateSegment(minimumSize);
}
}
private:
uint desiredSegmentCount;
};
TEST(Packed, RoundTrip) {
TestMessageBuilder builder(1);
initTestMessage(builder.initRoot<TestAllTypes>());
TestPipe pipe;
writePackedMessage(pipe, builder);
PackedMessageReader reader(pipe);
checkTestMessage(reader.getRoot<TestAllTypes>());
}
TEST(Packed, RoundTripScratchSpace) {
TestMessageBuilder builder(1);
initTestMessage(builder.initRoot<TestAllTypes>());
TestPipe pipe;
writePackedMessage(pipe, builder);
word scratch[1024];
PackedMessageReader reader(pipe, ReaderOptions(), ArrayPtr<word>(scratch, 1024));
checkTestMessage(reader.getRoot<TestAllTypes>());
}
TEST(Packed, RoundTripLazy) {
TestMessageBuilder builder(1);
initTestMessage(builder.initRoot<TestAllTypes>());
TestPipe pipe(1);
writePackedMessage(pipe, builder);
PackedMessageReader reader(pipe);
checkTestMessage(reader.getRoot<TestAllTypes>());
}
TEST(Packed, RoundTripOddSegmentCount) {
TestMessageBuilder builder(7);
initTestMessage(builder.initRoot<TestAllTypes>());
TestPipe pipe;
writePackedMessage(pipe, builder);
PackedMessageReader reader(pipe);
checkTestMessage(reader.getRoot<TestAllTypes>());
}
TEST(Packed, RoundTripOddSegmentCountLazy) {
TestMessageBuilder builder(7);
initTestMessage(builder.initRoot<TestAllTypes>());
TestPipe pipe(1);
writePackedMessage(pipe, builder);
PackedMessageReader reader(pipe);
checkTestMessage(reader.getRoot<TestAllTypes>());
}
TEST(Packed, RoundTripEvenSegmentCount) {
TestMessageBuilder builder(10);
initTestMessage(builder.initRoot<TestAllTypes>());
TestPipe pipe;
writePackedMessage(pipe, builder);
PackedMessageReader reader(pipe);
checkTestMessage(reader.getRoot<TestAllTypes>());
}
TEST(Packed, RoundTripEvenSegmentCountLazy) {
TestMessageBuilder builder(10);
initTestMessage(builder.initRoot<TestAllTypes>());
TestPipe pipe(1);
writePackedMessage(pipe, builder);
PackedMessageReader reader(pipe);
checkTestMessage(reader.getRoot<TestAllTypes>());
}
TEST(Packed, RoundTripTwoMessages) {
TestMessageBuilder builder(1);
initTestMessage(builder.initRoot<TestAllTypes>());
TestMessageBuilder builder2(1);
builder2.initRoot<TestAllTypes>().setTextField("Second message.");
TestPipe pipe;
writePackedMessage(pipe, builder);
writePackedMessage(pipe, builder2);
{
PackedMessageReader reader(pipe);
checkTestMessage(reader.getRoot<TestAllTypes>());
}
{
PackedMessageReader reader(pipe);
EXPECT_EQ("Second message.", reader.getRoot<TestAllTypes>().getTextField());
}
}
// =======================================================================================
TEST(Packed, RoundTripAllZero) {
TestMessageBuilder builder(1);
builder.initRoot<TestAllTypes>();
TestPipe pipe;
writePackedMessage(pipe, builder);
PackedMessageReader reader(pipe);
checkTestMessageAllZero(reader.getRoot<TestAllTypes>());
// Most of the bytes are the segment table and root reference.
EXPECT_LE(pipe.getData().size(), 9u);
}
TEST(Packed, RoundTripAllZeroScratchSpace) {
TestMessageBuilder builder(1);
builder.initRoot<TestAllTypes>();
TestPipe pipe;
writePackedMessage(pipe, builder);
word scratch[1024];
PackedMessageReader reader(pipe, ReaderOptions(), ArrayPtr<word>(scratch, 1024));
checkTestMessageAllZero(reader.getRoot<TestAllTypes>());
}
TEST(Packed, RoundTripAllZeroLazy) {
TestMessageBuilder builder(1);
builder.initRoot<TestAllTypes>();
TestPipe pipe(1);
writePackedMessage(pipe, builder);
PackedMessageReader reader(pipe);
checkTestMessageAllZero(reader.getRoot<TestAllTypes>());
}
TEST(Packed, RoundTripAllZeroOddSegmentCount) {
TestMessageBuilder builder(3);
builder.initRoot<TestAllTypes>().initStructField().initStructField();
TestPipe pipe;
writePackedMessage(pipe, builder);
PackedMessageReader reader(pipe);
checkTestMessageAllZero(reader.getRoot<TestAllTypes>());
}
TEST(Packed, RoundTripAllZeroOddSegmentCountLazy) {
TestMessageBuilder builder(3);
builder.initRoot<TestAllTypes>().initStructField().initStructField();
TestPipe pipe(1);
writePackedMessage(pipe, builder);
PackedMessageReader reader(pipe);
checkTestMessageAllZero(reader.getRoot<TestAllTypes>());
}
TEST(Packed, RoundTripAllZeroEvenSegmentCount) {
TestMessageBuilder builder(2);
builder.initRoot<TestAllTypes>().initStructField().initStructField();
TestPipe pipe;
writePackedMessage(pipe, builder);
PackedMessageReader reader(pipe);
checkTestMessageAllZero(reader.getRoot<TestAllTypes>());
}
TEST(Packed, RoundTripAllZeroEvenSegmentCountLazy) {
TestMessageBuilder builder(2);
builder.initRoot<TestAllTypes>().initStructField().initStructField();
TestPipe pipe(1);
writePackedMessage(pipe, builder);
PackedMessageReader reader(pipe);
checkTestMessageAllZero(reader.getRoot<TestAllTypes>());
}
// =======================================================================================
TEST(Packed, RoundTripHugeString) {
TestMessageBuilder builder(1);
builder.initRoot<TestAllTypes>().setTextField(std::string(5023, 'x'));
TestPipe pipe;
writePackedMessage(pipe, builder);
PackedMessageReader reader(pipe);
EXPECT_TRUE(reader.getRoot<TestAllTypes>().getTextField() == std::string(5023, 'x'));
}
TEST(Packed, RoundTripHugeStringScratchSpace) {
TestMessageBuilder builder(1);
builder.initRoot<TestAllTypes>().setTextField(std::string(5023, 'x'));
TestPipe pipe;
writePackedMessage(pipe, builder);
word scratch[1024];
PackedMessageReader reader(pipe, ReaderOptions(), ArrayPtr<word>(scratch, 1024));
EXPECT_TRUE(reader.getRoot<TestAllTypes>().getTextField() == std::string(5023, 'x'));
}
TEST(Packed, RoundTripHugeStringLazy) {
TestMessageBuilder builder(1);
builder.initRoot<TestAllTypes>().setTextField(std::string(5023, 'x'));
TestPipe pipe(1);
writePackedMessage(pipe, builder);
PackedMessageReader reader(pipe);
EXPECT_TRUE(reader.getRoot<TestAllTypes>().getTextField() == std::string(5023, 'x'));
}
TEST(Packed, RoundTripHugeStringOddSegmentCount) {
TestMessageBuilder builder(3);
builder.initRoot<TestAllTypes>().setTextField(std::string(5023, 'x'));
TestPipe pipe;
writePackedMessage(pipe, builder);
PackedMessageReader reader(pipe);
EXPECT_TRUE(reader.getRoot<TestAllTypes>().getTextField() == std::string(5023, 'x'));
}
TEST(Packed, RoundTripHugeStringOddSegmentCountLazy) {
TestMessageBuilder builder(3);
builder.initRoot<TestAllTypes>().setTextField(std::string(5023, 'x'));
TestPipe pipe(1);
writePackedMessage(pipe, builder);
PackedMessageReader reader(pipe);
EXPECT_TRUE(reader.getRoot<TestAllTypes>().getTextField() == std::string(5023, 'x'));
}
TEST(Packed, RoundTripHugeStringEvenSegmentCount) {
TestMessageBuilder builder(2);
builder.initRoot<TestAllTypes>().setTextField(std::string(5023, 'x'));
TestPipe pipe;
writePackedMessage(pipe, builder);
PackedMessageReader reader(pipe);
EXPECT_TRUE(reader.getRoot<TestAllTypes>().getTextField() == std::string(5023, 'x'));
}
TEST(Packed, RoundTripHugeStringEvenSegmentCountLazy) {
TestMessageBuilder builder(2);
builder.initRoot<TestAllTypes>().setTextField(std::string(5023, 'x'));
TestPipe pipe(1);
writePackedMessage(pipe, builder);
PackedMessageReader reader(pipe);
EXPECT_TRUE(reader.getRoot<TestAllTypes>().getTextField() == std::string(5023, 'x'));
}
// TODO: Test error cases.
} // namespace
} // namespace internal
} // namespace capnproto
// Copyright (c) 2013, Kenton Varda <temporal@gmail.com>
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this
// list of conditions and the following disclaimer.
// 2. Redistributions in binary form must reproduce the above copyright notice,
// this list of conditions and the following disclaimer in the documentation
// and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
// ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include "serialize-packed.h"
#include "wire-format.h"
#include <snappy/snappy.h>
#include <snappy/snappy-sinksource.h>
#include <vector>
namespace capnproto {
namespace internal {
PackedInputStream::PackedInputStream(BufferedInputStream& inner): inner(inner) {}
PackedInputStream::~PackedInputStream() {}
size_t PackedInputStream::read(void* dst, size_t minBytes, size_t maxBytes) {
if (maxBytes == 0) {
return 0;
}
CAPNPROTO_DEBUG_ASSERT(minBytes % sizeof(word) == 0,
"PackedInputStream reads must be word-aligned.");
CAPNPROTO_DEBUG_ASSERT(maxBytes % sizeof(word) == 0,
"PackedInputStream reads must be word-aligned.");
uint8_t* __restrict__ out = reinterpret_cast<uint8_t*>(dst);
uint8_t* const outEnd = reinterpret_cast<uint8_t*>(dst) + maxBytes;
uint8_t* const outMin = reinterpret_cast<uint8_t*>(dst) + minBytes;
ArrayPtr<const byte> buffer = inner.getReadBuffer();
CAPNPROTO_ASSERT(buffer.size() > 0, "Premature end of packed input.");
const uint8_t* __restrict__ in = reinterpret_cast<const uint8_t*>(buffer.begin());
#define REFRESH_BUFFER() \
inner.skip(buffer.size()); \
buffer = inner.getReadBuffer(); \
CAPNPROTO_ASSERT(buffer.size() > 0, "Premature end of packed input."); \
in = reinterpret_cast<const uint8_t*>(buffer.begin())
#define BUFFER_END (reinterpret_cast<const uint8_t*>(buffer.end()))
#define BUFFER_REMAINING ((size_t)(BUFFER_END - in))
for (;;) {
uint8_t tag;
CAPNPROTO_DEBUG_ASSERT((out - reinterpret_cast<uint8_t*>(dst)) % sizeof(word) == 0,
"Output pointer should always be aligned here.");
if (BUFFER_REMAINING < 10) {
if (out >= outMin) {
// We read at least the minimum amount, so go ahead and return.
inner.skip(in - reinterpret_cast<const uint8_t*>(buffer.begin()));
return out - reinterpret_cast<uint8_t*>(dst);
}
if (BUFFER_REMAINING == 0) {
REFRESH_BUFFER();
continue;
}
// We have at least 1, but not 10, bytes available. We need to read slowly, doing a bounds
// check on each byte.
tag = *in++;
for (uint i = 0; i < 8; i++) {
if (tag & (1u << i)) {
if (BUFFER_REMAINING == 0) {
REFRESH_BUFFER();
}
*out++ = *in++;
} else {
*out++ = 0;
}
}
if (BUFFER_REMAINING == 0 && (tag == 0 || tag == 0xffu)) {
REFRESH_BUFFER();
}
} else {
tag = *in++;
#define HANDLE_BYTE(n) \
{ \
bool isNonzero = (tag & (1u << n)) != 0; \
*out++ = *in & (-(int8_t)isNonzero); \
in += isNonzero; \
}
HANDLE_BYTE(0);
HANDLE_BYTE(1);
HANDLE_BYTE(2);
HANDLE_BYTE(3);
HANDLE_BYTE(4);
HANDLE_BYTE(5);
HANDLE_BYTE(6);
HANDLE_BYTE(7);
#undef HANDLE_BYTE
}
if (tag == 0) {
CAPNPROTO_DEBUG_ASSERT(BUFFER_REMAINING > 0,
"Bug in this function: Should always have non-empty buffer here.");
uint runLength = *in++ * sizeof(word);
CAPNPROTO_ASSERT(runLength <= outEnd - out,
"Packed input did not end cleanly on a segment boundary.");
memset(out, 0, runLength);
out += runLength;
} else if (tag == 0xffu) {
CAPNPROTO_DEBUG_ASSERT(BUFFER_REMAINING > 0,
"Bug in this function: Should always have non-empty buffer here.");
uint runLength = *in++ * sizeof(word);
CAPNPROTO_ASSERT(runLength <= outEnd - out,
"Packed input did not end cleanly on a segment boundary.");
uint inRemaining = BUFFER_REMAINING;
if (inRemaining >= runLength) {
// Fast path.
memcpy(out, in, runLength);
out += runLength;
in += runLength;
} else {
// Copy over the first buffer, then do one big read for the rest.
memcpy(out, in, inRemaining);
out += inRemaining;
runLength -= inRemaining;
inner.skip(buffer.size());
inner.read(out, runLength);
out += runLength;
if (out == outEnd) {
return maxBytes;
} else {
buffer = inner.getReadBuffer();
in = reinterpret_cast<const uint8_t*>(buffer.begin());
// Skip the bounds check below since we just did the same check above.
continue;
}
}
}
if (out == outEnd) {
inner.skip(in - reinterpret_cast<const uint8_t*>(buffer.begin()));
return maxBytes;
}
}
CAPNPROTO_ASSERT(false, "Can't get here.");
return 0;
}
void PackedInputStream::skip(size_t bytes) {
// We can't just read into buffers because buffers must end on block boundaries.
if (bytes == 0) {
return;
}
CAPNPROTO_DEBUG_ASSERT(bytes % sizeof(word) == 0,
"PackedInputStream reads must be word-aligned.");
ArrayPtr<const byte> buffer = inner.getReadBuffer();
const uint8_t* __restrict__ in = reinterpret_cast<const uint8_t*>(buffer.begin());
for (;;) {
uint8_t tag;
if (BUFFER_REMAINING < 10) {
if (BUFFER_REMAINING == 0) {
REFRESH_BUFFER();
continue;
}
// We have at least 1, but not 10, bytes available. We need to read slowly, doing a bounds
// check on each byte.
tag = *in++;
for (uint i = 0; i < 8; i++) {
if (tag & (1u << i)) {
if (BUFFER_REMAINING == 0) {
REFRESH_BUFFER();
}
in++;
}
}
bytes -= 8;
if (BUFFER_REMAINING == 0 && (tag == 0 || tag == 0xffu)) {
REFRESH_BUFFER();
}
} else {
tag = *in++;
#define HANDLE_BYTE(n) \
in += (tag & (1u << n)) != 0
HANDLE_BYTE(0);
HANDLE_BYTE(1);
HANDLE_BYTE(2);
HANDLE_BYTE(3);
HANDLE_BYTE(4);
HANDLE_BYTE(5);
HANDLE_BYTE(6);
HANDLE_BYTE(7);
#undef HANDLE_BYTE
bytes -= 8;
}
if (tag == 0) {
CAPNPROTO_DEBUG_ASSERT(BUFFER_REMAINING > 0,
"Bug in this function: Should always have non-empty buffer here.");
uint runLength = *in++ * sizeof(word);
CAPNPROTO_ASSERT(runLength <= bytes,
"Packed input did not end cleanly on a segment boundary.");
bytes -= runLength;
} else if (tag == 0xffu) {
CAPNPROTO_DEBUG_ASSERT(BUFFER_REMAINING > 0,
"Bug in this function: Should always have non-empty buffer here.");
uint runLength = *in++ * sizeof(word);
CAPNPROTO_ASSERT(runLength <= bytes,
"Packed input did not end cleanly on a segment boundary.");
bytes -= runLength;
uint inRemaining = BUFFER_REMAINING;
if (inRemaining > runLength) {
// Fast path.
in += runLength;
} else {
// Forward skip to the underlying stream.
runLength -= inRemaining;
inner.skip(buffer.size() + runLength);
if (bytes == 0) {
return;
} else {
buffer = inner.getReadBuffer();
in = reinterpret_cast<const uint8_t*>(buffer.begin());
// Skip the bounds check below since we just did the same check above.
continue;
}
}
}
if (bytes == 0) {
inner.skip(in - reinterpret_cast<const uint8_t*>(buffer.begin()));
return;
}
}
CAPNPROTO_ASSERT(false, "Can't get here.");
}
// -------------------------------------------------------------------
PackedOutputStream::PackedOutputStream(BufferedOutputStream& inner)
: inner(inner) {}
PackedOutputStream::~PackedOutputStream() {}
void PackedOutputStream::write(const void* src, size_t size) {
ArrayPtr<byte> buffer = inner.getWriteBuffer();
byte slowBuffer[20];
uint8_t* __restrict__ out = reinterpret_cast<uint8_t*>(buffer.begin());
const uint8_t* __restrict__ in = reinterpret_cast<const uint8_t*>(src);
const uint8_t* const inEnd = reinterpret_cast<const uint8_t*>(src) + size;
while (in < inEnd) {
if (reinterpret_cast<uint8_t*>(buffer.end()) - out < 10) {
// Oops, we're out of space. We need at least 10 bytes for the fast path, since we don't
// bounds-check on every byte.
// Write what we have so far.
inner.write(buffer.begin(), out - reinterpret_cast<uint8_t*>(buffer.begin()));
// Use a slow buffer into which we'll encode 10 to 20 bytes. This should get us past the
// output stream's buffer boundary.
buffer = arrayPtr(slowBuffer, sizeof(slowBuffer));
out = reinterpret_cast<uint8_t*>(buffer.begin());
}
uint8_t* tagPos = out++;
#define HANDLE_BYTE(n) \
uint8_t bit##n = *in != 0; \
*out = *in; \
out += bit##n; /* out only advances if the byte was non-zero */ \
++in
HANDLE_BYTE(0);
HANDLE_BYTE(1);
HANDLE_BYTE(2);
HANDLE_BYTE(3);
HANDLE_BYTE(4);
HANDLE_BYTE(5);
HANDLE_BYTE(6);
HANDLE_BYTE(7);
#undef HANDLE_BYTE
uint8_t tag = (bit0 << 0) | (bit1 << 1) | (bit2 << 2) | (bit3 << 3)
| (bit4 << 4) | (bit5 << 5) | (bit6 << 6) | (bit7 << 7);
*tagPos = tag;
if (tag == 0) {
// An all-zero word is followed by a count of consecutive zero words (not including the
// first one).
// We can check a whole word at a time.
const uint64_t* inWord = reinterpret_cast<const uint64_t*>(in);
// The count must fit it 1 byte, so limit to 255 words.
const uint64_t* limit = reinterpret_cast<const uint64_t*>(inEnd);
if (limit - inWord > 255) {
limit = inWord + 255;
}
while (inWord < limit && *inWord == 0) {
++inWord;
}
// Write the count.
*out++ = inWord - reinterpret_cast<const uint64_t*>(in);
// Advance input.
in = reinterpret_cast<const uint8_t*>(inWord);
} else if (tag == 0xffu) {
// An all-nonzero word is followed by a count of consecutive uncompressed words, followed
// by the uncompressed words themselves.
// Count the number of consecutive words in the input which have no more than a single
// zero-byte. We look for at least two zeros because that's the point where our compression
// scheme becomes a net win.
// TODO: Maybe look for three zeros? Compressing a two-zero word is a loss if the
// following word has no zeros.
const uint8_t* runStart = in;
const uint8_t* limit = inEnd;
if ((size_t)(limit - in) > 255 * sizeof(word)) {
limit = in + 255 * sizeof(word);
}
while (in < limit) {
// Check eight input bytes for zeros.
uint c = *in++ == 0;
c += *in++ == 0;
c += *in++ == 0;
c += *in++ == 0;
c += *in++ == 0;
c += *in++ == 0;
c += *in++ == 0;
c += *in++ == 0;
if (c >= 2) {
// Un-read the word with multiple zeros, since we'll want to compress that one.
in -= 8;
break;
}
}
// Write the count.
uint count = in - runStart;
*out++ = count / sizeof(word);
if (count <= reinterpret_cast<uint8_t*>(buffer.end()) - out) {
// There's enough space to memcpy.
memcpy(out, runStart, count);
out += count;
} else {
// Input overruns the output buffer. We'll give it to the output stream in one chunk
// and let it decide what to do.
inner.write(buffer.begin(), reinterpret_cast<byte*>(out) - buffer.begin());
inner.write(runStart, in - runStart);
buffer = inner.getWriteBuffer();
out = reinterpret_cast<uint8_t*>(buffer.begin());
}
}
}
// Write whatever is left.
inner.write(buffer.begin(), reinterpret_cast<byte*>(out) - buffer.begin());
}
} // namespace internal
// =======================================================================================
PackedMessageReader::PackedMessageReader(
BufferedInputStream& inputStream, ReaderOptions options, ArrayPtr<word> scratchSpace)
: PackedInputStream(inputStream),
InputStreamMessageReader(static_cast<PackedInputStream&>(*this), options, scratchSpace) {}
PackedMessageReader::~PackedMessageReader() {}
PackedFdMessageReader::PackedFdMessageReader(
int fd, ReaderOptions options, ArrayPtr<word> scratchSpace)
: FdInputStream(fd),
BufferedInputStreamWrapper(static_cast<FdInputStream&>(*this)),
PackedMessageReader(static_cast<BufferedInputStreamWrapper&>(*this),
options, scratchSpace) {}
PackedFdMessageReader::PackedFdMessageReader(
AutoCloseFd fd, ReaderOptions options, ArrayPtr<word> scratchSpace)
: FdInputStream(move(fd)),
BufferedInputStreamWrapper(static_cast<FdInputStream&>(*this)),
PackedMessageReader(static_cast<BufferedInputStreamWrapper&>(*this),
options, scratchSpace) {}
PackedFdMessageReader::~PackedFdMessageReader() {}
void writePackedMessage(BufferedOutputStream& output,
ArrayPtr<const ArrayPtr<const word>> segments) {
internal::PackedOutputStream packedOutput(output);
writeMessage(packedOutput, segments);
}
void writePackedMessage(OutputStream& output, ArrayPtr<const ArrayPtr<const word>> segments) {
if (BufferedOutputStream* bufferedOutputPtr = dynamic_cast<BufferedOutputStream*>(&output)) {
writePackedMessage(*bufferedOutputPtr, segments);
} else {
byte buffer[8192];
BufferedOutputStreamWrapper bufferedOutput(output, arrayPtr(buffer, sizeof(buffer)));
writePackedMessage(bufferedOutput, segments);
}
}
void writePackedMessageToFd(int fd, ArrayPtr<const ArrayPtr<const word>> segments) {
FdOutputStream output(fd);
writePackedMessage(output, segments);
}
} // namespace capnproto
// Copyright (c) 2013, Kenton Varda <temporal@gmail.com>
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this
// list of conditions and the following disclaimer.
// 2. Redistributions in binary form must reproduce the above copyright notice,
// this list of conditions and the following disclaimer in the documentation
// and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
// ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#ifndef CAPNPROTO_SERIALIZE_PACKED_H_
#define CAPNPROTO_SERIALIZE_PACKED_H_
#include "serialize.h"
namespace capnproto {
namespace internal {
class PackedInputStream: public InputStream {
// An input stream that unpacks packed data with a picky constraint: The caller must read data
// in the exact same size and sequence as the data was written to PackedOutputStream.
public:
explicit PackedInputStream(BufferedInputStream& inner);
CAPNPROTO_DISALLOW_COPY(PackedInputStream);
~PackedInputStream();
// implements InputStream ------------------------------------------
size_t read(void* buffer, size_t minBytes, size_t maxBytes) override;
void skip(size_t bytes) override;
private:
BufferedInputStream& inner;
};
class PackedOutputStream: public OutputStream {
public:
explicit PackedOutputStream(BufferedOutputStream& inner);
CAPNPROTO_DISALLOW_COPY(PackedOutputStream);
~PackedOutputStream();
// implements OutputStream -----------------------------------------
void write(const void* buffer, size_t bytes) override;
private:
BufferedOutputStream& inner;
};
} // namespace internal
class PackedMessageReader: private internal::PackedInputStream, public InputStreamMessageReader {
public:
PackedMessageReader(BufferedInputStream& inputStream, ReaderOptions options = ReaderOptions(),
ArrayPtr<word> scratchSpace = nullptr);
CAPNPROTO_DISALLOW_COPY(PackedMessageReader);
~PackedMessageReader();
};
class PackedFdMessageReader: private FdInputStream, private BufferedInputStreamWrapper,
public PackedMessageReader {
public:
PackedFdMessageReader(int fd, ReaderOptions options = ReaderOptions(),
ArrayPtr<word> scratchSpace = nullptr);
// Read message from a file descriptor, without taking ownership of the descriptor.
// Note that if you want to reuse the descriptor after the reader is destroyed, you'll need to
// seek it, since otherwise the position is unspecified.
PackedFdMessageReader(AutoCloseFd fd, ReaderOptions options = ReaderOptions(),
ArrayPtr<word> scratchSpace = nullptr);
// Read a message from a file descriptor, taking ownership of the descriptor.
CAPNPROTO_DISALLOW_COPY(PackedFdMessageReader);
~PackedFdMessageReader();
};
void writePackedMessage(BufferedOutputStream& output, MessageBuilder& builder);
void writePackedMessage(BufferedOutputStream& output,
ArrayPtr<const ArrayPtr<const word>> segments);
// Write a packed message to a buffered output stream.
void writePackedMessage(OutputStream& output, MessageBuilder& builder);
void writePackedMessage(OutputStream& output, ArrayPtr<const ArrayPtr<const word>> segments);
// Write a packed message to an unbuffered output stream. If you intend to write multiple messages
// in succession, consider wrapping your output in a buffered stream in order to reduce system
// call overhead.
void writePackedMessageToFd(int fd, MessageBuilder& builder);
void writePackedMessageToFd(int fd, ArrayPtr<const ArrayPtr<const word>> segments);
// Write a single packed message to the file descriptor.
// =======================================================================================
// inline stuff
inline void writePackedMessage(BufferedOutputStream& output, MessageBuilder& builder) {
writePackedMessage(output, builder.getSegmentsForOutput());
}
inline void writePackedMessage(OutputStream& output, MessageBuilder& builder) {
writePackedMessage(output, builder.getSegmentsForOutput());
}
inline void writePackedMessageToFd(int fd, MessageBuilder& builder) {
writePackedMessageToFd(fd, builder.getSegmentsForOutput());
}
} // namespace capnproto
#endif // CAPNPROTO_SERIALIZE_PACKED_H_
......@@ -63,106 +63,166 @@ private:
uint desiredSegmentCount;
};
class TestPipe: public InputStream, public OutputStream {
class TestPipe: public BufferedInputStream, public OutputStream {
public:
TestPipe(bool lazy)
: lazy(lazy), readPos(0) {}
TestPipe()
: preferredReadSize(std::numeric_limits<size_t>::max()), readPos(0) {}
explicit TestPipe(size_t preferredReadSize)
: preferredReadSize(preferredReadSize), readPos(0) {}
~TestPipe() {}
const std::string& getData() { return data; }
std::string getUnreadData() { return data.substr(readPos); }
std::string::size_type getReadPos() { return readPos; }
void resetRead(size_t preferredReadSize = std::numeric_limits<size_t>::max()) {
readPos = 0;
this->preferredReadSize = preferredReadSize;
}
bool allRead() {
return readPos == data.size();
}
void clear(size_t preferredReadSize = std::numeric_limits<size_t>::max()) {
resetRead(preferredReadSize);
data.clear();
}
void write(const void* buffer, size_t size) override {
data.append(reinterpret_cast<const char*>(buffer), size);
}
size_t read(void* buffer, size_t minBytes, size_t maxBytes) override {
CAPNPROTO_ASSERT(maxBytes <= data.size() - readPos, "Overran end of stream.");
size_t amount = lazy ? minBytes : maxBytes;
size_t amount = std::min(maxBytes, std::max(minBytes, preferredReadSize));
memcpy(buffer, data.data() + readPos, amount);
readPos += amount;
return amount;
}
void skip(size_t bytes) override {
CAPNPROTO_ASSERT(bytes <= data.size() - readPos, "Overran end of stream.");
readPos += bytes;
}
ArrayPtr<const byte> getReadBuffer() override {
size_t amount = std::min(data.size() - readPos, preferredReadSize);
return arrayPtr(reinterpret_cast<const byte*>(data.data() + readPos), amount);
}
private:
bool lazy;
size_t preferredReadSize;
std::string data;
std::string::size_type readPos;
};
struct DisplayByteArray {
DisplayByteArray(const std::string& str)
: data(reinterpret_cast<const uint8_t*>(str.data())), size(str.size()) {}
DisplayByteArray(const std::initializer_list<uint8_t>& list)
: data(list.begin()), size(list.size()) {}
const uint8_t* data;
size_t size;
};
std::ostream& operator<<(std::ostream& os, const DisplayByteArray& bytes) {
os << "{ ";
for (size_t i = 0; i < bytes.size; i++) {
if (i > 0) {
os << ", ";
}
os << (uint)bytes.data[i];
}
os << " }";
return os;
}
TEST(Snappy, RoundTrip) {
TestMessageBuilder builder(1);
initTestMessage(builder.initRoot<TestAllTypes>());
TestPipe pipe(false);
writeSnappyMessage(pipe, builder);
TestPipe pipe;
writeSnappyPackedMessage(pipe, builder);
SnappyMessageReader reader(pipe);
SnappyPackedMessageReader reader(pipe);
checkTestMessage(reader.getRoot<TestAllTypes>());
EXPECT_TRUE(pipe.allRead());
}
TEST(Snappy, RoundTripScratchSpace) {
TestMessageBuilder builder(1);
initTestMessage(builder.initRoot<TestAllTypes>());
TestPipe pipe(false);
writeSnappyMessage(pipe, builder);
TestPipe pipe;
writeSnappyPackedMessage(pipe, builder);
word scratch[1024];
SnappyMessageReader reader(pipe, ReaderOptions(), ArrayPtr<word>(scratch, 1024));
SnappyPackedMessageReader reader(pipe, ReaderOptions(), ArrayPtr<word>(scratch, 1024));
checkTestMessage(reader.getRoot<TestAllTypes>());
EXPECT_TRUE(pipe.allRead());
}
TEST(Snappy, RoundTripLazy) {
TestMessageBuilder builder(1);
initTestMessage(builder.initRoot<TestAllTypes>());
TestPipe pipe(true);
writeSnappyMessage(pipe, builder);
TestPipe pipe(1);
writeSnappyPackedMessage(pipe, builder);
SnappyMessageReader reader(pipe);
SnappyPackedMessageReader reader(pipe);
checkTestMessage(reader.getRoot<TestAllTypes>());
EXPECT_TRUE(pipe.allRead());
}
TEST(Snappy, RoundTripOddSegmentCount) {
TestMessageBuilder builder(7);
initTestMessage(builder.initRoot<TestAllTypes>());
TestPipe pipe(false);
writeSnappyMessage(pipe, builder);
TestPipe pipe;
writeSnappyPackedMessage(pipe, builder);
SnappyMessageReader reader(pipe);
SnappyPackedMessageReader reader(pipe);
checkTestMessage(reader.getRoot<TestAllTypes>());
EXPECT_TRUE(pipe.allRead());
}
TEST(Snappy, RoundTripOddSegmentCountLazy) {
TestMessageBuilder builder(7);
initTestMessage(builder.initRoot<TestAllTypes>());
TestPipe pipe(true);
writeSnappyMessage(pipe, builder);
TestPipe pipe(1);
writeSnappyPackedMessage(pipe, builder);
SnappyMessageReader reader(pipe);
SnappyPackedMessageReader reader(pipe);
checkTestMessage(reader.getRoot<TestAllTypes>());
EXPECT_TRUE(pipe.allRead());
}
TEST(Snappy, RoundTripEvenSegmentCount) {
TestMessageBuilder builder(10);
initTestMessage(builder.initRoot<TestAllTypes>());
TestPipe pipe(false);
writeSnappyMessage(pipe, builder);
TestPipe pipe;
writeSnappyPackedMessage(pipe, builder);
SnappyMessageReader reader(pipe);
SnappyPackedMessageReader reader(pipe);
checkTestMessage(reader.getRoot<TestAllTypes>());
EXPECT_TRUE(pipe.allRead());
}
TEST(Snappy, RoundTripEvenSegmentCountLazy) {
TestMessageBuilder builder(10);
initTestMessage(builder.initRoot<TestAllTypes>());
TestPipe pipe(true);
writeSnappyMessage(pipe, builder);
TestPipe pipe(1);
writeSnappyPackedMessage(pipe, builder);
SnappyMessageReader reader(pipe);
SnappyPackedMessageReader reader(pipe);
checkTestMessage(reader.getRoot<TestAllTypes>());
EXPECT_TRUE(pipe.allRead());
}
TEST(Snappy, RoundTripTwoMessages) {
......@@ -172,19 +232,23 @@ TEST(Snappy, RoundTripTwoMessages) {
TestMessageBuilder builder2(1);
builder2.initRoot<TestAllTypes>().setTextField("Second message.");
TestPipe pipe(false);
writeSnappyMessage(pipe, builder);
writeSnappyMessage(pipe, builder2);
TestPipe pipe(1);
writeSnappyPackedMessage(pipe, builder);
size_t firstSize = pipe.getData().size();
writeSnappyPackedMessage(pipe, builder2);
{
SnappyMessageReader reader(pipe);
SnappyPackedMessageReader reader(pipe);
checkTestMessage(reader.getRoot<TestAllTypes>());
}
EXPECT_EQ(firstSize, pipe.getReadPos());
{
SnappyMessageReader reader(pipe);
SnappyPackedMessageReader reader(pipe);
EXPECT_EQ("Second message.", reader.getRoot<TestAllTypes>().getTextField());
}
EXPECT_TRUE(pipe.allRead());
}
// TODO: Test error cases.
......
......@@ -26,280 +26,182 @@
#include <snappy/snappy.h>
#include <snappy/snappy-sinksource.h>
#include <vector>
#include <iostream>
namespace capnproto {
namespace {
class InputStreamSource: public snappy::Source {
class SnappyInputStream::InputStreamSnappySource: public snappy::Source {
public:
inline InputStreamSource(InputStream& inputStream, size_t available)
: inputStream(inputStream), available(available), pos(nullptr), end(nullptr) {
// Read at least 10 bytes (or all available bytes if less than 10), and at most the whole buffer
// (unless less is available).
size_t firstSize = inputStream.read(buffer,
std::min<size_t>(available, 10),
std::min<size_t>(available, sizeof(buffer)));
CAPNPROTO_ASSERT(snappy::GetUncompressedLength(buffer, firstSize, &uncompressedSize),
"Invalid snappy-compressed data.");
pos = buffer;
end = pos + firstSize;
}
inline ~InputStreamSource() {};
inline size_t getUncompressedSize() { return uncompressedSize; }
inline InputStreamSnappySource(BufferedInputStream& inputStream)
: inputStream(inputStream) {}
inline ~InputStreamSnappySource() {};
// implements snappy::Source ---------------------------------------
size_t Available() const override {
return available;
CAPNPROTO_ASSERT(false, "Snappy doesn't actually call this.");
return 0;
}
const char* Peek(size_t* len) override {
*len = end - pos;
return pos;
ArrayPtr<const byte> buffer = inputStream.getReadBuffer();
*len = buffer.size();
return reinterpret_cast<const char*>(buffer.begin());
}
void Skip(size_t n) override {
pos += n;
available -= n;
if (pos == end && available > 0) {
// Read more from the input.
pos = buffer;
end = pos + inputStream.read(buffer, 1, std::min<size_t>(available, sizeof(buffer)));
}
inputStream.skip(n);
}
private:
InputStream& inputStream;
size_t available;
size_t uncompressedSize;
char* pos;
char* end;
char buffer[8192];
BufferedInputStream& inputStream;
};
} // namespcae
SnappyMessageReader::SnappyMessageReader(
InputStream& inputStream, ReaderOptions options, ArrayPtr<word> scratchSpace)
: MessageReader(options), inputStream(inputStream) {
internal::WireValue<uint32_t> wireCompressedSize;
inputStream.read(&wireCompressedSize, sizeof(wireCompressedSize));
size_t compressedSize = wireCompressedSize.get();
InputStreamSource source(inputStream, compressedSize);
CAPNPROTO_ASSERT(source.getUncompressedSize() % sizeof(word) == 0,
"Uncompressed size was not a whole number of words.");
size_t uncompressedWords = source.getUncompressedSize() / sizeof(word);
if (scratchSpace.size() < uncompressedWords) {
space = newArray<word>(uncompressedWords);
scratchSpace = space;
SnappyInputStream::SnappyInputStream(BufferedInputStream& inner, ArrayPtr<byte> buffer)
: inner(inner) {
if (buffer.size() < SNAPPY_BUFFER_SIZE) {
ownedBuffer = newArray<byte>(SNAPPY_BUFFER_SIZE);
buffer = ownedBuffer;
}
CAPNPROTO_ASSERT(
snappy::RawUncompress(&source, reinterpret_cast<char*>(scratchSpace.begin())),
"Snappy decompression failed.");
new(&underlyingReader) FlatArrayMessageReader(scratchSpace, options);
}
SnappyMessageReader::~SnappyMessageReader() {
underlyingReader.~FlatArrayMessageReader();
this->buffer = buffer;
}
ArrayPtr<const word> SnappyMessageReader::getSegment(uint id) {
return underlyingReader.getSegment(id);
}
SnappyFdMessageReader::~SnappyFdMessageReader() {}
// =======================================================================================
namespace {
SnappyInputStream::~SnappyInputStream() {}
class SegmentArraySource: public snappy::Source {
public:
SegmentArraySource(ArrayPtr<const ArrayPtr<const byte>> pieces)
: pieces(pieces), offset(0), available(0) {
for (auto& piece: pieces) {
available += piece.size();
}
Skip(0); // Skip leading zero-sized pieces, if any.
}
~SegmentArraySource() {}
// implements snappy::Source ---------------------------------------
size_t Available() const override {
return available;
ArrayPtr<const byte> SnappyInputStream::getReadBuffer() {
if (bufferAvailable.size() == 0) {
refill();
}
const char* Peek(size_t* len) override {
if (pieces.size() == 0) {
*len = 0;
return nullptr;
} else {
*len = pieces[0].size() - offset;
return reinterpret_cast<const char*>(pieces[0].begin()) + offset;
}
}
void Skip(size_t n) override {
available -= n;
while (pieces.size() > 0 && n >= pieces[0].size() - offset) {
n -= pieces[0].size() - offset;
offset = 0;
pieces = pieces.slice(1, pieces.size());
}
offset += n;
}
private:
ArrayPtr<const ArrayPtr<const byte>> pieces;
size_t offset;
size_t available;
};
class AccumulatingSink: public snappy::Sink {
public:
AccumulatingSink()
: pos(firstPiece + sizeof(compressedSize)),
end(firstPiece + sizeof(firstPiece)), firstEnd(firstPiece) {}
~AccumulatingSink() {}
void writeTo(size_t size, OutputStream& output) {
finalizePiece();
compressedSize.set(size);
return bufferAvailable;
}
ArrayPtr<const byte> pieces[morePieces.size() + 1];
size_t SnappyInputStream::read(void* dst, size_t minBytes, size_t maxBytes) {
while (minBytes > bufferAvailable.size()) {
memcpy(dst, bufferAvailable.begin(), bufferAvailable.size());
pieces[0] = arrayPtr(reinterpret_cast<byte*>(firstPiece),
reinterpret_cast<byte*>(firstEnd));
for (uint i = 0; i < morePieces.size(); i++) {
auto& piece = morePieces[i];
pieces[i + 1] = arrayPtr(reinterpret_cast<byte*>(piece.start.get()),
reinterpret_cast<byte*>(piece.end));
}
dst = reinterpret_cast<byte*>(dst) + bufferAvailable.size();
minBytes -= bufferAvailable.size();
maxBytes -= bufferAvailable.size();
output.write(arrayPtr(pieces, morePieces.size() + 1));
refill();
}
// implements snappy::Sink -----------------------------------------
void Append(const char* bytes, size_t n) override {
if (bytes == pos) {
pos += n;
} else {
size_t a = available();
if (n > a) {
memcpy(pos, bytes, a);
bytes += a;
addPiece(n);
}
memcpy(pos, bytes, n);
pos += n;
}
}
// Serve from current buffer.
size_t n = std::min(bufferAvailable.size(), maxBytes);
memcpy(dst, bufferAvailable.begin(), n);
bufferAvailable = bufferAvailable.slice(n, bufferAvailable.size());
return n;
}
char* GetAppendBuffer(size_t length, char* scratch) override {
if (length > available()) {
addPiece(length);
}
return pos;
void SnappyInputStream::skip(size_t bytes) {
while (bytes > bufferAvailable.size()) {
bytes -= bufferAvailable.size();
refill();
}
bufferAvailable = bufferAvailable.slice(bytes, bufferAvailable.size());
}
private:
char* pos;
char* end;
// Stand and end point of the current available space.
char* firstEnd;
// End point of the used portion of the first piece.
struct Piece {
std::unique_ptr<char[]> start;
// Start point of the piece.
void SnappyInputStream::refill() {
uint32_t length = 0;
InputStreamSnappySource snappySource(inner);
CAPNPROTO_ASSERT(
snappy::RawUncompress(
&snappySource, reinterpret_cast<char*>(buffer.begin()), buffer.size(), &length),
"Snappy decompression failed.");
char* end;
// End point of the used portion of the piece.
bufferAvailable = buffer.slice(0, length);
}
Piece() = default;
Piece(std::unique_ptr<char[]> start, char* end): start(std::move(start)), end(end) {}
};
// =======================================================================================
std::vector<Piece> morePieces;
SnappyOutputStream::SnappyOutputStream(
OutputStream& inner, ArrayPtr<byte> buffer, ArrayPtr<byte> compressedBuffer)
: inner(inner) {
CAPNPROTO_DEBUG_ASSERT(
SNAPPY_COMPRESSED_BUFFER_SIZE >= snappy::MaxCompressedLength(snappy::kBlockSize),
"snappy::MaxCompressedLength() changed?");
union {
internal::WireValue<uint32_t> compressedSize;
char firstPiece[8192];
};
if (buffer.size() < SNAPPY_BUFFER_SIZE) {
ownedBuffer = newArray<byte>(SNAPPY_BUFFER_SIZE);
buffer = ownedBuffer;
}
this->buffer = buffer;
bufferPos = buffer.begin();
inline size_t available() {
return end - pos;
if (compressedBuffer.size() < SNAPPY_COMPRESSED_BUFFER_SIZE) {
ownedCompressedBuffer = newArray<byte>(SNAPPY_COMPRESSED_BUFFER_SIZE);
compressedBuffer = ownedCompressedBuffer;
}
this->compressedBuffer = compressedBuffer;
}
inline void finalizePiece() {
if (morePieces.empty()) {
firstEnd = pos;
SnappyOutputStream::~SnappyOutputStream() {
if (bufferPos > buffer.begin()) {
if (std::uncaught_exception()) {
try {
flush();
} catch (...) {
// TODO: report secondary faults
}
} else {
morePieces.back().end = pos;
flush();
}
}
}
void addPiece(size_t minSize) {
finalizePiece();
void SnappyOutputStream::flush() {
if (bufferPos > buffer.begin()) {
snappy::ByteArraySource source(
reinterpret_cast<char*>(buffer.begin()), bufferPos - buffer.begin());
snappy::UncheckedByteArraySink sink(reinterpret_cast<char*>(compressedBuffer.begin()));
std::unique_ptr<char[]> newPiece(new char[minSize]);
pos = newPiece.get();
end = pos + minSize;
morePieces.emplace_back(std::move(newPiece), pos);
}
};
class SnappyOutputStream: public OutputStream {
public:
SnappyOutputStream(OutputStream& output): output(output), sawWrite(false) {}
size_t n = snappy::Compress(&source, &sink);
CAPNPROTO_ASSERT(n <= compressedBuffer.size(),
"Critical security bug: Snappy compression overran its output buffer.");
inner.write(compressedBuffer.begin(), n);
// implements OutputStream -----------------------------------------
void write(const void* buffer, size_t size) override {
CAPNPROTO_ASSERT(false, "writeMessage() was not expected to call this.");
bufferPos = buffer.begin();
}
}
void write(ArrayPtr<const ArrayPtr<const byte>> pieces) {
CAPNPROTO_ASSERT(!sawWrite, "writeMessage() was expected to issue exactly one write.");
sawWrite = true;
ArrayPtr<byte> SnappyOutputStream::getWriteBuffer() {
return arrayPtr(bufferPos, buffer.end());
}
SegmentArraySource source(pieces);
AccumulatingSink sink;
size_t size = snappy::Compress(&source, &sink);
void SnappyOutputStream::write(const void* src, size_t size) {
if (src == bufferPos) {
// Oh goody, the caller wrote directly into our buffer.
bufferPos += size;
} else {
for (;;) {
size_t available = buffer.end() - bufferPos;
if (size < available) break;
memcpy(bufferPos, src, available);
size -= available;
bufferPos = buffer.end();
flush();
}
sink.writeTo(size, output);
memcpy(bufferPos, src, size);
bufferPos += size;
}
}
private:
OutputStream& output;
bool sawWrite;
};
// =======================================================================================
} // namespace
SnappyPackedMessageReader::SnappyPackedMessageReader(
BufferedInputStream& inputStream, ReaderOptions options,
ArrayPtr<word> scratchSpace, ArrayPtr<byte> buffer)
: SnappyInputStream(inputStream, buffer),
PackedMessageReader(static_cast<SnappyInputStream&>(*this), options, scratchSpace) {}
void writeSnappyMessage(OutputStream& output, ArrayPtr<const ArrayPtr<const word>> segments) {
SnappyOutputStream snappyOutput(output);
writeMessage(snappyOutput, segments);
}
SnappyPackedMessageReader::~SnappyPackedMessageReader() {}
void writeSnappyMessageToFd(int fd, ArrayPtr<const ArrayPtr<const word>> segments) {
FdOutputStream output(fd);
writeSnappyMessage(output, segments);
void writeSnappyPackedMessage(OutputStream& output, ArrayPtr<const ArrayPtr<const word>> segments,
ArrayPtr<byte> buffer, ArrayPtr<byte> compressedBuffer) {
SnappyOutputStream snappyOut(output, buffer, compressedBuffer);
writePackedMessage(snappyOut, segments);
}
} // namespace capnproto
......@@ -25,56 +25,84 @@
#define CAPNPROTO_SERIALIZE_SNAPPY_H_
#include "serialize.h"
#include "serialize-packed.h"
namespace capnproto {
class SnappyMessageReader: public MessageReader {
constexpr size_t SNAPPY_BUFFER_SIZE = 65536;
constexpr size_t SNAPPY_COMPRESSED_BUFFER_SIZE = 76490;
class SnappyInputStream: public BufferedInputStream {
public:
SnappyMessageReader(InputStream& inputStream, ReaderOptions options = ReaderOptions(),
ArrayPtr<word> scratchSpace = nullptr);
~SnappyMessageReader();
explicit SnappyInputStream(BufferedInputStream& inner, ArrayPtr<byte> buffer = nullptr);
CAPNPROTO_DISALLOW_COPY(SnappyInputStream);
~SnappyInputStream();
ArrayPtr<const word> getSegment(uint id) override;
// implements BufferedInputStream ----------------------------------
ArrayPtr<const byte> getReadBuffer() override;
size_t read(void* buffer, size_t minBytes, size_t maxBytes) override;
void skip(size_t bytes) override;
private:
InputStream& inputStream;
Array<word> space;
class InputStreamSnappySource;
BufferedInputStream& inner;
Array<byte> ownedBuffer;
ArrayPtr<byte> buffer;
ArrayPtr<byte> bufferAvailable;
union {
FlatArrayMessageReader underlyingReader;
};
void refill();
};
class SnappyFdMessageReader: private FdInputStream, public SnappyMessageReader {
class SnappyOutputStream: public BufferedOutputStream {
public:
SnappyFdMessageReader(int fd, ReaderOptions options = ReaderOptions(),
ArrayPtr<word> scratchSpace = nullptr)
: FdInputStream(fd), SnappyMessageReader(*this, options, scratchSpace) {}
// Read message from a file descriptor, without taking ownership of the descriptor.
explicit SnappyOutputStream(OutputStream& inner,
ArrayPtr<byte> buffer = nullptr,
ArrayPtr<byte> compressedBuffer = nullptr);
CAPNPROTO_DISALLOW_COPY(SnappyOutputStream);
~SnappyOutputStream();
void flush();
// Force the stream to write any remaining bytes in its buffer to the inner stream. This will
// hurt compression, of course, by forcing the current block to end prematurely.
// implements BufferedOutputStream ---------------------------------
ArrayPtr<byte> getWriteBuffer() override;
void write(const void* buffer, size_t size) override;
private:
OutputStream& inner;
SnappyFdMessageReader(AutoCloseFd fd, ReaderOptions options = ReaderOptions(),
ArrayPtr<word> scratchSpace = nullptr)
: FdInputStream(move(fd)), SnappyMessageReader(*this, options, scratchSpace) {}
// Read a message from a file descriptor, taking ownership of the descriptor.
Array<byte> ownedBuffer;
ArrayPtr<byte> buffer;
byte* bufferPos;
~SnappyFdMessageReader();
Array<byte> ownedCompressedBuffer;
ArrayPtr<byte> compressedBuffer;
};
void writeSnappyMessage(OutputStream& output, MessageBuilder& builder);
void writeSnappyMessage(OutputStream& output, ArrayPtr<const ArrayPtr<const word>> segments);
class SnappyPackedMessageReader: private SnappyInputStream, public PackedMessageReader {
public:
SnappyPackedMessageReader(
BufferedInputStream& inputStream, ReaderOptions options = ReaderOptions(),
ArrayPtr<word> scratchSpace = nullptr, ArrayPtr<byte> buffer = nullptr);
~SnappyPackedMessageReader();
};
void writeSnappyMessageToFd(int fd, MessageBuilder& builder);
void writeSnappyMessageToFd(int fd, ArrayPtr<const ArrayPtr<const word>> segments);
void writeSnappyPackedMessage(OutputStream& output, MessageBuilder& builder,
ArrayPtr<byte> buffer = nullptr,
ArrayPtr<byte> compressedBuffer = nullptr);
void writeSnappyPackedMessage(OutputStream& output, ArrayPtr<const ArrayPtr<const word>> segments,
ArrayPtr<byte> buffer = nullptr,
ArrayPtr<byte> compressedBuffer = nullptr);
// =======================================================================================
// inline stuff
inline void writeSnappyMessage(OutputStream& output, MessageBuilder& builder) {
writeSnappyMessage(output, builder.getSegmentsForOutput());
}
inline void writeSnappyMessageToFd(int fd, MessageBuilder& builder) {
writeSnappyMessageToFd(fd, builder.getSegmentsForOutput());
inline void writeSnappyPackedMessage(OutputStream& output, MessageBuilder& builder,
ArrayPtr<byte> buffer,
ArrayPtr<byte> compressedBuffer) {
writeSnappyPackedMessage(output, builder.getSegmentsForOutput(), buffer, compressedBuffer);
}
} // namespace capnproto
......
......@@ -23,11 +23,6 @@
#include "serialize.h"
#include "wire-format.h"
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <string>
#include <sys/uio.h>
namespace capnproto {
......@@ -41,7 +36,7 @@ FlatArrayMessageReader::FlatArrayMessageReader(ArrayPtr<const word> array, Reade
const internal::WireValue<uint32_t>* table =
reinterpret_cast<const internal::WireValue<uint32_t>*>(array.begin());
uint segmentCount = table[0].get();
uint segmentCount = table[0].get() + 1;
size_t offset = segmentCount / 2u + 1u;
if (array.size() < offset) {
......@@ -92,6 +87,8 @@ ArrayPtr<const word> FlatArrayMessageReader::getSegment(uint id) {
}
Array<word> messageToFlatArray(ArrayPtr<const ArrayPtr<const word>> segments) {
CAPNPROTO_ASSERT(segments.size() > 0, "Tried to serialize uninitialized message.");
size_t totalSize = segments.size() / 2 + 1;
for (auto& segment: segments) {
......@@ -103,7 +100,7 @@ Array<word> messageToFlatArray(ArrayPtr<const ArrayPtr<const word>> segments) {
internal::WireValue<uint32_t>* table =
reinterpret_cast<internal::WireValue<uint32_t>*>(result.begin());
table[0].set(segments.size());
table[0].set(segments.size() - 1);
for (uint i = 0; i < segments.size(); i++) {
table[i + 1].set(segments[i].size());
......@@ -128,25 +125,6 @@ Array<word> messageToFlatArray(ArrayPtr<const ArrayPtr<const word>> segments) {
// =======================================================================================
InputStream::~InputStream() {}
OutputStream::~OutputStream() {}
void InputStream::skip(size_t bytes) {
char scratch[8192];
while (bytes > 0) {
size_t amount = std::min(bytes, sizeof(scratch));
bytes -= read(scratch, amount, amount);
}
}
void OutputStream::write(ArrayPtr<const ArrayPtr<const byte>> pieces) {
for (auto piece: pieces) {
write(piece.begin(), piece.size());
}
}
// -------------------------------------------------------------------
InputStreamMessageReader::InputStreamMessageReader(
InputStream& inputStream, ReaderOptions options, ArrayPtr<word> scratchSpace)
: MessageReader(options), inputStream(inputStream), readPos(nullptr) {
......@@ -154,7 +132,7 @@ InputStreamMessageReader::InputStreamMessageReader(
inputStream.read(firstWord, sizeof(firstWord));
uint segmentCount = firstWord[0].get();
uint segmentCount = firstWord[0].get() + 1;
uint segment0Size = segmentCount == 0 ? 0 : firstWord[1].get();
size_t totalWords = segment0Size;
......@@ -237,9 +215,11 @@ ArrayPtr<const word> InputStreamMessageReader::getSegment(uint id) {
// -------------------------------------------------------------------
void writeMessage(OutputStream& output, ArrayPtr<const ArrayPtr<const word>> segments) {
CAPNPROTO_ASSERT(segments.size() > 0, "Tried to serialize uninitialized message.");
internal::WireValue<uint32_t> table[(segments.size() + 2) & ~size_t(1)];
table[0].set(segments.size());
table[0].set(segments.size() - 1);
for (uint i = 0; i < segments.size(); i++) {
table[i + 1].set(segments[i].size());
}
......@@ -260,133 +240,6 @@ void writeMessage(OutputStream& output, ArrayPtr<const ArrayPtr<const word>> seg
}
// =======================================================================================
class OsException: public std::exception {
public:
OsException(const char* function, int error) {
char buffer[256];
message = function;
message += ": ";
message.append(strerror_r(error, buffer, sizeof(buffer)));
}
~OsException() noexcept {}
const char* what() const noexcept override {
return message.c_str();
}
private:
std::string message;
};
class PrematureEofException: public std::exception {
public:
PrematureEofException() {}
~PrematureEofException() noexcept {}
const char* what() const noexcept override {
return "Stream ended prematurely.";
}
};
AutoCloseFd::~AutoCloseFd() {
if (fd >= 0 && close(fd) < 0) {
if (std::uncaught_exception()) {
// TODO: Devise some way to report secondary errors during unwind.
} else {
throw OsException("close", errno);
}
}
}
FdInputStream::~FdInputStream() {}
size_t FdInputStream::read(void* buffer, size_t minBytes, size_t maxBytes) {
byte* pos = reinterpret_cast<byte*>(buffer);
byte* min = pos + minBytes;
byte* max = pos + maxBytes;
while (pos < min) {
ssize_t n = ::read(fd, pos, max - pos);
if (n <= 0) {
if (n < 0) {
int error = errno;
if (error == EINTR) {
continue;
} else {
throw OsException("read", error);
}
} else if (n == 0) {
throw PrematureEofException();
}
return false;
}
pos += n;
}
return pos - reinterpret_cast<byte*>(buffer);
}
FdOutputStream::~FdOutputStream() {}
void FdOutputStream::write(const void* buffer, size_t size) {
const char* pos = reinterpret_cast<const char*>(buffer);
while (size > 0) {
ssize_t n = ::write(fd, pos, size);
if (n <= 0) {
CAPNPROTO_ASSERT(n < 0, "write() returned zero.");
int error = errno;
if (error == EINTR) {
continue;
} else {
throw OsException("write", error);
}
}
pos += n;
size -= n;
}
}
void FdOutputStream::write(ArrayPtr<const ArrayPtr<const byte>> pieces) {
struct iovec iov[pieces.size()];
for (uint i = 0; i < pieces.size(); i++) {
// writev() interface is not const-correct. :(
iov[i].iov_base = const_cast<byte*>(pieces[i].begin());
iov[i].iov_len = pieces[i].size();
}
struct iovec* current = iov;
struct iovec* end = iov + pieces.size();
// Make sure we don't do anything on an empty write.
while (current < end && current->iov_len == 0) {
++current;
}
while (current < end) {
ssize_t n = ::writev(fd, iov, end - current);
if (n <= 0) {
if (n <= 0) {
CAPNPROTO_ASSERT(n < 0, "write() returned zero.");
throw OsException("writev", errno);
}
}
while (static_cast<size_t>(n) >= current->iov_len) {
n -= current->iov_len;
++current;
}
if (n > 0) {
current->iov_base = reinterpret_cast<byte*>(current->iov_base) + n;
current->iov_len -= n;
}
}
}
StreamFdMessageReader::~StreamFdMessageReader() {}
void writeMessageToFd(int fd, ArrayPtr<const ArrayPtr<const word>> segments) {
......
......@@ -44,6 +44,7 @@
#define CAPNPROTO_SERIALIZE_H_
#include "message.h"
#include "io.h"
namespace capnproto {
......@@ -71,47 +72,6 @@ Array<word> messageToFlatArray(ArrayPtr<const ArrayPtr<const word>> segments);
// =======================================================================================
class InputStream {
public:
virtual ~InputStream();
virtual size_t read(void* buffer, size_t minBytes, size_t maxBytes) = 0;
// Reads at least minBytes and at most maxBytes, copying them into the given buffer. Returns
// the size read. Throws an exception on errors.
//
// maxBytes is the number of bytes the caller really wants, but minBytes is the minimum amount
// needed by the caller before it can start doing useful processing. If the stream returns less
// than maxBytes, the caller will usually call read() again later to get the rest. Returning
// less than maxBytes is useful when it makes sense for the caller to parallelize processing
// with I/O.
//
// Cap'n Proto never asks for more bytes than it knows are part of the message. Therefore, if
// the InputStream happens to know that the stream will never reach maxBytes -- even if it has
// reached minBytes -- it should throw an exception to avoid wasting time processing an incomplete
// message. If it can't even reach minBytes, it MUST throw an exception, as the caller is not
// expected to understand how to deal with partial reads.
inline void read(void* buffer, size_t bytes) { read(buffer, bytes, bytes); }
// Convenience method for reading an exact number of bytes.
virtual void skip(size_t bytes);
// Skips past the given number of bytes, discarding them. The default implementation read()s
// into a scratch buffer.
};
class OutputStream {
public:
virtual ~OutputStream();
virtual void write(const void* buffer, size_t size) = 0;
// Always writes the full size. Throws exception on error.
virtual void write(ArrayPtr<const ArrayPtr<const byte>> pieces);
// Equivalent to write()ing each byte array in sequence, which is what the default implementation
// does. Override if you can do something better, e.g. use writev() to do the write in a single
// syscall.
};
class InputStreamMessageReader: public MessageReader {
public:
InputStreamMessageReader(InputStream& inputStream,
......@@ -143,67 +103,6 @@ void writeMessage(OutputStream& output, ArrayPtr<const ArrayPtr<const word>> seg
// =======================================================================================
// Specializations for reading from / writing to file descriptors.
class AutoCloseFd {
// A wrapper around a file descriptor which automatically closes the descriptor when destroyed.
// The wrapper supports move construction for transferring ownership of the descriptor. If
// close() returns an error, the destructor throws an exception, UNLESS the destructor is being
// called during unwind from another exception, in which case the close error is ignored.
//
// If your code is not exception-safe, you should not use AutoCloseFd. In this case you will
// have to call close() yourself and handle errors appropriately.
//
// TODO: Create a general helper library for reporting/detecting secondary exceptions that
// occurred during unwind of some primary exception.
public:
inline AutoCloseFd(): fd(-1) {}
inline AutoCloseFd(std::nullptr_t): fd(-1) {}
inline explicit AutoCloseFd(int fd): fd(fd) {}
inline AutoCloseFd(AutoCloseFd&& other): fd(other.fd) { other.fd = -1; }
CAPNPROTO_DISALLOW_COPY(AutoCloseFd);
~AutoCloseFd();
inline operator int() { return fd; }
inline int get() { return fd; }
inline bool operator==(std::nullptr_t) { return fd < 0; }
inline bool operator!=(std::nullptr_t) { return fd >= 0; }
private:
int fd;
};
class FdInputStream: public InputStream {
// An InputStream wrapping a file descriptor.
public:
FdInputStream(int fd): fd(fd) {};
FdInputStream(AutoCloseFd fd): fd(fd), autoclose(move(fd)) {}
~FdInputStream();
size_t read(void* buffer, size_t minBytes, size_t maxBytes) override;
private:
int fd;
AutoCloseFd autoclose;
};
class FdOutputStream: public OutputStream {
// An OutputStream wrapping a file descriptor.
public:
FdOutputStream(int fd): fd(fd) {};
FdOutputStream(AutoCloseFd fd): fd(fd), autoclose(move(fd)) {}
~FdOutputStream();
void write(const void* buffer, size_t size) override;
void write(ArrayPtr<const ArrayPtr<const byte>> pieces) override;
private:
int fd;
AutoCloseFd autoclose;
};
class StreamFdMessageReader: private FdInputStream, public InputStreamMessageReader {
// A MessageReader that reads from a steam-based file descriptor. For seekable file descriptors
// (e.g. actual disk files), FdFileMessageReader is better, but this will still work.
......
......@@ -225,6 +225,78 @@ void genericCheckTestMessage(Reader reader) {
}
}
template <typename Reader>
void genericCheckTestMessageAllZero(Reader reader) {
EXPECT_EQ(Void::VOID, reader.getVoidField());
EXPECT_EQ(false, reader.getBoolField());
EXPECT_EQ(0, reader.getInt8Field());
EXPECT_EQ(0, reader.getInt16Field());
EXPECT_EQ(0, reader.getInt32Field());
EXPECT_EQ(0, reader.getInt64Field());
EXPECT_EQ(0u, reader.getUInt8Field());
EXPECT_EQ(0u, reader.getUInt16Field());
EXPECT_EQ(0u, reader.getUInt32Field());
EXPECT_EQ(0u, reader.getUInt64Field());
EXPECT_FLOAT_EQ(0, reader.getFloat32Field());
EXPECT_DOUBLE_EQ(0, reader.getFloat64Field());
EXPECT_EQ("", reader.getTextField());
EXPECT_EQ("", reader.getDataField());
{
auto subReader = reader.getStructField();
EXPECT_EQ(Void::VOID, subReader.getVoidField());
EXPECT_EQ(false, subReader.getBoolField());
EXPECT_EQ(0, subReader.getInt8Field());
EXPECT_EQ(0, subReader.getInt16Field());
EXPECT_EQ(0, subReader.getInt32Field());
EXPECT_EQ(0, subReader.getInt64Field());
EXPECT_EQ(0u, subReader.getUInt8Field());
EXPECT_EQ(0u, subReader.getUInt16Field());
EXPECT_EQ(0u, subReader.getUInt32Field());
EXPECT_EQ(0u, subReader.getUInt64Field());
EXPECT_FLOAT_EQ(0, subReader.getFloat32Field());
EXPECT_DOUBLE_EQ(0, subReader.getFloat64Field());
EXPECT_EQ("", subReader.getTextField());
EXPECT_EQ("", subReader.getDataField());
{
auto subSubReader = subReader.getStructField();
EXPECT_EQ("", subSubReader.getTextField());
EXPECT_EQ("", subSubReader.getStructField().getTextField());
}
EXPECT_EQ(0u, subReader.getVoidList().size());
EXPECT_EQ(0u, subReader.getBoolList().size());
EXPECT_EQ(0u, subReader.getInt8List().size());
EXPECT_EQ(0u, subReader.getInt16List().size());
EXPECT_EQ(0u, subReader.getInt32List().size());
EXPECT_EQ(0u, subReader.getInt64List().size());
EXPECT_EQ(0u, subReader.getUInt8List().size());
EXPECT_EQ(0u, subReader.getUInt16List().size());
EXPECT_EQ(0u, subReader.getUInt32List().size());
EXPECT_EQ(0u, subReader.getUInt64List().size());
EXPECT_EQ(0u, subReader.getFloat32List().size());
EXPECT_EQ(0u, subReader.getFloat64List().size());
EXPECT_EQ(0u, subReader.getTextList().size());
EXPECT_EQ(0u, subReader.getDataList().size());
EXPECT_EQ(0u, subReader.getStructList().size());
}
EXPECT_EQ(0u, reader.getVoidList().size());
EXPECT_EQ(0u, reader.getBoolList().size());
EXPECT_EQ(0u, reader.getInt8List().size());
EXPECT_EQ(0u, reader.getInt16List().size());
EXPECT_EQ(0u, reader.getInt32List().size());
EXPECT_EQ(0u, reader.getInt64List().size());
EXPECT_EQ(0u, reader.getUInt8List().size());
EXPECT_EQ(0u, reader.getUInt16List().size());
EXPECT_EQ(0u, reader.getUInt32List().size());
EXPECT_EQ(0u, reader.getUInt64List().size());
EXPECT_EQ(0u, reader.getFloat32List().size());
EXPECT_EQ(0u, reader.getFloat64List().size());
EXPECT_EQ(0u, reader.getTextList().size());
EXPECT_EQ(0u, reader.getDataList().size());
EXPECT_EQ(0u, reader.getStructList().size());
}
} // namespace
void initTestMessage(TestAllTypes::Builder builder) { genericInitTestMessage(builder); }
......@@ -236,5 +308,12 @@ void checkTestMessage(TestDefaults::Builder builder) { genericCheckTestMessage(b
void checkTestMessage(TestAllTypes::Reader reader) { genericCheckTestMessage(reader); }
void checkTestMessage(TestDefaults::Reader reader) { genericCheckTestMessage(reader); }
void checkTestMessageAllZero(TestAllTypes::Builder builder) {
genericCheckTestMessageAllZero(builder);
}
void checkTestMessageAllZero(TestAllTypes::Reader reader) {
genericCheckTestMessageAllZero(reader);
}
} // namespace internal
} // namespace capnproto
......@@ -38,6 +38,9 @@ void checkTestMessage(TestDefaults::Builder builder);
void checkTestMessage(TestAllTypes::Reader reader);
void checkTestMessage(TestDefaults::Reader reader);
void checkTestMessageAllZero(TestAllTypes::Builder builder);
void checkTestMessageAllZero(TestAllTypes::Reader reader);
} // namespace internal
} // namespace capnproto
......
......@@ -88,8 +88,8 @@ public:
return ArrayPtr(ptr + start, end - start);
}
inline bool operator==(std::nullptr_t) { return ptr == nullptr; }
inline bool operator!=(std::nullptr_t) { return ptr != nullptr; }
inline bool operator==(std::nullptr_t) { return size_ == 0; }
inline bool operator!=(std::nullptr_t) { return size_ != 0; }
private:
T* ptr;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment