Commit 442cf7f2 authored by Kenton Varda's avatar Kenton Varda

Try to fix test failure on Win32.

Due to differences in the way I/O events are queued on Windows, the timing of this test end up different, such that the two incoming pings are not both received before the large outgoing message is sent.

To fix this, I removed the dependency on native I/O altogether by implementing an in-memory pipe that does no buffering (it requires a read() and write() to rendezvous, then copies between their buffers).
parent 5f7b02d6
......@@ -1391,12 +1391,207 @@ KJ_TEST("WebSocket ping mid-send") {
serverTask.wait(io.waitScope);
}
class UnbufferedPipe final: public AsyncIoStream {
// An in-memory one-way pipe with no internal buffer. read() blocks waiting for write()s and
// write() blocks waiting for read()s.
//
// TODO(cleanup): This is probably broadly useful. Put it in a utility library somewhere.
// NOTE: Must implement handling of cancellation first!
public:
kj::Promise<void> write(const void* buffer, size_t size) override {
KJ_SWITCH_ONEOF(current) {
KJ_CASE_ONEOF(w, CurrentWrite) {
KJ_FAIL_REQUIRE("can only call write() once at a time");
}
KJ_CASE_ONEOF(r, CurrentRead) {
if (size < r.minBytes) {
// Write does not complete the current read.
memcpy(r.buffer.begin(), buffer, size);
r.minBytes -= size;
r.alreadyRead += size;
r.buffer = r.buffer.slice(size, r.buffer.size());
return kj::READY_NOW;
} else if (size <= r.buffer.size()) {
// Write satisfies the current read, and read satisfies the write.
memcpy(r.buffer.begin(), buffer, size);
r.fulfiller->fulfill(r.alreadyRead + size);
current = None();
return kj::READY_NOW;
} else {
// Write satisfies the read and still has more data leftover to write.
size_t amount = r.buffer.size();
memcpy(r.buffer.begin(), buffer, amount);
r.fulfiller->fulfill(amount + r.alreadyRead);
auto paf = kj::newPromiseAndFulfiller<void>();
current = CurrentWrite {
kj::arrayPtr(reinterpret_cast<const byte*>(buffer) + amount, size - amount),
kj::mv(paf.fulfiller)
};
return kj::mv(paf.promise);
}
}
KJ_CASE_ONEOF(e, Eof) {
KJ_FAIL_REQUIRE("write after EOF");
}
KJ_CASE_ONEOF(n, None) {
auto paf = kj::newPromiseAndFulfiller<void>();
current = CurrentWrite {
kj::arrayPtr(reinterpret_cast<const byte*>(buffer), size),
kj::mv(paf.fulfiller)
};
return kj::mv(paf.promise);
}
}
KJ_UNREACHABLE;
}
kj::Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
KJ_SWITCH_ONEOF(current) {
KJ_CASE_ONEOF(w, CurrentWrite) {
if (maxBytes < w.buffer.size()) {
// Entire read satisfied by write, write is still pending.
memcpy(buffer, w.buffer.begin(), maxBytes);
w.buffer = w.buffer.slice(maxBytes, w.buffer.size());
return maxBytes;
} else if (minBytes <= w.buffer.size()) {
// Read is satisfied by write and consumes entire write.
size_t result = w.buffer.size();
memcpy(buffer, w.buffer.begin(), result);
w.fulfiller->fulfill();
current = None();
return result;
} else {
// Read consumes entire write and is not satisfied.
size_t alreadyRead = w.buffer.size();
memcpy(buffer, w.buffer.begin(), alreadyRead);
w.fulfiller->fulfill();
auto paf = kj::newPromiseAndFulfiller<size_t>();
current = CurrentRead {
kj::arrayPtr(reinterpret_cast<byte*>(buffer) + alreadyRead, maxBytes - alreadyRead),
minBytes - alreadyRead,
alreadyRead,
kj::mv(paf.fulfiller)
};
return kj::mv(paf.promise);
}
}
KJ_CASE_ONEOF(r, CurrentRead) {
KJ_FAIL_REQUIRE("can only call read() once at a time");
}
KJ_CASE_ONEOF(e, Eof) {
return size_t(0);
}
KJ_CASE_ONEOF(n, None) {
auto paf = kj::newPromiseAndFulfiller<size_t>();
current = CurrentRead {
kj::arrayPtr(reinterpret_cast<byte*>(buffer), maxBytes),
minBytes,
0,
kj::mv(paf.fulfiller)
};
return kj::mv(paf.promise);
}
}
KJ_UNREACHABLE;
}
kj::Promise<void> write(kj::ArrayPtr<const kj::ArrayPtr<const byte>> pieces) override {
// TODO(cleanup): Should this be the defalut implementation of this method?
if (pieces.size() == 0) return kj::READY_NOW;
return write(pieces[0].begin(), pieces[0].size())
.then([this, pieces]() {
return write(pieces.slice(1, pieces.size()));
});
}
void shutdownWrite() override {
KJ_SWITCH_ONEOF(current) {
KJ_CASE_ONEOF(w, CurrentWrite) {
KJ_FAIL_REQUIRE("can't call shutdownWrite() during a write()");
}
KJ_CASE_ONEOF(r, CurrentRead) {
r.fulfiller->fulfill(kj::mv(r.alreadyRead));
}
KJ_CASE_ONEOF(e, Eof) {
// ignore
}
KJ_CASE_ONEOF(n, None) {
// ignore
}
}
current = Eof();
}
private:
struct CurrentWrite {
kj::ArrayPtr<const byte> buffer;
kj::Own<kj::PromiseFulfiller<void>> fulfiller;
};
struct CurrentRead {
kj::ArrayPtr<byte> buffer;
size_t minBytes;
size_t alreadyRead;
kj::Own<kj::PromiseFulfiller<size_t>> fulfiller;
};
struct Eof {};
struct None {};
kj::OneOf<CurrentWrite, CurrentRead, Eof, None> current = None();
};
class InputOutputPair final: public kj::AsyncIoStream {
// Creates an AsyncIoStream out of an AsyncInputStream and an AsyncOutputStream.
public:
InputOutputPair(kj::AsyncInputStream& in, kj::AsyncIoStream& out)
: in(in), out(out) {}
kj::Promise<size_t> read(void* buffer, size_t minBytes, size_t maxBytes) override {
return in.read(buffer, minBytes, maxBytes);
}
kj::Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
return in.tryRead(buffer, minBytes, maxBytes);
}
Maybe<uint64_t> tryGetLength() override {
return in.tryGetLength();
}
Promise<uint64_t> pumpTo(AsyncOutputStream& output, uint64_t amount = kj::maxValue) override {
return in.pumpTo(output, amount);
}
kj::Promise<void> write(const void* buffer, size_t size) override {
return out.write(buffer, size);
}
kj::Promise<void> write(kj::ArrayPtr<const kj::ArrayPtr<const byte>> pieces) override {
return out.write(pieces);
}
kj::Maybe<kj::Promise<uint64_t>> tryPumpFrom(
kj::AsyncInputStream& input, uint64_t amount = kj::maxValue) override {
return out.tryPumpFrom(input, amount);
}
void shutdownWrite() override {
return out.shutdownWrite();
}
private:
kj::AsyncInputStream& in;
kj::AsyncIoStream& out;
};
KJ_TEST("WebSocket double-ping mid-send") {
auto io = kj::setupAsyncIo();
auto pipe = io.provider->newTwoWayPipe();
auto client = kj::mv(pipe.ends[0]);
auto server = newWebSocket(kj::mv(pipe.ends[1]), nullptr);
UnbufferedPipe upPipe;
UnbufferedPipe downPipe;
InputOutputPair client(downPipe, upPipe);
auto server = newWebSocket(kj::heap<InputOutputPair>(upPipe, downPipe), nullptr);
auto bigString = kj::strArray(kj::repeat(kj::StringPtr("12345678"), 65536), "");
auto serverTask = server->send(bigString).eagerlyEvaluate(nullptr);
......@@ -1407,7 +1602,7 @@ KJ_TEST("WebSocket double-ping mid-send") {
0x81, 0x03, 'b', 'a', 'r', // some other message
};
auto clientTask = client->write(DATA, sizeof(DATA));
auto clientTask = client.write(DATA, sizeof(DATA));
{
auto message = server->receive().wait(io.waitScope);
......@@ -1416,11 +1611,11 @@ KJ_TEST("WebSocket double-ping mid-send") {
}
byte EXPECTED1[] = { 0x81, 0x7f, 0, 0, 0, 0, 0, 8, 0, 0 };
expectRead(*client, EXPECTED1).wait(io.waitScope);
expectRead(*client, bigString).wait(io.waitScope);
expectRead(client, EXPECTED1).wait(io.waitScope);
expectRead(client, bigString).wait(io.waitScope);
byte EXPECTED2[] = { 0x8A, 0x03, 'q', 'u', 'x' };
expectRead(*client, EXPECTED2).wait(io.waitScope);
expectRead(client, EXPECTED2).wait(io.waitScope);
clientTask.wait(io.waitScope);
serverTask.wait(io.waitScope);
......
......@@ -2342,7 +2342,7 @@ private:
sendParts[0] = sendHeader.compose(true, OPCODE_PONG, payload.size(), Mask(maskKeyGenerator));
sendParts[1] = payload;
return stream->write(sendParts);
return stream->write(sendParts).attach(kj::mv(payload));
}
};
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment