butex.cpp 24.4 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

// bthread - A M:N threading library to make applications more concurrent.

// Date: Tue Jul 22 17:30:12 CST 2014

#include "butil/atomicops.h"                // butil::atomic
#include "butil/scoped_lock.h"              // BAIDU_SCOPED_LOCK
#include "butil/macros.h"
#include "butil/containers/linked_list.h"   // LinkNode
26
#ifdef SHOW_BTHREAD_BUTEX_WAITER_COUNT_IN_VARS
27
#include "butil/memory/singleton_on_pthread_once.h"
28
#endif
29 30
#include "butil/logging.h"
#include "butil/object_pool.h"
31
#include "bthread/errno.h"                 // EWOULDBLOCK
gejun's avatar
gejun committed
32 33 34 35 36 37
#include "bthread/sys_futex.h"             // futex_*
#include "bthread/processor.h"             // cpu_relax
#include "bthread/task_control.h"          // TaskControl
#include "bthread/task_group.h"            // TaskGroup
#include "bthread/timer_thread.h"
#include "bthread/butex.h"
38
#include "bthread/mutex.h"
gejun's avatar
gejun committed
39 40

// This file implements butex.h
// Provides futex-like semantics: wait and wake operations are sequenced and
// their effects are guaranteed to be visible to each other.
//
// If wait is sequenced before wake:
//    [thread1]             [thread2]
//    wait()                value = new_value
//                          wake()
// wait() either sees the unmatched value (and fails to wait), or wake() sees
// the waiter.
//
// If wait is sequenced after wake:
//    [thread1]             [thread2]
//                          value = new_value
//                          wake()
//    wait()
// wake() must provide some sort of memory fence to prevent the assignment
// of value from being reordered after it. Thus the new value is visible to
// wait() as well.

namespace bthread {

#ifdef SHOW_BTHREAD_BUTEX_WAITER_COUNT_IN_VARS
// bvar named "bthread_butex_waiter_count". butex_wait() and
// butex_wait_from_pthread() add 1 before blocking and -1 after waking, so it
// reflects the number of threads currently blocked on butexes.
struct ButexWaiterCount : public bvar::Adder<int64_t> {
    ButexWaiterCount() : bvar::Adder<int64_t>("bthread_butex_waiter_count") {}
};
// Returns the process-wide counter, created lazily through
// butil::get_leaky_singleton().
inline bvar::Adder<int64_t>& butex_waiter_count() {
    return *butil::get_leaky_singleton<ButexWaiterCount>();
}
#endif

// If a thread would suspend for less than so many microseconds, return
// ETIMEDOUT directly: sleeping for less than 2 microseconds is inefficient
// and useless.
static const int64_t MIN_SLEEP_US = 2;

// State of a ButexBthreadWaiter. butex_wait() sets it to WAITER_STATE_READY
// before queueing and inspects it after being rescheduled to decide the
// return value / errno.
enum WaiterState {
    WAITER_STATE_NONE,
    WAITER_STATE_READY,           // waiting, or about to be queued
    WAITER_STATE_TIMEDOUT,        // removed by the timer -> ETIMEDOUT
    WAITER_STATE_UNMATCHEDVALUE,  // value != expected_value -> EWOULDBLOCK
    WAITER_STATE_INTERRUPTED,     // removed because of interruption
};

struct Butex;

85
struct ButexWaiter : public butil::LinkNode<ButexWaiter> {
gejun's avatar
gejun committed
86 87 88
    // tids of pthreads are 0
    bthread_t tid;

89 90
    // Erasing node from middle of LinkedList is thread-unsafe, we need
    // to hold its container's lock.
91
    butil::atomic<Butex*> container;
gejun's avatar
gejun committed
92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107
};

// Waiter record of a bthread blocked in butex_wait(). It is allocated on the
// waiting bthread's stack and linked into Butex::waiters by wait_for_butex().
struct ButexBthreadWaiter : ButexWaiter {
    TaskMeta* task_meta;           // meta of the waiting bthread
    TimerThread::TaskId sleep_id;  // timeout timer task, 0 when none
    WaiterState waiter_state;      // why the wait ended
    int expected_value;            // butex value required to start waiting
    Butex* initial_butex;          // butex passed to butex_wait()
    TaskControl* control;          // used to pick a TaskGroup on wakeup
};

// pthread_task or main_task allocates this structure on stack and queue it
// in Butex::waiters.
struct ButexPthreadWaiter : public ButexWaiter {
108
    butil::atomic<int> sig;
gejun's avatar
gejun committed
109 110
};

111
typedef butil::LinkedList<ButexWaiter> ButexWaiterList;
gejun's avatar
gejun committed
112

113
enum ButexPthreadSignal { PTHREAD_NOT_SIGNALLED, PTHREAD_SIGNALLED };
gejun's avatar
gejun committed
114

gejun's avatar
gejun committed
115 116
struct BAIDU_CACHELINE_ALIGNMENT Butex {
    Butex() {}
gejun's avatar
gejun committed
117 118
    ~Butex() {}

119
    butil::atomic<int> value;
gejun's avatar
gejun committed
120
    ButexWaiterList waiters;
121
    internal::FastPthreadMutex waiter_lock;
gejun's avatar
gejun committed
122 123 124
};

BAIDU_CASSERT(offsetof(Butex, value) == 0, offsetof_value_must_0);
gejun's avatar
gejun committed
125
BAIDU_CASSERT(sizeof(Butex) == BAIDU_CACHELINE_SIZE, butex_fits_in_one_cacheline);
gejun's avatar
gejun committed
126

127 128 129 130 131 132 133
// Signals a pthread blocked in wait_pthread() on `pw'. The caller must have
// already removed `pw' from its butex.
static void wakeup_pthread(ButexPthreadWaiter* pw) {
    // release fence makes wait_pthread see changes before wakeup.
    pw->sig.store(PTHREAD_SIGNALLED, butil::memory_order_release);
    // At this point, wait_pthread() possibly has woken up and destroyed `pw'.
    // In which case, futex_wake_private() should return EFAULT.
    // If crash happens in future, `pw' can be made TLS and never destroyed
    // to solve the issue.
    futex_wake_private(&pw->sig, 1);
}

bool erase_from_butex(ButexWaiter*, bool, WaiterState);
gejun's avatar
gejun committed
138 139 140

int wait_pthread(ButexPthreadWaiter& pw, timespec* ptimeout) {
    while (true) {
141 142 143 144 145
        const int rc = futex_wait_private(&pw.sig, PTHREAD_NOT_SIGNALLED, ptimeout);
        if (PTHREAD_NOT_SIGNALLED != pw.sig.load(butil::memory_order_acquire)) {
            // If `sig' is changed, wakeup_pthread() must be called and `pw'
            // is already removed from the butex.
            // Acquire fence makes this thread sees changes before wakeup.
gejun's avatar
gejun committed
146 147 148
            return rc;
        }
        if (rc != 0 && errno == ETIMEDOUT) {
149 150 151 152 153 154 155 156 157 158 159 160 161
            // Note that we don't handle the EINTR from futex_wait here since
            // pthreads waiting on a butex should behave similarly as bthreads
            // which are not able to be woken-up by signals.
            // EINTR on butex is only producible by TaskGroup::interrupt().

            // `pw' is still in the queue, remove it.
            if (!erase_from_butex(&pw, false, WAITER_STATE_TIMEDOUT)) {
                // Another thread is erasing `pw' as well, wait for the signal.
                // Acquire fence makes this thread sees changes before wakeup.
                if (pw.sig.load(butil::memory_order_acquire) == PTHREAD_NOT_SIGNALLED) {
                    ptimeout = NULL; // already timedout, ptimeout is expired.
                    continue;
                }
gejun's avatar
gejun committed
162 163 164 165 166 167 168 169 170 171
            }
            return rc;
        }
    }
}

extern BAIDU_THREAD_LOCAL TaskGroup* tls_task_group;

// Returns 0 when no need to unschedule or successfully unscheduled,
// -1 otherwise.
172 173
inline int unsleep_if_necessary(ButexBthreadWaiter* w,
                                TimerThread* timer_thread) {
gejun's avatar
gejun committed
174 175 176 177 178 179 180 181 182 183 184
    if (!w->sleep_id) {
        return 0;
    }
    if (timer_thread->unschedule(w->sleep_id) > 0) {
        // the callback is running.
        return -1;
    }
    w->sleep_id = 0;
    return 0;
}

gejun's avatar
gejun committed
185
// Use ObjectPool (which never frees memory) to solve the race between
// butex_wake() and butex_destroy(). The race is as follows:
//
//   class Event {
//   public:
//     void wait() {
//       _mutex.lock();
//       if (!_done) {
//         _cond.wait(&_mutex);
//       }
//       _mutex.unlock();
//     }
//     void signal() {
//       _mutex.lock();
//       if (!_done) {
//         _done = true;
//         _cond.signal();
//       }
//       _mutex.unlock();  /*1*/
//     }
//   private:
//     bool _done = false;
//     Mutex _mutex;
//     Condition _cond;
//   };
//
//   [Thread1]                         [Thread2]
//   foo() {
//     Event event;
//     pass_to_thread2(&event);  --->  event.signal();
//     event.wait();
//   } <-- event destroyed
//
// Summary: Thread1 passes a stateful condition to Thread2 and waits until
// the condition is signalled, which basically means the associated job is
// done and Thread1 can release related resources including the mutex and
// the condition. The scenario is fine and the code is correct; however, the
// implementation has to cope with a subtle race. The unlock at /*1*/ may be
// implemented in different ways, but its last steps are typically an atomic
// store followed by butex_wake(), like this:
//
//   locked->store(0);
//   butex_wake(locked);
//
// `locked' represents the locking status of the mutex. The issue is that
// just after the store(), the mutex is already unlocked and the code in
// Event.wait() may successfully grab the lock, run through everything left
// and leave foo(), destroying the mutex and the butex and making
// butex_wake(locked) crash.
// To solve this issue, one method is to add a reference before the store
// and release it after butex_wake(). However, reference counting would need
// to be added to nearly every user of butex_wake(), which is very
// error-prone. Another method is never freeing butexes, with the side effect
// that butex_wake() may wake up an unrelated butex (one that reuses the
// memory) and cause spurious wakeups. According to our observations, the
// race is infrequent, even rare, so the extra spurious wakeups should be
// acceptable.

void* butex_create() {
243
    Butex* b = butil::get_object<Butex>();
gejun's avatar
gejun committed
244
    if (b) {
gejun's avatar
gejun committed
245 246 247 248 249 250
        return &b->value;
    }
    return NULL;
}

void butex_destroy(void* butex) {
gejun's avatar
gejun committed
251 252
    if (!butex) {
        return;
gejun's avatar
gejun committed
253
    }
gejun's avatar
gejun committed
254
    Butex* b = static_cast<Butex*>(
255 256
        container_of(static_cast<butil::atomic<int>*>(butex), Butex, value));
    butil::return_object(b);
gejun's avatar
gejun committed
257 258 259 260 261 262 263 264
}

// Prefers the TaskGroup bound to the calling thread; threads without one
// (tls_task_group is NULL) get a group picked by `c'.
inline TaskGroup* get_task_group(TaskControl* c) {
    if (TaskGroup* const local = tls_task_group) {
        return local;
    }
    return c->choose_one_group();
}

int butex_wake(void* arg) {
265
    Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value);
gejun's avatar
gejun committed
266 267 268 269 270 271 272 273
    ButexWaiter* front = NULL;
    {
        BAIDU_SCOPED_LOCK(b->waiter_lock);
        if (b->waiters.empty()) {
            return 0;
        }
        front = b->waiters.head()->value();
        front->RemoveFromList();
274
        front->container.store(NULL, butil::memory_order_relaxed);
gejun's avatar
gejun committed
275 276 277 278 279 280 281 282 283 284 285
    }
    if (front->tid == 0) {
        wakeup_pthread(static_cast<ButexPthreadWaiter*>(front));
        return 1;
    }
    ButexBthreadWaiter* bbw = static_cast<ButexBthreadWaiter*>(front);
    unsleep_if_necessary(bbw, get_global_timer_thread());
    TaskGroup* g = tls_task_group;
    if (g) {
        TaskGroup::exchange(&g, bbw->tid);
    } else {
286
        bbw->control->choose_one_group()->ready_to_run_remote(bbw->tid);
gejun's avatar
gejun committed
287 288 289 290
    }
    return 1;
}

gejun's avatar
gejun committed
291
int butex_wake_all(void* arg) {
292
    Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value);
gejun's avatar
gejun committed
293 294 295 296 297 298 299 300

    ButexWaiterList bthread_waiters;
    ButexWaiterList pthread_waiters;
    {
        BAIDU_SCOPED_LOCK(b->waiter_lock);
        while (!b->waiters.empty()) {
            ButexWaiter* bw = b->waiters.head()->value();
            bw->RemoveFromList();
301
            bw->container.store(NULL, butil::memory_order_relaxed);
gejun's avatar
gejun committed
302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334
            if (bw->tid) {
                bthread_waiters.Append(bw);
            } else {
                pthread_waiters.Append(bw);
            }
        }
    }

    int nwakeup = 0;
    while (!pthread_waiters.empty()) {
        ButexPthreadWaiter* bw = static_cast<ButexPthreadWaiter*>(
            pthread_waiters.head()->value());
        bw->RemoveFromList();
        wakeup_pthread(bw);
        ++nwakeup;
    }
    if (bthread_waiters.empty()) {
        return nwakeup;
    }
    // We will exchange with first waiter in the end.
    ButexBthreadWaiter* next = static_cast<ButexBthreadWaiter*>(
        bthread_waiters.head()->value());
    next->RemoveFromList();
    unsleep_if_necessary(next, get_global_timer_thread());
    ++nwakeup;
    TaskGroup* g = get_task_group(next->control);
    const int saved_nwakeup = nwakeup;
    while (!bthread_waiters.empty()) {
        // pop reversely
        ButexBthreadWaiter* w = static_cast<ButexBthreadWaiter*>(
            bthread_waiters.tail()->value());
        w->RemoveFromList();
        unsleep_if_necessary(w, get_global_timer_thread());
335
        g->ready_to_run_general(w->tid, true);
gejun's avatar
gejun committed
336 337 338
        ++nwakeup;
    }
    if (saved_nwakeup != nwakeup) {
339
        g->flush_nosignal_tasks_general();
gejun's avatar
gejun committed
340 341 342 343
    }
    if (g == tls_task_group) {
        TaskGroup::exchange(&g, next->tid);
    } else {
344
        g->ready_to_run_remote(next->tid);
gejun's avatar
gejun committed
345 346 347 348 349
    }
    return nwakeup;
}

int butex_wake_except(void* arg, bthread_t excluded_bthread) {
350
    Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value);
gejun's avatar
gejun committed
351 352 353 354 355 356 357 358 359 360 361 362 363

    ButexWaiterList bthread_waiters;
    ButexWaiterList pthread_waiters;
    {
        ButexWaiter* excluded_waiter = NULL;
        BAIDU_SCOPED_LOCK(b->waiter_lock);
        while (!b->waiters.empty()) {
            ButexWaiter* bw = b->waiters.head()->value();
            bw->RemoveFromList();

            if (bw->tid) {
                if (bw->tid != excluded_bthread) {
                    bthread_waiters.Append(bw);
364
                    bw->container.store(NULL, butil::memory_order_relaxed);
gejun's avatar
gejun committed
365 366 367 368
                } else {
                    excluded_waiter = bw;
                }
            } else {
369
                bw->container.store(NULL, butil::memory_order_relaxed);
gejun's avatar
gejun committed
370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401
                pthread_waiters.Append(bw);
            }
        }

        if (excluded_waiter) {
            b->waiters.Append(excluded_waiter);
        }
    }

    int nwakeup = 0;
    while (!pthread_waiters.empty()) {
        ButexPthreadWaiter* bw = static_cast<ButexPthreadWaiter*>(
            pthread_waiters.head()->value());
        bw->RemoveFromList();
        wakeup_pthread(bw);
        ++nwakeup;
    }

    if (bthread_waiters.empty()) {
        return nwakeup;
    }
    ButexBthreadWaiter* front = static_cast<ButexBthreadWaiter*>(
                bthread_waiters.head()->value());

    TaskGroup* g = get_task_group(front->control);
    const int saved_nwakeup = nwakeup;
    do {
        // pop reversely
        ButexBthreadWaiter* w = static_cast<ButexBthreadWaiter*>(
            bthread_waiters.tail()->value());
        w->RemoveFromList();
        unsleep_if_necessary(w, get_global_timer_thread());
402
        g->ready_to_run_general(w->tid, true);
gejun's avatar
gejun committed
403 404 405
        ++nwakeup;
    } while (!bthread_waiters.empty());
    if (saved_nwakeup != nwakeup) {
406
        g->flush_nosignal_tasks_general();
gejun's avatar
gejun committed
407 408 409 410 411
    }
    return nwakeup;
}

int butex_requeue(void* arg, void* arg2) {
412 413
    Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value);
    Butex* m = container_of(static_cast<butil::atomic<int>*>(arg2), Butex, value);
gejun's avatar
gejun committed
414 415 416

    ButexWaiter* front = NULL;
    {
417 418
        std::unique_lock<internal::FastPthreadMutex> lck1(b->waiter_lock, std::defer_lock);
        std::unique_lock<internal::FastPthreadMutex> lck2(m->waiter_lock, std::defer_lock);
419
        butil::double_lock(lck1, lck2);
gejun's avatar
gejun committed
420 421 422 423 424 425
        if (b->waiters.empty()) {
            return 0;
        }

        front = b->waiters.head()->value();
        front->RemoveFromList();
426
        front->container.store(NULL, butil::memory_order_relaxed);
gejun's avatar
gejun committed
427 428 429 430 431

        while (!b->waiters.empty()) {
            ButexWaiter* bw = b->waiters.head()->value();
            bw->RemoveFromList();
            m->waiters.Append(bw);
432
            bw->container.store(m, butil::memory_order_relaxed);
gejun's avatar
gejun committed
433 434 435 436 437 438 439 440 441 442 443 444 445
        }
    }

    if (front->tid == 0) {  // which is a pthread
        wakeup_pthread(static_cast<ButexPthreadWaiter*>(front));
        return 1;
    }
    ButexBthreadWaiter* bbw = static_cast<ButexBthreadWaiter*>(front);
    unsleep_if_necessary(bbw, get_global_timer_thread());
    TaskGroup* g = tls_task_group;
    if (g) {
        TaskGroup::exchange(&g, front->tid);
    } else {
446
        bbw->control->choose_one_group()->ready_to_run_remote(front->tid);
gejun's avatar
gejun committed
447 448 449 450 451 452
    }
    return 1;
}

// Callable from multiple threads, at most one thread may wake up the waiter.
static void erase_from_butex_and_wakeup(void* arg) {
453 454 455 456 457 458
    erase_from_butex(static_cast<ButexWaiter*>(arg), true, WAITER_STATE_TIMEDOUT);
}

// Used in task_group.cpp
// Removes `bw' from its butex with WAITER_STATE_INTERRUPTED and wakes it up.
// Returns true iff this call removed the waiter.
bool erase_from_butex_because_of_interruption(ButexWaiter* bw) {
    return erase_from_butex(bw, true, WAITER_STATE_INTERRUPTED);
}

// Removes `bw' from the butex it is queued in, if any. For a bthread waiter,
// `state' is recorded in waiter_state. When `wakeup' is true and this call
// removed the waiter, the waiter is also woken up (bthread made runnable,
// pthread signalled). errno is preserved.
// Returns true iff this call removed `bw'.
inline bool erase_from_butex(ButexWaiter* bw, bool wakeup, WaiterState state) {
    // `bw' is guaranteed to be valid inside this function because waiter
    // will wait until this function being cancelled or finished.
    // NOTE: This function must be no-op when bw->container is NULL.
    bool erased = false;
    Butex* b;
    int saved_errno = errno;
    while ((b = bw->container.load(butil::memory_order_acquire))) {
        // container is NULL when the waiter is not queued, e.g. already
        // removed by a wakeup or scheduled but not yet queued.
        BAIDU_SCOPED_LOCK(b->waiter_lock);
        // butex_requeue() may have moved the waiter to another butex between
        // the load above and the lock; retry with the new container then.
        if (b == bw->container.load(butil::memory_order_relaxed)) {
            bw->RemoveFromList();
            bw->container.store(NULL, butil::memory_order_relaxed);
            if (bw->tid) {
                static_cast<ButexBthreadWaiter*>(bw)->waiter_state = state;
            }
            erased = true;
            break;
        }
    }
    if (erased && wakeup) {
        if (bw->tid) {
            ButexBthreadWaiter* bbw = static_cast<ButexBthreadWaiter*>(bw);
            get_task_group(bbw->control)->ready_to_run_general(bw->tid);
        } else {
            ButexPthreadWaiter* pw = static_cast<ButexPthreadWaiter*>(bw);
            wakeup_pthread(pw);
        }
    }
    errno = saved_errno;
    return erased;
}

static void wait_for_butex(void* arg) {
    ButexBthreadWaiter* const bw = static_cast<ButexBthreadWaiter*>(arg);
    Butex* const b = bw->initial_butex;
497
    // 1: waiter with timeout should have waiter_state == WAITER_STATE_READY
gejun's avatar
gejun committed
498
    //    before they're queued, otherwise the waiter is already timedout
gejun's avatar
gejun committed
499 500 501
    //    and removed by TimerThread, in which case we should stop queueing.
    //
    // Visibility of waiter_state:
502
    //    [bthread]                         [TimerThread]
gejun's avatar
gejun committed
503 504 505 506 507
    //    waiter_state = TIMED
    //    tt_lock { add task }
    //                                      tt_lock { get task }
    //                                      waiter_lock { waiter_state=TIMEDOUT }
    //    waiter_lock { use waiter_state }
gejun's avatar
gejun committed
508 509 510
    // tt_lock represents TimerThread::_mutex. Visibility of waiter_state is
    // sequenced by two locks, both threads are guaranteed to see the correct
    // value.
gejun's avatar
gejun committed
511 512
    {
        BAIDU_SCOPED_LOCK(b->waiter_lock);
513 514 515 516
        if (b->value.load(butil::memory_order_relaxed) != bw->expected_value) {
            bw->waiter_state = WAITER_STATE_UNMATCHEDVALUE;
        } else if (bw->waiter_state == WAITER_STATE_READY/*1*/ &&
                   !bw->task_meta->interrupted) {
gejun's avatar
gejun committed
517
            b->waiters.Append(bw);
518
            bw->container.store(b, butil::memory_order_relaxed);
gejun's avatar
gejun committed
519 520 521 522 523
            return;
        }
    }
    
    // b->container is NULL which makes erase_from_butex_and_wakeup() and
524
    // TaskGroup::interrupt() no-op, there's no race between following code and
gejun's avatar
gejun committed
525 526 527 528 529 530 531 532 533
    // the two functions. The on-stack ButexBthreadWaiter is safe to use and
    // bw->waiter_state will not change again.
    unsleep_if_necessary(bw, get_global_timer_thread());
    tls_task_group->ready_to_run(bw->tid);
    // FIXME: jump back to original thread is buggy.
    
    // // Value unmatched or waiter is already woken up by TimerThread, jump
    // // back to original bthread.
    // TaskGroup* g = tls_task_group;
534 535
    // ReadyToRunArgs args = { g->current_tid(), false };
    // g->set_remained(TaskGroup::ready_to_run_in_worker, &args);
gejun's avatar
gejun committed
536 537 538 539 540 541 542 543 544 545 546 547
    // // 2: Don't run remained because we're already in a remained function
    // //    otherwise stack may overflow.
    // TaskGroup::sched_to(&g, bw->tid, false/*2*/);
}

// Implements butex_wait() for callers that are not running as a bthread
// (tls_task_group is NULL, or `g' is running its pthread task). The caller
// blocks in futex through wait_pthread() instead of being switched out.
//
// Returns the result of wait_pthread() when the waiter got queued, otherwise
// -1 with errno set to:
//   ETIMEDOUT   - `abstime' is less than MIN_SLEEP_US away;
//   EWOULDBLOCK - b->value != expected_value;
//   EINTR       - the current task (only when `g' is non-NULL) is interrupted.
// A successful wait that coincides with an interruption also returns
// -1/EINTR.
static int butex_wait_from_pthread(TaskGroup* g, Butex* b, int expected_value,
                                   const timespec* abstime) {
    // sys futex needs relative timeout.
    // Compute diff between abstime and now.
    timespec* ptimeout = NULL;
    timespec timeout;
    if (abstime != NULL) {
        const int64_t timeout_us = butil::timespec_to_microseconds(*abstime) -
            butil::gettimeofday_us();
        if (timeout_us < MIN_SLEEP_US) {
            errno = ETIMEDOUT;
            return -1;
        }
        timeout = butil::microseconds_to_timespec(timeout_us);
        ptimeout = &timeout;
    }

    TaskMeta* task = NULL;
    ButexPthreadWaiter pw;
    pw.tid = 0;
    // `container' must be valid before &pw is published as current_waiter:
    // TaskGroup::interrupt() may then call erase_from_butex(&pw) at any time,
    // which dereferences any non-NULL container. butil::atomic's default
    // constructor leaves it indeterminate, and the unmatched-value / EINTR
    // branches below never set it. butex_wait() does the same for bthreads.
    pw.container.store(NULL, butil::memory_order_relaxed);
    pw.sig.store(PTHREAD_NOT_SIGNALLED, butil::memory_order_relaxed);
    int rc = 0;

    if (g) {
        task = g->current_task();
        task->current_waiter.store(&pw, butil::memory_order_release);
    }
    b->waiter_lock.lock();
    if (b->value.load(butil::memory_order_relaxed) != expected_value) {
        b->waiter_lock.unlock();
        errno = EWOULDBLOCK;
        rc = -1;
    } else if (task != NULL && task->interrupted) {
        b->waiter_lock.unlock();
        // Race with set and may consume multiple interruptions, which are OK.
        task->interrupted = false;
        errno = EINTR;
        rc = -1;
    } else {
        b->waiters.Append(&pw);
        pw.container.store(b, butil::memory_order_relaxed);
        b->waiter_lock.unlock();

#ifdef SHOW_BTHREAD_BUTEX_WAITER_COUNT_IN_VARS
        bvar::Adder<int64_t>& num_waiters = butex_waiter_count();
        num_waiters << 1;
#endif
        rc = wait_pthread(pw, ptimeout);
#ifdef SHOW_BTHREAD_BUTEX_WAITER_COUNT_IN_VARS
        num_waiters << -1;
#endif
    }
    if (task) {
        // If current_waiter is NULL, TaskGroup::interrupt() is running and
        // using pw, spin until current_waiter != NULL.
        BT_LOOP_WHEN(task->current_waiter.exchange(
                         NULL, butil::memory_order_acquire) == NULL,
                     30/*nops before sched_yield*/);
        if (task->interrupted) {
            task->interrupted = false;
            if (rc == 0) {
                errno = EINTR;
                return -1;
            }
        }
    }
    return rc;
}

int butex_wait(void* arg, int expected_value, const timespec* abstime) {
611 612
    Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value);
    if (b->value.load(butil::memory_order_relaxed) != expected_value) {
gejun's avatar
gejun committed
613 614 615
        errno = EWOULDBLOCK;
        // Sometimes we may take actions immediately after unmatched butex,
        // this fence makes sure that we see changes before changing butex.
616
        butil::atomic_thread_fence(butil::memory_order_acquire);
gejun's avatar
gejun committed
617 618 619 620 621 622 623 624 625
        return -1;
    }
    TaskGroup* g = tls_task_group;
    if (NULL == g || g->is_current_pthread_task()) {
        return butex_wait_from_pthread(g, b, expected_value, abstime);
    }
    ButexBthreadWaiter bbw;
    // tid is 0 iff the thread is non-bthread
    bbw.tid = g->current_tid();
626
    bbw.container.store(NULL, butil::memory_order_relaxed);
gejun's avatar
gejun committed
627 628
    bbw.task_meta = g->current_task();
    bbw.sleep_id = 0;
629
    bbw.waiter_state = WAITER_STATE_READY;
gejun's avatar
gejun committed
630 631 632 633 634 635 636
    bbw.expected_value = expected_value;
    bbw.initial_butex = b;
    bbw.control = g->control();

    if (abstime != NULL) {
        // Schedule timer before queueing. If the timer is triggered before
        // queueing, cancel queueing. This is a kind of optimistic locking.
637 638
        if (butil::timespec_to_microseconds(*abstime) <
            (butil::gettimeofday_us() + MIN_SLEEP_US)) {
639
            // Already timed out.
gejun's avatar
gejun committed
640 641 642 643 644 645 646 647 648 649
            errno = ETIMEDOUT;
            return -1;
        }
        bbw.sleep_id = get_global_timer_thread()->schedule(
            erase_from_butex_and_wakeup, &bbw, *abstime);
        if (!bbw.sleep_id) {  // TimerThread stopped.
            errno = ESTOP;
            return -1;
        }
    }
gejun's avatar
gejun committed
650
#ifdef SHOW_BTHREAD_BUTEX_WAITER_COUNT_IN_VARS
651
    bvar::Adder<int64_t>& num_waiters = butex_waiter_count();
gejun's avatar
gejun committed
652
    num_waiters << 1;
gejun's avatar
gejun committed
653 654
#endif

655 656
    // release fence matches with acquire fence in interrupt_and_consume_waiters
    // in task_group.cpp to guarantee visibility of `interrupted'.
657
    bbw.task_meta->current_waiter.store(&bbw, butil::memory_order_release);
gejun's avatar
gejun committed
658 659 660 661 662 663 664 665
    g->set_remained(wait_for_butex, &bbw);
    TaskGroup::sched(&g);

    // erase_from_butex_and_wakeup (called by TimerThread) is possibly still
    // running and using bbw. The chance is small, just spin until it's done.
    BT_LOOP_WHEN(unsleep_if_necessary(&bbw, get_global_timer_thread()) < 0,
                 30/*nops before sched_yield*/);
    
666
    // If current_waiter is NULL, TaskGroup::interrupt() is running and using bbw.
gejun's avatar
gejun committed
667 668
    // Spin until current_waiter != NULL.
    BT_LOOP_WHEN(bbw.task_meta->current_waiter.exchange(
669
                     NULL, butil::memory_order_acquire) == NULL,
gejun's avatar
gejun committed
670
                 30/*nops before sched_yield*/);
gejun's avatar
gejun committed
671
#ifdef SHOW_BTHREAD_BUTEX_WAITER_COUNT_IN_VARS
gejun's avatar
gejun committed
672
    num_waiters << -1;
gejun's avatar
gejun committed
673
#endif
gejun's avatar
gejun committed
674

675 676 677 678 679
    bool is_interrupted = false;
    if (bbw.task_meta->interrupted) {
        // Race with set and may consume multiple interruptions, which are OK.
        bbw.task_meta->interrupted = false;
        is_interrupted = true;
gejun's avatar
gejun committed
680 681 682 683 684
    }
    // If timed out as well as value unmatched, return ETIMEDOUT.
    if (WAITER_STATE_TIMEDOUT == bbw.waiter_state) {
        errno = ETIMEDOUT;
        return -1;
685
    } else if (WAITER_STATE_UNMATCHEDVALUE == bbw.waiter_state) {
gejun's avatar
gejun committed
686 687
        errno = EWOULDBLOCK;
        return -1;
688 689
    } else if (is_interrupted) {
        errno = EINTR;
gejun's avatar
gejun committed
690 691 692 693 694 695
        return -1;
    }
    return 0;
}

}  // namespace bthread
gejun's avatar
gejun committed
696

697
namespace butil {
gejun's avatar
gejun committed
698 699 700 701
template <> struct ObjectPoolBlockMaxItem<bthread::Butex> {
    static const size_t value = 128;
};
}