sampler.cpp 6.44 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
gejun's avatar
gejun committed
17

gejun's avatar
gejun committed
18 19 20
// Author: Ge,Jun (gejun@baidu.com)
// Date: Tue Jul 28 18:14:40 CST 2015

21 22
#include "butil/time.h"
#include "butil/memory/singleton_on_pthread_once.h"
gejun's avatar
gejun committed
23 24
#include "bvar/reducer.h"
#include "bvar/detail/sampler.h"
gejun's avatar
gejun committed
25 26
#include "bvar/passive_status.h"
#include "bvar/window.h"
gejun's avatar
gejun committed
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59

namespace bvar {
namespace detail {

// Number of consecutive sampling rounds that finish without any sleep before
// SamplerCollector::run() logs a "bvar is busy" warning. The same value is
// printed in that warning as a number of seconds.
const int WARN_NOSLEEP_THRESHOLD = 2;

// Combine two circular linked lists into one.
struct CombineSampler {
    // Reduction operator used by Reducer<Sampler*, CombineSampler>.
    // |s1| is the accumulated list (in/out), |s2| is the list to merge in.
    //  - |s2| is NULL: nothing to merge, |s1| is left untouched.
    //  - |s1| is NULL: |s2| becomes the accumulated list.
    //  - otherwise:    the two lists are joined via InsertBeforeAsList().
    void operator()(Sampler* & s1, Sampler* s2) const {
        if (s2 != NULL) {
            if (s1 != NULL) {
                s1->InsertBeforeAsList(s2);
            } else {
                s1 = s2;
            }
        }
    }
};

// Call take_sample() of all scheduled samplers.
// This could be done with a regular timer thread, but that is way too slow
// (global contention + log(N) heap manipulations). We need it to be super fast
// so that the creation overhead of Window<> is negligible.
// The trick is to use Reducer<Sampler*, CombineSampler>. Each Sampler is
// doubly linked, thus we can reduce multiple Samplers into one circularly
// doubly linked list, and multiple lists into larger lists. We create a
// dedicated thread that periodically fetches the reduced value, which is just
// the combined list of Samplers, then walks through the list and calls
// take_sample() on each of them.
// If a Sampler needs to be deleted, we just mark it as unused and the
// deletion takes place in that thread as well.
class SamplerCollector : public bvar::Reducer<Sampler*, CombineSampler> {
public:
    // Creates the collector and immediately starts its sampling thread.
    SamplerCollector()
        : _created(false)
        , _stop(false)
        , _registered_atfork(false)
        , _created_pid(0)
        , _cumulated_time_us(0) {
        create_sampling_thread();
    }

    // Asks the sampling thread to stop and waits for it to exit. Nothing is
    // done if the thread was never created successfully.
    ~SamplerCollector() {
        if (_created) {
            _stop = true;
            pthread_join(_tid, NULL);
            _created = false;
        }
    }

private:
    // Support for fork:
    // * The singleton can be null before forking, the child callback will not
    //   be registered.
    // * If the singleton is not null before forking, the child callback will
    //   be registered and the sampling thread will be re-created.
    // * A forked program can be forked again.

    // Child-side fork handler: re-creates the sampling thread in the child.
    static void child_callback_atfork() {
        butil::get_leaky_singleton<SamplerCollector>()->after_forked_as_child();
    }

    // Starts the sampling thread. On success, marks the thread as created and
    // makes sure the child-side fork handler is registered exactly once.
    void create_sampling_thread() {
        const int rc = pthread_create(&_tid, NULL, sampling_thread, this);
        if (rc != 0) {
            LOG(FATAL) << "Fail to create sampling_thread, " << berror(rc);
            return;
        }
        _created = true;
        // Fork handlers are inherited by the child process. This function is
        // called again from after_forked_as_child(), so registering the
        // handler unconditionally would double the number of registrations in
        // every generation of forked children and spawn several sampling
        // threads for the same collector. Register it only once.
        if (!_registered_atfork) {
            const int atfork_rc =
                pthread_atfork(NULL, NULL, child_callback_atfork);
            if (atfork_rc == 0) {
                _registered_atfork = true;
            } else {
                LOG(WARNING) << "Fail to register child_callback_atfork, "
                             << berror(atfork_rc);
            }
        }
    }

    // Runs in the child after fork(): the sampling thread recorded in _tid is
    // treated as gone, so start a new one.
    void after_forked_as_child() {
        _created = false;
        create_sampling_thread();
    }

    // Main loop of the sampling thread, defined out of line.
    void run();

    // pthread entry point; |arg| is the SamplerCollector that owns the thread.
    static void* sampling_thread(void* arg) {
        static_cast<SamplerCollector*>(arg)->run();
        return NULL;
    }

    // Callback for PassiveStatus<double>: converts the accumulated sampling
    // time of the collector passed as |arg| from microseconds to seconds.
    static double get_cumulated_time(void* arg) {
        return static_cast<SamplerCollector*>(arg)->_cumulated_time_us / 1000.0 / 1000.0;
    }

private:
    // True while a successfully created sampling thread is recorded in _tid.
    bool _created;
    // Set by the destructor to make run() leave its loop.
    // NOTE(review): written and read by different threads without any
    // synchronization; consider an atomic flag.
    bool _stop;
    // True once child_callback_atfork has been registered with
    // pthread_atfork(). The value is carried into forked children, which
    // also inherit the handler itself.
    bool _registered_atfork;
    // Not read or written by any code in this file except its initialization.
    pid_t _created_pid;
    // Total time spent walking the sampler list, in microseconds.
    int64_t _cumulated_time_us;
    // Handle of the sampling thread; meaningful only when _created is true.
    pthread_t _tid;
};

120
#ifndef UNIT_TEST
121 122
static PassiveStatus<double>* s_cumulated_time_bvar = NULL;
static bvar::PerSecond<bvar::PassiveStatus<double> >* s_sampling_thread_usage_bvar = NULL;
123
#endif
jamesge's avatar
jamesge committed
124

gejun's avatar
gejun committed
125
void SamplerCollector::run() {
126
#ifndef UNIT_TEST
jamesge's avatar
jamesge committed
127 128 129 130 131
    // NOTE:
    // * Following vars can't be created on thread's stack since this thread
    //   may be adandoned at any time after forking.
    // * They can't created inside the constructor of SamplerCollector as well,
    //   which results in deadlock.
132 133
    if (s_cumulated_time_bvar == NULL) {
        s_cumulated_time_bvar =
jamesge's avatar
jamesge committed
134 135
            new PassiveStatus<double>(get_cumulated_time, this);
    }
136 137
    if (s_sampling_thread_usage_bvar == NULL) {
        s_sampling_thread_usage_bvar =
jamesge's avatar
jamesge committed
138
            new bvar::PerSecond<bvar::PassiveStatus<double> >(
139
                    "bvar_sampler_collector_usage", s_cumulated_time_bvar, 10);
jamesge's avatar
jamesge committed
140
    }
141
#endif
jamesge's avatar
jamesge committed
142 143 144

    butil::LinkNode<Sampler> root;
    int consecutive_nosleep = 0;
gejun's avatar
gejun committed
145
    while (!_stop) {
146
        int64_t abstime = butil::gettimeofday_us();
gejun's avatar
gejun committed
147 148 149 150 151 152
        Sampler* s = this->reset();
        if (s) {
            s->InsertBeforeAsList(&root);
        }
        int nremoved = 0;
        int nsampled = 0;
153
        for (butil::LinkNode<Sampler>* p = root.next(); p != &root;) {
gejun's avatar
gejun committed
154
            // We may remove p from the list, save next first.
155
            butil::LinkNode<Sampler>* saved_next = p->next();
gejun's avatar
gejun committed
156
            Sampler* s = p->value();
gejun's avatar
gejun committed
157
            s->_mutex.lock();
gejun's avatar
gejun committed
158
            if (!s->_used) {
gejun's avatar
gejun committed
159
                s->_mutex.unlock();
gejun's avatar
gejun committed
160 161 162 163 164
                p->RemoveFromList();
                delete s;
                ++nremoved;
            } else {
                s->take_sample();
gejun's avatar
gejun committed
165
                s->_mutex.unlock();
gejun's avatar
gejun committed
166 167 168 169 170
                ++nsampled;
            }
            p = saved_next;
        }
        bool slept = false;
171
        int64_t now = butil::gettimeofday_us();
gejun's avatar
gejun committed
172 173 174
        _cumulated_time_us += now - abstime;
        abstime += 1000000L;
        while (abstime > now) {
gejun's avatar
gejun committed
175 176
            ::usleep(abstime - now);
            slept = true;
177
            now = butil::gettimeofday_us();
gejun's avatar
gejun committed
178 179 180 181 182 183 184 185 186 187 188 189 190
        }
        if (slept) {
            consecutive_nosleep = 0;
        } else {            
            if (++consecutive_nosleep >= WARN_NOSLEEP_THRESHOLD) {
                consecutive_nosleep = 0;
                LOG(WARNING) << "bvar is busy at sampling for "
                             << WARN_NOSLEEP_THRESHOLD << " seconds!";
            }
        }
    }
}

gejun's avatar
gejun committed
191
// A new sampler starts out marked as in use; destroy() clears the mark.
Sampler::Sampler()
    : _used(true) {
}
gejun's avatar
gejun committed
192

gejun's avatar
gejun committed
193
// Nothing to release here. Within this file, samplers are deleted by
// SamplerCollector::run() once they have been marked as unused.
Sampler::~Sampler() {
}
gejun's avatar
gejun committed
194 195

void Sampler::schedule() {
196
    *butil::get_leaky_singleton<SamplerCollector>() << this;
gejun's avatar
gejun committed
197 198 199
}

void Sampler::destroy() {
gejun's avatar
gejun committed
200
    _mutex.lock();
gejun's avatar
gejun committed
201
    _used = false;
gejun's avatar
gejun committed
202
    _mutex.unlock();
gejun's avatar
gejun committed
203 204 205 206
}

}  // namespace detail
}  // namespace bvar