Commit e9ddeba1 authored by Ingvar Stepanyan's avatar Ingvar Stepanyan

Allow decompression and flush on GzipAsyncOutputStream

 - Makes it possible to decompress data granularly via push-based API.
 - Adds ability to flush output at an arbitrary point.
 - Removes manual `ended` check as zlib checks it for us.
parent 04862a2e
...@@ -86,6 +86,48 @@ private: ...@@ -86,6 +86,48 @@ private:
size_t blockSize; size_t blockSize;
}; };
class MockOutputStream: public OutputStream {
public:
kj::Vector<byte> bytes;
kj::String decompress() {
MockInputStream rawInput(bytes, kj::maxValue);
GzipInputStream gzip(rawInput);
return gzip.readAllText();
}
void write(const void* buffer, size_t size) override {
bytes.addAll(arrayPtr(reinterpret_cast<const byte*>(buffer), size));
}
void write(ArrayPtr<const ArrayPtr<const byte>> pieces) override {
for (auto& piece: pieces) {
bytes.addAll(piece);
}
}
};
class MockAsyncOutputStream: public AsyncOutputStream {
public:
kj::Vector<byte> bytes;
kj::String decompress(WaitScope& ws) {
MockAsyncInputStream rawInput(bytes, kj::maxValue);
GzipAsyncInputStream gzip(rawInput);
return gzip.readAllText().wait(ws);
}
Promise<void> write(const void* buffer, size_t size) override {
bytes.addAll(arrayPtr(reinterpret_cast<const byte*>(buffer), size));
return kj::READY_NOW;
}
Promise<void> write(ArrayPtr<const ArrayPtr<const byte>> pieces) override {
for (auto& piece: pieces) {
bytes.addAll(piece);
}
return kj::READY_NOW;
}
};
KJ_TEST("gzip decompression") { KJ_TEST("gzip decompression") {
// Normal read. // Normal read.
{ {
...@@ -168,49 +210,24 @@ KJ_TEST("async gzip decompression") { ...@@ -168,49 +210,24 @@ KJ_TEST("async gzip decompression") {
KJ_EXPECT(gzip.readAllText().wait(io.waitScope) == "foobarfoobar"); KJ_EXPECT(gzip.readAllText().wait(io.waitScope) == "foobarfoobar");
} }
}
class MockOutputStream: public OutputStream {
public:
kj::Vector<byte> bytes;
kj::String decompress() {
MockInputStream rawInput(bytes, kj::maxValue);
GzipInputStream gzip(rawInput);
return gzip.readAllText();
}
void write(const void* buffer, size_t size) override { // Decompress using an output stream.
bytes.addAll(arrayPtr(reinterpret_cast<const byte*>(buffer), size)); {
} MockAsyncOutputStream rawOutput;
void write(ArrayPtr<const ArrayPtr<const byte>> pieces) override { auto gzip = GzipAsyncOutputStream::Decompress(rawOutput);
for (auto& piece: pieces) {
bytes.addAll(piece);
}
}
};
class MockAsyncOutputStream: public AsyncOutputStream { auto mid = sizeof(FOOBAR_GZIP) / 2;
public: gzip.write(FOOBAR_GZIP, mid).wait(io.waitScope);
kj::Vector<byte> bytes; auto str1 = kj::heapString(rawOutput.bytes.asPtr().asChars());
KJ_EXPECT(str1 == "fo", str1);
kj::String decompress(WaitScope& ws) { gzip.write(FOOBAR_GZIP + mid, sizeof(FOOBAR_GZIP) - mid).wait(io.waitScope);
MockAsyncInputStream rawInput(bytes, kj::maxValue); auto str2 = kj::heapString(rawOutput.bytes.asPtr().asChars());
GzipAsyncInputStream gzip(rawInput); KJ_EXPECT(str2 == "foobar", str2);
return gzip.readAllText().wait(ws);
}
Promise<void> write(const void* buffer, size_t size) override { gzip.end().wait(io.waitScope);
bytes.addAll(arrayPtr(reinterpret_cast<const byte*>(buffer), size));
return kj::READY_NOW;
}
Promise<void> write(ArrayPtr<const ArrayPtr<const byte>> pieces) override {
for (auto& piece: pieces) {
bytes.addAll(piece);
}
return kj::READY_NOW;
} }
}; }
KJ_TEST("gzip compression") { KJ_TEST("gzip compression") {
// Normal write. // Normal write.
...@@ -291,8 +308,18 @@ KJ_TEST("async gzip compression") { ...@@ -291,8 +308,18 @@ KJ_TEST("async gzip compression") {
{ {
MockAsyncOutputStream rawOutput; MockAsyncOutputStream rawOutput;
GzipAsyncOutputStream gzip(rawOutput); GzipAsyncOutputStream gzip(rawOutput);
gzip.write("foo", 3).wait(io.waitScope); gzip.write("foo", 3).wait(io.waitScope);
auto prevSize = rawOutput.bytes.size();
gzip.write("bar", 3).wait(io.waitScope); gzip.write("bar", 3).wait(io.waitScope);
auto curSize = rawOutput.bytes.size();
KJ_EXPECT(prevSize == curSize, prevSize, curSize);
gzip.flush().wait(io.waitScope);
curSize = rawOutput.bytes.size();
KJ_EXPECT(prevSize < curSize, prevSize, curSize);
gzip.end().wait(io.waitScope); gzip.end().wait(io.waitScope);
KJ_EXPECT(rawOutput.decompress(io.waitScope) == "foobar"); KJ_EXPECT(rawOutput.decompress(io.waitScope) == "foobar");
......
...@@ -226,7 +226,7 @@ Promise<size_t> GzipAsyncInputStream::readImpl( ...@@ -226,7 +226,7 @@ Promise<size_t> GzipAsyncInputStream::readImpl(
// ======================================================================================= // =======================================================================================
GzipAsyncOutputStream::GzipAsyncOutputStream(AsyncOutputStream& inner, int compressionLevel) GzipAsyncOutputStream::GzipAsyncOutputStream(AsyncOutputStream& inner, kj::Maybe<int> compressionLevel)
: inner(inner) { : inner(inner) {
memset(&ctx, 0, sizeof(ctx)); memset(&ctx, 0, sizeof(ctx));
ctx.next_in = nullptr; ctx.next_in = nullptr;
...@@ -234,27 +234,34 @@ GzipAsyncOutputStream::GzipAsyncOutputStream(AsyncOutputStream& inner, int compr ...@@ -234,27 +234,34 @@ GzipAsyncOutputStream::GzipAsyncOutputStream(AsyncOutputStream& inner, int compr
ctx.next_out = nullptr; ctx.next_out = nullptr;
ctx.avail_out = 0; ctx.avail_out = 0;
int initResult = int initResult;
deflateInit2(&ctx, compressionLevel, Z_DEFLATED, KJ_IF_MAYBE(level, compressionLevel) {
compressing = true;
initResult =
deflateInit2(&ctx, *level, Z_DEFLATED,
15 + 16, // windowBits = 15 (maximum) + magic value 16 to ask for gzip. 15 + 16, // windowBits = 15 (maximum) + magic value 16 to ask for gzip.
8, // memLevel = 8 (the default) 8, // memLevel = 8 (the default)
Z_DEFAULT_STRATEGY); Z_DEFAULT_STRATEGY);
KJ_ASSERT(initResult == Z_OK, initResult); } else {
compressing = false;
initResult = inflateInit2(&ctx, 15 + 16);
}
if (initResult != Z_OK) {
fail(initResult);
}
} }
GzipAsyncOutputStream::~GzipAsyncOutputStream() noexcept(false) { GzipAsyncOutputStream::~GzipAsyncOutputStream() noexcept(false) {
deflateEnd(&ctx); compressing ? deflateEnd(&ctx) : inflateEnd(&ctx);
} }
Promise<void> GzipAsyncOutputStream::write(const void* in, size_t size) { Promise<void> GzipAsyncOutputStream::write(const void* in, size_t size) {
ctx.next_in = const_cast<byte*>(reinterpret_cast<const byte*>(in)); ctx.next_in = const_cast<byte*>(reinterpret_cast<const byte*>(in));
ctx.avail_in = size; ctx.avail_in = size;
return pump(); return pump(Z_NO_FLUSH);
} }
Promise<void> GzipAsyncOutputStream::write(ArrayPtr<const ArrayPtr<const byte>> pieces) { Promise<void> GzipAsyncOutputStream::write(ArrayPtr<const ArrayPtr<const byte>> pieces) {
KJ_REQUIRE(!ended, "already ended");
if (pieces.size() == 0) return kj::READY_NOW; if (pieces.size() == 0) return kj::READY_NOW;
return write(pieces[0].begin(), pieces[0].size()) return write(pieces[0].begin(), pieces[0].size())
.then([this,pieces]() { .then([this,pieces]() {
...@@ -262,50 +269,39 @@ Promise<void> GzipAsyncOutputStream::write(ArrayPtr<const ArrayPtr<const byte>> ...@@ -262,50 +269,39 @@ Promise<void> GzipAsyncOutputStream::write(ArrayPtr<const ArrayPtr<const byte>>
}); });
} }
Promise<void> GzipAsyncOutputStream::end() { Promise<void> GzipAsyncOutputStream::flush() {
KJ_REQUIRE(!ended, "already ended"); return pump(Z_SYNC_FLUSH);
}
ctx.next_out = buffer; Promise<void> GzipAsyncOutputStream::end() {
ctx.avail_out = sizeof(buffer); return pump(Z_FINISH);
}
auto deflateResult = deflate(&ctx, Z_FINISH); void GzipAsyncOutputStream::fail(int result) {
if (deflateResult == Z_OK || deflateResult == Z_STREAM_END) { if (ctx.msg == nullptr) {
size_t n = sizeof(buffer) - ctx.avail_out; KJ_FAIL_REQUIRE("gzip failed", result);
auto promise = inner.write(buffer, n);
if (deflateResult == Z_OK) {
return promise.then([this]() { return end(); });
} else {
ended = true;
return promise;
}
} else { } else {
if (ctx.msg == nullptr) { KJ_FAIL_REQUIRE("gzip failed", ctx.msg);
KJ_FAIL_REQUIRE("gzip compression failed", deflateResult);
} else {
KJ_FAIL_REQUIRE("gzip compression failed", ctx.msg);
}
} }
} }
kj::Promise<void> GzipAsyncOutputStream::pump() { kj::Promise<void> GzipAsyncOutputStream::pump(int flush) {
if (ctx.avail_in == 0) {
return kj::READY_NOW;
}
ctx.next_out = buffer; ctx.next_out = buffer;
ctx.avail_out = sizeof(buffer); ctx.avail_out = sizeof(buffer);
auto deflateResult = deflate(&ctx, Z_NO_FLUSH); auto result = compressing ? deflate(&ctx, flush) : inflate(&ctx, flush);
if (deflateResult == Z_OK) { if (result != Z_OK && result != Z_BUF_ERROR && result != Z_STREAM_END) {
size_t n = sizeof(buffer) - ctx.avail_out; fail(result);
return inner.write(buffer, n) }
.then([this]() { return pump(); }); size_t n = sizeof(buffer) - ctx.avail_out;
auto promise = inner.write(buffer, n);
if (result == Z_OK) {
return promise.then([this, flush]() { return pump(flush); });
} else { } else {
if (ctx.msg == nullptr) { // - Z_STREAM_END means we have finished the stream successfully.
KJ_FAIL_REQUIRE("gzip compression failed", deflateResult); // - Z_BUF_ERROR means we didn't have any more input to process
} else { // (but had to make a call nevertheless to potentially flush data).
KJ_FAIL_REQUIRE("gzip compression failed", ctx.msg); return promise;
}
} }
} }
......
...@@ -83,26 +83,35 @@ private: ...@@ -83,26 +83,35 @@ private:
class GzipAsyncOutputStream final: public AsyncOutputStream { class GzipAsyncOutputStream final: public AsyncOutputStream {
public: public:
GzipAsyncOutputStream(AsyncOutputStream& inner, int compressionLevel = Z_DEFAULT_COMPRESSION); GzipAsyncOutputStream(AsyncOutputStream& inner, kj::Maybe<int> compressionLevel = Z_DEFAULT_COMPRESSION);
~GzipAsyncOutputStream() noexcept(false); ~GzipAsyncOutputStream() noexcept(false);
KJ_DISALLOW_COPY(GzipAsyncOutputStream); KJ_DISALLOW_COPY(GzipAsyncOutputStream);
GzipAsyncOutputStream(GzipAsyncOutputStream&&) = default;
static inline GzipAsyncOutputStream Decompress(AsyncOutputStream& inner) {
return GzipAsyncOutputStream(inner, nullptr);
}
Promise<void> write(const void* buffer, size_t size) override; Promise<void> write(const void* buffer, size_t size) override;
Promise<void> write(ArrayPtr<const ArrayPtr<const byte>> pieces) override; Promise<void> write(ArrayPtr<const ArrayPtr<const byte>> pieces) override;
Promise<void> flush();
// Call if you need to flush a stream at an arbitrary data point.
Promise<void> end(); Promise<void> end();
// Must call to flush the stream, since some data may be buffered. // Must call to flush and finish the stream, since some data may be buffered.
// //
// TODO(cleanup): This should be a virtual method on AsyncOutputStream. // TODO(cleanup): This should be a virtual method on AsyncOutputStream.
private: private:
AsyncOutputStream& inner; AsyncOutputStream& inner;
bool ended = false; bool compressing;
z_stream ctx; z_stream ctx;
byte buffer[4096]; byte buffer[4096];
kj::Promise<void> pump(); kj::Promise<void> pump(int flush);
void fail(int result);
}; };
} // namespace kj } // namespace kj
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment