// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

// bthread - A M:N threading library to make applications more concurrent.

// Date: 2016/04/16 18:43:24

#include "bthread/execution_queue.h"

#include "butil/memory/singleton_on_pthread_once.h"
#include "butil/object_pool.h"           // butil::get_object
#include "butil/resource_pool.h"         // butil::get_resource

namespace bthread {

// Disabled: the following size assumptions may be false on different platforms.
//BAIDU_CASSERT(sizeof(TaskNode) == 128, sizeof_TaskNode_must_be_128);
//BAIDU_CASSERT(offsetof(TaskNode, static_task_mem) + sizeof(TaskNode().static_task_mem) == 128, sizeof_TaskNode_must_be_128);
// Compile-time checks: the typed templates (instantiated here with int) must
// have exactly the same size as their untyped base classes.
BAIDU_CASSERT(sizeof(ExecutionQueue<int>) == sizeof(ExecutionQueueBase),
              sizeof_ExecutionQueue_must_be_the_same_with_ExecutionQueueBase);
BAIDU_CASSERT(sizeof(TaskIterator<int>) == sizeof(TaskIteratorBase),
              sizeof_TaskIterator_must_be_the_same_with_TaskIteratorBase);
namespace /*anonymous*/ {
// Identifier of the resource-pool slot that holds an ExecutionQueueBase.
using slot_id_t = butil::ResourceId<ExecutionQueueBase>;

// Extracts the slot part of a queue id, i.e. its lower 32 bits.
inline slot_id_t WARN_UNUSED_RESULT slot_of_id(uint64_t id) {
    const slot_id_t result = { (id & 0xFFFFFFFFul) };
    return result;
}

// Builds a queue id: |version| occupies the upper 32 bits and the slot value
// is OR-ed into the lower part.
inline uint64_t make_id(uint32_t version, slot_id_t slot) {
    const uint64_t version_bits = static_cast<uint64_t>(version) << 32;
    return version_bits | slot.value;
}
}  // namespace anonymous

// Counters describing ExecutionQueue activity. All of them are updated
// through get_execq_vars().
struct ExecutionQueueVars {
    // +1 for every TaskNode handed out, -1 for every TaskNode returned.
    bvar::Adder<int64_t> running_task_count;
    // +1 for every queue created, -1 when a queue is finally destroyed.
    bvar::Adder<int64_t> execq_count;
    // +1 when a caller acquires the right to execute a queue, -1 when that
    // execution finishes.
    bvar::Adder<int64_t> execq_active_count;

    // Constructs every counter with its bvar name.
    ExecutionQueueVars()
        : running_task_count("bthread_execq_running_task_count")
        , execq_count("bthread_execq_count")
        , execq_active_count("bthread_execq_active_count") {}
};

// Returns the shared ExecutionQueueVars instance obtained from
// butil::get_leaky_singleton.
inline ExecutionQueueVars* get_execq_vars() {
    ExecutionQueueVars* const shared_vars =
        butil::get_leaky_singleton<ExecutionQueueVars>();
    return shared_vars;
}

// Pushes |node| into this queue and, if the queue had no pending head, takes
// over the responsibility of running the queued tasks.
//
// The node is published by atomically exchanging it with _head. A non-NULL
// previous head means execution is already owned by someone else, so this
// call only links |node| to the previous head and returns. Otherwise this
// call owns execution: an in_place node is first run directly in the calling
// thread, and whatever remains is handed to _execute_tasks(), started in a
// background bthread or submitted to the user-supplied executor.
void ExecutionQueueBase::start_execute(TaskNode* node) {
    // |next| stays UNCONNECTED until the exchange below tells us what this
    // node has to be linked to.
    node->next = TaskNode::UNCONNECTED;
    node->status = UNEXECUTED;
    node->iterated = false;
    if (node->high_priority) {
        // Add _high_priority_tasks before pushing this task into queue to
        // make sure that _execute_tasks sees the newest number when this
        // task is in the queue. Although there might be some useless for
        // loops in _execute_tasks if this thread is scheduled out at this
        // point, we think it's just fine.
        _high_priority_tasks.fetch_add(1, butil::memory_order_relaxed);
    }
    TaskNode* const prev_head = _head.exchange(node, butil::memory_order_release);
    if (prev_head != NULL) {
        // Someone else is already responsible for execution; just link in.
        node->next = prev_head;
        return;
    }
    // Get the right to execute the task, start a bthread to avoid deadlock
    // or stack overflow
    node->next = NULL;
    node->q = this;

    ExecutionQueueVars* const vars = get_execq_vars();
    vars->execq_active_count << 1;
    if (node->in_place) {
        // Run this task synchronously in the caller's thread.
        int niterated = 0;
        _execute(node, node->high_priority, &niterated);
        TaskNode* tmp = node;
        // return if no more
        if (node->high_priority) {
            // Balance the fetch_add above for the tasks that were iterated.
            _high_priority_tasks.fetch_sub(niterated, butil::memory_order_relaxed);
        }
        // NOTE(review): _more_tasks is defined outside this file; from its use
        // here it presumably reports whether tasks remain to be executed.
        if (!_more_tasks(tmp, &tmp, !node->iterated)) {
            vars->execq_active_count << -1;
            return_task_node(node);
            return;
        }
    }

    if (nullptr == _options.executor) {
        bthread_t tid;
        // We start the execution thread in background instead of foreground as
        // we can't determine whether the code after execute() is urgent (like
        // unlock a pthread_mutex_t) in which case implicit context switch may
        // cause undefined behavior (e.g. deadlock)
        if (bthread_start_background(&tid, &_options.bthread_attr,
                                     _execute_tasks, node) != 0) {
            PLOG(FATAL) << "Fail to start bthread";
            // Fall back to running the tasks directly in this thread.
            _execute_tasks(node);
        }
    } else {
        if (_options.executor->submit(_execute_tasks, node) != 0) {
            PLOG(FATAL) << "Fail to submit task";
            // Fall back to running the tasks directly in this thread.
            _execute_tasks(node);
        }
    }
}

// Thread entry that drains a queue. |arg| is the TaskNode* passed by
// start_execute(); its |q| field points to the owning ExecutionQueueBase.
//
// Repeatedly executes the pending tasks (high-priority ones first while
// _high_priority_tasks is positive), returns iterated nodes to the pool and
// asks _more_tasks() whether to continue. When the stop task has been
// executed (_execute() returned ESTOP) the queue is destroyed at the end:
// joiners are woken up and the slot is returned to the resource pool.
// Always returns NULL.
void* ExecutionQueueBase::_execute_tasks(void* arg) {
    ExecutionQueueVars* vars = get_execq_vars();
    TaskNode* head = (TaskNode*)arg;
    ExecutionQueueBase* m = (ExecutionQueueBase*)head->q;
    TaskNode* cur_tail = NULL;
    bool destroy_queue = false;
    for (;;) {
        if (head->iterated) {
            // The head was already iterated (e.g. by an in-place execution or
            // a previous round); drop it and move on to its successor.
            CHECK(head->next != NULL);
            TaskNode* saved_head = head;
            head = head->next;
            m->return_task_node(saved_head);
        }
        int rc = 0;
        if (m->_high_priority_tasks.load(butil::memory_order_relaxed) > 0) {
            int nexecuted = 0;
            // Run only high-priority tasks in this round. rc is checked for
            // ESTOP below.
            rc = m->_execute(head, true, &nexecuted);
            m->_high_priority_tasks.fetch_sub(
                    nexecuted, butil::memory_order_relaxed);
            if (nexecuted == 0) {
                // Some high_priority tasks are not in queue yet (the counter
                // is incremented before the node is pushed), give the
                // pushing threads a chance to run.
                sched_yield();
            }
        } else {
            rc = m->_execute(head, false, NULL);
        }
        if (rc == ESTOP) {
            destroy_queue = true;
        }
        // Release TaskNode until uniterated task or last task
        while (head->next != NULL && head->iterated) {
            TaskNode* saved_head = head;
            head = head->next;
            m->return_task_node(saved_head);
        }
        if (cur_tail == NULL) {
            // First round: locate the last node of the list we own.
            for (cur_tail = head; cur_tail->next != NULL;
                    cur_tail = cur_tail->next) {}
        }
        // break when no more tasks and head has been executed
        if (!m->_more_tasks(cur_tail, &cur_tail, !head->iterated)) {
            CHECK_EQ(cur_tail, head);
            CHECK(head->iterated);
            m->return_task_node(head);
            break;
        }
    }
    if (destroy_queue) {
        CHECK(m->_head.load(butil::memory_order_relaxed) == NULL);
        CHECK(m->_stopped);
        // Add _join_butex by 2 to make it equal to the next version of the
        // ExecutionQueue from the same slot so that join with old id would
        // return immediately.
        //
        // 1: release fence to make join see the newest changes when it sees
        //    the newest _join_butex
        m->_join_butex->fetch_add(2, butil::memory_order_release/*1*/);
        butex_wake_all(m->_join_butex);
        vars->execq_count << -1;
        butil::return_resource(slot_of_id(m->_this_id));
    }
    vars->execq_active_count << -1;
    return NULL;
}

// Gives |node| back to the TaskNode object pool: the task memory is cleared
// with this queue's _clear_func first, and running_task_count is decremented
// afterwards.
void ExecutionQueueBase::return_task_node(TaskNode* node) {
    node->clear_before_return(_clear_func);
    butil::return_object<TaskNode>(node);
    ExecutionQueueVars* const vars = get_execq_vars();
    vars->running_task_count << -1;
}

void ExecutionQueueBase::_on_recycle() {
    // Push a closed tasks
    while (true) {
        TaskNode* node = butil::get_object<TaskNode>();
        if (BAIDU_LIKELY(node != NULL)) {
            get_execq_vars()->running_task_count << 1;
            node->stop_task = true;
            node->high_priority = false;
            node->in_place = false;
            start_execute(node);
            break;
        }
        CHECK(false) << "Fail to create task_node_t, " << berror();
        ::bthread_usleep(1000);
    }
}

int ExecutionQueueBase::join(uint64_t id) {
    const slot_id_t slot = slot_of_id(id);
    ExecutionQueueBase* const m = butil::address_resource(slot);
    if (m == NULL) {
        // The queue is not created yet, this join is definitely wrong.
        return EINVAL;
    }
    int expected = _version_of_id(id);
    // acquire fence makes this thread see changes before changing _join_butex.
    while (expected == m->_join_butex->load(butil::memory_order_acquire)) {
        if (butex_wait(m->_join_butex, expected, NULL) < 0 &&
            errno != EWOULDBLOCK && errno != EINTR) {
            return errno;
        }
    }
    return 0;
}

// Stops this queue by bumping the version stored in _versioned_ref from the
// version of _this_id to the next one, then drops the reference added at
// creation. Returns 0 on success and EINVAL if the stored version no longer
// matches the id (e.g. the queue was already stopped).
int ExecutionQueueBase::stop() {
    const uint32_t my_version = _version_of_id(_this_id);
    uint64_t observed = _versioned_ref.load(butil::memory_order_relaxed);
    while (_version_of_vref(observed) == my_version) {
        // Try to set version=my_version+1 (to make later address() return
        // NULL). On failure |observed| is refreshed and we retry.
        const uint64_t desired =
            _make_vref(my_version + 1, _ref_of_vref(observed));
        if (!_versioned_ref.compare_exchange_strong(
                    observed, desired,
                    butil::memory_order_release,
                    butil::memory_order_relaxed)) {
            continue;
        }
        // Set _stopped to make later execute() fail immediately
        _stopped.store(true, butil::memory_order_release);
        // Deref additionally which is added at creation so that this
        // queue's reference will hit 0(recycle) when no one addresses it.
        _release_additional_reference();
        // NOTE: This queue may be recycled at this point, don't
        // touch anything.
        return 0;
    }
    return EINVAL;
}

// Runs the user's execute function over the list starting at |head|.
//
// If |head| is a stop task, it is marked iterated/EXECUTED, the execute
// function is invoked with a stopped iterator, |*niterated| (if given) is set
// to 1 and ESTOP is returned. Otherwise an iterator over tasks of the
// requested priority is built, the execute function is invoked only if the
// iterator has something to offer, |*niterated| (if given) receives the
// iterator's num_iterated() and 0 is returned.
int ExecutionQueueBase::_execute(TaskNode* head, bool high_priority, int* niterated) {
    const bool is_stop_task = (head != NULL && head->stop_task);
    if (!is_stop_task) {
        TaskIteratorBase iter(head, this, false, high_priority);
        if (iter) {
            _execute_func(_meta, _type_specific_function, iter);
        }
        // |niterated| must be assigned with num_iterated() even if no task
        // could be peeked at the beginning, in which case all the iterated
        // tasks have been cancelled at this point. The caller needs the
        // correct number to update its counter.
        if (niterated != NULL) {
            *niterated = iter.num_iterated();
        }
        return 0;
    }

    CHECK(head->next == NULL);
    head->iterated = true;
    head->status = EXECUTED;
    TaskIteratorBase stop_iter(NULL, this, true, false);
    _execute_func(_meta, _type_specific_function, stop_iter);
    if (niterated != NULL) {
        *niterated = 1;
    }
    return ESTOP;
}

// Obtains a TaskNode from the object pool; the result may be NULL when the
// pool cannot provide one.
//
// running_task_count is incremented only when a node is really handed out.
// The matching decrement lives in return_task_node(), which is only reached
// for real nodes, so counting a failed allocation would leave the counter
// permanently too high.
TaskNode* ExecutionQueueBase::allocate_node() {
    TaskNode* const node = butil::get_object<TaskNode>();
    if (BAIDU_LIKELY(node != NULL)) {
        get_execq_vars()->running_task_count << 1;
    }
    return node;
}

// Sentinel value for TaskNode::next: start_execute() stores it in a node
// before the node has been linked into the queue.
TaskNode* const TaskNode::UNCONNECTED = reinterpret_cast<TaskNode*>(-1L);

// Looks up the queue identified by |id| and returns a scoped pointer holding
// one reference to it. The returned pointer is empty when the slot cannot be
// addressed or when the version stored in the queue differs from the version
// encoded in |id|.
//
// A reference is taken optimistically with fetch_add(1); on a version
// mismatch it is released again. If that release drops the last reference of
// a queue with an odd version, this call wins the right to recycle it and
// pushes the stop task through _on_recycle().
// NOTE(review): an odd version presumably denotes a stopped queue (stop()
// bumps the version by one) - confirm against the header.
ExecutionQueueBase::scoped_ptr_t ExecutionQueueBase::address(uint64_t id) {
    scoped_ptr_t ret;
    const slot_id_t slot = slot_of_id(id);
    ExecutionQueueBase* const m = butil::address_resource(slot);
    if (BAIDU_LIKELY(m != NULL)) {
        // acquire fence makes sure this thread sees latest changes before
        // _dereference()
        const uint64_t vref1 = m->_versioned_ref.fetch_add(
            1, butil::memory_order_acquire);
        const uint32_t ver1 = _version_of_vref(vref1);
        if (ver1 == _version_of_id(id)) {
            // Versions match: hand the reference just taken to the caller.
            ret.reset(m);
            return ret.Pass();
        }

        // Version mismatch: give back the reference taken above.
        const uint64_t vref2 = m->_versioned_ref.fetch_sub(
            1, butil::memory_order_release);
        const int32_t nref = _ref_of_vref(vref2);
        if (nref > 1) {
            // Other references remain; nothing more to do.
            return ret.Pass();
        } else if (__builtin_expect(nref == 1, 1)) {
            // We just released the last reference.
            const uint32_t ver2 = _version_of_vref(vref2);
            if ((ver2 & 1)) {
                if (ver1 == ver2 || ver1 + 1 == ver2) {
                    // Move to the next version with zero references; only
                    // the thread whose CAS succeeds recycles the queue.
                    uint64_t expected_vref = vref2 - 1;
                    if (m->_versioned_ref.compare_exchange_strong(
                            expected_vref, _make_vref(ver2 + 1, 0),
                            butil::memory_order_acquire,
                            butil::memory_order_relaxed)) {
                        m->_on_recycle();
                        // We don't return m immediately when the reference count
                        // reaches 0 as there might be in-processing tasks. Instead
                        // _on_recycle would push a `stop_task', after which
                        // is executed m would be finally reset and returned
                    }
                } else {
                    CHECK(false) << "ref-version=" << ver1
                                 << " unref-version=" << ver2;
                }
            } else {
                CHECK_EQ(ver1, ver2);
                // Addressed a free slot.
            }
        } else {
            CHECK(false) << "Over dereferenced id=" << id;
        }
    }
    return ret.Pass();
}

int ExecutionQueueBase::create(uint64_t* id, const ExecutionQueueOptions* options,
                               execute_func_t execute_func,
                               clear_task_mem clear_func,
                               void* meta, void* type_specific_function) {
    if (execute_func == NULL || clear_func == NULL) {
        return EINVAL;
    }

    slot_id_t slot;
    ExecutionQueueBase* const m = butil::get_resource(&slot, Forbidden());
    if (BAIDU_LIKELY(m != NULL)) {
        m->_execute_func = execute_func;
        m->_clear_func = clear_func;
        m->_meta = meta;
        m->_type_specific_function = type_specific_function;
        CHECK(m->_head.load(butil::memory_order_relaxed) == NULL);
        CHECK_EQ(0, m->_high_priority_tasks.load(butil::memory_order_relaxed));
        ExecutionQueueOptions opt;
        if (options != NULL) {
            opt = *options;   
        }
        m->_options = opt;
        m->_stopped.store(false, butil::memory_order_relaxed);
        m->_this_id = make_id(
                _version_of_vref(m->_versioned_ref.fetch_add(
                                    1, butil::memory_order_release)), slot);
        *id = m->_this_id;
        get_execq_vars()->execq_count << 1;
        return 0;
    }
    return ENOMEM;
}

// Tells a normal-priority iterator to stop when the queue has pending
// high-priority tasks. In that case _should_break is set and true is
// returned; a high-priority iterator always gets false.
inline bool TaskIteratorBase::should_break_for_high_priority_tasks() {
    if (_high_priority) {
        return false;
    }
    if (_q->_high_priority_tasks.load(butil::memory_order_relaxed) <= 0) {
        return false;
    }
    _should_break = true;
    return true;
}

// Advances to the next task of this iterator's priority that can be
// executed. Nodes of the other priority are skipped untouched; nodes of the
// matching priority that cannot be peeked for execution are marked iterated
// and counted, then skipped. Iteration ends at the end of the list or at a
// stop task.
void TaskIteratorBase::operator++() {
    if (!(*this)) {
        return;
    }
    if (_cur_node->iterated) {
        _cur_node = _cur_node->next;
    }
    if (should_break_for_high_priority_tasks()) {
        return;
    }  // else the next high_priority_task would be delayed for at most one task

    for (; _cur_node != NULL && !_cur_node->stop_task;
            _cur_node = _cur_node->next) {
        if (_cur_node->high_priority != _high_priority) {
            continue;
        }
        if (!_cur_node->iterated) {
            // First visit of this node: it is counted whether or not it can
            // still be executed.
            const bool executable = _cur_node->peek_to_execute();
            ++_num_iterated;
            _cur_node->iterated = true;
            if (executable) {
                // Stay on this node; it is the current task now.
                return;
            }
        }
    }
}

// Marks the tasks this iterator went through as executed.
//
// This is done here instead of waiting for the nodes to be returned, because
// a high-priority task might sit in the middle of the linked list and is not
// going to be returned soon. Nothing is done for a stopped iterator.
TaskIteratorBase::~TaskIteratorBase() {
    if (_is_stopped) {
        return;
    }
    // Every iterated node of our priority in front of the current node.
    for (; _head != _cur_node; _head = _head->next) {
        if (_head->iterated && _head->high_priority == _high_priority) {
            _head->set_executed();
        }
    }
    // The current node itself only matters when iteration was cut short in
    // favour of high-priority tasks.
    if (!_should_break || _cur_node == NULL) {
        return;
    }
    if (_cur_node->high_priority == _high_priority && _cur_node->iterated) {
        _cur_node->set_executed();
    }
}

} // namespace bthread