Commit 513cd4e8 authored by Kenton Varda's avatar Kenton Varda

Make HttpInputStream reusable.

Some protocols, like Visual Studio Code's Language Server Protocol, have made the unfortunate decision to use an HTTP-style message envelope even though they are not HTTP protocols. LSP, for example, sends each JSON-RPC message as a Content-Length header followed by two CRLFs followed by a JSON message of that length. I didn't want to rewrite HTTP parsing, so I extended the HTTP library to make this functionality reusable.
parent 9f31cd49
......@@ -1366,6 +1366,141 @@ KJ_TEST("HttpClient <-> HttpServer") {
// -----------------------------------------------------------------------------
KJ_TEST("HttpInputStream requests") {
  // Streams every canned request through a one-way pipe, back to back, and checks that a
  // standalone HttpInputStream hands each one back with the expected method, URL, headers and
  // body.
  //
  // Note: the expressions inside KJ_CONTEXT / KJ_ASSERT / KJ_EXPECT are stringified into failure
  // output, so the variable names they mention are part of what this test prints.
  kj::EventLoop loop;
  kj::WaitScope waitScope(loop);

  kj::HttpHeaderTable headerTable;

  auto oneWay = kj::newOneWayPipe();
  auto input = newHttpInputStream(*oneWay.in, headerTable);

  // Chain the writes so the raw messages reach the pipe strictly in test-case order. The last
  // link drops the write end, which is what lets the reader see EOF at the end of the test.
  kj::Promise<void> pendingWrites = kj::READY_NOW;
  for (auto& outgoing: requestTestCases()) {
    pendingWrites = pendingWrites.then([&oneWay, &outgoing]() {
      return oneWay.out->write(outgoing.raw.begin(), outgoing.raw.size());
    });
  }
  pendingWrites = pendingWrites.then([&oneWay]() { oneWay.out = nullptr; });

  // Read the messages back one at a time, fully draining each body before asking for the next.
  for (auto& testCase: requestTestCases()) {
    KJ_CONTEXT(testCase.raw);

    KJ_ASSERT(input->awaitNextMessage().wait(waitScope));

    auto req = input->readRequest().wait(waitScope);
    KJ_EXPECT(req.method == testCase.method);
    KJ_EXPECT(req.url == testCase.path);
    for (auto& header: testCase.requestHeaders) {
      KJ_EXPECT(KJ_ASSERT_NONNULL(req.headers.get(header.id)) == header.value);
    }

    auto body = req.body->readAllText().wait(waitScope);
    KJ_EXPECT(body == kj::strArray(testCase.requestBodyParts, ""));
  }

  // All writes must have completed, after which the stream reports end-of-input.
  pendingWrites.wait(waitScope);
  KJ_EXPECT(!input->awaitNextMessage().wait(waitScope));
}
KJ_TEST("HttpInputStream responses") {
  // Streams the canned responses through a one-way pipe, back to back, and checks that a
  // standalone HttpInputStream hands each one back with the expected status, headers and body.
  //
  // Note: the expressions inside KJ_CONTEXT / KJ_ASSERT / KJ_EXPECT are stringified into failure
  // output, so the variable names they mention are part of what this test prints.
  kj::EventLoop loop;
  kj::WaitScope waitScope(loop);

  kj::HttpHeaderTable headerTable;

  auto oneWay = kj::newOneWayPipe();
  auto input = newHttpInputStream(*oneWay.in, headerTable);

  // Chain the writes so the raw messages reach the pipe strictly in test-case order. The last
  // link drops the write end, which is what lets the reader see EOF at the end of the test.
  kj::Promise<void> pendingWrites = kj::READY_NOW;
  for (auto& outgoing: responseTestCases()) {
    // skip Connection: close case. (Presumably because such a response cannot be followed by
    // another message on the same stream -- confirm against the test-case table.)
    if (outgoing.side == CLIENT_ONLY) continue;

    pendingWrites = pendingWrites.then([&oneWay, &outgoing]() {
      return oneWay.out->write(outgoing.raw.begin(), outgoing.raw.size());
    });
  }
  pendingWrites = pendingWrites.then([&oneWay]() { oneWay.out = nullptr; });

  // Read the messages back one at a time, fully draining each body before asking for the next.
  // The same cases are skipped here as in the write loop so that both sides stay in step.
  for (auto& testCase: responseTestCases()) {
    if (testCase.side == CLIENT_ONLY) continue;  // skip Connection: close case.

    KJ_CONTEXT(testCase.raw);

    KJ_ASSERT(input->awaitNextMessage().wait(waitScope));

    auto resp = input->readResponse(testCase.method).wait(waitScope);
    KJ_EXPECT(resp.statusCode == testCase.statusCode);
    KJ_EXPECT(resp.statusText == testCase.statusText);
    for (auto& header: testCase.responseHeaders) {
      KJ_EXPECT(KJ_ASSERT_NONNULL(resp.headers.get(header.id)) == header.value);
    }

    auto body = resp.body->readAllText().wait(waitScope);
    KJ_EXPECT(body == kj::strArray(testCase.responseBodyParts, ""));
  }

  // All writes must have completed, after which the stream reports end-of-input.
  pendingWrites.wait(waitScope);
  KJ_EXPECT(!input->awaitNextMessage().wait(waitScope));
}
KJ_TEST("HttpInputStream bare messages") {
  // Exercises readMessage(): header blocks with no request/status line, each followed by a body.
  // Three messages are sent back to back -- two delimited by Content-Length and one using chunked
  // transfer encoding -- and each must come back with the right headers and body text.
  //
  // Note: the expressions inside KJ_ASSERT / KJ_EXPECT are stringified into failure output, so the
  // variable names they mention are part of what this test prints.
  kj::EventLoop loop;
  kj::WaitScope waitScope(loop);

  kj::HttpHeaderTable headerTable;

  auto oneWay = kj::newOneWayPipe();
  auto input = newHttpInputStream(*oneWay.in, headerTable);

  kj::StringPtr wireData =
      "Content-Length: 6\r\n"
      "\r\n"
      "foobar"
      "Content-Length: 11\r\n"
      "Content-Type: some/type\r\n"
      "\r\n"
      "bazquxcorge"
      "Transfer-Encoding: chunked\r\n"
      "\r\n"
      "6\r\n"
      "grault\r\n"
      "b\r\n"
      "garplywaldo\r\n"
      "0\r\n"
      "\r\n"_kj;

  // Everything is written in one go; afterwards the write end is dropped so the reader can
  // observe EOF once the three messages are consumed.
  kj::Promise<void> pendingWrite = oneWay.out->write(wireData.begin(), wireData.size())
      .then([&oneWay]() { oneWay.out = nullptr; });

  {
    // First message: fixed length, single header.
    KJ_ASSERT(input->awaitNextMessage().wait(waitScope));
    auto message = input->readMessage().wait(waitScope);
    KJ_EXPECT(KJ_ASSERT_NONNULL(message.headers.get(HttpHeaderId::CONTENT_LENGTH)) == "6");
    KJ_EXPECT(message.body->readAllText().wait(waitScope) == "foobar");
  }

  {
    // Second message: fixed length plus an additional header.
    KJ_ASSERT(input->awaitNextMessage().wait(waitScope));
    auto message = input->readMessage().wait(waitScope);
    KJ_EXPECT(KJ_ASSERT_NONNULL(message.headers.get(HttpHeaderId::CONTENT_LENGTH)) == "11");
    KJ_EXPECT(KJ_ASSERT_NONNULL(message.headers.get(HttpHeaderId::CONTENT_TYPE)) == "some/type");
    KJ_EXPECT(message.body->readAllText().wait(waitScope) == "bazquxcorge");
  }

  {
    // Third message: chunked body; the chunks must be joined back into one text.
    KJ_ASSERT(input->awaitNextMessage().wait(waitScope));
    auto message = input->readMessage().wait(waitScope);
    KJ_EXPECT(KJ_ASSERT_NONNULL(message.headers.get(HttpHeaderId::TRANSFER_ENCODING)) == "chunked");
    KJ_EXPECT(message.body->readAllText().wait(waitScope) == "graultgarplywaldo");
  }

  // The write must have completed, after which the stream reports end-of-input.
  pendingWrite.wait(waitScope);
  KJ_EXPECT(!input->awaitNextMessage().wait(waitScope));
}
// -----------------------------------------------------------------------------
KJ_TEST("WebSocket core protocol") {
kj::EventLoop eventLoop;
kj::WaitScope waitScope(eventLoop);
......
......@@ -903,6 +903,14 @@ kj::Maybe<HttpHeaders::Response> HttpHeaders::tryParseResponse(kj::ArrayPtr<char
return response;
}
bool HttpHeaders::tryParse(kj::ArrayPtr<char> content) {
  // Parses `content` as a bare header block, i.e. one without a leading request or status line.
  //
  // Returns false when trimHeaderEnding() rejects the text (it yields nullptr); otherwise the
  // result is whatever parseHeaders() reports for the range from the start of `content` up to
  // the trimmed end.
  if (char* headersEnd = trimHeaderEnding(content)) {
    return parseHeaders(content.begin(), headersEnd);
  }
  return false;
}
bool HttpHeaders::parseHeaders(char* ptr, char* end) {
while (*ptr != '\0') {
KJ_IF_MAYBE(name, consumeHeaderName(ptr)) {
......@@ -988,9 +996,9 @@ static constexpr size_t MIN_BUFFER = 4096;
// NOTE(review): presumably the upper limit to which the header buffer may grow (it is initially
// allocated with MIN_BUFFER chars) -- the growth code is not visible here, confirm.
static constexpr size_t MAX_BUFFER = 65536;
// NOTE(review): presumably the maximum accepted size of a single chunk header when reading a
// chunked body -- the code using it is not visible here, confirm.
static constexpr size_t MAX_CHUNK_HEADER_SIZE = 32;
class HttpInputStream {
class HttpInputStreamImpl final: public HttpInputStream {
public:
explicit HttpInputStream(AsyncIoStream& inner, HttpHeaderTable& table)
// Creates a reader on top of `inner`, which is stored by reference (not owned). The header
// buffer starts out as a heap array of MIN_BUFFER chars, and `table` is passed to the `headers`
// member into which each message's headers are parsed.
explicit HttpInputStreamImpl(AsyncInputStream& inner, HttpHeaderTable& table)
: inner(inner), headerBuffer(kj::heapArray<char>(MIN_BUFFER)), headers(table) {
}
......@@ -998,6 +1006,41 @@ public:
return !broken && pendingMessageCount == 0;
}
// ---------------------------------------------------------------------------
// public interface
kj::Promise<Request> readRequest() override {
  // Reads the next request's headers, then pairs them with a stream for the entity body that
  // follows. Fails with "bad request" if readRequestHeaders() produced no request.
  //
  // The returned struct refers to this object's `headers` member rather than to a copy.
  auto finish = [this](kj::Maybe<HttpHeaders::Request>&& maybeRequest)
      -> HttpInputStream::Request {
    auto parsed = KJ_REQUIRE_NONNULL(maybeRequest, "bad request");

    // 0 is passed as the status code for a request.
    auto entityBody = getEntityBody(HttpInputStreamImpl::REQUEST, parsed.method, 0, headers);

    return { parsed.method, parsed.url, headers, kj::mv(entityBody) };
  };

  return readRequestHeaders().then(kj::mv(finish));
}
kj::Promise<Response> readResponse(HttpMethod requestMethod) override {
  // Reads the next response's headers, then pairs them with a stream for the entity body that
  // follows. Fails with "bad response" if readResponseHeaders() produced no response.
  //
  // `requestMethod` is forwarded to getEntityBody() when the body stream is chosen. Note that 0,
  // not the parsed status code, is passed as the status-code argument.
  //
  // The returned struct refers to this object's `headers` member rather than to a copy.
  auto finish = [this, requestMethod](kj::Maybe<HttpHeaders::Response>&& maybeResponse)
      -> HttpInputStream::Response {
    auto parsed = KJ_REQUIRE_NONNULL(maybeResponse, "bad response");

    auto entityBody = getEntityBody(HttpInputStreamImpl::RESPONSE, requestMethod, 0, headers);

    return { parsed.statusCode, parsed.statusText, headers, kj::mv(entityBody) };
  };

  return readResponseHeaders().then(kj::mv(finish));
}
kj::Promise<Message> readMessage() override {
  // Reads a header block that has no request or status line, followed by its body.
  //
  // The `headers` member is cleared and re-filled from the header text; the read fails with
  // "bad message" if that text does not parse. The body stream is chosen as if this were a
  // response to a GET request with status code 0.
  //
  // The returned struct refers to this object's `headers` member rather than to a copy.
  auto finish = [this](kj::ArrayPtr<char> text) -> HttpInputStream::Message {
    headers.clear();
    KJ_REQUIRE(headers.tryParse(text), "bad message");

    auto entityBody = getEntityBody(HttpInputStreamImpl::RESPONSE, HttpMethod::GET, 0, headers);

    return { headers, kj::mv(entityBody) };
  };

  return readMessageHeaders().then(kj::mv(finish));
}
// ---------------------------------------------------------------------------
// Stream locking: While an entity-body is being read, the body stream "locks" the underlying
// HTTP stream. Once the entity-body is complete, we can read the next pipelined message.
......@@ -1022,7 +1065,7 @@ public:
// ---------------------------------------------------------------------------
kj::Promise<bool> awaitNextMessage() {
kj::Promise<bool> awaitNextMessage() override {
// Waits until more data is available, but doesn't consume it. Returns false on EOF.
//
// Used on the server after a request is handled, to check for pipelined requests.
......@@ -1172,7 +1215,7 @@ public:
}
private:
AsyncIoStream& inner;
AsyncInputStream& inner;
kj::Array<char> headerBuffer;
size_t messageHeaderEnd = 0;
......@@ -1367,7 +1410,7 @@ private:
class HttpEntityBodyReader: public kj::AsyncInputStream {
public:
HttpEntityBodyReader(HttpInputStream& inner): inner(inner) {}
// Binds this body reader to the HTTP input stream it reads from; `inner` is stored by reference.
HttpEntityBodyReader(HttpInputStreamImpl& inner): inner(inner) {}
~HttpEntityBodyReader() noexcept(false) {
if (!finished) {
inner.abortRead();
......@@ -1375,7 +1418,7 @@ public:
}
protected:
HttpInputStream& inner;
HttpInputStreamImpl& inner;
void doneReading() {
KJ_REQUIRE(!finished);
......@@ -1394,7 +1437,7 @@ class HttpNullEntityReader final: public HttpEntityBodyReader {
// may indicate non-zero in the special case of a response to a HEAD request.
public:
HttpNullEntityReader(HttpInputStream& inner, kj::Maybe<uint64_t> length)
HttpNullEntityReader(HttpInputStreamImpl& inner, kj::Maybe<uint64_t> length)
: HttpEntityBodyReader(inner), length(length) {
// `length` is what to return from tryGetLength(). For a response to a HEAD request, this may
// be non-zero.
......@@ -1417,7 +1460,7 @@ class HttpConnectionCloseEntityReader final: public HttpEntityBodyReader {
// Stream which reads until EOF.
public:
HttpConnectionCloseEntityReader(HttpInputStream& inner)
// Only forwards `inner` to the HttpEntityBodyReader base.
HttpConnectionCloseEntityReader(HttpInputStreamImpl& inner)
: HttpEntityBodyReader(inner) {}
Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
......@@ -1437,7 +1480,7 @@ class HttpFixedLengthEntityReader final: public HttpEntityBodyReader {
// Stream which reads only up to a fixed length from the underlying stream, then emulates EOF.
public:
HttpFixedLengthEntityReader(HttpInputStream& inner, size_t length)
// Creates a reader for a body of `length` bytes taken from `inner`.
HttpFixedLengthEntityReader(HttpInputStreamImpl& inner, size_t length)
: HttpEntityBodyReader(inner), length(length) {
// A zero-length body has nothing to read, so reading is marked as done immediately.
if (length == 0) doneReading();
}
......@@ -1470,7 +1513,7 @@ class HttpChunkedEntityReader final: public HttpEntityBodyReader {
// Stream which reads a Transfer-Encoding: Chunked stream.
public:
HttpChunkedEntityReader(HttpInputStream& inner)
// Only forwards `inner` to the HttpEntityBodyReader base.
HttpChunkedEntityReader(HttpInputStreamImpl& inner)
: HttpEntityBodyReader(inner) {}
Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
......@@ -1551,7 +1594,7 @@ static_assert(!fastCaseCmp<'n','O','o','B','1'>("FooB1"), "");
static_assert(!fastCaseCmp<'f','O','o','B'>("FooB1"), "");
static_assert(!fastCaseCmp<'f','O','o','B','1','a'>("FooB1"), "");
kj::Own<kj::AsyncInputStream> HttpInputStream::getEntityBody(
kj::Own<kj::AsyncInputStream> HttpInputStreamImpl::getEntityBody(
RequestOrResponse type, HttpMethod method, uint statusCode,
const kj::HttpHeaders& headers) {
if (type == RESPONSE) {
......@@ -1599,8 +1642,16 @@ kj::Own<kj::AsyncInputStream> HttpInputStream::getEntityBody(
return kj::heap<HttpNullEntityReader>(*this, uint64_t(0));
}
} // namespace
kj::Own<HttpInputStream> newHttpInputStream(kj::AsyncInputStream& input, HttpHeaderTable& table) {
  // Public factory for the otherwise file-private HttpInputStreamImpl: heap-allocates an
  // implementation reading from `input` and returns it through the abstract interface.
  kj::Own<HttpInputStream> stream = kj::heap<HttpInputStreamImpl>(input, table);
  return stream;
}
// =======================================================================================
namespace {
class HttpOutputStream {
public:
HttpOutputStream(AsyncOutputStream& inner): inner(inner) {}
......@@ -2397,7 +2448,7 @@ private:
};
kj::Own<WebSocket> upgradeToWebSocket(
kj::Own<kj::AsyncIoStream> stream, HttpInputStream& httpInput, HttpOutputStream& httpOutput,
kj::Own<kj::AsyncIoStream> stream, HttpInputStreamImpl& httpInput, HttpOutputStream& httpOutput,
kj::Maybe<EntropySource&> maskKeyGenerator) {
// Create a WebSocket upgraded from an HTTP stream.
auto releasedBuffer = httpInput.releaseBuffer();
......@@ -3064,7 +3115,7 @@ public:
r->statusCode,
r->statusText,
&headers,
httpInput.getEntityBody(HttpInputStream::RESPONSE, method, r->statusCode, headers)
httpInput.getEntityBody(HttpInputStreamImpl::RESPONSE, method, r->statusCode, headers)
};
if (fastCaseCmp<'c', 'l', 'o', 's', 'e'>(
......@@ -3156,7 +3207,7 @@ public:
r->statusCode,
r->statusText,
&headers,
httpInput.getEntityBody(HttpInputStream::RESPONSE, HttpMethod::GET, r->statusCode,
httpInput.getEntityBody(HttpInputStreamImpl::RESPONSE, HttpMethod::GET, r->statusCode,
headers)
};
if (fastCaseCmp<'c', 'l', 'o', 's', 'e'>(
......@@ -3178,7 +3229,7 @@ public:
}
private:
HttpInputStream httpInput;
HttpInputStreamImpl httpInput;
HttpOutputStream httpOutput;
kj::Own<AsyncIoStream> ownStream;
HttpClientSettings settings;
......@@ -4159,7 +4210,7 @@ public:
currentMethod = req->method;
auto body = httpInput.getEntityBody(
HttpInputStream::REQUEST, req->method, 0, headers);
HttpInputStreamImpl::REQUEST, req->method, 0, headers);
// TODO(perf): If the client disconnects, should we cancel the response? Probably, to
// prevent permanent deadlock. It's slightly weird in that arguably the client should
......@@ -4312,7 +4363,7 @@ private:
HttpServer& server;
kj::AsyncIoStream& stream;
HttpService& service;
HttpInputStream httpInput;
HttpInputStreamImpl httpInput;
HttpOutputStream httpOutput;
kj::Maybe<HttpMethod> currentMethod;
bool timedOut = false;
......
......@@ -332,6 +332,9 @@ public:
// to split it into a bunch of shorter strings. The caller must keep `content` valid until the
// `HttpHeaders` is destroyed, or pass it to `takeOwnership()`.
bool tryParse(kj::ArrayPtr<char> content);
// Like tryParseRequest()/tryParseResponse(), but doesn't expect any request/response line:
// `content` holds only the header lines. Returns false if the text could not be parsed.
//
// NOTE(review): presumably the same rules about keeping `content` valid apply as for
// tryParseRequest()/tryParseResponse() -- confirm.
kj::String serializeRequest(HttpMethod method, kj::StringPtr url,
kj::ArrayPtr<const kj::StringPtr> connectionHeaders = nullptr) const;
kj::String serializeResponse(uint statusCode, kj::StringPtr statusText,
......@@ -396,6 +399,58 @@ private:
// also add direct accessors for those headers.
};
class HttpInputStream {
// Low-level interface to receive HTTP-formatted messages (headers followed by body) from an
// input stream, without a paired output stream.
//
// Most applications will not use this. Regular HTTP clients and servers don't need this. This
// is mainly useful for apps implementing various protocols that look like HTTP but aren't
// really.
//
// Instances are created with newHttpInputStream().
public:
struct Request {
// Result of readRequest().
HttpMethod method;
// Method from the request line.
kj::StringPtr url;
// URL from the request line.
const HttpHeaders& headers;
// The request's headers. This is a reference to an object held by the stream, not a copy.
kj::Own<kj::AsyncInputStream> body;
// Stream from which the request's entity body is read.
};
virtual kj::Promise<Request> readRequest() = 0;
// Reads one HTTP request from the input stream.
//
// The returned struct contains pointers directly into a buffer that is invalidated on the next
// message read.
struct Response {
// Result of readResponse().
uint statusCode;
// Numeric status code from the status line.
kj::StringPtr statusText;
// Status text from the status line.
const HttpHeaders& headers;
// The response's headers. This is a reference to an object held by the stream, not a copy.
kj::Own<kj::AsyncInputStream> body;
// Stream from which the response's entity body is read.
};
virtual kj::Promise<Response> readResponse(HttpMethod requestMethod) = 0;
// Reads one HTTP response from the input stream.
//
// You must provide the request method because responses to HEAD requests require special
// treatment.
//
// The returned struct contains pointers directly into a buffer that is invalidated on the next
// message read.
struct Message {
// Result of readMessage(): just headers and a body, with no request or status line.
const HttpHeaders& headers;
// The message's headers. This is a reference to an object held by the stream, not a copy.
kj::Own<kj::AsyncInputStream> body;
// Stream from which the message's body is read.
};
virtual kj::Promise<Message> readMessage() = 0;
// Reads an HTTP header set followed by a body, with no request or response line. This is not
// useful for HTTP but may be useful for other protocols that make the unfortunate choice to
// mimic HTTP message format, such as Visual Studio Code's JSON-RPC transport.
//
// The returned struct contains pointers directly into a buffer that is invalidated on the next
// message read.
virtual kj::Promise<bool> awaitNextMessage() = 0;
// Waits until more data is available, but doesn't consume it. Returns false on EOF.
};
class EntropySource {
// Interface for an object that generates entropy. Typically, cryptographically-random entropy
// is expected.
......@@ -641,6 +696,16 @@ kj::Own<HttpClient> newHttpClient(HttpService& service);
kj::Own<HttpService> newHttpService(HttpClient& client);
// Adapts an HttpClient to an HttpService and vice versa.
kj::Own<HttpInputStream> newHttpInputStream(
kj::AsyncInputStream& input, HttpHeaderTable& headerTable);
// Create an HttpInputStream on top of the given stream. Normally applications would not call this
// directly, but it can be useful for implementing protocols that aren't quite HTTP but use similar
// message delimiting.
//
// `input` is held by reference, not owned, so it must remain valid for as long as the returned
// HttpInputStream is in use. NOTE(review): `headerTable` presumably has to outlive the returned
// object as well -- confirm.
//
// The HttpInputStream implementation does read-ahead buffering on `input`. Therefore, when the
// HttpInputStream is destroyed, some data read from `input` may be lost, so it's not possible to
// continue reading from `input` in a reliable way.
kj::Own<WebSocket> newWebSocket(kj::Own<kj::AsyncIoStream> stream,
kj::Maybe<EntropySource&> maskEntropySource);
// Create a new WebSocket on top of the given stream. It is assumed that the HTTP -> WebSocket
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment