Commit 742b2166 authored by Kenton Varda's avatar Kenton Varda Committed by GitHub

Merge pull request #554 from capnproto/gzip

Add gzip (zlib) bindings.
parents a76d34f2 6c3ff2ce
// Copyright (c) 2017 Cloudflare, Inc. and contributors
// Licensed under the MIT License:
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
#include "gzip.h"
#include <kj/test.h>
#include <kj/debug.h>
#include <stdlib.h>
namespace kj {
namespace {
static const byte FOOBAR_GZIP[] = {
0x1F, 0x8B, 0x08, 0x00, 0xF9, 0x05, 0xB7, 0x59,
0x00, 0x03, 0x4B, 0xCB, 0xCF, 0x4F, 0x4A, 0x2C,
0x02, 0x00, 0x95, 0x1F, 0xF6, 0x9E, 0x06, 0x00,
0x00, 0x00,
};
class MockInputStream: public AsyncInputStream {
public:
MockInputStream(kj::ArrayPtr<const byte> bytes, size_t blockSize)
: bytes(bytes), blockSize(blockSize) {}
Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
// Clamp max read to blockSize.
size_t n = kj::min(blockSize, maxBytes);
// Unless that's less than minBytes -- in which case, use minBytes.
n = kj::max(n, minBytes);
// But also don't read more data than we have.
n = kj::min(n, bytes.size());
memcpy(buffer, bytes.begin(), n);
bytes = bytes.slice(n, bytes.size());
return n;
}
private:
kj::ArrayPtr<const byte> bytes;
size_t blockSize;
};
KJ_TEST("gzip decompression") {
auto io = setupAsyncIo();
// Normal read.
{
MockInputStream rawInput(FOOBAR_GZIP, kj::maxValue);
GzipAsyncInputStream gzip(rawInput);
KJ_EXPECT(gzip.readAllText().wait(io.waitScope) == "foobar");
}
// Force read one byte at a time.
{
MockInputStream rawInput(FOOBAR_GZIP, 1);
GzipAsyncInputStream gzip(rawInput);
KJ_EXPECT(gzip.readAllText().wait(io.waitScope) == "foobar");
}
// Read truncated input.
{
MockInputStream rawInput(kj::arrayPtr(FOOBAR_GZIP, sizeof(FOOBAR_GZIP) / 2), kj::maxValue);
GzipAsyncInputStream gzip(rawInput);
char text[16];
size_t n = gzip.tryRead(text, 1, sizeof(text)).wait(io.waitScope);
text[n] = '\0';
KJ_EXPECT(StringPtr(text, n) == "fo");
KJ_EXPECT_THROW_MESSAGE("gzip compressed stream ended prematurely",
gzip.tryRead(text, 1, sizeof(text)).wait(io.waitScope));
}
// Read concatenated input.
{
Vector<byte> bytes;
bytes.addAll(ArrayPtr<const byte>(FOOBAR_GZIP));
bytes.addAll(ArrayPtr<const byte>(FOOBAR_GZIP));
MockInputStream rawInput(bytes, kj::maxValue);
GzipAsyncInputStream gzip(rawInput);
KJ_EXPECT(gzip.readAllText().wait(io.waitScope) == "foobarfoobar");
}
}
class MockOutputStream: public AsyncOutputStream {
public:
kj::Vector<byte> bytes;
kj::String decompress(WaitScope& ws) {
MockInputStream rawInput(bytes, kj::maxValue);
GzipAsyncInputStream gzip(rawInput);
return gzip.readAllText().wait(ws);
}
Promise<void> write(const void* buffer, size_t size) override {
bytes.addAll(arrayPtr(reinterpret_cast<const byte*>(buffer), size));
return kj::READY_NOW;
}
Promise<void> write(ArrayPtr<const ArrayPtr<const byte>> pieces) override {
for (auto& piece: pieces) {
bytes.addAll(piece);
}
return kj::READY_NOW;
}
};
KJ_TEST("gzip compression") {
auto io = setupAsyncIo();
// Normal write.
{
MockOutputStream rawOutput;
GzipAsyncOutputStream gzip(rawOutput);
gzip.write("foobar", 6).wait(io.waitScope);
gzip.end().wait(io.waitScope);
KJ_EXPECT(rawOutput.decompress(io.waitScope) == "foobar");
}
// Multi-part write.
{
MockOutputStream rawOutput;
GzipAsyncOutputStream gzip(rawOutput);
gzip.write("foo", 3).wait(io.waitScope);
gzip.write("bar", 3).wait(io.waitScope);
gzip.end().wait(io.waitScope);
KJ_EXPECT(rawOutput.decompress(io.waitScope) == "foobar");
}
// Array-of-arrays write.
{
MockOutputStream rawOutput;
GzipAsyncOutputStream gzip(rawOutput);
ArrayPtr<const byte> pieces[] = {
kj::StringPtr("foo").asBytes(),
kj::StringPtr("bar").asBytes(),
};
gzip.write(pieces).wait(io.waitScope);
gzip.end().wait(io.waitScope);
KJ_EXPECT(rawOutput.decompress(io.waitScope) == "foobar");
}
}
KJ_TEST("gzip huge round trip") {
auto io = setupAsyncIo();
auto bytes = heapArray<byte>(65536);
for (auto& b: bytes) {
b = rand();
}
MockOutputStream rawOutput;
GzipAsyncOutputStream gzipOut(rawOutput);
gzipOut.write(bytes.begin(), bytes.size()).wait(io.waitScope);
gzipOut.end().wait(io.waitScope);
MockInputStream rawInput(rawOutput.bytes, kj::maxValue);
GzipAsyncInputStream gzipIn(rawInput);
auto decompressed = gzipIn.readAllBytes().wait(io.waitScope);
KJ_ASSERT(decompressed.size() == bytes.size());
KJ_ASSERT(memcmp(bytes.begin(), decompressed.begin(), bytes.size()) == 0);
}
} // namespace
} // namespace kj
// Copyright (c) 2017 Cloudflare, Inc. and contributors
// Licensed under the MIT License:
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
#include "gzip.h"
#include <kj/debug.h>
namespace kj {
GzipAsyncInputStream::GzipAsyncInputStream(AsyncInputStream& inner)
: inner(inner) {
memset(&ctx, 0, sizeof(ctx));
ctx.next_in = nullptr;
ctx.avail_in = 0;
ctx.next_out = nullptr;
ctx.avail_out = 0;
// windowBits = 15 (maximum) + magic value 16 to ask for gzip.
KJ_ASSERT(inflateInit2(&ctx, 15 + 16) == Z_OK);
}
GzipAsyncInputStream::~GzipAsyncInputStream() noexcept(false) {
inflateEnd(&ctx);
}
Promise<size_t> GzipAsyncInputStream::tryRead(void* out, size_t minBytes, size_t maxBytes) {
if (maxBytes == 0) return size_t(0);
return readImpl(reinterpret_cast<byte*>(out), minBytes, maxBytes, 0);
}
Promise<size_t> GzipAsyncInputStream::readImpl(
byte* out, size_t minBytes, size_t maxBytes, size_t alreadyRead) {
if (ctx.avail_in == 0) {
return inner.tryRead(buffer, 1, sizeof(buffer))
.then([this,out,minBytes,maxBytes,alreadyRead](size_t amount) -> Promise<size_t> {
if (amount == 0) {
KJ_REQUIRE(atValidEndpoint, "gzip compressed stream ended prematurely");
return alreadyRead;
} else {
ctx.next_in = buffer;
ctx.avail_in = amount;
return readImpl(out, minBytes, maxBytes, alreadyRead);
}
});
}
ctx.next_out = reinterpret_cast<byte*>(out);
ctx.avail_out = maxBytes;
auto inflateResult = inflate(&ctx, Z_NO_FLUSH);
atValidEndpoint = inflateResult == Z_STREAM_END;
if (inflateResult == Z_OK || inflateResult == Z_STREAM_END) {
if (atValidEndpoint && ctx.avail_in > 0) {
// There's more data available. Assume start of new content.
KJ_ASSERT(inflateReset(&ctx) == Z_OK);
}
size_t n = maxBytes - ctx.avail_out;
if (n >= minBytes) {
return n + alreadyRead;
} else {
return readImpl(out + n, minBytes - n, maxBytes - n, alreadyRead + n);
}
} else {
if (ctx.msg == nullptr) {
KJ_FAIL_REQUIRE("gzip decompression failed", inflateResult);
} else {
KJ_FAIL_REQUIRE("gzip decompression failed", ctx.msg);
}
}
}
// =======================================================================================
GzipAsyncOutputStream::GzipAsyncOutputStream(AsyncOutputStream& inner, int compressionLevel)
: inner(inner) {
memset(&ctx, 0, sizeof(ctx));
ctx.next_in = nullptr;
ctx.avail_in = 0;
ctx.next_out = nullptr;
ctx.avail_out = 0;
int initResult =
deflateInit2(&ctx, compressionLevel, Z_DEFLATED,
15 + 16, // windowBits = 15 (maximum) + magic value 16 to ask for gzip.
8, // memLevel = 8 (the default)
Z_DEFAULT_STRATEGY);
KJ_ASSERT(initResult == Z_OK, initResult);
}
GzipAsyncOutputStream::~GzipAsyncOutputStream() noexcept(false) {
deflateEnd(&ctx);
}
Promise<void> GzipAsyncOutputStream::write(const void* in, size_t size) {
ctx.next_in = const_cast<byte*>(reinterpret_cast<const byte*>(in));
ctx.avail_in = size;
return pump();
}
Promise<void> GzipAsyncOutputStream::write(ArrayPtr<const ArrayPtr<const byte>> pieces) {
KJ_REQUIRE(!ended, "already ended");
if (pieces.size() == 0) return kj::READY_NOW;
return write(pieces[0].begin(), pieces[0].size())
.then([this,pieces]() {
return write(pieces.slice(1, pieces.size()));
});
}
Promise<void> GzipAsyncOutputStream::end() {
KJ_REQUIRE(!ended, "already ended");
ctx.next_out = buffer;
ctx.avail_out = sizeof(buffer);
auto deflateResult = deflate(&ctx, Z_FINISH);
if (deflateResult == Z_OK || deflateResult == Z_STREAM_END) {
size_t n = sizeof(buffer) - ctx.avail_out;
auto promise = inner.write(buffer, n);
if (deflateResult == Z_OK) {
return promise.then([this]() { return end(); });
} else {
ended = true;
return promise;
}
} else {
if (ctx.msg == nullptr) {
KJ_FAIL_REQUIRE("gzip compression failed", deflateResult);
} else {
KJ_FAIL_REQUIRE("gzip compression failed", ctx.msg);
}
}
}
kj::Promise<void> GzipAsyncOutputStream::pump() {
if (ctx.avail_in == 0) {
return kj::READY_NOW;
}
ctx.next_out = buffer;
ctx.avail_out = sizeof(buffer);
auto deflateResult = deflate(&ctx, Z_NO_FLUSH);
if (deflateResult == Z_OK) {
size_t n = sizeof(buffer) - ctx.avail_out;
return inner.write(buffer, n)
.then([this]() { return pump(); });
} else {
if (ctx.msg == nullptr) {
KJ_FAIL_REQUIRE("gzip compression failed", deflateResult);
} else {
KJ_FAIL_REQUIRE("gzip compression failed", ctx.msg);
}
}
}
} // namespace kj
// Copyright (c) 2017 Cloudflare, Inc. and contributors
// Licensed under the MIT License:
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
#ifndef KJ_COMPAT_GZIP_H_
#define KJ_COMPAT_GZIP_H_
#include <kj/async-io.h>
#include <zlib.h>
namespace kj {
class GzipAsyncInputStream: public AsyncInputStream {
public:
GzipAsyncInputStream(AsyncInputStream& inner);
~GzipAsyncInputStream() noexcept(false);
KJ_DISALLOW_COPY(GzipAsyncInputStream);
Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override;
private:
AsyncInputStream& inner;
z_stream ctx;
bool atValidEndpoint = false;
byte buffer[4096];
Promise<size_t> readImpl(byte* buffer, size_t minBytes, size_t maxBytes, size_t alreadyRead);
};
class GzipAsyncOutputStream: public AsyncOutputStream {
public:
GzipAsyncOutputStream(AsyncOutputStream& inner, int compressionLevel = Z_DEFAULT_COMPRESSION);
~GzipAsyncOutputStream() noexcept(false);
KJ_DISALLOW_COPY(GzipAsyncOutputStream);
Promise<void> write(const void* buffer, size_t size) override;
Promise<void> write(ArrayPtr<const ArrayPtr<const byte>> pieces) override;
Promise<void> end();
// Must call to flush the stream, since some data may be buffered.
//
// TODO(cleanup): This should be a virtual method on AsyncOutputStream.
private:
AsyncOutputStream& inner;
bool ended = false;
z_stream ctx;
byte buffer[4096];
kj::Promise<void> pump();
};
} // namespace kj
#endif // KJ_COMPAT_GZIP_H_
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment