Commit 1c847b38 authored by Kenton Varda's avatar Kenton Varda

Tweaks

parent 15f599c8
......@@ -80,22 +80,17 @@ public:
void send() override {
auto lock = network.previousWrite.lockExclusive();
*lock = network.eventLoop.there(kj::mv(*lock),
kj::mvCapture(kj::addRef(*this), [&](kj::Own<OutgoingMessageImpl>&& self) {
return writeMessage(network.stream, message)
.then(kj::mvCapture(kj::mv(self),
[](kj::Own<OutgoingMessageImpl>&& self) -> kj::Promise<void> {
// Just here to hold a reference to `self` until the write completes.
// Hack to force this continuation to run (thus allowing `self` to be released) even if
// no one is waiting on the promise.
return kj::READY_NOW;
}), [&](kj::Exception&& exception) -> kj::Promise<void> {
*lock = network.eventLoop.there(kj::mv(*lock), [&]() {
auto promise = writeMessage(network.stream, message).then([]() {
// success; do nothing
}, [&](kj::Exception&& exception) {
// Exception during write!
network.disconnectFulfiller.lockExclusive()->get()->fulfill();
return kj::READY_NOW;
});
}));
promise.eagerlyEvaluate();
return kj::mv(promise);
});
lock->attach(kj::addRef(*this));
}
private:
......
......@@ -128,6 +128,11 @@ void fromException(const kj::Exception& exception, rpc::Exception::Builder build
}
}
uint exceptionSizeHint(const kj::Exception& exception) {
return sizeInWords<rpc::Exception>() +
(exception.getDescription().size() + strlen("remote exception: ")) / sizeof(word) + 1;
}
// =======================================================================================
template <typename Id, typename T>
......@@ -343,8 +348,7 @@ public:
{
// Send an abort message.
auto message = connection->newOutgoingMessage(
messageSizeHint<rpc::Exception>() +
(exception.getDescription().size() + 7) / sizeof(word));
messageSizeHint<void>() + exceptionSizeHint(exception));
fromException(exception, message->getBody().getAs<rpc::Message>().initAbort());
message->send();
}
......@@ -619,7 +623,7 @@ private:
// later.
virtual kj::Maybe<kj::Own<const ClientHook>> writeTarget(
rpc::Call::Target::Builder target) const = 0;
rpc::MessageTarget::Builder target) const = 0;
// Writes the appropriate call target for calls to this capability and returns null.
//
// - OR -
......@@ -732,7 +736,7 @@ private:
}
kj::Maybe<kj::Own<const ClientHook>> writeTarget(
rpc::Call::Target::Builder target) const override {
rpc::MessageTarget::Builder target) const override {
target.setExportedCap(importId);
return nullptr;
}
......@@ -787,7 +791,7 @@ private:
}
kj::Maybe<kj::Own<const ClientHook>> writeTarget(
rpc::Call::Target::Builder target) const override {
rpc::MessageTarget::Builder target) const override {
auto builder = target.initPromisedAnswer();
builder.setQuestionId(questionRef->getId());
builder.adoptTransform(fromPipelineOps(Orphanage::getForMessageContaining(builder), ops));
......@@ -873,7 +877,7 @@ private:
}
kj::Maybe<kj::Own<const ClientHook>> writeTarget(
rpc::Call::Target::Builder target) const override {
rpc::MessageTarget::Builder target) const override {
return connectionState->writeTarget(*inner.lockExclusive()->cap, target);
}
......@@ -969,8 +973,7 @@ private:
}, [this,exportId](kj::Exception&& exception) {
// send resolve
auto message = connection->newOutgoingMessage(
messageSizeHint<rpc::Resolve>() + sizeInWords<rpc::Exception>() +
(exception.getDescription().size() + 7 / 8) + 8);
messageSizeHint<rpc::Resolve>() + exceptionSizeHint(exception) + 8);
auto resolve = message->getBody().initAs<rpc::Message>().initResolve();
resolve.setPromiseId(exportId);
fromException(exception, resolve.initException());
......@@ -985,7 +988,7 @@ private:
}
kj::Maybe<kj::Own<const ClientHook>> writeTarget(
const ClientHook& cap, rpc::Call::Target::Builder target) const {
const ClientHook& cap, rpc::MessageTarget::Builder target) const {
// If calls to the given capability should pass over this connection, fill in `target`
// appropriately for such a call and return nullptr. Otherwise, return a `ClientHook` to which
// the call should be forwarded; the caller should then delegate the call to that `ClientHook`.
......@@ -1031,7 +1034,7 @@ private:
// access is possible. It's probably not worth taking the lock to look; we'll just return
// a silly estimate.
uint count = final ? retainedCaps.getWithoutLock().size() : 32;
return (count * sizeof(ExportId) + (sizeof(ExportId) - 1)) / sizeof(word);
return (count * sizeof(ExportId) + (sizeof(word) - 1)) / sizeof(word);
}
struct FinalizedRetainedCaps {
......@@ -1399,7 +1402,8 @@ private:
: connectionState(kj::addRef(connectionState)),
target(kj::mv(target)),
message(connectionState.connection->newOutgoingMessage(
firstSegmentWordSize == 0 ? 0 : firstSegmentWordSize + messageSizeHint<rpc::Call>())),
firstSegmentWordSize == 0 ? 0 : firstSegmentWordSize + messageSizeHint<rpc::Call>() +
sizeInWords<rpc::MessageTarget>() + sizeInWords<rpc::PromisedAnswer>())),
injector(kj::heap<CapInjectorImpl>(connectionState)),
context(*injector),
callBuilder(message->getBody().getAs<rpc::Message>().initCall()),
......@@ -1672,8 +1676,8 @@ private:
void sendErrorReturn(kj::Exception&& exception) {
if (isFirstResponder()) {
auto message = connectionState->connection->newOutgoingMessage(
messageSizeHint<rpc::Return>() + sizeInWords<rpc::Exception>() +
exception.getDescription().size() / sizeof(word) + 1);
messageSizeHint<rpc::Return>() + requestCapExtractor.retainedListSizeHint(true) +
exceptionSizeHint(exception));
auto builder = message->getBody().initAs<rpc::Message>().initReturn();
builder.setQuestionId(questionId);
......@@ -1688,7 +1692,7 @@ private:
void sendCancel() {
if (isFirstResponder()) {
auto message = connectionState->connection->newOutgoingMessage(
messageSizeHint<rpc::Return>());
requestCapExtractor.retainedListSizeHint(true) + messageSizeHint<rpc::Return>());
auto builder = message->getBody().initAs<rpc::Message>().initReturn();
builder.setQuestionId(questionId);
......@@ -1894,7 +1898,9 @@ private:
KJ_IF_MAYBE(m, message) {
handleMessage(kj::mv(*m));
} else {
KJ_FAIL_REQUIRE("Peer disconnected.") { break; }
disconnect(kj::Exception(
kj::Exception::Nature::PRECONDITION, kj::Exception::Durability::PERMANENT,
__FILE__, __LINE__, kj::str("Peer disconnected.")));
}
});
return eventLoop.there(kj::mv(receive),
......@@ -1996,7 +2002,7 @@ private:
auto target = call.getTarget();
switch (target.which()) {
case rpc::Call::Target::EXPORTED_CAP: {
case rpc::MessageTarget::EXPORTED_CAP: {
auto lock = tables.lockExclusive(); // TODO(perf): shared?
KJ_IF_MAYBE(exp, lock->exports.find(target.getExportedCap())) {
capability = exp->clientHook->addRef();
......@@ -2008,7 +2014,7 @@ private:
break;
}
case rpc::Call::Target::PROMISED_ANSWER: {
case rpc::MessageTarget::PROMISED_ANSWER: {
auto promisedAnswer = target.getPromisedAnswer();
kj::Own<const PipelineHook> pipeline;
......@@ -2213,12 +2219,6 @@ private:
replacement = newBrokenCap(toException(resolve.getException()));
break;
case rpc::Resolve::CANCELED:
// Right, this can't possibly affect anything, then.
//
// TODO(now): Am I doing something wrong or is this not needed?
return;
default:
KJ_FAIL_REQUIRE("Unknown 'Resolve' type.") { return; }
}
......
......@@ -260,25 +260,16 @@ struct Call {
# - A matching Return has been received from the callee.
# - A matching Finish has been sent from the caller.
target :union {
exportedCap @1 :ExportId;
# This call is to a capability or promise previously exported by the receiver.
target @1 :MessageTarget;
# The object that should receive this call.
promisedAnswer @2 :PromisedAnswer;
# This call is to a capability that is expected to be returned by another call that has not
# yet been completed.
#
# At level 0, this is supported only for addressing the result of a previous `Restore`, so that
# initial startup doesn't require a round trip.
}
interfaceId @3 :UInt64;
interfaceId @2 :UInt64;
# The type ID of the interface being called. Each capability may implement multiple interfaces.
methodId @4 :UInt16;
methodId @3 :UInt16;
# The ordinal number of the method to call within the requested interface.
params @5 :Object;
params @4 :Object;
# The params struct. The fields of this struct correspond to the parameters of the method.
#
# The params may contain capabilities. These capabilities are automatically released when the
......@@ -287,10 +278,10 @@ struct Call {
sendReturnTo :union {
# Where should the return message be sent?
caller @6 :Void;
caller @5 :Void;
# Send the return message back to the caller (the usual).
yourself @7 :QuestionId;
yourself @6 :QuestionId;
# **(level 1)**
#
# This is actually an echo of a call originally made by the receiver, with the given question
......@@ -312,7 +303,7 @@ struct Call {
# - Vat B, on receiving the `Finish`, sends a corresponding `Finish` for bar'().
# - Neither bar() nor bar'() ever see a `Return` message sent over the wire.
thirdParty @8 :RecipientId;
thirdParty @7 :RecipientId;
# **(level 3)**
#
# The call's result should be returned to a different vat. The receiver (the callee) expects
......@@ -425,18 +416,18 @@ struct Resolve {
# Unlike all other instances of `ExportId` sent from the exporter, the `Resolve` message does
# _not_ increase the reference count of `promiseId`.
#
# When a promise ID is first sent over the wire (e.g. in a `CapDescriptor`), the sender (exporter)
# guarantees that it will follow up at some point with exactly one `Resolve` message. If the
# When an export ID sent over the wire (e.g. in a `CapDescriptor`) is indicated to be a promise,
# this indicates that the sender will follow up at some point with a `Resolve` message. If the
# same `promiseId` is sent again before `Resolve`, still only one `Resolve` is sent. If the
# same ID is sent again later _after_ a `Resolve`, it can only be because the export's
# reference count hit zero in the meantime and the ID was re-assigned to a new export, therefore
# this later promise does _not_ correspond to the earlier `Resolve`.
#
# If a promise ID's reference count reaches zero before a `Resolve` is sent, the `Resolve`
# message must still be sent, and the ID cannot be reused in the meantime. Moreover, the object
# to which the promise resolved itself needs to be released, even if the promise was already
# released before it resolved. (Although, the exporter may notice that the promise was released
# and send a `canceled` resolution, in which case nothing new is exported.)
# message may or may not still be sent. If it is sent, the ID cannot be reused before hand
# (except to represent the same promise, of course). If it is never sent, then the ID can be
# reused to represent a new object. If a `Resolve` is sent, the object to which the promise
# resolved needs to be released, even if the promise was already released before it resolved.
#
# RPC implementations should keep in mind when they receive a `Resolve` that the promise ID may
# have appeared in a previous question or answer which the application has not consumed yet.
......@@ -453,10 +444,6 @@ struct Resolve {
exception @2 :Exception;
# Indicates that the promise was broken.
canceled @3 :Void;
# Indicates that this promise won't be resolved because its reference count reached zero before
# it had completed, so the operation was canceled.
}
}
......@@ -514,21 +501,14 @@ struct Disembargo {
# reduced. (In the two-party loopback case, the `Disembargo` message is just a more explicit way
# of accomplishing the same thing as a no-op call, but isn't any faster.)
target :union {
# What is to be disembargoed.
exportedCap @0 :ExportId;
# An exported capability.
promisedAnswer @1 :PromisedAnswer;
# A capability expected to be returned in the answer to an outstanding question.
}
target @0 :MessageTarget;
# What is to be disembargoed.
using EmbargoId = UInt32;
# Used in `senderLoopback` and `receiverLoopback`, below.
context :union {
senderLoopback @2 :EmbargoId;
senderLoopback @1 :EmbargoId;
# The sender is requesting a disembargo on a promise which is known to resolve back to a
# capability hoste by the sender. As soon as the receiver has echoed back all pipelined calls
# on this promise, it will deliver the Disembargo back to the sender with `receiverLoopback`
......@@ -539,11 +519,11 @@ struct Disembargo {
# The receiver must verify that the target capability actually resolves back to the sender's
# vat. Otherwise, the sender has committed a protocol error and should be disconnected.
receiverLoopback @3 :EmbargoId;
receiverLoopback @2 :EmbargoId;
# The receiver previously sent a `senderLoopback` Disembargo towards a promise resolving to
# this capability, and that Disembargo is now being echoed back.
accept @4 :Void;
accept @3 :Void;
# **(level 3)**
#
# The sender is requesting a disembargo on a promise which is known to resolve to a third-party
......@@ -554,7 +534,7 @@ struct Disembargo {
#
# See `Accept.embargo` for an example.
provide @5 :QuestionId;
provide @4 :QuestionId;
# **(level 3)**
#
# The sender is requesting a disembargo on a capability currently being provided to a third
......@@ -578,15 +558,8 @@ struct Save {
# A new question ID identifying this request, which will eventually receive a Return
# message whose `results` is a SturdyRef.
target :union {
# What is to be saved.
exportedCap @1 :ExportId;
# An exported capability.
promisedAnswer @2 :PromisedAnswer;
# A capability expected to be returned in the answer to an outstanding question.
}
target @1 :MessageTarget;
# What is to be saved.
}
struct Restore {
......@@ -657,17 +630,10 @@ struct Provide {
# at some point send a `Finish` message as with any other call, and such a message can be
# used to cancel the whole operation.
target :union {
# What is to be provided to the third party.
exportedCap @1 :ExportId;
# An exported capability.
target @1 :MessageTarget;
# What is to be provided to the third party.
promisedAnswer @2 :PromisedAnswer;
# A capability expected to be returned in the results of an outstanding question.
}
recipient @3 :RecipientId;
recipient @2 :RecipientId;
# Identity of the third party which is expected to pick up the capability.
}
......@@ -797,6 +763,22 @@ struct Join {
# ========================================================================================
# Common structures used in messages
struct MessageTarget {
# The target of a `Call` or other messages that target a capability.
union {
exportedCap @0 :ExportId;
# This message is to a capability or promise previously exported by the receiver.
promisedAnswer @1 :PromisedAnswer;
# This message is to a capability that is expected to be returned by another call that has not
# yet been completed.
#
# At level 0, this is supported only for addressing the result of a previous `Restore`, so that
# initial startup doesn't require a round trip.
}
}
struct CapDescriptor {
# **(level 1)**
#
......@@ -808,6 +790,18 @@ struct CapDescriptor {
#
# Keep in mind that `ExportIds` in a `CapDescriptor` are subject to reference counting. See the
# description of `ExportId`.
#
# Note that CapDescriptors are commonly processed some time after they are received, because
# the RPC system must wait for the application to actually traverse the message and find the
# CapDescriptors. RPC implementations must keep in mind that the state of the Four Tables may
# change between when a message is received and when a CapDescriptor within it is actually
# extracted. To that end, the RPC system may need to keep track of a list of relevant state
# changes that have happened in the meantime, including:
# - `Resolve` messages that have replaced an imported promise referenced by `senderPromise`.
# - `Release` messages that have released an export referenced by `receiverHosted`.
# - `Finish` messages that have released an answer referenced by `receiverAnswer`.
# Applications are advised not to hold on to received messages long-term as this could cause
# state logs to accumulate.
union {
senderHosted @0 :ExportId;
......
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment