Commit 27f46add authored by oscar's avatar oscar

删除文件

parent ef87d9c2

#include "TimeQueueObj.h"
TimeQueueObj::TimeQueueObj()
{
for (int i = 0; i < static_cast<int>(QueueDataEnum::QD_ENUM_NUM); i++)
{
m_enumCount[i] = 0;
m_startTime[i] = 0;
}
}
int TimeQueueObj::SetCallbackByEnum(int max_num, QueueDataEnum base, uint64_t interval, int recvAll, TimeQueueDataProcessCallback cb)
{
m_max_num = max_num;
m_baseEnum = base;
m_baseInterval = interval;
m_recvAll = recvAll;
m_cb = cb;
m_isStop = false;
m_isReady = false;
m_thread = std::thread(&TimeQueueObj::ProcessThread, this);
return 0;
}
void TimeQueueObj::PushQueueData(TimeQueueDataPtr& data)
{
std::unique_lock<std::mutex> lock(m_mutex);
if (m_canRecv == 0 && m_recvAll == 1 && data->dataType == m_baseEnum)//需要接收所有消息,并且收到的是基准消息
{
int isCanRecv = 0;
for (int i = 0; i < static_cast<int>(QueueDataEnum::QD_ENUM_NUM); i++)
{
if (i != static_cast<int>(m_baseEnum))
{
if (m_startTime[i] != 0 && m_startTime[i] < data->timestamp)
isCanRecv++;
}
}
if (isCanRecv == static_cast<int>(QueueDataEnum::QD_ENUM_NUM) - 1)
m_canRecv = 1;
else
return;
}
if (m_startTime[static_cast<int>(data->dataType)] == 0)
m_startTime[static_cast<int>(data->dataType)] = data->timestamp;
int isInsert = 0;
for (auto iter = m_timeQueue.begin(); iter != m_timeQueue.end(); iter++)
{
if ((*iter)->timestamp > data->timestamp)
{
m_timeQueue.insert(iter, data);
isInsert = 1;
break;
}
}
if (isInsert == 0)
{
m_timeQueue.push_back(data);
}
m_totelCount++;
m_enumCount[static_cast<int>(data->dataType)]++;
if (data->dataType == m_baseEnum)
m_baseWaitTimestamps.push(data->timestamp);
if (m_max_num > 0 && m_totelCount > m_max_num)
{
auto begin = m_timeQueue.begin();
while (begin != m_timeQueue.end() && (*begin)->dataType != data->dataType)
begin++;
m_enumCount[static_cast<int>((*begin)->dataType)]--;
m_totelCount--;
m_timeQueue.erase(begin);
if (data->dataType == m_baseEnum)
{
m_baseWaitTimestamps.pop();
}
}
if ( (m_recvAll == 0 && data->dataType == m_baseEnum) || m_recvAll == 1)
{
m_isReady = true;
m_condition.notify_all();
}
}
void TimeQueueObj::ProcessThread()
{
while (m_isStop == false)
{
std::vector<TimeQueueDataPtr> outs;
int ret = GetQueueData(outs);
if (ret == 0)
{
if (m_cb)
m_cb(outs);
}
}
}
int TimeQueueObj::GetQueueData(std::vector<TimeQueueDataPtr>& outs)
{
std::unique_lock<std::mutex> lock(m_mutex);
while ( m_isReady == false && m_isStop == false)
m_condition.wait(lock);
outs.clear();
int num = 0;
if (m_enumCount[static_cast<int>(m_baseEnum)] <= 0)
{
m_isReady = false;
return -1;
}
uint64_t timestamp = m_baseWaitTimestamps.front();
std::list<TimeQueueDataPtr>::iterator pos = std::list<TimeQueueDataPtr>::iterator(0);
TimeQueueDataPtr merge[static_cast<int>(QueueDataEnum::QD_ENUM_NUM)];
uint64_t detaT[static_cast<int>(QueueDataEnum::QD_ENUM_NUM)];
for (int i = 0; i < static_cast<int>(QueueDataEnum::QD_ENUM_NUM); i++)
{
merge[i] = nullptr;
detaT[i] = 1000;
}
for (auto iter = m_timeQueue.begin(); iter != m_timeQueue.end(); iter++)
{
if (labs(timestamp - (*iter)->timestamp) < std::min(m_baseInterval, detaT[static_cast<int>((*iter)->dataType)]))
{
if(merge[static_cast<int>((*iter)->dataType)] == nullptr)
num++;
detaT[static_cast<int>((*iter)->dataType)] = labs(timestamp - (*iter)->timestamp);
merge[static_cast<int>((*iter)->dataType)] = *iter;
pos = iter;
}
}
if ( (m_recvAll == 1 && num == static_cast<int>(QueueDataEnum::QD_ENUM_NUM)) || (m_recvAll == 0 && num > 0) )
{
for (int i = 0; i < static_cast<int>(QueueDataEnum::QD_ENUM_NUM); i++)
{
if (merge[i])
outs.push_back(merge[i]);
}
for (auto obj = m_timeQueue.begin(); obj != m_timeQueue.end();)
{
int isFinish = 0;
if (obj == pos)
isFinish = 1;
if ((*obj)->dataType != m_baseEnum)
{
if (m_recvAll == 1 && m_enumCount[static_cast<int>((*obj)->dataType)] == 1)
obj++;
else
{
m_enumCount[static_cast<int>((*obj)->dataType)]--;
m_totelCount--;
obj = m_timeQueue.erase(obj);
}
}
else
{
m_enumCount[static_cast<int>((*obj)->dataType)]--;
m_totelCount--;
m_baseWaitTimestamps.pop();
obj = m_timeQueue.erase(obj);
}
if (isFinish == 1)
break;
}
m_isReady = false;
return 0;
}
m_isReady = false;
return -1;
}
void TimeQueueObj::Release()
{
if (m_isStop == false)
{
m_isStop = true;
m_condition.notify_all();
m_thread.join();
}
}
#pragma once
#include <vector>
#include <list>
#include <mutex>
#include <thread>
#include <cstdint>
#include <condition_variable>
#include <functional>
#include <queue>
#ifdef _USING_QUEUE_STRUCT_H_
#include "TimeQueueStruct.h"
#else
enum class QueueDataEnum //数据类型的枚举
{
QD_NONE = -1,//默认值,无类型
QD_DET_TRACKING_ARRAY = 0,//对应jfx_common_msgs::det_tracking_array
QD_INFERRESES = 1,//对应jfx_common_msgs::InferReses
QD_ENUM_NUM
};
struct TimeQueueData
{
uint64_t timestamp = 0;//排序的时间,以毫秒为单位
QueueDataEnum dataType = QueueDataEnum::QD_NONE;//数据类型
union {
uint64_t value;
int opt;
} data;
};
#endif
using TimeQueueDataPtr = std::shared_ptr<TimeQueueData>;
typedef void (TimeQueueDataProcess)(std::vector< TimeQueueDataPtr>& outs);
typedef std::function<TimeQueueDataProcess> TimeQueueDataProcessCallback;
class TimeQueueObj
{
public:
TimeQueueObj();
~TimeQueueObj() {}
//max_num队列最大值,base是触发回调需要检测哪个消息为触发条件,interval是离base消息前后差多少毫秒的数据为需要融合的消息,recvAll是0就只要接收到base消息就触发,不需要所有类型消息都全,1是需要所有消息都有并且所有消息离base消息差interval内的消息,cb是回调函数
int SetCallbackByEnum(int max_num, QueueDataEnum base, uint64_t interval, int recvAll, TimeQueueDataProcessCallback cb);
void PushQueueData(TimeQueueDataPtr& data);
void ProcessThread();
int GetQueueData(std::vector<TimeQueueDataPtr>& outs);
void Release();
std::list<TimeQueueDataPtr> m_timeQueue;//记录数据的队列
mutable std::mutex m_mutex;
std::condition_variable m_condition;
int m_max_num = 0;
bool m_isStop = true;
bool m_isReady = false;
std::thread m_thread;//等待触发的线程
//QueueDataEnum m_enumStatus[static_cast<int>(QueueDataEnum::QD_ENUM_NUM)];//记录是否至少保存一份数据,默认0是不保存
QueueDataEnum m_baseEnum;//触发回调时要检查的这个枚举的类型是否存在,存在就回调
TimeQueueDataProcessCallback m_cb = nullptr;//回调函数
int m_enumCount[static_cast<int>(QueueDataEnum::QD_ENUM_NUM)];//记录每个类型的数量
int m_totelCount = 0;//记录总数量
//uint64_t m_baseTimestmap = 0;//base数据最新的时间戳
uint64_t m_baseInterval = 0;//base数据判断的时间间隔
int m_recvAll = 0;//是否接受到所有消息才触发回调函数
std::queue<uint64_t> m_baseWaitTimestamps;//等待处理的时间戳
uint64_t m_startTime[static_cast<int>(QueueDataEnum::QD_ENUM_NUM)];//记录开始的时间戳
int m_canRecv = 0;//是否能接收基准消息
};
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment