Commit 0d849158 authored by Kenton Varda's avatar Kenton Varda

Use non-atomic refcounting.

parent aa9c3115
......@@ -309,11 +309,8 @@ public:
// All current questions complete with exceptions.
questions.forEach([&](QuestionId id, Question& question) {
KJ_IF_MAYBE(questionRef, question.selfRef) {
// QuestionRef still present. Make sure it's not in the midst of being destroyed, then
// reject it.
KJ_IF_MAYBE(ownRef, kj::tryAddRef(*questionRef)) {
questionRef->reject(kj::cp(networkException));
}
// QuestionRef still present.
questionRef->reject(kj::cp(networkException));
}
KJ_IF_MAYBE(pc, question.paramCaps) {
capInjectorsToRelease.add(kj::mv(*pc));
......@@ -1200,13 +1197,11 @@ private:
// Check if the import still exists under this ID.
KJ_IF_MAYBE(import, connectionState.imports.find(importId)) {
KJ_IF_MAYBE(ic, import->importClient) {
KJ_IF_MAYBE(ref, kj::tryAddRef(*ic)) {
// Import indeed still exists! We'll return it in the retained caps, which means it
// now has a new remote ref.
ic->addRemoteRef();
*actualRetained++ = importId;
refs.add(kj::mv(*ref));
}
// Import indeed still exists! We'll return it in the retained caps, which means it
// now has a new remote ref.
ic->addRemoteRef();
*actualRetained++ = importId;
refs.add(kj::addRef(*ic));
}
}
}
......@@ -1275,15 +1270,10 @@ private:
// No recent resolutions. Check the import table then.
auto& import = connectionState.imports[importId];
KJ_IF_MAYBE(c, import.appClient) {
// The import is already on the table, but it could be being deleted in another
// thread.
KJ_IF_MAYBE(ref, kj::tryAddRef(*c)) {
// We successfully grabbed a reference to the import without it being deleted in
// another thread. Since this import already exists, we don't have to take
// responsibility for retaining it. We can just return the existing object and
// be done with it.
return kj::mv(*ref);
}
// The import is already on the table. Since this import already exists, we don't have
// to take responsibility for retaining it. We can just return the existing object and
// be done with it.
return kj::addRef(*c);
}
// No import for this ID exists currently, so create one.
......@@ -2442,7 +2432,7 @@ private:
void handleReturn(kj::Own<IncomingRpcMessage>&& message, const rpc::Return::Reader& ret) {
kj::Own<CapInjectorImpl> paramCapsToRelease;
kj::Promise<kj::Own<RpcResponse>> promiseToRelease = nullptr;
kj::Maybe<kj::Promise<kj::Own<RpcResponse>>> promiseToRelease;
KJ_IF_MAYBE(question, questions.find(ret.getQuestionId())) {
KJ_REQUIRE(question->paramCaps != nullptr, "Duplicate Return.") { return; }
......@@ -2462,87 +2452,69 @@ private:
}
}
switch (ret.which()) {
case rpc::Return::RESULTS:
KJ_REQUIRE(!question->isTailCall,
"Tail call `Return` must set `resultsSentElsewhere`, not `results`.");
KJ_IF_MAYBE(questionRef, question->selfRef) {
// The questionRef still exists, but could be being deleted in another thread.
KJ_IF_MAYBE(ownRef, kj::tryAddRef(*questionRef)) {
// Not being deleted.
questionRef->fulfill(kj::refcounted<RpcResponseImpl>(
*this, kj::mv(*ownRef), kj::mv(message), ret.getResults(),
kj::addRef(*resolutionChainTail)));
KJ_IF_MAYBE(questionRef, question->selfRef) {
switch (ret.which()) {
case rpc::Return::RESULTS:
KJ_REQUIRE(!question->isTailCall,
"Tail call `Return` must set `resultsSentElsewhere`, not `results`.") {
return;
}
}
break;
case rpc::Return::EXCEPTION:
KJ_REQUIRE(!question->isTailCall,
"Tail call `Return` must set `resultsSentElsewhere`, not `results`.");
questionRef->fulfill(kj::refcounted<RpcResponseImpl>(
*this, kj::addRef(*questionRef), kj::mv(message), ret.getResults(),
kj::addRef(*resolutionChainTail)));
break;
KJ_IF_MAYBE(questionRef, question->selfRef) {
// The questionRef still exists, but could be being deleted in another thread.
KJ_IF_MAYBE(ownRef, kj::tryAddRef(*questionRef)) {
questionRef->reject(toException(ret.getException()));
case rpc::Return::EXCEPTION:
KJ_REQUIRE(!question->isTailCall,
"Tail call `Return` must set `resultsSentElsewhere`, not `exception`.") {
return;
}
}
break;
case rpc::Return::CANCELED:
KJ_REQUIRE(!question->isTailCall,
"Tail call `Return` must set `resultsSentElsewhere`, not `results`.");
KJ_REQUIRE(question->selfRef == nullptr,
"Return message falsely claims call was canceled.") { return; }
// We don't bother fulfilling the result. If someone is somehow still waiting on it
// (shouldn't be possible), that's OK: they'll get an exception due to the fulfiller
// being destroyed.
break;
case rpc::Return::RESULTS_SENT_ELSEWHERE:
KJ_REQUIRE(question->isTailCall,
"`Return` had `resultsSentElsewhere` but this was not a tail call.");
KJ_IF_MAYBE(questionRef, question->selfRef) {
// The questionRef still exists, but could be being deleted in another thread.
KJ_IF_MAYBE(ownRef, kj::tryAddRef(*questionRef)) {
// Not being deleted. Tail calls are fulfilled with a null pointer.
questionRef->fulfill(kj::Own<RpcResponse>());
questionRef->reject(toException(ret.getException()));
break;
case rpc::Return::CANCELED:
KJ_FAIL_REQUIRE("Return message falsely claims call was canceled.") { return; }
break;
case rpc::Return::RESULTS_SENT_ELSEWHERE:
KJ_REQUIRE(question->isTailCall,
"`Return` had `resultsSentElsewhere` but this was not a tail call.") {
return;
}
}
break;
case rpc::Return::TAKE_FROM_OTHER_ANSWER:
KJ_IF_MAYBE(answer, answers.find(ret.getTakeFromOtherAnswer())) {
KJ_IF_MAYBE(response, answer->redirectedResults) {
// If we don't manage to fill in a questionRef here, we will want to release the
// promise.
promiseToRelease = kj::mv(*response);
KJ_IF_MAYBE(questionRef, question->selfRef) {
// The questionRef still exists, but could be being deleted in another thread.
KJ_IF_MAYBE(ownRef, kj::tryAddRef(*questionRef)) {
// Not being deleted.
questionRef->fulfill(kj::mv(promiseToRelease));
}
// Tail calls are fulfilled with a null pointer.
questionRef->fulfill(kj::Own<RpcResponse>());
break;
case rpc::Return::TAKE_FROM_OTHER_ANSWER:
KJ_IF_MAYBE(answer, answers.find(ret.getTakeFromOtherAnswer())) {
KJ_IF_MAYBE(response, answer->redirectedResults) {
questionRef->fulfill(kj::mv(*response));
} else {
KJ_FAIL_REQUIRE("`Return.takeFromOtherAnswer` referenced a call that did not "
"use `sendResultsTo.yourself`.") { return; }
}
} else {
KJ_FAIL_REQUIRE("`Return.takeFromOtherAnswer` referenced a call that did not "
"use `sendResultsTo.yourself`.") { return; }
KJ_FAIL_REQUIRE("`Return.takeFromOtherAnswer` had invalid answer ID.") { return; }
}
} else {
KJ_FAIL_REQUIRE("`Return.takeFromOtherAnswer` had invalid answer ID.") { return; }
}
break;
break;
default:
KJ_FAIL_REQUIRE("Unknown 'Return' type.") { return; }
}
default:
KJ_FAIL_REQUIRE("Unknown 'Return' type.") { return; }
}
} else {
if (ret.isTakeFromOtherAnswer()) {
// Be sure to release the tail call's promise.
KJ_IF_MAYBE(answer, answers.find(ret.getTakeFromOtherAnswer())) {
promiseToRelease = kj::mv(answer->redirectedResults);
}
}
if (question->selfRef == nullptr) {
// Looks like this question was canceled earlier, so `Finish` was already sent. We can go
// ahead and delete it from the table.
questions.erase(ret.getQuestionId());
}
......
......@@ -36,8 +36,8 @@ struct SetTrueInDestructor: public Refcounted {
TEST(Refcount, Basic) {
bool b = false;
Own<SetTrueInDestructor> ref1 = kj::refcounted<SetTrueInDestructor>(&b);
Own<const SetTrueInDestructor> ref2 = kj::addRef(*ref1);
Own<const SetTrueInDestructor> ref3 = kj::addRef(*ref2);
Own<SetTrueInDestructor> ref2 = kj::addRef(*ref1);
Own<SetTrueInDestructor> ref3 = kj::addRef(*ref2);
EXPECT_FALSE(b);
ref1 = Own<SetTrueInDestructor>();
......@@ -54,22 +54,4 @@ TEST(Refcount, Basic) {
#endif
}
TEST(Refcount, Weak) {
{
bool b = false;
SetTrueInDestructor obj(&b);
EXPECT_TRUE(tryAddRef(obj) == nullptr);
}
{
bool b = false;
Own<SetTrueInDestructor> ref = kj::refcounted<SetTrueInDestructor>(&b);
KJ_IF_MAYBE(ref2, tryAddRef(*ref)) {
EXPECT_EQ(ref.get(), ref2->get());
} else {
ADD_FAILURE() << "tryAddRef() failed.";
}
}
}
} // namespace kj
......@@ -32,29 +32,9 @@ Refcounted::~Refcounted() noexcept(false) {
}
void Refcounted::disposeImpl(void* pointer) const {
// Need to do a "release" decrement in order to release the object's state to any other thread
// which seeks to destroy it.
if (__atomic_sub_fetch(&refcount, 1, __ATOMIC_RELEASE) == 0) {
// This was the last reference. Acquire the memory so that we can destroy it.
__atomic_thread_fence(__ATOMIC_ACQUIRE);
if (--refcount == 0) {
delete this;
}
}
bool Refcounted::tryAddRefInternal() const {
// We want to increment the refcount, but only if it is non-zero. We have to use a cmpxchg for
// this.
uint old = __atomic_load_n(&refcount, __ATOMIC_RELAXED);
for (;;) {
if (old == 0) {
return false;
}
if (__atomic_compare_exchange_n(&refcount, &old, old + 1, true,
__ATOMIC_RELAXED, __ATOMIC_RELAXED)) {
return true;
}
}
}
} // namespace kj
......@@ -33,11 +33,12 @@ class Refcounted: private Disposer {
// `kj::refcounted<T>()` to allocate a new refcounted pointer.
//
// Do NOT use this lightly. Refcounting is a crutch. Good designs should strive to make object
// ownership clear, so that refcounting is not necessary. Keep in mind that reference counting
// must use atomic operations and therefore is surprisingly slow -- often slower than allocating
// a copy on the heap. All that said, reference counting can sometimes simplify code that would
// otherwise become convoluted with explicit ownership, even when ownership relationships are
// clear at an abstract level.
// ownership clear, so that refcounting is not necessary. All that said, reference counting can
// sometimes simplify code that would otherwise become convoluted with explicit ownership, even
// when ownership relationships are clear at an abstract level.
//
// NOT THREADSAFE: This refcounting implementation assumes that an object's references are
// manipulated only in one thread, because atomic (thread-safe) refcounting is surprisingly slow.
//
// In general, abstract classes should _not_ subclass this. The concrete class at the bottom
// of the heirarchy should be the one to decide how it implements refcounting. Interfaces should
......@@ -48,23 +49,24 @@ class Refcounted: private Disposer {
// inefficient.
// 2. An implementation may decide that it would rather return a copy than a refcount, or use
// some other strategy.
//
// TODO(cleanup): Rethink above. Virtual inheritance is not necessarily that bad. OTOH, a
// virtual function call for every refcount is sad in its own way. A Ref<T> type to replace
// Own<T> could also be nice.
public:
virtual ~Refcounted() noexcept(false);
private:
mutable volatile uint refcount = 0;
mutable uint refcount = 0;
// "mutable" because disposeImpl() is const. Bleh.
void disposeImpl(void* pointer) const override;
template <typename T>
static Own<T> addRefInternal(T* object);
bool tryAddRefInternal() const;
template <typename T>
friend Own<T> addRef(T& object);
template <typename T>
friend Maybe<Own<T>> tryAddRef(T& object);
template <typename T, typename... Params>
friend Own<T> refcounted(Params&&... params);
};
......@@ -87,27 +89,10 @@ Own<T> addRef(T& object) {
return Refcounted::addRefInternal(&object);
}
template <typename T>
Maybe<Own<T>> tryAddRef(T& object) {
// Like `addRef`, but if the object's refcount is already zero or if the object was not allocated
// with `refcounted`, returns nullptr. This can be used to implement weak references in a
// thread-safe way: store a (regular, non-owned) pointer to the object, and have the object's
// destructor null out that pointer. To convert the pointer to a full reference, use tryAddRef().
// If it fails, the object is already being destroyed. Be sure to also use some sort of mutex
// locking to synchronize access to the raw pointer, since you'll want the object's destructor
// to block if another thread is currently trying to restore the ref.
if (object.Refcounted::tryAddRefInternal()) {
return Own<T>(&object, kj::implicitCast<const Refcounted&>(object));
} else {
return nullptr;
}
}
template <typename T>
Own<T> Refcounted::addRefInternal(T* object) {
const Refcounted* refcounted = object;
__atomic_add_fetch(&refcounted->refcount, 1, __ATOMIC_RELAXED);
Refcounted* refcounted = object;
++refcounted->refcount;
return Own<T>(object, *refcounted);
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment