Unverified Commit fac09cf7 authored by Kenton Varda's avatar Kenton Varda Committed by GitHub

Merge pull request #662 from capnproto/http-client-adapter

Implement newHttpClient(HttpService&).
parents 8243bab5 8d2ed06b
......@@ -1289,5 +1289,25 @@ KJ_TEST("Userland pipe pumpTo less than write amount") {
writePromise.wait(ws);
}
KJ_TEST("Userland pipe pumpFrom EOF on abortRead()") {
kj::EventLoop loop;
WaitScope ws(loop);
auto pipe = newOneWayPipe();
auto pipe2 = newOneWayPipe();
auto pumpPromise = KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in));
auto promise = pipe.out->write("foobar", 6);
KJ_EXPECT(!promise.poll(ws));
expectRead(*pipe2.in, "foobar").wait(ws);
promise.wait(ws);
KJ_EXPECT(!pumpPromise.poll(ws));
pipe.out = nullptr;
pipe2.in = nullptr; // force pump to notice EOF
KJ_EXPECT(pumpPromise.wait(ws) == 6);
pipe2.out = nullptr;
}
} // namespace
} // namespace kj
......@@ -522,7 +522,26 @@ private:
void abortRead() override {
canceler.cancel("abortRead() was called");
fulfiller.reject(KJ_EXCEPTION(FAILED, "read end of pipe was aborted"));
// The input might have reached EOF, but we haven't detected it yet because we haven't tried
// to read that far. If we had not optimized tryPumpFrom() and instead used the default
// pumpTo() implementation, then the input would not have called write() again once it
// reached EOF, and therefore the abortRead() on the other end would *not* propagate an
// exception! We need the same behavior here. To that end, we need to detect if we're at EOF
// by reading one last byte.
checkEofTask = kj::evalNow([&]() {
static char junk;
return input.tryRead(&junk, 1, 1).then([this](uint64_t n) {
if (n == 0) {
fulfiller.fulfill(kj::cp(pumpedSoFar));
} else {
fulfiller.reject(KJ_EXCEPTION(FAILED, "read end of pipe was aborted"));
}
}).eagerlyEvaluate([this](kj::Exception&& e) {
fulfiller.reject(kj::mv(e));
});
});
pipe.endState(*this);
pipe.abortRead();
}
......@@ -547,6 +566,7 @@ private:
uint64_t amount;
uint64_t pumpedSoFar = 0;
Canceler canceler;
kj::Promise<void> checkEofTask = nullptr;
};
class BlockedRead final: public AsyncIoStream {
......
......@@ -512,6 +512,33 @@ void testHttpClientResponse(kj::WaitScope& waitScope, const HttpResponseTestCase
KJ_EXPECT(pipe.ends[1]->readAllText().wait(waitScope) == "");
}
void testHttpClient(kj::WaitScope& waitScope, HttpHeaderTable& table,
HttpClient& client, const HttpTestCase& testCase) {
KJ_CONTEXT(testCase.request.raw, testCase.response.raw);
HttpHeaders headers(table);
for (auto& header: testCase.request.requestHeaders) {
headers.set(header.id, header.value);
}
auto request = client.request(
testCase.request.method, testCase.request.path, headers, testCase.request.requestBodySize);
for (auto& part: testCase.request.requestBodyParts) {
request.body->write(part.begin(), part.size()).wait(waitScope);
}
request.body = nullptr;
auto response = request.response.wait(waitScope);
KJ_EXPECT(response.statusCode == testCase.response.statusCode);
auto body = response.body->readAllText().wait(waitScope);
if (testCase.request.method == HttpMethod::HEAD) {
KJ_EXPECT(body == "");
} else {
KJ_EXPECT(body == kj::strArray(testCase.response.responseBodyParts, ""), body);
}
}
class TestHttpService final: public HttpService {
public:
TestHttpService(const HttpRequestTestCase& expectedRequest,
......@@ -1062,29 +1089,7 @@ KJ_TEST("HttpClient pipeline") {
auto client = newHttpClient(table, *pipe.ends[0]);
for (auto& testCase: PIPELINE_TESTS) {
KJ_CONTEXT(testCase.request.raw, testCase.response.raw);
HttpHeaders headers(table);
for (auto& header: testCase.request.requestHeaders) {
headers.set(header.id, header.value);
}
auto request = client->request(
testCase.request.method, testCase.request.path, headers, testCase.request.requestBodySize);
for (auto& part: testCase.request.requestBodyParts) {
request.body->write(part.begin(), part.size()).wait(waitScope);
}
request.body = nullptr;
auto response = request.response.wait(waitScope);
KJ_EXPECT(response.statusCode == testCase.response.statusCode);
auto body = response.body->readAllText().wait(waitScope);
if (testCase.request.method == HttpMethod::HEAD) {
KJ_EXPECT(body == "");
} else {
KJ_EXPECT(body == kj::strArray(testCase.response.responseBodyParts, ""), body);
}
testHttpClient(waitScope, table, *client, testCase);
}
client = nullptr;
......@@ -1232,29 +1237,7 @@ KJ_TEST("HttpClient <-> HttpServer") {
auto client = newHttpClient(table, *pipe.ends[0]);
for (auto& testCase: PIPELINE_TESTS) {
KJ_CONTEXT(testCase.request.raw, testCase.response.raw);
HttpHeaders headers(table);
for (auto& header: testCase.request.requestHeaders) {
headers.set(header.id, header.value);
}
auto request = client->request(
testCase.request.method, testCase.request.path, headers, testCase.request.requestBodySize);
for (auto& part: testCase.request.requestBodyParts) {
request.body->write(part.begin(), part.size()).wait(waitScope);
}
request.body = nullptr;
auto response = request.response.wait(waitScope);
KJ_EXPECT(response.statusCode == testCase.response.statusCode);
auto body = response.body->readAllText().wait(waitScope);
if (testCase.request.method == HttpMethod::HEAD) {
KJ_EXPECT(body == "");
} else {
KJ_EXPECT(body == kj::strArray(testCase.response.responseBodyParts, ""), body);
}
testHttpClient(waitScope, table, *client, testCase);
}
client = nullptr;
......@@ -1628,7 +1611,7 @@ public:
if (url == "/return-error") {
response.send(404, "Not Found", responseHeaders, uint64_t(0));
return kj::READY_NOW;
} else if (url == "/ws-inline") {
} else if (url == "/websocket") {
auto ws = response.acceptWebSocket(responseHeaders);
return doWebSocket(*ws, "start-inline").attach(kj::mv(ws));
} else {
......@@ -1704,33 +1687,11 @@ kj::ArrayPtr<const byte> asBytes(const char (&chars)[s]) {
return kj::ArrayPtr<const char>(chars, s - 1).asBytes();
}
KJ_TEST("HttpClient WebSocket handshake") {
kj::EventLoop eventLoop;
kj::WaitScope waitScope(eventLoop);
auto pipe = kj::newTwoWayPipe();
auto request = kj::str("GET /websocket", WEBSOCKET_REQUEST_HANDSHAKE);
auto serverTask = expectRead(*pipe.ends[1], request)
.then([&]() { return pipe.ends[1]->write({asBytes(WEBSOCKET_RESPONSE_HANDSHAKE)}); })
.then([&]() { return pipe.ends[1]->write({WEBSOCKET_FIRST_MESSAGE_INLINE}); })
.then([&]() { return expectRead(*pipe.ends[1], WEBSOCKET_SEND_MESSAGE); })
.then([&]() { return pipe.ends[1]->write({WEBSOCKET_REPLY_MESSAGE}); })
.then([&]() { return expectRead(*pipe.ends[1], WEBSOCKET_SEND_CLOSE); })
.then([&]() { return pipe.ends[1]->write({WEBSOCKET_REPLY_CLOSE}); })
.eagerlyEvaluate([](kj::Exception&& e) { KJ_LOG(ERROR, e); });
HttpHeaderTable::Builder tableBuilder;
HttpHeaderId hMyHeader = tableBuilder.add("My-Header");
auto headerTable = tableBuilder.build();
FakeEntropySource entropySource;
auto client = newHttpClient(*headerTable, *pipe.ends[0], entropySource);
kj::HttpHeaders headers(*headerTable);
void testWebSocketClient(kj::WaitScope& waitScope, HttpHeaderTable& headerTable,
kj::HttpHeaderId hMyHeader, HttpClient& client) {
kj::HttpHeaders headers(headerTable);
headers.set(hMyHeader, "foo");
auto response = client->openWebSocket("/websocket", headers).wait(waitScope);
auto response = client.openWebSocket("/websocket", headers).wait(waitScope);
KJ_EXPECT(response.statusCode == 101);
KJ_EXPECT(response.statusText == "Switching Protocols", response.statusText);
......@@ -1758,6 +1719,33 @@ KJ_TEST("HttpClient WebSocket handshake") {
KJ_EXPECT(message.get<WebSocket::Close>().code == 0x1235);
KJ_EXPECT(message.get<WebSocket::Close>().reason == "close-reply:qux");
}
}
KJ_TEST("HttpClient WebSocket handshake") {
kj::EventLoop eventLoop;
kj::WaitScope waitScope(eventLoop);
auto pipe = kj::newTwoWayPipe();
auto request = kj::str("GET /websocket", WEBSOCKET_REQUEST_HANDSHAKE);
auto serverTask = expectRead(*pipe.ends[1], request)
.then([&]() { return pipe.ends[1]->write({asBytes(WEBSOCKET_RESPONSE_HANDSHAKE)}); })
.then([&]() { return pipe.ends[1]->write({WEBSOCKET_FIRST_MESSAGE_INLINE}); })
.then([&]() { return expectRead(*pipe.ends[1], WEBSOCKET_SEND_MESSAGE); })
.then([&]() { return pipe.ends[1]->write({WEBSOCKET_REPLY_MESSAGE}); })
.then([&]() { return expectRead(*pipe.ends[1], WEBSOCKET_SEND_CLOSE); })
.then([&]() { return pipe.ends[1]->write({WEBSOCKET_REPLY_CLOSE}); })
.eagerlyEvaluate([](kj::Exception&& e) { KJ_LOG(ERROR, e); });
HttpHeaderTable::Builder tableBuilder;
HttpHeaderId hMyHeader = tableBuilder.add("My-Header");
auto headerTable = tableBuilder.build();
FakeEntropySource entropySource;
auto client = newHttpClient(*headerTable, *pipe.ends[0], entropySource);
testWebSocketClient(waitScope, *headerTable, hMyHeader, *client);
serverTask.wait(waitScope);
}
......@@ -1821,7 +1809,7 @@ KJ_TEST("HttpServer WebSocket handshake") {
auto listenTask = server.listenHttp(kj::mv(pipe.ends[0]));
auto request = kj::str("GET /ws-inline", WEBSOCKET_REQUEST_HANDSHAKE);
auto request = kj::str("GET /websocket", WEBSOCKET_REQUEST_HANDSHAKE);
pipe.ends[1]->write({request.asBytes()}).wait(waitScope);
expectRead(*pipe.ends[1], WEBSOCKET_RESPONSE_HANDSHAKE).wait(waitScope);
......@@ -2314,6 +2302,39 @@ KJ_TEST("newHttpService from HttpClient WebSockets disconnect") {
// -----------------------------------------------------------------------------
KJ_TEST("newHttpClient from HttpService") {
auto PIPELINE_TESTS = pipelineTestCases();
kj::EventLoop eventLoop;
kj::WaitScope waitScope(eventLoop);
kj::TimerImpl timer(kj::origin<kj::TimePoint>());
HttpHeaderTable table;
TestHttpService service(PIPELINE_TESTS, table);
auto client = newHttpClient(service);
for (auto& testCase: PIPELINE_TESTS) {
testHttpClient(waitScope, table, *client, testCase);
}
}
KJ_TEST("newHttpClient from HttpService WebSockets") {
kj::EventLoop eventLoop;
kj::WaitScope waitScope(eventLoop);
kj::TimerImpl timer(kj::origin<kj::TimePoint>());
auto pipe = kj::newTwoWayPipe();
HttpHeaderTable::Builder tableBuilder;
HttpHeaderId hMyHeader = tableBuilder.add("My-Header");
auto headerTable = tableBuilder.build();
TestWebSocketService service(*headerTable, hMyHeader);
auto client = newHttpClient(service);
testWebSocketClient(waitScope, *headerTable, hMyHeader, *client);
}
// -----------------------------------------------------------------------------
class CountingIoStream final: public kj::AsyncIoStream {
// An AsyncIoStream wrapper which decrements a counter when destroyed (allowing us to count how
// many connections are open).
......
......@@ -2406,17 +2406,544 @@ static kj::Promise<void> pumpWebSocketLoop(WebSocket& from, WebSocket& to) {
}
kj::Promise<void> WebSocket::pumpTo(WebSocket& other) {
return kj::evalNow([&]() {
return pumpWebSocketLoop(*this, other);
}).catch_([&other](kj::Exception&& e) -> kj::Promise<void> {
if (e.getType() == kj::Exception::Type::DISCONNECTED) {
return other.disconnect();
KJ_IF_MAYBE(p, other.tryPumpFrom(*this)) {
// Yay, optimized pump!
return kj::mv(*p);
} else {
// Fall back to default implementation.
return kj::evalNow([&]() {
return pumpWebSocketLoop(*this, other);
}).catch_([&other](kj::Exception&& e) -> kj::Promise<void> {
if (e.getType() == kj::Exception::Type::DISCONNECTED) {
return other.disconnect();
} else {
return other.close(1002, e.getDescription());
}
});
}
}
kj::Maybe<kj::Promise<void>> WebSocket::tryPumpFrom(WebSocket& other) {
return nullptr;
}
namespace {
class AbortableWebSocket: public WebSocket {
public:
virtual void abort() = 0;
};
class WebSocketPipeImpl final: public AbortableWebSocket, public kj::Refcounted {
// Represents one direction of a WebSocket pipe.
//
// This class behaves as a "loopback" WebSocket: a message sent using send() is received using
// receive(), on the same object. This is *not* how WebSocket implementations usually behave.
// But, this object is actually used to implement only one direction of a bidirectional pipe. At
// another layer above this, the pipe is actually composed of two WebSocketPipeEnd instances,
// which layer on top of two WebSocketPipeImpl instances representing the two directions. So,
// send() calls on a WebSocketPipeImpl instance always come from one of the two WebSocketPipeEnds
// while receive() calls come from the other end.
public:
~WebSocketPipeImpl() noexcept(false) {
KJ_REQUIRE(state == nullptr || ownState.get() != nullptr,
"destroying WebSocketPipe with operation still in-progress; probably going to segfault") {
// Don't std::terminate().
break;
}
}
void abort() override {
KJ_IF_MAYBE(s, state) {
s->abort();
} else {
return other.close(1002, e.getDescription());
ownState = heap<Aborted>();
state = *ownState;
}
});
}
}
kj::Promise<void> send(kj::ArrayPtr<const byte> message) override {
KJ_IF_MAYBE(s, state) {
return s->send(message);
} else {
return newAdaptedPromise<void, BlockedSend>(*this, MessagePtr(message));
}
}
kj::Promise<void> send(kj::ArrayPtr<const char> message) override {
KJ_IF_MAYBE(s, state) {
return s->send(message);
} else {
return newAdaptedPromise<void, BlockedSend>(*this, MessagePtr(message));
}
}
kj::Promise<void> close(uint16_t code, kj::StringPtr reason) override {
KJ_IF_MAYBE(s, state) {
return s->close(code, reason);
} else {
return newAdaptedPromise<void, BlockedSend>(*this, MessagePtr(ClosePtr { code, reason }));
}
}
kj::Promise<void> disconnect() override {
KJ_IF_MAYBE(s, state) {
return s->disconnect();
} else {
ownState = heap<Disconnected>();
state = *ownState;
return kj::READY_NOW;
}
}
kj::Maybe<kj::Promise<void>> tryPumpFrom(WebSocket& other) override {
KJ_IF_MAYBE(s, state) {
return s->tryPumpFrom(other);
} else {
return newAdaptedPromise<void, BlockedPumpFrom>(*this, other);
}
}
kj::Promise<Message> receive() override {
KJ_IF_MAYBE(s, state) {
return s->receive();
} else {
return newAdaptedPromise<Message, BlockedReceive>(*this);
}
}
kj::Promise<void> pumpTo(WebSocket& other) override {
KJ_IF_MAYBE(s, state) {
return s->pumpTo(other);
} else {
return newAdaptedPromise<void, BlockedPumpTo>(*this, other);
}
}
private:
kj::Maybe<AbortableWebSocket&> state;
// Object-oriented state! If any method call is blocked waiting on activity from the other end,
// then `state` is non-null and method calls should be forwarded to it. If no calls are
// outstanding, `state` is null.
kj::Own<AbortableWebSocket> ownState;
void endState(WebSocket& obj) {
KJ_IF_MAYBE(s, state) {
if (s == &obj) {
state = nullptr;
}
}
}
struct ClosePtr {
uint16_t code;
kj::StringPtr reason;
};
typedef kj::OneOf<kj::ArrayPtr<const char>, kj::ArrayPtr<const byte>, ClosePtr> MessagePtr;
class BlockedSend final: public AbortableWebSocket {
public:
BlockedSend(kj::PromiseFulfiller<void>& fulfiller, WebSocketPipeImpl& pipe, MessagePtr message)
: fulfiller(fulfiller), pipe(pipe), message(kj::mv(message)) {
KJ_REQUIRE(pipe.state == nullptr);
pipe.state = *this;
}
~BlockedSend() noexcept(false) {
pipe.endState(*this);
}
void abort() override {
canceler.cancel("other end of WebSocketPipe was destroyed");
fulfiller.reject(KJ_EXCEPTION(DISCONNECTED, "other end of WebSocketPipe was destroyed"));
pipe.endState(*this);
pipe.abort();
}
kj::Promise<void> send(kj::ArrayPtr<const byte> message) override {
KJ_FAIL_ASSERT("another message send is already in progress");
}
kj::Promise<void> send(kj::ArrayPtr<const char> message) override {
KJ_FAIL_ASSERT("another message send is already in progress");
}
kj::Promise<void> close(uint16_t code, kj::StringPtr reason) override {
KJ_FAIL_ASSERT("another message send is already in progress");
}
kj::Promise<void> disconnect() override {
KJ_FAIL_ASSERT("another message send is already in progress");
}
kj::Maybe<kj::Promise<void>> tryPumpFrom(WebSocket& other) override {
KJ_FAIL_ASSERT("another message send is already in progress");
}
kj::Promise<Message> receive() override {
KJ_REQUIRE(canceler.isEmpty(), "already pumping");
fulfiller.fulfill();
pipe.endState(*this);
KJ_SWITCH_ONEOF(message) {
KJ_CASE_ONEOF(arr, kj::ArrayPtr<const char>) {
return Message(kj::str(arr));
}
KJ_CASE_ONEOF(arr, kj::ArrayPtr<const byte>) {
auto copy = kj::heapArray<byte>(arr.size());
memcpy(copy.begin(), arr.begin(), arr.size());
return Message(kj::mv(copy));
}
KJ_CASE_ONEOF(close, ClosePtr) {
return Message(Close { close.code, kj::str(close.reason) });
}
}
KJ_UNREACHABLE;
}
kj::Promise<void> pumpTo(WebSocket& other) override {
KJ_REQUIRE(canceler.isEmpty(), "already pumping");
kj::Promise<void> promise = nullptr;
KJ_SWITCH_ONEOF(message) {
KJ_CASE_ONEOF(arr, kj::ArrayPtr<const char>) {
promise = other.send(arr);
}
KJ_CASE_ONEOF(arr, kj::ArrayPtr<const byte>) {
promise = other.send(arr);
}
KJ_CASE_ONEOF(close, ClosePtr) {
promise = other.close(close.code, close.reason);
}
}
return canceler.wrap(promise.then([this,&other]() {
canceler.release();
fulfiller.fulfill();
pipe.endState(*this);
return pipe.pumpTo(other);
}, [this](kj::Exception&& e) -> kj::Promise<void> {
canceler.release();
fulfiller.reject(kj::cp(e));
pipe.endState(*this);
return kj::mv(e);
}));
}
private:
kj::PromiseFulfiller<void>& fulfiller;
WebSocketPipeImpl& pipe;
MessagePtr message;
Canceler canceler;
};
class BlockedPumpFrom final: public AbortableWebSocket {
public:
BlockedPumpFrom(kj::PromiseFulfiller<void>& fulfiller, WebSocketPipeImpl& pipe,
WebSocket& input)
: fulfiller(fulfiller), pipe(pipe), input(input) {
KJ_REQUIRE(pipe.state == nullptr);
pipe.state = *this;
}
~BlockedPumpFrom() noexcept(false) {
pipe.endState(*this);
}
void abort() override {
canceler.cancel("other end of WebSocketPipe was destroyed");
fulfiller.reject(KJ_EXCEPTION(DISCONNECTED, "other end of WebSocketPipe was destroyed"));
pipe.endState(*this);
pipe.abort();
}
kj::Promise<void> send(kj::ArrayPtr<const byte> message) override {
KJ_FAIL_ASSERT("another message send is already in progress");
}
kj::Promise<void> send(kj::ArrayPtr<const char> message) override {
KJ_FAIL_ASSERT("another message send is already in progress");
}
kj::Promise<void> close(uint16_t code, kj::StringPtr reason) override {
KJ_FAIL_ASSERT("another message send is already in progress");
}
kj::Promise<void> disconnect() override {
KJ_FAIL_ASSERT("another message send is already in progress");
}
kj::Maybe<kj::Promise<void>> tryPumpFrom(WebSocket& other) override {
KJ_FAIL_ASSERT("another message send is already in progress");
}
kj::Promise<Message> receive() override {
KJ_REQUIRE(canceler.isEmpty(), "another message receive is already in progress");
return canceler.wrap(input.receive()
.then([this](Message message) {
if (message.is<Close>()) {
canceler.release();
fulfiller.fulfill();
pipe.endState(*this);
}
return kj::mv(message);
}, [this](kj::Exception&& e) -> Message {
canceler.release();
fulfiller.reject(kj::cp(e));
pipe.endState(*this);
kj::throwRecoverableException(kj::mv(e));
return Message(kj::String());
}));
}
kj::Promise<void> pumpTo(WebSocket& other) override {
KJ_REQUIRE(canceler.isEmpty(), "another message receive is already in progress");
return canceler.wrap(input.pumpTo(other)
.then([this]() {
canceler.release();
fulfiller.fulfill();
pipe.endState(*this);
}, [this](kj::Exception&& e) {
canceler.release();
fulfiller.reject(kj::cp(e));
pipe.endState(*this);
kj::throwRecoverableException(kj::mv(e));
}));
}
private:
kj::PromiseFulfiller<void>& fulfiller;
WebSocketPipeImpl& pipe;
WebSocket& input;
Canceler canceler;
};
class BlockedReceive final: public AbortableWebSocket {
public:
BlockedReceive(kj::PromiseFulfiller<Message>& fulfiller, WebSocketPipeImpl& pipe)
: fulfiller(fulfiller), pipe(pipe) {
KJ_REQUIRE(pipe.state == nullptr);
pipe.state = *this;
}
~BlockedReceive() noexcept(false) {
pipe.endState(*this);
}
void abort() override {
canceler.cancel("other end of WebSocketPipe was destroyed");
fulfiller.reject(KJ_EXCEPTION(DISCONNECTED, "other end of WebSocketPipe was destroyed"));
pipe.endState(*this);
pipe.abort();
}
kj::Promise<void> send(kj::ArrayPtr<const byte> message) override {
KJ_REQUIRE(canceler.isEmpty(), "already pumping");
auto copy = kj::heapArray<byte>(message.size());
memcpy(copy.begin(), message.begin(), message.size());
fulfiller.fulfill(Message(kj::mv(copy)));
pipe.endState(*this);
return kj::READY_NOW;
}
kj::Promise<void> send(kj::ArrayPtr<const char> message) override {
KJ_REQUIRE(canceler.isEmpty(), "already pumping");
fulfiller.fulfill(Message(kj::str(message)));
pipe.endState(*this);
return kj::READY_NOW;
}
kj::Promise<void> close(uint16_t code, kj::StringPtr reason) override {
KJ_REQUIRE(canceler.isEmpty(), "already pumping");
fulfiller.fulfill(Message(Close { code, kj::str(reason) }));
pipe.endState(*this);
return kj::READY_NOW;
}
kj::Promise<void> disconnect() override {
KJ_REQUIRE(canceler.isEmpty(), "already pumping");
fulfiller.reject(KJ_EXCEPTION(DISCONNECTED, "WebSocket disconnected"));
pipe.endState(*this);
return pipe.disconnect();
}
kj::Maybe<kj::Promise<void>> tryPumpFrom(WebSocket& other) override {
KJ_REQUIRE(canceler.isEmpty(), "already pumping");
return canceler.wrap(other.receive().then([this,&other](Message message) {
canceler.release();
fulfiller.fulfill(kj::mv(message));
pipe.endState(*this);
return other.pumpTo(pipe);
}, [this](kj::Exception&& e) -> kj::Promise<void> {
canceler.release();
fulfiller.reject(kj::cp(e));
pipe.endState(*this);
return kj::mv(e);
}));
}
kj::Promise<Message> receive() override {
KJ_FAIL_ASSERT("another message receive is already in progress");
}
kj::Promise<void> pumpTo(WebSocket& other) override {
KJ_FAIL_ASSERT("another message receive is already in progress");
}
private:
kj::PromiseFulfiller<Message>& fulfiller;
WebSocketPipeImpl& pipe;
Canceler canceler;
};
class BlockedPumpTo final: public AbortableWebSocket {
public:
BlockedPumpTo(kj::PromiseFulfiller<void>& fulfiller, WebSocketPipeImpl& pipe, WebSocket& output)
: fulfiller(fulfiller), pipe(pipe), output(output) {
KJ_REQUIRE(pipe.state == nullptr);
pipe.state = *this;
}
~BlockedPumpTo() noexcept(false) {
pipe.endState(*this);
}
void abort() override {
canceler.cancel("other end of WebSocketPipe was destroyed");
fulfiller.reject(KJ_EXCEPTION(DISCONNECTED, "other end of WebSocketPipe was destroyed"));
pipe.endState(*this);
pipe.abort();
}
kj::Promise<void> send(kj::ArrayPtr<const byte> message) override {
KJ_REQUIRE(canceler.isEmpty(), "another message send is already in progress");
return canceler.wrap(output.send(message));
}
kj::Promise<void> send(kj::ArrayPtr<const char> message) override {
KJ_REQUIRE(canceler.isEmpty(), "another message send is already in progress");
return canceler.wrap(output.send(message));
}
kj::Promise<void> close(uint16_t code, kj::StringPtr reason) override {
KJ_REQUIRE(canceler.isEmpty(), "another message send is already in progress");
return canceler.wrap(output.close(code, reason));
}
kj::Promise<void> disconnect() override {
KJ_REQUIRE(canceler.isEmpty(), "another message send is already in progress");
return canceler.wrap(output.disconnect().then([this]() {
canceler.release();
pipe.endState(*this);
fulfiller.fulfill();
return pipe.disconnect();
}));
}
kj::Maybe<kj::Promise<void>> tryPumpFrom(WebSocket& other) override {
KJ_REQUIRE(canceler.isEmpty(), "another message send is already in progress");
return canceler.wrap(other.pumpTo(output).then([this]() {
canceler.release();
pipe.endState(*this);
fulfiller.fulfill();
}));
}
kj::Promise<Message> receive() override {
KJ_FAIL_ASSERT("another message receive is already in progress");
}
kj::Promise<void> pumpTo(WebSocket& other) override {
KJ_FAIL_ASSERT("another message receive is already in progress");
}
private:
kj::PromiseFulfiller<void>& fulfiller;
WebSocketPipeImpl& pipe;
WebSocket& output;
Canceler canceler;
};
class Disconnected final: public AbortableWebSocket {
public:
void abort() override {
// can ignore
}
kj::Promise<void> send(kj::ArrayPtr<const byte> message) override {
KJ_FAIL_REQUIRE("can't send() after disconnect()");
}
kj::Promise<void> send(kj::ArrayPtr<const char> message) override {
KJ_FAIL_REQUIRE("can't send() after disconnect()");
}
kj::Promise<void> close(uint16_t code, kj::StringPtr reason) override {
KJ_FAIL_REQUIRE("can't close() after disconnect()");
}
kj::Promise<void> disconnect() override {
return kj::READY_NOW;
}
kj::Maybe<kj::Promise<void>> tryPumpFrom(WebSocket& other) override {
KJ_FAIL_REQUIRE("can't tryPumpFrom() after disconnect()");
}
kj::Promise<Message> receive() override {
return KJ_EXCEPTION(DISCONNECTED, "WebSocket disconnected");
}
kj::Promise<void> pumpTo(WebSocket& other) override {
return kj::READY_NOW;
}
};
class Aborted final: public AbortableWebSocket {
public:
void abort() override {
// can ignore
}
kj::Promise<void> send(kj::ArrayPtr<const byte> message) override {
return KJ_EXCEPTION(DISCONNECTED, "other end of WebSocketPipe was destroyed");
}
kj::Promise<void> send(kj::ArrayPtr<const char> message) override {
return KJ_EXCEPTION(DISCONNECTED, "other end of WebSocketPipe was destroyed");
}
kj::Promise<void> close(uint16_t code, kj::StringPtr reason) override {
return KJ_EXCEPTION(DISCONNECTED, "other end of WebSocketPipe was destroyed");
}
kj::Promise<void> disconnect() override {
return KJ_EXCEPTION(DISCONNECTED, "other end of WebSocketPipe was destroyed");
}
kj::Maybe<kj::Promise<void>> tryPumpFrom(WebSocket& other) override {
return kj::Promise<void>(KJ_EXCEPTION(DISCONNECTED,
"other end of WebSocketPipe was destroyed"));
}
kj::Promise<Message> receive() override {
return KJ_EXCEPTION(DISCONNECTED, "other end of WebSocketPipe was destroyed");
}
kj::Promise<void> pumpTo(WebSocket& other) override {
return KJ_EXCEPTION(DISCONNECTED, "other end of WebSocketPipe was destroyed");
}
};
};
class WebSocketPipeEnd final: public WebSocket {
public:
WebSocketPipeEnd(kj::Own<WebSocketPipeImpl> in, kj::Own<WebSocketPipeImpl> out)
: in(kj::mv(in)), out(kj::mv(out)) {}
~WebSocketPipeEnd() noexcept(false) {
in->abort();
out->abort();
}
kj::Promise<void> send(kj::ArrayPtr<const byte> message) override {
return out->send(message);
}
kj::Promise<void> send(kj::ArrayPtr<const char> message) override {
return out->send(message);
}
kj::Promise<void> close(uint16_t code, kj::StringPtr reason) override {
return out->close(code, reason);
}
kj::Promise<void> disconnect() override {
return out->disconnect();
}
kj::Maybe<kj::Promise<void>> tryPumpFrom(WebSocket& other) override {
return out->tryPumpFrom(other);
}
kj::Promise<Message> receive() override {
return in->receive();
}
kj::Promise<void> pumpTo(WebSocket& other) override {
return in->pumpTo(other);
}
private:
kj::Own<WebSocketPipeImpl> in;
kj::Own<WebSocketPipeImpl> out;
};
} // namespace
WebSocketPipe newWebSocketPipe() {
auto pipe1 = kj::refcounted<WebSocketPipeImpl>();
auto pipe2 = kj::refcounted<WebSocketPipeImpl>();
auto end1 = kj::heap<WebSocketPipeEnd>(kj::addRef(*pipe1), kj::addRef(*pipe2));
auto end2 = kj::heap<WebSocketPipeEnd>(kj::mv(pipe2), kj::mv(pipe1));
return { { kj::mv(end1), kj::mv(end2) } };
}
// =======================================================================================
......@@ -3204,6 +3731,207 @@ kj::Own<HttpClient> newHttpClient(kj::Timer& timer, HttpHeaderTable& responseHea
namespace {
class NullInputStream final: public kj::AsyncInputStream {
public:
NullInputStream(kj::Maybe<size_t> expectedLength = size_t(0))
: expectedLength(expectedLength) {}
kj::Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
return size_t(0);
}
kj::Maybe<uint64_t> tryGetLength() override {
return expectedLength;
}
kj::Promise<uint64_t> pumpTo(AsyncOutputStream& output, uint64_t amount) override {
return uint64_t(0);
}
private:
kj::Maybe<size_t> expectedLength;
};
class NullOutputStream final: public kj::AsyncOutputStream {
public:
Promise<void> write(const void* buffer, size_t size) override {
return kj::READY_NOW;
}
Promise<void> write(ArrayPtr<const ArrayPtr<const byte>> pieces) override {
return kj::READY_NOW;
}
// We can't really optimize tryPumpFrom() unless AsyncInputStream grows a skip() method.
};
class HttpClientAdapter final: public HttpClient {
public:
HttpClientAdapter(HttpService& service): service(service) {}
Request request(HttpMethod method, kj::StringPtr url, const HttpHeaders& headers,
kj::Maybe<uint64_t> expectedBodySize = nullptr) override {
// We have to clone the URL and headers because HttpService implementation are allowed to
// assume that they remain valid until the service handler completes whereas HttpClient callers
// are allowed to destroy them immediately after the call.
auto urlCopy = kj::str(url);
auto headersCopy = kj::heap(headers.clone());
auto pipe = newOneWayPipe(expectedBodySize);
auto paf = kj::newPromiseAndFulfiller<Response>();
auto responder = kj::refcounted<ResponseImpl>(method, kj::mv(paf.fulfiller));
auto promise = service.request(method, urlCopy, *headersCopy, *pipe.in, *responder);
responder->setPromise(promise.attach(kj::mv(pipe.in), kj::mv(urlCopy), kj::mv(headersCopy)));
return {
kj::mv(pipe.out),
paf.promise.attach(kj::mv(responder))
};
}
kj::Promise<WebSocketResponse> openWebSocket(
kj::StringPtr url, const HttpHeaders& headers) override {
// We have to clone the URL and headers because HttpService implementation are allowed to
// assume that they remain valid until the service handler completes whereas HttpClient callers
// are allowed to destroy them immediately after the call. Also we need to add
// `Upgrade: websocket` so that headers.isWebSocket() returns true on the service side.
auto urlCopy = kj::str(url);
auto headersCopy = kj::heap(headers.clone());
headersCopy->set(HttpHeaderId::UPGRADE, "websocket");
KJ_DASSERT(headersCopy->isWebSocket());
auto paf = kj::newPromiseAndFulfiller<WebSocketResponse>();
auto responder = kj::refcounted<WebSocketResponseImpl>(kj::mv(paf.fulfiller));
auto in = kj::heap<NullInputStream>();
auto promise = service.request(HttpMethod::GET, urlCopy, *headersCopy, *in, *responder);
responder->setPromise(promise.attach(kj::mv(in), kj::mv(urlCopy), kj::mv(headersCopy)));
return paf.promise.attach(kj::mv(responder));
}
kj::Promise<kj::Own<kj::AsyncIoStream>> connect(kj::StringPtr host) override {
return service.connect(kj::mv(host));
}
private:
HttpService& service;
class ResponseImpl final: public HttpService::Response, public kj::Refcounted {
public:
ResponseImpl(kj::HttpMethod method,
kj::Own<kj::PromiseFulfiller<HttpClient::Response>> fulfiller)
: method(method), fulfiller(kj::mv(fulfiller)) {}
void setPromise(kj::Promise<void> promise) {
task = promise.eagerlyEvaluate([this](kj::Exception&& exception) {
if (fulfiller->isWaiting()) {
fulfiller->reject(kj::mv(exception));
} else {
KJ_LOG(ERROR, "HttpService threw an exception after having already started responding",
exception);
}
});
}
kj::Own<kj::AsyncOutputStream> send(
uint statusCode, kj::StringPtr statusText, const HttpHeaders& headers,
kj::Maybe<uint64_t> expectedBodySize = nullptr) override {
// The caller of HttpClient is allowed to assume that the statusText and headers remain
// valid until the body stream is dropped, but the HttpService implementation is allowed to
// send values that are only valid until send() returns, so we have to copy.
auto statusTextCopy = kj::str(statusText);
auto headersCopy = kj::heap(headers.clone());
if (method == kj::HttpMethod::HEAD) {
fulfiller->fulfill({
statusCode, statusTextCopy, headersCopy.get(),
kj::heap<NullInputStream>(expectedBodySize)
.attach(kj::addRef(*this), kj::mv(statusTextCopy), kj::mv(headersCopy))
});
return kj::heap<NullOutputStream>();
} else {
auto pipe = newOneWayPipe(expectedBodySize);
fulfiller->fulfill({
statusCode, statusTextCopy, headersCopy.get(),
pipe.in.attach(kj::addRef(*this), kj::mv(statusTextCopy), kj::mv(headersCopy))
});
return kj::mv(pipe.out);
}
}
kj::Own<WebSocket> acceptWebSocket(const HttpHeaders& headers) override {
KJ_FAIL_REQUIRE("a WebSocket was not requested");
}
private:
kj::HttpMethod method;
kj::Own<kj::PromiseFulfiller<HttpClient::Response>> fulfiller;
kj::Promise<void> task = nullptr;
};
class WebSocketResponseImpl final: public HttpService::Response, public kj::Refcounted {
public:
WebSocketResponseImpl(kj::Own<kj::PromiseFulfiller<HttpClient::WebSocketResponse>> fulfiller)
: fulfiller(kj::mv(fulfiller)) {}
void setPromise(kj::Promise<void> promise) {
task = promise.eagerlyEvaluate([this](kj::Exception&& exception) {
if (fulfiller->isWaiting()) {
fulfiller->reject(kj::mv(exception));
} else {
KJ_LOG(ERROR, "HttpService threw an exception after having already started responding",
exception);
}
});
}
kj::Own<kj::AsyncOutputStream> send(
uint statusCode, kj::StringPtr statusText, const HttpHeaders& headers,
kj::Maybe<uint64_t> expectedBodySize = nullptr) override {
// The caller of HttpClient is allowed to assume that the statusText and headers remain
// valid until the body stream is dropped, but the HttpService implementation is allowed to
// send values that are only valid until send() returns, so we have to copy.
auto statusTextCopy = kj::str(statusText);
auto headersCopy = kj::heap(headers.clone());
auto pipe = newOneWayPipe(expectedBodySize);
fulfiller->fulfill({
statusCode, statusTextCopy, headersCopy.get(),
pipe.in.attach(kj::addRef(*this), kj::mv(statusTextCopy), kj::mv(headersCopy))
});
return kj::mv(pipe.out);
}
kj::Own<WebSocket> acceptWebSocket(const HttpHeaders& headers) override {
// The caller of HttpClient is allowed to assume that the headers remain valid until the body
// stream is dropped, but the HttpService implementation is allowed to send headers that are
// only valid until acceptWebSocket() returns, so we have to copy.
auto headersCopy = kj::heap(headers.clone());
auto pipe = newWebSocketPipe();
fulfiller->fulfill({
101, "Switching Protocols", headersCopy.get(),
pipe.ends[0].attach(kj::addRef(*this), kj::mv(headersCopy))
});
return kj::mv(pipe.ends[1]);
}
private:
kj::Own<kj::PromiseFulfiller<HttpClient::WebSocketResponse>> fulfiller;
kj::Promise<void> task = nullptr;
};
};
} // namespace
kj::Own<HttpClient> newHttpClient(HttpService& service) {
return kj::heap<HttpClientAdapter>(service);
}
// =======================================================================================
namespace {
class HttpServiceAdapter final: public HttpService {
public:
HttpServiceAdapter(HttpClient& client): client(client) {}
......@@ -3256,8 +3984,6 @@ public:
return client.connect(kj::mv(host));
}
// TODO(soon): other methods
private:
HttpClient& client;
};
......
......@@ -424,7 +424,7 @@ public:
// Read one message from the WebSocket and return it. Can only call once at a time. Do not call
// again after Close is received.
kj::Promise<void> pumpTo(WebSocket& other);
virtual kj::Promise<void> pumpTo(WebSocket& other);
// Continuously receives messages from this WebSocket and send them to `other`.
//
// On EOF, calls other.disconnect(), then resolves.
......@@ -432,6 +432,12 @@ public:
// On other read errors, calls other.close() with the error, then resolves.
//
// On write error, rejects with the error.
virtual kj::Maybe<kj::Promise<void>> tryPumpFrom(WebSocket& other);
// Either returns null, or performs the equivalent of other.pumpTo(*this). Only returns non-null
// if this WebSocket implementation is able to perform the pump in an optimized way, better than
// the default implementation of pumpTo(). The default implementation of pumpTo() always tries
// calling this first, and the default implementation of tryPumpFrom() always returns null.
};
class HttpClient {
......@@ -634,6 +640,15 @@ kj::Own<WebSocket> newWebSocket(kj::Own<kj::AsyncIoStream> stream,
// like HTTP requests" in a message as being actual HTTP requests, which could result in cache
// poisoning. See RFC6455 section 10.3.
struct WebSocketPipe {
kj::Own<WebSocket> ends[2];
};
WebSocketPipe newWebSocketPipe();
// Create a WebSocket pipe. Messages written to one end of the pipe will be readable from the other
// end. No buffering occurs -- a message send does not complete until a corresponding receive
// accepts the message.
struct HttpServerSettings {
kj::Duration headerTimeout = 15 * kj::SECONDS;
// After initial connection open, or after receiving the first byte of a pipelined request,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment