Commit 30722cc2 authored by Kenton Varda's avatar Kenton Varda

Update newHttpService(HttpClient&) for WebSocket.

parent 745f8a5c
...@@ -2076,6 +2076,81 @@ KJ_TEST("newHttpService from HttpClient") { ...@@ -2076,6 +2076,81 @@ KJ_TEST("newHttpService from HttpClient") {
writeResponsesPromise.wait(io.waitScope); writeResponsesPromise.wait(io.waitScope);
} }
KJ_TEST("newHttpService from HttpClient WebSockets") {
auto io = kj::setupAsyncIo();
auto frontPipe = io.provider->newTwoWayPipe();
auto backPipe = io.provider->newTwoWayPipe();
auto request = kj::str("GET /websocket", WEBSOCKET_REQUEST_HANDSHAKE);
auto writeResponsesPromise = expectRead(*backPipe.ends[1], request)
.then([&]() { return backPipe.ends[1]->write({asBytes(WEBSOCKET_RESPONSE_HANDSHAKE)}); })
.then([&]() { return backPipe.ends[1]->write({WEBSOCKET_FIRST_MESSAGE_INLINE}); })
.then([&]() { return expectRead(*backPipe.ends[1], WEBSOCKET_SEND_MESSAGE); })
.then([&]() { return backPipe.ends[1]->write({WEBSOCKET_REPLY_MESSAGE}); })
.then([&]() { return expectRead(*backPipe.ends[1], WEBSOCKET_SEND_CLOSE); })
.then([&]() { return backPipe.ends[1]->write({WEBSOCKET_REPLY_CLOSE}); })
.eagerlyEvaluate([](kj::Exception&& e) { KJ_LOG(ERROR, e); });
{
HttpHeaderTable table;
FakeEntropySource entropySource;
auto backClient = newHttpClient(table, *backPipe.ends[0], entropySource);
auto frontService = newHttpService(*backClient);
HttpServer frontServer(io.provider->getTimer(), table, *frontService);
auto listenTask = frontServer.listenHttp(kj::mv(frontPipe.ends[1]));
frontPipe.ends[0]->write({request.asBytes()}).wait(io.waitScope);
expectRead(*frontPipe.ends[0], WEBSOCKET_RESPONSE_HANDSHAKE).wait(io.waitScope);
expectRead(*frontPipe.ends[0], WEBSOCKET_FIRST_MESSAGE_INLINE).wait(io.waitScope);
frontPipe.ends[0]->write({WEBSOCKET_SEND_MESSAGE}).wait(io.waitScope);
expectRead(*frontPipe.ends[0], WEBSOCKET_REPLY_MESSAGE).wait(io.waitScope);
frontPipe.ends[0]->write({WEBSOCKET_SEND_CLOSE}).wait(io.waitScope);
expectRead(*frontPipe.ends[0], WEBSOCKET_REPLY_CLOSE).wait(io.waitScope);
frontPipe.ends[0]->shutdownWrite();
listenTask.wait(io.waitScope);
}
writeResponsesPromise.wait(io.waitScope);
}
KJ_TEST("newHttpService from HttpClient WebSockets disconnect") {
auto io = kj::setupAsyncIo();
auto frontPipe = io.provider->newTwoWayPipe();
auto backPipe = io.provider->newTwoWayPipe();
auto request = kj::str("GET /websocket", WEBSOCKET_REQUEST_HANDSHAKE);
auto writeResponsesPromise = expectRead(*backPipe.ends[1], request)
.then([&]() { return backPipe.ends[1]->write({asBytes(WEBSOCKET_RESPONSE_HANDSHAKE)}); })
.then([&]() { return backPipe.ends[1]->write({WEBSOCKET_FIRST_MESSAGE_INLINE}); })
.then([&]() { return expectRead(*backPipe.ends[1], WEBSOCKET_SEND_MESSAGE); })
.then([&]() { backPipe.ends[1]->shutdownWrite(); })
.eagerlyEvaluate([](kj::Exception&& e) { KJ_LOG(ERROR, e); });
{
HttpHeaderTable table;
FakeEntropySource entropySource;
auto backClient = newHttpClient(table, *backPipe.ends[0], entropySource);
auto frontService = newHttpService(*backClient);
HttpServer frontServer(io.provider->getTimer(), table, *frontService);
auto listenTask = frontServer.listenHttp(kj::mv(frontPipe.ends[1]));
frontPipe.ends[0]->write({request.asBytes()}).wait(io.waitScope);
expectRead(*frontPipe.ends[0], WEBSOCKET_RESPONSE_HANDSHAKE).wait(io.waitScope);
expectRead(*frontPipe.ends[0], WEBSOCKET_FIRST_MESSAGE_INLINE).wait(io.waitScope);
frontPipe.ends[0]->write({WEBSOCKET_SEND_MESSAGE}).wait(io.waitScope);
KJ_EXPECT(frontPipe.ends[0]->readAllText().wait(io.waitScope) == "");
frontPipe.ends[0]->shutdownWrite();
listenTask.wait(io.waitScope);
}
writeResponsesPromise.wait(io.waitScope);
}
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
KJ_TEST("HttpClient to capnproto.org") { KJ_TEST("HttpClient to capnproto.org") {
......
...@@ -1900,6 +1900,26 @@ public: ...@@ -1900,6 +1900,26 @@ public:
return promise.attach(kj::mv(payload)); return promise.attach(kj::mv(payload));
} }
kj::Promise<void> disconnect() override {
KJ_REQUIRE(!sendClosed, "WebSocket already closed");
KJ_REQUIRE(!currentlySending, "another message send is already in progress");
KJ_IF_MAYBE(p, sendingPong) {
// We recently sent a pong, make sure it's finished before proceeding.
currentlySending = true;
auto promise = p->then([this]() {
currentlySending = false;
return disconnect();
});
sendingPong = nullptr;
return promise;
}
sendClosed = true;
stream->shutdownWrite();
return kj::READY_NOW;
}
kj::Promise<Message> receive() override { kj::Promise<Message> receive() override {
auto& recvHeader = *reinterpret_cast<Header*>(recvData.begin()); auto& recvHeader = *reinterpret_cast<Header*>(recvData.begin());
size_t headerSize = recvHeader.headerSize(recvData.size()); size_t headerSize = recvHeader.headerSize(recvData.size());
...@@ -2543,6 +2563,30 @@ public: ...@@ -2543,6 +2563,30 @@ public:
return kj::joinPromises(promises.finish()); return kj::joinPromises(promises.finish());
} }
kj::Promise<void> openWebSocket(
kj::StringPtr url, const HttpHeaders& headers, WebSocketResponse& response) override {
return client.openWebSocket(url, headers)
.then([&response](HttpClient::WebSocketResponse&& innerResponse) -> kj::Promise<void> {
KJ_SWITCH_ONEOF(innerResponse.webSocketOrBody) {
KJ_CASE_ONEOF(ws, kj::Own<WebSocket>) {
auto ws2 = response.acceptWebSocket(*innerResponse.headers);
auto promises = kj::heapArrayBuilder<kj::Promise<void>>(2);
promises.add(pumpWebSocket(*ws, *ws2));
promises.add(pumpWebSocket(*ws2, *ws));
return kj::joinPromises(promises.finish()).attach(kj::mv(ws), kj::mv(ws2));
}
KJ_CASE_ONEOF(body, kj::Own<kj::AsyncInputStream>) {
auto out = response.send(
innerResponse.statusCode, innerResponse.statusText, *innerResponse.headers,
body->tryGetLength());
auto promise = body->pumpTo(*out);
return promise.ignoreResult().attach(kj::mv(out), kj::mv(body));
}
}
KJ_UNREACHABLE;
});
}
kj::Promise<kj::Own<kj::AsyncIoStream>> connect(kj::StringPtr host) override { kj::Promise<kj::Own<kj::AsyncIoStream>> connect(kj::StringPtr host) override {
return client.connect(kj::mv(host)); return client.connect(kj::mv(host));
} }
...@@ -2551,6 +2595,40 @@ public: ...@@ -2551,6 +2595,40 @@ public:
private: private:
HttpClient& client; HttpClient& client;
static kj::Promise<void> pumpWebSocket(WebSocket& from, WebSocket& to) {
return kj::evalNow([&]() {
return pumpWebSocketLoop(from, to);
}).catch_([&to](kj::Exception&& e) -> kj::Promise<void> {
if (e.getType() == kj::Exception::Type::DISCONNECTED) {
return to.disconnect();
} else {
return to.close(1002, e.getDescription());
}
});
}
static kj::Promise<void> pumpWebSocketLoop(WebSocket& from, WebSocket& to) {
return from.receive().then([&from,&to](WebSocket::Message&& message) {
KJ_SWITCH_ONEOF(message) {
KJ_CASE_ONEOF(text, kj::String) {
return to.send(text)
.attach(kj::mv(text))
.then([&from,&to]() { return pumpWebSocketLoop(from, to); });
}
KJ_CASE_ONEOF(data, kj::Array<byte>) {
return to.send(data)
.attach(kj::mv(data))
.then([&from,&to]() { return pumpWebSocketLoop(from, to); });
}
KJ_CASE_ONEOF(close, WebSocket::Close) {
return to.close(close.code, close.reason)
.attach(kj::mv(close));
}
}
KJ_UNREACHABLE;
});
}
}; };
} // namespace } // namespace
......
...@@ -404,6 +404,11 @@ public: ...@@ -404,6 +404,11 @@ public:
// for the other end to send a Close reply. The application should await a reply before dropping // for the other end to send a Close reply. The application should await a reply before dropping
// the WebSocket object. // the WebSocket object.
virtual kj::Promise<void> disconnect() = 0;
// Sends EOF on the underlying connection without sending a "close" message. This is NOT a clean
// shutdown, but is sometimes useful when you want the other end to trigger whatever behavior
// it normally triggers when a connection is dropped.
struct Close { struct Close {
uint16_t code; uint16_t code;
kj::String reason; kj::String reason;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment