Commit 98696a57 authored by Kenton Varda's avatar Kenton Varda

Correctly generate Resolve messages.

parent 3070f627
...@@ -197,6 +197,10 @@ public: ...@@ -197,6 +197,10 @@ public:
return VoidPromiseAndPipeline { kj::cp(exception), kj::heap<BrokenPipeline>(exception) }; return VoidPromiseAndPipeline { kj::cp(exception), kj::heap<BrokenPipeline>(exception) };
} }
kj::Maybe<const ClientHook&> getResolved() const {
return nullptr;
}
kj::Maybe<kj::Promise<kj::Own<const ClientHook>>> whenMoreResolved() const override { kj::Maybe<kj::Promise<kj::Own<const ClientHook>>> whenMoreResolved() const override {
return kj::Promise<kj::Own<const ClientHook>>(kj::cp(exception)); return kj::Promise<kj::Own<const ClientHook>>(kj::cp(exception));
} }
......
...@@ -199,7 +199,11 @@ class QueuedClient final: public ClientHook, public kj::Refcounted { ...@@ -199,7 +199,11 @@ class QueuedClient final: public ClientHook, public kj::Refcounted {
public: public:
QueuedClient(const kj::EventLoop& loop, kj::Promise<kj::Own<const ClientHook>>&& promise) QueuedClient(const kj::EventLoop& loop, kj::Promise<kj::Own<const ClientHook>>&& promise)
: loop(loop), : loop(loop),
promise(loop.fork(kj::mv(promise))) {} promise(loop.fork(kj::mv(promise))),
selfResolutionOp(loop.there(this->promise.addBranch(),
[this](kj::Own<const ClientHook>&& inner) {
*redirect.lockExclusive() = kj::mv(inner);
})) {}
Request<ObjectPointer, ObjectPointer> newCall( Request<ObjectPointer, ObjectPointer> newCall(
uint64_t interfaceId, uint16_t methodId, uint firstSegmentWordSize) const override { uint64_t interfaceId, uint16_t methodId, uint firstSegmentWordSize) const override {
...@@ -264,6 +268,14 @@ public: ...@@ -264,6 +268,14 @@ public:
return VoidPromiseAndPipeline { kj::mv(completionPromise), kj::mv(pipeline) }; return VoidPromiseAndPipeline { kj::mv(completionPromise), kj::mv(pipeline) };
} }
kj::Maybe<const ClientHook&> getResolved() const {
KJ_IF_MAYBE(inner, *redirect.lockExclusive()) {
return **inner;
} else {
return nullptr;
}
}
kj::Maybe<kj::Promise<kj::Own<const ClientHook>>> whenMoreResolved() const override { kj::Maybe<kj::Promise<kj::Own<const ClientHook>>> whenMoreResolved() const override {
return getPromiseForClientResolution().addBranch(); return getPromiseForClientResolution().addBranch();
} }
...@@ -300,6 +312,12 @@ private: ...@@ -300,6 +312,12 @@ private:
// resolves). Luckily, we know that queued calls will involve, at the very least, an // resolves). Luckily, we know that queued calls will involve, at the very least, an
// eventLoop.evalLater. // eventLoop.evalLater.
kj::MutexGuarded<kj::Maybe<kj::Own<const ClientHook>>> redirect;
// Once the promise resolves, this will become non-null and point to the underlying object.
kj::Promise<void> selfResolutionOp;
// Represents the operation which will set `redirect` when possible.
const ClientHookPromiseFork& getPromiseForCallForwarding() const { const ClientHookPromiseFork& getPromiseForCallForwarding() const {
return promiseForCallForwarding.get([this](kj::SpaceFor<ClientHookPromiseFork>& space) { return promiseForCallForwarding.get([this](kj::SpaceFor<ClientHookPromiseFork>& space) {
return space.construct(loop.fork(promise.addBranch())); return space.construct(loop.fork(promise.addBranch()));
...@@ -403,6 +421,10 @@ public: ...@@ -403,6 +421,10 @@ public:
kj::refcounted<QueuedPipeline>(server.getEventLoop(), kj::mv(pipelinePromise)) }; kj::refcounted<QueuedPipeline>(server.getEventLoop(), kj::mv(pipelinePromise)) };
} }
kj::Maybe<const ClientHook&> getResolved() const {
return nullptr;
}
kj::Maybe<kj::Promise<kj::Own<const ClientHook>>> whenMoreResolved() const override { kj::Maybe<kj::Promise<kj::Own<const ClientHook>>> whenMoreResolved() const override {
return nullptr; return nullptr;
} }
......
...@@ -315,10 +315,17 @@ public: ...@@ -315,10 +315,17 @@ public:
// //
// The call must not begin synchronously, as the caller may hold arbitrary mutexes. // The call must not begin synchronously, as the caller may hold arbitrary mutexes.
virtual kj::Maybe<const ClientHook&> getResolved() const = 0;
// If this ClientHook is a promise that has already resolved, returns the inner, resolved version
// of the capability. The caller may permanently replace this client with the resolved one if
// desired. Returns null if the client isn't a promise or hasn't resolved yet -- use
// `whenMoreResolved()` to distinguish between them.
virtual kj::Maybe<kj::Promise<kj::Own<const ClientHook>>> whenMoreResolved() const = 0; virtual kj::Maybe<kj::Promise<kj::Own<const ClientHook>>> whenMoreResolved() const = 0;
// If this client is a settled reference (not a promise), return nullptr. Otherwise, return a // If this client is a settled reference (not a promise), return nullptr. Otherwise, return a
// promise that eventually resolves to a new client that is closer to being the final, settled // promise that eventually resolves to a new client that is closer to being the final, settled
// client. Calling this repeatedly should eventually produce a settled client. // client (i.e. the value eventually returned by `getResolved()`). Calling this repeatedly
// should eventually produce a settled client.
kj::Promise<void> whenResolved() const; kj::Promise<void> whenResolved() const;
// Repeatedly calls whenMoreResolved() until it returns nullptr. // Repeatedly calls whenMoreResolved() until it returns nullptr.
......
...@@ -406,6 +406,10 @@ private: ...@@ -406,6 +406,10 @@ private:
kj::Own<const ClientHook> clientHook; kj::Own<const ClientHook> clientHook;
kj::Promise<void> resolveOp = nullptr;
// If this export is a promise (not a settled capability), the `resolveOp` represents the
// ongoing operation to wait for that promise to resolve and then send a `Resolve` message.
inline bool operator==(decltype(nullptr)) const { return refcount == 0; } inline bool operator==(decltype(nullptr)) const { return refcount == 0; }
inline bool operator!=(decltype(nullptr)) const { return refcount != 0; } inline bool operator!=(decltype(nullptr)) const { return refcount != 0; }
}; };
...@@ -675,6 +679,10 @@ private: ...@@ -675,6 +679,10 @@ private:
return Request<ObjectPointer, ObjectPointer>(root, kj::mv(request)); return Request<ObjectPointer, ObjectPointer>(root, kj::mv(request));
} }
kj::Maybe<const ClientHook&> getResolved() const {
return nullptr;
}
kj::Maybe<kj::Promise<kj::Own<const ClientHook>>> whenMoreResolved() const override { kj::Maybe<kj::Promise<kj::Own<const ClientHook>>> whenMoreResolved() const override {
return nullptr; return nullptr;
} }
...@@ -727,6 +735,10 @@ private: ...@@ -727,6 +735,10 @@ private:
return Request<ObjectPointer, ObjectPointer>(root, kj::mv(request)); return Request<ObjectPointer, ObjectPointer>(root, kj::mv(request));
} }
kj::Maybe<const ClientHook&> getResolved() const {
return nullptr;
}
kj::Maybe<kj::Promise<kj::Own<const ClientHook>>> whenMoreResolved() const override { kj::Maybe<kj::Promise<kj::Own<const ClientHook>>> whenMoreResolved() const override {
return nullptr; return nullptr;
} }
...@@ -746,7 +758,7 @@ private: ...@@ -746,7 +758,7 @@ private:
kj::Promise<kj::Own<const ClientHook>> eventual, kj::Promise<kj::Own<const ClientHook>> eventual,
kj::Maybe<ExportId> importId) kj::Maybe<ExportId> importId)
: RpcClient(connectionState), : RpcClient(connectionState),
inner(kj::mv(initial)), inner(Inner {false, kj::mv(initial)}),
importId(importId), importId(importId),
fork(connectionState.eventLoop.fork(kj::mv(eventual))), fork(connectionState.eventLoop.fork(kj::mv(eventual))),
resolveSelfPromise(connectionState.eventLoop.there(fork.addBranch(), resolveSelfPromise(connectionState.eventLoop.there(fork.addBranch(),
...@@ -783,20 +795,28 @@ private: ...@@ -783,20 +795,28 @@ private:
kj::Maybe<ExportId> writeDescriptor( kj::Maybe<ExportId> writeDescriptor(
rpc::CapDescriptor::Builder descriptor, Tables& tables) const override { rpc::CapDescriptor::Builder descriptor, Tables& tables) const override {
auto cap = inner.lockExclusive()->get()->addRef(); return connectionState->writeDescriptor(*inner.lockExclusive()->cap, descriptor, tables);
return connectionState->writeDescriptor(kj::mv(cap), descriptor, tables);
} }
kj::Maybe<kj::Own<const ClientHook>> writeTarget( kj::Maybe<kj::Own<const ClientHook>> writeTarget(
rpc::Call::Target::Builder target) const override { rpc::Call::Target::Builder target) const override {
return connectionState->writeTarget(**inner.lockExclusive(), target); return connectionState->writeTarget(*inner.lockExclusive()->cap, target);
} }
// implements ClientHook ----------------------------------------- // implements ClientHook -----------------------------------------
Request<ObjectPointer, ObjectPointer> newCall( Request<ObjectPointer, ObjectPointer> newCall(
uint64_t interfaceId, uint16_t methodId, uint firstSegmentWordSize) const override { uint64_t interfaceId, uint16_t methodId, uint firstSegmentWordSize) const override {
return inner.lockExclusive()->get()->newCall(interfaceId, methodId, firstSegmentWordSize); return inner.lockExclusive()->cap->newCall(interfaceId, methodId, firstSegmentWordSize);
}
kj::Maybe<const ClientHook&> getResolved() const {
auto lock = inner.lockShared();
if (lock->isResolved) {
return *lock->cap;
} else {
return nullptr;
}
} }
kj::Maybe<kj::Promise<kj::Own<const ClientHook>>> whenMoreResolved() const override { kj::Maybe<kj::Promise<kj::Own<const ClientHook>>> whenMoreResolved() const override {
...@@ -804,7 +824,12 @@ private: ...@@ -804,7 +824,12 @@ private:
} }
private: private:
kj::MutexGuarded<kj::Own<const ClientHook>> inner; struct Inner {
bool isResolved;
kj::Own<const ClientHook> cap;
};
kj::MutexGuarded<Inner> inner;
kj::Maybe<ExportId> importId; kj::Maybe<ExportId> importId;
kj::ForkedPromise<kj::Own<const ClientHook>> fork; kj::ForkedPromise<kj::Own<const ClientHook>> fork;
...@@ -816,33 +841,70 @@ private: ...@@ -816,33 +841,70 @@ private:
// Careful to make sure the old client is not destroyed until we release the lock. // Careful to make sure the old client is not destroyed until we release the lock.
kj::Own<const ClientHook> old; kj::Own<const ClientHook> old;
auto lock = inner.lockExclusive(); auto lock = inner.lockExclusive();
old = kj::mv(*lock); old = kj::mv(lock->cap);
*lock = replacement->addRef(); lock->cap = replacement->addRef();
lock->isResolved = true;
} }
}; };
kj::Maybe<ExportId> writeDescriptor( kj::Maybe<ExportId> writeDescriptor(
kj::Own<const ClientHook> cap, rpc::CapDescriptor::Builder descriptor, const ClientHook& cap, rpc::CapDescriptor::Builder descriptor,
Tables& tables) const { Tables& tables) const {
// Write a descriptor for the given capability. The tables must be locked by the caller and // Write a descriptor for the given capability. The tables must be locked by the caller and
// passed in as a parameter. // passed in as a parameter.
if (cap->getBrand() == this) { // Find the innermost wrapped capability.
return kj::downcast<const RpcClient>(*cap).writeDescriptor(descriptor, tables); const ClientHook* inner = &cap;
for (;;) {
KJ_IF_MAYBE(resolved, inner->getResolved()) {
inner = resolved;
} else {
break;
}
}
if (inner->getBrand() == this) {
return kj::downcast<const RpcClient>(*inner).writeDescriptor(descriptor, tables);
} else { } else {
auto iter = tables.exportsByCap.find(cap); auto iter = tables.exportsByCap.find(inner);
if (iter != tables.exportsByCap.end()) { if (iter != tables.exportsByCap.end()) {
// We've already seen and exported this capability before. Just up the refcount.
auto& exp = KJ_ASSERT_NONNULL(tables.exports.find(iter->second)); auto& exp = KJ_ASSERT_NONNULL(tables.exports.find(iter->second));
++exp.refcount; ++exp.refcount;
// TODO(now): Check if it's a promise.
descriptor.setSenderHosted(iter->second); descriptor.setSenderHosted(iter->second);
return iter->second; return iter->second;
} else { } else {
// This is the first time we've seen this capability.
ExportId exportId; ExportId exportId;
auto& exp = tables.exports.next(exportId); auto& exp = tables.exports.next(exportId);
exp.refcount = 1; exp.refcount = 1;
exp.clientHook = kj::mv(cap); exp.clientHook = inner->addRef();
descriptor.setSenderHosted(exportId); descriptor.setSenderHosted(exportId);
KJ_IF_MAYBE(wrapped, inner->whenMoreResolved()) {
// This is a promise. Arrange for the `Resolve` message to be sent later.
exp.resolveOp = eventLoop.there(kj::mv(*wrapped),
[this,exportId](kj::Own<const ClientHook>&& resolution) {
// send resolve
auto message = connection->newOutgoingMessage(
messageSizeHint<rpc::Resolve>() + sizeInWords<rpc::CapDescriptor>() + 16);
auto resolve = message->getBody().initAs<rpc::Message>().initResolve();
resolve.setPromiseId(exportId);
writeDescriptor(*resolution, resolve.initCap(), *this->tables.lockExclusive());
message->send();
}, [this,exportId](kj::Exception&& exception) {
// send resolve
auto message = connection->newOutgoingMessage(
messageSizeHint<rpc::Resolve>() + sizeInWords<rpc::Exception>() +
(exception.getDescription().size() + 7 / 8) + 8);
auto resolve = message->getBody().initAs<rpc::Message>().initResolve();
resolve.setPromiseId(exportId);
fromException(exception, resolve.initException());
message->send();
});
exp.resolveOp.eagerlyEvaluate(eventLoop);
}
return exportId; return exportId;
} }
} }
...@@ -1097,6 +1159,7 @@ private: ...@@ -1097,6 +1159,7 @@ private:
for (auto exportId: exports) { for (auto exportId: exports) {
auto& exp = KJ_ASSERT_NONNULL(lock->exports.find(exportId)); auto& exp = KJ_ASSERT_NONNULL(lock->exports.find(exportId));
if (--exp.refcount == 0) { if (--exp.refcount == 0) {
lock->exportsByCap.erase(exp.clientHook);
clientsToRelease.add(kj::mv(exp.clientHook)); clientsToRelease.add(kj::mv(exp.clientHook));
lock->exports.erase(exportId); lock->exports.erase(exportId);
} }
...@@ -1121,7 +1184,7 @@ private: ...@@ -1121,7 +1184,7 @@ private:
// If maybeExportId is inlined, GCC 4.7 reports a spurious "may be used uninitialized" // If maybeExportId is inlined, GCC 4.7 reports a spurious "may be used uninitialized"
// error (GCC 4.8 and Clang do not complain). // error (GCC 4.8 and Clang do not complain).
auto maybeExportId = connectionState.writeDescriptor( auto maybeExportId = connectionState.writeDescriptor(
entry.second.cap->addRef(), entry.second.builder, tables); *entry.second.cap, entry.second.builder, tables);
KJ_IF_MAYBE(exportId, maybeExportId) { KJ_IF_MAYBE(exportId, maybeExportId) {
KJ_ASSERT(tables.exports.find(*exportId) != nullptr); KJ_ASSERT(tables.exports.find(*exportId) != nullptr);
exports.add(*exportId); exports.add(*exportId);
...@@ -1791,6 +1854,10 @@ private: ...@@ -1791,6 +1854,10 @@ private:
// TODO(now) // TODO(now)
break; break;
case rpc::Message::DISEMBARGO:
// TODO(now)
break;
case rpc::Message::RESTORE: case rpc::Message::RESTORE:
handleRestore(kj::mv(message), reader.getRestore()); handleRestore(kj::mv(message), reader.getRestore());
break; break;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment