Commit fe3f7212 authored by Kenton Varda's avatar Kenton Varda

Just realized isCanceled() doesn't work. Remove it.

parent 533feaa4
......@@ -189,7 +189,7 @@ TEST(Capability, TailCall) {
}
TEST(Capability, AsyncCancelation) {
// Tests allowAsyncCancellation().
// Tests allowCancellation().
kj::EventLoop loop;
kj::WaitScope waitScope(loop);
......@@ -206,10 +206,10 @@ TEST(Capability, AsyncCancelation) {
bool returned = false;
{
auto request = client.expectAsyncCancelRequest();
auto request = client.expectCancelRequest();
request.setCap(test::TestInterface::Client(kj::heap<TestCapDestructor>(kj::mv(paf.fulfiller))));
promise = request.send().then(
[&](Response<test::TestMoreStuff::ExpectAsyncCancelResults>&& response) {
[&](Response<test::TestMoreStuff::ExpectCancelResults>&& response) {
returned = true;
}).eagerlyEvaluate(nullptr);
}
......@@ -227,48 +227,6 @@ TEST(Capability, AsyncCancelation) {
EXPECT_FALSE(returned);
}
TEST(Capability, SyncCancelation) {
// Tests isCanceled() without allowAsyncCancellation().
kj::EventLoop loop;
kj::WaitScope waitScope(loop);
int callCount = 0;
int innerCallCount = 0;
test::TestMoreStuff::Client client(kj::heap<TestMoreStuffImpl>(callCount));
kj::Promise<void> promise = nullptr;
bool returned = false;
{
auto request = client.expectSyncCancelRequest();
request.setCap(test::TestInterface::Client(kj::heap<TestInterfaceImpl>(innerCallCount)));
promise = request.send().then(
[&](Response<test::TestMoreStuff::ExpectSyncCancelResults>&& response) {
returned = true;
}).eagerlyEvaluate(nullptr);
}
kj::evalLater([]() {}).wait(waitScope);
kj::evalLater([]() {}).wait(waitScope);
kj::evalLater([]() {}).wait(waitScope);
kj::evalLater([]() {}).wait(waitScope);
// expectSyncCancel() will make a call to the TestInterfaceImpl only once it noticed isCanceled()
// is true.
EXPECT_EQ(0, innerCallCount);
EXPECT_FALSE(returned);
promise = nullptr; // request cancellation
kj::evalLater([]() {}).wait(waitScope);
kj::evalLater([]() {}).wait(waitScope);
kj::evalLater([]() {}).wait(waitScope);
kj::evalLater([]() {}).wait(waitScope);
EXPECT_EQ(1, innerCallCount);
EXPECT_FALSE(returned);
}
// =======================================================================================
TEST(Capability, DynamicClient) {
......
......@@ -143,13 +143,10 @@ public:
tailCallPipelineFulfiller = kj::mv(paf.fulfiller);
return kj::mv(paf.promise);
}
void allowAsyncCancellation() override {
KJ_REQUIRE(request == nullptr, "Must call releaseParams() before allowAsyncCancellation().");
void allowCancellation() override {
KJ_REQUIRE(request == nullptr, "Must call releaseParams() before allowCancellation().");
cancelAllowedFulfiller->fulfill();
}
bool isCanceled() override {
return cancelRequested;
}
kj::Own<CallContextHook> addRef() override {
return kj::addRef(*this);
}
......@@ -160,20 +157,6 @@ public:
kj::Own<ClientHook> clientRef;
kj::Maybe<kj::Own<kj::PromiseFulfiller<AnyPointer::Pipeline>>> tailCallPipelineFulfiller;
kj::Own<kj::PromiseFulfiller<void>> cancelAllowedFulfiller;
bool cancelRequested = false;
class Canceler {
public:
Canceler(kj::Own<LocalCallContext>&& context): context(kj::mv(context)) {}
Canceler(Canceler&&) = default;
~Canceler() {
if (context) context->cancelRequested = true;
}
private:
kj::Own<LocalCallContext> context;
};
};
class LocalRequest final: public RequestHook {
......@@ -209,15 +192,11 @@ public:
.detach([](kj::Exception&&) {}); // ignore exceptions
// Now the other branch returns the response from the context.
auto contextPtr = context.get();
auto promise = forked.addBranch().then([contextPtr]() {
contextPtr->getResults(1); // force response allocation
return kj::mv(KJ_ASSERT_NONNULL(contextPtr->response));
});
// We also want to notify the context that cancellation was requested if this branch is
// destroyed.
promise = promise.attach(LocalCallContext::Canceler(kj::mv(context)));
auto promise = forked.addBranch().then(kj::mvCapture(context,
[](kj::Own<LocalCallContext>&& context) {
context->getResults(1); // force response allocation
return kj::mv(KJ_ASSERT_NONNULL(context->response));
}));
// We return the other branch.
return RemotePromise<AnyPointer>(
......
......@@ -237,7 +237,7 @@ public:
// In general, this should be the last thing a method implementation calls, and the promise
// returned from `tailCall()` should then be returned by the method implementation.
void allowAsyncCancellation();
void allowCancellation();
// Indicate that it is OK for the RPC system to discard its Promise for this call's result if
// the caller cancels the call, thereby transitively canceling any asynchronous operations the
// call implementation was performing. This is not done by default because it could represent a
......@@ -249,16 +249,19 @@ public:
// executing on a local thread. The method must perform an asynchronous operation or call
// `EventLoop::current().runLater()` to yield control.
//
// Currently, you must call `releaseParams()` before `allowAsyncCancellation()`, otherwise the
// Currently, you must call `releaseParams()` before `allowCancellation()`, otherwise the
// latter will throw an exception. This is a limitation of the current RPC implementation, but
// this requirement could be lifted in the future.
bool isCanceled();
// As an alternative to `allowAsyncCancellation()`, a server can call this to check for
// cancellation.
//
// Keep in mind that if the method is blocking the event loop, the cancel message won't be
// received, so it is necessary to use `EventLoop::current().runLater()` occasionally.
// Note: You might think that we should offer `onCancel()` and/or `isCanceled()` methods that
// provide notification when the caller cancels the request without forcefully killing off the
// promise chain. Unfortunately, this composes poorly with promise forking: the canceled
// path may be just one branch of a fork of the result promise. The other branches still want
// the call to continue. Promise forking is used within the Cap'n Proto implementation -- in
// particular each pipelined call forks the result promise. So, if a caller made a pipelined
// call and then dropped the original object, the call should not be canceled, but it would be
// excessively complicated for the framework to avoid notififying of cancellation as long as
// pipelined calls still exist.
private:
CallContextHook* hook;
......@@ -353,7 +356,7 @@ public:
//
// Since the caller of this method chooses the CallContext implementation, it is the caller's
// responsibility to ensure that the returned promise is not canceled unless allowed via
// the context's `allowAsyncCancellation()`.
// the context's `allowCancellation()`.
//
// The call must not begin synchronously, as the caller may hold arbitrary mutexes.
......@@ -390,8 +393,7 @@ public:
virtual void releaseParams() = 0;
virtual AnyPointer::Builder getResults(uint firstSegmentWordSize) = 0;
virtual kj::Promise<void> tailCall(kj::Own<RequestHook>&& request) = 0;
virtual void allowAsyncCancellation() = 0;
virtual bool isCanceled() = 0;
virtual void allowCancellation() = 0;
virtual kj::Promise<AnyPointer::Pipeline> onTailCall() = 0;
// If `tailCall()` is called, resolves to the PipelineHook from the tail call. An
......@@ -621,12 +623,8 @@ inline kj::Promise<void> CallContext<Params, Results>::tailCall(
return hook->tailCall(kj::mv(tailRequest.hook));
}
template <typename Params, typename Results>
inline void CallContext<Params, Results>::allowAsyncCancellation() {
hook->allowAsyncCancellation();
}
template <typename Params, typename Results>
inline bool CallContext<Params, Results>::isCanceled() {
return hook->isCanceled();
inline void CallContext<Params, Results>::allowCancellation() {
hook->allowCancellation();
}
template <typename Params, typename Results>
......
......@@ -539,8 +539,7 @@ public:
Orphanage getResultsOrphanage(uint firstSegmentWordSize = 0);
template <typename SubParams>
kj::Promise<void> tailCall(Request<SubParams, DynamicStruct>&& tailRequest);
void allowAsyncCancellation();
bool isCanceled();
void allowCancellation();
private:
CallContextHook* hook;
......@@ -1540,11 +1539,8 @@ inline kj::Promise<void> CallContext<DynamicStruct, DynamicStruct>::tailCall(
Request<SubParams, DynamicStruct>&& tailRequest) {
return hook->tailCall(kj::mv(tailRequest.hook));
}
inline void CallContext<DynamicStruct, DynamicStruct>::allowAsyncCancellation() {
hook->allowAsyncCancellation();
}
inline bool CallContext<DynamicStruct, DynamicStruct>::isCanceled() {
return hook->isCanceled();
inline void CallContext<DynamicStruct, DynamicStruct>::allowCancellation() {
hook->allowCancellation();
}
template <>
......
......@@ -564,8 +564,8 @@ TEST(Rpc, TailCall) {
EXPECT_EQ(1, context.restorer.callCount);
}
TEST(Rpc, AsyncCancelation) {
// Tests allowAsyncCancellation().
TEST(Rpc, Cancelation) {
// Tests allowCancellation().
TestContext context;
......@@ -580,10 +580,10 @@ TEST(Rpc, AsyncCancelation) {
bool returned = false;
{
auto request = client.expectAsyncCancelRequest();
auto request = client.expectCancelRequest();
request.setCap(kj::heap<TestCapDestructor>(kj::mv(paf.fulfiller)));
promise = request.send().then(
[&](Response<test::TestMoreStuff::ExpectAsyncCancelResults>&& response) {
[&](Response<test::TestMoreStuff::ExpectCancelResults>&& response) {
returned = true;
}).eagerlyEvaluate(nullptr);
}
......@@ -605,49 +605,6 @@ TEST(Rpc, AsyncCancelation) {
EXPECT_FALSE(returned);
}
TEST(Rpc, SyncCancelation) {
// Tests isCanceled() without allowAsyncCancellation().
TestContext context;
int innerCallCount = 0;
auto client = context.connect(test::TestSturdyRefObjectId::Tag::TEST_MORE_STUFF)
.castAs<test::TestMoreStuff>();
kj::Promise<void> promise = nullptr;
bool returned = false;
{
auto request = client.expectSyncCancelRequest();
request.setCap(kj::heap<TestInterfaceImpl>(innerCallCount));
promise = request.send().then(
[&](Response<test::TestMoreStuff::ExpectSyncCancelResults>&& response) {
returned = true;
}).eagerlyEvaluate(nullptr);
}
kj::evalLater([]() {}).wait(context.waitScope);
kj::evalLater([]() {}).wait(context.waitScope);
kj::evalLater([]() {}).wait(context.waitScope);
kj::evalLater([]() {}).wait(context.waitScope);
kj::evalLater([]() {}).wait(context.waitScope);
kj::evalLater([]() {}).wait(context.waitScope);
// expectSyncCancel() will make a call to the TestInterfaceImpl only once it noticed isCanceled()
// is true.
EXPECT_EQ(0, innerCallCount);
EXPECT_FALSE(returned);
promise = nullptr; // request cancellation
kj::evalLater([]() {}).wait(context.waitScope);
kj::evalLater([]() {}).wait(context.waitScope);
kj::evalLater([]() {}).wait(context.waitScope);
kj::evalLater([]() {}).wait(context.waitScope);
EXPECT_EQ(1, innerCallCount);
EXPECT_FALSE(returned);
}
TEST(Rpc, PromiseResolve) {
TestContext context;
......
......@@ -574,7 +574,7 @@ private:
context->releaseParams();
// We can and should propagate cancellation.
context->allowAsyncCancellation();
context->allowCancellation();
return context->directTailCall(RequestHook::from(kj::mv(request)));
}
......@@ -1721,7 +1721,7 @@ private:
tailCallPipelineFulfiller = kj::mv(paf.fulfiller);
return kj::mv(paf.promise);
}
void allowAsyncCancellation() override {
void allowCancellation() override {
// TODO(cleanup): We need to drop the request because it is holding on to the resolution
// chain which in turn holds on to the pipeline which holds on to this object thus
// preventing cancellation from working. This is a bit silly because obviously our
......@@ -1730,7 +1730,7 @@ private:
// a call started doesn't really need to hold the call open. To fix this we'd presumably
// need to make the answer table snapshot-able and have CapExtractorImpl take a snapshot
// at creation.
KJ_REQUIRE(request == nullptr, "Must call releaseParams() before allowAsyncCancellation().");
KJ_REQUIRE(request == nullptr, "Must call releaseParams() before allowCancellation().");
bool previouslyRequestedButNotAllowed = cancellationFlags == CANCEL_REQUESTED;
cancellationFlags |= CANCEL_ALLOWED;
......@@ -1741,9 +1741,6 @@ private:
cancelFulfiller->fulfill();
}
}
bool isCanceled() override {
return cancellationFlags & CANCEL_REQUESTED;
}
kj::Own<CallContextHook> addRef() override {
return kj::addRef(*this);
}
......@@ -1993,7 +1990,7 @@ private:
}));
// If the call that later picks up `redirectedResults` decides to discard it, we need to
// make sure our call is not itself canceled unless it has called allowAsyncCancellation().
// make sure our call is not itself canceled unless it has called allowCancellation().
// So we fork the promise and join one branch with the cancellation promise, in order to
// hold on to it.
auto forked = resultsPromise.fork();
......
......@@ -1009,17 +1009,14 @@ kj::Promise<void> TestMoreStuffImpl::callFooWhenResolved(CallFooWhenResolvedCont
kj::Promise<void> TestMoreStuffImpl::neverReturn(NeverReturnContext context) {
++callCount;
auto paf = kj::newPromiseAndFulfiller<void>();
neverFulfill = kj::mv(paf.fulfiller);
// Attach `cap` to the promise to make sure it is released.
paf.promise = paf.promise.attach(context.getParams().getCap());
auto promise = kj::Promise<void>(kj::NEVER_DONE).attach(context.getParams().getCap());
// Also attach `cap` to the result struct to make sure that is released.
context.getResults().setCapCopy(context.getParams().getCap());
context.allowAsyncCancellation();
return kj::mv(paf.promise);
context.allowCancellation();
return kj::mv(promise);
}
kj::Promise<void> TestMoreStuffImpl::hold(HoldContext context) {
......@@ -1059,45 +1056,18 @@ kj::Promise<void> TestMoreStuffImpl::echo(EchoContext context) {
return kj::READY_NOW;
}
kj::Promise<void> TestMoreStuffImpl::expectAsyncCancel(ExpectAsyncCancelContext context) {
auto cap = context.getParams().getCap();
context.releaseParams();
context.allowAsyncCancellation();
return loop(0, cap, context);
}
kj::Promise<void> TestMoreStuffImpl::loop(uint depth, test::TestInterface::Client cap,
ExpectAsyncCancelContext context) {
if (depth > 100) {
ADD_FAILURE() << "Looped too long, giving up.";
return kj::READY_NOW;
} else {
return kj::evalLater([=]() mutable {
return loop(depth + 1, cap, context);
});
}
}
kj::Promise<void> TestMoreStuffImpl::expectSyncCancel(ExpectSyncCancelContext context) {
kj::Promise<void> TestMoreStuffImpl::expectCancel(ExpectCancelContext context) {
auto cap = context.getParams().getCap();
context.releaseParams();
context.allowCancellation();
return loop(0, cap, context);
}
kj::Promise<void> TestMoreStuffImpl::loop(uint depth, test::TestInterface::Client cap,
ExpectSyncCancelContext context) {
ExpectCancelContext context) {
if (depth > 100) {
ADD_FAILURE() << "Looped too long, giving up.";
return kj::READY_NOW;
} else if (context.isCanceled()) {
auto request = cap.fooRequest();
request.setI(123);
request.setJ(true);
return request.send().then(
[](Response<test::TestInterface::FooResults>&& response) mutable {
EXPECT_EQ("foo", response.getX());
});
} else {
return kj::evalLater([=]() mutable {
return loop(depth + 1, cap, context);
......
......@@ -226,19 +226,13 @@ public:
kj::Promise<void> echo(EchoContext context) override;
kj::Promise<void> expectAsyncCancel(ExpectAsyncCancelContext context) override;
kj::Promise<void> expectSyncCancel(ExpectSyncCancelContext context) override;
kj::Promise<void> expectCancel(ExpectCancelContext context) override;
private:
int& callCount;
kj::Own<kj::PromiseFulfiller<void>> neverFulfill;
test::TestInterface::Client clientToHold = nullptr;
kj::Promise<void> loop(uint depth, test::TestInterface::Client cap,
CallContext<ExpectAsyncCancelParams, ExpectAsyncCancelResults> context);
kj::Promise<void> loop(uint depth, test::TestInterface::Client cap,
CallContext<ExpectSyncCancelParams, ExpectSyncCancelResults> context);
kj::Promise<void> loop(uint depth, test::TestInterface::Client cap, ExpectCancelContext context);
};
class TestCapDestructor final: public test::TestInterface::Server {
......
......@@ -655,12 +655,8 @@ interface TestMoreStuff extends(TestCallOrder) {
echo @6 (cap :TestCallOrder) -> (cap :TestCallOrder);
# Just returns the input cap.
expectAsyncCancel @7 (cap :TestInterface) -> ();
expectCancel @7 (cap :TestInterface) -> ();
# evalLater()-loops forever, holding `cap`. Must be canceled.
expectSyncCancel @8 (cap :TestInterface) -> ();
# evalLater()-loops until context.isCanceled() returns true, then makes a call to `cap` before
# returning.
}
struct TestSturdyRefHostId {
......
......@@ -176,13 +176,16 @@ private:
void detach(kj::Promise<void>&& promise);
void waitImpl(Own<_::PromiseNode>&& node, _::ExceptionOrValue& result, WaitScope& waitScope);
Promise<void> yield();
Own<PromiseNode> neverDone();
class NeverDone {
public:
template <typename T>
operator Promise<T>() const;
operator Promise<T>() const {
return Promise<T>(false, neverDone());
}
void wait() KJ_NORETURN;
void wait(WaitScope& waitScope) KJ_NORETURN;
};
} // namespace _ (private)
......
......@@ -75,7 +75,7 @@ public:
}
};
class NeverReadyPromiseNode final: public _::PromiseNode {
class NeverDonePromiseNode final: public _::PromiseNode {
public:
void onReady(_::Event& event) noexcept override {
// ignore
......@@ -331,6 +331,16 @@ Promise<void> yield() {
return Promise<void>(false, kj::heap<YieldPromiseNode>());
}
Own<PromiseNode> neverDone() {
return kj::heap<NeverDonePromiseNode>();
}
void NeverDone::wait(WaitScope& waitScope) {
ExceptionOr<Void> dummy;
waitImpl(neverDone(), dummy, waitScope);
KJ_UNREACHABLE;
}
void detach(kj::Promise<void>&& promise) {
EventLoop& loop = currentEventLoop();
KJ_REQUIRE(loop.daemons.get() != nullptr, "EventLoop is shutting down.") { return; }
......
......@@ -289,6 +289,7 @@ private:
friend class _::ForkHub;
friend class _::TaskSetImpl;
friend Promise<void> _::yield();
friend class _::NeverDone;
};
template <typename T>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment