Commit 85397657 authored by Kenton Varda's avatar Kenton Varda Committed by Kenton Varda

Implement readiness-based async I/O wrapper around KJ's completion-based streams.

parent 3c254baf
// Copyright (c) 2016 Sandstorm Development Group, Inc. and contributors
// Licensed under the MIT License:
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
#include "readiness-io.h"
#include <kj/test.h>
#include <stdlib.h>
namespace kj {
namespace {
KJ_TEST("readiness IO: write small") {
auto io = setupAsyncIo();
auto pipe = io.provider->newOneWayPipe();
char buf[4];
auto readPromise = pipe.in->read(buf, 3, 4);
ReadyOutputStreamWrapper out(*pipe.out);
KJ_ASSERT(KJ_ASSERT_NONNULL(out.write(kj::StringPtr("foo").asBytes())) == 3);
KJ_ASSERT(readPromise.wait(io.waitScope) == 3);
buf[3] = '\0';
KJ_ASSERT(kj::StringPtr(buf) == "foo");
}
KJ_TEST("readiness IO: write many odd") {
auto io = setupAsyncIo();
auto pipe = io.provider->newOneWayPipe();
ReadyOutputStreamWrapper out(*pipe.out);
size_t totalWritten = 0;
for (;;) {
KJ_IF_MAYBE(n, out.write(kj::StringPtr("bar").asBytes())) {
totalWritten += *n;
if (*n < 3) {
break;
}
} else {
KJ_FAIL_ASSERT("pipe buffer is divisible by 3? really?");
}
}
auto buf = kj::heapArray<char>(totalWritten + 1);
size_t n = pipe.in->read(buf.begin(), totalWritten, buf.size()).wait(io.waitScope);
KJ_ASSERT(n == totalWritten);
for (size_t i = 0; i < totalWritten; i++) {
KJ_ASSERT(buf[i] == "bar"[i%3]);
}
}
KJ_TEST("readiness IO: write even") {
auto io = setupAsyncIo();
auto pipe = io.provider->newOneWayPipe();
ReadyOutputStreamWrapper out(*pipe.out);
size_t totalWritten = 0;
for (;;) {
KJ_IF_MAYBE(n, out.write(kj::StringPtr("ba").asBytes())) {
totalWritten += *n;
if (*n < 2) {
KJ_FAIL_ASSERT("pipe buffer is not divisible by 2? really?");
}
} else {
break;
}
}
auto buf = kj::heapArray<char>(totalWritten + 1);
size_t n = pipe.in->read(buf.begin(), totalWritten, buf.size()).wait(io.waitScope);
KJ_ASSERT(n == totalWritten);
for (size_t i = 0; i < totalWritten; i++) {
KJ_ASSERT(buf[i] == "ba"[i%2]);
}
}
KJ_TEST("readiness IO: read small") {
auto io = setupAsyncIo();
auto pipe = io.provider->newOneWayPipe();
ReadyInputStreamWrapper in(*pipe.in);
char buf[4];
KJ_ASSERT(in.read(kj::ArrayPtr<char>(buf).asBytes()) == nullptr);
pipe.out->write("foo", 3).wait(io.waitScope);
in.whenReady().wait(io.waitScope);
KJ_ASSERT(KJ_ASSERT_NONNULL(in.read(kj::ArrayPtr<char>(buf).asBytes())) == 3);
buf[3] = '\0';
KJ_ASSERT(kj::StringPtr(buf) == "foo");
pipe.out = nullptr;
kj::Maybe<size_t> finalRead;
for (;;) {
finalRead = in.read(kj::ArrayPtr<char>(buf).asBytes());
KJ_IF_MAYBE(n, finalRead) {
KJ_ASSERT(*n == 0);
break;
} else {
in.whenReady().wait(io.waitScope);
}
}
}
KJ_TEST("readiness IO: read many odd") {
auto io = setupAsyncIo();
auto pipe = io.provider->newOneWayPipe();
char dummy[8192];
for (auto i: kj::indices(dummy)) {
dummy[i] = "bar"[i%3];
}
auto writeTask = pipe.out->write(dummy, sizeof(dummy)).then([&]() {
// shutdown
pipe.out = nullptr;
}).eagerlyEvaluate(nullptr);
ReadyInputStreamWrapper in(*pipe.in);
char buf[3];
for (;;) {
auto result = in.read(kj::ArrayPtr<char>(buf).asBytes());
KJ_IF_MAYBE(n, result) {
for (size_t i = 0; i < *n; i++) {
KJ_ASSERT(buf[i] == "bar"[i]);
}
KJ_ASSERT(*n != 0, "ended at wrong spot");
if (*n < 3) {
break;
}
} else {
in.whenReady().wait(io.waitScope);
}
}
kj::Maybe<size_t> finalRead;
for (;;) {
finalRead = in.read(kj::ArrayPtr<char>(buf).asBytes());
KJ_IF_MAYBE(n, finalRead) {
KJ_ASSERT(*n == 0);
break;
} else {
in.whenReady().wait(io.waitScope);
}
}
}
KJ_TEST("readiness IO: read many even") {
auto io = setupAsyncIo();
auto pipe = io.provider->newOneWayPipe();
// Abort on hang.
// TODO(now): Remove this.
io.provider->getTimer().afterDelay(1 * kj::SECONDS).then([]() {
abort();
}).detach([](kj::Exception&&) {});
char dummy[8192];
for (auto i: kj::indices(dummy)) {
dummy[i] = "ba"[i%2];
}
auto writeTask = pipe.out->write(dummy, sizeof(dummy)).then([&]() {
// shutdown
pipe.out = nullptr;
}).eagerlyEvaluate(nullptr);
ReadyInputStreamWrapper in(*pipe.in);
char buf[2];
for (;;) {
auto result = in.read(kj::ArrayPtr<char>(buf).asBytes());
KJ_IF_MAYBE(n, result) {
for (size_t i = 0; i < *n; i++) {
KJ_ASSERT(buf[i] == "ba"[i]);
}
if (*n == 0) {
break;
}
KJ_ASSERT(*n == 2, "ended at wrong spot");
} else {
in.whenReady().wait(io.waitScope);
}
}
kj::Maybe<size_t> finalRead;
for (;;) {
finalRead = in.read(kj::ArrayPtr<char>(buf).asBytes());
KJ_IF_MAYBE(n, finalRead) {
KJ_ASSERT(*n == 0);
break;
} else {
in.whenReady().wait(io.waitScope);
}
}
}
} // namespace
} // namespace kj
// Copyright (c) 2016 Sandstorm Development Group, Inc. and contributors
// Licensed under the MIT License:
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
#include "readiness-io.h"
namespace kj {
static size_t copyInto(kj::ArrayPtr<byte> dst, kj::ArrayPtr<const byte>& src) {
size_t n = kj::min(dst.size(), src.size());
memcpy(dst.begin(), src.begin(), n);
src = src.slice(n, src.size());
return n;
}
// =======================================================================================
ReadyInputStreamWrapper::ReadyInputStreamWrapper(AsyncInputStream& input): input(input) {}
ReadyInputStreamWrapper::~ReadyInputStreamWrapper() noexcept(false) {}
kj::Maybe<size_t> ReadyInputStreamWrapper::read(kj::ArrayPtr<byte> dst) {
if (eof || dst.size() == 0) return size_t(0);
if (content.size() == 0) {
// No data available. Try to read more.
if (!isPumping) {
isPumping = true;
pumpTask = kj::evalNow([&]() {
return input.tryRead(buffer, 1, sizeof(buffer)).then([this](size_t n) {
if (n == 0) {
eof = true;
} else {
content = kj::arrayPtr(buffer, n);
}
}).attach(kj::defer([this]() {isPumping = false;}));
}).fork();
}
return nullptr;
}
return copyInto(dst, content);
}
kj::Promise<void> ReadyInputStreamWrapper::whenReady() {
return pumpTask.addBranch();
}
// =======================================================================================
ReadyOutputStreamWrapper::ReadyOutputStreamWrapper(AsyncOutputStream& output): output(output) {}
ReadyOutputStreamWrapper::~ReadyOutputStreamWrapper() noexcept(false) {}
kj::Maybe<size_t> ReadyOutputStreamWrapper::write(kj::ArrayPtr<const byte> data) {
if (data.size() == 0) return size_t(0);
if (filled == sizeof(buffer)) {
// No space.
return nullptr;
}
uint end = start + filled;
size_t result = 0;
if (end < sizeof(buffer)) {
// The filled part of the buffer is somewhere in the middle.
// Copy into space after filled space.
result += copyInto(kj::arrayPtr(buffer + end, buffer + sizeof(buffer)), data);
// Copy into space before filled space.
result += copyInto(kj::arrayPtr(buffer, buffer + start), data);
} else {
// Fill currently loops, so we only have one segment of empty space to copy into.
// Copy into the space between the fill's end and the fill's start.
result += copyInto(kj::arrayPtr(buffer + end % sizeof(buffer), buffer + start), data);
}
filled += result;
if (!isPumping) {
isPumping = true;
pumpTask = kj::evalNow([&]() {
return pump().attach(kj::defer([this]() {isPumping = false;}));
}).fork();
}
return result;
}
kj::Promise<void> ReadyOutputStreamWrapper::whenReady() {
return pumpTask.addBranch();
}
kj::Promise<void> ReadyOutputStreamWrapper::pump() {
uint oldFilled = filled;
uint end = start + filled;
kj::Promise<void> promise = nullptr;
if (end <= sizeof(buffer)) {
promise = output.write(buffer + start, filled);
} else {
end = end % sizeof(buffer);
segments[0] = kj::arrayPtr(buffer + start, buffer + sizeof(buffer));
segments[1] = kj::arrayPtr(buffer, buffer + end);
promise = output.write(segments);
}
return promise.then([this,oldFilled,end]() -> kj::Promise<void> {
filled -= oldFilled;
start = end;
if (filled > 0) {
return pump();
} else {
return kj::READY_NOW;
}
});
}
} // namespace kj
// Copyright (c) 2016 Sandstorm Development Group, Inc. and contributors
// Licensed under the MIT License:
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
#ifndef KJ_COMPAT_READINESS_IO_H_
#define KJ_COMPAT_READINESS_IO_H_
#include <kj/async-io.h>
namespace kj {
class ReadyInputStreamWrapper {
// Provides readiness-based Async I/O as a wrapper around KJ's standard completion-based API, for
// compatibility with libraries that use readiness-based abstractions (e.g. OpenSSL).
//
// Unfortunately this requires buffering, so is not very efficient.
public:
ReadyInputStreamWrapper(AsyncInputStream& input);
~ReadyInputStreamWrapper() noexcept(false);
KJ_DISALLOW_COPY(ReadyInputStreamWrapper);
kj::Maybe<size_t> read(kj::ArrayPtr<byte> dst);
// Reads bytes into `dst`, returning the number of bytes read. Returns zero only at EOF. Returns
// nullptr if not ready.
kj::Promise<void> whenReady();
// Returns a promise that resolves when read() will return non-null.
private:
AsyncInputStream& input;
kj::ForkedPromise<void> pumpTask = nullptr;
bool isPumping = false;
bool eof = false;
kj::ArrayPtr<const byte> content = nullptr; // Points to currently-valid part of `buffer`.
byte buffer[8192];
};
class ReadyOutputStreamWrapper {
// Provides readiness-based Async I/O as a wrapper around KJ's standard completion-based API, for
// compatibility with libraries that use readiness-based abstractions (e.g. OpenSSL).
//
// Unfortunately this requires buffering, so is not very efficient.
public:
ReadyOutputStreamWrapper(AsyncOutputStream& output);
~ReadyOutputStreamWrapper() noexcept(false);
KJ_DISALLOW_COPY(ReadyOutputStreamWrapper);
kj::Maybe<size_t> write(kj::ArrayPtr<const byte> src);
// Writes bytes from `src`, returning the number of bytes written. Never returns zero. Returns
// nullptr if not ready.
kj::Promise<void> whenReady();
// Returns a promise that resolves when write() will return non-null.
private:
AsyncOutputStream& output;
ArrayPtr<const byte> segments[2];
kj::ForkedPromise<void> pumpTask = nullptr;
bool isPumping = false;
uint start = 0; // index of first byte
uint filled = 0; // number of bytes currently in buffer
byte buffer[8192];
kj::Promise<void> pump();
// Asyncronously push the buffer out to the underlying stream.
};
} // namespace kj
#endif // KJ_COMPAT_READINESS_IO_H_
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment