Commit 9575df0c authored by Kenton Varda's avatar Kenton Varda

More RPC protocol WIP.

parent 7298c7fa
......@@ -111,7 +111,8 @@ kj::Exception toException(const rpc::Exception::Reader& exception) {
}
void fromException(const kj::Exception& exception, rpc::Exception::Builder builder) {
builder.setReason(exception.getDescription());
// TODO(someday): Indicate the remote server name as part of the stack trace.
builder.setReason(kj::str("remote exception: ", exception.getDescription()));
builder.setIsCallersFault(exception.getNature() == kj::Exception::Nature::PRECONDITION);
switch (exception.getDurability()) {
case kj::Exception::Durability::PERMANENT:
......@@ -258,7 +259,7 @@ struct Export {
uint refcount = 0;
// When this reaches 0, drop `clientHook` and free this export.
kj::Own<ClientHook> clientHook;
kj::Own<const ClientHook> clientHook;
inline bool operator==(decltype(nullptr)) const { return refcount == 0; }
inline bool operator!=(decltype(nullptr)) const { return refcount != 0; }
......@@ -273,7 +274,7 @@ struct Import {
// =======================================================================================
class RpcConnectionState: public kj::TaskSet::ErrorHandler {
class RpcConnectionState final: public kj::TaskSet::ErrorHandler {
public:
RpcConnectionState(const kj::EventLoop& eventLoop,
kj::Own<VatNetworkBase::Connection>&& connection)
......@@ -282,8 +283,18 @@ public:
tasks.add(messageLoop());
}
kj::Own<const ClientHook> restore(_::StructReader ref) {
// TODO(now)
}
void taskFailed(kj::Exception&& exception) override {
// TODO(now): Kill the connection.
// - All present and future questions must complete with exceptions.
// - All answers should be canceled (if they allow cancellation).
// - All exports are dropped.
// - All imported promises resolve to exceptions.
// - Send abort message.
// - Remove from connection map.
}
private:
......@@ -302,6 +313,9 @@ private:
ImportTable<QuestionId, Answer<RpcCallContext>> answers;
ExportTable<ExportId, Export> exports;
ImportTable<ExportId, Import<ImportClient>> imports;
std::unordered_map<const ClientHook*, ExportId> exportsByCap;
// Maps already-exported ClientHook objects to their ID in the export table.
};
kj::MutexGuarded<Tables> tables;
......@@ -342,11 +356,16 @@ private:
RpcClient(const RpcConnectionState& connectionState)
: connectionState(connectionState) {}
virtual void writeDescriptor(rpc::CapDescriptor::Builder descriptor, Tables& tables) const = 0;
virtual kj::Maybe<ExportId> writeDescriptor(
rpc::CapDescriptor::Builder descriptor, Tables& tables) const = 0;
// Writes a CapDescriptor referencing this client. Must be called with the
// RpcConnectionState's table locked -- a reference to them is passed as the second argument.
// The CapDescriptor must be sent before unlocking the tables, as it may become invalid at
// any time once the tables are unlocked.
//
// If writing the descriptor adds a new export to the export table, or increments the refcount
// on an existing one, then the ID is returned and the caller is responsible for removing it
// later.
virtual kj::Maybe<kj::Own<const ClientHook>> writeTarget(
rpc::Call::Target::Builder target) const = 0;
......@@ -456,8 +475,10 @@ private:
}
}
void writeDescriptor(rpc::CapDescriptor::Builder descriptor, Tables& tables) const override {
kj::Maybe<ExportId> writeDescriptor(
rpc::CapDescriptor::Builder descriptor, Tables& tables) const override {
descriptor.setReceiverHosted(importId);
return nullptr;
}
kj::Maybe<kj::Own<const ClientHook>> writeTarget(
......@@ -550,7 +571,8 @@ private:
state.getWithoutLock().init<Waiting>(kj::mv(pipeline));
}
void writeDescriptor(rpc::CapDescriptor::Builder descriptor, Tables& tables) const override {
kj::Maybe<ExportId> writeDescriptor(
rpc::CapDescriptor::Builder descriptor, Tables& tables) const override {
auto lock = state.lockShared();
if (lock->is<Waiting>()) {
......@@ -558,7 +580,8 @@ private:
} else if (lock->is<Resolved>()) {
return connectionState.writeDescriptor(lock->get<Resolved>()->addRef(), descriptor, tables);
} else {
// TODO(now)
return connectionState.writeDescriptor(newBrokenCap(kj::cp(lock->get<Broken>())),
descriptor, tables);
}
}
......@@ -639,17 +662,30 @@ private:
}
};
void writeDescriptor(kj::Own<const ClientHook> cap, rpc::CapDescriptor::Builder descriptor,
kj::Maybe<ExportId> writeDescriptor(
kj::Own<const ClientHook> cap, rpc::CapDescriptor::Builder descriptor,
Tables& tables) const {
// Write a descriptor for the given capability. The tables must be locked by the caller and
// passed in as a parameter.
if (cap->getBrand() == this) {
kj::downcast<const RpcClient>(*cap).writeDescriptor(descriptor, tables);
return kj::downcast<const RpcClient>(*cap).writeDescriptor(descriptor, tables);
} else {
auto iter = tables.exportsByCap.find(cap);
if (iter != tables.exportsByCap.end()) {
auto& exp = KJ_ASSERT_NONNULL(tables.exports.find(iter->second));
++exp.refcount;
// TODO(now): Check if it's a promise.
descriptor.setSenderHosted(iter->second);
return iter->second;
} else {
// TODO(now): We have to figure out if the client is already in our table.
// TODO(now): We have to add a refcount to the export, and return an object that decrements
// that refcount later.
ExportId exportId;
auto& exp = tables.exports.next(exportId);
exp.refcount = 1;
exp.clientHook = kj::mv(cap);
descriptor.setSenderHosted(exportId);
return exportId;
}
}
}
......@@ -780,8 +816,21 @@ private:
}
case rpc::CapDescriptor::RECEIVER_ANSWER: {
// TODO(now): implement
return newBrokenCap("'receiverAnswer' not implemented");
auto lock = connectionState.tables.lockExclusive();
auto promisedAnswer = descriptor.getReceiverAnswer();
KJ_IF_MAYBE(answer, lock->answers.find(promisedAnswer.getQuestionId())) {
if (answer->active) {
KJ_IF_MAYBE(pipeline, answer->pipeline) {
KJ_IF_MAYBE(ops, toPipelineOps(promisedAnswer.getTransform())) {
return pipeline->get()->getPipelinedCap(*ops);
} else {
return newBrokenCap("unrecognized pipeline ops");
}
}
}
}
return newBrokenCap("invalid 'receiverAnswer'");
}
case rpc::CapDescriptor::THIRD_PARTY_HOSTED:
......@@ -808,14 +857,37 @@ private:
public:
CapInjectorImpl(const RpcConnectionState& connectionState)
: connectionState(connectionState) {}
~CapInjectorImpl() {}
~CapInjectorImpl() noexcept(false) {
kj::Vector<kj::Own<const ClientHook>> clientsToRelease(exports.size());
auto lock = connectionState.tables.lockExclusive();
for (auto exportId: exports) {
auto& exp = KJ_ASSERT_NONNULL(lock->exports.find(exportId));
if (--exp.refcount == 0) {
clientsToRelease.add(kj::mv(exp.clientHook));
lock->exports.erase(exportId);
}
}
}
bool hasCaps() {
// Return true if the message contains any capabilities. (If not, it may be possible to
// release earlier.)
void finish(Tables& tables) {
return !caps.getWithoutLock().empty();
}
void finishDescriptors(Tables& tables) {
// Finish writing all of the CapDescriptors. Must be called with the tables locked, and the
// message must be sent before the tables are unlocked.
exports = kj::Vector<ExportId>(caps.getWithoutLock().size());
for (auto& entry: caps.getWithoutLock()) {
connectionState.writeDescriptor(kj::mv(entry.second.cap), entry.second.builder, tables);
KJ_IF_MAYBE(exportId, connectionState.writeDescriptor(
kj::mv(entry.second.cap), entry.second.builder, tables)) {
exports.add(*exportId);
}
}
}
......@@ -836,56 +908,6 @@ private:
auto iter = lock->find(identity(descriptor));
KJ_REQUIRE(iter != lock->end(), "getInjectedCap() called on descriptor I didn't write.");
return iter->second.cap->addRef();
switch (descriptor.which()) {
case rpc::CapDescriptor::SENDER_HOSTED: {
auto lock = connectionState.tables.lockExclusive();
KJ_IF_MAYBE(exp, lock->exports.find(descriptor.getSenderHosted())) {
return exp->clientHook->addRef();
} else {
KJ_FAIL_REQUIRE("Dropped descriptor had invalid 'senderHosted'.") {
return newBrokenCap("Calling invalid CapDescriptor found in builder.");
}
}
}
case rpc::CapDescriptor::RECEIVER_HOSTED: {
auto lock = connectionState.tables.lockExclusive();
KJ_IF_MAYBE(import, lock->imports.find(descriptor.getReceiverHosted())) {
if (import->client != nullptr) {
KJ_IF_MAYBE(ref, kj::tryAddRef(*import->client)) {
return kj::mv(*ref);
}
}
}
// If we wrote this CapDescriptor then we should hold a reference to the import in
// our `receiverHosted` table, yet it seems that the import ID is not valid. Something
// is wrong.
return newBrokenCap("CapDescriptor in builder had invalid 'receiverHosted'.");
}
case rpc::CapDescriptor::RECEIVER_ANSWER: {
auto promisedAnswer = descriptor.getReceiverAnswer();
auto lock = connectionState.tables.lockExclusive();
KJ_IF_MAYBE(question, lock->questions.find(promisedAnswer.getQuestionId())) {
KJ_IF_MAYBE(pipeline, question->pipeline) {
KJ_IF_MAYBE(ops, toPipelineOps(promisedAnswer.getTransform())) {
return pipeline->getPipelinedCap(kj::mv(*ops));
}
}
}
return newBrokenCap("CapDescriptor in builder had invalid PromisedAnswer.");
}
default:
KJ_FAIL_REQUIRE("I don't think I wrote this descriptor.") {
return newBrokenCap("CapDescriptor in builder was invalid.");
}
break;
}
}
void dropCap(rpc::CapDescriptor::Reader descriptor) const override {
caps.lockExclusive()->erase(identity(descriptor));
......@@ -898,19 +920,26 @@ private:
rpc::CapDescriptor::Builder builder;
kj::Own<const ClientHook> cap;
CapInfo(): builder(nullptr) {}
CapInfo(rpc::CapDescriptor::Builder& builder, kj::Own<const ClientHook>&& cap)
: builder(builder), cap(kj::mv(cap)) {}
CapInfo(const CapInfo& other);
CapInfo(const CapInfo& other) = delete;
// Work around problem where std::pair complains about the copy constructor requiring a
// non-const argument due to `builder` inheriting kj::DisableConstCopy. The copy constructor
// should never be called anyway.
// should be deleted anyway because `cap` is not copyable.
CapInfo(CapInfo&& other) = default;
};
kj::MutexGuarded<std::map<const void*, CapInfo>> caps;
// Maps CapDescriptor locations to embedded caps. The descriptors aren't actually filled in
// until just before the message is sent.
kj::Vector<ExportId> exports;
// IDs of objects exported during finishDescriptors(). These will need to be released later.
static const void* identity(const rpc::CapDescriptor::Reader& desc) {
// TODO(cleanup): Don't rely on internal APIs here.
return _::PointerHelpers<rpc::CapDescriptor>::getInternalReader(desc).getLocation();
......@@ -1010,7 +1039,7 @@ private:
replacement.set(paramsBuilder);
return replacement.send();
} else {
injector->finish(*lock);
injector->finishDescriptors(*lock);
auto paf = kj::newPromiseAndFulfiller<kj::Own<RpcResponse>>(connectionState.eventLoop);
auto& question = lock->questions.next(questionId);
......@@ -1105,7 +1134,7 @@ private:
}
}
void writeDescriptor(rpc::CapDescriptor::Builder descriptor, Tables& tables,
kj::Maybe<ExportId> writeDescriptor(rpc::CapDescriptor::Builder descriptor, Tables& tables,
kj::ArrayPtr<const PipelineOp> ops) const {
auto lock = state.lockExclusive();
if (lock->is<Waiting>()) {
......@@ -1113,12 +1142,14 @@ private:
promisedAnswer.setQuestionId(lock->get<Waiting>());
promisedAnswer.adoptTransform(fromPipelineOps(
Orphanage::getForMessageContaining(descriptor), ops));
return nullptr;
} else if (lock->is<Resolved>()) {
connectionState.writeDescriptor(lock->get<Resolved>()->getResults().getPipelinedCap(ops),
return connectionState.writeDescriptor(
lock->get<Resolved>()->getResults().getPipelinedCap(ops),
descriptor, tables);
} else {
connectionState.writeDescriptor(newBrokenCap(kj::cp(lock->get<Broken>())),
descriptor, tables);
return connectionState.writeDescriptor(
newBrokenCap(kj::cp(lock->get<Broken>())), descriptor, tables);
}
}
......@@ -1223,9 +1254,13 @@ private:
return builder;
}
bool hasCaps() {
return injector.hasCaps();
}
void send() {
auto lock = connectionState.tables.lockExclusive();
injector.finish(*lock);
injector.finishDescriptors(*lock);
message->send();
}
......@@ -1428,6 +1463,8 @@ private:
// The first time it is called, removes self from the answer table and returns true.
// On subsequent calls, returns false.
kj::Own<const PipelineHook> pipelineToRelease;
if (responseSent) {
return false;
} else {
......@@ -1455,7 +1492,18 @@ private:
lock->answers.erase(questionId);
} else {
// We just have to null out callContext.
lock->answers[questionId].callContext = nullptr;
auto& answer = lock->answers[questionId];
answer.callContext = nullptr;
// If the response has no capabilities in it, then we should also delete the pipeline
// so that the context can be released sooner.
KJ_IF_MAYBE(r, response) {
if (!r->get()->hasCaps()) {
KJ_IF_MAYBE(pipeline, answer.pipeline) {
pipelineToRelease = kj::mv(*pipeline);
}
}
}
}
return true;
......@@ -1470,12 +1518,12 @@ private:
return connection->receiveIncomingMessage().then(
[this](kj::Own<IncomingRpcMessage>&& message) {
handleMessage(kj::mv(message));
// TODO(now): Don't repeat messageLoop in case of exception.
}).then([this]() {
// No exceptions; continue loop.
//
// (We do this in a separate continuation to handle the case where exceptions are
// disabled.)
tasks.add(messageLoop());
}, [this](kj::Exception&& exception) {
// TODO(now): This probably isn't right.
connection = nullptr;
kj::throwRecoverableException(kj::mv(exception));
});
}
......@@ -1502,6 +1550,14 @@ private:
handleFinish(reader.getFinish());
break;
case rpc::Message::RESOLVE:
// TODO(now)
break;
case rpc::Message::RELEASE:
// TODO(now)
break;
default: {
auto message = connection->newOutgoingMessage(
reader.totalSizeInWords() + messageSizeHint<void>());
......@@ -1513,7 +1569,15 @@ private:
}
void handleUnimplemented(const rpc::Message::Reader& message) {
// TODO(now)
switch (message.which()) {
case rpc::Message::RESOLVE:
// TODO(now): Release the resolution.
break;
default:
KJ_FAIL_ASSERT("Peer did not implement required RPC message type.", (uint)message.which());
break;
}
}
void handleAbort(const rpc::Exception::Reader& exception) {
......@@ -1550,7 +1614,8 @@ private:
KJ_IF_MAYBE(p, base.pipeline) {
pipeline = p->get()->addRef();
} else {
KJ_FAIL_REQUIRE("PromisedAnswer.questionId is already finished.") {
KJ_FAIL_REQUIRE("PromisedAnswer.questionId is already finished or contained no "
"capabilities.") {
return;
}
}
......@@ -1637,14 +1702,32 @@ private:
}
}
// TODO(now): Handle exception/cancel response
switch (ret.which()) {
case rpc::Return::ANSWER:
question->fulfiller->fulfill(
kj::refcounted<RpcResponse>(*this, kj::mv(message), ret.getAnswer()));
break;
auto response = kj::refcounted<RpcResponse>(*this, kj::mv(message), ret.getAnswer());
question->fulfiller->fulfill(kj::mv(response));
case rpc::Return::EXCEPTION:
question->fulfiller->reject(toException(ret.getException()));
break;
case rpc::Return::CANCELED:
KJ_REQUIRE(question->isFinished,
"Return message falsely claims call was canceled.") { return; }
// We don't bother fulfilling the result. If someone is somehow still waiting on it
// (shouldn't be possible), that's OK: they'll get an exception due to the fulfiller
// being destroyed.
break;
default:
KJ_FAIL_REQUIRE("Unknown return type (not answer, exception, or canceled).") { return; }
}
if (question->pipeline == nullptr) {
if (question->isFinished) {
lock->questions.erase(ret.getQuestionId());
}
} else {
KJ_FAIL_REQUIRE("Invalid question ID in Return message.") { return; }
}
......@@ -1695,14 +1778,30 @@ public:
Impl(VatNetworkBase& network, SturdyRefRestorerBase& restorer, const kj::EventLoop& eventLoop)
: network(network), restorer(restorer), eventLoop(eventLoop) {}
Capability::Client connect(_::StructReader reader) {
// TODO(now)
Capability::Client connect(_::StructReader ref) {
auto connection = network.baseConnectToHostOf(ref);
auto lock = connections.lockExclusive();
auto iter = lock->find(connection);
RpcConnectionState* state;
if (iter == lock->end()) {
VatNetworkBase::Connection* connectionPtr = connection;
auto newState = kj::heap<RpcConnectionState>(eventLoop, kj::mv(connection));
state = newState;
lock->insert(std::make_pair(connectionPtr, kj::mv(newState)));
} else {
state = iter->second;
}
return Capability::Client(state->restore(ref));
}
private:
VatNetworkBase& network;
SturdyRefRestorerBase& restorer;
const kj::EventLoop& eventLoop;
kj::MutexGuarded<std::unordered_map<VatNetworkBase::Connection*, kj::Own<RpcConnectionState>>>
connections;
};
RpcSystemBase::RpcSystemBase(VatNetworkBase& network, SturdyRefRestorerBase& restorer,
......@@ -1710,8 +1809,8 @@ RpcSystemBase::RpcSystemBase(VatNetworkBase& network, SturdyRefRestorerBase& res
: impl(kj::heap<Impl>(network, restorer, eventLoop)) {}
RpcSystemBase::~RpcSystemBase() noexcept(false) {}
Capability::Client RpcSystemBase::baseConnect(_::StructReader reader) {
return impl->connect(reader);
Capability::Client RpcSystemBase::baseConnect(_::StructReader ref) {
return impl->connect(ref);
}
} // namespace _ (private)
......
......@@ -67,7 +67,7 @@ public:
virtual kj::Own<Connection> baseAcceptIntroducedConnection(
ObjectPointer::Reader recipientId) = 0;
};
virtual kj::Own<Connection> baseConnectToHostOf(ObjectPointer::Reader ref) = 0;
virtual kj::Own<Connection> baseConnectToHostOf(_::StructReader ref) = 0;
virtual kj::Promise<kj::Own<Connection>> baseAcceptConnectionAsRefHost() = 0;
};
......@@ -86,7 +86,7 @@ private:
class Impl;
kj::Own<Impl> impl;
Capability::Client baseConnect(_::StructReader reader);
Capability::Client baseConnect(_::StructReader ref);
// TODO(someday): Maybe define a public API called `TypelessStruct` so we don't have to rely
// on `_::StructReader` here?
......@@ -220,7 +220,7 @@ public:
private:
kj::Own<_::VatNetworkBase::Connection>
baseConnectToHostOf(ObjectPointer::Reader ref) override final;
baseConnectToHostOf(_::StructReader ref) override final;
kj::Promise<kj::Own<_::VatNetworkBase::Connection>>
baseAcceptConnectionAsRefHost() override final;
};
......@@ -317,8 +317,8 @@ template <typename SturdyRef, typename ProvisionId, typename RecipientId,
typename ThirdPartyCapId, typename JoinAnswer>
kj::Own<_::VatNetworkBase::Connection>
VatNetwork<SturdyRef, ProvisionId, RecipientId, ThirdPartyCapId, JoinAnswer>::
baseConnectToHostOf(ObjectPointer::Reader ref) {
return connectToHostOf(ref.getAs<SturdyRef>());
baseConnectToHostOf(_::StructReader ref) {
return connectToHostOf(typename SturdyRef::Reader(ref));
}
template <typename SturdyRef, typename ProvisionId, typename RecipientId,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment