// Copyright (c) 2014 Baidu, Inc.
// Author: Ge,Jun (gejun@baidu.com)

#include <limits.h>                            // INT_MAX
#include <pthread.h>                           // pthread_create/join/detach
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <iostream>                            // std::cout
#include <gtest/gtest.h>
#include "butil/time.h"
#include "butil/macros.h"
#include "butil/errno.h"
#include "butil/atomicops.h"
#include "bthread/bthread.h"
#include <bthread/sys_futex.h>
#include <bthread/processor.h>

namespace {
volatile bool stop = false;

21
butil::atomic<int> nthread(0);
gejun's avatar
gejun committed
22 23

void* read_thread(void* arg) {
24
    butil::atomic<int>* m = (butil::atomic<int>*)arg;
gejun's avatar
gejun committed
25 26 27 28 29 30 31
    int njob = 0;
    while (!stop) {
        int x;
        while (!stop && (x = *m) != 0) {
            if (x > 0) {
                while ((x = m->fetch_sub(1)) > 0) {
                    ++njob;
32 33
                    const long start = butil::cpuwide_time_ns();
                    while (butil::cpuwide_time_ns() < start + 10000) {
gejun's avatar
gejun committed
34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
                    }
                    if (stop) {
                        return new int(njob);
                    }
                }
                m->fetch_add(1);
            } else {
                cpu_relax();
            }
        }

        ++nthread;
        bthread::futex_wait_private(m/*lock1*/, 0/*consumed_njob*/, NULL);
        --nthread;
    }
    return new int(njob);
}

// Publishes N jobs through an atomic counter to 8 read_thread consumers,
// waking a consumer with futex_wake_private only when one is registered as
// parked, and reports the average producer-side cost per job.
// Afterwards it stops the consumers and checks that every published job was
// either consumed (njob) or is still pending in the counter (lock1).
TEST(FutexTest, rdlock_performance) {
    const size_t N = 100000;
    butil::atomic<int> lock1(0);
    pthread_t rth[8];
    for (size_t i = 0; i < ARRAY_SIZE(rth); ++i) {
        ASSERT_EQ(0, pthread_create(&rth[i], NULL, read_thread, &lock1));
    }

    const int64_t t1 = butil::cpuwide_time_ns();
    for (size_t i = 0; i < N; ++i) {
        // The wake syscall is skipped only when no consumer was registered
        // as parked both before and after the job became visible.
        const bool had_waiter = (nthread != 0);
        lock1.fetch_add(1);
        if (had_waiter || nthread) {
            bthread::futex_wake_private(&lock1, 1);
        }
    }
    const int64_t t2 = butil::cpuwide_time_ns();

    // Give the consumers time to drain the counter, then ask them to exit.
    bthread_usleep(3000000);
    stop = true;
    // Wake repeatedly so that consumers parked in futex_wait notice `stop`.
    for (int i = 0; i < 10; ++i) {
        bthread::futex_wake_private(&lock1, INT_MAX);
        sched_yield();
    }

    int njob = 0;
    for (size_t i = 0; i < ARRAY_SIZE(rth); ++i) {
        // Receive the result through a real void* instead of casting int**
        // to void**, and never dereference it unless the join succeeded.
        void* ret = NULL;
        ASSERT_EQ(0, pthread_join(rth[i], &ret));
        ASSERT_TRUE(ret != NULL);
        int* const res = static_cast<int*>(ret);
        njob += *res;
        delete res;
    }
    // %zu matches size_t; the division is done in int64_t so the value
    // really is the signed 64-bit integer that PRId64 expects.
    printf("wake %zu times, %" PRId64 "ns each, lock1=%d njob=%d\n",
           N, (t2 - t1) / static_cast<int64_t>(N), lock1.load(), njob);
    ASSERT_EQ(N, (size_t)(lock1.load() + njob));
}

// Checks that a wake issued while nobody is waiting is not remembered:
// the wake reports 0 woken threads, and a wait started afterwards with a
// one-second timeout fails with ETIMEDOUT.
TEST(FutexTest, futex_wake_before_wait) {
    int lock1 = 0;

    // No waiter exists yet, so nothing can be woken.
    ASSERT_EQ(0, bthread::futex_wake_private(&lock1, INT_MAX));

    // One second, zero nanoseconds.
    timespec timeout;
    timeout.tv_sec = 1;
    timeout.tv_nsec = 0;

    // lock1 still equals the expected value 0, so the wait is entered and
    // ends only through the timeout.
    ASSERT_EQ(-1, bthread::futex_wait_private(&lock1, 0, &timeout));
    ASSERT_EQ(ETIMEDOUT, errno);
}

// Thread entry point that parks on the futex word at `lock` for as long as
// that word equals 0 (no timeout), then exits. Always returns NULL.
void* dummy_waiter(void* lock) {
    const int expected_value = 0;
    bthread::futex_wait_private(lock, expected_value, NULL);
    return NULL;
}

// Parks up to 1000 threads on one futex word, then measures
//   1) the average cost of a futex_wake_private that wakes one thread, and
//   2) the average cost of a futex_wake_private that finds no waiter.
TEST(FutexTest, futex_wake_many_waiters_perf) {
    int lock1 = 0;
    size_t N = 0;
    for (; N < 1000; ++N) {
        pthread_t th;
        if (pthread_create(&th, NULL, dummy_waiter, &lock1) != 0) {
            // Thread limit reached: measure with as many as were created.
            break;
        }
        // Nobody joins these threads; detach them so their resources are
        // released when they return instead of leaking.
        pthread_detach(th);
    }
    // N is a divisor below; fail cleanly instead of dividing by zero when
    // not a single waiter could be created.
    ASSERT_GT(N, (size_t)0);

    // Heuristic delay to let the waiters reach futex_wait_private.
    sleep(1);
    int nwakeup = 0;
    butil::Timer tm;
    tm.start();
    for (size_t i = 0; i < N; ++i) {
        nwakeup += bthread::futex_wake_private(&lock1, 1);
    }
    tm.stop();
    // %zu matches size_t; divide in int64_t so the argument matches PRId64.
    printf("N=%zu, futex_wake a thread = %" PRId64 "ns\n",
           N, tm.n_elapsed() / static_cast<int64_t>(N));
    ASSERT_EQ(N, (size_t)nwakeup);

    // Every waiter is gone now, so each further wake should wake nobody.
    const size_t REP = 10000;
    nwakeup = 0;
    tm.start();
    for (size_t i = 0; i < REP; ++i) {
        nwakeup += bthread::futex_wake_private(&lock1, 1);
    }
    tm.stop();
    ASSERT_EQ(0, nwakeup);
    printf("futex_wake nop = %" PRId64 "ns\n",
           tm.n_elapsed() / static_cast<int64_t>(REP));
}

// Shared event counter used by batch_waker: every iteration increments it,
// and the thread that saw it at 0 keeps issuing wakes until it manages to
// reset the counter back to 0 with a compare-exchange.
butil::atomic<int> nevent(0);

void* waker(void* lock) {
    bthread_usleep(10000);
    const size_t REP = 100000;
    int nwakeup = 0;
141
    butil::Timer tm;
gejun's avatar
gejun committed
142 143 144 145 146 147
    tm.start();
    for (size_t i = 0; i < REP; ++i) {
        nwakeup += bthread::futex_wake_private(lock, 1);
    }
    tm.stop();
    EXPECT_EQ(0, nwakeup);
gejun's avatar
gejun committed
148
    printf("futex_wake nop = %" PRId64 "ns\n", tm.n_elapsed() / REP);
gejun's avatar
gejun committed
149 150 151 152 153 154 155
    return NULL;
} 

void* batch_waker(void* lock) {
    bthread_usleep(10000);
    const size_t REP = 100000;
    int nwakeup = 0;
156
    butil::Timer tm;
gejun's avatar
gejun committed
157 158
    tm.start();
    for (size_t i = 0; i < REP; ++i) {
159
        if (nevent.fetch_add(1, butil::memory_order_relaxed) == 0) {
gejun's avatar
gejun committed
160 161 162 163
            nwakeup += bthread::futex_wake_private(lock, 1);
            int expected = 1;
            while (1) {
                int last_expected = expected;
164
                if (nevent.compare_exchange_strong(expected, 0, butil::memory_order_relaxed)) {
gejun's avatar
gejun committed
165 166 167 168 169 170 171 172
                    break;
                }
                nwakeup += bthread::futex_wake_private(lock, expected - last_expected);
            }
        }
    }
    tm.stop();
    EXPECT_EQ(0, nwakeup);
gejun's avatar
gejun committed
173
    printf("futex_wake nop = %" PRId64 "ns\n", tm.n_elapsed() / REP);
gejun's avatar
gejun committed
174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195
    return NULL;
} 

// Runs 8 concurrent threads that call futex_wake_private on a word nobody
// waits on, first waking directly (waker) and then through the shared
// counter (batch_waker). Each thread prints its own average cost.
TEST(FutexTest, many_futex_wake_nop_perf) {
    pthread_t th[8];
    // Nothing waits on this word; it is initialized (as in the other tests)
    // so that no thread is handed the address of an indeterminate value.
    int lock1 = 0;

    std::cout << "[Direct wake]" << std::endl;
    for (size_t i = 0; i < ARRAY_SIZE(th); ++i) {
        ASSERT_EQ(0, pthread_create(&th[i], NULL, waker, &lock1));
    }
    for (size_t i = 0; i < ARRAY_SIZE(th); ++i) {
        ASSERT_EQ(0, pthread_join(th[i], NULL));
    }

    std::cout << "[Batch wake]" << std::endl;
    for (size_t i = 0; i < ARRAY_SIZE(th); ++i) {
        ASSERT_EQ(0, pthread_create(&th[i], NULL, batch_waker, &lock1));
    }
    for (size_t i = 0; i < ARRAY_SIZE(th); ++i) {
        ASSERT_EQ(0, pthread_join(th[i], NULL));
    }
}
} // namespace