// Copyright (c) 2014 Baidu, Inc.
// 
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// 
//     http://www.apache.org/licenses/LICENSE-2.0
// 
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Author: chenzhangyi01@baidu.com, gejun@baidu.com
// Date 2014/09/24 16:01:08

#ifndef  BVAR_REDUCER_H
#define  BVAR_REDUCER_H

#include <limits>                                 // std::numeric_limits
#include "butil/logging.h"                         // LOG()
#include "butil/type_traits.h"                     // butil::add_cr_non_integral
#include "butil/class_name.h"                      // class_name_str
#include "bvar/variable.h"                        // Variable
#include "bvar/detail/combiner.h"                 // detail::AgentCombiner
#include "bvar/detail/sampler.h"                  // ReducerSampler
#include "bvar/detail/series.h"
#include "bvar/window.h"

namespace bvar {

// Reduce multiple values into one with `Op': e1 Op e2 Op e3 ...
// `Op' shall satisfy:
//   - associative:     a Op (b Op c) == (a Op b) Op c
//   - commutative:     a Op b == b Op a;
//   - no side effects: a Op b never changes if a and b are fixed. 
// otherwise the result is undefined.
//
// For performance issues, we don't let Op return value, instead it shall
// set the result to the first parameter in-place. Namely to add two values,
// "+=" should be implemented rather than "+".
//
// Reducer works for non-primitive T which satisfies:
//   - T() should be the identity of Op.
//   - stream << v should compile and put description of v into the stream
// Example:
// class MyType {
// friend std::ostream& operator<<(std::ostream& os, const MyType&);
// public:
//     MyType() : _x(0) {}
//     explicit MyType(int x) : _x(x) {}
//     void operator+=(const MyType& rhs) const {
//         _x += rhs._x;
//     }
// private:
//     int _x;
// };
// std::ostream& operator<<(std::ostream& os, const MyType& value) {
//     return os << "MyType{" << value._x << "}";
// }
// bvar::Adder<MyType> my_type_sum;
// my_type_sum << MyType(1) << MyType(2) << MyType(3);
// LOG(INFO) << my_type_sum;  // "MyType{6}"

template <typename T, typename Op, typename InvOp = detail::VoidOp>
class Reducer : public Variable {
public:
    typedef typename detail::AgentCombiner<T, T, Op> combiner_type;
    typedef typename combiner_type::Agent agent_type;
    typedef detail::ReducerSampler<Reducer, T, Op, InvOp> sampler_type;
    class SeriesSampler : public detail::Sampler {
    public:
        SeriesSampler(Reducer* owner, const Op& op)
            : _owner(owner), _series(op) {}
        ~SeriesSampler() {}
        void take_sample() { _series.append(_owner->get_value()); }
        void describe(std::ostream& os) { _series.describe(os, NULL); }
    private:
        Reducer* _owner;
        detail::Series<T, Op> _series;
    };

public:
    // The `identify' must satisfy: identity Op a == a
    Reducer(typename butil::add_cr_non_integral<T>::type identity = T(),
            const Op& op = Op(),
            const InvOp& inv_op = InvOp())
        : _combiner(identity, identity, op)
        , _sampler(NULL)
        , _series_sampler(NULL)
        , _inv_op(inv_op) {
    }

    ~Reducer() {
        // Calling hide() manually is a MUST required by Variable.
        hide();
        if (_sampler) {
            _sampler->destroy();
            _sampler = NULL;
        }
        if (_series_sampler) {
            _series_sampler->destroy();
            _series_sampler = NULL;
        }
    }

    // Add a value.
    // Returns self reference for chaining.
    Reducer& operator<<(typename butil::add_cr_non_integral<T>::type value);

    // Get reduced value.
    // Notice that this function walks through threads that ever add values
    // into this reducer. You should avoid calling it frequently.
    T get_value() const {
        CHECK(!(butil::is_same<InvOp, detail::VoidOp>::value) || _sampler == NULL)
            << "You should not call Reducer<" << butil::class_name_str<T>()
            << ", " << butil::class_name_str<Op>() << ">::get_value() when a"
            << " Window<> is used because the operator does not have inverse.";
        return _combiner.combine_agents();
    }


    // Reset the reduced value to T().
    // Returns the reduced value before reset.
    T reset() { return _combiner.reset_all_agents(); }

    // Implement Variable::describe() and Variable::get_value().
    void describe(std::ostream& os, bool quote_string) const {
        if (butil::is_same<T, std::string>::value && quote_string) {
            os << '"' << get_value() << '"';
        } else {
            os << get_value();
        }
    }
    
#ifdef BAIDU_INTERNAL
    void get_value(boost::any* value) const { *value = get_value(); }
#endif

    // True if this reducer is constructed successfully.
    bool valid() const { return _combiner.valid(); }

    // Get instance of Op.
    const Op& op() const { return _combiner.op(); }
    const InvOp& inv_op() const { return _inv_op; }
    
    sampler_type* get_sampler() {
        if (NULL == _sampler) {
            _sampler = new sampler_type(this);
            _sampler->schedule();
        }
        return _sampler;
    }

    int describe_series(std::ostream& os, const SeriesOptions& options) const {
        if (_series_sampler == NULL) {
            return 1;
        }
        if (!options.test_only) {
            _series_sampler->describe(os);
        }
        return 0;
    }
    
protected:
    int expose_impl(const butil::StringPiece& prefix,
                    const butil::StringPiece& name,
                    DisplayFilter display_filter) {
        const int rc = Variable::expose_impl(prefix, name, display_filter);
        if (rc == 0 &&
            _series_sampler == NULL &&
            !butil::is_same<InvOp, detail::VoidOp>::value &&
            !butil::is_same<T, std::string>::value &&
            FLAGS_save_series) {
            _series_sampler = new SeriesSampler(this, _combiner.op());
            _series_sampler->schedule();
        }
        return rc;
    }

private:
    combiner_type   _combiner;
    sampler_type* _sampler;
    SeriesSampler* _series_sampler;
    InvOp _inv_op;
};

template <typename T, typename Op, typename InvOp>
inline Reducer<T, Op, InvOp>& Reducer<T, Op, InvOp>::operator<<(
    typename butil::add_cr_non_integral<T>::type value) {
    // It's wait-free for most time
    agent_type* agent = _combiner.get_or_create_tls_agent();
    if (__builtin_expect(!agent, 0)) {
        LOG(FATAL) << "Fail to create agent";
        return *this;
    }
    agent->element.modify(_combiner.op(), value);
    return *this;
}

// =================== Common reducers ===================

// bvar::Adder<int> sum;
// sum << 1 << 2 << 3 << 4;
// LOG(INFO) << sum.get_value(); // 10
// Commonly used functors
namespace detail {
template <typename Tp>
struct AddTo {
    void operator()(Tp & lhs, 
                    typename butil::add_cr_non_integral<Tp>::type rhs) const
    { lhs += rhs; }
};
template <typename Tp>
struct MinusFrom {
    void operator()(Tp & lhs, 
                    typename butil::add_cr_non_integral<Tp>::type rhs) const
    { lhs -= rhs; }
};
}
template <typename T>
class Adder : public Reducer<T, detail::AddTo<T>, detail::MinusFrom<T> > {
public:
    typedef Reducer<T, detail::AddTo<T>, detail::MinusFrom<T> > Base;
    typedef T value_type;
    typedef typename Base::sampler_type sampler_type;
public:
    Adder() : Base() {}
    explicit Adder(const butil::StringPiece& name) : Base() {
        this->expose(name);
    }
    Adder(const butil::StringPiece& prefix,
          const butil::StringPiece& name) : Base() {
        this->expose_as(prefix, name);
    }
    ~Adder() { Variable::hide(); }
};

// bvar::Maxer<int> max_value;
// max_value << 1 << 2 << 3 << 4;
// LOG(INFO) << max_value.get_value(); // 4
namespace detail {
template <typename Tp> 
struct MaxTo {
    void operator()(Tp & lhs, 
                    typename butil::add_cr_non_integral<Tp>::type rhs) const {
        // Use operator< as well.
        if (lhs < rhs) {
            lhs = rhs;
        }
    }
};
class LatencyRecorderBase;
}
template <typename T>
class Maxer : public Reducer<T, detail::MaxTo<T> > {
public:
    typedef Reducer<T, detail::MaxTo<T> > Base;
    typedef T value_type;
    typedef typename Base::sampler_type sampler_type;
public:
    Maxer() : Base(std::numeric_limits<T>::min()) {}
    explicit Maxer(const butil::StringPiece& name)
        : Base(std::numeric_limits<T>::min()) {
        this->expose(name);
    }
    Maxer(const butil::StringPiece& prefix, const butil::StringPiece& name)
        : Base(std::numeric_limits<T>::min()) {
        this->expose_as(prefix, name);
    }
    ~Maxer() { Variable::hide(); }
private:
    friend class detail::LatencyRecorderBase;
    // The following private funcition a now used in LatencyRecorder,
    // it's dangerous so we don't make them public
    explicit Maxer(T default_value) : Base(default_value) {
    }
    Maxer(T default_value, const butil::StringPiece& prefix,
          const butil::StringPiece& name) 
        : Base(default_value) {
        this->expose_as(prefix, name);
    }
    Maxer(T default_value, const butil::StringPiece& name) : Base(default_value) {
        this->expose(name);
    }
};

// bvar::Miner<int> min_value;
// min_value << 1 << 2 << 3 << 4;
// LOG(INFO) << min_value.get_value(); // 1
namespace detail {

template <typename Tp> 
struct MinTo {
    void operator()(Tp & lhs, 
                    typename butil::add_cr_non_integral<Tp>::type rhs) const {
        if (rhs < lhs) {
            lhs = rhs;
        }
    }
};

}  // namespace detail

template <typename T>
class Miner : public Reducer<T, detail::MinTo<T> > {
public:
    typedef Reducer<T, detail::MinTo<T> > Base;
    typedef T value_type;
    typedef typename Base::sampler_type sampler_type;
public:
    Miner() : Base(std::numeric_limits<T>::max()) {}
    explicit Miner(const butil::StringPiece& name)
        : Base(std::numeric_limits<T>::max()) {
        this->expose(name);
    }
    Miner(const butil::StringPiece& prefix, const butil::StringPiece& name)
        : Base(std::numeric_limits<T>::max()) {
        this->expose_as(prefix, name);
    }
    ~Miner() { Variable::hide(); }
};

}  // namespace bvar

#endif  //BVAR_REDUCER_H