Commit b5488cdd authored by xiongchao

Commit the new version of automated edge matching; it now outputs three report files.

parent 89cd51f0
# edge_mathch
1. How to use the edge-matching tool for the borders of HD map data sheets
This tool takes its input and output parameters from the command line; the parameters are as follows:
1) Database connection string, e.g. "PG:dbname='test3' host='localhost' port='5432' user='postgres' password='19931018'"
2) File listing the ids of the map sheets to be edge-matched, e.g. "C:\Users\熊\Desktop\edge_match_file\mesh_id.txt"
3) Map-sheet frame file, e.g. "C:\Users\熊\Desktop\edge_match_file\mesh_grid.gpkg"
+4) Folder in which the edge-matching report files are saved, e.g. "C:\Users\熊\Desktop\edge_match_file"
2. Example:
-lane_match "PG:dbname='test3' host='localhost' port='5432' user='postgres' password='19931018'" "C:\Users\熊\Desktop\edge_match_file\mesh_id.txt" "C:\Users\熊\Desktop\edge_match_file\mesh_grid.gpkg"
+main "PG:dbname='test2' host='localhost' port='5432' user='postgres' password='19931018'" "C:\Users\熊\Desktop\edge_match_file\mesh_ids.txt" "C:\Users\熊\Desktop\edge_match_file\mesh_grid.gpkg" "C:\Users\熊\Desktop\edge_match_file"
3. Notes
-3.1 The report of unmatched edges is named 未接边信息.txt and is written to the same directory as the map-sheet frame file
+3.1 The report of unmatched edges is named 未接边信息.csv and is written to the report folder given on the command line
-3.2 A new map-sheet frame file is generated in the directory of the original frame file, containing only the sheets listed in the map-sheet id file
+3.2 The generated 已接边信息.txt (matched edges) and 不需要接边信息.txt (edges that need no matching) are for the developers; operators can ignore them
# _*_ coding: utf-8 _*_ #
-# @Time: 2020/1/21 13:18
+# @Time: 2020/5/18 15:37
# @Author: XiongChao
# _*_ coding: utf-8 _*_ #
# @Time: 2020/4/11 17:47
# @Author: XiongChao
import numpy as np
ep2 = 0.0067394967407662064  # square of the second eccentricity: ep2 = ep * ep = (a*a - b*b) / (b*b)
e2 = 0.006694379988651241  # square of the first eccentricity: e2 = e * e = (a*a - b*b) / (a*a)
c = 6399593.6257536924  # c is a symbol introduced to simplify the notation, c = a*a / b
def radian_to_degree(radian):
    # Convert radians to degrees
return radian * 180 / np.pi
def degree_to_radian(degree):
    # Convert degrees to radians
return degree / 180 * np.pi
def get_n(radian_b):
    # Compute N at a given latitude; radian_b is the latitude in radians
cos_b = np.cos(radian_b)
v = np.sqrt(1 + ep2 * cos_b * cos_b)
n = c / v
return n
def blh_to_ecef(lon, lat, up):
    # Convert longitude/latitude/height (BLH) to ECEF Cartesian coordinates
    radian_b = lat / 180 * np.pi  # latitude in radians
radian_l = lon / 180 * np.pi
cos_b = np.cos(radian_b)
sin_b = np.sin(radian_b)
cos_l = np.cos(radian_l)
sin_l = np.sin(radian_l)
n = get_n(radian_b)
x = (n + up) * cos_b * cos_l
y = (n + up) * cos_b * sin_l
z = (n * (1 - e2) + up) * sin_b
return [x, y, z]
def ecef_to_blh(x, y, z):
lon_radian = np.arctan(y / x)
if lon_radian < 0:
lon_radian += np.pi
lon = radian_to_degree(lon_radian)
sqrt_xy = np.sqrt(x * x + y * y)
t0 = z / sqrt_xy
p = c * e2 / sqrt_xy
k = 1 + ep2
    # Iteratively solve for the latitude
ti = t0
ti1 = 0
loop = 0
while loop < 10000:
loop += 1
ti1 = t0 + p * ti / np.sqrt(k + ti * ti)
if np.abs(ti1 - ti) < 1e-12:
break
ti = ti1
b_radian = np.arctan(ti1)
n = get_n(b_radian)
h = sqrt_xy / np.cos(b_radian) - n
b = radian_to_degree(b_radian)
return [lon, b, h]
def construct_matrix_cpt_enu(roll, pitch, azimuth):
radian_azimuth = degree_to_radian(azimuth)
radian_pitch = degree_to_radian(pitch)
radian_roll = degree_to_radian(roll)
azimuth_info = [np.cos(radian_azimuth), np.sin(radian_azimuth), 0,
-np.sin(radian_azimuth), np.cos(radian_azimuth), 0,
0, 0, 1]
matrix_azimuth = np.array(azimuth_info).reshape((3, 3))
pitch_info = [1, 0, 0,
0, np.cos(radian_pitch), -np.sin(radian_pitch),
0, np.sin(radian_pitch), np.cos(radian_pitch)]
matrix_pitch = np.array(pitch_info).reshape((3, 3))
roll_info = [np.cos(radian_roll), 0, np.sin(radian_roll),
0, 1, 0,
-np.sin(radian_roll), 0, np.cos(radian_roll)]
matrix_roll = np.array(roll_info).reshape((3, 3))
return np.dot(matrix_azimuth, np.dot(matrix_pitch, matrix_roll))
def construct_matrix_enu_cpt(roll, pitch, azimuth):
    # The transpose of an orthogonal matrix is its inverse
return construct_matrix_cpt_enu(roll, pitch, azimuth).T
def construct_matrix_enu_ecef(lon, lat):
radian_lon = degree_to_radian(lon)
radian_lat = degree_to_radian(lat)
info = [-np.sin(radian_lon), -np.cos(radian_lon) * np.sin(radian_lat), np.cos(radian_lon) * np.cos(radian_lat),
np.cos(radian_lon), -np.sin(radian_lon) * np.sin(radian_lat), np.sin(radian_lon) * np.cos(radian_lat),
0, np.cos(radian_lat), np.sin(radian_lat)]
return np.array(info).reshape((3, 3))
def construct_matrix_ecef_enu(lon, lat):
return construct_matrix_enu_ecef(lon, lat).T
def construct_vector_enu_ecef(lon, lat, up):
    # Coordinates of the current CPT origin in the ECEF frame
ecef_point = blh_to_ecef(lon, lat, up)
return np.array(ecef_point).reshape((3, 1))
def enu_to_cpt(enu_points_matrix, pitch, roll, azimuth):
    # Project ENU points into the CPT frame; each column of enu_points_matrix is one point
return np.dot(construct_matrix_enu_cpt(roll, pitch, azimuth), enu_points_matrix)
def cpt_to_enu(cpt_points_matrix, pitch, roll, azimuth):
    # Project CPT points into the ENU frame; each column of cpt_points_matrix is one point
return np.dot(construct_matrix_cpt_enu(roll, pitch, azimuth), cpt_points_matrix)
def ecef_to_enu(ecef_points_matrix, lon, lat, up):
    # Project ECEF points into the specified ENU frame; each column of ecef_points_matrix is one point
translation_vector = construct_vector_enu_ecef(lon, lat, up)
return np.dot(construct_matrix_ecef_enu(lon, lat), (ecef_points_matrix - translation_vector))
def enu_to_ecef(enu_points_matrix, lon, lat, up):
    # Project ENU points into the specified ECEF frame; each column of enu_points_matrix is one point
translation_vector = construct_vector_enu_ecef(lon, lat, up)
return np.dot(construct_matrix_enu_ecef(lon, lat), enu_points_matrix) + translation_vector
def ecef_to_cpt(lon, lat, up, pitch, roll, azimuth, ecef_points):
    # Project ECEF points into the specified CPT frame
    # Returns a 2-D list; each element is a point in the CPT frame
ecef_points_matrix = np.array(ecef_points).T
enu_points_matrix = ecef_to_enu(ecef_points_matrix, lon, lat, up)
cpt_points_matrix = enu_to_cpt(enu_points_matrix, pitch, roll, azimuth)
cpt_points = [cpt_points_matrix[:, i] for i in range(np.shape(cpt_points_matrix)[-1])]
return cpt_points
def cpt_to_ecef(lon, lat, up, pitch, roll, azimuth, cpt_points):
    # Project CPT points into the specified ECEF frame
    # cpt_points is a 2-D list; each element is a point in the CPT frame, [[x, y, z], ...]
    # Returns a 2-D list; each element is a point in the ECEF frame
cpt_points_matrix = np.array(cpt_points).T
enu_points_matrix = cpt_to_enu(cpt_points_matrix, pitch, roll, azimuth)
ecef_points_matrix = enu_to_ecef(enu_points_matrix, lon, lat, up)
return [ecef_points_matrix[:, i] for i in range(np.shape(ecef_points_matrix)[-1])]
def blh_to_cpt(blh_points, lon, lat, up, pitch, roll, azimuth):
ecef_points = [blh_to_ecef(point[0], point[1], point[2]) for point in blh_points]
ecef_points_matrix = np.array(ecef_points).T
enu_points_matrix = ecef_to_enu(ecef_points_matrix, lon, lat, up)
cpt_points_matrix = enu_to_cpt(enu_points_matrix, pitch, roll, azimuth)
cpt_points = [cpt_points_matrix[:, i] for i in range(np.shape(cpt_points_matrix)[-1])]
return cpt_points
def cpt_to_blh(cpt_points, lon, lat, up, pitch, roll, azimuth):
    # Project points in the CPT frame into the BLH frame
    # cpt_points is a 2-D list; each element is a point in the CPT frame
    # Returns a 2-D list; each element is a point in the BLH frame
cpt_points_matrix = np.array(cpt_points).T
enu_points_matrix = cpt_to_enu(cpt_points_matrix, pitch, roll, azimuth)
ecef_points_matrix = enu_to_ecef(enu_points_matrix, lon, lat, up)
blh_points = [ecef_to_blh(*ecef_points_matrix[:, i]) for i in range(np.shape(ecef_points_matrix)[-1])]
return blh_points
# if __name__ == '__main__':
# blh_points = [(121.29252568495818, 30.2123505136701, 28.342482462525368),
# (121.29250801166651, 30.21231321905551, 28.37178515084088),
# (121.29249033838757, 30.212275924438828, 28.40108783915639),
# (121.29247282234871, 30.212238824265214, 28.429960872977972),
# (121.2924553063224, 30.21220172408956, 28.458833906799555),
# (121.29243939266972, 30.212167895917432, 28.48491994710639),
# (121.29242347902743, 30.212134067743627, 28.511005987413228),
# ]
# cpt_points = [[-4.78917141, -1.34333042, -1.67376305], [-4.88939957, -4.30204398, -1.74877275]]
# cpt_points = [[(cpt_points[0][0] + cpt_points[1][0]) / 2, (cpt_points[0][1] + cpt_points[1][1]) / 2, (cpt_points[0][2] + cpt_points[1][2]) / 2]]
# blh_points = cpt_to_blh(cpt_points, 121.2925645406, 30.2122941425, 30.206394, -0.427793, 1.288437, 202.06017)
# print(blh_points)
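# Illustrative sketch (not part of the original file): a quick consistency check of the helpers
# above, using made-up sample values. blh_to_ecef/ecef_to_blh should round-trip, and a point sent
# into a CPT frame by blh_to_cpt should come back unchanged via cpt_to_blh with the same pose.
if __name__ == '__main__':
    x, y, z = blh_to_ecef(121.2925, 30.2123, 28.3)
    print(ecef_to_blh(x, y, z))  # expect roughly [121.2925, 30.2123, 28.3]
    pose = (121.2926, 30.2123, 30.2, -0.43, 1.29, 202.06)  # lon, lat, up, pitch, roll, azimuth
    cpt_points = blh_to_cpt([[121.2925, 30.2123, 28.3]], *pose)
    print(cpt_to_blh([list(cpt_points[0])], *pose))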
# _*_ coding: utf-8 _*_ #
# @Time: 2020/6/1 11:40
# @Author: XiongChao
from osgeo import ogr, osr
import math
def transform_to_utm(blh_dataset, source_srs):
lon = blh_dataset[0][0]
utm_zone = 31 + math.floor(lon / 6.0)
utm_srs = osr.SpatialReference()
utm_srs.SetUTM(utm_zone, 1)
src_to_utm_trans = osr.CreateCoordinateTransformation(source_srs, utm_srs)
utm_dataset = []
for blh_point in blh_dataset:
temp_point = blh_point
utm_point = list(src_to_utm_trans.TransformPoint(temp_point[0], temp_point[1]))
utm_point[2] = blh_point[2]
utm_dataset.append(utm_point)
return utm_dataset, utm_srs
def transform_to_src(utm_dataset, source_srs, utm_srs):
utm_to_src_trans = osr.CreateCoordinateTransformation(utm_srs, source_srs)
src_dataset = []
for utm_point in utm_dataset:
temp_point = utm_point
src_point = list(utm_to_src_trans.TransformPoint(temp_point[0], temp_point[1]))
src_point[2] = utm_point[2]
src_dataset.append(src_point)
return src_dataset
def create_line_geometry(dataset):
geo = ogr.Geometry(ogr.wkbLineStringZM)
for i in range(len(dataset)):
geo.SetPoint(i, dataset[i][0], dataset[i][1], dataset[i][2])
return geo
def create_point_geometry(point):
geo = ogr.Geometry(ogr.wkbPointZM)
geo.SetPoint(0, point[0], point[1], point[2])
return geo
# _*_ coding: utf-8 _*_ #
# @Time: 2020/1/7 19:37
# @Author: XiongChao
from osgeo import ogr, osr
import math
import copy
from util import const
from util.common import is_same_point, three_point_dot_product, two_point_distance
def get_buffer_point(point1, point2):
    # Return two points; the segment from each of them to point1 is perpendicular to the segment point1-point2, and each lies const.RECTANGLE_WIDTH away from point1
result1 = const.RECTANGLE_WIDTH / two_point_distance(point1, point2)
x1 = point1[0] - result1 * (point2[1] - point1[1])
y1 = point1[1] + result1 * (point2[0] - point1[0])
x2 = point1[0] + result1 * (point2[1] - point1[1])
y2 = point1[1] - result1 * (point2[0] - point1[0])
z = (point1[2] + point2[2]) / 2
return (x1, y1, z), (x2, y2, z)
def get_buffer_points(point1, point2):
    # Build a buffer rectangle from point1 and point2 and return its four corner points
data1, data2 = get_buffer_point(point1, point2)
data3, data4 = get_buffer_point(point2, point1)
return [data1, data2, data3, data4]
def intersection_of_two_line(point1, point2, point3, point4):
    # Intersection of two straight lines; point1 and point2 are the endpoints of one line, point3 and point4 of the other
    # If the two lines are parallel here, the data itself must be wrong
result1 = (point2[1] - point1[1]) * (point4[0] - point3[0]) - (point4[1] - point3[1]) * (point2[0] - point1[0])
# if result1 == 0:
# return False
result2 = (point2[1] - point1[1]) * point1[0] - (point2[0] - point1[0]) * point1[1]
result3 = (point4[1] - point3[1]) * point3[0] - (point4[0] - point3[0]) * point3[1]
x = ((point4[0] - point3[0]) * result2 - (point2[0] - point1[0]) * result3) / result1
y = ((point4[1] - point3[1]) * result2 - (point2[1] - point1[1]) * result3) / result1
    z = (point1[2] + point2[2]) / 2  # point3 and point4 lie on the boundary line, in BL (lon/lat) coordinates
return x, y, z
def truncate_or_offset_line(dataset, point):
    # dataset is the point set, point is its intersection with the boundary; truncate or extend the dataset accordingly
copy_dataset = copy.copy(dataset)
point1 = dataset.pop()
if is_same_point(point1, point):
point1 = dataset.pop()
while dataset:
point2 = dataset.pop()
if is_same_point(point2, point):
point2 = dataset.pop()
result = three_point_dot_product(point2, point1, point)
if result > 0:
point1 = point2
else:
dataset.append(point2)
dataset.append(point1)
break
if len(dataset) == 0:
dataset.append(copy_dataset[0])
return dataset
def create_line_geometry(dataset):
geo = ogr.Geometry(ogr.wkbLineStringZM)
for i in range(len(dataset)):
geo.SetPoint(i, dataset[i][0], dataset[i][1], dataset[i][2])
return geo
def create_point_geometry(point):
geo = ogr.Geometry(ogr.wkbPointZM)
geo.SetPoint(0, point[0], point[1], point[2])
return geo
def remove_duplicate_data(dataset):
    # Remove duplicate points from the dataset; every feature has at least a start and an end point, and those two differ
    # Duplicate points are always consecutive, and their number is not fixed
dataset1 = copy.copy(dataset[::-1])
target_dataset = []
point1 = dataset1.pop()
target_dataset.append(point1)
while dataset1:
point2 = dataset1.pop()
if is_same_point(point1, point2):
continue
target_dataset.append(point2)
point1 = point2
return target_dataset
def transform_to_utm(blh_dataset, source_srs):
lon = blh_dataset[0][0]
utm_zone = 31 + math.floor(lon / 6.0)
utm_srs = osr.SpatialReference()
utm_srs.SetUTM(utm_zone, 1)
src_to_utm_trans = osr.CreateCoordinateTransformation(source_srs, utm_srs)
utm_dataset = []
for blh_point in blh_dataset:
temp_point = blh_point
utm_point = list(src_to_utm_trans.TransformPoint(temp_point[0], temp_point[1]))
utm_point[2] = blh_point[2]
utm_dataset.append(utm_point)
return utm_dataset, utm_srs
def transform_to_src(utm_dataset, source_srs, utm_srs):
utm_to_src_trans = osr.CreateCoordinateTransformation(utm_srs, source_srs)
src_dataset = []
for utm_point in utm_dataset:
temp_point = utm_point
src_point = list(utm_to_src_trans.TransformPoint(temp_point[0], temp_point[1]))
src_point[2] = utm_point[2]
src_dataset.append(src_point)
return src_dataset
def create_polygon_geometry(dataset):
ring = ogr.Geometry(ogr.wkbLinearRing)
for i in range(len(dataset)):
ring.AddPoint(dataset[i][0], dataset[i][1], dataset[i][2])
yard = ogr.Geometry(ogr.wkbPolygon)
yard.AddGeometry(ring)
yard.CloseRings()
return yard
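# Illustrative sketch (not part of the original file), assuming the util imports at the top of this
# file resolve and const.RECTANGLE_WIDTH is defined: build a rectangular buffer around a two-point
# segment and turn it into OGR geometries with the helpers above. Coordinates are made-up metric values.
if __name__ == '__main__':
    p1, p2 = (500000.0, 3340000.0, 12.0), (500040.0, 3340030.0, 12.5)
    corners = get_buffer_points(p1, p2)          # four corner points of the buffer rectangle
    buffer_polygon = create_polygon_geometry(corners)
    segment = create_line_geometry([p1, p2])
    print(buffer_polygon.GetArea(), segment.Length())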
# _*_ coding: utf-8 _*_ #
# @Time: 2020/5/28 10:45
# @Author: XiongChao
@@ -7,14 +7,14 @@ import numpy as np
import math

-def two_point_distance(point1, point2):
+def two_point_distance_2d(point1, point2):
    # Distance between two 2-D points
    return math.sqrt((point2[0] - point1[0]) ** 2 + (point2[1] - point1[1]) ** 2)

-def interpolation_between_two_point(start_point, end_point, ratio):
+def interpolation_between_two_point_2d(start_point, end_point, ratio):
    # Interpolate between the start point and the end point; ratio is the interpolation density (points per unit length)
-    distance = two_point_distance(start_point, end_point)
+    distance = two_point_distance_2d(start_point, end_point)
    n = int(distance * ratio)  # number of interpolated points
    dataset = []
    result = np.linspace(0, 1, n + 2)[:n + 1]  # keep the start point, drop the end point
@@ -23,18 +23,45 @@ def interpolation_between_two_point(start_point, end_point, ratio):
    y = k * (end_point[1] - start_point[1]) + start_point[1]
    if len(start_point) == 3:
        z = k * (end_point[2] - start_point[2]) + start_point[2]
-        point = (x, y, z)
+        point = [x, y, z]
    else:
-        point = (x, y)
+        point = [x, y]
    dataset.append(point)
    return dataset

-def interpolation_dataset(dataset, interpolation_mount):
+def interpolation_dataset_2d(dataset, interpolation_mount):
    m = len(dataset)
    new_dataset = []
    for i in range(m - 1):
-        new_dataset += interpolation_between_two_point(dataset[i], dataset[i + 1], interpolation_mount)
+        new_dataset += interpolation_between_two_point_3d(dataset[i], dataset[i + 1], interpolation_mount)
    new_dataset.append(dataset[m - 1])
    return new_dataset

+def two_point_distance_3d(point1, point2):
+    # Distance between two 3-D points
+    return math.sqrt((point2[0] - point1[0]) ** 2 + (point2[1] - point1[1]) ** 2 + (point2[2] - point1[2]) ** 2)

+def interpolation_between_two_point_3d(start_point, end_point, ratio):
+    # Interpolate between the start point and the end point; ratio is the interpolation density (points per unit length)
+    distance = two_point_distance_3d(start_point, end_point)
+    n = int(distance * ratio)  # number of interpolated points
+    dataset = []
+    result = np.linspace(0, 1, n + 2)[:n + 1]  # keep the start point, drop the end point
+    for k in result:
+        x = k * (end_point[0] - start_point[0]) + start_point[0]
+        y = k * (end_point[1] - start_point[1]) + start_point[1]
+        z = k * (end_point[2] - start_point[2]) + start_point[2]
+        dataset.append([x, y, z])
+    return dataset

+def interpolation_dataset_3d(dataset, interpolation_mount):
+    m = len(dataset)
+    new_dataset = []
+    for i in range(m - 1):
+        new_dataset += interpolation_between_two_point_3d(dataset[i], dataset[i + 1], interpolation_mount)
+    new_dataset.append(dataset[m - 1])
+    return new_dataset
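# Illustrative sketch (not part of the diff above): densifying a short 3-D segment with the newly
# added helpers. interpolation_mount is roughly the number of points inserted per unit of length.
if __name__ == '__main__':
    segment = [[0.0, 0.0, 10.0], [10.0, 0.0, 12.0]]
    dense = interpolation_dataset_3d(segment, 0.5)  # about one point every two units of length
    print(len(dense), dense[:3])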
# _*_ coding: utf-8 _*_ #
# @Time: 2020/5/27 15:27
# @Author: XiongChao
import numpy as np
from sklearn.neighbors import KDTree
import math
def two_point_distance_2d(point1, point2):
    # Distance between two 2-D points
return math.sqrt((point2[0] - point1[0]) ** 2 + (point2[1] - point1[1]) ** 2)
def two_point_distance_3d(point1, point2):
    # Distance between two 3-D points
return math.sqrt((point2[0] - point1[0]) ** 2 + (point2[1] - point1[1]) ** 2 + (point2[2] - point1[2]) ** 2)
def interpolation_between_two_point(start_point, end_point, ratio):
    # Interpolate between the start point and the end point; ratio is the interpolation density (points per unit length)
    distance = two_point_distance_3d(start_point, end_point)
    n = int(distance * ratio)  # number of interpolated points
    dataset = []
    result = np.linspace(0, 1, n + 2)[:n + 1]  # keep the start point, drop the end point
for k in result:
x = k * (end_point[0] - start_point[0]) + start_point[0]
y = k * (end_point[1] - start_point[1]) + start_point[1]
if len(start_point) == 3:
z = k * (end_point[2] - start_point[2]) + start_point[2]
point = [x, y, z]
else:
point = [x, y]
dataset.append(point)
return dataset
class CalElevation:
"""
    Compute the elevation of newly interpolated points by local weighting.
    A Gaussian maps distance to weight; the bandwidth is the distance sum divided by a constant.
"""
def __init__(self, m, n):
        self.m = m  # number of nearby points used to compute the elevation of the target point
        self.n = n  # interpolation density applied to the original data; the denser, the more accurate the elevation
@staticmethod
def get_epsilon(points, point):
        # Bandwidth: square root of the summed squared distances from point to all the points
square_epsilon = 0
for point1 in points:
square_epsilon += two_point_distance_2d(point, point1) ** 2
return np.sqrt(square_epsilon)
@staticmethod
def weight_func(point1, point2, epsilon=0.1):
distance = two_point_distance_2d(point1, point2)
return np.exp(-distance ** 2 / (2 * epsilon ** 2))
def get_weighted_list(self, points, point):
epsilon = self.get_epsilon(points, point) / 6
weight_list = []
for point1 in points:
weight_list.append(self.weight_func(point, point1, epsilon))
return weight_list
def get_elevation(self, ori_points, new_points):
        # new_points are the points whose elevation has to be computed; ori_points is the original point set
new_dataset = interpolation_dataset(ori_points, self.n)
ori_xy_list = [[point[0], point[1]] for point in new_dataset]
        tree = KDTree(np.array(ori_xy_list), leaf_size=20)  # 2-D kd-tree
for i in range(1, len(new_points) - 1):
point = new_points[i]
nearest_points = []
dist, ind = tree.query([np.array(point[:2])], k=self.m)
index_list = list(ind[0])
for index in index_list:
nearest_points.append(new_dataset[index])
elevation_list = [point[2] for point in nearest_points]
weight_list = self.get_weighted_list(nearest_points, point)
ratio_list = [weight / sum(weight_list) for weight in weight_list]
elevation = sum(ratio_list[i] * elevation_list[i] for i in range(len(ratio_list)))
new_points[i][2] = elevation
return new_points
def interpolation_dataset(dataset, interpolation_mount):
m = len(dataset)
new_dataset = []
for i in range(m - 1):
new_dataset += interpolation_between_two_point(dataset[i], dataset[i + 1], interpolation_mount)
new_dataset.append(dataset[m - 1])
return new_dataset
class ImprovedDouglasPeucker(object):
"""
    1. The thinned result must not come out as disconnected two-point pieces
    2. Elevation is handled as a second step
    3. Thinning is done on the 2-D points; the 3-D coordinates are in a UTM system
    4. After 2-D thinning, interpolate at the given spacing; the third coordinate of the interpolated points has to be recomputed
"""
def __init__(self, interpolation_count=0.125):
self.threshold = 0.005
        self.m = 4  # how many points are used to recompute the elevation of an interpolated point
        self.n = 4  # interpolation density applied to the original data; the denser, the more accurate the elevation
        self.interpolation_count = interpolation_count  # interpolation density of the thinned data
self.returned_list = []
self.not_dilute_list = []
@staticmethod
def construct_vector_by_point(point):
        # Build a 2x1 column vector from the x and y of the point
return np.array([point[0], point[1]]).reshape(2, 1)
def get_distance_from_point_to_line(self, point1, point2, point3):
        # point2 and point3 define a line; distance from point1 to that line
tmp_var = np.squeeze(np.dot(
(self.construct_vector_by_point(point3) - self.construct_vector_by_point(point2)).T,
(self.construct_vector_by_point(point2) - self.construct_vector_by_point(point1)))) / \
np.linalg.norm(self.construct_vector_by_point(point3) - self.construct_vector_by_point(point2)) ** 2
vertical_point = -tmp_var * (self.construct_vector_by_point(point3) - self.construct_vector_by_point(point2)) \
+ self.construct_vector_by_point(point2)
return np.linalg.norm(self.construct_vector_by_point(point1) - self.construct_vector_by_point(vertical_point))
def dilute(self, points):
m = len(points)
if m < 3:
self.returned_list.extend(points[::-1])
return
max_distance, index = 0, 0
for i in range(1, m - 1):
distance = self.get_distance_from_point_to_line(points[i], points[0], points[-1])
if distance > max_distance:
max_distance = distance
index = i
if max_distance < self.threshold:
self.returned_list.append(points[-1])
self.returned_list.append(points[0])
else:
sequence1 = points[:index + 1]
sequence2 = points[index:]
sequences = [sequence1, sequence2]
for i in range(2):
sequence = sequences[i]
if len(sequence) < 3 and i == 1:
self.returned_list.extend(sequence[::-1])
else:
self.not_dilute_list.append(sequence)
@staticmethod
def is_same_point(point1, point2):
        # Whether two 2-D points are the same point
flag = np.array([np.abs(point1[i] - point2[i]) < 0.000001 for i in range(len(point1))])
if flag.all():
return True
return False
def remove_duplicated_point(self, points):
        # Remove consecutive duplicate 2-D points
dataset1 = points[::-1] + []
target_dataset = []
point1 = dataset1.pop()
target_dataset.append(point1)
while dataset1:
point2 = dataset1.pop()
if self.is_same_point(point1[:2], point2[:2]):
continue
target_dataset.append(point2)
point1 = point2
return target_dataset
def main(self, points):
        # points are 3-D data
self.dilute(points)
while self.not_dilute_list:
self.dilute(self.not_dilute_list.pop())
        self.returned_list = self.returned_list[::-1]  # the list was reversed during thinning
tmp_points = self.remove_duplicated_point(self.returned_list)
new_points = interpolation_dataset(tmp_points, self.interpolation_count)
return CalElevation(self.m, self.n).get_elevation(points, new_points)
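# Illustrative sketch (not part of the original file): thinning a dense 3-D polyline with the
# improved Douglas-Peucker class above; the re-interpolated points then get their elevations
# recomputed by CalElevation. Coordinates are made-up values in a projected (UTM-like) system.
if __name__ == '__main__':
    dense_line = [[0.5 * i, 0.001 * i * i, 10.0 + 0.01 * i] for i in range(40)]
    thinned_line = ImprovedDouglasPeucker(interpolation_count=0.125).main(dense_line)
    print(len(dense_line), '->', len(thinned_line))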
# !/usr/bin/env python
# coding:utf-8
# This file was written by Xue Bingbing; it fixes topo data in the database that does not meet the specification
# I only removed unused variables and adjusted the formatting
from osgeo import gdal, ogr
import re
class RefreshTopoNext(object):
def __init__(self):
self.id = "ID"
self.meshidname = 'MESHID'
self.nextname = "NEXT"
self.nextcountname = "NEXT_COUNT"
self.prevname = "PREV"
self.prevcountname = "PREV_COUNT"
self.layername = "HD_TOPO_P"
def run(self, datasoure=None, meshid=0):
layer = datasoure.GetLayerByName(self.layername)
if layer is None:
print("can not find indentify layer: " + str(layer.GetName()))
return False
afilter = '"' + self.meshidname + '" = ' + "'" + str(meshid) + "'"
layer.SetAttributeFilter(afilter)
feat = layer.GetNextFeature()
mesh_featdict = {}
while feat is not None:
fid = feat.GetFieldAsInteger64(self.id)
mesh_featdict[fid] = feat
feat = layer.GetNextFeature()
next_idx = layer.FindFieldIndex(self.nextname, 1)
nextcount_idx = layer.FindFieldIndex(self.nextcountname, 1)
prev_idx = layer.FindFieldIndex(self.prevname, 1)
prevcount_idx = layer.FindFieldIndex(self.prevcountname, 1)
if next_idx < 0 or nextcount_idx < 0 or prev_idx < 0 or prevcount_idx < 0:
print("error data")
return False
layer.SetAttributeFilter(afilter)
layer.ResetReading()
feat = layer.GetNextFeature()
update_fet_set = set()
while feat is not None:
prevstring = feat.GetFieldAsString(prev_idx)
res = filter(lambda x: len(x) > 0, re.split('[:)(]', prevstring.strip()))
prev_ids = list(res)
count = len(prev_ids)
i = 1
while i < count:
prev_fid = int(prev_ids[i])
if prev_fid not in mesh_featdict:
feat.SetFieldInteger64(prevcount_idx, 0)
update_fet_set.add(feat)
i = i + 1
nextstring = feat.GetFieldAsString(next_idx)
res = filter(lambda x: len(x) > 0, re.split('[:)(]', nextstring.strip()))
next_ids = list(res)
count = len(next_ids)
i = 1
while i < count:
next_fid = int(next_ids[i])
if next_fid not in mesh_featdict:
feat.SetFieldInteger64(nextcount_idx, 0)
update_fet_set.add(feat)
i = i + 1
feat = layer.GetNextFeature()
datasoure.StartTransaction()
for fet in update_fet_set:
layer.SetFeature(fet)
datasoure.CommitTransaction()
layer.SetAttributeFilter(None)
layer.ResetReading()
return True
# def main():
# tool = RefreshTopoNext()
# try:
# filepath = r"PG:dbname='test1' host='localhost' port='5432' user='postgres' password='19931018'"
# datasource = ogr.Open(filepath, gdal.OF_VECTOR | gdal.OF_UPDATE)
#
# if tool.run(datasoure=datasource, meshid=20002055):
# print(u"SUCCESS:刷新图幅边界上不在本图幅内的前驱后继关系")
# else:
# print(u"ERROR:刷新图幅边界上不在本图幅内的前驱后继关系")
# except BaseException as e:
# print(u"ERROR:刷新图幅边界上不在本图幅内的前驱后继关系")
# print(e)
#
#
# if __name__ == '__main__':
# main()
# _*_ coding: utf-8 _*_ #
# @Time: 2020/1/2 19:18
# @Author: XiongChao
import numpy as np
def get_coefficient_vector(point, n):
    # From one point, build an (n + 1) x 1 matrix
vector = np.zeros(n + 1)
for i in range(n + 1):
vector[i] = np.power(point[0], i)
    return vector.reshape(n + 1, -1)  # the returned object is 2-D
def get_coefficient_matrix(dataset, n):
    # Build the coefficient matrix, of size m x (n + 1)
m = len(dataset)
coefficient_matrix = np.zeros((m, n + 1))
for i in range(m):
for j in range(n + 1):
coefficient_matrix[i, j] = np.power(dataset[i][0], j)
return coefficient_matrix
def get_standard_Y_matrix(dataset):
    # Build the matrix of original Y values, of size m x 1
m = len(dataset)
Y_matrix = np.zeros((m, 1))
for i in range(m):
Y_matrix[i, 0] = dataset[i][1]
return Y_matrix
def poly_fitting_parameter(dataset, n):
    # Fit the m points in dataset with a polynomial of degree n
coefficient_matrix = get_coefficient_matrix(dataset, n)
Y_matrix = get_standard_Y_matrix(dataset)
result = np.linalg.det(np.dot(coefficient_matrix.T, coefficient_matrix))
if result == 0:
        # Best to return the same kind of data structure so the caller can test it; a NumPy array, for instance, does not support a `not array` check
return np.zeros(n)
result1 = np.linalg.inv(np.dot(coefficient_matrix.T, coefficient_matrix))
parameter = np.dot(np.dot(result1, coefficient_matrix.T), Y_matrix) # 为(n + 1) × 1矩阵
return parameter
def fitting_loss(dataset, n):
    # Fitting loss for a polynomial of the given degree
parameter = poly_fitting_parameter(dataset, n)
if parameter.shape == (n, ):
        # When the coefficient matrix is singular, treat the loss as infinite
        return np.inf, parameter
loss = 0
for data in dataset:
X_vector = get_coefficient_vector(data, n)
Y_predict = np.dot(X_vector.T, parameter)
loss += np.squeeze(data[1] - Y_predict) ** 2
return loss, parameter
def get_degree_of_polynomial(dataset):
    # Return the polynomial degree with the smallest loss
    degree_list = [1, 2]  # only degrees up to two are tried
loss_list = []
parameter_list = []
for n in degree_list:
loss, parameter = fitting_loss(dataset, n)
loss_list.append(loss)
parameter_list.append(parameter)
result = np.array([value == np.inf for value in loss_list])
if result.all():
        # The coefficient matrix is singular for every degree tried
return False
degree = loss_list.index(min(loss_list))
optimization_parameter = parameter_list[degree]
return degree + 1, optimization_parameter
def data_normalization(dataset):
    # Normalize the data; return the new data and the means
x_dataset = [data[0] for data in dataset]
y_dataset = [data[1] for data in dataset]
x_mean = sum(x_dataset) / len(x_dataset)
y_mean = sum(y_dataset) / len(y_dataset)
new_dataset = [[data[0] - x_mean, data[1] - y_mean] for data in dataset]
return new_dataset, x_mean, y_mean
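# Illustrative sketch (not part of the original file): pick a polynomial degree for a short point
# sequence with the helpers above and evaluate the fitted polynomial at one x value.
if __name__ == '__main__':
    points = [[0.0, 0.1], [1.0, 0.9], [2.0, 2.1], [3.0, 2.9], [4.0, 4.2]]
    normalized, x_mean, y_mean = data_normalization(points)
    result = get_degree_of_polynomial(normalized)
    if result:
        degree, parameter = result
        x_vector = get_coefficient_vector([1.5 - x_mean], degree)
        y_predict = float(np.squeeze(np.dot(x_vector.T, parameter))) + y_mean
        print(degree, y_predict)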
# _*_ coding: utf-8 _*_ #
# @Time: 2020/1/15 10:32
# @Author: XiongChao
# Forward and inverse rotation/translation of a point set
import numpy as np
import math
def cos_(vector1, vector2):
    # Cosine of the angle between two vectors
cos_theta = (vector1[0] * vector2[0] + vector1[1] * vector2[1]) / \
(math.sqrt(vector1[0] ** 2 + vector1[1] ** 2) * math.sqrt(vector2[0] ** 2 + vector2[1] ** 2))
    # Floating-point rounding can push the value slightly above 1 or below -1, so clamp it
if cos_theta >= 1:
cos_theta = 1
elif cos_theta <= -1:
cos_theta = -1
return cos_theta
def angle_of_two_vector(vector1, vector2):
    # Angle between two vectors, in radians
cos_theta = cos_(vector1, vector2)
theta = math.acos(cos_theta)
return theta
def intersection_of_two_line(point1, point2, point3, point4):
    # Intersection of two straight lines; point1 and point2 are the endpoints of one line, point3 and point4 of the other
    # If the two lines are parallel here, the data itself must be wrong
result1 = (point2[1] - point1[1]) * (point4[0] - point3[0]) - (point4[1] - point3[1]) * (point2[0] - point1[0])
if result1 == 0:
return False
result2 = (point2[1] - point1[1]) * point1[0] - (point2[0] - point1[0]) * point1[1]
result3 = (point4[1] - point3[1]) * point3[0] - (point4[0] - point3[0]) * point3[1]
x = ((point4[0] - point3[0]) * result2 - (point2[0] - point1[0]) * result3) / result1
y = ((point4[1] - point3[1]) * result2 - (point2[1] - point1[1]) * result3) / result1
return x, y
def clockwise_counterclockwise(point1, point2, point3):
    # Determine whether points 1, 2, 3 turn clockwise or counterclockwise
    # Compute the cross product of the vectors point1->point2 and point2->point3
result = (point2[0] - point1[0]) * (point3[1] - point2[1]) - (point2[1] - point1[1]) * (point3[0] - point2[0])
if result > 0:
        # point1, point2, point3 turn counterclockwise
return 1
else:
return 0
def rotation_angle(point1, point2):
    # Compute the rotation angle of the coordinate axes
vector1 = (point2[0] - point1[0], point2[1] - point1[1])
vector2 = (1, 0)
angle = angle_of_two_vector(vector1, vector2)
intersection = intersection_of_two_line(point1, point2, (0, 0), (1, 0))
if not intersection:
return angle
flag_point = [intersection[0] + vector1[0], intersection[1] + vector1[1]]
result = clockwise_counterclockwise((0, 0), (1, 0), flag_point)
if result == 0:
angle = 2 * np.pi - angle
return angle
def data_normalization(dataset):
    # Normalize the data; return the new data and the means
x_dataset = [data[0] for data in dataset]
y_dataset = [data[1] for data in dataset]
x_mean = sum(x_dataset) / len(x_dataset)
y_mean = sum(y_dataset) / len(y_dataset)
new_dataset = [[data[0] - x_mean, data[1] - y_mean] for data in dataset]
return new_dataset, x_mean, y_mean
def construct_positive_transformation_matrix(theta):
    # theta is in radians
positive_matrix = np.zeros((2, 2))
positive_matrix[0, 0] = np.cos(theta)
positive_matrix[0, 1] = np.sin(theta)
positive_matrix[1, 0] = -np.sin(theta)
positive_matrix[1, 1] = np.cos(theta)
return positive_matrix
def positive_transformation_point(theta, point):
    # Forward-transform a single point
coefficient_matrix = np.zeros((2, 1))
coefficient_matrix[0, 0], coefficient_matrix[1, 0] = point[0], point[1]
transformation_matrix = construct_positive_transformation_matrix(theta)
positive_matrix = np.dot(transformation_matrix, coefficient_matrix)
positive_point = [positive_matrix[0, 0], positive_matrix[1, 0]]
return positive_point
def construct_negative_transformation_matrix(theta):
    # theta is in radians
negative_matrix = np.zeros((2, 2))
negative_matrix[0, 0] = np.cos(theta)
negative_matrix[0, 1] = - np.sin(theta)
negative_matrix[1, 0] = np.sin(theta)
negative_matrix[1, 1] = np.cos(theta)
return negative_matrix
def negative_transformation_point(theta, point):
    # Inverse-transform a single point
coefficient_matrix = np.zeros((2, 1))
coefficient_matrix[0, 0], coefficient_matrix[1, 0] = point[0], point[1]
transformation_matrix = construct_negative_transformation_matrix(theta)
negative_matrix = np.dot(transformation_matrix, coefficient_matrix)
negative_point = [negative_matrix[0, 0], negative_matrix[1, 0]]
return negative_point
def positive_transformation(dataset):
    # Forward rotation/translation of a point set; the direction is built from the start and end points
new_dataset, x_mean, y_mean = data_normalization(dataset)
n = len(dataset)
coefficient_matrix = np.zeros((2, n))
for i in range(n):
coefficient_matrix[:, i] = new_dataset[i]
theta = rotation_angle(new_dataset[0], new_dataset[-1])
transformation_matrix = construct_positive_transformation_matrix(theta)
positive_matrix = np.dot(transformation_matrix, coefficient_matrix)
m = len(positive_matrix[0])
positive_dataset = [[]] * m
for i in range(m):
positive_dataset[i] = list(positive_matrix[:, i])
return positive_dataset, theta, x_mean, y_mean
def negative_transformation(dataset, theta, x_mean, y_mean):
    # Inverse rotation/translation of a point set; the direction is built from the start and end points
n = len(dataset)
coefficient_matrix = np.zeros((2, n))
for i in range(n):
        coefficient_matrix[:, i] = dataset[i]
transformation_matrix = construct_negative_transformation_matrix(theta)
mean_matrix = np.zeros((2, 1))
mean_matrix[0, 0] = x_mean
mean_matrix[1, 0] = y_mean
negative_matrix = np.dot(transformation_matrix, coefficient_matrix) + mean_matrix
m = len(coefficient_matrix[0])
negative_dataset = [[]] * m
for i in range(m):
negative_dataset[i] = list(negative_matrix[:, i])
return negative_dataset
# _*_ coding: utf-8 _*_ #
# @Time: 2020/1/2 19:19
# @Author: XiongChao
import numpy as np
from .polynomial_fitting import get_degree_of_polynomial, get_coefficient_vector
from .rotation_and_translation_of_coordinate import positive_transformation, negative_transformation_point, \
positive_transformation_point
def savgol(dataset, window_size=5):
m = len(dataset)
if m <= window_size:
return dataset
    smoothing_dataset = [[]] * m  # holds the final smoothed data
    smoothing_dataset[0], smoothing_dataset[-1] = dataset[0], dataset[-1]  # keep the first and last points
    # Smooth the first few points
positive_dataset, theta, x_mean, y_mean = positive_transformation(dataset[:window_size])
result = get_degree_of_polynomial(positive_dataset)
if result:
degree, parameter = result
# parameter = poly_fitting_parameter(positive_dataset, degree)
for i in range(1, window_size // 2):
point = dataset[i]
new_point = [point[0] - x_mean, point[1] - y_mean]
positive_point = positive_transformation_point(theta, new_point)
X_vector = get_coefficient_vector(positive_point, degree)
            Y_predict = float(np.squeeze(np.dot(X_vector.T, parameter)))  # drop the matrix dimensions, leaving a plain float
negative_point = np.zeros((2, 1))
negative_point[0, 0], negative_point[1, 0] = positive_point[0], Y_predict
result_point = negative_transformation_point(theta, negative_point)
target_point = [result_point[0] + x_mean, result_point[1] + y_mean]
smoothing_dataset[i] = target_point
    # Smooth the last few points
positive_dataset, theta, x_mean, y_mean = positive_transformation(dataset[-window_size:])
result = get_degree_of_polynomial(positive_dataset)
if result:
degree, parameter = result
# parameter = poly_fitting_parameter(positive_dataset, degree)
for j in range(m - (window_size - 1) // 2, m - 1):
point = dataset[j]
new_point = [point[0] - x_mean, point[1] - y_mean]
positive_point = positive_transformation_point(theta, new_point)
X_vector = get_coefficient_vector(positive_point, degree)
            Y_predict = float(np.squeeze(np.dot(X_vector.T, parameter)))  # drop the matrix dimensions, leaving a plain float
negative_point = np.zeros((2, 1))
negative_point[0, 0], negative_point[1, 0] = positive_point[0], Y_predict
result_point = negative_transformation_point(theta, negative_point)
target_point = [result_point[0] + x_mean, result_point[1] + y_mean]
smoothing_dataset[j] = target_point
    # Smooth the middle points
for i in range(window_size // 2, m - (window_size - 1) // 2):
data = dataset[i - window_size // 2: i + window_size // 2 + 1]
positive_dataset, theta, x_mean, y_mean = positive_transformation(data)
result = get_degree_of_polynomial(positive_dataset)
if result:
degree, parameter = result
# parameter = poly_fitting_parameter(positive_dataset, degree)
point = dataset[i]
new_point = [point[0] - x_mean, point[1] - y_mean]
positive_point = positive_transformation_point(theta, new_point)
X_vector = get_coefficient_vector(positive_point, degree)
            Y_predict = float(np.squeeze(np.dot(X_vector.T, parameter)))  # drop the matrix dimensions, leaving a plain float
negative_point = np.zeros((2, 1))
negative_point[0, 0], negative_point[1, 0] = positive_point[0], Y_predict
result_point = negative_transformation_point(theta, negative_point)
target_point = [result_point[0] + x_mean, result_point[1] + y_mean]
smoothing_dataset[i] = target_point
return smoothing_dataset
def automatic_smoothing(dataset, times):
    # Automated smoothing of a line geometry
    XY_dataset = [[data[0], data[1]] for data in dataset]  # only the projected x/y values are smoothed; z stays unchanged
Z_dataset = [data[2] for data in dataset]
for i in range(times):
XY_dataset = savgol(XY_dataset)
smoothing_XY_dataset = XY_dataset
smoothing_dataset = [[smoothing_XY_dataset[i][0], smoothing_XY_dataset[i][1], Z_dataset[i]]
for i in range(len(Z_dataset))]
return smoothing_dataset
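# Illustrative sketch (not part of the original file), assuming the relative imports at the top of
# this module resolve inside its package: smooth a noisy 3-D polyline a couple of times; only the
# x/y values are smoothed, the z values are carried through unchanged.
if __name__ == '__main__':
    noisy_line = [[1.0 * i, 0.05 * ((-1) ** i), 12.0] for i in range(20)]
    smoothed_line = automatic_smoothing(noisy_line, 2)
    print(smoothed_line[:3])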
@@ -7,14 +7,19 @@ import math
import numpy as np
from sklearn.neighbors import KDTree
-from . import const
-from .data_interpolation import interpolation_dataset
+DEF_ALPHA = 0.0000001  # unit: meters; convert first if the coordinates are in degrees

-def two_point_distance(point1, point2):
+def two_point_distance_2d(point1, point2):
    return math.sqrt((point2[0] - point1[0]) ** 2 + (point2[1] - point1[1]) ** 2)

+def two_point_distance_3d(point1, point2):
+    # Distance between two 3-D points
+    return math.sqrt((point2[0] - point1[0]) ** 2 + (point2[1] - point1[1]) ** 2 + (point2[2] - point1[2]) ** 2)

def cos_(vector1, vector2):
    # Cosine of the angle between two vectors
    cos_theta = (vector1[0] * vector2[0] + vector1[1] * vector2[1]) / \
@@ -34,6 +39,20 @@ def angle_of_two_vector(vector1, vector2):
    return theta

+def intersection_of_two_line(point1, point2, point3, point4):
+    # Intersection of two straight lines; point1 and point2 are the endpoints of one line, point3 and point4 of the other
+    # If the two lines are parallel here, the data itself must be wrong
+    result1 = (point2[1] - point1[1]) * (point4[0] - point3[0]) - (point4[1] - point3[1]) * (point2[0] - point1[0])
+    if result1 == 0:
+        return []
+    result2 = (point2[1] - point1[1]) * point1[0] - (point2[0] - point1[0]) * point1[1]
+    result3 = (point4[1] - point3[1]) * point3[0] - (point4[0] - point3[0]) * point3[1]
+    x = ((point4[0] - point3[0]) * result2 - (point2[0] - point1[0]) * result3) / result1
+    y = ((point4[1] - point3[1]) * result2 - (point2[1] - point1[1]) * result3) / result1
+    z = (point1[2] + point2[2]) / 2
+    return [x, y, z]

def three_point_dot_product(point1, point2, point3):
    # Dot product of the vectors 2->1 and 2->3
    result = (point1[0] - point2[0]) * (point3[0] - point2[0]) + \
@@ -41,35 +60,18 @@ def three_point_dot_product(point1, point2, point3):
    return result

-def dis_from_point_to_line(k, b, point):
-    # k and b are the slope and intercept of the line, which is assumed not to be vertical
-    dis = np.abs(k * point[0] + b - point[1]) / np.sqrt(1 + k * k)
-    return dis

-def nearest_point_of_dataset(point, dataset):
-    # Find the point of dataset nearest to point
-    # Returns the nearest element of dataset
-    # Only the x and y coordinates are used
-    distance_list = [two_point_distance(point[:2], data[:2]) for data in dataset]
-    return dataset[distance_list.index(min(distance_list))]

-def clockwise_counterclockwise(point1, point2, point3):
-    # Determine whether points 1, 2, 3 turn clockwise or counterclockwise
-    # Compute the cross product of the vectors point1->point2 and point2->point3
-    result = (point2[0] - point1[0]) * (point3[1] - point2[1]) - (point2[1] - point1[1]) * (point3[0] - point2[0])
-    if result > 0:
-        # point1, point2, point3 turn counterclockwise
-        return 1
-    else:
-        return 0

+def is_same_point_2d(point1, point2):
+    # Whether two points are the same point, comparing x and y only
+    flag = np.array([np.abs(point1[i] - point2[i]) < 0.000001 for i in range(len(point1))])
+    flag = flag[:2]  # for 3-D points only the first two components are compared
+    if flag.all():
+        return True
+    return False

-def is_same_point(point1, point2):
-    # Whether two points are the same point
-    flag = np.array([np.abs(point1[i] - point2[i]) < const.ALPHA for i in range(len(point1))])
-    flag = flag[:2]  # for 3-D points only the first two components are compared
+def is_same_point_3d(point1, point2):
+    # Whether two points are the same point, comparing x, y and z
+    flag = np.array([np.abs(point1[i] - point2[i]) < DEF_ALPHA for i in range(len(point1))])
    if flag.all():
        return True
    return False
@@ -94,78 +96,12 @@ def vertical_point(point1, point2, point3):
    return x, y

-def one_point_to_vertical_point_dis(point1, point2, point3):
-    # point2 and point3 define a line; distance from point1 to the foot of the perpendicular on that line
-    ver_point = vertical_point(point1, point2, point3)
-    distance = two_point_distance(point1, ver_point)
-    return distance

-# def one_point_to_vertical_point_dis(point1, point2, point3):
-# # # point2 and point3 define a line; distance from point1 to the foot of the perpendicular on that line
-# # ver_point = vertical_point(point1, point2, point3)
-# # distance = two_point_distance(point1, ver_point)
-# # return distance

-def nearest_point_index_of_dataset(point, dataset):
-    # Find the point of dataset nearest to point
-    # Only the x and y coordinates are used
-    tree = KDTree(np.array(dataset), leaf_size=const.LEAF_SIZE)
-    dist, ind = tree.query([np.array(point[:2])], k=1)
-    return ind[0][0]

-def nearest_two_point_of_dataset(point, dataset):
-    # Find the two points of dataset nearest to point
-    # Only the x and y coordinates are used
-    tree = KDTree(np.array(dataset), leaf_size=const.LEAF_SIZE)
-    dist, ind = tree.query([np.array(point[:2])], k=2)
-    return [dataset[ind[0][0]], dataset[ind[0][1]]]

+def nearest_m_point_of_dataset_2d(point, dataset, m):
+    # Find the m points of dataset nearest to point
+    # 2-D search
+    if len(point) == 3:
+        dataset = [[point1[0], point1[1]] for point1 in dataset]
+    tree = KDTree(np.array(dataset), leaf_size=20)
+    dist, ind = tree.query([np.array(point[:2])], k=m)
+    index_list = list(ind[0])
+    return index_list

-def dis_from_dataset_to_dataset(dataset1, dataset2):
-    # Distance between two datasets, taken as the average distance from the points of one dataset to the other
-    # After interpolation, measuring the width via point-to-perpendicular distances has very little error
-    # First make sure the two datasets run in the same direction
-    dataset1 = [[data[0], data[1]] for data in dataset1]
-    dataset2 = [[data[0], data[1]] for data in dataset2]
-    dataset1 = interpolation_dataset(dataset1, 2)  # interpolate
-    dataset2 = interpolation_dataset(dataset2, 2)
-    distance1 = two_point_distance(dataset1[0], dataset2[0])
-    distance2 = two_point_distance(dataset1[0], dataset2[-1])
-    if distance1 > distance2:
-        dataset2 = dataset2[::-1]
-    # Keep the aligned, overlapping stretches of the two datasets
-    index1 = nearest_point_index_of_dataset(dataset1[0], dataset2)
-    index2 = nearest_point_index_of_dataset(dataset2[0], dataset1)
-    distance1 = two_point_distance(dataset1[0], dataset2[index1])
-    distance2 = two_point_distance(dataset2[0], dataset1[index2])
-    if distance1 > distance2:
-        dataset1 = dataset1[index2:]
-    else:
-        dataset2 = dataset2[index1:]
-    index3 = nearest_point_index_of_dataset(dataset1[-1], dataset2)
-    index4 = nearest_point_index_of_dataset(dataset2[-1], dataset1)
-    distance3 = two_point_distance(dataset1[-1], dataset2[index3])
-    distance4 = two_point_distance(dataset2[-1], dataset1[index4])
-    if distance3 > distance4:
-        dataset1 = dataset1[:index4 + 1]
-    else:
-        dataset2 = dataset2[:index3 + 1]
-    if len(dataset2) < 2:
-        # Only one point left; in this special case just do a simple computation
-        point = dataset2[0]
-        index5 = nearest_point_index_of_dataset(point, dataset1)
-        target_distance = two_point_distance(point, dataset1[index5])
-    else:
-        distance_list = []
-        for i in range(0, len(dataset1)):
-            point1 = dataset1[i]
-            point2, point3 = nearest_two_point_of_dataset(point1, dataset2)
-            distance = one_point_to_vertical_point_dis(point1, point2, point3)
-            distance_list.append(distance)
-        target_distance = sum(distance_list) / len(distance_list)
-    return round(target_distance * 1000)
@@ -23,47 +23,67 @@ class _const:
c = _const()

-c.BACKUP_MESH_BOX_FILE_NAME = "backup_mesh_box.gpkg"  # backup copy of the map-sheet frame file
-c.MESH_BOX_LAYER_NAME = "l13"  # layer name of the map-sheet frame file
-c.MESH_BOX_MESH_ID = "meshid"  # map-sheet-id field of the map-sheet frame file
-c.MESH_ID = "MESH_ID"  # map-sheet id of the map data
-c.LINK_ID = "LINK_ID"
-c.LINK_IDS = "LINK_IDS"
-c.EDGE_IDS = "EDGE_IDS"
-c.LANE_IDS = "LANE_IDS"
-c.SMOOTHING_MOUNT = 70  # number of smoothing passes near the border during edge matching, per meter
-c.START_NODE_ID = "START_NODE_ID"
-c.END_NODE_ID = "END_NODE_ID"
-c.NODE_ID = "NODE_ID"
-c.HD_LINK_ID = "HD_LINK_ID"
-c.START_EDGE_NODE_ID = "START_EDGE_NODE_ID"
-c.END_EDGE_NODE_ID = "END_EDGE_NODE_ID"
-c.START_LANE_NODE_ID = "START_LANE_NODE_ID"
-c.END_LANE_NODE_ID = "END_LANE_NODE_ID"
-c.LENGTH_THRESHOLD = 0.001  # thinning threshold
-c.MESH_HD_LINK = "HD_LINK"
-c.LANE_EDGE_ID = "LANE_EDGE_ID"
-c.LEFT_LANE_EDGE_ID = "LEFT_LANE_EDGE_ID"
-c.RIGHT_LANE_EDGE_ID = "RIGHT_LANE_EDGE_ID"
-c.RECTANGLE_WIDTH = 30  # width of the buffer rectangle; the length is already known
-c.LINK_DIS_POINTS = 5  # number of points used to compute the distance between reference lines
-c.LINK_DIS_THRESHOLD = 4  # threshold on the distance between the reference lines of two map sheets
-c.MESH_HD_LANE_EDGE = "HD_LANE_EDGE"  # lane-boundary layer name
-c.MESH_HD_LANE_LINK = "HD_LANE_LINK"  # lane-centerline layer name
-c.MESH_HD_NODE = "HD_NODE"  # reference-line node layer name
-c.MESH_NODE_ID = "NODE_ID"  # primary key of the reference-line node layer
-c.MESH_HD_LANE_NODE = "HD_LANE_NODE"  # lane-centerline node layer name
-c.MESH_LANE_NODE_ID = "LANE_NODE_ID"  # primary key of the lane-centerline node layer
-c.MESH_HD_LANE_EDGE_NODE = "HD_LANE_EDGE_NODE"  # lane-boundary node layer name
-c.MESH_LANE_EDGE_NODE_ID = "LANE_EDGE_NODE_ID"  # primary key of the lane-boundary node layer
-c.SMOOTHING_DATA_LENGTH = 6  # how long a stretch of data to cut out for smoothing
-c.MESH_INTERPOLATION_MOUNT1 = 1  # interpolation density used during edge matching
-c.MESH_INTERPOLATION_MOUNT2 = 0.125  # interpolation density used during edge matching
-c.CONNECT_DIS_THRESHOLD = 3  # threshold for deciding whether to connect two corresponding lines (boundaries and centerlines)
-c.ELEVATION_THRESHOLD = 1.5  # decides whether two corresponding roads lie in the same plane
-c.NEAR_POINT_THRESHOLD = 0.03  # threshold for deciding whether two points are close to each other
-c.ALPHA = 0.00001  # tolerance for comparing two floats
-c.LEAF_SIZE = 20
-c.RETURNED_MESH_INFO_FILE_NAME = "未接边信息.txt"

+# Constants used in main.py
+c.MESH_BOX_LAYER_NAME = "l13"  # layer name of the map-sheet frame

+# Constants used in connect_mesh.py
+c.MESH_ID = "MESH_ID"  # map-sheet id
+c.BUFFER_SIZE1 = 0.0005  # size of the buffer around the specified map-sheet frame
+c.BUFFER_SIZE2 = 0.00008  # size of the buffer built around vertices shared by adjacent map-sheet frames, roughly 8 meters
+c.BUFFER_SIZE4 = 0.00005  # size of the buffer built around vertices shared by diagonally adjacent map-sheet frames, roughly 4 meters
+c.HD_TOPO_P = "HD_TOPO_P"  # topology-point layer name
+c.HD_LINK = "HD_LINK"  # reference-line layer name
+c.HD_NODE = "HD_NODE"  # reference-line node layer name
+c.HD_LANE_LINK = "HD_LANE_LINK"  # lane-centerline layer name
+c.HD_LANE_NODE = "HD_LANE_NODE"  # lane-centerline node layer name
+c.HD_LANE_EDGE = "HD_LANE_EDGE"  # lane-boundary layer name
+c.HD_LANE_EDGE_NODE = "HD_LANE_EDGE_NODE"  # lane-boundary node layer name
+c.HD_LINK_EDGE = "HD_LINK_EDGE"  # road-edge layer name
+c.HD_LINK_EDGE_NODE = "HD_LINK_EDGE_NODE"  # road-edge node layer name
+c.PREV_COUNT = "PREV_COUNT"  # how many nodes precede a given gps point
+c.NEXT_COUNT = "NEXT_COUNT"  # how many nodes follow a given gps point
+c.AZIMUTH = "AZIMUTH"  # heading angle of the survey vehicle
+c.PITCH = "PITCH"
+c.ROLL = "ROLL"
+c.NEXT = "NEXT"  # the next gps point in the topo
+c.PREV = "PREV"  # the previous gps point in the topo
+c.ID = "ID"  # primary key of the gps data
+c.BUFFER_SIZE3 = 0.00015  # size of the buffer built from the topo, roughly 15 meters
+c.LEAF_SIZE = 20  # kd-tree parameter
+c.INTERPOLATION_COUNT1 = 5  # interpolation density used when computing the distance from a topo point to a lane boundary; larger is more accurate
+c.ANGLE1 = 60  # used to avoid pulling in edge-matching data from crossing roads
+c.DIS_THRESHOLD1 = 4  # only lane boundaries within this distance of the gps point are used
+c.HD_LINK_ID = "HD_LINK_ID"  # associated reference line
+c.LINK_ID = "LINK_ID"  # primary key of the reference line
+c.LANE_EDGE_ID = "LANE_EDGE_ID"  # primary key of the lane boundary
+c.LANE_LINK_ID = "LANE_LINK_ID"  # primary key of the lane centerline
+c.LEFT_LINK_EDGE = "LEFT_LINK_EDGE"  # left road edge associated with the reference line
+c.RIGHT_LINK_EDGE = "RIGHT_LINK_EDGE"  # right road edge associated with the reference line
+# c.DIS_THRESHOLD2 = 50
+c.AZIMUTH_THRESHOLD = 3
+c.DIS_THRESHOLD2 = 5
+c.ANGLE2 = 20
+c.LINK_EDGE_ID = "LINK_EDGE_ID"  # primary key of the road-edge layer
+c.INTERPOLATION_COUNT2 = 4
+c.COMPARISON_NUM1 = 5  # how many CPT y-axis or z-axis samples to use in the comparison
+c.LINK_CONNECT_INTERVAL = 4  # reference lines whose gap is within this range are edge-matched
+c.ELEVATION_THRESHOLD = 1.6  # if the elevation gap at the junction exceeds this value, treat it as an overpass
+c.MOVE_COUNT_THRESHOLD = 25  # if begin_topo still has not found a matching reference line after this many moves, do not edge-match
+c.START_NODE_ID = "START_NODE_ID"  # id of the reference line's start node
+c.END_NODE_ID = "END_NODE_ID"  # id of the reference line's end node
+c.NODE_ID = "NODE_ID"  # primary key of the reference-line node
+c.INTERPOLATION_COUNT3 = 2  # number of interpolated points at the junction during edge matching
+c.COMPARISON_NUM2 = 5  # how many points to use for the edge-matching operations
+c.SMOOTHING_COUNT = 50  # smoothing passes per meter
+c.INTERPOLATION_COUNT4 = 0.125  # interpolation threshold for the final data
+c.START_EDGE_NODE_ID = "START_EDGE_NODE_ID"  # start-node field of the lane boundary
+c.END_EDGE_NODE_ID = "END_EDGE_NODE_ID"  # end-node field of the lane boundary
+c.START_LANE_NODE_ID = "START_LANE_NODE_ID"  # start-node field of the lane centerline
+c.END_LANE_NODE_ID = "END_LANE_NODE_ID"  # end-node field of the lane centerline
+c.CONNECT_THRESHOLD = 2  # for lines other than reference lines, the maximum CPT x gap between the two lines to connect; beyond this, do not edge-match
+c.LANE_EDGE_NODE_ID = "LANE_EDGE_NODE_ID"  # primary key of the lane-boundary node
+c.LANE_NODE_ID = "LANE_NODE_ID"  # primary key of the lane-centerline node
+c.LINK_EDGE_NODE_ID = "LINK_EDGE_NODE_ID"  # primary key of the road-edge node

sys.modules[__name__] = c
# _*_ coding: utf-8 _*_ #
# @Time: 2020/2/26 15:33
# @Author: XiongChao
from util import const
from .common import one_point_to_vertical_point_dis
from edge_match_src import remove_duplicate_data
class DouglasPeuker(object):
def __init__(self):
self.threshold = const.LENGTH_THRESHOLD
self.qualify_list = list()
self.disqualify_list = list()
def diluting(self, point_list):
"""
vacuate_layer
        :param point_list: list of 2-D points
:return:
"""
if len(point_list) < 3:
self.qualify_list.extend(point_list[::-1])
else:
            # Find the point with the largest distance to the line through the first and last points
max_distance_index, max_distance = 0, 0
for index, point in enumerate(point_list):
if index in [0, len(point_list) - 1]:
continue
distance = one_point_to_vertical_point_dis(point, point_list[0], point_list[-1])
if distance > max_distance:
max_distance_index = index
max_distance = distance
            # If the largest distance is below the threshold, drop all intermediate points; otherwise split the curve at that point
if max_distance < self.threshold:
self.qualify_list.append(point_list[-1])
self.qualify_list.append(point_list[0])
else:
                # Split the curve into two segments at the point with the largest distance
sequence_a = point_list[:max_distance_index + 1]
sequence_b = point_list[max_distance_index:]
for sequence in [sequence_a, sequence_b]:
if len(sequence) < 3 and sequence == sequence_b:
self.qualify_list.extend(sequence[::-1])
else:
self.disqualify_list.append(sequence)
def models(self, point_list):
self.diluting(point_list)
while len(self.disqualify_list) > 0:
self.diluting(self.disqualify_list.pop())
target_dataset = remove_duplicate_data(self.qualify_list)
return target_dataset
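# Illustrative sketch (not part of the original file), assuming const.LENGTH_THRESHOLD and the
# imported helpers resolve inside the package: thin a 3-D polyline with the DouglasPeuker wrapper above.
if __name__ == '__main__':
    line = [[0.5 * i, 0.0004 * i * i, 15.0] for i in range(30)]
    thinned = DouglasPeuker().models(line)
    print(len(line), '->', len(thinned))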