// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. #include <gtest/gtest.h> #include "butil/atomicops.h" #include "butil/time.h" #include "butil/macros.h" #include "butil/logging.h" #include "bthread/butex.h" #include "bthread/task_control.h" #include "bthread/task_group.h" #include "bthread/bthread.h" #include "bthread/unstable.h" namespace bthread { extern butil::atomic<TaskControl*> g_task_control; inline TaskControl* get_task_control() { return g_task_control.load(butil::memory_order_consume); } } // namespace bthread namespace { TEST(ButexTest, wait_on_already_timedout_butex) { uint32_t* butex = bthread::butex_create_checked<uint32_t>(); ASSERT_TRUE(butex); timespec now; ASSERT_EQ(0, clock_gettime(CLOCK_REALTIME, &now)); *butex = 1; ASSERT_EQ(-1, bthread::butex_wait(butex, 1, &now)); ASSERT_EQ(ETIMEDOUT, errno); } void* sleeper(void* arg) { bthread_usleep((uint64_t)arg); return NULL; } void* joiner(void* arg) { const long t1 = butil::gettimeofday_us(); for (bthread_t* th = (bthread_t*)arg; *th; ++th) { if (0 != bthread_join(*th, NULL)) { LOG(FATAL) << "fail to join thread_" << th - (bthread_t*)arg; } long elp = butil::gettimeofday_us() - t1; EXPECT_LE(labs(elp - (th - (bthread_t*)arg + 1) * 100000L), 15000L) << "timeout when joining thread_" << th - (bthread_t*)arg; LOG(INFO) << "Joined thread " << *th << " at " << elp << "us [" << bthread_self() << "]"; } for (bthread_t* th = (bthread_t*)arg; *th; ++th) { EXPECT_EQ(0, bthread_join(*th, NULL)); } return NULL; } struct A { uint64_t a; char dummy[0]; }; struct B { uint64_t a; }; TEST(ButexTest, with_or_without_array_zero) { ASSERT_EQ(sizeof(B), sizeof(A)); } TEST(ButexTest, join) { const size_t N = 6; const size_t M = 6; bthread_t th[N+1]; bthread_t jth[M]; pthread_t pth[M]; for (size_t i = 0; i < N; ++i) { bthread_attr_t attr = (i == 0 ? BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL); ASSERT_EQ(0, bthread_start_urgent( &th[i], &attr, sleeper, (void*)(100000L/*100ms*/ * (i + 1)))); } th[N] = 0; // joiner will join tids in `th' until seeing 0. for (size_t i = 0; i < M; ++i) { ASSERT_EQ(0, bthread_start_urgent(&jth[i], NULL, joiner, th)); } for (size_t i = 0; i < M; ++i) { ASSERT_EQ(0, pthread_create(&pth[i], NULL, joiner, th)); } for (size_t i = 0; i < M; ++i) { ASSERT_EQ(0, bthread_join(jth[i], NULL)) << "i=" << i << " error=" << berror(); } for (size_t i = 0; i < M; ++i) { ASSERT_EQ(0, pthread_join(pth[i], NULL)); } } struct WaiterArg { int expected_result; int expected_value; butil::atomic<int> *butex; const timespec *ptimeout; }; void* waiter(void* arg) { WaiterArg * wa = (WaiterArg*)arg; const long t1 = butil::gettimeofday_us(); const int rc = bthread::butex_wait( wa->butex, wa->expected_value, wa->ptimeout); const long t2 = butil::gettimeofday_us(); if (rc == 0) { EXPECT_EQ(wa->expected_result, 0) << bthread_self(); } else { EXPECT_EQ(wa->expected_result, errno) << bthread_self(); } LOG(INFO) << "after wait, time=" << (t2-t1) << "us"; return NULL; } TEST(ButexTest, sanity) { const size_t N = 5; WaiterArg args[N * 4]; pthread_t t1, t2; butil::atomic<int>* b1 = bthread::butex_create_checked<butil::atomic<int> >(); ASSERT_TRUE(b1); bthread::butex_destroy(b1); b1 = bthread::butex_create_checked<butil::atomic<int> >(); *b1 = 1; ASSERT_EQ(0, bthread::butex_wake(b1)); WaiterArg *unmatched_arg = new WaiterArg; unmatched_arg->expected_value = *b1 + 1; unmatched_arg->expected_result = EWOULDBLOCK; unmatched_arg->butex = b1; unmatched_arg->ptimeout = NULL; pthread_create(&t2, NULL, waiter, unmatched_arg); bthread_t th; ASSERT_EQ(0, bthread_start_urgent(&th, NULL, waiter, unmatched_arg)); const timespec abstime = butil::seconds_from_now(1); for (size_t i = 0; i < 4*N; ++i) { args[i].expected_value = *b1; args[i].butex = b1; if ((i % 2) == 0) { args[i].expected_result = 0; args[i].ptimeout = NULL; } else { args[i].expected_result = ETIMEDOUT; args[i].ptimeout = &abstime; } if (i < 2*N) { pthread_create(&t1, NULL, waiter, &args[i]); } else { ASSERT_EQ(0, bthread_start_urgent(&th, NULL, waiter, &args[i])); } } sleep(2); for (size_t i = 0; i < 2*N; ++i) { ASSERT_EQ(1, bthread::butex_wake(b1)); } ASSERT_EQ(0, bthread::butex_wake(b1)); sleep(1); bthread::butex_destroy(b1); } struct ButexWaitArg { int* butex; int expected_val; long wait_msec; int error_code; }; void* wait_butex(void* void_arg) { ButexWaitArg* arg = static_cast<ButexWaitArg*>(void_arg); const timespec ts = butil::milliseconds_from_now(arg->wait_msec); int rc = bthread::butex_wait(arg->butex, arg->expected_val, &ts); int saved_errno = errno; if (arg->error_code) { EXPECT_EQ(-1, rc); EXPECT_EQ(arg->error_code, saved_errno); } else { EXPECT_EQ(0, rc); } return NULL; } TEST(ButexTest, wait_without_stop) { int* butex = bthread::butex_create_checked<int>(); *butex = 7; butil::Timer tm; const long WAIT_MSEC = 500; for (int i = 0; i < 2; ++i) { const bthread_attr_t attr = (i == 0 ? BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL); ButexWaitArg arg = { butex, *butex, WAIT_MSEC, ETIMEDOUT }; bthread_t th; tm.start(); ASSERT_EQ(0, bthread_start_urgent(&th, &attr, wait_butex, &arg)); ASSERT_EQ(0, bthread_join(th, NULL)); tm.stop(); ASSERT_LT(labs(tm.m_elapsed() - WAIT_MSEC), 250); } bthread::butex_destroy(butex); } TEST(ButexTest, stop_after_running) { int* butex = bthread::butex_create_checked<int>(); *butex = 7; butil::Timer tm; const long WAIT_MSEC = 500; const long SLEEP_MSEC = 10; for (int i = 0; i < 2; ++i) { const bthread_attr_t attr = (i == 0 ? BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL); bthread_t th; ButexWaitArg arg = { butex, *butex, WAIT_MSEC, EINTR }; tm.start(); ASSERT_EQ(0, bthread_start_urgent(&th, &attr, wait_butex, &arg)); ASSERT_EQ(0, bthread_usleep(SLEEP_MSEC * 1000L)); ASSERT_EQ(0, bthread_stop(th)); ASSERT_EQ(0, bthread_join(th, NULL)); tm.stop(); ASSERT_LT(labs(tm.m_elapsed() - SLEEP_MSEC), 25); // ASSERT_TRUE(bthread::get_task_control()-> // timer_thread()._idset.empty()); ASSERT_EQ(EINVAL, bthread_stop(th)); } bthread::butex_destroy(butex); } TEST(ButexTest, stop_before_running) { int* butex = bthread::butex_create_checked<int>(); *butex = 7; butil::Timer tm; const long WAIT_MSEC = 500; for (int i = 0; i < 2; ++i) { const bthread_attr_t attr = (i == 0 ? BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL) | BTHREAD_NOSIGNAL; bthread_t th; ButexWaitArg arg = { butex, *butex, WAIT_MSEC, EINTR }; tm.start(); ASSERT_EQ(0, bthread_start_background(&th, &attr, wait_butex, &arg)); ASSERT_EQ(0, bthread_stop(th)); bthread_flush(); ASSERT_EQ(0, bthread_join(th, NULL)); tm.stop(); ASSERT_LT(tm.m_elapsed(), 5); // ASSERT_TRUE(bthread::get_task_control()-> // timer_thread()._idset.empty()); ASSERT_EQ(EINVAL, bthread_stop(th)); } bthread::butex_destroy(butex); } void* join_the_waiter(void* arg) { EXPECT_EQ(0, bthread_join((bthread_t)arg, NULL)); return NULL; } TEST(ButexTest, join_cant_be_wakeup) { const long WAIT_MSEC = 100; int* butex = bthread::butex_create_checked<int>(); *butex = 7; butil::Timer tm; ButexWaitArg arg = { butex, *butex, 1000, EINTR }; for (int i = 0; i < 2; ++i) { const bthread_attr_t attr = (i == 0 ? BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL); tm.start(); bthread_t th, th2; ASSERT_EQ(0, bthread_start_urgent(&th, NULL, wait_butex, &arg)); ASSERT_EQ(0, bthread_start_urgent(&th2, &attr, join_the_waiter, (void*)th)); ASSERT_EQ(0, bthread_stop(th2)); ASSERT_EQ(0, bthread_usleep(WAIT_MSEC / 2 * 1000L)); ASSERT_TRUE(bthread::TaskGroup::exists(th)); ASSERT_TRUE(bthread::TaskGroup::exists(th2)); ASSERT_EQ(0, bthread_usleep(WAIT_MSEC / 2 * 1000L)); ASSERT_EQ(0, bthread_stop(th)); ASSERT_EQ(0, bthread_join(th2, NULL)); ASSERT_EQ(0, bthread_join(th, NULL)); tm.stop(); ASSERT_LT(tm.m_elapsed(), WAIT_MSEC + 15); ASSERT_EQ(EINVAL, bthread_stop(th)); ASSERT_EQ(EINVAL, bthread_stop(th2)); } bthread::butex_destroy(butex); } TEST(ButexTest, stop_after_slept) { butil::Timer tm; const long SLEEP_MSEC = 100; const long WAIT_MSEC = 10; for (int i = 0; i < 2; ++i) { const bthread_attr_t attr = (i == 0 ? BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL); tm.start(); bthread_t th; ASSERT_EQ(0, bthread_start_urgent( &th, &attr, sleeper, (void*)(SLEEP_MSEC*1000L))); ASSERT_EQ(0, bthread_usleep(WAIT_MSEC * 1000L)); ASSERT_EQ(0, bthread_stop(th)); ASSERT_EQ(0, bthread_join(th, NULL)); tm.stop(); if (attr.stack_type == BTHREAD_STACKTYPE_PTHREAD) { ASSERT_LT(labs(tm.m_elapsed() - SLEEP_MSEC), 15); } else { ASSERT_LT(labs(tm.m_elapsed() - WAIT_MSEC), 15); } // ASSERT_TRUE(bthread::get_task_control()-> // timer_thread()._idset.empty()); ASSERT_EQ(EINVAL, bthread_stop(th)); } } TEST(ButexTest, stop_just_when_sleeping) { butil::Timer tm; const long SLEEP_MSEC = 100; for (int i = 0; i < 2; ++i) { const bthread_attr_t attr = (i == 0 ? BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL); tm.start(); bthread_t th; ASSERT_EQ(0, bthread_start_urgent( &th, &attr, sleeper, (void*)(SLEEP_MSEC*1000L))); ASSERT_EQ(0, bthread_stop(th)); ASSERT_EQ(0, bthread_join(th, NULL)); tm.stop(); if (attr.stack_type == BTHREAD_STACKTYPE_PTHREAD) { ASSERT_LT(labs(tm.m_elapsed() - SLEEP_MSEC), 15); } else { ASSERT_LT(tm.m_elapsed(), 15); } // ASSERT_TRUE(bthread::get_task_control()-> // timer_thread()._idset.empty()); ASSERT_EQ(EINVAL, bthread_stop(th)); } } TEST(ButexTest, stop_before_sleeping) { butil::Timer tm; const long SLEEP_MSEC = 100; for (int i = 0; i < 2; ++i) { bthread_t th; const bthread_attr_t attr = (i == 0 ? BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL) | BTHREAD_NOSIGNAL; tm.start(); ASSERT_EQ(0, bthread_start_background(&th, &attr, sleeper, (void*)(SLEEP_MSEC*1000L))); ASSERT_EQ(0, bthread_stop(th)); bthread_flush(); ASSERT_EQ(0, bthread_join(th, NULL)); tm.stop(); if (attr.stack_type == BTHREAD_STACKTYPE_PTHREAD) { ASSERT_LT(labs(tm.m_elapsed() - SLEEP_MSEC), 10); } else { ASSERT_LT(tm.m_elapsed(), 10); } // ASSERT_TRUE(bthread::get_task_control()-> // timer_thread()._idset.empty()); ASSERT_EQ(EINVAL, bthread_stop(th)); } } } // namespace