sys_futex.cpp 4.04 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
// bthread - A M:N threading library to make applications more concurrent.
// Copyright (c) 2012 Baidu, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Author: Zhu,Jiashun (zhujiashun@baidu.com)
// Date: Wed Mar 14 17:44:58 CST 2018

#include "bthread/sys_futex.h"
#include "butil/scoped_lock.h"
21
#include "butil/atomicops.h"
22
#include <pthread.h>
23
#include <unordered_map>
24 25 26 27 28

#if defined(OS_MACOSX)

namespace bthread {

29 30
// Per-address wait state used to emulate a futex with a pthread mutex and
// condition variable. One instance lives in the futex map for every address
// that currently has callers inside futex_wait_private/futex_wake_private.
//
// The object owns raw pthread primitives that are initialized in the
// constructor and destroyed in the destructor, so it must never be copied:
// a copy would duplicate the handles and destroy them twice.
class SimuFutex {
public:
    SimuFutex() : counts(0)
                , ref(0) {
        pthread_mutex_init(&lock, NULL);
        pthread_cond_init(&cond, NULL);
    }
    ~SimuFutex() {
        pthread_mutex_destroy(&lock);
        pthread_cond_destroy(&cond);
    }

    // Non-copyable: see the class comment.
    SimuFutex(const SimuFutex&) = delete;
    SimuFutex& operator=(const SimuFutex&) = delete;

    // Protects `counts' and is the mutex paired with `cond'.
    pthread_mutex_t lock;
    // Signaled by futex_wake_private, waited on by futex_wait_private.
    pthread_cond_t cond;
    // Number of threads currently between the ++counts/--counts pair in
    // futex_wait_private, i.e. waiting (or about to return from waiting).
    int32_t counts;
    // Number of in-flight wait/wake calls using this entry. It is modified
    // while holding the futex-map mutex; the entry is erased when it drops
    // back to zero.
    int32_t ref;
};

// Serializes every access to *s_futex_map and to the `ref' counter of the
// entries stored in it.
static pthread_mutex_t s_futex_map_mutex = PTHREAD_MUTEX_INITIALIZER;
// Makes sure InitFutexMap runs exactly once, on first use.
static pthread_once_t init_futex_map_once = PTHREAD_ONCE_INIT;
// Maps a futex address to its emulation state. Created by InitFutexMap.
static std::unordered_map<void*, SimuFutex>* s_futex_map = NULL;

// pthread_once callback that allocates the global futex map. The map is never
// deleted; its memory is reclaimed when the process exits. Terminates the
// process with exit(1) if the allocation fails.
static void InitFutexMap() {
    typedef std::unordered_map<void*, SimuFutex> FutexMap;
    FutexMap* const fresh_map = new (std::nothrow) FutexMap();
    if (fresh_map == NULL) {
        exit(1);
    }
    s_futex_map = fresh_map;
}
59 60

int futex_wait_private(void* addr1, int expected, const timespec* timeout) {
61 62 63 64
    if (pthread_once(&init_futex_map_once, InitFutexMap) != 0) {
        LOG(FATAL) << "Fail to pthread_once";
        exit(1);
    }
65
    std::unique_lock<pthread_mutex_t> mu(s_futex_map_mutex);
66
    SimuFutex& simu_futex = (*s_futex_map)[addr1];
67
    ++simu_futex.ref;
68
    mu.unlock();
69

70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85
    int rc = 0;
    {
        std::unique_lock<pthread_mutex_t> mu1(simu_futex.lock);
        if (static_cast<butil::atomic<int>*>(addr1)->load() == expected) {
            ++simu_futex.counts;
            if (timeout) {
                timespec timeout_abs = butil::timespec_from_now(*timeout);
                if ((rc = pthread_cond_timedwait(&simu_futex.cond, &simu_futex.lock, &timeout_abs)) != 0) {
                    errno = rc;
                    rc = -1;
                }
            } else {
                if ((rc = pthread_cond_wait(&simu_futex.cond, &simu_futex.lock)) != 0) {
                    errno = rc;
                    rc = -1;
                }
86
            }
87
            --simu_futex.counts;
88
        } else {
89 90
            errno = EAGAIN;
            rc = -1;
91
        }
92
    }
93 94 95

    std::unique_lock<pthread_mutex_t> mu1(s_futex_map_mutex);
    if (--simu_futex.ref == 0) {
96
        s_futex_map->erase(addr1);
97 98 99
    }
    mu1.unlock();
    return rc;
100 101 102
}

int futex_wake_private(void* addr1, int nwake) {
103 104 105 106
    if (pthread_once(&init_futex_map_once, InitFutexMap) != 0) {
        LOG(FATAL) << "Fail to pthread_once";
        exit(1);
    }
107
    std::unique_lock<pthread_mutex_t> mu(s_futex_map_mutex);
108 109
    auto it = s_futex_map->find(addr1);
    if (it == s_futex_map->end()) {
zhujiashun's avatar
zhujiashun committed
110
        mu.unlock();
111 112 113 114
        return 0;
    }
    SimuFutex& simu_futex = it->second;
    ++simu_futex.ref;
115
    mu.unlock();
116 117 118

    int nwakedup = 0;
    int rc = 0;
119 120 121 122 123 124 125 126 127 128
    {
        std::unique_lock<pthread_mutex_t> mu1(simu_futex.lock);
        nwake = (nwake < simu_futex.counts)? nwake: simu_futex.counts;
        for (int i = 0; i < nwake; ++i) {
            if ((rc = pthread_cond_signal(&simu_futex.cond)) != 0) {
                errno = rc;
                break;
            } else {
                ++nwakedup;
            }
129 130 131
        }
    }

132 133
    std::unique_lock<pthread_mutex_t> mu2(s_futex_map_mutex);
    if (--simu_futex.ref == 0) {
134
        s_futex_map->erase(addr1);
135 136 137
    }
    mu2.unlock();
    return nwakedup;
138 139 140 141 142
}

} // namespace bthread

#endif