Unverified Commit 7260fe75 authored by Kenton Varda's avatar Kenton Varda Committed by GitHub

Merge pull request #748 from capnproto/json-rpc

Implement JSON-RPC adapter to Cap'n Proto interfaces.
parents 9f31cd49 5f422f65
// Copyright (c) 2018 Kenton Varda and contributors
// Licensed under the MIT License:
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
#include "json-rpc.h"
#include <kj/test.h>
#include <capnp/test-util.h>
namespace capnp {
namespace _ { // private
namespace {
KJ_TEST("json-rpc basics") {
auto io = kj::setupAsyncIo();
auto pipe = kj::newTwoWayPipe();
JsonRpc::ContentLengthTransport clientTransport(*pipe.ends[0]);
JsonRpc::ContentLengthTransport serverTransport(*pipe.ends[1]);
int callCount = 0;
JsonRpc client(clientTransport);
JsonRpc server(serverTransport, toDynamic(kj::heap<TestInterfaceImpl>(callCount)));
auto cap = client.getPeer<test::TestInterface>();
auto req = cap.fooRequest();
req.setI(123);
req.setJ(true);
auto resp = req.send().wait(io.waitScope);
KJ_EXPECT(resp.getX() == "foo");
KJ_EXPECT(callCount == 1);
}
KJ_TEST("json-rpc error") {
auto io = kj::setupAsyncIo();
auto pipe = kj::newTwoWayPipe();
JsonRpc::ContentLengthTransport clientTransport(*pipe.ends[0]);
JsonRpc::ContentLengthTransport serverTransport(*pipe.ends[1]);
int callCount = 0;
JsonRpc client(clientTransport);
JsonRpc server(serverTransport, toDynamic(kj::heap<TestInterfaceImpl>(callCount)));
auto cap = client.getPeer<test::TestInterface>();
KJ_EXPECT_THROW_MESSAGE("Method not implemented", cap.barRequest().send().wait(io.waitScope));
}
KJ_TEST("json-rpc multiple calls") {
auto io = kj::setupAsyncIo();
auto pipe = kj::newTwoWayPipe();
JsonRpc::ContentLengthTransport clientTransport(*pipe.ends[0]);
JsonRpc::ContentLengthTransport serverTransport(*pipe.ends[1]);
int callCount = 0;
JsonRpc client(clientTransport);
JsonRpc server(serverTransport, toDynamic(kj::heap<TestInterfaceImpl>(callCount)));
auto cap = client.getPeer<test::TestInterface>();
auto req1 = cap.fooRequest();
req1.setI(123);
req1.setJ(true);
auto promise1 = req1.send();
auto req2 = cap.bazRequest();
initTestMessage(req2.initS());
auto promise2 = req2.send();
auto resp1 = promise1.wait(io.waitScope);
KJ_EXPECT(resp1.getX() == "foo");
auto resp2 = promise2.wait(io.waitScope);
KJ_EXPECT(callCount == 2);
}
} // namespace
} // namespace _ (private)
} // namespace capnp
This diff is collapsed.
@0xd04299800d6725ba;
$import "/capnp/c++.capnp".namespace("capnp::json");
using Json = import "json.capnp";
struct RpcMessage {
jsonrpc @0 :Text;
# Must always be "2.0".
id @1 :Json.Value;
# Correlates a request to a response. Technically must be a string or number. Our implementation
# will always use a number for calls it initiates, and will reflect IDs of any type for calls
# it receives.
#
# May be omitted when caller doesn't care about the response. The implementation will omit `id`
# and return immediately when calling methods with the annotation `@notification` (defined in
# `json.capnp`). The `@notification` annotation only matters for outgoing calls; for incoming
# calls, it's the client's decision whether it wants to receive the response.
method @2 :Text;
# Method name. Only expected when `params` is sent.
union {
none @3 :Void $Json.name("!missing params, result, or error");
# Dummy default value of union, to detect when none of the fields below were received.
params @4 :Json.Value;
# Initiates a call.
result @5 :Json.Value;
# Completes a call.
error @6 :Error;
# Completes a call throwing an exception.
}
struct Error {
code @0 :Int32;
message @1 :Text;
data @2 :Json.Value;
}
}
// Copyright (c) 2018 Kenton Varda and contributors
// Licensed under the MIT License:
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
#pragma once
#include "json.h"
#include <kj/async-io.h>
#include <capnp/capability.h>
#include <kj/map.h>
namespace kj { class HttpInputStream; }
namespace capnp {
class JsonRpc: private kj::TaskSet::ErrorHandler {
// An implementation of JSON-RPC 2.0: https://www.jsonrpc.org/specification
//
// This allows you to use Cap'n Proto interface declarations to implement JSON-RPC protocols.
// Of course, JSON-RPC does not support capabilities. So, the client and server each expose
// exactly one object to the other.
public:
class Transport;
class ContentLengthTransport;
JsonRpc(Transport& transport, DynamicCapability::Client interface = {});
KJ_DISALLOW_COPY(JsonRpc);
DynamicCapability::Client getPeer(InterfaceSchema schema);
template <typename T>
typename T::Client getPeer() {
return getPeer(Schema::from<T>()).template castAs<T>();
}
kj::Promise<void> onError() { return errorPromise.addBranch(); }
private:
JsonCodec codec;
Transport& transport;
DynamicCapability::Client interface;
kj::HashMap<kj::StringPtr, InterfaceSchema::Method> methodMap;
uint callCount = 0;
kj::Promise<void> writeQueue = kj::READY_NOW;
kj::ForkedPromise<void> errorPromise;
kj::Own<kj::PromiseFulfiller<void>> errorFulfiller;
kj::Promise<void> readTask;
struct AwaitedResponse {
CallContext<DynamicStruct, DynamicStruct> context;
kj::Own<kj::PromiseFulfiller<void>> fulfiller;
};
kj::HashMap<uint, AwaitedResponse> awaitedResponses;
kj::TaskSet tasks;
class CapabilityImpl;
kj::Promise<void> queueWrite(kj::String text);
void queueError(kj::Maybe<json::Value::Reader> id, int code, kj::StringPtr message);
kj::Promise<void> readLoop();
void taskFailed(kj::Exception&& exception) override;
JsonRpc(Transport& transport, DynamicCapability::Client interface,
kj::PromiseFulfillerPair<void> paf);
};
class JsonRpc::Transport {
public:
virtual kj::Promise<void> send(kj::StringPtr text) = 0;
virtual kj::Promise<kj::String> receive() = 0;
};
class JsonRpc::ContentLengthTransport: public Transport {
// The transport used by Visual Studio Code: Each message is composed like an HTTP message
// without the first line. That is, a list of headers, followed by a blank line, followed by the
// content whose length is determined by the content-length header.
public:
explicit ContentLengthTransport(kj::AsyncIoStream& stream);
~ContentLengthTransport() noexcept(false);
KJ_DISALLOW_COPY(ContentLengthTransport);
kj::Promise<void> send(kj::StringPtr text) override;
kj::Promise<kj::String> receive() override;
private:
kj::AsyncIoStream& stream;
kj::Own<kj::HttpInputStream> input;
kj::ArrayPtr<const byte> parts[2];
};
} // namespace capnp
...@@ -66,14 +66,14 @@ struct Value { ...@@ -66,14 +66,14 @@ struct Value {
# #
# myField @0 :Text $Json.name("my_field"); # myField @0 :Text $Json.name("my_field");
annotation name @0xfa5b1fd61c2e7c3d (field, enumerant, method, group, union): Text; annotation name @0xfa5b1fd61c2e7c3d (field, enumerant, method, group, union) :Text;
# Define an alternative name to use when encoding the given item in JSON. This can be used, for # Define an alternative name to use when encoding the given item in JSON. This can be used, for
# example, to use snake_case names where needed, even though Cap'n Proto uses strictly camelCase. # example, to use snake_case names where needed, even though Cap'n Proto uses strictly camelCase.
# #
# (However, because JSON is derived from JavaScript, you *should* use camelCase names when # (However, because JSON is derived from JavaScript, you *should* use camelCase names when
# defining JSON-based APIs. But, when supporting a pre-existing API you may not have a choice.) # defining JSON-based APIs. But, when supporting a pre-existing API you may not have a choice.)
annotation flatten @0x82d3e852af0336bf (field, group, union): FlattenOptions; annotation flatten @0x82d3e852af0336bf (field, group, union) :FlattenOptions;
# Specifies that an aggregate field should be flattened into its parent. # Specifies that an aggregate field should be flattened into its parent.
# #
# In order to flatten a member of a union, the union (or, for an anonymous union, the parent # In order to flatten a member of a union, the union (or, for an anonymous union, the parent
...@@ -87,7 +87,7 @@ struct FlattenOptions { ...@@ -87,7 +87,7 @@ struct FlattenOptions {
# Optional: Adds the given prefix to flattened field names. # Optional: Adds the given prefix to flattened field names.
} }
annotation discriminator @0xcfa794e8d19a0162 (struct, union): DiscriminatorOptions; annotation discriminator @0xcfa794e8d19a0162 (struct, union) :DiscriminatorOptions;
# Specifies that a union's variant will be decided not by which fields are present, but instead # Specifies that a union's variant will be decided not by which fields are present, but instead
# by a special discriminator field. The value of the discriminator field is a string naming which # by a special discriminator field. The value of the discriminator field is a string naming which
# variant is active. This allows the members of the union to have the $jsonFlatten annotation, or # variant is active. This allows the members of the union to have the $jsonFlatten annotation, or
...@@ -105,8 +105,11 @@ struct DiscriminatorOptions { ...@@ -105,8 +105,11 @@ struct DiscriminatorOptions {
# It is an error to use `valueName` while also declaring some variants as $flatten. # It is an error to use `valueName` while also declaring some variants as $flatten.
} }
annotation base64 @0xd7d879450a253e4b (field): Void; annotation base64 @0xd7d879450a253e4b (field) :Void;
# Place on a field of type `Data` to indicate that its JSON representation is a Base64 string. # Place on a field of type `Data` to indicate that its JSON representation is a Base64 string.
annotation hex @0xf061e22f0ae5c7b5 (field): Void; annotation hex @0xf061e22f0ae5c7b5 (field) :Void;
# Place on a field of type `Data` to indicate that its JSON representation is a hex string. # Place on a field of type `Data` to indicate that its JSON representation is a hex string.
annotation notification @0xa0a054dea32fd98c (method) :Void;
# Indicates that this method is a JSON-RPC "notification", meaning it expects no response.
...@@ -303,6 +303,12 @@ void JsonCodec::encode(T&& value, JsonValue::Builder output) const { ...@@ -303,6 +303,12 @@ void JsonCodec::encode(T&& value, JsonValue::Builder output) const {
encode(DynamicValue::Reader(ReaderFor<Base>(kj::fwd<T>(value))), Type::from<Base>(), output); encode(DynamicValue::Reader(ReaderFor<Base>(kj::fwd<T>(value))), Type::from<Base>(), output);
} }
template <>
inline void JsonCodec::encode<DynamicStruct::Reader>(
DynamicStruct::Reader&& value, JsonValue::Builder output) const {
encode(DynamicValue::Reader(value), value.getSchema(), output);
}
template <typename T> template <typename T>
inline Orphan<T> JsonCodec::decode(JsonValue::Reader input, Orphanage orphanage) const { inline Orphan<T> JsonCodec::decode(JsonValue::Reader input, Orphanage orphanage) const {
return decode(input, Type::from<T>(), orphanage).template releaseAs<T>(); return decode(input, Type::from<T>(), orphanage).template releaseAs<T>();
......
...@@ -584,6 +584,9 @@ public: ...@@ -584,6 +584,9 @@ public:
kj::Promise<void> tailCall(Request<SubParams, DynamicStruct>&& tailRequest); kj::Promise<void> tailCall(Request<SubParams, DynamicStruct>&& tailRequest);
void allowCancellation(); void allowCancellation();
StructSchema getParamsType() const { return paramType; }
StructSchema getResultsType() const { return resultType; }
private: private:
CallContextHook* hook; CallContextHook* hook;
StructSchema paramType; StructSchema paramType;
......
...@@ -71,9 +71,8 @@ kj::Promise<bool> AsyncMessageReader::read(kj::AsyncInputStream& inputStream, ...@@ -71,9 +71,8 @@ kj::Promise<bool> AsyncMessageReader::read(kj::AsyncInputStream& inputStream,
return false; return false;
} else if (n < sizeof(firstWord)) { } else if (n < sizeof(firstWord)) {
// EOF in first word. // EOF in first word.
KJ_FAIL_REQUIRE("Premature EOF.") { kj::throwRecoverableException(KJ_EXCEPTION(DISCONNECTED, "Premature EOF."));
return false; return false;
}
} }
return readAfterFirstWord(inputStream, scratchSpace).then([]() { return true; }); return readAfterFirstWord(inputStream, scratchSpace).then([]() { return true; });
...@@ -153,7 +152,9 @@ kj::Promise<kj::Own<MessageReader>> readMessage( ...@@ -153,7 +152,9 @@ kj::Promise<kj::Own<MessageReader>> readMessage(
auto reader = kj::heap<AsyncMessageReader>(options); auto reader = kj::heap<AsyncMessageReader>(options);
auto promise = reader->read(input, scratchSpace); auto promise = reader->read(input, scratchSpace);
return promise.then(kj::mvCapture(reader, [](kj::Own<MessageReader>&& reader, bool success) { return promise.then(kj::mvCapture(reader, [](kj::Own<MessageReader>&& reader, bool success) {
KJ_REQUIRE(success, "Premature EOF.") { break; } if (!success) {
kj::throwRecoverableException(KJ_EXCEPTION(DISCONNECTED, "Premature EOF."));
}
return kj::mv(reader); return kj::mv(reader);
})); }));
} }
......
...@@ -1366,6 +1366,141 @@ KJ_TEST("HttpClient <-> HttpServer") { ...@@ -1366,6 +1366,141 @@ KJ_TEST("HttpClient <-> HttpServer") {
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
KJ_TEST("HttpInputStream requests") {
kj::EventLoop eventLoop;
kj::WaitScope waitScope(eventLoop);
kj::HttpHeaderTable table;
auto pipe = kj::newOneWayPipe();
auto input = newHttpInputStream(*pipe.in, table);
kj::Promise<void> writeQueue = kj::READY_NOW;
for (auto& testCase: requestTestCases()) {
writeQueue = writeQueue.then([&]() {
return pipe.out->write(testCase.raw.begin(), testCase.raw.size());
});
}
writeQueue = writeQueue.then([&]() {
pipe.out = nullptr;
});
for (auto& testCase: requestTestCases()) {
KJ_CONTEXT(testCase.raw);
KJ_ASSERT(input->awaitNextMessage().wait(waitScope));
auto req = input->readRequest().wait(waitScope);
KJ_EXPECT(req.method == testCase.method);
KJ_EXPECT(req.url == testCase.path);
for (auto& header: testCase.requestHeaders) {
KJ_EXPECT(KJ_ASSERT_NONNULL(req.headers.get(header.id)) == header.value);
}
auto body = req.body->readAllText().wait(waitScope);
KJ_EXPECT(body == kj::strArray(testCase.requestBodyParts, ""));
}
writeQueue.wait(waitScope);
KJ_EXPECT(!input->awaitNextMessage().wait(waitScope));
}
KJ_TEST("HttpInputStream responses") {
kj::EventLoop eventLoop;
kj::WaitScope waitScope(eventLoop);
kj::HttpHeaderTable table;
auto pipe = kj::newOneWayPipe();
auto input = newHttpInputStream(*pipe.in, table);
kj::Promise<void> writeQueue = kj::READY_NOW;
for (auto& testCase: responseTestCases()) {
if (testCase.side == CLIENT_ONLY) continue; // skip Connection: close case.
writeQueue = writeQueue.then([&]() {
return pipe.out->write(testCase.raw.begin(), testCase.raw.size());
});
}
writeQueue = writeQueue.then([&]() {
pipe.out = nullptr;
});
for (auto& testCase: responseTestCases()) {
if (testCase.side == CLIENT_ONLY) continue; // skip Connection: close case.
KJ_CONTEXT(testCase.raw);
KJ_ASSERT(input->awaitNextMessage().wait(waitScope));
auto resp = input->readResponse(testCase.method).wait(waitScope);
KJ_EXPECT(resp.statusCode == testCase.statusCode);
KJ_EXPECT(resp.statusText == testCase.statusText);
for (auto& header: testCase.responseHeaders) {
KJ_EXPECT(KJ_ASSERT_NONNULL(resp.headers.get(header.id)) == header.value);
}
auto body = resp.body->readAllText().wait(waitScope);
KJ_EXPECT(body == kj::strArray(testCase.responseBodyParts, ""));
}
writeQueue.wait(waitScope);
KJ_EXPECT(!input->awaitNextMessage().wait(waitScope));
}
KJ_TEST("HttpInputStream bare messages") {
kj::EventLoop eventLoop;
kj::WaitScope waitScope(eventLoop);
kj::HttpHeaderTable table;
auto pipe = kj::newOneWayPipe();
auto input = newHttpInputStream(*pipe.in, table);
kj::StringPtr messages =
"Content-Length: 6\r\n"
"\r\n"
"foobar"
"Content-Length: 11\r\n"
"Content-Type: some/type\r\n"
"\r\n"
"bazquxcorge"
"Transfer-Encoding: chunked\r\n"
"\r\n"
"6\r\n"
"grault\r\n"
"b\r\n"
"garplywaldo\r\n"
"0\r\n"
"\r\n"_kj;
kj::Promise<void> writeTask = pipe.out->write(messages.begin(), messages.size())
.then([&]() { pipe.out = nullptr; });
{
KJ_ASSERT(input->awaitNextMessage().wait(waitScope));
auto message = input->readMessage().wait(waitScope);
KJ_EXPECT(KJ_ASSERT_NONNULL(message.headers.get(HttpHeaderId::CONTENT_LENGTH)) == "6");
KJ_EXPECT(message.body->readAllText().wait(waitScope) == "foobar");
}
{
KJ_ASSERT(input->awaitNextMessage().wait(waitScope));
auto message = input->readMessage().wait(waitScope);
KJ_EXPECT(KJ_ASSERT_NONNULL(message.headers.get(HttpHeaderId::CONTENT_LENGTH)) == "11");
KJ_EXPECT(KJ_ASSERT_NONNULL(message.headers.get(HttpHeaderId::CONTENT_TYPE)) == "some/type");
KJ_EXPECT(message.body->readAllText().wait(waitScope) == "bazquxcorge");
}
{
KJ_ASSERT(input->awaitNextMessage().wait(waitScope));
auto message = input->readMessage().wait(waitScope);
KJ_EXPECT(KJ_ASSERT_NONNULL(message.headers.get(HttpHeaderId::TRANSFER_ENCODING)) == "chunked");
KJ_EXPECT(message.body->readAllText().wait(waitScope) == "graultgarplywaldo");
}
writeTask.wait(waitScope);
KJ_EXPECT(!input->awaitNextMessage().wait(waitScope));
}
// -----------------------------------------------------------------------------
KJ_TEST("WebSocket core protocol") { KJ_TEST("WebSocket core protocol") {
kj::EventLoop eventLoop; kj::EventLoop eventLoop;
kj::WaitScope waitScope(eventLoop); kj::WaitScope waitScope(eventLoop);
......
...@@ -903,6 +903,14 @@ kj::Maybe<HttpHeaders::Response> HttpHeaders::tryParseResponse(kj::ArrayPtr<char ...@@ -903,6 +903,14 @@ kj::Maybe<HttpHeaders::Response> HttpHeaders::tryParseResponse(kj::ArrayPtr<char
return response; return response;
} }
bool HttpHeaders::tryParse(kj::ArrayPtr<char> content) {
char* end = trimHeaderEnding(content);
if (end == nullptr) return false;
char* ptr = content.begin();
return parseHeaders(ptr, end);
}
bool HttpHeaders::parseHeaders(char* ptr, char* end) { bool HttpHeaders::parseHeaders(char* ptr, char* end) {
while (*ptr != '\0') { while (*ptr != '\0') {
KJ_IF_MAYBE(name, consumeHeaderName(ptr)) { KJ_IF_MAYBE(name, consumeHeaderName(ptr)) {
...@@ -988,9 +996,9 @@ static constexpr size_t MIN_BUFFER = 4096; ...@@ -988,9 +996,9 @@ static constexpr size_t MIN_BUFFER = 4096;
static constexpr size_t MAX_BUFFER = 65536; static constexpr size_t MAX_BUFFER = 65536;
static constexpr size_t MAX_CHUNK_HEADER_SIZE = 32; static constexpr size_t MAX_CHUNK_HEADER_SIZE = 32;
class HttpInputStream { class HttpInputStreamImpl final: public HttpInputStream {
public: public:
explicit HttpInputStream(AsyncIoStream& inner, HttpHeaderTable& table) explicit HttpInputStreamImpl(AsyncInputStream& inner, HttpHeaderTable& table)
: inner(inner), headerBuffer(kj::heapArray<char>(MIN_BUFFER)), headers(table) { : inner(inner), headerBuffer(kj::heapArray<char>(MIN_BUFFER)), headers(table) {
} }
...@@ -998,6 +1006,41 @@ public: ...@@ -998,6 +1006,41 @@ public:
return !broken && pendingMessageCount == 0; return !broken && pendingMessageCount == 0;
} }
// ---------------------------------------------------------------------------
// public interface
kj::Promise<Request> readRequest() override {
return readRequestHeaders()
.then([this](kj::Maybe<HttpHeaders::Request>&& maybeRequest) -> HttpInputStream::Request {
auto request = KJ_REQUIRE_NONNULL(maybeRequest, "bad request");
auto body = getEntityBody(HttpInputStreamImpl::REQUEST, request.method, 0, headers);
return { request.method, request.url, headers, kj::mv(body) };
});
}
kj::Promise<Response> readResponse(HttpMethod requestMethod) override {
return readResponseHeaders()
.then([this,requestMethod](kj::Maybe<HttpHeaders::Response>&& maybeResponse)
-> HttpInputStream::Response {
auto response = KJ_REQUIRE_NONNULL(maybeResponse, "bad response");
auto body = getEntityBody(HttpInputStreamImpl::RESPONSE, requestMethod, 0, headers);
return { response.statusCode, response.statusText, headers, kj::mv(body) };
});
}
kj::Promise<Message> readMessage() override {
return readMessageHeaders()
.then([this](kj::ArrayPtr<char> text) -> HttpInputStream::Message {
headers.clear();
KJ_REQUIRE(headers.tryParse(text), "bad message");
auto body = getEntityBody(HttpInputStreamImpl::RESPONSE, HttpMethod::GET, 0, headers);
return { headers, kj::mv(body) };
});
}
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// Stream locking: While an entity-body is being read, the body stream "locks" the underlying // Stream locking: While an entity-body is being read, the body stream "locks" the underlying
// HTTP stream. Once the entity-body is complete, we can read the next pipelined message. // HTTP stream. Once the entity-body is complete, we can read the next pipelined message.
...@@ -1022,7 +1065,7 @@ public: ...@@ -1022,7 +1065,7 @@ public:
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
kj::Promise<bool> awaitNextMessage() { kj::Promise<bool> awaitNextMessage() override {
// Waits until more data is available, but doesn't consume it. Returns false on EOF. // Waits until more data is available, but doesn't consume it. Returns false on EOF.
// //
// Used on the server after a request is handled, to check for pipelined requests. // Used on the server after a request is handled, to check for pipelined requests.
...@@ -1172,7 +1215,7 @@ public: ...@@ -1172,7 +1215,7 @@ public:
} }
private: private:
AsyncIoStream& inner; AsyncInputStream& inner;
kj::Array<char> headerBuffer; kj::Array<char> headerBuffer;
size_t messageHeaderEnd = 0; size_t messageHeaderEnd = 0;
...@@ -1367,7 +1410,7 @@ private: ...@@ -1367,7 +1410,7 @@ private:
class HttpEntityBodyReader: public kj::AsyncInputStream { class HttpEntityBodyReader: public kj::AsyncInputStream {
public: public:
HttpEntityBodyReader(HttpInputStream& inner): inner(inner) {} HttpEntityBodyReader(HttpInputStreamImpl& inner): inner(inner) {}
~HttpEntityBodyReader() noexcept(false) { ~HttpEntityBodyReader() noexcept(false) {
if (!finished) { if (!finished) {
inner.abortRead(); inner.abortRead();
...@@ -1375,7 +1418,7 @@ public: ...@@ -1375,7 +1418,7 @@ public:
} }
protected: protected:
HttpInputStream& inner; HttpInputStreamImpl& inner;
void doneReading() { void doneReading() {
KJ_REQUIRE(!finished); KJ_REQUIRE(!finished);
...@@ -1394,7 +1437,7 @@ class HttpNullEntityReader final: public HttpEntityBodyReader { ...@@ -1394,7 +1437,7 @@ class HttpNullEntityReader final: public HttpEntityBodyReader {
// may indicate non-zero in the special case of a response to a HEAD request. // may indicate non-zero in the special case of a response to a HEAD request.
public: public:
HttpNullEntityReader(HttpInputStream& inner, kj::Maybe<uint64_t> length) HttpNullEntityReader(HttpInputStreamImpl& inner, kj::Maybe<uint64_t> length)
: HttpEntityBodyReader(inner), length(length) { : HttpEntityBodyReader(inner), length(length) {
// `length` is what to return from tryGetLength(). For a response to a HEAD request, this may // `length` is what to return from tryGetLength(). For a response to a HEAD request, this may
// be non-zero. // be non-zero.
...@@ -1417,7 +1460,7 @@ class HttpConnectionCloseEntityReader final: public HttpEntityBodyReader { ...@@ -1417,7 +1460,7 @@ class HttpConnectionCloseEntityReader final: public HttpEntityBodyReader {
// Stream which reads until EOF. // Stream which reads until EOF.
public: public:
HttpConnectionCloseEntityReader(HttpInputStream& inner) HttpConnectionCloseEntityReader(HttpInputStreamImpl& inner)
: HttpEntityBodyReader(inner) {} : HttpEntityBodyReader(inner) {}
Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override { Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
...@@ -1437,7 +1480,7 @@ class HttpFixedLengthEntityReader final: public HttpEntityBodyReader { ...@@ -1437,7 +1480,7 @@ class HttpFixedLengthEntityReader final: public HttpEntityBodyReader {
// Stream which reads only up to a fixed length from the underlying stream, then emulates EOF. // Stream which reads only up to a fixed length from the underlying stream, then emulates EOF.
public: public:
HttpFixedLengthEntityReader(HttpInputStream& inner, size_t length) HttpFixedLengthEntityReader(HttpInputStreamImpl& inner, size_t length)
: HttpEntityBodyReader(inner), length(length) { : HttpEntityBodyReader(inner), length(length) {
if (length == 0) doneReading(); if (length == 0) doneReading();
} }
...@@ -1470,7 +1513,7 @@ class HttpChunkedEntityReader final: public HttpEntityBodyReader { ...@@ -1470,7 +1513,7 @@ class HttpChunkedEntityReader final: public HttpEntityBodyReader {
// Stream which reads a Transfer-Encoding: Chunked stream. // Stream which reads a Transfer-Encoding: Chunked stream.
public: public:
HttpChunkedEntityReader(HttpInputStream& inner) HttpChunkedEntityReader(HttpInputStreamImpl& inner)
: HttpEntityBodyReader(inner) {} : HttpEntityBodyReader(inner) {}
Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override { Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
...@@ -1551,7 +1594,7 @@ static_assert(!fastCaseCmp<'n','O','o','B','1'>("FooB1"), ""); ...@@ -1551,7 +1594,7 @@ static_assert(!fastCaseCmp<'n','O','o','B','1'>("FooB1"), "");
static_assert(!fastCaseCmp<'f','O','o','B'>("FooB1"), ""); static_assert(!fastCaseCmp<'f','O','o','B'>("FooB1"), "");
static_assert(!fastCaseCmp<'f','O','o','B','1','a'>("FooB1"), ""); static_assert(!fastCaseCmp<'f','O','o','B','1','a'>("FooB1"), "");
kj::Own<kj::AsyncInputStream> HttpInputStream::getEntityBody( kj::Own<kj::AsyncInputStream> HttpInputStreamImpl::getEntityBody(
RequestOrResponse type, HttpMethod method, uint statusCode, RequestOrResponse type, HttpMethod method, uint statusCode,
const kj::HttpHeaders& headers) { const kj::HttpHeaders& headers) {
if (type == RESPONSE) { if (type == RESPONSE) {
...@@ -1599,8 +1642,16 @@ kj::Own<kj::AsyncInputStream> HttpInputStream::getEntityBody( ...@@ -1599,8 +1642,16 @@ kj::Own<kj::AsyncInputStream> HttpInputStream::getEntityBody(
return kj::heap<HttpNullEntityReader>(*this, uint64_t(0)); return kj::heap<HttpNullEntityReader>(*this, uint64_t(0));
} }
} // namespace
kj::Own<HttpInputStream> newHttpInputStream(kj::AsyncInputStream& input, HttpHeaderTable& table) {
return kj::heap<HttpInputStreamImpl>(input, table);
}
// ======================================================================================= // =======================================================================================
namespace {
class HttpOutputStream { class HttpOutputStream {
public: public:
HttpOutputStream(AsyncOutputStream& inner): inner(inner) {} HttpOutputStream(AsyncOutputStream& inner): inner(inner) {}
...@@ -2397,7 +2448,7 @@ private: ...@@ -2397,7 +2448,7 @@ private:
}; };
kj::Own<WebSocket> upgradeToWebSocket( kj::Own<WebSocket> upgradeToWebSocket(
kj::Own<kj::AsyncIoStream> stream, HttpInputStream& httpInput, HttpOutputStream& httpOutput, kj::Own<kj::AsyncIoStream> stream, HttpInputStreamImpl& httpInput, HttpOutputStream& httpOutput,
kj::Maybe<EntropySource&> maskKeyGenerator) { kj::Maybe<EntropySource&> maskKeyGenerator) {
// Create a WebSocket upgraded from an HTTP stream. // Create a WebSocket upgraded from an HTTP stream.
auto releasedBuffer = httpInput.releaseBuffer(); auto releasedBuffer = httpInput.releaseBuffer();
...@@ -3064,7 +3115,7 @@ public: ...@@ -3064,7 +3115,7 @@ public:
r->statusCode, r->statusCode,
r->statusText, r->statusText,
&headers, &headers,
httpInput.getEntityBody(HttpInputStream::RESPONSE, method, r->statusCode, headers) httpInput.getEntityBody(HttpInputStreamImpl::RESPONSE, method, r->statusCode, headers)
}; };
if (fastCaseCmp<'c', 'l', 'o', 's', 'e'>( if (fastCaseCmp<'c', 'l', 'o', 's', 'e'>(
...@@ -3156,7 +3207,7 @@ public: ...@@ -3156,7 +3207,7 @@ public:
r->statusCode, r->statusCode,
r->statusText, r->statusText,
&headers, &headers,
httpInput.getEntityBody(HttpInputStream::RESPONSE, HttpMethod::GET, r->statusCode, httpInput.getEntityBody(HttpInputStreamImpl::RESPONSE, HttpMethod::GET, r->statusCode,
headers) headers)
}; };
if (fastCaseCmp<'c', 'l', 'o', 's', 'e'>( if (fastCaseCmp<'c', 'l', 'o', 's', 'e'>(
...@@ -3178,7 +3229,7 @@ public: ...@@ -3178,7 +3229,7 @@ public:
} }
private: private:
HttpInputStream httpInput; HttpInputStreamImpl httpInput;
HttpOutputStream httpOutput; HttpOutputStream httpOutput;
kj::Own<AsyncIoStream> ownStream; kj::Own<AsyncIoStream> ownStream;
HttpClientSettings settings; HttpClientSettings settings;
...@@ -4159,7 +4210,7 @@ public: ...@@ -4159,7 +4210,7 @@ public:
currentMethod = req->method; currentMethod = req->method;
auto body = httpInput.getEntityBody( auto body = httpInput.getEntityBody(
HttpInputStream::REQUEST, req->method, 0, headers); HttpInputStreamImpl::REQUEST, req->method, 0, headers);
// TODO(perf): If the client disconnects, should we cancel the response? Probably, to // TODO(perf): If the client disconnects, should we cancel the response? Probably, to
// prevent permanent deadlock. It's slightly weird in that arguably the client should // prevent permanent deadlock. It's slightly weird in that arguably the client should
...@@ -4312,7 +4363,7 @@ private: ...@@ -4312,7 +4363,7 @@ private:
HttpServer& server; HttpServer& server;
kj::AsyncIoStream& stream; kj::AsyncIoStream& stream;
HttpService& service; HttpService& service;
HttpInputStream httpInput; HttpInputStreamImpl httpInput;
HttpOutputStream httpOutput; HttpOutputStream httpOutput;
kj::Maybe<HttpMethod> currentMethod; kj::Maybe<HttpMethod> currentMethod;
bool timedOut = false; bool timedOut = false;
......
...@@ -332,6 +332,9 @@ public: ...@@ -332,6 +332,9 @@ public:
// to split it into a bunch of shorter strings. The caller must keep `content` valid until the // to split it into a bunch of shorter strings. The caller must keep `content` valid until the
// `HttpHeaders` is destroyed, or pass it to `takeOwnership()`. // `HttpHeaders` is destroyed, or pass it to `takeOwnership()`.
bool tryParse(kj::ArrayPtr<char> content);
// Like tryParseRequest()/tryParseResponse(), but don't expect any request/response line.
kj::String serializeRequest(HttpMethod method, kj::StringPtr url, kj::String serializeRequest(HttpMethod method, kj::StringPtr url,
kj::ArrayPtr<const kj::StringPtr> connectionHeaders = nullptr) const; kj::ArrayPtr<const kj::StringPtr> connectionHeaders = nullptr) const;
kj::String serializeResponse(uint statusCode, kj::StringPtr statusText, kj::String serializeResponse(uint statusCode, kj::StringPtr statusText,
...@@ -396,6 +399,58 @@ private: ...@@ -396,6 +399,58 @@ private:
// also add direct accessors for those headers. // also add direct accessors for those headers.
}; };
class HttpInputStream {
// Low-level interface to receive HTTP-formatted messages (headers followed by body) from an
// input stream, without a paired output stream.
//
// Most applications will not use this. Regular HTTP clients and servers don't need this. This
// is mainly useful for apps implementing various protocols that look like HTTP but aren't
// really.
public:
struct Request {
HttpMethod method;
kj::StringPtr url;
const HttpHeaders& headers;
kj::Own<kj::AsyncInputStream> body;
};
virtual kj::Promise<Request> readRequest() = 0;
// Reads one HTTP request from the input stream.
//
// The returned struct contains pointers directly into a buffer that is invalidated on the next
// message read.
struct Response {
uint statusCode;
kj::StringPtr statusText;
const HttpHeaders& headers;
kj::Own<kj::AsyncInputStream> body;
};
virtual kj::Promise<Response> readResponse(HttpMethod requestMethod) = 0;
// Reads one HTTP response from the input stream.
//
// You must provide the request method because responses to HEAD requests require special
// treatment.
//
// The returned struct contains pointers directly into a buffer that is invalidated on the next
// message read.
struct Message {
const HttpHeaders& headers;
kj::Own<kj::AsyncInputStream> body;
};
virtual kj::Promise<Message> readMessage() = 0;
// Reads an HTTP header set followed by a body, with no request or response line. This is not
// useful for HTTP but may be useful for other protocols that make the unfortunate choice to
// mimic HTTP message format, such as Visual Studio Code's JSON-RPC transport.
//
// The returned struct contains pointers directly into a buffer that is invalidated on the next
// message read.
virtual kj::Promise<bool> awaitNextMessage() = 0;
// Waits until more data is available, but doesn't consume it. Returns false on EOF.
};
class EntropySource { class EntropySource {
// Interface for an object that generates entropy. Typically, cryptographically-random entropy // Interface for an object that generates entropy. Typically, cryptographically-random entropy
// is expected. // is expected.
...@@ -641,6 +696,16 @@ kj::Own<HttpClient> newHttpClient(HttpService& service); ...@@ -641,6 +696,16 @@ kj::Own<HttpClient> newHttpClient(HttpService& service);
kj::Own<HttpService> newHttpService(HttpClient& client); kj::Own<HttpService> newHttpService(HttpClient& client);
// Adapts an HttpClient to an HttpService and vice versa. // Adapts an HttpClient to an HttpService and vice versa.
kj::Own<HttpInputStream> newHttpInputStream(
kj::AsyncInputStream& input, HttpHeaderTable& headerTable);
// Create an HttpInputStream on top of the given stream. Normally applications would not call this
// directly, but it can be useful for implementing protocols that aren't quite HTTP but use similar
// message delimiting.
//
// The HttpInputStream implementation does read-ahead buffering on `input`. Therefore, when the
// HttpInputStream is destroyed, some data read from `input` may be lost, so it's not possible to
// continue reading from `input` in a reliable way.
kj::Own<WebSocket> newWebSocket(kj::Own<kj::AsyncIoStream> stream, kj::Own<WebSocket> newWebSocket(kj::Own<kj::AsyncIoStream> stream,
kj::Maybe<EntropySource&> maskEntropySource); kj::Maybe<EntropySource&> maskEntropySource);
// Create a new WebSocket on top of the given stream. It is assumed that the HTTP -> WebSocket // Create a new WebSocket on top of the given stream. It is assumed that the HTTP -> WebSocket
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment