Commit 89cd51f0 authored by xiongchao's avatar xiongchao

提交自动化流程中的接边

parent eac857da
## pycharm ignore
# User-specific files
#虚拟环境
venv
#python cache files
__pycache__
*.pyc
#pycharm project files
.idea
# lane_match
# edge_mathch
1. 高精地图数据图幅边框接边工具使用方法
本工具通过命令行参数制定输入和输出参数, 参数含义如下:
1) 数据库连接字符串,如"PG:dbname='test3' host='localhost' port='5432' user='postgres' password='19931018'"
2) 需要接边的图幅id文件,如"C:\Users\熊\Desktop\edge_match_file\mesh_id.txt"
3) 图幅框文件,如"C:\Users\熊\Desktop\edge_match_file\mesh_grid.gpkg"
2. 示例如下:
lane_match "PG:dbname='test3' host='localhost' port='5432' user='postgres' password='19931018'" "C:\Users\熊\Desktop\edge_match_file\mesh_id.txt" "C:\Users\熊\Desktop\edge_match_file\mesh_grid.gpkg"
3. 备注
3.1 生成的未接边文件名为:未接边信息.txt,其在图幅框文件的同一目录下
3.2 在图幅框文件目录下会生成一个新的图幅框文件,其中的图幅框仅包含“图幅id文件”中的图幅
\ No newline at end of file
# _*_ coding: utf-8 _*_ #
# @Time: 2020/4/22 17:04
# @Author: XiongChao
import shutil
import os
from smoothing.savitsky_golay_smoothing import automatic_smoothing
from smoothing.polynomial_fitting import get_coefficient_vector, poly_fitting_parameter
from smoothing.rotation_and_translation_of_coordinate import positive_transformation_point, \
negative_transformation_point, positive_transformation
from util.data_interpolation import interpolation_between_two_point
from util.improved_douglas_peuker import DouglasPeuker
from util.common import *
from edge_match_src import *
class BaseConnectMesh:
def __init__(self, connect_postgresql, sheet_designation_path, mesh_box_path):
self.connect_postgresql = connect_postgresql
self.sheet_designation_path = sheet_designation_path
self.mesh_box_path = mesh_box_path
self.map_data_source = ogr.Open(self.connect_postgresql, 1)
self.mesh_box_data_source = ogr.Open(self.mesh_box_path, 1)
def is_execute(self):
# 判断是否执行后面的接边逻辑
if self.map_data_source is None or self.mesh_box_data_source is None:
return False
return True
def get_sheet_designations(self):
# 获取图幅号列表
with open(self.sheet_designation_path, 'r', encoding='utf-8') as file:
lines = file.readlines()
mesh_ids = []
for line in lines:
if line.strip():
mesh_ids.append(int(line.strip()))
return mesh_ids
@staticmethod
def is_little_mesh(mesh_ids):
# 判断是否是空图幅文件
if len(mesh_ids) < 2:
print("传入的图幅框文件中图幅号数量少于两个!不执行接边逻辑!")
return True
return False
@staticmethod
def get_mesh_box_by_id(mesh_box_layer, mesh_id):
# 根据图幅号获取图幅框的feature
filter = '"' + const.MESH_BOX_MESH_ID + '" = ' + "'" + str(mesh_id) + "'"
mesh_box_layer.SetAttributeFilter(filter)
mesh_box_feature_count = mesh_box_layer.GetFeatureCount()
if mesh_box_feature_count != 1:
print("根据图幅号:" + str(mesh_id) + "获取的图幅框的feature不是一个,请查验!")
return None
mesh_box_feature = mesh_box_layer.GetNextFeature()
mesh_box_layer.SetAttributeFilter(None) # 消除属性过滤
return mesh_box_feature
def connect_adjacent_mesh(self, mesh_ids, mesh_box_layer):
# 获取相邻的图幅号对,此处的图幅框已经已经改变
returned_mesh_info = []
mesh_count = len(mesh_ids)
for i in range(mesh_count - 1):
mesh_id1 = mesh_ids[i]
mesh_box_feature1 = self.get_mesh_box_by_id(mesh_box_layer, mesh_id1)
if mesh_box_feature1 is None:
continue
mesh_box_geom1 = mesh_box_feature1.geometry()
for j in range(i + 1, mesh_count):
mesh_id2 = mesh_ids[j]
mesh_box_feature2 = self.get_mesh_box_by_id(mesh_box_layer, mesh_id2)
if mesh_box_feature2 is None:
continue
mesh_box_geom2 = mesh_box_feature2.geometry()
intersection_geom = mesh_box_geom1.Intersection(mesh_box_geom2)
if not intersection_geom.IsEmpty() and len(intersection_geom.GetPoints()) == 2:
mesh_box_public_border = [[data[0], data[1], 0] for data in intersection_geom.GetPoints()]
solution = ConnectTwoMesh(mesh_id1, mesh_id2, mesh_box_public_border, mesh_box_geom1,
mesh_box_geom2, self.map_data_source)
solution.models()
if solution.mesh_info:
returned_mesh_info += solution.mesh_info
file_str = self.mesh_box_path.split('\\')[:-1]
file_dir = '\\'.join(file_str)
files = os.listdir(file_dir)
file_str.append(const.RETURNED_MESH_INFO_FILE_NAME)
returned_mesh_info_path = '\\'.join(file_str)
if const.RETURNED_MESH_INFO_FILE_NAME in files:
os.remove(returned_mesh_info_path)
# if not returned_mesh_info:
# return
# file_str = self.mesh_box_path.split('\\')[:-1]
# file_str.append(const.RETURNED_MESH_INFO_FILE_NAME)
# returned_mesh_info_path = '\\'.join(file_str)
with open(returned_mesh_info_path, 'w', encoding='utf-8') as file:
file.write("第一列为图幅框1的id\n")
file.write("第二列为图幅框2的id\n")
file.write("第三列为参考线1的id\n")
file.write("第四列为参考线2的id\n\n")
for info in returned_mesh_info:
file.write(str(info[0]) + " " + str(info[1]) + " " + str(info[2]) + " " + str(info[3]))
class ConnectMeshs(BaseConnectMesh):
"""
连接所有的图幅框中的道路
"""
def __init__(self, connect_postgresql, sheet_designation_path, mesh_box_path):
super().__init__(connect_postgresql, sheet_designation_path, mesh_box_path)
def backup_mesh_box_file(self, mesh_ids):
# 备份图幅框文件,并将不在图幅列表中的图幅框删除
# 先删除原始图幅框文件中的空feature
delete_fids = []
mesh_box_layer = self.mesh_box_data_source.GetLayerByName(const.MESH_BOX_LAYER_NAME)
mesh_box_feature_count = mesh_box_layer.GetFeatureCount()
mesh_box_layer.StartTransaction()
for j in range(mesh_box_feature_count):
mesh_box_feature = mesh_box_layer.GetNextFeature()
if not mesh_box_feature:
delete_fids.append(mesh_box_feature.GetFID())
for delete_fid in delete_fids:
mesh_box_layer.DeleteFeature(delete_fid)
mesh_box_layer.CommitTransaction()
mesh_ids = mesh_ids + [] # 相当于复制了一份
file_str = self.mesh_box_path.split('\\')[:-1]
file_str.append(const.BACKUP_MESH_BOX_FILE_NAME)
backup_box_path = '\\'.join(file_str)
shutil.copy(self.mesh_box_path, backup_box_path) # 备份图幅框文件
self.mesh_box_data_source = ogr.Open(backup_box_path, 1) # 重新获取图幅框文件的数据源
mesh_box_layer = self.mesh_box_data_source.GetLayerByName(const.MESH_BOX_LAYER_NAME)
mesh_box_feature_count = mesh_box_layer.GetFeatureCount()
delete_fids = []
mesh_box_layer.StartTransaction()
for i in range(mesh_box_feature_count):
mesh_box_feature = mesh_box_layer.GetNextFeature()
if not mesh_box_feature:
delete_fids.append(mesh_box_feature.GetFID())
continue
mesh_id = mesh_box_feature[const.MESH_BOX_MESH_ID]
if mesh_id in mesh_ids:
mesh_ids.remove(mesh_id) # 不断减小列表,利于加快后面的遍历速度
for delete_fid in delete_fids:
mesh_box_layer.DeleteFeature(delete_fid)
mesh_box_layer.CommitTransaction()
return mesh_box_layer
def models(self):
if not self.is_execute():
return
mesh_ids = self.get_sheet_designations()
result1 = self.is_little_mesh(mesh_ids)
if result1:
return
mesh_box_layer = self.backup_mesh_box_file(mesh_ids)
self.connect_adjacent_mesh(mesh_ids, mesh_box_layer)
self.mesh_box_data_source.Destroy()
self.map_data_source.Destroy()
class ConnectTwoMesh:
"""
此类用于连接两个图幅
"""
def __init__(self, meshid1, meshid2, boundary_line, polygon_geom1, polygon_geom2, dataSource):
self.meshid1 = meshid1
self.meshid2 = meshid2
self.mesh_blh_boundary_line = boundary_line # 包含两个点
self.polygon_geom1 = polygon_geom1
self.polygon_geom2 = polygon_geom2
self.dataSource = dataSource
self.smoothing_count = const.SMOOTHING_MOUNT # 接边处平滑的次数
self.mesh_info = [] # 用于写入未接边情形的图幅号和参考线id信息
self.lane_edge_layer = self.dataSource.GetLayerByName(const.MESH_HD_LANE_EDGE)
self.lane_link_layer = self.dataSource.GetLayerByName(const.MESH_HD_LANE_LINK)
self.link_layer = self.dataSource.GetLayerByName(const.MESH_HD_LINK)
self.lane_edge_node_layer = self.dataSource.GetLayerByName(const.MESH_HD_LANE_EDGE_NODE)
self.lane_node_layer = self.dataSource.GetLayerByName(const.MESH_HD_LANE_NODE)
self.node_layer = self.dataSource.GetLayerByName(const.MESH_HD_NODE)
def dataset_direction1(self, geom):
# 获取数据集的方向,指向边界线为正向,数据没有越界
# intersection为延申的交点
blh_points = geom.GetPoints()
utm_points = transform_to_utm(blh_points, self.source_srs)[0]
utm_points = remove_duplicate_data(utm_points)
intersection = intersection_of_two_line(utm_points[0], utm_points[-1],
self.mesh_utm_boundary_line[0], self.mesh_utm_boundary_line[-1])
distance1 = two_point_distance(utm_points[0][:2], intersection[:2]) # 仅计算二维
distance2 = two_point_distance(utm_points[-1][:2], intersection[:2])
if distance1 < distance2:
flag = 1
utm_points = utm_points[::-1]
else:
flag = 0
return utm_points, flag
def dataset_direction2(self, geom):
# 获取数据集的方向,指向边界线为正向,数据没有越界
# intersection为延申的交点
blh_points = geom.GetPoints()
utm_points = transform_to_utm(blh_points, self.source_srs)[0]
utm_points = remove_duplicate_data(utm_points)
intersection = intersection_of_two_line(utm_points[0], utm_points[-1],
self.mesh_utm_boundary_line[0], self.mesh_utm_boundary_line[-1])
distance1 = two_point_distance(utm_points[0][:2], intersection[:2]) # 仅计算二维
distance2 = two_point_distance(utm_points[-1][:2], intersection[:2])
if abs(distance1 - distance2) < 0.15:
# 此处认为会有0.15的误差
return False
if distance1 < distance2:
flag = 1
utm_points = utm_points[::-1]
else:
flag = 0
return utm_points, flag
def dataset_direction(self, feature, polygon_geom1):
# 获取数据集的方向,指向图幅相交边界线为正向
# feature对应的图幅号与polygon1对应的图幅号相同
geom = feature.GetGeometryRef()
if polygon_geom1.Contains(geom):
utm_points, flag = self.dataset_direction1(geom)
else:
geom1 = geom.Intersection(polygon_geom1) # 取框内的部分
if geom1.IsEmpty():
# 全部在图幅框外
utm_points1, flag1 = self.dataset_direction1(geom)
flag = abs(flag1 - 1)
utm_points = utm_points1[::-1]
else:
result = self.dataset_direction2(geom1)
if not result:
geom2 = geom.Difference(polygon_geom1) # 选择图幅框外面的
utm_points1, flag1 = self.dataset_direction1(geom2)
flag = abs(flag1 - 1)
else:
utm_points1, flag = result
blh_points = geom.GetPoints()
utm_points = transform_to_utm(blh_points, self.source_srs)[0]
utm_points = remove_duplicate_data(utm_points)
if flag == 1:
utm_points = utm_points[::-1]
return utm_points, flag
@staticmethod
def get_boundary_point(point, x_mean, y_mean, theta, parameter):
new_point = [point[0] - x_mean, point[1] - y_mean]
positive_point = positive_transformation_point(theta, new_point)
X_vector = get_coefficient_vector(positive_point, 1)
Y_predict = float(np.squeeze(np.dot(X_vector.T, parameter))) # 去掉矩阵的维度,变为一个浮点数
negative_point = np.zeros((2, 1))
negative_point[0, 0], negative_point[1, 0] = positive_point[0], Y_predict
result_point = negative_transformation_point(theta, negative_point)
target_point = [result_point[0] + x_mean, result_point[1] + y_mean]
return target_point
def get_connect_boundary(self, link_feature1, lane_edge_features1, lane_link_features1,
link_feature2, lane_edge_features2, lane_link_features2):
# 获取两个图幅文件对应接边的两条道路的共有边界
boundary_point = []
link_utm_dataset1, flag = self.dataset_direction(link_feature1, self.polygon_geom1)
link_utm_dataset2, flag = self.dataset_direction(link_feature2, self.polygon_geom2)
link_middle_point = [(link_utm_dataset1[-1][0] + link_utm_dataset2[-1][0]) / 2,
(link_utm_dataset1[-1][1] + link_utm_dataset2[-1][1]) / 2,
(link_utm_dataset1[-1][2] + link_utm_dataset2[-1][2]) / 2, ]
boundary_point.append(link_middle_point)
for lane_edge_feature1 in lane_edge_features1:
target_lane_edge_feature = self.get_connect_feature(lane_edge_feature1, lane_edge_features1,
lane_edge_features2)
if target_lane_edge_feature:
lane_edge_utm_dataset1, flag = self.dataset_direction(lane_edge_feature1, self.polygon_geom1)
lane_edge_utm_dataset2, flag = self.dataset_direction(target_lane_edge_feature, self.polygon_geom2)
lane_edge_middle_point = [(lane_edge_utm_dataset1[-1][0] + lane_edge_utm_dataset2[-1][0]) / 2,
(lane_edge_utm_dataset1[-1][1] + lane_edge_utm_dataset2[-1][1]) / 2,
(lane_edge_utm_dataset1[-1][2] + lane_edge_utm_dataset2[-1][2]) / 2]
boundary_point.append(lane_edge_middle_point)
for lane_link_feature1 in lane_link_features1:
target_lane_link_feature = self.get_connect_feature(lane_link_feature1, lane_link_features1,
lane_link_features2)
if target_lane_link_feature:
lane_link_utm_dataset1, flag = self.dataset_direction(lane_link_feature1, self.polygon_geom1)
lane_link_utm_dataset2, flag = self.dataset_direction(target_lane_link_feature, self.polygon_geom2)
lane_edge_middle_point = [(lane_link_utm_dataset1[-1][0] + lane_link_utm_dataset2[-1][0]) / 2,
(lane_link_utm_dataset1[-1][1] + lane_link_utm_dataset2[-1][1]) / 2,
(lane_link_utm_dataset1[-1][2] + lane_link_utm_dataset2[-1][2]) / 2]
boundary_point.append(lane_edge_middle_point)
# 对获取的交界的点集做直线拟合,获取边界
positive_dataset, theta, x_mean, y_mean = positive_transformation(boundary_point)
parameter = poly_fitting_parameter(positive_dataset, 1)
# 取左边图幅第一条边界西安
target_point1 = self.get_boundary_point(boundary_point[0], x_mean, y_mean, theta, parameter)
target_point1 = [target_point1[0], target_point1[1], boundary_point[0][2]]
target_point2 = self.get_boundary_point(boundary_point[-1], x_mean, y_mean, theta, parameter)
target_point2 = [target_point2[0], target_point2[1], boundary_point[0][2]]
return [target_point1, target_point2]
def get_feature_dataset(self, feature1, feature2):
# 获取feature的方向,指向边界为标准
# 获取feature的数据,并转化为投影数据
utm_points1, flag1 = self.dataset_direction(feature1, self.polygon_geom1)
utm_points2, flag2 = self.dataset_direction(feature2, self.polygon_geom2)
return utm_points1, flag1, utm_points2, flag2
@staticmethod
def single_dis_from_dataset_to_dataset(dataset1, dataset2):
# dataset2到dataset1的距离
positive_dataset, theta, x_mean, y_mean = positive_transformation(dataset1)
parameter = np.squeeze(poly_fitting_parameter(positive_dataset, 1))
distance = 0
for point in dataset2:
new_point = [point[0] - x_mean, point[1] - y_mean]
positive_point = positive_transformation_point(theta, new_point)
distance += dis_from_point_to_line(parameter[1], parameter[0], positive_point)
return distance / len(dataset2)
def dis_from_dataset_to_dataset(self, dataset1, dataset2):
distance1 = self.single_dis_from_dataset_to_dataset(dataset1, dataset2)
distance2 = self.single_dis_from_dataset_to_dataset(dataset2, dataset1)
return (distance1 + distance2) / 2
@staticmethod
def interception_n_meter(dataset, n):
# 从尾部开始,截取长度为n米的点集
point = dataset.pop()
target_dataset = [point]
cumulative_distance = 0
while dataset:
point1 = dataset.pop()
target_dataset.append(point1)
distance = two_point_distance(point, point1)
cumulative_distance += distance
if cumulative_distance > n:
# 仅取n米
break
point = point1
if cumulative_distance <= n:
return target_dataset
distance = two_point_distance(target_dataset[-1], target_dataset[-2])
distance1 = cumulative_distance - distance
interp_dataset = interpolation_between_two_point(target_dataset[-2], target_dataset[-1], 2) # 不保留尾
k = int(len(interp_dataset) * (n - distance1) / distance) + 1
target_dataset = target_dataset[:-2] + interp_dataset[:k]
return target_dataset
def two_feature_distance(self, feature1, feature2):
# 两个feature可能有重合,可能没有
# 每个feature取3个点,做直线拟合,求直线距离
utm_dataset1, flag1 = self.dataset_direction(feature1, self.polygon_geom1)
utm_dataset2, flag2 = self.dataset_direction(feature2, self.polygon_geom2)
dataset1 = self.interception_n_meter(utm_dataset1, 3)
dataset2 = self.interception_n_meter(utm_dataset2, 3)
distance = self.dis_from_dataset_to_dataset(dataset1, dataset2)
return distance
def get_connect_feature(self, feature, features1, features2):
# 从features2中找到与feature连接的feature,feature属于features1
distance_list1 = []
for feature2 in features2:
distance = self.two_feature_distance(feature, feature2)
distance_list1.append(distance)
min_distance1 = min(distance_list1)
if min_distance1 > const.CONNECT_DIS_THRESHOLD:
return False
index1 = distance_list1.index(min_distance1)
target_feature = features2[index1]
distance_list2 = []
for feature1 in features1:
distance = self.two_feature_distance(feature1, target_feature)
distance_list2.append(distance)
min_distance2 = min(distance_list2)
if min_distance2 > const.CONNECT_DIS_THRESHOLD:
return False
index2 = distance_list2.index(min_distance2)
if features1[index2] is feature:
return target_feature
return False
def get_suspicious_link_features(self):
utm__buffer_points = get_buffer_points(self.mesh_utm_boundary_line[0], self.mesh_utm_boundary_line[1])
blh_buffer_points = transform_to_src(utm__buffer_points, self.source_srs, self.utm_srs)
xmin = min([point[0] for point in blh_buffer_points])
xmax = max([point[0] for point in blh_buffer_points])
ymin = min([point[1] for point in blh_buffer_points])
ymax = max([point[1] for point in blh_buffer_points])
self.link_layer.SetSpatialFilterRect(xmin, ymin, xmax, ymax)
link_feature_count = self.link_layer.GetFeatureCount()
if link_feature_count < 2:
print(str(self.meshid1) + "与" + str(self.meshid2) + "的边界相差太远,默认不接边!")
return
link_features1 = [] # 不能使用a = b = [],否则a的变化会反映到b上
link_features2 = []
for i in range(link_feature_count):
link_feature = self.link_layer.GetNextFeature()
if not link_feature:
continue
if link_feature[const.MESH_ID] == self.meshid1:
link_features1.append(link_feature)
elif link_feature[const.MESH_ID] == self.meshid2:
link_features2.append(link_feature)
if not link_features1 or not link_features2:
print(str(self.meshid1) + "或" + str(self.meshid2) + "距离边界太远,默认不接边!")
return
self.link_layer.ResetReading()
new_link_features1 = []
new_link_features2 = []
for link_feature1 in link_features1:
_, flag = self.dataset_direction(link_feature1, self.polygon_geom1)
mark = self.is_last_link_feature(link_feature1, flag)
if mark:
new_link_features1.append(link_feature1)
for link_feature2 in link_features2:
_, flag = self.dataset_direction(link_feature2, self.polygon_geom2)
mark = self.is_last_link_feature(link_feature2, flag)
if mark:
new_link_features2.append(link_feature2)
self.link_layer.SetSpatialFilter(None)
return new_link_features1, new_link_features2
def is_last_link_feature(self, feature, flag):
# 判断是不是一个图幅中的最后一个feature
if flag == 1:
node_id = feature[const.START_NODE_ID]
else:
node_id = feature[const.END_NODE_ID]
meshid = feature[const.MESH_ID]
filter = '"' + const.NODE_ID + '" = ' + str(node_id) + ' and "' + \
const.MESH_ID + '" = ' + "'" + str(meshid) + "'"
# filter = f'"{const.NODE_ID}"={node_id} and "{const.MESH_ID}"= \'{str(meshid)}\''
self.node_layer.SetAttributeFilter(filter)
node_feature_count = self.node_layer.GetFeatureCount()
if node_feature_count != 1:
# 不是一个说明数据有问题
return False
# 此节点关联的参考线只有一条
node_feature = self.node_layer.GetNextFeature()
link_ids = node_feature[const.LINK_IDS]
if ";" in link_ids:
# 说明不止关联一个link
return False
self.node_layer.ResetReading()
self.node_layer.SetAttributeFilter(None)
return True
def is_last_lane_link_feature(self, feature, flag):
# 判断是不是一个图幅中的最后一个feature
if flag == 1:
lane_node_id = feature[const.START_LANE_NODE_ID]
else:
lane_node_id = feature[const.END_LANE_NODE_ID]
meshid = feature[const.MESH_ID]
filter = '"' + const.MESH_LANE_NODE_ID + '" = ' + str(lane_node_id) + 'and "' + \
const.MESH_ID + '" = ' + "'" + str(meshid) + "'"
# filter = f'"{const.MESH_LANE_NODE_ID}"={lane_node_id} and "{const.MESH_ID}"= \'{str(meshid)}\''
self.lane_node_layer.SetAttributeFilter(filter)
lane_node_feature_count = self.lane_node_layer.GetFeatureCount()
if lane_node_feature_count != 1:
# 不是一个说明数据有问题
return False
# 此节点关联的参考线只有一条
lane_node_feature = self.lane_node_layer.GetNextFeature()
lane_link_ids = lane_node_feature[const.LANE_IDS]
if ";" in lane_link_ids:
# 说明不止关联一个link
return False
self.lane_node_layer.ResetReading()
self.lane_node_layer.SetAttributeFilter(None)
return True
def is_last_lane_edge_feature(self, feature, flag):
# 判断是不是一个图幅中的最后一个feature
if flag == 1:
lane_edge_node_id = feature[const.START_EDGE_NODE_ID]
else:
lane_edge_node_id = feature[const.END_EDGE_NODE_ID]
meshid = feature[const.MESH_ID]
filter = '"' + const.MESH_LANE_EDGE_NODE_ID + '" = ' + str(lane_edge_node_id) + ' and "' + \
const.MESH_ID + '" = ' + "'" + str(meshid) + "'"
# filter = f'"{const.MESH_LANE_EDGE_NODE_ID}"={lane_edge_node_id} and ' \
# f'"{const.MESH_ID}"= \'{str(meshid)}\''
self.lane_edge_node_layer.SetAttributeFilter(filter)
lane_edge_node_feature_count = self.lane_edge_node_layer.GetFeatureCount()
if lane_edge_node_feature_count != 1:
# 不是一个说明数据有问题
return False
# 此节点关联的参考线只有一条
lane_edge_node_feature = self.lane_edge_node_layer.GetNextFeature()
lane_edge_ids = lane_edge_node_feature[const.EDGE_IDS]
if ";" in lane_edge_ids:
# 说明不止关联一个link
return False
self.lane_edge_node_layer.ResetReading()
self.lane_edge_node_layer.SetAttributeFilter(None)
return True
@staticmethod
def elevation_judge(dataset1, dataset2, m=10):
# 判断参考线的高程是否在近似同一平面上
# 仅取最后m个点进行操作
dataset1 = dataset1[-m:]
dataset2 = dataset2[-m:]
z1 = [data[2] for data in dataset1]
z2 = [data[2] for data in dataset2]
min_digit = np.inf
for data1 in z1:
for data2 in z2:
digit = abs(data1 - data2)
if digit < min_digit:
min_digit = digit
if min_digit > const.ELEVATION_THRESHOLD:
return False
return True
def connect_judge(self, feature1, feature2, features2):
# 获取前后两个图幅的距离
# feature是参考线的feature
dataset1, flag1, dataset2, flag2 = self.get_feature_dataset(feature1, feature2)
# mark1 = self.is_last_link_feature(feature1, flag1)
# mark2 = self.is_last_link_feature(feature2, flag2)
# if not mark1 or not mark2:
# return False
new_dataset1 = dataset1[-const.LINK_DIS_POINTS:] # 取有限点
# new_dataset2 = dataset2[-const.LINK_DIS_POINTS:]
# distance = self.dis_from_dataset_to_dataset(new_dataset1, new_dataset2)
distance_list = []
for feature in features2:
dataset, flag = self.dataset_direction(feature, self.polygon_geom2)
new_dataset = dataset[-const.LINK_DIS_POINTS:]
distance = self.dis_from_dataset_to_dataset(new_dataset1, new_dataset)
distance_list.append(distance)
min_index = distance_list.index(min(distance_list))
suspicious_feature = features2[min_index]
min_distance = distance_list[min_index]
if suspicious_feature is not feature2 or min_distance > const.LINK_DIS_THRESHOLD:
print(str(self.meshid1) + "与" + str(self.meshid2) + "的参考线相差太远,默认不接边!")
return False
# 对高程的判断,
if not self.elevation_judge(dataset1, dataset2):
print(str(self.meshid1) + "与" + str(self.meshid2) + "的高程值相差太远,默认不接边!")
return False
lane_edge_features1 = self.get_lane_edge_by_link(feature1, self.meshid1)
lane_edge_features2 = self.get_lane_edge_by_link(feature2, self.meshid2)
if not lane_edge_features1 or not lane_edge_features2:
print(str(self.meshid1) + "与" + str(self.meshid2) + "的参考线至少有一条没有关联边界线,默认不接边!")
return False
# 判断这些feature是不是最后一个feature
for lane_edge_feature in lane_edge_features1:
_, flag = self.dataset_direction(lane_edge_feature, self.polygon_geom1)
mark = self.is_last_lane_edge_feature(lane_edge_feature, flag)
if not mark:
return False
for lane_edge_feature in lane_edge_features2:
_, flag = self.dataset_direction(lane_edge_feature, self.polygon_geom2)
mark = self.is_last_lane_edge_feature(lane_edge_feature, flag)
if not mark:
return False
lane_link_features1 = self.get_lane_link_by_link(feature1, self.meshid1)
lane_link_features2 = self.get_lane_link_by_link(feature2, self.meshid2)
if not lane_link_features1 or not lane_link_features2:
print(str(self.meshid1) + "与" + str(self.meshid2) + "的参考线至少有一条没有关联中心线,默认不接边!")
return False
# 判断这些feature是不是最后一个feature
for lane_link_feature in lane_link_features1:
_, flag = self.dataset_direction(lane_link_feature, self.polygon_geom1)
mark = self.is_last_lane_link_feature(lane_link_feature, flag)
if not mark:
return False
for lane_link_feature in lane_link_features2:
_, flag = self.dataset_direction(lane_link_feature, self.polygon_geom2)
mark = self.is_last_lane_link_feature(lane_link_feature, flag)
if not mark:
return False
lane_width1 = 0
m = len(lane_link_features1)
lane_width2 = 0
n = len(lane_link_features2)
if m == 0 or n == 0:
return False
for lane_link_feature in lane_link_features1:
left_lane_edge_id = lane_link_feature[const.LEFT_LANE_EDGE_ID]
right_lane_edge_id = lane_link_feature[const.RIGHT_LANE_EDGE_ID]
left_lane_edge_feature = self.get_lane_edge_by_id(left_lane_edge_id, self.lane_edge_layer)
right_lane_edge_feature = self.get_lane_edge_by_id(right_lane_edge_id, self.lane_edge_layer)
if not left_lane_edge_feature or not right_lane_edge_feature:
continue
left_lane_edge_dataset, _ = self.dataset_direction(left_lane_edge_feature, self.polygon_geom1)
right_lane_edge_dataset, _ = self.dataset_direction(right_lane_edge_feature, self.polygon_geom1)
left_lane_edge_utm_dataset = [[data[0], data[1]] for data in left_lane_edge_dataset] # 仅二维
right_lane_edge_utm_dataset = [[data[0], data[1]] for data in right_lane_edge_dataset] # 仅二维
lane_link_width = dis_from_dataset_to_dataset(left_lane_edge_utm_dataset, right_lane_edge_utm_dataset)
lane_width1 += lane_link_width
for lane_link_feature in lane_link_features2:
left_lane_edge_id = lane_link_feature[const.LEFT_LANE_EDGE_ID]
right_lane_edge_id = lane_link_feature[const.RIGHT_LANE_EDGE_ID]
left_lane_edge_feature = self.get_lane_edge_by_id(left_lane_edge_id, self.lane_edge_layer)
right_lane_edge_feature = self.get_lane_edge_by_id(right_lane_edge_id, self.lane_edge_layer)
if not left_lane_edge_feature or not right_lane_edge_feature:
continue
left_lane_edge_dataset, _ = self.dataset_direction(left_lane_edge_feature, self.polygon_geom2)
right_lane_edge_dataset, _ = self.dataset_direction(right_lane_edge_feature, self.polygon_geom2)
left_lane_edge_utm_dataset = [[data[0], data[1]] for data in left_lane_edge_dataset] # 仅二维
right_lane_edge_utm_dataset = [[data[0], data[1]] for data in right_lane_edge_dataset] # 仅二维
lane_link_width = dis_from_dataset_to_dataset(left_lane_edge_utm_dataset, right_lane_edge_utm_dataset)
lane_width2 += lane_link_width
if (m == n and abs(lane_width1 - lane_width2) > 4000) or \
(abs(m - n) >= 1 and (abs(lane_width1 - lane_width2) < 600)):
self.mesh_info.append([self.meshid1, self.meshid2, feature1[const.LINK_ID], feature2[const.LINK_ID]])
return False
return lane_edge_features1, lane_edge_features2, lane_link_features1, lane_link_features2
@staticmethod
def get_lane_edge_by_id(lane_edge_id, lane_edge_layer):
# 返回关联这个参考线的边线feature
filter = '"' + const.LANE_EDGE_ID + '" = ' + str(lane_edge_id)
lane_edge_layer.SetAttributeFilter(filter)
lane_edge_feature_count = lane_edge_layer.GetFeatureCount()
if lane_edge_feature_count != 1:
return None
else:
lane_edge_feature = lane_edge_layer.GetNextFeature()
lane_edge_layer.ResetReading()
lane_edge_layer.SetAttributeFilter(None)
return lane_edge_feature
def get_node_by_id(self, node_id, meshid):
filter = '"' + const.MESH_NODE_ID + '" = ' + str(node_id) + ' and "' + \
const.MESH_ID + '" = ' + "'" + str(meshid) + "'"
# filter = f'"{const.MESH_NODE_ID}"= {node_id} and "{const.MESH_ID}"= \'{str(meshid)}\''
self.node_layer.SetAttributeFilter(filter)
node_count = self.node_layer.GetFeatureCount()
if node_count != 1:
print("根据参考线节点id:" + str(node_id) + "搜索到的参考线node不是一个,节点状态无法修改")
return
node_feature = self.node_layer.GetNextFeature()
self.node_layer.ResetReading()
self.node_layer.SetAttributeFilter(None)
return node_feature
def get_lane_link_by_link(self, link_feature, meshid):
link_id = link_feature[const.LINK_ID]
filter = '"' + const.HD_LINK_ID + '" = ' + str(link_id) + ' and "' + \
const.MESH_ID + '" = ' + "'" + str(meshid) + "'"
# filter = f'"{const.HD_LINK_ID}"={link_id} and "{const.MESH_ID}"= \'{str(meshid)}\''
self.lane_link_layer.SetAttributeFilter(filter)
lane_link_feature = self.lane_link_layer.GetNextFeature()
lane_link_features = []
while lane_link_feature is not None:
lane_link_features.append(lane_link_feature)
lane_link_feature = self.lane_link_layer.GetNextFeature()
self.lane_link_layer.ResetReading()
self.lane_link_layer.SetAttributeFilter(None)
return lane_link_features
def get_lane_link_node_by_id(self, lane_link_node_id, meshid):
filter = '"' + const.MESH_LANE_NODE_ID + '" = ' + str(lane_link_node_id) + ' and "' + \
const.MESH_ID + '" = ' + "'" + str(meshid) + "'"
# filter = f'"{const.MESH_LANE_NODE_ID}"= {lane_link_node_id} and "{const.MESH_ID}" = \'{str(meshid)}\''
self.lane_node_layer.SetAttributeFilter(filter)
node_count = self.lane_node_layer.GetFeatureCount()
if node_count != 1:
print("根据id:" + str(lane_link_node_id) + "搜索到的中心线node不是一个,节点状态无法修改")
return
link_node_feature = self.lane_node_layer.GetNextFeature()
self.lane_node_layer.ResetReading()
self.lane_node_layer.SetAttributeFilter(None)
return link_node_feature
def get_lane_edge_by_link(self, link_feature, meshid):
# 返回关联这个参考线的边线feature
link_id = link_feature[const.LINK_ID]
filter = '"' + const.HD_LINK_ID + '" = ' + str(link_id) + ' and "' + \
const.MESH_ID + '" = ' + "'" + str(meshid) + "'"
# filter = f'"{const.HD_LINK_ID}"={link_id} and "{const.MESH_ID}"= \'{str(meshid)}\''
self.lane_edge_layer.SetAttributeFilter(filter)
lane_edge_feature = self.lane_edge_layer.GetNextFeature()
lane_edge_features = []
while lane_edge_feature is not None:
lane_edge_features.append(lane_edge_feature)
lane_edge_feature = self.lane_edge_layer.GetNextFeature()
self.lane_edge_layer.ResetReading()
self.lane_edge_layer.SetAttributeFilter(None)
return lane_edge_features
def get_lane_edge_node_by_id(self, lane_edge_node_id, meshid):
filter = '"' + const.MESH_LANE_EDGE_NODE_ID + '" = ' + str(lane_edge_node_id) + \
'and "' + const.MESH_ID + '" = ' + "'" + str(meshid) + "'"
# filter = f'"{const.MESH_LANE_EDGE_NODE_ID}"= {lane_edge_node_id} and ' \
# f'"{const.MESH_ID}" = \'{str(meshid)}\''
self.lane_edge_node_layer.SetAttributeFilter(filter)
node_count = self.lane_edge_node_layer.GetFeatureCount()
if node_count != 1:
print("根据id:" + str(lane_edge_node_id) + "搜索到的边界线node不是一个,节点状态无法修改")
return
link_node_feature = self.lane_edge_node_layer.GetNextFeature()
self.lane_edge_node_layer.ResetReading()
self.lane_edge_node_layer.SetAttributeFilter(None)
return link_node_feature
def get_standard_dataset(self, utm_dataset1, utm_dataset2, flag1, flag2, connect_boundary):
# 获取连接后的标准数据集,参数中的数据都是投影数据
# 仅取有限的数据求得交点,避免点数太多使得数据非直线
new_utm_dataset1 = utm_dataset1[-const.LINK_DIS_POINTS:]
intersection_point1 = intersection_of_two_line(new_utm_dataset1[0], new_utm_dataset1[-1],
connect_boundary[0], connect_boundary[-1])
interpolation_dataset1 = interpolation_dataset(utm_dataset1, const.MESH_INTERPOLATION_MOUNT1)
target_utm_dataset1 = truncate_or_offset_line(interpolation_dataset1, intersection_point1)
target_utm_dataset1 = DouglasPeuker().models(target_utm_dataset1)[::-1]
# target_utm_dataset1 = truncate_or_offset_line(utm_dataset1, intersection_point1)
new_utm_dataset2 = utm_dataset2[-const.LINK_DIS_POINTS:]
intersection_point2 = intersection_of_two_line(new_utm_dataset2[0], new_utm_dataset2[-1],
connect_boundary[0], connect_boundary[-1])
interpolation_dataset2 = interpolation_dataset(utm_dataset2, const.MESH_INTERPOLATION_MOUNT1)
target_utm_dataset2 = truncate_or_offset_line(interpolation_dataset2, intersection_point2)
target_utm_dataset2 = DouglasPeuker().models(target_utm_dataset2)[::-1]
# target_utm_dataset2 = truncate_or_offset_line(utm_dataset2, intersection_point2)
smoothing_dataset1, smoothing_dataset2, middle_intersection_point = \
self.smoothing_dataset(target_utm_dataset1, target_utm_dataset2, connect_boundary)
blh_middle_intersection_point = transform_to_src([middle_intersection_point], self.source_srs, self.utm_srs)[0]
target_blh_dataset1 = transform_to_src(smoothing_dataset1, self.source_srs, self.utm_srs)
target_blh_dataset2 = transform_to_src(smoothing_dataset2, self.source_srs, self.utm_srs)
target_blh_dataset1.append(blh_middle_intersection_point)
target_blh_dataset2.append(blh_middle_intersection_point)
if flag1 == 1:
target_blh_dataset1 = target_blh_dataset1[::-1]
if flag2 == 1:
target_blh_dataset2 = target_blh_dataset2[::-1]
# flag - 1的值代表起点还是终点,0代表起点,-1代表终点,用于修改点图层
return target_blh_dataset1, flag1 - 1, target_blh_dataset2, flag2 - 1, blh_middle_intersection_point
def get_smoothing_count(self, dataset1, dataset2):
# dataset1和dataset2分别是对应接边的数据集
# 仅取最后几个点进行计算
if len(dataset1) == 1 and len(dataset2) == 2:
smoothing_count = 50 # 此种情况下,写死即可
return smoothing_count
elif len(dataset1) == 1 and len(dataset2) > 1:
distance = self.single_dis_from_dataset_to_dataset(dataset2[-3:], dataset1)
elif len(dataset1) > 1 and len(dataset2) == 1:
distance = self.single_dis_from_dataset_to_dataset(dataset1[-3:], dataset2)
else:
distance = self.dis_from_dataset_to_dataset(dataset1[-3:], dataset2[-3:]) # 根据距离计算平滑次数
# 避免太接近导致没有平滑
if distance < 0.5:
smoothing_count = int(self.smoothing_count / 5)
else:
smoothing_count = int(distance * self.smoothing_count)
return smoothing_count
@staticmethod
def remove_near_points(dataset):
# 去除近点
dataset1 = copy.copy(dataset[::-1])
target_dataset = []
point1 = dataset1.pop()
target_dataset.append(point1)
while dataset1:
point2 = dataset1.pop()
distance = two_point_distance(point1, point2)
if distance < const.NEAR_POINT_THRESHOLD:
continue
target_dataset.append(point2)
point1 = point2
return target_dataset
def smoothing_dataset(self, dataset1, dataset2, connect_boundary):
point11 = dataset1.pop()
point21 = dataset2.pop()
new_dataset1 = [point11]
new_dataset2 = [point21]
len1 = 0
len2 = 0
while dataset1:
point12 = dataset1.pop()
new_dataset1.append(point12)
dis1 = two_point_distance(point11, point12)
len1 += dis1
if len1 > const.SMOOTHING_DATA_LENGTH:
break
point11 = point12
new_dataset1 = new_dataset1[::-1]
while dataset2:
point22 = dataset2.pop()
new_dataset2.append(point22)
dis2 = two_point_distance(point21, point22)
len2 += dis2
if len2 > const.SMOOTHING_DATA_LENGTH:
break
new_dataset2 = new_dataset2[::-1]
smoothing_dataset = [] + new_dataset1
smoothing_dataset += new_dataset2[::-1]
smoothing_count = self.get_smoothing_count(new_dataset1, new_dataset2) # 计算平滑次数
# 数据插值、平滑、抽稀、插值、去重
interpolation_smoothing_dataset = interpolation_dataset(smoothing_dataset, const.MESH_INTERPOLATION_MOUNT1)
# target_smoothing_dataset = automatic_smoothing(interpolation_smoothing_dataset, self.smoothing_count)
target_smoothing_dataset = automatic_smoothing(interpolation_smoothing_dataset, smoothing_count)
vacuate_dataset = DouglasPeuker().models(target_smoothing_dataset)[::-1]
interpolation_data = interpolation_dataset(vacuate_dataset, const.MESH_INTERPOLATION_MOUNT2)
# result_dataset = remove_duplicate_data(interpolation_data)
result_dataset = self.remove_near_points(interpolation_data)
# 以边界为界,将平滑数据分类
mark = clockwise_counterclockwise(connect_boundary[0], connect_boundary[-1], result_dataset[0])
for i in range(len(result_dataset)):
mark1 = clockwise_counterclockwise(connect_boundary[0], connect_boundary[-1], result_dataset[i])
if mark1 != mark:
break
smoothing_new_dataset1 = result_dataset[:i]
smoothing_new_dataset2 = result_dataset[i:][::-1]
intersection_point = intersection_of_two_line(smoothing_new_dataset1[-1], smoothing_new_dataset2[-1],
connect_boundary[0], connect_boundary[-1])
target_dataset1 = dataset1 + smoothing_new_dataset1
target_dataset2 = dataset2 + smoothing_new_dataset2
return target_dataset1, target_dataset2, intersection_point
def connect_link(self, link_feature1, link_feature2, connect_boundary):
utm_link_dataset1, flag1, utm_link_dataset2, flag2 = self.get_feature_dataset(link_feature1, link_feature2)
target_blh_dataset1, flag1, target_blh_dataset2, flag2, blh_middle_intersection_point = \
self.get_standard_dataset(utm_link_dataset1, utm_link_dataset2, flag1, flag2, connect_boundary)
geom1 = create_line_geometry(target_blh_dataset1)
geom2 = create_line_geometry(target_blh_dataset2)
self.link_layer.StartTransaction()
link_feature1.SetGeometry(geom1)
link_feature2.SetGeometry(geom2)
self.link_layer.SetFeature(link_feature1)
self.link_layer.SetFeature(link_feature2)
self.link_layer.CommitTransaction()
return flag1, flag2, blh_middle_intersection_point
def connect_node(self, link_feature, flag, blh_target_point, meshid):
# 连接参考线节点
if flag == 0:
node_id = link_feature[const.START_NODE_ID]
else:
node_id = link_feature[const.END_NODE_ID]
node_feature = self.get_node_by_id(node_id, meshid)
if not node_feature:
return
geom = create_point_geometry(blh_target_point)
self.node_layer.StartTransaction()
node_feature.SetGeometry(geom)
self.node_layer.SetFeature(node_feature)
self.node_layer.CommitTransaction()
# link_feature不能释放,因为在for中,target_link_feature1必须始终保持着
node_feature.Destroy()
def connect_lane_edge(self, lane_edge_feature1, lane_edge_feature2, connect_boundary):
utm_lane_edge_dataset1, flag1, utm_lane_edge_dataset2, flag2 = \
self.get_feature_dataset(lane_edge_feature1, lane_edge_feature2)
target_blh_dataset1, flag1, target_blh_dataset2, flag2, blh_middle_intersection_point = \
self.get_standard_dataset(utm_lane_edge_dataset1, utm_lane_edge_dataset2, flag1, flag2, connect_boundary)
geom1 = create_line_geometry(target_blh_dataset1)
geom2 = create_line_geometry(target_blh_dataset2)
self.lane_edge_layer.StartTransaction()
lane_edge_feature1.SetGeometry(geom1)
lane_edge_feature2.SetGeometry(geom2)
self.lane_edge_layer.SetFeature(lane_edge_feature1)
self.lane_edge_layer.SetFeature(lane_edge_feature2)
self.lane_edge_layer.CommitTransaction()
return flag1, flag2, blh_middle_intersection_point
def connect_lane_edge_node(self, lane_edge_feature, flag, blh_target_point, meshid):
if flag == 0:
lane_edge_node_id = lane_edge_feature[const.START_EDGE_NODE_ID]
else:
lane_edge_node_id = lane_edge_feature[const.END_EDGE_NODE_ID]
lane_edge_node_feature = self.get_lane_edge_node_by_id(lane_edge_node_id, meshid)
if not lane_edge_node_feature:
return
geom = create_point_geometry(blh_target_point)
self.lane_edge_node_layer.StartTransaction()
lane_edge_node_feature.SetGeometry(geom)
self.lane_edge_node_layer.SetFeature(lane_edge_node_feature)
self.lane_edge_node_layer.CommitTransaction()
lane_edge_node_feature.Destroy()
def connect_lane_link(self, lane_link_feature1, lane_link_feature2, connect_boundary):
utm_lane_link_dataset1, flag1, utm_lane_link_dataset2, flag2 = \
self.get_feature_dataset(lane_link_feature1, lane_link_feature2)
target_blh_dataset1, flag1, target_blh_dataset2, flag2, blh_middle_intersection_point = \
self.get_standard_dataset(utm_lane_link_dataset1, utm_lane_link_dataset2, flag1, flag2, connect_boundary)
geom1 = create_line_geometry(target_blh_dataset1)
geom2 = create_line_geometry(target_blh_dataset2)
self.lane_link_layer.StartTransaction()
lane_link_feature1.SetGeometry(geom1)
lane_link_feature2.SetGeometry(geom2)
self.lane_link_layer.SetFeature(lane_link_feature1)
self.lane_link_layer.SetFeature(lane_link_feature2)
self.lane_link_layer.CommitTransaction()
return flag1, flag2, blh_middle_intersection_point
def connect_lane_link_node(self, lane_link_feature, flag, blh_target_point, meshid):
if flag == 0:
lane_link_node_id = lane_link_feature[const.START_LANE_NODE_ID]
else:
lane_link_node_id = lane_link_feature[const.END_LANE_NODE_ID]
lane_node_feature = self.get_lane_link_node_by_id(lane_link_node_id, meshid)
if not lane_node_feature:
return
geom = create_point_geometry(blh_target_point)
self.lane_node_layer.StartTransaction()
lane_node_feature.SetGeometry(geom)
self.lane_node_layer.SetFeature(lane_node_feature)
self.lane_node_layer.CommitTransaction()
lane_node_feature.Destroy()
def models(self):
if not self.lane_edge_layer or not self.lane_link_layer or not self.link_layer or \
not self.lane_edge_node_layer or not self.lane_node_layer or not self.node_layer:
print("线图层和节点图层没有全部获取到,请查验数据是否正确!")
return
self.source_srs = self.lane_link_layer.GetSpatialRef() # 选择中心线的图层坐标,所有图层的坐标系统一致
self.mesh_utm_boundary_line, self.utm_srs = transform_to_utm(self.mesh_blh_boundary_line, self.source_srs)
result = self.get_suspicious_link_features()
if not result:
return
link_features1, link_features2 = result
m = len(link_features1)
n = len(link_features2)
for i in range(m):
target_link_feature1 = link_features1[i]
id1 = target_link_feature1["LINK_ID"]
for j in range(n):
target_link_feature2 = link_features2[j]
id2 = target_link_feature2["LINK_ID"]
connect_judge_ = self.connect_judge(target_link_feature1, target_link_feature2, link_features2)
if not connect_judge_:
continue
lane_edge_features1, lane_edge_features2, lane_link_features1, lane_link_features2 = connect_judge_
# 连接参考线和节点
connect_boundary = self.get_connect_boundary(target_link_feature1, lane_edge_features1,
lane_link_features1, target_link_feature2,
lane_edge_features2, lane_link_features2)
flag1, flag2, blh_link_middle_intersection_point = self.connect_link(target_link_feature1,
target_link_feature2,
connect_boundary)
self.connect_node(target_link_feature1, flag1, blh_link_middle_intersection_point, self.meshid1)
self.connect_node(target_link_feature2, flag2, blh_link_middle_intersection_point, self.meshid2)
# 连接边界线和节点
for lane_edge_feature1 in lane_edge_features1:
lane_edge_feature2 = self.get_connect_feature(lane_edge_feature1, lane_edge_features1,
lane_edge_features2)
if not lane_edge_feature2:
continue
flag1, flag2, blh_middle_intersection_point = self.connect_lane_edge(lane_edge_feature1,
lane_edge_feature2,
connect_boundary)
self.connect_lane_edge_node(lane_edge_feature1, flag1, blh_middle_intersection_point,
self.meshid1)
self.connect_lane_edge_node(lane_edge_feature2, flag2, blh_middle_intersection_point,
self.meshid2)
# 连接中心线和节点
for lane_link_feature1 in lane_link_features1:
lane_link_feature2 = self.get_connect_feature(lane_link_feature1, lane_link_features1,
lane_link_features2)
if not lane_link_feature2:
continue
flag1, flag2, blh_middle_intersection_point = self.connect_lane_link(lane_link_feature1,
lane_link_feature2,
connect_boundary)
self.connect_lane_link_node(lane_link_feature1, flag1, blh_middle_intersection_point,
self.meshid1)
self.connect_lane_link_node(lane_link_feature2, flag2, blh_middle_intersection_point,
self.meshid2)
for lane_edge_feature1 in lane_edge_features1:
lane_edge_feature1.Destroy()
for lane_edge_feature2 in lane_edge_features2:
lane_edge_feature2.Destroy()
for lane_link_feature1 in lane_link_features1:
lane_link_feature1.Destroy()
for lane_link_feature2 in lane_link_features2:
lane_link_feature2.Destroy()
target_link_feature1.Destroy()
for link_feature in link_features2:
link_feature.Destroy()
# _*_ coding: utf-8 _*_ #
# @Time: 2020/1/7 19:37
# @Author: XiongChao
from osgeo import ogr, osr
import math
import copy
from util import const
from util.common import is_same_point, three_point_dot_product, two_point_distance
def get_buffer_point(point1, point2):
# 获取两个点,任意一个点与point1的连线垂直于point1与point2的连线,此点到point1的距离为const.RECTANGLE_WIDTH
result1 = const.RECTANGLE_WIDTH / two_point_distance(point1, point2)
x1 = point1[0] - result1 * (point2[1] - point1[1])
y1 = point1[1] + result1 * (point2[0] - point1[0])
x2 = point1[0] + result1 * (point2[1] - point1[1])
y2 = point1[1] - result1 * (point2[0] - point1[0])
z = (point1[2] + point2[2]) / 2
return (x1, y1, z), (x2, y2, z)
def get_buffer_points(point1, point2):
# 根据point1和point2构造buffer,并返回四个点
data1, data2 = get_buffer_point(point1, point2)
data3, data4 = get_buffer_point(point2, point1)
return [data1, data2, data3, data4]
def intersection_of_two_line(point1, point2, point3, point4):
# 获取两条直线的交点, point1和point2为一条线的起始点,point3和point4为一条直线的起始点
# 在这里,如果两条线平行,那么必然是数据有问题
result1 = (point2[1] - point1[1]) * (point4[0] - point3[0]) - (point4[1] - point3[1]) * (point2[0] - point1[0])
# if result1 == 0:
# return False
result2 = (point2[1] - point1[1]) * point1[0] - (point2[0] - point1[0]) * point1[1]
result3 = (point4[1] - point3[1]) * point3[0] - (point4[0] - point3[0]) * point3[1]
x = ((point4[0] - point3[0]) * result2 - (point2[0] - point1[0]) * result3) / result1
y = ((point4[1] - point3[1]) * result2 - (point2[1] - point1[1]) * result3) / result1
z = (point1[2] + point2[2]) / 2 # point3和point4为边界线,为BL
return x, y, z
def truncate_or_offset_line(dataset, point):
# dataset为数据集,point为数据集与边界的交点,截断数据集或者延长数据集
copy_dataset = copy.copy(dataset)
point1 = dataset.pop()
if is_same_point(point1, point):
point1 = dataset.pop()
while dataset:
point2 = dataset.pop()
if is_same_point(point2, point):
point2 = dataset.pop()
result = three_point_dot_product(point2, point1, point)
if result > 0:
point1 = point2
else:
dataset.append(point2)
dataset.append(point1)
break
if len(dataset) == 0:
dataset.append(copy_dataset[0])
return dataset
def create_line_geometry(dataset):
geo = ogr.Geometry(ogr.wkbLineStringZM)
for i in range(len(dataset)):
geo.SetPoint(i, dataset[i][0], dataset[i][1], dataset[i][2])
return geo
def create_point_geometry(point):
geo = ogr.Geometry(ogr.wkbPointZM)
geo.SetPoint(0, point[0], point[1], point[2])
return geo
def remove_duplicate_data(dataset):
# 将数据集中重复的点去掉,每一个feature至少有起点和终点,并且这两个点不同
# 重点肯定是连续的,并且数量不确定
dataset1 = copy.copy(dataset[::-1])
target_dataset = []
point1 = dataset1.pop()
target_dataset.append(point1)
while dataset1:
point2 = dataset1.pop()
if is_same_point(point1, point2):
continue
target_dataset.append(point2)
point1 = point2
return target_dataset
def transform_to_utm(blh_dataset, source_srs):
lon = blh_dataset[0][0]
utm_zone = 31 + math.floor(lon / 6.0)
utm_srs = osr.SpatialReference()
utm_srs.SetUTM(utm_zone, 1)
src_to_utm_trans = osr.CreateCoordinateTransformation(source_srs, utm_srs)
utm_dataset = []
for blh_point in blh_dataset:
temp_point = blh_point
utm_point = list(src_to_utm_trans.TransformPoint(temp_point[0], temp_point[1]))
utm_point[2] = blh_point[2]
utm_dataset.append(utm_point)
return utm_dataset, utm_srs
def transform_to_src(utm_dataset, source_srs, utm_srs):
utm_to_src_trans = osr.CreateCoordinateTransformation(utm_srs, source_srs)
src_dataset = []
for utm_point in utm_dataset:
temp_point = utm_point
src_point = list(utm_to_src_trans.TransformPoint(temp_point[0], temp_point[1]))
src_point[2] = utm_point[2]
src_dataset.append(src_point)
return src_dataset
def create_polygon_geometry(dataset):
ring = ogr.Geometry(ogr.wkbLinearRing)
for i in range(len(dataset)):
ring.AddPoint(dataset[i][0], dataset[i][1], dataset[i][2])
yard = ogr.Geometry(ogr.wkbPolygon)
yard.AddGeometry(ring)
yard.CloseRings()
return yard
# _*_ coding: utf-8 _*_ #
# @Time: 2020/4/22 16:45
# @Author: XiongChao
import sys
import os
from osgeo import osr
sys.path.append('.')
sys.path.append("../")
from connect_solution import ConnectMeshs
def run(connect_postgresql, sheet_designation_path, mesh_box_path):
# 参数依次为pg库连接字符串、图幅号文件路径,图幅框文件路径
solution = ConnectMeshs(connect_postgresql, sheet_designation_path, mesh_box_path)
try:
solution.models()
print("接边成功,请确认!")
except Exception as e:
solution.mesh_box_data_source.Destroy()
solution.map_data_source.Destroy()
print(e)
if __name__ == '__main__':
# arg1 = "PG:dbname='test3' host='localhost' port='5432' user='postgres' password='19931018'"
# arg2 = r"C:\Users\熊超\Desktop\edge_match_file\mesh_id.txt"
# arg3 = r"C:\Users\熊超\Desktop\edge_match_file\mesh_grid.gpkg"
# run(arg1, arg2, arg3)
arg1 = sys.argv[1] # 数据库连接字符串
arg2 = sys.argv[2] # 图幅号的txt文件
arg3 = sys.argv[3] # 图幅框文件
if arg1[:2].lower() != "pg":
print("请传入连接pg库字符串!")
else:
if arg2[-3:].lower() != "txt":
print("请传入存储图幅号的txt文件")
else:
if arg3[-4:].lower() != "gpkg":
print("请传入图幅框的gpkg文件!")
else:
current_path = os.path.dirname(os.path.abspath(__file__))
gdal_data = current_path + '\\proj'
osr.SetPROJSearchPath(gdal_data)
run(arg1, arg2, arg3)
# _*_ coding: utf-8 _*_ #
# @Time: 2020/1/21 13:18
# @Author: XiongChao
# _*_ coding: utf-8 _*_ #
# @Time: 2020/1/2 19:18
# @Author: XiongChao
import numpy as np
def get_coefficient_vector(point, n):
# 根据一个点,返回一个(n + 1) × 1的矩阵
vector = np.zeros(n + 1)
for i in range(n + 1):
vector[i] = np.power(point[0], i)
return vector.reshape(n + 1, -1) # 返回的对象是二维
def get_coefficient_matrix(dataset, n):
# 获取系数矩阵,为m × (n + 1)
m = len(dataset)
coefficient_matrix = np.zeros((m, n + 1))
for i in range(m):
for j in range(n + 1):
coefficient_matrix[i, j] = np.power(dataset[i][0], j)
return coefficient_matrix
def get_standard_Y_matrix(dataset):
# 获取原始的Y值矩阵,为m × 1
m = len(dataset)
Y_matrix = np.zeros((m, 1))
for i in range(m):
Y_matrix[i, 0] = dataset[i][1]
return Y_matrix
def poly_fitting_parameter(dataset, n):
# dataset中有m个点用于拟合,拟合的多项式的次数为n次
coefficient_matrix = get_coefficient_matrix(dataset, n)
Y_matrix = get_standard_Y_matrix(dataset)
result = np.linalg.det(np.dot(coefficient_matrix.T, coefficient_matrix))
if result == 0:
# 返回的最好是同一种数据结构,便于判断,比如Numpy数组没有 not array判断
return np.zeros(n)
result1 = np.linalg.inv(np.dot(coefficient_matrix.T, coefficient_matrix))
parameter = np.dot(np.dot(result1, coefficient_matrix.T), Y_matrix) # 为(n + 1) × 1矩阵
return parameter
def fitting_loss(dataset, n):
# 指定次数的多项式拟合损失
parameter = poly_fitting_parameter(dataset, n)
if parameter.shape == (n, ):
# 出现奇异矩阵的时候,默认损失为无穷
return np.inf
loss = 0
for data in dataset:
X_vector = get_coefficient_vector(data, n)
Y_predict = np.dot(X_vector.T, parameter)
loss += np.squeeze(data[1] - Y_predict) ** 2
return loss, parameter
def get_degree_of_polynomial(dataset):
# 返回损失值最小的多项式次数
degree_list = [1, 2] # 仅遍历到二次
loss_list = []
parameter_list = []
for n in degree_list:
loss, parameter = fitting_loss(dataset, n)
loss_list.append(loss)
parameter_list.append(parameter)
result = np.array([value == np.inf for value in loss_list])
if result.all():
# 遍历的次数下,构造的系数矩阵都为奇异矩阵
return False
degree = loss_list.index(min(loss_list))
optimization_parameter = parameter_list[degree]
return degree + 1, optimization_parameter
def data_normalization(dataset):
# 数据归一化,返回新数据和均值
x_dataset = [data[0] for data in dataset]
y_dataset = [data[1] for data in dataset]
x_mean = sum(x_dataset) / len(x_dataset)
y_mean = sum(y_dataset) / len(y_dataset)
new_dataset = [[data[0] - x_mean, data[1] - y_mean] for data in dataset]
return new_dataset, x_mean, y_mean
\ No newline at end of file
# _*_ coding: utf-8 _*_ #
# @Time: 2020/1/15 10:32
# @Author: XiongChao
# 对数据集进行正向旋转平移和反向旋转平移
import numpy as np
import math
def cos_(vector1, vector2):
# 计算两个向量的余弦值
cos_theta = (vector1[0] * vector2[0] + vector1[1] * vector2[1]) / \
(math.sqrt(vector1[0] ** 2 + vector1[1] ** 2) * math.sqrt(vector2[0] ** 2 + vector2[1] ** 2))
# 由于浮点运算的截断误差,可能导致求得的值大于1或者小于-1,因此需要进行处理
if cos_theta >= 1:
cos_theta = 1
elif cos_theta <= -1:
cos_theta = -1
return cos_theta
def angle_of_two_vector(vector1, vector2):
# 返回两个向量的角度,返回的值为弧度
cos_theta = cos_(vector1, vector2)
theta = math.acos(cos_theta)
return theta
def intersection_of_two_line(point1, point2, point3, point4):
# 获取两条直线的交点, point1和point2为一条线的起始点,point3和point4为一条直线的起始点
# 在这里,如果两条线平行,那么必然是数据有问题
result1 = (point2[1] - point1[1]) * (point4[0] - point3[0]) - (point4[1] - point3[1]) * (point2[0] - point1[0])
if result1 == 0:
return False
result2 = (point2[1] - point1[1]) * point1[0] - (point2[0] - point1[0]) * point1[1]
result3 = (point4[1] - point3[1]) * point3[0] - (point4[0] - point3[0]) * point3[1]
x = ((point4[0] - point3[0]) * result2 - (point2[0] - point1[0]) * result3) / result1
y = ((point4[1] - point3[1]) * result2 - (point2[1] - point1[1]) * result3) / result1
return x, y
def clockwise_counterclockwise(point1, point2, point3):
# 判断123是顺时针还是逆时针
# 计算向量point1point2与point2point3的向量积
result = (point2[0] - point1[0]) * (point3[1] - point2[1]) - (point2[1] - point1[1]) * (point3[0] - point2[0])
if result > 0:
# point1point2point3逆时针
return 1
else:
return 0
def rotation_angle(point1, point2):
# 计算坐标轴旋转的角度
vector1 = (point2[0] - point1[0], point2[1] - point1[1])
vector2 = (1, 0)
angle = angle_of_two_vector(vector1, vector2)
intersection = intersection_of_two_line(point1, point2, (0, 0), (1, 0))
if not intersection:
return angle
flag_point = [intersection[0] + vector1[0], intersection[1] + vector1[1]]
result = clockwise_counterclockwise((0, 0), (1, 0), flag_point)
if result == 0:
angle = 2 * np.pi - angle
return angle
def data_normalization(dataset):
# 数据归一化,返回新数据和均值
x_dataset = [data[0] for data in dataset]
y_dataset = [data[1] for data in dataset]
x_mean = sum(x_dataset) / len(x_dataset)
y_mean = sum(y_dataset) / len(y_dataset)
new_dataset = [[data[0] - x_mean, data[1] - y_mean] for data in dataset]
return new_dataset, x_mean, y_mean
def construct_positive_transformation_matrix(theta):
# theta为弧度值
positive_matrix = np.zeros((2, 2))
positive_matrix[0, 0] = np.cos(theta)
positive_matrix[0, 1] = np.sin(theta)
positive_matrix[1, 0] = -np.sin(theta)
positive_matrix[1, 1] = np.cos(theta)
return positive_matrix
def positive_transformation_point(theta, point):
# 正向变化一个点
coefficient_matrix = np.zeros((2, 1))
coefficient_matrix[0, 0], coefficient_matrix[1, 0] = point[0], point[1]
transformation_matrix = construct_positive_transformation_matrix(theta)
positive_matrix = np.dot(transformation_matrix, coefficient_matrix)
positive_point = [positive_matrix[0, 0], positive_matrix[1, 0]]
return positive_point
def construct_negative_transformation_matrix(theta):
# theta为弧度值
negative_matrix = np.zeros((2, 2))
negative_matrix[0, 0] = np.cos(theta)
negative_matrix[0, 1] = - np.sin(theta)
negative_matrix[1, 0] = np.sin(theta)
negative_matrix[1, 1] = np.cos(theta)
return negative_matrix
def negative_transformation_point(theta, point):
# 反向变换一个点
coefficient_matrix = np.zeros((2, 1))
coefficient_matrix[0, 0], coefficient_matrix[1, 0] = point[0], point[1]
transformation_matrix = construct_negative_transformation_matrix(theta)
negative_matrix = np.dot(transformation_matrix, coefficient_matrix)
negative_point = [negative_matrix[0, 0], negative_matrix[1, 0]]
return negative_point
def positive_transformation(dataset):
# 对数据集(点集)进行正向旋转平移变换,使用起点和终点构建方向
new_dataset, x_mean, y_mean = data_normalization(dataset)
n = len(dataset)
coefficient_matrix = np.zeros((2, n))
for i in range(n):
coefficient_matrix[:, i] = new_dataset[i]
theta = rotation_angle(new_dataset[0], new_dataset[-1])
transformation_matrix = construct_positive_transformation_matrix(theta)
positive_matrix = np.dot(transformation_matrix, coefficient_matrix)
m = len(positive_matrix[0])
positive_dataset = [[]] * m
for i in range(m):
positive_dataset[i] = list(positive_matrix[:, i])
return positive_dataset, theta, x_mean, y_mean
def negative_transformation(dataset, theta, x_mean, y_mean):
# 对数据集(点集)进行反向旋转平移变换,使用起点和终点构建方向
n = len(dataset)
coefficient_matrix = np.zeros((2, n))
for i in range(n):
coefficient_matrix[: i] = dataset[i]
transformation_matrix = construct_negative_transformation_matrix(theta)
mean_matrix = np.zeros((2, 1))
mean_matrix[0, 0] = x_mean
mean_matrix[1, 0] = y_mean
negative_matrix = np.dot(transformation_matrix, coefficient_matrix) + mean_matrix
m = len(coefficient_matrix[0])
negative_dataset = [[]] * m
for i in range(m):
negative_dataset[i] = list(negative_matrix[:, i])
return negative_dataset
# _*_ coding: utf-8 _*_ #
# @Time: 2020/1/2 19:19
# @Author: XiongChao
import numpy as np
from .polynomial_fitting import get_degree_of_polynomial, get_coefficient_vector
from .rotation_and_translation_of_coordinate import positive_transformation, negative_transformation_point, \
positive_transformation_point
def savgol(dataset, window_size=5):
m = len(dataset)
if m <= window_size:
return dataset
smoothing_dataset = [[]] * m # 存储最终的平滑数据
smoothing_dataset[0], smoothing_dataset[-1] = dataset[0], dataset[-1] # 保留首尾点
# 对开始几个点的平滑
positive_dataset, theta, x_mean, y_mean = positive_transformation(dataset[:window_size])
result = get_degree_of_polynomial(positive_dataset)
if result:
degree, parameter = result
# parameter = poly_fitting_parameter(positive_dataset, degree)
for i in range(1, window_size // 2):
point = dataset[i]
new_point = [point[0] - x_mean, point[1] - y_mean]
positive_point = positive_transformation_point(theta, new_point)
X_vector = get_coefficient_vector(positive_point, degree)
Y_predict = float(np.squeeze(np.dot(X_vector.T, parameter))) # 去掉矩阵的维度,变为一个浮点数
negative_point = np.zeros((2, 1))
negative_point[0, 0], negative_point[1, 0] = positive_point[0], Y_predict
result_point = negative_transformation_point(theta, negative_point)
target_point = [result_point[0] + x_mean, result_point[1] + y_mean]
smoothing_dataset[i] = target_point
# 对最后几个点的平滑
positive_dataset, theta, x_mean, y_mean = positive_transformation(dataset[-window_size:])
result = get_degree_of_polynomial(positive_dataset)
if result:
degree, parameter = result
# parameter = poly_fitting_parameter(positive_dataset, degree)
for j in range(m - (window_size - 1) // 2, m - 1):
point = dataset[j]
new_point = [point[0] - x_mean, point[1] - y_mean]
positive_point = positive_transformation_point(theta, new_point)
X_vector = get_coefficient_vector(positive_point, degree)
Y_predict = float(np.squeeze(np.dot(X_vector.T, parameter))) # 去掉矩阵的维度,变为一个浮点数
negative_point = np.zeros((2, 1))
negative_point[0, 0], negative_point[1, 0] = positive_point[0], Y_predict
result_point = negative_transformation_point(theta, negative_point)
target_point = [result_point[0] + x_mean, result_point[1] + y_mean]
smoothing_dataset[j] = target_point
# 获取中间点的平滑值
for i in range(window_size // 2, m - (window_size - 1) // 2):
data = dataset[i - window_size // 2: i + window_size // 2 + 1]
positive_dataset, theta, x_mean, y_mean = positive_transformation(data)
result = get_degree_of_polynomial(positive_dataset)
if result:
degree, parameter = result
# parameter = poly_fitting_parameter(positive_dataset, degree)
point = dataset[i]
new_point = [point[0] - x_mean, point[1] - y_mean]
positive_point = positive_transformation_point(theta, new_point)
X_vector = get_coefficient_vector(positive_point, degree)
Y_predict = float(np.squeeze(np.dot(X_vector.T, parameter))) # 去掉矩阵的维度,变为一个浮点数
negative_point = np.zeros((2, 1))
negative_point[0, 0], negative_point[1, 0] = positive_point[0], Y_predict
result_point = negative_transformation_point(theta, negative_point)
target_point = [result_point[0] + x_mean, result_point[1] + y_mean]
smoothing_dataset[i] = target_point
return smoothing_dataset
def automatic_smoothing(dataset, times):
# 对线几何的自动化平滑处理
XY_dataset = [[data[0], data[1]] for data in dataset] # 仅对投影坐标下的值进行平滑,z值不变
Z_dataset = [data[2] for data in dataset]
for i in range(times):
XY_dataset = savgol(XY_dataset)
smoothing_XY_dataset = XY_dataset
smoothing_dataset = [[smoothing_XY_dataset[i][0], smoothing_XY_dataset[i][1], Z_dataset[i]]
for i in range(len(Z_dataset))]
return smoothing_dataset
# _*_ coding: utf-8 _*_ #
# @Time: 2020/4/22 16:45
# @Author: XiongChao
\ No newline at end of file
# _*_ coding: utf-8 _*_ #
# @Time: 2019/11/28 17:33
# @Author: XiongChao
import math
import numpy as np
from sklearn.neighbors import KDTree
from . import const
from .data_interpolation import interpolation_dataset
def two_point_distance(point1, point2):
return math.sqrt((point2[0] - point1[0]) ** 2 + (point2[1] - point1[1]) ** 2)
def cos_(vector1, vector2):
# 计算两个向量的余弦值
cos_theta = (vector1[0] * vector2[0] + vector1[1] * vector2[1]) / \
(math.sqrt(vector1[0] ** 2 + vector1[1] ** 2) * math.sqrt(vector2[0] ** 2 + vector2[1] ** 2))
# 由于浮点运算的截断误差,可能导致求得的值大于1或者小于-1,因此需要进行处理
if cos_theta >= 1:
cos_theta = 1
elif cos_theta <= -1:
cos_theta = -1
return cos_theta
def angle_of_two_vector(vector1, vector2):
# 返回两个向量的角度,返回的值为角度,为浮点数,可以运算
cos_theta = cos_(vector1, vector2)
theta = math.acos(cos_theta) / math.pi * 180
return theta
def three_point_dot_product(point1, point2, point3):
# 向量21与23的内积
result = (point1[0] - point2[0]) * (point3[0] - point2[0]) + \
(point1[1] - point2[1]) * (point3[1] - point2[1])
return result
def dis_from_point_to_line(k, b, point):
# k, b代表直线斜率,默认不与X轴垂直
dis = np.abs(k * point[0] + b - point[1]) / np.sqrt(1 + k * k)
return dis
def nearest_point_of_dataset(point, dataset):
# 在dataset中寻找距离point最近的点
# 返回的是dataset的下标
# 仅使用两坐标
distance_list = [two_point_distance(point[:2], data[:2]) for data in dataset]
return dataset[distance_list.index(min(distance_list))]
def clockwise_counterclockwise(point1, point2, point3):
# 判断123是顺时针还是逆时针
# 计算向量point1point2与point2point3的向量积
result = (point2[0] - point1[0]) * (point3[1] - point2[1]) - (point2[1] - point1[1]) * (point3[0] - point2[0])
if result > 0:
# point1point2point3逆时针
return 1
else:
return 0
def is_same_point(point1, point2):
# 判断两个点是不是一个点
flag = np.array([np.abs(point1[i] - point2[i]) < const.ALPHA for i in range(len(point1))])
flag = flag[:2] # 当为三元数值得时候,仅取前两位进行比较
if flag.all():
return True
return False
def two_point_slope_bias(point1, point2):
# 根据两点确定斜率和截距
k = (point2[1] - point1[1]) / (point2[0] - point1[0])
b = (point1[1] * point2[0] - point2[1] * point1[0]) / (point2[0] - point1[0])
return k, b
def vertical_point(point1, point2, point3):
# point2, point3确定一条直线,point1到此直线的垂点
if point2[0] != point3[0]:
k, b = two_point_slope_bias(point2, point3)
x = (k * point1[1] + point1[0] - k * b) / (1 + k * k)
y = k * x + b
else:
x = point2[0]
y = point1[1]
return x, y
def one_point_to_vertical_point_dis(point1, point2, point3):
# point2, point3确定一条直线,point1到此直线的垂点
ver_point = vertical_point(point1, point2, point3)
distance = two_point_distance(point1, ver_point)
return distance
# def one_point_to_vertical_point_dis(point1, point2, point3):
# # # point2, point3确定一条直线,point1到此直线的垂点
# # ver_point = vertical_point(point1, point2, point3)
# # distance = two_point_distance(point1, ver_point)
# # return distance
def nearest_point_index_of_dataset(point, dataset):
# 在dataset中寻找距离point最近的点
# 仅使用两坐标
tree = KDTree(np.array(dataset), leaf_size=const.LEAF_SIZE)
dist, ind = tree.query([np.array(point[:2])], k=1)
return ind[0][0]
def nearest_two_point_of_dataset(point, dataset):
# 在dataset中寻找距离point最近的点
# 仅使用两坐标
tree = KDTree(np.array(dataset), leaf_size=const.LEAF_SIZE)
dist, ind = tree.query([np.array(point[:2])], k=2)
return [dataset[ind[0][0]], dataset[ind[0][1]]]
def dis_from_dataset_to_dataset(dataset1, dataset2):
# 两个数据集之间的距离,采取一个数据集的点到另一个数据集的距离的平均
# 数据集经过插值后,计算点到垂线的距离来度量宽度的误差会很小
# 首先保证同向
dataset1 = [[data[0], data[1]] for data in dataset1]
dataset2 = [[data[0], data[1]] for data in dataset2]
dataset1 = interpolation_dataset(dataset1, 2) # 插值
dataset2 = interpolation_dataset(dataset2, 2)
distance1 = two_point_distance(dataset1[0], dataset2[0])
distance2 = two_point_distance(dataset1[0], dataset2[-1])
if distance1 > distance2:
dataset2 = dataset2[::-1]
# 取对齐的两段
index1 = nearest_point_index_of_dataset(dataset1[0], dataset2)
index2 = nearest_point_index_of_dataset(dataset2[0], dataset1)
distance1 = two_point_distance(dataset1[0], dataset2[index1])
distance2 = two_point_distance(dataset2[0], dataset1[index2])
if distance1 > distance2:
dataset1 = dataset1[index2:]
else:
dataset2 = dataset2[index1:]
index3 = nearest_point_index_of_dataset(dataset1[-1], dataset2)
index4 = nearest_point_index_of_dataset(dataset2[-1], dataset1)
distance3 = two_point_distance(dataset1[-1], dataset2[index3])
distance4 = two_point_distance(dataset2[-1], dataset1[index4])
if distance3 > distance4:
dataset1 = dataset1[:index4 + 1]
else:
dataset2 = dataset2[:index3 + 1]
if len(dataset2) < 2:
# 仅有一个点,在此特殊情形下,仅作简单计算
point = dataset2[0]
index5 = nearest_point_index_of_dataset(point, dataset1)
target_distance = two_point_distance(point, dataset1[index5])
else:
distance_list = []
for i in range(0, len(dataset1)):
point1 = dataset1[i]
point2, point3 = nearest_two_point_of_dataset(point1, dataset2)
distance = one_point_to_vertical_point_dis(point1, point2, point3)
distance_list.append(distance)
target_distance = sum(distance_list) / len(distance_list)
return round(target_distance * 1000)
# _*_ coding: utf-8 _*_ #
# @Time: 2019/11/27 15:12
# @Author: XiongChao
import sys
# 定义一个常量类实现常量的功能
# 该类定义了一个方法__setattr()__,和一个异常ConstError, ConstError类继承
# 自类TypeError. 通过调用类自带的字典__dict__, 判断定义的常量是否包含在字典
# 中。如果字典中包含此变量,将抛出异常,否则,给新创建的常量赋值。
# 最后两行代码的作用是把const类注册到sys.modules这个全局字典中。
class _const:
class ConstError(TypeError):
pass
def __setattr__(self, name, value):
if name in self.__dict__:
raise self.ConstError("Can't rebind const (%s)" % name)
self.__dict__[name] = value
c = _const()
c.BACKUP_MESH_BOX_FILE_NAME = "backup_mesh_box.gpkg" # 图幅框文件的备份文件
c.MESH_BOX_LAYER_NAME = "l13" # 图幅框文件的图层名
c.MESH_BOX_MESH_ID = "meshid" # 图幅框文件的图幅号字段
c.MESH_ID = "MESH_ID" # 地图数据的图幅号
c.LINK_ID = "LINK_ID"
c.LINK_IDS = "LINK_IDS"
c.EDGE_IDS = "EDGE_IDS"
c.LANE_IDS = "LANE_IDS"
c.SMOOTHING_MOUNT = 70 # 接边的时候在边界附近平滑的次数,单位是每米的平滑次数
c.START_NODE_ID = "START_NODE_ID"
c.END_NODE_ID = "END_NODE_ID"
c.NODE_ID = "NODE_ID"
c.HD_LINK_ID = "HD_LINK_ID"
c.START_EDGE_NODE_ID = "START_EDGE_NODE_ID"
c.END_EDGE_NODE_ID = "END_EDGE_NODE_ID"
c.START_LANE_NODE_ID = "START_LANE_NODE_ID"
c.END_LANE_NODE_ID = "END_LANE_NODE_ID"
c.LENGTH_THRESHOLD = 0.001 # 抽稀阈值
c.MESH_HD_LINK = "HD_LINK"
c.LANE_EDGE_ID = "LANE_EDGE_ID"
c.LEFT_LANE_EDGE_ID = "LEFT_LANE_EDGE_ID"
c.RIGHT_LANE_EDGE_ID = "RIGHT_LANE_EDGE_ID"
c.RECTANGLE_WIDTH = 30 # 设置buffer框的宽度为30,长已知
c.LINK_DIS_POINTS = 5 # 计算参考线之间距离所使用的点的个数
c.LINK_DIS_THRESHOLD = 4 # 衡量两个图幅的参考线之间距离的阈值
c.MESH_HD_LANE_EDGE = "HD_LANE_EDGE" # 边界线图层名
c.MESH_HD_LANE_LINK = "HD_LANE_LINK" # 中心线图层名
c.MESH_HD_NODE = "HD_NODE" # 参考线节点图层名
c.MESH_NODE_ID = "NODE_ID" # 参考线节点的主键
c.MESH_HD_LANE_NODE = "HD_LANE_NODE" # 中心线节点图层名
c.MESH_LANE_NODE_ID = "LANE_NODE_ID" # 中心线节点图层的主键
c.MESH_HD_LANE_EDGE_NODE = "HD_LANE_EDGE_NODE" # 边界线节点图层名
c.MESH_LANE_EDGE_NODE_ID = "LANE_EDGE_NODE_ID" # 边界线节点图层的主键
c.SMOOTHING_DATA_LENGTH = 6 # 截取多长的数据平滑
c.MESH_INTERPOLATION_MOUNT1 = 1 # 接边的时候插值
c.MESH_INTERPOLATION_MOUNT2 = 0.125 # 接边的时候插值
c.CONNECT_DIS_THRESHOLD = 3 # 衡量是否连接对应两条线的阈值,针对边界线和中心线
c.ELEVATION_THRESHOLD = 1.5 # 判断对应的两条路是否在同一平面
c.NEAR_POINT_THRESHOLD = 0.03 # 是否相接近判断的阈值
c.ALPHA = 0.00001 # 判断两个浮点数是否相等
c.LEAF_SIZE = 20
c.RETURNED_MESH_INFO_FILE_NAME = "未接边信息.txt"
sys.modules[__name__] = c
# _*_ coding: utf-8 _*_ #
# @Time: 2020/2/26 14:47
# @Author: XiongChao
import numpy as np
import math
def two_point_distance(point1, point2):
# 计算二元坐标的距离
return math.sqrt((point2[0] - point1[0]) ** 2 + (point2[1] - point1[1]) ** 2)
def interpolation_between_two_point(start_point, end_point, ratio):
# 在起点与终点之间进行插值,ratio是插值的数量
distance = two_point_distance(start_point, end_point)
n = int(distance * ratio) # 插值的个数
dataset = []
result = np.linspace(0, 1, n + 2)[:n + 1] # 保留首,不保留尾
for k in result:
x = k * (end_point[0] - start_point[0]) + start_point[0]
y = k * (end_point[1] - start_point[1]) + start_point[1]
if len(start_point) == 3:
z = k * (end_point[2] - start_point[2]) + start_point[2]
point = (x, y, z)
else:
point = (x, y)
dataset.append(point)
return dataset
def interpolation_dataset(dataset, interpolation_mount):
m = len(dataset)
new_dataset = []
for i in range(m - 1):
new_dataset += interpolation_between_two_point(dataset[i], dataset[i + 1], interpolation_mount)
new_dataset.append(dataset[m - 1])
return new_dataset
# _*_ coding: utf-8 _*_ #
# @Time: 2020/2/26 15:33
# @Author: XiongChao
from util import const
from .common import one_point_to_vertical_point_dis
from edge_match_src import remove_duplicate_data
class DouglasPeuker(object):
def __init__(self):
self.threshold = const.LENGTH_THRESHOLD
self.qualify_list = list()
self.disqualify_list = list()
def diluting(self, point_list):
"""
vacuate_layer
:param point_list:二维点列表
:return:
"""
if len(point_list) < 3:
self.qualify_list.extend(point_list[::-1])
else:
# 找到与收尾两点连线距离最大的点
max_distance_index, max_distance = 0, 0
for index, point in enumerate(point_list):
if index in [0, len(point_list) - 1]:
continue
distance = one_point_to_vertical_point_dis(point, point_list[0], point_list[-1])
if distance > max_distance:
max_distance_index = index
max_distance = distance
# 若最大距离小于阈值,则去掉所有中间点。 反之,则将曲线按最大距离点分割
if max_distance < self.threshold:
self.qualify_list.append(point_list[-1])
self.qualify_list.append(point_list[0])
else:
# 将曲线按最大距离的点分割成两段
sequence_a = point_list[:max_distance_index + 1]
sequence_b = point_list[max_distance_index:]
for sequence in [sequence_a, sequence_b]:
if len(sequence) < 3 and sequence == sequence_b:
self.qualify_list.extend(sequence[::-1])
else:
self.disqualify_list.append(sequence)
def models(self, point_list):
self.diluting(point_list)
while len(self.disqualify_list) > 0:
self.diluting(self.disqualify_list.pop())
target_dataset = remove_duplicate_data(self.qualify_list)
return target_dataset
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment