Commit f203027c authored by Kenton Varda's avatar Kenton Varda

Allow capability servers to redirect themselves.

With this change, a capability server can implement `Capability::Server::shortenPath()` to return a promise that resolves to a new capability in the future. Once it resolves, further calls can be redirected to the new capability.

The RPC system will automatically apply path-shortening on resolution. For example:
* Say Alice and Bob are two vats communicating via RPC. Alice holds a capability to an object hosted by Bob. Bob's object later resolves itself (via shortenPath()) to a capability pointing to an object hoste by Alice. Once everything settles, if Alice makes calls on the capability that used to point to Bob, those calls will go directly to the local object that Bob's object resolved to, without crossing the network at all.
* In a level 3 RPC implementation (not yet implemented), Bob could instead resolve his capability to point to a capability hosted by Carol. Alice would then automatically create a direct connection to Carol and start using it to make further calls.

All this works automatically because the implementation of `shortenPath()` is based on existing infrastructure designed to support promise pipelining. If a capability server implements `shortenPath()` to return non-null, then capabilities pointing to it will appear to be unsettled promise capabilities. `whenResolved()` or `whenMoreResolved()` can be called on these to wait for them to "resolve" to the shorter path later on. Up until this point, RPC calls on a promise capability couldn't possibly return until the capability was settled, but nothing actually depended on this in practice.

This feature will be used to implement dynamic path shortening through KJ streams and `pumpTo()`.
parent fe6024d0
......@@ -82,6 +82,10 @@ kj::Promise<kj::Maybe<int>> Capability::Client::getFd() {
}
}
kj::Maybe<kj::Promise<Capability::Client>> Capability::Server::shortenPath() {
return nullptr;
}
Capability::Server::DispatchCallResult Capability::Server::internalUnimplemented(
const char* actualInterfaceName, uint64_t requestedTypeId) {
return {
......@@ -471,6 +475,13 @@ public:
LocalClient(kj::Own<Capability::Server>&& serverParam)
: server(kj::mv(serverParam)) {
server->thisHook = this;
resolveTask = server->shortenPath().map([this](kj::Promise<Capability::Client> promise) {
return promise.then([this](Capability::Client&& cap) {
auto hook = ClientHook::from(kj::mv(cap));
resolved = hook->addRef();
}).fork();
});
}
LocalClient(kj::Own<Capability::Server>&& serverParam,
_::CapabilityServerSetBase& capServerSet, void* ptr)
......@@ -533,11 +544,19 @@ public:
}
kj::Maybe<ClientHook&> getResolved() override {
return nullptr;
return resolved.map([](kj::Own<ClientHook>& hook) -> ClientHook& { return *hook; });
}
kj::Maybe<kj::Promise<kj::Own<ClientHook>>> whenMoreResolved() override {
return nullptr;
KJ_IF_MAYBE(r, resolved) {
return kj::Promise<kj::Own<ClientHook>>(r->get()->addRef());
} else KJ_IF_MAYBE(t, resolveTask) {
return t->addBranch().then([this]() {
return KJ_ASSERT_NONNULL(resolved)->addRef();
});
} else {
return nullptr;
}
}
kj::Own<ClientHook> addRef() override {
......@@ -551,7 +570,7 @@ public:
return &BRAND;
}
kj::Promise<void*> getLocalServer(_::CapabilityServerSetBase& capServerSet) {
kj::Maybe<kj::Promise<void*>> getLocalServer(_::CapabilityServerSetBase& capServerSet) {
// If this is a local capability created through `capServerSet`, return the underlying Server.
// Otherwise, return nullptr. Default implementation (which everyone except LocalClient should
// use) always returns nullptr.
......@@ -580,10 +599,10 @@ public:
return kj::newAdaptedPromise<kj::Promise<void>, BlockedCall>(*this)
.then([this]() { return ptr; });
} else {
return ptr;
return kj::Promise<void*>(ptr);
}
} else {
return (void*)nullptr;
return nullptr;
}
}
......@@ -596,6 +615,9 @@ private:
_::CapabilityServerSetBase* capServerSet = nullptr;
void* ptr = nullptr;
kj::Maybe<kj::ForkedPromise<void>> resolveTask;
kj::Maybe<kj::Own<ClientHook>> resolved;
class BlockedCall {
public:
BlockedCall(kj::PromiseFulfiller<kj::Promise<void>>& fulfiller, LocalClient& client,
......@@ -896,21 +918,35 @@ kj::Promise<void*> CapabilityServerSetBase::getLocalServerInternal(Capability::C
ClientHook* hook = client.hook.get();
// Get the most-resolved-so-far version of the hook.
KJ_IF_MAYBE(h, hook->getResolved()) {
hook = h;
};
for (;;) {
KJ_IF_MAYBE(h, hook->getResolved()) {
hook = h;
} else {
break;
}
}
// Try to unwrap that.
if (hook->getBrand() == &LocalClient::BRAND) {
KJ_IF_MAYBE(promise, kj::downcast<LocalClient>(*hook).getLocalServer(*this)) {
// This is definitely a member of our set and will resolve to non-null. We just have to wait
// for any existing streaming calls to complete.
return kj::mv(*promise);
}
}
// OK, the capability isn't part of this set.
KJ_IF_MAYBE(p, hook->whenMoreResolved()) {
// This hook is an unresolved promise. We need to wait for it.
// This hook is an unresolved promise. It might resolve eventually to a local server, so wait
// for it.
return p->attach(hook->addRef())
.then([this](kj::Own<ClientHook>&& resolved) {
Capability::Client client(kj::mv(resolved));
return getLocalServerInternal(client);
});
} else if (hook->getBrand() == &LocalClient::BRAND) {
return kj::downcast<LocalClient>(*hook).getLocalServer(*this);
} else {
return (void*)nullptr;
// Cap is settled, so it definitely will never resolve to a member of this set.
return kj::implicitCast<void*>(nullptr);
}
}
......
......@@ -407,6 +407,23 @@ public:
// returns that FD. When FD passing has been enabled in the RPC layer, this FD may be sent to
// other processes along with the capability.
virtual kj::Maybe<kj::Promise<Capability::Client>> shortenPath();
// If this returns non-null, then it is a promise which, when resolved, points to a new
// capability to which future calls can be sent. Use this in cases where an object implementation
// might discover a more-optimized path some time after it starts.
//
// Implementing this (and returning non-null) will cause the capability to be advertised as a
// promise at the RPC protocol level. Once the promise returned by shortenPath() resolves, the
// remote client will receive a `Resolve` message updating it to point at the new destination.
//
// `shortenPath()` can also be used as a hack to shut up the client. If shortenPath() returns
// a promise that resolves to an exception, then the client will be notified that the capability
// is now broken. Assuming the client is using a correct RPC implemnetation, this should cause
// all further calls initiated by the client to this capability to immediately fail client-side,
// sparing the server's bandwidth.
//
// The default implementation always returns nullptr.
// TODO(someday): Method which can optionally be overridden to implement Join when the object is
// a proxy.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment