butex.cpp 24.3 KB
Newer Older
gejun's avatar
gejun committed
1
// bthread - A M:N threading library to make applications more concurrent.
gejun's avatar
gejun committed
2
// Copyright (c) 2014 Baidu, Inc.
gejun's avatar
gejun committed
3 4 5 6 7 8 9 10 11 12 13 14
// 
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// 
//     http://www.apache.org/licenses/LICENSE-2.0
// 
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
gejun's avatar
gejun committed
15 16 17 18

// Author: Ge,Jun (gejun@baidu.com)
// Date: Tue Jul 22 17:30:12 CST 2014

19 20 21 22
#include "butil/atomicops.h"                // butil::atomic
#include "butil/scoped_lock.h"              // BAIDU_SCOPED_LOCK
#include "butil/macros.h"
#include "butil/containers/linked_list.h"   // LinkNode
23
#ifdef SHOW_BTHREAD_BUTEX_WAITER_COUNT_IN_VARS
24
#include "butil/memory/singleton_on_pthread_once.h"
25
#endif
26 27
#include "butil/logging.h"
#include "butil/object_pool.h"
28
#include "bthread/errno.h"                 // EWOULDBLOCK
gejun's avatar
gejun committed
29 30 31 32 33 34
#include "bthread/sys_futex.h"             // futex_*
#include "bthread/processor.h"             // cpu_relax
#include "bthread/task_control.h"          // TaskControl
#include "bthread/task_group.h"            // TaskGroup
#include "bthread/timer_thread.h"
#include "bthread/butex.h"
35
#include "bthread/mutex.h"
gejun's avatar
gejun committed
36 37

// This file implements butex.h
38 39
// Provides futex-like semantics, i.e. sequenced wait and wake operations
// with guaranteed visibility.
gejun's avatar
gejun committed
40 41
//
// If wait is sequenced before wake:
42
//    [thread1]             [thread2]
gejun's avatar
gejun committed
43 44 45 46 47
//    wait()                value = new_value
//                          wake()
// wait() sees unmatched value(fail to wait), or wake() sees the waiter.
//
// If wait is sequenced after wake:
48
//    [thread1]             [thread2]
gejun's avatar
gejun committed
49 50 51
//                          value = new_value
//                          wake()
//    wait()
52
// wake() must provide some sort of memory fence to prevent the assignment
// of value from being reordered after it. Thus the value is visible to
// wait() as well.

namespace bthread {

gejun's avatar
gejun committed
58
#ifdef SHOW_BTHREAD_BUTEX_WAITER_COUNT_IN_VARS
// bvar exposed as "bthread_butex_waiter_count". butex_wait() and
// butex_wait_from_pthread() add 1 before blocking and -1 after waking up.
struct ButexWaiterCount : public bvar::Adder<int64_t> {
    ButexWaiterCount() : bvar::Adder<int64_t>("bthread_butex_waiter_count") {}
};

// Returns the counter, created on first use through a leaky singleton
// (NOTE(review): presumably never destroyed -- see
// butil/memory/singleton_on_pthread_once.h).
inline bvar::Adder<int64_t>& butex_waiter_count() {
    ButexWaiterCount* const counter =
        butil::get_leaky_singleton<ButexWaiterCount>();
    return *counter;
}
#endif
gejun's avatar
gejun committed
66 67 68

// If a thread would suspend for less than so many microseconds, return
// ETIMEDOUT directly instead of sleeping.
// Use 2: sleeping for less than 2 microseconds is inefficient and useless.
static const int64_t MIN_SLEEP_US = 2;
gejun's avatar
gejun committed
71 72 73

// Outcome recorded in ButexBthreadWaiter::waiter_state.
enum WaiterState {
    // Not used by the code in this file.
    WAITER_STATE_NONE = 0,
    // Initial state set by butex_wait(); also the state after a normal wake.
    WAITER_STATE_READY = 1,
    // Removed from the butex by the timeout path of erase_from_butex().
    WAITER_STATE_TIMEDOUT = 2,
    // Butex value no longer matched expected_value when queueing.
    WAITER_STATE_UNMATCHEDVALUE = 3,
    // Removed by erase_from_butex_because_of_interruption().
    WAITER_STATE_INTERRUPTED = 4,
};

struct Butex;

82
struct ButexWaiter : public butil::LinkNode<ButexWaiter> {
gejun's avatar
gejun committed
83 84 85
    // tids of pthreads are 0
    bthread_t tid;

86 87
    // Erasing node from middle of LinkedList is thread-unsafe, we need
    // to hold its container's lock.
88
    butil::atomic<Butex*> container;
gejun's avatar
gejun committed
89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104
};

// Waiter record of a bthread (non-pthread task) blocked in butex_wait(). It is
// allocated on that bthread's stack and queued in Butex::waiters.
struct ButexBthreadWaiter : public ButexWaiter {
    // Meta of the waiting task; its `interrupted' flag is checked before
    // queueing.
    TaskMeta* task_meta;
    // TimerThread task that times this waiter out, or 0 when none is pending.
    TimerThread::TaskId sleep_id;
    // Why the wait ended; stays WAITER_STATE_READY on a normal wake.
    WaiterState waiter_state;
    // Value the butex must still hold for the waiter to be queued.
    int expected_value;
    // Butex the waiter asked to wait on (butex_requeue() may move it later).
    Butex* initial_butex;
    // TaskControl used to pick a TaskGroup when this bthread is woken.
    TaskControl* control;
};

// pthread_task or main_task allocates this structure on stack and queue it
// in Butex::waiters.
struct ButexPthreadWaiter : public ButexWaiter {
105
    butil::atomic<int> sig;
gejun's avatar
gejun committed
106 107
};

108
// Intrusive list of the waiters queued on one butex.
using ButexWaiterList = butil::LinkedList<ButexWaiter>;
gejun's avatar
gejun committed
109

110
// Values stored in ButexPthreadWaiter::sig.
enum ButexPthreadSignal {
    PTHREAD_NOT_SIGNALLED = 0,
    PTHREAD_SIGNALLED = 1
};
gejun's avatar
gejun committed
111

gejun's avatar
gejun committed
112 113
// A butex: a futex-like int plus the queue of bthreads/pthreads waiting on
// it. butex_create() hands out &value and the other functions convert that
// pointer back with container_of(). Instances come from butil::ObjectPool,
// which never frees memory. The empty constructor does not initialize
// `value' explicitly (NOTE(review): callers presumably store an initial
// value after butex_create() -- confirm in butex.h).
struct BAIDU_CACHELINE_ALIGNMENT Butex {
    Butex() {}
    ~Butex() {}

    // The word compared against `expected_value' in butex_wait().
    butil::atomic<int> value;
    // Queued waiters; Append() adds at the tail and wakers take the head.
    ButexWaiterList waiters;
    // Guards `waiters' and the `container' field of waiters queued here.
    internal::FastPthreadMutex waiter_lock;
};

// `value' must sit at offset 0 of Butex.
BAIDU_CASSERT(offsetof(Butex, value) == 0, offsetof_value_must_0);
// The whole Butex must occupy exactly one cache line.
BAIDU_CASSERT(sizeof(Butex) == BAIDU_CACHELINE_SIZE, butex_fits_in_one_cacheline);
gejun's avatar
gejun committed
123

124 125 126 127 128 129 130
// Signals a pthread blocked in wait_pthread() on `pw'. The caller must have
// already unlinked `pw' from its butex.
static void wakeup_pthread(ButexPthreadWaiter* pw) {
    butil::atomic<int>* const sig = &pw->sig;
    // Release store: wait_pthread() observes PTHREAD_SIGNALLED with an acquire
    // load, so everything written before this wakeup is visible to it.
    sig->store(PTHREAD_SIGNALLED, butil::memory_order_release);
    // From here on wait_pthread() may already have returned and destroyed the
    // on-stack `pw'; the original author expects futex_wake_private() to
    // return EFAULT in that case. If crashes ever show up, `pw' can be made
    // thread-local and never destroyed instead.
    futex_wake_private(sig, 1);
}

134
bool erase_from_butex(ButexWaiter*, bool, WaiterState);
gejun's avatar
gejun committed
135 136 137

int wait_pthread(ButexPthreadWaiter& pw, timespec* ptimeout) {
    while (true) {
138 139 140 141 142
        const int rc = futex_wait_private(&pw.sig, PTHREAD_NOT_SIGNALLED, ptimeout);
        if (PTHREAD_NOT_SIGNALLED != pw.sig.load(butil::memory_order_acquire)) {
            // If `sig' is changed, wakeup_pthread() must be called and `pw'
            // is already removed from the butex.
            // Acquire fence makes this thread sees changes before wakeup.
gejun's avatar
gejun committed
143 144 145
            return rc;
        }
        if (rc != 0 && errno == ETIMEDOUT) {
146 147 148 149 150 151 152 153 154 155 156 157 158
            // Note that we don't handle the EINTR from futex_wait here since
            // pthreads waiting on a butex should behave similarly as bthreads
            // which are not able to be woken-up by signals.
            // EINTR on butex is only producible by TaskGroup::interrupt().

            // `pw' is still in the queue, remove it.
            if (!erase_from_butex(&pw, false, WAITER_STATE_TIMEDOUT)) {
                // Another thread is erasing `pw' as well, wait for the signal.
                // Acquire fence makes this thread sees changes before wakeup.
                if (pw.sig.load(butil::memory_order_acquire) == PTHREAD_NOT_SIGNALLED) {
                    ptimeout = NULL; // already timedout, ptimeout is expired.
                    continue;
                }
gejun's avatar
gejun committed
159 160 161 162 163 164 165 166 167 168
            }
            return rc;
        }
    }
}

extern BAIDU_THREAD_LOCAL TaskGroup* tls_task_group;

// Returns 0 when no need to unschedule or successfully unscheduled,
// -1 otherwise.
169 170
inline int unsleep_if_necessary(ButexBthreadWaiter* w,
                                TimerThread* timer_thread) {
gejun's avatar
gejun committed
171 172 173 174 175 176 177 178 179 180 181
    if (!w->sleep_id) {
        return 0;
    }
    if (timer_thread->unschedule(w->sleep_id) > 0) {
        // the callback is running.
        return -1;
    }
    w->sleep_id = 0;
    return 0;
}

gejun's avatar
gejun committed
182
// Use ObjectPool (which never frees memory) to solve the race between
// butex_wake() and butex_destroy(). The race is as follows:
//
//   class Event {
//   public:
//     void wait() {
//       _mutex.lock();
//       if (!_done) {
//         _cond.wait(&_mutex);
//       }
//       _mutex.unlock();
//     }
//     void signal() {
//       _mutex.lock();
//       if (!_done) {
//         _done = true;
//         _cond.signal();
//       }
//       _mutex.unlock();  /*1*/
//     }
//   private:
//     bool _done = false;
//     Mutex _mutex;
//     Condition _cond;
//   };
//
//   [Thread1]                         [Thread2]
//   foo() {
//     Event event;
//     pass_to_thread2(&event);  --->  event.signal();
//     event.wait();
//   } <-- event destroyed
//
// Summary: Thread1 passes a stateful condition to Thread2 and waits until
// the condition is signalled, which basically means the associated
// job is done and Thread1 can release related resources including the mutex
// and condition. The scenario is fine and the code is correct.
// The race needs a closer look. The unlock at /*1*/ may be implemented in
// different ways, but its last step is typically an atomic store followed
// by butex_wake(), like this:
//
//   locked->store(0);
//   butex_wake(locked);
//
// The `locked' represents the locking status of the mutex. The issue is that
// just after the store(), the mutex is already unlocked, so the code in
// Event.wait() may successfully grab the lock, run through the rest of
// foo() and return, destroying the mutex and the butex and making
// butex_wake(locked) crash.
// One way to solve this is to add a reference before the store and release
// it after butex_wake(). However, reference counting would need to be added
// to nearly every user of butex_wake(), which is very error-prone. Another
// way is to never free butexes, with the side effect that butex_wake() may
// wake up an unrelated butex (one that reuses the memory) and cause spurious
// wakeups. In our observation the race is infrequent, even rare, so the
// extra spurious wakeups should be acceptable.
gejun's avatar
gejun committed
238 239

void* butex_create() {
240
    Butex* b = butil::get_object<Butex>();
gejun's avatar
gejun committed
241
    if (b) {
gejun's avatar
gejun committed
242 243 244 245 246 247
        return &b->value;
    }
    return NULL;
}

void butex_destroy(void* butex) {
gejun's avatar
gejun committed
248 249
    if (!butex) {
        return;
gejun's avatar
gejun committed
250
    }
gejun's avatar
gejun committed
251
    Butex* b = static_cast<Butex*>(
252 253
        container_of(static_cast<butil::atomic<int>*>(butex), Butex, value));
    butil::return_object(b);
gejun's avatar
gejun committed
254 255 256 257 258 259 260 261
}

// Returns the TaskGroup of the calling worker, or a group chosen by `c' when
// the caller is not running inside a TaskGroup.
inline TaskGroup* get_task_group(TaskControl* c) {
    if (TaskGroup* const current = tls_task_group) {
        return current;
    }
    return c->choose_one_group();
}

// Wakes at most one waiter of the butex whose value word is `arg': the one at
// the head of `waiters', i.e. the earliest queued (waiters are added with
// Append()). Returns 1 if a waiter was woken, 0 if there was none.
int butex_wake(void* arg) {
    Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value);
    ButexWaiter* front = NULL;
    {
        BAIDU_SCOPED_LOCK(b->waiter_lock);
        if (b->waiters.empty()) {
            return 0;
        }
        front = b->waiters.head()->value();
        front->RemoveFromList();
        // Detached: erase_from_butex() on `front' (timeout/interruption) is
        // now a no-op, so only this thread wakes it.
        front->container.store(NULL, butil::memory_order_relaxed);
    }
    if (front->tid == 0) {
        // tid 0 marks a pthread waiter.
        wakeup_pthread(static_cast<ButexPthreadWaiter*>(front));
        return 1;
    }
    ButexBthreadWaiter* bbw = static_cast<ButexBthreadWaiter*>(front);
    // Cancel the pending timeout, if any. The result is ignored: a timer
    // callback that is already running finds `container' NULL and does nothing.
    unsleep_if_necessary(bbw, get_global_timer_thread());
    TaskGroup* g = tls_task_group;
    if (g) {
        // Called from a worker: switch to the woken bthread right away
        // (NOTE(review): presumably exchange() re-queues the current task).
        TaskGroup::exchange(&g, bbw->tid);
    } else {
        // Not on a worker: hand the bthread to a group of its TaskControl.
        bbw->control->choose_one_group()->ready_to_run_remote(bbw->tid);
    }
    return 1;
}

gejun's avatar
gejun committed
288
// Wakes every waiter queued on the butex whose value word is `arg' and
// returns how many were woken.
// Pthread waiters are signalled first. The earliest-queued bthread waiter is
// scheduled last: via TaskGroup::exchange() when the chosen group is the
// caller's own, otherwise with ready_to_run_remote(). The other bthread
// waiters are pushed in reverse queue order and flushed once.
int butex_wake_all(void* arg) {
    Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value);

    ButexWaiterList bthread_waiters;
    ButexWaiterList pthread_waiters;
    {
        // Detach all waiters under the lock. Clearing `container' makes any
        // concurrent erase_from_butex() (timeout/interruption) a no-op.
        BAIDU_SCOPED_LOCK(b->waiter_lock);
        while (!b->waiters.empty()) {
            ButexWaiter* bw = b->waiters.head()->value();
            bw->RemoveFromList();
            bw->container.store(NULL, butil::memory_order_relaxed);
            if (bw->tid) {
                bthread_waiters.Append(bw);
            } else {
                pthread_waiters.Append(bw);
            }
        }
    }

    int nwakeup = 0;
    while (!pthread_waiters.empty()) {
        ButexPthreadWaiter* bw = static_cast<ButexPthreadWaiter*>(
            pthread_waiters.head()->value());
        bw->RemoveFromList();
        wakeup_pthread(bw);
        ++nwakeup;
    }
    if (bthread_waiters.empty()) {
        return nwakeup;
    }
    // We will exchange with first waiter in the end.
    ButexBthreadWaiter* next = static_cast<ButexBthreadWaiter*>(
        bthread_waiters.head()->value());
    next->RemoveFromList();
    unsleep_if_necessary(next, get_global_timer_thread());
    ++nwakeup;
    TaskGroup* g = get_task_group(next->control);
    const int saved_nwakeup = nwakeup;
    while (!bthread_waiters.empty()) {
        // pop reversely
        ButexBthreadWaiter* w = static_cast<ButexBthreadWaiter*>(
            bthread_waiters.tail()->value());
        w->RemoveFromList();
        unsleep_if_necessary(w, get_global_timer_thread());
        // NOTE(review): `true' presumably means "don't signal workers yet";
        // the batch is flushed by flush_nosignal_tasks_general() below.
        g->ready_to_run_general(w->tid, true);
        ++nwakeup;
    }
    if (saved_nwakeup != nwakeup) {
        g->flush_nosignal_tasks_general();
    }
    if (g == tls_task_group) {
        TaskGroup::exchange(&g, next->tid);
    } else {
        g->ready_to_run_remote(next->tid);
    }
    return nwakeup;
}

// Wakes every waiter of the butex whose value word is `arg' except the
// bthread `excluded_bthread', and returns how many were woken.
// The excluded waiter stays queued (its `container' is untouched) but is
// re-appended at the tail, losing its original position. Unlike
// butex_wake_all(), no exchange() is done: all woken bthreads are pushed to
// the chosen group and flushed once.
int butex_wake_except(void* arg, bthread_t excluded_bthread) {
    Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value);

    ButexWaiterList bthread_waiters;
    ButexWaiterList pthread_waiters;
    {
        ButexWaiter* excluded_waiter = NULL;
        BAIDU_SCOPED_LOCK(b->waiter_lock);
        while (!b->waiters.empty()) {
            ButexWaiter* bw = b->waiters.head()->value();
            bw->RemoveFromList();

            if (bw->tid) {
                if (bw->tid != excluded_bthread) {
                    bthread_waiters.Append(bw);
                    bw->container.store(NULL, butil::memory_order_relaxed);
                } else {
                    excluded_waiter = bw;
                }
            } else {
                bw->container.store(NULL, butil::memory_order_relaxed);
                pthread_waiters.Append(bw);
            }
        }

        // Put the excluded waiter back; it is still owned by `b'.
        if (excluded_waiter) {
            b->waiters.Append(excluded_waiter);
        }
    }

    int nwakeup = 0;
    while (!pthread_waiters.empty()) {
        ButexPthreadWaiter* bw = static_cast<ButexPthreadWaiter*>(
            pthread_waiters.head()->value());
        bw->RemoveFromList();
        wakeup_pthread(bw);
        ++nwakeup;
    }

    if (bthread_waiters.empty()) {
        return nwakeup;
    }
    // Only used to pick the TaskGroup; it is woken by the loop below too.
    ButexBthreadWaiter* front = static_cast<ButexBthreadWaiter*>(
                bthread_waiters.head()->value());

    TaskGroup* g = get_task_group(front->control);
    const int saved_nwakeup = nwakeup;
    do {
        // pop reversely
        ButexBthreadWaiter* w = static_cast<ButexBthreadWaiter*>(
            bthread_waiters.tail()->value());
        w->RemoveFromList();
        unsleep_if_necessary(w, get_global_timer_thread());
        g->ready_to_run_general(w->tid, true);
        ++nwakeup;
    } while (!bthread_waiters.empty());
    if (saved_nwakeup != nwakeup) {
        g->flush_nosignal_tasks_general();
    }
    return nwakeup;
}

// Wakes one waiter (the head) of the butex `arg' and moves all remaining
// waiters of `arg' to the butex `arg2' without waking them. Returns 1 if a
// waiter was woken, 0 if `arg' had no waiters.
// NOTE(review): with arg == arg2 both unique_locks refer to the same mutex;
// verify that butil::double_lock handles that, otherwise this self-deadlocks.
int butex_requeue(void* arg, void* arg2) {
    Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value);
    Butex* m = container_of(static_cast<butil::atomic<int>*>(arg2), Butex, value);

    ButexWaiter* front = NULL;
    {
        // Both lists change, so both waiter_locks are needed.
        std::unique_lock<internal::FastPthreadMutex> lck1(b->waiter_lock, std::defer_lock);
        std::unique_lock<internal::FastPthreadMutex> lck2(m->waiter_lock, std::defer_lock);
        butil::double_lock(lck1, lck2);
        if (b->waiters.empty()) {
            return 0;
        }

        front = b->waiters.head()->value();
        front->RemoveFromList();
        front->container.store(NULL, butil::memory_order_relaxed);

        while (!b->waiters.empty()) {
            ButexWaiter* bw = b->waiters.head()->value();
            bw->RemoveFromList();
            m->waiters.Append(bw);
            // Point the waiter at its new butex so erase_from_butex() takes
            // m->waiter_lock from now on.
            bw->container.store(m, butil::memory_order_relaxed);
        }
    }

    if (front->tid == 0) {  // which is a pthread
        wakeup_pthread(static_cast<ButexPthreadWaiter*>(front));
        return 1;
    }
    ButexBthreadWaiter* bbw = static_cast<ButexBthreadWaiter*>(front);
    unsleep_if_necessary(bbw, get_global_timer_thread());
    TaskGroup* g = tls_task_group;
    if (g) {
        TaskGroup::exchange(&g, front->tid);
    } else {
        bbw->control->choose_one_group()->ready_to_run_remote(front->tid);
    }
    return 1;
}

// Callable from multiple threads, at most one thread may wake up the waiter.
static void erase_from_butex_and_wakeup(void* arg) {
450 451 452 453 454 455
    erase_from_butex(static_cast<ButexWaiter*>(arg), true, WAITER_STATE_TIMEDOUT);
}

// Used in task_group.cpp: removes `bw' from its butex with
// WAITER_STATE_INTERRUPTED and wakes it. Returns true iff this call removed it.
bool erase_from_butex_because_of_interruption(ButexWaiter* bw) {
    const bool erased = erase_from_butex(bw, true, WAITER_STATE_INTERRUPTED);
    return erased;
}

458
// Removes `bw' from the butex it is currently queued on. For bthread waiters
// it also records `state' as the reason. When `wakeup' is true and the removal
// succeeded, it makes the waiter runnable (bthread) or signals it (pthread).
// Returns true iff this call removed `bw'. Returns false when `bw' was not
// queued (container is NULL), e.g. because another thread removed it first.
// errno is saved and restored, so callers such as wait_pthread() can still
// rely on the errno of their own futex call afterwards.
inline bool erase_from_butex(ButexWaiter* bw, bool wakeup, WaiterState state) {
    // `bw' is guaranteed to be valid inside this function because the waiter
    // waits until this function is cancelled or finished.
    // NOTE: This function must be no-op when bw->container is NULL.
    bool erased = false;
    Butex* b;
    int saved_errno = errno;
    while ((b = bw->container.load(butil::memory_order_acquire))) {
        // `container' is NULL when the waiter is not (or no longer) queued,
        // which ends the loop. It may also change between the load above and
        // taking the lock, e.g. when butex_requeue() moves the waiter to
        // another butex, so re-check under the lock and retry if it moved.
        BAIDU_SCOPED_LOCK(b->waiter_lock);
        if (b == bw->container.load(butil::memory_order_relaxed)) {
            bw->RemoveFromList();
            bw->container.store(NULL, butil::memory_order_relaxed);
            if (bw->tid) {
                static_cast<ButexBthreadWaiter*>(bw)->waiter_state = state;
            }
            erased = true;
            break;
        }
    }
    if (erased && wakeup) {
        if (bw->tid) {
            ButexBthreadWaiter* bbw = static_cast<ButexBthreadWaiter*>(bw);
            get_task_group(bbw->control)->ready_to_run_general(bw->tid);
        } else {
            ButexPthreadWaiter* pw = static_cast<ButexPthreadWaiter*>(bw);
            wakeup_pthread(pw);
        }
    }
    errno = saved_errno;
    return erased;
}

static void wait_for_butex(void* arg) {
    ButexBthreadWaiter* const bw = static_cast<ButexBthreadWaiter*>(arg);
    Butex* const b = bw->initial_butex;
494
    // 1: waiter with timeout should have waiter_state == WAITER_STATE_READY
gejun's avatar
gejun committed
495
    //    before they're queued, otherwise the waiter is already timedout
gejun's avatar
gejun committed
496 497 498
    //    and removed by TimerThread, in which case we should stop queueing.
    //
    // Visibility of waiter_state:
499
    //    [bthread]                         [TimerThread]
gejun's avatar
gejun committed
500 501 502 503 504
    //    waiter_state = TIMED
    //    tt_lock { add task }
    //                                      tt_lock { get task }
    //                                      waiter_lock { waiter_state=TIMEDOUT }
    //    waiter_lock { use waiter_state }
gejun's avatar
gejun committed
505 506 507
    // tt_lock represents TimerThread::_mutex. Visibility of waiter_state is
    // sequenced by two locks, both threads are guaranteed to see the correct
    // value.
gejun's avatar
gejun committed
508 509
    {
        BAIDU_SCOPED_LOCK(b->waiter_lock);
510 511 512 513
        if (b->value.load(butil::memory_order_relaxed) != bw->expected_value) {
            bw->waiter_state = WAITER_STATE_UNMATCHEDVALUE;
        } else if (bw->waiter_state == WAITER_STATE_READY/*1*/ &&
                   !bw->task_meta->interrupted) {
gejun's avatar
gejun committed
514
            b->waiters.Append(bw);
515
            bw->container.store(b, butil::memory_order_relaxed);
gejun's avatar
gejun committed
516 517 518 519 520
            return;
        }
    }
    
    // b->container is NULL which makes erase_from_butex_and_wakeup() and
521
    // TaskGroup::interrupt() no-op, there's no race between following code and
gejun's avatar
gejun committed
522 523 524 525 526 527 528 529 530
    // the two functions. The on-stack ButexBthreadWaiter is safe to use and
    // bw->waiter_state will not change again.
    unsleep_if_necessary(bw, get_global_timer_thread());
    tls_task_group->ready_to_run(bw->tid);
    // FIXME: jump back to original thread is buggy.
    
    // // Value unmatched or waiter is already woken up by TimerThread, jump
    // // back to original bthread.
    // TaskGroup* g = tls_task_group;
531 532
    // ReadyToRunArgs args = { g->current_tid(), false };
    // g->set_remained(TaskGroup::ready_to_run_in_worker, &args);
gejun's avatar
gejun committed
533 534 535 536 537 538 539 540 541 542 543 544
    // // 2: Don't run remained because we're already in a remained function
    // //    otherwise stack may overflow.
    // TaskGroup::sched_to(&g, bw->tid, false/*2*/);
}

static int butex_wait_from_pthread(TaskGroup* g, Butex* b, int expected_value,
                                   const timespec* abstime) {
    // sys futex needs relative timeout.
    // Compute diff between abstime and now.
    timespec* ptimeout = NULL;
    timespec timeout;
    if (abstime != NULL) {
545 546
        const int64_t timeout_us = butil::timespec_to_microseconds(*abstime) -
            butil::gettimeofday_us();
547
        if (timeout_us < MIN_SLEEP_US) {
gejun's avatar
gejun committed
548 549 550
            errno = ETIMEDOUT;
            return -1;
        }
551
        timeout = butil::microseconds_to_timespec(timeout_us);
gejun's avatar
gejun committed
552 553 554 555 556 557
        ptimeout = &timeout;
    }

    TaskMeta* task = NULL;
    ButexPthreadWaiter pw;
    pw.tid = 0;
558
    pw.sig.store(PTHREAD_NOT_SIGNALLED, butil::memory_order_relaxed);
gejun's avatar
gejun committed
559 560 561 562
    int rc = 0;
    
    if (g) {
        task = g->current_task();
563
        task->current_waiter.store(&pw, butil::memory_order_release);
gejun's avatar
gejun committed
564
    }
gejun's avatar
gejun committed
565
    b->waiter_lock.lock();
566 567 568 569 570 571 572 573 574 575 576
    if (b->value.load(butil::memory_order_relaxed) != expected_value) {
        b->waiter_lock.unlock();
        errno = EWOULDBLOCK;
        rc = -1;
    } else if (task != NULL && task->interrupted) {
        b->waiter_lock.unlock();
        // Race with set and may consume multiple interruptions, which are OK.
        task->interrupted = false;
        errno = EINTR;
        rc = -1;
    } else {
gejun's avatar
gejun committed
577
        b->waiters.Append(&pw);
578
        pw.container.store(b, butil::memory_order_relaxed);
gejun's avatar
gejun committed
579 580 581 582 583 584 585 586 587 588
        b->waiter_lock.unlock();

#ifdef SHOW_BTHREAD_BUTEX_WAITER_COUNT_IN_VARS
        bvar::Adder<int64_t>& num_waiters = butex_waiter_count();
        num_waiters << 1;
#endif
        rc = wait_pthread(pw, ptimeout);
#ifdef SHOW_BTHREAD_BUTEX_WAITER_COUNT_IN_VARS
        num_waiters << -1;
#endif
gejun's avatar
gejun committed
589 590
    }
    if (task) {
591 592 593 594 595 596 597 598 599 600 601
        // If current_waiter is NULL, TaskGroup::interrupt() is running and
        // using pw, spin until current_waiter != NULL.
        BT_LOOP_WHEN(task->current_waiter.exchange(
                         NULL, butil::memory_order_acquire) == NULL,
                     30/*nops before sched_yield*/);
        if (task->interrupted) {
            task->interrupted = false;
            if (rc == 0) {
                errno = EINTR;
                return -1;
            }
gejun's avatar
gejun committed
602 603 604 605 606 607
        }
    }
    return rc;
}

int butex_wait(void* arg, int expected_value, const timespec* abstime) {
608 609
    Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value);
    if (b->value.load(butil::memory_order_relaxed) != expected_value) {
gejun's avatar
gejun committed
610 611 612
        errno = EWOULDBLOCK;
        // Sometimes we may take actions immediately after unmatched butex,
        // this fence makes sure that we see changes before changing butex.
613
        butil::atomic_thread_fence(butil::memory_order_acquire);
gejun's avatar
gejun committed
614 615 616 617 618 619 620 621 622
        return -1;
    }
    TaskGroup* g = tls_task_group;
    if (NULL == g || g->is_current_pthread_task()) {
        return butex_wait_from_pthread(g, b, expected_value, abstime);
    }
    ButexBthreadWaiter bbw;
    // tid is 0 iff the thread is non-bthread
    bbw.tid = g->current_tid();
623
    bbw.container.store(NULL, butil::memory_order_relaxed);
gejun's avatar
gejun committed
624 625
    bbw.task_meta = g->current_task();
    bbw.sleep_id = 0;
626
    bbw.waiter_state = WAITER_STATE_READY;
gejun's avatar
gejun committed
627 628 629 630 631 632 633
    bbw.expected_value = expected_value;
    bbw.initial_butex = b;
    bbw.control = g->control();

    if (abstime != NULL) {
        // Schedule timer before queueing. If the timer is triggered before
        // queueing, cancel queueing. This is a kind of optimistic locking.
634 635
        if (butil::timespec_to_microseconds(*abstime) <
            (butil::gettimeofday_us() + MIN_SLEEP_US)) {
636
            // Already timed out.
gejun's avatar
gejun committed
637 638 639 640 641 642 643 644 645 646
            errno = ETIMEDOUT;
            return -1;
        }
        bbw.sleep_id = get_global_timer_thread()->schedule(
            erase_from_butex_and_wakeup, &bbw, *abstime);
        if (!bbw.sleep_id) {  // TimerThread stopped.
            errno = ESTOP;
            return -1;
        }
    }
gejun's avatar
gejun committed
647
#ifdef SHOW_BTHREAD_BUTEX_WAITER_COUNT_IN_VARS
648
    bvar::Adder<int64_t>& num_waiters = butex_waiter_count();
gejun's avatar
gejun committed
649
    num_waiters << 1;
gejun's avatar
gejun committed
650 651
#endif

652 653
    // release fence matches with acquire fence in interrupt_and_consume_waiters
    // in task_group.cpp to guarantee visibility of `interrupted'.
654
    bbw.task_meta->current_waiter.store(&bbw, butil::memory_order_release);
gejun's avatar
gejun committed
655 656 657 658 659 660 661 662
    g->set_remained(wait_for_butex, &bbw);
    TaskGroup::sched(&g);

    // erase_from_butex_and_wakeup (called by TimerThread) is possibly still
    // running and using bbw. The chance is small, just spin until it's done.
    BT_LOOP_WHEN(unsleep_if_necessary(&bbw, get_global_timer_thread()) < 0,
                 30/*nops before sched_yield*/);
    
663
    // If current_waiter is NULL, TaskGroup::interrupt() is running and using bbw.
gejun's avatar
gejun committed
664 665
    // Spin until current_waiter != NULL.
    BT_LOOP_WHEN(bbw.task_meta->current_waiter.exchange(
666
                     NULL, butil::memory_order_acquire) == NULL,
gejun's avatar
gejun committed
667
                 30/*nops before sched_yield*/);
gejun's avatar
gejun committed
668
#ifdef SHOW_BTHREAD_BUTEX_WAITER_COUNT_IN_VARS
gejun's avatar
gejun committed
669
    num_waiters << -1;
gejun's avatar
gejun committed
670
#endif
gejun's avatar
gejun committed
671

672 673 674 675 676
    bool is_interrupted = false;
    if (bbw.task_meta->interrupted) {
        // Race with set and may consume multiple interruptions, which are OK.
        bbw.task_meta->interrupted = false;
        is_interrupted = true;
gejun's avatar
gejun committed
677 678 679 680 681
    }
    // If timed out as well as value unmatched, return ETIMEDOUT.
    if (WAITER_STATE_TIMEDOUT == bbw.waiter_state) {
        errno = ETIMEDOUT;
        return -1;
682
    } else if (WAITER_STATE_UNMATCHEDVALUE == bbw.waiter_state) {
gejun's avatar
gejun committed
683 684
        errno = EWOULDBLOCK;
        return -1;
685 686
    } else if (is_interrupted) {
        errno = EINTR;
gejun's avatar
gejun committed
687 688 689 690 691 692
        return -1;
    }
    return 0;
}

}  // namespace bthread
gejun's avatar
gejun committed
693

694
namespace butil {
// Sets ObjectPoolBlockMaxItem for Butex to 128 (NOTE(review): presumably the
// maximum number of Butex objects per ObjectPool block -- see
// butil/object_pool.h).
// NOTE(review): this explicit specialization appears after butex_create() has
// already used butil::get_object<bthread::Butex>(). The standard requires an
// explicit specialization to be declared before the first use that would
// cause an implicit instantiation (ill-formed, no diagnostic required).
// Consider moving it right after the definition of bthread::Butex.
template <> struct ObjectPoolBlockMaxItem<bthread::Butex> {
    static const size_t value = 128;
};
}