Commit 3070f627 authored by Kenton Varda's avatar Kenton Varda

Implement handling of Resolve messages. Though currently they are never sent.

parent e4a5344b
......@@ -278,10 +278,9 @@ TEST_F(RpcTest, Basic) {
auto promise1 = request1.send();
auto request2 = client.bazRequest();
auto promise2 = request2.send();
// We used to call bar() after baz(), hence the numbering, but this masked the case where the
// RPC system actually disconnected on bar() (thus returning an exception, which we decided
// was expected).
bool barFailed = false;
auto request3 = client.barRequest();
auto promise3 = loop.there(request3.send(),
......@@ -291,6 +290,10 @@ TEST_F(RpcTest, Basic) {
barFailed = true;
auto request2 = client.bazRequest();
auto promise2 = request2.send();
EXPECT_EQ(0, restorer.callCount);
auto response1 = loop.wait(kj::mv(promise1));
......@@ -241,6 +241,7 @@ public:
tasks(eventLoop, *this) {
tables.getWithoutLock().resolutionChainTail = kj::refcounted<ResolutionChain>();
kj::Own<const ClientHook> restore(ObjectPointer::Reader objectId) {
......@@ -255,7 +256,8 @@ public:
// We need a dummy paramCaps since null normally indicates that the question has completed.
question.paramCaps = kj::heap<CapInjectorImpl>(*this);
questionRef = kj::refcounted<QuestionRef>(*this, questionId, kj::mv(paf.fulfiller));
questionRef = kj::refcounted<QuestionRef>(*this, questionId, kj::mv(paf.fulfiller),
question.selfRef = *questionRef;
......@@ -347,6 +349,8 @@ public:
class ResolutionChain;
class RpcClient;
class ImportClient;
class PromiseClient;
class CapInjectorImpl;
......@@ -413,9 +417,14 @@ private:
Import& operator=(Import&&) = default;
// If we don't explicitly write all this, we get some stupid error deep in STL.
kj::Maybe<ImportClient&> client;
kj::Maybe<ImportClient&> importClient;
// Becomes null when the import is destroyed.
kj::Maybe<RpcClient&> appClient;
// Either a copy of importClient, or, in the case of promises, the wrapping PromiseClient.
// Becomes null when it is discarded *or* when the import is destroyed (e.g. the promise is
// resolved and the import is no longer needed).
kj::Maybe<kj::Own<kj::PromiseFulfiller<kj::Own<const ClientHook>>>> promiseFulfiller;
// If non-null, the import is a promise.
......@@ -438,6 +447,12 @@ private:
std::unordered_map<const ClientHook*, ExportId> exportsByCap;
// Maps already-exported ClientHook objects to their ID in the export table.
kj::Own<ResolutionChain> resolutionChainTail;
// The end of the resolution chain. This node actually isn't filled in yet, but it will be
// filled in and the chain will be extended with a new node any time a `Resolve` is received.
// CapExtractors need to hold a ref to resolutionChainTail to prevent resolved promises from
// becoming invalid while the app is still processing the message. See `ResolutionChain`.
kj::Maybe<kj::Exception> networkException;
// If the connection has failed, this is the exception describing the failure. All future
// calls should throw this exception.
......@@ -446,6 +461,68 @@ private:
kj::TaskSet tasks;
// =====================================================================================
class ResolutionChain: public kj::Refcounted {
// A chain of pending promise resolutions which may affect messages that are still being
// processed.
// When a `Resolve` message comes in, we can't just handle it and then release the original
// promise import all at once, because it's possible that the application is still processing
// the `params` or `results` from a previous call, and that it will encounter an instance of
// the promise as it does. We need to hold off on the release until we've actually gotten
// through all outstanding messages.
// To that end, we have the resolution chain. Each time a `CapExtractorImpl` is created --
// representing a message to be consumed by the application -- it takes a reference to the
// current end of the chain. When a `Resolve` message arrives, it is added to the end of the
// chain, and thus all `CapExtractorImpl`s that exist at that point now hold a reference to it,
// but new `CapExtractorImpl`s will not. Once all references are dropped, the original promise
// can be released.
// The connection state actually holds one instance of ResolutionChain which doesn't yet have
// a promise attached to it, representing the end of the chain. This is what allows a new
// resolution to be "added to the end" and have existing `CapExtractorImpl`s suddenly be
// holding a reference to it.
kj::Own<ResolutionChain> add(ExportId importId,
kj::Own<const ClientHook>&& replacement) {
// Add the a new resolution to the chain. Returns the new end-of-chain.
this->importId = importId;
this->replacement = kj::mv(replacement);
auto result = kj::refcounted<ResolutionChain>();
next = kj::addRef(*result);
filled = true;
return kj::mv(result);
kj::Maybe<kj::Own<const ClientHook>> find(ExportId importId) const {
// Look for the given import ID in the resolution chain.
const ResolutionChain* ptr = this;
while (ptr->filled) {
if (ptr->importId == importId) {
return ptr->replacement->addRef();
ptr = ptr->next;
return nullptr;
kj::Own<const ResolutionChain> next;
bool filled = false;
ExportId importId;
kj::Own<const ClientHook> replacement;
// =====================================================================================
// ClientHook implementations
......@@ -547,10 +624,11 @@ private:
// Remove self from the import table, if the table is still pointing at us. (It's possible
// that another thread attempted to obtain this import just as the destructor started, in
// which case that other thread will have constructed a new ImportClient and placed it in
// the import table.)
// the import table. Therefore, we must actually verify that the import table points at
// this object.)
auto lock = connectionState->tables.lockExclusive();
KJ_IF_MAYBE(import, lock->imports.find(importId)) {
KJ_IF_MAYBE(i, import->client) {
KJ_IF_MAYBE(i, import->importClient) {
if (i == this) {
......@@ -564,17 +642,9 @@ private:
kj::Maybe<kj::Own<ImportClient>> tryAddRemoteRef() {
// Add a new RemoteRef and return a new ref to this client representing it. Returns null
// if this client is being deleted in another thread, in which case the caller should
// construct a new one.
KJ_IF_MAYBE(ref, kj::tryAddRef(*this)) {
void addRemoteRef() {
// Add a new RemoteRef and return a new ref to this client representing it.
return kj::mv(*ref);
} else {
return nullptr;
kj::Maybe<ExportId> writeDescriptor(
......@@ -673,9 +743,11 @@ private:
PromiseClient(const RpcConnectionState& connectionState,
kj::Own<const ClientHook> initial,
kj::Promise<kj::Own<const ClientHook>> eventual)
kj::Promise<kj::Own<const ClientHook>> eventual,
kj::Maybe<ExportId> importId)
: RpcClient(connectionState),
[this](kj::Own<const ClientHook>&& resolution) {
......@@ -692,6 +764,23 @@ private:
~PromiseClient() noexcept(false) {
KJ_IF_MAYBE(id, importId) {
// This object is representing an import promise. That means the import table may still
// contain a pointer back to it. Remove that pointer. Note that we have to verify that
// the import still exists and the pointer still points back to this object because this
// object may actually outlive the import.
auto lock = connectionState->tables.lockExclusive();
KJ_IF_MAYBE(import, lock->imports.find(*id)) {
KJ_IF_MAYBE(c, import->appClient) {
if (c == this) {
import->appClient = nullptr;
kj::Maybe<ExportId> writeDescriptor(
rpc::CapDescriptor::Builder descriptor, Tables& tables) const override {
auto cap = inner.lockExclusive()->get()->addRef();
......@@ -716,6 +805,7 @@ private:
kj::MutexGuarded<kj::Own<const ClientHook>> inner;
kj::Maybe<ExportId> importId;
kj::ForkedPromise<kj::Own<const ClientHook>> fork;
// Keep this last, because the continuation uses *this, so it should be destroyed first to
......@@ -784,8 +874,10 @@ private:
// Reads CapDescriptors from a received message.
CapExtractorImpl(const RpcConnectionState& connectionState)
: connectionState(connectionState) {}
CapExtractorImpl(const RpcConnectionState& connectionState,
kj::Own<const ResolutionChain> resolutionChain)
: connectionState(connectionState),
resolutionChain(kj::mv(resolutionChain)) {}
~CapExtractorImpl() noexcept(false) {
KJ_ASSERT(retainedCaps.getWithoutLock().size() == 0,
......@@ -806,9 +898,27 @@ private:
return (count * sizeof(ExportId) + (sizeof(ExportId) - 1)) / sizeof(word);
Orphan<List<ExportId>> finalizeRetainedCaps(Orphanage orphanage) {
// Called on finalization, when the lock is no longer needed.
struct FinalizedRetainedCaps {
// List of capabilities extracted from this message which are to be retained past the
// message's release.
Orphan<List<ExportId>> exportList;
// List of export IDs, to be placed in the Return/Finish message.
kj::Vector<kj::Own<const ClientHook>> refs;
// List of ClientHooks which need to be kept live until the message is sent, to prevent
// their premature release.
FinalizedRetainedCaps finalizeRetainedCaps(Orphanage orphanage) {
// Build the list of export IDs found in this message which are to be retained past the
// message's release.
// `capsToKeepUntilSend` will be filled in with
// Called on finalization, when all extractions have ceased, so we can skip the lock.
kj::Vector<ExportId> retainedCaps = kj::mv(this->retainedCaps.getWithoutLock());
kj::Vector<kj::Own<const ClientHook>> refs(retainedCaps.size());
auto lock = connectionState.tables.lockExclusive();
......@@ -816,11 +926,13 @@ private:
for (ExportId importId: retainedCaps) {
// Check if the import still exists under this ID.
KJ_IF_MAYBE(import, lock->imports.find(importId)) {
KJ_IF_MAYBE(i, import->client) {
if (i->tryAddRemoteRef() != nullptr) {
// Import indeed still exists! We are responsible for retaining it.
// TODO(now): Do we need to hold on to the ref that tryAddRemoteRef() returned?
KJ_IF_MAYBE(ic, import->importClient) {
KJ_IF_MAYBE(ref, kj::tryAddRef(*ic)) {
// Import indeed still exists! We'll return it in the retained caps, which means it
// now has a new remote ref.
*actualRetained++ = importId;
......@@ -836,24 +948,65 @@ private:
resultBuilder.set(count++, *iter);
return kj::mv(result);
return FinalizedRetainedCaps { kj::mv(result), kj::mv(refs) };
static kj::Own<const ClientHook> extractCapAndAddRef(
const RpcConnectionState& connectionState, Tables& lockedTables,
rpc::CapDescriptor::Reader descriptor) {
// Interpret the given capability descriptor and, if it is an import, immediately give it
// a remote ref. This is called when interpreting messages that have a CapabilityDescriptor
// but do not have a corresponding response message where a list of retained caps is given.
// In these cases, the cap is always assumed retained, and must be explicitly released.
// For example, the 'Resolve' message contains a capability which is presumed to be retained.
return extractCapImpl(connectionState, lockedTables, descriptor,
*lockedTables.resolutionChainTail, nullptr);
// implements CapDescriptor ------------------------------------------------
kj::Own<const ClientHook> extractCap(rpc::CapDescriptor::Reader descriptor) const override {
return extractCapImpl(connectionState, *connectionState.tables.lockExclusive(), descriptor,
*resolutionChain, retainedCaps);
const RpcConnectionState& connectionState;
kj::Own<const ResolutionChain> resolutionChain;
// Reference to the resolution chain, which prevents any promises that might be extracted from
// this message from being invalidated by `Resolve` messages before extraction is finished.
// Simply holding on to the chain keeps the import table entries valid.
kj::MutexGuarded<kj::Vector<ExportId>> retainedCaps;
// Imports which we are responsible for retaining, should they still exist at the time that
// this message is released.
static kj::Own<const ClientHook> extractCapImpl(
const RpcConnectionState& connectionState, Tables& tables,
rpc::CapDescriptor::Reader descriptor,
const ResolutionChain& resolutionChain,
kj::Maybe<const kj::MutexGuarded<kj::Vector<ExportId>>&> retainedCaps) {
switch (descriptor.which()) {
case rpc::CapDescriptor::SENDER_HOSTED:
case rpc::CapDescriptor::SENDER_PROMISE: {
ExportId importId = descriptor.getSenderHosted();
auto lock = connectionState.tables.lockExclusive();
// First check to see if this import ID is a promise that has resolved since when this
// message was received. In this case, the original import ID will already have been
// dropped and could even have been reused for another capability. Luckily, the
// resolution chain holds the capability we actually want.
KJ_IF_MAYBE(resolution, resolutionChain.find(importId)) {
return kj::mv(*resolution);
auto& import = lock->imports[importId];
KJ_IF_MAYBE(i, import.client) {
// No recent resolutions. Check the import table then.
auto& import = tables.imports[importId];
KJ_IF_MAYBE(c, import.appClient) {
// The import is already on the table, but it could be being deleted in another
// thread.
KJ_IF_MAYBE(ref, kj::tryAddRef(*i)) {
KJ_IF_MAYBE(ref, kj::tryAddRef(*c)) {
// We successfully grabbed a reference to the import without it being deleted in
// another thread. Since this import already exists, we don't have to take
// responsibility for retaining it. We can just return the existing object and
......@@ -865,9 +1018,17 @@ private:
// No import for this ID exists currently, so create one.
kj::Own<ImportClient> importClient =
kj::refcounted<ImportClient>(connectionState, importId);
import.client = *importClient;
import.importClient = *importClient;
KJ_IF_MAYBE(rc, retainedCaps) {
// We need to retain this import later if it still exists.
} else {
// Automatically increment the refcount.
kj::Own<ClientHook> result;
kj::Own<RpcClient> result;
if (descriptor.which() == rpc::CapDescriptor::SENDER_PROMISE) {
// TODO(now): Check for pending `Resolve` messages replacing this import ID, and if
// one exists, use that client instead.
......@@ -876,29 +1037,26 @@ private:
import.promiseFulfiller = kj::mv(paf.fulfiller);
result = kj::refcounted<PromiseClient>(
connectionState, kj::mv(importClient), kj::mv(paf.promise));
connectionState, kj::mv(importClient), kj::mv(paf.promise), importId);
} else {
result = kj::mv(importClient);
// Note that we need to retain this import later if it still exists.
import.appClient = *result;
return kj::mv(result);
case rpc::CapDescriptor::RECEIVER_HOSTED: {
auto lock = connectionState.tables.lockExclusive(); // TODO(perf): shared?
KJ_IF_MAYBE(exp, lock->exports.find(descriptor.getReceiverHosted())) {
KJ_IF_MAYBE(exp, tables.exports.find(descriptor.getReceiverHosted())) {
return exp->clientHook->addRef();
return newBrokenCap("invalid 'receiverHosted' export ID");
case rpc::CapDescriptor::RECEIVER_ANSWER: {
auto lock = connectionState.tables.lockExclusive();
auto promisedAnswer = descriptor.getReceiverAnswer();
KJ_IF_MAYBE(answer, lock->answers.find(promisedAnswer.getQuestionId())) {
KJ_IF_MAYBE(answer, tables.answers.find(promisedAnswer.getQuestionId())) {
if (answer->active) {
KJ_IF_MAYBE(pipeline, answer->pipeline) {
KJ_IF_MAYBE(ops, toPipelineOps(promisedAnswer.getTransform())) {
......@@ -920,13 +1078,6 @@ private:
return newBrokenCap("unknown CapDescriptor type");
const RpcConnectionState& connectionState;
kj::MutexGuarded<kj::Vector<ExportId>> retainedCaps;
// Imports which we are responsible for retaining, should they still exist at the time that
// this message is released.
// -----------------------------------------------------------------
......@@ -944,7 +1095,6 @@ private:
if (lock->networkException == nullptr) {
for (auto exportId: exports) {
KJ_DBG(&connectionState, exportId);
auto& exp = KJ_ASSERT_NONNULL(lock->exports.find(exportId));
if (--exp.refcount == 0) {
......@@ -1042,21 +1192,24 @@ private:
inline QuestionRef(const RpcConnectionState& connectionState, QuestionId id,
kj::Own<kj::PromiseFulfiller<kj::Own<const RpcResponse>>> fulfiller)
kj::Own<kj::PromiseFulfiller<kj::Own<const RpcResponse>>> fulfiller,
kj::Own<const ResolutionChain> resolutionChain)
: connectionState(kj::addRef(connectionState)), id(id), fulfiller(kj::mv(fulfiller)),
resultCaps(connectionState) {}
resultCaps(connectionState, kj::mv(resolutionChain)) {}
~QuestionRef() {
// Send the "Finish" message.
auto message = connectionState->connection->newOutgoingMessage(
messageSizeHint<rpc::Finish>() + resultCaps.retainedListSizeHint(true));
auto builder = message->getBody().getAs<rpc::Message>().initFinish();
auto retainedCaps = resultCaps.finalizeRetainedCaps(
// Check if the question has returned and, if so, remove it from the table.
// Remove question ID from the table. Must do this *after* sending `Finish` to ensure that
......@@ -1157,7 +1310,8 @@ private:
question.paramCaps = kj::mv(injector);
questionRef = kj::refcounted<QuestionRef>(
*connectionState, questionId, kj::mv(paf.fulfiller));
*connectionState, questionId, kj::mv(paf.fulfiller),
question.selfRef = *questionRef;
......@@ -1170,7 +1324,7 @@ private:
auto forkedPromise = connectionState->eventLoop.fork(kj::mv(promise));
auto appPromise = forkedPromise.addBranch().thenInAnyThread(
[](kj::Own<const RpcResponse>&& response) {
[=](kj::Own<const RpcResponse>&& response) {
auto reader = response->getResults();
return Response<ObjectPointer>(reader, kj::mv(response));
......@@ -1244,7 +1398,7 @@ private:
return kj::refcounted<PromiseClient>(
*connectionState, kj::mv(pipelineClient), kj::mv(resolutionPromise));
*connectionState, kj::mv(pipelineClient), kj::mv(resolutionPromise), nullptr);
} else if (lock->is<Resolved>()) {
return lock->get<Resolved>()->getResults().getPipelinedCap(ops);
} else {
......@@ -1346,11 +1500,12 @@ private:
class RpcCallContext final: public CallContextHook, public kj::Refcounted {
RpcCallContext(const RpcConnectionState& connectionState, QuestionId questionId,
kj::Own<IncomingRpcMessage>&& request, const ObjectPointer::Reader& params)
kj::Own<IncomingRpcMessage>&& request, const ObjectPointer::Reader& params,
kj::Own<const ResolutionChain> resolutionChain)
: connectionState(kj::addRef(connectionState)),
requestCapExtractor(connectionState, kj::mv(resolutionChain)),
returnMessage(nullptr) {}
......@@ -1360,8 +1515,9 @@ private:
if (response == nullptr) getResults(1); // force initialization of response
auto retainedCaps = requestCapExtractor.finalizeRetainedCaps(
......@@ -1374,8 +1530,9 @@ private:
auto builder = message->getBody().initAs<rpc::Message>().initReturn();
auto retainedCaps = requestCapExtractor.finalizeRetainedCaps(
fromException(exception, builder.initException());
......@@ -1388,8 +1545,9 @@ private:
auto builder = message->getBody().initAs<rpc::Message>().initReturn();
auto retainedCaps = requestCapExtractor.finalizeRetainedCaps(
......@@ -1626,7 +1784,7 @@ private:
case rpc::Message::RESOLVE:
// TODO(now)
case rpc::Message::RELEASE:
......@@ -1720,8 +1878,11 @@ private:
QuestionId questionId = call.getQuestionId();
// Note: resolutionChainTail couldn't possibly be changing here because we only handle one
// message at a time, so we can hold off locking the tables for a bit longer.
auto context = kj::refcounted<RpcCallContext>(
*this, questionId, kj::mv(message), call.getParams());
*this, questionId, kj::mv(message), call.getParams(),
auto promiseAndPipeline = capability->call(
call.getInterfaceId(), call.getMethodId(), context->addRef());
......@@ -1753,6 +1914,12 @@ private:
}, [contextPtr](kj::Exception&& exception) {
}).then([]() {
// Success.
}, [&](kj::Exception&& exception) {
// We never actually wait on `asyncOp` so we need to manually report exceptions.
// TODO(cleanup): Perhaps there should be a better, more-automated approach to this?
......@@ -1851,6 +2018,51 @@ private:
// ---------------------------------------------------------------------------
// Level 1
void handleResolve(const rpc::Resolve::Reader& resolve) {
kj::Own<ResolutionChain> oldResolutionChainTail; // must be freed outside of lock
auto lock = tables.lockExclusive();
kj::Own<const ClientHook> replacement;
// Extract the replacement capability.
switch (resolve.which()) {
case rpc::Resolve::CAP:
replacement = CapExtractorImpl::extractCapAndAddRef(*this, *lock, resolve.getCap());
case rpc::Resolve::EXCEPTION:
replacement = newBrokenCap(toException(resolve.getException()));
case rpc::Resolve::CANCELED:
// Right, this can't possibly affect anything, then.
// TODO(now): Am I doing something wrong or is this not needed?
KJ_FAIL_REQUIRE("Unknown 'Resolve' type.") { return; }
// Extend the resolution chain.
auto oldTail = kj::mv(lock->resolutionChainTail);
lock->resolutionChainTail = oldTail->add(resolve.getPromiseId(), kj::mv(replacement));
lock.release(); // in case oldTail is destroyed
// If the import is on the table, fulfill it.
KJ_IF_MAYBE(import, lock->imports.find(resolve.getPromiseId())) {
KJ_IF_MAYBE(fulfiller, import->promiseFulfiller) {
// OK, this is in fact an unfulfilled promise!
} else if (import->importClient != nullptr) {
// It appears this is a valid entry on the import table, but was not expected to be a
// promise.
KJ_FAIL_REQUIRE("Got 'Resolve' for a non-promise import.") { break; }
// ---------------------------------------------------------------------------
// Level 2
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment