Commit 540b1887 authored by Kenton Varda's avatar Kenton Varda

Use UnwindDetector to protect against exceptions in all these RPC object desturctors.

parent 2d1c9a53
......@@ -735,28 +735,30 @@ private:
: RpcClient(connectionState), importId(importId) {}
~ImportClient() noexcept(false) {
// Remove self from the import table, if the table is still pointing at us. (It's possible
// that another thread attempted to obtain this import just as the destructor started, in
// which case that other thread will have constructed a new ImportClient and placed it in
// the import table. Therefore, we must actually verify that the import table points at
// this object.)
KJ_IF_MAYBE(import, connectionState->imports.find(importId)) {
KJ_IF_MAYBE(i, import->importClient) {
if (i == this) {
connectionState->imports.erase(importId);
unwindDetector.catchExceptionsIfUnwinding([&]() {
// Remove self from the import table, if the table is still pointing at us. (It's possible
// that another thread attempted to obtain this import just as the destructor started, in
// which case that other thread will have constructed a new ImportClient and placed it in
// the import table. Therefore, we must actually verify that the import table points at
// this object.)
KJ_IF_MAYBE(import, connectionState->imports.find(importId)) {
KJ_IF_MAYBE(i, import->importClient) {
if (i == this) {
connectionState->imports.erase(importId);
}
}
}
}
// Send a message releasing our remote references.
if (remoteRefcount > 0) {
auto message = connectionState->connection->newOutgoingMessage(
messageSizeHint<rpc::Release>());
rpc::Release::Builder builder = message->getBody().initAs<rpc::Message>().initRelease();
builder.setId(importId);
builder.setReferenceCount(remoteRefcount);
message->send();
}
// Send a message releasing our remote references.
if (remoteRefcount > 0) {
auto message = connectionState->connection->newOutgoingMessage(
messageSizeHint<rpc::Release>());
rpc::Release::Builder builder = message->getBody().initAs<rpc::Message>().initRelease();
builder.setId(importId);
builder.setReferenceCount(remoteRefcount);
message->send();
}
});
}
void addRemoteRef() {
......@@ -794,6 +796,8 @@ private:
uint remoteRefcount = 0;
// Number of times we've received this import from the peer.
kj::UnwindDetector unwindDetector;
};
class PipelineClient final: public RpcClient {
......@@ -1372,13 +1376,15 @@ private:
CapInjectorImpl(RpcConnectionState& connectionState)
: connectionState(connectionState) {}
~CapInjectorImpl() noexcept(false) {
kj::Vector<kj::Own<ResolutionChain>> thingsToRelease(exports.size());
unwindDetector.catchExceptionsIfUnwinding([&]() {
kj::Vector<kj::Own<ResolutionChain>> thingsToRelease(exports.size());
if (connectionState.networkException == nullptr) {
for (auto exportId: exports) {
thingsToRelease.add(connectionState.releaseExport(exportId, 1));
if (connectionState.networkException == nullptr) {
for (auto exportId: exports) {
thingsToRelease.add(connectionState.releaseExport(exportId, 1));
}
}
}
});
}
bool hasCaps() {
......@@ -1451,6 +1457,8 @@ private:
kj::Vector<ExportId> exports;
// IDs of objects exported during finishDescriptors(). These will need to be released later.
kj::UnwindDetector unwindDetector;
static const void* identity(const rpc::CapDescriptor::Reader& desc) {
// TODO(cleanup): Don't rely on internal APIs here.
return _::PointerHelpers<rpc::CapDescriptor>::getInternalReader(desc).getLocation();
......@@ -1471,40 +1479,42 @@ private:
: connectionState(kj::addRef(connectionState)), id(id), fulfiller(kj::mv(fulfiller)) {}
~QuestionRef() {
if (connectionState->networkException != nullptr) {
return;
}
unwindDetector.catchExceptionsIfUnwinding([&]() {
if (connectionState->networkException != nullptr) {
return;
}
// Send the "Finish" message.
{
uint retainedListSizeHint = resultCaps
.map([](kj::Own<CapExtractorImpl>& caps) { return caps->retainedListSizeHint(true); })
.orDefault(0);
auto message = connectionState->connection->newOutgoingMessage(
messageSizeHint<rpc::Finish>() + retainedListSizeHint);
auto builder = message->getBody().getAs<rpc::Message>().initFinish();
builder.setQuestionId(id);
CapExtractorImpl::FinalizedRetainedCaps retainedCaps;
KJ_IF_MAYBE(caps, resultCaps) {
retainedCaps = caps->get()->finalizeRetainedCaps(
Orphanage::getForMessageContaining(builder));
// Send the "Finish" message.
{
uint retainedListSizeHint = resultCaps
.map([](kj::Own<CapExtractorImpl>& caps) { return caps->retainedListSizeHint(true); })
.orDefault(0);
auto message = connectionState->connection->newOutgoingMessage(
messageSizeHint<rpc::Finish>() + retainedListSizeHint);
auto builder = message->getBody().getAs<rpc::Message>().initFinish();
builder.setQuestionId(id);
CapExtractorImpl::FinalizedRetainedCaps retainedCaps;
KJ_IF_MAYBE(caps, resultCaps) {
retainedCaps = caps->get()->finalizeRetainedCaps(
Orphanage::getForMessageContaining(builder));
}
builder.adoptRetainedCaps(kj::mv(retainedCaps.exportList));
message->send();
}
builder.adoptRetainedCaps(kj::mv(retainedCaps.exportList));
message->send();
}
// Check if the question has returned and, if so, remove it from the table.
// Remove question ID from the table. Must do this *after* sending `Finish` to ensure that
// the ID is not re-allocated before the `Finish` message can be sent.
auto& question = KJ_ASSERT_NONNULL(
connectionState->questions.find(id), "Question ID no longer on table?");
if (question.paramCaps == nullptr) {
// Call has already returned, so we can now remove it from the table.
KJ_ASSERT(connectionState->questions.erase(id));
} else {
question.selfRef = nullptr;
}
// Check if the question has returned and, if so, remove it from the table.
// Remove question ID from the table. Must do this *after* sending `Finish` to ensure that
// the ID is not re-allocated before the `Finish` message can be sent.
auto& question = KJ_ASSERT_NONNULL(
connectionState->questions.find(id), "Question ID no longer on table?");
if (question.paramCaps == nullptr) {
// Call has already returned, so we can now remove it from the table.
KJ_ASSERT(connectionState->questions.erase(id));
} else {
question.selfRef = nullptr;
}
});
}
inline QuestionId getId() const { return id; }
......@@ -1533,6 +1543,7 @@ private:
QuestionId id;
kj::Own<kj::PromiseFulfiller<kj::Promise<kj::Own<RpcResponse>>>> fulfiller;
kj::Maybe<kj::Own<CapExtractorImpl>> resultCaps;
kj::UnwindDetector unwindDetector;
};
class RpcRequest final: public RequestHook {
......@@ -2811,18 +2822,20 @@ public:
}
~Impl() noexcept(false) {
// std::unordered_map doesn't like it when elements' destructors throw, so carefully
// disassemble it.
if (!connections.empty()) {
kj::Vector<kj::Own<RpcConnectionState>> deleteMe(connections.size());
kj::Exception shutdownException(
kj::Exception::Nature::LOCAL_BUG, kj::Exception::Durability::PERMANENT,
__FILE__, __LINE__, kj::str("RpcSystem was destroyed."));
for (auto& entry: connections) {
entry.second->disconnect(kj::cp(shutdownException));
deleteMe.add(kj::mv(entry.second));
unwindDetector.catchExceptionsIfUnwinding([&]() {
// std::unordered_map doesn't like it when elements' destructors throw, so carefully
// disassemble it.
if (!connections.empty()) {
kj::Vector<kj::Own<RpcConnectionState>> deleteMe(connections.size());
kj::Exception shutdownException(
kj::Exception::Nature::LOCAL_BUG, kj::Exception::Durability::PERMANENT,
__FILE__, __LINE__, kj::str("RpcSystem was destroyed."));
for (auto& entry: connections) {
entry.second->disconnect(kj::cp(shutdownException));
deleteMe.add(kj::mv(entry.second));
}
}
}
});
}
Capability::Client restore(_::StructReader hostId, ObjectPointer::Reader objectId) {
......@@ -2850,6 +2863,8 @@ private:
ConnectionMap;
ConnectionMap connections;
kj::UnwindDetector unwindDetector;
RpcConnectionState& getConnectionState(kj::Own<VatNetworkBase::Connection>&& connection) {
auto iter = connections.find(connection);
if (iter == connections.end()) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment