// Copyright (c) 2015 Baidu.com, Inc. All Rights Reserved

// Author: Zhangyi Chen (chenzhangyi01@baidu.com)
// Date: 2015/09/15 10:44:17

#ifndef  BVAR_DETAIL_PERCENTILE_H
#define  BVAR_DETAIL_PERCENTILE_H

#include <string.h>                     // memset memcmp
#include <stdint.h>                     // uint32_t
#include <limits>                       // std::numeric_limits
#include <ostream>                      // std::ostream
#include <algorithm>                    // std::sort
#include <math.h>                       // ceil
#include "base/macros.h"                // ARRAY_SIZE
#include "bvar/reducer.h"               // Reducer
#include "bvar/window.h"                // Window
#include "bvar/detail/combiner.h"       // AgentCombiner
#include "bvar/detail/sampler.h"        // ReducerSampler
#include "base/fast_rand.h"

namespace bvar {
namespace detail {

// Round of expectation of a rational number |a/b| to a natural number.
inline unsigned long round_of_expectation(unsigned long a, unsigned long b) {
    if (BAIDU_UNLIKELY(b == 0)) {
        return 0;
    }
    return a / b + (base::fast_rand_less_than(b) < a % b);
}

// Storing latencies inside a interval.
template <size_t SAMPLE_SIZE>
class PercentileInterval {
public:
    PercentileInterval()
        : _num_added(0)
        , _sorted(false)
        , _num_samples(0) {
    }

    // Get index-th sample in ascending order.
    uint32_t get_sample_at(size_t index) {
        const size_t saved_num = _num_samples;
        if (index >= saved_num) {
            if (saved_num == 0) {
                return 0;
            }
            index = saved_num - 1;
        }
        if (!_sorted) {
            std::sort(_samples, _samples + saved_num);
            _sorted = true;
        }
        CHECK_EQ(saved_num, _num_samples) << "You must call get_number() on"
            " a unchanging PercentileInterval";
        return _samples[index];
    }

    // Add samples of another interval. This function tries to make each
    // sample in merged _samples has (approximately) equal probability to
    // remain.
    // This method is invoked when merging ThreadLocalPercentileSamples in to
    // GlobalPercentileSamples
    template <size_t size2>
    void merge(const PercentileInterval<size2> &rhs) {
        if (rhs._num_added == 0) {
            return;
        }
        BAIDU_CASSERT(SAMPLE_SIZE >= size2,
                      must_merge_small_interval_into_larger_one_currently);
        CHECK_EQ(rhs._num_samples, rhs._num_added);
        // Assume that the probability of each sample in |this| is a0/b0 and
        // the probability of each sample in |rhs| is a1/b1.
        // We are going to randomly pick some samples from |this| and |rhs| to
        // satisfy the constraint that each sample stands for the probability
        // of 
        //     * 1 (SAMPLE_SIZE >= |b0 + b1|), which indicates that no sample
        //       has been dropped 
        //     * SAMPLE_SIZE / |b0 + b1| (SAMPLE_SIZE < |b0 + b1|)
        // So we should keep |b0*SAMPLE_SIZE/(b0+b1)| from |this|
        // |b1*SAMPLE_SIZE/(b0+b1)| from |rhs|.
        if (_num_added + rhs._num_added <= SAMPLE_SIZE) {
            // No sample should be dropped
            CHECK_EQ(_num_samples, _num_added)
                << "_num_added=" << _num_added
                << " rhs._num_added" << rhs._num_added
                << " _num_samples=" << _num_samples
                << " rhs._num_samples=" << rhs._num_samples
                << " SAMPLE_SIZE=" << SAMPLE_SIZE
                << " size2=" << size2;
            memcpy(_samples + _num_samples, rhs._samples, 
                   sizeof(_samples[0]) * rhs._num_samples);
            _num_samples += rhs._num_samples;
        } else {
            // |num_remain| must be less than _num_samples:
            // if _num_added = _num_samples:
            //    SAMPLE_SIZE / (_num_added + rhs._num_added) < 1 so that
            //    num_remain < _num_added = _num_samples
            // otherwise:
            //    _num_samples = SAMPLE_SIZE;
            //    _num_added / (_num_added + rhs._num_added) < 1 so that
            //    num_remain < SAMPLE_SIZE = _num_added
            size_t num_remain = round_of_expectation(
                    _num_added * SAMPLE_SIZE, _num_added + rhs._num_added);
            CHECK_LE(num_remain, _num_samples);
            // Randomly drop samples of this
            for (size_t i = _num_samples; i > num_remain; --i) {
                _samples[base::fast_rand_less_than(i)] = _samples[i - 1];
            }
            const size_t num_remain_from_rhs = SAMPLE_SIZE - num_remain;
            CHECK_LE(num_remain_from_rhs, rhs._num_samples);
            // Have to copy data from rhs to shuffle since it's const
            DEFINE_SMALL_ARRAY(uint32_t, tmp, rhs._num_samples, 64);
            memcpy(tmp, rhs._samples, sizeof(uint32_t) * rhs._num_samples);
            for (size_t i = 0; i < num_remain_from_rhs; ++i) {
                const int index = base::fast_rand_less_than(rhs._num_samples - i);
                _samples[num_remain++] = tmp[index];
                tmp[index] = tmp[rhs._num_samples - i - 1];
            }
            _num_samples = num_remain;
            CHECK_EQ(_num_samples, SAMPLE_SIZE);
        }
        _num_added += rhs._num_added;
    }

    // Randomly pick n samples from mutable_rhs to |this|
    template <size_t size2>
    void merge_with_expectation(PercentileInterval<size2>& mutable_rhs, size_t n) {
        CHECK(n <= mutable_rhs._num_samples);
        _num_added += mutable_rhs._num_added;
        if (_num_samples + n <= SAMPLE_SIZE && n == mutable_rhs._num_samples) {
            memcpy(_samples + _num_samples, mutable_rhs._samples, sizeof(_samples[0]) * n);
            _num_samples += n;
            return;
        }
        for (size_t i = 0; i < n; ++i) {
            size_t index = base::fast_rand_less_than(mutable_rhs._num_samples - i);
            if (_num_samples < SAMPLE_SIZE) {
                _samples[_num_samples++] = mutable_rhs._samples[index];
            } else {
                _samples[base::fast_rand_less_than(_num_samples)]
                        = mutable_rhs._samples[index];
            }
            std::swap(mutable_rhs._samples[index],
                      mutable_rhs._samples[mutable_rhs._num_samples - i - 1]);
        }
    }

    // Add an unsigned 32-bit latency (what percentile actually accepts) to a
    // non-full interval, which is invoked by Percentile::operator<< to add a
    // sample into the ThreadLocalPercentileSamples.
    // Returns true if the input was stored.
    bool add32(uint32_t x) {
        if (BAIDU_UNLIKELY(_num_samples >= SAMPLE_SIZE)) {
            LOG(ERROR) << "This interval was full";
            return false;
        }
        ++_num_added;
        _samples[_num_samples++] = x;
        return true;
    }

    // Add a signed latency.
    bool add64(int64_t x) {
        if (x >= 0) {
            return add32((uint32_t)x);
        }
        return false;
    }

    // Remove all samples inside.
    void clear() {
        _num_added = 0;
        _sorted = false;
        _num_samples = 0;
    }

    // True if no more room for new samples.
    bool full() const { return _num_samples == SAMPLE_SIZE; }

    // True if there's no samples.
    bool empty() const { return !_num_samples; }

    // #samples ever added by calling add*()
    uint32_t added_count() const { return _num_added; }

    // #samples stored.
    uint32_t sample_count() const { return _num_samples; }

    // For debuggin.
    void describe(std::ostream &os) const {
        os << "(num_added=" << added_count() << ")[";
        for (size_t j = 0; j < _num_samples; ++j) {
            os << ' ' << _samples[j];
        }
        os << " ]";
    }

    // True if two PercentileInterval are exactly same, namely same # of added and
    // same samples, mainly for debuggin.
    bool operator==(const PercentileInterval& rhs) const {
        return (_num_added == rhs._num_added &&
                _num_samples == rhs._num_samples &&
                memcmp(_samples, rhs._samples,  _num_samples * sizeof(uint32_t)) == 0);
    }

private:
template <size_t size2> friend class PercentileInterval;
    BAIDU_CASSERT(SAMPLE_SIZE <= 65536, SAMPLE_SIZE_must_be_16bit);

    uint32_t _num_added;
    bool _sorted;
    uint16_t _num_samples;
    uint32_t _samples[SAMPLE_SIZE];
};

static const size_t NUM_INTERVALS = 32;

// This declartion is a must for gcc 3.4
class AddLatency;

// Group of PercentileIntervals.
template <size_t SAMPLE_SIZE_IN>
class PercentileSamples {
public:
friend class AddLatency;

    static const size_t SAMPLE_SIZE = SAMPLE_SIZE_IN;
    
    PercentileSamples() {
        memset(this, 0, sizeof(PercentileSamples));
    }

    ~PercentileSamples() {
        for (size_t i = 0; i < NUM_INTERVALS; ++i) {
            if (_intervals[i]) {
                delete _intervals[i];
            }
        }
    }

    // Copy-construct from another PercentileSamples.
    // Copy/assigning happen at per-second scale. should be OK.
    PercentileSamples(const PercentileSamples& rhs) {
        _num_added = rhs._num_added;
        for (size_t i = 0; i < NUM_INTERVALS; ++i) {
            if (rhs._intervals[i] && !rhs._intervals[i]->empty()) {
                _intervals[i] = new PercentileInterval<SAMPLE_SIZE>(*rhs._intervals[i]);
            } else {
                _intervals[i] = NULL;
            }
        }
    }

    // Assign from another PercentileSamples.
    // Notice that we keep empty _intervals to avoid future allocations.
    void operator=(const PercentileSamples& rhs) {
        _num_added = rhs._num_added;
        for (size_t i = 0; i < NUM_INTERVALS; ++i) {
            if (rhs._intervals[i] && !rhs._intervals[i]->empty()) {
                get_interval_at(i) = *rhs._intervals[i];
            } else if (_intervals[i]) {
                _intervals[i]->clear();
            }
        }
    }
    
    // Get the `ratio'-ile value. E.g. 0.99 means 99%-ile value.
    // Since we store samples in different intervals internally. We first
    // address the interval by multiplying ratio with _num_added, then
    // find the sample inside the interval. We've tested an alternative
    // method that store all samples together w/o any intervals (or in another
    // word, only one interval), the method is much simpler but is not as
    // stable as current impl. CDF plotted by the method changes dramatically
    // from seconds to seconds. It seems that separating intervals probably
    // keep more long-tail values.
    uint32_t get_number(double ratio) {
        size_t n = (size_t)ceil(ratio * _num_added);
        if (n > _num_added) {
            n = _num_added;
        } else if (n == 0) {
            return 0;
        }
        for (size_t i = 0; i < NUM_INTERVALS; ++i) {
            if (_intervals[i] == NULL) {
                continue;
            }
            PercentileInterval<SAMPLE_SIZE>& invl = *_intervals[i];
            if (n <= invl.added_count()) {
                size_t sample_n = n * invl.sample_count() / invl.added_count();
                size_t sample_index = (sample_n ? sample_n - 1 : 0);
                return invl.get_sample_at(sample_index);
            }
            n -= invl.added_count();
        }
        CHECK(false) << "Can't reach here";
        return std::numeric_limits<uint32_t>::max();
    }

    // Add samples in another PercentileSamples.
    template <size_t size2>
    void merge(const PercentileSamples<size2> &rhs) {
        _num_added += rhs._num_added;
        for (size_t i = 0; i < NUM_INTERVALS; ++i) {
            if (rhs._intervals[i] && !rhs._intervals[i]->empty()) {
                get_interval_at(i).merge(*rhs._intervals[i]);
            }
        }
    }

    // Combine multiple into a single PercentileSamples
    template <typename Iterator>
    void combine_of(const Iterator& begin, const Iterator& end) {
        if (_num_added) {
            // Very unlikely
            for (size_t i = 0; i < NUM_INTERVALS; ++i) {
                if (_intervals[i]) {
                    _intervals[i]->clear();
                }
            }
            _num_added = 0;
        }

        for (Iterator iter = begin; iter != end; ++iter) {
            _num_added += iter->_num_added;
        }

        // Calculate probabilities for each interval
        for (size_t i = 0; i < NUM_INTERVALS; ++i) {
            size_t total = 0;
            size_t total_sample=0;
            for (Iterator iter = begin; iter != end; ++iter) {
                if (iter->_intervals[i]) {
                    total += iter->_intervals[i]->added_count();
                    total_sample += iter->_intervals[i]->sample_count();
                }
            }
            if (total == 0) {
                // Empty interval
                continue;
            }


            // Consider that sub interval took |a| samples out of |b| totally,
            // each sample won the probability of |a/b| according to the
            // algorithm of add32(), now we should pick some samples into the
            // combined interval that satisfied each sample has the
            // probability of |SAMPLE_SIZE/total|, so each sample has the
            // probability of |(SAMPLE_SIZE*b)/(a*total) to remain and the
            // expected number of samples in this interval is
            // |(SAMPLE_SIZE*b)/total|
            for (Iterator iter = begin; iter != end; ++iter) {
                if (!iter->_intervals[i] || iter->_intervals[i]->empty()) {
                    continue;
                }
                typename base::add_reference<BAIDU_TYPEOF(*(iter->_intervals[i]))>::type
                        invl = *(iter->_intervals[i]);
                if (total <= SAMPLE_SIZE) {
                    get_interval_at(i).merge_with_expectation(
                            invl, invl.sample_count());
                    continue;
                }
                // Each 
                const size_t b = invl.added_count();
                const size_t remain = std::min(
                        round_of_expectation(b * SAMPLE_SIZE, total),
                        (size_t)invl.sample_count());
                get_interval_at(i).merge_with_expectation(invl, remain);
            }
        }
    }

   // For debuggin.
    void describe(std::ostream &os) const {
        os << this << "{num_added=" << _num_added;
        for (size_t i = 0; i < NUM_INTERVALS; ++i) {
            if (_intervals[i] && !_intervals[i]->empty()) {
                os << " interval[" << i << "]=";
                _intervals[i]->describe(os);
            }
        }
        os << '}';
    }

    // True if intervals of two PercentileSamples are exactly same.
    bool operator==(const PercentileSamples& rhs) const {
        for (size_t i = 0; i < NUM_INTERVALS; ++i) {
            if (_intervals != rhs._intervals[i]) {
                return false;
            }
        }
        return true;
    }

private:
template <size_t size1> friend class PercentileSamples;

    // Get/create interval on-demand.
    PercentileInterval<SAMPLE_SIZE>& get_interval_at(size_t index) {
        if (_intervals[index] == NULL) {
            _intervals[index] = new PercentileInterval<SAMPLE_SIZE>;
        }
        return *_intervals[index];
    }
    // sum of _num_added of all intervals. we update this value after any
    // changes to intervals inside to make it O(1)-time accessible.
    size_t _num_added;
    PercentileInterval<SAMPLE_SIZE>* _intervals[NUM_INTERVALS];
};

template <size_t sz> const size_t PercentileSamples<sz>::SAMPLE_SIZE;

template <size_t size>
std::ostream &operator<<(std::ostream &os, const PercentileInterval<size> &p) {
    p.describe(os);
    return os;
}

template <size_t size>
std::ostream &operator<<(std::ostream &os, const PercentileSamples<size> &p) {
    p.describe(os);
    return os;
}

// NOTE: we intentionally minus 2 uint32_t from sample-size to make the struct
// size be power of 2 and more friendly to memory allocators.
typedef PercentileSamples<254> GlobalPercentileSamples;
typedef PercentileSamples<30> ThreadLocalPercentileSamples;

// A specialized reducer for finding the percentile of latencies.
// NOTE: DON'T use it directly, use LatencyRecorder instead.
class Percentile {
public:
    struct AddPercentileSamples {
        template <size_t size1, size_t size2>
        void operator()(PercentileSamples<size1> &b1, 
                        const PercentileSamples<size2> &b2) const {
            b1.merge(b2);
        }
    };

    typedef GlobalPercentileSamples                         value_type;
    typedef ReducerSampler<Percentile, 
                           GlobalPercentileSamples,
                           AddPercentileSamples, VoidOp>    sampler_type;
    typedef AgentCombiner <GlobalPercentileSamples,
                           ThreadLocalPercentileSamples,
                           AddPercentileSamples>            combiner_type;
    typedef combiner_type::Agent                            agent_type;
    Percentile();
    ~Percentile();

    AddPercentileSamples op() const { return AddPercentileSamples(); }
    VoidOp inv_op() const { return VoidOp(); }

    // The sampler for windows over percentile.
    sampler_type* get_sampler() {
        if (NULL == _sampler) {
            _sampler = new sampler_type(this);
            _sampler->schedule();
        }
        return _sampler;
    }
    
    value_type reset();
    
    value_type get_value() const;
    
    Percentile& operator<<(int64_t latency);

    bool valid() const { return _combiner != NULL && _combiner->valid(); }
    
    // This name is useful for warning negative latencies in operator<<
    void set_debug_name(const base::StringPiece& name) {
        _debug_name.assign(name.data(), name.size());
    }

private:
    DISALLOW_COPY_AND_ASSIGN(Percentile);

    combiner_type*          _combiner;
    sampler_type*           _sampler;
    std::string _debug_name;
};

}  // namespace detail
}  // namespace bvar

#endif  //BVAR_DETAIL_PERCENTILE_H