Commit bfb0607f authored by oscar's avatar oscar

提交更新

parent e9997606
......@@ -350,7 +350,7 @@ void TrackingRos::Init(ros::NodeHandle& nh)
m_trackingThread = std::thread(&TrackingRos::ThreadTrackingProcess, this);
m_timeQueue.SetCallbackByEnum(20, QueueDataEnum::QD_INFERRESES, 500,[=](std::vector< TimeQueueDataPtr>& outs) {
m_timeQueue.SetCallbackByEnum(20, QueueDataEnum::QD_INFERRESES, 500,0,[=](std::vector< TimeQueueDataPtr>& outs) {
TimeQueueProcess(outs);
});
//Eigen::MatrixXf mat(2,2);
......
......@@ -6,18 +6,23 @@
TimeQueueObj::TimeQueueObj()
{
for (int i = 0; i < static_cast<int>(QueueDataEnum::QD_ENUM_NUM); i++)
{
m_enumCount[i] = 0;
m_startTime[i] = 0;
}
}
int TimeQueueObj::SetCallbackByEnum(int max_num, QueueDataEnum base, uint64_t interval, TimeQueueDataProcessCallback cb)
int TimeQueueObj::SetCallbackByEnum(int max_num, QueueDataEnum base, uint64_t interval, int recvAll, TimeQueueDataProcessCallback cb)
{
m_max_num = max_num;
m_baseEnum = base;
m_baseInterval = interval;
m_recvAll = recvAll;
m_cb = cb;
m_isStop = false;
m_isReady = false;
m_thread = std::thread(&TimeQueueObj::ProcessThread, this);
return 0;
}
......@@ -25,20 +30,33 @@ int TimeQueueObj::SetCallbackByEnum(int max_num, QueueDataEnum base, uint64_t in
void TimeQueueObj::PushQueueData(TimeQueueDataPtr& data)
{
std::unique_lock<std::mutex> lock(m_mutex);
if (m_canRecv == 0 && m_recvAll == 1 && data->dataType == m_baseEnum)//需要接收所有消息,并且收到的是基准消息
{
int isCanRecv = 0;
for (int i = 0; i < static_cast<int>(QueueDataEnum::QD_ENUM_NUM); i++)
{
if (i != static_cast<int>(m_baseEnum))
{
if (m_startTime[i] != 0 && m_startTime[i] < data->timestamp)
isCanRecv++;
}
}
if (isCanRecv == static_cast<int>(QueueDataEnum::QD_ENUM_NUM) - 1)
m_canRecv = 1;
else
return;
}
if (m_startTime[static_cast<int>(data->dataType)] == 0)
m_startTime[static_cast<int>(data->dataType)] = data->timestamp;
int isInsert = 0;
auto last = m_timeQueue.begin();
for (auto iter = m_timeQueue.begin(); iter != m_timeQueue.end(); iter++)
{
if ((*iter)->timestamp > data->timestamp)
{
m_timeQueue.insert(last, data);
m_timeQueue.insert(iter, data);
isInsert = 1;
break;
}
else
{
last = iter;
}
}
if (isInsert == 0)
{
......@@ -47,16 +65,26 @@ void TimeQueueObj::PushQueueData(TimeQueueDataPtr& data)
m_totelCount++;
m_enumCount[static_cast<int>(data->dataType)]++;
if (data->dataType == m_baseEnum)
m_baseTimestmap = data->timestamp;//更新基准类型的最新时间戳,为了判断时间差用
m_baseWaitTimestamps.push(data->timestamp);
if (m_max_num > 0 && m_totelCount > m_max_num)
{
auto begin = m_timeQueue.begin();
while (begin != m_timeQueue.end() && (*begin)->dataType != data->dataType)
begin++;
m_enumCount[static_cast<int>((*begin)->dataType)]--;
m_totelCount--;
m_timeQueue.pop_front();
m_timeQueue.erase(begin);
if (data->dataType == m_baseEnum)
{
m_baseWaitTimestamps.pop();
}
}
if ( (m_recvAll == 0 && data->dataType == m_baseEnum) || m_recvAll == 1)
{
m_isReady = true;
m_condition.notify_all();
}
m_condition.notify_all();
}
void TimeQueueObj::ProcessThread()
......@@ -76,17 +104,17 @@ void TimeQueueObj::ProcessThread()
int TimeQueueObj::GetQueueData(std::vector<TimeQueueDataPtr>& outs)
{
std::unique_lock<std::mutex> lock(m_mutex);
m_condition.wait(lock, [this]() // Lambda funct
{
return !m_timeQueue.empty();
});
while ( m_isReady == false && m_isStop == false)
m_condition.wait(lock);
outs.clear();
int num = 0;
if (m_enumCount[static_cast<int>(m_baseEnum)] <= 0)
{
m_isReady = false;
return -1;
}
uint64_t timestamp = m_baseWaitTimestamps.front();
std::list<TimeQueueDataPtr>::iterator pos = std::list<TimeQueueDataPtr>::iterator(0);
TimeQueueDataPtr merge[static_cast<int>(QueueDataEnum::QD_ENUM_NUM)];
uint64_t detaT[static_cast<int>(QueueDataEnum::QD_ENUM_NUM)];
......@@ -97,19 +125,22 @@ int TimeQueueObj::GetQueueData(std::vector<TimeQueueDataPtr>& outs)
}
for (auto iter = m_timeQueue.begin(); iter != m_timeQueue.end(); iter++)
{
if (labs(m_baseTimestmap - (*iter)->timestamp) < std::min(m_baseInterval, detaT[static_cast<int>((*iter)->dataType)]))
if (labs(timestamp - (*iter)->timestamp) < std::min(m_baseInterval, detaT[static_cast<int>((*iter)->dataType)]))
{
if(merge[static_cast<int>((*iter)->dataType)] == nullptr)
num++;
detaT[static_cast<int>((*iter)->dataType)] = labs(m_baseTimestmap - (*iter)->timestamp);
detaT[static_cast<int>((*iter)->dataType)] = labs(timestamp - (*iter)->timestamp);
merge[static_cast<int>((*iter)->dataType)] = *iter;
pos = iter;
}
}
if (num == static_cast<int>(QueueDataEnum::QD_ENUM_NUM))
if ( (m_recvAll == 1 && num == static_cast<int>(QueueDataEnum::QD_ENUM_NUM)) || (m_recvAll == 0 && num > 0) )
{
for (int i = 0; i < static_cast<int>(QueueDataEnum::QD_ENUM_NUM); i++)
outs.push_back(merge[i]);
{
if (merge[i])
outs.push_back(merge[i]);
}
for (auto obj = m_timeQueue.begin(); obj != m_timeQueue.end();)
{
int isFinish = 0;
......@@ -117,24 +148,38 @@ int TimeQueueObj::GetQueueData(std::vector<TimeQueueDataPtr>& outs)
isFinish = 1;
if ((*obj)->dataType != m_baseEnum)
{
if (m_enumCount[static_cast<int>((*obj)->dataType)] == 1)
if (m_recvAll == 1 && m_enumCount[static_cast<int>((*obj)->dataType)] == 1)
obj++;
else
{
m_enumCount[static_cast<int>((*obj)->dataType)]--;
m_totelCount--;
obj = m_timeQueue.erase(obj);
}
}
else
{
m_enumCount[static_cast<int>((*obj)->dataType)]--;
m_totelCount--;
m_baseWaitTimestamps.pop();
obj = m_timeQueue.erase(obj);
}
if (isFinish == 1)
break;
}
m_isReady = false;
return 0;
}
m_isReady = false;
return -1;
}
void TimeQueueObj::Release()
{
if (m_isStop == false)
{
m_isStop = true;
m_condition.notify_all();
m_thread.join();
}
}
......@@ -7,8 +7,9 @@
#include <cstdint>
#include <condition_variable>
#include <functional>
#include <queue>
#ifdef _CAR_SENSING_
#ifdef _USING_QUEUE_STRUCT_H_
#include "TimeQueueStruct.h"
#else
enum class QueueDataEnum //数据类型的枚举
......@@ -42,7 +43,8 @@ public:
TimeQueueObj();
~TimeQueueObj() {}
int SetCallbackByEnum(int max_num,QueueDataEnum base,uint64_t interval, TimeQueueDataProcessCallback cb);
//max_num队列最大值,base是触发回调需要检测哪个消息为触发条件,interval是离base消息前后差多少毫秒的数据为需要融合的消息,recvAll是0就只要接收到base消息就触发,不需要所有类型消息都全,1是需要所有消息都有并且所有消息离base消息差interval内的消息,cb是回调函数
int SetCallbackByEnum(int max_num, QueueDataEnum base, uint64_t interval, int recvAll, TimeQueueDataProcessCallback cb);
void PushQueueData(TimeQueueDataPtr& data);
......@@ -50,6 +52,8 @@ public:
int GetQueueData(std::vector<TimeQueueDataPtr>& outs);
void Release();
std::list<TimeQueueDataPtr> m_timeQueue;//记录数据的队列
......@@ -57,6 +61,7 @@ public:
std::condition_variable m_condition;
int m_max_num = 0;
bool m_isStop = true;
bool m_isReady = false;
std::thread m_thread;//等待触发的线程
//QueueDataEnum m_enumStatus[static_cast<int>(QueueDataEnum::QD_ENUM_NUM)];//记录是否至少保存一份数据,默认0是不保存
......@@ -65,6 +70,10 @@ public:
int m_enumCount[static_cast<int>(QueueDataEnum::QD_ENUM_NUM)];//记录每个类型的数量
int m_totelCount = 0;//记录总数量
uint64_t m_baseTimestmap = 0;//base数据最新的时间戳
//uint64_t m_baseTimestmap = 0;//base数据最新的时间戳
uint64_t m_baseInterval = 0;//base数据判断的时间间隔
int m_recvAll = 0;//是否接受到所有消息才触发回调函数
std::queue<uint64_t> m_baseWaitTimestamps;//等待处理的时间戳
uint64_t m_startTime[static_cast<int>(QueueDataEnum::QD_ENUM_NUM)];//记录开始的时间戳
int m_canRecv = 0;//是否能接收基准消息
};
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment