Commit 6def52fd authored by Kenton Varda's avatar Kenton Varda

Misc tweaks.

parent 0d849158
......@@ -152,13 +152,16 @@ public:
}
}
bool erase(Id id) {
kj::Maybe<T> erase(Id id) {
// Remove an entry from the table and return it. We return it so that the caller can be
// careful to release it (possibly invoking arbitrary destructors) at a time that makes sense.
if (id < slots.size() && slots[id] != nullptr) {
kj::Maybe<T> toRelease = kj::mv(slots[id]);
slots[id] = T();
freeIds.push(id);
return true;
return toRelease;
} else {
return false;
return nullptr;
}
}
......@@ -213,11 +216,17 @@ public:
}
}
void erase(Id id) {
T erase(Id id) {
// Remove an entry from the table and return it. We return it so that the caller can be
// careful to release it (possibly invoking arbitrary destructors) at a time that makes sense.
if (id < kj::size(low)) {
T toRelease = kj::mv(low[id]);
low[id] = T();
return toRelease;
} else {
T toRelease = kj::mv(high[id]);
high.erase(id);
return toRelease;
}
}
......@@ -1366,11 +1375,9 @@ private:
: connectionState(connectionState) {}
~CapInjectorImpl() noexcept(false) {
unwindDetector.catchExceptionsIfUnwinding([&]() {
kj::Vector<kj::Own<ResolutionChain>> thingsToRelease(exports.size());
if (connectionState.networkException == nullptr) {
for (auto exportId: exports) {
thingsToRelease.add(connectionState.releaseExport(exportId, 1));
connectionState.releaseExport(exportId, 1);
}
}
});
......@@ -1499,7 +1506,7 @@ private:
connectionState->questions.find(id), "Question ID no longer on table?");
if (question.paramCaps == nullptr) {
// Call has already returned, so we can now remove it from the table.
KJ_ASSERT(connectionState->questions.erase(id));
connectionState->questions.erase(id);
} else {
question.selfRef = nullptr;
}
......@@ -1942,9 +1949,8 @@ private:
Orphanage::getForMessageContaining(returnMessage));
returnMessage.adoptRetainedCaps(kj::mv(retainedCaps.exportList));
kj::Own<PipelineHook> pipelineToRelease;
cleanupAnswerTable(
kj::downcast<RpcServerResponseImpl>(*KJ_ASSERT_NONNULL(response)).send(), true);
cleanupAnswerTable(kj::downcast<RpcServerResponseImpl>(
*KJ_ASSERT_NONNULL(response)).send(), true);
}
}
void sendErrorReturn(kj::Exception&& exception) {
......@@ -2152,32 +2158,24 @@ private:
// answer table. Or we might even be responsible for removing the entire answer table
// entry.
kj::Own<PipelineHook> pipelineToRelease;
Answer answerToDelete;
if (cancellationFlags & CANCEL_REQUESTED) {
answerToDelete = kj::mv(connectionState->answers[questionId]);
// Erase from the table.
// Already received `Finish` so it's our job to erase the table entry.
connectionState->answers.erase(questionId);
} else {
// We just have to null out callContext.
auto& answer = connectionState->answers[questionId];
answer.callContext = nullptr;
answer.resultCaps = kj::mv(resultCaps);
// If the response has capabilities, we need to arrange to keep the CapInjector around
// until the `Finish` message.
// If the response has no capabilities in it, then we should also delete the pipeline
// so that the context can be released sooner.
if (freePipelineIfNoCaps &&
!resultCaps.map([](kj::Own<CapInjectorImpl>& i) { return i->hasCaps(); })
.orDefault(false)) {
KJ_IF_MAYBE(pipeline, answer.pipeline) {
pipelineToRelease = kj::mv(*pipeline);
}
!answer.resultCaps.map([](kj::Own<CapInjectorImpl>& i) { return i->hasCaps(); })
.orDefault(false)) {
answer.pipeline = nullptr;
}
answer.resultCaps = kj::mv(resultCaps);
}
}
};
......@@ -2431,6 +2429,8 @@ private:
}
void handleReturn(kj::Own<IncomingRpcMessage>&& message, const rpc::Return::Reader& ret) {
// Transitive destructors can end up manipulating the question table and invalidating our
// pointer into it, so make sure these destructors run later.
kj::Own<CapInjectorImpl> paramCapsToRelease;
kj::Maybe<kj::Promise<kj::Own<RpcResponse>>> promiseToRelease;
......@@ -2438,7 +2438,7 @@ private:
KJ_REQUIRE(question->paramCaps != nullptr, "Duplicate Return.") { return; }
KJ_IF_MAYBE(pc, question->paramCaps) {
// Release these later, after unlocking.
// Release these later, after we're done with the answer table.
paramCapsToRelease = kj::mv(*pc);
} else {
KJ_FAIL_REQUIRE("Duplicate return.") { return; }
......@@ -2524,6 +2524,8 @@ private:
}
void handleFinish(const rpc::Finish::Reader& finish) {
// Delay release of these things until return so that transitive destructors don't accidentally
// modify the answer table and invalidate our pointer into it.
kj::Own<ResolutionChain> chainToRelease;
Answer answerToRelease;
......@@ -2551,8 +2553,7 @@ private:
KJ_IF_MAYBE(context, answer->callContext) {
context->requestCancel();
} else {
answerToRelease = kj::mv(*answer);
answers.erase(finish.getQuestionId());
answerToRelease = answers.erase(finish.getQuestionId());
}
} else {
KJ_REQUIRE(answer->active, "'Finish' for invalid question ID.") { return; }
......@@ -2600,30 +2601,25 @@ private:
}
void handleRelease(const rpc::Release::Reader& release) {
auto chainToRelease = releaseExport(release.getId(), release.getReferenceCount());
releaseExport(release.getId(), release.getReferenceCount());
}
kj::Own<ResolutionChain> releaseExport(ExportId id, uint refcount) {
kj::Own<ResolutionChain> result;
void releaseExport(ExportId id, uint refcount) {
KJ_IF_MAYBE(exp, exports.find(id)) {
KJ_REQUIRE(refcount <= exp->refcount, "Tried to drop export's refcount below zero.") {
return result;
return;
}
exp->refcount -= refcount;
if (exp->refcount == 0) {
exportsByCap.erase(exp->clientHook);
result = kj::mv(resolutionChainTail);
resolutionChainTail = result->addRelease(id, kj::mv(exp->clientHook));
auto client = kj::mv(exp->clientHook);
exports.erase(id);
return result;
} else {
return result;
resolutionChainTail = resolutionChainTail->addRelease(id, kj::mv(client));
}
} else {
KJ_FAIL_REQUIRE("Tried to release invalid export ID.") {
return result;
return;
}
}
}
......
......@@ -195,17 +195,15 @@ class EventLoop {
// construct some promises and wait on the result. Example:
//
// int main() {
// // `loop` becomes the official EventLoop for the thread.
// SimpleEventLoop loop;
//
// // Most code that does I/O needs to be run from within an
// // EventLoop, so it can use Promise::then(). So, we need to
// // use `evalLater()` to run `getHttp()` inside the event
// // loop.
// Promise<String> textPromise = loop.evalLater(
// []() { return getHttp("http://example.com"); });
// // Now we can call an async function.
// Promise<String> textPromise = getHttp("http://example.com");
//
// // Now we can wait for the promise to complete.
// String text = loop.wait(kj::mv(textPromise));
// // And we can wait for the promise to complete. Note that you can only use `wait()`
// // from the top level, not from inside a promise callback.
// String text = textPromise.wait();
// print(text);
// return 0;
// }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment