Commit d2cc9fe2 authored by Kenton Varda's avatar Kenton Varda

Make pumpWebSocket() public as WebSocket::pumpTo().

parent 43abe32e
......@@ -2365,6 +2365,42 @@ kj::Own<WebSocket> newWebSocket(kj::Own<kj::AsyncIoStream> stream,
return kj::heap<WebSocketImpl>(kj::mv(stream), maskKeyGenerator);
}
static kj::Promise<void> pumpWebSocketLoop(WebSocket& from, WebSocket& to) {
return from.receive().then([&from,&to](WebSocket::Message&& message) {
KJ_SWITCH_ONEOF(message) {
KJ_CASE_ONEOF(text, kj::String) {
return to.send(text)
.attach(kj::mv(text))
.then([&from,&to]() { return pumpWebSocketLoop(from, to); });
}
KJ_CASE_ONEOF(data, kj::Array<byte>) {
return to.send(data)
.attach(kj::mv(data))
.then([&from,&to]() { return pumpWebSocketLoop(from, to); });
}
KJ_CASE_ONEOF(close, WebSocket::Close) {
return to.close(close.code, close.reason)
.attach(kj::mv(close))
.then([&from,&to]() { return pumpWebSocketLoop(from, to); });
}
}
KJ_UNREACHABLE;
});
}
kj::Promise<void> WebSocket::pumpTo(WebSocket& other) {
return kj::evalNow([&]() {
return pumpWebSocketLoop(*this, other);
}).catch_([&other](kj::Exception&& e) -> kj::Promise<void> {
if (e.getType() == kj::Exception::Type::DISCONNECTED) {
return other.disconnect();
} else {
return other.close(1002, e.getDescription());
}
});
}
// =======================================================================================
namespace {
......@@ -2573,8 +2609,8 @@ public:
KJ_CASE_ONEOF(ws, kj::Own<WebSocket>) {
auto ws2 = response.acceptWebSocket(*innerResponse.headers);
auto promises = kj::heapArrayBuilder<kj::Promise<void>>(2);
promises.add(pumpWebSocket(*ws, *ws2));
promises.add(pumpWebSocket(*ws2, *ws));
promises.add(ws->pumpTo(*ws2));
promises.add(ws2->pumpTo(*ws));
return kj::joinPromises(promises.finish()).attach(kj::mv(ws), kj::mv(ws2));
}
KJ_CASE_ONEOF(body, kj::Own<kj::AsyncInputStream>) {
......@@ -2597,41 +2633,6 @@ public:
private:
HttpClient& client;
static kj::Promise<void> pumpWebSocket(WebSocket& from, WebSocket& to) {
return kj::evalNow([&]() {
return pumpWebSocketLoop(from, to);
}).catch_([&to](kj::Exception&& e) -> kj::Promise<void> {
if (e.getType() == kj::Exception::Type::DISCONNECTED) {
return to.disconnect();
} else {
return to.close(1002, e.getDescription());
}
});
}
static kj::Promise<void> pumpWebSocketLoop(WebSocket& from, WebSocket& to) {
return from.receive().then([&from,&to](WebSocket::Message&& message) {
KJ_SWITCH_ONEOF(message) {
KJ_CASE_ONEOF(text, kj::String) {
return to.send(text)
.attach(kj::mv(text))
.then([&from,&to]() { return pumpWebSocketLoop(from, to); });
}
KJ_CASE_ONEOF(data, kj::Array<byte>) {
return to.send(data)
.attach(kj::mv(data))
.then([&from,&to]() { return pumpWebSocketLoop(from, to); });
}
KJ_CASE_ONEOF(close, WebSocket::Close) {
return to.close(close.code, close.reason)
.attach(kj::mv(close))
.then([&from,&to]() { return pumpWebSocketLoop(from, to); });
}
}
KJ_UNREACHABLE;
});
}
};
} // namespace
......
......@@ -420,6 +420,15 @@ public:
virtual kj::Promise<Message> receive() = 0;
// Read one message from the WebSocket and return it. Can only call once at a time. Do not call
// again after EndOfStream is received.
kj::Promise<void> pumpTo(WebSocket& other);
// Continuously receives messages from this WebSocket and send them to `other`.
//
// On EOF, calls other.disconnect(), then resolves.
//
// On other read errors, calls other.close() with the error, then resolves.
//
// On write error, rejects with the error.
};
class HttpClient {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment