Commit 85a2ec20 authored by Kenton Varda's avatar Kenton Varda

Cancellation isn't evil, just misunderstood.

parent 51ef79f6
...@@ -411,8 +411,8 @@ private: ...@@ -411,8 +411,8 @@ private:
// Delete this promise to cancel the call. // Delete this promise to cancel the call.
kj::Maybe<const RpcCallContext&> callContext; kj::Maybe<const RpcCallContext&> callContext;
// The call context, if it's still active. Becomes null when the `Return` message is sent. This // The call context, if it's still active. Becomes null when the `Return` message is sent.
// object, if non-null, is owned by `asyncOp`. // This object, if non-null, is owned by `asyncOp`.
kj::Maybe<kj::Own<CapInjectorImpl>> resultCaps; kj::Maybe<kj::Own<CapInjectorImpl>> resultCaps;
// Set when `Return` is sent, free when `Finish` is received. // Set when `Return` is sent, free when `Finish` is received.
...@@ -1877,6 +1877,26 @@ private: ...@@ -1877,6 +1877,26 @@ private:
params(requestCapContext.imbue(params)), params(requestCapContext.imbue(params)),
returnMessage(nullptr) {} returnMessage(nullptr) {}
~RpcCallContext() noexcept(false) {
if (isFirstResponder()) {
// We haven't sent a return yet, so we must have been canceled. Send a cancellation return.
unwindDetector.catchExceptionsIfUnwinding([&]() {
auto message = connectionState->connection->newOutgoingMessage(
requestCapExtractor.retainedListSizeHint(true) + messageSizeHint<rpc::Return>());
auto builder = message->getBody().initAs<rpc::Message>().initReturn();
builder.setQuestionId(questionId);
auto retainedCaps = requestCapExtractor.finalizeRetainedCaps(
Orphanage::getForMessageContaining(builder));
builder.adoptRetainedCaps(kj::mv(retainedCaps.exportList));
builder.setCanceled();
message->send();
cleanupAnswerTable(connectionState->tables.lockExclusive(), nullptr);
});
}
}
void sendReturn() { void sendReturn() {
if (isFirstResponder()) { if (isFirstResponder()) {
if (response == nullptr) getResults(1); // force initialization of response if (response == nullptr) getResults(1); // force initialization of response
...@@ -1909,22 +1929,6 @@ private: ...@@ -1909,22 +1929,6 @@ private:
cleanupAnswerTable(connectionState->tables.lockExclusive(), nullptr); cleanupAnswerTable(connectionState->tables.lockExclusive(), nullptr);
} }
} }
void sendCancel() {
if (isFirstResponder()) {
auto message = connectionState->connection->newOutgoingMessage(
requestCapExtractor.retainedListSizeHint(true) + messageSizeHint<rpc::Return>());
auto builder = message->getBody().initAs<rpc::Message>().initReturn();
builder.setQuestionId(questionId);
auto retainedCaps = requestCapExtractor.finalizeRetainedCaps(
Orphanage::getForMessageContaining(builder));
builder.adoptRetainedCaps(kj::mv(retainedCaps.exportList));
builder.setCanceled();
message->send();
cleanupAnswerTable(connectionState->tables.lockExclusive(), nullptr);
}
}
void requestCancel() const { void requestCancel() const {
// Hints that the caller wishes to cancel this call. At the next time when cancellation is // Hints that the caller wishes to cancel this call. At the next time when cancellation is
...@@ -2009,9 +2013,9 @@ private: ...@@ -2009,9 +2013,9 @@ private:
auto promise = request->send(); auto promise = request->send();
// Link pipelines. // Link pipelines.
// KJ_IF_MAYBE(f, tailCallPipelineFulfiller) { KJ_IF_MAYBE(f, tailCallPipelineFulfiller) {
// f->get()->fulfill(kj::mv(kj::implicitCast<ObjectPointer::Pipeline&>(promise))); f->get()->fulfill(kj::mv(kj::implicitCast<ObjectPointer::Pipeline&>(promise)));
// } }
// Wait for response. // Wait for response.
return promise.then([this](Response<ObjectPointer>&& tailResponse) { return promise.then([this](Response<ObjectPointer>&& tailResponse) {
...@@ -2085,6 +2089,8 @@ private: ...@@ -2085,6 +2089,8 @@ private:
// on an application promise continuation callback to finish executing, which could take // on an application promise continuation callback to finish executing, which could take
// arbitrary time. // arbitrary time.
kj::UnwindDetector unwindDetector;
// ----------------------------------------------------- // -----------------------------------------------------
void scheduleCancel() const { void scheduleCancel() const {
...@@ -2109,9 +2115,13 @@ private: ...@@ -2109,9 +2115,13 @@ private:
// we try to schedule doCancel() on the application thread, so that it won't need to block. // we try to schedule doCancel() on the application thread, so that it won't need to block.
asyncOp = nullptr; asyncOp = nullptr;
// OK, now that we know the call isn't running in another thread, we can drop our thread // The `Return` will be sent when the context is destroyed. That might be right now, when
// safety and send a return message. // `self` goes out of scope. However, it is also possible that the pipeline is still in
const_cast<RpcCallContext*>(this)->sendCancel(); // use: although `Finish` removes the pipeline reference from the answer table, it might
// be held by an outstanding pipelined call, or by a pipelined promise that was echoed back
// to us later (as a `receiverAnswer` in a `CapDescriptor`), or it may be held in the
// resolution chain. In all of these cases, the call will continue running until those
// references are dropped or the call completes.
}); });
} }
......
...@@ -393,9 +393,12 @@ struct Finish { ...@@ -393,9 +393,12 @@ struct Finish {
# pipelined requests). # pipelined requests).
# 2) Any capabilities in the results other than the ones listed below should be implicitly # 2) Any capabilities in the results other than the ones listed below should be implicitly
# released. # released.
# 3) If the call has not returned yet, the caller no longer cares about the result, so the # 3) If the call has not returned yet, the caller no longer cares about the result. If nothing
# callee may wish to immediately cancel the operation and send back a Return message with # else cares about the result either (e.g. there are to other outstanding calls pipelined on
# "canceled" set. # the result of this one) then the callee may wish to immediately cancel the operation and
# send back a Return message with "canceled" set. However, implementations are not requried
# to support premature cancellation -- instead, the implementation may wait until the call
# actually completes and send a normal `Return` message.
# #
# TODO(soon): Should we separate (1) and (2)? It would be possible and useful to notify the # TODO(soon): Should we separate (1) and (2)? It would be possible and useful to notify the
# server that it doesn't need to keep around the response to service pipeline requests even # server that it doesn't need to keep around the response to service pipeline requests even
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment