// bthread - A M:N threading library to make applications more concurrent. // Copyright (c) 2012 Baidu, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // Author: Ge,Jun (gejun@baidu.com) // Date: Tue Jul 10 17:40:58 CST 2012 #ifndef BTHREAD_WORK_STEALING_QUEUE_H #define BTHREAD_WORK_STEALING_QUEUE_H #include "butil/macros.h" #include "butil/atomicops.h" #include "butil/logging.h" namespace bthread { template <typename T> class WorkStealingQueue { public: WorkStealingQueue() : _bottom(1) , _capacity(0) , _buffer(NULL) , _top(1) { } ~WorkStealingQueue() { delete [] _buffer; _buffer = NULL; } int init(size_t capacity) { if (_capacity != 0) { LOG(ERROR) << "Already initialized"; return -1; } if (capacity == 0) { LOG(ERROR) << "Invalid capacity=" << capacity; return -1; } if (capacity & (capacity - 1)) { LOG(ERROR) << "Invalid capacity=" << capacity << " which must be power of 2"; return -1; } _buffer = new(std::nothrow) T[capacity]; if (NULL == _buffer) { return -1; } _capacity = capacity; return 0; } // Push an item into the queue. // Returns true on pushed. // May run in parallel with steal(). // Never run in parallel with pop() or another push(). bool push(const T& x) { const size_t b = _bottom.load(butil::memory_order_relaxed); const size_t t = _top.load(butil::memory_order_acquire); if (b >= t + _capacity) { // Full queue. return false; } _buffer[b & (_capacity - 1)] = x; _bottom.store(b + 1, butil::memory_order_release); return true; } // Pop an item from the queue. // Returns true on popped and the item is written to `val'. // May run in parallel with steal(). // Never run in parallel with push() or another pop(). bool pop(T* val) { const size_t b = _bottom.load(butil::memory_order_relaxed); size_t t = _top.load(butil::memory_order_relaxed); if (t >= b) { // fast check since we call pop() in each sched. // Stale _top which is smaller should not enter this branch. return false; } const size_t newb = b - 1; _bottom.store(newb, butil::memory_order_relaxed); butil::atomic_thread_fence(butil::memory_order_seq_cst); t = _top.load(butil::memory_order_relaxed); if (t > newb) { _bottom.store(b, butil::memory_order_relaxed); return false; } *val = _buffer[newb & (_capacity - 1)]; if (t != newb) { return true; } // Single last element, compete with steal() const bool popped = _top.compare_exchange_strong( t, t + 1, butil::memory_order_seq_cst, butil::memory_order_relaxed); _bottom.store(b, butil::memory_order_relaxed); return popped; } // Steal one item from the queue. // Returns true on stolen. // May run in parallel with push() pop() or another steal(). bool steal(T* val) { size_t t = _top.load(butil::memory_order_acquire); size_t b = _bottom.load(butil::memory_order_acquire); if (t >= b) { // Permit false negative for performance considerations. return false; } do { butil::atomic_thread_fence(butil::memory_order_seq_cst); b = _bottom.load(butil::memory_order_acquire); if (t >= b) { return false; } *val = _buffer[t & (_capacity - 1)]; } while (!_top.compare_exchange_strong(t, t + 1, butil::memory_order_seq_cst, butil::memory_order_relaxed)); return true; } size_t volatile_size() const { const size_t b = _bottom.load(butil::memory_order_relaxed); const size_t t = _top.load(butil::memory_order_relaxed); return (b <= t ? 0 : (b - t)); } size_t capacity() const { return _capacity; } private: // Copying a concurrent structure makes no sense. DISALLOW_COPY_AND_ASSIGN(WorkStealingQueue); butil::atomic<size_t> _bottom; size_t _capacity; T* _buffer; butil::atomic<size_t> BAIDU_CACHELINE_ALIGNMENT _top; }; } // namespace bthread #endif // BTHREAD_WORK_STEALING_QUEUE_H