// Copyright (c) 2014 Baidu, Inc.
// Author: Ge,Jun (gejun@baidu.com)

#include <stdlib.h>
#include <unistd.h>
#include <stdio.h>
#include <signal.h>
#include <gtest/gtest.h>
#include "butil/time.h"
#include "butil/macros.h"
#include "butil/errno.h"
#include <syscall.h>
#include <limits.h>                            // INT_MAX
#include "butil/atomicops.h"
#include "bthread/bthread.h"
#include <bthread/sys_futex.h>
#include <bthread/processor.h>

namespace {
// Shutdown flag: the test body sets it to true and every read_thread polls it.
// It is an atomic rather than a `volatile bool` because it is written by one
// thread while others read it; `volatile` does not make such concurrent
// access well-defined and provides no inter-thread ordering.
butil::atomic<bool> stop(false);

// Incremented by a read_thread right before it parks in futex_wait_private()
// and decremented right after it returns. The producer in rdlock_performance
// reads it to decide whether a futex wake has to be issued.
butil::atomic<int> nthread(0);

void* read_thread(void* arg) {
    butil::atomic<int>* m = (butil::atomic<int>*)arg;
    int njob = 0;
    while (!stop) {
        int x;
        while (!stop && (x = *m) != 0) {
            if (x > 0) {
                while ((x = m->fetch_sub(1)) > 0) {
                    ++njob;
                    const long start = butil::cpuwide_time_ns();
                    while (butil::cpuwide_time_ns() < start + 10000) {
                    }
                    if (stop) {
                        return new int(njob);
                    }
                }
                m->fetch_add(1);
            } else {
                cpu_relax();
            }
        }

        ++nthread;
        bthread::futex_wait_private(m/*lock1*/, 0/*consumed_njob*/, NULL);
        --nthread;
    }
    return new int(njob);
}

// Posts N jobs to 8 read_thread consumers through a futex-backed counter,
// measures the average cost of posting (including the wake when one is
// needed), then verifies that every job was either consumed or is still
// pending in the counter.
TEST(FutexTest, rdlock_performance) {
    const size_t N = 100000;
    butil::atomic<int> lock1(0);
    pthread_t rth[8];
    for (size_t i = 0; i < ARRAY_SIZE(rth); ++i) {
        ASSERT_EQ(0, pthread_create(&rth[i], NULL, read_thread, &lock1));
    }

    const size_t t1 = butil::cpuwide_time_ns();
    for (size_t i = 0; i < N; ++i) {
        if (nthread) {
            // A reader is already parked: post the job and wake one reader.
            lock1.fetch_add(1);
            bthread::futex_wake_private(&lock1, 1);
        } else {
            // No reader was parked when we looked. Post the job first, then
            // re-check, since a reader may have parked in the meantime.
            lock1.fetch_add(1);
            if (nthread) {
                bthread::futex_wake_private(&lock1, 1);
            }
        }
    }
    const size_t t2 = butil::cpuwide_time_ns();

    // Give the readers time to work through the posted jobs, then ask them
    // to stop. The wake is repeated because a reader may park after an
    // earlier wake has already been delivered.
    bthread_usleep(3000000);
    stop = true;
    for (int i = 0; i < 10; ++i) {
        bthread::futex_wake_private(&lock1, INT_MAX);
        sched_yield();
    }

    int njob = 0;
    for (size_t i = 0; i < ARRAY_SIZE(rth); ++i) {
        // Join into a real void* (instead of casting int** to void**) and
        // check the result so that a failed join can never lead to reading
        // an uninitialized pointer.
        void* ret = NULL;
        ASSERT_EQ(0, pthread_join(rth[i], &ret));
        int* res = static_cast<int*>(ret);
        ASSERT_TRUE(res != NULL);
        njob += *res;
        delete res;
    }
    printf("wake %lu times, %ldns each, lock1=%d njob=%d\n",
           N, (t2-t1)/N, lock1.load(), njob);
    // Every posted job is either still counted in lock1 or was processed.
    ASSERT_EQ(N, (size_t)(lock1.load() + njob));
}

// A wake issued while nobody is waiting must report zero woken threads, and
// it must not satisfy a wait that starts afterwards: that wait is expected
// to fail with ETIMEDOUT once its timeout expires.
TEST(FutexTest, futex_wake_before_wait) {
    int futex_word = 0;

    timespec wait_limit;
    wait_limit.tv_sec = 1;
    wait_limit.tv_nsec = 0;

    // No waiter exists yet, so nothing can be woken.
    ASSERT_EQ(0, bthread::futex_wake_private(&futex_word, INT_MAX));

    // The word still holds the expected value 0, so the wait blocks until
    // the timeout elapses.
    ASSERT_EQ(-1, bthread::futex_wait_private(&futex_word, 0, &wait_limit));
    ASSERT_EQ(ETIMEDOUT, errno);
}

// Thread body that waits once, without timeout, on the futex word at `lock`
// (expecting it to hold 0) and exits as soon as the wait returns.
// Always returns NULL.
void* dummy_waiter(void* lock) {
    const int expected_value = 0;
    const timespec* const no_timeout = NULL;
    bthread::futex_wait_private(lock, expected_value, no_timeout);
    return NULL;
}

// Measures futex_wake_private() in two situations: when each call wakes one
// of many parked threads, and when there is nobody left to wake.
TEST(FutexTest, futex_wake_many_waiters_perf) {
    int lock1 = 0;

    // Keep every thread handle so the waiters can be joined later instead of
    // being left behind as unjoined threads.
    pthread_t th[1000];
    size_t N = 0;
    // Thread creation may fail before the array is full; the test then runs
    // with however many waiters could be started.
    while (N < ARRAY_SIZE(th) &&
           pthread_create(&th[N], NULL, dummy_waiter, &lock1) == 0) {
        ++N;
    }
    // N is used as a divisor below, so at least one waiter is required.
    ASSERT_GT(N, (size_t)0);

    // Give the waiters time to park on lock1.
    sleep(1);
    int nwakeup = 0;
    butil::Timer tm;
    tm.start();
    for (size_t i = 0; i < N; ++i) {
        nwakeup += bthread::futex_wake_private(&lock1, 1);
    }
    tm.stop();
    printf("N=%lu, futex_wake a thread = %ldns\n", N, tm.n_elapsed() / N);
    ASSERT_EQ(N, (size_t)nwakeup);

    // The assertion above guarantees that each of the N wake calls woke one
    // waiter, i.e. every waiter is on its way out, so joining cannot block
    // forever. It also ensures no thread still refers to lock1 when this
    // function returns.
    for (size_t i = 0; i < N; ++i) {
        ASSERT_EQ(0, pthread_join(th[i], NULL));
    }

    // With no waiter left, every wake must be a no-op.
    const size_t REP = 10000;
    nwakeup = 0;
    tm.start();
    for (size_t i = 0; i < REP; ++i) {
        nwakeup += bthread::futex_wake_private(&lock1, 1);
    }
    tm.stop();
    ASSERT_EQ(0, nwakeup);
    printf("futex_wake nop = %ldns\n", tm.n_elapsed() / REP);
}

// Shared by all batch_waker threads: incremented once per wake request and
// reset to 0 by the thread that is currently issuing the futex wakes on
// behalf of the others.
butil::atomic<int> nevent(0);

void* waker(void* lock) {
    bthread_usleep(10000);
    const size_t REP = 100000;
    int nwakeup = 0;
    butil::Timer tm;
    tm.start();
    for (size_t i = 0; i < REP; ++i) {
        nwakeup += bthread::futex_wake_private(lock, 1);
    }
    tm.stop();
    EXPECT_EQ(0, nwakeup);
    printf("futex_wake nop = %ldns\n", tm.n_elapsed() / REP);
    return NULL;
} 

void* batch_waker(void* lock) {
    bthread_usleep(10000);
    const size_t REP = 100000;
    int nwakeup = 0;
    butil::Timer tm;
    tm.start();
    for (size_t i = 0; i < REP; ++i) {
        if (nevent.fetch_add(1, butil::memory_order_relaxed) == 0) {
            nwakeup += bthread::futex_wake_private(lock, 1);
            int expected = 1;
            while (1) {
                int last_expected = expected;
                if (nevent.compare_exchange_strong(expected, 0, butil::memory_order_relaxed)) {
                    break;
                }
                nwakeup += bthread::futex_wake_private(lock, expected - last_expected);
            }
        }
    }
    tm.stop();
    EXPECT_EQ(0, nwakeup);
    printf("futex_wake nop = %ldns\n", tm.n_elapsed() / REP);
    return NULL;
} 

// Runs 8 concurrent threads that call futex_wake_private() on a word nobody
// waits on, first waking directly (waker) and then with request batching
// (batch_waker). Each thread prints its own average cost per call.
TEST(FutexTest, many_futex_wake_nop_perf) {
    pthread_t th[8];
    // Only the address is handed to the wakers, but give the futex word a
    // defined value anyway rather than leaving it uninitialized.
    int lock1 = 0;

    // printf keeps the output on the same API the rest of this file uses and
    // avoids relying on <iostream>, which this file does not include.
    printf("[Direct wake]\n");
    for (size_t i = 0; i < ARRAY_SIZE(th); ++i) {
        ASSERT_EQ(0, pthread_create(&th[i], NULL, waker, &lock1));
    }
    for (size_t i = 0; i < ARRAY_SIZE(th); ++i) {
        ASSERT_EQ(0, pthread_join(th[i], NULL));
    }

    printf("[Batch wake]\n");
    for (size_t i = 0; i < ARRAY_SIZE(th); ++i) {
        ASSERT_EQ(0, pthread_create(&th[i], NULL, batch_waker, &lock1));
    }
    for (size_t i = 0; i < ARRAY_SIZE(th); ++i) {
        ASSERT_EQ(0, pthread_join(th[i], NULL));
    }
}
} // namespace