// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. // Date 2014/09/22 11:57:43 #ifndef BVAR_COMBINER_H #define BVAR_COMBINER_H #include <string> // std::string #include <vector> // std::vector #include "butil/atomicops.h" // butil::atomic #include "butil/scoped_lock.h" // BAIDU_SCOPED_LOCK #include "butil/type_traits.h" // butil::add_cr_non_integral #include "butil/synchronization/lock.h" // butil::Lock #include "butil/containers/linked_list.h"// LinkNode #include "bvar/detail/agent_group.h" // detail::AgentGroup #include "bvar/detail/is_atomical.h" #include "bvar/detail/call_op_returning_void.h" namespace bvar { namespace detail { // Parameter to merge_global. template <typename Combiner> class GlobalValue { public: typedef typename Combiner::result_type result_type; typedef typename Combiner::Agent agent_type; GlobalValue(agent_type* a, Combiner* c) : _a(a), _c(c) {} ~GlobalValue() {} // Call this method to unlock tls element and lock the combiner. // Unlocking tls element avoids potential deadlock with // AgentCombiner::reset(), which also means that tls element may be // changed during calling of this method. BE AWARE OF THIS! // After this method is called (and before unlock), tls element and // global_result will not be changed provided this method is called // from the thread owning the agent. result_type* lock() { _a->element._lock.Release(); _c->_lock.Acquire(); return &_c->_global_result; } // Call this method to unlock the combiner and lock tls element again. void unlock() { _c->_lock.Release(); _a->element._lock.Acquire(); } private: agent_type* _a; Combiner* _c; }; // Abstraction of tls element whose operations are all atomic. template <typename T, typename Enabler = void> class ElementContainer { template <typename> friend class GlobalValue; public: void load(T* out) { butil::AutoLock guard(_lock); *out = _value; } void store(const T& new_value) { butil::AutoLock guard(_lock); _value = new_value; } void exchange(T* prev, const T& new_value) { butil::AutoLock guard(_lock); *prev = _value; _value = new_value; } template <typename Op, typename T1> void modify(const Op &op, const T1 &value2) { butil::AutoLock guard(_lock); call_op_returning_void(op, _value, value2); } // [Unique] template <typename Op, typename GlobalValue> void merge_global(const Op &op, GlobalValue & global_value) { _lock.Acquire(); op(global_value, _value); _lock.Release(); } private: T _value; butil::Lock _lock; }; template <typename T> class ElementContainer< T, typename butil::enable_if<is_atomical<T>::value>::type> { public: // We don't need any memory fencing here, every op is relaxed. inline void load(T* out) { *out = _value.load(butil::memory_order_relaxed); } inline void store(T new_value) { _value.store(new_value, butil::memory_order_relaxed); } inline void exchange(T* prev, T new_value) { *prev = _value.exchange(new_value, butil::memory_order_relaxed); } // [Unique] inline bool compare_exchange_weak(T& expected, T new_value) { return _value.compare_exchange_weak(expected, new_value, butil::memory_order_relaxed); } template <typename Op, typename T1> void modify(const Op &op, const T1 &value2) { T old_value = _value.load(butil::memory_order_relaxed); T new_value = old_value; call_op_returning_void(op, new_value, value2); // There's a contention with the reset operation of combiner, // if the tls value has been modified during _op, the // compare_exchange_weak operation will fail and recalculation is // to be processed according to the new version of value while (!_value.compare_exchange_weak( old_value, new_value, butil::memory_order_relaxed)) { new_value = old_value; call_op_returning_void(op, new_value, value2); } } private: butil::atomic<T> _value; }; template <typename ResultTp, typename ElementTp, typename BinaryOp> class AgentCombiner { public: typedef ResultTp result_type; typedef ElementTp element_type; typedef AgentCombiner<ResultTp, ElementTp, BinaryOp> self_type; friend class GlobalValue<self_type>; struct Agent : public butil::LinkNode<Agent> { Agent() : combiner(NULL) {} ~Agent() { if (combiner) { combiner->commit_and_erase(this); combiner = NULL; } } void reset(const ElementTp& val, self_type* c) { combiner = c; element.store(val); } // Call op(GlobalValue<Combiner> &, ElementTp &) to merge tls element // into global_result. The common impl. is: // struct XXX { // void operator()(GlobalValue<Combiner> & global_value, // ElementTp & local_value) const { // if (test_for_merging(local_value)) { // // // Unlock tls element and lock combiner. Obviously // // tls element can be changed during lock(). // ResultTp* g = global_value.lock(); // // // *g and local_value are not changed provided // // merge_global is called from the thread owning // // the agent. // merge(*g, local_value); // // // unlock combiner and lock tls element again. // global_value.unlock(); // } // // // safe to modify local_value because it's already locked // // or locked again after merging. // ... // } // }; // // NOTE: Only available to non-atomic types. template <typename Op> void merge_global(const Op &op) { GlobalValue<self_type> g(this, combiner); element.merge_global(op, g); } self_type *combiner; ElementContainer<ElementTp> element; }; typedef detail::AgentGroup<Agent> AgentGroup; explicit AgentCombiner(const ResultTp result_identity = ResultTp(), const ElementTp element_identity = ElementTp(), const BinaryOp& op = BinaryOp()) : _id(AgentGroup::create_new_agent()) , _op(op) , _global_result(result_identity) , _result_identity(result_identity) , _element_identity(element_identity) { } ~AgentCombiner() { if (_id >= 0) { clear_all_agents(); AgentGroup::destroy_agent(_id); _id = -1; } } // [Threadsafe] May be called from anywhere ResultTp combine_agents() const { ElementTp tls_value; butil::AutoLock guard(_lock); ResultTp ret = _global_result; for (butil::LinkNode<Agent>* node = _agents.head(); node != _agents.end(); node = node->next()) { node->value()->element.load(&tls_value); call_op_returning_void(_op, ret, tls_value); } return ret; } typename butil::add_cr_non_integral<ElementTp>::type element_identity() const { return _element_identity; } typename butil::add_cr_non_integral<ResultTp>::type result_identity() const { return _result_identity; } // [Threadsafe] May be called from anywhere. ResultTp reset_all_agents() { ElementTp prev; butil::AutoLock guard(_lock); ResultTp tmp = _global_result; _global_result = _result_identity; for (butil::LinkNode<Agent>* node = _agents.head(); node != _agents.end(); node = node->next()) { node->value()->element.exchange(&prev, _element_identity); call_op_returning_void(_op, tmp, prev); } return tmp; } // Always called from the thread owning the agent. void commit_and_erase(Agent *agent) { if (NULL == agent) { return; } ElementTp local; butil::AutoLock guard(_lock); // TODO: For non-atomic types, we can pass the reference to op directly. // But atomic types cannot. The code is a little troublesome to write. agent->element.load(&local); call_op_returning_void(_op, _global_result, local); agent->RemoveFromList(); } // Always called from the thread owning the agent void commit_and_clear(Agent *agent) { if (NULL == agent) { return; } ElementTp prev; butil::AutoLock guard(_lock); agent->element.exchange(&prev, _element_identity); call_op_returning_void(_op, _global_result, prev); } // We need this function to be as fast as possible. inline Agent* get_or_create_tls_agent() { Agent* agent = AgentGroup::get_tls_agent(_id); if (!agent) { // Create the agent agent = AgentGroup::get_or_create_tls_agent(_id); if (NULL == agent) { LOG(FATAL) << "Fail to create agent"; return NULL; } } if (agent->combiner) { return agent; } agent->reset(_element_identity, this); // TODO: Is uniqueness-checking necessary here? { butil::AutoLock guard(_lock); _agents.Append(agent); } return agent; } void clear_all_agents() { butil::AutoLock guard(_lock); // reseting agents is must because the agent object may be reused. // Set element to be default-constructed so that if it's non-pod, // internal allocations should be released. for (butil::LinkNode<Agent>* node = _agents.head(); node != _agents.end();) { node->value()->reset(ElementTp(), NULL); butil::LinkNode<Agent>* const saved_next = node->next(); node->RemoveFromList(); node = saved_next; } } const BinaryOp& op() const { return _op; } bool valid() const { return _id >= 0; } private: AgentId _id; BinaryOp _op; mutable butil::Lock _lock; ResultTp _global_result; ResultTp _result_identity; ElementTp _element_identity; butil::LinkedList<Agent> _agents; }; } // namespace detail } // namespace bvar #endif // BVAR_COMBINER_H