condition_variable.cpp 5.04 KB
Newer Older
gejun's avatar
gejun committed
1
// bthread - A M:N threading library to make applications more concurrent.
gejun's avatar
gejun committed
2
// Copyright (c) 2014 Baidu, Inc.
gejun's avatar
gejun committed
3 4 5 6 7 8 9 10 11 12 13 14
// 
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// 
//     http://www.apache.org/licenses/LICENSE-2.0
// 
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
gejun's avatar
gejun committed
15 16 17 18

// Author: Ge,Jun (gejun@baidu.com)
// Date: Sun Aug  3 12:46:15 CST 2014

19 20
#include "butil/atomicops.h"
#include "butil/macros.h"                         // BAIDU_CASSERT
gejun's avatar
gejun committed
21 22 23
#include "bthread/butex.h"                       // butex_*
#include "bthread/types.h"                       // bthread_cond_t

gejun's avatar
gejun committed
24 25
namespace bthread {
struct CondInternal {
26 27
    butil::atomic<bthread_mutex_t*> m;
    butil::atomic<int>* seq;
gejun's avatar
gejun committed
28 29
};

gejun's avatar
gejun committed
30
BAIDU_CASSERT(sizeof(CondInternal) == sizeof(bthread_cond_t),
gejun's avatar
gejun committed
31
              sizeof_innercond_must_equal_cond);
gejun's avatar
gejun committed
32
BAIDU_CASSERT(offsetof(CondInternal, m) == offsetof(bthread_cond_t, m),
gejun's avatar
gejun committed
33
              offsetof_cond_mutex_must_equal);
gejun's avatar
gejun committed
34
BAIDU_CASSERT(offsetof(CondInternal, seq) ==
gejun's avatar
gejun committed
35 36
              offsetof(bthread_cond_t, seq),
              offsetof_cond_seq_must_equal);
gejun's avatar
gejun committed
37
}
gejun's avatar
gejun committed
38 39 40 41

extern "C" {

extern int bthread_mutex_unlock(bthread_mutex_t*);
gejun's avatar
gejun committed
42
extern int bthread_mutex_lock_contended(bthread_mutex_t*);
gejun's avatar
gejun committed
43 44

int bthread_cond_init(bthread_cond_t* __restrict c,
gejun's avatar
gejun committed
45
                      const bthread_condattr_t*) {
gejun's avatar
gejun committed
46 47 48 49 50 51 52 53 54 55 56 57 58
    c->m = NULL;
    c->seq = bthread::butex_create_checked<int>();
    *c->seq = 0;
    return 0;
}

int bthread_cond_destroy(bthread_cond_t* c) {
    bthread::butex_destroy(c->seq);
    c->seq = NULL;
    return 0;
}

int bthread_cond_signal(bthread_cond_t* c) {
gejun's avatar
gejun committed
59
    bthread::CondInternal* ic = reinterpret_cast<bthread::CondInternal*>(c);
60 61
    // ic is probably dereferenced after fetch_add, save required fields before
    // this point
62 63
    butil::atomic<int>* const saved_seq = ic->seq;
    saved_seq->fetch_add(1, butil::memory_order_release);
64 65
    // don't touch ic any more
    bthread::butex_wake(saved_seq);
gejun's avatar
gejun committed
66 67 68 69
    return 0;
}

int bthread_cond_broadcast(bthread_cond_t* c) {
gejun's avatar
gejun committed
70
    bthread::CondInternal* ic = reinterpret_cast<bthread::CondInternal*>(c);
71 72
    bthread_mutex_t* m = ic->m.load(butil::memory_order_relaxed);
    butil::atomic<int>* const saved_seq = ic->seq;
gejun's avatar
gejun committed
73
    if (!m) {
gejun's avatar
gejun committed
74 75
        return 0;
    }
76
    void* const saved_butex = m->butex;
gejun's avatar
gejun committed
77
    // Wakeup one thread and requeue the rest on the mutex.
78
    ic->seq->fetch_add(1, butil::memory_order_release);
79
    bthread::butex_requeue(saved_seq, saved_butex);
gejun's avatar
gejun committed
80 81 82 83 84
    return 0;
}

int bthread_cond_wait(bthread_cond_t* __restrict c,
                      bthread_mutex_t* __restrict m) {
gejun's avatar
gejun committed
85
    bthread::CondInternal* ic = reinterpret_cast<bthread::CondInternal*>(c);
86 87
    const int expected_seq = ic->seq->load(butil::memory_order_relaxed);
    if (ic->m.load(butil::memory_order_relaxed) != m) {
gejun's avatar
gejun committed
88 89
        // bind m to c
        bthread_mutex_t* expected_m = NULL;
gejun's avatar
gejun committed
90
        if (!ic->m.compare_exchange_strong(
91
                expected_m, m, butil::memory_order_relaxed)) {
gejun's avatar
gejun committed
92 93 94 95
            return EINVAL;
        }
    }
    bthread_mutex_unlock(m);
gejun's avatar
gejun committed
96 97
    int rc1 = 0;
    if (bthread::butex_wait(ic->seq, expected_seq, NULL) < 0 &&
98 99 100 101 102 103 104 105 106 107 108 109
        errno != EWOULDBLOCK && errno != EINTR/*note*/) {
        // EINTR should not be returned by cond_*wait according to docs on
        // pthread, however spurious wake-up is OK, just as we do here
        // so that users can check flags in the loop often companioning
        // with the cond_wait ASAP. For example:
        //   mutex.lock();
        //   while (!stop && other-predicates) {
        //     cond_wait(&mutex);
        //   }
        //   mutex.unlock();
        // After interruption, above code should wake up from the cond_wait
        // soon and check the `stop' flag and other predicates.
gejun's avatar
gejun committed
110 111 112 113
        rc1 = errno;
    }
    const int rc2 = bthread_mutex_lock_contended(m);
    return (rc2 ? rc2 : rc1);
gejun's avatar
gejun committed
114 115 116 117 118
}

int bthread_cond_timedwait(bthread_cond_t* __restrict c,
                           bthread_mutex_t* __restrict m,
                           const struct timespec* __restrict abstime) {
gejun's avatar
gejun committed
119
    bthread::CondInternal* ic = reinterpret_cast<bthread::CondInternal*>(c);
120 121
    const int expected_seq = ic->seq->load(butil::memory_order_relaxed);
    if (ic->m.load(butil::memory_order_relaxed) != m) {
gejun's avatar
gejun committed
122 123
        // bind m to c
        bthread_mutex_t* expected_m = NULL;
gejun's avatar
gejun committed
124
        if (!ic->m.compare_exchange_strong(
125
                expected_m, m, butil::memory_order_relaxed)) {
gejun's avatar
gejun committed
126 127 128 129
            return EINVAL;
        }
    }
    bthread_mutex_unlock(m);
gejun's avatar
gejun committed
130
    int rc1 = 0;
gejun's avatar
gejun committed
131
    if (bthread::butex_wait(ic->seq, expected_seq, abstime) < 0 &&
132 133
        errno != EWOULDBLOCK && errno != EINTR/*note*/) {
        // note: see comments in bthread_cond_wait on EINTR.
gejun's avatar
gejun committed
134
        rc1 = errno;
gejun's avatar
gejun committed
135
    }
gejun's avatar
gejun committed
136 137
    const int rc2 = bthread_mutex_lock_contended(m);
    return (rc2 ? rc2 : rc1);
gejun's avatar
gejun committed
138 139 140
}

}  // extern "C"