// Copyright (c) 2014 Baidu, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // Author: chenzhangyi01@baidu.com, gejun@baidu.com // Date 2014/09/24 16:01:08 #ifndef BVAR_REDUCER_H #define BVAR_REDUCER_H #include <limits> // std::numeric_limits #include "butil/logging.h" // LOG() #include "butil/type_traits.h" // butil::add_cr_non_integral #include "butil/class_name.h" // class_name_str #include "bvar/variable.h" // Variable #include "bvar/detail/combiner.h" // detail::AgentCombiner #include "bvar/detail/sampler.h" // ReducerSampler #include "bvar/detail/series.h" #include "bvar/window.h" namespace bvar { // Reduce multiple values into one with `Op': e1 Op e2 Op e3 ... // `Op' shall satisfy: // - associative: a Op (b Op c) == (a Op b) Op c // - commutative: a Op b == b Op a; // - no side effects: a Op b never changes if a and b are fixed. // otherwise the result is undefined. // // For performance issues, we don't let Op return value, instead it shall // set the result to the first parameter in-place. Namely to add two values, // "+=" should be implemented rather than "+". // // Reducer works for non-primitive T which satisfies: // - T() should be the identity of Op. // - stream << v should compile and put description of v into the stream // Example: // class MyType { // friend std::ostream& operator<<(std::ostream& os, const MyType&); // public: // MyType() : _x(0) {} // explicit MyType(int x) : _x(x) {} // void operator+=(const MyType& rhs) const { // _x += rhs._x; // } // private: // int _x; // }; // std::ostream& operator<<(std::ostream& os, const MyType& value) { // return os << "MyType{" << value._x << "}"; // } // bvar::Adder<MyType> my_type_sum; // my_type_sum << MyType(1) << MyType(2) << MyType(3); // LOG(INFO) << my_type_sum; // "MyType{6}" template <typename T, typename Op, typename InvOp = detail::VoidOp> class Reducer : public Variable { public: typedef typename detail::AgentCombiner<T, T, Op> combiner_type; typedef typename combiner_type::Agent agent_type; typedef detail::ReducerSampler<Reducer, T, Op, InvOp> sampler_type; class SeriesSampler : public detail::Sampler { public: SeriesSampler(Reducer* owner, const Op& op) : _owner(owner), _series(op) {} ~SeriesSampler() {} void take_sample() { _series.append(_owner->get_value()); } void describe(std::ostream& os) { _series.describe(os, NULL); } private: Reducer* _owner; detail::Series<T, Op> _series; }; public: // The `identify' must satisfy: identity Op a == a Reducer(typename butil::add_cr_non_integral<T>::type identity = T(), const Op& op = Op(), const InvOp& inv_op = InvOp()) : _combiner(identity, identity, op) , _sampler(NULL) , _series_sampler(NULL) , _inv_op(inv_op) { } ~Reducer() { // Calling hide() manually is a MUST required by Variable. hide(); if (_sampler) { _sampler->destroy(); _sampler = NULL; } if (_series_sampler) { _series_sampler->destroy(); _series_sampler = NULL; } } // Add a value. // Returns self reference for chaining. Reducer& operator<<(typename butil::add_cr_non_integral<T>::type value); // Get reduced value. // Notice that this function walks through threads that ever add values // into this reducer. You should avoid calling it frequently. T get_value() const { CHECK(!(butil::is_same<InvOp, detail::VoidOp>::value) || _sampler == NULL) << "You should not call Reducer<" << butil::class_name_str<T>() << ", " << butil::class_name_str<Op>() << ">::get_value() when a" << " Window<> is used because the operator does not have inverse."; return _combiner.combine_agents(); } // Reset the reduced value to T(). // Returns the reduced value before reset. T reset() { return _combiner.reset_all_agents(); } // Implement Variable::describe() and Variable::get_value(). void describe(std::ostream& os, bool quote_string) const { if (butil::is_same<T, std::string>::value && quote_string) { os << '"' << get_value() << '"'; } else { os << get_value(); } } #ifdef BAIDU_INTERNAL void get_value(boost::any* value) const { *value = get_value(); } #endif // True if this reducer is constructed successfully. bool valid() const { return _combiner.valid(); } // Get instance of Op. const Op& op() const { return _combiner.op(); } const InvOp& inv_op() const { return _inv_op; } sampler_type* get_sampler() { if (NULL == _sampler) { _sampler = new sampler_type(this); _sampler->schedule(); } return _sampler; } int describe_series(std::ostream& os, const SeriesOptions& options) const { if (_series_sampler == NULL) { return 1; } if (!options.test_only) { _series_sampler->describe(os); } return 0; } protected: int expose_impl(const butil::StringPiece& prefix, const butil::StringPiece& name, DisplayFilter display_filter) { const int rc = Variable::expose_impl(prefix, name, display_filter); if (rc == 0 && _series_sampler == NULL && !butil::is_same<InvOp, detail::VoidOp>::value && !butil::is_same<T, std::string>::value && FLAGS_save_series) { _series_sampler = new SeriesSampler(this, _combiner.op()); _series_sampler->schedule(); } return rc; } private: combiner_type _combiner; sampler_type* _sampler; SeriesSampler* _series_sampler; InvOp _inv_op; }; template <typename T, typename Op, typename InvOp> inline Reducer<T, Op, InvOp>& Reducer<T, Op, InvOp>::operator<<( typename butil::add_cr_non_integral<T>::type value) { // It's wait-free for most time agent_type* agent = _combiner.get_or_create_tls_agent(); if (__builtin_expect(!agent, 0)) { LOG(FATAL) << "Fail to create agent"; return *this; } agent->element.modify(_combiner.op(), value); return *this; } // =================== Common reducers =================== // bvar::Adder<int> sum; // sum << 1 << 2 << 3 << 4; // LOG(INFO) << sum.get_value(); // 10 // Commonly used functors namespace detail { template <typename Tp> struct AddTo { void operator()(Tp & lhs, typename butil::add_cr_non_integral<Tp>::type rhs) const { lhs += rhs; } }; template <typename Tp> struct MinusFrom { void operator()(Tp & lhs, typename butil::add_cr_non_integral<Tp>::type rhs) const { lhs -= rhs; } }; } template <typename T> class Adder : public Reducer<T, detail::AddTo<T>, detail::MinusFrom<T> > { public: typedef Reducer<T, detail::AddTo<T>, detail::MinusFrom<T> > Base; typedef T value_type; typedef typename Base::sampler_type sampler_type; public: Adder() : Base() {} explicit Adder(const butil::StringPiece& name) : Base() { this->expose(name); } Adder(const butil::StringPiece& prefix, const butil::StringPiece& name) : Base() { this->expose_as(prefix, name); } ~Adder() { Variable::hide(); } }; // bvar::Maxer<int> max_value; // max_value << 1 << 2 << 3 << 4; // LOG(INFO) << max_value.get_value(); // 4 namespace detail { template <typename Tp> struct MaxTo { void operator()(Tp & lhs, typename butil::add_cr_non_integral<Tp>::type rhs) const { // Use operator< as well. if (lhs < rhs) { lhs = rhs; } } }; class LatencyRecorderBase; } template <typename T> class Maxer : public Reducer<T, detail::MaxTo<T> > { public: typedef Reducer<T, detail::MaxTo<T> > Base; typedef T value_type; typedef typename Base::sampler_type sampler_type; public: Maxer() : Base(std::numeric_limits<T>::min()) {} explicit Maxer(const butil::StringPiece& name) : Base(std::numeric_limits<T>::min()) { this->expose(name); } Maxer(const butil::StringPiece& prefix, const butil::StringPiece& name) : Base(std::numeric_limits<T>::min()) { this->expose_as(prefix, name); } ~Maxer() { Variable::hide(); } private: friend class detail::LatencyRecorderBase; // The following private funcition a now used in LatencyRecorder, // it's dangerous so we don't make them public explicit Maxer(T default_value) : Base(default_value) { } Maxer(T default_value, const butil::StringPiece& prefix, const butil::StringPiece& name) : Base(default_value) { this->expose_as(prefix, name); } Maxer(T default_value, const butil::StringPiece& name) : Base(default_value) { this->expose(name); } }; // bvar::Miner<int> min_value; // min_value << 1 << 2 << 3 << 4; // LOG(INFO) << min_value.get_value(); // 1 namespace detail { template <typename Tp> struct MinTo { void operator()(Tp & lhs, typename butil::add_cr_non_integral<Tp>::type rhs) const { if (rhs < lhs) { lhs = rhs; } } }; } // namespace detail template <typename T> class Miner : public Reducer<T, detail::MinTo<T> > { public: typedef Reducer<T, detail::MinTo<T> > Base; typedef T value_type; typedef typename Base::sampler_type sampler_type; public: Miner() : Base(std::numeric_limits<T>::max()) {} explicit Miner(const butil::StringPiece& name) : Base(std::numeric_limits<T>::max()) { this->expose(name); } Miner(const butil::StringPiece& prefix, const butil::StringPiece& name) : Base(std::numeric_limits<T>::max()) { this->expose_as(prefix, name); } ~Miner() { Variable::hide(); } }; } // namespace bvar #endif //BVAR_REDUCER_H