Commit 94a7152a authored by gabime's avatar gabime

async queue - overrun oldsest policy option

parent 0358d115
...@@ -126,8 +126,8 @@ using level_hasher = std::hash<int>; ...@@ -126,8 +126,8 @@ using level_hasher = std::hash<int>;
// //
enum class async_overflow_policy enum class async_overflow_policy
{ {
block_retry, // Block / yield / sleep until message can be enqueued block_retry, // Block until message can be enqueued
discard_log_msg // Discard the message it enqueue fails overrun_oldeset // Discard oldest message in the queue if full when trying to add new item.
}; };
// //
......
...@@ -15,20 +15,20 @@ public: ...@@ -15,20 +15,20 @@ public:
using item_type = T; using item_type = T;
explicit circular_q(size_t max_items) explicit circular_q(size_t max_items)
: max_items_(max_items + 1) : max_items_(max_items + 1) // one item is reserved as marker for full q
, v_(max_items_) , v_(max_items_)
{ {
} }
// push back, overrun last item if no room left // push back, overrun (oldest) item if no room left
void push_back(T &&item) void push_back(T &&item)
{ {
v_[head_] = std::move(item); v_[tail_] = std::move(item);
head_ = (head_ + 1) % max_items_; tail_ = (tail_ + 1) % max_items_;
if (head_ == tail_) if (tail_ == head_) // overrun last item if full
{ {
tail_ = (tail_ + 1) % max_items_; head_ = (head_ + 1) % max_items_;
} }
} }
...@@ -36,25 +36,26 @@ public: ...@@ -36,25 +36,26 @@ public:
// If there are no elements in the container, the behavior is undefined. // If there are no elements in the container, the behavior is undefined.
void pop_front(T &popped_item) void pop_front(T &popped_item)
{ {
popped_item = std::move(v_[tail_]); popped_item = std::move(v_[head_]);
tail_ = (tail_ + 1) % max_items_; head_ = (head_ + 1) % max_items_;
} }
bool empty() bool empty()
{ {
return head_ == tail_; return tail_ == head_;
} }
bool full() bool full()
{ {
// tail is ahead of the head by 1 // head is ahead of the tail by 1
return ((head_ + 1) % max_items_) == tail_; return ((tail_ + 1) % max_items_) == head_;
} }
private: private:
size_t max_items_; size_t max_items_;
typename std::vector<T>::size_type head_ = 0; typename std::vector<T>::size_type head_ = 0;
typename std::vector<T>::size_type tail_ = 0; typename std::vector<T>::size_type tail_ = 0;
std::vector<T> v_; std::vector<T> v_;
}; };
} // namespace details } // namespace details
......
...@@ -39,19 +39,14 @@ public: ...@@ -39,19 +39,14 @@ public:
push_cv_.notify_one(); push_cv_.notify_one();
} }
// try to enqueue and return immediately false if no room left // enqueue immediately. overrun oldest message in the queue if no room left.
bool enqueue_nowait(T &&item) void enqueue_nowait(T &&item)
{ {
{ {
std::unique_lock<std::mutex> lock(queue_mutex_); std::unique_lock<std::mutex> lock(queue_mutex_);
if (q_.full()) q_.push_back(std::move(item));
{
return false;
}
q_.push_back(std::forward<T>(item));
} }
push_cv_.notify_one(); push_cv_.notify_one();
return true;
} }
// try to dequeue item. if no item found. wait upto timeout and try again // try to dequeue item. if no item found. wait upto timeout and try again
......
...@@ -30,7 +30,7 @@ TEST_CASE("discard policy ", "[async]") ...@@ -30,7 +30,7 @@ TEST_CASE("discard policy ", "[async]")
size_t messages = 1024; size_t messages = 1024;
{ {
auto tp = std::make_shared<details::thread_pool>(queue_size, 1); auto tp = std::make_shared<details::thread_pool>(queue_size, 1);
auto logger = std::make_shared<async_logger>("as", test_sink, tp, async_overflow_policy::discard_log_msg); auto logger = std::make_shared<async_logger>("as", test_sink, tp, async_overflow_policy::overrun_oldeset);
for (size_t i = 0; i < messages; i++) for (size_t i = 0; i < messages; i++)
{ {
logger->info("Hello message #{}", i); logger->info("Hello message #{}", i);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment