Commit f5190d24 authored by Kenton Varda's avatar Kenton Varda

Define and implement HTTP-over-Cap'n-Proto.

This allows an HTTP request/response to be forwarded over Cap'n Proto RPC, multiplexed with arbitrary other RPC transactions.

This could be compared with HTTP/2, which is a binary protocol representation of HTTP that allows multiplexing. HTTP-over-Cap'n-Proto provides the same, but with some advantages inherent in leveraging Cap'n Proto:
- HTTP transactions can be multiplexed with regular Cap'n Proto RPC transactions. (While in theory you could also layer RPC on top of HTTP, as gRPC does, HTTP transactions are much heavier than basic RPC. In my opinion, layering HTTP over RPC makes much more sense because of this.)
- HTTP endpoints are object capabilities. Multiple private endpoints can be multiplexed over the same connection.
- Either end of the connection can act as the client or server, exposing endpoints to each other.
- Cap'n Proto path shortening can kick in. For instance, imagine a request that passes through several proxies, then eventually returns a large streaming response. If the proxies is each a Cap'n Proto server with a level 3 RPC implementation, and the response is simply passed through verbatim, the response stream will automatically be shortened to skip over the middleman servers. At present no Cap'n Proto implementation supports level 3, but path shortening can also apply with only level 1 RPC, if all calls proxy through a central hub process, as is often the case in multi-tenant sandboxing scenarios.

There are also disadvantages vs. HTTP/2:
- HTTP/2 is a standard. This is not.
- This protocol is not as finely optimized for the HTTP use case. It will take somewhat more bandwidth on the wire.
- No mechanism for server push has been defined, although this could be a relatively simple addition to the http-over-capnp interface definitions.
- No mechanism for stream prioritization is defined -- this would likely require new features in the Cap'n Proto RPC implementation itself.
- At present, the backpressure mechanism is naive and its performance will suffer as the network distance increases. I intend to solve this by adding better backpressure mechanisms into Cap'n Proto itself.

Shims are provided for compatibility with the KJ HTTP interfaces.

Note that Sandstorm has its own http-over-capnp protocol: https://github.com/sandstorm-io/sandstorm/blob/master/src/sandstorm/web-session.capnp

Sandstorm's protocol and this new one are intended for very different use cases. Sandstorm implements sandboxing of web applications on both the client and server sides. As a result, it cares deeply about the semantics of HTTP headers and how they affect the browser. This new http-over-capnp protocol is meant to be a dumb bridge that simply passes through all headers verbatim.
parent 00c0cbfb
// Copyright (c) 2019 Cloudflare, Inc. and contributors
// Licensed under the MIT License:
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
#include "http-over-capnp.h"
#include <kj/test.h>
namespace capnp {
namespace {
KJ_TEST("KJ and RPC HTTP method enums match") {
#define EXPECT_MATCH(METHOD) \
KJ_EXPECT(static_cast<uint>(kj::HttpMethod::METHOD) == \
static_cast<uint>(capnp::HttpMethod::METHOD));
KJ_HTTP_FOR_EACH_METHOD(EXPECT_MATCH);
#undef EXPECT_MATCH
}
// =======================================================================================
kj::Promise<void> expectRead(kj::AsyncInputStream& in, kj::StringPtr expected) {
if (expected.size() == 0) return kj::READY_NOW;
auto buffer = kj::heapArray<char>(expected.size());
auto promise = in.tryRead(buffer.begin(), 1, buffer.size());
return promise.then(kj::mvCapture(buffer, [&in,expected](kj::Array<char> buffer, size_t amount) {
if (amount == 0) {
KJ_FAIL_ASSERT("expected data never sent", expected);
}
auto actual = buffer.slice(0, amount);
if (memcmp(actual.begin(), expected.begin(), actual.size()) != 0) {
KJ_FAIL_ASSERT("data from stream doesn't match expected", expected, actual);
}
return expectRead(in, expected.slice(amount));
}));
}
enum Direction {
CLIENT_TO_SERVER,
SERVER_TO_CLIENT
};
struct TestStep {
Direction direction;
kj::StringPtr send;
kj::StringPtr receive;
constexpr TestStep(Direction direction, kj::StringPtr send, kj::StringPtr receive)
: direction(direction), send(send), receive(receive) {}
constexpr TestStep(Direction direction, kj::StringPtr data)
: direction(direction), send(data), receive(data) {}
};
constexpr TestStep TEST_STEPS[] = {
// Test basic request.
{
CLIENT_TO_SERVER,
"GET / HTTP/1.1\r\n"
"Host: example.com\r\n"
"\r\n"_kj,
},
{
SERVER_TO_CLIENT,
"HTTP/1.1 200 OK\r\n"
"Content-Length: 3\r\n"
"\r\n"
"foo"_kj
},
// Try PUT, vary path, vary status
{
CLIENT_TO_SERVER,
"PUT /foo/bar HTTP/1.1\r\n"
"Content-Length: 5\r\n"
"Host: example.com\r\n"
"\r\n"
"corge"_kj,
},
{
SERVER_TO_CLIENT,
"HTTP/1.1 403 Unauthorized\r\n"
"Content-Length: 4\r\n"
"\r\n"
"nope"_kj
},
// HEAD request
{
CLIENT_TO_SERVER,
"HEAD /foo/bar HTTP/1.1\r\n"
"Host: example.com\r\n"
"\r\n"_kj,
},
{
SERVER_TO_CLIENT,
"HTTP/1.1 200 OK\r\n"
"Content-Length: 4\r\n"
"\r\n"_kj
},
// Empty-body response
{
CLIENT_TO_SERVER,
"GET /foo/bar HTTP/1.1\r\n"
"Host: example.com\r\n"
"\r\n"_kj,
},
{
SERVER_TO_CLIENT,
"HTTP/1.1 304 Not Modified\r\n"
"Server: foo\r\n"
"\r\n"_kj
},
// Chonky body
{
CLIENT_TO_SERVER,
"POST / HTTP/1.1\r\n"
"Transfer-Encoding: chunked\r\n"
"Host: example.com\r\n"
"\r\n"
"3\r\n"
"foo\r\n"
"5\r\n"
"corge\r\n"
"0\r\n"
"\r\n"_kj,
},
{
SERVER_TO_CLIENT,
"HTTP/1.1 200 OK\r\n"
"Transfer-Encoding: chunked\r\n"
"\r\n"
"6\r\n"
"barbaz\r\n"
"6\r\n"
"garply\r\n"
"0\r\n"
"\r\n"_kj
},
// Streaming
{
CLIENT_TO_SERVER,
"POST / HTTP/1.1\r\n"
"Content-Length: 9\r\n"
"Host: example.com\r\n"
"\r\n"_kj,
},
{
CLIENT_TO_SERVER,
"foo"_kj,
},
{
CLIENT_TO_SERVER,
"bar"_kj,
},
{
CLIENT_TO_SERVER,
"baz"_kj,
},
{
SERVER_TO_CLIENT,
"HTTP/1.1 200 OK\r\n"
"Transfer-Encoding: chunked\r\n"
"\r\n"_kj,
},
{
SERVER_TO_CLIENT,
"6\r\n"
"barbaz\r\n"_kj,
},
{
SERVER_TO_CLIENT,
"6\r\n"
"garply\r\n"_kj,
},
{
SERVER_TO_CLIENT,
"0\r\n"
"\r\n"_kj
},
// Bidirectional.
{
CLIENT_TO_SERVER,
"POST / HTTP/1.1\r\n"
"Content-Length: 9\r\n"
"Host: example.com\r\n"
"\r\n"_kj,
},
{
SERVER_TO_CLIENT,
"HTTP/1.1 200 OK\r\n"
"Transfer-Encoding: chunked\r\n"
"\r\n"_kj,
},
{
CLIENT_TO_SERVER,
"foo"_kj,
},
{
SERVER_TO_CLIENT,
"6\r\n"
"barbaz\r\n"_kj,
},
{
CLIENT_TO_SERVER,
"bar"_kj,
},
{
SERVER_TO_CLIENT,
"6\r\n"
"garply\r\n"_kj,
},
{
CLIENT_TO_SERVER,
"baz"_kj,
},
{
SERVER_TO_CLIENT,
"0\r\n"
"\r\n"_kj
},
// Test headers being re-ordered by KJ. This isn't necessary behavior, but it does prove that
// we're not testing a pure streaming pass-through...
{
CLIENT_TO_SERVER,
"GET / HTTP/1.1\r\n"
"Host: example.com\r\n"
"Accept: text/html\r\n"
"Foo-Header: 123\r\n"
"User-Agent: kj\r\n"
"Accept-Language: en\r\n"
"\r\n"_kj,
"GET / HTTP/1.1\r\n"
"Host: example.com\r\n"
"Accept-Language: en\r\n"
"Accept: text/html\r\n"
"User-Agent: kj\r\n"
"Foo-Header: 123\r\n"
"\r\n"_kj
},
{
SERVER_TO_CLIENT,
"HTTP/1.1 200 OK\r\n"
"Server: kj\r\n"
"Bar: 321\r\n"
"Content-Length: 3\r\n"
"\r\n"
"foo"_kj,
"HTTP/1.1 200 OK\r\n"
"Content-Length: 3\r\n"
"Server: kj\r\n"
"Bar: 321\r\n"
"\r\n"
"foo"_kj
},
// We finish up a request with no response, to test cancellation.
{
CLIENT_TO_SERVER,
"GET / HTTP/1.1\r\n"
"Host: example.com\r\n"
"\r\n"_kj,
},
};
class OneConnectNetworkAddress final: public kj::NetworkAddress {
public:
OneConnectNetworkAddress(kj::Own<kj::AsyncIoStream> stream)
: stream(kj::mv(stream)) {}
kj::Promise<kj::Own<kj::AsyncIoStream>> connect() override {
auto result = KJ_ASSERT_NONNULL(kj::mv(stream));
stream = nullptr;
return kj::mv(result);
}
kj::Own<kj::ConnectionReceiver> listen() override { KJ_UNIMPLEMENTED("test"); }
kj::Own<kj::NetworkAddress> clone() override { KJ_UNIMPLEMENTED("test"); }
kj::String toString() override { KJ_UNIMPLEMENTED("test"); }
private:
kj::Maybe<kj::Own<kj::AsyncIoStream>> stream;
};
void runEndToEndTests(kj::Timer& timer, kj::HttpHeaderTable& headerTable,
HttpOverCapnpFactory& clientFactory, HttpOverCapnpFactory& serverFactory,
kj::WaitScope& waitScope) {
auto clientPipe = kj::newTwoWayPipe();
auto serverPipe = kj::newTwoWayPipe();
OneConnectNetworkAddress oneConnectAddr(kj::mv(serverPipe.ends[0]));
auto backHttp = kj::newHttpClient(timer, headerTable, oneConnectAddr);
auto backCapnp = serverFactory.kjToCapnp(kj::newHttpService(*backHttp));
auto frontCapnp = clientFactory.capnpToKj(backCapnp);
kj::HttpServer frontKj(timer, headerTable, *frontCapnp);
auto listenTask = frontKj.listenHttp(kj::mv(clientPipe.ends[1]))
.eagerlyEvaluate([](kj::Exception&& e) { KJ_LOG(ERROR, e); });
for (auto& step: TEST_STEPS) {
KJ_CONTEXT(step.send);
kj::AsyncOutputStream* out;
kj::AsyncInputStream* in;
switch (step.direction) {
case CLIENT_TO_SERVER:
out = clientPipe.ends[0];
in = serverPipe.ends[1];
break;
case SERVER_TO_CLIENT:
out = serverPipe.ends[1];
in = clientPipe.ends[0];
break;
}
auto writePromise = out->write(step.send.begin(), step.send.size());
auto readPromise = expectRead(*in, step.receive);
if (!writePromise.poll(waitScope)) {
if (readPromise.poll(waitScope)) {
readPromise.wait(waitScope);
KJ_FAIL_ASSERT("write hung, read worked fine");
} else {
KJ_FAIL_ASSERT("write and read both hung");
}
}
writePromise.wait(waitScope);
KJ_ASSERT(readPromise.poll(waitScope), "read hung");
readPromise.wait(waitScope);
}
// The last test message was a request with no response. If we now close the client end, this
// should propagate all the way through to close the server end!
clientPipe.ends[0] = nullptr;
auto lastRead = serverPipe.ends[1]->readAllText();
KJ_ASSERT(lastRead.poll(waitScope), "last read hung");
KJ_EXPECT(lastRead.wait(waitScope) == 0);
}
KJ_TEST("HTTP-over-Cap'n-Proto E2E, no path shortening") {
kj::EventLoop eventLoop;
kj::WaitScope waitScope(eventLoop);
kj::TimerImpl timer(kj::origin<kj::TimePoint>());
ByteStreamFactory streamFactory1;
ByteStreamFactory streamFactory2;
kj::HttpHeaderTable::Builder tableBuilder;
HttpOverCapnpFactory factory1(streamFactory1, tableBuilder);
HttpOverCapnpFactory factory2(streamFactory2, tableBuilder);
auto headerTable = tableBuilder.build();
runEndToEndTests(timer, *headerTable, factory1, factory2, waitScope);
}
KJ_TEST("HTTP-over-Cap'n-Proto E2E, with path shortening") {
kj::EventLoop eventLoop;
kj::WaitScope waitScope(eventLoop);
kj::TimerImpl timer(kj::origin<kj::TimePoint>());
ByteStreamFactory streamFactory;
kj::HttpHeaderTable::Builder tableBuilder;
HttpOverCapnpFactory factory(streamFactory, tableBuilder);
auto headerTable = tableBuilder.build();
runEndToEndTests(timer, *headerTable, factory, factory, waitScope);
}
// =======================================================================================
class WebSocketAccepter final: public kj::HttpService {
public:
WebSocketAccepter(kj::HttpHeaderTable& headerTable,
kj::Own<kj::PromiseFulfiller<kj::Own<kj::WebSocket>>> fulfiller,
kj::Promise<void> done)
: headerTable(headerTable), fulfiller(kj::mv(fulfiller)), done(kj::mv(done)) {}
kj::Promise<void> request(
kj::HttpMethod method, kj::StringPtr url, const kj::HttpHeaders& headers,
kj::AsyncInputStream& requestBody, Response& response) {
kj::HttpHeaders respHeaders(headerTable);
respHeaders.add("X-Foo", "bar");
fulfiller->fulfill(response.acceptWebSocket(respHeaders));
return kj::mv(done);
}
private:
kj::HttpHeaderTable& headerTable;
kj::Own<kj::PromiseFulfiller<kj::Own<kj::WebSocket>>> fulfiller;
kj::Promise<void> done;
};
void runWebSocketTests(kj::HttpHeaderTable& headerTable,
HttpOverCapnpFactory& clientFactory, HttpOverCapnpFactory& serverFactory,
kj::WaitScope& waitScope) {
// We take a different approach here, because writing out raw WebSocket frames is a pain.
// It's easier to test WebSockets at the KJ API level.
auto wsPaf = kj::newPromiseAndFulfiller<kj::Own<kj::WebSocket>>();
auto donePaf = kj::newPromiseAndFulfiller<void>();
auto back = serverFactory.kjToCapnp(kj::heap<WebSocketAccepter>(
headerTable, kj::mv(wsPaf.fulfiller), kj::mv(donePaf.promise)));
auto front = clientFactory.capnpToKj(back);
auto client = kj::newHttpClient(*front);
auto resp = client->openWebSocket("/ws", kj::HttpHeaders(headerTable)).wait(waitScope);
KJ_ASSERT(resp.webSocketOrBody.is<kj::Own<kj::WebSocket>>());
auto clientWs = kj::mv(resp.webSocketOrBody.get<kj::Own<kj::WebSocket>>());
auto serverWs = wsPaf.promise.wait(waitScope);
{
auto promise = clientWs->send("foo"_kj);
auto message = serverWs->receive().wait(waitScope);
promise.wait(waitScope);
KJ_ASSERT(message.is<kj::String>());
KJ_EXPECT(message.get<kj::String>() == "foo");
}
{
auto promise = serverWs->send("bar"_kj.asBytes());
auto message = clientWs->receive().wait(waitScope);
promise.wait(waitScope);
KJ_ASSERT(message.is<kj::Array<kj::byte>>());
KJ_EXPECT(kj::str(message.get<kj::Array<kj::byte>>().asChars()) == "bar");
}
{
auto promise = clientWs->close(1234, "baz"_kj);
auto message = serverWs->receive().wait(waitScope);
promise.wait(waitScope);
KJ_ASSERT(message.is<kj::WebSocket::Close>());
KJ_EXPECT(message.get<kj::WebSocket::Close>().code == 1234);
KJ_EXPECT(message.get<kj::WebSocket::Close>().reason == "baz");
}
{
auto promise = serverWs->disconnect();
auto receivePromise = clientWs->receive();
KJ_EXPECT(receivePromise.poll(waitScope));
KJ_EXPECT_THROW(DISCONNECTED, receivePromise.wait(waitScope));
promise.wait(waitScope);
}
}
KJ_TEST("HTTP-over-Cap'n Proto WebSocket, no path shortening") {
kj::EventLoop eventLoop;
kj::WaitScope waitScope(eventLoop);
ByteStreamFactory streamFactory1;
ByteStreamFactory streamFactory2;
kj::HttpHeaderTable::Builder tableBuilder;
HttpOverCapnpFactory factory1(streamFactory1, tableBuilder);
HttpOverCapnpFactory factory2(streamFactory2, tableBuilder);
auto headerTable = tableBuilder.build();
runWebSocketTests(*headerTable, factory1, factory2, waitScope);
}
KJ_TEST("HTTP-over-Cap'n Proto WebSocket, with path shortening") {
kj::EventLoop eventLoop;
kj::WaitScope waitScope(eventLoop);
ByteStreamFactory streamFactory;
kj::HttpHeaderTable::Builder tableBuilder;
HttpOverCapnpFactory factory(streamFactory, tableBuilder);
auto headerTable = tableBuilder.build();
runWebSocketTests(*headerTable, factory, factory, waitScope);
}
} // namespace
} // namespace capnp
// Copyright (c) 2019 Cloudflare, Inc. and contributors
// Licensed under the MIT License:
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
#include "http-over-capnp.h"
#include <kj/debug.h>
#include <capnp/schema.h>
namespace capnp {
using kj::uint;
using kj::byte;
class HttpOverCapnpFactory::RequestState final
: public kj::Refcounted, public kj::TaskSet::ErrorHandler {
public:
RequestState() {
tasks.emplace(*this);
}
template <typename T>
kj::Promise<T> wrap(kj::Promise<T>&& promise) {
if (tasks == nullptr) {
return KJ_EXCEPTION(DISCONNECTED, "client canceled HTTP request");
} else {
return canceler.wrap(kj::mv(promise));
}
}
void cancel() {
if (tasks != nullptr) {
if (canceler.isEmpty()) {
canceler.cancel(KJ_EXCEPTION(DISCONNECTED, "request canceled"));
}
tasks = nullptr;
webSocket = nullptr;
}
}
void assertNotCanceled() {
if (tasks == nullptr) {
kj::throwFatalException(KJ_EXCEPTION(DISCONNECTED, "client canceled HTTP request"));
}
}
void addTask(kj::Promise<void> task) {
KJ_IF_MAYBE(t, tasks) {
t->add(kj::mv(task));
} else {
// Just drop the task.
}
}
kj::Promise<void> finishTasks() {
// This is merged into the final promise, so we don't need to worry about wrapping it for
// cancellation.
return KJ_REQUIRE_NONNULL(tasks).onEmpty()
.then([this]() {
KJ_IF_MAYBE(e, error) {
kj::throwRecoverableException(kj::mv(*e));
}
});
}
void taskFailed(kj::Exception&& exception) override {
if (error == nullptr) {
error = kj::mv(exception);
}
}
void holdWebSocket(kj::Own<kj::WebSocket> webSocket) {
// Hold on to this WebSocket until cancellation.
KJ_REQUIRE(this->webSocket == nullptr);
KJ_REQUIRE(tasks != nullptr);
this->webSocket = kj::mv(webSocket);
}
void disconnectWebSocket() {
KJ_IF_MAYBE(t, tasks) {
t->add(kj::evalNow([&]() { return KJ_ASSERT_NONNULL(webSocket)->disconnect(); }));
}
}
private:
kj::Maybe<kj::Exception> error;
kj::Maybe<kj::Own<kj::WebSocket>> webSocket;
kj::Canceler canceler;
kj::Maybe<kj::TaskSet> tasks;
};
// =======================================================================================
class HttpOverCapnpFactory::CapnpToKjWebSocketAdapter final: public capnp::WebSocket::Server {
public:
CapnpToKjWebSocketAdapter(kj::Own<RequestState> state, kj::WebSocket& webSocket,
kj::Promise<Capability::Client> shorteningPromise)
: state(kj::mv(state)), webSocket(webSocket),
shorteningPromise(kj::mv(shorteningPromise)) {}
~CapnpToKjWebSocketAdapter() noexcept(false) {
state->disconnectWebSocket();
}
kj::Maybe<kj::Promise<Capability::Client>> shortenPath() override {
return kj::mv(shorteningPromise);
}
kj::Promise<void> sendText(SendTextContext context) override {
return state->wrap(webSocket.send(context.getParams().getText()));
}
kj::Promise<void> sendData(SendDataContext context) override {
return state->wrap(webSocket.send(context.getParams().getData()));
}
kj::Promise<void> close(CloseContext context) override {
auto params = context.getParams();
return state->wrap(webSocket.close(params.getCode(), params.getReason()));
}
private:
kj::Own<RequestState> state;
kj::WebSocket& webSocket;
kj::Promise<Capability::Client> shorteningPromise;
};
class HttpOverCapnpFactory::KjToCapnpWebSocketAdapter final: public kj::WebSocket {
public:
KjToCapnpWebSocketAdapter(
kj::Maybe<kj::Own<kj::WebSocket>> in, capnp::WebSocket::Client out,
kj::Own<kj::PromiseFulfiller<kj::Promise<Capability::Client>>> shorteningFulfiller)
: in(kj::mv(in)), out(kj::mv(out)), shorteningFulfiller(kj::mv(shorteningFulfiller)) {}
~KjToCapnpWebSocketAdapter() noexcept(false) {
if (shorteningFulfiller->isWaiting()) {
// We want to make sure the fulfiller is not rejected with a bogus "PromiseFulfiller
// destroyed" error, so fulfill it with never-done.
shorteningFulfiller->fulfill(kj::NEVER_DONE);
}
}
kj::Promise<void> send(kj::ArrayPtr<const byte> message) override {
auto req = KJ_REQUIRE_NONNULL(out, "already called disconnect()").sendDataRequest(
MessageSize { 8 + message.size() / sizeof(word), 0 });
req.setData(message);
return req.send();
}
kj::Promise<void> send(kj::ArrayPtr<const char> message) override {
auto req = KJ_REQUIRE_NONNULL(out, "already called disconnect()").sendTextRequest(
MessageSize { 8 + message.size() / sizeof(word), 0 });
memcpy(req.initText(message.size()).begin(), message.begin(), message.size());
return req.send();
}
kj::Promise<void> close(uint16_t code, kj::StringPtr reason) override {
auto req = KJ_REQUIRE_NONNULL(out, "already called disconnect()").closeRequest();
req.setCode(code);
req.setReason(reason);
return req.send().ignoreResult();
}
kj::Promise<void> disconnect() override {
out = nullptr;
return kj::READY_NOW;
}
void abort() override {
KJ_ASSERT_NONNULL(in)->abort();
}
kj::Promise<void> whenAborted() override {
return KJ_ASSERT_NONNULL(out).whenResolved()
.then([]() -> kj::Promise<void> {
// It would seem this capability resolved to an implementation of the WebSocket RPC interface
// that does not support further path-shortening (so, it's not the implementation found in
// this file). Since the path-shortening facility is also how we discover disconnects, we
// apparently have no way to be alerted on disconnect. We have to assume the other end
// never aborts.
return kj::NEVER_DONE;
}, [](kj::Exception&& e) -> kj::Promise<void> {
if (e.getType() == kj::Exception::Type::DISCONNECTED) {
// Looks like we were aborted!
return kj::READY_NOW;
} else {
// Some other error... propagate it.
return kj::mv(e);
}
});
}
kj::Promise<Message> receive() override {
return KJ_ASSERT_NONNULL(in)->receive();
}
kj::Promise<void> pumpTo(WebSocket& other) override {
KJ_IF_MAYBE(optimized, kj::dynamicDowncastIfAvailable<KjToCapnpWebSocketAdapter>(other)) {
shorteningFulfiller->fulfill(
kj::cp(KJ_REQUIRE_NONNULL(optimized->out, "already called disconnect()")));
// We expect the `in` pipe will stop receiving messages after the redirect, but we need to
// pump anything already in-flight.
return KJ_ASSERT_NONNULL(in)->pumpTo(other);
} else KJ_IF_MAYBE(promise, other.tryPumpFrom(*this)) {
// We may have unwrapped some layers around `other` leading to a shorter path.
return kj::mv(*promise);
} else {
return KJ_ASSERT_NONNULL(in)->pumpTo(other);
}
}
private:
kj::Maybe<kj::Own<kj::WebSocket>> in; // One end of a WebSocketPipe, used only for receiving.
kj::Maybe<capnp::WebSocket::Client> out; // Used only for sending.
kj::Own<kj::PromiseFulfiller<kj::Promise<Capability::Client>>> shorteningFulfiller;
};
// =======================================================================================
class HttpOverCapnpFactory::ClientRequestContextImpl final
: public capnp::HttpService::ClientRequestContext::Server {
public:
ClientRequestContextImpl(HttpOverCapnpFactory& factory,
kj::Own<RequestState> state,
kj::HttpService::Response& kjResponse)
: factory(factory), state(kj::mv(state)), kjResponse(kjResponse) {}
~ClientRequestContextImpl() noexcept(false) {
// Note this implicitly cancels the upstream pump task.
}
kj::Promise<void> startResponse(StartResponseContext context) override {
KJ_REQUIRE(!sent, "already called startResponse() or startWebSocket()");
sent = true;
state->assertNotCanceled();
auto params = context.getParams();
auto rpcResponse = params.getResponse();
auto bodySize = rpcResponse.getBodySize();
kj::Maybe<uint64_t> expectedSize;
bool hasBody = true;
if (bodySize.isFixed()) {
auto size = bodySize.getFixed();
expectedSize = bodySize.getFixed();
hasBody = size > 0;
}
auto bodyStream = kjResponse.send(rpcResponse.getStatusCode(), rpcResponse.getStatusText(),
factory.headersToKj(rpcResponse.getHeaders()), expectedSize);
auto results = context.getResults(MessageSize { 16, 1 });
if (hasBody) {
auto pipe = kj::newOneWayPipe();
results.setBody(factory.streamFactory.kjToCapnp(kj::mv(pipe.out)));
state->addTask(pipe.in->pumpTo(*bodyStream)
.ignoreResult()
.attach(kj::mv(bodyStream), kj::mv(pipe.in)));
}
return kj::READY_NOW;
}
kj::Promise<void> startWebSocket(StartWebSocketContext context) override {
KJ_REQUIRE(!sent, "already called startResponse() or startWebSocket()");
sent = true;
state->assertNotCanceled();
auto params = context.getParams();
auto shorteningPaf = kj::newPromiseAndFulfiller<kj::Promise<Capability::Client>>();
auto ownWebSocket = kjResponse.acceptWebSocket(factory.headersToKj(params.getHeaders()));
auto& webSocket = *ownWebSocket;
state->holdWebSocket(kj::mv(ownWebSocket));
auto upWrapper = kj::heap<KjToCapnpWebSocketAdapter>(
nullptr, params.getUpSocket(), kj::mv(shorteningPaf.fulfiller));
state->addTask(webSocket.pumpTo(*upWrapper).attach(kj::mv(upWrapper))
.catch_([&webSocket=webSocket](kj::Exception&& e) -> kj::Promise<void> {
// The pump in the client -> server direction failed. The error may have originated from
// either the client or the server. In case it came from the server, we want to call .abort()
// to propagate the problem back to the client. If the error came from the client, then
// .abort() probably is a noop.
webSocket.abort();
return kj::mv(e);
}));
auto results = context.getResults(MessageSize { 16, 1 });
results.setDownSocket(kj::heap<CapnpToKjWebSocketAdapter>(
kj::addRef(*state), webSocket, kj::mv(shorteningPaf.promise)));
return kj::READY_NOW;
}
private:
HttpOverCapnpFactory& factory;
kj::Own<RequestState> state;
bool sent = false;
kj::HttpService::Response& kjResponse;
// Must check state->assertNotCanceled() before using this.
};
class HttpOverCapnpFactory::KjToCapnpHttpServiceAdapter final: public kj::HttpService {
public:
KjToCapnpHttpServiceAdapter(HttpOverCapnpFactory& factory, capnp::HttpService::Client inner)
: factory(factory), inner(kj::mv(inner)) {}
kj::Promise<void> request(
kj::HttpMethod method, kj::StringPtr url, const kj::HttpHeaders& headers,
kj::AsyncInputStream& requestBody, kj::HttpService::Response& kjResponse) override {
auto rpcRequest = inner.startRequestRequest();
auto metadata = rpcRequest.initRequest();
metadata.setMethod(static_cast<capnp::HttpMethod>(method));
metadata.setUrl(url);
metadata.adoptHeaders(factory.headersToCapnp(
headers, Orphanage::getForMessageContaining(metadata)));
kj::Maybe<kj::AsyncInputStream&> maybeRequestBody;
KJ_IF_MAYBE(s, requestBody.tryGetLength()) {
metadata.getBodySize().setFixed(*s);
if (*s == 0) {
maybeRequestBody = nullptr;
} else {
maybeRequestBody = requestBody;
}
} else if ((method == kj::HttpMethod::GET || method == kj::HttpMethod::HEAD) &&
headers.get(kj::HttpHeaderId::TRANSFER_ENCODING) == nullptr) {
maybeRequestBody = nullptr;
metadata.getBodySize().setFixed(0);
} else {
metadata.getBodySize().setUnknown();
maybeRequestBody = requestBody;
}
auto state = kj::refcounted<RequestState>();
auto deferredCancel = kj::defer([state = kj::addRef(*state)]() mutable {
state->cancel();
});
rpcRequest.setContext(
kj::heap<ClientRequestContextImpl>(factory, kj::addRef(*state), kjResponse));
auto pipeline = rpcRequest.send();
// Pump upstream -- unless we don't expect a request body.
kj::Maybe<kj::Promise<void>> pumpRequestTask;
KJ_IF_MAYBE(rb, maybeRequestBody) {
auto bodyOut = factory.streamFactory.capnpToKj(pipeline.getRequestBody());
pumpRequestTask = rb->pumpTo(*bodyOut).attach(kj::mv(bodyOut)).ignoreResult()
.eagerlyEvaluate([state = kj::addRef(*state)](kj::Exception&& e) mutable {
state->taskFailed(kj::mv(e));
});
}
// Wait for the ServerRequestContext to resolve, which indicates completion. Meanwhile, if the
// promise is canceled from the client side, we drop the ServerRequestContext naturally, and we
// also call state->cancel().
return pipeline.getContext().whenResolved()
// Once the server indicates it is done, then we can cancel pumping the request, because
// obviously the server won't use it. We should not cancel pumping the response since there
// could be data in-flight still.
.attach(kj::mv(pumpRequestTask))
// finishTasks() will wait for the respones to complete.
.then([state = kj::mv(state)]() mutable { return state->finishTasks(); })
.attach(kj::mv(deferredCancel));
}
private:
HttpOverCapnpFactory& factory;
capnp::HttpService::Client inner;
};
kj::Own<kj::HttpService> HttpOverCapnpFactory::capnpToKj(capnp::HttpService::Client rpcService) {
return kj::heap<KjToCapnpHttpServiceAdapter>(*this, kj::mv(rpcService));
}
// =======================================================================================
namespace {
class NullInputStream final: public kj::AsyncInputStream {
// TODO(cleanup): This class has been replicated in a bunch of places now, make it public
// somewhere.
public:
kj::Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
return size_t(0);
}
kj::Maybe<uint64_t> tryGetLength() override {
return uint64_t(0);
}
kj::Promise<uint64_t> pumpTo(kj::AsyncOutputStream& output, uint64_t amount) override {
return uint64_t(0);
}
};
class NullOutputStream final: public kj::AsyncOutputStream {
// TODO(cleanup): This class has been replicated in a bunch of places now, make it public
// somewhere.
public:
kj::Promise<void> write(const void* buffer, size_t size) override {
return kj::READY_NOW;
}
kj::Promise<void> write(kj::ArrayPtr<const kj::ArrayPtr<const byte>> pieces) override {
return kj::READY_NOW;
}
kj::Promise<void> whenWriteDisconnected() override {
return kj::NEVER_DONE;
}
// We can't really optimize tryPumpFrom() unless AsyncInputStream grows a skip() method.
};
class ResolvedServerRequestContext final: public capnp::HttpService::ServerRequestContext::Server {
public:
// Nothing! It's done.
};
} // namespace
class HttpOverCapnpFactory::ServerRequestContextImpl final
: public capnp::HttpService::ServerRequestContext::Server,
public kj::HttpService::Response {
public:
ServerRequestContextImpl(HttpOverCapnpFactory& factory,
capnp::HttpRequest::Reader request,
capnp::HttpService::ClientRequestContext::Client clientContext,
kj::Own<kj::AsyncInputStream> requestBodyIn,
kj::HttpService& kjService)
: factory(factory),
method(validateMethod(request.getMethod())),
headers(factory.headersToKj(request.getHeaders()).clone()),
clientContext(kj::mv(clientContext)),
// Note we attach `requestBodyIn` to `task` so that we will implicitly cancel reading
// the request body as soon as the service returns. This is important in particular when
// the request body is not fully consumed, in order to propagate cancellation.
task(kjService.request(method, request.getUrl(), headers, *requestBodyIn, *this)
.attach(kj::mv(requestBodyIn))) {}
KJ_DISALLOW_COPY(ServerRequestContextImpl);
kj::Maybe<kj::Promise<Capability::Client>> shortenPath() override {
return task.then([this]() -> kj::Promise<void> {
// Merge in any errors from our reply call, so that they propagate somewhere.
return kj::mv(KJ_REQUIRE_NONNULL(replyTask,
"server never called send() or acceptWebSocket()"));
}).then([]() -> Capability::Client {
// If all went well, resolve to a settled capability.
// TODO(perf): Could save a message by resolving to a capability hosted by the client, or
// some special "null" capability that isn't an error but is still transmitted by value.
// Otherwise we need a Release message from client -> server just to drop this...
return kj::heap<ResolvedServerRequestContext>();
});
}
kj::Own<kj::AsyncOutputStream> send(
uint statusCode, kj::StringPtr statusText, const kj::HttpHeaders& headers,
kj::Maybe<uint64_t> expectedBodySize = nullptr) override {
KJ_REQUIRE(replyTask == nullptr, "already called send() or acceptWebSocket()");
auto req = clientContext.startResponseRequest();
if (method == kj::HttpMethod::HEAD ||
statusCode == 204 || statusCode == 205 || statusCode == 304) {
expectedBodySize = uint64_t(0);
}
auto rpcResponse = req.initResponse();
rpcResponse.setStatusCode(statusCode);
rpcResponse.setStatusText(statusText);
rpcResponse.adoptHeaders(factory.headersToCapnp(
headers, Orphanage::getForMessageContaining(rpcResponse)));
bool hasBody = true;
KJ_IF_MAYBE(s, expectedBodySize) {
rpcResponse.getBodySize().setFixed(*s);
hasBody = *s > 0;
}
if (hasBody) {
auto pipeline = req.send();
auto result = factory.streamFactory.capnpToKj(pipeline.getBody());
replyTask = pipeline.ignoreResult();
return result;
} else {
replyTask = req.send().ignoreResult();
return kj::heap<NullOutputStream>();
}
}
kj::Own<kj::WebSocket> acceptWebSocket(const kj::HttpHeaders& headers) override {
KJ_REQUIRE(replyTask == nullptr, "already called send() or acceptWebSocket()");
auto req = clientContext.startWebSocketRequest();
req.adoptHeaders(factory.headersToCapnp(
headers, Orphanage::getForMessageContaining(
capnp::HttpService::ClientRequestContext::StartWebSocketParams::Builder(req))));
auto pipe = kj::newWebSocketPipe();
auto shorteningPaf = kj::newPromiseAndFulfiller<kj::Promise<Capability::Client>>();
// We don't need the RequestState mechanism on the server side because
// CapnpToKjWebSocketAdapter wraps a pipe end, and that pipe end can continue to exist beyond
// the lifetime of the request, because the other end will have been dropped. We only create
// a RequestState here so that we can reuse the implementation of CapnpToKjWebSocketAdapter
// that needs this for the client side.
auto dummyState = kj::refcounted<RequestState>();
auto& pipeEnd0Ref = *pipe.ends[0];
dummyState->holdWebSocket(kj::mv(pipe.ends[0]));
req.setUpSocket(kj::heap<CapnpToKjWebSocketAdapter>(
kj::mv(dummyState), pipeEnd0Ref, kj::mv(shorteningPaf.promise)));
auto pipeline = req.send();
auto result = kj::heap<KjToCapnpWebSocketAdapter>(
kj::mv(pipe.ends[1]), pipeline.getDownSocket(), kj::mv(shorteningPaf.fulfiller));
// Note we need eagerlyEvaluate() here to force proactively discarding the response object,
// since it holds a reference to `downSocket`.
replyTask = pipeline.ignoreResult().eagerlyEvaluate(nullptr);
return result;
}
private:
HttpOverCapnpFactory& factory;
kj::HttpMethod method;
kj::HttpHeaders headers;
capnp::HttpService::ClientRequestContext::Client clientContext;
kj::Promise<void> task;
kj::Maybe<kj::Promise<void>> replyTask;
static kj::HttpMethod validateMethod(capnp::HttpMethod method) {
KJ_REQUIRE(method <= capnp::HttpMethod::UNSUBSCRIBE, "unknown method", method);
return static_cast<kj::HttpMethod>(method);
}
};
class HttpOverCapnpFactory::CapnpToKjHttpServiceAdapter final: public capnp::HttpService::Server {
public:
CapnpToKjHttpServiceAdapter(HttpOverCapnpFactory& factory, kj::Own<kj::HttpService> inner)
: factory(factory), inner(kj::mv(inner)) {}
kj::Promise<void> startRequest(StartRequestContext context) override {
auto params = context.getParams();
auto metadata = params.getRequest();
auto bodySize = metadata.getBodySize();
kj::Maybe<uint64_t> expectedSize;
bool hasBody = true;
if (bodySize.isFixed()) {
auto size = bodySize.getFixed();
expectedSize = bodySize.getFixed();
hasBody = size > 0;
}
auto results = context.getResults(MessageSize {8, 2});
kj::Own<kj::AsyncInputStream> requestBody;
if (hasBody) {
auto pipe = kj::newOneWayPipe(expectedSize);
results.setRequestBody(factory.streamFactory.kjToCapnp(kj::mv(pipe.out)));
requestBody = kj::mv(pipe.in);
} else {
requestBody = kj::heap<NullInputStream>();
}
results.setContext(kj::heap<ServerRequestContextImpl>(
factory, metadata, params.getContext(), kj::mv(requestBody), *inner));
return kj::READY_NOW;
}
private:
HttpOverCapnpFactory& factory;
kj::Own<kj::HttpService> inner;
};
capnp::HttpService::Client HttpOverCapnpFactory::kjToCapnp(kj::Own<kj::HttpService> service) {
return kj::heap<CapnpToKjHttpServiceAdapter>(*this, kj::mv(service));
}
// =======================================================================================
static constexpr uint64_t COMMON_TEXT_ANNOTATION = 0x857745131db6fc83ull;
// Type ID of `commonText` from `http.capnp`.
// TODO(cleanup): Cap'n Proto should auto-generate constants for these.
HttpOverCapnpFactory::HttpOverCapnpFactory(ByteStreamFactory& streamFactory,
kj::HttpHeaderTable::Builder& headerTableBuilder)
: streamFactory(streamFactory), headerTable(headerTableBuilder.getFutureTable()) {
auto commonHeaderNames = Schema::from<capnp::CommonHeaderName>().getEnumerants();
size_t maxHeaderId = 0;
nameCapnpToKj = kj::heapArray<kj::HttpHeaderId>(commonHeaderNames.size());
for (size_t i = 1; i < commonHeaderNames.size(); i++) {
kj::StringPtr nameText;
for (auto ann: commonHeaderNames[i].getProto().getAnnotations()) {
if (ann.getId() == COMMON_TEXT_ANNOTATION) {
nameText = ann.getValue().getText();
break;
}
}
KJ_ASSERT(nameText != nullptr);
kj::HttpHeaderId headerId = headerTableBuilder.add(nameText);
nameCapnpToKj[i] = headerId;
maxHeaderId = kj::max(maxHeaderId, headerId.hashCode());
}
nameKjToCapnp = kj::heapArray<capnp::CommonHeaderName>(maxHeaderId + 1);
for (auto& slot: nameKjToCapnp) slot = capnp::CommonHeaderName::INVALID;
for (size_t i = 1; i < commonHeaderNames.size(); i++) {
auto& slot = nameKjToCapnp[nameCapnpToKj[i].hashCode()];
KJ_ASSERT(slot == capnp::CommonHeaderName::INVALID);
slot = static_cast<capnp::CommonHeaderName>(i);
}
auto commonHeaderValues = Schema::from<capnp::CommonHeaderValue>().getEnumerants();
valueCapnpToKj = kj::heapArray<kj::StringPtr>(commonHeaderValues.size());
for (size_t i = 1; i < commonHeaderValues.size(); i++) {
kj::StringPtr valueText;
for (auto ann: commonHeaderValues[i].getProto().getAnnotations()) {
if (ann.getId() == COMMON_TEXT_ANNOTATION) {
valueText = ann.getValue().getText();
break;
}
}
KJ_ASSERT(valueText != nullptr);
valueCapnpToKj[i] = valueText;
valueKjToCapnp.insert(valueText, static_cast<capnp::CommonHeaderValue>(i));
}
}
Orphan<List<capnp::HttpHeader>> HttpOverCapnpFactory::headersToCapnp(
const kj::HttpHeaders& headers, Orphanage orphanage) {
auto result = orphanage.newOrphan<List<capnp::HttpHeader>>(headers.size());
auto rpcHeaders = result.get();
uint i = 0;
headers.forEach([&](kj::HttpHeaderId id, kj::StringPtr value) {
auto capnpName = id.hashCode() < nameKjToCapnp.size()
? nameKjToCapnp[id.hashCode()]
: capnp::CommonHeaderName::INVALID;
if (capnpName == capnp::CommonHeaderName::INVALID) {
auto header = rpcHeaders[i++].initUncommon();
header.setName(id.toString());
header.setValue(value);
} else {
auto header = rpcHeaders[i++].initCommon();
header.setName(capnpName);
header.setValue(value);
}
}, [&](kj::StringPtr name, kj::StringPtr value) {
auto header = rpcHeaders[i++].initUncommon();
header.setName(name);
header.setValue(value);
});
KJ_ASSERT(i == rpcHeaders.size());
return result;
}
kj::HttpHeaders HttpOverCapnpFactory::headersToKj(
List<capnp::HttpHeader>::Reader capnpHeaders) const {
kj::HttpHeaders result(headerTable);
for (auto header: capnpHeaders) {
switch (header.which()) {
case capnp::HttpHeader::COMMON: {
auto nv = header.getCommon();
auto nameInt = static_cast<uint>(nv.getName());
KJ_REQUIRE(nameInt < nameCapnpToKj.size(), "unknown common header name", nv.getName());
switch (nv.which()) {
case capnp::HttpHeader::Common::COMMON_VALUE: {
auto cvInt = static_cast<uint>(nv.getCommonValue());
KJ_REQUIRE(nameInt < valueCapnpToKj.size(),
"unknown common header value", nv.getCommonValue());
result.set(nameCapnpToKj[nameInt], valueCapnpToKj[cvInt]);
break;
}
case capnp::HttpHeader::Common::VALUE:
result.set(nameCapnpToKj[nameInt], nv.getValue());
break;
}
break;
}
case capnp::HttpHeader::UNCOMMON: {
auto nv = header.getUncommon();
result.add(nv.getName(), nv.getValue());
}
}
}
return result;
}
} // namespace capnp
# Copyright (c) 2019 Cloudflare, Inc. and contributors
# Licensed under the MIT License:
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
@0xb665280aaff2e632;
# Cap'n Proto interface for HTTP.
using import "byte-stream.capnp".ByteStream;
$import "/capnp/c++.capnp".namespace("capnp");
interface HttpService {
startRequest @0 (request :HttpRequest, context :ClientRequestContext)
-> (requestBody :ByteStream, context :ServerRequestContext);
# Begin an HTTP request.
#
# The client sends the request method/url/headers. The server responds with a `ByteStream` where
# the client can make calls to stream up the request body. `requestBody` will be null in the case
# that request.bodySize.fixed == 0.
interface ClientRequestContext {
# Provides callbacks for the server to send the response.
startResponse @0 (response :HttpResponse) -> (body :ByteStream);
# Server calls this method to send the response status and headers and to begin streaming the
# response body. `body` will be null in the case that response.bodySize.fixed == 0, which is
# required for HEAD responses and status codes 204, 205, and 304.
startWebSocket @1 (headers :List(HttpHeader), upSocket :WebSocket)
-> (downSocket :WebSocket);
# Server calls this method to indicate that the request is a valid WebSocket handshake and it
# wishes to accept it as a WebSocket.
#
# Client -> Server WebSocket frames will be sent via method calls on `upSocket`, while
# Server -> Client will be sent as calls to `downSocket`.
}
interface ServerRequestContext {
# Represents execution of a particular request on the server side.
#
# Dropping this object before the request completes will cancel the request.
#
# ServerRequestContext is always a promise capability. The client must wait for it to
# resolve using whenMoreResolved() in order to find out when the server is really done
# processing the request. This will throw an exception if the server failed in some way that
# could not be captured in the HTTP response. Note that it's possible for such an exception to
# be thrown even after the response body has been completely transmitted.
}
}
interface WebSocket {
sendText @0 (text :Text) -> stream;
sendData @1 (data :Data) -> stream;
# Send a text or data frame.
close @2 (code :UInt16, reason :Text);
# Send a close frame.
}
struct HttpRequest {
# Standard HTTP request metadata.
method @0 :HttpMethod;
url @1 :Text;
headers @2 :List(HttpHeader);
bodySize :union {
unknown @3 :Void; # e.g. due to transfer-encoding: chunked
fixed @4 :UInt64; # e.g. due to content-length
}
}
struct HttpResponse {
# Standard HTTP response metadata.
statusCode @0 :UInt16;
statusText @1 :Text; # leave null if it matches the default for statusCode
headers @2 :List(HttpHeader);
bodySize :union {
unknown @3 :Void; # e.g. due to transfer-encoding: chunked
fixed @4 :UInt64; # e.g. due to content-length
}
}
enum HttpMethod {
# This enum aligns precisely with the kj::HttpMethod enum. However, the backwards-compat
# constraints of a public-facing C++ enum vs. an internal Cap'n Proto interface differ in
# several ways, which could possibly lead to divergence someday. For now, a unit test verifies
# that they match exactly; if that test ever fails, we'll have to figure out what to do about it.
get @0;
head @1;
post @2;
put @3;
delete @4;
patch @5;
purge @6;
options @7;
trace @8;
copy @9;
lock @10;
mkcol @11;
move @12;
propfind @13;
proppatch @14;
search @15;
unlock @16;
acl @17;
report @18;
mkactivity @19;
checkout @20;
merge @21;
msearch @22;
notify @23;
subscribe @24;
unsubscribe @25;
}
annotation commonText @0x857745131db6fc83(enumerant) :Text;
enum CommonHeaderName {
invalid @0;
# Dummy to serve as default value. Should never actually appear on wire.
acceptCharset @1 $commonText("Accept-Charset");
acceptEncoding @2 $commonText("Accept-Encoding");
acceptLanguage @3 $commonText("Accept-Language");
acceptRanges @4 $commonText("Accept-Ranges");
accept @5 $commonText("Accept");
accessControlAllowOrigin @6 $commonText("Access-Control-Allow-Origin");
age @7 $commonText("Age");
allow @8 $commonText("Allow");
authorization @9 $commonText("Authorization");
cacheControl @10 $commonText("Cache-Control");
contentDisposition @11 $commonText("Content-Disposition");
contentEncoding @12 $commonText("Content-Encoding");
contentLanguage @13 $commonText("Content-Language");
contentLength @14 $commonText("Content-Length");
contentLocation @15 $commonText("Content-Location");
contentRange @16 $commonText("Content-Range");
contentType @17 $commonText("Content-Type");
cookie @18 $commonText("Cookie");
date @19 $commonText("Date");
etag @20 $commonText("ETag");
expect @21 $commonText("Expect");
expires @22 $commonText("Expires");
from @23 $commonText("From");
host @24 $commonText("Host");
ifMatch @25 $commonText("If-Match");
ifModifiedSince @26 $commonText("If-Modified-Since");
ifNoneMatch @27 $commonText("If-None-Match");
ifRange @28 $commonText("If-Range");
ifUnmodifiedSince @29 $commonText("If-Unmodified-Since");
lastModified @30 $commonText("Last-Modified");
link @31 $commonText("Link");
location @32 $commonText("Location");
maxForwards @33 $commonText("Max-Forwards");
proxyAuthenticate @34 $commonText("Proxy-Authenticate");
proxyAuthorization @35 $commonText("Proxy-Authorization");
range @36 $commonText("Range");
referer @37 $commonText("Referer");
refresh @38 $commonText("Refresh");
retryAfter @39 $commonText("Retry-After");
server @40 $commonText("Server");
setCookie @41 $commonText("Set-Cookie");
strictTransportSecurity @42 $commonText("Strict-Transport-Security");
transferEncoding @43 $commonText("Transfer-Encoding");
userAgent @44 $commonText("User-Agent");
vary @45 $commonText("Vary");
via @46 $commonText("Via");
wwwAuthenticate @47 $commonText("WWW-Authenticate");
}
enum CommonHeaderValue {
invalid @0;
gzipDeflate @1 $commonText("gzip, deflate");
# TODO(someday): "gzip, deflate" is the only common header value recognized by HPACK.
}
struct HttpHeader {
union {
common :group {
name @0 :CommonHeaderName;
union {
commonValue @1 :CommonHeaderValue;
value @2 :Text;
}
}
uncommon @3 :NameValue;
}
struct NameValue {
name @0 :Text;
value @1 :Text;
}
}
// Copyright (c) 2019 Cloudflare, Inc. and contributors
// Licensed under the MIT License:
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
#pragma once
// Bridges from KJ HTTP to Cap'n Proto HTTP-over-RPC.
#include <capnp/compat/http-over-capnp.capnp.h>
#include <kj/compat/http.h>
#include <kj/map.h>
#include "byte-stream.h"
namespace capnp {
class HttpOverCapnpFactory {
public:
HttpOverCapnpFactory(ByteStreamFactory& streamFactory,
kj::HttpHeaderTable::Builder& headerTableBuilder);
kj::Own<kj::HttpService> capnpToKj(capnp::HttpService::Client rpcService);
capnp::HttpService::Client kjToCapnp(kj::Own<kj::HttpService> service);
private:
ByteStreamFactory& streamFactory;
kj::HttpHeaderTable& headerTable;
kj::Array<capnp::CommonHeaderName> nameKjToCapnp;
kj::Array<kj::HttpHeaderId> nameCapnpToKj;
kj::Array<kj::StringPtr> valueCapnpToKj;
kj::HashMap<kj::StringPtr, capnp::CommonHeaderValue> valueKjToCapnp;
class RequestState;
class CapnpToKjWebSocketAdapter;
class KjToCapnpWebSocketAdapter;
class ClientRequestContextImpl;
class KjToCapnpHttpServiceAdapter;
class ServerRequestContextImpl;
class CapnpToKjHttpServiceAdapter;
kj::HttpHeaders headersToKj(capnp::List<capnp::HttpHeader>::Reader capnpHeaders) const;
// Returned headers may alias into `capnpHeaders`.
capnp::Orphan<capnp::List<capnp::HttpHeader>> headersToCapnp(
const kj::HttpHeaders& headers, capnp::Orphanage orphanage);
};
} // namespace capnp
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment