sampler.cpp 4.89 KB
Newer Older
gejun's avatar
gejun committed
1
// Copyright (c) 2015 Baidu, Inc.
gejun's avatar
gejun committed
2 3 4 5 6 7 8 9 10 11 12 13 14
// 
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// 
//     http://www.apache.org/licenses/LICENSE-2.0
// 
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

gejun's avatar
gejun committed
15 16 17
// Author: Ge,Jun (gejun@baidu.com)
// Date: Tue Jul 28 18:14:40 CST 2015

18 19
#include "butil/time.h"
#include "butil/memory/singleton_on_pthread_once.h"
gejun's avatar
gejun committed
20 21
#include "bvar/reducer.h"
#include "bvar/detail/sampler.h"
gejun's avatar
gejun committed
22 23
#include "bvar/passive_status.h"
#include "bvar/window.h"
gejun's avatar
gejun committed
24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56

namespace bvar {
namespace detail {

const int WARN_NOSLEEP_THRESHOLD = 2;

// Combine two circular linked list into one.
struct CombineSampler {
    void operator()(Sampler* & s1, Sampler* s2) const {
        if (s2 == NULL) {
            return;
        }
        if (s1 == NULL) {
            s1 = s2;
            return;
        }
        s1->InsertBeforeAsList(s2);
    }
};

// Call take_sample() of all scheduled samplers.
// This can be done with regular timer thread, but it's way too slow(global
// contention + log(N) heap manipulations). We need it to be super fast so that
// creation overhead of Window<> is negliable.
// The trick is to use Reducer<Sampler*, CombineSampler>. Each Sampler is
// doubly linked, thus we can reduce multiple Samplers into one cicurlarly
// doubly linked list, and multiple lists into larger lists. We create a
// dedicated thread to periodically get_value() which is just the combined
// list of Samplers. Waking through the list and call take_sample().
// If a Sampler needs to be deleted, we just mark it as unused and the
// deletion is taken place in the thread as well.
class SamplerCollector : public bvar::Reducer<Sampler*, CombineSampler> {
public:
gejun's avatar
gejun committed
57
    SamplerCollector() : _created(false), _stop(false), _cumulated_time_us(0) {
gejun's avatar
gejun committed
58 59 60 61 62 63 64 65 66 67 68 69 70 71
        int rc = pthread_create(&_tid, NULL, sampling_thread, this);
        if (rc != 0) {
            LOG(FATAL) << "Fail to create sampling_thread, " << berror(rc);
        } else {
            _created = true;
        }
    }
    ~SamplerCollector() {
        if (_created) {
            _stop = true;
            pthread_join(_tid, NULL);
            _created = false;
        }
    }
gejun's avatar
gejun committed
72 73 74 75

    static double get_cumulated_time(void* arg) {
        return ((SamplerCollector*)arg)->_cumulated_time_us / 1000.0 / 1000.0;
    }
gejun's avatar
gejun committed
76 77 78 79 80 81 82 83 84 85 86 87
    
private:
    void run();
    
    static void* sampling_thread(void* arg) {
        ((SamplerCollector*)arg)->run();
        return NULL;
    }

private:
    bool _created;
    bool _stop;
gejun's avatar
gejun committed
88
    int64_t _cumulated_time_us;
gejun's avatar
gejun committed
89 90 91 92
    pthread_t _tid;
};

void SamplerCollector::run() {
93
    butil::LinkNode<Sampler> root;
gejun's avatar
gejun committed
94
    int consecutive_nosleep = 0;
95
#ifndef UNIT_TEST
gejun's avatar
gejun committed
96
    PassiveStatus<double> cumulated_time(get_cumulated_time, this);
97 98 99
    bvar::PerSecond<bvar::PassiveStatus<double> > usage(
            "bvar_sampler_collector_usage", &cumulated_time, 10);
#endif
gejun's avatar
gejun committed
100
    while (!_stop) {
101
        int64_t abstime = butil::gettimeofday_us();
gejun's avatar
gejun committed
102 103 104 105 106 107
        Sampler* s = this->reset();
        if (s) {
            s->InsertBeforeAsList(&root);
        }
        int nremoved = 0;
        int nsampled = 0;
108
        for (butil::LinkNode<Sampler>* p = root.next(); p != &root;) {
gejun's avatar
gejun committed
109
            // We may remove p from the list, save next first.
110
            butil::LinkNode<Sampler>* saved_next = p->next();
gejun's avatar
gejun committed
111
            Sampler* s = p->value();
gejun's avatar
gejun committed
112
            s->_mutex.lock();
gejun's avatar
gejun committed
113
            if (!s->_used) {
gejun's avatar
gejun committed
114
                s->_mutex.unlock();
gejun's avatar
gejun committed
115 116 117 118 119
                p->RemoveFromList();
                delete s;
                ++nremoved;
            } else {
                s->take_sample();
gejun's avatar
gejun committed
120
                s->_mutex.unlock();
gejun's avatar
gejun committed
121 122 123 124 125
                ++nsampled;
            }
            p = saved_next;
        }
        bool slept = false;
126
        int64_t now = butil::gettimeofday_us();
gejun's avatar
gejun committed
127 128 129
        _cumulated_time_us += now - abstime;
        abstime += 1000000L;
        while (abstime > now) {
gejun's avatar
gejun committed
130 131
            ::usleep(abstime - now);
            slept = true;
132
            now = butil::gettimeofday_us();
gejun's avatar
gejun committed
133 134 135 136 137 138 139 140 141 142 143 144 145
        }
        if (slept) {
            consecutive_nosleep = 0;
        } else {            
            if (++consecutive_nosleep >= WARN_NOSLEEP_THRESHOLD) {
                consecutive_nosleep = 0;
                LOG(WARNING) << "bvar is busy at sampling for "
                             << WARN_NOSLEEP_THRESHOLD << " seconds!";
            }
        }
    }
}

gejun's avatar
gejun committed
146
Sampler::Sampler() : _used(true) {}
gejun's avatar
gejun committed
147

gejun's avatar
gejun committed
148
Sampler::~Sampler() {}
gejun's avatar
gejun committed
149 150

void Sampler::schedule() {
151
    *butil::get_leaky_singleton<SamplerCollector>() << this;
gejun's avatar
gejun committed
152 153 154
}

void Sampler::destroy() {
gejun's avatar
gejun committed
155
    _mutex.lock();
gejun's avatar
gejun committed
156
    _used = false;
gejun's avatar
gejun committed
157
    _mutex.unlock();
gejun's avatar
gejun committed
158 159 160 161
}

}  // namespace detail
}  // namespace bvar