Commit ce83d192 authored by Kenton Varda's avatar Kenton Varda

Fix handling of disconnect during WebSocket::pumpTo().

Before this change, if *either* the `from` or the `to` end of the pump threw a DISCONNECTED exception, we'd then try to call `to.disconnect()`. But if the exception had been thrown from `to` in the first place, then this would throw again, this time complaining about a previous write not having completed. But the whole point was always to propagate errors from the *receiving* end to the sending end. An error on the sending end should just propagate up the stack.
parent eedf4e50
...@@ -424,7 +424,7 @@ private: ...@@ -424,7 +424,7 @@ private:
void abortRead() override { void abortRead() override {
canceler.cancel("abortRead() was called"); canceler.cancel("abortRead() was called");
fulfiller.reject(KJ_EXCEPTION(FAILED, "read end of pipe was aborted")); fulfiller.reject(KJ_EXCEPTION(DISCONNECTED, "read end of pipe was aborted"));
pipe.endState(*this); pipe.endState(*this);
pipe.abortRead(); pipe.abortRead();
} }
...@@ -537,7 +537,7 @@ private: ...@@ -537,7 +537,7 @@ private:
if (n == 0) { if (n == 0) {
fulfiller.fulfill(kj::cp(pumpedSoFar)); fulfiller.fulfill(kj::cp(pumpedSoFar));
} else { } else {
fulfiller.reject(KJ_EXCEPTION(FAILED, "read end of pipe was aborted")); fulfiller.reject(KJ_EXCEPTION(DISCONNECTED, "read end of pipe was aborted"));
} }
}).eagerlyEvaluate([this](kj::Exception&& e) { }).eagerlyEvaluate([this](kj::Exception&& e) {
fulfiller.reject(kj::mv(e)); fulfiller.reject(kj::mv(e));
...@@ -595,7 +595,7 @@ private: ...@@ -595,7 +595,7 @@ private:
void abortRead() override { void abortRead() override {
canceler.cancel("abortRead() was called"); canceler.cancel("abortRead() was called");
fulfiller.reject(KJ_EXCEPTION(FAILED, "read end of pipe was aborted")); fulfiller.reject(KJ_EXCEPTION(DISCONNECTED, "read end of pipe was aborted"));
pipe.endState(*this); pipe.endState(*this);
pipe.abortRead(); pipe.abortRead();
} }
...@@ -765,7 +765,7 @@ private: ...@@ -765,7 +765,7 @@ private:
void abortRead() override { void abortRead() override {
canceler.cancel("abortRead() was called"); canceler.cancel("abortRead() was called");
fulfiller.reject(KJ_EXCEPTION(FAILED, "read end of pipe was aborted")); fulfiller.reject(KJ_EXCEPTION(DISCONNECTED, "read end of pipe was aborted"));
pipe.endState(*this); pipe.endState(*this);
pipe.abortRead(); pipe.abortRead();
} }
......
...@@ -1709,6 +1709,56 @@ KJ_TEST("WebSocket ping received during pong send") { ...@@ -1709,6 +1709,56 @@ KJ_TEST("WebSocket ping received during pong send") {
clientTask.wait(waitScope); clientTask.wait(waitScope);
} }
KJ_TEST("WebSocket pump disconnect on send") {
kj::EventLoop eventLoop;
kj::WaitScope waitScope(eventLoop);
auto pipe1 = kj::newTwoWayPipe();
auto pipe2 = kj::newTwoWayPipe();
auto client1 = newWebSocket(kj::mv(pipe1.ends[0]), nullptr);
auto server1 = newWebSocket(kj::mv(pipe1.ends[1]), nullptr);
auto client2 = newWebSocket(kj::mv(pipe2.ends[0]), nullptr);
auto pumpTask = server1->pumpTo(*client2);
auto sendTask = client1->send("hello"_kj);
// Endpoint reads three bytes and then disconnects.
char buffer[3];
pipe2.ends[1]->read(buffer, 3).wait(waitScope);
pipe2.ends[1] = nullptr;
// Pump throws disconnected.
KJ_EXPECT_THROW(DISCONNECTED, pumpTask.wait(waitScope));
// client1 managed to send its whole message into the pump, though.
sendTask.wait(waitScope);
}
KJ_TEST("WebSocket pump disconnect on receive") {
kj::EventLoop eventLoop;
kj::WaitScope waitScope(eventLoop);
auto pipe1 = kj::newTwoWayPipe();
auto pipe2 = kj::newTwoWayPipe();
auto server1 = newWebSocket(kj::mv(pipe1.ends[1]), nullptr);
auto client2 = newWebSocket(kj::mv(pipe2.ends[0]), nullptr);
auto server2 = newWebSocket(kj::mv(pipe2.ends[1]), nullptr);
auto pumpTask = server1->pumpTo(*client2);
auto receiveTask = server2->receive();
// Client sends three bytes of a valid message then disconnects.
const char DATA[] = {0x01, 0x06, 'h'};
pipe1.ends[0]->write(DATA, 3).wait(waitScope);
pipe1.ends[0] = nullptr;
// The pump completes successfully, forwarding the disconnect.
pumpTask.wait(waitScope);
// The eventual receiver gets a disconnect execption.
KJ_EXPECT_THROW(DISCONNECTED, receiveTask.wait(waitScope));
}
class TestWebSocketService final: public HttpService, private kj::TaskSet::ErrorHandler { class TestWebSocketService final: public HttpService, private kj::TaskSet::ErrorHandler {
public: public:
TestWebSocketService(HttpHeaderTable& headerTable, HttpHeaderId hMyHeader) TestWebSocketService(HttpHeaderTable& headerTable, HttpHeaderId hMyHeader)
......
...@@ -2445,6 +2445,12 @@ static kj::Promise<void> pumpWebSocketLoop(WebSocket& from, WebSocket& to) { ...@@ -2445,6 +2445,12 @@ static kj::Promise<void> pumpWebSocketLoop(WebSocket& from, WebSocket& to) {
} }
} }
KJ_UNREACHABLE; KJ_UNREACHABLE;
}, [&to](kj::Exception&& e) {
if (e.getType() == kj::Exception::Type::DISCONNECTED) {
return to.disconnect();
} else {
return to.close(1002, e.getDescription());
}
}); });
} }
...@@ -2456,12 +2462,6 @@ kj::Promise<void> WebSocket::pumpTo(WebSocket& other) { ...@@ -2456,12 +2462,6 @@ kj::Promise<void> WebSocket::pumpTo(WebSocket& other) {
// Fall back to default implementation. // Fall back to default implementation.
return kj::evalNow([&]() { return kj::evalNow([&]() {
return pumpWebSocketLoop(*this, other); return pumpWebSocketLoop(*this, other);
}).catch_([&other](kj::Exception&& e) -> kj::Promise<void> {
if (e.getType() == kj::Exception::Type::DISCONNECTED) {
return other.disconnect();
} else {
return other.close(1002, e.getDescription());
}
}); });
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment