Commit 441e079f authored by Ingvar Stepanyan's avatar Ingvar Stepanyan

Add common GzipOutputContext helper

Move generic gzip output operations to private helper for easier sharing of capabilities between GzipOutputStream and GzipAsyncOutputStream.
parent e9ddeba1
......@@ -26,6 +26,69 @@
namespace kj {
namespace _ { // private
GzipOutputContext::GzipOutputContext(kj::Maybe<int> compressionLevel) {
memset(&ctx, 0, sizeof(ctx));
ctx.next_in = nullptr;
ctx.avail_in = 0;
ctx.next_out = nullptr;
ctx.avail_out = 0;
int initResult;
KJ_IF_MAYBE(level, compressionLevel) {
compressing = true;
initResult =
deflateInit2(&ctx, *level, Z_DEFLATED,
15 + 16, // windowBits = 15 (maximum) + magic value 16 to ask for gzip.
8, // memLevel = 8 (the default)
Z_DEFAULT_STRATEGY);
} else {
compressing = false;
initResult = inflateInit2(&ctx, 15 + 16);
}
if (initResult != Z_OK) {
fail(initResult);
}
}
GzipOutputContext::~GzipOutputContext() noexcept(false) {
compressing ? deflateEnd(&ctx) : inflateEnd(&ctx);
}
void GzipOutputContext::setInput(const void* in, size_t size) {
ctx.next_in = const_cast<byte*>(reinterpret_cast<const byte*>(in));
ctx.avail_in = size;
}
kj::Tuple<bool, kj::ArrayPtr<const byte>> GzipOutputContext::pumpOnce(int flush) {
ctx.next_out = buffer;
ctx.avail_out = sizeof(buffer);
auto result = compressing ? deflate(&ctx, flush) : inflate(&ctx, flush);
if (result != Z_OK && result != Z_BUF_ERROR && result != Z_STREAM_END) {
fail(result);
}
// - Z_STREAM_END means we have finished the stream successfully.
// - Z_BUF_ERROR means we didn't have any more input to process
// (but still have to make a call to write to potentially flush data).
return kj::tuple(result == Z_OK, kj::arrayPtr(buffer, sizeof(buffer) - ctx.avail_out));
}
void GzipOutputContext::fail(int result) {
auto header = compressing ? "gzip compression failed" : "gzip decompression failed";
if (ctx.msg == nullptr) {
KJ_FAIL_REQUIRE(header, result);
} else {
KJ_FAIL_REQUIRE(header, ctx.msg);
}
}
} // namespace _ (private)
GzipInputStream::GzipInputStream(InputStream& inner)
: inner(inner) {
memset(&ctx, 0, sizeof(ctx));
......@@ -91,69 +154,26 @@ size_t GzipInputStream::readImpl(
// =======================================================================================
GzipOutputStream::GzipOutputStream(OutputStream& inner, int compressionLevel)
: inner(inner) {
memset(&ctx, 0, sizeof(ctx));
ctx.next_in = nullptr;
ctx.avail_in = 0;
ctx.next_out = nullptr;
ctx.avail_out = 0;
int initResult =
deflateInit2(&ctx, compressionLevel, Z_DEFLATED,
15 + 16, // windowBits = 15 (maximum) + magic value 16 to ask for gzip.
8, // memLevel = 8 (the default)
Z_DEFAULT_STRATEGY);
KJ_ASSERT(initResult == Z_OK, initResult);
}
GzipOutputStream::GzipOutputStream(OutputStream& inner, kj::Maybe<int> compressionLevel)
: inner(inner), ctx(compressionLevel) {}
GzipOutputStream::~GzipOutputStream() noexcept(false) {
KJ_DEFER(deflateEnd(&ctx));
for (;;) {
ctx.next_out = buffer;
ctx.avail_out = sizeof(buffer);
auto deflateResult = deflate(&ctx, Z_FINISH);
if (deflateResult != Z_OK && deflateResult != Z_STREAM_END) {
if (ctx.msg == nullptr) {
KJ_FAIL_REQUIRE("gzip compression failed", deflateResult);
} else {
KJ_FAIL_REQUIRE("gzip compression failed", ctx.msg);
}
}
size_t n = sizeof(buffer) - ctx.avail_out;
inner.write(buffer, n);
if (deflateResult == Z_STREAM_END) {
break;
}
}
pump(Z_FINISH);
}
void GzipOutputStream::write(const void* in, size_t size) {
ctx.next_in = const_cast<byte*>(reinterpret_cast<const byte*>(in));
ctx.avail_in = size;
pump();
ctx.setInput(in, size);
pump(Z_NO_FLUSH);
}
void GzipOutputStream::pump() {
while (ctx.avail_in > 0) {
ctx.next_out = buffer;
ctx.avail_out = sizeof(buffer);
auto deflateResult = deflate(&ctx, Z_NO_FLUSH);
if (deflateResult != Z_OK) {
if (ctx.msg == nullptr) {
KJ_FAIL_REQUIRE("gzip compression failed", deflateResult);
} else {
KJ_FAIL_REQUIRE("gzip compression failed", ctx.msg);
}
}
size_t n = sizeof(buffer) - ctx.avail_out;
inner.write(buffer, n);
}
void GzipOutputStream::pump(int flush) {
bool ok;
do {
auto result = ctx.pumpOnce(flush);
ok = get<0>(result);
auto chunk = get<1>(result);
inner.write(chunk.begin(), chunk.size());
} while (ok);
}
// =======================================================================================
......@@ -227,37 +247,10 @@ Promise<size_t> GzipAsyncInputStream::readImpl(
// =======================================================================================
GzipAsyncOutputStream::GzipAsyncOutputStream(AsyncOutputStream& inner, kj::Maybe<int> compressionLevel)
: inner(inner) {
memset(&ctx, 0, sizeof(ctx));
ctx.next_in = nullptr;
ctx.avail_in = 0;
ctx.next_out = nullptr;
ctx.avail_out = 0;
int initResult;
KJ_IF_MAYBE(level, compressionLevel) {
compressing = true;
initResult =
deflateInit2(&ctx, *level, Z_DEFLATED,
15 + 16, // windowBits = 15 (maximum) + magic value 16 to ask for gzip.
8, // memLevel = 8 (the default)
Z_DEFAULT_STRATEGY);
} else {
compressing = false;
initResult = inflateInit2(&ctx, 15 + 16);
}
if (initResult != Z_OK) {
fail(initResult);
}
}
GzipAsyncOutputStream::~GzipAsyncOutputStream() noexcept(false) {
compressing ? deflateEnd(&ctx) : inflateEnd(&ctx);
}
: inner(inner), ctx(compressionLevel) {}
Promise<void> GzipAsyncOutputStream::write(const void* in, size_t size) {
ctx.next_in = const_cast<byte*>(reinterpret_cast<const byte*>(in));
ctx.avail_in = size;
ctx.setInput(in, size);
return pump(Z_NO_FLUSH);
}
......@@ -269,40 +262,15 @@ Promise<void> GzipAsyncOutputStream::write(ArrayPtr<const ArrayPtr<const byte>>
});
}
Promise<void> GzipAsyncOutputStream::flush() {
return pump(Z_SYNC_FLUSH);
}
Promise<void> GzipAsyncOutputStream::end() {
return pump(Z_FINISH);
}
void GzipAsyncOutputStream::fail(int result) {
if (ctx.msg == nullptr) {
KJ_FAIL_REQUIRE("gzip failed", result);
} else {
KJ_FAIL_REQUIRE("gzip failed", ctx.msg);
}
}
kj::Promise<void> GzipAsyncOutputStream::pump(int flush) {
ctx.next_out = buffer;
ctx.avail_out = sizeof(buffer);
auto result = compressing ? deflate(&ctx, flush) : inflate(&ctx, flush);
if (result != Z_OK && result != Z_BUF_ERROR && result != Z_STREAM_END) {
fail(result);
auto result = ctx.pumpOnce(flush);
auto ok = get<0>(result);
auto chunk = get<1>(result);
auto promise = inner.write(chunk.begin(), chunk.size());
if (ok) {
promise = promise.then([this, flush]() { return pump(flush); });
}
size_t n = sizeof(buffer) - ctx.avail_out;
auto promise = inner.write(buffer, n);
if (result == Z_OK) {
return promise.then([this, flush]() { return pump(flush); });
} else {
// - Z_STREAM_END means we have finished the stream successfully.
// - Z_BUF_ERROR means we didn't have any more input to process
// (but had to make a call nevertheless to potentially flush data).
return promise;
}
}
} // namespace kj
......
......@@ -27,6 +27,28 @@
namespace kj {
namespace _ { // private
class GzipOutputContext final {
public:
GzipOutputContext(kj::Maybe<int> compressionLevel);
~GzipOutputContext() noexcept(false);
KJ_DISALLOW_COPY(GzipOutputContext);
GzipOutputContext(GzipOutputContext&&) = default;
void setInput(const void* in, size_t size);
kj::Tuple<bool, kj::ArrayPtr<const byte>> pumpOnce(int flush);
private:
bool compressing;
z_stream ctx;
byte buffer[4096];
void fail(int result);
};
} // namespace _ (private)
class GzipInputStream final: public InputStream {
public:
GzipInputStream(InputStream& inner);
......@@ -47,20 +69,27 @@ private:
class GzipOutputStream final: public OutputStream {
public:
GzipOutputStream(OutputStream& inner, int compressionLevel = Z_DEFAULT_COMPRESSION);
GzipOutputStream(OutputStream& inner, kj::Maybe<int> compressionLevel = Z_DEFAULT_COMPRESSION);
~GzipOutputStream() noexcept(false);
KJ_DISALLOW_COPY(GzipOutputStream);
GzipOutputStream(GzipOutputStream&&) = default;
static inline GzipOutputStream Decompress(OutputStream& inner) {
return GzipOutputStream(inner, nullptr);
}
void write(const void* buffer, size_t size) override;
using OutputStream::write;
inline void flush() {
pump(Z_SYNC_FLUSH);
}
private:
OutputStream& inner;
z_stream ctx;
_::GzipOutputContext ctx;
byte buffer[4096];
void pump();
void pump(int flush);
};
class GzipAsyncInputStream final: public AsyncInputStream {
......@@ -84,7 +113,6 @@ private:
class GzipAsyncOutputStream final: public AsyncOutputStream {
public:
GzipAsyncOutputStream(AsyncOutputStream& inner, kj::Maybe<int> compressionLevel = Z_DEFAULT_COMPRESSION);
~GzipAsyncOutputStream() noexcept(false);
KJ_DISALLOW_COPY(GzipAsyncOutputStream);
GzipAsyncOutputStream(GzipAsyncOutputStream&&) = default;
......@@ -95,23 +123,23 @@ public:
Promise<void> write(const void* buffer, size_t size) override;
Promise<void> write(ArrayPtr<const ArrayPtr<const byte>> pieces) override;
Promise<void> flush();
inline Promise<void> flush() {
return pump(Z_SYNC_FLUSH);
}
// Call if you need to flush a stream at an arbitrary data point.
Promise<void> end();
Promise<void> end() {
return pump(Z_FINISH);
}
// Must call to flush and finish the stream, since some data may be buffered.
//
// TODO(cleanup): This should be a virtual method on AsyncOutputStream.
private:
AsyncOutputStream& inner;
bool compressing;
z_stream ctx;
byte buffer[4096];
_::GzipOutputContext ctx;
kj::Promise<void> pump(int flush);
void fail(int result);
};
} // namespace kj
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment