Unverified Commit 355697c8 authored by Kenton Varda's avatar Kenton Varda Committed by GitHub

Merge pull request #829 from capnproto/http-over-capnp

 Define and implement HTTP-over-Cap'n-Proto
parents 71a8a674 52c63c5c
......@@ -82,6 +82,10 @@ kj::Promise<kj::Maybe<int>> Capability::Client::getFd() {
}
}
kj::Maybe<kj::Promise<Capability::Client>> Capability::Server::shortenPath() {
return nullptr;
}
Capability::Server::DispatchCallResult Capability::Server::internalUnimplemented(
const char* actualInterfaceName, uint64_t requestedTypeId) {
return {
......@@ -117,6 +121,10 @@ kj::Promise<void> ClientHook::whenResolved() {
}
}
kj::Promise<void> Capability::Client::whenResolved() {
return hook->whenResolved().attach(hook->addRef());
}
// =======================================================================================
static inline uint firstSegmentSize(kj::Maybe<MessageSize> sizeHint) {
......@@ -471,6 +479,13 @@ public:
LocalClient(kj::Own<Capability::Server>&& serverParam)
: server(kj::mv(serverParam)) {
server->thisHook = this;
resolveTask = server->shortenPath().map([this](kj::Promise<Capability::Client> promise) {
return promise.then([this](Capability::Client&& cap) {
auto hook = ClientHook::from(kj::mv(cap));
resolved = hook->addRef();
}).fork();
});
}
LocalClient(kj::Own<Capability::Server>&& serverParam,
_::CapabilityServerSetBase& capServerSet, void* ptr)
......@@ -533,11 +548,19 @@ public:
}
kj::Maybe<ClientHook&> getResolved() override {
return nullptr;
return resolved.map([](kj::Own<ClientHook>& hook) -> ClientHook& { return *hook; });
}
kj::Maybe<kj::Promise<kj::Own<ClientHook>>> whenMoreResolved() override {
return nullptr;
KJ_IF_MAYBE(r, resolved) {
return kj::Promise<kj::Own<ClientHook>>(r->get()->addRef());
} else KJ_IF_MAYBE(t, resolveTask) {
return t->addBranch().then([this]() {
return KJ_ASSERT_NONNULL(resolved)->addRef();
});
} else {
return nullptr;
}
}
kj::Own<ClientHook> addRef() override {
......@@ -551,7 +574,7 @@ public:
return &BRAND;
}
kj::Promise<void*> getLocalServer(_::CapabilityServerSetBase& capServerSet) {
kj::Maybe<kj::Promise<void*>> getLocalServer(_::CapabilityServerSetBase& capServerSet) {
// If this is a local capability created through `capServerSet`, return the underlying Server.
// Otherwise, return nullptr. Default implementation (which everyone except LocalClient should
// use) always returns nullptr.
......@@ -580,10 +603,10 @@ public:
return kj::newAdaptedPromise<kj::Promise<void>, BlockedCall>(*this)
.then([this]() { return ptr; });
} else {
return ptr;
return kj::Promise<void*>(ptr);
}
} else {
return (void*)nullptr;
return nullptr;
}
}
......@@ -596,6 +619,9 @@ private:
_::CapabilityServerSetBase* capServerSet = nullptr;
void* ptr = nullptr;
kj::Maybe<kj::ForkedPromise<void>> resolveTask;
kj::Maybe<kj::Own<ClientHook>> resolved;
class BlockedCall {
public:
BlockedCall(kj::PromiseFulfiller<kj::Promise<void>>& fulfiller, LocalClient& client,
......@@ -896,21 +922,35 @@ kj::Promise<void*> CapabilityServerSetBase::getLocalServerInternal(Capability::C
ClientHook* hook = client.hook.get();
// Get the most-resolved-so-far version of the hook.
KJ_IF_MAYBE(h, hook->getResolved()) {
hook = h;
};
for (;;) {
KJ_IF_MAYBE(h, hook->getResolved()) {
hook = h;
} else {
break;
}
}
// Try to unwrap that.
if (hook->getBrand() == &LocalClient::BRAND) {
KJ_IF_MAYBE(promise, kj::downcast<LocalClient>(*hook).getLocalServer(*this)) {
// This is definitely a member of our set and will resolve to non-null. We just have to wait
// for any existing streaming calls to complete.
return kj::mv(*promise);
}
}
// OK, the capability isn't part of this set.
KJ_IF_MAYBE(p, hook->whenMoreResolved()) {
// This hook is an unresolved promise. We need to wait for it.
// This hook is an unresolved promise. It might resolve eventually to a local server, so wait
// for it.
return p->attach(hook->addRef())
.then([this](kj::Own<ClientHook>&& resolved) {
Capability::Client client(kj::mv(resolved));
return getLocalServerInternal(client);
});
} else if (hook->getBrand() == &LocalClient::BRAND) {
return kj::downcast<LocalClient>(*hook).getLocalServer(*this);
} else {
return (void*)nullptr;
// Cap is settled, so it definitely will never resolve to a member of this set.
return kj::implicitCast<void*>(nullptr);
}
}
......
......@@ -407,6 +407,23 @@ public:
// returns that FD. When FD passing has been enabled in the RPC layer, this FD may be sent to
// other processes along with the capability.
virtual kj::Maybe<kj::Promise<Capability::Client>> shortenPath();
// If this returns non-null, then it is a promise which, when resolved, points to a new
// capability to which future calls can be sent. Use this in cases where an object implementation
// might discover a more-optimized path some time after it starts.
//
// Implementing this (and returning non-null) will cause the capability to be advertised as a
// promise at the RPC protocol level. Once the promise returned by shortenPath() resolves, the
// remote client will receive a `Resolve` message updating it to point at the new destination.
//
// `shortenPath()` can also be used as a hack to shut up the client. If shortenPath() returns
// a promise that resolves to an exception, then the client will be notified that the capability
// is now broken. Assuming the client is using a correct RPC implemnetation, this should cause
// all further calls initiated by the client to this capability to immediately fail client-side,
// sparing the server's bandwidth.
//
// The default implementation always returns nullptr.
// TODO(someday): Method which can optionally be overridden to implement Join when the object is
// a proxy.
......@@ -881,9 +898,6 @@ template <typename T>
inline typename T::Client Capability::Client::castAs() {
return typename T::Client(hook->addRef());
}
inline kj::Promise<void> Capability::Client::whenResolved() {
return hook->whenResolved();
}
inline Request<AnyPointer, AnyPointer> Capability::Client::typelessRequest(
uint64_t interfaceId, uint16_t methodId,
kj::Maybe<MessageSize> sizeHint) {
......
This diff is collapsed.
This diff is collapsed.
@0x8f5d14e1c273738d;
$import "/capnp/c++.capnp".namespace("capnp");
interface ByteStream {
write @0 (bytes :Data) -> stream;
# Write a chunk.
end @1 ();
# Signals clean EOF. (If the ByteStream is dropped without calling this, then the stream was
# prematurely canceled and so thet body should not be considered complete.)
getSubstream @2 (callback :SubstreamCallback,
limit :UInt64 = 0xffffffffffffffff) -> (substream :ByteStream);
# This method is used to implement path shortening optimization. It is designed in particular
# with KJ streams' pumpTo() in mind.
#
# getSubstream() returns a new stream object that can be used to write to the same destination
# as this stream. The substream will operate until it has received `limit` bytes, or its `end()`
# method has been called, whichever occurs first. At that time, it invokes one of the methods of
# `callback` based on the termination condition.
#
# While a substream is active, it is an error to call write() on the original stream. Doing so
# may throw an exception or may arbitrarily interleave bytes with the substream's writes.
interface SubstreamCallback {
ended @0 (byteCount :UInt64);
# `end()` was called on the substream after writing `byteCount` bytes. The `end()` call was
# NOT forwarded to the underlying stream, which remains open.
reachedLimit @1 () -> (next :ByteStream);
# The number of bytes specified by the `limit` parameter of `getSubstream()` was reached.
# The substream will "resolve itself" to `next`, so that all future calls to the substream
# are forwarded to `next`.
#
# If the `write()` call which reached the limit included bytes past the limit, then the first
# `write()` call to `next` will be for those leftover bytes.
}
}
// Copyright (c) 2019 Cloudflare, Inc. and contributors
// Licensed under the MIT License:
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
#pragma once
// Bridges from KJ streams to Cap'n Proto ByteStream RPC protocol.
#include <capnp/compat/byte-stream.capnp.h>
#include <kj/async-io.h>
namespace capnp {
class ByteStreamFactory {
// In order to allow path-shortening through KJ, a common factory must be used for converting
// between RPC ByteStreams and KJ streams.
public:
capnp::ByteStream::Client kjToCapnp(kj::Own<kj::AsyncOutputStream> kjStream);
kj::Own<kj::AsyncOutputStream> capnpToKj(capnp::ByteStream::Client capnpStream);
private:
CapabilityServerSet<capnp::ByteStream> streamSet;
class StreamServerBase;
class SubstreamImpl;
class CapnpToKjStreamAdapter;
class KjToCapnpStreamAdapter;
};
} // namespace capnp
This diff is collapsed.
This diff is collapsed.
# Copyright (c) 2019 Cloudflare, Inc. and contributors
# Licensed under the MIT License:
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
@0xb665280aaff2e632;
# Cap'n Proto interface for HTTP.
using import "byte-stream.capnp".ByteStream;
$import "/capnp/c++.capnp".namespace("capnp");
interface HttpService {
startRequest @0 (request :HttpRequest, context :ClientRequestContext)
-> (requestBody :ByteStream, context :ServerRequestContext);
# Begin an HTTP request.
#
# The client sends the request method/url/headers. The server responds with a `ByteStream` where
# the client can make calls to stream up the request body. `requestBody` will be null in the case
# that request.bodySize.fixed == 0.
interface ClientRequestContext {
# Provides callbacks for the server to send the response.
startResponse @0 (response :HttpResponse) -> (body :ByteStream);
# Server calls this method to send the response status and headers and to begin streaming the
# response body. `body` will be null in the case that response.bodySize.fixed == 0, which is
# required for HEAD responses and status codes 204, 205, and 304.
startWebSocket @1 (headers :List(HttpHeader), upSocket :WebSocket)
-> (downSocket :WebSocket);
# Server calls this method to indicate that the request is a valid WebSocket handshake and it
# wishes to accept it as a WebSocket.
#
# Client -> Server WebSocket frames will be sent via method calls on `upSocket`, while
# Server -> Client will be sent as calls to `downSocket`.
}
interface ServerRequestContext {
# Represents execution of a particular request on the server side.
#
# Dropping this object before the request completes will cancel the request.
#
# ServerRequestContext is always a promise capability. The client must wait for it to
# resolve using whenMoreResolved() in order to find out when the server is really done
# processing the request. This will throw an exception if the server failed in some way that
# could not be captured in the HTTP response. Note that it's possible for such an exception to
# be thrown even after the response body has been completely transmitted.
}
}
interface WebSocket {
sendText @0 (text :Text) -> stream;
sendData @1 (data :Data) -> stream;
# Send a text or data frame.
close @2 (code :UInt16, reason :Text);
# Send a close frame.
}
struct HttpRequest {
# Standard HTTP request metadata.
method @0 :HttpMethod;
url @1 :Text;
headers @2 :List(HttpHeader);
bodySize :union {
unknown @3 :Void; # e.g. due to transfer-encoding: chunked
fixed @4 :UInt64; # e.g. due to content-length
}
}
struct HttpResponse {
# Standard HTTP response metadata.
statusCode @0 :UInt16;
statusText @1 :Text; # leave null if it matches the default for statusCode
headers @2 :List(HttpHeader);
bodySize :union {
unknown @3 :Void; # e.g. due to transfer-encoding: chunked
fixed @4 :UInt64; # e.g. due to content-length
}
}
enum HttpMethod {
# This enum aligns precisely with the kj::HttpMethod enum. However, the backwards-compat
# constraints of a public-facing C++ enum vs. an internal Cap'n Proto interface differ in
# several ways, which could possibly lead to divergence someday. For now, a unit test verifies
# that they match exactly; if that test ever fails, we'll have to figure out what to do about it.
get @0;
head @1;
post @2;
put @3;
delete @4;
patch @5;
purge @6;
options @7;
trace @8;
copy @9;
lock @10;
mkcol @11;
move @12;
propfind @13;
proppatch @14;
search @15;
unlock @16;
acl @17;
report @18;
mkactivity @19;
checkout @20;
merge @21;
msearch @22;
notify @23;
subscribe @24;
unsubscribe @25;
}
annotation commonText @0x857745131db6fc83(enumerant) :Text;
enum CommonHeaderName {
invalid @0;
# Dummy to serve as default value. Should never actually appear on wire.
acceptCharset @1 $commonText("Accept-Charset");
acceptEncoding @2 $commonText("Accept-Encoding");
acceptLanguage @3 $commonText("Accept-Language");
acceptRanges @4 $commonText("Accept-Ranges");
accept @5 $commonText("Accept");
accessControlAllowOrigin @6 $commonText("Access-Control-Allow-Origin");
age @7 $commonText("Age");
allow @8 $commonText("Allow");
authorization @9 $commonText("Authorization");
cacheControl @10 $commonText("Cache-Control");
contentDisposition @11 $commonText("Content-Disposition");
contentEncoding @12 $commonText("Content-Encoding");
contentLanguage @13 $commonText("Content-Language");
contentLength @14 $commonText("Content-Length");
contentLocation @15 $commonText("Content-Location");
contentRange @16 $commonText("Content-Range");
contentType @17 $commonText("Content-Type");
cookie @18 $commonText("Cookie");
date @19 $commonText("Date");
etag @20 $commonText("ETag");
expect @21 $commonText("Expect");
expires @22 $commonText("Expires");
from @23 $commonText("From");
host @24 $commonText("Host");
ifMatch @25 $commonText("If-Match");
ifModifiedSince @26 $commonText("If-Modified-Since");
ifNoneMatch @27 $commonText("If-None-Match");
ifRange @28 $commonText("If-Range");
ifUnmodifiedSince @29 $commonText("If-Unmodified-Since");
lastModified @30 $commonText("Last-Modified");
link @31 $commonText("Link");
location @32 $commonText("Location");
maxForwards @33 $commonText("Max-Forwards");
proxyAuthenticate @34 $commonText("Proxy-Authenticate");
proxyAuthorization @35 $commonText("Proxy-Authorization");
range @36 $commonText("Range");
referer @37 $commonText("Referer");
refresh @38 $commonText("Refresh");
retryAfter @39 $commonText("Retry-After");
server @40 $commonText("Server");
setCookie @41 $commonText("Set-Cookie");
strictTransportSecurity @42 $commonText("Strict-Transport-Security");
transferEncoding @43 $commonText("Transfer-Encoding");
userAgent @44 $commonText("User-Agent");
vary @45 $commonText("Vary");
via @46 $commonText("Via");
wwwAuthenticate @47 $commonText("WWW-Authenticate");
}
enum CommonHeaderValue {
invalid @0;
gzipDeflate @1 $commonText("gzip, deflate");
# TODO(someday): "gzip, deflate" is the only common header value recognized by HPACK.
}
struct HttpHeader {
union {
common :group {
name @0 :CommonHeaderName;
union {
commonValue @1 :CommonHeaderValue;
value @2 :Text;
}
}
uncommon @3 :NameValue;
}
struct NameValue {
name @0 :Text;
value @1 :Text;
}
}
// Copyright (c) 2019 Cloudflare, Inc. and contributors
// Licensed under the MIT License:
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
#pragma once
// Bridges from KJ HTTP to Cap'n Proto HTTP-over-RPC.
#include <capnp/compat/http-over-capnp.capnp.h>
#include <kj/compat/http.h>
#include <kj/map.h>
#include "byte-stream.h"
namespace capnp {
class HttpOverCapnpFactory {
public:
HttpOverCapnpFactory(ByteStreamFactory& streamFactory,
kj::HttpHeaderTable::Builder& headerTableBuilder);
kj::Own<kj::HttpService> capnpToKj(capnp::HttpService::Client rpcService);
capnp::HttpService::Client kjToCapnp(kj::Own<kj::HttpService> service);
private:
ByteStreamFactory& streamFactory;
kj::HttpHeaderTable& headerTable;
kj::Array<capnp::CommonHeaderName> nameKjToCapnp;
kj::Array<kj::HttpHeaderId> nameCapnpToKj;
kj::Array<kj::StringPtr> valueCapnpToKj;
kj::HashMap<kj::StringPtr, capnp::CommonHeaderValue> valueKjToCapnp;
class RequestState;
class CapnpToKjWebSocketAdapter;
class KjToCapnpWebSocketAdapter;
class ClientRequestContextImpl;
class KjToCapnpHttpServiceAdapter;
class ServerRequestContextImpl;
class CapnpToKjHttpServiceAdapter;
kj::HttpHeaders headersToKj(capnp::List<capnp::HttpHeader>::Reader capnpHeaders) const;
// Returned headers may alias into `capnpHeaders`.
capnp::Orphan<capnp::List<capnp::HttpHeader>> headersToCapnp(
const kj::HttpHeaders& headers, capnp::Orphanage orphanage);
};
} // namespace capnp
......@@ -222,7 +222,12 @@ size_t TwoPartyVatNetwork::getWindow() {
KJ_ASSERT(len == sizeof(bufSize));
})) {
if (exception->getType() != kj::Exception::Type::UNIMPLEMENTED) {
kj::throwRecoverableException(kj::mv(*exception));
// TODO(someday): Figure out why getting SO_SNDBUF sometimes throws EINVAL. I suspect it
// happens when the remote side has closed their read end, meaning we no longer have
// a send buffer, but I don't know what is the best way to verify that that was actually
// the reason. I'd prefer not to ignore EINVAL errors in general.
// kj::throwRecoverableException(kj::mv(*exception));
}
solSndbufUnimplemented = true;
bufSize = RpcFlowController::DEFAULT_WINDOW_SIZE;
......
......@@ -2789,7 +2789,7 @@ private:
answerToRelease = answers.erase(finish.getQuestionId());
}
} else {
KJ_REQUIRE(answer->active, "'Finish' for invalid question ID.") { return; }
KJ_FAIL_REQUIRE("'Finish' for invalid question ID.") { return; }
}
}
......@@ -3119,18 +3119,53 @@ class WindowFlowController final: public RpcFlowController, private kj::TaskSet:
public:
WindowFlowController(RpcFlowController::WindowGetter& windowGetter)
: windowGetter(windowGetter), tasks(*this) {
state.init<std::queue<QueuedMessage>>();
state.init<Running>();
}
kj::Promise<void> send(kj::Own<OutgoingRpcMessage> message, kj::Promise<void> ack) override {
auto size = message->sizeInWords() * sizeof(capnp::word);
maxMessageSize = kj::max(size, maxMessageSize);
// We are REQUIRED to send the message NOW to maintain correct ordering.
message->send();
inFlight += size;
tasks.add(ack.then([this, size]() {
inFlight -= size;
KJ_SWITCH_ONEOF(state) {
KJ_CASE_ONEOF(blockedSends, Running) {
if (isReady()) {
// Release all fulfillers.
for (auto& fulfiller: blockedSends) {
fulfiller->fulfill();
}
blockedSends.clear();
}
KJ_IF_MAYBE(f, emptyFulfiller) {
if (inFlight == 0) {
f->get()->fulfill(tasks.onEmpty());
}
}
}
KJ_CASE_ONEOF(exception, kj::Exception) {
// A previous call failed, but this one -- which was already in-flight at the time --
// ended up succeeding. That may indicate that the server side is not properly
// handling streaming error propagation. Nothing much we can do about it here though.
}
}
}));
KJ_SWITCH_ONEOF(state) {
KJ_CASE_ONEOF(queue, std::queue<QueuedMessage>) {
auto size = message->sizeInWords() * sizeof(capnp::word);
maxMessageSize = kj::max(size, maxMessageSize);
auto paf = kj::newPromiseAndFulfiller<void>();
queue.push({kj::mv(message), kj::mv(ack), kj::mv(paf.fulfiller), size});
pumpQueue(queue);
return kj::mv(paf.promise);
KJ_CASE_ONEOF(blockedSends, Running) {
if (isReady()) {
return kj::READY_NOW;
} else {
auto paf = kj::newPromiseAndFulfiller<void>();
blockedSends.add(kj::mv(paf.fulfiller));
return kj::mv(paf.promise);
}
}
KJ_CASE_ONEOF(exception, kj::Exception) {
return kj::cp(exception);
......@@ -3140,7 +3175,7 @@ public:
}
kj::Promise<void> waitAllAcked() override {
KJ_IF_MAYBE(q, state.tryGet<std::queue<QueuedMessage>>()) {
KJ_IF_MAYBE(q, state.tryGet<Running>()) {
if (!q->empty()) {
auto paf = kj::newPromiseAndFulfiller<kj::Promise<void>>();
emptyFulfiller = kj::mv(paf.fulfiller);
......@@ -3155,63 +3190,19 @@ private:
size_t inFlight = 0;
size_t maxMessageSize = 0;
struct QueuedMessage {
kj::Own<OutgoingRpcMessage> message;
kj::Promise<void> ack;
kj::Own<kj::PromiseFulfiller<void>> sentFulfiller;
size_t size;
};
kj::OneOf<std::queue<QueuedMessage>, kj::Exception> state;
typedef kj::Vector<kj::Own<kj::PromiseFulfiller<void>>> Running;
kj::OneOf<Running, kj::Exception> state;
kj::Maybe<kj::Own<kj::PromiseFulfiller<kj::Promise<void>>>> emptyFulfiller;
kj::TaskSet tasks;
void pumpQueue(std::queue<QueuedMessage>& queue) {
size_t window = windowGetter.getWindow();
// We extend the window by maxMessageSize to avoid a pathological situation when a message
// is larger than the window size. Otherwise, after sending that message, we would end up
// not sending any others until the ack was received, wasting a round trip's worth of
// bandwidth.
while (!queue.empty() && inFlight < window + maxMessageSize) {
auto front = kj::mv(queue.front());
queue.pop();
front.sentFulfiller->rejectIfThrows([&]() {
front.message->send();
inFlight += front.size;
tasks.add(front.ack.then([this, size = front.size]() {
inFlight -= size;
KJ_SWITCH_ONEOF(state) {
KJ_CASE_ONEOF(queue, std::queue<QueuedMessage>) {
pumpQueue(queue);
}
KJ_CASE_ONEOF(exception, kj::Exception) {
// A previous call failed, but this one -- which was already in-flight at the time --
// ended up succeeding. That may indicate that the server side is not properly
// handling streaming error propagation. Nothing much we can do about it here though.
}
}
}));
front.sentFulfiller->fulfill();
});
}
KJ_IF_MAYBE(f, emptyFulfiller) {
if (queue.empty()) {
f->get()->fulfill(tasks.onEmpty());
}
}
}
void taskFailed(kj::Exception&& exception) override {
KJ_SWITCH_ONEOF(state) {
KJ_CASE_ONEOF(queue, std::queue<QueuedMessage>) {
KJ_CASE_ONEOF(blockedSends, Running) {
// Fail out all pending sends.
while (!queue.empty()) {
queue.front().sentFulfiller->reject(kj::cp(exception));
queue.pop();
for (auto& fulfiller: blockedSends) {
fulfiller->reject(kj::cp(exception));
}
// Fail out all future sends.
state = kj::mv(exception);
......@@ -3221,6 +3212,15 @@ private:
}
}
}
bool isReady() {
// We extend the window by maxMessageSize to avoid a pathological situation when a message
// is larger than the window size. Otherwise, after sending that message, we would end up
// not sending any others until the ack was received, wasting a round trip's worth of
// bandwidth.
return inFlight <= maxMessageSize // avoid getWindow() call if unnecessary
|| inFlight < windowGetter.getWindow() + maxMessageSize;
}
};
class FixedWindowFlowController final
......
......@@ -1702,6 +1702,36 @@ KJ_TEST("Userland pipe multi-part write doesn't quit early") {
writePromise.wait(ws);
}
KJ_TEST("Userland pipe BlockedRead gets empty tryPumpFrom") {
kj::EventLoop loop;
WaitScope ws(loop);
auto pipe = newOneWayPipe();
auto pipe2 = newOneWayPipe();
// First start a read from the back end.
char buffer[4];
auto readPromise = pipe2.in->tryRead(buffer, 1, 4);
// Now arrange a pump between the pipes, using tryPumpFrom().
auto pumpPromise = KJ_ASSERT_NONNULL(pipe2.out->tryPumpFrom(*pipe.in));
// Disconnect the front pipe, causing EOF on the pump.
pipe.out = nullptr;
// The pump should have produced zero bytes.
KJ_EXPECT(pumpPromise.wait(ws) == 0);
// The read is incomplete.
KJ_EXPECT(!readPromise.poll(ws));
// A subsequent write() completes the read.
pipe2.out->write("foo", 3).wait(ws);
KJ_EXPECT(readPromise.wait(ws) == 3);
buffer[3] = '\0';
KJ_EXPECT(kj::StringPtr(buffer, 3) == "foo");
}
constexpr static auto TEE_MAX_CHUNK_SIZE = 1 << 14;
// AsyncTee::MAX_CHUNK_SIZE, 16k as of this writing
......
......@@ -724,32 +724,33 @@ private:
auto maxToRead = kj::min(amount, readBuffer.size());
return canceler.wrap(input.tryRead(readBuffer.begin(), minToRead, maxToRead)
.then([this,&input,amount,minToRead](size_t actual) -> Promise<uint64_t> {
.then([this,&input,amount](size_t actual) -> Promise<uint64_t> {
readBuffer = readBuffer.slice(actual, readBuffer.size());
readSoFar += actual;
if (readSoFar >= minBytes || actual < minToRead) {
// We've read enough to close out this read (readSoFar >= minBytes)
// OR we reached EOF and couldn't complete the read (actual < minToRead)
// Either way, we want to close out this read.
if (readSoFar >= minBytes) {
// We've read enough to close out this read (readSoFar >= minBytes).
canceler.release();
fulfiller.fulfill(kj::cp(readSoFar));
pipe.endState(*this);
if (actual < amount) {
// We din't complete pumping. Restart from the pipe.
// We didn't read as much data as the pump requested, but we did fulfill the read, so
// we don't know whether we reached EOF on the input. We need to continue the pump,
// replacing the BlockedRead state.
return input.pumpTo(pipe, amount - actual)
.then([actual](uint64_t actual2) -> uint64_t { return actual + actual2; });
} else {
// We pumped as much data as was requested, so we can return that now.
return actual;
}
} else {
// The pump completed without fulfilling the read. This either means that the pump
// reached EOF or the `amount` requested was not enough to satisfy the read in the first
// place. Pumps do not propagate EOF, so either way we want to leave the BlockedRead in
// place waiting for more data.
return actual;
}
// If we read less than `actual`, but more than `minToRead`, it can only have been
// because we reached `minBytes`, so the conditional above would have executed. So, here
// we know that actual == amount.
KJ_ASSERT(actual == amount);
// We pumped the full amount, so we're done pumping.
return amount;
}));
}
......@@ -950,20 +951,20 @@ private:
public:
Promise<size_t> tryRead(void* readBufferPtr, size_t minBytes, size_t maxBytes) override {
KJ_FAIL_REQUIRE("abortRead() has been called");
return KJ_EXCEPTION(DISCONNECTED, "abortRead() has been called");
}
Promise<uint64_t> pumpTo(AsyncOutputStream& output, uint64_t amount) override {
KJ_FAIL_REQUIRE("abortRead() has been called");
return KJ_EXCEPTION(DISCONNECTED, "abortRead() has been called");
}
void abortRead() override {
// ignore repeated abort
}
Promise<void> write(const void* buffer, size_t size) override {
KJ_FAIL_REQUIRE("abortRead() has been called");
return KJ_EXCEPTION(DISCONNECTED, "abortRead() has been called");
}
Promise<void> write(ArrayPtr<const ArrayPtr<const byte>> pieces) override {
KJ_FAIL_REQUIRE("abortRead() has been called");
return KJ_EXCEPTION(DISCONNECTED, "abortRead() has been called");
}
Maybe<Promise<uint64_t>> tryPumpFrom(AsyncInputStream& input, uint64_t amount) override {
// There might not actually be any data in `input`, in which case a pump wouldn't actually
......@@ -983,7 +984,9 @@ private:
return uint64_t(0);
} else {
// There was data in the input. The pump would have thrown.
KJ_FAIL_REQUIRE("abortRead() has been called");
kj::throwRecoverableException(
KJ_EXCEPTION(DISCONNECTED, "abortRead() has been called"));
return uint64_t(0);
}
});
}
......
......@@ -307,6 +307,29 @@ KJ_TEST("HttpHeaders validation") {
KJ_EXPECT_THROW_MESSAGE("invalid header value", headers.add("Valid-Name", "in\nvalid"));
}
KJ_TEST("HttpHeaders Set-Cookie handling") {
HttpHeaderTable::Builder builder;
auto hCookie = builder.add("Cookie");
auto hSetCookie = builder.add("Set-Cookie");
auto table = builder.build();
HttpHeaders headers(*table);
headers.set(hCookie, "Foo");
headers.add("Cookie", "Bar");
headers.add("Cookie", "Baz");
headers.set(hSetCookie, "Foo");
headers.add("Set-Cookie", "Bar");
headers.add("Set-Cookie", "Baz");
auto text = headers.toString();
KJ_EXPECT(text ==
"Cookie: Foo, Bar, Baz\r\n"
"Set-Cookie: Foo\r\n"
"Set-Cookie: Bar\r\n"
"Set-Cookie: Baz\r\n"
"\r\n", text);
}
// =======================================================================================
class ReadFragmenter final: public kj::AsyncIoStream {
......
......@@ -578,6 +578,16 @@ void HttpHeaders::clear() {
unindexedHeaders.clear();
}
size_t HttpHeaders::size() const {
size_t result = unindexedHeaders.size();
for (auto i: kj::indices(indexedHeaders)) {
if (indexedHeaders[i] != nullptr) {
++result;
}
}
return result;
}
HttpHeaders HttpHeaders::clone() const {
HttpHeaders result(*table);
......@@ -669,9 +679,21 @@ void HttpHeaders::addNoCheck(kj::StringPtr name, kj::StringPtr value) {
indexedHeaders[id->id] = value;
} else {
// Duplicate HTTP headers are equivalent to the values being separated by a comma.
auto concat = kj::str(indexedHeaders[id->id], ", ", value);
indexedHeaders[id->id] = concat;
ownedStrings.add(concat.releaseArray());
#if _MSC_VER
if (_stricmp(name.cStr(), "set-cookie") == 0) {
#else
if (strcasecmp(name.cStr(), "set-cookie") == 0) {
#endif
// Uh-oh, Set-Cookie will be corrupted if we try to concatenate it. We'll make it an
// unindexed header, which is weird, but the alternative is guaranteed corruption, so...
// TODO(cleanup): Maybe HttpHeaders should just special-case set-cookie in general?
unindexedHeaders.add(Header {name, value});
} else {
auto concat = kj::str(indexedHeaders[id->id], ", ", value);
indexedHeaders[id->id] = concat;
ownedStrings.add(concat.releaseArray());
}
}
} else {
unindexedHeaders.add(Header {name, value});
......
......@@ -117,6 +117,8 @@ public:
inline bool operator>=(const HttpHeaderId& other) const { return id >= other.id; }
inline size_t hashCode() const { return id; }
// Returned value is guaranteed to be small and never collide with other headers on the same
// table.
kj::StringPtr toString() const;
......@@ -251,6 +253,9 @@ public:
HttpHeaders(HttpHeaders&&) = default;
HttpHeaders& operator=(HttpHeaders&&) = default;
size_t size() const;
// Returns the number of headers that forEach() would iterate over.
void clear();
// Clears all contents, as if the object was freshly-allocated. However, calling this rather
// than actually re-allocating the object may avoid re-allocation of internal objects.
......@@ -277,6 +282,12 @@ public:
// Calls `func(name, value)` for each header in the set -- including headers that aren't mapped
// to IDs in the header table. Both inputs are of type kj::StringPtr.
template <typename Func1, typename Func2>
void forEach(Func1&& func1, Func2&& func2) const;
// Calls `func1(id, value)` for each header in the set that has a registered HttpHeaderId, and
// `func2(name, value)` for each header that does not. All calls to func1() preceed all calls to
// func2().
void set(HttpHeaderId id, kj::StringPtr value);
void set(HttpHeaderId id, kj::String&& value);
// Sets a header value, overwriting the existing value.
......@@ -957,4 +968,17 @@ inline void HttpHeaders::forEach(Func&& func) const {
}
}
template <typename Func1, typename Func2>
inline void HttpHeaders::forEach(Func1&& func1, Func2&& func2) const {
for (auto i: kj::indices(indexedHeaders)) {
if (indexedHeaders[i] != nullptr) {
func1(HttpHeaderId(table, i), indexedHeaders[i]);
}
}
for (auto& header: unindexedHeaders) {
func2(header.name, header.value);
}
}
} // namespace kj
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment