Commit ed6f4503 authored by markshih91

update

parent 11cb03b3
cmake_minimum_required(VERSION 3.0.2)
project(tracking)
## Compile as C++11, supported in ROS Kinetic and newer
# add_compile_options(-std=c++11)
## Find catkin macros and libraries
## if COMPONENTS list like find_package(catkin REQUIRED COMPONENTS xyz)
## is used, also find other catkin packages
find_package(catkin REQUIRED COMPONENTS
message_generation
rospy
std_msgs
sensor_msgs
jsk_recognition_msgs
visualization_msgs
)
## System dependencies are found with CMake's conventions
# find_package(Boost REQUIRED COMPONENTS system)
## Uncomment this if the package has a setup.py. This macro ensures
## modules and global scripts declared therein get installed
## See http://ros.org/doc/api/catkin/html/user_guide/setup_dot_py.html
# catkin_python_setup()
################################################
## Declare ROS messages, services and actions ##
################################################
## To declare and build messages, services or actions from within this
## package, follow these steps:
## * Let MSG_DEP_SET be the set of packages whose message types you use in
## your messages/services/actions (e.g. std_msgs, actionlib_msgs, ...).
## * In the file package.xml:
## * add a build_depend tag for "message_generation"
## * add a build_depend and a exec_depend tag for each package in MSG_DEP_SET
## * If MSG_DEP_SET isn't empty the following dependency has been pulled in
## but can be declared for certainty nonetheless:
## * add a exec_depend tag for "message_runtime"
## * In this file (CMakeLists.txt):
## * add "message_generation" and every package in MSG_DEP_SET to
## find_package(catkin REQUIRED COMPONENTS ...)
## * add "message_runtime" and every package in MSG_DEP_SET to
## catkin_package(CATKIN_DEPENDS ...)
## * uncomment the add_*_files sections below as needed
## and list every .msg/.srv/.action file to be processed
## * uncomment the generate_messages entry below
## * add every package in MSG_DEP_SET to generate_messages(DEPENDENCIES ...)
## Generate messages in the 'msg' folder
add_message_files(
FILES
det_tracking.msg
det_tracking_array.msg
)
## Generate services in the 'srv' folder
# add_service_files(
# FILES
# Service1.srv
# Service2.srv
# )
## Generate actions in the 'action' folder
# add_action_files(
# FILES
# Action1.action
# Action2.action
# )
## Generate added messages and services with any dependencies listed here
generate_messages(
DEPENDENCIES
std_msgs
sensor_msgs
jsk_recognition_msgs
visualization_msgs
)
################################################
## Declare ROS dynamic reconfigure parameters ##
################################################
## To declare and build dynamic reconfigure parameters within this
## package, follow these steps:
## * In the file package.xml:
## * add a build_depend and a exec_depend tag for "dynamic_reconfigure"
## * In this file (CMakeLists.txt):
## * add "dynamic_reconfigure" to
## find_package(catkin REQUIRED COMPONENTS ...)
## * uncomment the "generate_dynamic_reconfigure_options" section below
## and list every .cfg file to be processed
## Generate dynamic reconfigure parameters in the 'cfg' folder
# generate_dynamic_reconfigure_options(
# cfg/DynReconf1.cfg
# cfg/DynReconf2.cfg
# )
###################################
## catkin specific configuration ##
###################################
## The catkin_package macro generates cmake config files for your package
## Declare things to be passed to dependent projects
## INCLUDE_DIRS: uncomment this if your package contains header files
## LIBRARIES: libraries you create in this project that dependent projects also need
## CATKIN_DEPENDS: catkin_packages dependent projects also need
## DEPENDS: system dependencies of this project that dependent projects also need
catkin_package(
# INCLUDE_DIRS include
# LIBRARIES tracking
CATKIN_DEPENDS rospy std_msgs message_runtime
# DEPENDS system_lib
)
###########
## Build ##
###########
## Specify additional locations of header files
## Your package locations should be listed before other locations
include_directories(
# include
${catkin_INCLUDE_DIRS}
)
## Declare a C++ library
# add_library(${PROJECT_NAME}
# src/${PROJECT_NAME}/tracking.cpp
# )
## Add cmake target dependencies of the library
## as an example, code may need to be generated before libraries
## either from message generation or dynamic reconfigure
# add_dependencies(${PROJECT_NAME} ${${PROJECT_NAME}_EXPORTED_TARGETS} ${catkin_EXPORTED_TARGETS})
## Declare a C++ executable
## With catkin_make all packages are built within a single CMake context
## The recommended prefix ensures that target names across packages don't collide
# add_executable(${PROJECT_NAME}_node src/tracking_node.cpp)
## Rename C++ executable without prefix
## The above recommended prefix causes long target names, the following renames the
## target back to the shorter version for ease of user use
## e.g. "rosrun someones_pkg node" instead of "rosrun someones_pkg someones_pkg_node"
# set_target_properties(${PROJECT_NAME}_node PROPERTIES OUTPUT_NAME node PREFIX "")
## Add cmake target dependencies of the executable
## same as for the library above
# add_dependencies(${PROJECT_NAME}_node ${${PROJECT_NAME}_EXPORTED_TARGETS} ${catkin_EXPORTED_TARGETS})
## Specify libraries to link a library or executable target against
# target_link_libraries(${PROJECT_NAME}_node
# ${catkin_LIBRARIES}
# )
#############
## Install ##
#############
# all install targets should use catkin DESTINATION variables
# See http://ros.org/doc/api/catkin/html/adv_user_guide/variables.html
## Mark executable scripts (Python etc.) for installation
## in contrast to setup.py, you can choose the destination
# catkin_install_python(PROGRAMS
# scripts/my_python_script
# DESTINATION ${CATKIN_PACKAGE_BIN_DESTINATION}
# )
## Mark executables for installation
## See http://docs.ros.org/melodic/api/catkin/html/howto/format1/building_executables.html
# install(TARGETS ${PROJECT_NAME}_node
# RUNTIME DESTINATION ${CATKIN_PACKAGE_BIN_DESTINATION}
# )
## Mark libraries for installation
## See http://docs.ros.org/melodic/api/catkin/html/howto/format1/building_libraries.html
# install(TARGETS ${PROJECT_NAME}
# ARCHIVE DESTINATION ${CATKIN_PACKAGE_LIB_DESTINATION}
# LIBRARY DESTINATION ${CATKIN_PACKAGE_LIB_DESTINATION}
# RUNTIME DESTINATION ${CATKIN_GLOBAL_BIN_DESTINATION}
# )
## Mark cpp header files for installation
# install(DIRECTORY include/${PROJECT_NAME}/
# DESTINATION ${CATKIN_PACKAGE_INCLUDE_DESTINATION}
# FILES_MATCHING PATTERN "*.h"
# PATTERN ".svn" EXCLUDE
# )
## Mark other files for installation (e.g. launch and bag files, etc.)
# install(FILES
# # myfile1
# # myfile2
# DESTINATION ${CATKIN_PACKAGE_SHARE_DESTINATION}
# )
#############
## Testing ##
#############
## Add gtest based cpp test target and link libraries
# catkin_add_gtest(${PROJECT_NAME}-test test/test_tracking.cpp)
# if(TARGET ${PROJECT_NAME}-test)
# target_link_libraries(${PROJECT_NAME}-test ${PROJECT_NAME})
# endif()
## Add folders to be run by python nosetests
# catkin_add_nosetests(test)
# det_tracking.msg: one tracked object
uint64 frame                  # frame id / timestamp
uint8 type                    # object class id
float32 score                 # detection confidence
string name                   # class name
string license_plate_number
string color_name
float32 h                     # bounding box height (m)
float32 w                     # bounding box width (m)
float32 l                     # bounding box length (m)
float32 x                     # box center in the lidar frame
float32 y
float32 z
float32 rot_y                 # yaw angle (rad)
int32 obj_id                  # tracker id
float32 v_x                   # estimated velocity
float32 v_y
float32 v_z
float32 loc_x                 # reference point location (filled from get_loc in the node)
float32 loc_y
float32 loc_z
# det_tracking_array.msg: all tracked objects in one frame plus the source cloud
det_tracking[] array
sensor_msgs/PointCloud2 cloud
<?xml version="1.0"?>
<package format="2">
<name>tracking</name>
<version>0.0.0</version>
<description>The tracking package</description>
<!-- One maintainer tag required, multiple allowed, one person per tag -->
<!-- Example: -->
<!-- <maintainer email="jane.doe@example.com">Jane Doe</maintainer> -->
<maintainer email="shishuai@todo.todo">shishuai</maintainer>
<!-- One license tag required, multiple allowed, one license per tag -->
<!-- Commonly used license strings: -->
<!-- BSD, MIT, Boost Software License, GPLv2, GPLv3, LGPLv2.1, LGPLv3 -->
<license>TODO</license>
<!-- Url tags are optional, but multiple are allowed, one per tag -->
<!-- Optional attribute type can be: website, bugtracker, or repository -->
<!-- Example: -->
<!-- <url type="website">http://wiki.ros.org/tracking</url> -->
<!-- Author tags are optional, multiple are allowed, one per tag -->
<!-- Authors do not have to be maintainers, but could be -->
<!-- Example: -->
<!-- <author email="jane.doe@example.com">Jane Doe</author> -->
<!-- The *depend tags are used to specify dependencies -->
<!-- Dependencies can be catkin packages or system dependencies -->
<!-- Examples: -->
<!-- Use depend as a shortcut for packages that are both build and exec dependencies -->
<!-- <depend>roscpp</depend> -->
<!-- Note that this is equivalent to the following: -->
<!-- <build_depend>roscpp</build_depend> -->
<!-- <exec_depend>roscpp</exec_depend> -->
<!-- Use build_depend for packages you need at compile time: -->
<!-- <build_depend>message_generation</build_depend> -->
<!-- Use build_export_depend for packages you need in order to build against this package: -->
<!-- <build_export_depend>message_generation</build_export_depend> -->
<!-- Use buildtool_depend for build tool packages: -->
<!-- <buildtool_depend>catkin</buildtool_depend> -->
<!-- Use exec_depend for packages you need at runtime: -->
<!-- <exec_depend>message_runtime</exec_depend> -->
<!-- Use test_depend for packages you need only for testing: -->
<!-- <test_depend>gtest</test_depend> -->
<!-- Use doc_depend for packages you need only for building documentation: -->
<!-- <doc_depend>doxygen</doc_depend> -->
<buildtool_depend>catkin</buildtool_depend>
<build_depend>rospy</build_depend>
<build_depend>std_msgs</build_depend>
<build_depend>sensor_msgs</build_depend>
<build_depend>jsk_recognition_msgs</build_depend>
<build_depend>visualization_msgs</build_depend>
<build_export_depend>rospy</build_export_depend>
<build_export_depend>std_msgs</build_export_depend>
<build_export_depend>sensor_msgs</build_export_depend>
<exec_depend>rospy</exec_depend>
<exec_depend>std_msgs</exec_depend>
<exec_depend>sensor_msgs</exec_depend>
<exec_depend>jsk_recognition_msgs</exec_depend>
<exec_depend>visualization_msgs</exec_depend>
<build_depend>message_generation</build_depend>
<build_export_depend>message_generation</build_export_depend>
<exec_depend>message_runtime</exec_depend>
<!-- The export tag contains other, unspecified, tags -->
<export>
<!-- Other tools can request additional information be placed here -->
</export>
</package>
# Author: Xinshuo Weng
# email: xinshuo.weng@gmail.com
import numpy as np, copy
# from numba import jit
from scipy.spatial import ConvexHull
# @jit
def poly_area(x, y):
""" Ref: http://stackoverflow.com/questions/24467972/calculate-area-of-polygon-given-x-y-coordinates """
return 0.5 * np.abs(np.dot(x, np.roll(y, 1)) - np.dot(y, np.roll(x, 1)))
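# A quick sanity check of the shoelace formula above (illustrative values):
# the unit square should have area 1.
def _poly_area_example():
    xs = np.array([0.0, 1.0, 1.0, 0.0])
    ys = np.array([0.0, 0.0, 1.0, 1.0])
    assert abs(poly_area(xs, ys) - 1.0) < 1e-9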
# @jit
def box3d_vol(corners):
''' corners: (8,3) no assumption on axis direction '''
a = np.sqrt(np.sum((corners[0, :] - corners[1, :]) ** 2))
b = np.sqrt(np.sum((corners[1, :] - corners[2, :]) ** 2))
c = np.sqrt(np.sum((corners[0, :] - corners[4, :]) ** 2))
return a * b * c
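# Minimal sketch (illustrative values): box3d_vol on the 8 corners of an
# axis-aligned unit cube, in the order produced by convert_3dbox_to_8corner
# below, should give volume 1.
def _box3d_vol_example():
    corners = np.array([[0.5, 0, 0.5], [0.5, 0, -0.5], [-0.5, 0, -0.5], [-0.5, 0, 0.5],
                        [0.5, -1, 0.5], [0.5, -1, -0.5], [-0.5, -1, -0.5], [-0.5, -1, 0.5]])
    assert abs(box3d_vol(corners) - 1.0) < 1e-9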
# @jit
def convex_hull_intersection(p1, p2):
""" Compute area of two convex hull's intersection area.
p1,p2 are a list of (x,y) tuples of hull vertices.
return a list of (x,y) for the intersection and its volume
"""
inter_p = polygon_clip(p1, p2)
if inter_p is not None:
hull_inter = ConvexHull(inter_p)
return inter_p, hull_inter.volume
else:
return None, 0.0
def polygon_clip(subjectPolygon, clipPolygon):
""" Clip a polygon with another polygon.
Ref: https://rosettacode.org/wiki/Sutherland-Hodgman_polygon_clipping#Python
Args:
subjectPolygon: a list of (x,y) 2d points, any polygon.
clipPolygon: a list of (x,y) 2d points, has to be *convex*
Note:
**points have to be counter-clockwise ordered**
Return:
a list of (x,y) vertex point for the intersection polygon.
"""
def inside(p):
return (cp2[0] - cp1[0]) * (p[1] - cp1[1]) > (cp2[1] - cp1[1]) * (p[0] - cp1[0])
def computeIntersection():
dc = [cp1[0] - cp2[0], cp1[1] - cp2[1]]
dp = [s[0] - e[0], s[1] - e[1]]
n1 = cp1[0] * cp2[1] - cp1[1] * cp2[0]
n2 = s[0] * e[1] - s[1] * e[0]
n3 = 1.0 / (dc[0] * dp[1] - dc[1] * dp[0])
return [(n1 * dp[0] - n2 * dc[0]) * n3, (n1 * dp[1] - n2 * dc[1]) * n3]
outputList = subjectPolygon
cp1 = clipPolygon[-1]
for clipVertex in clipPolygon:
cp2 = clipVertex
inputList = outputList
outputList = []
s = inputList[-1]
for subjectVertex in inputList:
e = subjectVertex
if inside(e):
if not inside(s): outputList.append(computeIntersection())
outputList.append(e)
elif inside(s):
outputList.append(computeIntersection())
s = e
cp1 = cp2
if len(outputList) == 0: return None
return (outputList)
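# Illustrative use of the two helpers above: two CCW unit squares offset by
# 0.5 overlap in a 0.5 x 0.5 region, so the intersection area should be 0.25.
def _clip_example():
    sq1 = [(0.0, 0.0), (1.0, 0.0), (1.0, 1.0), (0.0, 1.0)]
    sq2 = [(0.5, 0.5), (1.5, 0.5), (1.5, 1.5), (0.5, 1.5)]
    _, inter_area = convex_hull_intersection(sq1, sq2)
    assert abs(inter_area - 0.25) < 1e-9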
def iou3d(corners1, corners2):
''' Compute 3D bounding box IoU; only works for objects parallel to the ground
Input:
corners1: numpy array (8,3), assume up direction is negative Y
corners2: numpy array (8,3), assume up direction is negative Y
Output:
iou: 3D bounding box IoU
iou_2d: bird's eye view 2D bounding box IoU
todo (rqi): add more description on corner points' orders.
'''
# corner points are in counter clockwise order
rect1 = [(corners1[i, 0], corners1[i, 2]) for i in range(3, -1, -1)]
rect2 = [(corners2[i, 0], corners2[i, 2]) for i in range(3, -1, -1)]
area1 = poly_area(np.array(rect1)[:, 0], np.array(rect1)[:, 1])
area2 = poly_area(np.array(rect2)[:, 0], np.array(rect2)[:, 1])
# inter_area = shapely_polygon_intersection(rect1, rect2)
_, inter_area = convex_hull_intersection(rect1, rect2)
# try:
# _, inter_area = convex_hull_intersection(rect1, rect2)
# except ValueError:
# inter_area = 0
# except scipy.spatial.qhull.QhullError:
# inter_area = 0
iou_2d = inter_area / (area1 + area2 - inter_area)
ymax = min(corners1[0, 1], corners2[0, 1])
ymin = max(corners1[4, 1], corners2[4, 1])
inter_vol = inter_area * max(0.0, ymax - ymin)
vol1 = box3d_vol(corners1)
vol2 = box3d_vol(corners2)
iou = inter_vol / (vol1 + vol2 - inter_vol)
return iou, iou_2d
# @jit
def roty(t):
''' Rotation about the y-axis. '''
c = np.cos(t)
s = np.sin(t)
return np.array([[c, 0, s],
[0, 1, 0],
[-s, 0, c]])
def convert_3dbox_to_8corner(bbox3d_input):
''' Takes an object's 3D box in the representation [x,y,z,theta,l,w,h] and
converts it to the 8 corners of the 3D box
Returns:
corners_3d: (8,3) array in rect camera coord
'''
# compute rotational matrix around yaw axis
bbox3d = copy.copy(bbox3d_input)
R = roty(bbox3d[3])
# 3d bounding box dimensions
l = bbox3d[4]
w = bbox3d[5]
h = bbox3d[6]
# 3d bounding box corners
x_corners = [l / 2, l / 2, -l / 2, -l / 2, l / 2, l / 2, -l / 2, -l / 2]
y_corners = [0, 0, 0, 0, -h, -h, -h, -h]
z_corners = [w / 2, -w / 2, -w / 2, w / 2, w / 2, -w / 2, -w / 2, w / 2]
# rotate and translate 3d bounding box
corners_3d = np.dot(R, np.vstack([x_corners, y_corners, z_corners]))
# print corners_3d.shape
corners_3d[0, :] = corners_3d[0, :] + bbox3d[0]
corners_3d[1, :] = corners_3d[1, :] + bbox3d[1]
corners_3d[2, :] = corners_3d[2, :] + bbox3d[2]
return np.transpose(corners_3d)
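# Hedged round-trip check for the two functions above (illustrative box): a
# box compared with itself should have IoU 1.0 in both 3D and bird's eye view.
def _iou3d_example():
    box = np.array([0.0, 0.0, 0.0, 0.3, 4.0, 1.8, 1.5])  # [x, y, z, theta, l, w, h]
    corners = convert_3dbox_to_8corner(box)
    iou, iou_2d = iou3d(corners, corners)
    assert abs(iou - 1.0) < 1e-6 and abs(iou_2d - 1.0) < 1e-6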
# Author: Xinshuo Weng
# email: xinshuo.weng@gmail.com
import math
import numpy as np
from filterpy.kalman import KalmanFilter
from sklearn.decomposition import PCA
NUM_FRAME = 5
DIST_THRED = 0.5
ANGLE_THRED = 2.735884
def center_rot_y_f(points):
x_list = points[:, 0]
y_list = points[:, 1]
pca = PCA(n_components=2)
pca.fit(points)
orientation = pca.components_[0]
dx = x_list[-1] - x_list[0]
dy = y_list[-1] - y_list[0]
alpha = np.dot(np.array([dx, dy]), orientation)
if alpha < 0:
orientation = -orientation
center_rot_y = math.atan2(orientation[1], orientation[0])
return center_rot_y
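# Sketch of the PCA heading helper above (illustrative points): a track moving
# along +x with small noise should give a heading close to 0 rad.
def _center_rot_y_example():
    pts = np.array([[0.0, 0.0], [1.0, 0.05], [2.0, -0.03], [3.0, 0.02], [4.0, 0.0]])
    yaw = center_rot_y_f(pts)
    assert abs(yaw) < 0.1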
class KalmanBoxTracker(object):
"""
This class represents the internal state of individual tracked objects observed as bbox.
"""
count = 0
def __init__(self, bbox3D, info):
"""
Initialises a tracker using initial bounding box.
"""
# define constant velocity model
self.kf = KalmanFilter(dim_x=10, dim_z=7)
self.kf.F = np.array([[1, 0, 0, 0, 0, 0, 0, 1, 0, 0], # state transition matrix
[0, 1, 0, 0, 0, 0, 0, 0, 1, 0],
[0, 0, 1, 0, 0, 0, 0, 0, 0, 1],
[0, 0, 0, 1, 0, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 1, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 0, 1, 0, 0, 0, 0],
[0, 0, 0, 0, 0, 0, 1, 0, 0, 0],
[0, 0, 0, 0, 0, 0, 0, 1, 0, 0],
[0, 0, 0, 0, 0, 0, 0, 0, 1, 0],
[0, 0, 0, 0, 0, 0, 0, 0, 0, 1]])
self.kf.H = np.array([[1, 0, 0, 0, 0, 0, 0, 0, 0, 0], # measurement function,
[0, 1, 0, 0, 0, 0, 0, 0, 0, 0],
[0, 0, 1, 0, 0, 0, 0, 0, 0, 0],
[0, 0, 0, 1, 0, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 1, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 0, 1, 0, 0, 0, 0],
[0, 0, 0, 0, 0, 0, 1, 0, 0, 0]])
# # with angular velocity
# self.kf = KalmanFilter(dim_x=11, dim_z=7)
# self.kf.F = np.array([[1,0,0,0,0,0,0,1,0,0,0], # state transition matrix
# [0,1,0,0,0,0,0,0,1,0,0],
# [0,0,1,0,0,0,0,0,0,1,0],
# [0,0,0,1,0,0,0,0,0,0,1],
# [0,0,0,0,1,0,0,0,0,0,0],
# [0,0,0,0,0,1,0,0,0,0,0],
# [0,0,0,0,0,0,1,0,0,0,0],
# [0,0,0,0,0,0,0,1,0,0,0],
# [0,0,0,0,0,0,0,0,1,0,0],
# [0,0,0,0,0,0,0,0,0,1,0],
# [0,0,0,0,0,0,0,0,0,0,1]])
# self.kf.H = np.array([[1,0,0,0,0,0,0,0,0,0,0], # measurement function,
# [0,1,0,0,0,0,0,0,0,0,0],
# [0,0,1,0,0,0,0,0,0,0,0],
# [0,0,0,1,0,0,0,0,0,0,0],
# [0,0,0,0,1,0,0,0,0,0,0],
# [0,0,0,0,0,1,0,0,0,0,0],
# [0,0,0,0,0,0,1,0,0,0,0]])
# self.kf.R[0:,0:] *= 10. # measurement uncertainty
self.kf.P[7:, 7:] *= 1000.  # state uncertainty: give high uncertainty to the unobservable initial velocities
self.kf.P *= 10.
# self.kf.Q[-1,-1] *= 0.01 # process uncertainty
self.kf.Q[7:, 7:] *= 0.01
self.kf.x[:7] = bbox3D.reshape((7, 1))
self.time_since_update = 0
self.id = KalmanBoxTracker.count
KalmanBoxTracker.count += 1
self.history = []
self.hits = 1 # number of total hits including the first detection
self.hit_streak = 1 # number of continuing hit considering the first detection
self.first_continuing_hit = 1
self.still_first = True
self.age = 0
self.info = info # other info associated
last_x_y = self.kf.x[:2]
self.last_x_y_list = [last_x_y]
def update(self, bbox3D, info):
"""
Updates the state vector with observed bbox.
"""
points = self.last_x_y_list[:]
points = np.array(points)
points = np.squeeze(points, axis=(2,))
x_list = points[:, 0]
y_list = points[:, 1]
# refine the observed yaw only when the track has enough history and has actually moved
if len(points) >= NUM_FRAME and (abs(x_list[-1] - x_list[0]) > DIST_THRED or abs(y_list[-1] - y_list[0]) > DIST_THRED):
    center_rot_y = center_rot_y_f(points)
rot_y = bbox3D[3]
if np.pi / 2.0 < abs(center_rot_y - rot_y) < np.pi * 3 / 2.0:  # the two headings differ by more than 90 degrees, so flip by pi
    rot_y += np.pi
    if rot_y > np.pi: rot_y -= np.pi * 2  # keep theta in [-pi, pi]
    if rot_y < -np.pi: rot_y += np.pi * 2
refine_alpha = (math.pi * ANGLE_THRED / 180)  # ANGLE_THRED degrees converted to radians
if abs(rot_y - center_rot_y) >= np.pi * 3 / 2.0:
if rot_y > center_rot_y + refine_alpha:
rot_y += refine_alpha
if rot_y < center_rot_y - refine_alpha:
rot_y -= refine_alpha
else:
if rot_y > center_rot_y + refine_alpha:
rot_y -= refine_alpha
if rot_y < center_rot_y - refine_alpha:
rot_y += refine_alpha
if rot_y > np.pi: rot_y -= np.pi * 2
if rot_y < -np.pi: rot_y += np.pi * 2
bbox3D[3] = rot_y
self.time_since_update = 0
self.history = []
self.hits += 1
self.hit_streak += 1 # number of continuing hit
if self.still_first:
self.first_continuing_hit += 1  # number of continuing hits since the first detection
######################### orientation correction
if self.kf.x[3] >= np.pi: self.kf.x[3] -= np.pi * 2 # make the theta still in the range
if self.kf.x[3] < -np.pi: self.kf.x[3] += np.pi * 2
new_theta = bbox3D[3]
if new_theta >= np.pi: new_theta -= np.pi * 2 # make the theta still in the range
if new_theta < -np.pi: new_theta += np.pi * 2
bbox3D[3] = new_theta
predicted_theta = self.kf.x[3]
if np.pi / 2.0 < abs(new_theta - predicted_theta) < np.pi * 3 / 2.0:  # the two headings differ by more than 90 degrees, so flip the prediction by pi
self.kf.x[3] += np.pi
if self.kf.x[3] > np.pi: self.kf.x[3] -= np.pi * 2 # make the theta still in the range
if self.kf.x[3] < -np.pi: self.kf.x[3] += np.pi * 2
# now the angle is acute: < 90 or > 270, convert the case of > 270 to < 90
if abs(new_theta - self.kf.x[3]) >= np.pi * 3 / 2.0:
if new_theta > 0:
self.kf.x[3] += np.pi * 2
else:
self.kf.x[3] -= np.pi * 2
######################### # flip
self.kf.update(bbox3D)
if self.kf.x[3] >= np.pi: self.kf.x[3] -= np.pi * 2  # keep theta in the range
if self.kf.x[3] < -np.pi: self.kf.x[3] += np.pi * 2
self.info = info
def predict(self):
"""
Advances the state vector and returns the predicted bounding box estimate.
"""
if self.time_since_update == 0:
last_x_y = self.kf.x[:2]
self.last_x_y_list.append(last_x_y)
if len(self.last_x_y_list) > NUM_FRAME:
del(self.last_x_y_list[0])
self.kf.predict()
if self.kf.x[3] >= np.pi: self.kf.x[3] -= np.pi * 2
if self.kf.x[3] < -np.pi: self.kf.x[3] += np.pi * 2
self.age += 1
if (self.time_since_update > 0):
self.hit_streak = 0
self.still_first = False
self.time_since_update += 1
self.history.append(self.kf.x)
return self.history[-1]
def get_state(self):
"""
Returns the current bounding box estimate.
"""
return self.kf.x[:7].reshape((7,))
def get_orientation(self):
return self.kf.x[7:].reshape((3,))
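# Minimal usage sketch for KalmanBoxTracker (illustrative values): create a
# track from one box, run a predict/update cycle, and read back the state.
def _tracker_example():
    bbox = np.array([1.0, 2.0, 0.5, 0.1, 4.0, 1.8, 1.5])  # [x, y, z, theta, l, w, h]
    trk = KalmanBoxTracker(bbox, info=['0'])
    trk.predict()                                          # constant-velocity prediction
    trk.update(np.array([1.1, 2.0, 0.5, 0.1, 4.0, 1.8, 1.5]), info=['1'])
    state = trk.get_state()                                # [x, y, z, theta, l, w, h]
    assert state.shape == (7,)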
import numpy as np, cv2, os
class Object3d(object):
''' 3d object label '''
def __init__(self, label_file_line):
data = label_file_line.split(' ')
data[1:] = [float(x) for x in data[1:]]
# extract label, truncation, occlusion
self.type = data[0] # 'Car', 'Pedestrian', ...
self.truncation = data[1] # truncated pixel ratio [0..1]
self.occlusion = int(data[2]) # 0=visible, 1=partly occluded, 2=fully occluded, 3=unknown
self.alpha = data[3] # object observation angle [-pi..pi]
# extract 2d bounding box in 0-based coordinates
self.xmin = data[4] # left
self.ymin = data[5] # top
self.xmax = data[6] # right
self.ymax = data[7] # bottom
self.box2d = np.array([self.xmin,self.ymin,self.xmax,self.ymax])
# extract 3d bounding box information
self.h = data[8] # box height
self.w = data[9] # box width
self.l = data[10] # box length (in meters)
self.t = (data[11],data[12],data[13]) # location (x,y,z) in camera coord.
self.ry = data[14] # yaw angle (around Y-axis in camera coordinates) [-pi..pi]
if len(data) > 15: self.score = float(data[15])
if len(data) > 16: self.id = int(data[16])
def print_object(self):
print('Type, truncation, occlusion, alpha: %s, %d, %d, %f' % (self.type, self.truncation, self.occlusion, self.alpha))
print('2d bbox (x0,y0,x1,y1): %f, %f, %f, %f' % (self.xmin, self.ymin, self.xmax, self.ymax))
print('3d bbox h,w,l: %f, %f, %f' % (self.h, self.w, self.l))
print('3d bbox location, ry: (%f, %f, %f), %f' % (self.t[0], self.t[1], self.t[2], self.ry))
def convert_to_str(self):
return '%s %.2f %d %.2f %.2f %.2f %.2f %.2f %.2f %.2f %.2f %.2f %.2f %.2f %.2f' % \
(self.type, self.truncation, self.occlusion, self.alpha, self.xmin, self.ymin, self.xmax, self.ymax,
self.h, self.w, self.l, self.t[0], self.t[1], self.t[2], self.ry)
class Calibration(object):
''' Calibration matrices and utils
3d XYZ in <label>.txt are in rect camera coord.
2d box xy are in image2 coord
Points in <lidar>.bin are in Velodyne coord.
y_image2 = P^2_rect * x_rect
y_image2 = P^2_rect * R0_rect * Tr_velo_to_cam * x_velo
x_ref = Tr_velo_to_cam * x_velo
x_rect = R0_rect * x_ref
P^2_rect = [f^2_u, 0, c^2_u, -f^2_u b^2_x;
0, f^2_v, c^2_v, -f^2_v b^2_y;
0, 0, 1, 0]
= K * [1|t]
image2 coord:
----> x-axis (u)
|
|
v y-axis (v)
velodyne coord:
front x, left y, up z
rect/ref camera coord:
right x, down y, front z
Ref (KITTI paper): http://www.cvlibs.net/publications/Geiger2013IJRR.pdf
TODO(rqi): do matrix multiplication only once for each projection.
'''
def __init__(self, calib_filepath, from_video=False):
if from_video:
calibs = self.read_calib_from_video(calib_filepath)
else:
calibs = self.read_calib_file(calib_filepath)
# Projection matrix from rect camera coord to image2 coord
self.P = calibs['P2']
self.P = np.reshape(self.P, [3,4])
# Rigid transform from Velodyne coord to reference camera coord
self.V2C = calibs['Tr_velo_to_cam']
self.V2C = np.reshape(self.V2C, [3,4])
self.C2V = inverse_rigid_trans(self.V2C)
# Rotation from reference camera coord to rect camera coord
self.R0 = calibs['R0_rect']
self.R0 = np.reshape(self.R0,[3,3])
# Camera intrinsics and extrinsics
self.c_u = self.P[0,2]
self.c_v = self.P[1,2]
self.f_u = self.P[0,0]
self.f_v = self.P[1,1]
self.b_x = self.P[0,3]/(-self.f_u) # relative
self.b_y = self.P[1,3]/(-self.f_v)
def read_calib_file(self, filepath):
''' Read in a calibration file and parse into a dictionary.
Ref: https://github.com/utiasSTARS/pykitti/blob/master/pykitti/utils.py
'''
data = {}
with open(filepath, 'r') as f:
for line in f.readlines():
line = line.rstrip()
if len(line)==0: continue
key, value = line.split(':', 1)
# The only non-float values in these files are dates, which
# we don't care about anyway
try:
data[key] = np.array([float(x) for x in value.split()])
except ValueError:
pass
return data
def read_calib_from_video(self, calib_root_dir):
''' Read calibration for camera 2 from video calib files.
there are calib_cam_to_cam and calib_velo_to_cam under the calib_root_dir
'''
data = {}
cam2cam = self.read_calib_file(os.path.join(calib_root_dir, 'calib_cam_to_cam.txt'))
velo2cam = self.read_calib_file(os.path.join(calib_root_dir, 'calib_velo_to_cam.txt'))
Tr_velo_to_cam = np.zeros((3,4))
Tr_velo_to_cam[0:3,0:3] = np.reshape(velo2cam['R'], [3,3])
Tr_velo_to_cam[:,3] = velo2cam['T']
data['Tr_velo_to_cam'] = np.reshape(Tr_velo_to_cam, [12])
data['R0_rect'] = cam2cam['R_rect_00']
data['P2'] = cam2cam['P_rect_02']
return data
def cart2hom(self, pts_3d):
''' Input: nx3 points in Cartesian
Output: nx4 points in homogeneous coordinates, by appending a 1
'''
n = pts_3d.shape[0]
pts_3d_hom = np.hstack((pts_3d, np.ones((n,1))))
return pts_3d_hom
# ===========================
# ------- 3d to 3d ----------
# ===========================
def project_velo_to_ref(self, pts_3d_velo):
pts_3d_velo = self.cart2hom(pts_3d_velo) # nx4
return np.dot(pts_3d_velo, np.transpose(self.V2C))
def project_ref_to_velo(self, pts_3d_ref):
pts_3d_ref = self.cart2hom(pts_3d_ref) # nx4
return np.dot(pts_3d_ref, np.transpose(self.C2V))
def project_rect_to_ref(self, pts_3d_rect):
''' Input and Output are nx3 points '''
return np.transpose(np.dot(np.linalg.inv(self.R0), np.transpose(pts_3d_rect)))
def project_ref_to_rect(self, pts_3d_ref):
''' Input and Output are nx3 points '''
return np.transpose(np.dot(self.R0, np.transpose(pts_3d_ref)))
def project_rect_to_velo(self, pts_3d_rect):
''' Input: nx3 points in rect camera coord.
Output: nx3 points in velodyne coord.
'''
pts_3d_ref = self.project_rect_to_ref(pts_3d_rect)
return self.project_ref_to_velo(pts_3d_ref)
def project_velo_to_rect(self, pts_3d_velo):
pts_3d_ref = self.project_velo_to_ref(pts_3d_velo)
return self.project_ref_to_rect(pts_3d_ref)
# ===========================
# ------- 3d to 2d ----------
# ===========================
def project_rect_to_image(self, pts_3d_rect):
''' Input: nx3 points in rect camera coord.
Output: nx2 points in image2 coord.
'''
pts_3d_rect = self.cart2hom(pts_3d_rect)
pts_2d = np.dot(pts_3d_rect, np.transpose(self.P)) # nx3
pts_2d[:,0] /= pts_2d[:,2]
pts_2d[:,1] /= pts_2d[:,2]
return pts_2d[:,0:2]
def project_velo_to_image(self, pts_3d_velo):
''' Input: nx3 points in velodyne coord.
Output: nx2 points in image2 coord.
'''
pts_3d_rect = self.project_velo_to_rect(pts_3d_velo)
return self.project_rect_to_image(pts_3d_rect)
# ===========================
# ------- 2d to 3d ----------
# ===========================
def project_image_to_rect(self, uv_depth):
''' Input: nx3 first two channels are uv, 3rd channel
is depth in rect camera coord.
Output: nx3 points in rect camera coord.
'''
n = uv_depth.shape[0]
x = ((uv_depth[:,0]-self.c_u)*uv_depth[:,2])/self.f_u + self.b_x
y = ((uv_depth[:,1]-self.c_v)*uv_depth[:,2])/self.f_v + self.b_y
pts_3d_rect = np.zeros((n,3))
pts_3d_rect[:,0] = x
pts_3d_rect[:,1] = y
pts_3d_rect[:,2] = uv_depth[:,2]
return pts_3d_rect
def project_image_to_velo(self, uv_depth):
pts_3d_rect = self.project_image_to_rect(uv_depth)
return self.project_rect_to_velo(pts_3d_rect)
def inverse_rigid_trans(Tr):
''' Inverse a rigid body transform matrix (3x4 as [R|t])
[R'|-R't; 0|1]
'''
inv_Tr = np.zeros_like(Tr) # 3x4
inv_Tr[0:3,0:3] = np.transpose(Tr[0:3,0:3])
inv_Tr[0:3,3] = np.dot(-np.transpose(Tr[0:3,0:3]), Tr[0:3,3])
return inv_Tr
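# Quick check of inverse_rigid_trans (illustrative rotation about z plus a
# translation): composing a transform with its inverse gives the identity.
def _inverse_rigid_trans_example():
    c, s = np.cos(0.4), np.sin(0.4)
    Tr = np.array([[c, -s, 0, 1.0],
                   [s,  c, 0, 2.0],
                   [0,  0, 1, 3.0]])
    inv = inverse_rigid_trans(Tr)
    assert np.allclose(np.dot(Tr[:, 0:3], inv[:, 0:3]), np.eye(3))          # R * R^T = I
    assert np.allclose(np.dot(Tr[:, 0:3], inv[:, 3]) + Tr[:, 3], np.zeros(3))  # R*(-R^T t) + t = 0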
def read_label(label_filename):
lines = [line.rstrip() for line in open(label_filename)]
# if mask: objects = [Object3d_Mask(line) for line in lines]
objects = [Object3d(line) for line in lines]
return objects
def compute_box_3d(obj, P):
''' Takes an object and a projection matrix (P) and projects the 3d
bounding box into the image plane.
Returns:
corners_2d: (8,2) array in left image coord.
corners_3d: (8,3) array in rect camera coord.
'''
# compute rotational matrix around yaw axis
R = roty(obj.ry)
# 3d bounding box dimensions
l = obj.l
w = obj.w
h = obj.h
# 3d bounding box corners
x_corners = [l/2,l/2,-l/2,-l/2,l/2,l/2,-l/2,-l/2]
y_corners = [0,0,0,0,-h,-h,-h,-h]
z_corners = [w/2,-w/2,-w/2,w/2,w/2,-w/2,-w/2,w/2]
# rotate and translate 3d bounding box
corners_3d = np.dot(R, np.vstack([x_corners,y_corners,z_corners]))
#print corners_3d.shape
corners_3d[0,:] = corners_3d[0,:] + obj.t[0]
corners_3d[1,:] = corners_3d[1,:] + obj.t[1]
corners_3d[2,:] = corners_3d[2,:] + obj.t[2]
# print('cornsers_3d: ', corners_3d)
# TODO: bugs when the point is behind the camera, the 2D coordinate is wrong
# only draw 3d bounding box for objs in front of the camera
if np.any(corners_3d[2,:]<0.1):
corners_2d = None
return corners_2d, np.transpose(corners_3d)
# project the 3d bounding box into the image plane
corners_2d = project_to_image(np.transpose(corners_3d), P)
#print 'corners_2d: ', corners_2d
return corners_2d, np.transpose(corners_3d)
def project_to_image(pts_3d, P):
''' Project 3d points to image plane.
Usage: pts_2d = projectToImage(pts_3d, P)
input: pts_3d: nx3 matrix
P: 3x4 projection matrix
output: pts_2d: nx2 matrix
P(3x4) dot pts_3d_extended(4xn) = projected_pts_2d(3xn)
=> normalize projected_pts_2d(2xn)
<=> pts_3d_extended(nx4) dot P'(4x3) = projected_pts_2d(nx3)
=> normalize projected_pts_2d(nx2)
'''
n = pts_3d.shape[0]
pts_3d_extend = np.hstack((pts_3d, np.ones((n,1))))
# print(('pts_3d_extend shape: ', pts_3d_extend.shape))
pts_2d = np.dot(pts_3d_extend, np.transpose(P)) # nx3
pts_2d[:,0] /= pts_2d[:,2]
pts_2d[:,1] /= pts_2d[:,2]
return pts_2d[:,0:2]
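# Illustrative projection with a toy pinhole matrix (fx = fy = 100, principal
# point at (50, 50), no baseline): a point 2 m ahead on the optical axis
# should land on the principal point.
def _project_to_image_example():
    P = np.array([[100.0, 0.0, 50.0, 0.0],
                  [0.0, 100.0, 50.0, 0.0],
                  [0.0, 0.0, 1.0, 0.0]])
    pts = project_to_image(np.array([[0.0, 0.0, 2.0]]), P)
    assert np.allclose(pts, [[50.0, 50.0]])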
def draw_projected_box3d(image, qs, color=(255,255,255), thickness=4):
''' Draw 3d bounding box in image
qs: (8,2) array of vertices for the 3d box in following order:
1 -------- 0
/| /|
2 -------- 3 .
| | | |
. 5 -------- 4
|/ |/
6 -------- 7
'''
if qs is not None:
qs = qs.astype(np.int32)
for k in range(0,4):
i,j=k,(k+1)%4
image = cv2.line(image, (qs[i,0],qs[i,1]), (qs[j,0],qs[j,1]), color, thickness) # use LINE_AA for opencv3
i,j=k+4,(k+1)%4 + 4
image = cv2.line(image, (qs[i,0],qs[i,1]), (qs[j,0],qs[j,1]), color, thickness)
i,j=k,k+4
image = cv2.line(image, (qs[i,0],qs[i,1]), (qs[j,0],qs[j,1]), color, thickness)
return image
def roty(t):
''' Rotation about the y-axis. '''
c = np.cos(t)
s = np.sin(t)
return np.array([[c, 0, s],
[0, 1, 0],
[-s, 0, c]])
# Author: Xinshuo Weng
# email: xinshuo.weng@gmail.com
import numpy as np
# from sklearn.utils.linear_assignment_ import linear_assignment # deprecated
from scipy.optimize import linear_sum_assignment
from AB3DMOT_libs.bbox_utils import convert_3dbox_to_8corner, iou3d
from AB3DMOT_libs.kalman_filter import KalmanBoxTracker
def associate_detections_to_trackers(detections, trackers, iou_threshold=0.01):
"""
Assigns detections to tracked objects (both represented as bounding boxes)
detections: N x 8 x 3
trackers: M x 8 x 3
Returns 3 lists of matches, unmatched_detections and unmatched_trackers
"""
if (len(trackers) == 0):
return np.empty((0, 2), dtype=int), np.arange(len(detections)), np.empty((0, 8, 3), dtype=int)
iou_matrix = np.zeros((len(detections), len(trackers)), dtype=np.float32)
for d, det in enumerate(detections):
for t, trk in enumerate(trackers):
iou_matrix[d, t] = iou3d(det, trk)[0] # det: 8 x 3, trk: 8 x 3
# matched_indices = linear_assignment(-iou_matrix)
# Hungarian algorithm, compatible with linear_assignment from sklearn.utils
row_ind, col_ind = linear_sum_assignment(-iou_matrix)
matched_indices = np.stack((row_ind, col_ind), axis=1)
unmatched_detections = []
for d, det in enumerate(detections):
if (d not in matched_indices[:, 0]): unmatched_detections.append(d)
unmatched_trackers = []
for t, trk in enumerate(trackers):
if (t not in matched_indices[:, 1]): unmatched_trackers.append(t)
# filter out matched with low IOU
matches = []
for m in matched_indices:
if (iou_matrix[m[0], m[1]] < iou_threshold):
unmatched_detections.append(m[0])
unmatched_trackers.append(m[1])
else:
matches.append(m.reshape(1, 2))
if (len(matches) == 0):
matches = np.empty((0, 2), dtype=int)
else:
matches = np.concatenate(matches, axis=0)
return matches, np.array(unmatched_detections), np.array(unmatched_trackers)
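# Hedged sketch of the association step above: one detection and one identical
# track corner set should produce a single match and no unmatched entries.
def _associate_example():
    box = np.array([0.0, 0.0, 0.0, 0.0, 4.0, 1.8, 1.5])  # [x, y, z, theta, l, w, h]
    corners = convert_3dbox_to_8corner(box)
    matched, unmatched_dets, unmatched_trks = associate_detections_to_trackers(
        np.stack([corners]), np.stack([corners]))
    assert matched.shape == (1, 2) and len(unmatched_dets) == 0 and len(unmatched_trks) == 0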
class AB3DMOT(object): # A baseline of 3D multi-object tracking
# max_age keeps a track alive for up to max_age frames without a matching detection, effectively interpolating missed detections
def __init__(self, max_age=2, min_hits=3):
"""
Sets key parameters for SORT
"""
self.max_age = max_age
self.min_hits = min_hits
self.trackers = []
self.frame_count = 0
self.reorder = [3, 4, 5, 6, 2, 1, 0]
self.reorder_back = [6, 5, 4, 0, 1, 2, 3]
def update(self, dets_all):
"""
Params:
dets_all: dict
dets - a numpy array of detections in the format [[h,w,l,x,y,z,theta],...]
info - an array of other info for each detection
Requires: this method must be called once for each frame even with empty detections.
Returns a similar array, where the last column is the object ID.
NOTE: The number of objects returned may differ from the number of detections provided.
"""
dets, info = dets_all['dets'], dets_all['info'] # dets: N x 7, float numpy array
# reorder the data to put x,y,z in front to be compatible with the state transition matrix
# where the constant velocity model is defined in the first three rows of the matrix
dets = dets[:, self.reorder] # reorder the data to [[x,y,z,theta,l,w,h], ...]
self.frame_count += 1
trks = np.zeros((len(self.trackers), 7)) # N x 7 , # get predicted locations from existing trackers.
to_del = []
ret = []
for t, trk in enumerate(trks):
pos = self.trackers[t].predict().reshape((-1, 1))
trk[:] = [pos[0], pos[1], pos[2], pos[3], pos[4], pos[5], pos[6]]
if (np.any(np.isnan(pos))):
to_del.append(t)
trks = np.ma.compress_rows(np.ma.masked_invalid(trks))
for t in reversed(to_del):
self.trackers.pop(t)
dets_8corner = [convert_3dbox_to_8corner(det_tmp) for det_tmp in dets]
if len(dets_8corner) > 0:
dets_8corner = np.stack(dets_8corner, axis=0)
else:
dets_8corner = []
trks_8corner = [convert_3dbox_to_8corner(trk_tmp) for trk_tmp in trks]
if len(trks_8corner) > 0: trks_8corner = np.stack(trks_8corner, axis=0)
matched, unmatched_dets, unmatched_trks = associate_detections_to_trackers(dets_8corner, trks_8corner)
# update matched trackers with assigned detections
for t, trk in enumerate(self.trackers):
if t not in unmatched_trks:
d = matched[np.where(matched[:, 1] == t)[0], 0] # a list of index
ori_info = trk.info
cur_info = info[d, :][0]
if ori_info[-1] !='None':
cur_info[-1] = ori_info[-1]
if ori_info[-2] !='None':
cur_info[-2] = ori_info[-2]
if ori_info[-3] !='None':
cur_info[-3] = ori_info[-3]
trk.update(dets[d, :][0], cur_info)
else:
trk.info[0] = info[0][0]
# create and initialise new trackers for unmatched detections
for i in unmatched_dets: # a scalar of index
trk = KalmanBoxTracker(dets[i, :], info[i, :])
self.trackers.append(trk)
i = len(self.trackers)
for trk in reversed(self.trackers):
d = trk.get_state() # bbox location
d = d[self.reorder_back] # change format from [x,y,z,theta,l,w,h] to [h,w,l,x,y,z,theta]
orientation = trk.get_orientation()
if (trk.time_since_update < self.max_age) and (trk.hits >= self.min_hits or self.frame_count <= self.min_hits):
# if trk.time_since_update < self.max_age:
ret.append(
np.concatenate((d, [trk.id + 1], orientation, trk.info)).reshape(1, -1)) # +1 as MOT benchmark requires positive
i -= 1
# remove dead tracklet
if (trk.time_since_update >= self.max_age):
self.trackers.pop(i)
if (len(ret) > 0): return np.concatenate(ret) # h,w,l,x,y,z,theta, ID, other info, confidence
return np.empty((0, 24))
def predict(self, timestamp):
rets = []
if len(self.trackers) == 0:
return rets
pre_timestamp = int(self.trackers[0].info[0])
if (timestamp - pre_timestamp) > 450:
return rets
count = int(round((timestamp - pre_timestamp) / 100.0))
index = 1
while index < count:
self.frame_count += 1
trks = np.zeros((len(self.trackers), 7))
to_del = []
ret = []
for t, trk in enumerate(trks):
pos = self.trackers[t].predict().reshape((-1, 1))
trk[:] = [pos[0], pos[1], pos[2], pos[3], pos[4], pos[5], pos[6]]
if (np.any(np.isnan(pos))):
to_del.append(t)
for t in reversed(to_del):
self.trackers.pop(t)
for trk in reversed(self.trackers):
d = trk.get_state() # bbox location
d = d[self.reorder_back] # change format from [x,y,z,theta,l,w,h] to [h,w,l,x,y,z,theta]
orientation = trk.get_orientation()
trk.info[0] = pre_timestamp + 100*index
ret.append(
np.concatenate((d, [trk.id + 1], orientation, trk.info)).reshape(1, -1)) # +1 as MOT benchmark requires positive
rets.append(np.concatenate(ret))
index += 1
return rets
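# Minimal usage sketch for AB3DMOT (field layout taken from update() above;
# the info vector here is illustrative). Each frame, feed all detections at
# once and read back [h,w,l,x,y,z,theta,ID,vx,vy,vz,info...] rows for
# confirmed tracks.
def _ab3dmot_example():
    tracker = AB3DMOT(max_age=2, min_hits=1)
    dets = np.array([[1.5, 1.8, 4.0, 10.0, 5.0, -1.0, 0.3]])  # [h,w,l,x,y,z,theta]
    info = np.array([['0', '1', '0.9', 'car', 'None', 'white']], dtype=object)
    tracks = tracker.update({'dets': dets, 'info': info})
    assert tracks.shape[0] == 1  # one confirmed track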
glob2
pillow
scikit-learn
scikit-image
scikit-video
opencv-python==3.4.2.17
matplotlib
filterpy==1.4.5
numba==0.43.1
llvmlite==0.31.0
setuptools
rosdep
rosinstall_generator
wstool
rosinstall
six
vcstools
empy
glob2 pillow scikit-learn scikit-image scikit-video matplotlib filterpy==1.4.5 setuptools rosdep rosinstall_generator wstool rosinstall six vcstools empy
# Author: Xinshuo Weng
# email: xinshuo.weng@gmail.com
import yaml
from easydict import EasyDict as edict
def Config(filename):
    with open(filename, 'r') as listfile:
        cfg = edict(yaml.safe_load(listfile))
        listfile.seek(0)
        settings_show = listfile.read().splitlines()
    return cfg, settings_show
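# Usage sketch (hypothetical file name and key):
#   cfg, settings_show = Config('config/tracking.yaml')
#   print(cfg.max_age)                        # dot access via EasyDict, assuming the YAML defines max_age
#   for line in settings_show: print(line)    # raw lines, e.g. for logging the run settings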
#!/usr/bin/env python
# encoding: utf-8
import os
import rospy
import time
import math
import numpy as np
from tracking.msg import det_tracking, det_tracking_array
from sensor_msgs.msg import PointCloud2
from jsk_recognition_msgs.msg import BoundingBoxArray, BoundingBox
from visualization_msgs.msg import MarkerArray, Marker
from AB3DMOT_libs.model import AB3DMOT
from pyquaternion import Quaternion
from utils.demo_ww import get_loc
def color_name2RGB(color_name):
if color_name == 'red':
return 1.0,0,0
elif color_name == 'orange':
return 1.0,165.0/255,0
elif color_name == 'yellow':
return 1.0,1.0,0
elif color_name == 'green':
return 0,1.0,0
elif color_name == 'cyan':
return 0,1.0,1.0
elif color_name == 'blue':
return 0,0,1.0
elif color_name == 'violet':
return 238.0/255,130.0/255,238.0/255
elif color_name == 'black':
return 0,0,0
elif color_name == 'grey':
return 190.0/255,190.0/255,190.0/255
elif color_name == 'white':
return 1.0,1.0,1.0
else:
return 1.0,1.0,1.0
def rotz(t):
"""Rotation about the z-axis."""
c = np.cos(t)
s = np.sin(t)
return np.array([[c, -s, 0],
[s, c, 0],
[0, 0, 1]])
def loc2backwheel(msg, deal_type=0, bw_distance=3.6):
hypotenuse = np.sqrt(msg.v_x ** 2 + msg.v_y ** 2)
if msg.type != deal_type or hypotenuse <= 0:
return msg.loc_x, msg.loc_y, msg.loc_z
a = -msg.v_x*bw_distance / hypotenuse
b = -msg.v_y*bw_distance / hypotenuse
return msg.loc_x + a, msg.loc_y + b, msg.loc_z
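# Worked example for loc2backwheel (illustrative numbers): for a type-0 object
# with v_x = 2.0, v_y = 0.0 and bw_distance = 3.6, hypotenuse = 2.0,
# a = -2.0 * 3.6 / 2.0 = -3.6 and b = 0.0, so the reference point moves 3.6 m
# backwards along the heading: (loc_x, loc_y, loc_z) -> (loc_x - 3.6, loc_y, loc_z).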
def callback(msgs):
log_file = open(log_file_path, 'a')
cloud_pub = rospy.Publisher("tracking_cloud", PointCloud2, queue_size=10)
cloud_pub.publish(msgs.cloud)
if len(msgs.array) == 0:
bbox_arr = BoundingBoxArray()
bbox_arr.header.frame_id = "Pandar64"
bbox_pub = rospy.Publisher("tracking_bbox", BoundingBoxArray, queue_size=10)
bbox_pub.publish(bbox_arr)
marker_bbox_arr = MarkerArray()
marker_bbox_pub = rospy.Publisher("marker_tracking_bbox", MarkerArray, queue_size=10)
marker_bbox_pub.publish(marker_bbox_arr)
marker_arr = MarkerArray()
marker_pub = rospy.Publisher("tracking_loc", MarkerArray, queue_size=10)
marker_pub.publish(marker_arr)
return
dets = []
infos = []
for msg in msgs.array:
det = [msg.h, msg.w, msg.l, msg.x, msg.y, msg.z, msg.rot_y]
info = [msg.frame, msg.type, msg.score, msg.name, msg.license_plate_number, msg.color_name]
dets.append(det)
infos.append(info)
dets_all = {'dets': np.asarray(dets), 'info': np.asarray(infos)}
trackers = mot_tracker.update(dets_all)
bbox_arr = BoundingBoxArray()
bbox_arr.header.frame_id = "Pandar64"
marker_bbox_arr = MarkerArray()
bbox_i = 1
marker_arr = MarkerArray()
i = 1
for tracker in trackers:
msg = det_tracking()
msg.h = float(tracker[0])
msg.w = float(tracker[1])
msg.l = float(tracker[2])
msg.x = float(tracker[3])
msg.y = float(tracker[4])
msg.z = float(tracker[5])
msg.rot_y = float(tracker[6])
msg.obj_id = int(tracker[7])
msg.v_x = float(tracker[8])
msg.v_y = float(tracker[9])
msg.v_z = float(tracker[10])
msg.frame = int(tracker[11])
msg.type = int(tracker[12])
msg.score = float(tracker[13])
msg.name = str(tracker[14])
msg.license_plate_number = str(tracker[15])
msg.color_name = str(tracker[16])
lidar_loc, out_BL = get_loc([msg.x, msg.y, msg.z, msg.l, msg.w, msg.h, msg.rot_y], msg.type)
msg.loc_x = lidar_loc[0]
msg.loc_y = lidar_loc[1]
msg.loc_z = lidar_loc[2]
bbox = BoundingBox()
bbox.label = msg.obj_id
bbox.value = 2
bbox.header.frame_id = "Pandar64"
bbox.dimensions.x = msg.l
bbox.dimensions.y = msg.w
bbox.dimensions.z = msg.h
bbox.pose.position.x = msg.x
bbox.pose.position.y = msg.y
bbox.pose.position.z = msg.z
R = rotz(msg.rot_y)
R_quat= list(Quaternion(matrix=R))
bbox.pose.orientation.x = R_quat[1]
bbox.pose.orientation.y = R_quat[2]
bbox.pose.orientation.z = R_quat[3]
bbox.pose.orientation.w = R_quat[0]
bbox_arr.boxes.append(bbox)
marker_bbox = Marker()
marker_bbox.id = bbox_i
marker_bbox.header.frame_id = "Pandar64"
marker_bbox.action = Marker.ADD
marker_bbox.type = Marker.CUBE
marker_bbox.pose.position.x = msg.x
marker_bbox.pose.position.y = msg.y
marker_bbox.pose.position.z = msg.z
R = rotz(msg.rot_y)
R_quat= list(Quaternion(matrix=R))
marker_bbox.pose.orientation.x = R_quat[1]
marker_bbox.pose.orientation.y = R_quat[2]
marker_bbox.pose.orientation.z = R_quat[3]
marker_bbox.pose.orientation.w = R_quat[0]
marker_bbox.scale.x = msg.l
marker_bbox.scale.y = msg.w
marker_bbox.scale.z = msg.h
RGB = color_name2RGB(msg.color_name)
marker_bbox.color.a = 0.8
marker_bbox.color.r = RGB[0]
marker_bbox.color.g = RGB[1]
marker_bbox.color.b = RGB[2]
marker_bbox.lifetime = rospy.Duration(0.5)
marker_bbox_arr.markers.append(marker_bbox)
bbox_i += 1
if msg.type != 4 and msg.loc_x < 30 and msg.loc_y < -2 and (msg.loc_x < 28 or msg.loc_y < -7):
marker = Marker()
marker.id = i
marker.header.frame_id = "Pandar64"
marker.action = Marker.ADD
marker.type = Marker.SPHERE
marker.pose.position.x = msg.loc_x
marker.pose.position.y = msg.loc_y
marker.pose.position.z = msg.z + msg.h / 2
marker.pose.orientation.x = 0.0
marker.pose.orientation.y = 0.0
marker.pose.orientation.z = 0.0
marker.pose.orientation.w = 1.0
marker.scale.x = 0.5
marker.scale.y = 0.5
marker.scale.z = 0.5
marker.color.a = 1.0
marker.color.r = 1.0
marker.color.g = 1.0
marker.color.b = 1.0
marker.lifetime = rospy.Duration(0.5)
marker_arr.markers.append(marker)
i += 1
t_marker = Marker()
t_marker.id = i
t_marker.header.frame_id = "Pandar64"
t_marker.action = Marker.ADD
t_marker.type = Marker.TEXT_VIEW_FACING
t_marker.pose.position.x = msg.x
t_marker.pose.position.y = msg.y
t_marker.pose.position.z = msg.z + msg.h / 2 + 0.3
t_marker.pose.orientation.x = 0.0
t_marker.pose.orientation.y = 0.0
t_marker.pose.orientation.z = 0.0
t_marker.pose.orientation.w = 1.0
t_marker.scale.x = 1.2
t_marker.scale.y = 1.2
t_marker.scale.z = 1.2
t_marker.color.a = 1.0
t_marker.color.r = 0
t_marker.color.g = 1.0
t_marker.color.b = 0
t_marker.lifetime = rospy.Duration(0.5)
t_marker.text = 'id:{0},(x,y):({1:.2f},{2:.2f}),v:{3:.2f}m/s\ntype:{4},No:{6}'.format(msg.obj_id, msg.x, msg.y, math.sqrt(msg.v_x*msg.v_x+msg.v_y*msg.v_y), msg.name, msg.color_name, msg.license_plate_number)
marker_arr.markers.append(t_marker)
i += 1
log_file.write('%u,%u,%f,%f,%f,%f,%f,%f,%f,%f,%d,%f,%f,%f,%f,%f,%f,%f,%f\n' % (
msg.frame, msg.type, msg.score, msg.h, msg.w, msg.l, msg.x, msg.y, msg.z, msg.rot_y,
msg.obj_id, msg.v_x, msg.v_y, msg.v_z,msg.loc_x, msg.loc_y, msg.loc_z, out_BL[0], out_BL[1]))
log_file.close()
bbox_pub = rospy.Publisher("tracking_bbox", BoundingBoxArray, queue_size=10)
bbox_pub.publish(bbox_arr)
marker_bbox_pub = rospy.Publisher("marker_tracking_bbox", MarkerArray, queue_size=10)
marker_bbox_pub.publish(marker_bbox_arr)
marker_pub = rospy.Publisher("tracking_loc", MarkerArray, queue_size=10)
marker_pub.publish(marker_arr)
def listener():
rospy.init_node('lidar_tracking_listener', anonymous=True)
rospy.Subscriber("/detection/lidar_detector/objects", det_tracking_array, callback)
rospy.spin()
if __name__ == '__main__':
global mot_tracker, log_file_path
mot_tracker = AB3DMOT(max_age=3, min_hits=1)
cur_time = time.strftime('%Y%m%d_%H:%M:%S', time.localtime(time.time()))
log_file_path = 'src/tracking/logs/' + cur_time + '.txt'
if not os.path.exists('src/tracking/logs/'):
os.mkdir('src/tracking/logs/')
with open(log_file_path, 'w') as f:
f.write("frame,type,score,h,w,l,x,y,z,rot_y,obj_id,v_x,v_y,v_z,loc_x,loc_y,loc_z,Lat,Long\n")
listener()
#!/usr/bin/env python
# encoding: utf-8
import os
import rospy
import time
import math
import numpy as np
from tracking.msg import det_tracking, det_tracking_array
from sensor_msgs.msg import PointCloud2
from jsk_recognition_msgs.msg import BoundingBoxArray, BoundingBox
from visualization_msgs.msg import MarkerArray, Marker
from AB3DMOT_libs.model import AB3DMOT
from pyquaternion import Quaternion
from utils.demo_ww import get_loc
def color_name2RGB(color_name):
if color_name == 'red':
return 1.0,0,0
elif color_name == 'orange':
return 1.0,165.0/255,0
elif color_name == 'yellow':
return 1.0,1.0,0
elif color_name == 'green':
return 0,1.0,0
elif color_name == 'cyan':
return 0,1.0,1.0
elif color_name == 'blue':
return 0,0,1.0
elif color_name == 'violet':
return 238.0/255,130.0/255,238.0/255
elif color_name == 'black':
return 0,0,0
elif color_name == 'grey':
return 190.0/255,190.0/255,190.0/255
elif color_name == 'white':
return 1.0,1.0,1.0
else:
return 1.0,1.0,1.0
def rotz(t):
"""Rotation about the z-axis."""
c = np.cos(t)
s = np.sin(t)
return np.array([[c, -s, 0],
[s, c, 0],
[0, 0, 1]])
def loc2backwheel(msg, deal_type=0, bw_distance=3.6):
hypotenuse = np.sqrt(msg.v_x ** 2 + msg.v_y ** 2)
if msg.type != deal_type or hypotenuse <= 0:
return msg.loc_x, msg.loc_y, msg.loc_z
a = -msg.v_x*bw_distance / hypotenuse
b = -msg.v_y*bw_distance / hypotenuse
return msg.loc_x + a, msg.loc_y + b, msg.loc_z
def callback(msgs):
log_file = open(log_file_path, 'a')
if save_ori_info:
ori_log_file = open(ori_log_file_path, 'a')
cloud_pub = rospy.Publisher("tracking_cloud", PointCloud2, queue_size=10)
cloud_pub.publish(msgs.cloud)
if len(msgs.array) == 0:
bbox_arr = BoundingBoxArray()
bbox_arr.header.frame_id = "Pandar64"
bbox_pub = rospy.Publisher("tracking_bbox", BoundingBoxArray, queue_size=10)
bbox_pub.publish(bbox_arr)
marker_bbox_arr = MarkerArray()
marker_bbox_pub = rospy.Publisher("marker_tracking_bbox", MarkerArray, queue_size=10)
marker_bbox_pub.publish(marker_bbox_arr)
marker_arr = MarkerArray()
marker_pub = rospy.Publisher("tracking_loc", MarkerArray, queue_size=10)
marker_pub.publish(marker_arr)
return
timestamp = msgs.array[0].frame
multi_trackers = mot_tracker.predict(timestamp)
if len(multi_trackers) > 0:
for trackers in multi_trackers:
for tracker in trackers:
msg = det_tracking()
msg.h = float(tracker[0])
msg.w = float(tracker[1])
msg.l = float(tracker[2])
msg.x = float(tracker[3])
msg.y = float(tracker[4])
msg.z = float(tracker[5])
msg.rot_y = float(tracker[6])
msg.obj_id = int(tracker[7])
msg.v_x = float(tracker[8])
msg.v_y = float(tracker[9])
msg.v_z = float(tracker[10])
msg.frame = int(tracker[11])
msg.type = int(tracker[12])
msg.score = float(tracker[13])
msg.name = str(tracker[14])
msg.license_plate_number = str(tracker[15])
msg.color_name = str(tracker[16])
lidar_loc, out_BL, out_center_BL = get_loc([msg.x, msg.y, msg.z, msg.l, msg.w, msg.h, msg.rot_y], msg.type)
msg.loc_x = lidar_loc[0]
msg.loc_y = lidar_loc[1]
msg.loc_z = lidar_loc[2]
log_file.write('%u,%u,%f,%f,%f,%f,%f,%f,%f,%f,%d,%f,%f,%f,%f,%f,%f,%.8f,%.8f,%.8f,%.8f,%s,%s,%s\n' % (
msg.frame, msg.type, msg.score, msg.h, msg.w, msg.l, msg.x, msg.y, msg.z, msg.rot_y,
msg.obj_id, msg.v_x, msg.v_y, msg.v_z,msg.loc_x, msg.loc_y, msg.loc_z, out_BL[0], out_BL[1], out_center_BL[0], out_center_BL[1], msg.name, msg.license_plate_number, msg.color_name))
dets = []
infos = []
for msg in msgs.array:
det = [msg.h, msg.w, msg.l, msg.x, msg.y, msg.z, msg.rot_y]
info = [msg.frame, msg.type, msg.score, msg.h, msg.w, msg.l, msg.x, msg.y, msg.z, msg.rot_y, msg.name, msg.license_plate_number, msg.color_name]
dets.append(det)
infos.append(info)
dets_all = {'dets': np.asarray(dets), 'info': np.asarray(infos)}
trackers = mot_tracker.update(dets_all)
bbox_arr = BoundingBoxArray()
bbox_arr.header.frame_id = "Pandar64"
marker_bbox_arr = MarkerArray()
bbox_i = 1
marker_arr = MarkerArray()
i = 1
for tracker in trackers:
msg = det_tracking()
msg.h = float(tracker[0])
msg.w = float(tracker[1])
msg.l = float(tracker[2])
msg.x = float(tracker[3])
msg.y = float(tracker[4])
msg.z = float(tracker[5])
msg.rot_y = float(tracker[6])
msg.obj_id = int(tracker[7])
msg.v_x = float(tracker[8])
msg.v_y = float(tracker[9])
msg.v_z = float(tracker[10])
msg.frame = int(tracker[11])
msg.type = int(tracker[12])
msg.score = float(tracker[13])
ori_h = float(tracker[14])
ori_w = float(tracker[15])
ori_l = float(tracker[16])
ori_x = float(tracker[17])
ori_y = float(tracker[18])
ori_z = float(tracker[19])
ori_rot_y = float(tracker[20])
msg.name = str(tracker[21])
msg.license_plate_number = str(tracker[22])
msg.color_name = str(tracker[23])
lidar_loc, out_BL, out_center_BL = get_loc([msg.x, msg.y, msg.z, msg.l, msg.w, msg.h, msg.rot_y], msg.type)
msg.loc_x = lidar_loc[0]
msg.loc_y = lidar_loc[1]
msg.loc_z = lidar_loc[2]
ori_lidar_loc, ori_out_BL, ori_out_center_BL = get_loc([ori_x, ori_y, ori_z, ori_l, ori_w, ori_h, ori_rot_y], msg.type)
ori_loc_x = ori_lidar_loc[0]
ori_loc_y = ori_lidar_loc[1]
ori_loc_z = ori_lidar_loc[2]
bbox = BoundingBox()
bbox.label = msg.obj_id
bbox.value = 2
bbox.header.frame_id = "Pandar64"
bbox.dimensions.x = msg.l
bbox.dimensions.y = msg.w
bbox.dimensions.z = msg.h
bbox.pose.position.x = msg.x
bbox.pose.position.y = msg.y
bbox.pose.position.z = msg.z
R = rotz(msg.rot_y)
R_quat= list(Quaternion(matrix=R))
bbox.pose.orientation.x = R_quat[1]
bbox.pose.orientation.y = R_quat[2]
bbox.pose.orientation.z = R_quat[3]
bbox.pose.orientation.w = R_quat[0]
bbox_arr.boxes.append(bbox)
marker_bbox = Marker()
marker_bbox.id = bbox_i
marker_bbox.header.frame_id = "Pandar64"
marker_bbox.action = Marker.ADD
marker_bbox.type = Marker.CUBE
marker_bbox.pose.position.x = msg.x
marker_bbox.pose.position.y = msg.y
marker_bbox.pose.position.z = msg.z
R = rotz(msg.rot_y)
R_quat= list(Quaternion(matrix=R))
marker_bbox.pose.orientation.x = R_quat[1]
marker_bbox.pose.orientation.y = R_quat[2]
marker_bbox.pose.orientation.z = R_quat[3]
marker_bbox.pose.orientation.w = R_quat[0]
marker_bbox.scale.x = msg.l
marker_bbox.scale.y = msg.w
marker_bbox.scale.z = msg.h
RGB = color_name2RGB(msg.color_name)
marker_bbox.color.a = 0.8
marker_bbox.color.r = RGB[0]
marker_bbox.color.g = RGB[1]
marker_bbox.color.b = RGB[2]
marker_bbox.lifetime = rospy.Duration(0.5)
marker_bbox_arr.markers.append(marker_bbox)
bbox_i += 1
if msg.type != 4:
marker = Marker()
marker.id = i
marker.header.frame_id = "Pandar64"
marker.action = Marker.ADD
marker.type = Marker.SPHERE
marker.pose.position.x = msg.loc_x
marker.pose.position.y = msg.loc_y
marker.pose.position.z = msg.z + msg.h / 2
marker.pose.orientation.x = 0.0
marker.pose.orientation.y = 0.0
marker.pose.orientation.z = 0.0
marker.pose.orientation.w = 1.0
marker.scale.x = 0.5
marker.scale.y = 0.5
marker.scale.z = 0.5
marker.color.a = 1.0
marker.color.r = 1.0
marker.color.g = 1.0
marker.color.b = 1.0
marker.lifetime = rospy.Duration(0.5)
marker_arr.markers.append(marker)
i += 1
t_marker = Marker()
t_marker.id = i
t_marker.header.frame_id = "Pandar64"
t_marker.action = Marker.ADD
t_marker.type = Marker.TEXT_VIEW_FACING
t_marker.pose.position.x = msg.x
t_marker.pose.position.y = msg.y
t_marker.pose.position.z = msg.z + msg.h / 2 + 0.3
t_marker.pose.orientation.x = 0.0
t_marker.pose.orientation.y = 0.0
t_marker.pose.orientation.z = 0.0
t_marker.pose.orientation.w = 1.0
t_marker.scale.x = 1.2
t_marker.scale.y = 1.2
t_marker.scale.z = 1.2
t_marker.color.a = 1.0
t_marker.color.r = 0
t_marker.color.g = 1.0
t_marker.color.b = 0
t_marker.lifetime = rospy.Duration(0.5)
t_marker.text = 'id:{0},type:{1},v:{2:.2f}m/s,No:{3}'.format(msg.obj_id, msg.name, math.sqrt(msg.v_x*msg.v_x+msg.v_y*msg.v_y)*10, msg.license_plate_number)
marker_arr.markers.append(t_marker)
i += 1
log_file.write('%u,%u,%f,%f,%f,%f,%f,%f,%f,%f,%d,%f,%f,%f,%f,%f,%f,%.8f,%.8f,%.8f,%.8f,%s,%s,%s\n' % (
msg.frame, msg.type, msg.score, msg.h, msg.w, msg.l, msg.x, msg.y, msg.z, msg.rot_y,
msg.obj_id, msg.v_x, msg.v_y, msg.v_z,msg.loc_x, msg.loc_y, msg.loc_z, out_BL[0], out_BL[1], out_center_BL[0], out_center_BL[1], msg.name, msg.license_plate_number, msg.color_name))
if save_ori_info:
ori_log_file.write('%u,%u,%f,%f,%f,%f,%f,%f,%f,%f,%d,%f,%f,%f,%f,%f,%f,%.8f,%.8f,%.8f,%.8f\n' % (
msg.frame, msg.type, msg.score, ori_h, ori_w, ori_l, ori_x, ori_y, ori_z, ori_rot_y,
msg.obj_id, msg.v_x, msg.v_y, msg.v_z,ori_loc_x, ori_loc_y, ori_loc_z, ori_out_BL[0], ori_out_BL[1], ori_out_center_BL[0], ori_out_center_BL[1]))
log_file.close()
if save_ori_info:
ori_log_file.close()
bbox_pub = rospy.Publisher("tracking_bbox", BoundingBoxArray, queue_size=10)
bbox_pub.publish(bbox_arr)
marker_bbox_pub = rospy.Publisher("marker_tracking_bbox", MarkerArray, queue_size=10)
marker_bbox_pub.publish(marker_bbox_arr)
marker_pub = rospy.Publisher("tracking_loc", MarkerArray, queue_size=10)
marker_pub.publish(marker_arr)
def listener():
rospy.init_node('lidar_tracking_listener', anonymous=True)
rospy.Subscriber("/detection/lidar_detector/objects", det_tracking_array, callback)
rospy.spin()
if __name__ == '__main__':
global mot_tracker, log_file_path, save_ori_info
save_ori_info = True
mot_tracker = AB3DMOT(max_age=3, min_hits=3)
cur_time = time.strftime('%Y%m%d_%H:%M:%S', time.localtime(time.time()))
log_file_path = 'src/tracking/logs/' + cur_time + '.txt'
if not os.path.exists('src/tracking/logs/'):
os.mkdir('src/tracking/logs/')
with open(log_file_path, 'w') as f:
f.write("frame,type,score,h,w,l,x,y,z,rot_y,obj_id,v_x,v_y,v_z,loc_x,loc_y,loc_z,Lat,Long,center_Lat,center_Long,name,license_plate_number,color_name\n")
if save_ori_info:
ori_log_file_path = 'src/tracking/logs/' + cur_time + '_ori.txt'
with open(ori_log_file_path, 'w') as f:
f.write("frame,type,score,ori_h,ori_w,ori_l,ori_x,ori_y,ori_z,ori_rot_y,obj_id,v_x,v_y,v_z,ori_loc_x,ori_loc_y,ori_loc_z,ori_Lat,ori_Long,ori_center_Lat,ori_center_Long\n")
listener()
#!/usr/bin/env python
# encoding: utf-8
import os
import rospy
import time
import math
import numpy as np
from tracking.msg import det_tracking, det_tracking_array
from sensor_msgs.msg import PointCloud2
from jsk_recognition_msgs.msg import BoundingBoxArray, BoundingBox
from visualization_msgs.msg import MarkerArray, Marker
from AB3DMOT_libs.model import AB3DMOT
from pyquaternion import Quaternion
from utils.demo_ww import get_loc
def color_name2RGB(color_name):
if color_name == 'red':
return 1.0,0,0
elif color_name == 'orange':
return 1.0,165.0/255,0
elif color_name == 'yellow':
return 1.0,1.0,0
elif color_name == 'green':
return 0,1.0,0
elif color_name == 'cyan':
return 0,1.0,1.0
elif color_name == 'blue':
return 0,0,1.0
elif color_name == 'violet':
return 238.0/255,130.0/255,238.0/255
elif color_name == 'black':
return 0,0,0
elif color_name == 'grey':
return 190.0/255,190.0/255,190.0/255
elif color_name == 'white':
return 1.0,1.0,1.0
else:
return 1.0,1.0,1.0
def rotz(t):
"""Rotation about the z-axis."""
c = np.cos(t)
s = np.sin(t)
return np.array([[c, -s, 0],
[s, c, 0],
[0, 0, 1]])
def loc2backwheel(msg, deal_type=0, bw_distance=3.6):
hypotenuse = np.sqrt(msg.v_x ** 2 + msg.v_y ** 2)
if msg.type != deal_type or hypotenuse <= 0:
return msg.loc_x, msg.loc_y, msg.loc_z
a = -msg.v_x*bw_distance / hypotenuse
b = -msg.v_y*bw_distance / hypotenuse
return msg.loc_x + a, msg.loc_y + b, msg.loc_z
def callback(msgs):
cloud_pub = rospy.Publisher("tracking_cloud", PointCloud2, queue_size=10)
cloud_pub.publish(msgs.cloud)
if len(msgs.array) == 0:
bbox_arr = BoundingBoxArray()
bbox_arr.header.frame_id = "Pandar64"
bbox_pub = rospy.Publisher("tracking_bbox", BoundingBoxArray, queue_size=10)
bbox_pub.publish(bbox_arr)
marker_bbox_arr = MarkerArray()
marker_bbox_pub = rospy.Publisher("marker_tracking_bbox", MarkerArray, queue_size=10)
marker_bbox_pub.publish(marker_bbox_arr)
marker_arr = MarkerArray()
marker_pub = rospy.Publisher("tracking_loc", MarkerArray, queue_size=10)
marker_pub.publish(marker_arr)
return
bbox_arr = BoundingBoxArray()
bbox_arr.header.frame_id = "Pandar64"
marker_bbox_arr = MarkerArray()
bbox_i = 1
marker_arr = MarkerArray()
i = 1
for msg in msgs.array:
bbox = BoundingBox()
bbox.label = msg.obj_id
bbox.value = 2
bbox.header.frame_id = "Pandar64"
bbox.dimensions.x = msg.l
bbox.dimensions.y = msg.w
bbox.dimensions.z = msg.h
bbox.pose.position.x = msg.x
bbox.pose.position.y = msg.y
bbox.pose.position.z = msg.z
R = rotz(msg.rot_y)
R_quat = list(Quaternion(matrix=R))
bbox.pose.orientation.x = R_quat[1]
bbox.pose.orientation.y = R_quat[2]
bbox.pose.orientation.z = R_quat[3]
bbox.pose.orientation.w = R_quat[0]
bbox_arr.boxes.append(bbox)
marker_bbox = Marker()
marker_bbox.id = bbox_i
marker_bbox.header.frame_id = "Pandar64"
marker_bbox.action = Marker.ADD
marker_bbox.type = Marker.CUBE
marker_bbox.pose.position.x = msg.x
marker_bbox.pose.position.y = msg.y
marker_bbox.pose.position.z = msg.z
R = rotz(msg.rot_y)
R_quat = list(Quaternion(matrix=R))
marker_bbox.pose.orientation.x = R_quat[1]
marker_bbox.pose.orientation.y = R_quat[2]
marker_bbox.pose.orientation.z = R_quat[3]
marker_bbox.pose.orientation.w = R_quat[0]
marker_bbox.scale.x = msg.l
marker_bbox.scale.y = msg.w
marker_bbox.scale.z = msg.h
RGB = color_name2RGB(msg.color_name)
marker_bbox.color.a = 0.8
marker_bbox.color.r = RGB[0]
marker_bbox.color.g = RGB[1]
marker_bbox.color.b = RGB[2]
marker_bbox.lifetime = rospy.Duration(0.5)
marker_bbox_arr.markers.append(marker_bbox)
bbox_i += 1
if msg.type != 4:
marker = Marker()
marker.id = i
marker.header.frame_id = "Pandar64"
marker.action = Marker.ADD
marker.type = Marker.SPHERE
marker.pose.position.x = msg.loc_x
marker.pose.position.y = msg.loc_y
marker.pose.position.z = msg.z + msg.h / 2
marker.pose.orientation.x = 0.0
marker.pose.orientation.y = 0.0
marker.pose.orientation.z = 0.0
marker.pose.orientation.w = 1.0
marker.scale.x = 0.5
marker.scale.y = 0.5
marker.scale.z = 0.5
marker.color.a = 1.0
marker.color.r = 1.0
marker.color.g = 1.0
marker.color.b = 1.0
marker.lifetime = rospy.Duration(0.5)
marker_arr.markers.append(marker)
i += 1
t_marker = Marker()
t_marker.id = i
t_marker.header.frame_id = "Pandar64"
t_marker.action = Marker.ADD
t_marker.type = Marker.TEXT_VIEW_FACING
t_marker.pose.position.x = msg.x
t_marker.pose.position.y = msg.y
t_marker.pose.position.z = msg.z + msg.h / 2 + 0.3
t_marker.pose.orientation.x = 0.0
t_marker.pose.orientation.y = 0.0
t_marker.pose.orientation.z = 0.0
t_marker.pose.orientation.w = 1.0
t_marker.scale.x = 1.2
t_marker.scale.y = 1.2
t_marker.scale.z = 1.2
t_marker.color.a = 1.0
t_marker.color.r = 0
t_marker.color.g = 1.0
t_marker.color.b = 0
t_marker.lifetime = rospy.Duration(0.5)
#t_marker.text = 'id:{0},(x,y):({1:.2f},{2:.2f}),v:{3:.2f}m/s\ntype:{4},No:{6},timestamp:{7}'.format(msg.obj_id, msg.x, msg.y, math.sqrt(msg.v_x*msg.v_x+msg.v_y*msg.v_y)*10, msg.name, msg.color_name, msg.license_plate_number,msg.frame)
t_marker.text = 'rot_y: {0}'.format(msg.rot_y)
#t_marker.text = 'vx: {0}, vy: {1}'.format(msg.v_x, msg.v_y)
marker_arr.markers.append(t_marker)
i += 1
bbox_pub = rospy.Publisher("tracking_bbox", BoundingBoxArray, queue_size=10)
bbox_pub.publish(bbox_arr)
marker_bbox_pub = rospy.Publisher("marker_tracking_bbox", MarkerArray, queue_size=10)
marker_bbox_pub.publish(marker_bbox_arr)
marker_pub = rospy.Publisher("tracking_loc", MarkerArray, queue_size=10)
marker_pub.publish(marker_arr)
def listener():
rospy.init_node('lidar_tracking_listener', anonymous=True)
rospy.Subscriber("/detection/lidar_detector/objects", det_tracking_array, callback)
rospy.spin()
if __name__ == '__main__':
global mot_tracker
mot_tracker = AB3DMOT(max_age=5, min_hits=1)
cur_time = time.strftime('%Y%m%d_%H:%M:%S', time.localtime(time.time()))
listener()
#!/usr/bin/env python
# license removed for brevity
import os
import rospy
from sensor_msgs.msg import PointCloud2, PointField
import numpy as np
from tracking.msg import det_tracking, det_tracking_array
def talker(cloud_folder, seq_file):
pub = rospy.Publisher('/detection/lidar_detector/objects', det_tracking_array, queue_size=10)
rospy.init_node('lidar_detector_talker', anonymous=True)
rate = rospy.Rate(10)
seq_dets = np.loadtxt(seq_file, delimiter=',', dtype=np.str_)
if len(seq_dets.shape) == 1:
seq_dets = np.expand_dims(seq_dets, axis=0)
if seq_dets.shape[1] == 0:
return
min_frame, max_frame = int(seq_dets[:, 0].astype(np.int32).min()), int(seq_dets[:, 0].astype(np.int32).max())
frame = 2222 # min_frame
cloud_file_list = os.listdir(cloud_folder)
cloud_file_list.sort(key=lambda x: int(x.split('_')[0]))
while frame <= max_frame and not rospy.is_shutdown():
# cloud_file_name = '{}.bin'.format(int(frame))
cloud_file_name = cloud_file_list[int(frame)]
# points = np.genfromtxt(os.path.join(cloud_folder, cloud_file_name), skip_header=11, delimiter=' ', dtype=np.float32).reshape(-1, 3)
points = np.fromfile(os.path.join(cloud_folder, cloud_file_name), dtype=np.float32, count=-1).reshape([-1, 4])
points = points[:, :3]
cloud = PointCloud2()
cloud.header.stamp = rospy.Time.now()
cloud.header.frame_id = "Pandar64"
if len(points.shape) == 3:
cloud.height = points.shape[1]
cloud.width = points.shape[0]
else:
cloud.height = 1
cloud.width = len(points)
cloud.fields = [
PointField('x', 0, PointField.FLOAT32, 1),
PointField('y', 4, PointField.FLOAT32, 1),
PointField('z', 8, PointField.FLOAT32, 1)]
cloud.is_bigendian = False
cloud.point_step = 12
cloud.row_step = cloud.point_step * points.shape[0]
# msg.is_dense = int(np.isfinite(points).all())
cloud.is_dense = False
cloud.data = np.asarray(points, np.float32).tobytes()
cur_frame_seq_dets = seq_dets[seq_dets[:, 0].astype(np.uint64) == frame]
msgs = det_tracking_array()
msgs.cloud = cloud
for seq_det in cur_frame_seq_dets:
msg = det_tracking()
msg.frame = int(int(seq_det[17])/1000000.0)
msg.type = int(seq_det[1])
msg.score = float(seq_det[6])
msg.h = float(seq_det[7])
msg.w = float(seq_det[8])
msg.l = float(seq_det[9])
msg.x = float(seq_det[10])
msg.y = float(seq_det[11])
msg.z = float(seq_det[12])
msg.rot_y = float(seq_det[13])
msg.loc_x = float(seq_det[14])
msg.loc_y = float(seq_det[15])
msg.loc_z = float(seq_det[16])
msg.name = str(seq_det[20])
msg.license_plate_number = str(seq_det[22])
msg.color_name = str(seq_det[21])
msgs.array.append(msg)
msgs_info = "cloud: {}, msgs: {}".format(cloud_file_name, msgs.array)
rospy.loginfo(msgs_info)
pub.publish(msgs)
rate.sleep()
frame += 1
if __name__ == '__main__':
seq_file = '/home/shishuai/Develop/Projects/catkin_ws/src/tracking/scripts/example/hs/2021-03-02-16-23-24_with_vision_results.log'
cloud_folder = '/home/shishuai/Develop/Projects/catkin_ws/src/tracking/scripts/example/hs/2021-03-02-16-23-24'
try:
talker(cloud_folder, seq_file)
except rospy.ROSInterruptException:
pass
#!/usr/bin/env python
# license removed for brevity
import os
import time
import rospy
from sensor_msgs.msg import PointCloud2, PointField
import numpy as np
from tracking.msg import det_tracking, det_tracking_array
def talker(seq_file):
pub = rospy.Publisher('/detection/lidar_detector/objects', det_tracking_array, queue_size=10)
rospy.init_node('lidar_detector_talker', anonymous=True)
rate = rospy.Rate(10)
seq_dets = np.loadtxt(seq_file, delimiter=',', skiprows=1, dtype=np.str_)
if len(seq_dets.shape) == 1:
seq_dets = np.expand_dims(seq_dets, axis=0)
if seq_dets.shape[1] == 0:
return
frame_id = 0
frames = np.unique(seq_dets[:, 0].astype(np.uint64))
while frame_id < len(frames) and not rospy.is_shutdown():
frame = frames[frame_id]
# if frame < 1618475108934:
# frame_id += 1
# continue
cur_frame_seq_dets = seq_dets[seq_dets[:, 0].astype(np.uint64) == frame]
msgs = det_tracking_array()
for seq_det in cur_frame_seq_dets:
msg = det_tracking()
msg.frame = int(seq_det[0])
msg.type = int(seq_det[1])
msg.score = float(seq_det[2])
msg.h = float(seq_det[3])
msg.w = float(seq_det[4])
msg.l = float(seq_det[5])
msg.x = float(seq_det[6])
msg.y = float(seq_det[7])
msg.z = float(seq_det[8])
msg.rot_y = float(seq_det[9])
msg.obj_id = int(seq_det[10])
msg.v_x = float(seq_det[11])
msg.v_y = float(seq_det[12])
msg.v_z = float(seq_det[13])
msg.loc_x = float(seq_det[14])
msg.loc_y = float(seq_det[15])
msg.loc_z = float(seq_det[16])
msg.name = str(seq_det[21])
msg.license_plate_number = str(seq_det[22])
msg.color_name = str(seq_det[23])
msgs.array.append(msg)
msgs_info = "msgs: {}".format(msgs.array)
rospy.loginfo(msgs_info)
pub.publish(msgs)
rate.sleep()
frame_id += 1
if __name__ == '__main__':
seq_file = '/home/shishuai/Develop/Projects/catkin_ws/src/tracking/scripts/example/hs/20210427_19_06_56.txt'
try:
talker(seq_file)
except rospy.ROSInterruptException:
pass
#!/usr/bin/env python
# license removed for brevity
import os
import math
import time
import rospy
from sensor_msgs.msg import PointCloud2, PointField
import numpy as np
from tracking.msg import det_tracking, det_tracking_array
import open3d as o3d
def my_atan(x, y):
"""Quadrant-aware heading angle from (x, y) velocity components; outside the 0.1 dead-band this is equivalent to -math.atan2(x, y)."""
if (x > 0.1 and y > 0.1) or (x < -0.1 and y > 0.1):
return -math.atan(x/y)
elif x > 0.1 and y < -0.1:
return -math.pi - math.atan(x/y)
elif x < -0.1 and y < -0.1:
return math.pi - math.atan(x/y)
else:
return 0
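# Sanity-check sketch (illustrative values): away from the dead-band,
# my_atan(x, y) agrees with -math.atan2(x, y), e.g.
#     my_atan(1.0, 1.0)   ->  -math.pi / 4
#     my_atan(1.0, -1.0)  ->  -3 * math.pi / 4
#     my_atan(-1.0, -1.0) ->   3 * math.pi / 4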
def show(theta):
if 0 <= theta <= math.pi/2.0:
return theta + math.pi/2.0
elif math.pi/2.0 < theta <= math.pi:
return theta - math.pi*3/2.0
elif -math.pi <= theta <= 0:
return theta + math.pi/2.0
else:
return theta
def talker(seq_file):
pub = rospy.Publisher('/detection/lidar_detector/objects', det_tracking_array, queue_size=10)
rospy.init_node('lidar_detector_talker', anonymous=True)
rate = rospy.Rate(10)
seq_dets = np.loadtxt(seq_file, delimiter=',', skiprows=1, dtype=np.str_)
if len(seq_dets.shape) == 1:
seq_dets = np.expand_dims(seq_dets, axis=0)
if seq_dets.shape[1] == 0:
return
frame_id = 0
frames = np.unique(seq_dets[:, 0].astype(np.uint64))
cloud_file_name = '/home/shishuai/Develop/Projects/catkin_ws/src/tracking_logs/GT/2012_329523133_ground_5cm_segment.pcd'
pcd = o3d.io.read_point_cloud(cloud_file_name)
points = np.asarray(pcd.points)
print(points.shape)
# points = np.genfromtxt(cloud_file_name, skip_header=11, delimiter=' ', dtype=np.float32).reshape(-1, 6)
# points = np.fromfile(cloud_file_name, dtype=np.float32, count=-1).reshape([-1, 6])
# points = points[:, :3]
cloud = PointCloud2()
cloud.header.stamp = rospy.Time.now()
cloud.header.frame_id = "Pandar64"
if len(points.shape) == 3:
cloud.height = points.shape[1]
cloud.width = points.shape[0]
else:
cloud.height = 1
cloud.width = len(points)
cloud.fields = [
PointField('x', 0, PointField.FLOAT32, 1),
PointField('y', 4, PointField.FLOAT32, 1),
PointField('z', 8, PointField.FLOAT32, 1)]
cloud.is_bigendian = False
cloud.point_step = 12
cloud.row_step = cloud.point_step * points.shape[0]
# msg.is_dense = int(np.isfinite(points).all())
cloud.is_dense = False
cloud.data = np.asarray(points, np.float32).tobytes()
last_dict = {}
while frame_id < len(frames) and not rospy.is_shutdown():
frame = frames[frame_id]
cur_frame_seq_dets = seq_dets[seq_dets[:, 0].astype(np.uint64) == frame]
msgs = det_tracking_array()
msgs.cloud = cloud
for seq_det in cur_frame_seq_dets:
center_rot_y = 0
if int(seq_det[10]) in last_dict:
l_x, l_y = last_dict[int(seq_det[10])]
dx = float(seq_det[6]) - l_x
dy = float(seq_det[7]) - l_y
if abs(dx) > 0.01 or abs(dy) > 0.01:
center_rot_y = my_atan(dx, dy)
last_dict[int(seq_det[10])] = [float(seq_det[6]), float(seq_det[7])]
msg = det_tracking()
msg.frame = int(seq_det[0])
msg.type = int(seq_det[1])
msg.score = float(seq_det[2])
msg.h = float(seq_det[3])
msg.w = float(seq_det[4])
msg.l = float(seq_det[5])
msg.x = float(seq_det[6])
msg.y = float(seq_det[7])
msg.z = float(seq_det[8])
if float(seq_det[9]) != 10000:
msg.rot_y = float(seq_det[9])
else:
msg.rot_y = my_atan(float(seq_det[11]), float(seq_det[12]))
msg.obj_id = int(seq_det[10])
msg.v_x = float(seq_det[11])
msg.v_y = float(seq_det[12])
msg.v_z = float(seq_det[13])
msg.loc_x = float(seq_det[14])
msg.loc_y = float(seq_det[15])
msg.loc_z = float(seq_det[16])
msgs.array.append(msg)
msgs_info = "msgs: {}".format(msgs.array)
rospy.loginfo(msgs_info)
pub.publish(msgs)
rate.sleep()
frame_id += 1
if __name__ == '__main__':
seq_file = '/home/shishuai/Develop/Projects/catkin_ws/src/tracking_logs/GT/20210427_15:16:04.txt'
try:
talker(seq_file)
except rospy.ROSInterruptException:
pass
import numpy as np
from gaussian import Convert_v2, Inverse_v2
def my_compute_box_3d(center, heading, size):
"""Return the 8 corners (8x3, world frame) of a 3D box: P' = T * R * S * p."""
x_corners = [-1,1,1,-1,-1,1,1,-1]
y_corners = [1,1,-1,-1,1,1,-1,-1]
z_corners = [1,1,1,1,-1,-1,-1,-1]
tmp = np.vstack([x_corners, y_corners, z_corners]) / 2.0
corners_3d = np.ones([4,8])
corners_3d[:3,:] = tmp
S = np.diag([size[0],size[1],size[2],1])
rot = rotz(heading)
Trans = np.zeros([4,4])
Trans[:3,:3] = rot
Trans[0,3] = center[0]
Trans[1,3] = center[1]
Trans[2,3] = center[2]
#Trans[2,3] = center[2] - size[2]/2
tmp4x4 = np.dot(S,corners_3d)
world_corners_3d = np.dot(Trans,tmp4x4)
return np.transpose(world_corners_3d[:3,:])
def rotz(t):
"""Rotation about the z-axis."""
c = np.cos(t)
s = np.sin(t)
return np.array([[c, -s, 0],
[s, c, 0],
[0, 0, 1]])
def get_T():
h = 30.0
rot = [[-0.990636110306, 0.133679375052, -0.027746986598],
[-0.135864824057, -0.985262572765, 0.103915005922],
[-0.013446771540, 0.106711812317, 0.994199097157]]
rot = np.array(rot)
BL = [31.19450834,121.0856356]
XY = Convert_v2(BL)
# print(XY)
Trans = np.zeros([4,4])
Trans[:3,:3] = rot
Trans[0,3] = XY[0]
Trans[1,3] = XY[1]
Trans[2,3] = h
Trans[3,3] = 1
return Trans
def get_loc_GT(lidar_loc):
Trans = get_T()
tmp_loc = np.ones([4])
tmp_loc[:3] = lidar_loc[:3]
world_loc = np.dot(Trans,tmp_loc)
out_BL = Inverse_v2(world_loc[:2])
return out_BL
def get_loc(boxes_3d, car_type):
corners_3d = my_compute_box_3d(boxes_3d[:3],boxes_3d[6],boxes_3d[3:6])
#adjust bbox
#new_corners_3d = adjust_bbox(boxes_3d,corners_3d)
head_pnt = (corners_3d[1] + corners_3d[2]) / 2.0
tail_pnt = (corners_3d[0] + corners_3d[3]) / 2.0
inter_len = 1.5
total_len = 6.0
if car_type == 3:
inter_len = 0.5
total_len = 12.42
lidar_loc = (tail_pnt - head_pnt) * inter_len / (1.0*total_len) + head_pnt
#print(head_pnt, lidar_loc, tail_pnt)
Trans = get_T()
tmp_loc = np.ones([4])
tmp_loc[:3] = lidar_loc[:3]
world_loc = np.dot(Trans,tmp_loc)
#print(world_loc)
out_BL = Inverse_v2(world_loc[:2])
#center point transfer
Trans = get_T()
tmp_center_loc = np.ones([4])
tmp_center_loc[:3] = boxes_3d[:3]
world_center_loc = np.dot(Trans,tmp_center_loc)
out_center_BL = Inverse_v2(world_center_loc[:2])
return lidar_loc, out_BL, out_center_BL
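# Illustrative sketch (made-up box, assuming the site calibration baked
# into get_T()): boxes_3d is [x, y, z, sx, sy, sz, heading] in the lidar
# frame, and get_loc returns the lidar-frame reference point plus two
# [lat, lon] pairs:
#
#     boxes_3d = np.array([10.0, 5.0, -1.0, 4.5, 1.8, 1.5, 0.3])
#     lidar_loc, out_BL, out_center_BL = get_loc(boxes_3d, car_type=0)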
import math
import numpy as np
import wgs84
from rotations import degree2rad, rad2degree
# 3-degree Gauss-Krueger projection parameters: the zone number is folded
# into x via ZoneOffset, with a 500 km false easting.
ZoneOffset = 1e6
ZoneWidth = 3
EastOffset = 500000.0
def Convert_v2(inLonLat): # BL --> XY
Zone = int((inLonLat[1] + 1.5) / ZoneWidth)
L0 = Zone * ZoneWidth
dL = inLonLat[1] - L0
dLRadian = degree2rad(dL)
latRadian = degree2rad(inLonLat[0])
X = wgs84.GetX(latRadian)
N = wgs84.GetN(latRadian)
t = math.tan(latRadian)
t2 = t * t
sinB = math.sin(latRadian)
cosB = math.cos(latRadian)
ita2 = wgs84.ep2 * cosB * cosB
temp = dLRadian * cosB
temp2 = temp * temp
yPart1 = 0.5 * N * temp * temp * t
yPart2 = N * t * (5 - t2 + 9 * ita2 + 4 * ita2 * ita2) * temp2 * temp2 / 24
yPart3 = temp2 * temp2 * temp2 * N * t * (61 - 58 * t2 + t2 * t2) / 720
y = X + yPart1 + yPart2 + yPart3
xPart1 = N * temp
xPart2 = (1 - t2 + ita2) * N * temp * temp2 / 6
xPart3 = (5 - 18 * t2 + t2 * t2 + 14 * ita2 - 58 * ita2 * t2) * N * temp * temp2 * temp2 / 120
x = Zone * ZoneOffset + EastOffset + xPart1 + xPart2 + xPart3
projPt = np.array([x, y])
return projPt
def Inverse_v2(inXY): # XY --> BL
Zone = int(inXY[0] / ZoneOffset)
L0 = Zone * ZoneWidth
L0_radian = degree2rad(L0)
X0 = Zone * ZoneOffset + EastOffset
Y0 = 0
x = inXY[0] - X0
y = inXY[1] - Y0
Br = wgs84.GetLatByX(y)
cosB = math.cos(Br)
secB = 1 / cosB
ita2 = wgs84.ep2 * cosB * cosB
t = math.tan(Br)
t2 = t * t
V = wgs84.GetV(Br)
V2 = V * V
N = wgs84.c / V
M = N/V/V
D = x/N
tmp3 = (1 + 2 * t2 + ita2) * D * D * D / 6
tmp5 = (5 + 28 * t2 + 24 * t2 * t2 + 6 * ita2 + 8 * ita2 * t2) * D * D * D * D * D / 120
l = secB * (D - tmp3 + tmp5)
L_radian = L0_radian + l
tmp2 = D * D / 2
tmp4 = (5 + 3 * t2 + ita2 - 9 * ita2 * t2) * D * D * D * D / 24
tmp6 = (61 + 90 * t2 + 45 * t2 * t2) * D * D * D * D * D * D / 720
B_radian = Br - t * V2 * (tmp2 - tmp4 + tmp6)
B = rad2degree(B_radian)
L = rad2degree(L_radian)
outLonLat = np.array([B, L])
return outLonLat
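# Round-trip sanity sketch (illustrative coordinates): Convert_v2 maps
# [lat, lon] degrees to Gauss-Krueger [x, y] metres (zone number folded
# into x), and Inverse_v2 maps back:
#
#     BL = np.array([31.19450834, 121.0856356])
#     XY = Convert_v2(BL)
#     assert np.allclose(Inverse_v2(XY), BL, atol=1e-6)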
# Utility file with functions for handling rotations
#
# Author: Trevor Ablett
# University of Toronto Institute for Aerospace Studies
import numpy as np
from numpy import array, linalg
def skew_symmetric(v):
return np.array(
[[0, -v[2], v[1]],
[v[2], 0, -v[0]],
[-v[1], v[0], 0]]
)
def wraps2pi_pi(rad):
while rad > np.pi:
rad -= 2 * np.pi
while rad < - np.pi:
rad += 2 * np.pi
return rad
class Quaternion():
def __init__(self, w=1., x=0., y=0., z=0., axis_angle=None, euler=None):
"""
Allow initialization either with explicit wxyz, axis angle, or euler XYZ (RPY) angles.
:param w: w (real) of a quaternion.
:param x: x (i) of a quaternion.
:param y: y (j) of a quaternion.
:param z: z (k) of a quaternion.
:param axis_angle: Set of three values from axis-angle representation, as list or [3,] or [3,1] np.ndarray.
See C2M5L2 Slide 7 for details.
:param euler: Set of three angles from euler XYZ rotating frame angles.
"""
if axis_angle is None and euler is None:
self.w = w
self.x = x
self.y = y
self.z = z
elif euler is not None and axis_angle is not None:
raise AttributeError("Only one of axis_angle and euler can be specified.")
elif axis_angle is not None:
if not (type(axis_angle) == list or type(axis_angle) == np.ndarray) or len(axis_angle) != 3:
raise ValueError("axis_angle must be list or np.ndarray with length 3.")
axis_angle = np.array(axis_angle)
norm = np.linalg.norm(axis_angle)
self.w = np.cos(norm / 2)
if norm < 1e-50: # to avoid instabilities and nans
self.x = 0
self.y = 0
self.z = 0
else:
imag = axis_angle / norm * np.sin(norm / 2)
self.x = imag[0].item()
self.y = imag[1].item()
self.z = imag[2].item()
else:
roll = euler[0]
pitch = euler[1]
yaw = euler[2]
cy = np.cos(yaw * 0.5)
sy = np.sin(yaw * 0.5)
cr = np.cos(roll * 0.5)
sr = np.sin(roll * 0.5)
cp = np.cos(pitch * 0.5)
sp = np.sin(pitch * 0.5)
# static frame
self.w = cr * cp * cy + sr * sp * sy
self.x = sr * cp * cy - cr * sp * sy
self.y = cr * sp * cy + sr * cp * sy
self.z = cr * cp * sy - sr * sp * cy
# rotating frame
# self.w = cr * cp * cy - sr * sp * sy
# self.x = cr * sp * sy + sr * cp * cy
# self.y = cr * sp * cy - sr * cp * sy
# self.z = cr * cp * sy + sr * sp * cy
def __repr__(self):
return "Quaternion (wxyz): [%2.5f, %2.5f, %2.5f, %2.5f]" % (self.w, self.x, self.y, self.z)
def to_mat(self):
v = np.array([self.x, self.y, self.z]).reshape(3,1)
return (self.w ** 2 - np.dot(v.T, v)) * np.eye(3) + \
2 * np.dot(v, v.T) + 2 * self.w * skew_symmetric(v)
def to_euler(self):
""" Return as xyz (roll pitch yaw) Euler angles, with a static frame """
roll = np.arctan2(2 * (self.w * self.x + self.y * self.z), 1 - 2 * (self.x**2 + self.y**2))
pitch = np.arcsin(2 * (self.w * self.y - self.z * self.x))
yaw = np.arctan2(2 * (self.w * self.z + self.x * self.y), 1 - 2 * (self.y**2 + self.z**2))
return np.array([roll, pitch, yaw])
def to_numpy(self):
""" Return numpy wxyz representation. """
return np.array([self.w, self.x, self.y, self.z])
def normalize(self):
""" Return a normalized version of this quaternion to ensure that it is valid. """
norm = np.linalg.norm([self.w, self.x, self.y, self.z])
return Quaternion(self.w / norm, self.x / norm, self.y / norm, self.z / norm)
def to_reverse(self):
return Quaternion(self.w, -self.x, -self.y, -self.z)
def quat_outer_mult(self, q, out='np'):
"""
Give output of multiplying this quaternion by another quaternion.
See equation from C2M5L2 Slide 7 for details.
:param q: The quaternion to multiply by, either a Quaternion or 4x1 ndarray.
:param out: Output type, either np or Quaternion.
:return: Returns quaternion of desired type
"""
v = np.array([self.x, self.y, self.z]).reshape(3, 1)
sum_term = np.zeros([4,4])
sum_term[0,1:] = -v[:,0]
sum_term[1:, 0] = v[:,0]
sum_term[1:, 1:] = -skew_symmetric(v)
sigma = self.w * np.eye(4) + sum_term
if type(q).__name__ == "Quaternion":
quat_np = np.dot(sigma, q.to_numpy())
else:
quat_np = np.dot(sigma, q)
if out == 'np':
return quat_np
elif out == 'Quaternion':
quat_obj = Quaternion(quat_np[0], quat_np[1], quat_np[2], quat_np[3])
return quat_obj
def euler2quat(eulers):
eulers = array(eulers)
if len(eulers.shape) > 1:
output_shape = (-1,4)
else:
output_shape = (4,)
eulers = np.atleast_2d(eulers)
gamma, theta, psi = eulers[:,0], eulers[:,1], eulers[:,2]
q0 = np.cos(gamma / 2) * np.cos(theta / 2) * np.cos(psi / 2) + \
np.sin(gamma / 2) * np.sin(theta / 2) * np.sin(psi / 2)
q1 = np.sin(gamma / 2) * np.cos(theta / 2) * np.cos(psi / 2) - \
np.cos(gamma / 2) * np.sin(theta / 2) * np.sin(psi / 2)
q2 = np.cos(gamma / 2) * np.sin(theta / 2) * np.cos(psi / 2) + \
np.sin(gamma / 2) * np.cos(theta / 2) * np.sin(psi / 2)
q3 = np.cos(gamma / 2) * np.cos(theta / 2) * np.sin(psi / 2) - \
np.sin(gamma / 2) * np.sin(theta / 2) * np.cos(psi / 2)
quats = array([q0, q1, q2, q3]).T
for i in range(len(quats)):
if quats[i,0] < 0:
quats[i] = -quats[i]
return quats.reshape(output_shape)
def quat2euler(quats):
quats = array(quats)
if len(quats.shape) > 1:
output_shape = (-1,3)
else:
output_shape = (3,)
quats = np.atleast_2d(quats)
q0, q1, q2, q3 = quats[:,0], quats[:,1], quats[:,2], quats[:,3]
gamma = np.arctan2(2 * (q0 * q1 + q2 * q3), 1 - 2 * (q1**2 + q2**2))
theta = np.arcsin(2 * (q0 * q2 - q3 * q1))
psi = np.arctan2(2 * (q0 * q3 + q1 * q2), 1 - 2 * (q2**2 + q3**2))
eulers = array([wraps2pi_pi(gamma), wraps2pi_pi(theta), wraps2pi_pi(psi)]).T
return eulers.reshape(output_shape)
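# Round-trip sketch (illustrative angles, radians): euler2quat returns a
# wxyz quaternion with a non-negative w, and quat2euler inverts it:
#
#     eulers = np.array([0.1, -0.2, 0.3])
#     q = euler2quat(eulers)
#     assert np.allclose(quat2euler(q), eulers)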
def quat2rot(quats):
quats = array(quats)
input_shape = quats.shape
quats = np.atleast_2d(quats)
Rs = np.zeros((quats.shape[0], 3, 3))
q0 = quats[:, 0]
q1 = quats[:, 1]
q2 = quats[:, 2]
q3 = quats[:, 3]
Rs[:, 0, 0] = q0 * q0 + q1 * q1 - q2 * q2 - q3 * q3
Rs[:, 0, 1] = 2 * (q1 * q2 - q0 * q3)
Rs[:, 0, 2] = 2 * (q0 * q2 + q1 * q3)
Rs[:, 1, 0] = 2 * (q1 * q2 + q0 * q3)
Rs[:, 1, 1] = q0 * q0 - q1 * q1 + q2 * q2 - q3 * q3
Rs[:, 1, 2] = 2 * (q2 * q3 - q0 * q1)
Rs[:, 2, 0] = 2 * (q1 * q3 - q0 * q2)
Rs[:, 2, 1] = 2 * (q0 * q1 + q2 * q3)
Rs[:, 2, 2] = q0 * q0 - q1 * q1 - q2 * q2 + q3 * q3
if len(input_shape) < 2:
return Rs[0]
else:
return Rs
def rot2quat(rots):
input_shape = rots.shape
if len(input_shape) < 3:
rots = array([rots])
K3 = np.empty((len(rots), 4, 4))
K3[:, 0, 0] = (rots[:, 0, 0] - rots[:, 1, 1] - rots[:, 2, 2]) / 3.0
K3[:, 0, 1] = (rots[:, 1, 0] + rots[:, 0, 1]) / 3.0
K3[:, 0, 2] = (rots[:, 2, 0] + rots[:, 0, 2]) / 3.0
K3[:, 0, 3] = (rots[:, 1, 2] - rots[:, 2, 1]) / 3.0
K3[:, 1, 0] = K3[:, 0, 1]
K3[:, 1, 1] = (rots[:, 1, 1] - rots[:, 0, 0] - rots[:, 2, 2]) / 3.0
K3[:, 1, 2] = (rots[:, 2, 1] + rots[:, 1, 2]) / 3.0
K3[:, 1, 3] = (rots[:, 2, 0] - rots[:, 0, 2]) / 3.0
K3[:, 2, 0] = K3[:, 0, 2]
K3[:, 2, 1] = K3[:, 1, 2]
K3[:, 2, 2] = (rots[:, 2, 2] - rots[:, 0, 0] - rots[:, 1, 1]) / 3.0
K3[:, 2, 3] = (rots[:, 0, 1] - rots[:, 1, 0]) / 3.0
K3[:, 3, 0] = K3[:, 0, 3]
K3[:, 3, 1] = K3[:, 1, 3]
K3[:, 3, 2] = K3[:, 2, 3]
K3[:, 3, 3] = (rots[:, 0, 0] + rots[:, 1, 1] + rots[:, 2, 2]) / 3.0
q = np.empty((len(rots), 4))
for i in range(len(rots)):
_, eigvecs = linalg.eigh(K3[i].T)
eigvecs = eigvecs[:,3:]
q[i, 0] = eigvecs[-1]
q[i, 1:] = -eigvecs[:-1].flatten()
if q[i, 0] < 0:
q[i] = -q[i]
if len(input_shape) < 3:
return q[0]
else:
return q
def euler2rot(eulers):
return quat2rot(euler2quat(eulers))
def rot2euler(rots):
return quat2euler(rot2quat(rots))
def gpseuler2rot_in_cptframe(gps_euler_degrees):
# R2 defines: XYZ right/forward/up -> ENU
yaw = degree2rad(gps_euler_degrees[2])
pitch = degree2rad(gps_euler_degrees[1])
roll = degree2rad(gps_euler_degrees[0])
sinA = np.sin(yaw)
cosA = np.cos(yaw)
matA = np.array([[cosA, sinA, 0], [-sinA, cosA, 0], [0, 0, 1]])
sinP = np.sin(pitch)
cosP = np.cos(pitch)
matP = np.array([[1, 0, 0], [0, cosP, -sinP], [0, sinP, cosP]])
sinR = np.sin(roll)
cosR = np.cos(roll)
matR = np.array([[cosR, 0, sinR], [0, 1, 0], [-sinR, 0, cosR]])
return matA.dot(matP.dot(matR))
def gpseuler2rot(gps_euler_degrees):
# R1 defines: XYZ forward/left/up -> XYZ right/forward/up
R1 = np.mat([[0, -1, 0],
[1, 0, 0],
[0, 0, 1]])
R2 = gpseuler2rot_in_cptframe(gps_euler_degrees)
# R2 * R1 * XYZ = ENU
return R2.dot(R1)
def rad2degree(rad=None, list=None):
if not (list is None):
newlist = np.zeros(len(list))
i = 0
for rad in list:
newlist[i] = rad * 180.0 / np.pi
i += 1
return newlist
else:
return rad * 180.0 / np.pi
def degree2rad(angle=None, list=None):
if not (list is None):
newlist = np.zeros(len(list))
i = 0
for angle in list:
newlist[i] = angle * np.pi / 180.0
i += 1
return newlist
else:
return angle * np.pi / 180.0
def quat_product(q, r):
t = np.zeros(4)
t[0] = r[0] * q[0] - r[1] * q[1] - r[2] * q[2] - r[3] * q[3]
t[1] = r[0] * q[1] + r[1] * q[0] - r[2] * q[3] + r[3] * q[2]
t[2] = r[0] * q[2] + r[1] * q[3] + r[2] * q[0] - r[3] * q[1]
t[3] = r[0] * q[3] - r[1] * q[2] + r[2] * q[1] + r[3] * q[0]
return t
import math
import numpy as np
from rotations import degree2rad, rad2degree
# WGS-84 ellipsoid constants and meridian-arc series coefficients
a = 6378137.0
f = 0.003352810664
b = 6356752.3142499477
c = 6399593.6257536924
e2 = 0.006694379988651241
ep2 = 0.0067394967407662064
m0 = 6335439.3273023246
m2 = 63617.757378010137
m4 = 532.35180239277622
m6 = 4.1577261283373916
m8 = 0.031312573415813519
a0 = 6367449.1457686741
a2 = 32077.017223574985
a4 = 67.330398573595
a6 = 0.13188597734903185
a8 = 0.00024462947981104312
#class Wgs84:
def GetN(radB):
cosB = math.cos(radB)
V = math.sqrt(1 + ep2 * cosB * cosB)
N = c / V
return N
def GetX(radB):
sinB = math.sin(radB)
cosB = math.cos(radB)
al = a0 * radB
sc = sinB * cosB
ss = sinB * sinB
X = al - sc *((a2 - a4 + a6) + (2 * a4 - a6 * 16 / 3) * ss + a6 * 16 * ss * ss /3)
return X
def GetLatByX(X):
Bfi0 = X / a0
Bfi1 = 0
num = 1
minAngle = math.pi * 1e-9
menus_minAngle = 0 - minAngle
while (num == 1):
num = 0
sinB = math.sin(Bfi0)
sinB2 = sinB * sinB
cosB = math.cos(Bfi0)
FBfi = 0 - sinB * cosB *((a2 - a4 + a6) + sinB2 * (2 * a4 - 16 * a6 / 3) + sinB2 * sinB2 * a6 * 16 / 3)
Bfi1 = (X - FBfi) / a0
deltaB = Bfi1 - Bfi0
if deltaB < menus_minAngle or deltaB > minAngle:
num = 1
Bfi0 = Bfi1
Bf = Bfi1
return Bf
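# GetLatByX iteratively inverts GetX (meridian arc length -> latitude).
# Sanity sketch (illustrative latitude, radians):
#
#     B = math.radians(31.0)
#     assert abs(GetLatByX(GetX(B)) - B) < 1e-9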
def GetV(lat):
cosB = math.cos(lat)
V = math.sqrt(1 + ep2 * cosB * cosB)
return V
def BLH2ECEF(ptBLH):
cosB = math.cos(degree2rad(ptBLH[0]))
sinB = math.sin(degree2rad(ptBLH[0]))
cosL = math.cos(degree2rad(ptBLH[1]))
sinL = math.sin(degree2rad(ptBLH[1]))
N = GetN(degree2rad(ptBLH[0]))
X = (N + ptBLH[2]) * cosB * cosL
Y = (N + ptBLH[2]) * cosB * sinL
Z = (N * (1 - e2) + ptBLH[2]) * sinB
ptXYZ = np.array([X, Y, Z])
return ptXYZ
def ECEF2BLH(ptXYZ):
L_radian = math.atan(ptXYZ[1] / ptXYZ[0])
if L_radian < 0:
L_radian += math.pi
L = rad2degree(L_radian)
sqrtXY = math.sqrt(ptXYZ[0] * ptXYZ[0] + ptXYZ[1] * ptXYZ[1])
t0 = ptXYZ[2] / sqrtXY
p = c * e2 / sqrtXY
k = 1+ep2
ti = t0
ti1 = 0
loop = 0
while loop < 10000:
loop += 1
ti1 = t0 + p * ti / math.sqrt( k + ti * ti)
if math.fabs(ti1 - ti) < 1e-12:
break
ti = ti1
B_radian = math.atan(ti1)
N = GetN(B_radian)
H = sqrtXY / math.cos(B_radian) - N
B = rad2degree(B_radian)
geopoint = np.array([B, L, H])
return geopoint
def GetR_ENU2ECEF(radianL, radianB):
sinB = math.sin(radianB)
cosB = math.cos(radianB)
sinL = math.sin(radianL)
cosL = math.cos(radianL)
R4 = np.array([[-sinL, -sinB * cosL, cosB * cosL],
[cosL, -sinB * sinL, cosB * sinL],
[0, cosB, sinB]])
return R4
def GetAngleDif(distance):
angleRadian = distance / a
return rad2degree(angleRadian)
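# Round-trip sketch for the geodetic <-> ECEF helpers (illustrative point;
# note ECEF2BLH's atan() branch only recovers east longitudes):
#
#     blh = np.array([31.1945, 121.0856, 30.0])   # [deg, deg, m]
#     xyz = BLH2ECEF(blh)
#     assert np.allclose(ECEF2BLH(xyz), blh, atol=1e-6)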