combiner.h 11.4 KB
Newer Older
gejun's avatar
gejun committed
1
// Copyright (c) 2014 Baidu, Inc.
gejun's avatar
gejun committed
2 3 4 5 6 7 8 9 10 11 12 13 14
// 
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// 
//     http://www.apache.org/licenses/LICENSE-2.0
// 
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

gejun's avatar
gejun committed
15 16 17
// Author Zhangyi Chen (chenzhangyi01@baidu.com)
// Date 2014/09/22 11:57:43

gejun's avatar
gejun committed
18 19
#ifndef  BVAR_COMBINER_H
#define  BVAR_COMBINER_H
gejun's avatar
gejun committed
20 21 22

#include <string>                       // std::string
#include <vector>                       // std::vector
23 24 25 26 27
#include "butil/atomicops.h"             // butil::atomic
#include "butil/scoped_lock.h"           // BAIDU_SCOPED_LOCK
#include "butil/type_traits.h"           // butil::add_cr_non_integral
#include "butil/synchronization/lock.h"  // butil::Lock
#include "butil/containers/linked_list.h"// LinkNode
gejun's avatar
gejun committed
28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
#include "bvar/detail/agent_group.h"    // detail::AgentGroup
#include "bvar/detail/is_atomical.h"
#include "bvar/detail/call_op_returning_void.h"

namespace bvar {
namespace detail {

// Parameter to merge_global.
template <typename Combiner>
class GlobalValue {
public:
    typedef typename Combiner::result_type result_type;
    typedef typename Combiner::Agent agent_type;

    GlobalValue(agent_type* a, Combiner* c) : _a(a), _c(c) {}
    ~GlobalValue() {}

    // Call this method to unlock tls element and lock the combiner.
    // Unlocking tls element avoids potential deadlock with
    // AgentCombiner::reset(), which also means that tls element may be
    // changed during calling of this method. BE AWARE OF THIS!
    // After this method is called (and before unlock), tls element and
    // global_result will not be changed provided this method is called
    // from the thread owning the agent.
    result_type* lock() {
        _a->element._lock.Release();
        _c->_lock.Acquire();
        return &_c->_global_result;
    }

    // Call this method to unlock the combiner and lock tls element again.
    void unlock() {
        _c->_lock.Release();
        _a->element._lock.Acquire();
    }

private:
    agent_type* _a;
    Combiner* _c;
};

// Abstraction of tls element whose operations are all atomic.
template <typename T, typename Enabler = void>
class ElementContainer {
template <typename> friend class GlobalValue;
public:
    void load(T* out) {
75
        butil::AutoLock guard(_lock);
gejun's avatar
gejun committed
76 77 78 79
        *out = _value;
    }

    void store(const T& new_value) {
80
        butil::AutoLock guard(_lock);
gejun's avatar
gejun committed
81 82 83 84
        _value = new_value;
    }

    void exchange(T* prev, const T& new_value) {
85
        butil::AutoLock guard(_lock);
gejun's avatar
gejun committed
86 87 88 89 90 91
        *prev = _value;
        _value = new_value;
    }

    template <typename Op, typename T1>
    void modify(const Op &op, const T1 &value2) {
92
        butil::AutoLock guard(_lock);
gejun's avatar
gejun committed
93 94 95 96 97 98 99 100 101 102 103 104 105
        call_op_returning_void(op, _value, value2);
    }

    // [Unique]
    template <typename Op, typename GlobalValue>
    void merge_global(const Op &op, GlobalValue & global_value) {
        _lock.Acquire();
        op(global_value, _value);
        _lock.Release();
    }

private:
    T _value;
106
    butil::Lock _lock;
gejun's avatar
gejun committed
107 108 109 110
};

template <typename T>
class ElementContainer<
111
    T, typename butil::enable_if<is_atomical<T>::value>::type> {
gejun's avatar
gejun committed
112 113 114 115
public:
    // We don't need any memory fencing here, every op is relaxed.
    
    inline void load(T* out) {
116
        *out = _value.load(butil::memory_order_relaxed);
gejun's avatar
gejun committed
117 118 119
    }

    inline void store(T new_value) {
120
        _value.store(new_value, butil::memory_order_relaxed);
gejun's avatar
gejun committed
121 122 123
    }

    inline void exchange(T* prev, T new_value) {
124
        *prev = _value.exchange(new_value, butil::memory_order_relaxed);
gejun's avatar
gejun committed
125 126 127 128 129
    }

    // [Unique]
    inline bool compare_exchange_weak(T& expected, T new_value) {
        return _value.compare_exchange_weak(expected, new_value,
130
                                            butil::memory_order_relaxed);
gejun's avatar
gejun committed
131 132 133 134
    }

    template <typename Op, typename T1>
    void modify(const Op &op, const T1 &value2) {
135
        T old_value = _value.load(butil::memory_order_relaxed);
gejun's avatar
gejun committed
136 137 138 139 140 141 142
        T new_value = old_value;
        call_op_returning_void(op, new_value, value2);
        // There's a contention with the reset operation of combiner,
        // if the tls value has been modified during _op, the
        // compare_exchange_weak operation will fail and recalculation is
        // to be processed according to the new version of value
        while (!_value.compare_exchange_weak(
143
                   old_value, new_value, butil::memory_order_relaxed)) {
gejun's avatar
gejun committed
144 145 146 147 148 149
            new_value = old_value;
            call_op_returning_void(op, new_value, value2);
        }
    }

private:
150
    butil::atomic<T> _value;
gejun's avatar
gejun committed
151 152 153 154 155 156 157 158 159 160
};

template <typename ResultTp, typename ElementTp, typename BinaryOp>
class AgentCombiner {
public:
    typedef ResultTp result_type;
    typedef ElementTp element_type;
    typedef AgentCombiner<ResultTp, ElementTp, BinaryOp> self_type;
friend class GlobalValue<self_type>;
    
161
    struct Agent : public butil::LinkNode<Agent> {
gejun's avatar
gejun committed
162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235
        Agent() : combiner(NULL) {}

        ~Agent() {
            if (combiner) {
                combiner->commit_and_erase(this);
                combiner = NULL;
            }
        }
        
        void reset(const ElementTp& val, self_type* c) {
            combiner = c;
            element.store(val);
        }

        // Call op(GlobalValue<Combiner> &, ElementTp &) to merge tls element
        // into global_result. The common impl. is:
        //   struct XXX {
        //       void operator()(GlobalValue<Combiner> & global_value,
        //                       ElementTp & local_value) const {
        //           if (test_for_merging(local_value)) {
        // 
        //               // Unlock tls element and lock combiner. Obviously
        //               // tls element can be changed during lock().
        //               ResultTp* g = global_value.lock();
        // 
        //               // *g and local_value are not changed provided
        //               // merge_global is called from the thread owning
        //               // the agent.
        //               merge(*g, local_value);
        //
        //               // unlock combiner and lock tls element again.
        //               global_value.unlock();
        //           }
        //
        //           // safe to modify local_value because it's already locked
        //           // or locked again after merging.
        //           ...
        //       }
        //   };
        // 
        // NOTE: Only available to non-atomic types.
        template <typename Op>
        void merge_global(const Op &op) {
            GlobalValue<self_type> g(this, combiner);
            element.merge_global(op, g);
        }

        self_type *combiner;
        ElementContainer<ElementTp> element;
    };

    typedef detail::AgentGroup<Agent> AgentGroup;

    explicit AgentCombiner(const ResultTp result_identity = ResultTp(),
                           const ElementTp element_identity = ElementTp(),
                           const BinaryOp& op = BinaryOp())
        : _id(AgentGroup::create_new_agent())
        , _op(op)
        , _global_result(result_identity)
        , _result_identity(result_identity)
        , _element_identity(element_identity) {
    }

    ~AgentCombiner() {
        if (_id >= 0) {
            clear_all_agents();
            AgentGroup::destroy_agent(_id);
            _id = -1;
        }
    }
    
    // [Threadsafe] May be called from anywhere
    ResultTp combine_agents() const {
        ElementTp tls_value;
236
        butil::AutoLock guard(_lock);
gejun's avatar
gejun committed
237
        ResultTp ret = _global_result;
238
        for (butil::LinkNode<Agent>* node = _agents.head();
gejun's avatar
gejun committed
239 240 241 242 243 244 245
             node != _agents.end(); node = node->next()) {
            node->value()->element.load(&tls_value);
            call_op_returning_void(_op, ret, tls_value);
        }
        return ret;
    }

246
    typename butil::add_cr_non_integral<ElementTp>::type element_identity() const 
gejun's avatar
gejun committed
247
    { return _element_identity; }
248
    typename butil::add_cr_non_integral<ResultTp>::type result_identity() const 
gejun's avatar
gejun committed
249 250 251 252 253
    { return _result_identity; }

    // [Threadsafe] May be called from anywhere.
    ResultTp reset_all_agents() {
        ElementTp prev;
254
        butil::AutoLock guard(_lock);
gejun's avatar
gejun committed
255 256
        ResultTp tmp = _global_result;
        _global_result = _result_identity;
257
        for (butil::LinkNode<Agent>* node = _agents.head();
gejun's avatar
gejun committed
258 259 260 261 262 263 264 265 266 267 268 269 270
             node != _agents.end(); node = node->next()) {
            node->value()->element.exchange(&prev, _element_identity);
            call_op_returning_void(_op, tmp, prev);
        }
        return tmp;
    }

    // Always called from the thread owning the agent.
    void commit_and_erase(Agent *agent) {
        if (NULL == agent) {
            return;
        }
        ElementTp local;
271
        butil::AutoLock guard(_lock);
gejun's avatar
gejun committed
272 273 274 275 276 277 278 279 280 281 282 283 284
        // TODO: For non-atomic types, we can pass the reference to op directly.
        // But atomic types cannot. The code is a little troublesome to write.
        agent->element.load(&local);
        call_op_returning_void(_op, _global_result, local);
        agent->RemoveFromList();
    }

    // Always called from the thread owning the agent
    void commit_and_clear(Agent *agent) {
        if (NULL == agent) {
            return;
        }
        ElementTp prev;
285
        butil::AutoLock guard(_lock);
gejun's avatar
gejun committed
286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306
        agent->element.exchange(&prev, _element_identity);
        call_op_returning_void(_op, _global_result, prev);
    }

    // We need this function to be as fast as possible.
    inline Agent* get_or_create_tls_agent() {
        Agent* agent = AgentGroup::get_tls_agent(_id);
        if (!agent) {
            // Create the agent
            agent = AgentGroup::get_or_create_tls_agent(_id);
            if (NULL == agent) {
                LOG(FATAL) << "Fail to create agent";
                return NULL;
            }
        }
        if (agent->combiner) {
            return agent;
        }
        agent->reset(_element_identity, this);
        // TODO: Is uniqueness-checking necessary here?
        {
307
            butil::AutoLock guard(_lock);
gejun's avatar
gejun committed
308 309 310 311 312 313
            _agents.Append(agent);
        }
        return agent;
    }

    void clear_all_agents() {
314
        butil::AutoLock guard(_lock);
gejun's avatar
gejun committed
315 316 317
        // reseting agents is must because the agent object may be reused.
        // Set element to be default-constructed so that if it's non-pod,
        // internal allocations should be released.
318
        for (butil::LinkNode<Agent>* 
gejun's avatar
gejun committed
319 320
                node = _agents.head(); node != _agents.end();) {
            node->value()->reset(ElementTp(), NULL);
321
            butil::LinkNode<Agent>* const saved_next =  node->next();
gejun's avatar
gejun committed
322 323 324 325 326 327 328 329 330 331 332 333
            node->RemoveFromList();
            node = saved_next;
        }
    }

    const BinaryOp& op() const { return _op; }

    bool valid() const { return _id >= 0; }

private:
    AgentId                                     _id;
    BinaryOp                                    _op;
334
    mutable butil::Lock                          _lock;
gejun's avatar
gejun committed
335 336 337
    ResultTp                                    _global_result;
    ResultTp                                    _result_identity;
    ElementTp                                   _element_identity;
338
    butil::LinkedList<Agent>                     _agents;
gejun's avatar
gejun committed
339 340 341 342 343
};

}  // namespace detail
}  // namespace bvar

gejun's avatar
gejun committed
344
#endif  // BVAR_COMBINER_H