Commit ee64a212 authored by Kenton Varda's avatar Kenton Varda

Add function to predict the size of a message from its prefix (and detect if the…

Add function to predict the size of a message from its prefix (and detect if the message is complete).
parent 8c88885a
......@@ -63,6 +63,12 @@ private:
uint desiredSegmentCount;
};
kj::Array<word> copyWords(kj::ArrayPtr<const word> input) {
  // Produce a separately heap-allocated array with the same number of words as `input`, filled
  // with a raw byte-for-byte copy of its contents.
  auto sourceBytes = input.asBytes();
  auto duplicate = kj::heapArray<word>(input.size());
  memcpy(duplicate.asBytes().begin(), sourceBytes.begin(), sourceBytes.size());
  return duplicate;
}
TEST(Serialize, FlatArray) {
TestMessageBuilder builder(1);
initTestMessage(builder.initRoot<TestAllTypes>());
......@@ -99,6 +105,16 @@ TEST(Serialize, FlatArray) {
EXPECT_EQ(serializedWithSuffix.end() - 5, remaining.begin());
EXPECT_EQ(serializedWithSuffix.end(), remaining.end());
}
{
// Test expectedSizeInWordsFromPrefix(). We pass in a copy of the slice so that valgrind can
// detect out-of-bounds access.
// An empty prefix has no header to inspect, so the prediction is the one-word minimum.
EXPECT_EQ(1, expectedSizeInWordsFromPrefix(copyWords(serialized.slice(0, 0))));
// Every non-empty prefix, up to and including the whole message, must predict the full
// serialized size. NOTE(review): presumably this holds from the very first word because the
// message has a single segment, so its segment table fits in one word -- confirm.
for (uint i = 1; i <= serialized.size(); i++) {
EXPECT_EQ(serialized.size(),
expectedSizeInWordsFromPrefix(copyWords(serialized.slice(0, i))));
}
}
}
TEST(Serialize, FlatArrayOddSegmentCount) {
......@@ -121,6 +137,23 @@ TEST(Serialize, FlatArrayOddSegmentCount) {
checkTestMessage(reader.getRoot<TestAllTypes>());
EXPECT_EQ(serializedWithSuffix.end() - 5, reader.getEnd());
}
{
// Test expectedSizeInWordsFromPrefix(). We pass in a copy of the slice so that valgrind can
// detect out-of-bounds access.
// Segment table is 4 words, so with fewer words we'll have incomplete information.
for (uint i = 0; i < 4; i++) {
size_t expectedSize = expectedSizeInWordsFromPrefix(copyWords(serialized.slice(0, i)));
// With a truncated table the prediction must be an underestimate of the true size...
EXPECT_LT(expectedSize, serialized.size());
// ...yet still exceed the prefix length, which signals that the message is incomplete.
EXPECT_GT(expectedSize, i);
}
// After that, we get the exact length.
for (uint i = 4; i <= serialized.size(); i++) {
EXPECT_EQ(serialized.size(),
expectedSizeInWordsFromPrefix(copyWords(serialized.slice(0, i))));
}
}
}
TEST(Serialize, FlatArrayEvenSegmentCount) {
......@@ -143,6 +176,23 @@ TEST(Serialize, FlatArrayEvenSegmentCount) {
checkTestMessage(reader.getRoot<TestAllTypes>());
EXPECT_EQ(serializedWithSuffix.end() - 5, reader.getEnd());
}
{
// Test expectedSizeInWordsFromPrefix(). We pass in a copy of the slice so that valgrind can
// detect out-of-bounds access.
// Segment table is 6 words, so with fewer words we'll have incomplete information.
for (uint i = 0; i < 6; i++) {
size_t expectedSize = expectedSizeInWordsFromPrefix(copyWords(serialized.slice(0, i)));
// With a truncated table the prediction must be an underestimate of the true size...
EXPECT_LT(expectedSize, serialized.size());
// ...yet still exceed the prefix length, which signals that the message is incomplete.
EXPECT_GT(expectedSize, i);
}
// After that, we get the exact length.
for (uint i = 6; i <= serialized.size(); i++) {
EXPECT_EQ(serialized.size(),
expectedSizeInWordsFromPrefix(copyWords(serialized.slice(0, i))));
}
}
}
class TestInputStream: public kj::InputStream {
......
......@@ -44,11 +44,6 @@ FlatArrayMessageReader::FlatArrayMessageReader(
return;
}
if (segmentCount == 0) {
end = array.begin() + offset;
return;
}
{
uint segmentSize = table[1].get();
......@@ -80,6 +75,29 @@ FlatArrayMessageReader::FlatArrayMessageReader(
end = array.begin() + offset;
}
size_t expectedSizeInWordsFromPrefix(kj::ArrayPtr<const word> array) {
  // Predicts the total size, in words, of the serialized message that starts with `array`.
  //
  // The result is the size of the segment table plus the sizes of every segment whose table entry
  // is already present in `array`. If the table itself is truncated, the result is therefore an
  // underestimate of the real message size.

  if (array.size() < 1) {
    // All messages are at least one word.
    return 1;
  }

  // The segment table is read as a sequence of 32-bit wire values starting at the first word.
  const _::WireValue<uint32_t>* table =
      reinterpret_cast<const _::WireValue<uint32_t>*>(array.begin());

  // The first entry holds the segment count minus one. Widen to 64 bits before adding one so
  // that a stored value of 0xffffffff does not wrap around to "zero segments", which would make
  // this function report a complete one-word message.
  uint64_t segmentCount = static_cast<uint64_t>(table[0].get()) + 1;

  // Table length in words: the count entry plus one 32-bit size per segment, rounded up to a
  // whole number of 64-bit words.
  size_t offset = static_cast<size_t>(segmentCount / 2u + 1u);

  // If the array is too small to contain the full segment table, only consider the size entries
  // that are actually available. Each word holds two entries and the first entry is the count.
  uint64_t availableEntries = static_cast<uint64_t>(array.size()) * 2 - 1u;
  size_t knownSegments = static_cast<size_t>(kj::min(segmentCount, availableEntries));

  size_t totalSize = offset;
  for (size_t i = 0; i < knownSegments; i++) {
    totalSize += table[i + 1].get();
  }
  return totalSize;
}
kj::ArrayPtr<const word> FlatArrayMessageReader::getSegment(uint id) {
if (id == 0) {
return segment0;
......
......@@ -108,6 +108,20 @@ size_t computeSerializedSizeInWords(MessageBuilder& builder);
size_t computeSerializedSizeInWords(kj::ArrayPtr<const kj::ArrayPtr<const word>> segments);
// Version of computeSerializedSizeInWords that takes a raw segment array.
size_t expectedSizeInWordsFromPrefix(kj::ArrayPtr<const word> messagePrefix);
// Given a prefix of a serialized message, try to determine the expected total size of the message,
// in words. The returned size is based on the information known so far; it may be an underestimate
// if the prefix doesn't contain the full segment table. An empty prefix yields 1, since every
// message is at least one word.
//
// If the returned value is greater than `messagePrefix.size()`, then the message is not yet
// complete and the app cannot parse it yet. If the returned value is less than or equal to
// `messagePrefix.size()`, then the returned value is the exact total size of the message; any
// remaining bytes are part of the next message.
//
// This function is useful when reading messages from a stream in an asynchronous way, but when
// using the full KJ async infrastructure would be too difficult. Each time bytes are received,
// use this function to determine if an entire message is ready to be parsed.
// =======================================================================================
class InputStreamMessageReader: public MessageReader {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment