// Copyright (c) 2012 Baidu, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // Author: Ge,Jun (gejun@baidu.com) // Date: Sat Aug 18 12:42:16 CST 2012 // A thread-unsafe bounded queue(ring buffer). It can push/pop from both // sides and is more handy than thread-safe queues in single thread. Use // boost::lockfree::spsc_queue or boost::lockfree::queue in multi-threaded // scenarios. #ifndef BUTIL_BOUNDED_QUEUE_H #define BUTIL_BOUNDED_QUEUE_H #include "butil/macros.h" #include "butil/logging.h" namespace butil { // [Create a on-stack small queue] // char storage[64]; // butil::BoundedQueue<int> q(storage, sizeof(storage), butil::NOT_OWN_STORAGE); // q.push(1); // q.push(2); // ... // [Initialize a class-member queue] // class Foo { // ... // BoundQueue<int> _queue; // }; // int Foo::init() { // BoundedQueue<int> tmp(capacity); // if (!tmp.initialized()) { // LOG(ERROR) << "Fail to create _queue"; // return -1; // } // tmp.swap(_queue); // } enum StorageOwnership { OWNS_STORAGE, NOT_OWN_STORAGE }; template <typename T> class BoundedQueue { public: // You have to pass the memory for storing items at creation. // The queue contains at most memsize/sizeof(T) items. BoundedQueue(void* mem, size_t memsize, StorageOwnership ownership) : _count(0) , _cap(memsize / sizeof(T)) , _start(0) , _ownership(ownership) , _items(mem) { DCHECK(_items); }; // Construct a queue with the given capacity. // The malloc() may fail sliently, call initialized() to test validity // of the queue. explicit BoundedQueue(size_t capacity) : _count(0) , _cap(capacity) , _start(0) , _ownership(OWNS_STORAGE) , _items(malloc(capacity * sizeof(T))) { DCHECK(_items); }; BoundedQueue() : _count(0) , _cap(0) , _start(0) , _ownership(NOT_OWN_STORAGE) , _items(NULL) { }; ~BoundedQueue() { clear(); if (_ownership == OWNS_STORAGE) { free(_items); _items = NULL; } } // Push |item| into bottom side of this queue. // Returns true on success, false if queue is full. bool push(const T& item) { if (_count < _cap) { new ((T*)_items + _mod(_start + _count, _cap)) T(item); ++_count; return true; } return false; } // Push |item| into bottom side of this queue. If the queue is full, // pop topmost item first. void elim_push(const T& item) { if (_count < _cap) { new ((T*)_items + _mod(_start + _count, _cap)) T(item); ++_count; } else { ((T*)_items)[_start] = item; _start = _mod(_start + 1, _cap); } } // Push a default-constructed item into bottom side of this queue // Returns address of the item inside this queue T* push() { if (_count < _cap) { return new ((T*)_items + _mod(_start + _count++, _cap)) T(); } return NULL; } // Push |item| into top side of this queue // Returns true on success, false if queue is full. bool push_top(const T& item) { if (_count < _cap) { _start = _start ? (_start - 1) : (_cap - 1); ++_count; new ((T*)_items + _start) T(item); return true; } return false; } // Push a default-constructed item into top side of this queue // Returns address of the item inside this queue T* push_top() { if (_count < _cap) { _start = _start ? (_start - 1) : (_cap - 1); ++_count; return new ((T*)_items + _start) T(); } return NULL; } // Pop top-most item from this queue // Returns true on success, false if queue is empty bool pop() { if (_count) { --_count; ((T*)_items + _start)->~T(); _start = _mod(_start + 1, _cap); return true; } return false; } // Pop top-most item from this queue and copy into |item|. // Returns true on success, false if queue is empty bool pop(T* item) { if (_count) { --_count; *item = ((T*)_items)[_start]; ((T*)_items)[_start].~T(); _start = _mod(_start + 1, _cap); return true; } return false; } // Pop bottom-most item from this queue // Returns true on success, false if queue is empty bool pop_bottom() { if (_count) { --_count; ((T*)_items + _start + _count)->~T(); return true; } return false; } // Pop bottom-most item from this queue and copy into |item|. // Returns true on success, false if queue is empty bool pop_bottom(T* item) { if (_count) { --_count; *item = ((T*)_items)[_start + _count]; ((T*)_items)[_start + _count].~T(); return true; } return false; } // Pop all items void clear() { for (uint32_t i = 0; i < _count; ++i) { ((T*)_items + _mod(_start + i, _cap))->~T(); } _count = 0; _start = 0; } // Get address of top-most item, NULL if queue is empty T* top() { return _count ? ((T*)_items + _start) : NULL; } const T* top() const { return _count ? ((const T*)_items + _start) : NULL; } // Randomly access item from top side. // top(0) == top(), top(size()-1) == bottom() // Returns NULL if |index| is out of range. T* top(size_t index) { if (index < _count) { return (T*)_items + _mod(_start + index, _cap); } return NULL; // including _count == 0 } const T* top(size_t index) const { if (index < _count) { return (const T*)_items + _mod(_start + index, _cap); } return NULL; // including _count == 0 } // Get address of bottom-most item, NULL if queue is empty T* bottom() { return _count ? ((T*)_items + _mod(_start + _count - 1, _cap)) : NULL; } const T* bottom() const { return _count ? ((const T*)_items + _mod(_start + _count - 1, _cap)) : NULL; } // Randomly access item from bottom side. // bottom(0) == bottom(), bottom(size()-1) == top() // Returns NULL if |index| is out of range. T* bottom(size_t index) { if (index < _count) { return (T*)_items + _mod(_start + _count - index - 1, _cap); } return NULL; // including _count == 0 } const T* bottom(size_t index) const { if (index < _count) { return (const T*)_items + _mod(_start + _count - index - 1, _cap); } return NULL; // including _count == 0 } bool empty() const { return !_count; } bool full() const { return _cap == _count; } // Number of items size_t size() const { return _count; } // Maximum number of items that can be in this queue size_t capacity() const { return _cap; } // Maximum value of capacity() size_t max_capacity() const { return (1UL << (sizeof(_cap) * 8)) - 1; } // True if the queue was constructed successfully. bool initialized() const { return _items != NULL; } // Swap internal fields with another queue. void swap(BoundedQueue& rhs) { std::swap(_count, rhs._count); std::swap(_cap, rhs._cap); std::swap(_start, rhs._start); std::swap(_ownership, rhs._ownership); std::swap(_items, rhs._items); } private: // Since the space is possibly not owned, we disable copying. DISALLOW_COPY_AND_ASSIGN(BoundedQueue); // This is faster than % in this queue because most |off| are smaller // than |cap|. This is probably not true in other place, be careful // before you use this trick. static uint32_t _mod(uint32_t off, uint32_t cap) { while (off >= cap) { off -= cap; } return off; } uint32_t _count; uint32_t _cap; uint32_t _start; StorageOwnership _ownership; void* _items; }; } // namespace butil #endif // BUTIL_BOUNDED_QUEUE_H