Commit 6c3ff2ce authored by Kenton Varda's avatar Kenton Varda

gzip bindings: Apply Harris' review comments.

parent c2b2b5f5
...@@ -65,21 +65,21 @@ KJ_TEST("gzip decompression") { ...@@ -65,21 +65,21 @@ KJ_TEST("gzip decompression") {
// Normal read. // Normal read.
{ {
MockInputStream rawInput(FOOBAR_GZIP, kj::maxValue); MockInputStream rawInput(FOOBAR_GZIP, kj::maxValue);
GzipInputStream gzip(rawInput); GzipAsyncInputStream gzip(rawInput);
KJ_EXPECT(gzip.readAllText().wait(io.waitScope) == "foobar"); KJ_EXPECT(gzip.readAllText().wait(io.waitScope) == "foobar");
} }
// Force read one byte at a time. // Force read one byte at a time.
{ {
MockInputStream rawInput(FOOBAR_GZIP, 1); MockInputStream rawInput(FOOBAR_GZIP, 1);
GzipInputStream gzip(rawInput); GzipAsyncInputStream gzip(rawInput);
KJ_EXPECT(gzip.readAllText().wait(io.waitScope) == "foobar"); KJ_EXPECT(gzip.readAllText().wait(io.waitScope) == "foobar");
} }
// Read truncated input. // Read truncated input.
{ {
MockInputStream rawInput(kj::arrayPtr(FOOBAR_GZIP, sizeof(FOOBAR_GZIP) / 2), kj::maxValue); MockInputStream rawInput(kj::arrayPtr(FOOBAR_GZIP, sizeof(FOOBAR_GZIP) / 2), kj::maxValue);
GzipInputStream gzip(rawInput); GzipAsyncInputStream gzip(rawInput);
char text[16]; char text[16];
size_t n = gzip.tryRead(text, 1, sizeof(text)).wait(io.waitScope); size_t n = gzip.tryRead(text, 1, sizeof(text)).wait(io.waitScope);
...@@ -96,7 +96,7 @@ KJ_TEST("gzip decompression") { ...@@ -96,7 +96,7 @@ KJ_TEST("gzip decompression") {
bytes.addAll(ArrayPtr<const byte>(FOOBAR_GZIP)); bytes.addAll(ArrayPtr<const byte>(FOOBAR_GZIP));
bytes.addAll(ArrayPtr<const byte>(FOOBAR_GZIP)); bytes.addAll(ArrayPtr<const byte>(FOOBAR_GZIP));
MockInputStream rawInput(bytes, kj::maxValue); MockInputStream rawInput(bytes, kj::maxValue);
GzipInputStream gzip(rawInput); GzipAsyncInputStream gzip(rawInput);
KJ_EXPECT(gzip.readAllText().wait(io.waitScope) == "foobarfoobar"); KJ_EXPECT(gzip.readAllText().wait(io.waitScope) == "foobarfoobar");
} }
...@@ -108,7 +108,7 @@ public: ...@@ -108,7 +108,7 @@ public:
kj::String decompress(WaitScope& ws) { kj::String decompress(WaitScope& ws) {
MockInputStream rawInput(bytes, kj::maxValue); MockInputStream rawInput(bytes, kj::maxValue);
GzipInputStream gzip(rawInput); GzipAsyncInputStream gzip(rawInput);
return gzip.readAllText().wait(ws); return gzip.readAllText().wait(ws);
} }
...@@ -130,7 +130,7 @@ KJ_TEST("gzip compression") { ...@@ -130,7 +130,7 @@ KJ_TEST("gzip compression") {
// Normal write. // Normal write.
{ {
MockOutputStream rawOutput; MockOutputStream rawOutput;
GzipOutputStream gzip(rawOutput); GzipAsyncOutputStream gzip(rawOutput);
gzip.write("foobar", 6).wait(io.waitScope); gzip.write("foobar", 6).wait(io.waitScope);
gzip.end().wait(io.waitScope); gzip.end().wait(io.waitScope);
...@@ -140,13 +140,28 @@ KJ_TEST("gzip compression") { ...@@ -140,13 +140,28 @@ KJ_TEST("gzip compression") {
// Multi-part write. // Multi-part write.
{ {
MockOutputStream rawOutput; MockOutputStream rawOutput;
GzipOutputStream gzip(rawOutput); GzipAsyncOutputStream gzip(rawOutput);
gzip.write("foo", 3).wait(io.waitScope); gzip.write("foo", 3).wait(io.waitScope);
gzip.write("bar", 3).wait(io.waitScope); gzip.write("bar", 3).wait(io.waitScope);
gzip.end().wait(io.waitScope); gzip.end().wait(io.waitScope);
KJ_EXPECT(rawOutput.decompress(io.waitScope) == "foobar"); KJ_EXPECT(rawOutput.decompress(io.waitScope) == "foobar");
} }
// Array-of-arrays write.
{
MockOutputStream rawOutput;
GzipAsyncOutputStream gzip(rawOutput);
ArrayPtr<const byte> pieces[] = {
kj::StringPtr("foo").asBytes(),
kj::StringPtr("bar").asBytes(),
};
gzip.write(pieces).wait(io.waitScope);
gzip.end().wait(io.waitScope);
KJ_EXPECT(rawOutput.decompress(io.waitScope) == "foobar");
}
} }
KJ_TEST("gzip huge round trip") { KJ_TEST("gzip huge round trip") {
...@@ -158,12 +173,12 @@ KJ_TEST("gzip huge round trip") { ...@@ -158,12 +173,12 @@ KJ_TEST("gzip huge round trip") {
} }
MockOutputStream rawOutput; MockOutputStream rawOutput;
GzipOutputStream gzipOut(rawOutput); GzipAsyncOutputStream gzipOut(rawOutput);
gzipOut.write(bytes.begin(), bytes.size()).wait(io.waitScope); gzipOut.write(bytes.begin(), bytes.size()).wait(io.waitScope);
gzipOut.end().wait(io.waitScope); gzipOut.end().wait(io.waitScope);
MockInputStream rawInput(rawOutput.bytes, kj::maxValue); MockInputStream rawInput(rawOutput.bytes, kj::maxValue);
GzipInputStream gzipIn(rawInput); GzipAsyncInputStream gzipIn(rawInput);
auto decompressed = gzipIn.readAllBytes().wait(io.waitScope); auto decompressed = gzipIn.readAllBytes().wait(io.waitScope);
KJ_ASSERT(decompressed.size() == bytes.size()); KJ_ASSERT(decompressed.size() == bytes.size());
......
...@@ -24,7 +24,7 @@ ...@@ -24,7 +24,7 @@
namespace kj { namespace kj {
GzipInputStream::GzipInputStream(AsyncInputStream& inner) GzipAsyncInputStream::GzipAsyncInputStream(AsyncInputStream& inner)
: inner(inner) { : inner(inner) {
memset(&ctx, 0, sizeof(ctx)); memset(&ctx, 0, sizeof(ctx));
ctx.next_in = nullptr; ctx.next_in = nullptr;
...@@ -36,17 +36,17 @@ GzipInputStream::GzipInputStream(AsyncInputStream& inner) ...@@ -36,17 +36,17 @@ GzipInputStream::GzipInputStream(AsyncInputStream& inner)
KJ_ASSERT(inflateInit2(&ctx, 15 + 16) == Z_OK); KJ_ASSERT(inflateInit2(&ctx, 15 + 16) == Z_OK);
} }
GzipInputStream::~GzipInputStream() noexcept(false) { GzipAsyncInputStream::~GzipAsyncInputStream() noexcept(false) {
inflateEnd(&ctx); inflateEnd(&ctx);
} }
Promise<size_t> GzipInputStream::tryRead(void* out, size_t minBytes, size_t maxBytes) { Promise<size_t> GzipAsyncInputStream::tryRead(void* out, size_t minBytes, size_t maxBytes) {
if (maxBytes == 0) return size_t(0); if (maxBytes == 0) return size_t(0);
return readImpl(reinterpret_cast<byte*>(out), minBytes, maxBytes, 0); return readImpl(reinterpret_cast<byte*>(out), minBytes, maxBytes, 0);
} }
Promise<size_t> GzipInputStream::readImpl( Promise<size_t> GzipAsyncInputStream::readImpl(
byte* out, size_t minBytes, size_t maxBytes, size_t alreadyRead) { byte* out, size_t minBytes, size_t maxBytes, size_t alreadyRead) {
if (ctx.avail_in == 0) { if (ctx.avail_in == 0) {
return inner.tryRead(buffer, 1, sizeof(buffer)) return inner.tryRead(buffer, 1, sizeof(buffer))
...@@ -90,7 +90,7 @@ Promise<size_t> GzipInputStream::readImpl( ...@@ -90,7 +90,7 @@ Promise<size_t> GzipInputStream::readImpl(
// ======================================================================================= // =======================================================================================
GzipOutputStream::GzipOutputStream(AsyncOutputStream& inner, int compressionLevel) GzipAsyncOutputStream::GzipAsyncOutputStream(AsyncOutputStream& inner, int compressionLevel)
: inner(inner) { : inner(inner) {
memset(&ctx, 0, sizeof(ctx)); memset(&ctx, 0, sizeof(ctx));
ctx.next_in = nullptr; ctx.next_in = nullptr;
...@@ -106,17 +106,17 @@ GzipOutputStream::GzipOutputStream(AsyncOutputStream& inner, int compressionLeve ...@@ -106,17 +106,17 @@ GzipOutputStream::GzipOutputStream(AsyncOutputStream& inner, int compressionLeve
KJ_ASSERT(initResult == Z_OK, initResult); KJ_ASSERT(initResult == Z_OK, initResult);
} }
GzipOutputStream::~GzipOutputStream() noexcept(false) { GzipAsyncOutputStream::~GzipAsyncOutputStream() noexcept(false) {
deflateEnd(&ctx); deflateEnd(&ctx);
} }
Promise<void> GzipOutputStream::write(const void* in, size_t size) { Promise<void> GzipAsyncOutputStream::write(const void* in, size_t size) {
ctx.next_in = const_cast<byte*>(reinterpret_cast<const byte*>(in)); ctx.next_in = const_cast<byte*>(reinterpret_cast<const byte*>(in));
ctx.avail_in = size; ctx.avail_in = size;
return pump(); return pump();
} }
Promise<void> GzipOutputStream::write(ArrayPtr<const ArrayPtr<const byte>> pieces) { Promise<void> GzipAsyncOutputStream::write(ArrayPtr<const ArrayPtr<const byte>> pieces) {
KJ_REQUIRE(!ended, "already ended"); KJ_REQUIRE(!ended, "already ended");
if (pieces.size() == 0) return kj::READY_NOW; if (pieces.size() == 0) return kj::READY_NOW;
...@@ -126,7 +126,7 @@ Promise<void> GzipOutputStream::write(ArrayPtr<const ArrayPtr<const byte>> piece ...@@ -126,7 +126,7 @@ Promise<void> GzipOutputStream::write(ArrayPtr<const ArrayPtr<const byte>> piece
}); });
} }
Promise<void> GzipOutputStream::end() { Promise<void> GzipAsyncOutputStream::end() {
KJ_REQUIRE(!ended, "already ended"); KJ_REQUIRE(!ended, "already ended");
ctx.next_out = buffer; ctx.next_out = buffer;
...@@ -151,7 +151,7 @@ Promise<void> GzipOutputStream::end() { ...@@ -151,7 +151,7 @@ Promise<void> GzipOutputStream::end() {
} }
} }
kj::Promise<void> GzipOutputStream::pump() { kj::Promise<void> GzipAsyncOutputStream::pump() {
if (ctx.avail_in == 0) { if (ctx.avail_in == 0) {
return kj::READY_NOW; return kj::READY_NOW;
} }
......
...@@ -27,10 +27,11 @@ ...@@ -27,10 +27,11 @@
namespace kj { namespace kj {
class GzipInputStream: public AsyncInputStream { class GzipAsyncInputStream: public AsyncInputStream {
public: public:
GzipInputStream(AsyncInputStream& inner); GzipAsyncInputStream(AsyncInputStream& inner);
~GzipInputStream() noexcept(false); ~GzipAsyncInputStream() noexcept(false);
KJ_DISALLOW_COPY(GzipAsyncInputStream);
Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override; Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override;
...@@ -44,10 +45,11 @@ private: ...@@ -44,10 +45,11 @@ private:
Promise<size_t> readImpl(byte* buffer, size_t minBytes, size_t maxBytes, size_t alreadyRead); Promise<size_t> readImpl(byte* buffer, size_t minBytes, size_t maxBytes, size_t alreadyRead);
}; };
class GzipOutputStream: public AsyncOutputStream { class GzipAsyncOutputStream: public AsyncOutputStream {
public: public:
GzipOutputStream(AsyncOutputStream& inner, int compressionLevel = Z_DEFAULT_COMPRESSION); GzipAsyncOutputStream(AsyncOutputStream& inner, int compressionLevel = Z_DEFAULT_COMPRESSION);
~GzipOutputStream() noexcept(false); ~GzipAsyncOutputStream() noexcept(false);
KJ_DISALLOW_COPY(GzipAsyncOutputStream);
Promise<void> write(const void* buffer, size_t size) override; Promise<void> write(const void* buffer, size_t size) override;
Promise<void> write(ArrayPtr<const ArrayPtr<const byte>> pieces) override; Promise<void> write(ArrayPtr<const ArrayPtr<const byte>> pieces) override;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment