Unverified Commit 3529a6ee authored by Kenton Varda's avatar Kenton Varda Committed by GitHub

Merge pull request #661 from capnproto/userspace-pipe

 Implement in-process byte stream pipes.
parents 394643ba 8ec0cbb5
......@@ -661,5 +661,633 @@ KJ_TEST("Network::restrictPeers()") {
KJ_EXPECT(conn->readAllText().wait(w) == "");
}
kj::Promise<void> expectRead(kj::AsyncInputStream& in, kj::StringPtr expected) {
if (expected.size() == 0) return kj::READY_NOW;
auto buffer = kj::heapArray<char>(expected.size());
auto promise = in.tryRead(buffer.begin(), 1, buffer.size());
return promise.then(kj::mvCapture(buffer, [&in,expected](kj::Array<char> buffer, size_t amount) {
if (amount == 0) {
KJ_FAIL_ASSERT("expected data never sent", expected);
}
auto actual = buffer.slice(0, amount);
if (memcmp(actual.begin(), expected.begin(), actual.size()) != 0) {
KJ_FAIL_ASSERT("data from stream doesn't match expected", expected, actual);
}
return expectRead(in, expected.slice(amount));
}));
}
KJ_TEST("Userland pipe") {
kj::EventLoop loop;
WaitScope ws(loop);
auto pipe = newOneWayPipe();
auto promise = pipe.out->write("foo", 3);
KJ_EXPECT(!promise.poll(ws));
char buf[4];
KJ_EXPECT(pipe.in->tryRead(buf, 1, 4).wait(ws) == 3);
buf[3] = '\0';
KJ_EXPECT(buf == "foo"_kj);
promise.wait(ws);
auto promise2 = pipe.in->readAllText();
KJ_EXPECT(!promise2.poll(ws));
pipe.out = nullptr;
KJ_EXPECT(promise2.wait(ws) == "");
}
KJ_TEST("Userland pipe cancel write") {
kj::EventLoop loop;
WaitScope ws(loop);
auto pipe = newOneWayPipe();
auto promise = pipe.out->write("foobar", 6);
KJ_EXPECT(!promise.poll(ws));
expectRead(*pipe.in, "foo").wait(ws);
KJ_EXPECT(!promise.poll(ws));
promise = nullptr;
promise = pipe.out->write("baz", 3);
expectRead(*pipe.in, "baz").wait(ws);
promise.wait(ws);
pipe.out = nullptr;
KJ_EXPECT(pipe.in->readAllText().wait(ws) == "");
}
KJ_TEST("Userland pipe cancel read") {
kj::EventLoop loop;
WaitScope ws(loop);
auto pipe = newOneWayPipe();
auto writeOp = pipe.out->write("foo", 3);
auto readOp = expectRead(*pipe.in, "foobar");
writeOp.wait(ws);
KJ_EXPECT(!readOp.poll(ws));
readOp = nullptr;
auto writeOp2 = pipe.out->write("baz", 3);
expectRead(*pipe.in, "baz").wait(ws);
}
KJ_TEST("Userland pipe pumpTo") {
kj::EventLoop loop;
WaitScope ws(loop);
auto pipe = newOneWayPipe();
auto pipe2 = newOneWayPipe();
auto pumpPromise = pipe.in->pumpTo(*pipe2.out);
auto promise = pipe.out->write("foo", 3);
KJ_EXPECT(!promise.poll(ws));
expectRead(*pipe2.in, "foo").wait(ws);
promise.wait(ws);
auto promise2 = pipe2.in->readAllText();
KJ_EXPECT(!promise2.poll(ws));
pipe.out = nullptr;
KJ_EXPECT(pumpPromise.wait(ws) == 3);
}
KJ_TEST("Userland pipe tryPumpFrom") {
kj::EventLoop loop;
WaitScope ws(loop);
auto pipe = newOneWayPipe();
auto pipe2 = newOneWayPipe();
auto pumpPromise = KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in));
auto promise = pipe.out->write("foo", 3);
KJ_EXPECT(!promise.poll(ws));
expectRead(*pipe2.in, "foo").wait(ws);
promise.wait(ws);
auto promise2 = pipe2.in->readAllText();
KJ_EXPECT(!promise2.poll(ws));
pipe.out = nullptr;
KJ_EXPECT(!promise2.poll(ws));
KJ_EXPECT(pumpPromise.wait(ws) == 3);
}
KJ_TEST("Userland pipe pumpTo cancel") {
kj::EventLoop loop;
WaitScope ws(loop);
auto pipe = newOneWayPipe();
auto pipe2 = newOneWayPipe();
auto pumpPromise = pipe.in->pumpTo(*pipe2.out);
auto promise = pipe.out->write("foobar", 3);
KJ_EXPECT(!promise.poll(ws));
expectRead(*pipe2.in, "foo").wait(ws);
// Cancel pump.
pumpPromise = nullptr;
auto promise3 = pipe2.out->write("baz", 3);
expectRead(*pipe2.in, "baz").wait(ws);
}
KJ_TEST("Userland pipe tryPumpFrom cancel") {
kj::EventLoop loop;
WaitScope ws(loop);
auto pipe = newOneWayPipe();
auto pipe2 = newOneWayPipe();
auto pumpPromise = KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in));
auto promise = pipe.out->write("foobar", 3);
KJ_EXPECT(!promise.poll(ws));
expectRead(*pipe2.in, "foo").wait(ws);
// Cancel pump.
pumpPromise = nullptr;
auto promise3 = pipe2.out->write("baz", 3);
expectRead(*pipe2.in, "baz").wait(ws);
}
KJ_TEST("Userland pipe with limit") {
kj::EventLoop loop;
WaitScope ws(loop);
auto pipe = newOneWayPipe(6);
{
auto promise = pipe.out->write("foo", 3);
KJ_EXPECT(!promise.poll(ws));
expectRead(*pipe.in, "foo").wait(ws);
promise.wait(ws);
}
{
auto promise = pipe.in->readAllText();
KJ_EXPECT(!promise.poll(ws));
auto promise2 = pipe.out->write("barbaz", 6);
KJ_EXPECT(promise.wait(ws) == "bar");
KJ_EXPECT_THROW_MESSAGE("read end of pipe was aborted", promise2.wait(ws));
}
// Further writes throw and reads return EOF.
KJ_EXPECT_THROW_MESSAGE("abortRead() has been called", pipe.out->write("baz", 3).wait(ws));
KJ_EXPECT(pipe.in->readAllText().wait(ws) == "");
}
KJ_TEST("Userland pipe pumpTo with limit") {
kj::EventLoop loop;
WaitScope ws(loop);
auto pipe = newOneWayPipe(6);
auto pipe2 = newOneWayPipe();
auto pumpPromise = pipe.in->pumpTo(*pipe2.out);
{
auto promise = pipe.out->write("foo", 3);
KJ_EXPECT(!promise.poll(ws));
expectRead(*pipe2.in, "foo").wait(ws);
promise.wait(ws);
}
{
auto promise = expectRead(*pipe2.in, "bar");
KJ_EXPECT(!promise.poll(ws));
auto promise2 = pipe.out->write("barbaz", 6);
promise.wait(ws);
pumpPromise.wait(ws);
KJ_EXPECT_THROW_MESSAGE("read end of pipe was aborted", promise2.wait(ws));
}
// Further writes throw.
KJ_EXPECT_THROW_MESSAGE("abortRead() has been called", pipe.out->write("baz", 3).wait(ws));
}
KJ_TEST("Userland pipe gather write") {
kj::EventLoop loop;
WaitScope ws(loop);
auto pipe = newOneWayPipe();
ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() };
auto promise = pipe.out->write(parts);
KJ_EXPECT(!promise.poll(ws));
expectRead(*pipe.in, "foobar").wait(ws);
promise.wait(ws);
auto promise2 = pipe.in->readAllText();
KJ_EXPECT(!promise2.poll(ws));
pipe.out = nullptr;
KJ_EXPECT(promise2.wait(ws) == "");
}
KJ_TEST("Userland pipe gather write split on buffer boundary") {
kj::EventLoop loop;
WaitScope ws(loop);
auto pipe = newOneWayPipe();
ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() };
auto promise = pipe.out->write(parts);
KJ_EXPECT(!promise.poll(ws));
expectRead(*pipe.in, "foo").wait(ws);
expectRead(*pipe.in, "bar").wait(ws);
promise.wait(ws);
auto promise2 = pipe.in->readAllText();
KJ_EXPECT(!promise2.poll(ws));
pipe.out = nullptr;
KJ_EXPECT(promise2.wait(ws) == "");
}
KJ_TEST("Userland pipe gather write split mid-first-buffer") {
kj::EventLoop loop;
WaitScope ws(loop);
auto pipe = newOneWayPipe();
ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() };
auto promise = pipe.out->write(parts);
KJ_EXPECT(!promise.poll(ws));
expectRead(*pipe.in, "fo").wait(ws);
expectRead(*pipe.in, "obar").wait(ws);
promise.wait(ws);
auto promise2 = pipe.in->readAllText();
KJ_EXPECT(!promise2.poll(ws));
pipe.out = nullptr;
KJ_EXPECT(promise2.wait(ws) == "");
}
KJ_TEST("Userland pipe gather write split mid-second-buffer") {
kj::EventLoop loop;
WaitScope ws(loop);
auto pipe = newOneWayPipe();
ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() };
auto promise = pipe.out->write(parts);
KJ_EXPECT(!promise.poll(ws));
expectRead(*pipe.in, "foob").wait(ws);
expectRead(*pipe.in, "ar").wait(ws);
promise.wait(ws);
auto promise2 = pipe.in->readAllText();
KJ_EXPECT(!promise2.poll(ws));
pipe.out = nullptr;
KJ_EXPECT(promise2.wait(ws) == "");
}
KJ_TEST("Userland pipe gather write pump") {
kj::EventLoop loop;
WaitScope ws(loop);
auto pipe = newOneWayPipe();
auto pipe2 = newOneWayPipe();
auto pumpPromise = pipe.in->pumpTo(*pipe2.out);
ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() };
auto promise = pipe.out->write(parts);
KJ_EXPECT(!promise.poll(ws));
expectRead(*pipe2.in, "foobar").wait(ws);
promise.wait(ws);
pipe.out = nullptr;
KJ_EXPECT(pumpPromise.wait(ws) == 6);
}
KJ_TEST("Userland pipe gather write pump split on buffer boundary") {
kj::EventLoop loop;
WaitScope ws(loop);
auto pipe = newOneWayPipe();
auto pipe2 = newOneWayPipe();
auto pumpPromise = pipe.in->pumpTo(*pipe2.out);
ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() };
auto promise = pipe.out->write(parts);
KJ_EXPECT(!promise.poll(ws));
expectRead(*pipe2.in, "foo").wait(ws);
expectRead(*pipe2.in, "bar").wait(ws);
promise.wait(ws);
pipe.out = nullptr;
KJ_EXPECT(pumpPromise.wait(ws) == 6);
}
KJ_TEST("Userland pipe gather write pump split mid-first-buffer") {
kj::EventLoop loop;
WaitScope ws(loop);
auto pipe = newOneWayPipe();
auto pipe2 = newOneWayPipe();
auto pumpPromise = pipe.in->pumpTo(*pipe2.out);
ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() };
auto promise = pipe.out->write(parts);
KJ_EXPECT(!promise.poll(ws));
expectRead(*pipe2.in, "fo").wait(ws);
expectRead(*pipe2.in, "obar").wait(ws);
promise.wait(ws);
pipe.out = nullptr;
KJ_EXPECT(pumpPromise.wait(ws) == 6);
}
KJ_TEST("Userland pipe gather write pump split mid-second-buffer") {
kj::EventLoop loop;
WaitScope ws(loop);
auto pipe = newOneWayPipe();
auto pipe2 = newOneWayPipe();
auto pumpPromise = pipe.in->pumpTo(*pipe2.out);
ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() };
auto promise = pipe.out->write(parts);
KJ_EXPECT(!promise.poll(ws));
expectRead(*pipe2.in, "foob").wait(ws);
expectRead(*pipe2.in, "ar").wait(ws);
promise.wait(ws);
pipe.out = nullptr;
KJ_EXPECT(pumpPromise.wait(ws) == 6);
}
KJ_TEST("Userland pipe gather write split pump on buffer boundary") {
kj::EventLoop loop;
WaitScope ws(loop);
auto pipe = newOneWayPipe();
auto pipe2 = newOneWayPipe();
auto pumpPromise = pipe.in->pumpTo(*pipe2.out, 3)
.then([&](uint64_t i) {
KJ_EXPECT(i == 3);
return pipe.in->pumpTo(*pipe2.out, 3);
});
ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() };
auto promise = pipe.out->write(parts);
KJ_EXPECT(!promise.poll(ws));
expectRead(*pipe2.in, "foobar").wait(ws);
promise.wait(ws);
pipe.out = nullptr;
KJ_EXPECT(pumpPromise.wait(ws) == 3);
}
KJ_TEST("Userland pipe gather write split pump mid-first-buffer") {
kj::EventLoop loop;
WaitScope ws(loop);
auto pipe = newOneWayPipe();
auto pipe2 = newOneWayPipe();
auto pumpPromise = pipe.in->pumpTo(*pipe2.out, 2)
.then([&](uint64_t i) {
KJ_EXPECT(i == 2);
return pipe.in->pumpTo(*pipe2.out, 4);
});
ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() };
auto promise = pipe.out->write(parts);
KJ_EXPECT(!promise.poll(ws));
expectRead(*pipe2.in, "foobar").wait(ws);
promise.wait(ws);
pipe.out = nullptr;
KJ_EXPECT(pumpPromise.wait(ws) == 4);
}
KJ_TEST("Userland pipe gather write split pump mid-second-buffer") {
kj::EventLoop loop;
WaitScope ws(loop);
auto pipe = newOneWayPipe();
auto pipe2 = newOneWayPipe();
auto pumpPromise = pipe.in->pumpTo(*pipe2.out, 4)
.then([&](uint64_t i) {
KJ_EXPECT(i == 4);
return pipe.in->pumpTo(*pipe2.out, 2);
});
ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() };
auto promise = pipe.out->write(parts);
KJ_EXPECT(!promise.poll(ws));
expectRead(*pipe2.in, "foobar").wait(ws);
promise.wait(ws);
pipe.out = nullptr;
KJ_EXPECT(pumpPromise.wait(ws) == 2);
}
KJ_TEST("Userland pipe gather write pumpFrom") {
kj::EventLoop loop;
WaitScope ws(loop);
auto pipe = newOneWayPipe();
auto pipe2 = newOneWayPipe();
auto pumpPromise = KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in));
ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() };
auto promise = pipe.out->write(parts);
KJ_EXPECT(!promise.poll(ws));
expectRead(*pipe2.in, "foobar").wait(ws);
promise.wait(ws);
pipe.out = nullptr;
char c;
auto eofPromise = pipe2.in->tryRead(&c, 1, 1);
eofPromise.poll(ws); // force pump to notice EOF
KJ_EXPECT(pumpPromise.wait(ws) == 6);
pipe2.out = nullptr;
KJ_EXPECT(eofPromise.wait(ws) == 0);
}
KJ_TEST("Userland pipe gather write pumpFrom split on buffer boundary") {
kj::EventLoop loop;
WaitScope ws(loop);
auto pipe = newOneWayPipe();
auto pipe2 = newOneWayPipe();
auto pumpPromise = KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in));
ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() };
auto promise = pipe.out->write(parts);
KJ_EXPECT(!promise.poll(ws));
expectRead(*pipe2.in, "foo").wait(ws);
expectRead(*pipe2.in, "bar").wait(ws);
promise.wait(ws);
pipe.out = nullptr;
char c;
auto eofPromise = pipe2.in->tryRead(&c, 1, 1);
eofPromise.poll(ws); // force pump to notice EOF
KJ_EXPECT(pumpPromise.wait(ws) == 6);
pipe2.out = nullptr;
KJ_EXPECT(eofPromise.wait(ws) == 0);
}
KJ_TEST("Userland pipe gather write pumpFrom split mid-first-buffer") {
kj::EventLoop loop;
WaitScope ws(loop);
auto pipe = newOneWayPipe();
auto pipe2 = newOneWayPipe();
auto pumpPromise = KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in));
ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() };
auto promise = pipe.out->write(parts);
KJ_EXPECT(!promise.poll(ws));
expectRead(*pipe2.in, "fo").wait(ws);
expectRead(*pipe2.in, "obar").wait(ws);
promise.wait(ws);
pipe.out = nullptr;
char c;
auto eofPromise = pipe2.in->tryRead(&c, 1, 1);
eofPromise.poll(ws); // force pump to notice EOF
KJ_EXPECT(pumpPromise.wait(ws) == 6);
pipe2.out = nullptr;
KJ_EXPECT(eofPromise.wait(ws) == 0);
}
KJ_TEST("Userland pipe gather write pumpFrom split mid-second-buffer") {
kj::EventLoop loop;
WaitScope ws(loop);
auto pipe = newOneWayPipe();
auto pipe2 = newOneWayPipe();
auto pumpPromise = KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in));
ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() };
auto promise = pipe.out->write(parts);
KJ_EXPECT(!promise.poll(ws));
expectRead(*pipe2.in, "foob").wait(ws);
expectRead(*pipe2.in, "ar").wait(ws);
promise.wait(ws);
pipe.out = nullptr;
char c;
auto eofPromise = pipe2.in->tryRead(&c, 1, 1);
eofPromise.poll(ws); // force pump to notice EOF
KJ_EXPECT(pumpPromise.wait(ws) == 6);
pipe2.out = nullptr;
KJ_EXPECT(eofPromise.wait(ws) == 0);
}
KJ_TEST("Userland pipe gather write split pumpFrom on buffer boundary") {
kj::EventLoop loop;
WaitScope ws(loop);
auto pipe = newOneWayPipe();
auto pipe2 = newOneWayPipe();
auto pumpPromise = KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in, 3))
.then([&](uint64_t i) {
KJ_EXPECT(i == 3);
return KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in, 3));
});
ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() };
auto promise = pipe.out->write(parts);
KJ_EXPECT(!promise.poll(ws));
expectRead(*pipe2.in, "foobar").wait(ws);
promise.wait(ws);
pipe.out = nullptr;
KJ_EXPECT(pumpPromise.wait(ws) == 3);
}
KJ_TEST("Userland pipe gather write split pumpFrom mid-first-buffer") {
kj::EventLoop loop;
WaitScope ws(loop);
auto pipe = newOneWayPipe();
auto pipe2 = newOneWayPipe();
auto pumpPromise = KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in, 2))
.then([&](uint64_t i) {
KJ_EXPECT(i == 2);
return KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in, 4));
});
ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() };
auto promise = pipe.out->write(parts);
KJ_EXPECT(!promise.poll(ws));
expectRead(*pipe2.in, "foobar").wait(ws);
promise.wait(ws);
pipe.out = nullptr;
KJ_EXPECT(pumpPromise.wait(ws) == 4);
}
KJ_TEST("Userland pipe gather write split pumpFrom mid-second-buffer") {
kj::EventLoop loop;
WaitScope ws(loop);
auto pipe = newOneWayPipe();
auto pipe2 = newOneWayPipe();
auto pumpPromise = KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in, 4))
.then([&](uint64_t i) {
KJ_EXPECT(i == 4);
return KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in, 2));
});
ArrayPtr<const byte> parts[] = { "foo"_kj.asBytes(), "bar"_kj.asBytes() };
auto promise = pipe.out->write(parts);
KJ_EXPECT(!promise.poll(ws));
expectRead(*pipe2.in, "foobar").wait(ws);
promise.wait(ws);
pipe.out = nullptr;
KJ_EXPECT(pumpPromise.wait(ws) == 2);
}
KJ_TEST("Userland pipe pumpTo less than write amount") {
kj::EventLoop loop;
WaitScope ws(loop);
auto pipe = newOneWayPipe();
auto pipe2 = newOneWayPipe();
auto pumpPromise = pipe.in->pumpTo(*pipe2.out, 1);
auto pieces = kj::heapArray<ArrayPtr<const byte>>(2);
byte a[1] = { 'a' };
byte b[1] = { 'b' };
pieces[0] = arrayPtr(a, 1);
pieces[1] = arrayPtr(b, 1);
auto writePromise = pipe.out->write(pieces);
KJ_EXPECT(!writePromise.poll(ws));
expectRead(*pipe2.in, "a").wait(ws);
KJ_EXPECT(pumpPromise.wait(ws) == 1);
KJ_EXPECT(!writePromise.poll(ws));
pumpPromise = pipe.in->pumpTo(*pipe2.out, 1);
expectRead(*pipe2.in, "b").wait(ws);
KJ_EXPECT(pumpPromise.wait(ws) == 1);
writePromise.wait(ws);
}
} // namespace
} // namespace kj
......@@ -30,6 +30,7 @@
#include "debug.h"
#include "vector.h"
#include "io.h"
#include "one-of.h"
#if _WIN32
#include <winsock2.h>
......@@ -185,6 +186,923 @@ Maybe<Promise<uint64_t>> AsyncOutputStream::tryPumpFrom(
return nullptr;
}
namespace {
class AsyncPipe final: public AsyncIoStream, public Refcounted {
public:
~AsyncPipe() noexcept(false) {
KJ_REQUIRE(state == nullptr || ownState.get() != nullptr,
"destroying AsyncPipe with operation still in-progress; probably going to segfault") {
// Don't std::terminate().
break;
}
}
Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
if (minBytes == 0) {
return size_t(0);
} else KJ_IF_MAYBE(s, state) {
return s->tryRead(buffer, minBytes, maxBytes);
} else {
return newAdaptedPromise<size_t, BlockedRead>(
*this, arrayPtr(reinterpret_cast<byte*>(buffer), maxBytes), minBytes);
}
}
Promise<uint64_t> pumpTo(AsyncOutputStream& output, uint64_t amount) override {
if (amount == 0) {
return uint64_t(0);
} else KJ_IF_MAYBE(s, state) {
return s->pumpTo(output, amount);
} else {
return newAdaptedPromise<uint64_t, BlockedPumpTo>(*this, output, amount);
}
}
void abortRead() override {
KJ_IF_MAYBE(s, state) {
s->abortRead();
} else {
ownState = kj::heap<AbortedRead>();
state = *ownState;
}
}
Promise<void> write(const void* buffer, size_t size) override {
if (size == 0) {
return READY_NOW;
} else KJ_IF_MAYBE(s, state) {
return s->write(buffer, size);
} else {
return newAdaptedPromise<void, BlockedWrite>(
*this, arrayPtr(reinterpret_cast<const byte*>(buffer), size), nullptr);
}
}
Promise<void> write(ArrayPtr<const ArrayPtr<const byte>> pieces) override {
while (pieces.size() > 0 && pieces[0].size() == 0) {
pieces = pieces.slice(1, pieces.size());
}
if (pieces.size() == 0) {
return kj::READY_NOW;
} else KJ_IF_MAYBE(s, state) {
return s->write(pieces);
} else {
return newAdaptedPromise<void, BlockedWrite>(
*this, pieces[0], pieces.slice(1, pieces.size()));
}
}
Maybe<Promise<uint64_t>> tryPumpFrom(
AsyncInputStream& input, uint64_t amount) override {
if (amount == 0) {
return Promise<uint64_t>(uint64_t(0));
} else KJ_IF_MAYBE(s, state) {
return s->tryPumpFrom(input, amount);
} else {
return newAdaptedPromise<uint64_t, BlockedPumpFrom>(*this, input, amount);
}
}
void shutdownWrite() override {
KJ_IF_MAYBE(s, state) {
s->shutdownWrite();
} else {
ownState = kj::heap<ShutdownedWrite>();
state = *ownState;
}
}
private:
Maybe<AsyncIoStream&> state;
// Object-oriented state! If any method call is blocked waiting on activity from the other end,
// then `state` is non-null and method calls should be forwarded to it. If no calls are
// outstanding, `state` is null.
kj::Own<AsyncIoStream> ownState;
void endState(AsyncIoStream& obj) {
KJ_IF_MAYBE(s, state) {
if (s == &obj) {
state = nullptr;
}
}
}
class BlockedWrite final: public AsyncIoStream {
// AsyncPipe state when a write() is currently waiting for a corresponding read().
public:
BlockedWrite(PromiseFulfiller<void>& fulfiller, AsyncPipe& pipe,
ArrayPtr<const byte> writeBuffer,
ArrayPtr<const ArrayPtr<const byte>> morePieces)
: fulfiller(fulfiller), pipe(pipe), writeBuffer(writeBuffer), morePieces(morePieces) {
KJ_REQUIRE(pipe.state == nullptr);
pipe.state = *this;
}
~BlockedWrite() noexcept(false) {
pipe.endState(*this);
}
Promise<size_t> tryRead(void* readBufferPtr, size_t minBytes, size_t maxBytes) override {
KJ_REQUIRE(canceler.isEmpty(), "already pumping");
auto readBuffer = arrayPtr(reinterpret_cast<byte*>(readBufferPtr), maxBytes);
size_t totalRead = 0;
while (readBuffer.size() >= writeBuffer.size()) {
// The whole current write buffer can be copied into the read buffer.
{
auto n = writeBuffer.size();
memcpy(readBuffer.begin(), writeBuffer.begin(), n);
totalRead += n;
readBuffer = readBuffer.slice(n, readBuffer.size());
}
if (morePieces.size() == 0) {
// All done writing.
fulfiller.fulfill();
pipe.endState(*this);
if (totalRead >= minBytes) {
// Also all done reading.
return totalRead;
} else {
return pipe.tryRead(readBuffer.begin(), minBytes - totalRead, readBuffer.size())
.then([totalRead](size_t amount) { return amount + totalRead; });
}
}
writeBuffer = morePieces[0];
morePieces = morePieces.slice(1, morePieces.size());
}
// At this point, the read buffer is smaller than the current write buffer, so we can fill
// it completely.
{
auto n = readBuffer.size();
memcpy(readBuffer.begin(), writeBuffer.begin(), n);
writeBuffer = writeBuffer.slice(n, writeBuffer.size());
totalRead += n;
}
return totalRead;
}
Promise<uint64_t> pumpTo(AsyncOutputStream& output, uint64_t amount) override {
KJ_REQUIRE(canceler.isEmpty(), "already pumping");
if (amount < writeBuffer.size()) {
// Consume a portion of the write buffer.
return canceler.wrap(output.write(writeBuffer.begin(), amount)
.then([this,amount]() {
writeBuffer = writeBuffer.slice(amount, writeBuffer.size());
// We pumped the full amount, so we're done pumping.
return amount;
}));
}
// First piece doesn't cover the whole pump. Figure out how many more pieces to add.
uint64_t actual = writeBuffer.size();
size_t i = 0;
while (i < morePieces.size() &&
amount >= actual + morePieces[i].size()) {
actual += morePieces[i++].size();
}
// Write the first piece.
auto promise = output.write(writeBuffer.begin(), writeBuffer.size());
// Write full pieces as a singcle gather-write.
if (i > 0) {
auto more = morePieces.slice(0, i);
promise = promise.then([&output,more]() { return output.write(more); });
}
if (i == morePieces.size()) {
// This will complete the write.
return canceler.wrap(promise.then([this,&output,amount,actual]() -> Promise<uint64_t> {
canceler.release();
fulfiller.fulfill();
pipe.endState(*this);
if (actual == amount) {
// Oh, we had exactly enough.
return actual;
} else {
return pipe.pumpTo(output, amount - actual)
.then([actual](uint64_t actual2) { return actual + actual2; });
}
}));
} else {
// Pump ends mid-piece. Write the last, partial piece.
auto n = amount - actual;
auto splitPiece = morePieces[i];
KJ_ASSERT(n <= splitPiece.size());
auto newWriteBuffer = splitPiece.slice(n, splitPiece.size());
auto newMorePieces = morePieces.slice(i + 1, morePieces.size());
auto prefix = splitPiece.slice(0, n);
if (prefix.size() > 0) {
promise = promise.then([&output,prefix]() {
return output.write(prefix.begin(), prefix.size());
});
}
return canceler.wrap(promise.then([this,newWriteBuffer,newMorePieces,amount]() {
writeBuffer = newWriteBuffer;
morePieces = newMorePieces;
canceler.release();
return amount;
}));
}
}
void abortRead() override {
canceler.cancel("abortRead() was called");
fulfiller.reject(KJ_EXCEPTION(FAILED, "read end of pipe was aborted"));
pipe.endState(*this);
pipe.abortRead();
}
Promise<void> write(const void* buffer, size_t size) override {
KJ_FAIL_REQUIRE("can't write() again until previous write() completes");
}
Promise<void> write(ArrayPtr<const ArrayPtr<const byte>> pieces) override {
KJ_FAIL_REQUIRE("can't write() again until previous write() completes");
}
Maybe<Promise<uint64_t>> tryPumpFrom(AsyncInputStream& input, uint64_t amount) override {
KJ_FAIL_REQUIRE("can't tryPumpFrom() again until previous write() completes");
}
void shutdownWrite() override {
KJ_FAIL_REQUIRE("can't shutdownWrite() until previous write() completes");
}
private:
PromiseFulfiller<void>& fulfiller;
AsyncPipe& pipe;
ArrayPtr<const byte> writeBuffer;
ArrayPtr<const ArrayPtr<const byte>> morePieces;
Canceler canceler;
};
class BlockedPumpFrom final: public AsyncIoStream {
// AsyncPipe state when a tryPumpFrom() is currently waiting for a corresponding read().
public:
BlockedPumpFrom(PromiseFulfiller<uint64_t>& fulfiller, AsyncPipe& pipe,
AsyncInputStream& input, uint64_t amount)
: fulfiller(fulfiller), pipe(pipe), input(input), amount(amount) {
KJ_REQUIRE(pipe.state == nullptr);
pipe.state = *this;
}
~BlockedPumpFrom() noexcept(false) {
pipe.endState(*this);
}
Promise<size_t> tryRead(void* readBuffer, size_t minBytes, size_t maxBytes) override {
KJ_REQUIRE(canceler.isEmpty(), "already pumping");
auto pumpLeft = amount - pumpedSoFar;
auto min = kj::min(pumpLeft, minBytes);
auto max = kj::min(pumpLeft, maxBytes);
return canceler.wrap(input.tryRead(readBuffer, min, max)
.then([this,readBuffer,minBytes,maxBytes,min](size_t actual) -> kj::Promise<size_t> {
canceler.release();
pumpedSoFar += actual;
KJ_ASSERT(pumpedSoFar <= amount);
if (pumpedSoFar == amount || actual < min) {
// Either we pumped all we wanted or we hit EOF.
fulfiller.fulfill(kj::cp(pumpedSoFar));
pipe.endState(*this);
}
if (actual >= minBytes) {
return actual;
} else {
return pipe.tryRead(reinterpret_cast<byte*>(readBuffer) + actual,
minBytes - actual, maxBytes - actual)
.then([actual](size_t actual2) { return actual + actual2; });
}
}));
}
Promise<uint64_t> pumpTo(AsyncOutputStream& output, uint64_t amount2) override {
KJ_REQUIRE(canceler.isEmpty(), "already pumping");
auto n = kj::min(amount2, amount - pumpedSoFar);
return canceler.wrap(input.pumpTo(output, n)
.then([this,&output,amount2,n](uint64_t actual) -> Promise<uint64_t> {
canceler.release();
pumpedSoFar += actual;
KJ_ASSERT(pumpedSoFar <= amount);
if (pumpedSoFar == amount) {
fulfiller.fulfill(kj::cp(amount));
pipe.endState(*this);
}
KJ_ASSERT(actual <= amount2);
if (actual == amount2) {
// Completed entire pumpTo amount.
return amount2;
} else if (actual < n) {
// Received less than requested, presumably because EOF.
return actual;
} else {
// We received all the bytes that were requested but it didn't complete the pump.
KJ_ASSERT(pumpedSoFar == amount);
return pipe.pumpTo(output, amount2 - actual);
}
}));
}
void abortRead() override {
canceler.cancel("abortRead() was called");
fulfiller.reject(KJ_EXCEPTION(FAILED, "read end of pipe was aborted"));
pipe.endState(*this);
pipe.abortRead();
}
Promise<void> write(const void* buffer, size_t size) override {
KJ_FAIL_REQUIRE("can't write() again until previous tryPumpFrom() completes");
}
Promise<void> write(ArrayPtr<const ArrayPtr<const byte>> pieces) override {
KJ_FAIL_REQUIRE("can't write() again until previous tryPumpFrom() completes");
}
Maybe<Promise<uint64_t>> tryPumpFrom(AsyncInputStream& input, uint64_t amount) override {
KJ_FAIL_REQUIRE("can't tryPumpFrom() again until previous tryPumpFrom() completes");
}
void shutdownWrite() override {
KJ_FAIL_REQUIRE("can't shutdownWrite() until previous tryPumpFrom() completes");
}
private:
PromiseFulfiller<uint64_t>& fulfiller;
AsyncPipe& pipe;
AsyncInputStream& input;
uint64_t amount;
uint64_t pumpedSoFar = 0;
Canceler canceler;
};
class BlockedRead final: public AsyncIoStream {
// AsyncPipe state when a tryRead() is currently waiting for a corresponding write().
public:
BlockedRead(PromiseFulfiller<size_t>& fulfiller, AsyncPipe& pipe,
ArrayPtr<byte> readBuffer, size_t minBytes)
: fulfiller(fulfiller), pipe(pipe), readBuffer(readBuffer), minBytes(minBytes) {
KJ_REQUIRE(pipe.state == nullptr);
pipe.state = *this;
}
~BlockedRead() noexcept(false) {
pipe.endState(*this);
}
Promise<size_t> tryRead(void* readBuffer, size_t minBytes, size_t maxBytes) override {
KJ_FAIL_REQUIRE("can't read() again until previous read() completes");
}
Promise<uint64_t> pumpTo(AsyncOutputStream& output, uint64_t amount) override {
KJ_FAIL_REQUIRE("can't read() again until previous read() completes");
}
void abortRead() override {
canceler.cancel("abortRead() was called");
fulfiller.reject(KJ_EXCEPTION(FAILED, "read end of pipe was aborted"));
pipe.endState(*this);
pipe.abortRead();
}
Promise<void> write(const void* writeBuffer, size_t size) override {
KJ_REQUIRE(canceler.isEmpty(), "already pumping");
if (size < readBuffer.size()) {
// Consume a portion of the read buffer.
memcpy(readBuffer.begin(), writeBuffer, size);
readSoFar += size;
readBuffer = readBuffer.slice(size, readBuffer.size());
if (readSoFar >= minBytes) {
// We've read enough to close out this read.
fulfiller.fulfill(kj::cp(readSoFar));
pipe.endState(*this);
}
return READY_NOW;
} else {
// Consume entire read buffer.
auto n = readBuffer.size();
fulfiller.fulfill(readSoFar + n);
pipe.endState(*this);
memcpy(readBuffer.begin(), writeBuffer, n);
if (n == size) {
// That's it.
return READY_NOW;
} else {
return pipe.write(reinterpret_cast<const byte*>(writeBuffer) + n, size - n);
}
}
}
Promise<void> write(ArrayPtr<const ArrayPtr<const byte>> pieces) override {
KJ_REQUIRE(canceler.isEmpty(), "already pumping");
while (pieces.size() > 0) {
if (pieces[0].size() < readBuffer.size()) {
// Consume a portion of the read buffer.
auto n = pieces[0].size();
memcpy(readBuffer.begin(), pieces[0].begin(), n);
readSoFar += n;
readBuffer = readBuffer.slice(n, readBuffer.size());
pieces = pieces.slice(1, pieces.size());
// loop
} else {
// Consume entire read buffer.
auto n = readBuffer.size();
fulfiller.fulfill(readSoFar + n);
pipe.endState(*this);
memcpy(readBuffer.begin(), pieces[0].begin(), n);
auto restOfPiece = pieces[0].slice(n, pieces[0].size());
pieces = pieces.slice(1, pieces.size());
if (restOfPiece.size() == 0) {
// We exactly finished the current piece, so just issue a write for the remaining
// pieces.
if (pieces.size() == 0) {
// Nothing left.
return READY_NOW;
} else {
// Write remaining pieces.
return pipe.write(pieces);
}
} else {
// Unfortunately we have to execute a separate write() for the remaining part of this
// piece, because we can't modify the pieces array.
auto promise = pipe.write(restOfPiece.begin(), restOfPiece.size());
if (pieces.size() > 0) {
// No more pieces so that's it.
return kj::mv(promise);
} else {
// Also need to write the remaining pieces.
auto& pipeRef = pipe;
return promise.then([pieces,&pipeRef]() {
return pipeRef.write(pieces);
});
}
}
}
}
// Consumed all written pieces.
if (readSoFar >= minBytes) {
// We've read enough to close out this read.
fulfiller.fulfill(kj::cp(readSoFar));
pipe.endState(*this);
}
return READY_NOW;
}
Maybe<Promise<uint64_t>> tryPumpFrom(AsyncInputStream& input, uint64_t amount) override {
KJ_REQUIRE(canceler.isEmpty(), "already pumping");
KJ_ASSERT(minBytes > readSoFar);
auto minToRead = kj::min(amount, minBytes - readSoFar);
auto maxToRead = kj::min(amount, readBuffer.size());
return canceler.wrap(input.tryRead(readBuffer.begin(), minToRead, maxToRead)
.then([this,&input,amount,minToRead](size_t actual) -> Promise<uint64_t> {
readBuffer = readBuffer.slice(actual, readBuffer.size());
readSoFar += actual;
if (readSoFar >= minBytes || actual < minToRead) {
// We've read enough to close out this read (readSoFar >= minBytes)
// OR we reached EOF and couldn't complete the read (actual < minToRead)
// Either way, we want to close out this read.
canceler.release();
fulfiller.fulfill(kj::cp(readSoFar));
pipe.endState(*this);
if (actual < amount) {
// We din't complete pumping. Restart from the pipe.
return input.pumpTo(pipe, amount - actual)
.then([actual](uint64_t actual2) -> uint64_t { return actual + actual2; });
}
}
// If we read less than `actual`, but more than `minToRead`, it can only have been
// because we reached `minBytes`, so the conditional above would have executed. So, here
// we know that actual == amount.
KJ_ASSERT(actual == amount);
// We pumped the full amount, so we're done pumping.
return amount;
}));
}
void shutdownWrite() override {
canceler.cancel("shutdownWrite() was called");
fulfiller.fulfill(kj::cp(readSoFar));
pipe.endState(*this);
pipe.shutdownWrite();
}
private:
PromiseFulfiller<size_t>& fulfiller;
AsyncPipe& pipe;
ArrayPtr<byte> readBuffer;
size_t minBytes;
size_t readSoFar = 0;
Canceler canceler;
};
class BlockedPumpTo final: public AsyncIoStream {
// AsyncPipe state when a pumpTo() is currently waiting for a corresponding write().
public:
BlockedPumpTo(PromiseFulfiller<uint64_t>& fulfiller, AsyncPipe& pipe,
AsyncOutputStream& output, uint64_t amount)
: fulfiller(fulfiller), pipe(pipe), output(output), amount(amount) {
KJ_REQUIRE(pipe.state == nullptr);
pipe.state = *this;
}
~BlockedPumpTo() noexcept(false) {
pipe.endState(*this);
}
Promise<size_t> tryRead(void* readBuffer, size_t minBytes, size_t maxBytes) override {
KJ_FAIL_REQUIRE("can't read() again until previous pumpTo() completes");
}
Promise<uint64_t> pumpTo(AsyncOutputStream& output, uint64_t amount) override {
KJ_FAIL_REQUIRE("can't read() again until previous pumpTo() completes");
}
void abortRead() override {
canceler.cancel("abortRead() was called");
fulfiller.reject(KJ_EXCEPTION(FAILED, "read end of pipe was aborted"));
pipe.endState(*this);
pipe.abortRead();
}
Promise<void> write(const void* writeBuffer, size_t size) override {
KJ_REQUIRE(canceler.isEmpty(), "already pumping");
auto actual = kj::min(amount - pumpedSoFar, size);
return canceler.wrap(output.write(writeBuffer, actual)
.then([this,size,actual,writeBuffer]() -> kj::Promise<void> {
canceler.release();
pumpedSoFar += actual;
KJ_ASSERT(pumpedSoFar <= amount);
KJ_ASSERT(actual <= size);
if (pumpedSoFar == amount) {
// Done with pump.
fulfiller.fulfill(kj::cp(pumpedSoFar));
pipe.endState(*this);
}
if (actual == size) {
return kj::READY_NOW;
} else {
KJ_ASSERT(pumpedSoFar == amount);
return pipe.write(reinterpret_cast<const byte*>(writeBuffer) + actual, size - actual);
}
}));
}
Promise<void> write(ArrayPtr<const ArrayPtr<const byte>> pieces) override {
KJ_REQUIRE(canceler.isEmpty(), "already pumping");
size_t size = 0;
size_t needed = amount - pumpedSoFar;
for (auto i: kj::indices(pieces)) {
if (pieces[i].size() > needed) {
// The pump ends in the middle of this write.
auto promise = output.write(pieces.slice(0, i));
if (needed > 0) {
// The pump includes part of this piece, but not all. Unfortunately we need to split
// writes.
auto partial = pieces[i].slice(0, needed);
promise = promise.then([this,partial]() {
return output.write(partial.begin(), partial.size());
});
auto partial2 = pieces[i].slice(needed, pieces[i].size());
promise = canceler.wrap(promise.then([this,partial2]() {
canceler.release();
fulfiller.fulfill(kj::cp(amount));
pipe.endState(*this);
return pipe.write(partial2.begin(), partial2.size());
}));
++i;
} else {
// The pump ends exactly at the end of a piece, how nice.
promise = canceler.wrap(promise.then([this]() {
canceler.release();
fulfiller.fulfill(kj::cp(amount));
pipe.endState(*this);
}));
}
auto remainder = pieces.slice(i, pieces.size());
if (remainder.size() > 0) {
auto& pipeRef = pipe;
promise = promise.then([&pipeRef,remainder]() {
return pipeRef.write(remainder);
});
}
return promise;
} else {
size += pieces[i].size();
needed -= pieces[i].size();
}
}
// Turns out we can forward this whole write.
KJ_ASSERT(size <= amount - pumpedSoFar);
return canceler.wrap(output.write(pieces).then([this,size]() {
pumpedSoFar += size;
KJ_ASSERT(pumpedSoFar <= amount);
if (pumpedSoFar == amount) {
// Done pumping.
canceler.release();
fulfiller.fulfill(kj::cp(amount));
pipe.endState(*this);
}
}));
}
Maybe<Promise<uint64_t>> tryPumpFrom(AsyncInputStream& input, uint64_t amount2) override {
KJ_REQUIRE(canceler.isEmpty(), "already pumping");
auto n = kj::min(amount2, amount - pumpedSoFar);
return output.tryPumpFrom(input, n)
.map([&](Promise<uint64_t> subPump) {
return canceler.wrap(subPump
.then([this,&input,amount2,n](uint64_t actual) -> Promise<uint64_t> {
canceler.release();
pumpedSoFar += actual;
KJ_ASSERT(pumpedSoFar <= amount);
if (pumpedSoFar == amount) {
fulfiller.fulfill(kj::cp(amount));
pipe.endState(*this);
}
KJ_ASSERT(actual <= amount2);
if (actual == amount2) {
// Completed entire tryPumpFrom amount.
return amount2;
} else if (actual < n) {
// Received less than requested, presumably because EOF.
return actual;
} else {
// We received all the bytes that were requested but it didn't complete the pump.
KJ_ASSERT(pumpedSoFar == amount);
return input.pumpTo(pipe, amount2 - actual);
}
}));
});
}
void shutdownWrite() override {
canceler.cancel("shutdownWrite() was called");
fulfiller.fulfill(kj::cp(pumpedSoFar));
pipe.endState(*this);
pipe.shutdownWrite();
}
private:
PromiseFulfiller<uint64_t>& fulfiller;
AsyncPipe& pipe;
AsyncOutputStream& output;
uint64_t amount;
size_t pumpedSoFar = 0;
Canceler canceler;
};
class AbortedRead final: public AsyncIoStream {
// AsyncPipe state when abortRead() has been called.
public:
Promise<size_t> tryRead(void* readBufferPtr, size_t minBytes, size_t maxBytes) override {
KJ_FAIL_REQUIRE("abortRead() has been called");
}
Promise<uint64_t> pumpTo(AsyncOutputStream& output, uint64_t amount) override {
KJ_FAIL_REQUIRE("abortRead() has been called");
}
void abortRead() override {
// ignore repeated abort
}
Promise<void> write(const void* buffer, size_t size) override {
KJ_FAIL_REQUIRE("abortRead() has been called");
}
Promise<void> write(ArrayPtr<const ArrayPtr<const byte>> pieces) override {
KJ_FAIL_REQUIRE("abortRead() has been called");
}
Maybe<Promise<uint64_t>> tryPumpFrom(AsyncInputStream& input, uint64_t amount) override {
KJ_FAIL_REQUIRE("abortRead() has been called");
}
void shutdownWrite() override {
// ignore -- currently shutdownWrite() actually means that the PipeWriteEnd was dropped,
// which is not an error even if reads have been aborted.
}
};
class ShutdownedWrite final: public AsyncIoStream {
// AsyncPipe state when shutdownWrite() has been called.
public:
Promise<size_t> tryRead(void* readBufferPtr, size_t minBytes, size_t maxBytes) override {
return size_t(0);
}
Promise<uint64_t> pumpTo(AsyncOutputStream& output, uint64_t amount) override {
return uint64_t(0);
}
void abortRead() override {
// ignore
}
Promise<void> write(const void* buffer, size_t size) override {
KJ_FAIL_REQUIRE("shutdownWrite() has been called");
}
Promise<void> write(ArrayPtr<const ArrayPtr<const byte>> pieces) override {
KJ_FAIL_REQUIRE("shutdownWrite() has been called");
}
Maybe<Promise<uint64_t>> tryPumpFrom(AsyncInputStream& input, uint64_t amount) override {
KJ_FAIL_REQUIRE("shutdownWrite() has been called");
}
void shutdownWrite() override {
// ignore -- currently shutdownWrite() actually means that the PipeWriteEnd was dropped,
// so it will only be called once anyhow.
}
};
};
class PipeReadEnd final: public AsyncInputStream {
public:
PipeReadEnd(kj::Own<AsyncPipe> pipe): pipe(kj::mv(pipe)) {}
~PipeReadEnd() noexcept(false) {
unwind.catchExceptionsIfUnwinding([&]() {
pipe->abortRead();
});
}
Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
return pipe->tryRead(buffer, minBytes, maxBytes);
}
Promise<uint64_t> pumpTo(AsyncOutputStream& output, uint64_t amount) override {
return pipe->pumpTo(output, amount);
}
private:
Own<AsyncPipe> pipe;
UnwindDetector unwind;
};
class PipeWriteEnd final: public AsyncOutputStream {
public:
PipeWriteEnd(kj::Own<AsyncPipe> pipe): pipe(kj::mv(pipe)) {}
~PipeWriteEnd() noexcept(false) {
unwind.catchExceptionsIfUnwinding([&]() {
pipe->shutdownWrite();
});
}
Promise<void> write(const void* buffer, size_t size) override {
return pipe->write(buffer, size);
}
Promise<void> write(ArrayPtr<const ArrayPtr<const byte>> pieces) override {
return pipe->write(pieces);
}
Maybe<Promise<uint64_t>> tryPumpFrom(
AsyncInputStream& input, uint64_t amount) override {
return pipe->tryPumpFrom(input, amount);
}
private:
Own<AsyncPipe> pipe;
UnwindDetector unwind;
};
class TwoWayPipeEnd final: public AsyncIoStream {
public:
TwoWayPipeEnd(kj::Own<AsyncPipe> in, kj::Own<AsyncPipe> out)
: in(kj::mv(in)), out(kj::mv(out)) {}
~TwoWayPipeEnd() noexcept(false) {
unwind.catchExceptionsIfUnwinding([&]() {
out->shutdownWrite();
in->abortRead();
});
}
Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
return in->tryRead(buffer, minBytes, maxBytes);
}
Promise<uint64_t> pumpTo(AsyncOutputStream& output, uint64_t amount) override {
return in->pumpTo(output, amount);
}
void abortRead() override {
in->abortRead();
}
Promise<void> write(const void* buffer, size_t size) override {
return out->write(buffer, size);
}
Promise<void> write(ArrayPtr<const ArrayPtr<const byte>> pieces) override {
return out->write(pieces);
}
Maybe<Promise<uint64_t>> tryPumpFrom(
AsyncInputStream& input, uint64_t amount) override {
return out->tryPumpFrom(input, amount);
}
void shutdownWrite() override {
out->shutdownWrite();
}
private:
kj::Own<AsyncPipe> in;
kj::Own<AsyncPipe> out;
UnwindDetector unwind;
};
class LimitedInputStream final: public AsyncInputStream {
public:
LimitedInputStream(kj::Own<AsyncInputStream> inner, uint64_t limit)
: inner(kj::mv(inner)), limit(limit) {
if (limit == 0) {
inner = nullptr;
}
}
Maybe<uint64_t> tryGetLength() override {
return limit;
}
Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
if (limit == 0) return size_t(0);
return inner->tryRead(buffer, kj::min(minBytes, limit), kj::min(maxBytes, limit))
.then([this,minBytes](size_t actual) {
decreaseLimit(actual, minBytes);
return actual;
});
}
Promise<uint64_t> pumpTo(AsyncOutputStream& output, uint64_t amount) override {
if (limit == 0) return uint64_t(0);
auto requested = kj::min(amount, limit);
return inner->pumpTo(output, requested)
.then([this,requested](uint64_t actual) {
decreaseLimit(actual, requested);
return actual;
});
}
private:
Own<AsyncInputStream> inner;
uint64_t limit;
void decreaseLimit(uint64_t amount, uint64_t requested) {
KJ_ASSERT(limit >= amount);
limit -= amount;
if (limit == 0) {
inner = nullptr;
} else if (amount < requested) {
KJ_FAIL_REQUIRE("pipe ended prematurely");
}
}
};
} // namespace
OneWayPipe newOneWayPipe(kj::Maybe<uint64_t> expectedLength) {
auto impl = kj::refcounted<AsyncPipe>();
Own<AsyncInputStream> readEnd = kj::heap<PipeReadEnd>(kj::addRef(*impl));
KJ_IF_MAYBE(l, expectedLength) {
readEnd = kj::heap<LimitedInputStream>(kj::mv(readEnd), *l);
}
Own<AsyncOutputStream> writeEnd = kj::heap<PipeWriteEnd>(kj::mv(impl));
return { kj::mv(readEnd), kj::mv(writeEnd) };
}
TwoWayPipe newTwoWayPipe() {
auto pipe1 = kj::refcounted<AsyncPipe>();
auto pipe2 = kj::refcounted<AsyncPipe>();
auto end1 = kj::heap<TwoWayPipeEnd>(kj::addRef(*pipe1), kj::addRef(*pipe2));
auto end2 = kj::heap<TwoWayPipeEnd>(kj::mv(pipe2), kj::mv(pipe1));
return { { kj::mv(end1), kj::mv(end2) } };
}
Promise<Own<AsyncCapabilityStream>> AsyncCapabilityStream::receiveStream() {
return tryReceiveStream()
.then([](Maybe<Own<AsyncCapabilityStream>>&& result)
......
......@@ -175,6 +175,13 @@ struct OneWayPipe {
Own<AsyncOutputStream> out;
};
OneWayPipe newOneWayPipe(kj::Maybe<uint64_t> expectedLength = nullptr);
// Constructs a OneWayPipe that operates in-process. The pipe does not do any buffering -- it waits
// until both a read() and a write() call are pending, then resolves both.
//
// If `expectedLength` is non-null, then the pipe will be expected to transmit exactly that many
// bytes. The input end's `tryGetLength()` will return the number of bytes left.
struct TwoWayPipe {
// A data pipe that supports sending in both directions. Each end's output sends data to the
// other end's input. (Typically backed by socketpair() system call.)
......@@ -182,6 +189,10 @@ struct TwoWayPipe {
Own<AsyncIoStream> ends[2];
};
TwoWayPipe newTwoWayPipe();
// Constructs a TwoWayPipe that operates in-process. The pipe does not do any buffering -- it waits
// until both a read() and a write() call are pending, then resolves both.
struct CapabilityPipe {
// Like TwoWayPipe but allowing capability-passing.
......
......@@ -580,6 +580,29 @@ TEST(Async, ArrayJoinVoid) {
promise.wait(waitScope);
}
TEST(Async, Canceler) {
EventLoop loop;
WaitScope waitScope(loop);
Canceler canceler;
auto never = canceler.wrap(kj::Promise<void>(kj::NEVER_DONE));
auto now = canceler.wrap(kj::Promise<void>(kj::READY_NOW));
auto neverI = canceler.wrap(kj::Promise<void>(kj::NEVER_DONE).then([]() { return 123u; }));
auto nowI = canceler.wrap(kj::Promise<uint>(123u));
KJ_EXPECT(!never.poll(waitScope));
KJ_EXPECT(now.poll(waitScope));
KJ_EXPECT(!neverI.poll(waitScope));
KJ_EXPECT(nowI.poll(waitScope));
canceler.cancel("foobar");
KJ_EXPECT_THROW_MESSAGE("foobar", never.wait(waitScope));
now.wait(waitScope);
KJ_EXPECT_THROW_MESSAGE("foobar", neverI.wait(waitScope));
KJ_EXPECT(nowI.wait(waitScope) == 123u);
}
class ErrorHandlerImpl: public TaskSet::ErrorHandler {
public:
uint exceptionCount = 0;
......
......@@ -84,6 +84,76 @@ public:
} // namespace
// =======================================================================================
Canceler::~Canceler() noexcept(false) {
cancel("operation canceled");
}
void Canceler::cancel(StringPtr cancelReason) {
if (isEmpty()) return;
cancel(Exception(Exception::Type::FAILED, __FILE__, __LINE__, kj::str(cancelReason)));
}
void Canceler::cancel(const Exception& exception) {
for (;;) {
KJ_IF_MAYBE(a, list) {
list = a->next;
a->prev = nullptr;
a->next = nullptr;
a->cancel(kj::cp(exception));
} else {
break;
}
}
}
void Canceler::release() {
for (;;) {
KJ_IF_MAYBE(a, list) {
list = a->next;
a->prev = nullptr;
a->next = nullptr;
} else {
break;
}
}
}
Canceler::AdapterBase::AdapterBase(Canceler& canceler)
: prev(canceler.list),
next(canceler.list) {
canceler.list = *this;
KJ_IF_MAYBE(n, next) {
n->prev = next;
}
}
Canceler::AdapterBase::~AdapterBase() noexcept(false) {
KJ_IF_MAYBE(p, prev) {
*p = next;
}
KJ_IF_MAYBE(n, next) {
n->prev = prev;
}
}
Canceler::AdapterImpl<void>::AdapterImpl(kj::PromiseFulfiller<void>& fulfiller,
Canceler& canceler, kj::Promise<void> inner)
: AdapterBase(canceler),
fulfiller(fulfiller),
inner(inner.then(
[&fulfiller]() { fulfiller.fulfill(); },
[&fulfiller](kj::Exception&& e) { fulfiller.reject(kj::mv(e)); })
.eagerlyEvaluate(nullptr)) {}
void Canceler::AdapterImpl<void>::cancel(kj::Exception&& e) {
fulfiller.reject(kj::mv(e));
inner = nullptr;
}
// =======================================================================================
TaskSet::TaskSet(TaskSet::ErrorHandler& errorHandler)
: errorHandler(errorHandler) {}
......
......@@ -500,6 +500,109 @@ PromiseFulfillerPair<T> newPromiseAndFulfiller();
// fulfiller will be of type `PromiseFulfiller<Promise<U>>`. Thus you pass a `Promise<U>` to the
// `fulfill()` callback, and the promises are chained.
// =======================================================================================
// Canceler
class Canceler {
// A Canceler can wrap some set of Promises and then forcefully cancel them on-demand, or
// implicitly when the Canceler is destroyed.
//
// The cancellation is done in such a way that once cancel() (or the Canceler's destructor)
// returns, it's guaranteed that the promise has already been canceled and destroyed. This
// guarantee is important for enforcing ownership constraints. For example, imagine that Alice
// calls a method on Bob that returns a Promise. That Promise encapsulates a task that uses Bob's
// internal state. But, imagine that Alice does not own Bob, and indeed Bob might be destroyed
// at random without Alice having canceled the promise. In this case, it is necessary for Bob to
// ensure that the promise will be forcefully canceled. Bob can do this by constructing a
// Canceler and using it to wrap promises before returning them to callers. When Bob is
// destroyed, the Canceler is destroyed too, and all promises Bob wrapped with it throw errors.
//
// Note that another common strategy for cancelation is to use exclusiveJoin() to join a promise
// with some "cancellation promise" which only resolves if the operation should be canceled. The
// cancellation promise could itself be created by newPromiseAndFulfiller<void>(), and thus
// calling the PromiseFulfiller cancels the operation. There is a major problem with this
// approach: upon invoking the fulfiller, an arbitrary amount of time may pass before the
// exclusive-joined promise actually resolves and cancels its other fork. During that time, the
// task might continue to execute. If it holds pointers to objects that have been destroyed, this
// might cause segfaults. Thus, it is safer to use a Canceler.
public:
inline Canceler() {}
~Canceler() noexcept(false);
KJ_DISALLOW_COPY(Canceler);
template <typename T>
Promise<T> wrap(Promise<T> promise) {
return newAdaptedPromise<T, AdapterImpl<T>>(*this, kj::mv(promise));
}
void cancel(StringPtr cancelReason);
void cancel(const Exception& exception);
// Cancel all previously-wrapped promises that have not already completed, causing them to throw
// the given exception. If you provide just a description message instead of an exception, then
// an exception object will be constructed from it -- but only if there are requests to cancel.
void release();
// Releases previously-wrapped promises, so that they will not be canceled regardless of what
// happens to this Canceler.
bool isEmpty() { return list == nullptr; }
// Indicates if any previously-wrapped promises are still executing. (If this returns false, then
// cancel() would be a no-op.)
private:
class AdapterBase {
public:
AdapterBase(Canceler& canceler);
~AdapterBase() noexcept(false);
virtual void cancel(Exception&& e) = 0;
private:
Maybe<Maybe<AdapterBase&>&> prev;
Maybe<AdapterBase&> next;
friend class Canceler;
};
template <typename T>
class AdapterImpl: public AdapterBase {
public:
AdapterImpl(PromiseFulfiller<T>& fulfiller,
Canceler& canceler, Promise<T> inner)
: AdapterBase(canceler),
fulfiller(fulfiller),
inner(inner.then(
[&fulfiller](T&& value) { fulfiller.fulfill(kj::mv(value)); },
[&fulfiller](Exception&& e) { fulfiller.reject(kj::mv(e)); })
.eagerlyEvaluate(nullptr)) {}
void cancel(Exception&& e) override {
fulfiller.reject(kj::mv(e));
inner = nullptr;
}
private:
PromiseFulfiller<T>& fulfiller;
Promise<void> inner;
};
Maybe<AdapterBase&> list;
};
template <>
class Canceler::AdapterImpl<void>: public AdapterBase {
public:
AdapterImpl(kj::PromiseFulfiller<void>& fulfiller,
Canceler& canceler, kj::Promise<void> inner);
void cancel(kj::Exception&& e) override;
// These must be defined in async.c++ to prevent translation units compiled by MSVC from trying to
// link with symbols defined in async.c++ merely because they included async.h.
private:
kj::PromiseFulfiller<void>& fulfiller;
kj::Promise<void> inner;
};
// =======================================================================================
// TaskSet
......
......@@ -428,8 +428,8 @@ kj::Promise<void> expectRead(kj::AsyncInputStream& in, kj::ArrayPtr<const byte>
}));
}
void testHttpClientRequest(kj::AsyncIoContext& io, const HttpRequestTestCase& testCase) {
auto pipe = io.provider->newTwoWayPipe();
void testHttpClientRequest(kj::WaitScope& waitScope, const HttpRequestTestCase& testCase) {
auto pipe = kj::newTwoWayPipe();
auto serverTask = expectRead(*pipe.ends[1], testCase.raw).then([&]() {
static const char SIMPLE_RESPONSE[] =
......@@ -451,7 +451,7 @@ void testHttpClientRequest(kj::AsyncIoContext& io, const HttpRequestTestCase& te
auto request = client->request(testCase.method, testCase.path, headers, testCase.requestBodySize);
if (testCase.requestBodyParts.size() > 0) {
writeEach(*request.body, testCase.requestBodyParts).wait(io.waitScope);
writeEach(*request.body, testCase.requestBodyParts).wait(waitScope);
}
request.body = nullptr;
auto clientTask = request.response
......@@ -460,17 +460,17 @@ void testHttpClientRequest(kj::AsyncIoContext& io, const HttpRequestTestCase& te
return promise.attach(kj::mv(response.body));
}).ignoreResult();
serverTask.exclusiveJoin(kj::mv(clientTask)).wait(io.waitScope);
serverTask.exclusiveJoin(kj::mv(clientTask)).wait(waitScope);
// Verify no more data written by client.
client = nullptr;
pipe.ends[0]->shutdownWrite();
KJ_EXPECT(pipe.ends[1]->readAllText().wait(io.waitScope) == "");
KJ_EXPECT(pipe.ends[1]->readAllText().wait(waitScope) == "");
}
void testHttpClientResponse(kj::AsyncIoContext& io, const HttpResponseTestCase& testCase,
void testHttpClientResponse(kj::WaitScope& waitScope, const HttpResponseTestCase& testCase,
size_t readFragmentSize) {
auto pipe = io.provider->newTwoWayPipe();
auto pipe = kj::newTwoWayPipe();
ReadFragmenter fragmenter(*pipe.ends[0], readFragmentSize);
auto expectedReqText = testCase.method == HttpMethod::GET || testCase.method == HttpMethod::HEAD
......@@ -504,12 +504,12 @@ void testHttpClientResponse(kj::AsyncIoContext& io, const HttpResponseTestCase&
KJ_EXPECT(body == kj::strArray(testCase.responseBodyParts, ""), body);
});
serverTask.exclusiveJoin(kj::mv(clientTask)).wait(io.waitScope);
serverTask.exclusiveJoin(kj::mv(clientTask)).wait(waitScope);
// Verify no more data written by client.
client = nullptr;
pipe.ends[0]->shutdownWrite();
KJ_EXPECT(pipe.ends[1]->readAllText().wait(io.waitScope) == "");
KJ_EXPECT(pipe.ends[1]->readAllText().wait(waitScope) == "");
}
class TestHttpService final: public HttpService {
......@@ -581,23 +581,23 @@ private:
uint requestCount = 0;
};
void testHttpServerRequest(kj::AsyncIoContext& io,
void testHttpServerRequest(kj::WaitScope& waitScope, kj::Timer& timer,
const HttpRequestTestCase& requestCase,
const HttpResponseTestCase& responseCase) {
auto pipe = io.provider->newTwoWayPipe();
auto pipe = kj::newTwoWayPipe();
HttpHeaderTable table;
TestHttpService service(requestCase, responseCase, table);
HttpServer server(io.provider->getTimer(), table, service);
HttpServer server(timer, table, service);
auto listenTask = server.listenHttp(kj::mv(pipe.ends[0]));
pipe.ends[1]->write(requestCase.raw.begin(), requestCase.raw.size()).wait(io.waitScope);
pipe.ends[1]->write(requestCase.raw.begin(), requestCase.raw.size()).wait(waitScope);
pipe.ends[1]->shutdownWrite();
expectRead(*pipe.ends[1], responseCase.raw).wait(io.waitScope);
expectRead(*pipe.ends[1], responseCase.raw).wait(waitScope);
listenTask.wait(io.waitScope);
listenTask.wait(waitScope);
KJ_EXPECT(service.getRequestCount() == 1);
}
......@@ -818,24 +818,26 @@ kj::ArrayPtr<const HttpResponseTestCase> responseTestCases() {
}
KJ_TEST("HttpClient requests") {
auto io = kj::setupAsyncIo();
kj::EventLoop eventLoop;
kj::WaitScope waitScope(eventLoop);
for (auto& testCase: requestTestCases()) {
if (testCase.side == SERVER_ONLY) continue;
KJ_CONTEXT(testCase.raw);
testHttpClientRequest(io, testCase);
testHttpClientRequest(waitScope, testCase);
}
}
KJ_TEST("HttpClient responses") {
auto io = kj::setupAsyncIo();
kj::EventLoop eventLoop;
kj::WaitScope waitScope(eventLoop);
size_t FRAGMENT_SIZES[] = { 1, 2, 3, 4, 5, 6, 7, 8, 16, 31, kj::maxValue };
for (auto& testCase: responseTestCases()) {
if (testCase.side == SERVER_ONLY) continue;
for (size_t fragmentSize: FRAGMENT_SIZES) {
KJ_CONTEXT(testCase.raw, fragmentSize);
testHttpClientResponse(io, testCase, fragmentSize);
testHttpClientResponse(waitScope, testCase, fragmentSize);
}
}
}
......@@ -862,12 +864,14 @@ KJ_TEST("HttpServer requests") {
3, {"foo"}
};
auto io = kj::setupAsyncIo();
kj::EventLoop eventLoop;
kj::WaitScope waitScope(eventLoop);
kj::TimerImpl timer(kj::origin<kj::TimePoint>());
for (auto& testCase: requestTestCases()) {
if (testCase.side == CLIENT_ONLY) continue;
KJ_CONTEXT(testCase.raw);
testHttpServerRequest(io, testCase,
testHttpServerRequest(waitScope, timer, testCase,
testCase.method == HttpMethod::HEAD ? HEAD_RESPONSE : RESPONSE);
}
}
......@@ -893,12 +897,14 @@ KJ_TEST("HttpServer responses") {
uint64_t(0), {},
};
auto io = kj::setupAsyncIo();
kj::EventLoop eventLoop;
kj::WaitScope waitScope(eventLoop);
kj::TimerImpl timer(kj::origin<kj::TimePoint>());
for (auto& testCase: responseTestCases()) {
if (testCase.side == CLIENT_ONLY) continue;
KJ_CONTEXT(testCase.raw);
testHttpServerRequest(io,
testHttpServerRequest(waitScope, timer,
testCase.method == HttpMethod::HEAD ? HEAD_REQUEST : REQUEST, testCase);
}
}
......@@ -1038,8 +1044,9 @@ kj::ArrayPtr<const HttpTestCase> pipelineTestCases() {
KJ_TEST("HttpClient pipeline") {
auto PIPELINE_TESTS = pipelineTestCases();
auto io = kj::setupAsyncIo();
auto pipe = io.provider->newTwoWayPipe();
kj::EventLoop eventLoop;
kj::WaitScope waitScope(eventLoop);
auto pipe = kj::newTwoWayPipe();
kj::Promise<void> writeResponsesPromise = kj::READY_NOW;
for (auto& testCase: PIPELINE_TESTS) {
......@@ -1065,14 +1072,14 @@ KJ_TEST("HttpClient pipeline") {
auto request = client->request(
testCase.request.method, testCase.request.path, headers, testCase.request.requestBodySize);
for (auto& part: testCase.request.requestBodyParts) {
request.body->write(part.begin(), part.size()).wait(io.waitScope);
request.body->write(part.begin(), part.size()).wait(waitScope);
}
request.body = nullptr;
auto response = request.response.wait(io.waitScope);
auto response = request.response.wait(waitScope);
KJ_EXPECT(response.statusCode == testCase.response.statusCode);
auto body = response.body->readAllText().wait(io.waitScope);
auto body = response.body->readAllText().wait(waitScope);
if (testCase.request.method == HttpMethod::HEAD) {
KJ_EXPECT(body == "");
} else {
......@@ -1083,21 +1090,30 @@ KJ_TEST("HttpClient pipeline") {
client = nullptr;
pipe.ends[0]->shutdownWrite();
writeResponsesPromise.wait(io.waitScope);
writeResponsesPromise.wait(waitScope);
}
KJ_TEST("HttpClient parallel pipeline") {
auto PIPELINE_TESTS = pipelineTestCases();
auto io = kj::setupAsyncIo();
auto pipe = io.provider->newTwoWayPipe();
kj::EventLoop eventLoop;
kj::WaitScope waitScope(eventLoop);
auto pipe = kj::newTwoWayPipe();
kj::Promise<void> readRequestsPromise = kj::READY_NOW;
kj::Promise<void> writeResponsesPromise = kj::READY_NOW;
for (auto& testCase: PIPELINE_TESTS) {
writeResponsesPromise = writeResponsesPromise
auto forked = readRequestsPromise
.then([&]() {
return expectRead(*pipe.ends[1], testCase.request.raw);
}).then([&]() {
}).fork();
readRequestsPromise = forked.addBranch();
// Don't write each response until the corresponding request is received.
auto promises = kj::heapArrayBuilder<kj::Promise<void>>(2);
promises.add(forked.addBranch());
promises.add(kj::mv(writeResponsesPromise));
writeResponsesPromise = kj::joinPromises(promises.finish()).then([&]() {
return pipe.ends[1]->write(testCase.response.raw.begin(), testCase.response.raw.size());
});
}
......@@ -1116,7 +1132,7 @@ KJ_TEST("HttpClient parallel pipeline") {
auto request = client->request(
testCase.request.method, testCase.request.path, headers, testCase.request.requestBodySize);
for (auto& part: testCase.request.requestBodyParts) {
request.body->write(part.begin(), part.size()).wait(io.waitScope);
request.body->write(part.begin(), part.size()).wait(waitScope);
}
return kj::mv(request.response);
......@@ -1124,10 +1140,10 @@ KJ_TEST("HttpClient parallel pipeline") {
for (auto i: kj::indices(PIPELINE_TESTS)) {
auto& testCase = PIPELINE_TESTS[i];
auto response = responsePromises[i].wait(io.waitScope);
auto response = responsePromises[i].wait(waitScope);
KJ_EXPECT(response.statusCode == testCase.response.statusCode);
auto body = response.body->readAllText().wait(io.waitScope);
auto body = response.body->readAllText().wait(waitScope);
if (testCase.request.method == HttpMethod::HEAD) {
KJ_EXPECT(body == "");
} else {
......@@ -1138,18 +1154,20 @@ KJ_TEST("HttpClient parallel pipeline") {
client = nullptr;
pipe.ends[0]->shutdownWrite();
writeResponsesPromise.wait(io.waitScope);
writeResponsesPromise.wait(waitScope);
}
KJ_TEST("HttpServer pipeline") {
auto PIPELINE_TESTS = pipelineTestCases();
auto io = kj::setupAsyncIo();
auto pipe = io.provider->newTwoWayPipe();
kj::EventLoop eventLoop;
kj::WaitScope waitScope(eventLoop);
kj::TimerImpl timer(kj::origin<kj::TimePoint>());
auto pipe = kj::newTwoWayPipe();
HttpHeaderTable table;
TestHttpService service(PIPELINE_TESTS, table);
HttpServer server(io.provider->getTimer(), table, service);
HttpServer server(timer, table, service);
auto listenTask = server.listenHttp(kj::mv(pipe.ends[0]));
......@@ -1157,13 +1175,13 @@ KJ_TEST("HttpServer pipeline") {
KJ_CONTEXT(testCase.request.raw, testCase.response.raw);
pipe.ends[1]->write(testCase.request.raw.begin(), testCase.request.raw.size())
.wait(io.waitScope);
.wait(waitScope);
expectRead(*pipe.ends[1], testCase.response.raw).wait(io.waitScope);
expectRead(*pipe.ends[1], testCase.response.raw).wait(waitScope);
}
pipe.ends[1]->shutdownWrite();
listenTask.wait(io.waitScope);
listenTask.wait(waitScope);
KJ_EXPECT(service.getRequestCount() == kj::size(PIPELINE_TESTS));
}
......@@ -1171,8 +1189,10 @@ KJ_TEST("HttpServer pipeline") {
KJ_TEST("HttpServer parallel pipeline") {
auto PIPELINE_TESTS = pipelineTestCases();
auto io = kj::setupAsyncIo();
auto pipe = io.provider->newTwoWayPipe();
kj::EventLoop eventLoop;
kj::WaitScope waitScope(eventLoop);
kj::TimerImpl timer(kj::origin<kj::TimePoint>());
auto pipe = kj::newTwoWayPipe();
auto allRequestText =
kj::strArray(KJ_MAP(testCase, PIPELINE_TESTS) { return testCase.request.raw; }, "");
......@@ -1181,17 +1201,17 @@ KJ_TEST("HttpServer parallel pipeline") {
HttpHeaderTable table;
TestHttpService service(PIPELINE_TESTS, table);
HttpServer server(io.provider->getTimer(), table, service);
HttpServer server(timer, table, service);
auto listenTask = server.listenHttp(kj::mv(pipe.ends[0]));
pipe.ends[1]->write(allRequestText.begin(), allRequestText.size()).wait(io.waitScope);
pipe.ends[1]->write(allRequestText.begin(), allRequestText.size()).wait(waitScope);
pipe.ends[1]->shutdownWrite();
auto rawResponse = pipe.ends[1]->readAllText().wait(io.waitScope);
auto rawResponse = pipe.ends[1]->readAllText().wait(waitScope);
KJ_EXPECT(rawResponse == allResponseText, rawResponse);
listenTask.wait(io.waitScope);
listenTask.wait(waitScope);
KJ_EXPECT(service.getRequestCount() == kj::size(PIPELINE_TESTS));
}
......@@ -1199,12 +1219,14 @@ KJ_TEST("HttpServer parallel pipeline") {
KJ_TEST("HttpClient <-> HttpServer") {
auto PIPELINE_TESTS = pipelineTestCases();
auto io = kj::setupAsyncIo();
auto pipe = io.provider->newTwoWayPipe();
kj::EventLoop eventLoop;
kj::WaitScope waitScope(eventLoop);
kj::TimerImpl timer(kj::origin<kj::TimePoint>());
auto pipe = kj::newTwoWayPipe();
HttpHeaderTable table;
TestHttpService service(PIPELINE_TESTS, table);
HttpServer server(io.provider->getTimer(), table, service);
HttpServer server(timer, table, service);
auto listenTask = server.listenHttp(kj::mv(pipe.ends[1]));
auto client = newHttpClient(table, *pipe.ends[0]);
......@@ -1220,14 +1242,14 @@ KJ_TEST("HttpClient <-> HttpServer") {
auto request = client->request(
testCase.request.method, testCase.request.path, headers, testCase.request.requestBodySize);
for (auto& part: testCase.request.requestBodyParts) {
request.body->write(part.begin(), part.size()).wait(io.waitScope);
request.body->write(part.begin(), part.size()).wait(waitScope);
}
request.body = nullptr;
auto response = request.response.wait(io.waitScope);
auto response = request.response.wait(waitScope);
KJ_EXPECT(response.statusCode == testCase.response.statusCode);
auto body = response.body->readAllText().wait(io.waitScope);
auto body = response.body->readAllText().wait(waitScope);
if (testCase.request.method == HttpMethod::HEAD) {
KJ_EXPECT(body == "");
} else {
......@@ -1237,15 +1259,16 @@ KJ_TEST("HttpClient <-> HttpServer") {
client = nullptr;
pipe.ends[0]->shutdownWrite();
listenTask.wait(io.waitScope);
listenTask.wait(waitScope);
KJ_EXPECT(service.getRequestCount() == kj::size(PIPELINE_TESTS));
}
// -----------------------------------------------------------------------------
KJ_TEST("WebSocket core protocol") {
auto io = kj::setupAsyncIo();
auto pipe = io.provider->newTwoWayPipe();
kj::EventLoop eventLoop;
kj::WaitScope waitScope(eventLoop);
auto pipe = kj::newTwoWayPipe();
auto client = newWebSocket(kj::mv(pipe.ends[0]), nullptr);
auto server = newWebSocket(kj::mv(pipe.ends[1]), nullptr);
......@@ -1260,31 +1283,31 @@ KJ_TEST("WebSocket core protocol") {
.then([&]() { return client->close(1234, "bored"); });
{
auto message = server->receive().wait(io.waitScope);
auto message = server->receive().wait(waitScope);
KJ_ASSERT(message.is<kj::String>());
KJ_EXPECT(message.get<kj::String>() == "hello");
}
{
auto message = server->receive().wait(io.waitScope);
auto message = server->receive().wait(waitScope);
KJ_ASSERT(message.is<kj::String>());
KJ_EXPECT(message.get<kj::String>() == mediumString);
}
{
auto message = server->receive().wait(io.waitScope);
auto message = server->receive().wait(waitScope);
KJ_ASSERT(message.is<kj::String>());
KJ_EXPECT(message.get<kj::String>() == bigString);
}
{
auto message = server->receive().wait(io.waitScope);
auto message = server->receive().wait(waitScope);
KJ_ASSERT(message.is<kj::Array<byte>>());
KJ_EXPECT(kj::str(message.get<kj::Array<byte>>().asChars()) == "world");
}
{
auto message = server->receive().wait(io.waitScope);
auto message = server->receive().wait(waitScope);
KJ_ASSERT(message.is<WebSocket::Close>());
KJ_EXPECT(message.get<WebSocket::Close>().code == 1234);
KJ_EXPECT(message.get<WebSocket::Close>().reason == "bored");
......@@ -1293,19 +1316,20 @@ KJ_TEST("WebSocket core protocol") {
auto serverTask = server->close(4321, "whatever");
{
auto message = client->receive().wait(io.waitScope);
auto message = client->receive().wait(waitScope);
KJ_ASSERT(message.is<WebSocket::Close>());
KJ_EXPECT(message.get<WebSocket::Close>().code == 4321);
KJ_EXPECT(message.get<WebSocket::Close>().reason == "whatever");
}
clientTask.wait(io.waitScope);
serverTask.wait(io.waitScope);
clientTask.wait(waitScope);
serverTask.wait(waitScope);
}
KJ_TEST("WebSocket fragmented") {
auto io = kj::setupAsyncIo();
auto pipe = io.provider->newTwoWayPipe();
kj::EventLoop eventLoop;
kj::WaitScope waitScope(eventLoop);
auto pipe = kj::newTwoWayPipe();
auto client = kj::mv(pipe.ends[0]);
auto server = newWebSocket(kj::mv(pipe.ends[1]), nullptr);
......@@ -1321,12 +1345,12 @@ KJ_TEST("WebSocket fragmented") {
auto clientTask = client->write(DATA, sizeof(DATA));
{
auto message = server->receive().wait(io.waitScope);
auto message = server->receive().wait(waitScope);
KJ_ASSERT(message.is<kj::String>());
KJ_EXPECT(message.get<kj::String>() == "hello world");
}
clientTask.wait(io.waitScope);
clientTask.wait(waitScope);
}
class FakeEntropySource final: public EntropySource {
......@@ -1341,8 +1365,9 @@ public:
};
KJ_TEST("WebSocket masked") {
auto io = kj::setupAsyncIo();
auto pipe = io.provider->newTwoWayPipe();
kj::EventLoop eventLoop;
kj::WaitScope waitScope(eventLoop);
auto pipe = kj::newTwoWayPipe();
FakeEntropySource maskGenerator;
auto client = kj::mv(pipe.ends[0]);
......@@ -1356,20 +1381,21 @@ KJ_TEST("WebSocket masked") {
auto serverTask = server->send(kj::StringPtr("hello "));
{
auto message = server->receive().wait(io.waitScope);
auto message = server->receive().wait(waitScope);
KJ_ASSERT(message.is<kj::String>());
KJ_EXPECT(message.get<kj::String>() == "hello ");
}
expectRead(*client, DATA).wait(io.waitScope);
expectRead(*client, DATA).wait(waitScope);
clientTask.wait(io.waitScope);
serverTask.wait(io.waitScope);
clientTask.wait(waitScope);
serverTask.wait(waitScope);
}
KJ_TEST("WebSocket unsolicited pong") {
auto io = kj::setupAsyncIo();
auto pipe = io.provider->newTwoWayPipe();
kj::EventLoop eventLoop;
kj::WaitScope waitScope(eventLoop);
auto pipe = kj::newTwoWayPipe();
auto client = kj::mv(pipe.ends[0]);
auto server = newWebSocket(kj::mv(pipe.ends[1]), nullptr);
......@@ -1385,17 +1411,18 @@ KJ_TEST("WebSocket unsolicited pong") {
auto clientTask = client->write(DATA, sizeof(DATA));
{
auto message = server->receive().wait(io.waitScope);
auto message = server->receive().wait(waitScope);
KJ_ASSERT(message.is<kj::String>());
KJ_EXPECT(message.get<kj::String>() == "hello world");
}
clientTask.wait(io.waitScope);
clientTask.wait(waitScope);
}
KJ_TEST("WebSocket ping") {
auto io = kj::setupAsyncIo();
auto pipe = io.provider->newTwoWayPipe();
kj::EventLoop eventLoop;
kj::WaitScope waitScope(eventLoop);
auto pipe = kj::newTwoWayPipe();
auto client = kj::mv(pipe.ends[0]);
auto server = newWebSocket(kj::mv(pipe.ends[1]), nullptr);
......@@ -1412,7 +1439,7 @@ KJ_TEST("WebSocket ping") {
auto clientTask = client->write(DATA, sizeof(DATA));
{
auto message = server->receive().wait(io.waitScope);
auto message = server->receive().wait(waitScope);
KJ_ASSERT(message.is<kj::String>());
KJ_EXPECT(message.get<kj::String>() == "hello world");
}
......@@ -1424,15 +1451,16 @@ KJ_TEST("WebSocket ping") {
0x81, 0x03, 'b', 'a', 'r', // message
};
expectRead(*client, EXPECTED).wait(io.waitScope);
expectRead(*client, EXPECTED).wait(waitScope);
clientTask.wait(io.waitScope);
serverTask.wait(io.waitScope);
clientTask.wait(waitScope);
serverTask.wait(waitScope);
}
KJ_TEST("WebSocket ping mid-send") {
auto io = kj::setupAsyncIo();
auto pipe = io.provider->newTwoWayPipe();
kj::EventLoop eventLoop;
kj::WaitScope waitScope(eventLoop);
auto pipe = kj::newTwoWayPipe();
auto client = kj::mv(pipe.ends[0]);
auto server = newWebSocket(kj::mv(pipe.ends[1]), nullptr);
......@@ -1448,223 +1476,75 @@ KJ_TEST("WebSocket ping mid-send") {
auto clientTask = client->write(DATA, sizeof(DATA));
{
auto message = server->receive().wait(io.waitScope);
auto message = server->receive().wait(waitScope);
KJ_ASSERT(message.is<kj::String>());
KJ_EXPECT(message.get<kj::String>() == "bar");
}
byte EXPECTED1[] = { 0x81, 0x7f, 0, 0, 0, 0, 0, 8, 0, 0 };
expectRead(*client, EXPECTED1).wait(io.waitScope);
expectRead(*client, bigString).wait(io.waitScope);
expectRead(*client, EXPECTED1).wait(waitScope);
expectRead(*client, bigString).wait(waitScope);
byte EXPECTED2[] = { 0x8A, 0x03, 'f', 'o', 'o' };
expectRead(*client, EXPECTED2).wait(io.waitScope);
expectRead(*client, EXPECTED2).wait(waitScope);
clientTask.wait(io.waitScope);
serverTask.wait(io.waitScope);
clientTask.wait(waitScope);
serverTask.wait(waitScope);
}
class UnbufferedPipe final: public AsyncIoStream {
// An in-memory one-way pipe with no internal buffer. read() blocks waiting for write()s and
// write() blocks waiting for read()s.
//
// TODO(cleanup): This is probably broadly useful. Put it in a utility library somewhere.
// NOTE: Must implement handling of cancellation first!
public:
kj::Promise<void> write(const void* buffer, size_t size) override {
KJ_SWITCH_ONEOF(current) {
KJ_CASE_ONEOF(w, CurrentWrite) {
KJ_FAIL_REQUIRE("can only call write() once at a time");
}
KJ_CASE_ONEOF(r, CurrentRead) {
if (size < r.minBytes) {
// Write does not complete the current read.
memcpy(r.buffer.begin(), buffer, size);
r.minBytes -= size;
r.alreadyRead += size;
r.buffer = r.buffer.slice(size, r.buffer.size());
return kj::READY_NOW;
} else if (size <= r.buffer.size()) {
// Write satisfies the current read, and read satisfies the write.
memcpy(r.buffer.begin(), buffer, size);
r.fulfiller->fulfill(r.alreadyRead + size);
current = None();
return kj::READY_NOW;
} else {
// Write satisfies the read and still has more data leftover to write.
size_t amount = r.buffer.size();
memcpy(r.buffer.begin(), buffer, amount);
r.fulfiller->fulfill(amount + r.alreadyRead);
auto paf = kj::newPromiseAndFulfiller<void>();
current = CurrentWrite {
kj::arrayPtr(reinterpret_cast<const byte*>(buffer) + amount, size - amount),
kj::mv(paf.fulfiller)
};
return kj::mv(paf.promise);
}
}
KJ_CASE_ONEOF(e, Eof) {
KJ_FAIL_REQUIRE("write after EOF");
}
KJ_CASE_ONEOF(n, None) {
auto paf = kj::newPromiseAndFulfiller<void>();
current = CurrentWrite {
kj::arrayPtr(reinterpret_cast<const byte*>(buffer), size),
kj::mv(paf.fulfiller)
};
return kj::mv(paf.promise);
}
}
KJ_UNREACHABLE;
}
kj::Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
KJ_SWITCH_ONEOF(current) {
KJ_CASE_ONEOF(w, CurrentWrite) {
if (maxBytes < w.buffer.size()) {
// Entire read satisfied by write, write is still pending.
memcpy(buffer, w.buffer.begin(), maxBytes);
w.buffer = w.buffer.slice(maxBytes, w.buffer.size());
return maxBytes;
} else if (minBytes <= w.buffer.size()) {
// Read is satisfied by write and consumes entire write.
size_t result = w.buffer.size();
memcpy(buffer, w.buffer.begin(), result);
w.fulfiller->fulfill();
current = None();
return result;
} else {
// Read consumes entire write and is not satisfied.
size_t alreadyRead = w.buffer.size();
memcpy(buffer, w.buffer.begin(), alreadyRead);
w.fulfiller->fulfill();
auto paf = kj::newPromiseAndFulfiller<size_t>();
current = CurrentRead {
kj::arrayPtr(reinterpret_cast<byte*>(buffer) + alreadyRead, maxBytes - alreadyRead),
minBytes - alreadyRead,
alreadyRead,
kj::mv(paf.fulfiller)
};
return kj::mv(paf.promise);
}
}
KJ_CASE_ONEOF(r, CurrentRead) {
KJ_FAIL_REQUIRE("can only call read() once at a time");
}
KJ_CASE_ONEOF(e, Eof) {
return size_t(0);
}
KJ_CASE_ONEOF(n, None) {
auto paf = kj::newPromiseAndFulfiller<size_t>();
current = CurrentRead {
kj::arrayPtr(reinterpret_cast<byte*>(buffer), maxBytes),
minBytes,
0,
kj::mv(paf.fulfiller)
};
return kj::mv(paf.promise);
}
}
KJ_UNREACHABLE;
}
kj::Promise<void> write(kj::ArrayPtr<const kj::ArrayPtr<const byte>> pieces) override {
// TODO(cleanup): Should this be the defalut implementation of this method?
if (pieces.size() == 0) return kj::READY_NOW;
return write(pieces[0].begin(), pieces[0].size())
.then([this, pieces]() {
return write(pieces.slice(1, pieces.size()));
});
}
void shutdownWrite() override {
KJ_SWITCH_ONEOF(current) {
KJ_CASE_ONEOF(w, CurrentWrite) {
KJ_FAIL_REQUIRE("can't call shutdownWrite() during a write()");
}
KJ_CASE_ONEOF(r, CurrentRead) {
r.fulfiller->fulfill(kj::mv(r.alreadyRead));
}
KJ_CASE_ONEOF(e, Eof) {
// ignore
}
KJ_CASE_ONEOF(n, None) {
// ignore
}
}
current = Eof();
}
private:
struct CurrentWrite {
kj::ArrayPtr<const byte> buffer;
kj::Own<kj::PromiseFulfiller<void>> fulfiller;
};
struct CurrentRead {
kj::ArrayPtr<byte> buffer;
size_t minBytes;
size_t alreadyRead;
kj::Own<kj::PromiseFulfiller<size_t>> fulfiller;
};
struct Eof {};
struct None {};
kj::OneOf<CurrentWrite, CurrentRead, Eof, None> current = None();
};
class InputOutputPair final: public kj::AsyncIoStream {
// Creates an AsyncIoStream out of an AsyncInputStream and an AsyncOutputStream.
public:
InputOutputPair(kj::AsyncInputStream& in, kj::AsyncIoStream& out)
: in(in), out(out) {}
InputOutputPair(kj::Own<kj::AsyncInputStream> in, kj::Own<kj::AsyncOutputStream> out)
: in(kj::mv(in)), out(kj::mv(out)) {}
kj::Promise<size_t> read(void* buffer, size_t minBytes, size_t maxBytes) override {
return in.read(buffer, minBytes, maxBytes);
return in->read(buffer, minBytes, maxBytes);
}
kj::Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
return in.tryRead(buffer, minBytes, maxBytes);
return in->tryRead(buffer, minBytes, maxBytes);
}
Maybe<uint64_t> tryGetLength() override {
return in.tryGetLength();
return in->tryGetLength();
}
Promise<uint64_t> pumpTo(AsyncOutputStream& output, uint64_t amount = kj::maxValue) override {
return in.pumpTo(output, amount);
return in->pumpTo(output, amount);
}
kj::Promise<void> write(const void* buffer, size_t size) override {
return out.write(buffer, size);
return out->write(buffer, size);
}
kj::Promise<void> write(kj::ArrayPtr<const kj::ArrayPtr<const byte>> pieces) override {
return out.write(pieces);
return out->write(pieces);
}
kj::Maybe<kj::Promise<uint64_t>> tryPumpFrom(
kj::AsyncInputStream& input, uint64_t amount = kj::maxValue) override {
return out.tryPumpFrom(input, amount);
return out->tryPumpFrom(input, amount);
}
void shutdownWrite() override {
return out.shutdownWrite();
out = nullptr;
}
private:
kj::AsyncInputStream& in;
kj::AsyncIoStream& out;
kj::Own<kj::AsyncInputStream> in;
kj::Own<kj::AsyncOutputStream> out;
};
KJ_TEST("WebSocket double-ping mid-send") {
auto io = kj::setupAsyncIo();
kj::EventLoop eventLoop;
kj::WaitScope waitScope(eventLoop);
UnbufferedPipe upPipe;
UnbufferedPipe downPipe;
InputOutputPair client(downPipe, upPipe);
auto server = newWebSocket(kj::heap<InputOutputPair>(upPipe, downPipe), nullptr);
auto upPipe = newOneWayPipe();
auto downPipe = newOneWayPipe();
InputOutputPair client(kj::mv(downPipe.in), kj::mv(upPipe.out));
auto server = newWebSocket(kj::heap<InputOutputPair>(kj::mv(upPipe.in), kj::mv(downPipe.out)),
nullptr);
auto bigString = kj::strArray(kj::repeat(kj::StringPtr("12345678"), 65536), "");
auto serverTask = server->send(bigString).eagerlyEvaluate(nullptr);
......@@ -1678,25 +1558,26 @@ KJ_TEST("WebSocket double-ping mid-send") {
auto clientTask = client.write(DATA, sizeof(DATA));
{
auto message = server->receive().wait(io.waitScope);
auto message = server->receive().wait(waitScope);
KJ_ASSERT(message.is<kj::String>());
KJ_EXPECT(message.get<kj::String>() == "bar");
}
byte EXPECTED1[] = { 0x81, 0x7f, 0, 0, 0, 0, 0, 8, 0, 0 };
expectRead(client, EXPECTED1).wait(io.waitScope);
expectRead(client, bigString).wait(io.waitScope);
expectRead(client, EXPECTED1).wait(waitScope);
expectRead(client, bigString).wait(waitScope);
byte EXPECTED2[] = { 0x8A, 0x03, 'q', 'u', 'x' };
expectRead(client, EXPECTED2).wait(io.waitScope);
expectRead(client, EXPECTED2).wait(waitScope);
clientTask.wait(io.waitScope);
serverTask.wait(io.waitScope);
clientTask.wait(waitScope);
serverTask.wait(waitScope);
}
KJ_TEST("WebSocket ping received during pong send") {
auto io = kj::setupAsyncIo();
auto pipe = io.provider->newTwoWayPipe();
kj::EventLoop eventLoop;
kj::WaitScope waitScope(eventLoop);
auto pipe = kj::newTwoWayPipe();
auto client = kj::mv(pipe.ends[0]);
auto server = newWebSocket(kj::mv(pipe.ends[1]), nullptr);
......@@ -1714,19 +1595,19 @@ KJ_TEST("WebSocket ping received during pong send") {
auto clientTask = client->write(parts);
{
auto message = server->receive().wait(io.waitScope);
auto message = server->receive().wait(waitScope);
KJ_ASSERT(message.is<kj::String>());
KJ_EXPECT(message.get<kj::String>() == "bar");
}
byte EXPECTED1[] = { 0x8A, 0x7f, 0, 0, 0, 0, 0, 8, 0, 0 };
expectRead(*client, EXPECTED1).wait(io.waitScope);
expectRead(*client, bigString).wait(io.waitScope);
expectRead(*client, EXPECTED1).wait(waitScope);
expectRead(*client, bigString).wait(waitScope);
byte EXPECTED2[] = { 0x8A, 0x03, 'f', 'o', 'o' };
expectRead(*client, EXPECTED2).wait(io.waitScope);
expectRead(*client, EXPECTED2).wait(waitScope);
clientTask.wait(io.waitScope);
clientTask.wait(waitScope);
}
class TestWebSocketService final: public HttpService, private kj::TaskSet::ErrorHandler {
......@@ -1824,8 +1705,9 @@ kj::ArrayPtr<const byte> asBytes(const char (&chars)[s]) {
}
KJ_TEST("HttpClient WebSocket handshake") {
auto io = kj::setupAsyncIo();
auto pipe = io.provider->newTwoWayPipe();
kj::EventLoop eventLoop;
kj::WaitScope waitScope(eventLoop);
auto pipe = kj::newTwoWayPipe();
auto request = kj::str("GET /websocket", WEBSOCKET_REQUEST_HANDSHAKE);
......@@ -1848,7 +1730,7 @@ KJ_TEST("HttpClient WebSocket handshake") {
kj::HttpHeaders headers(*headerTable);
headers.set(hMyHeader, "foo");
auto response = client->openWebSocket("/websocket", headers).wait(io.waitScope);
auto response = client->openWebSocket("/websocket", headers).wait(waitScope);
KJ_EXPECT(response.statusCode == 101);
KJ_EXPECT(response.statusText == "Switching Protocols", response.statusText);
......@@ -1857,32 +1739,33 @@ KJ_TEST("HttpClient WebSocket handshake") {
auto ws = kj::mv(response.webSocketOrBody.get<kj::Own<WebSocket>>());
{
auto message = ws->receive().wait(io.waitScope);
auto message = ws->receive().wait(waitScope);
KJ_ASSERT(message.is<kj::String>());
KJ_EXPECT(message.get<kj::String>() == "start-inline");
}
ws->send(kj::StringPtr("bar")).wait(io.waitScope);
ws->send(kj::StringPtr("bar")).wait(waitScope);
{
auto message = ws->receive().wait(io.waitScope);
auto message = ws->receive().wait(waitScope);
KJ_ASSERT(message.is<kj::String>());
KJ_EXPECT(message.get<kj::String>() == "reply:bar");
}
ws->close(0x1234, "qux").wait(io.waitScope);
ws->close(0x1234, "qux").wait(waitScope);
{
auto message = ws->receive().wait(io.waitScope);
auto message = ws->receive().wait(waitScope);
KJ_ASSERT(message.is<WebSocket::Close>());
KJ_EXPECT(message.get<WebSocket::Close>().code == 0x1235);
KJ_EXPECT(message.get<WebSocket::Close>().reason == "close-reply:qux");
}
serverTask.wait(io.waitScope);
serverTask.wait(waitScope);
}
KJ_TEST("HttpClient WebSocket error") {
auto io = kj::setupAsyncIo();
auto pipe = io.provider->newTwoWayPipe();
kj::EventLoop eventLoop;
kj::WaitScope waitScope(eventLoop);
auto pipe = kj::newTwoWayPipe();
auto request = kj::str("GET /websocket", WEBSOCKET_REQUEST_HANDSHAKE);
......@@ -1904,7 +1787,7 @@ KJ_TEST("HttpClient WebSocket error") {
headers.set(hMyHeader, "foo");
{
auto response = client->openWebSocket("/websocket", headers).wait(io.waitScope);
auto response = client->openWebSocket("/websocket", headers).wait(waitScope);
KJ_EXPECT(response.statusCode == 404);
KJ_EXPECT(response.statusText == "Not Found", response.statusText);
......@@ -1913,7 +1796,7 @@ KJ_TEST("HttpClient WebSocket error") {
}
{
auto response = client->openWebSocket("/websocket", headers).wait(io.waitScope);
auto response = client->openWebSocket("/websocket", headers).wait(waitScope);
KJ_EXPECT(response.statusCode == 404);
KJ_EXPECT(response.statusText == "Not Found", response.statusText);
......@@ -1921,57 +1804,61 @@ KJ_TEST("HttpClient WebSocket error") {
KJ_ASSERT(response.webSocketOrBody.is<kj::Own<AsyncInputStream>>());
}
serverTask.wait(io.waitScope);
serverTask.wait(waitScope);
}
KJ_TEST("HttpServer WebSocket handshake") {
auto io = kj::setupAsyncIo();
auto pipe = io.provider->newTwoWayPipe();
kj::EventLoop eventLoop;
kj::WaitScope waitScope(eventLoop);
kj::TimerImpl timer(kj::origin<kj::TimePoint>());
auto pipe = kj::newTwoWayPipe();
HttpHeaderTable::Builder tableBuilder;
HttpHeaderId hMyHeader = tableBuilder.add("My-Header");
auto headerTable = tableBuilder.build();
TestWebSocketService service(*headerTable, hMyHeader);
HttpServer server(io.provider->getTimer(), *headerTable, service);
HttpServer server(timer, *headerTable, service);
auto listenTask = server.listenHttp(kj::mv(pipe.ends[0]));
auto request = kj::str("GET /ws-inline", WEBSOCKET_REQUEST_HANDSHAKE);
pipe.ends[1]->write({request.asBytes()}).wait(io.waitScope);
expectRead(*pipe.ends[1], WEBSOCKET_RESPONSE_HANDSHAKE).wait(io.waitScope);
pipe.ends[1]->write({request.asBytes()}).wait(waitScope);
expectRead(*pipe.ends[1], WEBSOCKET_RESPONSE_HANDSHAKE).wait(waitScope);
expectRead(*pipe.ends[1], WEBSOCKET_FIRST_MESSAGE_INLINE).wait(io.waitScope);
pipe.ends[1]->write({WEBSOCKET_SEND_MESSAGE}).wait(io.waitScope);
expectRead(*pipe.ends[1], WEBSOCKET_REPLY_MESSAGE).wait(io.waitScope);
pipe.ends[1]->write({WEBSOCKET_SEND_CLOSE}).wait(io.waitScope);
expectRead(*pipe.ends[1], WEBSOCKET_REPLY_CLOSE).wait(io.waitScope);
expectRead(*pipe.ends[1], WEBSOCKET_FIRST_MESSAGE_INLINE).wait(waitScope);
pipe.ends[1]->write({WEBSOCKET_SEND_MESSAGE}).wait(waitScope);
expectRead(*pipe.ends[1], WEBSOCKET_REPLY_MESSAGE).wait(waitScope);
pipe.ends[1]->write({WEBSOCKET_SEND_CLOSE}).wait(waitScope);
expectRead(*pipe.ends[1], WEBSOCKET_REPLY_CLOSE).wait(waitScope);
listenTask.wait(io.waitScope);
listenTask.wait(waitScope);
}
KJ_TEST("HttpServer WebSocket handshake error") {
auto io = kj::setupAsyncIo();
auto pipe = io.provider->newTwoWayPipe();
kj::EventLoop eventLoop;
kj::WaitScope waitScope(eventLoop);
kj::TimerImpl timer(kj::origin<kj::TimePoint>());
auto pipe = kj::newTwoWayPipe();
HttpHeaderTable::Builder tableBuilder;
HttpHeaderId hMyHeader = tableBuilder.add("My-Header");
auto headerTable = tableBuilder.build();
TestWebSocketService service(*headerTable, hMyHeader);
HttpServer server(io.provider->getTimer(), *headerTable, service);
HttpServer server(timer, *headerTable, service);
auto listenTask = server.listenHttp(kj::mv(pipe.ends[0]));
auto request = kj::str("GET /return-error", WEBSOCKET_REQUEST_HANDSHAKE);
pipe.ends[1]->write({request.asBytes()}).wait(io.waitScope);
expectRead(*pipe.ends[1], WEBSOCKET_RESPONSE_HANDSHAKE_ERROR).wait(io.waitScope);
pipe.ends[1]->write({request.asBytes()}).wait(waitScope);
expectRead(*pipe.ends[1], WEBSOCKET_RESPONSE_HANDSHAKE_ERROR).wait(waitScope);
// Can send more requests!
pipe.ends[1]->write({request.asBytes()}).wait(io.waitScope);
expectRead(*pipe.ends[1], WEBSOCKET_RESPONSE_HANDSHAKE_ERROR).wait(io.waitScope);
pipe.ends[1]->write({request.asBytes()}).wait(waitScope);
expectRead(*pipe.ends[1], WEBSOCKET_RESPONSE_HANDSHAKE_ERROR).wait(waitScope);
pipe.ends[1]->shutdownWrite();
listenTask.wait(io.waitScope);
listenTask.wait(waitScope);
}
// -----------------------------------------------------------------------------
......@@ -1979,46 +1866,59 @@ KJ_TEST("HttpServer WebSocket handshake error") {
KJ_TEST("HttpServer request timeout") {
auto PIPELINE_TESTS = pipelineTestCases();
auto io = kj::setupAsyncIo();
auto pipe = io.provider->newTwoWayPipe();
kj::EventLoop eventLoop;
kj::WaitScope waitScope(eventLoop);
kj::TimerImpl timer(kj::origin<kj::TimePoint>());
auto pipe = kj::newTwoWayPipe();
HttpHeaderTable table;
TestHttpService service(PIPELINE_TESTS, table);
HttpServerSettings settings;
settings.headerTimeout = 1 * kj::MILLISECONDS;
HttpServer server(io.provider->getTimer(), table, service, settings);
HttpServer server(timer, table, service, settings);
// Shouldn't hang! Should time out.
server.listenHttp(kj::mv(pipe.ends[0])).wait(io.waitScope);
auto promise = server.listenHttp(kj::mv(pipe.ends[0]));
KJ_EXPECT(!promise.poll(waitScope));
timer.advanceTo(timer.now() + settings.headerTimeout / 2);
KJ_EXPECT(!promise.poll(waitScope));
timer.advanceTo(timer.now() + settings.headerTimeout);
promise.wait(waitScope);
// Closes the connection without sending anything.
KJ_EXPECT(pipe.ends[1]->readAllText().wait(io.waitScope) == "");
KJ_EXPECT(pipe.ends[1]->readAllText().wait(waitScope) == "");
}
KJ_TEST("HttpServer pipeline timeout") {
auto PIPELINE_TESTS = pipelineTestCases();
auto io = kj::setupAsyncIo();
auto pipe = io.provider->newTwoWayPipe();
kj::EventLoop eventLoop;
kj::WaitScope waitScope(eventLoop);
kj::TimerImpl timer(kj::origin<kj::TimePoint>());
auto pipe = kj::newTwoWayPipe();
HttpHeaderTable table;
TestHttpService service(PIPELINE_TESTS, table);
HttpServerSettings settings;
settings.pipelineTimeout = 1 * kj::MILLISECONDS;
HttpServer server(io.provider->getTimer(), table, service, settings);
HttpServer server(timer, table, service, settings);
auto listenTask = server.listenHttp(kj::mv(pipe.ends[0]));
// Do one request.
pipe.ends[1]->write(PIPELINE_TESTS[0].request.raw.begin(), PIPELINE_TESTS[0].request.raw.size())
.wait(io.waitScope);
expectRead(*pipe.ends[1], PIPELINE_TESTS[0].response.raw).wait(io.waitScope);
.wait(waitScope);
expectRead(*pipe.ends[1], PIPELINE_TESTS[0].response.raw).wait(waitScope);
// Listen task should time out even though we didn't shutdown the socket.
listenTask.wait(io.waitScope);
KJ_EXPECT(!listenTask.poll(waitScope));
timer.advanceTo(timer.now() + settings.pipelineTimeout / 2);
KJ_EXPECT(!listenTask.poll(waitScope));
timer.advanceTo(timer.now() + settings.pipelineTimeout);
listenTask.wait(waitScope);
// In this case, no data is sent back.
KJ_EXPECT(pipe.ends[1]->readAllText().wait(io.waitScope) == "");
KJ_EXPECT(pipe.ends[1]->readAllText().wait(waitScope) == "");
}
class BrokenHttpService final: public HttpService {
......@@ -2046,19 +1946,21 @@ private:
KJ_TEST("HttpServer no response") {
auto PIPELINE_TESTS = pipelineTestCases();
auto io = kj::setupAsyncIo();
auto pipe = io.provider->newTwoWayPipe();
kj::EventLoop eventLoop;
kj::WaitScope waitScope(eventLoop);
kj::TimerImpl timer(kj::origin<kj::TimePoint>());
auto pipe = kj::newTwoWayPipe();
HttpHeaderTable table;
BrokenHttpService service;
HttpServer server(io.provider->getTimer(), table, service);
HttpServer server(timer, table, service);
auto listenTask = server.listenHttp(kj::mv(pipe.ends[0]));
// Do one request.
pipe.ends[1]->write(PIPELINE_TESTS[0].request.raw.begin(), PIPELINE_TESTS[0].request.raw.size())
.wait(io.waitScope);
auto text = pipe.ends[1]->readAllText().wait(io.waitScope);
.wait(waitScope);
auto text = pipe.ends[1]->readAllText().wait(waitScope);
KJ_EXPECT(text ==
"HTTP/1.1 500 Internal Server Error\r\n"
......@@ -2072,19 +1974,21 @@ KJ_TEST("HttpServer no response") {
KJ_TEST("HttpServer disconnected") {
auto PIPELINE_TESTS = pipelineTestCases();
auto io = kj::setupAsyncIo();
auto pipe = io.provider->newTwoWayPipe();
kj::EventLoop eventLoop;
kj::WaitScope waitScope(eventLoop);
kj::TimerImpl timer(kj::origin<kj::TimePoint>());
auto pipe = kj::newTwoWayPipe();
HttpHeaderTable table;
BrokenHttpService service(KJ_EXCEPTION(DISCONNECTED, "disconnected"));
HttpServer server(io.provider->getTimer(), table, service);
HttpServer server(timer, table, service);
auto listenTask = server.listenHttp(kj::mv(pipe.ends[0]));
// Do one request.
pipe.ends[1]->write(PIPELINE_TESTS[0].request.raw.begin(), PIPELINE_TESTS[0].request.raw.size())
.wait(io.waitScope);
auto text = pipe.ends[1]->readAllText().wait(io.waitScope);
.wait(waitScope);
auto text = pipe.ends[1]->readAllText().wait(waitScope);
KJ_EXPECT(text == "", text);
}
......@@ -2092,19 +1996,21 @@ KJ_TEST("HttpServer disconnected") {
KJ_TEST("HttpServer overloaded") {
auto PIPELINE_TESTS = pipelineTestCases();
auto io = kj::setupAsyncIo();
auto pipe = io.provider->newTwoWayPipe();
kj::EventLoop eventLoop;
kj::WaitScope waitScope(eventLoop);
kj::TimerImpl timer(kj::origin<kj::TimePoint>());
auto pipe = kj::newTwoWayPipe();
HttpHeaderTable table;
BrokenHttpService service(KJ_EXCEPTION(OVERLOADED, "overloaded"));
HttpServer server(io.provider->getTimer(), table, service);
HttpServer server(timer, table, service);
auto listenTask = server.listenHttp(kj::mv(pipe.ends[0]));
// Do one request.
pipe.ends[1]->write(PIPELINE_TESTS[0].request.raw.begin(), PIPELINE_TESTS[0].request.raw.size())
.wait(io.waitScope);
auto text = pipe.ends[1]->readAllText().wait(io.waitScope);
.wait(waitScope);
auto text = pipe.ends[1]->readAllText().wait(waitScope);
KJ_EXPECT(text.startsWith("HTTP/1.1 503 Service Unavailable"), text);
}
......@@ -2112,19 +2018,21 @@ KJ_TEST("HttpServer overloaded") {
KJ_TEST("HttpServer unimplemented") {
auto PIPELINE_TESTS = pipelineTestCases();
auto io = kj::setupAsyncIo();
auto pipe = io.provider->newTwoWayPipe();
kj::EventLoop eventLoop;
kj::WaitScope waitScope(eventLoop);
kj::TimerImpl timer(kj::origin<kj::TimePoint>());
auto pipe = kj::newTwoWayPipe();
HttpHeaderTable table;
BrokenHttpService service(KJ_EXCEPTION(UNIMPLEMENTED, "unimplemented"));
HttpServer server(io.provider->getTimer(), table, service);
HttpServer server(timer, table, service);
auto listenTask = server.listenHttp(kj::mv(pipe.ends[0]));
// Do one request.
pipe.ends[1]->write(PIPELINE_TESTS[0].request.raw.begin(), PIPELINE_TESTS[0].request.raw.size())
.wait(io.waitScope);
auto text = pipe.ends[1]->readAllText().wait(io.waitScope);
.wait(waitScope);
auto text = pipe.ends[1]->readAllText().wait(waitScope);
KJ_EXPECT(text.startsWith("HTTP/1.1 501 Not Implemented"), text);
}
......@@ -2132,19 +2040,21 @@ KJ_TEST("HttpServer unimplemented") {
KJ_TEST("HttpServer threw exception") {
auto PIPELINE_TESTS = pipelineTestCases();
auto io = kj::setupAsyncIo();
auto pipe = io.provider->newTwoWayPipe();
kj::EventLoop eventLoop;
kj::WaitScope waitScope(eventLoop);
kj::TimerImpl timer(kj::origin<kj::TimePoint>());
auto pipe = kj::newTwoWayPipe();
HttpHeaderTable table;
BrokenHttpService service(KJ_EXCEPTION(FAILED, "failed"));
HttpServer server(io.provider->getTimer(), table, service);
HttpServer server(timer, table, service);
auto listenTask = server.listenHttp(kj::mv(pipe.ends[0]));
// Do one request.
pipe.ends[1]->write(PIPELINE_TESTS[0].request.raw.begin(), PIPELINE_TESTS[0].request.raw.size())
.wait(io.waitScope);
auto text = pipe.ends[1]->readAllText().wait(io.waitScope);
.wait(waitScope);
auto text = pipe.ends[1]->readAllText().wait(waitScope);
KJ_EXPECT(text.startsWith("HTTP/1.1 500 Internal Server Error"), text);
}
......@@ -2174,12 +2084,14 @@ private:
KJ_TEST("HttpServer threw exception after starting response") {
auto PIPELINE_TESTS = pipelineTestCases();
auto io = kj::setupAsyncIo();
auto pipe = io.provider->newTwoWayPipe();
kj::EventLoop eventLoop;
kj::WaitScope waitScope(eventLoop);
kj::TimerImpl timer(kj::origin<kj::TimePoint>());
auto pipe = kj::newTwoWayPipe();
HttpHeaderTable table;
PartialResponseService service;
HttpServer server(io.provider->getTimer(), table, service);
HttpServer server(timer, table, service);
auto listenTask = server.listenHttp(kj::mv(pipe.ends[0]));
......@@ -2187,8 +2099,8 @@ KJ_TEST("HttpServer threw exception after starting response") {
// Do one request.
pipe.ends[1]->write(PIPELINE_TESTS[0].request.raw.begin(), PIPELINE_TESTS[0].request.raw.size())
.wait(io.waitScope);
auto text = pipe.ends[1]->readAllText().wait(io.waitScope);
.wait(waitScope);
auto text = pipe.ends[1]->readAllText().wait(waitScope);
KJ_EXPECT(text ==
"HTTP/1.1 200 OK\r\n"
......@@ -2245,20 +2157,22 @@ private:
KJ_TEST("HttpFixedLengthEntityWriter correctly implements tryPumpFrom") {
auto PIPELINE_TESTS = pipelineTestCases();
auto io = kj::setupAsyncIo();
auto pipe = io.provider->newTwoWayPipe();
kj::EventLoop eventLoop;
kj::WaitScope waitScope(eventLoop);
kj::TimerImpl timer(kj::origin<kj::TimePoint>());
auto pipe = kj::newTwoWayPipe();
HttpHeaderTable table;
PumpResponseService service;
HttpServer server(io.provider->getTimer(), table, service);
HttpServer server(timer, table, service);
auto listenTask = server.listenHttp(kj::mv(pipe.ends[0]));
// Do one request.
pipe.ends[1]->write(PIPELINE_TESTS[0].request.raw.begin(), PIPELINE_TESTS[0].request.raw.size())
.wait(io.waitScope);
.wait(waitScope);
pipe.ends[1]->shutdownWrite();
auto text = pipe.ends[1]->readAllText().wait(io.waitScope);
auto text = pipe.ends[1]->readAllText().wait(waitScope);
KJ_EXPECT(text ==
"HTTP/1.1 200 OK\r\n"
......@@ -2272,9 +2186,11 @@ KJ_TEST("HttpFixedLengthEntityWriter correctly implements tryPumpFrom") {
KJ_TEST("newHttpService from HttpClient") {
auto PIPELINE_TESTS = pipelineTestCases();
auto io = kj::setupAsyncIo();
auto frontPipe = io.provider->newTwoWayPipe();
auto backPipe = io.provider->newTwoWayPipe();
kj::EventLoop eventLoop;
kj::WaitScope waitScope(eventLoop);
kj::TimerImpl timer(kj::origin<kj::TimePoint>());
auto frontPipe = kj::newTwoWayPipe();
auto backPipe = kj::newTwoWayPipe();
kj::Promise<void> writeResponsesPromise = kj::READY_NOW;
for (auto& testCase: PIPELINE_TESTS) {
......@@ -2290,30 +2206,32 @@ KJ_TEST("newHttpService from HttpClient") {
HttpHeaderTable table;
auto backClient = newHttpClient(table, *backPipe.ends[0]);
auto frontService = newHttpService(*backClient);
HttpServer frontServer(io.provider->getTimer(), table, *frontService);
HttpServer frontServer(timer, table, *frontService);
auto listenTask = frontServer.listenHttp(kj::mv(frontPipe.ends[1]));
for (auto& testCase: PIPELINE_TESTS) {
KJ_CONTEXT(testCase.request.raw, testCase.response.raw);
frontPipe.ends[0]->write(testCase.request.raw.begin(), testCase.request.raw.size())
.wait(io.waitScope);
.wait(waitScope);
expectRead(*frontPipe.ends[0], testCase.response.raw).wait(io.waitScope);
expectRead(*frontPipe.ends[0], testCase.response.raw).wait(waitScope);
}
frontPipe.ends[0]->shutdownWrite();
listenTask.wait(io.waitScope);
listenTask.wait(waitScope);
}
backPipe.ends[0]->shutdownWrite();
writeResponsesPromise.wait(io.waitScope);
writeResponsesPromise.wait(waitScope);
}
KJ_TEST("newHttpService from HttpClient WebSockets") {
auto io = kj::setupAsyncIo();
auto frontPipe = io.provider->newTwoWayPipe();
auto backPipe = io.provider->newTwoWayPipe();
kj::EventLoop eventLoop;
kj::WaitScope waitScope(eventLoop);
kj::TimerImpl timer(kj::origin<kj::TimePoint>());
auto frontPipe = kj::newTwoWayPipe();
auto backPipe = kj::newTwoWayPipe();
auto request = kj::str("GET /websocket", WEBSOCKET_REQUEST_HANDSHAKE);
auto writeResponsesPromise = expectRead(*backPipe.ends[1], request)
......@@ -2337,29 +2255,31 @@ KJ_TEST("newHttpService from HttpClient WebSockets") {
FakeEntropySource entropySource;
auto backClient = newHttpClient(table, *backPipe.ends[0], entropySource);
auto frontService = newHttpService(*backClient);
HttpServer frontServer(io.provider->getTimer(), table, *frontService);
HttpServer frontServer(timer, table, *frontService);
auto listenTask = frontServer.listenHttp(kj::mv(frontPipe.ends[1]));
frontPipe.ends[0]->write({request.asBytes()}).wait(io.waitScope);
expectRead(*frontPipe.ends[0], WEBSOCKET_RESPONSE_HANDSHAKE).wait(io.waitScope);
frontPipe.ends[0]->write({request.asBytes()}).wait(waitScope);
expectRead(*frontPipe.ends[0], WEBSOCKET_RESPONSE_HANDSHAKE).wait(waitScope);
expectRead(*frontPipe.ends[0], WEBSOCKET_FIRST_MESSAGE_INLINE).wait(io.waitScope);
frontPipe.ends[0]->write({WEBSOCKET_SEND_MESSAGE}).wait(io.waitScope);
expectRead(*frontPipe.ends[0], WEBSOCKET_REPLY_MESSAGE).wait(io.waitScope);
frontPipe.ends[0]->write({WEBSOCKET_SEND_CLOSE}).wait(io.waitScope);
expectRead(*frontPipe.ends[0], WEBSOCKET_REPLY_CLOSE).wait(io.waitScope);
expectRead(*frontPipe.ends[0], WEBSOCKET_FIRST_MESSAGE_INLINE).wait(waitScope);
frontPipe.ends[0]->write({WEBSOCKET_SEND_MESSAGE}).wait(waitScope);
expectRead(*frontPipe.ends[0], WEBSOCKET_REPLY_MESSAGE).wait(waitScope);
frontPipe.ends[0]->write({WEBSOCKET_SEND_CLOSE}).wait(waitScope);
expectRead(*frontPipe.ends[0], WEBSOCKET_REPLY_CLOSE).wait(waitScope);
frontPipe.ends[0]->shutdownWrite();
listenTask.wait(io.waitScope);
listenTask.wait(waitScope);
}
writeResponsesPromise.wait(io.waitScope);
writeResponsesPromise.wait(waitScope);
}
KJ_TEST("newHttpService from HttpClient WebSockets disconnect") {
auto io = kj::setupAsyncIo();
auto frontPipe = io.provider->newTwoWayPipe();
auto backPipe = io.provider->newTwoWayPipe();
kj::EventLoop eventLoop;
kj::WaitScope waitScope(eventLoop);
kj::TimerImpl timer(kj::origin<kj::TimePoint>());
auto frontPipe = kj::newTwoWayPipe();
auto backPipe = kj::newTwoWayPipe();
auto request = kj::str("GET /websocket", WEBSOCKET_REQUEST_HANDSHAKE);
auto writeResponsesPromise = expectRead(*backPipe.ends[1], request)
......@@ -2374,22 +2294,22 @@ KJ_TEST("newHttpService from HttpClient WebSockets disconnect") {
FakeEntropySource entropySource;
auto backClient = newHttpClient(table, *backPipe.ends[0], entropySource);
auto frontService = newHttpService(*backClient);
HttpServer frontServer(io.provider->getTimer(), table, *frontService);
HttpServer frontServer(timer, table, *frontService);
auto listenTask = frontServer.listenHttp(kj::mv(frontPipe.ends[1]));
frontPipe.ends[0]->write({request.asBytes()}).wait(io.waitScope);
expectRead(*frontPipe.ends[0], WEBSOCKET_RESPONSE_HANDSHAKE).wait(io.waitScope);
frontPipe.ends[0]->write({request.asBytes()}).wait(waitScope);
expectRead(*frontPipe.ends[0], WEBSOCKET_RESPONSE_HANDSHAKE).wait(waitScope);
expectRead(*frontPipe.ends[0], WEBSOCKET_FIRST_MESSAGE_INLINE).wait(io.waitScope);
frontPipe.ends[0]->write({WEBSOCKET_SEND_MESSAGE}).wait(io.waitScope);
expectRead(*frontPipe.ends[0], WEBSOCKET_FIRST_MESSAGE_INLINE).wait(waitScope);
frontPipe.ends[0]->write({WEBSOCKET_SEND_MESSAGE}).wait(waitScope);
KJ_EXPECT(frontPipe.ends[0]->readAllText().wait(io.waitScope) == "");
KJ_EXPECT(frontPipe.ends[0]->readAllText().wait(waitScope) == "");
frontPipe.ends[0]->shutdownWrite();
listenTask.wait(io.waitScope);
listenTask.wait(waitScope);
}
writeResponsesPromise.wait(io.waitScope);
writeResponsesPromise.wait(waitScope);
}
// -----------------------------------------------------------------------------
......
......@@ -2492,8 +2492,10 @@ public:
bodyStream = heap<HttpChunkedEntityWriter>(httpOutput);
}
auto responsePromise = httpInput.readResponseHeaders()
.then([this,method](kj::Maybe<HttpHeaders::Response>&& response) -> HttpClient::Response {
auto id = ++counter;
auto responsePromise = httpInput.readResponseHeaders().then(
[this,method,id](kj::Maybe<HttpHeaders::Response>&& response) -> HttpClient::Response {
KJ_IF_MAYBE(r, response) {
auto& headers = httpInput.getHeaders();
HttpClient::Response result {
......@@ -2506,8 +2508,11 @@ public:
if (fastCaseCmp<'c', 'l', 'o', 's', 'e'>(
headers.get(HttpHeaderId::CONNECTION).orDefault(nullptr).cStr())) {
closed = true;
} else {
} else if (counter == id) {
watchForClose();
} else {
// Anothe request was already queued after this one, so we don't want to watch for
// stream closure because we're fully expecting another response.
}
return result;
} else {
......@@ -2550,9 +2555,11 @@ public:
// No entity-body.
httpOutput.finishBody();
auto id = ++counter;
return httpInput.readResponseHeaders()
.then(kj::mvCapture(keyBase64,
[this](kj::StringPtr keyBase64, kj::Maybe<HttpHeaders::Response>&& response)
[this,id](kj::StringPtr keyBase64, kj::Maybe<HttpHeaders::Response>&& response)
-> HttpClient::WebSocketResponse {
KJ_IF_MAYBE(r, response) {
auto& headers = httpInput.getHeaders();
......@@ -2593,8 +2600,11 @@ public:
if (fastCaseCmp<'c', 'l', 'o', 's', 'e'>(
headers.get(HttpHeaderId::CONNECTION).orDefault(nullptr).cStr())) {
closed = true;
} else {
} else if (counter == id) {
watchForClose();
} else {
// Anothe request was already queued after this one, so we don't want to watch for
// stream closure because we're fully expecting another response.
}
return result;
}
......@@ -2614,6 +2624,10 @@ private:
bool upgraded = false;
bool closed = false;
uint counter = 0;
// Counts requests for the sole purpose of detecting if more requests have been made after some
// point in history.
void watchForClose() {
closeWatcherTask = httpInput.awaitNextMessage().then([this](bool hasData) {
if (hasData) {
......
......@@ -131,7 +131,7 @@ public:
inline bool isShared() const {
#if _MSC_VER
KJ_MSVC_INTERLOCKED(Or, acq)(&refcount, 0) > 1;
return KJ_MSVC_INTERLOCKED(Or, acq)(&refcount, 0) > 1;
#else
return __atomic_load_n(&refcount, __ATOMIC_ACQUIRE) > 1;
#endif
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment