Commit 7a0e0fd0 authored by Kenton Varda

Implement client RPC side of streaming.

parent 56493100
@@ -35,6 +35,7 @@ namespace capnp {
class OutgoingRpcMessage;
class IncomingRpcMessage;
class RpcFlowController;

template <typename SturdyRefHostId>
class RpcSystem;
@@ -59,6 +60,7 @@ public:
    virtual kj::Promise<kj::Maybe<kj::Own<IncomingRpcMessage>>> receiveIncomingMessage() = 0;
    virtual kj::Promise<void> shutdown() = 0;
    virtual AnyStruct::Reader baseGetPeerVatId() = 0;
    virtual kj::Own<RpcFlowController> newStream() = 0;
  };

  virtual kj::Maybe<kj::Own<Connection>> baseConnect(AnyStruct::Reader vatId) = 0;
  virtual kj::Promise<kj::Own<Connection>> baseAccept() = 0;
...
@@ -32,6 +32,7 @@
#include <kj/thread.h>
#include <kj/compat/gtest.h>
#include <kj/miniposix.h>
#include <sys/socket.h>

// TODO(cleanup): Auto-generate stringification functions for union discriminants.

namespace capnp {
@@ -522,6 +523,122 @@ KJ_TEST("FD per message limit") {
}

#endif // !_WIN32 && !__CYGWIN__
// =======================================================================================

class MockSndbufStream final: public kj::AsyncIoStream {
public:
  MockSndbufStream(kj::Own<AsyncIoStream> inner, size_t& window, size_t& written)
      : inner(kj::mv(inner)), window(window), written(written) {}

  kj::Promise<size_t> read(void* buffer, size_t minBytes, size_t maxBytes) override {
    return inner->read(buffer, minBytes, maxBytes);
  }
  kj::Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
    return inner->tryRead(buffer, minBytes, maxBytes);
  }
  kj::Maybe<uint64_t> tryGetLength() override {
    return inner->tryGetLength();
  }
  kj::Promise<uint64_t> pumpTo(AsyncOutputStream& output, uint64_t amount) override {
    return inner->pumpTo(output, amount);
  }
  kj::Promise<void> write(const void* buffer, size_t size) override {
    written += size;
    return inner->write(buffer, size);
  }
  kj::Promise<void> write(kj::ArrayPtr<const kj::ArrayPtr<const byte>> pieces) override {
    for (auto& piece: pieces) written += piece.size();
    return inner->write(pieces);
  }
  kj::Maybe<kj::Promise<uint64_t>> tryPumpFrom(
      kj::AsyncInputStream& input, uint64_t amount) override {
    return inner->tryPumpFrom(input, amount);
  }
  kj::Promise<void> whenWriteDisconnected() override { return inner->whenWriteDisconnected(); }
  void shutdownWrite() override { return inner->shutdownWrite(); }
  void abortRead() override { return inner->abortRead(); }

  void getsockopt(int level, int option, void* value, uint* length) override {
    if (level == SOL_SOCKET && option == SO_SNDBUF) {
      KJ_ASSERT(*length == sizeof(int));
      *reinterpret_cast<int*>(value) = window;
    } else {
      KJ_UNIMPLEMENTED("not implemented for test", level, option);
    }
  }

private:
  kj::Own<AsyncIoStream> inner;
  size_t& window;
  size_t& written;
};

KJ_TEST("Streaming over RPC") {
  kj::EventLoop loop;
  kj::WaitScope waitScope(loop);

  auto pipe = kj::newTwoWayPipe();

  size_t window = 1024;
  size_t clientWritten = 0;
  size_t serverWritten = 0;

  pipe.ends[0] = kj::heap<MockSndbufStream>(kj::mv(pipe.ends[0]), window, clientWritten);
  pipe.ends[1] = kj::heap<MockSndbufStream>(kj::mv(pipe.ends[1]), window, serverWritten);

  auto ownServer = kj::heap<TestStreamingImpl>();
  auto& server = *ownServer;
  test::TestStreaming::Client serverCap(kj::mv(ownServer));

  TwoPartyClient tpClient(*pipe.ends[0]);
  TwoPartyClient tpServer(*pipe.ends[1], serverCap, rpc::twoparty::Side::SERVER);

  auto cap = tpClient.bootstrap().castAs<test::TestStreaming>();

  // Send stream requests until we can't anymore.
  kj::Promise<void> promise = kj::READY_NOW;
  uint count = 0;
  while (promise.poll(waitScope)) {
    promise.wait(waitScope);

    auto req = cap.doStreamIRequest();
    req.setI(++count);
    promise = req.send();
  }

  // We should have sent... several.
  KJ_EXPECT(count > 5);

  // Now, cause calls to finish server-side one at a time and check that this causes the client
  // side to be willing to send more.
  uint countReceived = 0;
  for (uint i = 0; i < 50; i++) {
    KJ_EXPECT(server.iSum == ++countReceived);
    server.iSum = 0;
    KJ_ASSERT_NONNULL(server.fulfiller)->fulfill();

    KJ_ASSERT(promise.poll(waitScope));
    promise.wait(waitScope);

    auto req = cap.doStreamIRequest();
    req.setI(++count);
    promise = req.send();

    if (promise.poll(waitScope)) {
      // We'll see a couple of instances where completing one request frees up space to make two
      // more. This is because the first few requests we made are a little bit larger than the
      // rest due to being pipelined on the bootstrap. Once the bootstrap resolves, the request
      // size gets smaller.
      promise.wait(waitScope);
      req = cap.doStreamIRequest();
      req.setI(++count);
      promise = req.send();

      // We definitely shouldn't have freed up stream space for more than two additional requests!
      KJ_ASSERT(!promise.poll(waitScope));
    }
  }
}
} // namespace
} // namespace _
} // namespace capnp
...
@@ -23,6 +23,7 @@
#include "serialize-async.h"
#include <kj/debug.h>
#include <kj/io.h>
#include <sys/socket.h>

namespace capnp {
@@ -167,6 +168,60 @@ private:
  kj::ArrayPtr<kj::AutoCloseFd> fds;
};
kj::Own<RpcFlowController> TwoPartyVatNetwork::newStream() {
  return RpcFlowController::newVariableWindowController(*this);
}

size_t TwoPartyVatNetwork::getWindow() {
  // The socket's send buffer size -- as returned by getsockopt(SO_SNDBUF) -- tells us how much
  // data the kernel itself is willing to buffer. The kernel will increase the send buffer size
  // if needed to fill the connection's congestion window. So we can cheat and use it as our
  // stream window, too, to make sure we saturate said congestion window.
  //
  // TODO(perf): Unfortunately, this hack breaks down in the presence of proxying. What we really
  //   want is the window all the way to the endpoint, which could cross multiple connections.
  //   The first-hop window could be either too big or too small: it's too big if the first hop
  //   has much higher bandwidth than the full path (causing buffering at the bottleneck), and
  //   it's too small if the first hop has much lower latency than the full path (causing not
  //   enough data to be sent to saturate the connection). To handle this, we could either:
  //   1. Have proxies be aware of streaming, by flagging streaming calls in the RPC protocol.
  //      The proxies would then handle backpressure at each hop. This seems simple to implement
  //      but requires base RPC protocol changes and might require thinking carefully about
  //      e-ordering implications. Also, it only fixes underutilization; it does not fix buffer
  //      bloat.
  //   2. Do our own BBR-like computation, where the client measures the end-to-end latency and
  //      bandwidth based on the observed sends and returns, and then computes the window based
  //      on that. This seems complicated, but avoids the need for any changes to the RPC
  //      protocol. In theory it solves both underutilization and buffer bloat. Note that this
  //      approach would require the RPC system to use a clock, which feels dirty and adds
  //      non-determinism.

  if (solSndbufUnimplemented) {
    return RpcFlowController::DEFAULT_WINDOW_SIZE;
  } else {
    // TODO(perf): It might be nice to have a tryGetsockopt() that doesn't require catching
    //   exceptions?
    int bufSize = 0;
    KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
      socklen_t len = sizeof(int);
      KJ_SWITCH_ONEOF(stream) {
        KJ_CASE_ONEOF(s, kj::AsyncIoStream*) {
          s->getsockopt(SOL_SOCKET, SO_SNDBUF, &bufSize, &len);
        }
        KJ_CASE_ONEOF(s, kj::AsyncCapabilityStream*) {
          s->getsockopt(SOL_SOCKET, SO_SNDBUF, &bufSize, &len);
        }
      }
      KJ_ASSERT(len == sizeof(bufSize));
    })) {
      if (exception->getType() != kj::Exception::Type::UNIMPLEMENTED) {
        kj::throwRecoverableException(kj::mv(*exception));
      }
      solSndbufUnimplemented = true;
      bufSize = RpcFlowController::DEFAULT_WINDOW_SIZE;
    }
    return bufSize;
  }
}
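
The tryGetsockopt() that the TODO above wishes for might look roughly like the following sketch. The helper name `tryGetsockoptInt` and its exact shape are hypothetical illustrations, not an existing KJ API and not part of this commit:

// Hypothetical sketch: return nullptr instead of throwing when the stream's
// getsockopt() is unimplemented, so callers don't each need runCatchingExceptions().
kj::Maybe<int> tryGetsockoptInt(kj::AsyncIoStream& stream, int level, int option) {
  int value = 0;
  socklen_t len = sizeof(value);
  KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
    stream.getsockopt(level, option, &value, &len);
  })) {
    if (exception->getType() != kj::Exception::Type::UNIMPLEMENTED) {
      kj::throwRecoverableException(kj::mv(*exception));
    }
    return nullptr;  // getsockopt() not supported by this stream
  }
  KJ_ASSERT(len == sizeof(value));
  return value;
}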
rpc::twoparty::VatId::Reader TwoPartyVatNetwork::getPeerVatId() {
  return peerVatId.getRoot<rpc::twoparty::VatId>();
}
...
@@ -44,7 +44,8 @@ typedef VatNetwork<rpc::twoparty::VatId, rpc::twoparty::ProvisionId,
    TwoPartyVatNetworkBase;

class TwoPartyVatNetwork: public TwoPartyVatNetworkBase,
                          private TwoPartyVatNetworkBase::Connection,
                          private RpcFlowController::WindowGetter {
  // A `VatNetwork` that consists of exactly two parties communicating over an arbitrary byte
  // stream. This is used to implement the common case of a client/server network.
  //
@@ -91,6 +92,9 @@ private:
  ReaderOptions receiveOptions;
  bool accepted = false;

  bool solSndbufUnimplemented = false;
  // Whether stream.getsockopt(SO_SNDBUF) has been observed to throw UNIMPLEMENTED.

  kj::Maybe<kj::Promise<void>> previousWrite;
  // Resolves when the previous write completes. This effectively serves as the write queue.
  // Becomes null when shutdown() is called.
@@ -121,10 +125,15 @@ private:
  // implements Connection -----------------------------------------------------

  kj::Own<RpcFlowController> newStream() override;
  rpc::twoparty::VatId::Reader getPeerVatId() override;
  kj::Own<OutgoingRpcMessage> newOutgoingMessage(uint firstSegmentWordSize) override;
  kj::Promise<kj::Maybe<kj::Own<IncomingRpcMessage>>> receiveIncomingMessage() override;
  kj::Promise<void> shutdown() override;

  // implements WindowGetter ---------------------------------------------------

  size_t getWindow() override;
};

class TwoPartyServer: private kj::TaskSet::ErrorHandler {
...
@@ -602,6 +602,25 @@ private:
    // that other client -- return a reference to the other client, transitively. Otherwise,
    // return a new reference to *this.
    virtual void adoptFlowController(kj::Own<RpcFlowController> flowController) {
      // Called when a PromiseClient resolves to another RpcClient. If streaming calls were
      // outstanding on the old client, we'd like to keep using the same FlowController on the
      // new client, so as to keep the flow steady.

      if (this->flowController == nullptr) {
        // We don't have any existing flowController so we can adopt this one, yay!
        this->flowController = kj::mv(flowController);
      } else {
        // Apparently, there is an existing flowController. This is an unusual scenario: it
        // means we had two stream capabilities, we were streaming to both of them, and they
        // later resolved to the same capability. This probably never happens, because streaming
        // use cases normally call for there to be only one client. But it's certainly possible,
        // and we need to handle it. We'll do the conservative thing and just make sure that all
        // the calls finish. This may mean we'll over-buffer temporarily; oh well.
        connectionState->tasks.add(flowController->waitAllAcked().attach(kj::mv(flowController)));
      }
    }
    // implements ClientHook -----------------------------------------

    Request<AnyPointer, AnyPointer> newCall(
...
@@ -708,6 +727,9 @@ private:
    }

    kj::Own<RpcConnectionState> connectionState;

    kj::Maybe<kj::Own<RpcFlowController>> flowController;
    // Becomes non-null the first time a streaming call is made on this capability.
  };

  class ImportClient final: public RpcClient {
@@ -847,7 +869,7 @@ private:
  public:
    PromiseClient(RpcConnectionState& connectionState,
                  kj::Own<RpcClient> initial,
                  kj::Promise<kj::Own<ClientHook>> eventual,
                  kj::Maybe<ImportId> importId)
        : RpcClient(connectionState),
@@ -906,6 +928,17 @@ private:
      return connectionState->getInnermostClient(*cap);
    }
    void adoptFlowController(kj::Own<RpcFlowController> flowController) override {
      if (cap->getBrand() == connectionState.get()) {
        // Pass the flow controller on to our inner cap.
        kj::downcast<RpcClient>(*cap).adoptFlowController(kj::mv(flowController));
      } else {
        // We resolved to a capability that isn't another RPC capability. We should simply make
        // sure that all the calls complete.
        connectionState->tasks.add(flowController->waitAllAcked().attach(kj::mv(flowController)));
      }
    }
    // implements ClientHook -----------------------------------------

    Request<AnyPointer, AnyPointer> newCall(
...
@@ -991,6 +1024,25 @@ private:
    void resolve(kj::Own<ClientHook> replacement, bool isError) {
      const void* replacementBrand = replacement->getBrand();
      // If the original capability was used for streaming calls, it will have a
      // `flowController` that might still be shepherding those calls. We'll need to make sure
      // that it doesn't get thrown away. Note that we know that *cap is an RpcClient because
      // resolve() is only called once and our constructor required that the initial capability
      // is an RpcClient.
      KJ_IF_MAYBE(f, kj::downcast<RpcClient>(*cap).flowController) {
        if (replacementBrand == connectionState.get()) {
          // The new target is on the same connection. It would make a lot of sense to keep
          // using the same flow controller if possible.
          kj::downcast<RpcClient>(*replacement).adoptFlowController(kj::mv(*f));
        } else {
          // The new target is something else. The best we can do is wait for the controller to
          // drain. New calls will be flow-controlled in a new way without knowing about the old
          // controller.
          connectionState->tasks.add(f->get()->waitAllAcked().attach(kj::mv(*f)));
        }
      }
      if (replacementBrand != connectionState.get() &&
          replacementBrand != &ClientHook::NULL_CAPABILITY_BRAND &&
          receivedCall && !isError && connectionState->connection.is<Connected>()) {
...
@@ -1511,7 +1563,22 @@ private:
    }

    kj::Promise<void> sendStreaming() override {
      if (!connectionState->connection.is<Connected>()) {
        // Connection is broken.
        return kj::cp(connectionState->connection.get<Disconnected>());
      }

      KJ_IF_MAYBE(redirect, target->writeTarget(callBuilder.getTarget())) {
        // Whoops, this capability has been redirected while we were building the request!
        // We'll have to make a new request and do a copy. Ick.
        auto replacement = redirect->get()->newCall(
            callBuilder.getInterfaceId(), callBuilder.getMethodId(), paramsBuilder.targetSize());
        replacement.set(paramsBuilder);
        return RequestHook::from(kj::mv(replacement))->sendStreaming();
      } else {
        return sendStreamingInternal(false);
      }
    }
    struct TailInfo {
...
@@ -1571,7 +1638,12 @@ private:
      kj::Promise<kj::Own<RpcResponse>> promise = nullptr;
    };

    struct SetupSendResult: public SendInternalResult {
      QuestionId questionId;
      Question& question;
    };

    SetupSendResult setupSend(bool isTailCall) {
      // Build the cap table.
      kj::Vector<int> fds;
      auto exports = connectionState->writeDescriptors(
...
@@ -1593,8 +1665,14 @@ private:
      question.selfRef = *result.questionRef;
      result.promise = paf.promise.attach(kj::addRef(*result.questionRef));

      return { kj::mv(result), questionId, question };
    }

    SendInternalResult sendInternal(bool isTailCall) {
      auto result = setupSend(isTailCall);

      // Finish and send.
      callBuilder.setQuestionId(result.questionId);
      if (isTailCall) {
        callBuilder.getSendResultsTo().setYourself();
      }
...
@@ -1605,14 +1683,46 @@ private:
      })) {
        // We can't safely throw the exception from here since we've already modified the
        // question table state. We'll have to reject the promise instead.
        result.question.isAwaitingReturn = false;
        result.question.skipFinish = true;
        result.questionRef->reject(kj::mv(*exception));
      }

      // Send and return.
      return kj::mv(result);
    }
    kj::Promise<void> sendStreamingInternal(bool isTailCall) {
      auto setup = setupSend(isTailCall);

      // Finish and send.
      callBuilder.setQuestionId(setup.questionId);
      if (isTailCall) {
        callBuilder.getSendResultsTo().setYourself();
      }
      kj::Promise<void> flowPromise = nullptr;
      KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
        KJ_CONTEXT("sending RPC call",
            callBuilder.getInterfaceId(), callBuilder.getMethodId());
        RpcFlowController* flow;
        KJ_IF_MAYBE(f, target->flowController) {
          flow = *f;
        } else {
          flow = target->flowController.emplace(
              connectionState->connection.get<Connected>()->newStream());
        }
        flowPromise = flow->send(kj::mv(message), setup.promise.ignoreResult());
      })) {
        // We can't safely throw the exception from here since we've already modified the
        // question table state. We'll have to reject the promise instead.
        setup.question.isAwaitingReturn = false;
        setup.question.skipFinish = true;
        setup.questionRef->reject(kj::cp(*exception));
        return kj::mv(*exception);
      }

      return kj::mv(flowPromise);
    }
  };

  class RpcPipeline final: public PipelineHook, public kj::Refcounted {
...
@@ -2996,4 +3106,146 @@ void RpcSystemBase::baseSetFlowLimit(size_t words) {
}

} // namespace _ (private)

// =======================================================================================
namespace {

class WindowFlowController final: public RpcFlowController, private kj::TaskSet::ErrorHandler {
public:
  WindowFlowController(RpcFlowController::WindowGetter& windowGetter)
      : windowGetter(windowGetter), tasks(*this) {
    state.init<std::queue<QueuedMessage>>();
  }

  kj::Promise<void> send(kj::Own<OutgoingRpcMessage> message, kj::Promise<void> ack) override {
    KJ_SWITCH_ONEOF(state) {
      KJ_CASE_ONEOF(queue, std::queue<QueuedMessage>) {
        auto size = message->sizeInWords() * sizeof(capnp::word);
        maxMessageSize = kj::max(size, maxMessageSize);

        auto paf = kj::newPromiseAndFulfiller<void>();
        queue.push({kj::mv(message), kj::mv(ack), kj::mv(paf.fulfiller), size});
        pumpQueue(queue);
        return kj::mv(paf.promise);
      }
      KJ_CASE_ONEOF(exception, kj::Exception) {
        return kj::cp(exception);
      }
    }
    KJ_UNREACHABLE;
  }

  kj::Promise<void> waitAllAcked() override {
    KJ_IF_MAYBE(q, state.tryGet<std::queue<QueuedMessage>>()) {
      if (!q->empty()) {
        auto paf = kj::newPromiseAndFulfiller<kj::Promise<void>>();
        emptyFulfiller = kj::mv(paf.fulfiller);
        return kj::mv(paf.promise);
      }
    }
    return tasks.onEmpty();
  }

private:
  RpcFlowController::WindowGetter& windowGetter;
  size_t inFlight = 0;
  size_t maxMessageSize = 0;

  struct QueuedMessage {
    kj::Own<OutgoingRpcMessage> message;
    kj::Promise<void> ack;
    kj::Own<kj::PromiseFulfiller<void>> sentFulfiller;
    size_t size;
  };

  kj::OneOf<std::queue<QueuedMessage>, kj::Exception> state;

  kj::Maybe<kj::Own<kj::PromiseFulfiller<kj::Promise<void>>>> emptyFulfiller;

  kj::TaskSet tasks;

  void pumpQueue(std::queue<QueuedMessage>& queue) {
    size_t window = windowGetter.getWindow();

    // We extend the window by maxMessageSize to avoid a pathological situation when a message
    // is larger than the window size. Otherwise, after sending that message, we would end up
    // not sending any others until the ack was received, wasting a round trip's worth of
    // bandwidth.
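    // For example, if the window is 64 kB and a single 100 kB message is sent, inFlight rises
    // to 100 kB, which already exceeds the bare window; extending the limit to
    // window + maxMessageSize (164 kB) lets smaller follow-up messages keep flowing instead
    // of stalling for a full round trip.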
    while (!queue.empty() && inFlight < window + maxMessageSize) {
      auto front = kj::mv(queue.front());
      queue.pop();

      front.sentFulfiller->rejectIfThrows([&]() {
        front.message->send();
        inFlight += front.size;
        tasks.add(front.ack.then([this, size = front.size]() {
          inFlight -= size;
          KJ_SWITCH_ONEOF(state) {
            KJ_CASE_ONEOF(queue, std::queue<QueuedMessage>) {
              pumpQueue(queue);
            }
            KJ_CASE_ONEOF(exception, kj::Exception) {
              // A previous call failed, but this one -- which was already in-flight at the
              // time -- ended up succeeding. That may indicate that the server side is not
              // properly handling streaming error propagation. Nothing much we can do about
              // it here though.
            }
          }
        }));
        front.sentFulfiller->fulfill();
      });
    }

    KJ_IF_MAYBE(f, emptyFulfiller) {
      if (queue.empty()) {
        f->get()->fulfill(tasks.onEmpty());
      }
    }
  }

  void taskFailed(kj::Exception&& exception) override {
    KJ_SWITCH_ONEOF(state) {
      KJ_CASE_ONEOF(queue, std::queue<QueuedMessage>) {
        // Fail out all pending sends.
        while (!queue.empty()) {
          queue.front().sentFulfiller->reject(kj::cp(exception));
          queue.pop();
        }
        // Fail out all future sends.
        state = kj::mv(exception);
      }
      KJ_CASE_ONEOF(exception, kj::Exception) {
        // Ignore redundant exception.
      }
    }
  }
};

class FixedWindowFlowController final
    : public RpcFlowController, public RpcFlowController::WindowGetter {
public:
  FixedWindowFlowController(size_t windowSize): windowSize(windowSize), inner(*this) {}

  kj::Promise<void> send(kj::Own<OutgoingRpcMessage> message, kj::Promise<void> ack) override {
    return inner.send(kj::mv(message), kj::mv(ack));
  }

  kj::Promise<void> waitAllAcked() override {
    return inner.waitAllAcked();
  }

  size_t getWindow() override { return windowSize; }

private:
  size_t windowSize;
  WindowFlowController inner;
};

} // namespace

kj::Own<RpcFlowController> RpcFlowController::newFixedWindowController(size_t windowSize) {
  return kj::heap<FixedWindowFlowController>(windowSize);
}

kj::Own<RpcFlowController> RpcFlowController::newVariableWindowController(WindowGetter& getter) {
  return kj::heap<WindowFlowController>(getter);
}
} // namespace capnp
...
@@ -343,6 +343,58 @@ public:
  // implementations can compute the size more cheaply by summing segment sizes.
};
class RpcFlowController {
  // Tracks a particular RPC stream in order to implement a flow control algorithm.

public:
  virtual kj::Promise<void> send(kj::Own<OutgoingRpcMessage> message, kj::Promise<void> ack) = 0;
  // Like calling message->send(), but the promise resolves when it's a good time to send the
  // next message.
  //
  // `ack` is a promise that resolves when the message has been acknowledged from the other side.
  // In practice, `message` is typically a `Call` message and `ack` is a `Return`. Note that this
  // means `ack` covers not only the time to transmit the message but also the time for the
  // remote application to process it. The flow controller is expected to apply backpressure if
  // the remote application responds slowly. If `ack` rejects, then all outstanding and future
  // sends will propagate the exception.
  //
  // Note that messages sent with this method must still be delivered in the same order as if
  // they had been sent with `message->send()`; they cannot be delayed until later. This is
  // important because the message may introduce state changes in the RPC system that later
  // messages rely on, such as a new Question ID that a later message may reference. Thus, the
  // controller can only create backpressure by having the returned promise resolve slowly.
  //
  // Dropping the returned promise does not cancel the send. Once send() is called, there's no
  // way to stop it.

  virtual kj::Promise<void> waitAllAcked() = 0;
  // Wait for all `ack`s previously passed to send() to finish. It is an error to call send()
  // again after this.

  // ---------------------------------------------------------------------------
  // Common implementations.

  static kj::Own<RpcFlowController> newFixedWindowController(size_t windowSize);
  // Constructs a flow controller that implements a strict fixed window of the given size. In
  // other words, the controller will throttle the stream when the total bytes in flight exceed
  // the window.

  class WindowGetter {
  public:
    virtual size_t getWindow() = 0;
  };

  static kj::Own<RpcFlowController> newVariableWindowController(WindowGetter& getter);
  // Like newFixedWindowController(), but the window size is allowed to vary over time. Useful
  // if you have a technique for estimating one good window size for the connection as a whole
  // but not for individual streams. Keep in mind, though, that in situations where the other
  // end of the connection is merely proxying capabilities from a variety of final destinations
  // across a variety of networks, no single window will be appropriate for all streams.

  static constexpr size_t DEFAULT_WINDOW_SIZE = 65536;
  // The window size used by the default implementation of Connection::newStream().
};
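
To make the send()/waitAllAcked() contract concrete, here is a minimal usage sketch. `makeMessage(i)` and `ackFor(i)` are hypothetical helpers standing in for building a Call message and obtaining its matching Return promise; they are not part of this commit, and the controller must outlive the returned promise:

// Sketch only: push n messages through a flow controller, respecting backpressure.
kj::Promise<void> streamAll(RpcFlowController& flow, uint n) {
  kj::Promise<void> ready = kj::READY_NOW;
  for (uint i = 0; i < n; i++) {
    // Chain each send so it waits until the controller says it's a good time.
    ready = ready.then([&flow, i]() {
      return flow.send(makeMessage(i), ackFor(i));  // hypothetical helpers
    });
  }
  // After handing everything to the controller, wait for all acks to come back.
  return ready.then([&flow]() { return flow.waitAllAcked(); });
}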
template <typename VatId, typename ProvisionId, typename RecipientId,
          typename ThirdPartyCapId, typename JoinResult>
class VatNetwork: public _::VatNetworkBase {
...
@@ -387,6 +439,19 @@ public:
    // connection is ready, so that the caller doesn't need to know the difference.

  public:
    virtual kj::Own<RpcFlowController> newStream() {
      return RpcFlowController::newFixedWindowController(65536);
    }
    // Construct a flow controller for a new stream on this connection. The controller can be
    // passed into OutgoingRpcMessage::sendStreaming().
    //
    // The default implementation returns a dummy stream controller that just applies a fixed
    // window of 64k to everything. This always works but may constrain throughput on networks
    // where the bandwidth-delay product is high, while conversely providing too much buffer
    // when the bandwidth-delay product is low.
    //
    // TODO(perf): We should introduce a flow controller implementation that uses a clock to
    //   measure RTT and bandwidth and dynamically update the window size, like BBR.

    // Level 0 features ----------------------------------------------

    virtual typename VatId::Reader getPeerVatId() = 0;
...