Commit 68190988 authored by zhujiashun's avatar zhujiashun

add life cycle management of simuFutex

parent 136b0408
...@@ -57,6 +57,8 @@ EventDispatcher::EventDispatcher() ...@@ -57,6 +57,8 @@ EventDispatcher::EventDispatcher()
PLOG(FATAL) << "Fail to create kqueue"; PLOG(FATAL) << "Fail to create kqueue";
return; return;
} }
#else
#error Not implemented
#endif #endif
CHECK_EQ(0, butil::make_close_on_exec(_epfd)); CHECK_EQ(0, butil::make_close_on_exec(_epfd));
......
...@@ -470,11 +470,9 @@ int pthread_fd_wait(int fd, unsigned events, ...@@ -470,11 +470,9 @@ int pthread_fd_wait(int fd, unsigned events,
diff_ms = (abstime_us - now_us + 999L) / 1000L; diff_ms = (abstime_us - now_us + 999L) / 1000L;
} }
#if defined(OS_LINUX) #if defined(OS_LINUX)
const short poll_events = const short poll_events = bthread::epoll_to_poll_events(events);
bthread::epoll_to_poll_events(events);
#elif defined(OS_MACOSX) #elif defined(OS_MACOSX)
const short poll_events = const short poll_events = bthread::kqueue_to_poll_events(events);
bthread::kqueue_to_poll_events(events);
#endif #endif
if (poll_events == 0) { if (poll_events == 0) {
errno = EINVAL; errno = EINVAL;
......
...@@ -19,8 +19,8 @@ ...@@ -19,8 +19,8 @@
#include "bthread/sys_futex.h" #include "bthread/sys_futex.h"
#include "butil/scoped_lock.h" #include "butil/scoped_lock.h"
#include "butil/atomicops.h" #include "butil/atomicops.h"
#include <map>
#include <pthread.h> #include <pthread.h>
#include <unordered_map>
#if defined(OS_MACOSX) #if defined(OS_MACOSX)
...@@ -28,8 +28,8 @@ namespace bthread { ...@@ -28,8 +28,8 @@ namespace bthread {
class SimuFutex { class SimuFutex {
public: public:
SimuFutex() : SimuFutex() : counts(0)
counts(0) { , ref(0) {
pthread_mutex_init(&lock, NULL); pthread_mutex_init(&lock, NULL);
pthread_cond_init(&cond, NULL); pthread_cond_init(&cond, NULL);
} }
...@@ -41,61 +41,82 @@ public: ...@@ -41,61 +41,82 @@ public:
public: public:
pthread_mutex_t lock; pthread_mutex_t lock;
pthread_cond_t cond; pthread_cond_t cond;
butil::atomic<int32_t> counts; int32_t counts;
int32_t ref;
}; };
// TODO: use a more efficient way. Current impl doesn't delete SimuFutex at all. static std::unordered_map<void*, SimuFutex> s_futex_map;
static std::map<void*, SimuFutex> s_futex_map;
static pthread_mutex_t s_futex_map_mutex = PTHREAD_MUTEX_INITIALIZER; static pthread_mutex_t s_futex_map_mutex = PTHREAD_MUTEX_INITIALIZER;
int futex_wait_private(void* addr1, int expected, const timespec* timeout) { int futex_wait_private(void* addr1, int expected, const timespec* timeout) {
std::unique_lock<pthread_mutex_t> mu(s_futex_map_mutex); std::unique_lock<pthread_mutex_t> mu(s_futex_map_mutex);
SimuFutex& simu_futex = s_futex_map[addr1]; SimuFutex& simu_futex = s_futex_map[addr1];
++simu_futex.ref;
mu.unlock(); mu.unlock();
int rc = 0;
{
std::unique_lock<pthread_mutex_t> mu1(simu_futex.lock); std::unique_lock<pthread_mutex_t> mu1(simu_futex.lock);
if (static_cast<butil::atomic<int>*>(addr1)->load() == expected) { if (static_cast<butil::atomic<int>*>(addr1)->load() == expected) {
int rc = 0;
++simu_futex.counts; ++simu_futex.counts;
if (timeout) { if (timeout) {
timespec timeout_abs = butil::timespec_from_now(*timeout); timespec timeout_abs = butil::timespec_from_now(*timeout);
if ((rc = pthread_cond_timedwait(&simu_futex.cond, &simu_futex.lock, &timeout_abs)) != 0) { if ((rc = pthread_cond_timedwait(&simu_futex.cond, &simu_futex.lock, &timeout_abs)) != 0) {
errno = rc; errno = rc;
return -1; rc = -1;
} }
} else { } else {
if ((rc = pthread_cond_wait(&simu_futex.cond, &simu_futex.lock)) != 0) { if ((rc = pthread_cond_wait(&simu_futex.cond, &simu_futex.lock)) != 0) {
errno = rc; errno = rc;
return -1; rc = -1;
} }
} }
--simu_futex.counts; --simu_futex.counts;
} else {
errno = EAGAIN;
rc = -1;
} }
return 0; }
std::unique_lock<pthread_mutex_t> mu1(s_futex_map_mutex);
if (--simu_futex.ref == 0) {
s_futex_map.erase(addr1);
}
mu1.unlock();
return rc;
} }
int futex_wake_private(void* addr1, int nwake) { int futex_wake_private(void* addr1, int nwake) {
std::unique_lock<pthread_mutex_t> mu(s_futex_map_mutex); std::unique_lock<pthread_mutex_t> mu(s_futex_map_mutex);
SimuFutex& simu_futex = s_futex_map[addr1]; auto it = s_futex_map.find(addr1);
if (it == s_futex_map.end()) {
return 0;
}
SimuFutex& simu_futex = it->second;
++simu_futex.ref;
mu.unlock(); mu.unlock();
std::unique_lock<pthread_mutex_t> mu1(simu_futex.lock);
nwake = (nwake < simu_futex.counts)? nwake: simu_futex.counts.load();
int nwakedup = 0; int nwakedup = 0;
int rc = 0; int rc = 0;
{
std::unique_lock<pthread_mutex_t> mu1(simu_futex.lock);
nwake = (nwake < simu_futex.counts)? nwake: simu_futex.counts;
for (int i = 0; i < nwake; ++i) { for (int i = 0; i < nwake; ++i) {
if ((rc = pthread_cond_signal(&simu_futex.cond)) != 0) { if ((rc = pthread_cond_signal(&simu_futex.cond)) != 0) {
errno = rc; errno = rc;
return -1; break;
} } else {
++nwakedup; ++nwakedup;
} }
return nwakedup; }
} }
int futex_requeue_private(void* addr1, int nwake, void* addr2) { std::unique_lock<pthread_mutex_t> mu2(s_futex_map_mutex);
// TODO if (--simu_futex.ref == 0) {
return -1; s_futex_map.erase(addr1);
}
mu2.unlock();
return nwakedup;
} }
} // namespace bthread } // namespace bthread
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment