// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. #include <gtest/gtest.h> #include <bthread/execution_queue.h> #include <bthread/sys_futex.h> #include <bthread/countdown_event.h> #include "butil/time.h" #include "butil/fast_rand.h" #include "butil/gperftools_profiler.h" namespace { bool stopped = false; class ExecutionQueueTest : public testing::Test { protected: void SetUp() { stopped = false; } void TearDown() {} }; struct LongIntTask { long value; bthread::CountdownEvent* event; LongIntTask(long v) : value(v), event(NULL) {} LongIntTask(long v, bthread::CountdownEvent* e) : value(v), event(e) {} LongIntTask() : value(0), event(NULL) {} }; int add(void* meta, bthread::TaskIterator<LongIntTask> &iter) { stopped = iter.is_queue_stopped(); int64_t* result = (int64_t*)meta; for (; iter; ++iter) { *result += iter->value; if (iter->event) { iter->event->signal(); } } return 0; } TEST_F(ExecutionQueueTest, single_thread) { int64_t result = 0; int64_t expected_result = 0; stopped = false; bthread::ExecutionQueueId<LongIntTask> queue_id; bthread::ExecutionQueueOptions options; ASSERT_EQ(0, bthread::execution_queue_start(&queue_id, &options, add, &result)); for (int i = 0; i < 100; ++i) { expected_result += i; ASSERT_EQ(0, bthread::execution_queue_execute(queue_id, i)); } LOG(INFO) << "stop"; ASSERT_EQ(0, bthread::execution_queue_stop(queue_id)); ASSERT_NE(0, bthread::execution_queue_execute(queue_id, 0)); ASSERT_EQ(0, bthread::execution_queue_join(queue_id)); ASSERT_EQ(expected_result, result); ASSERT_TRUE(stopped); } struct PushArg { bthread::ExecutionQueueId<LongIntTask> id; butil::atomic<int64_t> total_num; butil::atomic<int64_t> total_time; butil::atomic<int64_t> expected_value; volatile bool stopped; bool wait_task_completed; PushArg() { memset(this, 0, sizeof(*this)); } }; void* push_thread(void *arg) { PushArg* pa = (PushArg*)arg; int64_t sum = 0; butil::Timer timer; timer.start(); int num = 0; bthread::CountdownEvent e; LongIntTask t(num, pa->wait_task_completed ? &e : NULL); if (pa->wait_task_completed) { e.reset(1); } while (bthread::execution_queue_execute(pa->id, t) == 0) { sum += num; t.value = ++num; if (pa->wait_task_completed) { e.wait(); e.reset(1); } } timer.stop(); pa->expected_value.fetch_add(sum, butil::memory_order_relaxed); pa->total_num.fetch_add(num); pa->total_time.fetch_add(timer.n_elapsed()); return NULL; } void* push_thread_which_addresses_execq(void *arg) { PushArg* pa = (PushArg*)arg; int64_t sum = 0; butil::Timer timer; timer.start(); int num = 0; bthread::ExecutionQueue<LongIntTask>::scoped_ptr_t ptr = bthread::execution_queue_address(pa->id); EXPECT_TRUE(ptr); while (ptr->execute(num) == 0) { sum += num; ++num; } EXPECT_TRUE(ptr->stopped()); timer.stop(); pa->expected_value.fetch_add(sum, butil::memory_order_relaxed); pa->total_num.fetch_add(num); pa->total_time.fetch_add(timer.n_elapsed()); return NULL; } TEST_F(ExecutionQueueTest, performance) { pthread_t threads[8]; bthread::ExecutionQueueId<LongIntTask> queue_id = { 0 }; // to suppress warnings bthread::ExecutionQueueOptions options; int64_t result = 0; ASSERT_EQ(0, bthread::execution_queue_start(&queue_id, &options, add, &result)); PushArg pa; pa.id = queue_id; pa.total_num = 0; pa.total_time = 0; pa.expected_value = 0; pa.stopped = false; ProfilerStart("execq.prof"); for (size_t i = 0; i < ARRAY_SIZE(threads); ++i) { pthread_create(&threads[i], NULL, &push_thread_which_addresses_execq, &pa); } usleep(500 * 1000); ASSERT_EQ(0, bthread::execution_queue_stop(queue_id)); for (size_t i = 0; i < ARRAY_SIZE(threads); ++i) { pthread_join(threads[i], NULL); } ProfilerStop(); ASSERT_EQ(0, bthread::execution_queue_join(queue_id)); ASSERT_EQ(pa.expected_value.load(), result); LOG(INFO) << "With addressed execq, each execution_queue_execute takes " << pa.total_time.load() / pa.total_num.load() << " total_num=" << pa.total_num << " ns with " << ARRAY_SIZE(threads) << " threads"; #define BENCHMARK_BOTH #ifdef BENCHMARK_BOTH result = 0; ASSERT_EQ(0, bthread::execution_queue_start(&queue_id, &options, add, &result)); pa.id = queue_id; pa.total_num = 0; pa.total_time = 0; pa.expected_value = 0; pa.stopped = false; ProfilerStart("execq_id.prof"); for (size_t i = 0; i < ARRAY_SIZE(threads); ++i) { pthread_create(&threads[i], NULL, &push_thread, &pa); } usleep(500 * 1000); ASSERT_EQ(0, bthread::execution_queue_stop(queue_id)); for (size_t i = 0; i < ARRAY_SIZE(threads); ++i) { pthread_join(threads[i], NULL); } ProfilerStop(); ASSERT_EQ(0, bthread::execution_queue_join(queue_id)); ASSERT_EQ(pa.expected_value.load(), result); LOG(INFO) << "With id explicitly, execution_queue_execute takes " << pa.total_time.load() / pa.total_num.load() << " total_num=" << pa.total_num << " ns with " << ARRAY_SIZE(threads) << " threads"; #endif // BENCHMARK_BOTH } volatile bool g_suspending = false; volatile bool g_should_be_urgent = false; int urgent_times = 0; int add_with_suspend(void* meta, bthread::TaskIterator<LongIntTask>& iter) { int64_t* result = (int64_t*)meta; if (iter.is_queue_stopped()) { stopped = true; return 0; } if (g_should_be_urgent) { g_should_be_urgent = false; EXPECT_EQ(-1, iter->value) << urgent_times; if (iter->event) { iter->event->signal(); } ++iter; EXPECT_FALSE(iter) << urgent_times; ++urgent_times; } else { for (; iter; ++iter) { if (iter->value == -100) { g_suspending = true; while (g_suspending) { bthread_usleep(100); } g_should_be_urgent = true; if (iter->event) { iter->event->signal(); } EXPECT_FALSE(++iter); return 0; } else { *result += iter->value; if (iter->event) { iter->event->signal(); } } } } return 0; } TEST_F(ExecutionQueueTest, execute_urgent) { g_should_be_urgent = false; pthread_t threads[10]; bthread::ExecutionQueueId<LongIntTask> queue_id = { 0 }; // to suppress warnings bthread::ExecutionQueueOptions options; int64_t result = 0; ASSERT_EQ(0, bthread::execution_queue_start(&queue_id, &options, add_with_suspend, &result)); PushArg pa; pa.id = queue_id; pa.total_num = 0; pa.total_time = 0; pa.expected_value = 0; pa.stopped = false; pa.wait_task_completed = true; for (size_t i = 0; i < ARRAY_SIZE(threads); ++i) { pthread_create(&threads[i], NULL, &push_thread, &pa); } g_suspending = false; usleep(1000); for (int i = 0; i < 100; ++i) { ASSERT_EQ(0, bthread::execution_queue_execute(queue_id, -100)); while (!g_suspending) { usleep(100); } ASSERT_EQ(0, bthread::execution_queue_execute( queue_id, -1, &bthread::TASK_OPTIONS_URGENT)); g_suspending = false; usleep(100); } usleep(500* 1000); pa.stopped = true; ASSERT_EQ(0, bthread::execution_queue_stop(queue_id)); for (size_t i = 0; i < ARRAY_SIZE(threads); ++i) { pthread_join(threads[i], NULL); } LOG(INFO) << "result=" << result; ASSERT_EQ(0, bthread::execution_queue_join(queue_id)); ASSERT_EQ(pa.expected_value.load(), result); } TEST_F(ExecutionQueueTest, urgent_task_is_the_last_task) { g_should_be_urgent = false; g_suspending = false; bthread::ExecutionQueueId<LongIntTask> queue_id = { 0 }; // to suppress warnings bthread::ExecutionQueueOptions options; int64_t result = 0; ASSERT_EQ(0, bthread::execution_queue_start(&queue_id, &options, add_with_suspend, &result)); g_suspending = false; ASSERT_EQ(0, bthread::execution_queue_execute(queue_id, -100)); while (!g_suspending) { usleep(10); } LOG(INFO) << "Going to push"; int64_t expected = 0; for (int i = 1; i < 100; ++i) { expected += i; ASSERT_EQ(0, bthread::execution_queue_execute(queue_id, i)); } ASSERT_EQ(0, bthread::execution_queue_execute(queue_id, -1, &bthread::TASK_OPTIONS_URGENT)); usleep(100); g_suspending = false; butil::atomic_thread_fence(butil::memory_order_acq_rel); usleep(10 * 1000); LOG(INFO) << "going to quit"; ASSERT_EQ(0, bthread::execution_queue_stop(queue_id)); ASSERT_EQ(0, bthread::execution_queue_join(queue_id)); ASSERT_EQ(expected, result); } long next_task[1024]; butil::atomic<int> num_threads(0); void* push_thread_with_id(void* arg) { bthread::ExecutionQueueId<LongIntTask> id = { (uint64_t)arg }; int thread_id = num_threads.fetch_add(1, butil::memory_order_relaxed); LOG(INFO) << "Start thread" << thread_id; for (int i = 0; i < 100000; ++i) { bthread::execution_queue_execute(id, ((long)thread_id << 32) | i); } return NULL; } int check_order(void* meta, bthread::TaskIterator<LongIntTask>& iter) { for (; iter; ++iter) { long value = iter->value; int thread_id = value >> 32; long task = value & 0xFFFFFFFFul; if (task != next_task[thread_id]++) { EXPECT_TRUE(false) << "task=" << task << " thread_id=" << thread_id; ++*(long*)meta; } if (iter->event) { iter->event->signal(); } } return 0; } TEST_F(ExecutionQueueTest, multi_threaded_order) { memset(next_task, 0, sizeof(next_task)); long disorder_times = 0; bthread::ExecutionQueueId<LongIntTask> queue_id = { 0 }; // to suppress warnings bthread::ExecutionQueueOptions options; ASSERT_EQ(0, bthread::execution_queue_start(&queue_id, &options, check_order, &disorder_times)); pthread_t threads[12]; for (size_t i = 0; i < ARRAY_SIZE(threads); ++i) { pthread_create(&threads[i], NULL, &push_thread_with_id, (void *)queue_id.value); } for (size_t i = 0; i < ARRAY_SIZE(threads); ++i) { pthread_join(threads[i], NULL); } ASSERT_EQ(0, bthread::execution_queue_stop(queue_id)); ASSERT_EQ(0, bthread::execution_queue_join(queue_id)); ASSERT_EQ(0, disorder_times); } int check_running_thread(void* arg, bthread::TaskIterator<LongIntTask>& iter) { if (iter.is_queue_stopped()) { return 0; } for (; iter; ++iter) {} EXPECT_EQ(pthread_self(), (pthread_t)arg); return 0; } TEST_F(ExecutionQueueTest, in_place_task) { pthread_t thread_id = pthread_self(); bthread::ExecutionQueueId<LongIntTask> queue_id = { 0 }; // to suppress warnings bthread::ExecutionQueueOptions options; ASSERT_EQ(0, bthread::execution_queue_start(&queue_id, &options, check_running_thread, (void*)thread_id)); ASSERT_EQ(0, bthread::execution_queue_execute( queue_id, 0, &bthread::TASK_OPTIONS_INPLACE)); ASSERT_EQ(0, bthread::execution_queue_stop(queue_id)); ASSERT_EQ(0, bthread::execution_queue_join(queue_id)); } struct InPlaceTask { bool first_task; pthread_t thread_id; }; void *run_first_tasks(void* arg) { bthread::ExecutionQueueId<InPlaceTask> queue_id = { (uint64_t)arg }; InPlaceTask task; task.first_task = true; task.thread_id = pthread_self(); EXPECT_EQ(0, bthread::execution_queue_execute(queue_id, task, &bthread::TASK_OPTIONS_INPLACE)); return NULL; } int stuck_and_check_running_thread(void* arg, bthread::TaskIterator<InPlaceTask>& iter) { if (iter.is_queue_stopped()) { return 0; } butil::atomic<int>* futex = (butil::atomic<int>*)arg; if (iter->first_task) { EXPECT_EQ(pthread_self(), iter->thread_id); futex->store(1); bthread::futex_wake_private(futex, 1); while (futex->load() != 2) { bthread::futex_wait_private(futex, 1, NULL); } ++iter; EXPECT_FALSE(iter); } else { for (; iter; ++iter) { EXPECT_FALSE(iter->first_task); EXPECT_NE(pthread_self(), iter->thread_id); } } return 0; } TEST_F(ExecutionQueueTest, should_start_new_thread_on_more_tasks) { bthread::ExecutionQueueId<InPlaceTask> queue_id = { 0 }; bthread::ExecutionQueueOptions options; butil::atomic<int> futex(0); ASSERT_EQ(0, bthread::execution_queue_start(&queue_id, &options, stuck_and_check_running_thread, (void*)&futex)); pthread_t thread; ASSERT_EQ(0, pthread_create(&thread, NULL, run_first_tasks, (void*)queue_id.value)); while (futex.load() != 1) { bthread::futex_wait_private(&futex, 0, NULL); } for (size_t i = 0; i < 100; ++i) { InPlaceTask task; task.first_task = false; task.thread_id = pthread_self(); ASSERT_EQ(0, bthread::execution_queue_execute(queue_id, task, &bthread::TASK_OPTIONS_INPLACE)); } futex.store(2); bthread::futex_wake_private(&futex, 1); ASSERT_EQ(0, bthread::execution_queue_stop(queue_id)); ASSERT_EQ(0, bthread::execution_queue_join(queue_id)); } void* inplace_push_thread(void* arg) { bthread::ExecutionQueueId<LongIntTask> id = { (uint64_t)arg }; int thread_id = num_threads.fetch_add(1, butil::memory_order_relaxed); LOG(INFO) << "Start thread" << thread_id; for (int i = 0; i < 100000; ++i) { bthread::execution_queue_execute(id, ((long)thread_id << 32) | i, &bthread::TASK_OPTIONS_INPLACE); } return NULL; } TEST_F(ExecutionQueueTest, inplace_and_order) { memset(next_task, 0, sizeof(next_task)); long disorder_times = 0; bthread::ExecutionQueueId<LongIntTask> queue_id = { 0 }; // to suppress warnings bthread::ExecutionQueueOptions options; ASSERT_EQ(0, bthread::execution_queue_start(&queue_id, &options, check_order, &disorder_times)); pthread_t threads[12]; for (size_t i = 0; i < ARRAY_SIZE(threads); ++i) { pthread_create(&threads[i], NULL, &inplace_push_thread, (void *)queue_id.value); } for (size_t i = 0; i < ARRAY_SIZE(threads); ++i) { pthread_join(threads[i], NULL); } ASSERT_EQ(0, bthread::execution_queue_stop(queue_id)); ASSERT_EQ(0, bthread::execution_queue_join(queue_id)); ASSERT_EQ(0, disorder_times); } TEST_F(ExecutionQueueTest, size_of_task_node) { LOG(INFO) << "sizeof(TaskNode)=" << sizeof(bthread::TaskNode); } int add_with_suspend2(void* meta, bthread::TaskIterator<LongIntTask>& iter) { int64_t* result = (int64_t*)meta; if (iter.is_queue_stopped()) { stopped = true; return 0; } for (; iter; ++iter) { if (iter->value == -100) { g_suspending = true; while (g_suspending) { usleep(10); } if (iter->event) { iter->event->signal(); } } else { *result += iter->value; if (iter->event) { iter->event->signal(); } } } return 0; } TEST_F(ExecutionQueueTest, cancel) { bthread::ExecutionQueueId<LongIntTask> queue_id = { 0 }; // to suppress warnings bthread::ExecutionQueueOptions options; int64_t result = 0; ASSERT_EQ(0, bthread::execution_queue_start(&queue_id, &options, add_with_suspend2, &result)); g_suspending = false; bthread::TaskHandle handle0; ASSERT_EQ(0, bthread::execution_queue_execute(queue_id, -100, NULL, &handle0)); while (!g_suspending) { usleep(10); } ASSERT_EQ(1, bthread::execution_queue_cancel(handle0)); ASSERT_EQ(1, bthread::execution_queue_cancel(handle0)); bthread::TaskHandle handle1; ASSERT_EQ(0, bthread::execution_queue_execute(queue_id, 100, NULL, &handle1)); ASSERT_EQ(0, bthread::execution_queue_cancel(handle1)); g_suspending = false; ASSERT_EQ(-1, bthread::execution_queue_cancel(handle1)); ASSERT_EQ(0, bthread::execution_queue_stop(queue_id)); ASSERT_EQ(0, bthread::execution_queue_join(queue_id)); ASSERT_EQ(0, result); } struct CancelSelf { butil::atomic<bthread::TaskHandle*> handle; }; int cancel_self(void* /*meta*/, bthread::TaskIterator<CancelSelf*>& iter) { for (; iter; ++iter) { while ((*iter)->handle == NULL) { usleep(10); } EXPECT_EQ(1, bthread::execution_queue_cancel(*(*iter)->handle.load())); EXPECT_EQ(1, bthread::execution_queue_cancel(*(*iter)->handle.load())); EXPECT_EQ(1, bthread::execution_queue_cancel(*(*iter)->handle.load())); } return 0; } TEST_F(ExecutionQueueTest, cancel_self) { bthread::ExecutionQueueId<CancelSelf*> queue_id = { 0 }; // to suppress warnings bthread::ExecutionQueueOptions options; ASSERT_EQ(0, bthread::execution_queue_start(&queue_id, &options, cancel_self, NULL)); CancelSelf task; task.handle = NULL; bthread::TaskHandle handle; ASSERT_EQ(0, bthread::execution_queue_execute(queue_id, &task, NULL, &handle)); task.handle.store(&handle); ASSERT_EQ(0, bthread::execution_queue_stop(queue_id)); ASSERT_EQ(0, bthread::execution_queue_join(queue_id)); } struct AddTask { int value; bool cancel_task; int cancel_value; bthread::TaskHandle handle; }; struct AddMeta { int64_t sum; butil::atomic<int64_t> expected; butil::atomic<int64_t> succ_times; butil::atomic<int64_t> race_times; butil::atomic<int64_t> fail_times; }; int add_with_cancel(void* meta, bthread::TaskIterator<AddTask>& iter) { if (iter.is_queue_stopped()) { return 0; } AddMeta* m = (AddMeta*)meta; for (; iter; ++iter) { if (iter->cancel_task) { const int rc = bthread::execution_queue_cancel(iter->handle); if (rc == 0) { m->expected.fetch_sub(iter->cancel_value); m->succ_times.fetch_add(1); } else if (rc < 0) { m->fail_times.fetch_add(1); } else { m->race_times.fetch_add(1); } } else { m->sum += iter->value; } } return 0; } TEST_F(ExecutionQueueTest, random_cancel) { bthread::ExecutionQueueId<AddTask> queue_id = { 0 }; AddMeta m; m.sum = 0; m.expected.store(0); m.succ_times.store(0); m.fail_times.store(0); m.race_times.store(0); ASSERT_EQ(0, bthread::execution_queue_start(&queue_id, NULL, add_with_cancel, &m)); int64_t expected = 0; for (int i = 0; i < 100000; ++i) { bthread::TaskHandle h; AddTask t; t.value = i; t.cancel_task = false; ASSERT_EQ(0, bthread::execution_queue_execute(queue_id, t, NULL, &h)); const int r = butil::fast_rand_less_than(4); expected += i; if (r == 0) { if (bthread::execution_queue_cancel(h) == 0) { expected -= i; } } else if (r == 1) { t.cancel_task = true; t.cancel_value = i; t.handle = h; ASSERT_EQ(0, bthread::execution_queue_execute(queue_id, t, NULL)); } else if (r == 2) { t.cancel_task = true; t.cancel_value = i; t.handle = h; ASSERT_EQ(0, bthread::execution_queue_execute(queue_id, t, &bthread::TASK_OPTIONS_URGENT)); } else { // do nothing; } } m.expected.fetch_add(expected); ASSERT_EQ(0, bthread::execution_queue_stop(queue_id)); ASSERT_EQ(0, bthread::execution_queue_join(queue_id)); ASSERT_EQ(m.sum, m.expected.load()); LOG(INFO) << "sum=" << m.sum << " race_times=" << m.race_times << " succ_times=" << m.succ_times << " fail_times=" << m.fail_times; } int add2(void* meta, bthread::TaskIterator<LongIntTask> &iter) { if (iter) { int64_t* result = (int64_t*)meta; *result += iter->value; if (iter->event) { iter->event->signal(); } } return 0; } TEST_F(ExecutionQueueTest, not_do_iterate_at_all) { int64_t result = 0; int64_t expected_result = 0; bthread::ExecutionQueueId<LongIntTask> queue_id; bthread::ExecutionQueueOptions options; ASSERT_EQ(0, bthread::execution_queue_start(&queue_id, &options, add2, &result)); for (int i = 0; i < 100; ++i) { expected_result += i; ASSERT_EQ(0, bthread::execution_queue_execute(queue_id, i)); } ASSERT_EQ(0, bthread::execution_queue_stop(queue_id)); ASSERT_NE(0, bthread::execution_queue_execute(queue_id, 0)); ASSERT_EQ(0, bthread::execution_queue_join(queue_id)); ASSERT_EQ(expected_result, result); } int add_with_suspend3(void* meta, bthread::TaskIterator<LongIntTask>& iter) { int64_t* result = (int64_t*)meta; if (iter.is_queue_stopped()) { stopped = true; return 0; } for (; iter; ++iter) { if (iter->value == -100) { g_suspending = true; while (g_suspending) { usleep(10); } if (iter->event) { iter->event->signal(); } } else { *result += iter->value; if (iter->event) { iter->event->signal(); } } } return 0; } TEST_F(ExecutionQueueTest, cancel_unexecuted_high_priority_task) { g_should_be_urgent = false; bthread::ExecutionQueueId<LongIntTask> queue_id = { 0 }; // to suppress warnings bthread::ExecutionQueueOptions options; int64_t result = 0; ASSERT_EQ(0, bthread::execution_queue_start(&queue_id, &options, add_with_suspend3, &result)); // Push a normal task to make the executor suspend ASSERT_EQ(0, bthread::execution_queue_execute(queue_id, -100)); while (!g_suspending) { usleep(10); } // At this point, executor is suspended by the first task. Then we put // a high_priority task which is going to be cancelled immediately, // expecting that both operations are successful. bthread::TaskHandle h; ASSERT_EQ(0, bthread::execution_queue_execute( queue_id, -100, &bthread::TASK_OPTIONS_URGENT, &h)); ASSERT_EQ(0, bthread::execution_queue_cancel(h)); // Resume executor g_suspending = false; // Push a normal task ASSERT_EQ(0, bthread::execution_queue_execute(queue_id, 12345)); // The execq should stop normally ASSERT_EQ(0, bthread::execution_queue_stop(queue_id)); ASSERT_EQ(0, bthread::execution_queue_join(queue_id)); ASSERT_EQ(12345, result); } } // namespace