Commit 0df23c4f authored by Kenton Varda's avatar Kenton Varda

Implement 'Canceler' for managing side-band promise cancelation.

This is necessary when someone other than the promise owner might need to destroy the state a promise is operating on. It comes up when implementing pumpTo() for userland pipes -- cancelling the pump needs to cancel any writes currently passing through the pipe.
parent 394643ba
...@@ -580,6 +580,29 @@ TEST(Async, ArrayJoinVoid) { ...@@ -580,6 +580,29 @@ TEST(Async, ArrayJoinVoid) {
promise.wait(waitScope); promise.wait(waitScope);
} }
TEST(Async, Canceler) {
EventLoop loop;
WaitScope waitScope(loop);
Canceler canceler;
auto never = canceler.wrap(kj::Promise<void>(kj::NEVER_DONE));
auto now = canceler.wrap(kj::Promise<void>(kj::READY_NOW));
auto neverI = canceler.wrap(kj::Promise<void>(kj::NEVER_DONE).then([]() { return 123u; }));
auto nowI = canceler.wrap(kj::Promise<uint>(123u));
KJ_EXPECT(!never.poll(waitScope));
KJ_EXPECT(now.poll(waitScope));
KJ_EXPECT(!neverI.poll(waitScope));
KJ_EXPECT(nowI.poll(waitScope));
canceler.cancel("foobar");
KJ_EXPECT_THROW_MESSAGE("foobar", never.wait(waitScope));
now.wait(waitScope);
KJ_EXPECT_THROW_MESSAGE("foobar", neverI.wait(waitScope));
KJ_EXPECT(nowI.wait(waitScope) == 123u);
}
class ErrorHandlerImpl: public TaskSet::ErrorHandler { class ErrorHandlerImpl: public TaskSet::ErrorHandler {
public: public:
uint exceptionCount = 0; uint exceptionCount = 0;
......
...@@ -84,6 +84,62 @@ public: ...@@ -84,6 +84,62 @@ public:
} // namespace } // namespace
// =======================================================================================
Canceler::~Canceler() noexcept(false) {
cancel("operation canceled");
}
void Canceler::cancel(StringPtr cancelReason) {
if (isEmpty()) return;
cancel(Exception(Exception::Type::FAILED, __FILE__, __LINE__, kj::str(cancelReason)));
}
void Canceler::cancel(const Exception& exception) {
for (;;) {
KJ_IF_MAYBE(a, list) {
list = a->next;
a->prev = nullptr;
a->next = nullptr;
a->cancel(kj::cp(exception));
} else {
break;
}
}
}
void Canceler::release() {
for (;;) {
KJ_IF_MAYBE(a, list) {
list = a->next;
a->prev = nullptr;
a->next = nullptr;
} else {
break;
}
}
}
Canceler::AdapterBase::AdapterBase(Canceler& canceler)
: prev(canceler.list),
next(canceler.list) {
canceler.list = *this;
KJ_IF_MAYBE(n, next) {
n->prev = next;
}
}
Canceler::AdapterBase::~AdapterBase() noexcept(false) {
KJ_IF_MAYBE(p, prev) {
*p = next;
}
KJ_IF_MAYBE(n, next) {
n->prev = prev;
}
}
// =======================================================================================
TaskSet::TaskSet(TaskSet::ErrorHandler& errorHandler) TaskSet::TaskSet(TaskSet::ErrorHandler& errorHandler)
: errorHandler(errorHandler) {} : errorHandler(errorHandler) {}
......
...@@ -500,6 +500,117 @@ PromiseFulfillerPair<T> newPromiseAndFulfiller(); ...@@ -500,6 +500,117 @@ PromiseFulfillerPair<T> newPromiseAndFulfiller();
// fulfiller will be of type `PromiseFulfiller<Promise<U>>`. Thus you pass a `Promise<U>` to the // fulfiller will be of type `PromiseFulfiller<Promise<U>>`. Thus you pass a `Promise<U>` to the
// `fulfill()` callback, and the promises are chained. // `fulfill()` callback, and the promises are chained.
// =======================================================================================
// Canceler
class Canceler {
// A Canceler can wrap some set of Promises and then forcefully cancel them on-demand, or
// implicitly when the Canceler is destroyed.
//
// The cancellation is done in such a way that once cancel() (or the Canceler's destructor)
// returns, it's guaranteed that the promise has already been canceled and destroyed. This
// guarantee is important for enforcing ownership constraints. For example, imagine that Alice
// calls a method on Bob that returns a Promise. That Promise encapsulates a task that uses Bob's
// internal state. But, imagine that Alice does not own Bob, and indeed Bob might be destroyed
// at random without Alice having canceled the promise. In this case, it is necessary for Bob to
// ensure that the promise will be forcefully canceled. Bob can do this by constructing a
// Canceler and using it to wrap promises before returning them to callers. When Bob is
// destroyed, the Canceler is destroyed too, and all promises Bob wrapped with it throw errors.
//
// Note that another common strategy for cancelation is to use exclusiveJoin() to join a promise
// with some "cancellation promise" which only resolves if the operation should be canceled. The
// cancellation promise could itself be created by newPromiseAndFulfiller<void>(), and thus
// calling the PromiseFulfiller cancels the operation. There is a major problem with this
// approach: upon invoking the fulfiller, an arbitrary amount of time may pass before the
// exclusive-joined promise actually resolves and cancels its other fork. During that time, the
// task might continue to execute. If it holds pointers to objects that have been destroyed, this
// might cause segfaults. Thus, it is safer to use a Canceler.
public:
inline Canceler() {}
~Canceler() noexcept(false);
KJ_DISALLOW_COPY(Canceler);
template <typename T>
Promise<T> wrap(Promise<T> promise) {
return newAdaptedPromise<T, AdapterImpl<T>>(*this, kj::mv(promise));
}
void cancel(StringPtr cancelReason);
void cancel(const Exception& exception);
// Cancel all previously-wrapped promises that have not already completed, causing them to throw
// the given exception. If you provide just a description message instead of an exception, then
// an exception object will be constructed from it -- but only if there are requests to cancel.
void release();
// Releases previously-wrapped promises, so that they will not be canceled regardless of what
// happens to this Canceler.
bool isEmpty() { return list == nullptr; }
// Indicates if any previously-wrapped promises are still executing. (If this returns false, then
// cancel() would be a no-op.)
private:
class AdapterBase {
public:
AdapterBase(Canceler& canceler);
~AdapterBase() noexcept(false);
virtual void cancel(Exception&& e) = 0;
private:
Maybe<Maybe<AdapterBase&>&> prev;
Maybe<AdapterBase&> next;
friend class Canceler;
};
template <typename T>
class AdapterImpl: public AdapterBase {
public:
AdapterImpl(PromiseFulfiller<T>& fulfiller,
Canceler& canceler, Promise<T> inner)
: AdapterBase(canceler),
fulfiller(fulfiller),
inner(inner.then(
[&fulfiller](T&& value) { fulfiller.fulfill(kj::mv(value)); },
[&fulfiller](Exception&& e) { fulfiller.reject(kj::mv(e)); })
.eagerlyEvaluate(nullptr)) {}
void cancel(Exception&& e) override {
fulfiller.reject(kj::mv(e));
inner = nullptr;
}
private:
PromiseFulfiller<T>& fulfiller;
Promise<void> inner;
};
Maybe<AdapterBase&> list;
};
template <>
class Canceler::AdapterImpl<void>: public AdapterBase {
public:
AdapterImpl(kj::PromiseFulfiller<void>& fulfiller,
Canceler& canceler, kj::Promise<void> inner)
: AdapterBase(canceler),
fulfiller(fulfiller),
inner(inner.then(
[&fulfiller]() { fulfiller.fulfill(); },
[&fulfiller](kj::Exception&& e) { fulfiller.reject(kj::mv(e)); })
.eagerlyEvaluate(nullptr)) {}
void cancel(kj::Exception&& e) override {
fulfiller.reject(kj::mv(e));
inner = nullptr;
}
private:
kj::PromiseFulfiller<void>& fulfiller;
kj::Promise<void> inner;
};
// ======================================================================================= // =======================================================================================
// TaskSet // TaskSet
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment