// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include <gtest/gtest.h>

#include <bthread/execution_queue.h>
#include <bthread/sys_futex.h>
#include <bthread/countdown_event.h>
#include "butil/time.h"
#include "butil/fast_rand.h"
#include "butil/gperftools_profiler.h"

namespace {
bool stopped = false;

class ExecutionQueueTest : public testing::Test {
protected:
    void SetUp() { stopped = false; }
    void TearDown() {}
};

struct LongIntTask {
    long value;
    bthread::CountdownEvent* event;
    LongIntTask(long v)
        : value(v), event(NULL)
    {}
    LongIntTask(long v, bthread::CountdownEvent* e)
        : value(v), event(e)
    {}
    LongIntTask() : value(0), event(NULL) {}
};

int add(void* meta, bthread::TaskIterator<LongIntTask> &iter) {
    stopped = iter.is_queue_stopped();
    int64_t* result = (int64_t*)meta;
    for (; iter; ++iter) {
        *result += iter->value;
        if (iter->event) { iter->event->signal(); }
    }
    return 0;
}

TEST_F(ExecutionQueueTest, single_thread) {
    int64_t result = 0;
    int64_t expected_result = 0;
    stopped = false;
    bthread::ExecutionQueueId<LongIntTask> queue_id;
    bthread::ExecutionQueueOptions options;
    ASSERT_EQ(0, bthread::execution_queue_start(&queue_id, &options,
                                                    add, &result));
    for (int i = 0; i < 100; ++i) {
        expected_result += i;
        ASSERT_EQ(0, bthread::execution_queue_execute(queue_id, i));
    }
    LOG(INFO) << "stop";
    ASSERT_EQ(0, bthread::execution_queue_stop(queue_id));
    ASSERT_NE(0, bthread::execution_queue_execute(queue_id, 0));
    ASSERT_EQ(0, bthread::execution_queue_join(queue_id));
    ASSERT_EQ(expected_result, result);
    ASSERT_TRUE(stopped);
}

struct PushArg {
    bthread::ExecutionQueueId<LongIntTask> id;
    butil::atomic<int64_t> total_num;
    butil::atomic<int64_t> total_time;
    butil::atomic<int64_t> expected_value;
    volatile bool stopped;
    bool wait_task_completed;

    PushArg() {
        memset(this, 0, sizeof(*this));
    }
};

void* push_thread(void *arg) {
    PushArg* pa = (PushArg*)arg;
    int64_t sum = 0;
    butil::Timer timer;
    timer.start();
    int num = 0;
    bthread::CountdownEvent e;
    LongIntTask t(num, pa->wait_task_completed ? &e : NULL);
    if (pa->wait_task_completed) {
        e.reset(1);
    }
    while (bthread::execution_queue_execute(pa->id, t) == 0) {
        sum += num;
        t.value = ++num;
        if (pa->wait_task_completed) {
            e.wait();
            e.reset(1);
        }
    }
    timer.stop();
    pa->expected_value.fetch_add(sum, butil::memory_order_relaxed);
    pa->total_num.fetch_add(num);
    pa->total_time.fetch_add(timer.n_elapsed());
    return NULL;
}

void* push_thread_which_addresses_execq(void *arg) {
    PushArg* pa = (PushArg*)arg;
    int64_t sum = 0;
    butil::Timer timer;
    timer.start();
    int num = 0;
    bthread::ExecutionQueue<LongIntTask>::scoped_ptr_t ptr
            = bthread::execution_queue_address(pa->id);
    EXPECT_TRUE(ptr);
    while (ptr->execute(num) == 0) {
        sum += num;
        ++num;
    }
    EXPECT_TRUE(ptr->stopped());
    timer.stop();
    pa->expected_value.fetch_add(sum, butil::memory_order_relaxed);
    pa->total_num.fetch_add(num);
    pa->total_time.fetch_add(timer.n_elapsed());
    return NULL;
}

TEST_F(ExecutionQueueTest, performance) {
    pthread_t threads[8];
    bthread::ExecutionQueueId<LongIntTask> queue_id = { 0 }; // to suppress warnings
    bthread::ExecutionQueueOptions options;
    int64_t result = 0;
    ASSERT_EQ(0, bthread::execution_queue_start(&queue_id, &options,
                                                add, &result));
    PushArg pa;
    pa.id = queue_id;
    pa.total_num = 0;
    pa.total_time = 0;
    pa.expected_value = 0;
    pa.stopped = false;
    ProfilerStart("execq.prof");
    for (size_t i = 0; i < ARRAY_SIZE(threads); ++i) {
        pthread_create(&threads[i], NULL, &push_thread_which_addresses_execq, &pa);
    }
    usleep(500 * 1000);
    ASSERT_EQ(0, bthread::execution_queue_stop(queue_id));
    for (size_t i = 0; i < ARRAY_SIZE(threads); ++i) {
        pthread_join(threads[i], NULL);
    }
    ProfilerStop();
    ASSERT_EQ(0, bthread::execution_queue_join(queue_id));
    ASSERT_EQ(pa.expected_value.load(), result);
    LOG(INFO) << "With addressed execq, each execution_queue_execute takes " 
              << pa.total_time.load() / pa.total_num.load()
              << " total_num=" << pa.total_num
              << " ns with " << ARRAY_SIZE(threads) << " threads";
#define BENCHMARK_BOTH
#ifdef BENCHMARK_BOTH
    result = 0;
    ASSERT_EQ(0, bthread::execution_queue_start(&queue_id, &options,
                                                add, &result));
    pa.id = queue_id;
    pa.total_num = 0;
    pa.total_time = 0;
    pa.expected_value = 0;
    pa.stopped = false;
    ProfilerStart("execq_id.prof");
    for (size_t i = 0; i < ARRAY_SIZE(threads); ++i) {
        pthread_create(&threads[i], NULL, &push_thread, &pa);
    }
    usleep(500 * 1000);
    ASSERT_EQ(0, bthread::execution_queue_stop(queue_id));
    for (size_t i = 0; i < ARRAY_SIZE(threads); ++i) {
        pthread_join(threads[i], NULL);
    }
    ProfilerStop();
    ASSERT_EQ(0, bthread::execution_queue_join(queue_id));
    ASSERT_EQ(pa.expected_value.load(), result);
    LOG(INFO) << "With id explicitly, execution_queue_execute takes " 
              << pa.total_time.load() / pa.total_num.load()
              << " total_num=" << pa.total_num
              << " ns with " << ARRAY_SIZE(threads) << " threads";
#endif  // BENCHMARK_BOTH
}

volatile bool g_suspending = false;
volatile bool g_should_be_urgent = false;
int urgent_times = 0;

int add_with_suspend(void* meta, bthread::TaskIterator<LongIntTask>& iter) {
    int64_t* result = (int64_t*)meta;
    if (iter.is_queue_stopped()) {
        stopped = true;
        return 0;
    }
    if (g_should_be_urgent) {
        g_should_be_urgent = false;
        EXPECT_EQ(-1, iter->value) << urgent_times;
        if (iter->event) { iter->event->signal(); }
        ++iter;
        EXPECT_FALSE(iter) << urgent_times;
        ++urgent_times;
    } else {
        for (; iter; ++iter) {
            if (iter->value == -100) {
                g_suspending = true;
                while (g_suspending) {
                    bthread_usleep(100);
                }
                g_should_be_urgent = true;
                if (iter->event) { iter->event->signal(); }
                EXPECT_FALSE(++iter);
                return 0;
            } else {
                *result += iter->value;
                if (iter->event) { iter->event->signal(); }
            }
        }
    }
    return 0;
}

TEST_F(ExecutionQueueTest, execute_urgent) {
    g_should_be_urgent = false;
    pthread_t threads[10];
    bthread::ExecutionQueueId<LongIntTask> queue_id = { 0 }; // to suppress warnings
    bthread::ExecutionQueueOptions options;
    int64_t result = 0;
    ASSERT_EQ(0, bthread::execution_queue_start(&queue_id, &options,
                                                add_with_suspend, &result));
    PushArg pa;
    pa.id = queue_id;
    pa.total_num = 0;
    pa.total_time = 0;
    pa.expected_value = 0;
    pa.stopped = false;
    pa.wait_task_completed = true;
    for (size_t i = 0; i < ARRAY_SIZE(threads); ++i) {
        pthread_create(&threads[i], NULL, &push_thread, &pa);
    }
    g_suspending = false;
    usleep(1000);

    for (int i = 0; i < 100; ++i) {
        ASSERT_EQ(0, bthread::execution_queue_execute(queue_id, -100));
        while (!g_suspending) {
            usleep(100);
        }
        ASSERT_EQ(0, bthread::execution_queue_execute(
                      queue_id, -1, &bthread::TASK_OPTIONS_URGENT));
        g_suspending = false;
        usleep(100);
    }
    usleep(500* 1000);
    pa.stopped = true;
    ASSERT_EQ(0, bthread::execution_queue_stop(queue_id));
    for (size_t i = 0; i < ARRAY_SIZE(threads); ++i) {
        pthread_join(threads[i], NULL);
    }
    LOG(INFO) << "result=" << result;
    ASSERT_EQ(0, bthread::execution_queue_join(queue_id));
    ASSERT_EQ(pa.expected_value.load(), result);
}

TEST_F(ExecutionQueueTest, urgent_task_is_the_last_task) {
    g_should_be_urgent = false;
    g_suspending = false;
    bthread::ExecutionQueueId<LongIntTask> queue_id = { 0 }; // to suppress warnings
    bthread::ExecutionQueueOptions options;
    int64_t result = 0;
    ASSERT_EQ(0, bthread::execution_queue_start(&queue_id, &options,
                                                add_with_suspend, &result));
    g_suspending = false;
    ASSERT_EQ(0, bthread::execution_queue_execute(queue_id, -100));
    while (!g_suspending) {
        usleep(10);
    }
    LOG(INFO) << "Going to push";
    int64_t expected = 0;
    for (int i = 1; i < 100; ++i) {
        expected += i;
        ASSERT_EQ(0, bthread::execution_queue_execute(queue_id, i));
    }
    ASSERT_EQ(0, bthread::execution_queue_execute(queue_id, -1, &bthread::TASK_OPTIONS_URGENT));
    usleep(100);
    g_suspending = false;
    butil::atomic_thread_fence(butil::memory_order_acq_rel);
    usleep(10 * 1000);
    LOG(INFO) << "going to quit";
    ASSERT_EQ(0, bthread::execution_queue_stop(queue_id));
    ASSERT_EQ(0, bthread::execution_queue_join(queue_id));
    ASSERT_EQ(expected, result);
}

long next_task[1024];
butil::atomic<int> num_threads(0);

void* push_thread_with_id(void* arg) {
    bthread::ExecutionQueueId<LongIntTask> id = { (uint64_t)arg };
    int thread_id = num_threads.fetch_add(1, butil::memory_order_relaxed);
    LOG(INFO) << "Start thread" << thread_id;
    for (int i = 0; i < 100000; ++i) {
        bthread::execution_queue_execute(id, ((long)thread_id << 32) | i);
    }
    return NULL;
}

int check_order(void* meta, bthread::TaskIterator<LongIntTask>& iter) {
    for (; iter; ++iter) {
        long value = iter->value;
        int thread_id = value >> 32;
        long task = value & 0xFFFFFFFFul;
        if (task != next_task[thread_id]++) {
            EXPECT_TRUE(false) << "task=" << task << " thread_id=" << thread_id;
            ++*(long*)meta;
        }
        if (iter->event) { iter->event->signal(); }
    }
    return 0;
}

TEST_F(ExecutionQueueTest, multi_threaded_order) {
    memset(next_task, 0, sizeof(next_task));
    long disorder_times = 0;
    bthread::ExecutionQueueId<LongIntTask> queue_id = { 0 }; // to suppress warnings
    bthread::ExecutionQueueOptions options;
    ASSERT_EQ(0, bthread::execution_queue_start(&queue_id, &options,
                                                check_order, &disorder_times));
    pthread_t threads[12];
    for (size_t i = 0; i < ARRAY_SIZE(threads); ++i) {
        pthread_create(&threads[i], NULL, &push_thread_with_id, (void *)queue_id.value);
    }
    for (size_t i = 0; i < ARRAY_SIZE(threads); ++i) {
        pthread_join(threads[i], NULL);
    }
    ASSERT_EQ(0, bthread::execution_queue_stop(queue_id));
    ASSERT_EQ(0, bthread::execution_queue_join(queue_id));
    ASSERT_EQ(0, disorder_times);
}

int check_running_thread(void* arg, bthread::TaskIterator<LongIntTask>& iter) {
    if (iter.is_queue_stopped()) {
        return 0;
    }
    for (; iter; ++iter) {}
    EXPECT_EQ(pthread_self(), (pthread_t)arg);
    return 0;
}

TEST_F(ExecutionQueueTest, in_place_task) {
    pthread_t thread_id = pthread_self();
    bthread::ExecutionQueueId<LongIntTask> queue_id = { 0 }; // to suppress warnings
    bthread::ExecutionQueueOptions options;
    ASSERT_EQ(0, bthread::execution_queue_start(&queue_id, &options,
                                                check_running_thread, 
                                                (void*)thread_id));
    ASSERT_EQ(0, bthread::execution_queue_execute(
                queue_id, 0, &bthread::TASK_OPTIONS_INPLACE));
    ASSERT_EQ(0, bthread::execution_queue_stop(queue_id));
    ASSERT_EQ(0, bthread::execution_queue_join(queue_id));
}

struct InPlaceTask {
    bool first_task;
    pthread_t thread_id;
};

void *run_first_tasks(void* arg) {
    bthread::ExecutionQueueId<InPlaceTask> queue_id = { (uint64_t)arg };
    InPlaceTask task;
    task.first_task = true;
    task.thread_id = pthread_self();
    EXPECT_EQ(0, bthread::execution_queue_execute(queue_id, task, 
                                                  &bthread::TASK_OPTIONS_INPLACE));
    return NULL;
}

int stuck_and_check_running_thread(void* arg, bthread::TaskIterator<InPlaceTask>& iter) {
    if (iter.is_queue_stopped()) {
        return 0;
    }
    butil::atomic<int>* futex = (butil::atomic<int>*)arg;
    if (iter->first_task) {
        EXPECT_EQ(pthread_self(), iter->thread_id);
        futex->store(1);
        bthread::futex_wake_private(futex, 1);
        while (futex->load() != 2) {
            bthread::futex_wait_private(futex, 1, NULL);
        }
        ++iter;
        EXPECT_FALSE(iter);
    } else {
        for (; iter; ++iter) {
            EXPECT_FALSE(iter->first_task);
            EXPECT_NE(pthread_self(), iter->thread_id);
        }
    }
    return 0;
}

TEST_F(ExecutionQueueTest, should_start_new_thread_on_more_tasks) {
    bthread::ExecutionQueueId<InPlaceTask> queue_id = { 0 };
    bthread::ExecutionQueueOptions options;
    butil::atomic<int> futex(0);
    ASSERT_EQ(0, bthread::execution_queue_start(&queue_id, &options,
                                                stuck_and_check_running_thread, 
                                                (void*)&futex));
    pthread_t thread;
    ASSERT_EQ(0, pthread_create(&thread, NULL, run_first_tasks, (void*)queue_id.value));
    while (futex.load() != 1) {
        bthread::futex_wait_private(&futex, 0, NULL);
    }
    for (size_t i = 0; i < 100; ++i) {
        InPlaceTask task;
        task.first_task = false;
        task.thread_id = pthread_self();
        ASSERT_EQ(0, bthread::execution_queue_execute(queue_id, task, 
                                                      &bthread::TASK_OPTIONS_INPLACE));
    }
    futex.store(2);
    bthread::futex_wake_private(&futex, 1);
    ASSERT_EQ(0, bthread::execution_queue_stop(queue_id));
    ASSERT_EQ(0, bthread::execution_queue_join(queue_id));
}

void* inplace_push_thread(void* arg) {
    bthread::ExecutionQueueId<LongIntTask> id = { (uint64_t)arg };
    int thread_id = num_threads.fetch_add(1, butil::memory_order_relaxed);
    LOG(INFO) << "Start thread" << thread_id;
    for (int i = 0; i < 100000; ++i) {
        bthread::execution_queue_execute(id, ((long)thread_id << 32) | i,
                            &bthread::TASK_OPTIONS_INPLACE);
    }
    return NULL;
}

TEST_F(ExecutionQueueTest, inplace_and_order) {
    memset(next_task, 0, sizeof(next_task));
    long disorder_times = 0;
    bthread::ExecutionQueueId<LongIntTask> queue_id = { 0 }; // to suppress warnings
    bthread::ExecutionQueueOptions options;
    ASSERT_EQ(0, bthread::execution_queue_start(&queue_id, &options,
                                                check_order, &disorder_times));
    pthread_t threads[12];
    for (size_t i = 0; i < ARRAY_SIZE(threads); ++i) {
        pthread_create(&threads[i], NULL, &inplace_push_thread, (void *)queue_id.value);
    }
    for (size_t i = 0; i < ARRAY_SIZE(threads); ++i) {
        pthread_join(threads[i], NULL);
    }
    ASSERT_EQ(0, bthread::execution_queue_stop(queue_id));
    ASSERT_EQ(0, bthread::execution_queue_join(queue_id));
    ASSERT_EQ(0, disorder_times);
}

TEST_F(ExecutionQueueTest, size_of_task_node) {
    LOG(INFO) << "sizeof(TaskNode)=" << sizeof(bthread::TaskNode);
}

int add_with_suspend2(void* meta, bthread::TaskIterator<LongIntTask>& iter) {
    int64_t* result = (int64_t*)meta;
    if (iter.is_queue_stopped()) {
        stopped = true;
        return 0;
    }
    for (; iter; ++iter) {
        if (iter->value == -100) {
            g_suspending = true;
            while (g_suspending) {
                usleep(10);
            }
            if (iter->event) { iter->event->signal(); }
        } else {
            *result += iter->value;
            if (iter->event) { iter->event->signal(); }
        }
    }
    return 0;
}

TEST_F(ExecutionQueueTest, cancel) {
    bthread::ExecutionQueueId<LongIntTask> queue_id = { 0 }; // to suppress warnings
    bthread::ExecutionQueueOptions options;
    int64_t result = 0;
    ASSERT_EQ(0, bthread::execution_queue_start(&queue_id, &options,
                                                add_with_suspend2, &result));
    g_suspending = false;
    bthread::TaskHandle handle0;
    ASSERT_EQ(0, bthread::execution_queue_execute(queue_id, -100, NULL, &handle0));
    while (!g_suspending) {
        usleep(10);
    }
    ASSERT_EQ(1, bthread::execution_queue_cancel(handle0));
    ASSERT_EQ(1, bthread::execution_queue_cancel(handle0));
    bthread::TaskHandle handle1;
    ASSERT_EQ(0, bthread::execution_queue_execute(queue_id, 100, NULL, &handle1));
    ASSERT_EQ(0, bthread::execution_queue_cancel(handle1));
    g_suspending = false;
    ASSERT_EQ(-1, bthread::execution_queue_cancel(handle1));
    ASSERT_EQ(0, bthread::execution_queue_stop(queue_id));
    ASSERT_EQ(0, bthread::execution_queue_join(queue_id));
    ASSERT_EQ(0, result);
}

struct CancelSelf {
    butil::atomic<bthread::TaskHandle*> handle;
};

int cancel_self(void* /*meta*/, bthread::TaskIterator<CancelSelf*>& iter) {

    for (; iter; ++iter) {
        while ((*iter)->handle == NULL) {
            usleep(10);
        }
        EXPECT_EQ(1, bthread::execution_queue_cancel(*(*iter)->handle.load()));
        EXPECT_EQ(1, bthread::execution_queue_cancel(*(*iter)->handle.load()));
        EXPECT_EQ(1, bthread::execution_queue_cancel(*(*iter)->handle.load()));
    }
    return 0;
}

TEST_F(ExecutionQueueTest, cancel_self) {
    bthread::ExecutionQueueId<CancelSelf*> queue_id = { 0 }; // to suppress warnings
    bthread::ExecutionQueueOptions options;
    ASSERT_EQ(0, bthread::execution_queue_start(&queue_id, &options,
                                                cancel_self, NULL));
    CancelSelf task;
    task.handle = NULL;
    bthread::TaskHandle handle;
    ASSERT_EQ(0, bthread::execution_queue_execute(queue_id, &task, NULL, &handle));
    task.handle.store(&handle);
    ASSERT_EQ(0, bthread::execution_queue_stop(queue_id));
    ASSERT_EQ(0, bthread::execution_queue_join(queue_id));
}

struct AddTask {
    int value;
    bool cancel_task;
    int cancel_value;
    bthread::TaskHandle handle;
};

struct AddMeta {
    int64_t sum;
    butil::atomic<int64_t> expected;
    butil::atomic<int64_t> succ_times;
    butil::atomic<int64_t> race_times;
    butil::atomic<int64_t> fail_times;
};

int add_with_cancel(void* meta, bthread::TaskIterator<AddTask>& iter) {
    if (iter.is_queue_stopped()) {
        return 0;
    }
    AddMeta* m = (AddMeta*)meta;
    for (; iter; ++iter) {
        if (iter->cancel_task) {
            const int rc = bthread::execution_queue_cancel(iter->handle);
            if (rc == 0) {
                m->expected.fetch_sub(iter->cancel_value);
                m->succ_times.fetch_add(1);
            } else if (rc < 0) {
                m->fail_times.fetch_add(1);
            } else {
                m->race_times.fetch_add(1);
            }
        } else {
            m->sum += iter->value;
        }
    }
    return 0;
}

TEST_F(ExecutionQueueTest, random_cancel) {
    bthread::ExecutionQueueId<AddTask> queue_id = { 0 };
    AddMeta m;
    m.sum = 0;
    m.expected.store(0);
    m.succ_times.store(0);
    m.fail_times.store(0);
    m.race_times.store(0);
    ASSERT_EQ(0, bthread::execution_queue_start(&queue_id, NULL,
                                                add_with_cancel, &m));
    int64_t expected = 0;
    for (int i = 0; i < 100000; ++i) {
        bthread::TaskHandle h;
        AddTask t;
        t.value = i;
        t.cancel_task = false;
        ASSERT_EQ(0, bthread::execution_queue_execute(queue_id, t, NULL, &h));
        const int r = butil::fast_rand_less_than(4);
        expected += i;
        if (r == 0) {
            if (bthread::execution_queue_cancel(h) == 0) {
                expected -= i;
            }
        } else if (r == 1) {
            t.cancel_task = true;
            t.cancel_value = i;
            t.handle = h;
            ASSERT_EQ(0, bthread::execution_queue_execute(queue_id, t, NULL));
        } else if (r == 2) {
            t.cancel_task = true;
            t.cancel_value = i;
            t.handle = h;
            ASSERT_EQ(0, bthread::execution_queue_execute(queue_id, t, 
                                    &bthread::TASK_OPTIONS_URGENT));
        } else {
            // do nothing;
        }
    }
    m.expected.fetch_add(expected);
    ASSERT_EQ(0, bthread::execution_queue_stop(queue_id));
    ASSERT_EQ(0, bthread::execution_queue_join(queue_id));
    ASSERT_EQ(m.sum, m.expected.load());
    LOG(INFO) << "sum=" << m.sum << " race_times=" << m.race_times
              << " succ_times=" << m.succ_times
              << " fail_times=" << m.fail_times;

}

int add2(void* meta, bthread::TaskIterator<LongIntTask> &iter) {
    if (iter) {
        int64_t* result = (int64_t*)meta;
        *result += iter->value;
        if (iter->event) { iter->event->signal(); }
    }
    return 0;
}

TEST_F(ExecutionQueueTest, not_do_iterate_at_all) {
    int64_t result = 0;
    int64_t expected_result = 0;
    bthread::ExecutionQueueId<LongIntTask> queue_id;
    bthread::ExecutionQueueOptions options;
    ASSERT_EQ(0, bthread::execution_queue_start(&queue_id, &options,
                                                    add2, &result));
    for (int i = 0; i < 100; ++i) {
        expected_result += i;
        ASSERT_EQ(0, bthread::execution_queue_execute(queue_id, i));
    }
    ASSERT_EQ(0, bthread::execution_queue_stop(queue_id));
    ASSERT_NE(0, bthread::execution_queue_execute(queue_id, 0));
    ASSERT_EQ(0, bthread::execution_queue_join(queue_id));
    ASSERT_EQ(expected_result, result);
}

int add_with_suspend3(void* meta, bthread::TaskIterator<LongIntTask>& iter) {
    int64_t* result = (int64_t*)meta;
    if (iter.is_queue_stopped()) {
        stopped = true;
        return 0;
    }
    for (; iter; ++iter) {
        if (iter->value == -100) {
            g_suspending = true;
            while (g_suspending) {
                usleep(10);
            }
            if (iter->event) { iter->event->signal(); }
        } else {
            *result += iter->value;
            if (iter->event) { iter->event->signal(); }
        }
    }
    return 0;
}

TEST_F(ExecutionQueueTest, cancel_unexecuted_high_priority_task) {
    g_should_be_urgent = false;
    bthread::ExecutionQueueId<LongIntTask> queue_id = { 0 }; // to suppress warnings
    bthread::ExecutionQueueOptions options;
    int64_t result = 0;
    ASSERT_EQ(0, bthread::execution_queue_start(&queue_id, &options,
                                                add_with_suspend3, &result));
    // Push a normal task to make the executor suspend
    ASSERT_EQ(0, bthread::execution_queue_execute(queue_id, -100));
    while (!g_suspending) {
        usleep(10);
    }
    // At this point, executor is suspended by the first task. Then we put
    // a high_priority task which is going to be cancelled immediately,
    // expecting that both operations are successful.
    bthread::TaskHandle h;
    ASSERT_EQ(0, bthread::execution_queue_execute(
                        queue_id, -100, &bthread::TASK_OPTIONS_URGENT, &h));
    ASSERT_EQ(0, bthread::execution_queue_cancel(h));
    
    // Resume executor
    g_suspending = false;  

    // Push a normal task
    ASSERT_EQ(0, bthread::execution_queue_execute(queue_id, 12345));

    // The execq should stop normally
    ASSERT_EQ(0, bthread::execution_queue_stop(queue_id));
    ASSERT_EQ(0, bthread::execution_queue_join(queue_id));

    ASSERT_EQ(12345, result);
}
} // namespace