// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include <execinfo.h>
#include <gtest/gtest.h>
#include "butil/time.h"
#include "butil/macros.h"
#include "butil/logging.h"
#include "butil/logging.h"
#include "butil/gperftools_profiler.h"
#include "bthread/bthread.h"
#include "bthread/unstable.h"
#include "bthread/task_meta.h"

namespace {
class BthreadTest : public ::testing::Test{
protected:
    BthreadTest(){
        const int kNumCores = sysconf(_SC_NPROCESSORS_ONLN);
        if (kNumCores > 0) {
            bthread_setconcurrency(kNumCores);
        }
    };
    virtual ~BthreadTest(){};
    virtual void SetUp() {
    };
    virtual void TearDown() {
    };
};

TEST_F(BthreadTest, sizeof_task_meta) {
    LOG(INFO) << "sizeof(TaskMeta)=" << sizeof(bthread::TaskMeta);
}

void* unrelated_pthread(void*) {
    LOG(INFO) << "I did not call any bthread function, "
        "I should begin and end without any problem";
    return (void*)(intptr_t)1;
}

TEST_F(BthreadTest, unrelated_pthread) {
    pthread_t th;
    ASSERT_EQ(0, pthread_create(&th, NULL, unrelated_pthread, NULL));
    void* ret = NULL;
    ASSERT_EQ(0, pthread_join(th, &ret));
    ASSERT_EQ(1, (intptr_t)ret);
}

TEST_F(BthreadTest, attr_init_and_destroy) {
    bthread_attr_t attr;
    ASSERT_EQ(0, bthread_attr_init(&attr));
    ASSERT_EQ(0, bthread_attr_destroy(&attr));
}

bthread_fcontext_t fcm;
bthread_fcontext_t fc;
typedef std::pair<int,int> pair_t;
static void f(intptr_t param) {
    pair_t* p = (pair_t*)param;
    p = (pair_t*)bthread_jump_fcontext(&fc, fcm, (intptr_t)(p->first+p->second));
    bthread_jump_fcontext(&fc, fcm, (intptr_t)(p->first+p->second));
}

TEST_F(BthreadTest, context_sanity) {
    fcm = NULL;
    std::size_t size(8192);
    void* sp = malloc(size);

    pair_t p(std::make_pair(2, 7));
    fc = bthread_make_fcontext((char*)sp + size, size, f);

    int res = (int)bthread_jump_fcontext(&fcm, fc, (intptr_t)&p);
    std::cout << p.first << " + " << p.second << " == " << res << std::endl;

    p = std::make_pair(5, 6);
    res = (int)bthread_jump_fcontext(&fcm, fc, (intptr_t)&p);
    std::cout << p.first << " + " << p.second << " == " << res << std::endl;
}

TEST_F(BthreadTest, call_bthread_functions_before_tls_created) {
    ASSERT_EQ(0, bthread_usleep(1000));
    ASSERT_EQ(EINVAL, bthread_join(0, NULL));
    ASSERT_EQ(0UL, bthread_self());
}

butil::atomic<bool> stop(false);

void* sleep_for_awhile(void* arg) {
    LOG(INFO) << "sleep_for_awhile(" << arg << ")";
    bthread_usleep(100000L);
    LOG(INFO) << "sleep_for_awhile(" << arg << ") wakes up";
    return NULL;
}

void* just_exit(void* arg) {
    LOG(INFO) << "just_exit(" << arg << ")";
    bthread_exit(NULL);
    EXPECT_TRUE(false) << "just_exit(" << arg << ") should never be here";
    return NULL;
}

void* repeated_sleep(void* arg) {
    for (size_t i = 0; !stop; ++i) {
        LOG(INFO) << "repeated_sleep(" << arg << ") i=" << i;
        bthread_usleep(1000000L);
    }
    return NULL;
}

void* spin_and_log(void* arg) {
    // This thread never yields CPU.
    butil::EveryManyUS every_1s(1000000L);
    size_t i = 0;
    while (!stop) {
        if (every_1s) {
            LOG(INFO) << "spin_and_log(" << arg << ")=" << i++;
        }
    }
    return NULL;
}

void* do_nothing(void* arg) {
    LOG(INFO) << "do_nothing(" << arg << ")";
    return NULL;
}

void* launcher(void* arg) {
    LOG(INFO) << "launcher(" << arg << ")";
    for (size_t i = 0; !stop; ++i) {
        bthread_t th;
        bthread_start_urgent(&th, NULL, do_nothing, (void*)i);
        bthread_usleep(1000000L);
    }
    return NULL;
}

void* stopper(void*) {
    // Need this thread to set `stop' to true. Reason: If spin_and_log (which
    // never yields CPU) is scheduled to main thread, main thread cannot get
    // to run again.
    bthread_usleep(5*1000000L);
    LOG(INFO) << "about to stop";
    stop = true;
    return NULL;
}

void* misc(void* arg) {
    LOG(INFO) << "misc(" << arg << ")";
    bthread_t th[8];
    EXPECT_EQ(0, bthread_start_urgent(&th[0], NULL, sleep_for_awhile, (void*)2));
    EXPECT_EQ(0, bthread_start_urgent(&th[1], NULL, just_exit, (void*)3));
    EXPECT_EQ(0, bthread_start_urgent(&th[2], NULL, repeated_sleep, (void*)4));
    EXPECT_EQ(0, bthread_start_urgent(&th[3], NULL, repeated_sleep, (void*)68));
    EXPECT_EQ(0, bthread_start_urgent(&th[4], NULL, spin_and_log, (void*)5));
    EXPECT_EQ(0, bthread_start_urgent(&th[5], NULL, spin_and_log, (void*)85));
    EXPECT_EQ(0, bthread_start_urgent(&th[6], NULL, launcher, (void*)6));
    EXPECT_EQ(0, bthread_start_urgent(&th[7], NULL, stopper, NULL));
    for (size_t i = 0; i < ARRAY_SIZE(th); ++i) {
        EXPECT_EQ(0, bthread_join(th[i], NULL));
    }
    return NULL;
}

TEST_F(BthreadTest, sanity) {
    LOG(INFO) << "main thread " << pthread_self();
    bthread_t th1;
    ASSERT_EQ(0, bthread_start_urgent(&th1, NULL, misc, (void*)1));
    LOG(INFO) << "back to main thread " << th1 << " " << pthread_self();
    ASSERT_EQ(0, bthread_join(th1, NULL));
}

const size_t BT_SIZE = 64;
void *bt_array[BT_SIZE];
int bt_cnt;

int do_bt (void) {
    bt_cnt = backtrace (bt_array, BT_SIZE);
    return 56;
}

int call_do_bt (void) {
    return do_bt () + 1;
}

void * tf (void*) {
    if (call_do_bt () != 57) {
        return (void *) 1L;
    }
    return NULL;
}

TEST_F(BthreadTest, backtrace) {
    bthread_t th;
    ASSERT_EQ(0, bthread_start_urgent(&th, NULL, tf, NULL));
    ASSERT_EQ(0, bthread_join (th, NULL));

    char **text = backtrace_symbols (bt_array, bt_cnt);
    ASSERT_TRUE(text);
    for (int i = 0; i < bt_cnt; ++i) {
        puts(text[i]);
    }
}

void* show_self(void*) {
    EXPECT_NE(0ul, bthread_self());
    LOG(INFO) << "bthread_self=" << bthread_self();
    return NULL;
}

TEST_F(BthreadTest, bthread_self) {
    ASSERT_EQ(0ul, bthread_self());
    bthread_t bth;
    ASSERT_EQ(0, bthread_start_urgent(&bth, NULL, show_self, NULL));
    ASSERT_EQ(0, bthread_join(bth, NULL));
}

void* join_self(void*) {
    EXPECT_EQ(EINVAL, bthread_join(bthread_self(), NULL));
    return NULL;
}

TEST_F(BthreadTest, bthread_join) {
    // Invalid tid
    ASSERT_EQ(EINVAL, bthread_join(0, NULL));
    
    // Unexisting tid
    ASSERT_EQ(EINVAL, bthread_join((bthread_t)-1, NULL));

    // Joining self
    bthread_t th;
    ASSERT_EQ(0, bthread_start_urgent(&th, NULL, join_self, NULL));
}

void* change_errno(void* arg) {
    errno = (intptr_t)arg;
    return NULL;
}

TEST_F(BthreadTest, errno_not_changed) {
    bthread_t th;
    errno = 1;
    bthread_start_urgent(&th, NULL, change_errno, (void*)(intptr_t)2);
    ASSERT_EQ(1, errno);
}

static long sleep_in_adding_func = 0;

void* adding_func(void* arg) {
    butil::atomic<size_t>* s = (butil::atomic<size_t>*)arg;
    if (sleep_in_adding_func > 0) {
        long t1 = 0;
        if (10000 == s->fetch_add(1)) {
            t1 = butil::cpuwide_time_us();
        }
        bthread_usleep(sleep_in_adding_func);
        if (t1) {
            LOG(INFO) << "elapse is " << butil::cpuwide_time_us() - t1 << "ns";
        }
    } else {
        s->fetch_add(1);
    }
    return NULL;
}

TEST_F(BthreadTest, small_threads) {
    for (size_t z = 0; z < 2; ++z) {
        sleep_in_adding_func = (z ? 1 : 0);
        char prof_name[32];
        if (sleep_in_adding_func) {
            snprintf(prof_name, sizeof(prof_name), "smallthread.prof");
        } else {
            snprintf(prof_name, sizeof(prof_name), "smallthread_nosleep.prof");
        }

        butil::atomic<size_t> s(0);
        size_t N = (sleep_in_adding_func ? 40000 : 100000);
        std::vector<bthread_t> th;
        th.reserve(N);
        butil::Timer tm;
        for (size_t j = 0; j < 3; ++j) {
            th.clear();
            if (j == 1) {
                ProfilerStart(prof_name);
            }
            tm.start();
            for (size_t i = 0; i < N; ++i) {
                bthread_t t1;
                ASSERT_EQ(0, bthread_start_urgent(
                              &t1, &BTHREAD_ATTR_SMALL, adding_func, &s));
                th.push_back(t1);
            }
            tm.stop();
            if (j == 1) {
                ProfilerStop();
            }
            for (size_t i = 0; i < N; ++i) {
                bthread_join(th[i], NULL);
            }
            LOG(INFO) << "[Round " << j + 1 << "] bthread_start_urgent takes "
                      << tm.n_elapsed()/N << "ns, sum=" << s;
            ASSERT_EQ(N * (j + 1), (size_t)s);
        
            // Check uniqueness of th
            std::sort(th.begin(), th.end());
            ASSERT_EQ(th.end(), std::unique(th.begin(), th.end()));
        }
    }
}

void* bthread_starter(void* void_counter) {
    while (!stop.load(butil::memory_order_relaxed)) {
        bthread_t th;
        EXPECT_EQ(0, bthread_start_urgent(&th, NULL, adding_func, void_counter));
    }
    return NULL;
}

struct BAIDU_CACHELINE_ALIGNMENT AlignedCounter {
    AlignedCounter() : value(0) {}
    butil::atomic<size_t> value;
};

TEST_F(BthreadTest, start_bthreads_frequently) {
    sleep_in_adding_func = 0;
    char prof_name[32];
    snprintf(prof_name, sizeof(prof_name), "start_bthreads_frequently.prof");
    const int con = bthread_getconcurrency();
    ASSERT_GT(con, 0);
    AlignedCounter* counters = new AlignedCounter[con];
    bthread_t th[con];

    std::cout << "Perf with different parameters..." << std::endl;
    //ProfilerStart(prof_name);
    for (int cur_con = 1; cur_con <= con; ++cur_con) {
        stop = false;
        for (int i = 0; i < cur_con; ++i) {
            counters[i].value = 0;
            ASSERT_EQ(0, bthread_start_urgent(
                          &th[i], NULL, bthread_starter, &counters[i].value));
        }
        butil::Timer tm;
        tm.start();
        bthread_usleep(200000L);
        stop = true;
        for (int i = 0; i < cur_con; ++i) {
            bthread_join(th[i], NULL);
        }
        tm.stop();
        size_t sum = 0;
        for (int i = 0; i < cur_con; ++i) {
            sum += counters[i].value * 1000 / tm.m_elapsed();
        }
        std::cout << sum << ",";
    }
    std::cout << std::endl;
    //ProfilerStop();
    delete [] counters;
}

void* log_start_latency(void* void_arg) {
    butil::Timer* tm = static_cast<butil::Timer*>(void_arg);
    tm->stop();
    return NULL;
}

TEST_F(BthreadTest, start_latency_when_high_idle) {
    bool warmup = true;
    long elp1 = 0;
    long elp2 = 0;
    int REP = 0;
    for (int i = 0; i < 10000; ++i) {
        butil::Timer tm;
        tm.start();
        bthread_t th;
        bthread_start_urgent(&th, NULL, log_start_latency, &tm);
        bthread_join(th, NULL);
        bthread_t th2;
        butil::Timer tm2;
        tm2.start();
        bthread_start_background(&th2, NULL, log_start_latency, &tm2);
        bthread_join(th2, NULL);
        if (!warmup) {
            ++REP;
            elp1 += tm.n_elapsed();
            elp2 += tm2.n_elapsed();
        } else if (i == 100) {
            warmup = false;
        }
    }
    LOG(INFO) << "start_urgent=" << elp1 / REP << "ns start_background="
              << elp2 / REP << "ns";
}

void* sleep_for_awhile_with_sleep(void* arg) {
    bthread_usleep((intptr_t)arg);
    return NULL;
}

TEST_F(BthreadTest, stop_sleep) {
    bthread_t th;
    ASSERT_EQ(0, bthread_start_urgent(
                  &th, NULL, sleep_for_awhile_with_sleep, (void*)1000000L));
    butil::Timer tm;
    tm.start();
    bthread_usleep(10000);
    ASSERT_EQ(0, bthread_stop(th));
    ASSERT_EQ(0, bthread_join(th, NULL));
    tm.stop();
    ASSERT_LE(labs(tm.m_elapsed() - 10), 10);
}

TEST_F(BthreadTest, bthread_exit) {
    bthread_t th1;
    bthread_t th2;
    pthread_t th3;
    bthread_t th4;
    bthread_t th5;
    const bthread_attr_t attr = BTHREAD_ATTR_PTHREAD;

    ASSERT_EQ(0, bthread_start_urgent(&th1, NULL, just_exit, NULL));
    ASSERT_EQ(0, bthread_start_background(&th2, NULL, just_exit, NULL));
    ASSERT_EQ(0, pthread_create(&th3, NULL, just_exit, NULL));
    EXPECT_EQ(0, bthread_start_urgent(&th4, &attr, just_exit, NULL));
    EXPECT_EQ(0, bthread_start_background(&th5, &attr, just_exit, NULL));

    ASSERT_EQ(0, bthread_join(th1, NULL));
    ASSERT_EQ(0, bthread_join(th2, NULL));
    ASSERT_EQ(0, pthread_join(th3, NULL));
    ASSERT_EQ(0, bthread_join(th4, NULL));
    ASSERT_EQ(0, bthread_join(th5, NULL));
}

TEST_F(BthreadTest, bthread_equal) {
    bthread_t th1;
    ASSERT_EQ(0, bthread_start_urgent(&th1, NULL, do_nothing, NULL));
    bthread_t th2;
    ASSERT_EQ(0, bthread_start_urgent(&th2, NULL, do_nothing, NULL));
    ASSERT_EQ(0, bthread_equal(th1, th2));
    bthread_t th3 = th2;
    ASSERT_EQ(1, bthread_equal(th3, th2));
    ASSERT_EQ(0, bthread_join(th1, NULL));
    ASSERT_EQ(0, bthread_join(th2, NULL));
}

void* mark_run(void* run) {
    *static_cast<pthread_t*>(run) = pthread_self();
    return NULL;
}

void* check_sleep(void* pthread_task) {
    EXPECT_TRUE(bthread_self() != 0);
    // Create a no-signal task that other worker will not steal. The task will be
    // run if current bthread does context switch.
    bthread_attr_t attr = BTHREAD_ATTR_NORMAL | BTHREAD_NOSIGNAL;
    bthread_t th1;
    pthread_t run = 0;
    const pthread_t pid = pthread_self();
    EXPECT_EQ(0, bthread_start_urgent(&th1, &attr, mark_run, &run));
    if (pthread_task) {
        bthread_usleep(100000L);
        // due to NOSIGNAL, mark_run did not run.
        // FIXME: actually runs. someone is still stealing.
        // EXPECT_EQ((pthread_t)0, run);
        // bthread_usleep = usleep for BTHREAD_ATTR_PTHREAD
        EXPECT_EQ(pid, pthread_self());
        // schedule mark_run
        bthread_flush();
    } else {
        // start_urgent should jump to the new thread first, then back to
        // current thread.
        EXPECT_EQ(pid, run);             // should run in the same pthread
    }
    EXPECT_EQ(0, bthread_join(th1, NULL));
    if (pthread_task) {
        EXPECT_EQ(pid, pthread_self());
        EXPECT_NE((pthread_t)0, run); // the mark_run should run.
    }
    return NULL;
}

TEST_F(BthreadTest, bthread_usleep) {
    // NOTE: May fail because worker threads may still be stealing tasks
    // after previous cases.
    usleep(10000);
    
    bthread_t th1;
    ASSERT_EQ(0, bthread_start_urgent(&th1, &BTHREAD_ATTR_PTHREAD,
                                      check_sleep, (void*)1));
    ASSERT_EQ(0, bthread_join(th1, NULL));
    
    bthread_t th2;
    ASSERT_EQ(0, bthread_start_urgent(&th2, NULL,
                                      check_sleep, (void*)0));
    ASSERT_EQ(0, bthread_join(th2, NULL));
}

void* dummy_thread(void*) {
    return NULL;
}

TEST_F(BthreadTest, too_many_nosignal_threads) {
    for (size_t i = 0; i < 100000; ++i) {
        bthread_attr_t attr = BTHREAD_ATTR_NORMAL | BTHREAD_NOSIGNAL;
        bthread_t tid;
        ASSERT_EQ(0, bthread_start_urgent(&tid, &attr, dummy_thread, NULL));
    }
}

static void* yield_thread(void*) {
    bthread_yield();
    return NULL;
}

TEST_F(BthreadTest, yield_single_thread) {
    bthread_t tid;
    ASSERT_EQ(0, bthread_start_background(&tid, NULL, yield_thread, NULL));
    ASSERT_EQ(0, bthread_join(tid, NULL));
}

} // namespace