Unverified commit eedf4e50, authored by Kenton Varda, committed by GitHub

Merge pull request #687 from RReverser/gz

Add GzipAsyncOutputStream::Decompress and ::flush
parents cba1b179 40d46a81
...@@ -158,6 +158,31 @@ if(NOT CAPNP_LITE) ...@@ -158,6 +158,31 @@ if(NOT CAPNP_LITE)
install(FILES ${kj-http_headers} DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/kj/compat") install(FILES ${kj-http_headers} DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/kj/compat")
endif() endif()
# kj-gzip ======================================================================
# Gzip stream wrappers (compat/gzip.h, compat/gzip.c++), built as their own library.
set(kj-gzip_sources
compat/gzip.c++
)
set(kj-gzip_headers
compat/gzip.h
)
# Like the other non-lite libraries, kj-gzip is skipped entirely in CAPNP_LITE builds.
if(NOT CAPNP_LITE)
add_library(kj-gzip ${kj-gzip_sources})
add_library(CapnProto::kj-gzip ALIAS kj-gzip)
# zlib is optional: the kj-gzip target is created either way, but KJ_HAS_ZLIB and the
# link dependencies below are only set up when zlib is actually found.
find_package(ZLIB)
if(ZLIB_FOUND)
# NOTE: add_definitions()/include_directories() are directory-scoped, so KJ_HAS_ZLIB and the
# zlib include path are not limited to the kj-gzip target.
add_definitions(-D KJ_HAS_ZLIB=1)
include_directories(${ZLIB_INCLUDE_DIRS})
target_link_libraries(kj-gzip PUBLIC kj-async kj ${ZLIB_LIBRARIES})
endif()
# Ensure the library has a version set to match autotools build
set_target_properties(kj-gzip PROPERTIES VERSION ${VERSION})
install(TARGETS kj-gzip ${INSTALL_TARGETS_DEFAULT_ARGS})
# Headers are installed under kj/compat.
install(FILES ${kj-gzip_headers} DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/kj/compat")
endif()
# Tests ======================================================================== # Tests ========================================================================
if(BUILD_TESTING) if(BUILD_TESTING)
...@@ -200,8 +225,9 @@ if(BUILD_TESTING) ...@@ -200,8 +225,9 @@ if(BUILD_TESTING)
parse/char-test.c++ parse/char-test.c++
compat/url-test.c++ compat/url-test.c++
compat/http-test.c++ compat/http-test.c++
compat/gzip-test.c++
) )
target_link_libraries(kj-heavy-tests kj-http kj-async kj-test kj) target_link_libraries(kj-heavy-tests kj-http kj-gzip kj-async kj-test kj)
add_dependencies(check kj-heavy-tests) add_dependencies(check kj-heavy-tests)
add_test(NAME kj-heavy-tests-run COMMAND kj-heavy-tests) add_test(NAME kj-heavy-tests-run COMMAND kj-heavy-tests)
endif() # NOT CAPNP_LITE endif() # NOT CAPNP_LITE
......
...@@ -86,6 +86,48 @@ private: ...@@ -86,6 +86,48 @@ private:
size_t blockSize; size_t blockSize;
}; };
class MockOutputStream: public OutputStream {
public:
kj::Vector<byte> bytes;
kj::String decompress() {
MockInputStream rawInput(bytes, kj::maxValue);
GzipInputStream gzip(rawInput);
return gzip.readAllText();
}
void write(const void* buffer, size_t size) override {
bytes.addAll(arrayPtr(reinterpret_cast<const byte*>(buffer), size));
}
void write(ArrayPtr<const ArrayPtr<const byte>> pieces) override {
for (auto& piece: pieces) {
bytes.addAll(piece);
}
}
};
class MockAsyncOutputStream: public AsyncOutputStream {
  // Test double for an AsyncOutputStream: writes are appended to `bytes` and complete
  // immediately; decompress() reads the captured bytes back through GzipAsyncInputStream.
public:
  kj::Vector<byte> bytes;

  kj::String decompress(WaitScope& ws) {
    // Feed everything captured so far through the async gzip reader and wait for the text.
    MockAsyncInputStream source(bytes, kj::maxValue);
    GzipAsyncInputStream inflater(source);
    auto text = inflater.readAllText();
    return text.wait(ws);
  }

  Promise<void> write(const void* buffer, size_t size) override {
    auto start = reinterpret_cast<const byte*>(buffer);
    bytes.addAll(arrayPtr(start, size));
    return kj::READY_NOW;
  }

  Promise<void> write(ArrayPtr<const ArrayPtr<const byte>> pieces) override {
    // Gather-write: append each piece in order, then report completion right away.
    for (size_t i = 0; i < pieces.size(); i++) {
      bytes.addAll(pieces[i]);
    }
    return kj::READY_NOW;
  }
};
KJ_TEST("gzip decompression") { KJ_TEST("gzip decompression") {
// Normal read. // Normal read.
{ {
...@@ -168,49 +210,24 @@ KJ_TEST("async gzip decompression") { ...@@ -168,49 +210,24 @@ KJ_TEST("async gzip decompression") {
KJ_EXPECT(gzip.readAllText().wait(io.waitScope) == "foobarfoobar"); KJ_EXPECT(gzip.readAllText().wait(io.waitScope) == "foobarfoobar");
} }
}
class MockOutputStream: public OutputStream {
public:
kj::Vector<byte> bytes;
kj::String decompress() {
MockInputStream rawInput(bytes, kj::maxValue);
GzipInputStream gzip(rawInput);
return gzip.readAllText();
}
void write(const void* buffer, size_t size) override { // Decompress using an output stream.
bytes.addAll(arrayPtr(reinterpret_cast<const byte*>(buffer), size)); {
} MockAsyncOutputStream rawOutput;
void write(ArrayPtr<const ArrayPtr<const byte>> pieces) override { GzipAsyncOutputStream gzip(rawOutput, GzipAsyncOutputStream::DECOMPRESS);
for (auto& piece: pieces) {
bytes.addAll(piece);
}
}
};
class MockAsyncOutputStream: public AsyncOutputStream { auto mid = sizeof(FOOBAR_GZIP) / 2;
public: gzip.write(FOOBAR_GZIP, mid).wait(io.waitScope);
kj::Vector<byte> bytes; auto str1 = kj::heapString(rawOutput.bytes.asPtr().asChars());
KJ_EXPECT(str1 == "fo", str1);
kj::String decompress(WaitScope& ws) { gzip.write(FOOBAR_GZIP + mid, sizeof(FOOBAR_GZIP) - mid).wait(io.waitScope);
MockAsyncInputStream rawInput(bytes, kj::maxValue); auto str2 = kj::heapString(rawOutput.bytes.asPtr().asChars());
GzipAsyncInputStream gzip(rawInput); KJ_EXPECT(str2 == "foobar", str2);
return gzip.readAllText().wait(ws);
}
Promise<void> write(const void* buffer, size_t size) override { gzip.end().wait(io.waitScope);
bytes.addAll(arrayPtr(reinterpret_cast<const byte*>(buffer), size));
return kj::READY_NOW;
}
Promise<void> write(ArrayPtr<const ArrayPtr<const byte>> pieces) override {
for (auto& piece: pieces) {
bytes.addAll(piece);
}
return kj::READY_NOW;
} }
}; }
KJ_TEST("gzip compression") { KJ_TEST("gzip compression") {
// Normal write. // Normal write.
...@@ -291,8 +308,18 @@ KJ_TEST("async gzip compression") { ...@@ -291,8 +308,18 @@ KJ_TEST("async gzip compression") {
{ {
MockAsyncOutputStream rawOutput; MockAsyncOutputStream rawOutput;
GzipAsyncOutputStream gzip(rawOutput); GzipAsyncOutputStream gzip(rawOutput);
gzip.write("foo", 3).wait(io.waitScope); gzip.write("foo", 3).wait(io.waitScope);
auto prevSize = rawOutput.bytes.size();
gzip.write("bar", 3).wait(io.waitScope); gzip.write("bar", 3).wait(io.waitScope);
auto curSize = rawOutput.bytes.size();
KJ_EXPECT(prevSize == curSize, prevSize, curSize);
gzip.flush().wait(io.waitScope);
curSize = rawOutput.bytes.size();
KJ_EXPECT(prevSize < curSize, prevSize, curSize);
gzip.end().wait(io.waitScope); gzip.end().wait(io.waitScope);
KJ_EXPECT(rawOutput.decompress(io.waitScope) == "foobar"); KJ_EXPECT(rawOutput.decompress(io.waitScope) == "foobar");
......
...@@ -26,14 +26,65 @@ ...@@ -26,14 +26,65 @@
namespace kj { namespace kj {
namespace _ { // private
GzipOutputContext::GzipOutputContext(kj::Maybe<int> compressionLevel) {
  // Sets up the zlib stream. A compression level selects deflate (compress) mode; an empty
  // Maybe selects inflate (decompress) mode. Any zlib init error is reported through fail().

  // windowBits = 15 (maximum) + magic value 16 to ask for gzip.
  const int gzipWindowBits = 15 + 16;

  int status;
  KJ_IF_MAYBE(level, compressionLevel) {
    compressing = true;
    status = deflateInit2(&ctx, *level, Z_DEFLATED,
                          gzipWindowBits,
                          8,  // memLevel = 8 (the default)
                          Z_DEFAULT_STRATEGY);
  } else {
    compressing = false;
    status = inflateInit2(&ctx, gzipWindowBits);
  }

  if (status == Z_OK) return;
  fail(status);
}
GzipOutputContext::~GzipOutputContext() noexcept(false) {
  // Release zlib's state with the teardown call that matches the mode chosen at construction.
  if (compressing) {
    deflateEnd(&ctx);
  } else {
    inflateEnd(&ctx);
  }
}
void GzipOutputContext::setInput(const void* in, size_t size) {
ctx.next_in = const_cast<byte*>(reinterpret_cast<const byte*>(in));
ctx.avail_in = size;
}
kj::Tuple<bool, kj::ArrayPtr<const byte>> GzipOutputContext::pumpOnce(int flush) {
  // Runs one deflate()/inflate() step into the internal buffer.
  //
  // Returns (keepGoing, produced): `keepGoing` is true only when zlib returned Z_OK, and
  // `produced` views the bytes written into `buffer` by this step (possibly empty). The view
  // aliases `buffer`, so it is overwritten by the next call.

  // Hand zlib the whole scratch buffer for this round.
  ctx.avail_out = sizeof(buffer);
  ctx.next_out = buffer;

  int status;
  if (compressing) {
    status = deflate(&ctx, flush);
  } else {
    status = inflate(&ctx, flush);
  }

  switch (status) {
    case Z_OK:
      break;
    case Z_STREAM_END:
      // We have finished the stream successfully.
      break;
    case Z_BUF_ERROR:
      // We didn't have any more input to process
      // (but still have to make a call to write to potentially flush data).
      break;
    default:
      fail(status);
  }

  size_t produced = sizeof(buffer) - ctx.avail_out;
  return kj::tuple(status == Z_OK, kj::arrayPtr(buffer, produced));
}
void GzipOutputContext::fail(int result) {
  // Raises a KJ requirement failure describing a zlib error. Prefers zlib's own message
  // (ctx.msg) when one is set, otherwise reports the numeric result code.
  //
  // The argument expressions passed to KJ_FAIL_REQUIRE are kept exactly as written so the
  // generated failure text stays the same.
  const char* header;
  if (compressing) {
    header = "gzip compression failed";
  } else {
    header = "gzip decompression failed";
  }

  if (ctx.msg != nullptr) {
    KJ_FAIL_REQUIRE(header, ctx.msg);
  } else {
    KJ_FAIL_REQUIRE(header, result);
  }
}
} // namespace _ (private)
GzipInputStream::GzipInputStream(InputStream& inner) GzipInputStream::GzipInputStream(InputStream& inner)
: inner(inner) { : inner(inner) {
memset(&ctx, 0, sizeof(ctx));
ctx.next_in = nullptr;
ctx.avail_in = 0;
ctx.next_out = nullptr;
ctx.avail_out = 0;
// windowBits = 15 (maximum) + magic value 16 to ask for gzip. // windowBits = 15 (maximum) + magic value 16 to ask for gzip.
KJ_ASSERT(inflateInit2(&ctx, 15 + 16) == Z_OK); KJ_ASSERT(inflateInit2(&ctx, 15 + 16) == Z_OK);
} }
...@@ -92,80 +143,34 @@ size_t GzipInputStream::readImpl( ...@@ -92,80 +143,34 @@ size_t GzipInputStream::readImpl(
// ======================================================================================= // =======================================================================================
GzipOutputStream::GzipOutputStream(OutputStream& inner, int compressionLevel) GzipOutputStream::GzipOutputStream(OutputStream& inner, int compressionLevel)
: inner(inner) { : inner(inner), ctx(compressionLevel) {}
memset(&ctx, 0, sizeof(ctx));
ctx.next_in = nullptr;
ctx.avail_in = 0;
ctx.next_out = nullptr;
ctx.avail_out = 0;
int initResult =
deflateInit2(&ctx, compressionLevel, Z_DEFLATED,
15 + 16, // windowBits = 15 (maximum) + magic value 16 to ask for gzip.
8, // memLevel = 8 (the default)
Z_DEFAULT_STRATEGY);
KJ_ASSERT(initResult == Z_OK, initResult);
}
GzipOutputStream::~GzipOutputStream() noexcept(false) { GzipOutputStream::GzipOutputStream(OutputStream& inner, decltype(DECOMPRESS))
KJ_DEFER(deflateEnd(&ctx)); : inner(inner), ctx(nullptr) {}
for (;;) {
ctx.next_out = buffer;
ctx.avail_out = sizeof(buffer);
auto deflateResult = deflate(&ctx, Z_FINISH); GzipOutputStream::~GzipOutputStream() noexcept(false) {
if (deflateResult != Z_OK && deflateResult != Z_STREAM_END) { pump(Z_FINISH);
if (ctx.msg == nullptr) {
KJ_FAIL_REQUIRE("gzip compression failed", deflateResult);
} else {
KJ_FAIL_REQUIRE("gzip compression failed", ctx.msg);
}
}
size_t n = sizeof(buffer) - ctx.avail_out;
inner.write(buffer, n);
if (deflateResult == Z_STREAM_END) {
break;
}
}
} }
void GzipOutputStream::write(const void* in, size_t size) { void GzipOutputStream::write(const void* in, size_t size) {
ctx.next_in = const_cast<byte*>(reinterpret_cast<const byte*>(in)); ctx.setInput(in, size);
ctx.avail_in = size; pump(Z_NO_FLUSH);
pump();
} }
void GzipOutputStream::pump() { void GzipOutputStream::pump(int flush) {
while (ctx.avail_in > 0) { bool ok;
ctx.next_out = buffer; do {
ctx.avail_out = sizeof(buffer); auto result = ctx.pumpOnce(flush);
ok = get<0>(result);
auto deflateResult = deflate(&ctx, Z_NO_FLUSH); auto chunk = get<1>(result);
if (deflateResult != Z_OK) { inner.write(chunk.begin(), chunk.size());
if (ctx.msg == nullptr) { } while (ok);
KJ_FAIL_REQUIRE("gzip compression failed", deflateResult);
} else {
KJ_FAIL_REQUIRE("gzip compression failed", ctx.msg);
}
}
size_t n = sizeof(buffer) - ctx.avail_out;
inner.write(buffer, n);
}
} }
// ======================================================================================= // =======================================================================================
GzipAsyncInputStream::GzipAsyncInputStream(AsyncInputStream& inner) GzipAsyncInputStream::GzipAsyncInputStream(AsyncInputStream& inner)
: inner(inner) { : inner(inner) {
memset(&ctx, 0, sizeof(ctx));
ctx.next_in = nullptr;
ctx.avail_in = 0;
ctx.next_out = nullptr;
ctx.avail_out = 0;
// windowBits = 15 (maximum) + magic value 16 to ask for gzip. // windowBits = 15 (maximum) + magic value 16 to ask for gzip.
KJ_ASSERT(inflateInit2(&ctx, 15 + 16) == Z_OK); KJ_ASSERT(inflateInit2(&ctx, 15 + 16) == Z_OK);
} }
...@@ -227,34 +232,17 @@ Promise<size_t> GzipAsyncInputStream::readImpl( ...@@ -227,34 +232,17 @@ Promise<size_t> GzipAsyncInputStream::readImpl(
// ======================================================================================= // =======================================================================================
GzipAsyncOutputStream::GzipAsyncOutputStream(AsyncOutputStream& inner, int compressionLevel) GzipAsyncOutputStream::GzipAsyncOutputStream(AsyncOutputStream& inner, int compressionLevel)
: inner(inner) { : inner(inner), ctx(compressionLevel) {}
memset(&ctx, 0, sizeof(ctx));
ctx.next_in = nullptr;
ctx.avail_in = 0;
ctx.next_out = nullptr;
ctx.avail_out = 0;
int initResult =
deflateInit2(&ctx, compressionLevel, Z_DEFLATED,
15 + 16, // windowBits = 15 (maximum) + magic value 16 to ask for gzip.
8, // memLevel = 8 (the default)
Z_DEFAULT_STRATEGY);
KJ_ASSERT(initResult == Z_OK, initResult);
}
GzipAsyncOutputStream::~GzipAsyncOutputStream() noexcept(false) { GzipAsyncOutputStream::GzipAsyncOutputStream(AsyncOutputStream& inner, decltype(DECOMPRESS))
deflateEnd(&ctx); : inner(inner), ctx(nullptr) {}
}
Promise<void> GzipAsyncOutputStream::write(const void* in, size_t size) { Promise<void> GzipAsyncOutputStream::write(const void* in, size_t size) {
ctx.next_in = const_cast<byte*>(reinterpret_cast<const byte*>(in)); ctx.setInput(in, size);
ctx.avail_in = size; return pump(Z_NO_FLUSH);
return pump();
} }
Promise<void> GzipAsyncOutputStream::write(ArrayPtr<const ArrayPtr<const byte>> pieces) { Promise<void> GzipAsyncOutputStream::write(ArrayPtr<const ArrayPtr<const byte>> pieces) {
KJ_REQUIRE(!ended, "already ended");
if (pieces.size() == 0) return kj::READY_NOW; if (pieces.size() == 0) return kj::READY_NOW;
return write(pieces[0].begin(), pieces[0].size()) return write(pieces[0].begin(), pieces[0].size())
.then([this,pieces]() { .then([this,pieces]() {
...@@ -262,51 +250,15 @@ Promise<void> GzipAsyncOutputStream::write(ArrayPtr<const ArrayPtr<const byte>> ...@@ -262,51 +250,15 @@ Promise<void> GzipAsyncOutputStream::write(ArrayPtr<const ArrayPtr<const byte>>
}); });
} }
Promise<void> GzipAsyncOutputStream::end() { kj::Promise<void> GzipAsyncOutputStream::pump(int flush) {
KJ_REQUIRE(!ended, "already ended"); auto result = ctx.pumpOnce(flush);
auto ok = get<0>(result);
ctx.next_out = buffer; auto chunk = get<1>(result);
ctx.avail_out = sizeof(buffer); auto promise = inner.write(chunk.begin(), chunk.size());
if (ok) {
auto deflateResult = deflate(&ctx, Z_FINISH); promise = promise.then([this, flush]() { return pump(flush); });
if (deflateResult == Z_OK || deflateResult == Z_STREAM_END) {
size_t n = sizeof(buffer) - ctx.avail_out;
auto promise = inner.write(buffer, n);
if (deflateResult == Z_OK) {
return promise.then([this]() { return end(); });
} else {
ended = true;
return promise;
}
} else {
if (ctx.msg == nullptr) {
KJ_FAIL_REQUIRE("gzip compression failed", deflateResult);
} else {
KJ_FAIL_REQUIRE("gzip compression failed", ctx.msg);
}
}
}
kj::Promise<void> GzipAsyncOutputStream::pump() {
if (ctx.avail_in == 0) {
return kj::READY_NOW;
}
ctx.next_out = buffer;
ctx.avail_out = sizeof(buffer);
auto deflateResult = deflate(&ctx, Z_NO_FLUSH);
if (deflateResult == Z_OK) {
size_t n = sizeof(buffer) - ctx.avail_out;
return inner.write(buffer, n)
.then([this]() { return pump(); });
} else {
if (ctx.msg == nullptr) {
KJ_FAIL_REQUIRE("gzip compression failed", deflateResult);
} else {
KJ_FAIL_REQUIRE("gzip compression failed", ctx.msg);
}
} }
return promise;
} }
} // namespace kj } // namespace kj
......
...@@ -27,6 +27,27 @@ ...@@ -27,6 +27,27 @@
namespace kj { namespace kj {
namespace _ { // private
// Shared zlib state used by GzipOutputStream and GzipAsyncOutputStream. Wraps a z_stream
// running in either deflate (compress) or inflate (decompress) mode, plus the scratch
// buffer that each pump step writes its output into.
class GzipOutputContext final {
public:
// A compression level selects compress mode; an empty Maybe (e.g. nullptr) selects
// decompress mode.
GzipOutputContext(kj::Maybe<int> compressionLevel);
~GzipOutputContext() noexcept(false);
KJ_DISALLOW_COPY(GzipOutputContext);
// Points the stream at the next input chunk. The pointer is stored, not copied.
void setInput(const void* in, size_t size);
// Runs one deflate/inflate step with the given zlib flush mode. Returns whether zlib
// reported Z_OK, together with the bytes produced by this step. The returned array
// points into the internal buffer, so it is overwritten by the next call.
kj::Tuple<bool, kj::ArrayPtr<const byte>> pumpOnce(int flush);
private:
// True when deflating, false when inflating; decides which zlib calls are used.
bool compressing;
z_stream ctx = {};
// Output scratch space handed to zlib on every pumpOnce() call.
byte buffer[4096];
// Reports a zlib error code via KJ_FAIL_REQUIRE.
void fail(int result);
};
} // namespace _ (private)
class GzipInputStream final: public InputStream { class GzipInputStream final: public InputStream {
public: public:
GzipInputStream(InputStream& inner); GzipInputStream(InputStream& inner);
...@@ -37,7 +58,7 @@ public: ...@@ -37,7 +58,7 @@ public:
private: private:
InputStream& inner; InputStream& inner;
z_stream ctx; z_stream ctx = {};
bool atValidEndpoint = false; bool atValidEndpoint = false;
byte buffer[4096]; byte buffer[4096];
...@@ -47,20 +68,25 @@ private: ...@@ -47,20 +68,25 @@ private:
class GzipOutputStream final: public OutputStream { class GzipOutputStream final: public OutputStream {
public: public:
enum { DECOMPRESS };
GzipOutputStream(OutputStream& inner, int compressionLevel = Z_DEFAULT_COMPRESSION); GzipOutputStream(OutputStream& inner, int compressionLevel = Z_DEFAULT_COMPRESSION);
GzipOutputStream(OutputStream& inner, decltype(DECOMPRESS));
~GzipOutputStream() noexcept(false); ~GzipOutputStream() noexcept(false);
KJ_DISALLOW_COPY(GzipOutputStream); KJ_DISALLOW_COPY(GzipOutputStream);
void write(const void* buffer, size_t size) override; void write(const void* buffer, size_t size) override;
using OutputStream::write; using OutputStream::write;
inline void flush() {
pump(Z_SYNC_FLUSH);
}
private: private:
OutputStream& inner; OutputStream& inner;
z_stream ctx; _::GzipOutputContext ctx;
byte buffer[4096];
void pump(); void pump(int flush);
}; };
class GzipAsyncInputStream final: public AsyncInputStream { class GzipAsyncInputStream final: public AsyncInputStream {
...@@ -73,7 +99,7 @@ public: ...@@ -73,7 +99,7 @@ public:
private: private:
AsyncInputStream& inner; AsyncInputStream& inner;
z_stream ctx; z_stream ctx = {};
bool atValidEndpoint = false; bool atValidEndpoint = false;
byte buffer[4096]; byte buffer[4096];
...@@ -83,26 +109,32 @@ private: ...@@ -83,26 +109,32 @@ private:
class GzipAsyncOutputStream final: public AsyncOutputStream { class GzipAsyncOutputStream final: public AsyncOutputStream {
public: public:
enum { DECOMPRESS };
GzipAsyncOutputStream(AsyncOutputStream& inner, int compressionLevel = Z_DEFAULT_COMPRESSION); GzipAsyncOutputStream(AsyncOutputStream& inner, int compressionLevel = Z_DEFAULT_COMPRESSION);
~GzipAsyncOutputStream() noexcept(false); GzipAsyncOutputStream(AsyncOutputStream& inner, decltype(DECOMPRESS));
KJ_DISALLOW_COPY(GzipAsyncOutputStream); KJ_DISALLOW_COPY(GzipAsyncOutputStream);
Promise<void> write(const void* buffer, size_t size) override; Promise<void> write(const void* buffer, size_t size) override;
Promise<void> write(ArrayPtr<const ArrayPtr<const byte>> pieces) override; Promise<void> write(ArrayPtr<const ArrayPtr<const byte>> pieces) override;
Promise<void> end(); inline Promise<void> flush() {
// Must call to flush the stream, since some data may be buffered. return pump(Z_SYNC_FLUSH);
}
// Call if you need to flush a stream at an arbitrary data point.
Promise<void> end() {
return pump(Z_FINISH);
}
// Must call to flush and finish the stream, since some data may be buffered.
// //
// TODO(cleanup): This should be a virtual method on AsyncOutputStream. // TODO(cleanup): This should be a virtual method on AsyncOutputStream.
private: private:
AsyncOutputStream& inner; AsyncOutputStream& inner;
bool ended = false; _::GzipOutputContext ctx;
z_stream ctx;
byte buffer[4096];
kj::Promise<void> pump(); kj::Promise<void> pump(int flush);
}; };
} // namespace kj } // namespace kj
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment