Commit 5a898e14 authored by gejun's avatar gejun

r35401: Refactored interruptions on bthreads. The semantic is more similar to…

r35401: Refactored interruptions on bthreads. The semantic is more similar to pthread's and EINTR is returned instead of ESTOP.
parent 412143ed
...@@ -45,7 +45,7 @@ BAIDU_CASSERT(sizeof(TaskControl*) == sizeof(butil::atomic<TaskControl*>), atomi ...@@ -45,7 +45,7 @@ BAIDU_CASSERT(sizeof(TaskControl*) == sizeof(butil::atomic<TaskControl*>), atomi
pthread_mutex_t g_task_control_mutex = PTHREAD_MUTEX_INITIALIZER; pthread_mutex_t g_task_control_mutex = PTHREAD_MUTEX_INITIALIZER;
// Referenced in rpc, needs to be extern. // Referenced in rpc, needs to be extern.
// Notice that we can't declare the variable as atomic<TaskControl*> which // Notice that we can't declare the variable as atomic<TaskControl*> which
// may not initialized before creating bthreads before main(). // are not constructed before main().
TaskControl* g_task_control = NULL; TaskControl* g_task_control = NULL;
extern BAIDU_THREAD_LOCAL TaskGroup* tls_task_group; extern BAIDU_THREAD_LOCAL TaskGroup* tls_task_group;
...@@ -106,8 +106,6 @@ start_from_non_worker(bthread_t* __restrict tid, ...@@ -106,8 +106,6 @@ start_from_non_worker(bthread_t* __restrict tid,
tid, attr, fn, arg); tid, attr, fn, arg);
} }
int stop_butex_wait(bthread_t tid);
struct TidTraits { struct TidTraits {
static const size_t BLOCK_SIZE = 63; static const size_t BLOCK_SIZE = 63;
static const size_t MAX_ENTRIES = 65536; static const size_t MAX_ENTRIES = 65536;
...@@ -169,23 +167,17 @@ void bthread_flush() __THROW { ...@@ -169,23 +167,17 @@ void bthread_flush() __THROW {
} }
} }
int bthread_interrupt(bthread_t tid) __THROW {
return bthread::TaskGroup::interrupt(tid, bthread::get_task_control());
}
int bthread_stop(bthread_t tid) __THROW { int bthread_stop(bthread_t tid) __THROW {
if (bthread::stop_butex_wait(tid) < 0) { bthread::TaskGroup::set_stopped(tid);
return errno; return bthread_interrupt(tid);
}
bthread::TaskGroup* g = bthread::tls_task_group;
if (!g) {
bthread::TaskControl* c = bthread::get_or_new_task_control();
if (!c) {
return ENOMEM;
}
g = c->choose_one_group();
}
return g->stop_usleep(tid);
} }
int bthread_stopped(bthread_t tid) __THROW { int bthread_stopped(bthread_t tid) __THROW {
return bthread::TaskGroup::stopped(tid); return (int)bthread::TaskGroup::is_stopped(tid);
} }
bthread_t bthread_self(void) __THROW { bthread_t bthread_self(void) __THROW {
...@@ -319,7 +311,6 @@ int bthread_usleep(uint64_t microseconds) __THROW { ...@@ -319,7 +311,6 @@ int bthread_usleep(uint64_t microseconds) __THROW {
if (NULL != g && !g->is_current_pthread_task()) { if (NULL != g && !g->is_current_pthread_task()) {
return bthread::TaskGroup::usleep(&g, microseconds); return bthread::TaskGroup::usleep(&g, microseconds);
} }
// TODO: return ESTOP for pthread_task
return ::usleep(microseconds); return ::usleep(microseconds);
} }
......
...@@ -33,16 +33,16 @@ ...@@ -33,16 +33,16 @@
__BEGIN_DECLS __BEGIN_DECLS
// Create bthread `fn(arg)' with attributes `attr' and put the identifier into // Create bthread `fn(args)' with attributes `attr' and put the identifier into
// `tid'. Switch to the new thread and schedule old thread to run. Use this // `tid'. Switch to the new thread and schedule old thread to run. Use this
// function when the new thread is more urgent. // function when the new thread is more urgent.
// Returns 0 on success, errno otherwise. // Returns 0 on success, errno otherwise.
extern int bthread_start_urgent(bthread_t* __restrict tid, extern int bthread_start_urgent(bthread_t* __restrict tid,
const bthread_attr_t* __restrict attr, const bthread_attr_t* __restrict attr,
void * (*fn)(void*), void * (*fn)(void*),
void* __restrict arg) __THROW; void* __restrict args) __THROW;
// Create bthread `fn(arg)' with attributes `attr' and put the identifier into // Create bthread `fn(args)' with attributes `attr' and put the identifier into
// `tid'. This function behaves closer to pthread_create: after scheduling the // `tid'. This function behaves closer to pthread_create: after scheduling the
// new thread to run, it returns. In another word, the new thread may take // new thread to run, it returns. In another word, the new thread may take
// longer time than bthread_start_urgent() to run. // longer time than bthread_start_urgent() to run.
...@@ -50,16 +50,38 @@ extern int bthread_start_urgent(bthread_t* __restrict tid, ...@@ -50,16 +50,38 @@ extern int bthread_start_urgent(bthread_t* __restrict tid,
extern int bthread_start_background(bthread_t* __restrict tid, extern int bthread_start_background(bthread_t* __restrict tid,
const bthread_attr_t* __restrict attr, const bthread_attr_t* __restrict attr,
void * (*fn)(void*), void * (*fn)(void*),
void* __restrict arg) __THROW; void* __restrict args) __THROW;
// Wake up operations blocking the thread. Different functions may behave
// differently:
// bthread_usleep(): returns -1 and sets errno to ESTOP if bthread_stop()
// is called, or to EINTR otherwise.
// butex_wait(): returns -1 and sets errno to EINTR
// bthread_mutex_*lock: unaffected (still blocking)
// bthread_cond_*wait: wakes up and returns 0.
// bthread_*join: unaffected.
// Common usage of interruption is to make a thread to quit ASAP.
// [Thread1] [Thread2]
// set stopping flag
// bthread_interrupt(Thread2)
// wake up
// see the flag and quit
// may block again if the flag is unchanged
// bthread_interrupt() guarantees that Thread2 is woken up reliably no matter
// how the 2 threads are interleaved.
// Returns 0 on success, errno otherwise.
extern int bthread_interrupt(bthread_t tid) __THROW;
// Ask the bthread `tid' to stop. Operations which would suspend the thread // Make bthread_stopped() on the bthread return true and interrupt the bthread.
// except bthread_join will not block, instead they return ESTOP. // Note that current bthread_stop() solely sets the built-in "stop flag" and
// This is a cooperative stopping mechanism. // calls bthread_interrupt(), which is different from earlier versions of
// bthread, and replaceable by user-defined stop flags plus calls to
// bthread_interrupt().
// Returns 0 on success, errno otherwise. // Returns 0 on success, errno otherwise.
extern int bthread_stop(bthread_t tid) __THROW; extern int bthread_stop(bthread_t tid) __THROW;
// Returns 1 iff bthread_stop() was called on the thread or the thread does // Returns 1 iff bthread_stop(tid) was called or the thread does not exist,
// not exist, 0 otherwise. // 0 otherwise.
extern int bthread_stopped(bthread_t tid) __THROW; extern int bthread_stopped(bthread_t tid) __THROW;
// Returns identifier of caller if caller is a bthread, 0 otherwise(Id of a // Returns identifier of caller if caller is a bthread, 0 otherwise(Id of a
...@@ -75,10 +97,12 @@ extern int bthread_equal(bthread_t t1, bthread_t t2) __THROW; ...@@ -75,10 +97,12 @@ extern int bthread_equal(bthread_t t1, bthread_t t2) __THROW;
extern void bthread_exit(void* retval) __attribute__((__noreturn__)); extern void bthread_exit(void* retval) __attribute__((__noreturn__));
// Make calling thread wait for termination of bthread `bt'. Return immediately // Make calling thread wait for termination of bthread `bt'. Return immediately
// if `bt' is already terminated. The exit status of the bthread shall be // if `bt' is already terminated.
// stored in *bthread_return (if it's not NULL), however at present it's // Notes:
// always set to NULL. There's no "detachment" in bthreads, all bthreads are // - All bthreads are "detached" but still joinable.
// "detached" as default and still joinable. // - *bthread_return is always set to null. If you need to return value
// from a bthread, pass the value via the `args' created the bthread.
// - bthread_join() is not affected by bthread_interrupt.
// Returns 0 on success, errno otherwise. // Returns 0 on success, errno otherwise.
extern int bthread_join(bthread_t bt, void** bthread_return) __THROW; extern int bthread_join(bthread_t bt, void** bthread_return) __THROW;
...@@ -125,6 +149,7 @@ extern int bthread_setconcurrency(int num) __THROW; ...@@ -125,6 +149,7 @@ extern int bthread_setconcurrency(int num) __THROW;
extern int bthread_yield(void) __THROW; extern int bthread_yield(void) __THROW;
// Suspend current thread for at least `microseconds' // Suspend current thread for at least `microseconds'
// Interruptible by bthread_interrupt().
extern int bthread_usleep(uint64_t microseconds) __THROW; extern int bthread_usleep(uint64_t microseconds) __THROW;
// --------------------------------------------- // ---------------------------------------------
......
...@@ -25,7 +25,7 @@ ...@@ -25,7 +25,7 @@
#endif #endif
#include "butil/logging.h" #include "butil/logging.h"
#include "butil/object_pool.h" #include "butil/object_pool.h"
#include "bthread/errno.h" // EWOULDBLOCK, ESTOP #include "bthread/errno.h" // EWOULDBLOCK
#include "bthread/sys_futex.h" // futex_* #include "bthread/sys_futex.h" // futex_*
#include "bthread/processor.h" // cpu_relax #include "bthread/processor.h" // cpu_relax
#include "bthread/task_control.h" // TaskControl #include "bthread/task_control.h" // TaskControl
...@@ -64,10 +64,6 @@ inline bvar::Adder<int64_t>& butex_waiter_count() { ...@@ -64,10 +64,6 @@ inline bvar::Adder<int64_t>& butex_waiter_count() {
} }
#endif #endif
// Implemented in task_group.cpp
int stop_and_consume_butex_waiter(bthread_t tid, ButexWaiter** pw);
int set_butex_waiter(bthread_t tid, ButexWaiter* w);
// If a thread would suspend for less than so many microseconds, return // If a thread would suspend for less than so many microseconds, return
// ETIMEDOUT directly. // ETIMEDOUT directly.
// Use 1: sleeping for less than 2 microsecond is inefficient and useless. // Use 1: sleeping for less than 2 microsecond is inefficient and useless.
...@@ -75,9 +71,10 @@ static const int64_t MIN_SLEEP_US = 2; ...@@ -75,9 +71,10 @@ static const int64_t MIN_SLEEP_US = 2;
enum WaiterState { enum WaiterState {
WAITER_STATE_NONE, WAITER_STATE_NONE,
WAITER_STATE_TIMED, WAITER_STATE_READY,
WAITER_STATE_CANCELLED, WAITER_STATE_TIMEDOUT,
WAITER_STATE_TIMEDOUT WAITER_STATE_UNMATCHEDVALUE,
WAITER_STATE_INTERRUPTED,
}; };
struct Butex; struct Butex;
...@@ -110,11 +107,7 @@ struct ButexPthreadWaiter : public ButexWaiter { ...@@ -110,11 +107,7 @@ struct ButexPthreadWaiter : public ButexWaiter {
typedef butil::LinkedList<ButexWaiter> ButexWaiterList; typedef butil::LinkedList<ButexWaiter> ButexWaiterList;
enum BUTEX_PTHREAD_SIGNAL { enum ButexPthreadSignal { PTHREAD_NOT_SIGNALLED, PTHREAD_SIGNALLED };
NOT_SIGNALLED = 0,
SIGNALLED = 1,
SAFE_TO_DESTROY
};
struct BAIDU_CACHELINE_ALIGNMENT Butex { struct BAIDU_CACHELINE_ALIGNMENT Butex {
Butex() {} Butex() {}
...@@ -128,42 +121,41 @@ struct BAIDU_CACHELINE_ALIGNMENT Butex { ...@@ -128,42 +121,41 @@ struct BAIDU_CACHELINE_ALIGNMENT Butex {
BAIDU_CASSERT(offsetof(Butex, value) == 0, offsetof_value_must_0); BAIDU_CASSERT(offsetof(Butex, value) == 0, offsetof_value_must_0);
BAIDU_CASSERT(sizeof(Butex) == BAIDU_CACHELINE_SIZE, butex_fits_in_one_cacheline); BAIDU_CASSERT(sizeof(Butex) == BAIDU_CACHELINE_SIZE, butex_fits_in_one_cacheline);
void wakeup_pthread(ButexPthreadWaiter* pw) { static void wakeup_pthread(ButexPthreadWaiter* pw) {
// release fence makes wait_pthread see other changes when it sees new sig // release fence makes wait_pthread see changes before wakeup.
pw->sig.store(SAFE_TO_DESTROY, butil::memory_order_release); pw->sig.store(PTHREAD_SIGNALLED, butil::memory_order_release);
// At this point, *pw is possibly destroyed if wait_pthread has woken up and // At this point, wait_pthread() possibly has woken up and destroyed `pw'.
// seen the new sig. As the futex_wake_private just check the accessibility // In which case, futex_wake_private() should return EFAULT.
// of the memory and returnes EFAULT in this case, it's just fine. // If crash happens in future, `pw' can be made TLS and never destroyed
// If crash happens in the future, we can make pw as tls and never // to solve the issue.
// destroyed to resolve this issue.
futex_wake_private(&pw->sig, 1); futex_wake_private(&pw->sig, 1);
} }
bool erase_from_butex(ButexWaiter*, bool); bool erase_from_butex(ButexWaiter*, bool, WaiterState);
int wait_pthread(ButexPthreadWaiter& pw, timespec* ptimeout) { int wait_pthread(ButexPthreadWaiter& pw, timespec* ptimeout) {
int expected_value = NOT_SIGNALLED;
while (true) { while (true) {
const int rc = futex_wait_private(&pw.sig, expected_value, ptimeout); const int rc = futex_wait_private(&pw.sig, PTHREAD_NOT_SIGNALLED, ptimeout);
// Accquire fence makes this thread sees other changes when it sees if (PTHREAD_NOT_SIGNALLED != pw.sig.load(butil::memory_order_acquire)) {
// the new |sig| // If `sig' is changed, wakeup_pthread() must be called and `pw'
if (expected_value != pw.sig.load(butil::memory_order_acquire)) { // is already removed from the butex.
// After this routine returns, |pw| will be destroyed while the wake // Acquire fence makes this thread sees changes before wakeup.
// thread possibly still uses it. See the comments in wakeup_pthread
return rc; return rc;
} }
if (rc != 0 && errno == ETIMEDOUT) { if (rc != 0 && errno == ETIMEDOUT) {
// Remove pw from waiters to make sure no one would wakeup pw after // Note that we don't handle the EINTR from futex_wait here since
// this function returnes. // pthreads waiting on a butex should behave similarly as bthreads
if (!erase_from_butex(&pw, false)) { // which are not able to be woken-up by signals.
// Another thread holds pw, attemping to signal it, spin until // EINTR on butex is only producible by TaskGroup::interrupt().
// it's safe to destroy pw
// Make sure this thread sees the lastest changes when sig is // `pw' is still in the queue, remove it.
// set to SAFE_TO_DESTROY if (!erase_from_butex(&pw, false, WAITER_STATE_TIMEDOUT)) {
BT_LOOP_WHEN(pw.sig.load(butil::memory_order_acquire) // Another thread is erasing `pw' as well, wait for the signal.
!= SAFE_TO_DESTROY, // Acquire fence makes this thread sees changes before wakeup.
30/*nops before sched_yield*/); if (pw.sig.load(butil::memory_order_acquire) == PTHREAD_NOT_SIGNALLED) {
ptimeout = NULL; // already timedout, ptimeout is expired.
continue;
}
} }
return rc; return rc;
} }
...@@ -456,10 +448,15 @@ int butex_requeue(void* arg, void* arg2) { ...@@ -456,10 +448,15 @@ int butex_requeue(void* arg, void* arg2) {
// Callable from multiple threads, at most one thread may wake up the waiter. // Callable from multiple threads, at most one thread may wake up the waiter.
static void erase_from_butex_and_wakeup(void* arg) { static void erase_from_butex_and_wakeup(void* arg) {
erase_from_butex(static_cast<ButexWaiter*>(arg), true); erase_from_butex(static_cast<ButexWaiter*>(arg), true, WAITER_STATE_TIMEDOUT);
}
// Used in task_group.cpp
bool erase_from_butex_because_of_interruption(ButexWaiter* bw) {
return erase_from_butex(bw, true, WAITER_STATE_INTERRUPTED);
} }
inline bool erase_from_butex(ButexWaiter* bw, bool wakeup) { inline bool erase_from_butex(ButexWaiter* bw, bool wakeup, WaiterState state) {
// `bw' is guaranteed to be valid inside this function because waiter // `bw' is guaranteed to be valid inside this function because waiter
// will wait until this function being cancelled or finished. // will wait until this function being cancelled or finished.
// NOTE: This function must be no-op when bw->container is NULL. // NOTE: This function must be no-op when bw->container is NULL.
...@@ -473,7 +470,7 @@ inline bool erase_from_butex(ButexWaiter* bw, bool wakeup) { ...@@ -473,7 +470,7 @@ inline bool erase_from_butex(ButexWaiter* bw, bool wakeup) {
bw->RemoveFromList(); bw->RemoveFromList();
bw->container.store(NULL, butil::memory_order_relaxed); bw->container.store(NULL, butil::memory_order_relaxed);
if (bw->tid) { if (bw->tid) {
static_cast<ButexBthreadWaiter*>(bw)->waiter_state = WAITER_STATE_TIMEDOUT; static_cast<ButexBthreadWaiter*>(bw)->waiter_state = state;
} }
erased = true; erased = true;
break; break;
...@@ -495,7 +492,7 @@ inline bool erase_from_butex(ButexWaiter* bw, bool wakeup) { ...@@ -495,7 +492,7 @@ inline bool erase_from_butex(ButexWaiter* bw, bool wakeup) {
static void wait_for_butex(void* arg) { static void wait_for_butex(void* arg) {
ButexBthreadWaiter* const bw = static_cast<ButexBthreadWaiter*>(arg); ButexBthreadWaiter* const bw = static_cast<ButexBthreadWaiter*>(arg);
Butex* const b = bw->initial_butex; Butex* const b = bw->initial_butex;
// 1: waiter with timeout should have waiter_state == WAITER_STATE_TIMED // 1: waiter with timeout should have waiter_state == WAITER_STATE_READY
// before they're are queued, otherwise the waiter is already timedout // before they're are queued, otherwise the waiter is already timedout
// and removed by TimerThread, in which case we should stop queueing. // and removed by TimerThread, in which case we should stop queueing.
// //
...@@ -511,9 +508,10 @@ static void wait_for_butex(void* arg) { ...@@ -511,9 +508,10 @@ static void wait_for_butex(void* arg) {
// see the correct value. // see the correct value.
{ {
BAIDU_SCOPED_LOCK(b->waiter_lock); BAIDU_SCOPED_LOCK(b->waiter_lock);
if (b->value.load(butil::memory_order_relaxed) == bw->expected_value && if (b->value.load(butil::memory_order_relaxed) != bw->expected_value) {
bw->waiter_state != WAITER_STATE_TIMEDOUT/*1*/ && bw->waiter_state = WAITER_STATE_UNMATCHEDVALUE;
(!bw->task_meta->stop || !bw->task_meta->interruptible)) { } else if (bw->waiter_state == WAITER_STATE_READY/*1*/ &&
!bw->task_meta->interrupted) {
b->waiters.Append(bw); b->waiters.Append(bw);
bw->container.store(b, butil::memory_order_relaxed); bw->container.store(b, butil::memory_order_relaxed);
return; return;
...@@ -521,13 +519,10 @@ static void wait_for_butex(void* arg) { ...@@ -521,13 +519,10 @@ static void wait_for_butex(void* arg) {
} }
// b->container is NULL which makes erase_from_butex_and_wakeup() and // b->container is NULL which makes erase_from_butex_and_wakeup() and
// stop_butex_wait() no-op, there's no race between following code and // TaskGroup::interrupt() no-op, there's no race between following code and
// the two functions. The on-stack ButexBthreadWaiter is safe to use and // the two functions. The on-stack ButexBthreadWaiter is safe to use and
// bw->waiter_state will not change again. // bw->waiter_state will not change again.
unsleep_if_necessary(bw, get_global_timer_thread()); unsleep_if_necessary(bw, get_global_timer_thread());
if (bw->waiter_state != WAITER_STATE_TIMEDOUT) {
bw->waiter_state = WAITER_STATE_CANCELLED;
}
tls_task_group->ready_to_run(bw->tid); tls_task_group->ready_to_run(bw->tid);
// FIXME: jump back to original thread is buggy. // FIXME: jump back to original thread is buggy.
...@@ -559,25 +554,27 @@ static int butex_wait_from_pthread(TaskGroup* g, Butex* b, int expected_value, ...@@ -559,25 +554,27 @@ static int butex_wait_from_pthread(TaskGroup* g, Butex* b, int expected_value,
} }
TaskMeta* task = NULL; TaskMeta* task = NULL;
bool set_waiter = false;
ButexPthreadWaiter pw; ButexPthreadWaiter pw;
pw.tid = 0; pw.tid = 0;
pw.sig.store(NOT_SIGNALLED, butil::memory_order_relaxed); pw.sig.store(PTHREAD_NOT_SIGNALLED, butil::memory_order_relaxed);
int rc = 0; int rc = 0;
if (g) { if (g) {
task = g->current_task(); task = g->current_task();
if (task->interruptible) { task->current_waiter.store(&pw, butil::memory_order_release);
if (task->stop) {
errno = ESTOP;
return -1;
}
set_waiter = true;
task->current_waiter.store(&pw, butil::memory_order_release);
}
} }
b->waiter_lock.lock(); b->waiter_lock.lock();
if (b->value.load(butil::memory_order_relaxed) == expected_value) { if (b->value.load(butil::memory_order_relaxed) != expected_value) {
b->waiter_lock.unlock();
errno = EWOULDBLOCK;
rc = -1;
} else if (task != NULL && task->interrupted) {
b->waiter_lock.unlock();
// Race with set and may consume multiple interruptions, which are OK.
task->interrupted = false;
errno = EINTR;
rc = -1;
} else {
b->waiters.Append(&pw); b->waiters.Append(&pw);
pw.container.store(b, butil::memory_order_relaxed); pw.container.store(b, butil::memory_order_relaxed);
b->waiter_lock.unlock(); b->waiter_lock.unlock();
...@@ -590,22 +587,19 @@ static int butex_wait_from_pthread(TaskGroup* g, Butex* b, int expected_value, ...@@ -590,22 +587,19 @@ static int butex_wait_from_pthread(TaskGroup* g, Butex* b, int expected_value,
#ifdef SHOW_BTHREAD_BUTEX_WAITER_COUNT_IN_VARS #ifdef SHOW_BTHREAD_BUTEX_WAITER_COUNT_IN_VARS
num_waiters << -1; num_waiters << -1;
#endif #endif
} else {
b->waiter_lock.unlock();
errno = EWOULDBLOCK;
rc = -1;
} }
if (task) { if (task) {
if (set_waiter) { // If current_waiter is NULL, TaskGroup::interrupt() is running and
// If current_waiter is NULL, stop_butex_wait() is running and // using pw, spin until current_waiter != NULL.
// using pw, spin until current_waiter != NULL. BT_LOOP_WHEN(task->current_waiter.exchange(
BT_LOOP_WHEN(task->current_waiter.exchange( NULL, butil::memory_order_acquire) == NULL,
NULL, butil::memory_order_acquire) == NULL, 30/*nops before sched_yield*/);
30/*nops before sched_yield*/); if (task->interrupted) {
} task->interrupted = false;
if (task->stop) { if (rc == 0) {
errno = ESTOP; errno = EINTR;
return -1; return -1;
}
} }
} }
return rc; return rc;
...@@ -630,7 +624,7 @@ int butex_wait(void* arg, int expected_value, const timespec* abstime) { ...@@ -630,7 +624,7 @@ int butex_wait(void* arg, int expected_value, const timespec* abstime) {
bbw.container.store(NULL, butil::memory_order_relaxed); bbw.container.store(NULL, butil::memory_order_relaxed);
bbw.task_meta = g->current_task(); bbw.task_meta = g->current_task();
bbw.sleep_id = 0; bbw.sleep_id = 0;
bbw.waiter_state = WAITER_STATE_NONE; bbw.waiter_state = WAITER_STATE_READY;
bbw.expected_value = expected_value; bbw.expected_value = expected_value;
bbw.initial_butex = b; bbw.initial_butex = b;
bbw.control = g->control(); bbw.control = g->control();
...@@ -638,10 +632,9 @@ int butex_wait(void* arg, int expected_value, const timespec* abstime) { ...@@ -638,10 +632,9 @@ int butex_wait(void* arg, int expected_value, const timespec* abstime) {
if (abstime != NULL) { if (abstime != NULL) {
// Schedule timer before queueing. If the timer is triggered before // Schedule timer before queueing. If the timer is triggered before
// queueing, cancel queueing. This is a kind of optimistic locking. // queueing, cancel queueing. This is a kind of optimistic locking.
bbw.waiter_state = WAITER_STATE_TIMED;
// Already timed out.
if (butil::timespec_to_microseconds(*abstime) < if (butil::timespec_to_microseconds(*abstime) <
(butil::gettimeofday_us() + MIN_SLEEP_US)) { (butil::gettimeofday_us() + MIN_SLEEP_US)) {
// Already timed out.
errno = ETIMEDOUT; errno = ETIMEDOUT;
return -1; return -1;
} }
...@@ -657,8 +650,8 @@ int butex_wait(void* arg, int expected_value, const timespec* abstime) { ...@@ -657,8 +650,8 @@ int butex_wait(void* arg, int expected_value, const timespec* abstime) {
num_waiters << 1; num_waiters << 1;
#endif #endif
// release fence matches with acquire fence in stop_and_consume_butex_waiter // release fence matches with acquire fence in interrupt_and_consume_waiters
// in task_group.cpp to guarantee visibility of `interruptible'. // in task_group.cpp to guarantee visibility of `interrupted'.
bbw.task_meta->current_waiter.store(&bbw, butil::memory_order_release); bbw.task_meta->current_waiter.store(&bbw, butil::memory_order_release);
g->set_remained(wait_for_butex, &bbw); g->set_remained(wait_for_butex, &bbw);
TaskGroup::sched(&g); TaskGroup::sched(&g);
...@@ -668,7 +661,7 @@ int butex_wait(void* arg, int expected_value, const timespec* abstime) { ...@@ -668,7 +661,7 @@ int butex_wait(void* arg, int expected_value, const timespec* abstime) {
BT_LOOP_WHEN(unsleep_if_necessary(&bbw, get_global_timer_thread()) < 0, BT_LOOP_WHEN(unsleep_if_necessary(&bbw, get_global_timer_thread()) < 0,
30/*nops before sched_yield*/); 30/*nops before sched_yield*/);
// If current_waiter is NULL, stop_butex_wait() is running and using bbw. // If current_waiter is NULL, TaskGroup::interrupt() is running and using bbw.
// Spin until current_waiter != NULL. // Spin until current_waiter != NULL.
BT_LOOP_WHEN(bbw.task_meta->current_waiter.exchange( BT_LOOP_WHEN(bbw.task_meta->current_waiter.exchange(
NULL, butil::memory_order_acquire) == NULL, NULL, butil::memory_order_acquire) == NULL,
...@@ -677,53 +670,23 @@ int butex_wait(void* arg, int expected_value, const timespec* abstime) { ...@@ -677,53 +670,23 @@ int butex_wait(void* arg, int expected_value, const timespec* abstime) {
num_waiters << -1; num_waiters << -1;
#endif #endif
// ESTOP has highest priority. bool is_interrupted = false;
if (bbw.task_meta->stop) { if (bbw.task_meta->interrupted) {
errno = ESTOP; // Race with set and may consume multiple interruptions, which are OK.
return -1; bbw.task_meta->interrupted = false;
is_interrupted = true;
} }
// If timed out as well as value unmatched, return ETIMEDOUT. // If timed out as well as value unmatched, return ETIMEDOUT.
if (WAITER_STATE_TIMEDOUT == bbw.waiter_state) { if (WAITER_STATE_TIMEDOUT == bbw.waiter_state) {
errno = ETIMEDOUT; errno = ETIMEDOUT;
return -1; return -1;
} else if (WAITER_STATE_CANCELLED == bbw.waiter_state) { } else if (WAITER_STATE_UNMATCHEDVALUE == bbw.waiter_state) {
errno = EWOULDBLOCK; errno = EWOULDBLOCK;
return -1; return -1;
} } else if (is_interrupted) {
return 0; errno = EINTR;
}
int butex_wait_uninterruptible(void* arg, int expected_value, const timespec* abstime) {
TaskGroup* g = tls_task_group;
TaskMeta* caller = NULL;
bool saved_interruptible = true;
if (NULL != g) {
caller = g->current_task();
saved_interruptible = caller->interruptible;
caller->interruptible = false;
}
const int rc = butex_wait(arg, expected_value, abstime);
if (caller) {
caller->interruptible = saved_interruptible;
}
return rc;
}
int stop_butex_wait(bthread_t tid) {
// Consume current_waiter in the TaskMeta, wake it up then set it back.
ButexWaiter* w = NULL;
if (stop_and_consume_butex_waiter(tid, &w) < 0) {
return -1; return -1;
} }
if (w != NULL) {
erase_from_butex(w, true);
// If butex_wait() already wakes up before we set current_waiter back,
// the function will spin until current_waiter becomes non-NULL.
if (__builtin_expect(set_butex_waiter(tid, w) < 0, 0)) {
LOG(FATAL) << "butex_wait should spin until setting back waiter";
return -1;
}
}
return 0; return 0;
} }
......
...@@ -64,14 +64,9 @@ int butex_requeue(void* butex1, void* butex2); ...@@ -64,14 +64,9 @@ int butex_requeue(void* butex1, void* butex2);
// abstime is not NULL. // abstime is not NULL.
// About |abstime|: // About |abstime|:
// Different from FUTEX_WAIT, butex_wait uses absolute time. // Different from FUTEX_WAIT, butex_wait uses absolute time.
// Returns 0 on success, -1 otherwise and errno is set.
int butex_wait(void* butex, int expected_value, const timespec* abstime); int butex_wait(void* butex, int expected_value, const timespec* abstime);
// Same with butex_wait except that this function cannot be woken up by
// bthread_stop(), although this function still returns -1(ESTOP) after
// wake-up.
int butex_wait_uninterruptible(void* butex, int expected_value,
const timespec* abstime);
} // namespace bthread } // namespace bthread
#endif // BAIDU_BTHREAD_BUTEX_H #endif // BAIDU_BTHREAD_BUTEX_H
...@@ -95,7 +95,18 @@ int bthread_cond_wait(bthread_cond_t* __restrict c, ...@@ -95,7 +95,18 @@ int bthread_cond_wait(bthread_cond_t* __restrict c,
bthread_mutex_unlock(m); bthread_mutex_unlock(m);
int rc1 = 0; int rc1 = 0;
if (bthread::butex_wait(ic->seq, expected_seq, NULL) < 0 && if (bthread::butex_wait(ic->seq, expected_seq, NULL) < 0 &&
errno != EWOULDBLOCK) { errno != EWOULDBLOCK && errno != EINTR/*note*/) {
// EINTR should not be returned by cond_*wait according to docs on
// pthread, however spurious wake-up is OK, just as we do here
// so that users can check flags in the loop often companioning
// with the cond_wait ASAP. For example:
// mutex.lock();
// while (!stop && other-predicates) {
// cond_wait(&mutex);
// }
// mutex.unlock();
// After interruption, above code should wake up from the cond_wait
// soon and check the `stop' flag and other predicates.
rc1 = errno; rc1 = errno;
} }
const int rc2 = bthread_mutex_lock_contended(m); const int rc2 = bthread_mutex_lock_contended(m);
...@@ -118,7 +129,8 @@ int bthread_cond_timedwait(bthread_cond_t* __restrict c, ...@@ -118,7 +129,8 @@ int bthread_cond_timedwait(bthread_cond_t* __restrict c,
bthread_mutex_unlock(m); bthread_mutex_unlock(m);
int rc1 = 0; int rc1 = 0;
if (bthread::butex_wait(ic->seq, expected_seq, abstime) < 0 && if (bthread::butex_wait(ic->seq, expected_seq, abstime) < 0 &&
errno != EWOULDBLOCK) { errno != EWOULDBLOCK && errno != EINTR/*note*/) {
// note: see comments in bthread_cond_wait on EINTR.
rc1 = errno; rc1 = errno;
} }
const int rc2 = bthread_mutex_lock_contended(m); const int rc2 = bthread_mutex_lock_contended(m);
......
...@@ -50,15 +50,18 @@ void CountdownEvent::signal(int sig) { ...@@ -50,15 +50,18 @@ void CountdownEvent::signal(int sig) {
butex_wake_all(saved_butex); butex_wake_all(saved_butex);
} }
void CountdownEvent::wait() { int CountdownEvent::wait() {
_wait_was_invoked = true; _wait_was_invoked = true;
for (;;) { for (;;) {
const int seen_counter = const int seen_counter =
((butil::atomic<int>*)_butex)->load(butil::memory_order_acquire); ((butil::atomic<int>*)_butex)->load(butil::memory_order_acquire);
if (seen_counter <= 0) { if (seen_counter <= 0) {
return; return 0;
}
if (butex_wait(_butex, seen_counter, NULL) < 0 &&
errno != EWOULDBLOCK && errno != EINTR) {
return errno;
} }
butex_wait(_butex, seen_counter, NULL);
} }
} }
...@@ -93,8 +96,8 @@ int CountdownEvent::timed_wait(const timespec& duetime) { ...@@ -93,8 +96,8 @@ int CountdownEvent::timed_wait(const timespec& duetime) {
if (seen_counter <= 0) { if (seen_counter <= 0) {
return 0; return 0;
} }
const int rc = butex_wait(_butex, seen_counter, &duetime); if (butex_wait(_butex, seen_counter, &duetime) < 0 &&
if (rc < 0 && errno != EWOULDBLOCK) { errno != EWOULDBLOCK && errno != EINTR) {
return errno; return errno;
} }
} }
......
...@@ -38,11 +38,14 @@ public: ...@@ -38,11 +38,14 @@ public:
// Decrease the counter by |sig| // Decrease the counter by |sig|
void signal(int sig = 1); void signal(int sig = 1);
// Block current thread until the counter reaches 0 // Block current thread until the counter reaches 0.
void wait(); // Returns 0 on success, error code otherwise.
// This method never returns EINTR.
int wait();
// Block the current thread until the counter reaches 0 or duetime has expired // Block the current thread until the counter reaches 0 or duetime has expired
// Returns 0 on success, ETIMEDOUT otherwise. // Returns 0 on success, error code otherwise. ETIMEDOUT is for timeout.
// This method never returns EINTR.
int timed_wait(const timespec& duetime); int timed_wait(const timespec& duetime);
private: private:
......
...@@ -21,7 +21,7 @@ ...@@ -21,7 +21,7 @@
// Define errno in bthread/errno.h // Define errno in bthread/errno.h
extern const int ESTOP = -20; extern const int ESTOP = -20;
BAIDU_REGISTER_ERRNO(ESTOP, "the thread is stopping") BAIDU_REGISTER_ERRNO(ESTOP, "The structure is stopping")
extern "C" { extern "C" {
......
...@@ -206,17 +206,19 @@ void ExecutionQueueBase::_on_recycle() { ...@@ -206,17 +206,19 @@ void ExecutionQueueBase::_on_recycle() {
int ExecutionQueueBase::join(uint64_t id) { int ExecutionQueueBase::join(uint64_t id) {
const slot_id_t slot = slot_of_id(id); const slot_id_t slot = slot_of_id(id);
ExecutionQueueBase* const m = butil::address_resource(slot); ExecutionQueueBase* const m = butil::address_resource(slot);
if (BAIDU_LIKELY(m != NULL)) { if (m == NULL) {
int expected = _version_of_id(id); // The queue is not created yet, this join is definitely wrong.
// 1: acquire fence to make the join thread sees the newest changes return EINVAL;
// when it sees the unmatch of _join_butex and id }
while (expected == int expected = _version_of_id(id);
m->_join_butex->load(butil::memory_order_acquire/*1*/)) { // acquire fence makes this thread see changes before changing _join_butex.
butex_wait(m->_join_butex, expected, NULL); while (expected == m->_join_butex->load(butil::memory_order_acquire)) {
if (butex_wait(m->_join_butex, expected, NULL) < 0 &&
errno != EWOULDBLOCK && errno != EINTR) {
return errno;
} }
return 0;
} }
return EINVAL; return 0;
} }
int ExecutionQueueBase::stop() { int ExecutionQueueBase::stop() {
......
...@@ -231,12 +231,11 @@ public: ...@@ -231,12 +231,11 @@ public:
return -1; return -1;
} }
#endif #endif
const int rc = butex_wait(butex, expected_val, abstime); if (butex_wait(butex, expected_val, abstime) < 0 &&
if (rc < 0 && errno == EWOULDBLOCK) { errno != EWOULDBLOCK && errno != EINTR) {
// EpollThread did wake up, there's data. return -1;
return 0;
} }
return rc; return 0;
} }
int fd_close(int fd) { int fd_close(int fd) {
......
...@@ -436,10 +436,9 @@ int bthread_id_lock_and_reset_range_verbose( ...@@ -436,10 +436,9 @@ int bthread_id_lock_and_reset_range_verbose(
uint32_t expected_ver = *butex; uint32_t expected_ver = *butex;
meta->mutex.unlock(); meta->mutex.unlock();
ever_contended = true; ever_contended = true;
if (bthread::butex_wait(butex, expected_ver, NULL) < 0) { if (bthread::butex_wait(butex, expected_ver, NULL) < 0 &&
if (errno != EWOULDBLOCK && errno != ESTOP) { errno != EWOULDBLOCK && errno != EINTR) {
return errno; return errno;
}
} }
meta->mutex.lock(); meta->mutex.lock();
} else { // bthread_id_about_to_destroy was called. } else { // bthread_id_about_to_destroy was called.
...@@ -511,30 +510,25 @@ int bthread_id_join(bthread_id_t id) __THROW { ...@@ -511,30 +510,25 @@ int bthread_id_join(bthread_id_t id) __THROW {
const bthread::IdResourceId slot = bthread::get_slot(id); const bthread::IdResourceId slot = bthread::get_slot(id);
bthread::Id* const meta = address_resource(slot); bthread::Id* const meta = address_resource(slot);
if (!meta) { if (!meta) {
// The id is not created yet, this join is definitely wrong.
return EINVAL; return EINVAL;
} }
const uint32_t id_ver = bthread::get_version(id); const uint32_t id_ver = bthread::get_version(id);
uint32_t* join_butex = meta->join_butex; uint32_t* join_butex = meta->join_butex;
bool stopped = false;
while (1) { while (1) {
meta->mutex.lock(); meta->mutex.lock();
const bool has_ver = meta->has_version(id_ver); const bool has_ver = meta->has_version(id_ver);
const uint32_t expected_ver = *join_butex; const uint32_t expected_ver = *join_butex;
meta->mutex.unlock(); meta->mutex.unlock();
if (has_ver) { if (!has_ver) {
if (bthread::butex_wait(join_butex, expected_ver, NULL) < 0) {
if (errno != EWOULDBLOCK && errno != ESTOP) {
return errno;
}
if (errno == ESTOP) {
stopped = true;
}
}
} else {
break; break;
} }
if (bthread::butex_wait(join_butex, expected_ver, NULL) < 0 &&
errno != EWOULDBLOCK && errno != EINTR) {
return errno;
}
} }
return stopped ? ESTOP : 0; return 0;
} }
int bthread_id_trylock(bthread_id_t id, void** pdata) __THROW { int bthread_id_trylock(bthread_id_t id, void** pdata) __THROW {
......
...@@ -622,8 +622,10 @@ BAIDU_CASSERT(sizeof(unsigned) == sizeof(MutexInternal), ...@@ -622,8 +622,10 @@ BAIDU_CASSERT(sizeof(unsigned) == sizeof(MutexInternal),
inline int mutex_lock_contended(bthread_mutex_t* m) { inline int mutex_lock_contended(bthread_mutex_t* m) {
butil::atomic<unsigned>* whole = (butil::atomic<unsigned>*)m->butex; butil::atomic<unsigned>* whole = (butil::atomic<unsigned>*)m->butex;
while (whole->exchange(BTHREAD_MUTEX_CONTENDED) & BTHREAD_MUTEX_LOCKED) { while (whole->exchange(BTHREAD_MUTEX_CONTENDED) & BTHREAD_MUTEX_LOCKED) {
if (bthread::butex_wait(whole, BTHREAD_MUTEX_CONTENDED, NULL) < 0 if (bthread::butex_wait(whole, BTHREAD_MUTEX_CONTENDED, NULL) < 0 &&
&& errno != EWOULDBLOCK) { errno != EWOULDBLOCK && errno != EINTR/*note*/) {
// a mutex lock should ignore interrruptions in general since
// user code is unlikely to check the return value.
return errno; return errno;
} }
} }
...@@ -634,8 +636,10 @@ inline int mutex_timedlock_contended( ...@@ -634,8 +636,10 @@ inline int mutex_timedlock_contended(
bthread_mutex_t* m, const struct timespec* __restrict abstime) { bthread_mutex_t* m, const struct timespec* __restrict abstime) {
butil::atomic<unsigned>* whole = (butil::atomic<unsigned>*)m->butex; butil::atomic<unsigned>* whole = (butil::atomic<unsigned>*)m->butex;
while (whole->exchange(BTHREAD_MUTEX_CONTENDED) & BTHREAD_MUTEX_LOCKED) { while (whole->exchange(BTHREAD_MUTEX_CONTENDED) & BTHREAD_MUTEX_LOCKED) {
if (bthread::butex_wait(whole, BTHREAD_MUTEX_CONTENDED, abstime) < 0 if (bthread::butex_wait(whole, BTHREAD_MUTEX_CONTENDED, abstime) < 0 &&
&& errno != EWOULDBLOCK) { errno != EWOULDBLOCK && errno != EINTR/*note*/) {
// a mutex lock should ignore interrruptions in general since
// user code is unlikely to check the return value.
return errno; return errno;
} }
} }
......
...@@ -83,61 +83,29 @@ int TaskGroup::get_attr(bthread_t tid, bthread_attr_t* out) { ...@@ -83,61 +83,29 @@ int TaskGroup::get_attr(bthread_t tid, bthread_attr_t* out) {
return -1; return -1;
} }
int TaskGroup::stopped(bthread_t tid) { void TaskGroup::set_stopped(bthread_t tid) {
TaskMeta* const m = address_meta(tid); TaskMeta* const m = address_meta(tid);
if (m != NULL) { if (m != NULL) {
const uint32_t given_ver = get_version(tid); const uint32_t given_ver = get_version(tid);
BAIDU_SCOPED_LOCK(m->version_lock); BAIDU_SCOPED_LOCK(m->version_lock);
if (given_ver == *m->version_butex) { if (given_ver == *m->version_butex) {
return (int)m->stop;
}
}
// If the tid does not exist or version does not match, it's intuitive
// to treat the thread as "stopped".
return 1;
}
int stop_and_consume_butex_waiter(
bthread_t tid, ButexWaiter** pw) {
TaskMeta* const m = TaskGroup::address_meta(tid);
if (m != NULL) {
const uint32_t given_ver = get_version(tid);
// stopping bthread is not frequent, locking (a spinlock) is acceptable.
BAIDU_SCOPED_LOCK(m->version_lock);
// make sense when version matches.
if (given_ver == *m->version_butex) {
m->stop = true; m->stop = true;
// acquire fence guarantees visibility of `interruptible'.
ButexWaiter* w =
m->current_waiter.exchange(NULL, butil::memory_order_acquire);
if (w != NULL && !m->interruptible) {
// Set waiter back if the bthread is not interruptible.
m->current_waiter.store(w, butil::memory_order_relaxed);
*pw = NULL;
} else {
*pw = w;
}
return 0;
} }
} }
errno = EINVAL;
return -1;
} }
// called in butex.cpp bool TaskGroup::is_stopped(bthread_t tid) {
int set_butex_waiter(bthread_t tid, ButexWaiter* w) { TaskMeta* const m = address_meta(tid);
TaskMeta* const m = TaskGroup::address_meta(tid);
if (m != NULL) { if (m != NULL) {
const uint32_t given_ver = get_version(tid); const uint32_t given_ver = get_version(tid);
BAIDU_SCOPED_LOCK(m->version_lock); BAIDU_SCOPED_LOCK(m->version_lock);
if (given_ver == *m->version_butex) { if (given_ver == *m->version_butex) {
// Release fence makes m->stop visible to butex_wait return m->stop;
m->current_waiter.store(w, butil::memory_order_release);
return 0;
} }
} }
errno = EINVAL; // If the tid does not exist or version does not match, it's intuitive
return -1; // to treat the thread as "stopped".
return true;
} }
bool TaskGroup::wait_task(bthread_t* tid) { bool TaskGroup::wait_task(bthread_t* tid) {
...@@ -251,7 +219,7 @@ int TaskGroup::init(size_t runqueue_capacity) { ...@@ -251,7 +219,7 @@ int TaskGroup::init(size_t runqueue_capacity) {
return -1; return -1;
} }
m->stop = false; m->stop = false;
m->interruptible = true; m->interrupted = false;
m->about_to_quit = false; m->about_to_quit = false;
m->fn = NULL; m->fn = NULL;
m->arg = NULL; m->arg = NULL;
...@@ -400,7 +368,7 @@ int TaskGroup::start_foreground(TaskGroup** pg, ...@@ -400,7 +368,7 @@ int TaskGroup::start_foreground(TaskGroup** pg,
} }
CHECK(m->current_waiter.load(butil::memory_order_relaxed) == NULL); CHECK(m->current_waiter.load(butil::memory_order_relaxed) == NULL);
m->stop = false; m->stop = false;
m->interruptible = true; m->interrupted = false;
m->about_to_quit = false; m->about_to_quit = false;
m->fn = fn; m->fn = fn;
m->arg = arg; m->arg = arg;
...@@ -455,7 +423,7 @@ int TaskGroup::start_background(bthread_t* __restrict th, ...@@ -455,7 +423,7 @@ int TaskGroup::start_background(bthread_t* __restrict th,
} }
CHECK(m->current_waiter.load(butil::memory_order_relaxed) == NULL); CHECK(m->current_waiter.load(butil::memory_order_relaxed) == NULL);
m->stop = false; m->stop = false;
m->interruptible = true; m->interrupted = false;
m->about_to_quit = false; m->about_to_quit = false;
m->fn = fn; m->fn = fn;
m->arg = arg; m->arg = arg;
...@@ -495,40 +463,26 @@ int TaskGroup::join(bthread_t tid, void** return_value) { ...@@ -495,40 +463,26 @@ int TaskGroup::join(bthread_t tid, void** return_value) {
return EINVAL; return EINVAL;
} }
TaskMeta* m = address_meta(tid); TaskMeta* m = address_meta(tid);
if (__builtin_expect(!m, 0)) { // no bthread used the slot yet. if (__builtin_expect(!m, 0)) {
// The bthread is not created yet, this join is definitely wrong.
return EINVAL;
}
TaskGroup* g = tls_task_group;
if (g != NULL && g->current_tid() == tid) {
// joining self causes indefinite waiting.
return EINVAL; return EINVAL;
} }
int rc = 0;
const uint32_t expected_version = get_version(tid); const uint32_t expected_version = get_version(tid);
if (*m->version_butex == expected_version) { while (*m->version_butex == expected_version) {
TaskGroup* g = tls_task_group; if (butex_wait(m->version_butex, expected_version, NULL) < 0 &&
TaskMeta* caller = NULL; errno != EWOULDBLOCK && errno != EINTR) {
if (g != NULL) { return errno;
if (g->current_tid() == tid) {
// joining self causes indefinite waiting.
return EINVAL;
}
caller = g->current_task();
caller->interruptible = false;
}
rc = butex_wait(m->version_butex, expected_version, NULL);
if (rc < 0) {
if (errno == EWOULDBLOCK) {
// Unmatched version means the thread just terminated.
rc = 0;
} else {
rc = errno;
CHECK_EQ(ESTOP, rc);
}
}
if (caller) {
caller->interruptible = true;
} }
} }
if (return_value) { if (return_value) {
*return_value = NULL; // TODO: save return value *return_value = NULL;
} }
return rc; return 0;
} }
bool TaskGroup::exists(bthread_t tid) { bool TaskGroup::exists(bthread_t tid) {
...@@ -774,39 +728,36 @@ static void ready_to_run_from_timer_thread(void* arg) { ...@@ -774,39 +728,36 @@ static void ready_to_run_from_timer_thread(void* arg) {
e->group->control()->choose_one_group()->ready_to_run_remote(e->tid); e->group->control()->choose_one_group()->ready_to_run_remote(e->tid);
} }
void TaskGroup::_add_sleep_event(void* arg) { void TaskGroup::_add_sleep_event(void* void_args) {
// Must copy SleepArgs. After calling TimerThread::schedule(), previous // Must copy SleepArgs. After calling TimerThread::schedule(), previous
// thread may be stolen by a worker immediately and the on-stack SleepArgs // thread may be stolen by a worker immediately and the on-stack SleepArgs
// will be gone. // will be gone.
SleepArgs e = *static_cast<SleepArgs*>(arg); SleepArgs e = *static_cast<SleepArgs*>(void_args);
TaskGroup* g = e.group; TaskGroup* g = e.group;
TimerThread::TaskId sleep_id; TimerThread::TaskId sleep_id;
sleep_id = get_global_timer_thread()->schedule( sleep_id = get_global_timer_thread()->schedule(
ready_to_run_from_timer_thread, arg, ready_to_run_from_timer_thread, void_args,
butil::microseconds_from_now(e.timeout_us)); butil::microseconds_from_now(e.timeout_us));
if (!sleep_id) { if (!sleep_id) {
// fail to schedule timer, go back to previous thread. // fail to schedule timer, go back to previous thread.
// TODO(gejun): Need error?
g->ready_to_run(e.tid); g->ready_to_run(e.tid);
return; return;
} }
// Set TaskMeta::current_sleep, synchronizing with stop_usleep(). // Set TaskMeta::current_sleep which is for interruption.
const uint32_t given_ver = get_version(e.tid); const uint32_t given_ver = get_version(e.tid);
{ {
BAIDU_SCOPED_LOCK(e.meta->version_lock); BAIDU_SCOPED_LOCK(e.meta->version_lock);
if (given_ver == *e.meta->version_butex && !e.meta->stop) { if (given_ver == *e.meta->version_butex && !e.meta->interrupted) {
e.meta->current_sleep = sleep_id; e.meta->current_sleep = sleep_id;
return; return;
} }
} }
// Fail to set current_sleep when previous thread is stopping or even // The thread is stopped or interrupted.
// stopped(unmatched version). // interrupt() always sees that current_sleep == 0. It will not schedule
// Before above code block, stop_usleep() always sees current_sleep == 0. // the calling thread. The race is between current thread and timer thread.
// It will not schedule previous thread. The race is between current
// thread and timer thread.
if (get_global_timer_thread()->unschedule(sleep_id) == 0) { if (get_global_timer_thread()->unschedule(sleep_id) == 0) {
// added to timer, previous thread may be already woken up by timer and // added to timer, previous thread may be already woken up by timer and
// even stopped. It's safe to schedule previous thread when unschedule() // even stopped. It's safe to schedule previous thread when unschedule()
...@@ -833,33 +784,93 @@ int TaskGroup::usleep(TaskGroup** pg, uint64_t timeout_us) { ...@@ -833,33 +784,93 @@ int TaskGroup::usleep(TaskGroup** pg, uint64_t timeout_us) {
sched(pg); sched(pg);
g = *pg; g = *pg;
e.meta->current_sleep = 0; e.meta->current_sleep = 0;
if (e.meta->stop) { if (e.meta->interrupted) {
errno = ESTOP; // Race with set and may consume multiple interruptions, which are OK.
e.meta->interrupted = false;
// NOTE: setting errno to ESTOP is not necessary from bthread's
// pespective, however many RPC code expects bthread_usleep to set
// errno to ESTOP when the thread is stopping, and print FATAL
// otherwise. To make smooth transitions, ESTOP is still set instead
// of EINTR when the thread is stopping.
errno = (e.meta->stop ? ESTOP : EINTR);
return -1; return -1;
} }
return 0; return 0;
} }
int TaskGroup::stop_usleep(bthread_t tid) { // Defined in butex.cpp
TaskMeta* const m = address_meta(tid); bool erase_from_butex_because_of_interruption(ButexWaiter* bw);
static int interrupt_and_consume_waiters(
bthread_t tid, ButexWaiter** pw, uint64_t* sleep_id) {
TaskMeta* const m = TaskGroup::address_meta(tid);
if (m == NULL) { if (m == NULL) {
return EINVAL; return EINVAL;
} }
// Replace current_sleep of the thread with 0 and set stop to true.
TimerThread::TaskId sleep_id = 0;
const uint32_t given_ver = get_version(tid); const uint32_t given_ver = get_version(tid);
{ BAIDU_SCOPED_LOCK(m->version_lock);
if (given_ver == *m->version_butex) {
*pw = m->current_waiter.exchange(NULL, butil::memory_order_acquire);
*sleep_id = m->current_sleep;
m->current_sleep = 0; // only one stopper gets the sleep_id
m->interrupted = true;
return 0;
}
return EINVAL;
}
static int set_butex_waiter(bthread_t tid, ButexWaiter* w) {
TaskMeta* const m = TaskGroup::address_meta(tid);
if (m != NULL) {
const uint32_t given_ver = get_version(tid);
BAIDU_SCOPED_LOCK(m->version_lock); BAIDU_SCOPED_LOCK(m->version_lock);
if (given_ver == *m->version_butex) { if (given_ver == *m->version_butex) {
m->stop = true; // Release fence makes m->interrupted visible to butex_wait
if (m->interruptible) { m->current_waiter.store(w, butil::memory_order_release);
sleep_id = m->current_sleep; return 0;
m->current_sleep = 0; // only one stopper gets the sleep_id
}
} }
} }
if (sleep_id != 0 && get_global_timer_thread()->unschedule(sleep_id) == 0) { return EINVAL;
ready_to_run_general(tid); }
// The interruption is "persistent" compared to the ones caused by signals,
// namely if a bthread is interrupted when it's not blocked, the interruption
// is still remembered and will be checked at next blocking. This designing
// choice simplifies the implementation and reduces notification loss caused
// by race conditions.
// TODO: bthreads created by BTHREAD_ATTR_PTHREAD blocking on bthread_usleep()
// can't be interrupted.
int TaskGroup::interrupt(bthread_t tid, TaskControl* c) {
// Consume current_waiter in the TaskMeta, wake it up then set it back.
ButexWaiter* w = NULL;
uint64_t sleep_id = 0;
int rc = interrupt_and_consume_waiters(tid, &w, &sleep_id);
if (rc) {
return rc;
}
// a bthread cannot wait on a butex and be sleepy at the same time.
CHECK_NE(!!sleep_id, !!w);
if (w != NULL) {
erase_from_butex_because_of_interruption(w);
// If butex_wait() already wakes up before we set current_waiter back,
// the function will spin until current_waiter becomes non-NULL.
rc = set_butex_waiter(tid, w);
if (rc) {
LOG(FATAL) << "butex_wait should spin until setting back waiter";
return rc;
}
} else if (sleep_id != 0) {
if (get_global_timer_thread()->unschedule(sleep_id) == 0) {
bthread::TaskGroup* g = bthread::tls_task_group;
if (g) {
g->ready_to_run(tid);
} else {
if (!c) {
return EINVAL;
}
c->choose_one_group()->ready_to_run_remote(tid);
}
}
} }
return 0; return 0;
} }
...@@ -880,7 +891,7 @@ void print_task(std::ostream& os, bthread_t tid) { ...@@ -880,7 +891,7 @@ void print_task(std::ostream& os, bthread_t tid) {
const uint32_t given_ver = get_version(tid); const uint32_t given_ver = get_version(tid);
bool matched = false; bool matched = false;
bool stop = false; bool stop = false;
bool interruptible = false; bool interrupted = false;
bool about_to_quit = false; bool about_to_quit = false;
void* (*fn)(void*) = NULL; void* (*fn)(void*) = NULL;
void* arg = NULL; void* arg = NULL;
...@@ -893,7 +904,7 @@ void print_task(std::ostream& os, bthread_t tid) { ...@@ -893,7 +904,7 @@ void print_task(std::ostream& os, bthread_t tid) {
if (given_ver == *m->version_butex) { if (given_ver == *m->version_butex) {
matched = true; matched = true;
stop = m->stop; stop = m->stop;
interruptible = m->interruptible; interrupted = m->interrupted;
about_to_quit = m->about_to_quit; about_to_quit = m->about_to_quit;
fn = m->fn; fn = m->fn;
arg = m->arg; arg = m->arg;
...@@ -907,7 +918,7 @@ void print_task(std::ostream& os, bthread_t tid) { ...@@ -907,7 +918,7 @@ void print_task(std::ostream& os, bthread_t tid) {
os << "bthread=" << tid << " : not exist now"; os << "bthread=" << tid << " : not exist now";
} else { } else {
os << "bthread=" << tid << " :\nstop=" << stop os << "bthread=" << tid << " :\nstop=" << stop
<< "\ninterruptible=" << interruptible << "\ninterrupted=" << interrupted
<< "\nabout_to_quit=" << about_to_quit << "\nabout_to_quit=" << about_to_quit
<< "\nfn=" << (void*)fn << "\nfn=" << (void*)fn
<< "\narg=" << (void*)arg << "\narg=" << (void*)arg
......
...@@ -117,8 +117,9 @@ public: ...@@ -117,8 +117,9 @@ public:
// Returns 0 on success, -1 otherwise and errno is set. // Returns 0 on success, -1 otherwise and errno is set.
static int get_attr(bthread_t tid, bthread_attr_t* attr); static int get_attr(bthread_t tid, bthread_attr_t* attr);
// Returns non-zero the `tid' is stopped, 0 otherwise. // Get/set TaskMeta.stop of the tid.
static int stopped(bthread_t tid); static void set_stopped(bthread_t tid);
static bool is_stopped(bthread_t tid);
// The bthread running run_main_task(); // The bthread running run_main_task();
bthread_t main_tid() const { return _main_tid; } bthread_t main_tid() const { return _main_tid; }
...@@ -163,9 +164,9 @@ public: ...@@ -163,9 +164,9 @@ public:
// Call this instead of delete. // Call this instead of delete.
void destroy_self(); void destroy_self();
// Wake up `tid' if it's sleeping. // Wake up blocking ops in the thread.
// Returns 0 on success, error code otherwise. // Returns 0 on success, errno otherwise.
int stop_usleep(bthread_t tid); static int interrupt(bthread_t tid, TaskControl* c);
// Get the meta associate with the task. // Get the meta associate with the task.
static TaskMeta* address_meta(bthread_t tid); static TaskMeta* address_meta(bthread_t tid);
......
...@@ -49,9 +49,14 @@ struct TaskMeta { ...@@ -49,9 +49,14 @@ struct TaskMeta {
// [Not Reset] // [Not Reset]
butil::atomic<ButexWaiter*> current_waiter; butil::atomic<ButexWaiter*> current_waiter;
uint64_t current_sleep; uint64_t current_sleep;
// A builtin flag to mark if the thread is stopping.
bool stop; bool stop;
bool interruptible;
// The thread is interrupted and should wake up from some blocking ops.
bool interrupted;
// Scheduling of the thread can be delayed.
bool about_to_quit; bool about_to_quit;
// [Not Reset] guarantee visibility of version_butex. // [Not Reset] guarantee visibility of version_butex.
......
...@@ -223,7 +223,7 @@ TEST(ButexTest, stop_after_running) { ...@@ -223,7 +223,7 @@ TEST(ButexTest, stop_after_running) {
const bthread_attr_t attr = const bthread_attr_t attr =
(i == 0 ? BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL); (i == 0 ? BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL);
bthread_t th; bthread_t th;
ButexWaitArg arg = { butex, *butex, WAIT_MSEC, ESTOP }; ButexWaitArg arg = { butex, *butex, WAIT_MSEC, EINTR };
tm.start(); tm.start();
ASSERT_EQ(0, bthread_start_urgent(&th, &attr, wait_butex, &arg)); ASSERT_EQ(0, bthread_start_urgent(&th, &attr, wait_butex, &arg));
...@@ -250,7 +250,7 @@ TEST(ButexTest, stop_before_running) { ...@@ -250,7 +250,7 @@ TEST(ButexTest, stop_before_running) {
const bthread_attr_t attr = const bthread_attr_t attr =
(i == 0 ? BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL) | BTHREAD_NOSIGNAL; (i == 0 ? BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL) | BTHREAD_NOSIGNAL;
bthread_t th; bthread_t th;
ButexWaitArg arg = { butex, *butex, WAIT_MSEC, ESTOP }; ButexWaitArg arg = { butex, *butex, WAIT_MSEC, EINTR };
tm.start(); tm.start();
ASSERT_EQ(0, bthread_start_background(&th, &attr, wait_butex, &arg)); ASSERT_EQ(0, bthread_start_background(&th, &attr, wait_butex, &arg));
...@@ -268,7 +268,7 @@ TEST(ButexTest, stop_before_running) { ...@@ -268,7 +268,7 @@ TEST(ButexTest, stop_before_running) {
} }
void* join_the_waiter(void* arg) { void* join_the_waiter(void* arg) {
EXPECT_EQ(ESTOP, bthread_join((bthread_t)arg, NULL)); EXPECT_EQ(0, bthread_join((bthread_t)arg, NULL));
return NULL; return NULL;
} }
...@@ -277,7 +277,7 @@ TEST(ButexTest, join_cant_be_wakeup) { ...@@ -277,7 +277,7 @@ TEST(ButexTest, join_cant_be_wakeup) {
int* butex = bthread::butex_create_checked<int>(); int* butex = bthread::butex_create_checked<int>();
*butex = 7; *butex = 7;
butil::Timer tm; butil::Timer tm;
ButexWaitArg arg = { butex, *butex, 1000, ESTOP }; ButexWaitArg arg = { butex, *butex, 1000, EINTR };
for (int i = 0; i < 2; ++i) { for (int i = 0; i < 2; ++i) {
const bthread_attr_t attr = const bthread_attr_t attr =
......
...@@ -292,7 +292,7 @@ struct StoppedWaiterArgs { ...@@ -292,7 +292,7 @@ struct StoppedWaiterArgs {
void* stopped_waiter(void* void_arg) { void* stopped_waiter(void* void_arg) {
StoppedWaiterArgs* args = (StoppedWaiterArgs*)void_arg; StoppedWaiterArgs* args = (StoppedWaiterArgs*)void_arg;
args->thread_started = true; args->thread_started = true;
EXPECT_EQ(ESTOP, bthread_id_join(args->id)); EXPECT_EQ(0, bthread_id_join(args->id));
EXPECT_EQ(get_version(args->id) + 4, bthread::id_value(args->id)); EXPECT_EQ(get_version(args->id) + 4, bthread::id_value(args->id));
return NULL; return NULL;
} }
...@@ -312,11 +312,6 @@ TEST(BthreadIdTest, stop_a_wait_after_fight_before_signal) { ...@@ -312,11 +312,6 @@ TEST(BthreadIdTest, stop_a_wait_after_fight_before_signal) {
args[i].thread_started = false; args[i].thread_started = false;
ASSERT_EQ(0, bthread_start_urgent(&th[i], NULL, stopped_waiter, &args[i])); ASSERT_EQ(0, bthread_start_urgent(&th[i], NULL, stopped_waiter, &args[i]));
} }
for (size_t i = 0; i < ARRAY_SIZE(th); ++i) {
if (!args[i].thread_started) {
bthread_usleep(1000);
}
}
// stop does not wake up bthread_id_join // stop does not wake up bthread_id_join
for (size_t i = 0; i < ARRAY_SIZE(th); ++i) { for (size_t i = 0; i < ARRAY_SIZE(th); ++i) {
bthread_stop(th[i]); bthread_stop(th[i]);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment