Commit d3869607 authored by Kenton Varda's avatar Kenton Varda

Implement embargoes.

parent 1c847b38
......@@ -203,7 +203,9 @@ public:
selfResolutionOp(loop.there(this->promise.addBranch(),
[this](kj::Own<const ClientHook>&& inner) {
*redirect.lockExclusive() = kj::mv(inner);
})) {}
})) {
selfResolutionOp.eagerlyEvaluate(loop);
}
Request<ObjectPointer, ObjectPointer> newCall(
uint64_t interfaceId, uint16_t methodId, uint firstSegmentWordSize) const override {
......@@ -449,4 +451,9 @@ kj::Own<const ClientHook> Capability::Client::makeLocalClient(
return kj::refcounted<LocalClient>(eventLoop, kj::mv(server));
}
kj::Own<ClientHook> newLocalPromiseClient(kj::Promise<kj::Own<const ClientHook>>&& promise,
const kj::EventLoop& loop) {
return kj::refcounted<QueuedClient>(loop, kj::mv(promise));
}
} // namespace capnp
......@@ -353,6 +353,12 @@ public:
virtual kj::Own<CallContextHook> addRef() = 0;
};
kj::Own<ClientHook> newLocalPromiseClient(kj::Promise<kj::Own<const ClientHook>>&& promise,
const kj::EventLoop& loop = kj::EventLoop::current());
// Returns a ClientHook that queues up calls until `promise` resolves, then forwards them to
// the new client. This hook's `getResolved()` and `whenMoreResolved()` methods will reflect the
// redirection to the eventual replacement client.
// =======================================================================================
// Extend PointerHelpers for interfaces
......
......@@ -108,12 +108,14 @@ kj::Exception toException(const rpc::Exception::Reader& exception) {
break;
}
return kj::Exception(nature, durability, "(remote)", 0, kj::heapString(exception.getReason()));
return kj::Exception(nature, durability, "(remote)", 0,
kj::str("remote exception: ", exception.getReason()));
}
void fromException(const kj::Exception& exception, rpc::Exception::Builder builder) {
// TODO(someday): Indicate the remote server name as part of the stack trace.
builder.setReason(kj::str("remote exception: ", exception.getDescription()));
// TODO(someday): Indicate the remote server name as part of the stack trace. Maybe even
// transmit stack traces?
builder.setReason(exception.getDescription());
builder.setIsCallersFault(exception.getNature() == kj::Exception::Nature::PRECONDITION);
switch (exception.getDurability()) {
case kj::Exception::Durability::PERMANENT:
......@@ -129,8 +131,7 @@ void fromException(const kj::Exception& exception, rpc::Exception::Builder build
}
uint exceptionSizeHint(const kj::Exception& exception) {
return sizeInWords<rpc::Exception>() +
(exception.getDescription().size() + strlen("remote exception: ")) / sizeof(word) + 1;
return sizeInWords<rpc::Exception>() + exception.getDescription().size() / sizeof(word) + 1;
}
// =======================================================================================
......@@ -442,6 +443,18 @@ private:
// If non-null, the import is a promise.
};
typedef uint32_t EmbargoId;
struct Embargo {
// For handling the `Disembargo` message when looping back to self.
kj::Maybe<kj::Own<kj::PromiseFulfiller<void>>> fulfiller;
// Fulfill this when the Disembargo arrives.
inline bool operator==(decltype(nullptr)) const { return fulfiller == nullptr; }
inline bool operator!=(decltype(nullptr)) const { return fulfiller != nullptr; }
};
// =======================================================================================
// OK, now we can define RpcConnectionState's member data.
......@@ -469,6 +482,10 @@ private:
kj::Maybe<kj::Exception> networkException;
// If the connection has failed, this is the exception describing the failure. All future
// calls should throw this exception.
ExportTable<EmbargoId, Embargo> embargoes;
// There are only four tables. This definitely isn't a fifth table. I don't know what you're
// talking about.
};
kj::MutexGuarded<Tables> tables;
......@@ -631,6 +648,11 @@ private:
// If calls have been redirected to some other local ClientHook, returns that hook instead.
// This can happen if the capability represents a promise that has been resolved.
virtual kj::Own<const ClientHook> getInnermostClient() const = 0;
// If this client just wraps some other client -- even if it is only *temporarily* wrapping
// that other client -- return a reference to the other client, transitively. Otherwise,
// return a new reference to *this.
// implements ClientHook -----------------------------------------
VoidPromiseAndPipeline call(uint64_t interfaceId, uint16_t methodId,
......@@ -741,6 +763,10 @@ private:
return nullptr;
}
kj::Own<const ClientHook> getInnermostClient() const override {
return kj::addRef(*this);
}
// implements ClientHook -----------------------------------------
Request<ObjectPointer, ObjectPointer> newCall(
......@@ -798,6 +824,10 @@ private:
return nullptr;
}
kj::Own<const ClientHook> getInnermostClient() const override {
return kj::addRef(*this);
}
// implements ClientHook -----------------------------------------
Request<ObjectPointer, ObjectPointer> newCall(
......@@ -878,9 +908,14 @@ private:
kj::Maybe<kj::Own<const ClientHook>> writeTarget(
rpc::MessageTarget::Builder target) const override {
__atomic_store_n(&receivedCall, true, __ATOMIC_RELAXED);
return connectionState->writeTarget(*inner.lockExclusive()->cap, target);
}
kj::Own<const ClientHook> getInnermostClient() const override {
return connectionState->getInnermostClient(*inner.lockExclusive()->cap);
}
// implements ClientHook -----------------------------------------
Request<ObjectPointer, ObjectPointer> newCall(
......@@ -915,10 +950,52 @@ private:
// ensure the continuation is not still running.
kj::Promise<void> resolveSelfPromise;
mutable bool receivedCall = false;
void resolve(kj::Own<const ClientHook> replacement) {
if (__atomic_load_n(&receivedCall, __ATOMIC_RELAXED)) {
// There were calls to the old inner. We have to send it an embargo and temporarily
// delay resolution.
auto message = connectionState->connection->newOutgoingMessage(
messageSizeHint<rpc::Disembargo>() + sizeInWords<rpc::MessageTarget>() +
sizeInWords<rpc::PromisedAnswer>());
auto disembargo = message->getBody().initAs<rpc::Message>().getDisembargo();
{
auto redirect = connectionState->writeTarget(
*inner.lockExclusive()->cap, disembargo.initTarget());
KJ_ASSERT(redirect == nullptr,
"Original promise target should always be from this RPC connection.");
}
EmbargoId embargoId;
auto lock = connectionState->tables.lockExclusive();
Embargo& embargo = lock->embargoes.next(embargoId);
disembargo.getContext().setSenderLoopback(embargoId);
auto paf = kj::newPromiseAndFulfiller<void>();
embargo.fulfiller = kj::mv(paf.fulfiller);
// Make a promise which resolves to `replacement` as soon as the `Disembargo` comes back.
auto embargoPromise = connectionState->eventLoop.there(kj::mv(paf.promise),
kj::mvCapture(replacement, [this](kj::Own<const ClientHook>&& replacement) {
return kj::mv(replacement);
}));
// We need to queue up calls in the meantime, so we'll resolve ourselves to a local promise
// client instead.
replacement = newLocalPromiseClient(kj::mv(embargoPromise), connectionState->eventLoop);
// Send the `Disembargo`.
message->send();
}
// Careful to make sure the old client is not destroyed until we release the lock.
kj::Own<const ClientHook> old;
auto lock = inner.lockExclusive();
old = kj::mv(lock->cap);
lock->cap = replacement->addRef();
lock->isResolved = true;
......@@ -955,31 +1032,14 @@ private:
// This is the first time we've seen this capability.
ExportId exportId;
auto& exp = tables.exports.next(exportId);
tables.exportsByCap[inner] = exportId;
exp.refcount = 1;
exp.clientHook = inner->addRef();
descriptor.setSenderHosted(exportId);
KJ_IF_MAYBE(wrapped, inner->whenMoreResolved()) {
// This is a promise. Arrange for the `Resolve` message to be sent later.
exp.resolveOp = eventLoop.there(kj::mv(*wrapped),
[this,exportId](kj::Own<const ClientHook>&& resolution) {
// send resolve
auto message = connection->newOutgoingMessage(
messageSizeHint<rpc::Resolve>() + sizeInWords<rpc::CapDescriptor>() + 16);
auto resolve = message->getBody().initAs<rpc::Message>().initResolve();
resolve.setPromiseId(exportId);
writeDescriptor(*resolution, resolve.initCap(), *this->tables.lockExclusive());
message->send();
}, [this,exportId](kj::Exception&& exception) {
// send resolve
auto message = connection->newOutgoingMessage(
messageSizeHint<rpc::Resolve>() + exceptionSizeHint(exception) + 8);
auto resolve = message->getBody().initAs<rpc::Message>().initResolve();
resolve.setPromiseId(exportId);
fromException(exception, resolve.initException());
message->send();
});
exp.resolveOp.eagerlyEvaluate(eventLoop);
exp.resolveOp = resolveExportedPromise(exportId, kj::mv(*wrapped));
}
return exportId;
......@@ -1006,6 +1066,92 @@ private:
}
}
kj::Own<const ClientHook> getInnermostClient(const ClientHook& client) const {
const ClientHook* ptr = &client;
for (;;) {
KJ_IF_MAYBE(inner, ptr->getResolved()) {
ptr = inner;
} else {
break;
}
}
if (ptr->getBrand() == this) {
return kj::downcast<const RpcClient>(*ptr).getInnermostClient();
} else {
return ptr->addRef();
}
}
kj::Promise<void> resolveExportedPromise(
ExportId exportId, kj::Promise<kj::Own<const ClientHook>>&& promise) const {
// Implements exporting of a promise. The promise has been exported under the given ID, and is
// to eventually resolve to the ClientHook produced by `promise`. This method waits for that
// resolve to happen and then sends the appropriate `Resolve` message to the peer.
auto result = eventLoop.there(kj::mv(promise),
[this,exportId](kj::Own<const ClientHook>&& resolution) -> kj::Promise<void> {
// Successful resolution.
// Get the innermost ClientHook backing the resolved client. This includes traversing
// PromiseClients that haven't resolved yet to their underlying ImportClient or
// PipelineClient, so that we get a remote promise that might resolve later. This is
// important to make sure that if the peer sends a `Disembargo` back to us, it bounces back
// correctly instead of going to the result of some future resolution. See the documentation
// for `Disembargo` in `rpc.capnp`.
resolution = getInnermostClient(*resolution);
// Update the export table to point at this object instead. We know that our entry in the
// export table is still live because when it is destroyed the asynchronous resolution task
// (i.e. this code) is canceled.
auto lock = tables.lockExclusive();
auto& exp = KJ_ASSERT_NONNULL(lock->exports.find(exportId));
lock->exportsByCap.erase(exp.clientHook);
exp.clientHook = kj::mv(resolution);
if (resolution->getBrand() != this) {
// We're resolving to a local capability. If we're resolving to a promise, we might be
// able to reuse our export table entry and avoid sending a message.
KJ_IF_MAYBE(promise, resolution->whenMoreResolved()) {
// We're replacing a promise with another local promise. In this case, we might actually
// be able to just reuse the existing export table entry to represent the new promise --
// unless it already has an entry. Let's check.
auto insertResult = lock->exportsByCap.insert(
std::make_pair(exp.clientHook.get(), exportId));
if (insertResult.second) {
// The new promise was not already in the table, therefore the existing export table
// entry has now been repurposed to represent it. There is no need to send a resolve
// message at all. We do, however, have to start resolving the next promise.
return resolveExportedPromise(exportId, kj::mv(*promise));
}
}
}
// OK, we have to send a `Resolve` message.
auto message = connection->newOutgoingMessage(
messageSizeHint<rpc::Resolve>() + sizeInWords<rpc::CapDescriptor>() + 16);
auto resolve = message->getBody().initAs<rpc::Message>().initResolve();
resolve.setPromiseId(exportId);
writeDescriptor(*resolution, resolve.initCap(), *this->tables.lockExclusive());
message->send();
return kj::READY_NOW;
}, [this,exportId](kj::Exception&& exception) {
// send error resolution
auto message = connection->newOutgoingMessage(
messageSizeHint<rpc::Resolve>() + exceptionSizeHint(exception) + 8);
auto resolve = message->getBody().initAs<rpc::Message>().initResolve();
resolve.setPromiseId(exportId);
fromException(exception, resolve.initException());
message->send();
});
result.eagerlyEvaluate(eventLoop);
return kj::mv(result);
}
// =====================================================================================
// CapExtractor / CapInjector implementations
......@@ -1945,7 +2091,7 @@ private:
break;
case rpc::Message::DISEMBARGO:
// TODO(now)
handleDisembargo(reader.getDisembargo());
break;
case rpc::Message::RESTORE:
......@@ -2000,55 +2146,12 @@ private:
void handleCall(kj::Own<IncomingRpcMessage>&& message, const rpc::Call::Reader& call) {
kj::Own<const ClientHook> capability;
auto target = call.getTarget();
switch (target.which()) {
case rpc::MessageTarget::EXPORTED_CAP: {
auto lock = tables.lockExclusive(); // TODO(perf): shared?
KJ_IF_MAYBE(exp, lock->exports.find(target.getExportedCap())) {
capability = exp->clientHook->addRef();
} else {
KJ_FAIL_REQUIRE("Call target is not a current export ID.") {
return;
}
}
break;
}
case rpc::MessageTarget::PROMISED_ANSWER: {
auto promisedAnswer = target.getPromisedAnswer();
kj::Own<const PipelineHook> pipeline;
{
auto lock = tables.lockExclusive(); // TODO(perf): shared?
auto& base = lock->answers[promisedAnswer.getQuestionId()];
KJ_REQUIRE(base.active, "PromisedAnswer.questionId is not a current question.") {
return;
}
KJ_IF_MAYBE(p, base.pipeline) {
pipeline = p->get()->addRef();
} else {
KJ_FAIL_REQUIRE("PromisedAnswer.questionId is already finished or contained no "
"capabilities.") {
return;
}
}
}
KJ_IF_MAYBE(ops, toPipelineOps(promisedAnswer.getTransform())) {
capability = pipeline->getPipelinedCap(*ops);
KJ_IF_MAYBE(t, getMessageTarget(call.getTarget())) {
capability = kj::mv(*t);
} else {
// Exception already thrown.
// Exception already reported.
return;
}
break;
}
default:
// TODO(soon): Handle better.
KJ_FAIL_REQUIRE("Unknown call target type.", (uint)target.which()) {
return;
}
}
QuestionId questionId = call.getQuestionId();
// Note: resolutionChainTail couldn't possibly be changing here because we only handle one
......@@ -2099,6 +2202,55 @@ private:
}
}
kj::Maybe<kj::Own<const ClientHook>> getMessageTarget(const rpc::MessageTarget::Reader& target) {
switch (target.which()) {
case rpc::MessageTarget::EXPORTED_CAP: {
auto lock = tables.lockExclusive(); // TODO(perf): shared?
KJ_IF_MAYBE(exp, lock->exports.find(target.getExportedCap())) {
return exp->clientHook->addRef();
} else {
KJ_FAIL_REQUIRE("Message target is not a current export ID.") {
return nullptr;
}
}
break;
}
case rpc::MessageTarget::PROMISED_ANSWER: {
auto promisedAnswer = target.getPromisedAnswer();
kj::Own<const PipelineHook> pipeline;
{
auto lock = tables.lockExclusive(); // TODO(perf): shared?
auto& base = lock->answers[promisedAnswer.getQuestionId()];
KJ_REQUIRE(base.active, "PromisedAnswer.questionId is not a current question.") {
return nullptr;
}
KJ_IF_MAYBE(p, base.pipeline) {
pipeline = p->get()->addRef();
} else {
KJ_FAIL_REQUIRE("PromisedAnswer.questionId is already finished or contained no "
"capabilities.") {
return nullptr;
}
}
}
KJ_IF_MAYBE(ops, toPipelineOps(promisedAnswer.getTransform())) {
return pipeline->getPipelinedCap(*ops);
} else {
// Exception already thrown.
return nullptr;
}
}
default:
KJ_FAIL_REQUIRE("Unknown message target type.", target) {
return nullptr;
}
}
}
void handleReturn(kj::Own<IncomingRpcMessage>&& message, const rpc::Return::Reader& ret) {
kj::Own<CapInjectorImpl> paramCapsToRelease;
......@@ -2270,6 +2422,78 @@ private:
}
}
void handleDisembargo(const rpc::Disembargo::Reader& disembargo) {
auto lock = tables.lockExclusive();
auto context = disembargo.getContext();
switch (context.which()) {
case rpc::Disembargo::Context::SENDER_LOOPBACK: {
kj::Own<const ClientHook> target;
KJ_IF_MAYBE(t, getMessageTarget(disembargo.getTarget())) {
target = kj::mv(*t);
} else {
// Exception already reported.
return;
}
for (;;) {
KJ_IF_MAYBE(r, target->getResolved()) {
target = r->addRef();
} else {
break;
}
}
KJ_REQUIRE(target->getBrand() == this,
"'Disembargo' of type 'senderLoopback' sent to an object that does not point "
"back to the sender.") {
return;
}
const RpcClient& downcasted = kj::downcast<const RpcClient>(*target);
auto message = connection->newOutgoingMessage(messageSizeHint<rpc::Disembargo>() +
sizeInWords<rpc::MessageTarget>() + sizeInWords<rpc::PromisedAnswer>());
auto builder = message->getBody().initAs<rpc::Message>().initDisembargo();
{
auto redirect = downcasted.writeTarget(builder.initTarget());
// Disembargoes should only be sent to capabilities that were previously the object of
// a `Resolve` message. But `writeTarget` only ever returns non-null when called on
// a PromiseClient. The code which sends `Resolve` should have replaced any promise
// with a direct node in order to solve the Tribble 4-way race condition.
KJ_REQUIRE(redirect == nullptr,
"'Disembargo' of type 'senderLoopback' sent to an object that does not appear "
"to have been the object of a previous 'Resolve' message.") {
return;
}
}
builder.getContext().setReceiverLoopback(context.getSenderLoopback());
message->send();
break;
}
case rpc::Disembargo::Context::RECEIVER_LOOPBACK:
KJ_IF_MAYBE(embargo, lock->embargoes.find(context.getReceiverLoopback())) {
KJ_ASSERT_NONNULL(embargo->fulfiller)->fulfill();
lock->embargoes.erase(context.getReceiverLoopback());
} else {
KJ_FAIL_REQUIRE("Invalid embargo ID in 'Disembargo.context.receiverLoopback'.") {
return;
}
}
break;
default:
KJ_FAIL_REQUIRE("Unimplemented Disembargo type.", disembargo) { return; }
}
}
// ---------------------------------------------------------------------------
// Level 2
......
......@@ -441,6 +441,15 @@ struct Resolve {
union {
cap @1 :CapDescriptor;
# The object to which the promise resolved.
#
# The sender promises that from this point forth, until `promiseId` is released, it shall
# simply forward all messages to the capability designated by `cap`. This is true even if
# `cap` itself happens to desigate another promise, and that other promise later resolves --
# messages sent to `promiseId` shall still go to that other promise, not to its resolution.
# This is important in the case that the receiver of the `Resolve` ends up sending a
# `Disembargo` message towards `promiseId` in order to control message ordering -- that
# `Disembargo` really needs to reflect back to exactly the object designated by `cap` even
# if that object is itself a promise.
exception @2 :Exception;
# Indicates that the promise was broken.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment