condition_variable.cpp 4.38 KB
Newer Older
gejun's avatar
gejun committed
1
// bthread - A M:N threading library to make applications more concurrent.
gejun's avatar
gejun committed
2
// Copyright (c) 2014 Baidu, Inc.
gejun's avatar
gejun committed
3 4 5 6 7 8 9 10 11 12 13 14
// 
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// 
//     http://www.apache.org/licenses/LICENSE-2.0
// 
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
gejun's avatar
gejun committed
15 16 17 18

// Author: Ge,Jun (gejun@baidu.com)
// Date: Sun Aug  3 12:46:15 CST 2014

19 20
#include "butil/atomicops.h"
#include "butil/macros.h"                         // BAIDU_CASSERT
gejun's avatar
gejun committed
21 22 23
#include "bthread/butex.h"                       // butex_*
#include "bthread/types.h"                       // bthread_cond_t

gejun's avatar
gejun committed
24 25
namespace bthread {
struct CondInternal {
26 27
    butil::atomic<bthread_mutex_t*> m;
    butil::atomic<int>* seq;
gejun's avatar
gejun committed
28 29
};

gejun's avatar
gejun committed
30
BAIDU_CASSERT(sizeof(CondInternal) == sizeof(bthread_cond_t),
gejun's avatar
gejun committed
31
              sizeof_innercond_must_equal_cond);
gejun's avatar
gejun committed
32
BAIDU_CASSERT(offsetof(CondInternal, m) == offsetof(bthread_cond_t, m),
gejun's avatar
gejun committed
33
              offsetof_cond_mutex_must_equal);
gejun's avatar
gejun committed
34
BAIDU_CASSERT(offsetof(CondInternal, seq) ==
gejun's avatar
gejun committed
35 36
              offsetof(bthread_cond_t, seq),
              offsetof_cond_seq_must_equal);
gejun's avatar
gejun committed
37
}
gejun's avatar
gejun committed
38 39 40 41

extern "C" {

extern int bthread_mutex_unlock(bthread_mutex_t*);
gejun's avatar
gejun committed
42
extern int bthread_mutex_lock_contended(bthread_mutex_t*);
gejun's avatar
gejun committed
43 44

int bthread_cond_init(bthread_cond_t* __restrict c,
gejun's avatar
gejun committed
45
                      const bthread_condattr_t*) {
gejun's avatar
gejun committed
46 47 48 49 50 51 52 53 54 55 56 57 58
    c->m = NULL;
    c->seq = bthread::butex_create_checked<int>();
    *c->seq = 0;
    return 0;
}

int bthread_cond_destroy(bthread_cond_t* c) {
    bthread::butex_destroy(c->seq);
    c->seq = NULL;
    return 0;
}

int bthread_cond_signal(bthread_cond_t* c) {
gejun's avatar
gejun committed
59
    bthread::CondInternal* ic = reinterpret_cast<bthread::CondInternal*>(c);
60 61
    // ic is probably dereferenced after fetch_add, save required fields before
    // this point
62 63
    butil::atomic<int>* const saved_seq = ic->seq;
    saved_seq->fetch_add(1, butil::memory_order_release);
64 65
    // don't touch ic any more
    bthread::butex_wake(saved_seq);
gejun's avatar
gejun committed
66 67 68 69
    return 0;
}

int bthread_cond_broadcast(bthread_cond_t* c) {
gejun's avatar
gejun committed
70
    bthread::CondInternal* ic = reinterpret_cast<bthread::CondInternal*>(c);
71 72
    bthread_mutex_t* m = ic->m.load(butil::memory_order_relaxed);
    butil::atomic<int>* const saved_seq = ic->seq;
gejun's avatar
gejun committed
73
    if (!m) {
gejun's avatar
gejun committed
74 75
        return 0;
    }
76
    void* const saved_butex = m->butex;
gejun's avatar
gejun committed
77
    // Wakeup one thread and requeue the rest on the mutex.
78
    ic->seq->fetch_add(1, butil::memory_order_release);
79
    bthread::butex_requeue(saved_seq, saved_butex);
gejun's avatar
gejun committed
80 81 82 83 84
    return 0;
}

int bthread_cond_wait(bthread_cond_t* __restrict c,
                      bthread_mutex_t* __restrict m) {
gejun's avatar
gejun committed
85
    bthread::CondInternal* ic = reinterpret_cast<bthread::CondInternal*>(c);
86 87
    const int expected_seq = ic->seq->load(butil::memory_order_relaxed);
    if (ic->m.load(butil::memory_order_relaxed) != m) {
gejun's avatar
gejun committed
88 89
        // bind m to c
        bthread_mutex_t* expected_m = NULL;
gejun's avatar
gejun committed
90
        if (!ic->m.compare_exchange_strong(
91
                expected_m, m, butil::memory_order_relaxed)) {
gejun's avatar
gejun committed
92 93 94 95
            return EINVAL;
        }
    }
    bthread_mutex_unlock(m);
gejun's avatar
gejun committed
96 97 98 99 100 101 102
    int rc1 = 0;
    if (bthread::butex_wait(ic->seq, expected_seq, NULL) < 0 &&
        errno != EWOULDBLOCK) {
        rc1 = errno;
    }
    const int rc2 = bthread_mutex_lock_contended(m);
    return (rc2 ? rc2 : rc1);
gejun's avatar
gejun committed
103 104 105 106 107
}

int bthread_cond_timedwait(bthread_cond_t* __restrict c,
                           bthread_mutex_t* __restrict m,
                           const struct timespec* __restrict abstime) {
gejun's avatar
gejun committed
108
    bthread::CondInternal* ic = reinterpret_cast<bthread::CondInternal*>(c);
109 110
    const int expected_seq = ic->seq->load(butil::memory_order_relaxed);
    if (ic->m.load(butil::memory_order_relaxed) != m) {
gejun's avatar
gejun committed
111 112
        // bind m to c
        bthread_mutex_t* expected_m = NULL;
gejun's avatar
gejun committed
113
        if (!ic->m.compare_exchange_strong(
114
                expected_m, m, butil::memory_order_relaxed)) {
gejun's avatar
gejun committed
115 116 117 118
            return EINVAL;
        }
    }
    bthread_mutex_unlock(m);
gejun's avatar
gejun committed
119
    int rc1 = 0;
gejun's avatar
gejun committed
120
    if (bthread::butex_wait(ic->seq, expected_seq, abstime) < 0 &&
gejun's avatar
gejun committed
121 122
        errno != EWOULDBLOCK) {
        rc1 = errno;
gejun's avatar
gejun committed
123
    }
gejun's avatar
gejun committed
124 125
    const int rc2 = bthread_mutex_lock_contended(m);
    return (rc2 ? rc2 : rc1);
gejun's avatar
gejun committed
126 127 128
}

}  // extern "C"