Commit 76104a88 authored by Kenton Varda's avatar Kenton Varda

Implement newHttpClient(HttpService&).

It turns out wrapping an HttpService in an HttpClient is considerably more complicated than vice versa, due to the need for pipes. This commit adds a WebSocket pipe implementation very similar to the recent byte-stream pipe (though considerably simpler since there's no need to deal with mismatched buffer sizes).
parent 3529a6ee
...@@ -512,6 +512,33 @@ void testHttpClientResponse(kj::WaitScope& waitScope, const HttpResponseTestCase ...@@ -512,6 +512,33 @@ void testHttpClientResponse(kj::WaitScope& waitScope, const HttpResponseTestCase
KJ_EXPECT(pipe.ends[1]->readAllText().wait(waitScope) == ""); KJ_EXPECT(pipe.ends[1]->readAllText().wait(waitScope) == "");
} }
void testHttpClient(kj::WaitScope& waitScope, HttpHeaderTable& table,
HttpClient& client, const HttpTestCase& testCase) {
KJ_CONTEXT(testCase.request.raw, testCase.response.raw);
HttpHeaders headers(table);
for (auto& header: testCase.request.requestHeaders) {
headers.set(header.id, header.value);
}
auto request = client.request(
testCase.request.method, testCase.request.path, headers, testCase.request.requestBodySize);
for (auto& part: testCase.request.requestBodyParts) {
request.body->write(part.begin(), part.size()).wait(waitScope);
}
request.body = nullptr;
auto response = request.response.wait(waitScope);
KJ_EXPECT(response.statusCode == testCase.response.statusCode);
auto body = response.body->readAllText().wait(waitScope);
if (testCase.request.method == HttpMethod::HEAD) {
KJ_EXPECT(body == "");
} else {
KJ_EXPECT(body == kj::strArray(testCase.response.responseBodyParts, ""), body);
}
}
class TestHttpService final: public HttpService { class TestHttpService final: public HttpService {
public: public:
TestHttpService(const HttpRequestTestCase& expectedRequest, TestHttpService(const HttpRequestTestCase& expectedRequest,
...@@ -1062,29 +1089,7 @@ KJ_TEST("HttpClient pipeline") { ...@@ -1062,29 +1089,7 @@ KJ_TEST("HttpClient pipeline") {
auto client = newHttpClient(table, *pipe.ends[0]); auto client = newHttpClient(table, *pipe.ends[0]);
for (auto& testCase: PIPELINE_TESTS) { for (auto& testCase: PIPELINE_TESTS) {
KJ_CONTEXT(testCase.request.raw, testCase.response.raw); testHttpClient(waitScope, table, *client, testCase);
HttpHeaders headers(table);
for (auto& header: testCase.request.requestHeaders) {
headers.set(header.id, header.value);
}
auto request = client->request(
testCase.request.method, testCase.request.path, headers, testCase.request.requestBodySize);
for (auto& part: testCase.request.requestBodyParts) {
request.body->write(part.begin(), part.size()).wait(waitScope);
}
request.body = nullptr;
auto response = request.response.wait(waitScope);
KJ_EXPECT(response.statusCode == testCase.response.statusCode);
auto body = response.body->readAllText().wait(waitScope);
if (testCase.request.method == HttpMethod::HEAD) {
KJ_EXPECT(body == "");
} else {
KJ_EXPECT(body == kj::strArray(testCase.response.responseBodyParts, ""), body);
}
} }
client = nullptr; client = nullptr;
...@@ -1232,29 +1237,7 @@ KJ_TEST("HttpClient <-> HttpServer") { ...@@ -1232,29 +1237,7 @@ KJ_TEST("HttpClient <-> HttpServer") {
auto client = newHttpClient(table, *pipe.ends[0]); auto client = newHttpClient(table, *pipe.ends[0]);
for (auto& testCase: PIPELINE_TESTS) { for (auto& testCase: PIPELINE_TESTS) {
KJ_CONTEXT(testCase.request.raw, testCase.response.raw); testHttpClient(waitScope, table, *client, testCase);
HttpHeaders headers(table);
for (auto& header: testCase.request.requestHeaders) {
headers.set(header.id, header.value);
}
auto request = client->request(
testCase.request.method, testCase.request.path, headers, testCase.request.requestBodySize);
for (auto& part: testCase.request.requestBodyParts) {
request.body->write(part.begin(), part.size()).wait(waitScope);
}
request.body = nullptr;
auto response = request.response.wait(waitScope);
KJ_EXPECT(response.statusCode == testCase.response.statusCode);
auto body = response.body->readAllText().wait(waitScope);
if (testCase.request.method == HttpMethod::HEAD) {
KJ_EXPECT(body == "");
} else {
KJ_EXPECT(body == kj::strArray(testCase.response.responseBodyParts, ""), body);
}
} }
client = nullptr; client = nullptr;
...@@ -1628,7 +1611,7 @@ public: ...@@ -1628,7 +1611,7 @@ public:
if (url == "/return-error") { if (url == "/return-error") {
response.send(404, "Not Found", responseHeaders, uint64_t(0)); response.send(404, "Not Found", responseHeaders, uint64_t(0));
return kj::READY_NOW; return kj::READY_NOW;
} else if (url == "/ws-inline") { } else if (url == "/websocket") {
auto ws = response.acceptWebSocket(responseHeaders); auto ws = response.acceptWebSocket(responseHeaders);
return doWebSocket(*ws, "start-inline").attach(kj::mv(ws)); return doWebSocket(*ws, "start-inline").attach(kj::mv(ws));
} else { } else {
...@@ -1704,33 +1687,11 @@ kj::ArrayPtr<const byte> asBytes(const char (&chars)[s]) { ...@@ -1704,33 +1687,11 @@ kj::ArrayPtr<const byte> asBytes(const char (&chars)[s]) {
return kj::ArrayPtr<const char>(chars, s - 1).asBytes(); return kj::ArrayPtr<const char>(chars, s - 1).asBytes();
} }
KJ_TEST("HttpClient WebSocket handshake") { void testWebSocketClient(kj::WaitScope& waitScope, HttpHeaderTable& headerTable,
kj::EventLoop eventLoop; kj::HttpHeaderId hMyHeader, HttpClient& client) {
kj::WaitScope waitScope(eventLoop); kj::HttpHeaders headers(headerTable);
auto pipe = kj::newTwoWayPipe();
auto request = kj::str("GET /websocket", WEBSOCKET_REQUEST_HANDSHAKE);
auto serverTask = expectRead(*pipe.ends[1], request)
.then([&]() { return pipe.ends[1]->write({asBytes(WEBSOCKET_RESPONSE_HANDSHAKE)}); })
.then([&]() { return pipe.ends[1]->write({WEBSOCKET_FIRST_MESSAGE_INLINE}); })
.then([&]() { return expectRead(*pipe.ends[1], WEBSOCKET_SEND_MESSAGE); })
.then([&]() { return pipe.ends[1]->write({WEBSOCKET_REPLY_MESSAGE}); })
.then([&]() { return expectRead(*pipe.ends[1], WEBSOCKET_SEND_CLOSE); })
.then([&]() { return pipe.ends[1]->write({WEBSOCKET_REPLY_CLOSE}); })
.eagerlyEvaluate([](kj::Exception&& e) { KJ_LOG(ERROR, e); });
HttpHeaderTable::Builder tableBuilder;
HttpHeaderId hMyHeader = tableBuilder.add("My-Header");
auto headerTable = tableBuilder.build();
FakeEntropySource entropySource;
auto client = newHttpClient(*headerTable, *pipe.ends[0], entropySource);
kj::HttpHeaders headers(*headerTable);
headers.set(hMyHeader, "foo"); headers.set(hMyHeader, "foo");
auto response = client->openWebSocket("/websocket", headers).wait(waitScope); auto response = client.openWebSocket("/websocket", headers).wait(waitScope);
KJ_EXPECT(response.statusCode == 101); KJ_EXPECT(response.statusCode == 101);
KJ_EXPECT(response.statusText == "Switching Protocols", response.statusText); KJ_EXPECT(response.statusText == "Switching Protocols", response.statusText);
...@@ -1758,6 +1719,33 @@ KJ_TEST("HttpClient WebSocket handshake") { ...@@ -1758,6 +1719,33 @@ KJ_TEST("HttpClient WebSocket handshake") {
KJ_EXPECT(message.get<WebSocket::Close>().code == 0x1235); KJ_EXPECT(message.get<WebSocket::Close>().code == 0x1235);
KJ_EXPECT(message.get<WebSocket::Close>().reason == "close-reply:qux"); KJ_EXPECT(message.get<WebSocket::Close>().reason == "close-reply:qux");
} }
}
KJ_TEST("HttpClient WebSocket handshake") {
kj::EventLoop eventLoop;
kj::WaitScope waitScope(eventLoop);
auto pipe = kj::newTwoWayPipe();
auto request = kj::str("GET /websocket", WEBSOCKET_REQUEST_HANDSHAKE);
auto serverTask = expectRead(*pipe.ends[1], request)
.then([&]() { return pipe.ends[1]->write({asBytes(WEBSOCKET_RESPONSE_HANDSHAKE)}); })
.then([&]() { return pipe.ends[1]->write({WEBSOCKET_FIRST_MESSAGE_INLINE}); })
.then([&]() { return expectRead(*pipe.ends[1], WEBSOCKET_SEND_MESSAGE); })
.then([&]() { return pipe.ends[1]->write({WEBSOCKET_REPLY_MESSAGE}); })
.then([&]() { return expectRead(*pipe.ends[1], WEBSOCKET_SEND_CLOSE); })
.then([&]() { return pipe.ends[1]->write({WEBSOCKET_REPLY_CLOSE}); })
.eagerlyEvaluate([](kj::Exception&& e) { KJ_LOG(ERROR, e); });
HttpHeaderTable::Builder tableBuilder;
HttpHeaderId hMyHeader = tableBuilder.add("My-Header");
auto headerTable = tableBuilder.build();
FakeEntropySource entropySource;
auto client = newHttpClient(*headerTable, *pipe.ends[0], entropySource);
testWebSocketClient(waitScope, *headerTable, hMyHeader, *client);
serverTask.wait(waitScope); serverTask.wait(waitScope);
} }
...@@ -1821,7 +1809,7 @@ KJ_TEST("HttpServer WebSocket handshake") { ...@@ -1821,7 +1809,7 @@ KJ_TEST("HttpServer WebSocket handshake") {
auto listenTask = server.listenHttp(kj::mv(pipe.ends[0])); auto listenTask = server.listenHttp(kj::mv(pipe.ends[0]));
auto request = kj::str("GET /ws-inline", WEBSOCKET_REQUEST_HANDSHAKE); auto request = kj::str("GET /websocket", WEBSOCKET_REQUEST_HANDSHAKE);
pipe.ends[1]->write({request.asBytes()}).wait(waitScope); pipe.ends[1]->write({request.asBytes()}).wait(waitScope);
expectRead(*pipe.ends[1], WEBSOCKET_RESPONSE_HANDSHAKE).wait(waitScope); expectRead(*pipe.ends[1], WEBSOCKET_RESPONSE_HANDSHAKE).wait(waitScope);
...@@ -2314,6 +2302,39 @@ KJ_TEST("newHttpService from HttpClient WebSockets disconnect") { ...@@ -2314,6 +2302,39 @@ KJ_TEST("newHttpService from HttpClient WebSockets disconnect") {
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
KJ_TEST("newHttpClient from HttpService") {
auto PIPELINE_TESTS = pipelineTestCases();
kj::EventLoop eventLoop;
kj::WaitScope waitScope(eventLoop);
kj::TimerImpl timer(kj::origin<kj::TimePoint>());
HttpHeaderTable table;
TestHttpService service(PIPELINE_TESTS, table);
auto client = newHttpClient(service);
for (auto& testCase: PIPELINE_TESTS) {
testHttpClient(waitScope, table, *client, testCase);
}
}
KJ_TEST("newHttpClient from HttpService WebSockets") {
kj::EventLoop eventLoop;
kj::WaitScope waitScope(eventLoop);
kj::TimerImpl timer(kj::origin<kj::TimePoint>());
auto pipe = kj::newTwoWayPipe();
HttpHeaderTable::Builder tableBuilder;
HttpHeaderId hMyHeader = tableBuilder.add("My-Header");
auto headerTable = tableBuilder.build();
TestWebSocketService service(*headerTable, hMyHeader);
auto client = newHttpClient(service);
testWebSocketClient(waitScope, *headerTable, hMyHeader, *client);
}
// -----------------------------------------------------------------------------
class CountingIoStream final: public kj::AsyncIoStream { class CountingIoStream final: public kj::AsyncIoStream {
// An AsyncIoStream wrapper which decrements a counter when destroyed (allowing us to count how // An AsyncIoStream wrapper which decrements a counter when destroyed (allowing us to count how
// many connections are open). // many connections are open).
......
...@@ -2406,17 +2406,532 @@ static kj::Promise<void> pumpWebSocketLoop(WebSocket& from, WebSocket& to) { ...@@ -2406,17 +2406,532 @@ static kj::Promise<void> pumpWebSocketLoop(WebSocket& from, WebSocket& to) {
} }
kj::Promise<void> WebSocket::pumpTo(WebSocket& other) { kj::Promise<void> WebSocket::pumpTo(WebSocket& other) {
return kj::evalNow([&]() { KJ_IF_MAYBE(p, other.tryPumpFrom(*this)) {
return pumpWebSocketLoop(*this, other); // Yay, optimized pump!
}).catch_([&other](kj::Exception&& e) -> kj::Promise<void> { return kj::mv(*p);
if (e.getType() == kj::Exception::Type::DISCONNECTED) { } else {
return other.disconnect(); // Fall back to default implementation.
return kj::evalNow([&]() {
return pumpWebSocketLoop(*this, other);
}).catch_([&other](kj::Exception&& e) -> kj::Promise<void> {
if (e.getType() == kj::Exception::Type::DISCONNECTED) {
return other.disconnect();
} else {
return other.close(1002, e.getDescription());
}
});
}
}
kj::Maybe<kj::Promise<void>> WebSocket::tryPumpFrom(WebSocket& other) {
return nullptr;
}
namespace {
class AbortableWebSocket: public WebSocket {
public:
virtual void abort() = 0;
};
class WebSocketPipeImpl final: public AbortableWebSocket, public kj::Refcounted {
public:
~WebSocketPipeImpl() noexcept(false) {
KJ_REQUIRE(state == nullptr || ownState.get() != nullptr,
"destroying WebSocketPipe with operation still in-progress; probably going to segfault") {
// Don't std::terminate().
break;
}
}
void abort() override {
KJ_IF_MAYBE(s, state) {
s->abort();
} else { } else {
return other.close(1002, e.getDescription()); ownState = heap<Aborted>();
state = *ownState;
}
}
kj::Promise<void> send(kj::ArrayPtr<const byte> message) override {
KJ_IF_MAYBE(s, state) {
return s->send(message);
} else {
return newAdaptedPromise<void, BlockedSend>(*this, MessagePtr(message));
}
}
kj::Promise<void> send(kj::ArrayPtr<const char> message) override {
KJ_IF_MAYBE(s, state) {
return s->send(message);
} else {
return newAdaptedPromise<void, BlockedSend>(*this, MessagePtr(message));
}
}
kj::Promise<void> close(uint16_t code, kj::StringPtr reason) override {
KJ_IF_MAYBE(s, state) {
return s->close(code, reason);
} else {
return newAdaptedPromise<void, BlockedSend>(*this, MessagePtr(ClosePtr { code, reason }));
}
}
kj::Promise<void> disconnect() override {
KJ_IF_MAYBE(s, state) {
return s->disconnect();
} else {
ownState = heap<Disconnected>();
state = *ownState;
return kj::READY_NOW;
}
}
kj::Maybe<kj::Promise<void>> tryPumpFrom(WebSocket& other) override {
KJ_IF_MAYBE(s, state) {
return s->tryPumpFrom(other);
} else {
return newAdaptedPromise<void, BlockedPumpFrom>(*this, other);
}
}
kj::Promise<Message> receive() override {
KJ_IF_MAYBE(s, state) {
return s->receive();
} else {
return newAdaptedPromise<Message, BlockedReceive>(*this);
}
}
kj::Promise<void> pumpTo(WebSocket& other) override {
KJ_IF_MAYBE(s, state) {
return s->pumpTo(other);
} else {
return newAdaptedPromise<void, BlockedPumpTo>(*this, other);
}
}
private:
kj::Maybe<AbortableWebSocket&> state;
// Represents the state of the rendezvous for this end's send() calls / the other end's
// receive() calls. (otherEnd.sendState provides the reverse state.)
kj::Own<AbortableWebSocket> ownState;
void endState(WebSocket& obj) {
KJ_IF_MAYBE(s, state) {
if (s == &obj) {
state = nullptr;
}
}
}
struct ClosePtr {
uint16_t code;
kj::StringPtr reason;
};
typedef kj::OneOf<kj::ArrayPtr<const char>, kj::ArrayPtr<const byte>, ClosePtr> MessagePtr;
class BlockedSend final: public AbortableWebSocket {
public:
BlockedSend(kj::PromiseFulfiller<void>& fulfiller, WebSocketPipeImpl& pipe, MessagePtr message)
: fulfiller(fulfiller), pipe(pipe), message(kj::mv(message)) {
KJ_REQUIRE(pipe.state == nullptr);
pipe.state = *this;
}
~BlockedSend() noexcept(false) {
pipe.endState(*this);
}
void abort() override {
canceler.cancel("other end of WebSocketPipe was destroyed");
fulfiller.reject(KJ_EXCEPTION(DISCONNECTED, "other end of WebSocketPipe was destroyed"));
pipe.endState(*this);
pipe.abort();
}
kj::Promise<void> send(kj::ArrayPtr<const byte> message) override {
KJ_FAIL_ASSERT("another message send is already in progress");
}
kj::Promise<void> send(kj::ArrayPtr<const char> message) override {
KJ_FAIL_ASSERT("another message send is already in progress");
}
kj::Promise<void> close(uint16_t code, kj::StringPtr reason) override {
KJ_FAIL_ASSERT("another message send is already in progress");
}
kj::Promise<void> disconnect() override {
KJ_FAIL_ASSERT("another message send is already in progress");
}
kj::Maybe<kj::Promise<void>> tryPumpFrom(WebSocket& other) override {
KJ_FAIL_ASSERT("another message send is already in progress");
}
kj::Promise<Message> receive() override {
KJ_REQUIRE(canceler.isEmpty(), "already pumping");
fulfiller.fulfill();
pipe.endState(*this);
KJ_SWITCH_ONEOF(message) {
KJ_CASE_ONEOF(arr, kj::ArrayPtr<const char>) {
return Message(kj::str(arr));
}
KJ_CASE_ONEOF(arr, kj::ArrayPtr<const byte>) {
auto copy = kj::heapArray<byte>(arr.size());
memcpy(copy.begin(), arr.begin(), arr.size());
return Message(kj::mv(copy));
}
KJ_CASE_ONEOF(close, ClosePtr) {
return Message(Close { close.code, kj::str(close.reason) });
}
}
KJ_UNREACHABLE;
}
kj::Promise<void> pumpTo(WebSocket& other) override {
KJ_REQUIRE(canceler.isEmpty(), "already pumping");
kj::Promise<void> promise = nullptr;
KJ_SWITCH_ONEOF(message) {
KJ_CASE_ONEOF(arr, kj::ArrayPtr<const char>) {
promise = other.send(arr);
}
KJ_CASE_ONEOF(arr, kj::ArrayPtr<const byte>) {
promise = other.send(arr);
}
KJ_CASE_ONEOF(close, ClosePtr) {
promise = other.close(close.code, close.reason);
}
}
return canceler.wrap(promise.then([this,&other]() {
canceler.release();
fulfiller.fulfill();
pipe.endState(*this);
return pipe.pumpTo(other);
}, [this](kj::Exception&& e) -> kj::Promise<void> {
canceler.release();
fulfiller.reject(kj::cp(e));
pipe.endState(*this);
return kj::mv(e);
}));
}
private:
kj::PromiseFulfiller<void>& fulfiller;
WebSocketPipeImpl& pipe;
MessagePtr message;
Canceler canceler;
};
class BlockedPumpFrom final: public AbortableWebSocket {
public:
BlockedPumpFrom(kj::PromiseFulfiller<void>& fulfiller, WebSocketPipeImpl& pipe,
WebSocket& input)
: fulfiller(fulfiller), pipe(pipe), input(input) {
KJ_REQUIRE(pipe.state == nullptr);
pipe.state = *this;
}
~BlockedPumpFrom() noexcept(false) {
pipe.endState(*this);
}
void abort() override {
canceler.cancel("other end of WebSocketPipe was destroyed");
fulfiller.reject(KJ_EXCEPTION(DISCONNECTED, "other end of WebSocketPipe was destroyed"));
pipe.endState(*this);
pipe.abort();
}
kj::Promise<void> send(kj::ArrayPtr<const byte> message) override {
KJ_FAIL_ASSERT("another message send is already in progress");
}
kj::Promise<void> send(kj::ArrayPtr<const char> message) override {
KJ_FAIL_ASSERT("another message send is already in progress");
}
kj::Promise<void> close(uint16_t code, kj::StringPtr reason) override {
KJ_FAIL_ASSERT("another message send is already in progress");
}
kj::Promise<void> disconnect() override {
KJ_FAIL_ASSERT("another message send is already in progress");
}
kj::Maybe<kj::Promise<void>> tryPumpFrom(WebSocket& other) override {
KJ_FAIL_ASSERT("another message send is already in progress");
}
kj::Promise<Message> receive() override {
KJ_REQUIRE(canceler.isEmpty(), "another message receive is already in progress");
return canceler.wrap(input.receive()
.then([this](Message message) {
if (message.is<Close>()) {
canceler.release();
fulfiller.fulfill();
pipe.endState(*this);
}
return kj::mv(message);
}, [this](kj::Exception&& e) -> Message {
canceler.release();
fulfiller.reject(kj::cp(e));
pipe.endState(*this);
kj::throwRecoverableException(kj::mv(e));
return Message(kj::String());
}));
}
kj::Promise<void> pumpTo(WebSocket& other) override {
KJ_REQUIRE(canceler.isEmpty(), "another message receive is already in progress");
return canceler.wrap(input.pumpTo(other)
.then([this]() {
canceler.release();
fulfiller.fulfill();
pipe.endState(*this);
}, [this](kj::Exception&& e) {
canceler.release();
fulfiller.reject(kj::cp(e));
pipe.endState(*this);
kj::throwRecoverableException(kj::mv(e));
}));
}
private:
kj::PromiseFulfiller<void>& fulfiller;
WebSocketPipeImpl& pipe;
WebSocket& input;
Canceler canceler;
};
class BlockedReceive final: public AbortableWebSocket {
public:
BlockedReceive(kj::PromiseFulfiller<Message>& fulfiller, WebSocketPipeImpl& pipe)
: fulfiller(fulfiller), pipe(pipe) {
KJ_REQUIRE(pipe.state == nullptr);
pipe.state = *this;
}
~BlockedReceive() noexcept(false) {
pipe.endState(*this);
} }
});
}
void abort() override {
canceler.cancel("other end of WebSocketPipe was destroyed");
fulfiller.reject(KJ_EXCEPTION(DISCONNECTED, "other end of WebSocketPipe was destroyed"));
pipe.endState(*this);
pipe.abort();
}
kj::Promise<void> send(kj::ArrayPtr<const byte> message) override {
KJ_REQUIRE(canceler.isEmpty(), "already pumping");
auto copy = kj::heapArray<byte>(message.size());
memcpy(copy.begin(), message.begin(), message.size());
fulfiller.fulfill(Message(kj::mv(copy)));
pipe.endState(*this);
return kj::READY_NOW;
}
kj::Promise<void> send(kj::ArrayPtr<const char> message) override {
KJ_REQUIRE(canceler.isEmpty(), "already pumping");
fulfiller.fulfill(Message(kj::str(message)));
pipe.endState(*this);
return kj::READY_NOW;
}
kj::Promise<void> close(uint16_t code, kj::StringPtr reason) override {
KJ_REQUIRE(canceler.isEmpty(), "already pumping");
fulfiller.fulfill(Message(Close { code, kj::str(reason) }));
pipe.endState(*this);
return kj::READY_NOW;
}
kj::Promise<void> disconnect() override {
KJ_REQUIRE(canceler.isEmpty(), "already pumping");
fulfiller.reject(KJ_EXCEPTION(DISCONNECTED, "WebSocket disconnected"));
pipe.endState(*this);
return pipe.disconnect();
}
kj::Maybe<kj::Promise<void>> tryPumpFrom(WebSocket& other) override {
KJ_REQUIRE(canceler.isEmpty(), "already pumping");
return canceler.wrap(other.receive().then([this,&other](Message message) {
fulfiller.fulfill(kj::mv(message));
pipe.endState(*this);
return other.pumpTo(pipe);
}, [this](kj::Exception&& e) -> kj::Promise<void> {
canceler.release();
fulfiller.reject(kj::cp(e));
pipe.endState(*this);
return kj::mv(e);
}));
}
kj::Promise<Message> receive() override {
KJ_FAIL_ASSERT("another message receive is already in progress");
}
kj::Promise<void> pumpTo(WebSocket& other) override {
KJ_FAIL_ASSERT("another message receive is already in progress");
}
private:
kj::PromiseFulfiller<Message>& fulfiller;
WebSocketPipeImpl& pipe;
Canceler canceler;
};
class BlockedPumpTo final: public AbortableWebSocket {
public:
BlockedPumpTo(kj::PromiseFulfiller<void>& fulfiller, WebSocketPipeImpl& pipe, WebSocket& output)
: fulfiller(fulfiller), pipe(pipe), output(output) {
KJ_REQUIRE(pipe.state == nullptr);
pipe.state = *this;
}
~BlockedPumpTo() noexcept(false) {
pipe.endState(*this);
}
void abort() override {
canceler.cancel("other end of WebSocketPipe was destroyed");
fulfiller.reject(KJ_EXCEPTION(DISCONNECTED, "other end of WebSocketPipe was destroyed"));
pipe.endState(*this);
pipe.abort();
}
kj::Promise<void> send(kj::ArrayPtr<const byte> message) override {
KJ_REQUIRE(canceler.isEmpty(), "another message send is already in progress");
return canceler.wrap(output.send(message));
}
kj::Promise<void> send(kj::ArrayPtr<const char> message) override {
KJ_REQUIRE(canceler.isEmpty(), "another message send is already in progress");
return canceler.wrap(output.send(message));
}
kj::Promise<void> close(uint16_t code, kj::StringPtr reason) override {
KJ_REQUIRE(canceler.isEmpty(), "another message send is already in progress");
return canceler.wrap(output.close(code, reason));
}
kj::Promise<void> disconnect() override {
KJ_REQUIRE(canceler.isEmpty(), "another message send is already in progress");
return canceler.wrap(output.disconnect().then([this]() {
canceler.release();
pipe.endState(*this);
fulfiller.fulfill();
return pipe.disconnect();
}));
}
kj::Maybe<kj::Promise<void>> tryPumpFrom(WebSocket& other) override {
KJ_REQUIRE(canceler.isEmpty(), "another message send is already in progress");
return canceler.wrap(other.pumpTo(output).then([this]() {
canceler.release();
pipe.endState(*this);
fulfiller.fulfill();
}));
}
kj::Promise<Message> receive() override {
KJ_FAIL_ASSERT("another message receive is already in progress");
}
kj::Promise<void> pumpTo(WebSocket& other) override {
KJ_FAIL_ASSERT("another message receive is already in progress");
}
private:
kj::PromiseFulfiller<void>& fulfiller;
WebSocketPipeImpl& pipe;
WebSocket& output;
Canceler canceler;
};
class Disconnected final: public AbortableWebSocket {
public:
void abort() override {
// can ignore
}
kj::Promise<void> send(kj::ArrayPtr<const byte> message) override {
KJ_FAIL_REQUIRE("can't send() after disconnect()");
}
kj::Promise<void> send(kj::ArrayPtr<const char> message) override {
KJ_FAIL_REQUIRE("can't send() after disconnect()");
}
kj::Promise<void> close(uint16_t code, kj::StringPtr reason) override {
KJ_FAIL_REQUIRE("can't close() after disconnect()");
}
kj::Promise<void> disconnect() override {
return kj::READY_NOW;
}
kj::Maybe<kj::Promise<void>> tryPumpFrom(WebSocket& other) override {
KJ_FAIL_REQUIRE("can't tryPumpFrom() after disconnect()");
}
kj::Promise<Message> receive() override {
return KJ_EXCEPTION(DISCONNECTED, "WebSocket disconnected");
}
kj::Promise<void> pumpTo(WebSocket& other) override {
return kj::READY_NOW;
}
};
class Aborted final: public AbortableWebSocket {
public:
void abort() override {
// can ignore
}
kj::Promise<void> send(kj::ArrayPtr<const byte> message) override {
return KJ_EXCEPTION(DISCONNECTED, "other end of WebSocketPipe was destroyed");
}
kj::Promise<void> send(kj::ArrayPtr<const char> message) override {
return KJ_EXCEPTION(DISCONNECTED, "other end of WebSocketPipe was destroyed");
}
kj::Promise<void> close(uint16_t code, kj::StringPtr reason) override {
return KJ_EXCEPTION(DISCONNECTED, "other end of WebSocketPipe was destroyed");
}
kj::Promise<void> disconnect() override {
return KJ_EXCEPTION(DISCONNECTED, "other end of WebSocketPipe was destroyed");
}
kj::Maybe<kj::Promise<void>> tryPumpFrom(WebSocket& other) override {
return kj::Promise<void>(KJ_EXCEPTION(DISCONNECTED,
"other end of WebSocketPipe was destroyed"));
}
kj::Promise<Message> receive() override {
return KJ_EXCEPTION(DISCONNECTED, "other end of WebSocketPipe was destroyed");
}
kj::Promise<void> pumpTo(WebSocket& other) override {
return KJ_EXCEPTION(DISCONNECTED, "other end of WebSocketPipe was destroyed");
}
};
};
class WebSocketPipeEnd final: public WebSocket {
public:
WebSocketPipeEnd(kj::Own<WebSocketPipeImpl> in, kj::Own<WebSocketPipeImpl> out)
: in(kj::mv(in)), out(kj::mv(out)) {}
~WebSocketPipeEnd() noexcept(false) {
in->abort();
out->abort();
}
kj::Promise<void> send(kj::ArrayPtr<const byte> message) override {
return out->send(message);
}
kj::Promise<void> send(kj::ArrayPtr<const char> message) override {
return out->send(message);
}
kj::Promise<void> close(uint16_t code, kj::StringPtr reason) override {
return out->close(code, reason);
}
kj::Promise<void> disconnect() override {
return out->disconnect();
}
kj::Maybe<kj::Promise<void>> tryPumpFrom(WebSocket& other) override {
return out->tryPumpFrom(other);
}
kj::Promise<Message> receive() override {
return in->receive();
}
kj::Promise<void> pumpTo(WebSocket& other) override {
return in->pumpTo(other);
}
private:
kj::Own<WebSocketPipeImpl> in;
kj::Own<WebSocketPipeImpl> out;
};
} // namespace
WebSocketPipe newWebSocketPipe() {
auto pipe1 = kj::refcounted<WebSocketPipeImpl>();
auto pipe2 = kj::refcounted<WebSocketPipeImpl>();
auto end1 = kj::heap<WebSocketPipeEnd>(kj::addRef(*pipe1), kj::addRef(*pipe2));
auto end2 = kj::heap<WebSocketPipeEnd>(kj::mv(pipe2), kj::mv(pipe1));
return { { kj::mv(end1), kj::mv(end2) } };
}
// ======================================================================================= // =======================================================================================
...@@ -3204,6 +3719,207 @@ kj::Own<HttpClient> newHttpClient(kj::Timer& timer, HttpHeaderTable& responseHea ...@@ -3204,6 +3719,207 @@ kj::Own<HttpClient> newHttpClient(kj::Timer& timer, HttpHeaderTable& responseHea
namespace { namespace {
class NullInputStream final: public kj::AsyncInputStream {
public:
NullInputStream(kj::Maybe<size_t> expectedLength = size_t(0))
: expectedLength(expectedLength) {}
kj::Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
return size_t(0);
}
kj::Maybe<uint64_t> tryGetLength() override {
return expectedLength;
}
kj::Promise<uint64_t> pumpTo(AsyncOutputStream& output, uint64_t amount) override {
return uint64_t(0);
}
private:
kj::Maybe<size_t> expectedLength;
};
class NullOutputStream final: public kj::AsyncOutputStream {
public:
Promise<void> write(const void* buffer, size_t size) override {
return kj::READY_NOW;
}
Promise<void> write(ArrayPtr<const ArrayPtr<const byte>> pieces) override {
return kj::READY_NOW;
}
// We can't really optimize tryPumpFrom() unless AsyncInputStream grows a skip() method.
};
class HttpClientAdapter final: public HttpClient {
public:
HttpClientAdapter(HttpService& service): service(service) {}
Request request(HttpMethod method, kj::StringPtr url, const HttpHeaders& headers,
kj::Maybe<uint64_t> expectedBodySize = nullptr) override {
// We have to clone the URL and headers because HttpService implementation are allowed to
// assume that they remain valid until the service handler completes whereas HttpClient callers
// are allowed to destroy them immediately after the call.
auto urlCopy = kj::str(url);
auto headersCopy = kj::heap(headers.clone());
auto pipe = newOneWayPipe(expectedBodySize);
auto paf = kj::newPromiseAndFulfiller<Response>();
auto responder = kj::refcounted<ResponseImpl>(method, kj::mv(paf.fulfiller));
auto promise = service.request(method, urlCopy, *headersCopy, *pipe.in, *responder);
responder->setPromise(promise.attach(kj::mv(pipe.in), kj::mv(urlCopy), kj::mv(headersCopy)));
return {
kj::mv(pipe.out),
paf.promise.attach(kj::mv(responder))
};
}
kj::Promise<WebSocketResponse> openWebSocket(
kj::StringPtr url, const HttpHeaders& headers) override {
// We have to clone the URL and headers because HttpService implementation are allowed to
// assume that they remain valid until the service handler completes whereas HttpClient callers
// are allowed to destroy them immediately after the call. Also we need to add
// `Upgrade: websocket` so that headers.isWebSocket() returns true on the service side.
auto urlCopy = kj::str(url);
auto headersCopy = kj::heap(headers.clone());
headersCopy->set(HttpHeaderId::UPGRADE, "websocket");
KJ_DASSERT(headersCopy->isWebSocket());
auto paf = kj::newPromiseAndFulfiller<WebSocketResponse>();
auto responder = kj::refcounted<WebSocketResponseImpl>(kj::mv(paf.fulfiller));
auto in = kj::heap<NullInputStream>();
auto promise = service.request(HttpMethod::GET, urlCopy, *headersCopy, *in, *responder);
responder->setPromise(promise.attach(kj::mv(in), kj::mv(urlCopy), kj::mv(headersCopy)));
return paf.promise.attach(kj::mv(responder));
}
kj::Promise<kj::Own<kj::AsyncIoStream>> connect(kj::StringPtr host) override {
return service.connect(kj::mv(host));
}
private:
HttpService& service;
class ResponseImpl final: public HttpService::Response, public kj::Refcounted {
public:
ResponseImpl(kj::HttpMethod method,
kj::Own<kj::PromiseFulfiller<HttpClient::Response>> fulfiller)
: method(method), fulfiller(kj::mv(fulfiller)) {}
void setPromise(kj::Promise<void> promise) {
task = promise.eagerlyEvaluate([this](kj::Exception&& exception) {
if (fulfiller->isWaiting()) {
fulfiller->reject(kj::mv(exception));
} else {
KJ_LOG(ERROR, "HttpService threw an exception after having already started responding",
exception);
}
});
}
kj::Own<kj::AsyncOutputStream> send(
uint statusCode, kj::StringPtr statusText, const HttpHeaders& headers,
kj::Maybe<uint64_t> expectedBodySize = nullptr) override {
// The caller of HttpClient is allowed to assume that the statusText and headers remain
// valid until the body stream is dropped, but the HttpService implementation is allowed to
// send values that are only valid until send() returns, so we have to copy.
auto statusTextCopy = kj::str(statusText);
auto headersCopy = kj::heap(headers.clone());
if (method == kj::HttpMethod::HEAD) {
fulfiller->fulfill({
statusCode, statusTextCopy, headersCopy.get(),
kj::heap<NullInputStream>(expectedBodySize)
.attach(kj::addRef(*this), kj::mv(statusTextCopy), kj::mv(headersCopy))
});
return kj::heap<NullOutputStream>();
} else {
auto pipe = newOneWayPipe(expectedBodySize);
fulfiller->fulfill({
statusCode, statusTextCopy, headersCopy.get(),
pipe.in.attach(kj::addRef(*this), kj::mv(statusTextCopy), kj::mv(headersCopy))
});
return kj::mv(pipe.out);
}
}
kj::Own<WebSocket> acceptWebSocket(const HttpHeaders& headers) override {
KJ_FAIL_REQUIRE("a WebSocket was not requested");
}
private:
kj::HttpMethod method;
kj::Own<kj::PromiseFulfiller<HttpClient::Response>> fulfiller;
kj::Promise<void> task = nullptr;
};
class WebSocketResponseImpl final: public HttpService::Response, public kj::Refcounted {
public:
WebSocketResponseImpl(kj::Own<kj::PromiseFulfiller<HttpClient::WebSocketResponse>> fulfiller)
: fulfiller(kj::mv(fulfiller)) {}
void setPromise(kj::Promise<void> promise) {
task = promise.eagerlyEvaluate([this](kj::Exception&& exception) {
if (fulfiller->isWaiting()) {
fulfiller->reject(kj::mv(exception));
} else {
KJ_LOG(ERROR, "HttpService threw an exception after having already started responding",
exception);
}
});
}
kj::Own<kj::AsyncOutputStream> send(
uint statusCode, kj::StringPtr statusText, const HttpHeaders& headers,
kj::Maybe<uint64_t> expectedBodySize = nullptr) override {
// The caller of HttpClient is allowed to assume that the statusText and headers remain
// valid until the body stream is dropped, but the HttpService implementation is allowed to
// send values that are only valid until send() returns, so we have to copy.
auto statusTextCopy = kj::str(statusText);
auto headersCopy = kj::heap(headers.clone());
auto pipe = newOneWayPipe(expectedBodySize);
fulfiller->fulfill({
statusCode, statusTextCopy, headersCopy.get(),
pipe.in.attach(kj::addRef(*this), kj::mv(statusTextCopy), kj::mv(headersCopy))
});
return kj::mv(pipe.out);
}
kj::Own<WebSocket> acceptWebSocket(const HttpHeaders& headers) override {
// The caller of HttpClient is allowed to assume that the headers remain valid until the body
// stream is dropped, but the HttpService implementation is allowed to send headers that are
// only valid until acceptWebSocket() returns, so we have to copy.
auto headersCopy = kj::heap(headers.clone());
auto pipe = newWebSocketPipe();
fulfiller->fulfill({
101, "Switching Protocols", headersCopy.get(),
pipe.ends[0].attach(kj::addRef(*this), kj::mv(headersCopy))
});
return kj::mv(pipe.ends[1]);
}
private:
kj::Own<kj::PromiseFulfiller<HttpClient::WebSocketResponse>> fulfiller;
kj::Promise<void> task = nullptr;
};
};
} // namespace
kj::Own<HttpClient> newHttpClient(HttpService& service) {
return kj::heap<HttpClientAdapter>(service);
}
// =======================================================================================
namespace {
class HttpServiceAdapter final: public HttpService { class HttpServiceAdapter final: public HttpService {
public: public:
HttpServiceAdapter(HttpClient& client): client(client) {} HttpServiceAdapter(HttpClient& client): client(client) {}
...@@ -3256,8 +3972,6 @@ public: ...@@ -3256,8 +3972,6 @@ public:
return client.connect(kj::mv(host)); return client.connect(kj::mv(host));
} }
// TODO(soon): other methods
private: private:
HttpClient& client; HttpClient& client;
}; };
......
...@@ -424,7 +424,7 @@ public: ...@@ -424,7 +424,7 @@ public:
// Read one message from the WebSocket and return it. Can only call once at a time. Do not call // Read one message from the WebSocket and return it. Can only call once at a time. Do not call
// again after Close is received. // again after Close is received.
kj::Promise<void> pumpTo(WebSocket& other); virtual kj::Promise<void> pumpTo(WebSocket& other);
// Continuously receives messages from this WebSocket and send them to `other`. // Continuously receives messages from this WebSocket and send them to `other`.
// //
// On EOF, calls other.disconnect(), then resolves. // On EOF, calls other.disconnect(), then resolves.
...@@ -432,6 +432,12 @@ public: ...@@ -432,6 +432,12 @@ public:
// On other read errors, calls other.close() with the error, then resolves. // On other read errors, calls other.close() with the error, then resolves.
// //
// On write error, rejects with the error. // On write error, rejects with the error.
virtual kj::Maybe<kj::Promise<void>> tryPumpFrom(WebSocket& other);
// Either returns null, or performs the equivalent of other.pumpTo(*this). Only returns non-null
// if this WebSocket implementation is able to perform the pump in an optimized way, better than
// the default implementation of pumpTo(). The default implementation of pumpTo() always tries
// calling this first, and the default implementation of tryPumpFrom() always returns null.
}; };
class HttpClient { class HttpClient {
...@@ -634,6 +640,15 @@ kj::Own<WebSocket> newWebSocket(kj::Own<kj::AsyncIoStream> stream, ...@@ -634,6 +640,15 @@ kj::Own<WebSocket> newWebSocket(kj::Own<kj::AsyncIoStream> stream,
// like HTTP requests" in a message as being actual HTTP requests, which could result in cache // like HTTP requests" in a message as being actual HTTP requests, which could result in cache
// poisoning. See RFC6455 section 10.3. // poisoning. See RFC6455 section 10.3.
struct WebSocketPipe {
kj::Own<WebSocket> ends[2];
};
WebSocketPipe newWebSocketPipe();
// Create a WebSocket pipe. Messages written to one end of the pipe will be readable from the other
// end. No buffering occurs -- a message send does not complete under a corresponding receive
// accepts the message.
struct HttpServerSettings { struct HttpServerSettings {
kj::Duration headerTimeout = 15 * kj::SECONDS; kj::Duration headerTimeout = 15 * kj::SECONDS;
// After initial connection open, or after receiving the first byte of a pipelined request, // After initial connection open, or after receiving the first byte of a pipelined request,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment