// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

// bthread - A M:N threading library to make applications more concurrent.

// Date: Tue Jul 22 17:30:12 CST 2014

#include "butil/atomicops.h"                 // butil::atomic
#include "butil/scoped_lock.h"               // BAIDU_SCOPED_LOCK
#include "butil/macros.h"
#include "butil/containers/linked_list.h"    // LinkNode
#ifdef SHOW_BTHREAD_BUTEX_WAITER_COUNT_IN_VARS
#include "butil/memory/singleton_on_pthread_once.h"
#endif
#include "butil/logging.h"
#include "butil/object_pool.h"
#include "bthread/errno.h"                   // EWOULDBLOCK
#include "bthread/sys_futex.h"               // futex_*
#include "bthread/processor.h"               // cpu_relax
#include "bthread/task_control.h"            // TaskControl
#include "bthread/task_group.h"              // TaskGroup
#include "bthread/timer_thread.h"
#include "bthread/butex.h"
#include "bthread/mutex.h"

// This file implements butex.h
// Provides futex-like semantics: sequenced wait and wake operations with
// guaranteed visibility.
//
// If wait is sequenced before wake:
//    [thread1]               [thread2]
//    wait()                  value = new_value
//                            wake()
// wait() sees the unmatched value (fails to wait), or wake() sees the waiter.
//
// If wait is sequenced after wake:
//    [thread1]               [thread2]
//                            value = new_value
//                            wake()
//    wait()
// wake() must provide some sort of memory fence to prevent the assignment
// of value from being reordered after it. Thus the value is visible to
// wait() as well.

namespace bthread {

#ifdef SHOW_BTHREAD_BUTEX_WAITER_COUNT_IN_VARS
struct ButexWaiterCount : public bvar::Adder<int64_t> {
    ButexWaiterCount() : bvar::Adder<int64_t>("bthread_butex_waiter_count") {}
};
inline bvar::Adder<int64_t>& butex_waiter_count() {
    return *butil::get_leaky_singleton<ButexWaiterCount>();
}
#endif

// If a thread would suspend for less than so many microseconds, return
// ETIMEDOUT directly.
// Use 1: sleeping for less than 2 microseconds is inefficient and useless.
static const int64_t MIN_SLEEP_US = 2;

enum WaiterState {
    WAITER_STATE_NONE,
    WAITER_STATE_READY,
    WAITER_STATE_TIMEDOUT,
    WAITER_STATE_UNMATCHEDVALUE,
    WAITER_STATE_INTERRUPTED,
};

struct Butex;

struct ButexWaiter : public butil::LinkNode<ButexWaiter> {
    // tids of pthreads are 0
    bthread_t tid;

    // Erasing a node from the middle of a LinkedList is thread-unsafe, we
    // need to hold its container's lock.
    butil::atomic<Butex*> container;
};

// non_pthread_task allocates this structure on stack and queues it in
// Butex::waiters.
struct ButexBthreadWaiter : public ButexWaiter {
    TaskMeta* task_meta;
    TimerThread::TaskId sleep_id;
    WaiterState waiter_state;
    int expected_value;
    Butex* initial_butex;
    TaskControl* control;
};

// pthread_task or main_task allocates this structure on stack and queues it
// in Butex::waiters.
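// `sig' starts as PTHREAD_NOT_SIGNALLED and is flipped to PTHREAD_SIGNALLED
// by wakeup_pthread() with release ordering; wait_pthread() reads it with
// acquire ordering so that changes made before the wakeup are visible to the
// woken pthread.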
struct ButexPthreadWaiter : public ButexWaiter {
    butil::atomic<int> sig;
};

typedef butil::LinkedList<ButexWaiter> ButexWaiterList;

enum ButexPthreadSignal { PTHREAD_NOT_SIGNALLED, PTHREAD_SIGNALLED };

struct BAIDU_CACHELINE_ALIGNMENT Butex {
    Butex() {}
    ~Butex() {}

    butil::atomic<int> value;
    ButexWaiterList waiters;
    internal::FastPthreadMutex waiter_lock;
};

BAIDU_CASSERT(offsetof(Butex, value) == 0, offsetof_value_must_0);
BAIDU_CASSERT(sizeof(Butex) == BAIDU_CACHELINE_SIZE,
              butex_fits_in_one_cacheline);

static void wakeup_pthread(ButexPthreadWaiter* pw) {
    // release fence makes wait_pthread see changes before wakeup.
    pw->sig.store(PTHREAD_SIGNALLED, butil::memory_order_release);
    // At this point, wait_pthread() possibly has woken up and destroyed `pw',
    // in which case futex_wake_private() should return EFAULT.
    // If crashes happen in the future, `pw' can be made TLS and never
    // destroyed to solve the issue.
    futex_wake_private(&pw->sig, 1);
}

bool erase_from_butex(ButexWaiter*, bool, WaiterState);

int wait_pthread(ButexPthreadWaiter& pw, timespec* ptimeout) {
    while (true) {
        const int rc = futex_wait_private(&pw.sig, PTHREAD_NOT_SIGNALLED, ptimeout);
        if (PTHREAD_NOT_SIGNALLED != pw.sig.load(butil::memory_order_acquire)) {
            // If `sig' is changed, wakeup_pthread() must have been called and
            // `pw' is already removed from the butex.
            // Acquire fence makes this thread see changes before wakeup.
            return rc;
        }
        if (rc != 0 && errno == ETIMEDOUT) {
            // Note that we don't handle the EINTR from futex_wait here since
            // pthreads waiting on a butex should behave similarly to bthreads,
            // which cannot be woken up by signals.
            // EINTR on a butex is only producible by TaskGroup::interrupt().

            // `pw' is still in the queue, remove it.
            if (!erase_from_butex(&pw, false, WAITER_STATE_TIMEDOUT)) {
                // Another thread is erasing `pw' as well, wait for the signal.
                // Acquire fence makes this thread see changes before wakeup.
                if (pw.sig.load(butil::memory_order_acquire) == PTHREAD_NOT_SIGNALLED) {
                    ptimeout = NULL;  // already timed out, ptimeout is expired.
                    continue;
                }
            }
            return rc;
        }
    }
}

extern BAIDU_THREAD_LOCAL TaskGroup* tls_task_group;

// Returns 0 when no need to unschedule or successfully unscheduled,
// -1 otherwise.
inline int unsleep_if_necessary(ButexBthreadWaiter* w,
                                TimerThread* timer_thread) {
    if (!w->sleep_id) {
        return 0;
    }
    if (timer_thread->unschedule(w->sleep_id) > 0) {
        // the callback is running.
        return -1;
    }
    w->sleep_id = 0;
    return 0;
}
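// A minimal sketch of how the butex API defined below is typically used
// (illustrative only, not part of this file's logic). The butex value is an
// atomic int living at the address returned by butex_create(); waiters pass
// the value they last observed so that a concurrent change is never missed:
//
//   butil::atomic<int>* flag =
//       static_cast<butil::atomic<int>*>(butex_create());
//   // waiter:
//   while (flag->load(butil::memory_order_relaxed) == 0) {
//       butex_wait(flag, 0/*expected*/, NULL/*no deadline*/);
//   }
//   // waker:
//   flag->store(1, butil::memory_order_release);
//   butex_wake(flag);
//   ...
//   butex_destroy(flag);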
// Use ObjectPool (which never frees memory) to solve the race between
// butex_wake() and butex_destroy(). The race is as follows:
//
//   class Event {
//   public:
//     void wait() {
//       _mutex.lock();
//       if (!_done) {
//         _cond.wait(&_mutex);
//       }
//       _mutex.unlock();
//     }
//     void signal() {
//       _mutex.lock();
//       if (!_done) {
//         _done = true;
//         _cond.signal();
//       }
//       _mutex.unlock();  /*1*/
//     }
//   private:
//     bool _done = false;
//     Mutex _mutex;
//     Condition _cond;
//   };
//
//   [Thread1]                          [Thread2]
//   foo() {
//     Event event;
//     pass_to_thread2(&event);  --->   event.signal();
//     event.wait();
//   } <-- event destroyed
//
// Summary: Thread1 passes a stateful condition to Thread2 and waits until
// the condition is signalled, which basically means the associated job is
// done and Thread1 can release related resources including the mutex and
// condition. The scenario is fine and the code is correct.
// The race needs a closer look. The unlock at /*1*/ may have different
// implementations, but the last step is typically an atomic store followed
// by butex_wake(), like this:
//
//   locked->store(0);
//   butex_wake(locked);
//
// `locked' represents the locking status of the mutex. The issue is that
// just after the store(), the mutex is already unlocked and the code in
// Event.wait() may successfully grab the lock, go through everything left,
// and leave foo(), destroying the mutex and butex and making
// butex_wake(locked) crash.
// To solve this issue, one method is to add a reference before the store and
// release the reference after butex_wake(). However, reference counting needs
// to be added in nearly every user scenario of butex_wake(), which is very
// error-prone. Another method is to never free butexes, with the side effect
// that butex_wake() may wake up an unrelated butex (the one that reuses the
// memory) and cause spurious wakeups. According to our observations, the
// race is infrequent, even rare. The extra spurious wakeups should be
// acceptable.

void* butex_create() {
    Butex* b = butil::get_object<Butex>();
    if (b) {
        return &b->value;
    }
    return NULL;
}

void butex_destroy(void* butex) {
    if (!butex) {
        return;
    }
    Butex* b = static_cast<Butex*>(
        container_of(static_cast<butil::atomic<int>*>(butex), Butex, value));
    butil::return_object(b);
}

inline TaskGroup* get_task_group(TaskControl* c) {
    TaskGroup* g = tls_task_group;
    return g ? g : c->choose_one_group();
}

int butex_wake(void* arg) {
    Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value);
    ButexWaiter* front = NULL;
    {
        BAIDU_SCOPED_LOCK(b->waiter_lock);
        if (b->waiters.empty()) {
            return 0;
        }
        front = b->waiters.head()->value();
        front->RemoveFromList();
        front->container.store(NULL, butil::memory_order_relaxed);
    }
    if (front->tid == 0) {
        wakeup_pthread(static_cast<ButexPthreadWaiter*>(front));
        return 1;
    }
    ButexBthreadWaiter* bbw = static_cast<ButexBthreadWaiter*>(front);
    unsleep_if_necessary(bbw, get_global_timer_thread());
    TaskGroup* g = tls_task_group;
    if (g) {
        TaskGroup::exchange(&g, bbw->tid);
    } else {
        bbw->control->choose_one_group()->ready_to_run_remote(bbw->tid);
    }
    return 1;
}

int butex_wake_all(void* arg) {
    Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value);

    ButexWaiterList bthread_waiters;
    ButexWaiterList pthread_waiters;
    {
        BAIDU_SCOPED_LOCK(b->waiter_lock);
        while (!b->waiters.empty()) {
            ButexWaiter* bw = b->waiters.head()->value();
            bw->RemoveFromList();
            bw->container.store(NULL, butil::memory_order_relaxed);
            if (bw->tid) {
                bthread_waiters.Append(bw);
            } else {
                pthread_waiters.Append(bw);
            }
        }
    }

    int nwakeup = 0;
    while (!pthread_waiters.empty()) {
        ButexPthreadWaiter* bw = static_cast<ButexPthreadWaiter*>(
            pthread_waiters.head()->value());
        bw->RemoveFromList();
        wakeup_pthread(bw);
        ++nwakeup;
    }
    if (bthread_waiters.empty()) {
        return nwakeup;
    }
    // We will exchange with the first waiter in the end.
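    // The first bthread waiter is handled specially: the remaining bthread
    // waiters are queued below with the nosignal flag and flushed in one
    // batch, while this one is either run via TaskGroup::exchange() when the
    // current thread is a worker, or pushed to a remote run queue otherwise.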
    ButexBthreadWaiter* next = static_cast<ButexBthreadWaiter*>(
        bthread_waiters.head()->value());
    next->RemoveFromList();
    unsleep_if_necessary(next, get_global_timer_thread());
    ++nwakeup;
    TaskGroup* g = get_task_group(next->control);
    const int saved_nwakeup = nwakeup;
    while (!bthread_waiters.empty()) {
        // pop reversely
        ButexBthreadWaiter* w = static_cast<ButexBthreadWaiter*>(
            bthread_waiters.tail()->value());
        w->RemoveFromList();
        unsleep_if_necessary(w, get_global_timer_thread());
        g->ready_to_run_general(w->tid, true);
        ++nwakeup;
    }
    if (saved_nwakeup != nwakeup) {
        g->flush_nosignal_tasks_general();
    }
    if (g == tls_task_group) {
        TaskGroup::exchange(&g, next->tid);
    } else {
        g->ready_to_run_remote(next->tid);
    }
    return nwakeup;
}

int butex_wake_except(void* arg, bthread_t excluded_bthread) {
    Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value);

    ButexWaiterList bthread_waiters;
    ButexWaiterList pthread_waiters;
    {
        ButexWaiter* excluded_waiter = NULL;
        BAIDU_SCOPED_LOCK(b->waiter_lock);
        while (!b->waiters.empty()) {
            ButexWaiter* bw = b->waiters.head()->value();
            bw->RemoveFromList();

            if (bw->tid) {
                if (bw->tid != excluded_bthread) {
                    bthread_waiters.Append(bw);
                    bw->container.store(NULL, butil::memory_order_relaxed);
                } else {
                    excluded_waiter = bw;
                }
            } else {
                bw->container.store(NULL, butil::memory_order_relaxed);
                pthread_waiters.Append(bw);
            }
        }
        if (excluded_waiter) {
            b->waiters.Append(excluded_waiter);
        }
    }

    int nwakeup = 0;
    while (!pthread_waiters.empty()) {
        ButexPthreadWaiter* bw = static_cast<ButexPthreadWaiter*>(
            pthread_waiters.head()->value());
        bw->RemoveFromList();
        wakeup_pthread(bw);
        ++nwakeup;
    }
    if (bthread_waiters.empty()) {
        return nwakeup;
    }

    ButexBthreadWaiter* front = static_cast<ButexBthreadWaiter*>(
        bthread_waiters.head()->value());

    TaskGroup* g = get_task_group(front->control);
    const int saved_nwakeup = nwakeup;
    do {
        // pop reversely
        ButexBthreadWaiter* w = static_cast<ButexBthreadWaiter*>(
            bthread_waiters.tail()->value());
        w->RemoveFromList();
        unsleep_if_necessary(w, get_global_timer_thread());
        g->ready_to_run_general(w->tid, true);
        ++nwakeup;
    } while (!bthread_waiters.empty());
    if (saved_nwakeup != nwakeup) {
        g->flush_nosignal_tasks_general();
    }
    return nwakeup;
}
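// butex_requeue() wakes at most one waiter of `arg' and moves all remaining
// waiters onto `arg2' without waking them. One illustrative use (an
// assumption about the caller, not enforced here) is a condition-variable
// broadcast: requeueing waiters onto the mutex's butex lets them wake
// one-by-one as the mutex is released instead of all at once.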
int butex_requeue(void* arg, void* arg2) {
    Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value);
    Butex* m = container_of(static_cast<butil::atomic<int>*>(arg2), Butex, value);

    ButexWaiter* front = NULL;
    {
        std::unique_lock<internal::FastPthreadMutex> lck1(b->waiter_lock, std::defer_lock);
        std::unique_lock<internal::FastPthreadMutex> lck2(m->waiter_lock, std::defer_lock);
        butil::double_lock(lck1, lck2);
        if (b->waiters.empty()) {
            return 0;
        }

        front = b->waiters.head()->value();
        front->RemoveFromList();
        front->container.store(NULL, butil::memory_order_relaxed);

        while (!b->waiters.empty()) {
            ButexWaiter* bw = b->waiters.head()->value();
            bw->RemoveFromList();
            m->waiters.Append(bw);
            bw->container.store(m, butil::memory_order_relaxed);
        }
    }

    if (front->tid == 0) {  // which is a pthread
        wakeup_pthread(static_cast<ButexPthreadWaiter*>(front));
        return 1;
    }
    ButexBthreadWaiter* bbw = static_cast<ButexBthreadWaiter*>(front);
    unsleep_if_necessary(bbw, get_global_timer_thread());
    TaskGroup* g = tls_task_group;
    if (g) {
        TaskGroup::exchange(&g, front->tid);
    } else {
        bbw->control->choose_one_group()->ready_to_run_remote(front->tid);
    }
    return 1;
}

// Callable from multiple threads, at most one thread may wake up the waiter.
static void erase_from_butex_and_wakeup(void* arg) {
    erase_from_butex(static_cast<ButexWaiter*>(arg), true, WAITER_STATE_TIMEDOUT);
}

// Used in task_group.cpp
bool erase_from_butex_because_of_interruption(ButexWaiter* bw) {
    return erase_from_butex(bw, true, WAITER_STATE_INTERRUPTED);
}

inline bool erase_from_butex(ButexWaiter* bw, bool wakeup, WaiterState state) {
    // `bw' is guaranteed to be valid inside this function because the waiter
    // waits until this function is cancelled or finished.
    // NOTE: This function must be a no-op when bw->container is NULL.
    bool erased = false;
    Butex* b;
    int saved_errno = errno;
    while ((b = bw->container.load(butil::memory_order_acquire))) {
        // b can be NULL when the waiter is scheduled but queued.
        BAIDU_SCOPED_LOCK(b->waiter_lock);
        if (b == bw->container.load(butil::memory_order_relaxed)) {
            bw->RemoveFromList();
            bw->container.store(NULL, butil::memory_order_relaxed);
            if (bw->tid) {
                static_cast<ButexBthreadWaiter*>(bw)->waiter_state = state;
            }
            erased = true;
            break;
        }
    }
    if (erased && wakeup) {
        if (bw->tid) {
            ButexBthreadWaiter* bbw = static_cast<ButexBthreadWaiter*>(bw);
            get_task_group(bbw->control)->ready_to_run_general(bw->tid);
        } else {
            ButexPthreadWaiter* pw = static_cast<ButexPthreadWaiter*>(bw);
            wakeup_pthread(pw);
        }
    }
    errno = saved_errno;
    return erased;
}

static void wait_for_butex(void* arg) {
    ButexBthreadWaiter* const bw = static_cast<ButexBthreadWaiter*>(arg);
    Butex* const b = bw->initial_butex;
    // 1: waiters with timeout should have waiter_state == WAITER_STATE_READY
    //    before they're queued, otherwise the waiter is already timed out
    //    and removed by TimerThread, in which case we should stop queueing.
    //
    // Visibility of waiter_state:
    //    [bthread]                        [TimerThread]
    //    waiter_state = READY
    //    tt_lock { add task }
    //                                     tt_lock { get task }
    //                                     waiter_lock { waiter_state=TIMEDOUT }
    //    waiter_lock { use waiter_state }
    // tt_lock represents TimerThread::_mutex. Visibility of waiter_state is
    // sequenced by the two locks, both threads are guaranteed to see the
    // correct value.
    {
        BAIDU_SCOPED_LOCK(b->waiter_lock);
        if (b->value.load(butil::memory_order_relaxed) != bw->expected_value) {
            bw->waiter_state = WAITER_STATE_UNMATCHEDVALUE;
        } else if (bw->waiter_state == WAITER_STATE_READY/*1*/ &&
                   !bw->task_meta->interrupted) {
            b->waiters.Append(bw);
            bw->container.store(b, butil::memory_order_relaxed);
            return;
        }
    }

    // bw->container is NULL which makes erase_from_butex_and_wakeup() and
    // TaskGroup::interrupt() no-ops, there's no race between the following
    // code and the two functions. The on-stack ButexBthreadWaiter is safe to
    // use and bw->waiter_state will not change again.
    unsleep_if_necessary(bw, get_global_timer_thread());
    tls_task_group->ready_to_run(bw->tid);
    // FIXME: jumping back to the original thread is buggy.
    // // Value unmatched or waiter is already woken up by TimerThread, jump
    // // back to original bthread.
    // TaskGroup* g = tls_task_group;
    // ReadyToRunArgs args = { g->current_tid(), false };
    // g->set_remained(TaskGroup::ready_to_run_in_worker, &args);
    // // 2: Don't run remained because we're already in a remained function
    // //    otherwise stack may overflow.
    // TaskGroup::sched_to(&g, bw->tid, false/*2*/);
}

static int butex_wait_from_pthread(TaskGroup* g, Butex* b, int expected_value,
                                   const timespec* abstime) {
    // sys futex needs a relative timeout.
    // Compute the diff between abstime and now.
    timespec* ptimeout = NULL;
    timespec timeout;
    if (abstime != NULL) {
        const int64_t timeout_us = butil::timespec_to_microseconds(*abstime) -
            butil::gettimeofday_us();
        if (timeout_us < MIN_SLEEP_US) {
            errno = ETIMEDOUT;
            return -1;
        }
        timeout = butil::microseconds_to_timespec(timeout_us);
        ptimeout = &timeout;
    }

    TaskMeta* task = NULL;
    ButexPthreadWaiter pw;
    pw.tid = 0;
    pw.sig.store(PTHREAD_NOT_SIGNALLED, butil::memory_order_relaxed);
    int rc = 0;

    if (g) {
        task = g->current_task();
        task->current_waiter.store(&pw, butil::memory_order_release);
    }
    b->waiter_lock.lock();
    if (b->value.load(butil::memory_order_relaxed) != expected_value) {
        b->waiter_lock.unlock();
        errno = EWOULDBLOCK;
        rc = -1;
    } else if (task != NULL && task->interrupted) {
        b->waiter_lock.unlock();
        // Race with set and may consume multiple interruptions, which are OK.
        task->interrupted = false;
        errno = EINTR;
        rc = -1;
    } else {
        b->waiters.Append(&pw);
        pw.container.store(b, butil::memory_order_relaxed);
        b->waiter_lock.unlock();

#ifdef SHOW_BTHREAD_BUTEX_WAITER_COUNT_IN_VARS
        bvar::Adder<int64_t>& num_waiters = butex_waiter_count();
        num_waiters << 1;
#endif
        rc = wait_pthread(pw, ptimeout);
#ifdef SHOW_BTHREAD_BUTEX_WAITER_COUNT_IN_VARS
        num_waiters << -1;
#endif
    }
    if (task) {
        // If current_waiter is NULL, TaskGroup::interrupt() is running and
        // using pw, spin until current_waiter != NULL.
        BT_LOOP_WHEN(task->current_waiter.exchange(
                         NULL, butil::memory_order_acquire) == NULL,
                     30/*nops before sched_yield*/);
        if (task->interrupted) {
            task->interrupted = false;
            if (rc == 0) {
                errno = EINTR;
                return -1;
            }
        }
    }
    return rc;
}

int butex_wait(void* arg, int expected_value, const timespec* abstime) {
    Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value);
    if (b->value.load(butil::memory_order_relaxed) != expected_value) {
        errno = EWOULDBLOCK;
        // Sometimes actions are taken immediately after an unmatched butex;
        // this fence makes sure that we see the changes made before the butex
        // value was changed.
        butil::atomic_thread_fence(butil::memory_order_acquire);
        return -1;
    }
    TaskGroup* g = tls_task_group;
    if (NULL == g || g->is_current_pthread_task()) {
        return butex_wait_from_pthread(g, b, expected_value, abstime);
    }
    ButexBthreadWaiter bbw;
    // tid is 0 iff the thread is non-bthread
    bbw.tid = g->current_tid();
    bbw.container.store(NULL, butil::memory_order_relaxed);
    bbw.task_meta = g->current_task();
    bbw.sleep_id = 0;
    bbw.waiter_state = WAITER_STATE_READY;
    bbw.expected_value = expected_value;
    bbw.initial_butex = b;
    bbw.control = g->control();

    if (abstime != NULL) {
        // Schedule the timer before queueing. If the timer is triggered
        // before queueing, cancel queueing. This is a kind of optimistic
        // locking.
        if (butil::timespec_to_microseconds(*abstime) <
            (butil::gettimeofday_us() + MIN_SLEEP_US)) {
            // Already timed out.
            errno = ETIMEDOUT;
            return -1;
        }
        bbw.sleep_id = get_global_timer_thread()->schedule(
            erase_from_butex_and_wakeup, &bbw, *abstime);
        if (!bbw.sleep_id) {  // TimerThread stopped.
            errno = ESTOP;
            return -1;
        }
    }
#ifdef SHOW_BTHREAD_BUTEX_WAITER_COUNT_IN_VARS
    bvar::Adder<int64_t>& num_waiters = butex_waiter_count();
    num_waiters << 1;
#endif

    // release fence matches with the acquire fence in
    // interrupt_and_consume_waiters in task_group.cpp to guarantee visibility
    // of `interrupted'.
    bbw.task_meta->current_waiter.store(&bbw, butil::memory_order_release);
    g->set_remained(wait_for_butex, &bbw);
    TaskGroup::sched(&g);

    // erase_from_butex_and_wakeup (called by TimerThread) is possibly still
    // running and using bbw. The chance is small, just spin until it's done.
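    // unsleep_if_necessary() returns -1 only while unschedule() reports the
    // timer callback as running, so looping on it effectively waits for
    // erase_from_butex_and_wakeup() to finish with bbw.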
    BT_LOOP_WHEN(unsleep_if_necessary(&bbw, get_global_timer_thread()) < 0,
                 30/*nops before sched_yield*/);

    // If current_waiter is NULL, TaskGroup::interrupt() is running and using
    // bbw. Spin until current_waiter != NULL.
    BT_LOOP_WHEN(bbw.task_meta->current_waiter.exchange(
                     NULL, butil::memory_order_acquire) == NULL,
                 30/*nops before sched_yield*/);
#ifdef SHOW_BTHREAD_BUTEX_WAITER_COUNT_IN_VARS
    num_waiters << -1;
#endif

    bool is_interrupted = false;
    if (bbw.task_meta->interrupted) {
        // Race with set and may consume multiple interruptions, which are OK.
        bbw.task_meta->interrupted = false;
        is_interrupted = true;
    }
    // If timed out as well as value unmatched, return ETIMEDOUT.
    if (WAITER_STATE_TIMEDOUT == bbw.waiter_state) {
        errno = ETIMEDOUT;
        return -1;
    } else if (WAITER_STATE_UNMATCHEDVALUE == bbw.waiter_state) {
        errno = EWOULDBLOCK;
        return -1;
    } else if (is_interrupted) {
        errno = EINTR;
        return -1;
    }
    return 0;
}

}  // namespace bthread

namespace butil {
template <> struct ObjectPoolBlockMaxItem<bthread::Butex> {
    static const size_t value = 128;
};
}  // namespace butil