// Copyright (c) 2014 Baidu, Inc.
// Author: Zhangyi Chen (chenzhangyi01@baidu.com)
// Date: 2015/11/09 19:09:02

#include <gtest/gtest.h>

#include <bthread/execution_queue.h>
#include <bthread/sys_futex.h>
gejun's avatar
gejun committed
9
#include <bthread/countdown_event.h>
10 11
#include "butil/time.h"
#include "butil/fast_rand.h"
12
#include "butil/gperftools_profiler.h"
gejun's avatar
gejun committed
13 14 15 16 17 18 19 20 21 22

namespace {
// Written by the executor callbacks in this file to record whether they saw
// the queue in the stopped state. ExecutionQueueTest::SetUp() resets it to
// false before every test.
bool stopped = false;

// Fixture shared by every test case in this file. Its only job is to clear
// the file-level `stopped` flag so each case starts from a known state.
class ExecutionQueueTest : public testing::Test {
protected:
    void SetUp() {
        stopped = false;
    }

    void TearDown() {
    }
};

gejun's avatar
gejun committed
23 24 25 26 27 28 29 30 31 32 33 34 35
// Task type used by most tests: an integer payload plus an optional
// completion event that the executor callbacks signal after handling the
// task. The single-argument constructor is intentionally implicit so that
// tests can pass plain integers to execution_queue_execute().
struct LongIntTask {
    long value;
    bthread::CountdownEvent* event;

    LongIntTask()
        : value(0)
        , event(NULL) {
    }

    LongIntTask(long v)
        : value(v)
        , event(NULL) {
    }

    LongIntTask(long v, bthread::CountdownEvent* e)
        : value(v)
        , event(e) {
    }
};

int add(void* meta, bthread::TaskIterator<LongIntTask> &iter) {
gejun's avatar
gejun committed
36 37 38
    stopped = iter.is_queue_stopped();
    int64_t* result = (int64_t*)meta;
    for (; iter; ++iter) {
gejun's avatar
gejun committed
39 40
        *result += iter->value;
        if (iter->event) { iter->event->signal(); }
gejun's avatar
gejun committed
41 42 43 44 45 46 47 48
    }
    return 0;
}

// Pushes 0..99 from the calling thread, then checks that the executor summed
// every value, that execute() is rejected once the queue has been stopped,
// and that the callback observed the stopped state.
TEST_F(ExecutionQueueTest, single_thread) {
    int64_t sum = 0;
    int64_t expected_sum = 0;
    stopped = false;
    bthread::ExecutionQueueId<LongIntTask> queue_id;
    bthread::ExecutionQueueOptions options;
    ASSERT_EQ(0, bthread::execution_queue_start(&queue_id, &options,
                                                add, &sum));
    for (int value = 0; value != 100; ++value) {
        expected_sum += value;
        ASSERT_EQ(0, bthread::execution_queue_execute(queue_id, value));
    }
    LOG(INFO) << "stop";
    ASSERT_EQ(0, bthread::execution_queue_stop(queue_id));
    // A stopped queue must refuse new tasks.
    ASSERT_NE(0, bthread::execution_queue_execute(queue_id, 0));
    ASSERT_EQ(0, bthread::execution_queue_join(queue_id));
    ASSERT_EQ(expected_sum, sum);
    ASSERT_TRUE(stopped);
}

struct PushArg {
gejun's avatar
gejun committed
66
    bthread::ExecutionQueueId<LongIntTask> id;
67 68 69
    butil::atomic<int64_t> total_num;
    butil::atomic<int64_t> total_time;
    butil::atomic<int64_t> expected_value;
gejun's avatar
gejun committed
70
    volatile bool stopped;
gejun's avatar
gejun committed
71 72 73 74 75
    bool wait_task_completed;

    PushArg() {
        memset(this, 0, sizeof(*this));
    }
gejun's avatar
gejun committed
76 77 78 79 80
};

void* push_thread(void *arg) {
    PushArg* pa = (PushArg*)arg;
    int64_t sum = 0;
81
    butil::Timer timer;
gejun's avatar
gejun committed
82 83
    timer.start();
    int num = 0;
gejun's avatar
gejun committed
84 85 86 87 88 89
    bthread::CountdownEvent e;
    LongIntTask t(num, pa->wait_task_completed ? &e : NULL);
    if (pa->wait_task_completed) {
        e.reset(1);
    }
    while (bthread::execution_queue_execute(pa->id, t) == 0) {
gejun's avatar
gejun committed
90
        sum += num;
gejun's avatar
gejun committed
91 92 93 94
        t.value = ++num;
        if (pa->wait_task_completed) {
            e.wait();
            e.reset(1);
gejun's avatar
gejun committed
95 96 97
        }
    }
    timer.stop();
98
    pa->expected_value.fetch_add(sum, butil::memory_order_relaxed);
gejun's avatar
gejun committed
99 100 101 102 103 104 105 106
    pa->total_num.fetch_add(num);
    pa->total_time.fetch_add(timer.n_elapsed());
    return NULL;
}

void* push_thread_which_addresses_execq(void *arg) {
    PushArg* pa = (PushArg*)arg;
    int64_t sum = 0;
107
    butil::Timer timer;
gejun's avatar
gejun committed
108 109
    timer.start();
    int num = 0;
gejun's avatar
gejun committed
110
    bthread::ExecutionQueue<LongIntTask>::scoped_ptr_t ptr
gejun's avatar
gejun committed
111 112 113 114 115 116 117 118
            = bthread::execution_queue_address(pa->id);
    EXPECT_TRUE(ptr);
    while (ptr->execute(num) == 0) {
        sum += num;
        ++num;
    }
    EXPECT_TRUE(ptr->stopped());
    timer.stop();
119
    pa->expected_value.fetch_add(sum, butil::memory_order_relaxed);
gejun's avatar
gejun committed
120 121 122 123 124 125 126
    pa->total_num.fetch_add(num);
    pa->total_time.fetch_add(timer.n_elapsed());
    return NULL;
}

// Benchmarks pushing from 8 pthreads for roughly 500ms, first through an
// addressed queue pointer and then (when BENCHMARK_BOTH is defined) through
// the queue id. Each phase also verifies that the executor's sum equals the
// sum of the values the producers managed to push.
TEST_F(ExecutionQueueTest, performance) {
    pthread_t threads[8];
    bthread::ExecutionQueueId<LongIntTask> queue_id = { 0 }; // to suppress warnings
    bthread::ExecutionQueueOptions options;
    int64_t result = 0;
    ASSERT_EQ(0, bthread::execution_queue_start(&queue_id, &options,
                                                add, &result));
    PushArg pa;
    pa.id = queue_id;
    pa.total_num = 0;
    pa.total_time = 0;
    pa.expected_value = 0;
    pa.stopped = false;
    // The benchmark must not wait for each task to complete.
    pa.wait_task_completed = false;
    ProfilerStart("execq.prof");
    for (size_t i = 0; i < ARRAY_SIZE(threads); ++i) {
        pthread_create(&threads[i], NULL, &push_thread_which_addresses_execq, &pa);
    }
    usleep(500 * 1000);
    ASSERT_EQ(0, bthread::execution_queue_stop(queue_id));
    for (size_t i = 0; i < ARRAY_SIZE(threads); ++i) {
        pthread_join(threads[i], NULL);
    }
    ProfilerStop();
    ASSERT_EQ(0, bthread::execution_queue_join(queue_id));
    ASSERT_EQ(pa.expected_value.load(), result);
    {
        // Guard the average against a run in which nothing was pushed.
        const int64_t pushed = pa.total_num.load();
        LOG(INFO) << "With addressed execq, each execution_queue_execute takes "
                  << (pushed > 0 ? pa.total_time.load() / pushed : 0)
                  << " ns with " << ARRAY_SIZE(threads) << " threads"
                  << " total_num=" << pushed;
    }
#define BENCHMARK_BOTH
#ifdef BENCHMARK_BOTH
    result = 0;
    ASSERT_EQ(0, bthread::execution_queue_start(&queue_id, &options,
                                                add, &result));
    pa.id = queue_id;
    pa.total_num = 0;
    pa.total_time = 0;
    pa.expected_value = 0;
    pa.stopped = false;
    ProfilerStart("execq_id.prof");
    for (size_t i = 0; i < ARRAY_SIZE(threads); ++i) {
        pthread_create(&threads[i], NULL, &push_thread, &pa);
    }
    usleep(500 * 1000);
    ASSERT_EQ(0, bthread::execution_queue_stop(queue_id));
    for (size_t i = 0; i < ARRAY_SIZE(threads); ++i) {
        pthread_join(threads[i], NULL);
    }
    ProfilerStop();
    ASSERT_EQ(0, bthread::execution_queue_join(queue_id));
    ASSERT_EQ(pa.expected_value.load(), result);
    {
        const int64_t pushed = pa.total_num.load();
        LOG(INFO) << "With id explicitly, execution_queue_execute takes "
                  << (pushed > 0 ? pa.total_time.load() / pushed : 0)
                  << " ns with " << ARRAY_SIZE(threads) << " threads"
                  << " total_num=" << pushed;
    }
#endif  // BENCHMARK_BOTH
}

// Handshake flag between the tests and the add_with_suspend* callbacks: a
// callback sets it to true when it meets a task whose value is -100 and then
// spins until a test sets it back to false.
// NOTE(review): this and g_should_be_urgent are plain volatile variables
// accessed from several threads; volatile does not provide inter-thread
// synchronization -- consider butil::atomic<bool>.
volatile bool g_suspending = false;
// Set by add_with_suspend after a suspension; its next invocation then
// expects the first task of the batch to be the urgent one (value -1).
volatile bool g_should_be_urgent = false;
// Number of times add_with_suspend went through its urgent branch.
int urgent_times = 0;

gejun's avatar
gejun committed
187
int add_with_suspend(void* meta, bthread::TaskIterator<LongIntTask>& iter) {
gejun's avatar
gejun committed
188 189 190 191 192 193 194
    int64_t* result = (int64_t*)meta;
    if (iter.is_queue_stopped()) {
        stopped = true;
        return 0;
    }
    if (g_should_be_urgent) {
        g_should_be_urgent = false;
gejun's avatar
gejun committed
195 196
        EXPECT_EQ(-1, iter->value) << urgent_times;
        if (iter->event) { iter->event->signal(); }
gejun's avatar
gejun committed
197 198 199 200 201
        ++iter;
        EXPECT_FALSE(iter) << urgent_times;
        ++urgent_times;
    } else {
        for (; iter; ++iter) {
gejun's avatar
gejun committed
202
            if (iter->value == -100) {
gejun's avatar
gejun committed
203 204
                g_suspending = true;
                while (g_suspending) {
gejun's avatar
gejun committed
205
                    bthread_usleep(100);
gejun's avatar
gejun committed
206 207
                }
                g_should_be_urgent = true;
gejun's avatar
gejun committed
208
                if (iter->event) { iter->event->signal(); }
gejun's avatar
gejun committed
209 210 211
                EXPECT_FALSE(++iter);
                return 0;
            } else {
gejun's avatar
gejun committed
212 213
                *result += iter->value;
                if (iter->event) { iter->event->signal(); }
gejun's avatar
gejun committed
214 215 216 217 218 219 220 221
            }
        }
    }
    return 0;
}

// While 10 producer threads keep pushing ordinary tasks, repeatedly suspends
// the executor with a -100 task, pushes an urgent -1 task, and resumes it.
// add_with_suspend checks that the urgent task is the next one it receives;
// the test finally checks the sum of all ordinary values.
TEST_F(ExecutionQueueTest, execute_urgent) {
    g_should_be_urgent = false;
    pthread_t producers[10];
    bthread::ExecutionQueueId<LongIntTask> queue_id = { 0 }; // to suppress warnings
    bthread::ExecutionQueueOptions options;
    int64_t sum = 0;
    ASSERT_EQ(0, bthread::execution_queue_start(&queue_id, &options,
                                                add_with_suspend, &sum));
    PushArg pa;
    pa.id = queue_id;
    pa.total_num = 0;
    pa.total_time = 0;
    pa.expected_value = 0;
    pa.stopped = false;
    pa.wait_task_completed = true;
    for (size_t idx = 0; idx < ARRAY_SIZE(producers); ++idx) {
        pthread_create(&producers[idx], NULL, &push_thread, &pa);
    }
    g_suspending = false;
    usleep(1000);

    for (int round = 0; round < 100; ++round) {
        ASSERT_EQ(0, bthread::execution_queue_execute(queue_id, -100));
        // Wait until the executor has reached the suspending task.
        while (!g_suspending) {
            usleep(100);
        }
        ASSERT_EQ(0, bthread::execution_queue_execute(
                      queue_id, -1, &bthread::TASK_OPTIONS_URGENT));
        g_suspending = false;
        usleep(100);
    }
    usleep(500 * 1000);
    pa.stopped = true;
    ASSERT_EQ(0, bthread::execution_queue_stop(queue_id));
    for (size_t idx = 0; idx < ARRAY_SIZE(producers); ++idx) {
        pthread_join(producers[idx], NULL);
    }
    LOG(INFO) << "result=" << sum;
    ASSERT_EQ(0, bthread::execution_queue_join(queue_id));
    ASSERT_EQ(pa.expected_value.load(), sum);
}

// Suspends the executor, queues 99 ordinary tasks followed by one urgent
// task, then resumes. add_with_suspend verifies that the urgent task is
// delivered first after the suspension; the test verifies the final sum of
// the ordinary tasks.
TEST_F(ExecutionQueueTest, urgent_task_is_the_last_task) {
    g_should_be_urgent = false;
    g_suspending = false;
    bthread::ExecutionQueueId<LongIntTask> queue_id = { 0 }; // to suppress warnings
    bthread::ExecutionQueueOptions options;
    int64_t sum = 0;
    ASSERT_EQ(0, bthread::execution_queue_start(&queue_id, &options,
                                                add_with_suspend, &sum));
    g_suspending = false;
    ASSERT_EQ(0, bthread::execution_queue_execute(queue_id, -100));
    // Wait for the executor to park on the -100 task.
    while (!g_suspending) {
        usleep(10);
    }
    LOG(INFO) << "Going to push";
    int64_t expected_sum = 0;
    for (int value = 1; value < 100; ++value) {
        expected_sum += value;
        ASSERT_EQ(0, bthread::execution_queue_execute(queue_id, value));
    }
    ASSERT_EQ(0, bthread::execution_queue_execute(
                  queue_id, -1, &bthread::TASK_OPTIONS_URGENT));
    usleep(100);
    g_suspending = false;
    butil::atomic_thread_fence(butil::memory_order_acq_rel);
    usleep(10 * 1000);
    LOG(INFO) << "going to quit";
    ASSERT_EQ(0, bthread::execution_queue_stop(queue_id));
    ASSERT_EQ(0, bthread::execution_queue_join(queue_id));
    ASSERT_EQ(expected_sum, sum);
}

// Next expected task sequence number for each producer, indexed by the
// thread id that check_order decodes from the upper 32 bits of a task value.
long next_task[1024];
293
// Source of producer thread ids: each pushing thread takes the current value
// and increments it (see push_thread_with_id / inplace_push_thread).
butil::atomic<int> num_threads(0);
gejun's avatar
gejun committed
294 295

void* push_thread_with_id(void* arg) {
gejun's avatar
gejun committed
296
    bthread::ExecutionQueueId<LongIntTask> id = { (uint64_t)arg };
297
    int thread_id = num_threads.fetch_add(1, butil::memory_order_relaxed);
gejun's avatar
gejun committed
298 299 300 301 302 303 304
    LOG(INFO) << "Start thread" << thread_id;
    for (int i = 0; i < 100000; ++i) {
        bthread::execution_queue_execute(id, ((long)thread_id << 32) | i);
    }
    return NULL;
}

gejun's avatar
gejun committed
305
int check_order(void* meta, bthread::TaskIterator<LongIntTask>& iter) {
gejun's avatar
gejun committed
306
    for (; iter; ++iter) {
gejun's avatar
gejun committed
307
        long value = iter->value;
gejun's avatar
gejun committed
308 309 310
        int thread_id = value >> 32;
        long task = value & 0xFFFFFFFFul;
        if (task != next_task[thread_id]++) {
gejun's avatar
gejun committed
311
            EXPECT_TRUE(false) << "task=" << task << " thread_id=" << thread_id;
gejun's avatar
gejun committed
312 313
            ++*(long*)meta;
        }
gejun's avatar
gejun committed
314
        if (iter->event) { iter->event->signal(); }
gejun's avatar
gejun committed
315 316 317 318 319 320 321
    }
    return 0;
}

// Starts 12 producer threads that each push 100000 sequenced tasks and checks
// that the executor sees every producer's tasks in push order.
TEST_F(ExecutionQueueTest, multi_threaded_order) {
    memset(next_task, 0, sizeof(next_task));
    // Restart thread-id allocation for this test. Without the reset, ids keep
    // growing across tests and repeated runs and would eventually index past
    // the end of next_task[] inside check_order.
    num_threads.store(0);
    long disorder_times = 0;
    bthread::ExecutionQueueId<LongIntTask> queue_id = { 0 }; // to suppress warnings
    bthread::ExecutionQueueOptions options;
    ASSERT_EQ(0, bthread::execution_queue_start(&queue_id, &options,
                                                check_order, &disorder_times));
    pthread_t threads[12];
    for (size_t i = 0; i < ARRAY_SIZE(threads); ++i) {
        pthread_create(&threads[i], NULL, &push_thread_with_id, (void *)queue_id.value);
    }
    for (size_t i = 0; i < ARRAY_SIZE(threads); ++i) {
        pthread_join(threads[i], NULL);
    }
    ASSERT_EQ(0, bthread::execution_queue_stop(queue_id));
    ASSERT_EQ(0, bthread::execution_queue_join(queue_id));
    ASSERT_EQ(0, disorder_times);
}

gejun's avatar
gejun committed
338
// Executor callback for the in-place test: drains the batch and then expects
// to be running on the pthread whose id was passed as `arg`. Does nothing
// when invoked for a stopped queue. Always returns 0.
int check_running_thread(void* arg, bthread::TaskIterator<LongIntTask>& iter) {
    if (iter.is_queue_stopped()) {
        return 0;
    }
    while (iter) {
        ++iter;
    }
    EXPECT_EQ(pthread_self(), (pthread_t)arg);
    return 0;
}

// Pushes a single task with TASK_OPTIONS_INPLACE; check_running_thread
// verifies that the callback ran on the pushing thread.
TEST_F(ExecutionQueueTest, in_place_task) {
    const pthread_t caller = pthread_self();
    bthread::ExecutionQueueId<LongIntTask> queue_id = { 0 }; // to suppress warnings
    bthread::ExecutionQueueOptions options;
    ASSERT_EQ(0, bthread::execution_queue_start(&queue_id, &options,
                                                check_running_thread,
                                                (void*)caller));
    ASSERT_EQ(0, bthread::execution_queue_execute(
                  queue_id, 0, &bthread::TASK_OPTIONS_INPLACE));
    ASSERT_EQ(0, bthread::execution_queue_stop(queue_id));
    ASSERT_EQ(0, bthread::execution_queue_join(queue_id));
}

// Task type for should_start_new_thread_on_more_tasks.
struct InPlaceTask {
    // True only for the task pushed by run_first_tasks.
    bool first_task;
    // pthread id of the thread that pushed the task.
    pthread_t thread_id;
};

void *run_first_tasks(void* arg) {
    bthread::ExecutionQueueId<InPlaceTask> queue_id = { (uint64_t)arg };
    InPlaceTask task;
    task.first_task = true;
    task.thread_id = pthread_self();
    EXPECT_EQ(0, bthread::execution_queue_execute(queue_id, task, 
                                                  &bthread::TASK_OPTIONS_INPLACE));
    return NULL;
}

int stuck_and_check_running_thread(void* arg, bthread::TaskIterator<InPlaceTask>& iter) {
    if (iter.is_queue_stopped()) {
        return 0;
    }
379
    butil::atomic<int>* futex = (butil::atomic<int>*)arg;
gejun's avatar
gejun committed
380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400
    if (iter->first_task) {
        EXPECT_EQ(pthread_self(), iter->thread_id);
        futex->store(1);
        bthread::futex_wake_private(futex, 1);
        while (futex->load() != 2) {
            bthread::futex_wait_private(futex, 1, NULL);
        }
        ++iter;
        EXPECT_FALSE(iter);
    } else {
        for (; iter; ++iter) {
            EXPECT_FALSE(iter->first_task);
            EXPECT_NE(pthread_self(), iter->thread_id);
        }
    }
    return 0;
}

// While the executor is stuck inside the first (in-place) task, pushes 100
// more in-place tasks from this thread. The callback verifies that those
// tasks are not executed on the pushing thread.
TEST_F(ExecutionQueueTest, should_start_new_thread_on_more_tasks) {
    bthread::ExecutionQueueId<InPlaceTask> queue_id = { 0 };
    bthread::ExecutionQueueOptions options;
    butil::atomic<int> futex(0);
    ASSERT_EQ(0, bthread::execution_queue_start(&queue_id, &options,
                                                stuck_and_check_running_thread,
                                                (void*)&futex));
    pthread_t first_pusher;
    ASSERT_EQ(0, pthread_create(&first_pusher, NULL, run_first_tasks,
                                (void*)queue_id.value));
    // Wait until the callback reports that it is handling the first task.
    while (futex.load() != 1) {
        bthread::futex_wait_private(&futex, 0, NULL);
    }
    for (size_t n = 0; n < 100; ++n) {
        InPlaceTask follow_up;
        follow_up.thread_id = pthread_self();
        follow_up.first_task = false;
        ASSERT_EQ(0, bthread::execution_queue_execute(
                      queue_id, follow_up, &bthread::TASK_OPTIONS_INPLACE));
    }
    // Release the stuck callback.
    futex.store(2);
    bthread::futex_wake_private(&futex, 1);
    ASSERT_EQ(0, bthread::execution_queue_stop(queue_id));
    ASSERT_EQ(0, bthread::execution_queue_join(queue_id));
}

void* inplace_push_thread(void* arg) {
gejun's avatar
gejun committed
424
    bthread::ExecutionQueueId<LongIntTask> id = { (uint64_t)arg };
425
    int thread_id = num_threads.fetch_add(1, butil::memory_order_relaxed);
gejun's avatar
gejun committed
426 427 428 429 430 431 432 433 434 435 436
    LOG(INFO) << "Start thread" << thread_id;
    for (int i = 0; i < 100000; ++i) {
        bthread::execution_queue_execute(id, ((long)thread_id << 32) | i,
                            &bthread::TASK_OPTIONS_INPLACE);
    }
    return NULL;
}

// Same ordering check as multi_threaded_order, with all 12 producers pushing
// their tasks in-place.
TEST_F(ExecutionQueueTest, inplace_and_order) {
    memset(next_task, 0, sizeof(next_task));
    // Restart thread-id allocation for this test. Without the reset, ids keep
    // growing across tests and repeated runs and would eventually index past
    // the end of next_task[] inside check_order.
    num_threads.store(0);
    long disorder_times = 0;
    bthread::ExecutionQueueId<LongIntTask> queue_id = { 0 }; // to suppress warnings
    bthread::ExecutionQueueOptions options;
    ASSERT_EQ(0, bthread::execution_queue_start(&queue_id, &options,
                                                check_order, &disorder_times));
    pthread_t threads[12];
    for (size_t i = 0; i < ARRAY_SIZE(threads); ++i) {
        pthread_create(&threads[i], NULL, &inplace_push_thread, (void *)queue_id.value);
    }
    for (size_t i = 0; i < ARRAY_SIZE(threads); ++i) {
        pthread_join(threads[i], NULL);
    }
    ASSERT_EQ(0, bthread::execution_queue_stop(queue_id));
    ASSERT_EQ(0, bthread::execution_queue_join(queue_id));
    ASSERT_EQ(0, disorder_times);
}

// Informational only: logs sizeof(bthread::TaskNode); nothing is asserted.
TEST_F(ExecutionQueueTest, size_of_task_node) {
    LOG(INFO) << "sizeof(TaskNode)=" << sizeof(bthread::TaskNode);
}

gejun's avatar
gejun committed
457
int add_with_suspend2(void* meta, bthread::TaskIterator<LongIntTask>& iter) {
gejun's avatar
gejun committed
458 459 460 461 462 463
    int64_t* result = (int64_t*)meta;
    if (iter.is_queue_stopped()) {
        stopped = true;
        return 0;
    }
    for (; iter; ++iter) {
gejun's avatar
gejun committed
464
        if (iter->value == -100) {
gejun's avatar
gejun committed
465 466 467 468
            g_suspending = true;
            while (g_suspending) {
                usleep(10);
            }
gejun's avatar
gejun committed
469
            if (iter->event) { iter->event->signal(); }
gejun's avatar
gejun committed
470
        } else {
gejun's avatar
gejun committed
471 472
            *result += iter->value;
            if (iter->event) { iter->event->signal(); }
gejun's avatar
gejun committed
473 474 475 476 477 478
        }
    }
    return 0;
}

// Exercises execution_queue_cancel() and the return values this test expects:
//   1  for a task that is currently being executed,
//   0  for a queued task that has not run yet,
//   -1 for a task that was already cancelled.
// The cancelled task (value 100) must not contribute to the sum.
TEST_F(ExecutionQueueTest, cancel) {
    bthread::ExecutionQueueId<LongIntTask> queue_id = { 0 }; // to suppress warnings
    bthread::ExecutionQueueOptions options;
    int64_t sum = 0;
    ASSERT_EQ(0, bthread::execution_queue_start(&queue_id, &options,
                                                add_with_suspend2, &sum));
    g_suspending = false;

    bthread::TaskHandle running_handle;
    ASSERT_EQ(0, bthread::execution_queue_execute(queue_id, -100, NULL,
                                                  &running_handle));
    // Wait until the executor is parked inside the -100 task.
    while (!g_suspending) {
        usleep(10);
    }
    ASSERT_EQ(1, bthread::execution_queue_cancel(running_handle));
    ASSERT_EQ(1, bthread::execution_queue_cancel(running_handle));

    bthread::TaskHandle pending_handle;
    ASSERT_EQ(0, bthread::execution_queue_execute(queue_id, 100, NULL,
                                                  &pending_handle));
    ASSERT_EQ(0, bthread::execution_queue_cancel(pending_handle));
    g_suspending = false;
    ASSERT_EQ(-1, bthread::execution_queue_cancel(pending_handle));

    ASSERT_EQ(0, bthread::execution_queue_stop(queue_id));
    ASSERT_EQ(0, bthread::execution_queue_join(queue_id));
    ASSERT_EQ(0, sum);
}

struct CancelSelf {
503
    butil::atomic<bthread::TaskHandle*> handle;
gejun's avatar
gejun committed
504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541
};

// Executor callback in which each task tries to cancel itself. It waits until
// the test has published the task's handle and then expects three
// consecutive cancel attempts to each return 1. Always returns 0.
int cancel_self(void* /*meta*/, bthread::TaskIterator<CancelSelf*>& iter) {
    while (iter) {
        // The handle is stored by the test only after execute() returned.
        while ((*iter)->handle == NULL) {
            usleep(10);
        }
        for (int attempt = 0; attempt < 3; ++attempt) {
            EXPECT_EQ(1, bthread::execution_queue_cancel(*(*iter)->handle.load()));
        }
        ++iter;
    }
    return 0;
}

// Queues one task that cancels itself from inside the executor callback. The
// handle is handed to the task only after it has been queued, so the callback
// has to wait for it.
TEST_F(ExecutionQueueTest, cancel_self) {
    bthread::ExecutionQueueId<CancelSelf*> queue_id = { 0 }; // to suppress warnings
    bthread::ExecutionQueueOptions options;
    ASSERT_EQ(0, bthread::execution_queue_start(&queue_id, &options,
                                                cancel_self, NULL));
    CancelSelf self_task;
    self_task.handle = NULL;
    bthread::TaskHandle own_handle;
    ASSERT_EQ(0, bthread::execution_queue_execute(queue_id, &self_task, NULL,
                                                  &own_handle));
    self_task.handle.store(&own_handle);
    ASSERT_EQ(0, bthread::execution_queue_stop(queue_id));
    ASSERT_EQ(0, bthread::execution_queue_join(queue_id));
}

// Task type for random_cancel. A task either adds `value` to the sum or, when
// `cancel_task` is set, asks the executor to cancel the task behind `handle`.
struct AddTask {
    // Value added to AddMeta::sum by a non-cancel task.
    int value;
    // True when this task is a request to cancel another task.
    bool cancel_task;
    // Value of the task being cancelled; subtracted from AddMeta::expected
    // when the cancellation succeeds.
    int cancel_value;
    // Handle of the task to cancel (only read when cancel_task is true).
    bthread::TaskHandle handle;
};

// Shared state between random_cancel and its executor callback
// add_with_cancel.
struct AddMeta {
    // Sum of the values of all executed non-cancel tasks.
    int64_t sum;
    // Expected final value of `sum`, adjusted for successful cancellations.
    butil::atomic<int64_t> expected;
    // Number of in-executor cancel attempts that returned 0.
    butil::atomic<int64_t> succ_times;
    // Number of in-executor cancel attempts that returned a positive value.
    butil::atomic<int64_t> race_times;
    // Number of in-executor cancel attempts that returned a negative value.
    butil::atomic<int64_t> fail_times;
};

int add_with_cancel(void* meta, bthread::TaskIterator<AddTask>& iter) {
    if (iter.is_queue_stopped()) {
        return 0;
    }
    AddMeta* m = (AddMeta*)meta;
    for (; iter; ++iter) {
        if (iter->cancel_task) {
            const int rc = bthread::execution_queue_cancel(iter->handle);
            if (rc == 0) {
                m->expected.fetch_sub(iter->cancel_value);
                m->succ_times.fetch_add(1);
            } else if (rc < 0) {
                m->fail_times.fetch_add(1);
            } else {
                m->race_times.fetch_add(1);
            }
        } else {
            m->sum += iter->value;
        }
    }
    return 0;
}

// Pushes 100000 tasks and, for each of them, randomly either cancels it
// directly, queues a normal cancel request, queues an urgent cancel request,
// or leaves it alone. The final sum must equal the sum of all values minus
// the values whose cancellation succeeded.
TEST_F(ExecutionQueueTest, random_cancel) {
    bthread::ExecutionQueueId<AddTask> queue_id = { 0 };
    AddMeta m;
    m.sum = 0;
    m.expected.store(0);
    m.succ_times.store(0);
    m.fail_times.store(0);
    m.race_times.store(0);
    ASSERT_EQ(0, bthread::execution_queue_start(&queue_id, NULL,
                                                add_with_cancel, &m));
    int64_t expected = 0;
    for (int i = 0; i < 100000; ++i) {
        bthread::TaskHandle h;
        AddTask t;
        t.value = i;
        t.cancel_task = false;
        ASSERT_EQ(0, bthread::execution_queue_execute(queue_id, t, NULL, &h));
        const int r = butil::fast_rand_less_than(4);
        expected += i;
        switch (r) {
        case 0:
            // Cancel from the pushing thread.
            if (bthread::execution_queue_cancel(h) == 0) {
                expected -= i;
            }
            break;
        case 1:
        case 2:
            // Cancel from inside the executor; r == 2 makes the request urgent.
            t.cancel_task = true;
            t.cancel_value = i;
            t.handle = h;
            ASSERT_EQ(0, bthread::execution_queue_execute(
                          queue_id, t,
                          r == 2 ? &bthread::TASK_OPTIONS_URGENT : NULL));
            break;
        default:
            // do nothing;
            break;
        }
    }
    m.expected.fetch_add(expected);
    ASSERT_EQ(0, bthread::execution_queue_stop(queue_id));
    ASSERT_EQ(0, bthread::execution_queue_join(queue_id));
    ASSERT_EQ(m.sum, m.expected.load());
    LOG(INFO) << "sum=" << m.sum << " race_times=" << m.race_times
              << " succ_times=" << m.succ_times
              << " fail_times=" << m.fail_times;

}

gejun's avatar
gejun committed
619
int add2(void* meta, bthread::TaskIterator<LongIntTask> &iter) {
gejun's avatar
gejun committed
620 621
    if (iter) {
        int64_t* result = (int64_t*)meta;
gejun's avatar
gejun committed
622 623
        *result += iter->value;
        if (iter->event) { iter->event->signal(); }
gejun's avatar
gejun committed
624 625 626 627 628 629 630
    }
    return 0;
}

// Uses a callback (add2) that never advances the iterator and checks that all
// 100 pushed values are still accounted for in the final sum.
TEST_F(ExecutionQueueTest, not_do_iterate_at_all) {
    int64_t sum = 0;
    int64_t expected_sum = 0;
    bthread::ExecutionQueueId<LongIntTask> queue_id;
    bthread::ExecutionQueueOptions options;
    ASSERT_EQ(0, bthread::execution_queue_start(&queue_id, &options,
                                                add2, &sum));
    for (int value = 0; value != 100; ++value) {
        expected_sum += value;
        ASSERT_EQ(0, bthread::execution_queue_execute(queue_id, value));
    }
    ASSERT_EQ(0, bthread::execution_queue_stop(queue_id));
    // A stopped queue must refuse new tasks.
    ASSERT_NE(0, bthread::execution_queue_execute(queue_id, 0));
    ASSERT_EQ(0, bthread::execution_queue_join(queue_id));
    ASSERT_EQ(expected_sum, sum);
}

gejun's avatar
gejun committed
645
int add_with_suspend3(void* meta, bthread::TaskIterator<LongIntTask>& iter) {
gejun's avatar
gejun committed
646 647 648 649 650 651
    int64_t* result = (int64_t*)meta;
    if (iter.is_queue_stopped()) {
        stopped = true;
        return 0;
    }
    for (; iter; ++iter) {
gejun's avatar
gejun committed
652
        if (iter->value == -100) {
gejun's avatar
gejun committed
653 654 655 656
            g_suspending = true;
            while (g_suspending) {
                usleep(10);
            }
gejun's avatar
gejun committed
657
            if (iter->event) { iter->event->signal(); }
gejun's avatar
gejun committed
658
        } else {
gejun's avatar
gejun committed
659 660
            *result += iter->value;
            if (iter->event) { iter->event->signal(); }
gejun's avatar
gejun committed
661 662 663 664 665 666 667
        }
    }
    return 0;
}

// Queues an urgent task while the executor is suspended, cancels it before it
// can run, and checks that the queue keeps working: a later ordinary task is
// executed and the queue stops and joins normally.
TEST_F(ExecutionQueueTest, cancel_unexecuted_high_priority_task) {
    g_should_be_urgent = false;
    // Start from a known handshake state, as the other suspend-based tests
    // do; a stale `true` would let the wait loop below exit before the
    // executor has actually reached the suspending task.
    g_suspending = false;
    bthread::ExecutionQueueId<LongIntTask> queue_id = { 0 }; // to suppress warnings
    bthread::ExecutionQueueOptions options;
    int64_t result = 0;
    ASSERT_EQ(0, bthread::execution_queue_start(&queue_id, &options,
                                                add_with_suspend3, &result));
    // Push a normal task to make the executor suspend
    ASSERT_EQ(0, bthread::execution_queue_execute(queue_id, -100));
    while (!g_suspending) {
        usleep(10);
    }
    // At this point, executor is suspended by the first task. Then we put
    // a high_priority task which is going to be cancelled immediately,
    // expecting that both operations are successful.
    bthread::TaskHandle h;
    ASSERT_EQ(0, bthread::execution_queue_execute(
                        queue_id, -100, &bthread::TASK_OPTIONS_URGENT, &h));
    ASSERT_EQ(0, bthread::execution_queue_cancel(h));

    // Resume executor
    g_suspending = false;

    // Push a normal task
    ASSERT_EQ(0, bthread::execution_queue_execute(queue_id, 12345));

    // The execq should stop normally
    ASSERT_EQ(0, bthread::execution_queue_stop(queue_id));
    ASSERT_EQ(0, bthread::execution_queue_join(queue_id));

    ASSERT_EQ(12345, result);
}
} // namespace