// Copyright (c) 2014 Baidu, Inc. // Author: Ge,Jun (gejun@baidu.com) // Date: Sun Jul 13 15:04:18 CST 2014 #include <iostream> #include <gtest/gtest.h> #include "butil/time.h" #include "butil/macros.h" #include "bthread/bthread.h" #include "bthread/task_group.h" #include "bthread/butex.h" namespace bthread { void id_status(bthread_id_t, std::ostream &); uint32_t id_value(bthread_id_t id); } namespace { inline uint32_t get_version(bthread_id_t id) { return (uint32_t)(id.value & 0xFFFFFFFFul); } struct SignalArg { bthread_id_t id; long sleep_us_before_fight; long sleep_us_before_signal; }; void* signaller(void* void_arg) { SignalArg arg = *(SignalArg*)void_arg; bthread_usleep(arg.sleep_us_before_fight); void* data = NULL; int rc = bthread_id_trylock(arg.id, &data); if (rc == 0) { EXPECT_EQ(0xdead, *(int*)data); ++*(int*)data; //EXPECT_EQ(EBUSY, bthread_id_destroy(arg.id, ECANCELED)); bthread_usleep(arg.sleep_us_before_signal); EXPECT_EQ(0, bthread_id_unlock_and_destroy(arg.id)); return void_arg; } else { EXPECT_TRUE(EBUSY == rc || EINVAL == rc); return NULL; } } TEST(BthreadIdTest, join_after_destroy) { bthread_id_t id1; int x = 0xdead; ASSERT_EQ(0, bthread_id_create_ranged(&id1, &x, NULL, 2)); bthread_id_t id2 = { id1.value + 1 }; ASSERT_EQ(get_version(id1), bthread::id_value(id1)); ASSERT_EQ(get_version(id1), bthread::id_value(id2)); pthread_t th[8]; SignalArg args[ARRAY_SIZE(th)]; for (size_t i = 0; i < ARRAY_SIZE(th); ++i) { args[i].sleep_us_before_fight = 0; args[i].sleep_us_before_signal = 0; args[i].id = (i == 0 ? id1 : id2); ASSERT_EQ(0, pthread_create(&th[i], NULL, signaller, &args[i])); } void* ret[ARRAY_SIZE(th)]; size_t non_null_ret = 0; for (size_t i = 0; i < ARRAY_SIZE(th); ++i) { ASSERT_EQ(0, pthread_join(th[i], &ret[i])); non_null_ret += (ret[i] != NULL); } ASSERT_EQ(1UL, non_null_ret); ASSERT_EQ(0, bthread_id_join(id1)); ASSERT_EQ(0, bthread_id_join(id2)); ASSERT_EQ(0xdead + 1, x); ASSERT_EQ(get_version(id1) + 5, bthread::id_value(id1)); ASSERT_EQ(get_version(id1) + 5, bthread::id_value(id2)); } TEST(BthreadIdTest, join_before_destroy) { bthread_id_t id1; int x = 0xdead; ASSERT_EQ(0, bthread_id_create(&id1, &x, NULL)); ASSERT_EQ(get_version(id1), bthread::id_value(id1)); pthread_t th[8]; SignalArg args[ARRAY_SIZE(th)]; for (size_t i = 0; i < ARRAY_SIZE(th); ++i) { args[i].sleep_us_before_fight = 10000; args[i].sleep_us_before_signal = 0; args[i].id = id1; ASSERT_EQ(0, pthread_create(&th[i], NULL, signaller, &args[i])); } ASSERT_EQ(0, bthread_id_join(id1)); ASSERT_EQ(0xdead + 1, x); ASSERT_EQ(get_version(id1) + 4, bthread::id_value(id1)); void* ret[ARRAY_SIZE(th)]; size_t non_null_ret = 0; for (size_t i = 0; i < ARRAY_SIZE(th); ++i) { ASSERT_EQ(0, pthread_join(th[i], &ret[i])); non_null_ret += (ret[i] != NULL); } ASSERT_EQ(1UL, non_null_ret); } struct OnResetArg { bthread_id_t id; int error_code; }; int on_reset(bthread_id_t id, void* data, int error_code) { OnResetArg* arg = static_cast<OnResetArg*>(data); arg->id = id; arg->error_code = error_code; return bthread_id_unlock_and_destroy(id); } TEST(BthreadIdTest, error_is_destroy) { bthread_id_t id1; OnResetArg arg = { { 0 }, 0 }; ASSERT_EQ(0, bthread_id_create(&id1, &arg, on_reset)); ASSERT_EQ(get_version(id1), bthread::id_value(id1)); ASSERT_EQ(0, bthread_id_error(id1, EBADF)); ASSERT_EQ(EBADF, arg.error_code); ASSERT_EQ(id1.value, arg.id.value); ASSERT_EQ(get_version(id1) + 4, bthread::id_value(id1)); } TEST(BthreadIdTest, error_is_destroy_ranged) { bthread_id_t id1; OnResetArg arg = { { 0 }, 0 }; ASSERT_EQ(0, bthread_id_create_ranged(&id1, &arg, on_reset, 2)); bthread_id_t id2 = { id1.value + 1 }; ASSERT_EQ(get_version(id1), bthread::id_value(id2)); ASSERT_EQ(0, bthread_id_error(id2, EBADF)); ASSERT_EQ(EBADF, arg.error_code); ASSERT_EQ(id2.value, arg.id.value); ASSERT_EQ(get_version(id1) + 5, bthread::id_value(id2)); } TEST(BthreadIdTest, default_error_is_destroy) { bthread_id_t id1; ASSERT_EQ(0, bthread_id_create(&id1, NULL, NULL)); ASSERT_EQ(get_version(id1), bthread::id_value(id1)); ASSERT_EQ(0, bthread_id_error(id1, EBADF)); ASSERT_EQ(get_version(id1) + 4, bthread::id_value(id1)); } TEST(BthreadIdTest, doubly_destroy) { bthread_id_t id1; ASSERT_EQ(0, bthread_id_create_ranged(&id1, NULL, NULL, 2)); bthread_id_t id2 = { id1.value + 1 }; ASSERT_EQ(get_version(id1), bthread::id_value(id1)); ASSERT_EQ(get_version(id1), bthread::id_value(id2)); ASSERT_EQ(0, bthread_id_error(id1, EBADF)); ASSERT_EQ(get_version(id1) + 5, bthread::id_value(id1)); ASSERT_EQ(get_version(id1) + 5, bthread::id_value(id2)); ASSERT_EQ(EINVAL, bthread_id_error(id1, EBADF)); ASSERT_EQ(EINVAL, bthread_id_error(id2, EBADF)); } static int on_numeric_error(bthread_id_t id, void* data, int error_code) { std::vector<int>* result = static_cast<std::vector<int>*>(data); result->push_back(error_code); EXPECT_EQ(0, bthread_id_unlock(id)); return 0; } TEST(BthreadIdTest, many_error) { bthread_id_t id1; std::vector<int> result; ASSERT_EQ(0, bthread_id_create(&id1, &result, on_numeric_error)); ASSERT_EQ(get_version(id1), bthread::id_value(id1)); int err = 0; const int N = 100; for (int i = 0; i < N; ++i) { ASSERT_EQ(0, bthread_id_error(id1, err++)); } ASSERT_EQ((size_t)N, result.size()); for (int i = 0; i < N; ++i) { ASSERT_EQ(i, result[i]); } ASSERT_EQ(0, bthread_id_trylock(id1, NULL)); ASSERT_EQ(get_version(id1) + 1, bthread::id_value(id1)); for (int i = 0; i < N; ++i) { ASSERT_EQ(0, bthread_id_error(id1, err++)); } ASSERT_EQ((size_t)N, result.size()); ASSERT_EQ(0, bthread_id_unlock(id1)); ASSERT_EQ(get_version(id1), bthread::id_value(id1)); ASSERT_EQ((size_t)2*N, result.size()); for (int i = 0; i < 2*N; ++i) { EXPECT_EQ(i, result[i]); } result.clear(); ASSERT_EQ(0, bthread_id_trylock(id1, NULL)); ASSERT_EQ(get_version(id1) + 1, bthread::id_value(id1)); for (int i = 0; i < N; ++i) { ASSERT_EQ(0, bthread_id_error(id1, err++)); } ASSERT_EQ(0, bthread_id_unlock_and_destroy(id1)); ASSERT_TRUE(result.empty()); } static void* locker(void* arg) { bthread_id_t id = { (uintptr_t)arg }; butil::Timer tm; tm.start(); EXPECT_EQ(0, bthread_id_lock(id, NULL)); bthread_usleep(2000); EXPECT_EQ(0, bthread_id_unlock(id)); tm.stop(); LOG(INFO) << "Unlocked, tm=" << tm.u_elapsed(); return NULL; } TEST(BthreadIdTest, id_lock) { bthread_id_t id1; ASSERT_EQ(0, bthread_id_create(&id1, NULL, NULL)); ASSERT_EQ(get_version(id1), bthread::id_value(id1)); pthread_t th[8]; for (size_t i = 0; i < ARRAY_SIZE(th); ++i) { ASSERT_EQ(0, pthread_create(&th[i], NULL, locker, (void*)id1.value)); } for (size_t i = 0; i < ARRAY_SIZE(th); ++i) { ASSERT_EQ(0, pthread_join(th[i], NULL)); } } static void* failed_locker(void* arg) { bthread_id_t id = { (uintptr_t)arg }; int rc = bthread_id_lock(id, NULL); if (rc == 0) { bthread_usleep(2000); EXPECT_EQ(0, bthread_id_unlock_and_destroy(id)); return (void*)1; } else { EXPECT_EQ(EINVAL, rc); return NULL; } } TEST(BthreadIdTest, id_lock_and_destroy) { bthread_id_t id1; ASSERT_EQ(0, bthread_id_create(&id1, NULL, NULL)); ASSERT_EQ(get_version(id1), bthread::id_value(id1)); pthread_t th[8]; for (size_t i = 0; i < ARRAY_SIZE(th); ++i) { ASSERT_EQ(0, pthread_create(&th[i], NULL, failed_locker, (void*)id1.value)); } int non_null = 0; for (size_t i = 0; i < ARRAY_SIZE(th); ++i) { void* ret = NULL; ASSERT_EQ(0, pthread_join(th[i], &ret)); non_null += (ret != NULL); } ASSERT_EQ(1, non_null); } TEST(BthreadIdTest, join_after_destroy_before_unlock) { bthread_id_t id1; int x = 0xdead; ASSERT_EQ(0, bthread_id_create(&id1, &x, NULL)); ASSERT_EQ(get_version(id1), bthread::id_value(id1)); pthread_t th[8]; SignalArg args[ARRAY_SIZE(th)]; for (size_t i = 0; i < ARRAY_SIZE(th); ++i) { args[i].sleep_us_before_fight = 0; args[i].sleep_us_before_signal = 20000; args[i].id = id1; ASSERT_EQ(0, pthread_create(&th[i], NULL, signaller, &args[i])); } bthread_usleep(10000); // join() waits until destroy() is called. ASSERT_EQ(0, bthread_id_join(id1)); ASSERT_EQ(0xdead + 1, x); ASSERT_EQ(get_version(id1) + 4, bthread::id_value(id1)); void* ret[ARRAY_SIZE(th)]; size_t non_null_ret = 0; for (size_t i = 0; i < ARRAY_SIZE(th); ++i) { ASSERT_EQ(0, pthread_join(th[i], &ret[i])); non_null_ret += (ret[i] != NULL); } ASSERT_EQ(1UL, non_null_ret); } struct StoppedWaiterArgs { bthread_id_t id; bool thread_started; }; void* stopped_waiter(void* void_arg) { StoppedWaiterArgs* args = (StoppedWaiterArgs*)void_arg; args->thread_started = true; EXPECT_EQ(0, bthread_id_join(args->id)); EXPECT_EQ(get_version(args->id) + 4, bthread::id_value(args->id)); return NULL; } TEST(BthreadIdTest, stop_a_wait_after_fight_before_signal) { bthread_id_t id1; int x = 0xdead; ASSERT_EQ(0, bthread_id_create(&id1, &x, NULL)); ASSERT_EQ(get_version(id1), bthread::id_value(id1)); void* data; ASSERT_EQ(0, bthread_id_trylock(id1, &data)); ASSERT_EQ(&x, data); bthread_t th[8]; StoppedWaiterArgs args[ARRAY_SIZE(th)]; for (size_t i = 0; i < ARRAY_SIZE(th); ++i) { args[i].id = id1; args[i].thread_started = false; ASSERT_EQ(0, bthread_start_urgent(&th[i], NULL, stopped_waiter, &args[i])); } // stop does not wake up bthread_id_join for (size_t i = 0; i < ARRAY_SIZE(th); ++i) { bthread_stop(th[i]); } bthread_usleep(10000); for (size_t i = 0; i < ARRAY_SIZE(th); ++i) { ASSERT_TRUE(bthread::TaskGroup::exists(th[i])); } // destroy the id to end the joinings. ASSERT_EQ(0, bthread_id_unlock_and_destroy(id1)); for (size_t i = 0; i < ARRAY_SIZE(th); ++i) { ASSERT_EQ(0, bthread_join(th[i], NULL)); } } void* waiter(void* arg) { bthread_id_t id = { (uintptr_t)arg }; EXPECT_EQ(0, bthread_id_join(id)); EXPECT_EQ(get_version(id) + 4, bthread::id_value(id)); return NULL; } int handle_data(bthread_id_t id, void* data, int error_code) { EXPECT_EQ(EBADF, error_code); ++*(int*)data; EXPECT_EQ(0, bthread_id_unlock_and_destroy(id)); return 0; } TEST(BthreadIdTest, list_signal) { bthread_id_list_t list; ASSERT_EQ(0, bthread_id_list_init(&list, 32, 32)); bthread_id_t id[16]; int data[ARRAY_SIZE(id)]; for (size_t i = 0; i < ARRAY_SIZE(id); ++i) { data[i] = i; ASSERT_EQ(0, bthread_id_create(&id[i], &data[i], handle_data)); ASSERT_EQ(get_version(id[i]), bthread::id_value(id[i])); ASSERT_EQ(0, bthread_id_list_add(&list, id[i])); } pthread_t th[ARRAY_SIZE(id)]; for (size_t i = 0; i < ARRAY_SIZE(th); ++i) { ASSERT_EQ(0, pthread_create(&th[i], NULL, waiter, (void*)(intptr_t)id[i].value)); } bthread_usleep(10000); ASSERT_EQ(0, bthread_id_list_reset(&list, EBADF)); for (size_t i = 0; i < ARRAY_SIZE(th); ++i) { ASSERT_EQ((int)(i + 1), data[i]); ASSERT_EQ(0, pthread_join(th[i], NULL)); // already reset. ASSERT_EQ((int)(i + 1), data[i]); } bthread_id_list_destroy(&list); } int error_without_unlock(bthread_id_t, void *, int) { return 0; } TEST(BthreadIdTest, status) { bthread_id_t id; bthread_id_create(&id, NULL, NULL); bthread::id_status(id, std::cout); bthread_id_lock(id, NULL); bthread_id_error(id, 123); bthread_id_error(id, 256); bthread_id_error(id, 1256); bthread::id_status(id, std::cout); bthread_id_unlock_and_destroy(id); bthread_id_create(&id, NULL, error_without_unlock); bthread_id_lock(id, NULL); bthread::id_status(id, std::cout); bthread_id_error(id, 12); bthread::id_status(id, std::cout); bthread_id_unlock(id); bthread::id_status(id, std::cout); bthread_id_unlock_and_destroy(id); } TEST(BthreadIdTest, reset_range) { bthread_id_t id; ASSERT_EQ(0, bthread_id_create(&id, NULL, NULL)); ASSERT_EQ(0, bthread_id_lock_and_reset_range(id, NULL, 1000)); bthread::id_status(id, std::cout); bthread_id_unlock(id); ASSERT_EQ(0, bthread_id_lock_and_reset_range(id, NULL, 300)); bthread::id_status(id, std::cout); bthread_id_unlock_and_destroy(id); } static bool any_thread_quit = false; struct FailToLockIdArgs { bthread_id_t id; int expected_return; }; static void* fail_to_lock_id(void* args_in) { FailToLockIdArgs* args = (FailToLockIdArgs*)args_in; butil::Timer tm; EXPECT_EQ(args->expected_return, bthread_id_lock(args->id, NULL)); any_thread_quit = true; return NULL; } TEST(BthreadIdTest, about_to_destroy_before_locking) { bthread_id_t id; ASSERT_EQ(0, bthread_id_create(&id, NULL, NULL)); ASSERT_EQ(0, bthread_id_lock(id, NULL)); ASSERT_EQ(0, bthread_id_about_to_destroy(id)); pthread_t pth; bthread_t bth; FailToLockIdArgs args = { id, EPERM }; ASSERT_EQ(0, pthread_create(&pth, NULL, fail_to_lock_id, &args)); ASSERT_EQ(0, bthread_start_background(&bth, NULL, fail_to_lock_id, &args)); // The threads should quit soon. pthread_join(pth, NULL); bthread_join(bth, NULL); bthread::id_status(id, std::cout); ASSERT_EQ(0, bthread_id_unlock_and_destroy(id)); } static void* succeed_to_lock_id(void* arg) { bthread_id_t id = *(bthread_id_t*)arg; EXPECT_EQ(0, bthread_id_lock(id, NULL)); EXPECT_EQ(0, bthread_id_unlock(id)); return NULL; } TEST(BthreadIdTest, about_to_destroy_cancelled) { bthread_id_t id; ASSERT_EQ(0, bthread_id_create(&id, NULL, NULL)); ASSERT_EQ(0, bthread_id_lock(id, NULL)); ASSERT_EQ(0, bthread_id_about_to_destroy(id)); ASSERT_EQ(0, bthread_id_unlock(id)); pthread_t pth; bthread_t bth; ASSERT_EQ(0, pthread_create(&pth, NULL, succeed_to_lock_id, &id)); ASSERT_EQ(0, bthread_start_background(&bth, NULL, succeed_to_lock_id, &id)); // The threads should quit soon. pthread_join(pth, NULL); bthread_join(bth, NULL); bthread::id_status(id, std::cout); ASSERT_EQ(0, bthread_id_lock(id, NULL)); ASSERT_EQ(0, bthread_id_unlock_and_destroy(id)); } TEST(BthreadIdTest, about_to_destroy_during_locking) { bthread_id_t id; ASSERT_EQ(0, bthread_id_create(&id, NULL, NULL)); ASSERT_EQ(0, bthread_id_lock(id, NULL)); any_thread_quit = false; pthread_t pth; bthread_t bth; FailToLockIdArgs args = { id, EPERM }; ASSERT_EQ(0, pthread_create(&pth, NULL, fail_to_lock_id, &args)); ASSERT_EQ(0, bthread_start_background(&bth, NULL, fail_to_lock_id, &args)); usleep(100000); ASSERT_FALSE(any_thread_quit); ASSERT_EQ(0, bthread_id_about_to_destroy(id)); // The threads should quit soon. pthread_join(pth, NULL); bthread_join(bth, NULL); bthread::id_status(id, std::cout); ASSERT_EQ(0, bthread_id_unlock_and_destroy(id)); } void* const DUMMY_DATA1 = (void*)1; void* const DUMMY_DATA2 = (void*)2; int branch_counter = 0; int branch_tags[4] = {}; int expected_code = 0; const char* expected_desc = ""; int handler_without_desc(bthread_id_t id, void* data, int error_code) { EXPECT_EQ(DUMMY_DATA1, data); EXPECT_EQ(expected_code, error_code); if (error_code == ESTOP) { branch_tags[0] = branch_counter; return bthread_id_unlock_and_destroy(id); } else { branch_tags[1] = branch_counter; return bthread_id_unlock(id); } } int handler_with_desc(bthread_id_t id, void* data, int error_code, const std::string& error_text) { EXPECT_EQ(DUMMY_DATA2, data); EXPECT_EQ(expected_code, error_code); EXPECT_STREQ(expected_desc, error_text.c_str()); if (error_code == ESTOP) { branch_tags[2] = branch_counter; return bthread_id_unlock_and_destroy(id); } else { branch_tags[3] = branch_counter; return bthread_id_unlock(id); } } TEST(BthreadIdTest, error_with_descriptions) { bthread_id_t id1; ASSERT_EQ(0, bthread_id_create(&id1, DUMMY_DATA1, handler_without_desc)); bthread_id_t id2; ASSERT_EQ(0, bthread_id_create2(&id2, DUMMY_DATA2, handler_with_desc)); // [ Matched in-place ] // Call bthread_id_error on an id created by bthread_id_create ++branch_counter; expected_code = EINVAL; ASSERT_EQ(0, bthread_id_error(id1, expected_code)); ASSERT_EQ(branch_counter, branch_tags[1]); // Call bthread_id_error2 on an id created by bthread_id_create2 ++branch_counter; expected_code = EPERM; expected_desc = "description1"; ASSERT_EQ(0, bthread_id_error2(id2, expected_code, expected_desc)); ASSERT_EQ(branch_counter, branch_tags[3]); // [ Mixed in-place ] // Call bthread_id_error on an id created by bthread_id_create2 ++branch_counter; expected_code = ECONNREFUSED; expected_desc = ""; ASSERT_EQ(0, bthread_id_error(id2, expected_code)); ASSERT_EQ(branch_counter, branch_tags[3]); // Call bthread_id_error2 on an id created by bthread_id_create ++branch_counter; expected_code = EINTR; ASSERT_EQ(0, bthread_id_error2(id1, expected_code, "")); ASSERT_EQ(branch_counter, branch_tags[1]); // [ Matched pending ] // Call bthread_id_error on an id created by bthread_id_create ++branch_counter; expected_code = ECONNRESET; ASSERT_EQ(0, bthread_id_lock(id1, NULL)); ASSERT_EQ(0, bthread_id_error(id1, expected_code)); ASSERT_EQ(0, bthread_id_unlock(id1)); ASSERT_EQ(branch_counter, branch_tags[1]); // Call bthread_id_error2 on an id created by bthread_id_create2 ++branch_counter; expected_code = ENOSPC; expected_desc = "description3"; ASSERT_EQ(0, bthread_id_lock(id2, NULL)); ASSERT_EQ(0, bthread_id_error2(id2, expected_code, expected_desc)); ASSERT_EQ(0, bthread_id_unlock(id2)); ASSERT_EQ(branch_counter, branch_tags[3]); // [ Mixed pending ] // Call bthread_id_error on an id created by bthread_id_create2 ++branch_counter; expected_code = ESTOP; expected_desc = ""; ASSERT_EQ(0, bthread_id_lock(id2, NULL)); ASSERT_EQ(0, bthread_id_error(id2, expected_code)); ASSERT_EQ(0, bthread_id_unlock(id2)); ASSERT_EQ(branch_counter, branch_tags[2]); // Call bthread_id_error2 on an id created by bthread_id_create ++branch_counter; ASSERT_EQ(0, bthread_id_lock(id1, NULL)); ASSERT_EQ(0, bthread_id_error2(id1, expected_code, "")); ASSERT_EQ(0, bthread_id_unlock(id1)); ASSERT_EQ(branch_counter, branch_tags[0]); } } // namespace