Commit 8c1d5228 authored by Mengmeng Yang's avatar Mengmeng Yang

support custom executor in execution_queue

parent 29b74413
...@@ -101,15 +101,22 @@ void ExecutionQueueBase::start_execute(TaskNode* node) { ...@@ -101,15 +101,22 @@ void ExecutionQueueBase::start_execute(TaskNode* node) {
} }
} }
bthread_t tid; if (nullptr == _options.executor) {
// We start the execution thread in background instead of foreground as bthread_t tid;
// we can't determine whether the code after execute() is urgent (like // We start the execution thread in background instead of foreground as
// unlock a pthread_mutex_t) in which case implicit context switch may // we can't determine whether the code after execute() is urgent (like
// cause undefined behavior (e.g. deadlock) // unlock a pthread_mutex_t) in which case implicit context switch may
if (bthread_start_background(&tid, &_options.bthread_attr, // cause undefined behavior (e.g. deadlock)
_execute_tasks, node) != 0) { if (bthread_start_background(&tid, &_options.bthread_attr,
PLOG(FATAL) << "Fail to start bthread"; _execute_tasks, node) != 0) {
_execute_tasks(node); PLOG(FATAL) << "Fail to start bthread";
_execute_tasks(node);
}
} else {
if (_options.executor->submit(_execute_tasks, node) != 0) {
PLOG(FATAL) << "Fail to submit task";
_execute_tasks(node);
}
} }
} }
......
...@@ -128,11 +128,24 @@ const static TaskOptions TASK_OPTIONS_NORMAL = TaskOptions(false, false); ...@@ -128,11 +128,24 @@ const static TaskOptions TASK_OPTIONS_NORMAL = TaskOptions(false, false);
const static TaskOptions TASK_OPTIONS_URGENT = TaskOptions(true, false); const static TaskOptions TASK_OPTIONS_URGENT = TaskOptions(true, false);
const static TaskOptions TASK_OPTIONS_INPLACE = TaskOptions(false, true); const static TaskOptions TASK_OPTIONS_INPLACE = TaskOptions(false, true);
class Executor {
public:
virtual ~Executor() {}
// Return 0 on success.
virtual int submit(void * (*fn)(void*), void* args) = 0;
};
struct ExecutionQueueOptions { struct ExecutionQueueOptions {
ExecutionQueueOptions(); ExecutionQueueOptions();
// Attribute of the bthread which execute runs on // Attribute of the bthread which execute runs on
// default: BTHREAD_ATTR_NORMAL // default: BTHREAD_ATTR_NORMAL
bthread_attr_t bthread_attr; bthread_attr_t bthread_attr;
// Executor that tasks run on. bthread will be used when executor = NULL.
// Note that TaskOptions.in_place_if_possible = false will not work, if implementation of
// Executor is in-place(synchronous).
Executor * executor;
}; };
// Start a ExecutionQueue. If |options| is NULL, the queue will be created with // Start a ExecutionQueue. If |options| is NULL, the queue will be created with
......
...@@ -317,7 +317,7 @@ public: ...@@ -317,7 +317,7 @@ public:
}; };
inline ExecutionQueueOptions::ExecutionQueueOptions() inline ExecutionQueueOptions::ExecutionQueueOptions()
: bthread_attr(BTHREAD_ATTR_NORMAL) : bthread_attr(BTHREAD_ATTR_NORMAL), executor(NULL)
{} {}
template <typename T> template <typename T>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment