Commit 972fbffa authored by oscar's avatar oscar

提交统一时间队列

parent 5bc2cc60

#include "TimeQueueObj.h"
TimeQueueObj::TimeQueueObj()
{
for (int i = 0; i < static_cast<int>(QueueDataEnum::QD_ENUM_NUM); i++)
m_enumCount[i] = 0;
}
int TimeQueueObj::SetCallbackByEnum(int max_num, QueueDataEnum base, TimeQueueDataProcessCallback cb)
{
m_max_num = max_num;
m_baseEnum = base;
m_cb = cb;
m_isStop = false;
m_thread = std::thread(&TimeQueueObj::ProcessThread, this);
return 0;
}
void TimeQueueObj::PushQueueData(TimeQueueDataPtr& data)
{
std::unique_lock<std::mutex> lock(m_mutex);
int isInsert = 0;
auto last = m_timeQueue.begin();
for (auto iter = m_timeQueue.begin(); iter != m_timeQueue.end(); iter++)
{
if ((*iter)->timestamp > data->timestamp)
{
m_timeQueue.insert(last, data);
isInsert = 1;
break;
}
else
{
last = iter;
}
}
if (isInsert == 0)
{
m_timeQueue.push_back(data);
}
m_totelCount++;
m_enumCount[data->dataType]++;
if (m_max_num > 0 && m_totelCount > m_max_num)
{
auto begin = m_timeQueue.begin();
m_enumCount[(*begin)->dataType]--;
m_totelCount--;
m_timeQueue.pop_front();
}
m_condition.notify_all();
}
void TimeQueueObj::ProcessThread()
{
while (m_isStop == false)
{
std::vector<TimeQueueDataPtr> outs;
int ret = GetQueueData(outs);
if (ret == 0)
{
if (m_cb)
m_cb(outs);
}
}
}
int TimeQueueObj::GetQueueData(std::vector<TimeQueueDataPtr>& outs)
{
std::unique_lock<std::mutex> lock(m_mutex);
m_condition.wait(lock, [this]() // Lambda funct
{
return !m_timeQueue.empty();
});
outs.clear();
int num = 0;
std::list< TimeQueueDataPtr>::iterator pos = nullptr;
TimeQueueDataPtr merge[static_cast<int>(QueueDataEnum::QD_ENUM_NUM)];
for (int i = 0; i < static_cast<int>(QueueDataEnum::QD_ENUM_NUM); i++)
merge[i] = nullptr;
for (auto iter = m_timeQueue.begin(); iter != m_timeQueue.end(); iter++)
{
if (merge[(*iter)->dataType] == nullptr)
num++;
merge[(*iter)->dataType] = *iter;
if ((*iter)->dataType == m_baseEnum && num = static_cast<int>(QueueDataEnum::QD_ENUM_NUM))
{
pos = iter;
break;
}
}
if (pos)
{
for (int i = 0; i < static_cast<int>(QueueDataEnum::QD_ENUM_NUM); i++)
outs.push_back(merge[i]);
for (auto obj = m_timeQueue.begin(); obj != m_timeQueue.end();)
{
int isFinish = 0;
if (obj == pos)
isFinish = 1;
if ((*iter)->dataType != m_baseEnum)
{
if (m_enumCount[(*iter)->dataType] == 1)
obj++;
else
{
m_enumCount[(*iter)->dataType]--;
obj = m_timeQueue.erase(obj);
}
}
else
{
m_enumCount[(*iter)->dataType]--;
obj = m_timeQueue.erase(obj);
}
if (isFinish == 1)
break;
}
return 0;
}
return -1;
}
#pragma once
#include <vector>
#include <list>
#include <mutex>
#include <thread>
#include <cstdint>
#include <condition_variable>
#include <functional>
#ifdef _CAR_SENSING_
enum class QueueDataEnum //数据类型的枚举
{
QD_NONE = -1,//默认值,无类型
QD_DET_TRACKING_ARRAY = 0,//对应jfx_common_msgs::det_tracking_array
QD_INFERRESES = 1,//对应jfx_common_msgs::InferReses
QD_ENUM_NUM
};
struct TimeQueueData
{
uint64_t timestamp = 0;//排序的时间
QueueDataEnum dataType = QueueDataEnum::QD_NONE;//数据类型
union {
jfx_common_msgs::det_tracking_array detArray;
jfx_common_msgs::InferReses infRes;
} data;
};
#else
enum class QueueDataEnum //数据类型的枚举
{
QD_NONE = -1,//默认值,无类型
QD_DET_TRACKING_ARRAY = 0,//对应jfx_common_msgs::det_tracking_array
QD_INFERRESES = 1,//对应jfx_common_msgs::InferReses
QD_ENUM_NUM
};
struct TimeQueueData
{
uint64_t timestamp = 0;//排序的时间
QueueDataEnum dataType = QueueDataEnum::QD_NONE;//数据类型
union {
uint64_t value;
int opt;
} data;
};
#endif
using TimeQueueDataPtr = std::shared_ptr<TimeQueueData>;
typedef void (TimeQueueDataProcess)(std::vector< TimeQueueDataPtr>& outs);
typedef std::function<TimeQueueDataProcess> TimeQueueDataProcessCallback;
class TimeQueueObj
{
public:
TimeQueueObj();
~TimeQueueObj() {}
int SetCallbackByEnum(int max_num,QueueDataEnum base, TimeQueueDataProcessCallback cb);
void PushQueueData(TimeQueueDataPtr& data);
void ProcessThread();
int GetQueueData(std::vector<TimeQueueDataPtr>& outs);
std::list<TimeQueueDataPtr> m_timeQueue;//记录数据的队列
mutable std::mutex m_mutex;
std::condition_variable m_condition;
int m_max_num = 0;
bool m_isStop = true;
std::thread m_thread;//等待触发的线程
//QueueDataEnum m_enumStatus[static_cast<int>(QueueDataEnum::QD_ENUM_NUM)];//记录是否至少保存一份数据,默认0是不保存
QueueDataEnum m_baseEnum;//触发回调时要检查的这个枚举的类型是否存在,存在就回调
TimeQueueDataProcessCallback m_cb = nullptr;//回调函数
int m_enumCount[static_cast<int>(QueueDataEnum::QD_ENUM_NUM)];//记录每个类型的数量
int m_totelCount = 0;//记录总数量
};
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment