timer_thread.h 3.77 KB
// bthread - A M:N threading library to make applications more concurrent.
// Copyright (c) 2014 Baidu, Inc.
// 
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// 
//     http://www.apache.org/licenses/LICENSE-2.0
// 
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Author: Ge,Jun (gejun@baidu.com)

#ifndef BAIDU_BTHREAD_TIMER_THREAD_H
#define BAIDU_BTHREAD_TIMER_THREAD_H

#include <vector>                     // std::vector
#include <pthread.h>                  // pthread_*
#include "butil/atomicops.h" 
#include "butil/time.h"                // time utilities
#include "bthread/mutex.h"

namespace bthread {

struct TimerThreadOptions {
    // Scheduling requests are hashed into different bucket to improve
    // scalability. However bigger num_buckets may NOT result in more scalable 
    // schedule() because bigger values also make each buckets more sparse
    // and more likely to lock the global mutex. You better not change
    // this value, just leave it to us.
    // Default: 13
    size_t num_buckets;

    // If this field is not empty, some bvar for reporting stats of TimerThread
    // will be exposed with this prefix.
    // Default: ""
    std::string bvar_prefix;

    // Constructed with default options.
    TimerThreadOptions();
};

// TimerThread is a separate thread to run scheduled tasks at specific time.
// At most one task runs at any time, don't put time-consuming code in the
// callback otherwise the task may delay other tasks significantly.
class TimerThread {
public:
    struct Task;
    class Bucket;

    typedef uint64_t TaskId;
    const static TaskId INVALID_TASK_ID;

    TimerThread();
    ~TimerThread();

    // Start the timer thread.
    // This method should only be called once.
    // return 0 if success, errno otherwise.
    int start(const TimerThreadOptions* options);

    // Stop the timer thread. Later schedule() will return INVALID_TASK_ID.
    void stop_and_join();

    // Schedule |fn(arg)| to run at realtime |abstime| approximately.
    // Returns: identifier of the scheduled task, INVALID_TASK_ID on error.
    TaskId schedule(void (*fn)(void*), void* arg, const timespec& abstime);

    // Prevent the task denoted by `task_id' from running. `task_id' must be
    // returned by schedule() ever.
    // Returns:
    //   0   -  Removed the task which does not run yet
    //  -1   -  The task does not exist.
    //   1   -  The task is just running.
    int unschedule(TaskId task_id);

    // Get identifier of internal pthread.
    // Returns (pthread_t)0 if start() is not called yet.
    pthread_t thread_id() const { return _thread; }
    
private:
    // the timer thread will run this method.
    void run();
    static void* run_this(void* arg);

    bool _started;            // whether the timer thread was started successfully.
    butil::atomic<bool> _stop;

    TimerThreadOptions _options;
    Bucket* _buckets;        // list of tasks to be run
    internal::FastPthreadMutex _mutex;    // protect _nearest_run_time
    int64_t _nearest_run_time;
    // the futex for wake up timer thread. can't use _nearest_run_time because
    // it's 64-bit.
    int _nsignals;
    pthread_t _thread;       // all scheduled task will be run on this thread
};

// Get the global TimerThread which never quits.
TimerThread* get_or_create_global_timer_thread();
TimerThread* get_global_timer_thread();

}   // end namespace bthread

#endif  // BAIDU_BTHREAD_TIMER_THREAD_H