Commit 8f5d1f10 authored by Kenton Varda's avatar Kenton Varda

Add HTTP client and server implementation.

Properties:
- Presented as a LIBRARY, designed to be unopinionated about the application using it.
- Uses KJ async framework.
- Header parsing is zero-copy. The whole header block is read into a contiguous buffer, then parsed all at once. Avoids complicated state machinery (and is probably pretty fast).
- Known headers are parsed to numeric identifiers so that the application doesn't need to look them up by string name. The app registers all headers it is interested in upfront, receiving numeric IDs for each. Some common headers also have pre-defined constants, avoiding the need for registration.
- Connection-level headers (e.g. Content-Length, Transfer-Encoding) are handled entirely internally.
- WebSocket support (planned).

Not done yet:
- Implement the version of HttpClient that connects to new servers as-needed, managing a pool of connections. Currently I've only implemented the version that takes a pre-existing connection and speaks HTTP on it.
- Implement WebSockets.
- Implement plugable transfer encodings (although I guess Chrome doesn't even support transfer encodings other than chunked; maybe it's a lost cause).
- Implement HTTP/2, hopefully transparently (... someday).
parent 86410ee7
// Copyright (c) 2017 Sandstorm Development Group, Inc. and contributors
// Licensed under the MIT License:
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
#include "http.h"
#include <kj/debug.h>
#include <kj/test.h>
#include <map>
namespace kj {
namespace {
KJ_TEST("HttpMethod parse / stringify") {
#define TRY(name) \
KJ_EXPECT(kj::str(HttpMethod::name) == #name); \
KJ_IF_MAYBE(parsed, tryParseHttpMethod(#name)) { \
KJ_EXPECT(*parsed == HttpMethod::name); \
} else { \
KJ_FAIL_EXPECT("couldn't parse \"" #name "\" as HttpMethod"); \
}
KJ_HTTP_FOR_EACH_METHOD(TRY)
#undef TRY
KJ_EXPECT(tryParseHttpMethod("FOO") == nullptr);
KJ_EXPECT(tryParseHttpMethod("") == nullptr);
KJ_EXPECT(tryParseHttpMethod("G") == nullptr);
KJ_EXPECT(tryParseHttpMethod("GE") == nullptr);
KJ_EXPECT(tryParseHttpMethod("GET ") == nullptr);
KJ_EXPECT(tryParseHttpMethod("get") == nullptr);
}
KJ_TEST("HttpHeaderTable") {
HttpHeaderTable::Builder builder;
auto host = builder.add("Host");
auto host2 = builder.add("hOsT");
auto fooBar = builder.add("Foo-Bar");
auto bazQux = builder.add("baz-qux");
auto bazQux2 = builder.add("Baz-Qux");
auto table = builder.build();
uint builtinHeaderCount = 0;
#define INCREMENT(id, name) ++builtinHeaderCount;
KJ_HTTP_FOR_EACH_BUILTIN_HEADER(INCREMENT)
#undef INCREMENT
KJ_EXPECT(table->idCount() == builtinHeaderCount + 2);
KJ_EXPECT(host == HttpHeaderId::HOST);
KJ_EXPECT(host != HttpHeaderId::DATE);
KJ_EXPECT(host2 == host);
KJ_EXPECT(host != fooBar);
KJ_EXPECT(host != bazQux);
KJ_EXPECT(fooBar != bazQux);
KJ_EXPECT(bazQux == bazQux2);
KJ_EXPECT(kj::str(host) == "Host");
KJ_EXPECT(kj::str(host2) == "Host");
KJ_EXPECT(kj::str(fooBar) == "Foo-Bar");
KJ_EXPECT(kj::str(bazQux) == "baz-qux");
KJ_EXPECT(kj::str(HttpHeaderId::HOST) == "Host");
KJ_EXPECT(table->idToString(HttpHeaderId::DATE) == "Date");
KJ_EXPECT(table->idToString(fooBar) == "Foo-Bar");
KJ_EXPECT(KJ_ASSERT_NONNULL(table->stringToId("Date")) == HttpHeaderId::DATE);
KJ_EXPECT(KJ_ASSERT_NONNULL(table->stringToId("dATE")) == HttpHeaderId::DATE);
KJ_EXPECT(KJ_ASSERT_NONNULL(table->stringToId("Foo-Bar")) == fooBar);
KJ_EXPECT(KJ_ASSERT_NONNULL(table->stringToId("foo-BAR")) == fooBar);
KJ_EXPECT(table->stringToId("foobar") == nullptr);
KJ_EXPECT(table->stringToId("barfoo") == nullptr);
}
KJ_TEST("HttpHeaders::parseRequest") {
HttpHeaderTable::Builder builder;
auto fooBar = builder.add("Foo-Bar");
auto bazQux = builder.add("baz-qux");
auto table = builder.build();
HttpHeaders headers(*table);
auto text = kj::heapString(
"POST /some/path \t HTTP/1.1\r\n"
"Foo-BaR: Baz\r\n"
"Host: example.com\r\n"
"Content-Length: 123\r\n"
"DATE: early\r\n"
"other-Header: yep\r\n"
"\r\n");
auto result = KJ_ASSERT_NONNULL(headers.tryParseRequest(text.asArray()));
KJ_EXPECT(result.method == HttpMethod::POST);
KJ_EXPECT(result.url == "/some/path");
KJ_EXPECT(KJ_ASSERT_NONNULL(headers.get(HttpHeaderId::HOST)) == "example.com");
KJ_EXPECT(KJ_ASSERT_NONNULL(headers.get(HttpHeaderId::DATE)) == "early");
KJ_EXPECT(KJ_ASSERT_NONNULL(headers.get(fooBar)) == "Baz");
KJ_EXPECT(headers.get(bazQux) == nullptr);
KJ_EXPECT(headers.get(HttpHeaderId::CONTENT_TYPE) == nullptr);
KJ_EXPECT(result.connectionHeaders.contentLength == "123");
KJ_EXPECT(result.connectionHeaders.transferEncoding == nullptr);
std::map<kj::StringPtr, kj::StringPtr> unpackedHeaders;
headers.forEach([&](kj::StringPtr name, kj::StringPtr value) {
KJ_EXPECT(unpackedHeaders.insert(std::make_pair(name, value)).second);
});
KJ_EXPECT(unpackedHeaders.size() == 4);
KJ_EXPECT(unpackedHeaders["Host"] == "example.com");
KJ_EXPECT(unpackedHeaders["Date"] == "early");
KJ_EXPECT(unpackedHeaders["Foo-Bar"] == "Baz");
KJ_EXPECT(unpackedHeaders["other-Header"] == "yep");
KJ_EXPECT(headers.serializeRequest(result.method, result.url, result.connectionHeaders) ==
"POST /some/path HTTP/1.1\r\n"
"Content-Length: 123\r\n"
"Host: example.com\r\n"
"Date: early\r\n"
"Foo-Bar: Baz\r\n"
"other-Header: yep\r\n"
"\r\n");
}
KJ_TEST("HttpHeaders::parseResponse") {
HttpHeaderTable::Builder builder;
auto fooBar = builder.add("Foo-Bar");
auto bazQux = builder.add("baz-qux");
auto table = builder.build();
HttpHeaders headers(*table);
auto text = kj::heapString(
"HTTP/1.1\t\t 418\t I'm a teapot\r\n"
"Foo-BaR: Baz\r\n"
"Host: example.com\r\n"
"Content-Length: 123\r\n"
"DATE: early\r\n"
"other-Header: yep\r\n"
"\r\n");
auto result = KJ_ASSERT_NONNULL(headers.tryParseResponse(text.asArray()));
KJ_EXPECT(result.statusCode == 418);
KJ_EXPECT(result.statusText == "I'm a teapot");
KJ_EXPECT(KJ_ASSERT_NONNULL(headers.get(HttpHeaderId::HOST)) == "example.com");
KJ_EXPECT(KJ_ASSERT_NONNULL(headers.get(HttpHeaderId::DATE)) == "early");
KJ_EXPECT(KJ_ASSERT_NONNULL(headers.get(fooBar)) == "Baz");
KJ_EXPECT(headers.get(bazQux) == nullptr);
KJ_EXPECT(headers.get(HttpHeaderId::CONTENT_TYPE) == nullptr);
KJ_EXPECT(result.connectionHeaders.contentLength == "123");
KJ_EXPECT(result.connectionHeaders.transferEncoding == nullptr);
std::map<kj::StringPtr, kj::StringPtr> unpackedHeaders;
headers.forEach([&](kj::StringPtr name, kj::StringPtr value) {
KJ_EXPECT(unpackedHeaders.insert(std::make_pair(name, value)).second);
});
KJ_EXPECT(unpackedHeaders.size() == 4);
KJ_EXPECT(unpackedHeaders["Host"] == "example.com");
KJ_EXPECT(unpackedHeaders["Date"] == "early");
KJ_EXPECT(unpackedHeaders["Foo-Bar"] == "Baz");
KJ_EXPECT(unpackedHeaders["other-Header"] == "yep");
KJ_EXPECT(headers.serializeResponse(
result.statusCode, result.statusText, result.connectionHeaders) ==
"HTTP/1.1 418 I'm a teapot\r\n"
"Content-Length: 123\r\n"
"Host: example.com\r\n"
"Date: early\r\n"
"Foo-Bar: Baz\r\n"
"other-Header: yep\r\n"
"\r\n");
}
KJ_TEST("HttpHeaders parse invalid") {
auto table = HttpHeaderTable::Builder().build();
HttpHeaders headers(*table);
// NUL byte in request.
KJ_EXPECT(headers.tryParseRequest(kj::heapString(
"POST \0 /some/path \t HTTP/1.1\r\n"
"Foo-BaR: Baz\r\n"
"Host: example.com\r\n"
"DATE: early\r\n"
"other-Header: yep\r\n"
"\r\n")) == nullptr);
// Control character in header name.
KJ_EXPECT(headers.tryParseRequest(kj::heapString(
"POST /some/path \t HTTP/1.1\r\n"
"Foo-BaR: Baz\r\n"
"Cont\001ent-Length: 123\r\n"
"DATE: early\r\n"
"other-Header: yep\r\n"
"\r\n")) == nullptr);
// Separator character in header name.
KJ_EXPECT(headers.tryParseRequest(kj::heapString(
"POST /some/path \t HTTP/1.1\r\n"
"Foo-BaR: Baz\r\n"
"Host: example.com\r\n"
"DATE/: early\r\n"
"other-Header: yep\r\n"
"\r\n")) == nullptr);
// Response status code not numeric.
KJ_EXPECT(headers.tryParseResponse(kj::heapString(
"HTTP/1.1\t\t abc\t I'm a teapot\r\n"
"Foo-BaR: Baz\r\n"
"Host: example.com\r\n"
"DATE: early\r\n"
"other-Header: yep\r\n"
"\r\n")) == nullptr);
}
// =======================================================================================
class ReadFragmenter final: public kj::AsyncIoStream {
public:
ReadFragmenter(AsyncIoStream& inner, size_t limit): inner(inner), limit(limit) {}
Promise<size_t> read(void* buffer, size_t minBytes, size_t maxBytes) override {
return inner.read(buffer, minBytes, kj::max(minBytes, kj::min(limit, maxBytes)));
}
Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
return inner.tryRead(buffer, minBytes, kj::max(minBytes, kj::min(limit, maxBytes)));
}
Maybe<uint64_t> tryGetLength() override { return inner.tryGetLength(); }
Promise<uint64_t> pumpTo(AsyncOutputStream& output, uint64_t amount) override {
return inner.pumpTo(output, amount);
}
Promise<void> write(const void* buffer, size_t size) override {
return inner.write(buffer, size);
}
Promise<void> write(ArrayPtr<const ArrayPtr<const byte>> pieces) override {
return inner.write(pieces);
}
Maybe<Promise<uint64_t>> tryPumpFrom(AsyncInputStream& input, uint64_t amount) override {
return inner.tryPumpFrom(input, amount);
}
void shutdownWrite() override {
return inner.shutdownWrite();
}
void abortRead() override { return inner.abortRead(); }
void getsockopt(int level, int option, void* value, uint* length) override {
return inner.getsockopt(level, option, value, length);
}
void setsockopt(int level, int option, const void* value, uint length) override {
return inner.setsockopt(level, option, value, length);
}
void getsockname(struct sockaddr* addr, uint* length) override {
return inner.getsockname(addr, length);
}
void getpeername(struct sockaddr* addr, uint* length) override {
return inner.getsockname(addr, length);
}
private:
kj::AsyncIoStream& inner;
size_t limit;
};
template <typename T>
class InitializeableArray: public Array<T> {
public:
InitializeableArray(std::initializer_list<T> init)
: Array<T>(kj::heapArray(init)) {}
};
enum Side { BOTH, CLIENT_ONLY, SERVER_ONLY };
struct HeaderTestCase {
HttpHeaderId id;
kj::StringPtr value;
};
struct HttpRequestTestCase {
kj::StringPtr raw;
HttpMethod method;
kj::StringPtr path;
InitializeableArray<HeaderTestCase> requestHeaders;
kj::Maybe<uint64_t> requestBodySize;
InitializeableArray<kj::StringPtr> requestBodyParts;
Side side = BOTH;
};
struct HttpResponseTestCase {
kj::StringPtr raw;
uint64_t statusCode;
kj::StringPtr statusText;
InitializeableArray<HeaderTestCase> responseHeaders;
kj::Maybe<uint64_t> responseBodySize;
InitializeableArray<kj::StringPtr> responseBodyParts;
HttpMethod method = HttpMethod::GET;
Side side = BOTH;
};
struct HttpTestCase {
HttpRequestTestCase request;
HttpResponseTestCase response;
};
kj::Promise<void> writeEach(kj::AsyncOutputStream& out, kj::ArrayPtr<const kj::StringPtr> parts) {
if (parts.size() == 0) return kj::READY_NOW;
return out.write(parts[0].begin(), parts[0].size())
.then([&out,parts]() {
return writeEach(out, parts.slice(1, parts.size()));
});
}
kj::Promise<void> expectRead(kj::AsyncInputStream& in, kj::StringPtr expected) {
if (expected.size() == 0) return kj::READY_NOW;
auto buffer = kj::heapArray<char>(expected.size());
auto promise = in.read(buffer.begin(), 1, buffer.size());
return promise.then(kj::mvCapture(buffer, [&in,expected](kj::Array<char> buffer, size_t amount) {
auto actual = buffer.slice(0, amount);
if (memcmp(actual.begin(), expected.begin(), actual.size()) != 0) {
KJ_FAIL_ASSERT("data from stream doesn't match expected", expected, actual);
}
return expectRead(in, expected.slice(amount));
}));
}
void testHttpClientRequest(kj::AsyncIoContext& io, const HttpRequestTestCase& testCase) {
auto pipe = io.provider->newTwoWayPipe();
auto serverTask = expectRead(*pipe.ends[1], testCase.raw).then([&]() {
static const char SIMPLE_RESPONSE[] =
"HTTP/1.1 200 OK\r\n"
"Content-Length: 0\r\n"
"\r\n";
return pipe.ends[1]->write(SIMPLE_RESPONSE, strlen(SIMPLE_RESPONSE));
}).then([&]() -> kj::Promise<void> {
return kj::NEVER_DONE;
});
HttpHeaderTable table;
auto client = newHttpClient(table, *pipe.ends[0]);
HttpHeaders headers(table);
for (auto& header: testCase.requestHeaders) {
headers.set(header.id, header.value);
}
auto request = client->request(testCase.method, testCase.path, headers, testCase.requestBodySize);
if (testCase.requestBodyParts.size() > 0) {
writeEach(*request.body, testCase.requestBodyParts).wait(io.waitScope);
}
request.body = nullptr;
auto clientTask = request.response
.then([&](HttpClient::Response&& response) {
auto promise = response.body->readAllText();
return promise.attach(kj::mv(response.body));
}).ignoreResult();
serverTask.exclusiveJoin(kj::mv(clientTask)).wait(io.waitScope);
// Verify no more data written by client.
client = nullptr;
pipe.ends[0]->shutdownWrite();
KJ_EXPECT(pipe.ends[1]->readAllText().wait(io.waitScope) == "");
}
void testHttpClientResponse(kj::AsyncIoContext& io, const HttpResponseTestCase& testCase,
size_t readFragmentSize) {
auto pipe = io.provider->newTwoWayPipe();
ReadFragmenter fragmenter(*pipe.ends[0], readFragmentSize);
auto expectedReqText = testCase.method == HttpMethod::GET || testCase.method == HttpMethod::HEAD
? kj::str(testCase.method, " / HTTP/1.1\r\n\r\n")
: kj::str(testCase.method, " / HTTP/1.1\r\nContent-Length: 0\r\n");
auto serverTask = expectRead(*pipe.ends[1], expectedReqText).then([&]() {
return pipe.ends[1]->write(testCase.raw.begin(), testCase.raw.size());
}).then([&]() -> kj::Promise<void> {
pipe.ends[1]->shutdownWrite();
return kj::NEVER_DONE;
});
HttpHeaderTable table;
auto client = newHttpClient(table, fragmenter);
HttpHeaders headers(table);
auto request = client->request(testCase.method, "/", headers, size_t(0));
request.body = nullptr;
auto clientTask = request.response
.then([&](HttpClient::Response&& response) {
KJ_EXPECT(response.statusCode == testCase.statusCode);
KJ_EXPECT(response.statusText == testCase.statusText);
for (auto& header: testCase.responseHeaders) {
KJ_EXPECT(KJ_ASSERT_NONNULL(response.headers->get(header.id)) == header.value);
}
auto promise = response.body->readAllText();
return promise.attach(kj::mv(response.body));
}).then([&](kj::String body) {
KJ_EXPECT(body == kj::strArray(testCase.responseBodyParts, ""), body);
});
serverTask.exclusiveJoin(kj::mv(clientTask)).wait(io.waitScope);
// Verify no more data written by client.
client = nullptr;
pipe.ends[0]->shutdownWrite();
KJ_EXPECT(pipe.ends[1]->readAllText().wait(io.waitScope) == "");
}
class TestHttpService final: public HttpService {
public:
TestHttpService(const HttpRequestTestCase& expectedRequest,
const HttpResponseTestCase& response,
HttpHeaderTable& table)
: singleExpectedRequest(&expectedRequest),
singleResponse(&response),
responseHeaders(table) {}
TestHttpService(kj::ArrayPtr<const HttpTestCase> testCases,
HttpHeaderTable& table)
: singleExpectedRequest(nullptr),
singleResponse(nullptr),
testCases(testCases),
responseHeaders(table) {}
uint getRequestCount() { return requestCount; }
kj::Promise<void> request(
HttpMethod method, kj::StringPtr url, const HttpHeaders& headers,
kj::AsyncInputStream& requestBody, Response& responseSender) override {
auto& expectedRequest = testCases == nullptr ? *singleExpectedRequest :
testCases[requestCount % testCases.size()].request;
auto& response = testCases == nullptr ? *singleResponse :
testCases[requestCount % testCases.size()].response;
++requestCount;
KJ_EXPECT(method == expectedRequest.method, method);
KJ_EXPECT(url == expectedRequest.path, url);
for (auto& header: expectedRequest.requestHeaders) {
KJ_EXPECT(KJ_ASSERT_NONNULL(headers.get(header.id)) == header.value);
}
auto size = requestBody.tryGetLength();
KJ_IF_MAYBE(expectedSize, expectedRequest.requestBodySize) {
KJ_IF_MAYBE(s, size) {
KJ_EXPECT(*s == *expectedSize, *s);
} else {
KJ_FAIL_EXPECT("tryGetLength() returned nullptr; expected known size");
}
} else {
KJ_EXPECT(size == nullptr);
}
return requestBody.readAllText()
.then([this,&expectedRequest,&response,&responseSender](kj::String text) {
KJ_EXPECT(text == kj::strArray(expectedRequest.requestBodyParts, ""), text);
responseHeaders.clear();
for (auto& header: response.responseHeaders) {
responseHeaders.set(header.id, header.value);
}
auto stream = responseSender.send(response.statusCode, response.statusText,
responseHeaders, response.responseBodySize);
auto promise = writeEach(*stream, response.responseBodyParts);
return promise.attach(kj::mv(stream));
});
}
private:
const HttpRequestTestCase* singleExpectedRequest;
const HttpResponseTestCase* singleResponse;
kj::ArrayPtr<const HttpTestCase> testCases;
HttpHeaders responseHeaders;
uint requestCount = 0;
};
void testHttpServerRequest(kj::AsyncIoContext& io,
const HttpRequestTestCase& requestCase,
const HttpResponseTestCase& responseCase) {
auto pipe = io.provider->newTwoWayPipe();
HttpHeaderTable table;
TestHttpService service(requestCase, responseCase, table);
HttpServer server(io.provider->getTimer(), table, service);
auto listenTask = server.listenHttp(kj::mv(pipe.ends[0]));
pipe.ends[1]->write(requestCase.raw.begin(), requestCase.raw.size()).wait(io.waitScope);
pipe.ends[1]->shutdownWrite();
expectRead(*pipe.ends[1], responseCase.raw).wait(io.waitScope);
listenTask.wait(io.waitScope);
KJ_EXPECT(service.getRequestCount() == 1);
}
auto HUGE_STRING = kj::strArray(kj::repeat("abcdefgh", 4096), "");
auto HUGE_REQUEST = kj::str(
"GET / HTTP/1.1\r\n"
"Host: ", HUGE_STRING, "\r\n"
"\r\n");
static const HttpRequestTestCase REQUEST_TEST_CASES[] {
{
"GET /foo/bar HTTP/1.1\r\n"
"Host: example.com\r\n"
"\r\n",
HttpMethod::GET,
"/foo/bar",
{{HttpHeaderId::HOST, "example.com"}},
nullptr, {},
},
{
"HEAD /foo/bar HTTP/1.1\r\n"
"Host: example.com\r\n"
"\r\n",
HttpMethod::HEAD,
"/foo/bar",
{{HttpHeaderId::HOST, "example.com"}},
nullptr, {},
},
{
"POST / HTTP/1.1\r\n"
"Content-Length: 9\r\n"
"Host: example.com\r\n"
"Content-Type: text/plain\r\n"
"\r\n"
"foobarbaz",
HttpMethod::POST,
"/",
{
{HttpHeaderId::HOST, "example.com"},
{HttpHeaderId::CONTENT_TYPE, "text/plain"},
},
9, { "foo", "bar", "baz" },
},
{
"POST / HTTP/1.1\r\n"
"Transfer-Encoding: chunked\r\n"
"Host: example.com\r\n"
"Content-Type: text/plain\r\n"
"\r\n"
"3\r\n"
"foo\r\n"
"6\r\n"
"barbaz\r\n"
"0\r\n"
"\r\n",
HttpMethod::POST,
"/",
{
{HttpHeaderId::HOST, "example.com"},
{HttpHeaderId::CONTENT_TYPE, "text/plain"},
},
nullptr, { "foo", "barbaz" },
},
{
"POST / HTTP/1.1\r\n"
"Transfer-Encoding: chunked\r\n"
"Host: example.com\r\n"
"Content-Type: text/plain\r\n"
"\r\n"
"1d\r\n"
"0123456789abcdef0123456789abc\r\n"
"0\r\n"
"\r\n",
HttpMethod::POST,
"/",
{
{HttpHeaderId::HOST, "example.com"},
{HttpHeaderId::CONTENT_TYPE, "text/plain"},
},
nullptr, { "0123456789abcdef0123456789abc" },
},
{
"POST /foo/bar HTTP/1.1\r\n"
"Host: example.com\r\n"
"Connection: close\r\n"
"\r\n"
"baz qux corge grault",
HttpMethod::POST,
"/foo/bar",
{{HttpHeaderId::HOST, "example.com"}},
nullptr, { "baz qux corge grault" },
SERVER_ONLY, // Client never sends connection: close
},
{
HUGE_REQUEST,
HttpMethod::GET,
"/",
{{HttpHeaderId::HOST, HUGE_STRING}},
nullptr, {}
},
};
static const HttpResponseTestCase RESPONSE_TEST_CASES[] {
{
"HTTP/1.1 200 OK\r\n"
"Content-Type: text/plain\r\n"
"Connection: close\r\n"
"\r\n"
"baz qux",
200, "OK",
{{HttpHeaderId::CONTENT_TYPE, "text/plain"}},
nullptr, {"baz qux"},
HttpMethod::GET,
CLIENT_ONLY, // Server never sends connection: close
},
{
"HTTP/1.1 200 OK\r\n"
"Content-Length: 123\r\n"
"Content-Type: text/plain\r\n"
"\r\n",
200, "OK",
{{HttpHeaderId::CONTENT_TYPE, "text/plain"}},
123, {},
HttpMethod::HEAD,
},
{
"HTTP/1.1 200 OK\r\n"
"Content-Length: 8\r\n"
"Content-Type: text/plain\r\n"
"\r\n"
"quxcorge",
200, "OK",
{{HttpHeaderId::CONTENT_TYPE, "text/plain"}},
8, { "qux", "corge" }
},
{
"HTTP/1.1 200 OK\r\n"
"Transfer-Encoding: chunked\r\n"
"Content-Type: text/plain\r\n"
"\r\n"
"3\r\n"
"qux\r\n"
"5\r\n"
"corge\r\n"
"0\r\n"
"\r\n",
200, "OK",
{{HttpHeaderId::CONTENT_TYPE, "text/plain"}},
nullptr, { "qux", "corge" }
},
};
KJ_TEST("HttpClient requests") {
auto io = kj::setupAsyncIo();
for (auto& testCase: REQUEST_TEST_CASES) {
if (testCase.side == SERVER_ONLY) continue;
KJ_CONTEXT(testCase.raw);
testHttpClientRequest(io, testCase);
}
}
KJ_TEST("HttpClient responses") {
auto io = kj::setupAsyncIo();
size_t FRAGMENT_SIZES[] = { 1, 2, 3, 4, 5, 6, 7, 8, 16, 31, kj::maxValue };
for (auto& testCase: RESPONSE_TEST_CASES) {
if (testCase.side == SERVER_ONLY) continue;
for (size_t fragmentSize: FRAGMENT_SIZES) {
KJ_CONTEXT(testCase.raw, fragmentSize);
testHttpClientResponse(io, testCase, fragmentSize);
}
}
}
KJ_TEST("HttpServer requests") {
HttpResponseTestCase RESPONSE = {
"HTTP/1.1 200 OK\r\n"
"Content-Length: 3\r\n"
"\r\n"
"foo",
200, "OK",
{},
3, {"foo"}
};
HttpResponseTestCase HEAD_RESPONSE = {
"HTTP/1.1 200 OK\r\n"
"Content-Length: 3\r\n"
"\r\n",
200, "OK",
{},
3, {"foo"}
};
auto io = kj::setupAsyncIo();
for (auto& testCase: REQUEST_TEST_CASES) {
if (testCase.side == CLIENT_ONLY) continue;
KJ_CONTEXT(testCase.raw);
testHttpServerRequest(io, testCase,
testCase.method == HttpMethod::HEAD ? HEAD_RESPONSE : RESPONSE);
}
}
KJ_TEST("HttpServer responses") {
HttpRequestTestCase REQUEST = {
"GET / HTTP/1.1\r\n"
"\r\n",
HttpMethod::GET,
"/",
{},
nullptr, {},
};
HttpRequestTestCase HEAD_REQUEST = {
"HEAD / HTTP/1.1\r\n"
"\r\n",
HttpMethod::HEAD,
"/",
{},
nullptr, {},
};
auto io = kj::setupAsyncIo();
for (auto& testCase: RESPONSE_TEST_CASES) {
if (testCase.side == CLIENT_ONLY) continue;
KJ_CONTEXT(testCase.raw);
testHttpServerRequest(io,
testCase.method == HttpMethod::HEAD ? HEAD_REQUEST : REQUEST, testCase);
}
}
// -----------------------------------------------------------------------------
static const HttpTestCase PIPELINE_TESTS[] = {
{
{
"GET / HTTP/1.1\r\n"
"\r\n",
HttpMethod::GET, "/", {}, nullptr, {},
},
{
"HTTP/1.1 200 OK\r\n"
"Content-Length: 7\r\n"
"\r\n"
"foo bar",
200, "OK", {}, 7, { "foo bar" }
},
},
{
{
"POST /foo HTTP/1.1\r\n"
"Content-Length: 6\r\n"
"\r\n"
"grault",
HttpMethod::POST, "/foo", {}, 6, { "grault" },
},
{
"HTTP/1.1 404 Not Found\r\n"
"Content-Length: 13\r\n"
"\r\n"
"baz qux corge",
404, "Not Found", {}, 13, { "baz qux corge" }
},
},
{
{
"POST /bar HTTP/1.1\r\n"
"Transfer-Encoding: chunked\r\n"
"\r\n"
"6\r\n"
"garply\r\n"
"5\r\n"
"waldo\r\n"
"0\r\n"
"\r\n",
HttpMethod::POST, "/bar", {}, nullptr, { "garply", "waldo" },
},
{
"HTTP/1.1 200 OK\r\n"
"Transfer-Encoding: chunked\r\n"
"\r\n"
"4\r\n"
"fred\r\n"
"5\r\n"
"plugh\r\n"
"0\r\n"
"\r\n",
200, "OK", {}, nullptr, { "fred", "plugh" }
},
},
{
{
"HEAD / HTTP/1.1\r\n"
"\r\n",
HttpMethod::HEAD, "/", {}, nullptr, {},
},
{
"HTTP/1.1 200 OK\r\n"
"Content-Length: 7\r\n"
"\r\n",
200, "OK", {}, 7, { "foo bar" }
},
},
};
KJ_TEST("HttpClient pipeline") {
auto io = kj::setupAsyncIo();
auto pipe = io.provider->newTwoWayPipe();
auto readRequestsPromise = pipe.ends[1]->readAllText();
auto allRequestText =
kj::strArray(KJ_MAP(testCase, PIPELINE_TESTS) { return testCase.request.raw; }, "");
auto allResponseText =
kj::strArray(KJ_MAP(testCase, PIPELINE_TESTS) { return testCase.response.raw; }, "");
auto writeResponsesPromise = pipe.ends[1]->write(allResponseText.begin(), allResponseText.size());
HttpHeaderTable table;
auto client = newHttpClient(table, *pipe.ends[0]);
for (auto& testCase: PIPELINE_TESTS) {
KJ_CONTEXT(testCase.request.raw, testCase.response.raw);
HttpHeaders headers(table);
for (auto& header: testCase.request.requestHeaders) {
headers.set(header.id, header.value);
}
auto request = client->request(
testCase.request.method, testCase.request.path, headers, testCase.request.requestBodySize);
for (auto& part: testCase.request.requestBodyParts) {
request.body->write(part.begin(), part.size()).wait(io.waitScope);
}
request.body = nullptr;
auto response = request.response.wait(io.waitScope);
KJ_EXPECT(response.statusCode == testCase.response.statusCode);
auto body = response.body->readAllText().wait(io.waitScope);
if (testCase.request.method == HttpMethod::HEAD) {
KJ_EXPECT(body == "");
} else {
KJ_EXPECT(body == kj::strArray(testCase.response.responseBodyParts, ""), body);
}
}
client = nullptr;
pipe.ends[0]->shutdownWrite();
auto requests = readRequestsPromise.wait(io.waitScope);
KJ_EXPECT(requests == allRequestText, requests);
writeResponsesPromise.wait(io.waitScope);
}
KJ_TEST("HttpClient parallel pipeline") {
auto io = kj::setupAsyncIo();
auto pipe = io.provider->newTwoWayPipe();
auto readRequestsPromise = pipe.ends[1]->readAllText();
auto allRequestText =
kj::strArray(KJ_MAP(testCase, PIPELINE_TESTS) { return testCase.request.raw; }, "");
auto allResponseText =
kj::strArray(KJ_MAP(testCase, PIPELINE_TESTS) { return testCase.response.raw; }, "");
auto writeResponsesPromise = pipe.ends[1]->write(allResponseText.begin(), allResponseText.size());
HttpHeaderTable table;
auto client = newHttpClient(table, *pipe.ends[0]);
auto responsePromises = KJ_MAP(testCase, PIPELINE_TESTS) {
KJ_CONTEXT(testCase.request.raw, testCase.response.raw);
HttpHeaders headers(table);
for (auto& header: testCase.request.requestHeaders) {
headers.set(header.id, header.value);
}
auto request = client->request(
testCase.request.method, testCase.request.path, headers, testCase.request.requestBodySize);
for (auto& part: testCase.request.requestBodyParts) {
request.body->write(part.begin(), part.size()).wait(io.waitScope);
}
return kj::mv(request.response);
};
for (auto i: kj::indices(PIPELINE_TESTS)) {
auto& testCase = PIPELINE_TESTS[i];
auto response = responsePromises[i].wait(io.waitScope);
KJ_EXPECT(response.statusCode == testCase.response.statusCode);
auto body = response.body->readAllText().wait(io.waitScope);
if (testCase.request.method == HttpMethod::HEAD) {
KJ_EXPECT(body == "");
} else {
KJ_EXPECT(body == kj::strArray(testCase.response.responseBodyParts, ""), body);
}
}
client = nullptr;
pipe.ends[0]->shutdownWrite();
auto requests = readRequestsPromise.wait(io.waitScope);
KJ_EXPECT(requests == allRequestText, requests);
writeResponsesPromise.wait(io.waitScope);
}
KJ_TEST("HttpServer pipeline") {
auto io = kj::setupAsyncIo();
auto pipe = io.provider->newTwoWayPipe();
HttpHeaderTable table;
TestHttpService service(PIPELINE_TESTS, table);
HttpServer server(io.provider->getTimer(), table, service);
auto listenTask = server.listenHttp(kj::mv(pipe.ends[0]));
for (auto& testCase: PIPELINE_TESTS) {
KJ_CONTEXT(testCase.request.raw, testCase.response.raw);
pipe.ends[1]->write(testCase.request.raw.begin(), testCase.request.raw.size())
.wait(io.waitScope);
expectRead(*pipe.ends[1], testCase.response.raw).wait(io.waitScope);
}
pipe.ends[1]->shutdownWrite();
listenTask.wait(io.waitScope);
KJ_EXPECT(service.getRequestCount() == kj::size(PIPELINE_TESTS));
}
KJ_TEST("HttpServer parallel pipeline") {
auto io = kj::setupAsyncIo();
auto pipe = io.provider->newTwoWayPipe();
auto allRequestText =
kj::strArray(KJ_MAP(testCase, PIPELINE_TESTS) { return testCase.request.raw; }, "");
auto allResponseText =
kj::strArray(KJ_MAP(testCase, PIPELINE_TESTS) { return testCase.response.raw; }, "");
HttpHeaderTable table;
TestHttpService service(PIPELINE_TESTS, table);
HttpServer server(io.provider->getTimer(), table, service);
auto listenTask = server.listenHttp(kj::mv(pipe.ends[0]));
pipe.ends[1]->write(allRequestText.begin(), allRequestText.size()).wait(io.waitScope);
pipe.ends[1]->shutdownWrite();
auto rawResponse = pipe.ends[1]->readAllText().wait(io.waitScope);
KJ_EXPECT(rawResponse == allResponseText, rawResponse);
listenTask.wait(io.waitScope);
KJ_EXPECT(service.getRequestCount() == kj::size(PIPELINE_TESTS));
}
KJ_TEST("HttpClient <-> HttpServer") {
auto io = kj::setupAsyncIo();
auto pipe = io.provider->newTwoWayPipe();
HttpHeaderTable table;
TestHttpService service(PIPELINE_TESTS, table);
HttpServer server(io.provider->getTimer(), table, service);
auto listenTask = server.listenHttp(kj::mv(pipe.ends[1]));
auto client = newHttpClient(table, *pipe.ends[0]);
for (auto& testCase: PIPELINE_TESTS) {
KJ_CONTEXT(testCase.request.raw, testCase.response.raw);
HttpHeaders headers(table);
for (auto& header: testCase.request.requestHeaders) {
headers.set(header.id, header.value);
}
auto request = client->request(
testCase.request.method, testCase.request.path, headers, testCase.request.requestBodySize);
for (auto& part: testCase.request.requestBodyParts) {
request.body->write(part.begin(), part.size()).wait(io.waitScope);
}
request.body = nullptr;
auto response = request.response.wait(io.waitScope);
KJ_EXPECT(response.statusCode == testCase.response.statusCode);
auto body = response.body->readAllText().wait(io.waitScope);
if (testCase.request.method == HttpMethod::HEAD) {
KJ_EXPECT(body == "");
} else {
KJ_EXPECT(body == kj::strArray(testCase.response.responseBodyParts, ""), body);
}
}
client = nullptr;
pipe.ends[0]->shutdownWrite();
listenTask.wait(io.waitScope);
KJ_EXPECT(service.getRequestCount() == kj::size(PIPELINE_TESTS));
}
// -----------------------------------------------------------------------------
KJ_TEST("HttpServer request timeout") {
auto io = kj::setupAsyncIo();
auto pipe = io.provider->newTwoWayPipe();
HttpHeaderTable table;
TestHttpService service(PIPELINE_TESTS, table);
HttpServerSettings settings;
settings.headerTimeout = 1 * kj::MILLISECONDS;
HttpServer server(io.provider->getTimer(), table, service, settings);
// Shouldn't hang! Should time out.
server.listenHttp(kj::mv(pipe.ends[0])).wait(io.waitScope);
// Sends back 408 Request Timeout.
KJ_EXPECT(pipe.ends[1]->readAllText().wait(io.waitScope)
.startsWith("HTTP/1.1 408 Request Timeout"));
}
KJ_TEST("HttpServer pipeline timeout") {
auto io = kj::setupAsyncIo();
auto pipe = io.provider->newTwoWayPipe();
HttpHeaderTable table;
TestHttpService service(PIPELINE_TESTS, table);
HttpServerSettings settings;
settings.pipelineTimeout = 1 * kj::MILLISECONDS;
HttpServer server(io.provider->getTimer(), table, service, settings);
auto listenTask = server.listenHttp(kj::mv(pipe.ends[0]));
// Do one request.
pipe.ends[1]->write(PIPELINE_TESTS[0].request.raw.begin(), PIPELINE_TESTS[0].request.raw.size())
.wait(io.waitScope);
expectRead(*pipe.ends[1], PIPELINE_TESTS[0].response.raw).wait(io.waitScope);
// Listen task should time out even though we didn't shutdown the socket.
listenTask.wait(io.waitScope);
// In this case, no data is sent back.
KJ_EXPECT(pipe.ends[1]->readAllText().wait(io.waitScope) == "");
}
// -----------------------------------------------------------------------------
KJ_TEST("HttpClient to capnproto.org") {
auto io = kj::setupAsyncIo();
auto maybeConn = io.provider->getNetwork().parseAddress("capnproto.org", 80)
.then([](kj::Own<kj::NetworkAddress> addr) {
auto promise = addr->connect();
return promise.attach(kj::mv(addr));
}).then([](kj::Own<kj::AsyncIoStream>&& connection) -> kj::Maybe<kj::Own<kj::AsyncIoStream>> {
return kj::mv(connection);
}, [](kj::Exception&& e) -> kj::Maybe<kj::Own<kj::AsyncIoStream>> {
KJ_LOG(WARNING, "skipping test because couldn't connect to capnproto.org");
return nullptr;
}).wait(io.waitScope);
KJ_IF_MAYBE(conn, maybeConn) {
// Successfully connected to capnproto.org. Try doing GET /. We expect to get a redirect to
// HTTPS, because what kind of horrible web site would serve in plaintext, really?
HttpHeaderTable table;
auto client = newHttpClient(table, **conn);
HttpHeaders headers(table);
headers.set(HttpHeaderId::HOST, "capnproto.org");
auto response = client->request(HttpMethod::GET, "/", headers).response.wait(io.waitScope);
KJ_EXPECT(response.statusCode / 100 == 3);
auto location = KJ_ASSERT_NONNULL(response.headers->get(HttpHeaderId::LOCATION));
KJ_EXPECT(location == "https://capnproto.org/");
auto body = response.body->readAllText().wait(io.waitScope);
}
}
} // namespace
} // namespace kj
// Copyright (c) 2017 Sandstorm Development Group, Inc. and contributors
// Licensed under the MIT License:
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
#include "http.h"
#include <kj/debug.h>
#include <kj/parse/char.h>
#include <unordered_map>
namespace kj {
static const char* METHOD_NAMES[] = {
#define METHOD_NAME(id) #id,
KJ_HTTP_FOR_EACH_METHOD(METHOD_NAME)
#undef METHOD_NAME
};
kj::StringPtr KJ_STRINGIFY(HttpMethod method) {
return METHOD_NAMES[static_cast<uint>(method)];
}
static kj::Maybe<HttpMethod> consumeHttpMethod(char*& ptr) {
char* p = ptr;
#define EXPECT_REST(prefix, suffix) \
if (memcmp(p, #suffix, sizeof(#suffix)-1) == 0) { \
ptr = p + (sizeof(#suffix)-1); \
return HttpMethod::prefix##suffix; \
} else { \
return nullptr; \
}
switch (*p++) {
case 'C':
switch (*p++) {
case 'H': EXPECT_REST(CH,ECKOUT)
case 'O': EXPECT_REST(CO,PY)
default: return nullptr;
}
case 'D': EXPECT_REST(D,ELETE)
case 'G': EXPECT_REST(G,ET)
case 'H': EXPECT_REST(H,EAD)
case 'L': EXPECT_REST(L,OCK)
case 'M':
switch (*p++) {
case 'E': EXPECT_REST(ME,RGE)
case 'K':
switch (*p++) {
case 'A': EXPECT_REST(MKA,CTIVITY)
case 'C': EXPECT_REST(MKC,OL)
default: return nullptr;
}
case 'O': EXPECT_REST(MO,VE)
case 'S': EXPECT_REST(MS,EARCH)
default: return nullptr;
}
case 'N': EXPECT_REST(N,OTIFY)
case 'O': EXPECT_REST(O,PTIONS)
case 'P':
switch (*p++) {
case 'A': EXPECT_REST(PA,TCH)
case 'O': EXPECT_REST(PO,ST)
case 'R':
if (*p++ != 'O' || *p++ != 'P') return nullptr;
switch (*p++) {
case 'F': EXPECT_REST(PROPF,IND)
case 'P': EXPECT_REST(PROPP,ATCH)
default: return nullptr;
}
case 'U':
switch (*p++) {
case 'R': EXPECT_REST(PUR,GE)
case 'T': EXPECT_REST(PUT,)
default: return nullptr;
}
default: return nullptr;
}
case 'R': EXPECT_REST(R,EPORT)
case 'S':
switch (*p++) {
case 'E': EXPECT_REST(SE,ARCH)
case 'U': EXPECT_REST(SU,BSCRIBE)
default: return nullptr;
}
case 'T': EXPECT_REST(T,RACE)
case 'U':
if (*p++ != 'N') return nullptr;
switch (*p++) {
case 'L': EXPECT_REST(UNL,OCK)
case 'S': EXPECT_REST(UNS,UBSCRIBE)
default: return nullptr;
}
default: return nullptr;
}
#undef EXPECT_REST
}
kj::Maybe<HttpMethod> tryParseHttpMethod(kj::StringPtr name) {
// const_cast OK because we don't actually access it. consumeHttpMethod() is also called by some
// code later than explicitly needs to use a non-const pointer.
char* ptr = const_cast<char*>(name.begin());
auto result = consumeHttpMethod(ptr);
if (*ptr == '\0') {
return result;
} else {
return nullptr;
}
}
// =======================================================================================
namespace {
constexpr auto HTTP_SEPARATOR_CHARS = kj::parse::anyOfChars("()<>@,;:\\\"/[]?={} \t");
// RFC2616 section 2.2: https://www.w3.org/Protocols/rfc2616/rfc2616-sec2.html#sec2.2
constexpr auto HTTP_TOKEN_CHARS =
kj::parse::controlChar.orChar('\x7f')
.orGroup(kj::parse::whitespaceChar)
.orGroup(HTTP_SEPARATOR_CHARS)
.invert();
// RFC2616 section 2.2: https://www.w3.org/Protocols/rfc2616/rfc2616-sec2.html#sec2.2
constexpr auto HTTP_HEADER_NAME_CHARS = HTTP_TOKEN_CHARS;
// RFC2616 section 4.2: https://www.w3.org/Protocols/rfc2616/rfc2616-sec4.html#sec4.2
static void requireValidHeaderName(kj::StringPtr name) {
for (char c: name) {
KJ_REQUIRE(HTTP_HEADER_NAME_CHARS.contains(c), "invalid header name", name);
}
}
static void requireValidHeaderValue(kj::StringPtr value) {
for (char c: value) {
KJ_REQUIRE(c >= 0x20, "invalid header value", value);
}
}
static const char* BUILTIN_HEADER_NAMES[] = {
// Indexed by header ID, which includes connection headers, so we include those names too.
#define HEADER_NAME(id, name) name,
KJ_HTTP_FOR_EACH_BUILTIN_HEADER(HEADER_NAME)
#undef HEADER_NAME
};
enum class BuiltinHeaderIndices {
#define HEADER_ID(id, name) id,
KJ_HTTP_FOR_EACH_BUILTIN_HEADER(HEADER_ID)
#undef HEADER_ID
};
static constexpr size_t CONNECTION_HEADER_COUNT = 0
#define COUNT_HEADER(id, name) + 1
KJ_HTTP_FOR_EACH_CONNECTION_HEADER(COUNT_HEADER)
#undef COUNT_HEADER
;
enum class ConnectionHeaderIndices {
#define HEADER_ID(id, name) id,
KJ_HTTP_FOR_EACH_CONNECTION_HEADER(HEADER_ID)
#undef HEADER_ID
};
static constexpr uint CONNECTION_HEADER_XOR = kj::maxValue;
static constexpr uint CONNECTION_HEADER_THRESHOLD = CONNECTION_HEADER_XOR >> 1;
} // namespace
#define DEFINE_HEADER(id, name) \
const HttpHeaderId HttpHeaderId::id(nullptr, static_cast<uint>(BuiltinHeaderIndices::id));
KJ_HTTP_FOR_EACH_BUILTIN_HEADER(DEFINE_HEADER)
#undef DEFINE_HEADER
kj::StringPtr HttpHeaderId::toString() const {
if (table == nullptr) {
KJ_ASSERT(id < kj::size(BUILTIN_HEADER_NAMES));
return BUILTIN_HEADER_NAMES[id];
} else {
return table->idToString(*this);
}
}
namespace {
struct HeaderNameHash {
size_t operator()(kj::StringPtr s) const {
size_t result = 5381;
for (byte b: s.asBytes()) {
// Masking bit 0x20 makes our hash case-insensitive while conveniently avoiding any
// collisions that would matter for header names.
result = ((result << 5) + result) ^ (b & ~0x20);
}
return result;
}
bool operator()(kj::StringPtr a, kj::StringPtr b) const {
// TODO(perf): I wonder if we can beat strcasecmp() by masking bit 0x20 from each byte. We'd
// need to prohibit one of the technically-legal characters '^' or '~' from header names
// since they'd otherwise be ambiguous, but otherwise there is no ambiguity.
return strcasecmp(a.cStr(), b.cStr()) == 0;
}
};
} // namespace
struct HttpHeaderTable::IdsByNameMap {
// TODO(perf): If we were cool we could maybe use a perfect hash here, since our hashtable is
// static once built.
std::unordered_map<kj::StringPtr, uint, HeaderNameHash, HeaderNameHash> map;
};
HttpHeaderTable::Builder::Builder()
: table(kj::heap<HttpHeaderTable>()) {}
HttpHeaderId HttpHeaderTable::Builder::add(kj::StringPtr name) {
requireValidHeaderName(name);
auto insertResult = table->idsByName->map.insert(std::make_pair(name, table->namesById.size()));
if (insertResult.second) {
table->namesById.add(name);
}
return HttpHeaderId(table, insertResult.first->second);
}
HttpHeaderTable::HttpHeaderTable()
: idsByName(kj::heap<IdsByNameMap>()) {
#define ADD_HEADER(id, name) \
idsByName->map.insert(std::make_pair(name, \
static_cast<uint>(ConnectionHeaderIndices::id) ^ CONNECTION_HEADER_XOR));
KJ_HTTP_FOR_EACH_CONNECTION_HEADER(ADD_HEADER);
#undef ADD_HEADER
#define ADD_HEADER(id, name) \
namesById.add(name); \
idsByName->map.insert(std::make_pair(name, static_cast<uint>(BuiltinHeaderIndices::id)));
KJ_HTTP_FOR_EACH_BUILTIN_HEADER(ADD_HEADER);
#undef ADD_HEADER
}
HttpHeaderTable::~HttpHeaderTable() noexcept(false) {}
kj::Maybe<HttpHeaderId> HttpHeaderTable::stringToId(kj::StringPtr name) {
auto iter = idsByName->map.find(name);
if (iter == idsByName->map.end()) {
return nullptr;
} else {
return HttpHeaderId(this, iter->second);
}
}
// =======================================================================================
HttpHeaders::HttpHeaders(HttpHeaderTable& table)
: table(&table),
indexedHeaders(kj::heapArray<kj::StringPtr>(table.idCount())) {}
void HttpHeaders::clear() {
for (auto& header: indexedHeaders) {
header = nullptr;
}
unindexedHeaders.clear();
}
HttpHeaders HttpHeaders::clone() const {
HttpHeaders result(*table);
for (auto i: kj::indices(indexedHeaders)) {
if (indexedHeaders[i] != nullptr) {
result.indexedHeaders[i] = result.cloneToOwn(indexedHeaders[i]);
}
}
result.unindexedHeaders.resize(unindexedHeaders.size());
for (auto i: kj::indices(unindexedHeaders)) {
result.unindexedHeaders[i].name = result.cloneToOwn(unindexedHeaders[i].name);
result.unindexedHeaders[i].value = result.cloneToOwn(unindexedHeaders[i].value);
}
return result;
}
HttpHeaders HttpHeaders::cloneShallow() const {
HttpHeaders result(*table);
for (auto i: kj::indices(indexedHeaders)) {
if (indexedHeaders[i] != nullptr) {
result.indexedHeaders[i] = indexedHeaders[i];
}
}
result.unindexedHeaders.resize(unindexedHeaders.size());
for (auto i: kj::indices(unindexedHeaders)) {
result.unindexedHeaders[i] = unindexedHeaders[i];
}
return result;
}
kj::StringPtr HttpHeaders::cloneToOwn(kj::StringPtr str) {
auto copy = kj::heapString(str);
kj::StringPtr result = copy;
ownedStrings.add(copy.releaseArray());
return result;
}
void HttpHeaders::set(HttpHeaderId id, kj::StringPtr value) {
id.requireFrom(*table);
requireValidHeaderValue(value);
indexedHeaders[id.id] = value;
}
void HttpHeaders::add(kj::StringPtr name, kj::StringPtr value) {
requireValidHeaderName(name);
requireValidHeaderValue(value);
KJ_REQUIRE(addNoCheck(name, value) == nullptr,
"can't set connection-level headers on HttpHeaders", name, value) { break; }
}
kj::Maybe<uint> HttpHeaders::addNoCheck(kj::StringPtr name, kj::StringPtr value) {
KJ_IF_MAYBE(id, table->stringToId(name)) {
if (id->id > CONNECTION_HEADER_THRESHOLD) {
return id->id ^ CONNECTION_HEADER_XOR;
}
if (indexedHeaders[id->id] == nullptr) {
indexedHeaders[id->id] = value;
} else {
// Duplicate HTTP headers are equivalent to the values being separated by a comma.
auto concat = kj::str(indexedHeaders[id->id], ", ", value);
indexedHeaders[id->id] = concat;
ownedStrings.add(concat.releaseArray());
}
} else {
unindexedHeaders.add(Header {name, value});
}
return nullptr;
}
void HttpHeaders::takeOwnership(kj::String&& string) {
ownedStrings.add(string.releaseArray());
}
void HttpHeaders::takeOwnership(kj::Array<char>&& chars) {
ownedStrings.add(kj::mv(chars));
}
void HttpHeaders::takeOwnership(HttpHeaders&& otherHeaders) {
for (auto& str: otherHeaders.ownedStrings) {
ownedStrings.add(kj::mv(str));
}
otherHeaders.ownedStrings.clear();
}
// -----------------------------------------------------------------------------
static inline char* skipSpace(char* p) {
for (;;) {
switch (*p) {
case '\t':
case ' ':
++p;
break;
default:
return p;
}
}
}
static kj::Maybe<kj::StringPtr> consumeWord(char*& ptr) {
char* start = skipSpace(ptr);
char* p = start;
for (;;) {
switch (*p) {
case '\0':
ptr = p;
return kj::StringPtr(start, p);
case '\t':
case ' ': {
char* end = p++;
ptr = p;
*end = '\0';
return kj::StringPtr(start, end);
}
case '\n':
case '\r':
// Not expecting EOL!
return nullptr;
default:
++p;
break;
}
}
}
static kj::Maybe<uint> consumeNumber(char*& ptr) {
char* start = skipSpace(ptr);
char* p = start;
uint result = 0;
for (;;) {
char c = *p;
if ('0' <= c && c <= '9') {
result = result * 10 + (c - '0');
++p;
} else {
if (p == start) return nullptr;
ptr = p;
return result;
}
}
}
static kj::StringPtr consumeLine(char*& ptr) {
char* start = skipSpace(ptr);
char* p = start;
for (;;) {
switch (*p) {
case '\0':
ptr = p;
return kj::StringPtr(start, p);
case '\r': {
char* end = p++;
if (*p == '\n') ++p;
if (*p == ' ' || *p == '\t') {
// Whoa, continuation line. These are deprecated, but historically a line starting with
// a space was treated as a continuation of the previous line. The behavior should be
// the same as if the \r\n were replaced with spaces, so let's do that here to prevent
// confusion later.
*end = ' ';
p[-1] = ' ';
break;
}
ptr = p;
*end = '\0';
return kj::StringPtr(start, end);
}
case '\n': {
char* end = p++;
if (*p == ' ' || *p == '\t') {
// Whoa, continuation line. These are deprecated, but historically a line starting with
// a space was treated as a continuation of the previous line. The behavior should be
// the same as if the \n were replaced with spaces, so let's do that here to prevent
// confusion later.
*end = ' ';
break;
}
ptr = p;
*end = '\0';
return kj::StringPtr(start, end);
}
default:
++p;
break;
}
}
}
static kj::Maybe<kj::StringPtr> consumeHeaderName(char*& ptr) {
// Do NOT skip spaces before the header name. Leading spaces indicate a continuation line; they
// should have been handled in consumeLine().
char* p = ptr;
char* start = p;
while (HTTP_HEADER_NAME_CHARS.contains(*p)) ++p;
char* end = p;
p = skipSpace(p);
if (end == start || *p != ':') return nullptr;
++p;
p = skipSpace(p);
*end = '\0';
ptr = p;
return kj::StringPtr(start, end);
}
static char* trimHeaderEnding(kj::ArrayPtr<char> content) {
// Trim off the trailing \r\n from a header blob.
if (content.size() < 2) return nullptr;
// Remove trailing \r\n\r\n and replace with \0 sentinel char.
char* end = content.end();
if (end[-1] != '\n') return nullptr;
--end;
if (end[-1] == '\r') --end;
*end = '\0';
return end;
}
kj::Maybe<HttpHeaders::Request> HttpHeaders::tryParseRequest(kj::ArrayPtr<char> content) {
char* end = trimHeaderEnding(content);
if (end == nullptr) return nullptr;
char* ptr = content.begin();
HttpHeaders::Request request;
KJ_IF_MAYBE(method, consumeHttpMethod(ptr)) {
request.method = *method;
if (*ptr != ' ' && *ptr != '\t') {
return nullptr;
}
++ptr;
} else {
return nullptr;
}
KJ_IF_MAYBE(path, consumeWord(ptr)) {
request.url = *path;
} else {
return nullptr;
}
// Ignore rest of line. Don't care about "HTTP/1.1" or whatever.
consumeLine(ptr);
if (!parseHeaders(ptr, end, request.connectionHeaders)) return nullptr;
return request;
}
kj::Maybe<HttpHeaders::Response> HttpHeaders::tryParseResponse(kj::ArrayPtr<char> content) {
char* end = trimHeaderEnding(content);
if (end == nullptr) return nullptr;
char* ptr = content.begin();
HttpHeaders::Response response;
KJ_IF_MAYBE(version, consumeWord(ptr)) {
if (!version->startsWith("HTTP/")) return nullptr;
} else {
return nullptr;
}
KJ_IF_MAYBE(code, consumeNumber(ptr)) {
response.statusCode = *code;
} else {
return nullptr;
}
response.statusText = consumeLine(ptr);
if (!parseHeaders(ptr, end, response.connectionHeaders)) return nullptr;
return response;
}
bool HttpHeaders::parseHeaders(char* ptr, char* end, ConnectionHeaders& connectionHeaders) {
while (*ptr != '\0') {
KJ_IF_MAYBE(name, consumeHeaderName(ptr)) {
kj::StringPtr line = consumeLine(ptr);
KJ_IF_MAYBE(connectionHeaderId, addNoCheck(*name, line)) {
// Parsed a connection header.
switch (*connectionHeaderId) {
#define HANDLE_HEADER(id, name) \
case static_cast<uint>(ConnectionHeaderIndices::id): \
connectionHeaders.id = line; \
break;
KJ_HTTP_FOR_EACH_CONNECTION_HEADER(HANDLE_HEADER)
#undef HANDLE_HEADER
default:
KJ_UNREACHABLE;
}
}
} else {
return false;
}
}
return ptr == end;
}
// -----------------------------------------------------------------------------
kj::String HttpHeaders::serializeRequest(HttpMethod method, kj::StringPtr url,
const ConnectionHeaders& connectionHeaders) const {
return serialize(kj::toCharSequence(method), url, kj::StringPtr("HTTP/1.1"), connectionHeaders);
}
kj::String HttpHeaders::serializeResponse(uint statusCode, kj::StringPtr statusText,
const ConnectionHeaders& connectionHeaders) const {
auto statusCodeStr = kj::toCharSequence(statusCode);
return serialize(kj::StringPtr("HTTP/1.1"), statusCodeStr, statusText, connectionHeaders);
}
kj::String HttpHeaders::serialize(kj::ArrayPtr<const char> word1,
kj::ArrayPtr<const char> word2,
kj::ArrayPtr<const char> word3,
const ConnectionHeaders& connectionHeaders) const {
const kj::StringPtr space = " ";
const kj::StringPtr newline = "\r\n";
const kj::StringPtr colon = ": ";
size_t size = 2; // final \r\n
if (word1 != nullptr) {
size += word1.size() + word2.size() + word3.size() + 4;
}
#define HANDLE_HEADER(id, name) \
if (connectionHeaders.id != nullptr) { \
size += connectionHeaders.id.size() + (sizeof(name) + 3); \
}
KJ_HTTP_FOR_EACH_CONNECTION_HEADER(HANDLE_HEADER)
#undef HANDLE_HEADER
for (auto i: kj::indices(indexedHeaders)) {
if (indexedHeaders[i] != nullptr) {
size += table->idToString(HttpHeaderId(table, i)).size() + indexedHeaders[i].size() + 4;
}
}
for (auto& header: unindexedHeaders) {
size += header.name.size() + header.value.size() + 4;
}
String result = heapString(size);
char* ptr = result.begin();
if (word1 != nullptr) {
ptr = kj::_::fill(ptr, word1, space, word2, space, word3, newline);
}
#define HANDLE_HEADER(id, name) \
if (connectionHeaders.id != nullptr) { \
ptr = kj::_::fill(ptr, kj::StringPtr(name), colon, connectionHeaders.id, newline); \
}
KJ_HTTP_FOR_EACH_CONNECTION_HEADER(HANDLE_HEADER)
#undef HANDLE_HEADER
for (auto i: kj::indices(indexedHeaders)) {
if (indexedHeaders[i] != nullptr) {
ptr = kj::_::fill(ptr, table->idToString(HttpHeaderId(table, i)), colon,
indexedHeaders[i], newline);
}
}
for (auto& header: unindexedHeaders) {
ptr = kj::_::fill(ptr, header.name, colon, header.value, newline);
}
ptr = kj::_::fill(ptr, newline);
KJ_ASSERT(ptr == result.end());
return result;
}
kj::String HttpHeaders::toString() const {
return serialize(nullptr, nullptr, nullptr, {});
}
// =======================================================================================
namespace {
static constexpr size_t MIN_BUFFER = 4096;
static constexpr size_t MAX_BUFFER = 65536;
static constexpr size_t MAX_CHUNK_HEADER_SIZE = 32;
class HttpInputStream {
public:
explicit HttpInputStream(AsyncIoStream& inner, HttpHeaderTable& table)
: inner(inner), headerBuffer(kj::heapArray<char>(MIN_BUFFER)), headers(table) {
}
// ---------------------------------------------------------------------------
// Stream locking: While an entity-body is being read, the body stream "locks" the underlying
// HTTP stream. Once the entity-body is complete, we can read the next pipelined message.
void finishRead() {
// Called when entire request has been read.
KJ_REQUIRE_NONNULL(onMessageDone)->fulfill();
onMessageDone = nullptr;
}
void abortRead() {
// Called when a body input stream was destroyed without reading to the end.
KJ_REQUIRE_NONNULL(onMessageDone)->reject(KJ_EXCEPTION(FAILED,
"client did not finish reading previous HTTP response body",
"can't read next pipelined response"));
onMessageDone = nullptr;
}
// ---------------------------------------------------------------------------
kj::Promise<bool> awaitNextMessage() {
// Waits until more data is available, but doesn't consume it. Only meant for server-side use,
// after a request is handled, to check for pipelined requests. Returns false on EOF.
if (leftover != nullptr) {
return true;
}
return inner.tryRead(headerBuffer.begin(), 1, headerBuffer.size())
.then([this](size_t amount) {
if (amount > 0) {
leftover = headerBuffer.slice(0, amount);
return true;
} else {
return false;
}
});
}
kj::Promise<kj::ArrayPtr<char>> readMessageHeaders() {
auto paf = kj::newPromiseAndFulfiller<void>();
auto promise = messageReadQueue
.then(kj::mvCapture(paf.fulfiller, [this](kj::Own<kj::PromiseFulfiller<void>> fulfiller) {
onMessageDone = kj::mv(fulfiller);
return readHeader(HeaderType::MESSAGE, 0, 0);
}));
messageReadQueue = kj::mv(paf.promise);
return promise;
}
kj::Promise<uint64_t> readChunkHeader() {
KJ_REQUIRE(onMessageDone != nullptr);
// We use the portion of the header after the end of message headers.
return readHeader(HeaderType::CHUNK, messageHeaderEnd, messageHeaderEnd)
.then([](kj::ArrayPtr<char> text) -> uint64_t {
KJ_REQUIRE(text.size() > 0) { break; }
uint64_t value = 0;
for (char c: text) {
if ('0' <= c && c <= '9') {
value = value * 16 + (c - '0');
} else if ('a' <= c && c <= 'f') {
value = value * 16 + (c - 'a' + 10);
} else if ('A' <= c && c <= 'F') {
value = value * 16 + (c - 'A' + 10);
} else {
KJ_FAIL_REQUIRE("invalid HTTP chunk size", text, text.asBytes()) {
return value;
}
}
}
return value;
});
}
inline kj::Promise<kj::Maybe<HttpHeaders::Request>> readRequestHeaders() {
headers.clear();
return readMessageHeaders().then([this](kj::ArrayPtr<char> text) {
return headers.tryParseRequest(text);
});
}
inline kj::Promise<kj::Maybe<HttpHeaders::Response>> readResponseHeaders() {
headers.clear();
return readMessageHeaders().then([this](kj::ArrayPtr<char> text) {
return headers.tryParseResponse(text);
});
}
inline const HttpHeaders& getHeaders() const { return headers; }
Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) {
// Read message body data.
KJ_REQUIRE(onMessageDone != nullptr);
if (leftover == nullptr) {
// No leftovers. Forward directly to inner stream.
return inner.tryRead(buffer, minBytes, maxBytes);
} else if (leftover.size() >= maxBytes) {
// Didn't even read the entire leftover buffer.
memcpy(buffer, leftover.begin(), maxBytes);
leftover = leftover.slice(maxBytes, leftover.size());
return maxBytes;
} else {
// Read the entire leftover buffer, plus some.
memcpy(buffer, leftover.begin(), leftover.size());
size_t copied = leftover.size();
leftover = nullptr;
if (copied >= minBytes) {
// Got enough to stop here.
return copied;
} else {
// Read the rest from the underlying stream.
return inner.tryRead(reinterpret_cast<byte*>(buffer) + copied,
minBytes - copied, maxBytes - copied)
.then([copied](size_t n) { return n + copied; });
}
}
}
enum RequestOrResponse {
REQUEST,
RESPONSE
};
kj::Own<kj::AsyncInputStream> getEntityBody(
RequestOrResponse type, HttpMethod method, uint statusCode,
HttpHeaders::ConnectionHeaders& connectionHeaders);
private:
AsyncIoStream& inner;
kj::Array<char> headerBuffer;
size_t messageHeaderEnd = 0;
// Position in headerBuffer where the message headers end -- further buffer space can
// be used for chunk headers.
kj::ArrayPtr<char> leftover;
// Data in headerBuffer that comes immediately after the header content, if any.
HttpHeaders headers;
// Parsed headers, after a call to parseAwaited*().
bool lineBreakBeforeNextHeader = false;
// If true, the next await should expect to start with a spurrious '\n' or '\r\n'. This happens
// as a side-effect of HTTP chunked encoding, where such a newline is added to the end of each
// chunk, for no good reason.
kj::Promise<void> messageReadQueue = kj::READY_NOW;
kj::Maybe<kj::Own<kj::PromiseFulfiller<void>>> onMessageDone;
// Fulfill once the current message has been completely read. Unblocks reading of the next
// message headers.
enum class HeaderType {
MESSAGE,
CHUNK
};
kj::Promise<kj::ArrayPtr<char>> readHeader(
HeaderType type, size_t bufferStart, size_t bufferEnd) {
// Reads the HTTP message header or a chunk header (as in transfer-encoding chunked) and
// returns the buffer slice containing it.
//
// The main source of complication here is that we want to end up with one continuous buffer
// containing the result, and that the input is delimited by newlines rather than by an upfront
// length.
kj::Promise<size_t> readPromise = nullptr;
// Figure out where we're reading from.
if (leftover != nullptr) {
// Some data is still left over from the previous message, so start with that.
// This can only happen if this is the initial call to readHeader() (not recursive).
KJ_ASSERT(bufferStart == bufferEnd);
// OK, set bufferStart and bufferEnd to both point to the start of the leftover, and then
// fake a read promise as if we read the bytes from the leftover.
bufferStart = leftover.begin() - headerBuffer.begin();
bufferEnd = bufferStart;
readPromise = leftover.size();
leftover = nullptr;
} else {
// Need to read more data from the unfderlying stream.
if (bufferEnd == headerBuffer.size()) {
// Out of buffer space.
// Maybe we can move bufferStart backwards to make more space at the end?
size_t minStart = type == HeaderType::MESSAGE ? 0 : messageHeaderEnd;
if (bufferStart > minStart) {
// Move to make space.
memmove(headerBuffer.begin() + minStart, headerBuffer.begin() + bufferStart,
bufferEnd - bufferStart);
bufferEnd = bufferEnd - bufferStart + minStart;
bufferStart = minStart;
} else {
// Really out of buffer space. Grow the buffer.
if (type != HeaderType::MESSAGE) {
// Can't grow because we'd invalidate the HTTP headers.
return KJ_EXCEPTION(FAILED, "invalid HTTP chunk size");
}
KJ_REQUIRE(headerBuffer.size() < MAX_BUFFER, "request headers too large");
auto newBuffer = kj::heapArray<char>(headerBuffer.size() * 2);
memcpy(newBuffer.begin(), headerBuffer.begin(), headerBuffer.size());
headerBuffer = kj::mv(newBuffer);
}
}
// How many bytes will we read?
size_t maxBytes = headerBuffer.size() - bufferEnd;
if (type == HeaderType::CHUNK) {
// Roughly limit the amount of data we read to MAX_CHUNK_HEADER_SIZE.
// TODO(perf): This is mainly to avoid copying a lot of body data into our buffer just to
// copy it again when it is read. But maybe the copy would be cheaper than overhead of
// extra event loop turns?
KJ_REQUIRE(bufferEnd - bufferStart <= MAX_CHUNK_HEADER_SIZE, "invalid HTTP chunk size");
maxBytes = kj::min(maxBytes, MAX_CHUNK_HEADER_SIZE);
}
readPromise = inner.read(headerBuffer.begin() + bufferEnd, 1, maxBytes);
}
return readPromise.then([this,type,bufferStart,bufferEnd](size_t amount) mutable
-> kj::Promise<kj::ArrayPtr<char>> {
if (lineBreakBeforeNextHeader) {
// Hackily deal with expected leading line break.
if (bufferEnd == bufferStart && headerBuffer[bufferEnd] == '\r') {
++bufferEnd;
--amount;
}
if (amount > 0 && headerBuffer[bufferEnd] == '\n') {
lineBreakBeforeNextHeader = false;
++bufferEnd;
--amount;
// Cut the leading line break out of the buffer entirely.
bufferStart = bufferEnd;
}
if (amount == 0) {
return readHeader(type, bufferStart, bufferEnd);
}
}
size_t pos = bufferEnd;
size_t newEnd = pos + amount;
for (;;) {
// Search for next newline.
char* nl = reinterpret_cast<char*>(
memchr(headerBuffer.begin() + pos, '\n', newEnd - pos));
if (nl == nullptr) {
// No newline found. Wait for more data.
return readHeader(type, bufferStart, newEnd);
}
// Is this newline which we found the last of the header? For a chunk header, always. For
// a message header, we search for two newlines in a row. We accept either "\r\n" or just
// "\n" as a newline sequence (though the standard requires "\r\n").
if (type == HeaderType::CHUNK ||
(nl - headerBuffer.begin() >= 4 &&
((nl[-1] == '\r' && nl[-2] == '\n') || (nl[-1] == '\n')))) {
// OK, we've got all the data!
size_t endIndex = nl + 1 - headerBuffer.begin();
size_t leftoverStart = endIndex;
// Strip off the last newline from end.
endIndex -= 1 + (nl[-1] == '\r');
if (type == HeaderType::MESSAGE) {
if (headerBuffer.size() - newEnd < MAX_CHUNK_HEADER_SIZE) {
// Ugh, there's not enough space for the secondary await buffer. Grow once more.
auto newBuffer = kj::heapArray<char>(headerBuffer.size() * 2);
memcpy(newBuffer.begin(), headerBuffer.begin(), headerBuffer.size());
headerBuffer = kj::mv(newBuffer);
}
messageHeaderEnd = endIndex;
} else {
// For some reason, HTTP specifies that there will be a line break after each chunk.
lineBreakBeforeNextHeader = true;
}
auto result = headerBuffer.slice(bufferStart, endIndex);
leftover = headerBuffer.slice(leftoverStart, newEnd);
return result;
} else {
pos = nl - headerBuffer.begin() + 1;
}
}
});
}
};
// -----------------------------------------------------------------------------
class HttpEntityBodyReader: public kj::AsyncInputStream {
public:
HttpEntityBodyReader(HttpInputStream& inner): inner(inner) {}
~HttpEntityBodyReader() noexcept(false) {
if (!finished) {
inner.abortRead();
}
}
protected:
HttpInputStream& inner;
void doneReading() {
KJ_REQUIRE(!finished);
finished = true;
inner.finishRead();
}
inline bool alreadyDone() { return finished; }
private:
bool finished = false;
};
class HttpNullEntityReader final: public HttpEntityBodyReader {
// Stream which reads until EOF.
public:
HttpNullEntityReader(HttpInputStream& inner)
: HttpEntityBodyReader(inner) {
doneReading();
}
Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
return size_t(0);
}
};
class HttpConnectionCloseEntityReader final: public HttpEntityBodyReader {
// Stream which reads until EOF.
public:
HttpConnectionCloseEntityReader(HttpInputStream& inner)
: HttpEntityBodyReader(inner) {}
Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
if (alreadyDone()) return size_t(0);
return inner.tryRead(buffer, minBytes, maxBytes)
.then([=](size_t amount) {
if (amount < minBytes) {
doneReading();
}
return amount;
});
}
};
class HttpFixedLengthEntityReader final: public HttpEntityBodyReader {
// Stream which reads only up to a fixed length from the underlying stream, then emulates EOF.
public:
HttpFixedLengthEntityReader(HttpInputStream& inner, size_t length)
: HttpEntityBodyReader(inner), length(length) {
if (length == 0) doneReading();
}
Maybe<uint64_t> tryGetLength() override {
return length;
}
Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
if (length == 0) return size_t(0);
return inner.tryRead(buffer, kj::min(minBytes, length), kj::min(maxBytes, length))
.then([=](size_t amount) {
length -= amount;
if (length > 0 && amount < minBytes) {
kj::throwRecoverableException(KJ_EXCEPTION(DISCONNECTED,
"premature EOF in HTTP entity body; did not reach Content-Length"));
} else if (length == 0) {
doneReading();
}
return amount;
});
}
private:
size_t length;
};
class HttpChunkedEntityReader final: public HttpEntityBodyReader {
// Stream which reads a Transfer-Encoding: Chunked stream.
public:
HttpChunkedEntityReader(HttpInputStream& inner)
: HttpEntityBodyReader(inner) {}
Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
return tryReadInternal(buffer, minBytes, maxBytes, 0);
}
private:
size_t chunkSize = 0;
Promise<size_t> tryReadInternal(void* buffer, size_t minBytes, size_t maxBytes,
size_t alreadyRead) {
if (alreadyDone()) {
return alreadyRead;
} else if (chunkSize == 0) {
// Read next chunk header.
return inner.readChunkHeader().then([=](uint64_t nextChunkSize) {
if (nextChunkSize == 0) {
doneReading();
}
chunkSize = nextChunkSize;
return tryReadInternal(buffer, minBytes, maxBytes, alreadyRead);
});
} else if (chunkSize < minBytes) {
// Read entire current chunk and continue to next chunk.
return inner.tryRead(buffer, chunkSize, chunkSize)
.then([=](size_t amount) -> kj::Promise<size_t> {
chunkSize -= amount;
if (chunkSize > 0) {
return KJ_EXCEPTION(DISCONNECTED, "premature EOF in HTTP chunk");
}
return tryReadInternal(reinterpret_cast<byte*>(buffer) + amount,
minBytes - amount, maxBytes - amount, alreadyRead + amount);
});
} else {
// Read only part of the current chunk.
return inner.tryRead(buffer, minBytes, kj::min(maxBytes, chunkSize))
.then([=](size_t amount) -> size_t {
chunkSize -= amount;
return alreadyRead + amount;
});
}
}
};
template <char...>
struct FastCaseCmp;
template <char first, char... rest>
struct FastCaseCmp<first, rest...> {
static constexpr bool apply(const char* actual) {
if ('a' <= first && first <= 'z') {
return (*actual | 0x20) == first && FastCaseCmp<rest...>::apply(actual + 1);
} else if ('A' <= first && first <= 'Z') {
return (*actual & ~0x20) == first && FastCaseCmp<rest...>::apply(actual + 1);
} else {
return *actual == first && FastCaseCmp<rest...>::apply(actual + 1);
}
}
};
template <>
struct FastCaseCmp<> {
static constexpr bool apply(const char* actual) {
return *actual == '\0';
}
};
template <char... chars>
constexpr bool fastCaseCmp(const char* actual) {
return FastCaseCmp<chars...>::apply(actual);
}
// Tests
static_assert(fastCaseCmp<'f','O','o','B','1'>("FooB1"), "");
static_assert(!fastCaseCmp<'f','O','o','B','2'>("FooB1"), "");
static_assert(!fastCaseCmp<'n','O','o','B','1'>("FooB1"), "");
static_assert(!fastCaseCmp<'f','O','o','B'>("FooB1"), "");
static_assert(!fastCaseCmp<'f','O','o','B','1','a'>("FooB1"), "");
kj::Own<kj::AsyncInputStream> HttpInputStream::getEntityBody(
RequestOrResponse type, HttpMethod method, uint statusCode,
HttpHeaders::ConnectionHeaders& connectionHeaders) {
if (method == HttpMethod::HEAD ||
(type == REQUEST && method == HttpMethod::GET) ||
(type == RESPONSE && (statusCode == 204 || statusCode == 205 || statusCode == 304))) {
// No body.
return kj::heap<HttpNullEntityReader>(*this);
}
if (connectionHeaders.transferEncoding != nullptr) {
// TODO(someday): Support plugable transfer encodings? Or at least gzip?
// TODO(soon): Support stacked transfer encodings, e.g. "gzip, chunked".
if (fastCaseCmp<'c','h','u','n','k','e','d'>(connectionHeaders.transferEncoding.cStr())) {
return kj::heap<HttpChunkedEntityReader>(*this);
} else {
KJ_FAIL_REQUIRE("unknown transfer encoding") { break; }
}
}
if (connectionHeaders.contentLength != nullptr) {
return kj::heap<HttpFixedLengthEntityReader>(*this,
strtoull(connectionHeaders.contentLength.cStr(), nullptr, 10));
}
if (connectionHeaders.connection != nullptr) {
// TODO(soon): Connection header can actually have multiple tokens... but no one ever uses
// that feature?
if (fastCaseCmp<'c','l','o','s','e'>(connectionHeaders.connection.cStr())) {
return kj::heap<HttpConnectionCloseEntityReader>(*this);
}
}
KJ_FAIL_REQUIRE("don't know how HTTP body is delimited", headers);
return kj::heap<HttpNullEntityReader>(*this);
}
// =======================================================================================
class HttpOutputStream {
public:
HttpOutputStream(AsyncOutputStream& inner): inner(inner) {}
void writeHeaders(String content) {
// Writes some header content and begins a new entity body.
KJ_REQUIRE(!inBody, "previous HTTP message body incomplete; can't write more messages");
inBody = true;
queueWrite(kj::mv(content));
}
void writeBodyData(kj::String content) {
KJ_REQUIRE(inBody) { return; }
queueWrite(kj::mv(content));
}
kj::Promise<void> writeBodyData(const void* buffer, size_t size) {
KJ_REQUIRE(inBody) { return kj::READY_NOW; }
auto fork = writeQueue.then([this,buffer,size]() {
inner.write(buffer, size);
}).fork();
writeQueue = fork.addBranch();
return fork.addBranch();
}
kj::Promise<void> writeBodyData(kj::ArrayPtr<const kj::ArrayPtr<const byte>> pieces) {
KJ_REQUIRE(inBody) { return kj::READY_NOW; }
auto fork = writeQueue.then([this,pieces]() {
inner.write(pieces);
}).fork();
writeQueue = fork.addBranch();
return fork.addBranch();
}
Promise<uint64_t> pumpBodyFrom(AsyncInputStream& input, uint64_t amount) {
KJ_REQUIRE(inBody) { return uint64_t(0); }
auto fork = writeQueue.then([this,&input,amount]() {
return input.pumpTo(inner, amount);
}).fork();
writeQueue = fork.addBranch().ignoreResult();
return fork.addBranch();
}
void finishBody() {
// Called when entire body was written.
KJ_REQUIRE(inBody) { return; }
inBody = false;
}
void abortBody() {
// Called if the application failed to write all expected body bytes.
KJ_REQUIRE(inBody) { return; }
inBody = false;
writeQueue = writeQueue.then([]() -> kj::Promise<void> {
return KJ_EXCEPTION(FAILED,
"previous HTTP message body incomplete; can't write more messages");
});
}
kj::Promise<void> flush() {
auto fork = writeQueue.fork();
writeQueue = fork.addBranch();
return fork.addBranch();
}
private:
AsyncOutputStream& inner;
kj::Promise<void> writeQueue = kj::READY_NOW;
bool inBody = false;
void queueWrite(kj::String content) {
writeQueue = writeQueue.then(kj::mvCapture(content, [this](kj::String&& content) {
auto promise = inner.write(content.begin(), content.size());
return promise.attach(kj::mv(content));
}));
}
};
class HttpNullEntityWriter final: public kj::AsyncOutputStream {
public:
Promise<void> write(const void* buffer, size_t size) override {
return KJ_EXCEPTION(FAILED, "HTTP message has no entity-body; can't write()");
}
Promise<void> write(ArrayPtr<const ArrayPtr<const byte>> pieces) override {
return KJ_EXCEPTION(FAILED, "HTTP message has no entity-body; can't write()");
}
};
class HttpDiscardingEntityWriter final: public kj::AsyncOutputStream {
public:
Promise<void> write(const void* buffer, size_t size) override {
return kj::READY_NOW;
}
Promise<void> write(ArrayPtr<const ArrayPtr<const byte>> pieces) override {
return kj::READY_NOW;
}
};
class HttpFixedLengthEntityWriter final: public kj::AsyncOutputStream {
public:
HttpFixedLengthEntityWriter(HttpOutputStream& inner, uint64_t length)
: inner(inner), length(length) {}
~HttpFixedLengthEntityWriter() noexcept(false) {
if (length > 0) inner.abortBody();
}
Promise<void> write(const void* buffer, size_t size) override {
KJ_REQUIRE(size <= length, "overwrote Content-Length");
length -= size;
return maybeFinishAfter(inner.writeBodyData(buffer, size));
}
Promise<void> write(ArrayPtr<const ArrayPtr<const byte>> pieces) override {
uint64_t size = 0;
for (auto& piece: pieces) size += piece.size();
KJ_REQUIRE(size <= length, "overwrote Content-Length");
length -= size;
return maybeFinishAfter(inner.writeBodyData(pieces));
}
Maybe<Promise<uint64_t>> tryPumpFrom(AsyncInputStream& input, uint64_t amount) override {
KJ_REQUIRE(amount <= length, "overwrote Content-Length");
length -= amount;
return inner.pumpBodyFrom(input, amount).then([this,amount](uint64_t actual) {
// Adjust for bytes not written.
length += amount - actual;
if (length == 0) inner.finishBody();
return actual;
});
}
private:
HttpOutputStream& inner;
uint64_t length;
kj::Promise<void> maybeFinishAfter(kj::Promise<void> promise) {
if (length == 0) {
return promise.then([this]() { inner.finishBody(); });
} else {
return kj::mv(promise);
}
}
};
class HttpChunkedEntityWriter final: public kj::AsyncOutputStream {
public:
HttpChunkedEntityWriter(HttpOutputStream& inner)
: inner(inner) {}
~HttpChunkedEntityWriter() noexcept(false) {
inner.writeBodyData(kj::str("0\r\n\r\n"));
inner.finishBody();
}
Promise<void> write(const void* buffer, size_t size) override {
if (size == 0) return kj::READY_NOW; // can't encode zero-size chunk since it indicates EOF.
auto header = kj::str(kj::hex(size), "\r\n");
auto parts = kj::heapArray<ArrayPtr<const byte>>(3);
parts[0] = header.asBytes();
parts[1] = kj::arrayPtr(reinterpret_cast<const byte*>(buffer), size);
parts[2] = kj::StringPtr("\r\n").asBytes();
auto promise = inner.writeBodyData(parts.asPtr());
return promise.attach(kj::mv(header), kj::mv(parts));
}
Promise<void> write(ArrayPtr<const ArrayPtr<const byte>> pieces) override {
uint64_t size = 0;
for (auto& piece: pieces) size += piece.size();
if (size == 0) return kj::READY_NOW; // can't encode zero-size chunk since it indicates EOF.
auto header = kj::str(size, "\r\n");
auto partsBuilder = kj::heapArrayBuilder<ArrayPtr<const byte>>(pieces.size());
partsBuilder.add(header.asBytes());
for (auto& piece: pieces) {
partsBuilder.add(piece);
}
partsBuilder.add(kj::StringPtr("\r\n").asBytes());
auto parts = partsBuilder.finish();
auto promise = inner.writeBodyData(parts.asPtr());
return promise.attach(kj::mv(header), kj::mv(parts));
}
Maybe<Promise<uint64_t>> tryPumpFrom(AsyncInputStream& input, uint64_t amount) override {
KJ_IF_MAYBE(length, input.tryGetLength()) {
// Hey, we know exactly how large the input is, so we can write just one chunk.
inner.writeBodyData(kj::str(*length, "\r\n"));
auto lengthValue = *length;
return inner.pumpBodyFrom(input, *length)
.then([this,lengthValue](size_t actual) {
if (actual < lengthValue) {
inner.abortBody();
KJ_FAIL_REQUIRE(
"value returned by input.tryGetLength() was greater than actual bytes transferred") {
break;
}
}
inner.writeBodyData(kj::str("\r\n"));
return actual;
});
} else {
// Need to use naive read/write loop.
return nullptr;
}
}
private:
HttpOutputStream& inner;
};
// =======================================================================================
class HttpClientImpl: public HttpClient {
public:
HttpClientImpl(HttpHeaderTable& responseHeaderTable, kj::AsyncIoStream& rawStream)
: httpInput(rawStream, responseHeaderTable),
httpOutput(rawStream) {}
Request request(HttpMethod method, kj::StringPtr url, const HttpHeaders& headers,
kj::Maybe<uint64_t> expectedBodySize = nullptr) override {
HttpHeaders::ConnectionHeaders connectionHeaders;
kj::String lengthStr;
if (method == HttpMethod::GET || method == HttpMethod::HEAD) {
// No entity-body.
} else KJ_IF_MAYBE(s, expectedBodySize) {
lengthStr = kj::str(*s);
connectionHeaders.contentLength = lengthStr;
} else {
connectionHeaders.transferEncoding = "chunked";
}
httpOutput.writeHeaders(headers.serializeRequest(method, url, connectionHeaders));
kj::Own<kj::AsyncOutputStream> bodyStream;
if (method == HttpMethod::GET || method == HttpMethod::HEAD) {
// No entity-body.
httpOutput.finishBody();
bodyStream = heap<HttpNullEntityWriter>();
} else KJ_IF_MAYBE(s, expectedBodySize) {
bodyStream = heap<HttpFixedLengthEntityWriter>(httpOutput, *s);
} else {
bodyStream = heap<HttpChunkedEntityWriter>(httpOutput);
}
auto responsePromise = httpInput.readResponseHeaders()
.then([this,method](kj::Maybe<HttpHeaders::Response>&& response) -> HttpClient::Response {
KJ_IF_MAYBE(r, response) {
return {
r->statusCode,
r->statusText,
&httpInput.getHeaders(),
httpInput.getEntityBody(HttpInputStream::RESPONSE, method, r->statusCode,
r->connectionHeaders)
};
} else {
KJ_FAIL_REQUIRE("received invalid HTTP response") { break; }
return {};
}
});
return { kj::mv(bodyStream), kj::mv(responsePromise) };
}
private:
HttpInputStream httpInput;
HttpOutputStream httpOutput;
};
} // namespace
kj::Promise<HttpClient::WebSocketResponse> HttpClient::openWebSocket(
kj::StringPtr url, const HttpHeaders& headers, kj::Own<WebSocket> downstream) {
return request(HttpMethod::GET, url, headers, nullptr)
.response.then([](HttpClient::Response&& response) -> WebSocketResponse {
kj::OneOf<kj::Own<kj::AsyncInputStream>, kj::Own<WebSocket>> body;
body.init<kj::Own<kj::AsyncInputStream>>(kj::mv(response.body));
return {
response.statusCode,
response.statusText,
response.headers,
kj::mv(body)
};
});
}
kj::Promise<kj::Own<kj::AsyncIoStream>> HttpClient::connect(kj::String host) {
KJ_UNIMPLEMENTED("CONNECT is not implemented by this HttpClient");
}
kj::Own<HttpClient> newHttpClient(
HttpHeaderTable& responseHeaderTable, kj::AsyncIoStream& stream) {
return kj::heap<HttpClientImpl>(responseHeaderTable, stream);
}
// =======================================================================================
kj::Promise<void> HttpService::openWebSocket(
kj::StringPtr url, const HttpHeaders& headers, WebSocketResponse& response) {
class EmptyStream final: public kj::AsyncInputStream {
public:
Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
return size_t(0);
}
};
auto requestBody = heap<EmptyStream>();
auto promise = request(HttpMethod::GET, url, headers, *requestBody, response);
return promise.attach(kj::mv(requestBody));
}
kj::Promise<kj::Own<kj::AsyncIoStream>> HttpService::connect(kj::String host) {
KJ_UNIMPLEMENTED("CONNECT is not implemented by this HttpService");
}
class HttpServer::Connection: private HttpService::Response {
public:
Connection(HttpServer& server, kj::AsyncIoStream& stream)
: server(server),
httpInput(stream, server.requestHeaderTable),
httpOutput(stream) {
++server.connectionCount;
}
Connection(HttpServer& server, kj::Own<kj::AsyncIoStream>&& stream)
: server(server),
httpInput(*stream, server.requestHeaderTable),
httpOutput(*stream),
ownStream(kj::mv(stream)) {
++server.connectionCount;
}
~Connection() noexcept(false) {
if (--server.connectionCount == 0) {
KJ_IF_MAYBE(f, server.zeroConnectionsFulfiller) {
f->get()->fulfill();
}
}
}
kj::Promise<void> loop() {
// If the timeout promise finishes before the headers do, we kill the connection.
auto timeoutPromise = server.timer.afterDelay(server.settings.headerTimeout)
.then([this]() -> kj::Maybe<HttpHeaders::Request> {
timedOut = true;
return nullptr;
});
return httpInput.readRequestHeaders().exclusiveJoin(kj::mv(timeoutPromise))
.then([this](kj::Maybe<HttpHeaders::Request>&& request) -> kj::Promise<void> {
if (timedOut) {
return sendError(408, "Request Timeout", kj::str(
"ERROR: Your client took too long to send HTTP headers."));
}
KJ_IF_MAYBE(req, request) {
currentMethod = req->method;
auto body = httpInput.getEntityBody(
HttpInputStream::REQUEST, req->method, 0, req->connectionHeaders);
auto promise = server.service.request(
req->method, req->url, httpInput.getHeaders(), *body, *this);
return promise.attach(kj::mv(body))
.then([this]() { return httpOutput.flush(); })
.then([this]() -> kj::Promise<void> {
// Response done. Await next request.
if (server.draining) {
// Never mind, drain time.
return kj::READY_NOW;
}
auto timeoutPromise = server.timer.afterDelay(server.settings.pipelineTimeout)
.then([this]() { return false; });
auto awaitPromise = httpInput.awaitNextMessage();
return timeoutPromise.exclusiveJoin(kj::mv(awaitPromise))
.then([this](bool hasMore) -> kj::Promise<void> {
if (hasMore) {
return loop();
} else {
// In this case we assume the client has no more requests, so we simply close the
// connection.
return kj::READY_NOW;
}
});
});
} else {
// Bad request.
return sendError(400, "Bad Request", kj::str(
"ERROR: The headers sent by your client were not valid."));
}
}).catch_([this](kj::Exception&& e) {
// Exception; report 500.
if (e.getType() == kj::Exception::Type::OVERLOADED) {
return sendError(503, "Service Unavailable", kj::str(
"ERROR: The server is temporarily unable to handle your request. Details:\n\n", e));
} else {
return sendError(500, "Internal Server Error", kj::str(
"ERROR: The server threw an exception. Details:\n\n", e));
}
});
}
private:
HttpServer& server;
HttpInputStream httpInput;
HttpOutputStream httpOutput;
kj::Own<kj::AsyncIoStream> ownStream;
kj::Maybe<HttpMethod> currentMethod;
bool timedOut = false;
kj::Own<kj::AsyncOutputStream> send(
uint statusCode, kj::StringPtr statusText, const HttpHeaders& headers,
kj::Maybe<size_t> expectedBodySize) override {
auto method = KJ_REQUIRE_NONNULL(currentMethod, "already called startResponse()");
currentMethod = nullptr;
HttpHeaders::ConnectionHeaders connectionHeaders;
kj::String lengthStr;
if (statusCode == 204 || statusCode == 205 || statusCode == 304) {
// No entity-body.
} else KJ_IF_MAYBE(s, expectedBodySize) {
lengthStr = kj::str(*s);
connectionHeaders.contentLength = lengthStr;
} else {
connectionHeaders.transferEncoding = "chunked";
}
httpOutput.writeHeaders(headers.serializeResponse(statusCode, statusText, connectionHeaders));
kj::Own<kj::AsyncOutputStream> bodyStream;
if (method == HttpMethod::HEAD) {
// Ignore entity-body.
httpOutput.finishBody();
return heap<HttpDiscardingEntityWriter>();
} else if (statusCode == 204 || statusCode == 205 || statusCode == 304) {
// No entity-body.
httpOutput.finishBody();
return heap<HttpNullEntityWriter>();
} else KJ_IF_MAYBE(s, expectedBodySize) {
return heap<HttpFixedLengthEntityWriter>(httpOutput, *s);
} else {
return heap<HttpChunkedEntityWriter>(httpOutput);
}
}
kj::Promise<void> sendError(uint statusCode, kj::StringPtr statusText, kj::String body) {
auto bodySize = kj::str(body.size());
HttpHeaders failed(server.requestHeaderTable);
HttpHeaders::ConnectionHeaders connHeaders;
connHeaders.connection = "close";
connHeaders.contentLength = bodySize;
failed.set(HttpHeaderId::CONTENT_TYPE, "text/plain");
httpOutput.writeHeaders(failed.serializeResponse(statusCode, statusText, connHeaders));
httpOutput.writeBodyData(kj::mv(body));
httpOutput.finishBody();
return httpOutput.flush(); // loop ends after flush
}
};
HttpServer::HttpServer(kj::Timer& timer, HttpHeaderTable& requestHeaderTable, HttpService& service,
Settings settings)
: HttpServer(timer, requestHeaderTable, service, settings,
kj::newPromiseAndFulfiller<void>()) {}
HttpServer::HttpServer(kj::Timer& timer, HttpHeaderTable& requestHeaderTable, HttpService& service,
Settings settings, kj::PromiseFulfillerPair<void> paf)
: timer(timer), requestHeaderTable(requestHeaderTable), service(service), settings(settings),
onDrain(paf.promise.fork()), drainFulfiller(kj::mv(paf.fulfiller)), tasks(*this) {}
kj::Promise<void> HttpServer::drain() {
KJ_REQUIRE(!draining, "you can only call drain() once");
draining = true;
drainFulfiller->fulfill();
if (connectionCount == 0) {
return kj::READY_NOW;
} else {
auto paf = kj::newPromiseAndFulfiller<void>();
zeroConnectionsFulfiller = kj::mv(paf.fulfiller);
return kj::mv(paf.promise);
}
}
kj::Promise<void> HttpServer::listenHttp(kj::ConnectionReceiver& port) {
return listenLoop(port).exclusiveJoin(onDrain.addBranch());
}
kj::Promise<void> HttpServer::listenLoop(kj::ConnectionReceiver& port) {
return port.accept()
.then([this,&port](kj::Own<kj::AsyncIoStream>&& connection) -> kj::Promise<void> {
if (draining) {
// Can get here if we *just* started draining.
return kj::READY_NOW;
}
tasks.add(listenHttp(kj::mv(connection)));
return listenLoop(port);
});
}
kj::Promise<void> HttpServer::listenHttp(kj::Own<kj::AsyncIoStream> connection) {
auto obj = heap<Connection>(*this, kj::mv(connection));
auto promise = obj->loop();
// Eagerly evaluate so that we drop the connection when the promise resolves, even if the caller
// doesn't eagerly evaluate.
return promise.attach(kj::mv(obj)).eagerlyEvaluate(nullptr);
}
void HttpServer::taskFailed(kj::Exception&& exception) {
KJ_LOG(ERROR, "unhandled exception in HTTP server", exception);
}
} // namespace kj
// Copyright (c) 2017 Sandstorm Development Group, Inc. and contributors
// Licensed under the MIT License:
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
#ifndef KJ_COMPAT_HTTP_H_
#define KJ_COMPAT_HTTP_H_
// The KJ HTTP client/server library.
//
// This is a simple library which can be used to implement an HTTP client or server. Properties
// of this library include:
// - Uses KJ async framework.
// - Agnostic to transport layer -- you can provide your own.
// - Header parsing is zero-copy -- it results in strings that point directly into the buffer
// received off the wire.
// - Application code which reads and writes headers refers to headers by symbolic names, not by
// string literals, with lookups being array-index-based, not map-based. To make this possible,
// the application announces what headers it cares about in advance, in order to assign numeric
// values to them.
// - Methods are identified by an enum.
#include <kj/string.h>
#include <kj/vector.h>
#include <kj/memory.h>
#include <kj/one-of.h>
#include <kj/async-io.h>
namespace kj {
#define KJ_HTTP_FOR_EACH_METHOD(MACRO) \
MACRO(GET) \
MACRO(HEAD) \
MACRO(POST) \
MACRO(PUT) \
MACRO(DELETE) \
MACRO(PATCH) \
MACRO(PURGE) \
MACRO(OPTIONS) \
MACRO(TRACE) \
/* standard methods */ \
/* */ \
/* (CONNECT is intentionally omitted since it is handled specially in HttpHandler) */ \
\
MACRO(COPY) \
MACRO(LOCK) \
MACRO(MKCOL) \
MACRO(MOVE) \
MACRO(PROPFIND) \
MACRO(PROPPATCH) \
MACRO(SEARCH) \
MACRO(UNLOCK) \
/* WebDAV */ \
\
MACRO(REPORT) \
MACRO(MKACTIVITY) \
MACRO(CHECKOUT) \
MACRO(MERGE) \
/* Subversion */ \
\
MACRO(MSEARCH) \
MACRO(NOTIFY) \
MACRO(SUBSCRIBE) \
MACRO(UNSUBSCRIBE)
/* UPnP */
#define KJ_HTTP_FOR_EACH_CONNECTION_HEADER(MACRO) \
MACRO(connection, "Connection") \
MACRO(contentLength, "Content-Length") \
MACRO(keepAlive, "Keep-Alive") \
MACRO(te, "TE") \
MACRO(trailer, "Trailer") \
MACRO(transferEncoding, "Transfer-Encoding") \
MACRO(upgrade, "Upgrade")
enum class HttpMethod {
// Enum of known HTTP methods.
//
// We use an enum rather than a string to allow for faster parsing and switching and to reduce
// ambiguity.
#define DECLARE_METHOD(id) id,
KJ_HTTP_FOR_EACH_METHOD(DECLARE_METHOD)
#undef DECALRE_METHOD
};
kj::StringPtr KJ_STRINGIFY(HttpMethod method);
kj::Maybe<HttpMethod> tryParseHttpMethod(kj::StringPtr name);
class HttpHeaderTable;
class HttpHeaderId {
// Identifies an HTTP header by numeric ID that indexes into an HttpHeaderTable.
//
// The KJ HTTP API prefers that headers be identified by these IDs for a few reasons:
// - Integer lookups are much more efficient than string lookups.
// - Case-insensitivity is awkward to deal with when const strings are being passed to the lookup
// method.
// - Writing out strings less often means fewer typos.
//
// See HttpHeaderTable for usage hints.
public:
HttpHeaderId() = default;
inline bool operator==(const HttpHeaderId& other) const { return id == other.id; }
inline bool operator!=(const HttpHeaderId& other) const { return id != other.id; }
inline bool operator< (const HttpHeaderId& other) const { return id < other.id; }
inline bool operator> (const HttpHeaderId& other) const { return id > other.id; }
inline bool operator<=(const HttpHeaderId& other) const { return id <= other.id; }
inline bool operator>=(const HttpHeaderId& other) const { return id >= other.id; }
inline size_t hashCode() const { return id; }
kj::StringPtr toString() const;
void requireFrom(HttpHeaderTable& table) const;
// In debug mode, throws an exception if the HttpHeaderId is not from the given table.
//
// In opt mode, no-op.
#define KJ_HTTP_FOR_EACH_BUILTIN_HEADER(MACRO) \
MACRO(HOST, "Host") \
MACRO(DATE, "Date") \
MACRO(LOCATION, "Location") \
MACRO(CONTENT_TYPE, "Content-Type")
// For convenience, these very-common headers are valid for all HttpHeaderTables. You can refer
// to them like:
//
// HttpHeaderId::HOST
//
// TODO(soon): Fill this out with more common headers.
#define DECLARE_HEADER(id, name) \
static const HttpHeaderId id;
// Declare a constant for each builtin header, e.g.: HttpHeaderId::CONNECTION
KJ_HTTP_FOR_EACH_BUILTIN_HEADER(DECLARE_HEADER);
#undef DECLARE_HEADER
private:
HttpHeaderTable* table;
uint id;
inline explicit constexpr HttpHeaderId(HttpHeaderTable* table, uint id): table(table), id(id) {}
friend class HttpHeaderTable;
friend class HttpHeaders;
};
class HttpHeaderTable {
// Construct an HttpHeaderTable to declare which headers you'll be interested in later on, and
// to manufacture IDs for them.
//
// Example:
//
// // Build a header table with the headers we are interested in.
// kj::HttpHeaderTable::Builder builder;
// const HttpHeaderId accept = builder.add("Accept");
// const HttpHeaderId contentType = builder.add("Content-Type");
// kj::HttpHeaderTable table(kj::mv(builder));
//
// // Create an HTTP client.
// auto client = kj::newHttpClient(table, network);
//
// // Get http://example.com.
// HttpHeaders headers(table);
// headers.set(accept, "text/html");
// auto response = client->send(kj::HttpMethod::GET, "http://example.com", headers)
// .wait(waitScope);
// auto msg = kj::str("Response content type: ", response.headers.get(contentType));
struct IdsByNameMap;
public:
HttpHeaderTable();
// Constructs a table that only contains the builtin headers.
class Builder {
public:
Builder();
HttpHeaderId add(kj::StringPtr name);
Own<HttpHeaderTable> build();
private:
kj::Own<HttpHeaderTable> table;
};
KJ_DISALLOW_COPY(HttpHeaderTable); // Can't copy because HttpHeaderId points to the table.
~HttpHeaderTable() noexcept(false);
uint idCount();
// Return the number of IDs in the table.
kj::Maybe<HttpHeaderId> stringToId(kj::StringPtr name);
// Try to find an ID for the given name. The matching is case-insensitive, per the HTTP spec.
//
// Note: if `name` contains characters that aren't allowed in HTTP header names, this may return
// a bogus value rather than null, due to optimizations used in case-insensitive matching.
kj::StringPtr idToString(HttpHeaderId id);
// Get the canonical string name for the given ID.
private:
kj::Vector<kj::StringPtr> namesById;
kj::Own<IdsByNameMap> idsByName;
};
class HttpHeaders {
// Represents a set of HTTP headers.
//
// This class guards against basic HTTP header injection attacks: Trying to set a header name or
// value containing a newline, carriage return, or other invalid character will throw an
// exception.
public:
explicit HttpHeaders(HttpHeaderTable& table);
KJ_DISALLOW_COPY(HttpHeaders);
HttpHeaders(HttpHeaders&&) = default;
HttpHeaders& operator=(HttpHeaders&&) = default;
void clear();
// Clears all contents, as if the object was freshly-allocated. However, calling this rather
// than actually re-allocating the object may avoid re-allocation of internal objects.
HttpHeaders clone() const;
// Creates a deep clone of the HttpHeaders. The returned object owns all strings it references.
HttpHeaders cloneShallow() const;
// Creates a shallow clone of the HttpHeaders. The returned object references the same strings
// as the original, owning none of them.
kj::Maybe<kj::StringPtr> get(HttpHeaderId id) const;
// Read a header.
template <typename Func>
void forEach(Func&& func);
// Calls `func(name, value)` for each header in the set -- including headers that aren't mapped
// to IDs in the header table. Both inputs are of type kj::StringPtr.
void set(HttpHeaderId id, kj::StringPtr value);
// Sets a header value, overwriting the existing value.
//
// WARNING: It is the caller's responsibility to ensure that `value` remains valid until the
// HttpHeaders object is destroyed. This allows string literals to be passed without making a
// copy, but complicates the use of dynamic values. Hint: Consider using `takeOwnership()`.
void add(kj::StringPtr name, kj::StringPtr value);
// Append a header. `name` will be looked up in the header table, but if it's not mapped, the
// header will be added to the list of unmapped headers.
//
// WARNING: It is the caller's responsibility to ensure that `name` and `value` remain valid
// until the HttpHeaders object is destroyed. This allows string literals to be passed without
// making a copy, but complicates the use of dynamic values. Hint: Consider using
// `takeOwnership()`.
void unset(HttpHeaderId id);
// Removes a header.
//
// It's not possible to remove a header by string name because non-indexed headers would take
// O(n) time to remove. Instead, construct a new HttpHeaders object and copy contents.
void takeOwnership(kj::String&& string);
void takeOwnership(kj::Array<char>&& chars);
void takeOwnership(HttpHeaders&& otherHeaders);
// Takes overship of a string so that it lives until the HttpHeaders object is destroyed. Useful
// when you've passed a dynamic value to set() or add() or parse*().
//
// TODO(soon): Is takeOwnership() actually needed?
struct ConnectionHeaders {
// These headers govern details of the specific HTTP connection or framing of the content.
// Hence, they are managed internally within the HTTP library, and never appear in an
// HttpHeaders structure.
#define DECLARE_HEADER(id, name) \
kj::StringPtr id;
KJ_HTTP_FOR_EACH_CONNECTION_HEADER(DECLARE_HEADER)
#undef DECLARE_HEADER
};
struct Request {
HttpMethod method;
kj::StringPtr url;
ConnectionHeaders connectionHeaders;
};
struct Response {
uint statusCode;
kj::StringPtr statusText;
ConnectionHeaders connectionHeaders;
};
kj::Maybe<Request> tryParseRequest(kj::ArrayPtr<char> content);
kj::Maybe<Response> tryParseResponse(kj::ArrayPtr<char> content);
// Parse an HTTP header blob and add all the headers to this object.
//
// `content` should be all text from the start of the request to the first occurrance of two
// newlines in a row -- including the first of these two newlines, but excluding the second.
//
// The parse is performed with zero copies: The callee clobbers `content` with '\0' characters
// to split it into a bunch of shorter strings. The caller must keep `content` valid until the
// `HttpHeaders` is destroyed, or pass it to `takeOwnership()`.
kj::String serializeRequest(HttpMethod method, kj::StringPtr url,
const ConnectionHeaders& connectionHeaders) const;
kj::String serializeResponse(uint statusCode, kj::StringPtr statusText,
const ConnectionHeaders& connectionHeaders) const;
// Serialize the headers as a complete request or response blob. The blob uses '\r\n' newlines
// and includes the double-newline to indicate the end of the headers.
kj::String toString() const;
private:
HttpHeaderTable* table;
kj::Array<kj::StringPtr> indexedHeaders;
// Size is always table->idCount().
struct Header {
kj::StringPtr name;
kj::StringPtr value;
};
kj::Vector<Header> unindexedHeaders;
kj::Vector<kj::Array<char>> ownedStrings;
kj::Maybe<uint> addNoCheck(kj::StringPtr name, kj::StringPtr value);
kj::StringPtr cloneToOwn(kj::StringPtr str);
kj::String serialize(kj::ArrayPtr<const char> word1,
kj::ArrayPtr<const char> word2,
kj::ArrayPtr<const char> word3,
const ConnectionHeaders& connectionHeaders) const;
bool parseHeaders(char* ptr, char* end, ConnectionHeaders& connectionHeaders);
// TODO(perf): Arguably we should store a map, but header sets are never very long
// TODO(perf): We could optimize for common headers by storing them directly as fields. We could
// also add direct accessors for those headers.
};
class WebSocket {
public:
WebSocket(kj::Own<kj::AsyncIoStream> stream);
// Create a WebSocket wrapping the given I/O stream.
kj::Promise<void> send(kj::ArrayPtr<const byte> message);
kj::Promise<void> send(kj::ArrayPtr<const char> message);
};
class HttpClient {
// Interface to the client end of an HTTP connection.
//
// There are two kinds of clients:
// * Host clients are used when talking to a specific host. The `url` specified in a request
// is actually just a path. (A `Host` header is still required in all requests.)
// * Proxy clients are used when the target could be any arbitrary host on the internet.
// The `url` specified in a request is a full URL including protocol and hostname.
public:
struct Response {
uint statusCode;
kj::StringPtr statusText;
const HttpHeaders* headers; // pointer valid until `body` is dropped
kj::Own<kj::AsyncInputStream> body;
};
struct Request {
kj::Own<kj::AsyncOutputStream> body;
// Write the request entity body to this stream, then drop it when done.
//
// May be null for GET and HEAD requests (which have no body) and requests that have
// Content-Length: 0.
kj::Promise<Response> response;
// Promise for the eventual respnose.
};
virtual Request request(HttpMethod method, kj::StringPtr url, const HttpHeaders& headers,
kj::Maybe<uint64_t> expectedBodySize = nullptr) = 0;
// Perform an HTTP request.
//
// `url` may be a full URL (with protocol and host) or it may be only the path part of the URL,
// depending on whether the client is a proxy client or a host client.
//
// `url` and `headers` must remain vaild at least until the `response` promise is received.
//
// `expectedBodySize`, if provided, must be exactly the number of bytes that will be written to
// the body. This will trigger use of the `Content-Length` connection header. Otherwise,
// `Transfer-Encoding: chunked` will be used.
struct WebSocketResponse {
uint statusCode;
kj::StringPtr statusText;
const HttpHeaders* headers;
kj::OneOf<kj::Own<kj::AsyncInputStream>, kj::Own<WebSocket>> upstreamOrBody;
// Once you start reading from `body` or drop it, strings and objects pointed to by the other
// members may be invalidated.
};
virtual kj::Promise<WebSocketResponse> openWebSocket(
kj::StringPtr url, const HttpHeaders& headers, kj::Own<WebSocket> downstream);
// Tries to open a WebSocket. Default implementation calls send() and never returns a WebSocket.
//
// `url` and `headers` are invalidated when the returned promise resolves.
virtual kj::Promise<kj::Own<kj::AsyncIoStream>> connect(kj::String host);
// Handles CONNECT requests. Only relevant for proxy clients. Default implementation throws
// UNIMPLEMENTED.
};
class HttpService {
// Interface which HTTP services should implement.
//
// This interface is functionally equivalent to HttpClient, but is intended for applications to
// implement rather than call. The ergonomics and performance of the method signatures are
// optimized for the serving end.
//
// As with clients, there are two kinds of services:
// * Host services are used when talking to a specific host. The `url` specified in a request
// is actually just a path. (A `Host` header is still required in all requests, and the service
// may in fact serve multiple origins via this header.)
// * Proxy services are used when the target could be any arbitrary host on the internet, i.e. to
// implement an HTTP proxy. The `url` specified in a request is a full URL including protocol
// and hostname.
public:
class Response {
public:
virtual kj::Own<kj::AsyncOutputStream> send(
uint statusCode, kj::StringPtr statusText, const HttpHeaders& headers,
kj::Maybe<size_t> expectedBodySize = nullptr) = 0;
};
virtual kj::Promise<void> request(
HttpMethod method, kj::StringPtr url, const HttpHeaders& headers,
kj::AsyncInputStream& requestBody, Response& response) = 0;
// Perform an HTTP request.
//
// `url` may be a full URL (with protocol and host) or it may be only the path part of the URL,
// depending on whether the service is a proxy service or a host service.
//
// `url` and `headers` are invalidated on the first read from `requestBody` or when the returned
// promise resolves, whichever comes first.
class WebSocketResponse: public Response {
public:
kj::Own<WebSocket> startWebSocket(
uint statusCode, kj::StringPtr statusText, const HttpHeaders& headers,
WebSocket& upstream);
};
virtual kj::Promise<void> openWebSocket(
kj::StringPtr url, const HttpHeaders& headers, WebSocketResponse& response);
// Tries to open a WebSocket. Default implementation calls request() and never returns a
// WebSocket.
//
// `url` and `headers` are invalidated when the returned promise resolves.
virtual kj::Promise<kj::Own<kj::AsyncIoStream>> connect(kj::String host);
// Handles CONNECT requests. Only relevant for proxy services. Default implementation throws
// UNIMPLEMENTED.
};
kj::Own<HttpClient> newHttpClient(HttpHeaderTable& responseHeaderTable, kj::Network& network,
kj::Maybe<kj::Network&> tlsNetwork = nullptr);
// Creates a proxy HttpClient that connects to hosts over the given network.
//
// `responseHeaderTable` is used when parsing HTTP responses. Requests can use any header table.
//
// `tlsNetwork` is required to support HTTPS destination URLs. Otherwise, only HTTP URLs can be
// fetched.
kj::Own<HttpClient> newHttpClient(HttpHeaderTable& responseHeaderTable, kj::AsyncIoStream& stream);
// Creates an HttpClient that speaks over the given pre-established connection. The client may
// be used as a proxy client or a host client depending on whether the peer is operating as
// a proxy.
//
// Note that since this client has only one stream to work with, it will try to pipeline all
// requests on this stream. If one request or response has an I/O failure, all subsequent requests
// fail as well. If the destination server chooses to close the connection after a response,
// subsequent requests will fail. If a response takes a long time, it blocks subsequent responses.
// If a WebSocket is opened successfully, all subsequent requests fail.
kj::Own<HttpClient> newHttpClient(HttpService& service);
kj::Own<HttpService> newHttpService(HttpClient& client);
// Adapts an HttpClient to an HttpService and vice versa.
struct HttpServerSettings {
kj::Duration headerTimeout = 15 * kj::SECONDS;
// After initial connection open, or after receiving the first byte of a pipelined request,
// the client must send the complete request within this time.
kj::Duration pipelineTimeout = 5 * kj::SECONDS;
// After one request/response completes, we'll wait up to this long for a pipelined request to
// arrive.
};
class HttpServer: private kj::TaskSet::ErrorHandler {
// Class which listens for requests on ports or connections and sends them to an HttpService.
public:
typedef HttpServerSettings Settings;
HttpServer(kj::Timer& timer, HttpHeaderTable& requestHeaderTable, HttpService& service,
Settings settings = Settings());
// Set up an HttpServer that directs incoming connections to the given service. The service
// may be a host service or a proxy service depending on whether you are intending to implement
// an HTTP server or an HTTP proxy.
kj::Promise<void> drain();
// Stop accepting new connections or new requests on existing connections. Finish any requests
// that are already executing, then close the connections. Returns once no more requests are
// in-flight.
kj::Promise<void> listenHttp(kj::ConnectionReceiver& port);
// Accepts HTTP connections on the given port and directs them to the handler.
//
// The returned promise never completes normally. It may throw if port.accept() throws. Dropping
// the returned promise will cause the server to stop listening on the port, but already-open
// connections will continue to be served. Destroy the whole HttpServer to cancel all I/O.
kj::Promise<void> listenHttp(kj::Own<kj::AsyncIoStream> connection);
// Reads HTTP requests from the given connection and directs them to the handler. A successful
// completion of the promise indicates that all requests received on the connection resulted in
// a complete response, and the client closed the connection gracefully or drain() was called.
// The promise throws if an unparseable request is received or if some I/O error occurs. Dropping
// the returned promise will cancel all I/O on the connection and cancel any in-flight requests.
private:
class Connection;
kj::Timer& timer;
HttpHeaderTable& requestHeaderTable;
HttpService& service;
Settings settings;
bool draining = false;
kj::ForkedPromise<void> onDrain;
kj::Own<kj::PromiseFulfiller<void>> drainFulfiller;
uint connectionCount = 0;
kj::Maybe<kj::Own<kj::PromiseFulfiller<void>>> zeroConnectionsFulfiller;
kj::TaskSet tasks;
HttpServer(kj::Timer& timer, HttpHeaderTable& requestHeaderTable, HttpService& service,
Settings settings, kj::PromiseFulfillerPair<void> paf);
kj::Promise<void> listenLoop(kj::ConnectionReceiver& port);
void taskFailed(kj::Exception&& exception) override;
};
// =======================================================================================
// inline implementation
inline void HttpHeaderId::requireFrom(HttpHeaderTable& table) const {
KJ_IREQUIRE(this->table == nullptr || this->table == &table,
"the provided HttpHeaderId is from the wrong HttpHeaderTable");
}
inline kj::Own<HttpHeaderTable> HttpHeaderTable::Builder::build() { return kj::mv(table); }
inline uint HttpHeaderTable::idCount() { return namesById.size(); }
inline kj::StringPtr HttpHeaderTable::idToString(HttpHeaderId id) {
id.requireFrom(*this);
return namesById[id.id];
}
inline kj::Maybe<kj::StringPtr> HttpHeaders::get(HttpHeaderId id) const {
id.requireFrom(*table);
auto result = indexedHeaders[id.id];
return result == nullptr ? kj::Maybe<kj::StringPtr>(nullptr) : result;
}
inline void HttpHeaders::unset(HttpHeaderId id) {
id.requireFrom(*table);
indexedHeaders[id.id] = nullptr;
}
template <typename Func>
inline void HttpHeaders::forEach(Func&& func) {
for (auto i: kj::indices(indexedHeaders)) {
if (indexedHeaders[i] != nullptr) {
func(table->idToString(HttpHeaderId(table, i)), indexedHeaders[i]);
}
}
for (auto& header: unindexedHeaders) {
func(header.name, header.value);
}
}
} // namespace kj
#endif // KJ_COMPAT_HTTP_H_
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment