Commit 6e4c5ce3 authored by Kenton Varda's avatar Kenton Varda

Fix bug in closing proxied WebSockets.

The proxying code was responding to a `Close` message by ending the pump loop, which had the effect of immediately dropping the connection after a `Close` had been seen in each direction. This is arguably incorrect behavior: for proxying purposes, `Close` messages and underlying TCP disconnects should be treated as independent events, forwarded separately.

In practice this "bug" probably would never cause a problem and perhaps doesn't even violate spec (since `Close` was seen in both directions). But, OSX's implementation of shutdown() returns ENOTCONN if the connection has already been disconnected from the remote end. This is the case here, as the proxy dropped all connections immediately after sending the final `Close`.

This in turn led to a unit test failure.

The intended behavior was that the proxy would forward exactly what it saw: If a `Close` was sent, it would be forwarded, without changing the underlying connection state. If a TCP disconnect was detected, it would be "forwarded" by disconnecting the next leg. This change implements that behavior.
parent 74913d84
...@@ -2284,6 +2284,13 @@ KJ_TEST("newHttpService from HttpClient WebSockets") { ...@@ -2284,6 +2284,13 @@ KJ_TEST("newHttpService from HttpClient WebSockets") {
.then([&]() { return backPipe.ends[1]->write({WEBSOCKET_REPLY_MESSAGE}); }) .then([&]() { return backPipe.ends[1]->write({WEBSOCKET_REPLY_MESSAGE}); })
.then([&]() { return expectRead(*backPipe.ends[1], WEBSOCKET_SEND_CLOSE); }) .then([&]() { return expectRead(*backPipe.ends[1], WEBSOCKET_SEND_CLOSE); })
.then([&]() { return backPipe.ends[1]->write({WEBSOCKET_REPLY_CLOSE}); }) .then([&]() { return backPipe.ends[1]->write({WEBSOCKET_REPLY_CLOSE}); })
// expect EOF
.then([&]() { return backPipe.ends[1]->readAllBytes(); })
.then([&](kj::ArrayPtr<byte> content) {
KJ_EXPECT(content.size() == 0);
// Send EOF.
backPipe.ends[1]->shutdownWrite();
})
.eagerlyEvaluate([](kj::Exception&& e) { KJ_LOG(ERROR, e); }); .eagerlyEvaluate([](kj::Exception&& e) { KJ_LOG(ERROR, e); });
{ {
......
...@@ -1901,21 +1901,23 @@ public: ...@@ -1901,21 +1901,23 @@ public:
} }
kj::Promise<void> disconnect() override { kj::Promise<void> disconnect() override {
KJ_REQUIRE(!sendClosed, "WebSocket already closed"); if (!sendClosed) {
KJ_REQUIRE(!currentlySending, "another message send is already in progress"); KJ_REQUIRE(!currentlySending, "another message send is already in progress");
KJ_IF_MAYBE(p, sendingPong) {
// We recently sent a pong, make sure it's finished before proceeding.
currentlySending = true;
auto promise = p->then([this]() {
currentlySending = false;
return disconnect();
});
sendingPong = nullptr;
return promise;
}
KJ_IF_MAYBE(p, sendingPong) { sendClosed = true;
// We recently sent a pong, make sure it's finished before proceeding.
currentlySending = true;
auto promise = p->then([this]() {
currentlySending = false;
return disconnect();
});
sendingPong = nullptr;
return promise;
} }
sendClosed = true;
stream->shutdownWrite(); stream->shutdownWrite();
return kj::READY_NOW; return kj::READY_NOW;
} }
...@@ -2623,7 +2625,8 @@ private: ...@@ -2623,7 +2625,8 @@ private:
} }
KJ_CASE_ONEOF(close, WebSocket::Close) { KJ_CASE_ONEOF(close, WebSocket::Close) {
return to.close(close.code, close.reason) return to.close(close.code, close.reason)
.attach(kj::mv(close)); .attach(kj::mv(close))
.then([&from,&to]() { return pumpWebSocketLoop(from, to); });
} }
} }
KJ_UNREACHABLE; KJ_UNREACHABLE;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment