# _*_ coding: utf-8 _*_ # # @Time: 2020/4/22 17:04 # @Author: XiongChao import csv from connect_solution.coordinate_transformation import * from util.common import * from math_tools.data_interpolation import interpolation_dataset_3d from connect_solution.geographic_tool import * from math_tools.polynomial_fitting import * from math_tools.savitsky_golay_filter import * from math_tools.douglas_peucker_2d import * from util import const class ConnectMeshByMeshList: """ 根据图幅号列表进行接边,需要传入图幅号列表和连接数据库 """ def __init__(self, hd_data_source, mesh_box_layer, mesh_ids, output_document_path): self.hd_data_source = hd_data_source self.mesh_box_layer = mesh_box_layer self.mesh_ids = mesh_ids self.output_document_path = output_document_path self.output_info1 = [] # 用于存储未接边输出的信息 self.output_info2 = [] # 用于存储已接边输出的信息 self.output_info3 = [] # 用于存储不需要接边的信息 self.topo_point_layer = self.hd_data_source.GetLayerByName(const.HD_TOPO_P) self.link_layer = self.hd_data_source.GetLayerByName(const.HD_LINK) self.node_layer = self.hd_data_source.GetLayerByName(const.HD_NODE) self.lane_link_layer = self.hd_data_source.GetLayerByName(const.HD_LANE_LINK) self.lane_node_layer = self.hd_data_source.GetLayerByName(const.HD_LANE_NODE) self.lane_edge_layer = self.hd_data_source.GetLayerByName(const.HD_LANE_EDGE) self.lane_edge_node_layer = self.hd_data_source.GetLayerByName(const.HD_LANE_EDGE_NODE) self.link_edge_layer = self.hd_data_source.GetLayerByName(const.HD_LINK_EDGE) self.link_edge_node_layer = self.hd_data_source.GetLayerByName(const.HD_LINK_EDGE_NODE) def get_box_by_meshid(self, mesh_id): # 根据图幅号获取图幅框 filter = '"' + const.MESH_ID + '" = ' + "'" + str(mesh_id) + "'" self.mesh_box_layer.SetAttributeFilter(filter) mesh_box_count = self.mesh_box_layer.GetFeatureCount() if mesh_box_count != 1: return False mesh_box_feature = self.mesh_box_layer.GetNextFeature() self.mesh_box_layer.SetAttributeFilter(None) # 消除属性过滤 self.mesh_box_layer.ResetReading() return mesh_box_feature def 
get_waiting_connect_meshs(self, mesh_box_feature, mesh_id): mesh_box_geom = mesh_box_feature.GetGeometryRef() mesh_box_buffer = mesh_box_geom.Buffer(const.BUFFER_SIZE1, 4) mesh_box_buffer_points = mesh_box_buffer.GetGeometryRef(0).GetPoints() lon_list = [point[0] for point in mesh_box_buffer_points] lat_list = [point[1] for point in mesh_box_buffer_points] min_lon, max_lon = min(lon_list), max(lon_list) min_lat, max_lat = min(lat_list), max(lat_list) self.mesh_box_layer.SetSpatialFilterRect(min_lon, min_lat, max_lon, max_lat) mesh_box_count = self.mesh_box_layer.GetFeatureCount() mesh_box_features = [] for _ in range(mesh_box_count): mesh_box_feature = self.mesh_box_layer.GetNextFeature() if mesh_box_feature is None or mesh_box_feature[const.MESH_ID] == mesh_id: continue mesh_box_features.append(mesh_box_feature) self.mesh_box_layer.SetSpatialFilter(None) # 消除空间过滤 self.mesh_box_layer.ResetReading() return mesh_box_features @staticmethod def construct_opposite_buffer(intersect_point): # 根据顶点的交点构造一个buffer lon, lat = intersect_point[0], intersect_point[1] min_lon, max_lon = lon - const.BUFFER_SIZE4, lon + const.BUFFER_SIZE4 min_lat, max_lat = lat - const.BUFFER_SIZE4, lat + const.BUFFER_SIZE4 return min_lon, max_lon, min_lat, max_lat @staticmethod def construct_adjacent_buffer(intersect_points): # 根据共有边线构造buffer框 point1, point2 = intersect_points[0], intersect_points[1] if abs(point1[0] - point2[0]) < 0.000000001: # 经度相同,边线垂直向下 min_lon, max_lon = point1[0] - const.BUFFER_SIZE2, point1[0] + const.BUFFER_SIZE2 min_lat, max_lat = min(point1[1], point2[1]), max(point1[1], point2[1]) else: # 纬度相同,边线水平 min_lon, max_lon = min(point1[0], point2[0]), max(point1[0], point2[0]) min_lat, max_lat = point1[1] - const.BUFFER_SIZE2, point2[1] + const.BUFFER_SIZE2 return min_lon, max_lon, min_lat, max_lat def is_effective_data(self): if self.topo_point_layer is None: output_info = "输入的数据没有拓扑点图层,请查验数据后接边" self.output_info1.append(['', '', output_info]) return False if self.link_layer 
is None: output_info = "输入的数据没有参考线图层,请查验数据后接边" self.output_info1.append(['', '', output_info]) return False if self.node_layer is None: output_info = "输入的数据没有参考线节点图层,请查验数据后接边" self.output_info1.append(['', '', output_info]) return False if self.lane_link_layer is None: output_info = "输入的数据没有中心线图层,请查验数据后接边" self.output_info1.append(['', '', output_info]) return False if self.lane_node_layer is None: output_info = "输入的数据没有中心线节点图层,请查验数据后接边" self.output_info1.append(['', '', output_info]) return False if self.lane_edge_layer is None: output_info = "输入的数据没有边界线图层,请查验数据后接边" self.output_info1.append(['', '', output_info]) return False if self.lane_edge_node_layer is None: output_info = "输入的数据没有边界线节点图层,请查验数据后接边" self.output_info1.append(['', '', output_info]) return False return True def write_info(self): # 将存储的接边信息写到指定的文件路径下,此处模式为添加 path1 = self.output_document_path + "\\未接边信息.csv" path2 = self.output_document_path + "\\已接边信息.txt" path3 = self.output_document_path + "\\不需要接边信息.txt" with open(path1, 'a', encoding='utf_8_sig', newline='') as file1: writer1 = csv.writer(file1) writer1.writerow([" "]) writer1.writerow(["序号", "图幅号1", "图幅号2", "接边报错信息"]) index = 1 for info in self.output_info1: error_info = [str(index), info[0], info[1]] for error_message in info[2]: error_info.append(str(error_message)) writer1.writerow(error_info) index += 1 with open(path2, 'a', encoding='utf-8') as file2: file2.write('\n') for info in self.output_info2: file2.write(info + '\n') with open(path3, 'a', encoding='utf-8') as file3: file3.write('\n') for info in self.output_info3: file3.write(info + '\n') def is_need_connect(self, min_lon, min_lat, max_lon, max_lat, mesh_id1, mesh_id2): # 用于输出需要接边的topo点,并且这些topo点有后继信息,可以根据next往后面走 self.topo_point_layer.SetSpatialFilterRect(min_lon, min_lat, max_lon, max_lat) topo_feature_count = self.topo_point_layer.GetFeatureCount() if topo_feature_count == 0: output_info = "图幅号为:" + str(mesh_id1) + "的topo点" + "和图幅号为:" + \ str(mesh_id2) + "的topo点没有连接的关系,不需要接边\n" 
self.output_info3.append(output_info) return False, [] filter = '"' + const.NEXT_COUNT + '" = 0 and "' + const.PREV_COUNT + '" != 0' self.topo_point_layer.SetAttributeFilter(filter) topo_feature_count = self.topo_point_layer.GetFeatureCount() if topo_feature_count == 0: output_info = "图幅号:" + str(mesh_id1) + "和图幅号:" + str(mesh_id2) + \ "之间没有topo的next_count为0且prev_count不为0,不需要接边\n" self.output_info3.append(output_info) return False, [] topo_features = [] for _ in range(topo_feature_count): topo_feature = self.topo_point_layer.GetNextFeature() if not topo_feature or not topo_feature[const.NEXT]: continue topo_features.append(topo_feature) self.topo_point_layer.SetAttributeFilter(None) # 消除属性过滤 self.topo_point_layer.SetSpatialFilter(None) # 消除空间过滤 self.topo_point_layer.ResetReading() if not topo_features: output_info = "图幅号:" + str(mesh_id1) + "和图幅号:" + str(mesh_id2) + \ "之间没有topo的next_count为0且prev_count不为0且next字段不为空,不需要接边\n" self.output_info3.append(output_info) return False, [] target_topo_features = [] gps_ids = [] for topo_feature in topo_features: if not topo_feature or (not int(topo_feature[const.NEXT][1]) == 1): continue _, next_gps_id = topo_feature[const.NEXT][1:-1].split(":") filter = '"' + const.ID + '" = ' + str(next_gps_id) self.topo_point_layer.SetAttributeFilter(filter) topo_count = self.topo_point_layer.GetFeatureCount() if topo_count == 0: output_info = "图幅号:" + str(mesh_id1) + "和图幅号:" + str(mesh_id2) + "的接边中,topo点的id为:" + \ str(topo_feature[const.ID]) + ",根据它的next字段没有找到它的准确后继," \ "可能为topo跳点的地方,不接边\n" self.output_info3.append(output_info) self.topo_point_layer.SetAttributeFilter(None) continue next_topo_feature = None flag = False for _ in range(topo_count): # 如果有多个topo数据,选择有next字段的并且仅有一个next的topo点 next_topo_feature = self.topo_point_layer.GetNextFeature() if not next_topo_feature: continue if int(next_topo_feature[const.NEXT_COUNT]) == 1: flag = True break self.topo_point_layer.SetAttributeFilter(None) if not flag: output_info = "图幅号:" + 
str(mesh_id1) + "和图幅号:" + str(mesh_id2) + "的接边中,topo点的id为:" + \ str(topo_feature[const.ID]) + ",根据它的next字段没有找到它的后继中也有后继的topo点," \ "从topo点判断不是接边处,不接边\n" self.output_info3.append(output_info) continue if topo_feature["MESHID"] != next_topo_feature["MESHID"] and next_topo_feature["MESHID"] in [mesh_id1, mesh_id2]: # topo点实现跨越图幅 if int(next_topo_feature[const.ID]) not in gps_ids: gps_ids.append(int(next_topo_feature[const.ID])) target_topo_features.append(next_topo_feature) if not target_topo_features: return False, [] return True, target_topo_features def reset_link_edge_layer(self): # 判断是否重置道路边缘图层,如果参考线没有左右道路边缘图层的字段,就重置为None if self.link_layer.FindFieldIndex(const.LEFT_LINK_EDGE, False) == -1 or \ self.link_layer.FindFieldIndex(const.RIGHT_LINK_EDGE, False) == -1 or \ self.link_edge_layer is None or self.link_edge_layer.GetFeatureCount() == 0: self.link_edge_layer = None def is_connect_by_opposite(self, intersect_points, mesh_id1, mesh_id2): """ 判断对角的情形下是否需要接边 :param intersect_points: 对角图幅的交点 :param mesh_id1: 第一个图幅框的图幅号 :param mesh_id2: 第二个图幅框的图幅号 :return: boolean """ min_lon, max_lon, min_lat, max_lat = self.construct_opposite_buffer(intersect_points[0]) self.topo_point_layer.SetSpatialFilterRect(min_lon, min_lat, max_lon, max_lat) topo_feature_count = self.topo_point_layer.GetFeatureCount() self.topo_point_layer.SetSpatialFilter(None) # 消除空间过滤器 if topo_feature_count == 0: output_info = "图幅号为:" + str(mesh_id1) + "的topo点" + "和图幅号为:" + \ str(mesh_id2) + "的topo点没有连接的关系,不需要接边\n" self.output_info3.append(output_info) return False return True def main(self): if not self.is_effective_data(): self.write_info() return self.reset_link_edge_layer() connect_info = [self.hd_data_source, self.topo_point_layer, self.link_layer, self.node_layer, self.lane_link_layer, self.lane_node_layer, self.lane_edge_layer, self.lane_edge_node_layer, self.link_edge_layer, self.link_edge_node_layer] two_mesh_solution = ConnectTwoMeshByTopo(*connect_info) connected_meshs = {} # 
存储已经结果边的图幅,格式mesh_id:mesh_id1, mesh_id2, ... for mesh_id1 in self.mesh_ids: try: mesh_box_feature1 = self.get_box_by_meshid(mesh_id1) connected_meshs[mesh_id1] = [] if mesh_box_feature1 is False: output_info = "根据图幅号:" + str(mesh_id1) + \ "没有找到图幅框或者找到的图幅框不止一个,数据异常,请修改数据后执行接边" self.output_info1.append([str(mesh_id1), '', [output_info]]) continue mesh_box_geom1 = mesh_box_feature1.GetGeometryRef() mesh_box_features = self.get_waiting_connect_meshs(mesh_box_feature1, mesh_id1) except Exception as e: output_info = "在搜索图幅号:" + str(mesh_id1) + "周围应该与之接边的图幅框时,出现异常:" + \ str(e) + ",请手动将需要与之接边的地方接起来" self.output_info1.append([str(mesh_id1), '', [output_info]]) continue for mesh_box_feature2 in mesh_box_features: mesh_box_geom2 = mesh_box_feature2.GetGeometryRef() mesh_id2 = mesh_box_feature2[const.MESH_ID] if mesh_id2 in connected_meshs.keys() and mesh_id1 in connected_meshs[mesh_id2]: # 此时已经接过边了 continue if mesh_id2 not in connected_meshs[mesh_id1]: connected_meshs[mesh_id1].append(mesh_id2) if not mesh_box_geom1.Intersects(mesh_box_geom2): output_info = "图幅号为:" + str(mesh_id1) + "的图幅框" + "和图幅号为:" + \ str(mesh_id2) + "的图幅框没有相交的部分,不接边\n" self.output_info3.append(output_info) continue intersect_geom = mesh_box_geom1.Intersection(mesh_box_geom2) intersect_points = intersect_geom.GetPoints() if len(intersect_points) == 1: # 表示两个图幅互为对角,如果需要接边,则报出来手动接边,此情形极少 if self.is_connect_by_opposite(intersect_points, mesh_id1, mesh_id2): output_info = "图幅号为:" + str(mesh_id1) + "的图幅框" + "和图幅号为:" + str(mesh_id2) + \ "的图幅框互为对角,可能要接边,请查验此处是否需要接边,如果需要,请手动接边" self.output_info1.append([str(mesh_id1), str(mesh_id2), [output_info]]) continue else: # 表示两个图幅相邻 min_lon, max_lon, min_lat, max_lat = self.construct_adjacent_buffer(intersect_points) try: flag, topo_features = self.is_need_connect(min_lon, min_lat, max_lon, max_lat, mesh_id1, mesh_id2) except Exception as e: output_info = "查找图幅号为:" + str(mesh_id1) + " " + str(mesh_id2) + \ "的交界处查找是否有需要接边的topo时,出现异常:" + str(e) + \ "请找到此处判别是否需要接边,如果需要,请手动接边" 
self.output_info1.append([str(mesh_id1), str(mesh_id2), [output_info]]) continue if flag: # 对每一对topo,从gps上来说这个地方是要连接的 try: self.hd_data_source.StartTransaction() two_mesh_solution.models(topo_features, mesh_id1, mesh_id2) self.hd_data_source.CommitTransaction() except Exception as e: output_info = "图幅号为:" + str(mesh_id1) + " " + str(mesh_id2) + \ "的接边出现异常,异常:" + str(e) + ",请手动接边" self.output_info1.append([str(mesh_id1), str(mesh_id2), [output_info]]) self.hd_data_source.CommitTransaction() continue if two_mesh_solution.output_info1: self.output_info1.append([str(mesh_id1), str(mesh_id2), two_mesh_solution.output_info1]) if two_mesh_solution.output_info2: output_info = "图幅号为:" + str(mesh_id1) + " " + str(mesh_id2) + "的接边运行开始" self.output_info2.append(output_info) self.output_info2 += two_mesh_solution.output_info2 output_info = "图幅号为:" + str(mesh_id1) + " " + str(mesh_id2) + "的接边运行结束\n" self.output_info2.append(output_info) two_mesh_solution.output_info1 = [] two_mesh_solution.output_info2 = [] output_info = "\n接边程序程序运行结束\n" self.output_info2.append(output_info) self.output_info3.append(output_info) self.write_info() # 写入接边信息 class ConnectTwoMeshByTopo: """ 先判断是否需要连接两个图幅,如果需要的话将两个图幅中需要连接的道路连接起来 """ def __init__(self, hd_data_source, topo_point_layer, link_layer, node_layer, lane_link_layer, lane_node_layer, lane_edge_layer, lane_edge_node_layer, link_edge_layer=None, link_edge_node_layer=None): self.hd_data_source = hd_data_source self.topo_point_layer = topo_point_layer self.link_layer = link_layer self.node_layer = node_layer self.lane_link_layer = lane_link_layer self.lane_node_layer = lane_node_layer self.lane_edge_layer = lane_edge_layer self.lane_edge_node_layer = lane_edge_node_layer self.link_edge_layer = link_edge_layer self.link_edge_node_layer = link_edge_node_layer self.output_info1 = [] self.output_info2 = [] def is_gap_or_big_curve(self, topo_feature, source_srs): # 判断从某一点开始,后面的gps点有没有间断或者大弯道的情形 move_count = 0 blh_point1 = 
topo_feature.GetGeometryRef().GetPoints() # 为一元列表 azimuth1 = topo_feature[const.AZIMUTH] utm_point1 = transform_to_utm(blh_point1, source_srs)[0][0] while move_count <= const.MOVE_COUNT_THRESHOLD: next_count = topo_feature[const.NEXT_COUNT] if not next_count or int(next_count) != 1: # 在寻找参考线的过程中可能会报错,在那里报出来即可 return False _, next_topo_id = topo_feature[const.NEXT][1:-1].split(':') filter = '"' + const.ID + '" = ' + str(next_topo_id) self.topo_point_layer.SetAttributeFilter(filter) topo_count = self.topo_point_layer.GetFeatureCount() if topo_count != 1: # 在寻找参考线的过程中可能会报错,在那里报出来即可 # 可能到达了屠夫边界 self.topo_point_layer.SetAttributeFilter(None) # 消除属性过滤 return False topo_feature = self.topo_point_layer.GetNextFeature() self.topo_point_layer.SetAttributeFilter(None) # 消除属性过滤 self.topo_point_layer.ResetReading() azimuth2 = topo_feature[const.AZIMUTH] if abs(abs(abs(azimuth2 - azimuth1) - 180) - 180) > const.AZIMUTH_THRESHOLD: output_info = "gps_id为: " + str(topo_feature[const.ID]) + "的topo点的接边处为大弯道,请手动接边" self.output_info1.append(output_info) return True blh_point2 = topo_feature.GetGeometryRef().GetPoints() utm_point2 = transform_to_utm(blh_point2, source_srs)[0][0] distance = two_point_distance_2d(utm_point1, utm_point2) if distance > const.DIS_THRESHOLD2: output_info = "gps_id为: " + str(topo_feature[const.ID]) + "的topo点的接边处为大间断,请手动接边" self.output_info1.append(output_info) return True move_count += 1 utm_point1 = utm_point2 azimuth1 = azimuth2 return False @staticmethod def create_buffer(topo_feature): topo_blh_point = topo_feature.GetGeometryRef().GetPoints()[0] lon, lat = topo_blh_point[0], topo_blh_point[1] min_lon, max_lon = lon - const.BUFFER_SIZE3, lon + const.BUFFER_SIZE3 min_lat, max_lat = lat - const.BUFFER_SIZE3, lat + const.BUFFER_SIZE3 return min_lon, max_lon, min_lat, max_lat def dis_from_topo_to_line(self, topo_feature, line_feature, source_srs): # 计算一个gps点到一条线的距离 topo_blh_point = topo_feature.GetGeometryRef().GetPoints() lon, lat, up = topo_blh_point[0][0], 
topo_blh_point[0][1], topo_blh_point[0][2] azimuth, roll, pitch = topo_feature[const.AZIMUTH], topo_feature[const.ROLL], topo_feature[const.PITCH] line_blh_points = line_feature.GetGeometryRef().GetPoints() line_cpt_points = blh_to_cpt(line_blh_points, lon, lat, up, pitch, roll, azimuth) # 去除很近点,防止后面报错 new_line_cpt_points = self.remove_near_points(interpolation_dataset_3d(line_cpt_points, const.INTERPOLATION_COUNT1)) point_index1, point_index2 = nearest_m_point_of_dataset_2d([0, 0, 0], new_line_cpt_points, 2) cpt_points = [new_line_cpt_points[point_index1], new_line_cpt_points[point_index2]] vector = [cpt_points[1][0] - cpt_points[0][0], cpt_points[1][1] - cpt_points[0][1]] angle = angle_of_two_vector(vector, [0, 1]) if abs(angle - 90) < const.ANGLE2: # 与行驶方向差异过大,不计入计算 return np.inf utm_topo_point = transform_to_utm(topo_blh_point, source_srs)[0][0] utm_line_points = transform_to_utm(line_blh_points, source_srs)[0] new_line_utm_points = interpolation_dataset_3d(utm_line_points, const.INTERPOLATION_COUNT1) point_index = nearest_m_point_of_dataset_2d(utm_topo_point, new_line_utm_points, 1)[0] return two_point_distance_2d(utm_topo_point, new_line_utm_points[point_index]) def get_lane_edge_intersect_buffer(self, topo_feature, mesh_id1, mesh_id2, source_srs): min_lon, max_lon, min_lat, max_lat = self.create_buffer(topo_feature) self.lane_edge_layer.SetSpatialFilterRect(min_lon, min_lat, max_lon, max_lat) lane_edge_feature_count = self.lane_edge_layer.GetFeatureCount() if lane_edge_feature_count < 2: output_info = "gps_id: " + str(topo_feature[const.ID]) + "的topo点附近的车道宽度过大,请手动接边" self.output_info1.append(output_info) return [] lane_edge_features = [] for _ in range(lane_edge_feature_count): lane_edge_feature = self.lane_edge_layer.GetNextFeature() if lane_edge_feature is None: continue distance = self.dis_from_topo_to_line(topo_feature, lane_edge_feature, source_srs) if distance == np.inf: # 边界线的朝向与topo的朝向不同 continue lane_edge_features.append([distance, 
lane_edge_feature]) self.lane_edge_layer.SetSpatialFilter(None) # 消除空间过滤 self.lane_edge_layer.ResetReading() if len(lane_edge_features) == 0: output_info = "根据gps_id: " + str(topo_feature[const.ID]) + \ "构造的buffer没有框选到的非空边界线,或者框选的边界线的方向与topo点的方向差异过大,请手动接边" self.output_info1.append(output_info) return [] lane_edge_features = [lane_edge_feature for lane_edge_feature in lane_edge_features if (int(lane_edge_feature[1][const.MESH_ID]) == mesh_id1 or int(lane_edge_feature[1][const.MESH_ID]) == mesh_id2) and lane_edge_feature[0] < const.DIS_THRESHOLD1] if not lane_edge_features: output_info = "根据gps_id: " + str(topo_feature[const.ID]) + "构造的buffer框选的边界线与gps点相距太远,请手动接边" self.output_info1.append(output_info) # # 获取的边界线距离topo距离较近,判断这些参考线是否关联着相同的参考线 # link_id1 = lane_edge_features[0][1][const.HD_LINK_ID] # mesh_id1 = lane_edge_features[0][1][const.MESH_ID] # for distance, lane_edge_feature in lane_edge_features: # link_id2 = lane_edge_feature[const.HD_LINK_ID] # if link_id2 != link_id1: # # 计算两条参考线接近部分的夹角,较大,说明为交叉路 # mesh_id2 = lane_edge_feature[const.MESH_ID] # output_info = "gps_id: " + str(topo_feature[const.ID]) + "附近可能有交叉路或距离很近的双向道路,请手动接边" # self.output_info1.append(output_info) # return [] return lane_edge_features @staticmethod def cal_elevation_bias(utm_topo_point, utm_link_points1, utm_link_points2): # 计算两条参考线的高程偏差;取10个点进行分析 distance1 = two_point_distance_2d(utm_link_points1[0], utm_topo_point) distance2 = two_point_distance_2d(utm_link_points1[-1], utm_topo_point) if distance1 <= distance2: new_utm_link_points1 = utm_link_points1[:10] else: new_utm_link_points1 = utm_link_points1[-10:] distance3 = two_point_distance_2d(utm_link_points2[0], utm_topo_point) distance4 = two_point_distance_2d(utm_link_points2[-1], utm_topo_point) if distance3 < distance4: new_utm_link_points2 = utm_link_points2[:10] else: new_utm_link_points2 = utm_link_points2[-10:] elevation_mean1 = np.mean([point[2] for point in new_utm_link_points1]) elevation_mean2 = np.mean([point[2] for point in 
new_utm_link_points2]) return abs(elevation_mean1 - elevation_mean2) def has_overpass(self, topo_feature, lane_edge_features, source_srs): # 判断这个地方是否是高架桥 utm_topo_point = transform_to_utm(topo_feature.GetGeometryRef().GetPoints(), source_srs)[0][0] link_id1 = lane_edge_features[0][const.HD_LINK_ID] # 最近的边线的id mesh_id1 = lane_edge_features[0][const.MESH_ID] # 最近边线的图幅号 if len(lane_edge_features) != 1: for lane_edge_feature in lane_edge_features: link_id2 = lane_edge_feature[const.HD_LINK_ID] if link_id2 != link_id1: mesh_id2 = lane_edge_feature[const.MESH_ID] # 找到这两条参考线,并计算高程的差距 link_feature1 = self.get_link_by_id(link_id1, mesh_id1) link_feature2 = self.get_link_by_id(link_id2, mesh_id2) if not link_feature1 or not link_feature2: output_info = "根据id:" + str(link_id1) + "和" + str(mesh_id1) + "或者id:" + \ str(link_id2) + "和" + str(mesh_id2) + "没有获取到参考线,请手动接边" self.output_info1.append(output_info) return True blh_link_points1 = link_feature1.GetGeometryRef().GetPoints() blh_link_points2 = link_feature2.GetGeometryRef().GetPoints() utm_link_points1 = interpolation_dataset_3d(transform_to_utm(blh_link_points1, source_srs)[0], 5) utm_link_points2 = interpolation_dataset_3d(transform_to_utm(blh_link_points2, source_srs)[0], 5) elevation_bias = self.cal_elevation_bias(utm_topo_point, utm_link_points1, utm_link_points2) if elevation_bias > const.ELEVATION_THRESHOLD: output_info = "gps_id为" + str(topo_feature[const.ID]) + \ "附近可能为高架桥或者相隔很近的双向道路,请手动接边" self.output_info1.append(output_info) return True return False def is_connect_road(self, begin_topo_feature, mesh_id1, mesh_id2, source_srs): # 根据数据形式,begin_topo和end_topo均位于相对的另一个图幅中 # 方法为根据begin_topo的next字段,不断向后搜索,直到距离topo最近的边界线的图幅号变为mesh_id2为止 lane_edge_features = self.get_lane_edge_intersect_buffer(begin_topo_feature, mesh_id1, mesh_id2, source_srs) if not lane_edge_features: return False, [] lane_edge_features.sort(key=lambda x: x[0]) # 按照距离gps点的距离排序 lane_edge_features = [lane_edge_feature[1] for lane_edge_feature in 
lane_edge_features] # 去除距离 if self.has_overpass(begin_topo_feature, lane_edge_features, source_srs): return False, [] link_id1 = link_id2 = lane_edge_features[0][const.HD_LINK_ID] mesh_id = mesh_id1 = lane_edge_features[0][const.MESH_ID] move_count = 0 while mesh_id1 == mesh_id and move_count < const.MOVE_COUNT_THRESHOLD: next_count = begin_topo_feature[const.NEXT_COUNT] if not next_count or int(next_count) != 1: output_info = "gps_id为" + str(begin_topo_feature[const.ID]) + \ "的gps点的下一个gps点的数量不为1,topo数据异常,请手动接边" self.output_info1.append(output_info) return False, [] _, next_topo_id = begin_topo_feature[const.NEXT][1:-1].split(':') filter = '"' + const.ID + '" = ' + str(next_topo_id) self.topo_point_layer.SetAttributeFilter(filter) topo_count = self.topo_point_layer.GetFeatureCount() if topo_count != 1: output_info = "根据gps_id:" + str(next_topo_id) + "搜索到的topo点不是一个,topo数据异常,请手动接边" self.output_info1.append(output_info) return False, [] begin_topo_feature = self.topo_point_layer.GetNextFeature() self.topo_point_layer.SetAttributeFilter(None) # 消除属性过滤 self.topo_point_layer.ResetReading() lane_edge_features = self.get_lane_edge_intersect_buffer(begin_topo_feature, mesh_id1, mesh_id2, source_srs) if not lane_edge_features: return False, [] lane_edge_features.sort(key=lambda x: x[0]) # 按照距离gps点的距离排序 lane_edge_features = [lane_edge_feature[1] for lane_edge_feature in lane_edge_features] # 去除距离 if self.has_overpass(begin_topo_feature, lane_edge_features, source_srs): return False, [] mesh_id, link_id2 = lane_edge_features[0][const.MESH_ID], lane_edge_features[0][const.HD_LINK_ID] if mesh_id == mesh_id1 and link_id2 != link_id1: # 同一图幅内找到新的待接边参考线 link_id1 = link_id2 move_count += 1 if move_count >= const.MOVE_COUNT_THRESHOLD: output_info = "gps_id为:" + str(begin_topo_feature[const.ID]) + "的topo点附近接边的数据距离边界太远,请手动接边" self.output_info1.append(output_info) return False, [] if link_id1 == link_id2: output_info = "id为:" + str(link_id1) + "的参考线上挂接有不同图幅的两条边界线,数据异常,请修改数据后手动接边" 
self.output_info1.append(output_info) return False, [] return True, [begin_topo_feature, link_id1, link_id2] def get_link_by_id(self, link_id, mesh_id): filter = '"' + const.LINK_ID + '" = ' + str(link_id) + ' and "' + \ const.MESH_ID + '" = ' + "'" + str(mesh_id) + "'" self.link_layer.SetAttributeFilter(filter) link_count = self.link_layer.GetFeatureCount() if link_count != 1: output_info = "在图幅号为:" + str(mesh_id) + "中根据参考线id:" + \ str(link_id) + "搜索到的参考线不是一条,数据异常,请修改数据后手动接边" self.output_info1.append(output_info) return None link_feature = self.link_layer.GetNextFeature() self.link_layer.SetAttributeFilter(None) # 消除属性过滤 self.link_layer.ResetReading() return link_feature def get_lane_edge_by_link_id(self, link_id, mesh_id): # 根据边界线关联的参考线获取边界线 filter = '"' + const.HD_LINK_ID + '" = ' + str(link_id) + ' and "' + const.MESH_ID + '" = ' + "'" + str( mesh_id) + "'" self.lane_edge_layer.SetAttributeFilter(filter) lane_edge_count = self.lane_edge_layer.GetFeatureCount() if lane_edge_count < 2: output_info = "在图幅号为:" + str(mesh_id) + "中根据参考线id:" + \ str(link_id) + "搜索到的边界线不足2条,请手动接边" self.output_info1.append(output_info) return None lane_edge_features = [] for _ in range(lane_edge_count): lane_edge_feature = self.lane_edge_layer.GetNextFeature() if lane_edge_feature is None: continue lane_edge_features.append(lane_edge_feature) if len(lane_edge_features) < 2: output_info = "在图幅号为:" + str(mesh_id) + "中根据参考线id:" + \ str(link_id) + "搜索到的边界线不足2条,请手动接边" self.output_info1.append(output_info) return None self.lane_edge_layer.SetAttributeFilter(None) self.lane_edge_layer.ResetReading() return lane_edge_features def get_lane_link_by_link_id(self, link_id, mesh_id): # 根据中心线关联的参考线寻找中心线 filter = '"' + const.HD_LINK_ID + '" = ' + str(link_id) + ' and "' + const.MESH_ID + '" = ' + "'" + str( mesh_id) + "'" self.lane_link_layer.SetAttributeFilter(filter) lane_link_count = self.lane_link_layer.GetFeatureCount() if lane_link_count == 0: output_info = "在图幅号为:" + str(mesh_id) + 
"中,根据关联的参考线id:" + \ str(link_id) + "没有搜索到中心线,数据异常,请修改数据后手动接边" self.output_info1.append(output_info) return None lane_link_features = [] for _ in range(lane_link_count): lane_link_feature = self.lane_link_layer.GetNextFeature() if lane_link_feature is None: continue lane_link_features.append(lane_link_feature) if len(lane_link_features) == 0: output_info = "在图幅号为:" + str(mesh_id) + "中,根据关联的参考线id:" + \ str(link_id) + "没有搜索到中心线,数据异常,请修改数据后手动接边" self.output_info1.append(output_info) return None self.lane_link_layer.SetAttributeFilter(None) self.lane_link_layer.ResetReading() return lane_link_features def get_link_edge_by_link(self, link_feature, mesh_id): left_link_edge_id, right_link_edge_id = link_feature[const.LEFT_LINK_EDGE], link_feature[const.RIGHT_LINK_EDGE] if not left_link_edge_id or not right_link_edge_id: output_info = "id为:" + str(link_feature[const.LINK_ID]) + \ "的参考线的左道路边缘或者右道路边缘为空,数据异常,请修改收据后手动接边" self.output_info1.append(output_info) return [] filter = '"' + const.LINK_EDGE_ID + '" = ' + str(left_link_edge_id) + \ ' and "' + const.MESH_ID + '" = ' + "'" + str(mesh_id) + "'" self.link_edge_layer.SetAttributeFilter(filter) link_edge_count = self.link_edge_layer.GetFeatureCount() if link_edge_count != 1: output_info = "根据id:" + str(left_link_edge_id) + "搜索到的道路边缘不是一条,数据异常,请修改数据后手动接边" self.output_info1.append(output_info) return [] left_link_edge_feature = self.link_edge_layer.GetNextFeature() self.link_edge_layer.SetAttributeFilter(None) self.link_edge_layer.ResetReading() filter = '"' + const.LINK_EDGE_ID + '" = ' + str(right_link_edge_id) + \ ' and "' + const.MESH_ID + '" = ' + "'" + str(mesh_id) + "'" self.link_edge_layer.SetAttributeFilter(filter) link_edge_count = self.link_edge_layer.GetFeatureCount() if link_edge_count != 1: output_info = "根据id:" + str(right_link_edge_id) + "搜索到的道路边缘不是一条,请修改数据后手动接边" self.output_info1.append(output_info) return [] right_link_edge_feature = self.link_edge_layer.GetNextFeature() 
self.link_edge_layer.SetAttributeFilter(None) self.link_edge_layer.ResetReading() return [left_link_edge_feature, right_link_edge_feature] def match_line(self, lines1, lines2, id): # 对前后得线段集合进行匹配,id为参考线的主键 cal_x_mean = lambda line: np.mean([point[0] for point in line[-const.COMPARISON_NUM2:]]) line_info1 = [[cal_x_mean(lines1[i]), i] for i in range(len(lines1))] line_info2 = [[cal_x_mean(lines2[j]), j] for j in range(len(lines2))] line_info1.sort(key=lambda x: x[0]) line_info2.sort(key=lambda x: x[0]) m1, m2 = len(line_info1), len(line_info2) if abs(line_info1[0][0] - line_info2[0][0]) < 1 and abs(line_info1[-1][0] - line_info2[-1][0]) < 1 and m1 != m2: # 道路宽度一样,车道数不一样 output_info = "id为:" + str(id) + "的参考线的接边处,两者的道路宽度近似,但是车道数不一样,请手动接边" self.output_info1.append(output_info) return False, [] if ((abs(line_info1[0][0] - line_info2[0][0]) < 1 and abs(line_info1[-1][0] - line_info2[-1][0]) > 2) or ( abs(line_info1[-1][0] - line_info2[-1][0]) < 1 and abs( line_info1[0][0] - line_info2[0][0]) > 2)) and m1 == m2: # 车道数一样,且起始车道线一样(或者末尾车道线一样),道路宽度不一样 output_info = "id为:" + str(id) + "的参考线的接边处,两者的车道数一样,道路宽度不一样,请手动接边" self.output_info1.append(output_info) return False, [] match_list = [] is_nearest_line = lambda x, min_d, lines: (np.array([abs(line[0] - x) for line in lines]) >= min_d).all() for x1, index1 in line_info1: d = 1000 index = -1 min_x = -1000 for x2, index2 in line_info2: d1 = abs(x2 - x1) if d1 < d: index = index2 d = d1 min_x = x2 if d < const.CONNECT_THRESHOLD and index != -1 and min_x != -1000 and is_nearest_line(min_x, d, line_info1): # 判断是不是相互距离最近 match_list.append([index1, index]) if not match_list: return False, [] return True, match_list @staticmethod def cal_forward_direction(cpt_points, topo_feature, *proj_info): """ 计算前向参考线及其关联数据的方向 计算方式:计算线段首尾点到topo点的距离 1. 如果距离的差值相差不大,就认为y越大越为接边点 2. 
如果距离的差值相差挺大,就认为距离topo点最近是接边点 """ blh_topo_point = topo_feature.GetGeometryRef().GetPoints()[0] cpt_topo_point = blh_to_cpt([blh_topo_point], *proj_info)[0] distance1 = two_point_distance_3d(cpt_topo_point, cpt_points[0]) distance2 = two_point_distance_3d(cpt_topo_point, cpt_points[-1]) if abs(distance2 - distance1) > 10: if distance1 < distance2: # 首点为接边点,方向反了 line_direction = -1 cpt_points = cpt_points[::-1] else: # 尾点为接边点,方向为正 line_direction = 1 else: # 判断最大的y是在首点还是尾点取得 if cpt_points[-1][1] > cpt_points[0][1]: line_direction = 1 else: line_direction = -1 cpt_points = cpt_points[::-1] return cpt_points, line_direction @staticmethod def cal_behind_direction(cpt_points, topo_feature, *proj_info): """ 计算后向参考线及其关联数据的方向 计算方式:计算线段首尾点到topo点的距离 1. 如果距离的差值相差不大,就认为y越小越为接边点 2. 如果距离的差值相差挺大,就认为距离topo点最近是接边点 """ blh_topo_point = topo_feature.GetGeometryRef().GetPoints()[0] cpt_topo_point = blh_to_cpt([blh_topo_point], *proj_info)[0] distance1 = two_point_distance_3d(cpt_topo_point, cpt_points[0]) distance2 = two_point_distance_3d(cpt_topo_point, cpt_points[-1]) if abs(distance2 - distance1) > 10: if distance1 <= distance2: # 首点为接边点,方向反了 line_direction = -1 cpt_points = cpt_points[::-1] else: # 尾点为接边点,方向为正 line_direction = 1 else: # 判断最小的y是在首点还是尾点取得 if cpt_points[-1][1] > cpt_points[0][1]: line_direction = -1 cpt_points = cpt_points[::-1] else: line_direction = 1 return cpt_points, line_direction @staticmethod def cal_smoothing_count(cpt_dataset1, cpt_dataset2): # 根据x值得间距计算平滑次数,两数据都指向接边边界 new_cpt_dataset1 = interpolation_dataset_3d(cpt_dataset1, 3) new_cpt_dataset2 = interpolation_dataset_3d(cpt_dataset2, 3) mean_x1 = np.mean([point[0] for point in new_cpt_dataset1[-const.COMPARISON_NUM2:]]) mean_x2 = np.mean([point[0] for point in new_cpt_dataset2[-const.COMPARISON_NUM2:]]) return int(15 + 15 * np.log(1 + abs(mean_x1 - mean_x2)) / np.log(2)) def is_connect_lane_edge(self, topo_feature, link_id1, link_id2, mesh_id1, mesh_id2, connect_boundary, *proj_info): # 基于cpt坐标系做的判断以及获取的数据 
lane_edge_features1 = self.get_lane_edge_by_link_id(link_id1, mesh_id1) if len(lane_edge_features1) < 2: output_info = "根据id:" + str(link_id1) + "的参考线搜索到的边界线太少,请手动接边" self.output_info1.append(output_info) return False, [] lane_edge_features2 = self.get_lane_edge_by_link_id(link_id2, mesh_id2) if len(lane_edge_features2) < 2: output_info = "根据id:" + str(link_id2) + "的参考线搜索到的边界线太少,请手动接边" self.output_info1.append(output_info) return False, [] cpt_lane_edge_lines1 = [] lane_edge_directions1 = [] for lane_edge_feature in lane_edge_features1: lane_edge_points = lane_edge_feature.GetGeometryRef().GetPoints() cpt_lane_edge_points = blh_to_cpt(lane_edge_points, *proj_info) new_cpt_lane_edge_points, lane_edge_direction = self.cal_forward_direction(cpt_lane_edge_points, topo_feature, *proj_info) cpt_lane_edge_lines1.append(new_cpt_lane_edge_points) lane_edge_directions1.append(lane_edge_direction) cpt_lane_edge_lines2 = [] lane_edge_directions2 = [] for lane_edge_feature in lane_edge_features2: lane_edge_points = lane_edge_feature.GetGeometryRef().GetPoints() cpt_lane_edge_points = blh_to_cpt(lane_edge_points, *proj_info) new_cpt_lane_edge_points, lane_edge_direction = self.cal_behind_direction(cpt_lane_edge_points, topo_feature, *proj_info) cpt_lane_edge_lines2.append(new_cpt_lane_edge_points) lane_edge_directions2.append(lane_edge_direction) flag, lane_edge_match_list = self.match_line(cpt_lane_edge_lines1, cpt_lane_edge_lines2, link_id1) if not flag: return False, [] smoothing_count_list = [] lane_edge_match_directions = [] get_middle_point = lambda point1, point2: [(point1[0] + point2[0]) / 2, (point1[1] + point2[1]) / 2, (point1[2] + point2[2]) / 2] for index1, index2 in lane_edge_match_list: cpt_lane_edge_line1 = cpt_lane_edge_lines1[index1] cpt_lane_edge_line2 = cpt_lane_edge_lines2[index2] connect_boundary.append(get_middle_point(cpt_lane_edge_line1[-1], cpt_lane_edge_line2[-1])) lane_edge_match_directions.append([lane_edge_directions1[index1], 
lane_edge_directions2[index2]]) smoothing_count_list.append(self.cal_smoothing_count(cpt_lane_edge_line1, cpt_lane_edge_line2)) return True, [lane_edge_features1, lane_edge_features2, lane_edge_match_directions, lane_edge_match_list, smoothing_count_list] def is_connect_lane_link(self, topo_feature, link_id1, link_id2, mesh_id1, mesh_id2, connect_boundary, *proj_info): # 基于cpt坐标系做的判断以及获取的数据 lane_link_features1 = self.get_lane_link_by_link_id(link_id1, mesh_id1) if not lane_link_features1: return False, [] lane_link_features2 = self.get_lane_link_by_link_id(link_id2, mesh_id2) if not lane_link_features2: return False, [] cpt_lane_link_lines1 = [] lane_link_directions1 = [] for lane_link_feature in lane_link_features1: lane_link_points = lane_link_feature.GetGeometryRef().GetPoints() cpt_lane_link_points = blh_to_cpt(lane_link_points, *proj_info) new_cpt_lane_link_points, lane_link_direction = self.cal_forward_direction(cpt_lane_link_points, topo_feature, *proj_info) cpt_lane_link_lines1.append(new_cpt_lane_link_points) lane_link_directions1.append(lane_link_direction) cpt_lane_link_lines2 = [] lane_link_directions2 = [] for lane_link_feature in lane_link_features2: lane_link_points = lane_link_feature.GetGeometryRef().GetPoints() cpt_lane_link_points = blh_to_cpt(lane_link_points, *proj_info) new_cpt_lane_link_points, lane_link_direction = self.cal_behind_direction(cpt_lane_link_points, topo_feature, *proj_info) cpt_lane_link_lines2.append(new_cpt_lane_link_points) lane_link_directions2.append(lane_link_direction) flag, lane_link_match_list = self.match_line(cpt_lane_link_lines1, cpt_lane_link_lines2, link_id1) if not flag: return False, [] smoothing_count_list = [] lane_link_match_directions = [] get_middle_point = lambda point1, point2: [(point1[0] + point2[0]) / 2, (point1[1] + point2[1]) / 2, (point1[2] + point2[2]) / 2] for index1, index2 in lane_link_match_list: cpt_lane_link_line1 = cpt_lane_link_lines1[index1] cpt_lane_link_line2 = 
def is_connect_link_edge(self, topo_feature, link_feature1, link_feature2, mesh_id1, mesh_id2, connect_boundary,
                         *proj_info):
    """
    Collect and match the link-edge lines of two links that meet at a mesh boundary.

    The link edges of each link are converted to the cpt coordinate system and
    oriented so that they end at the join point. Lines of the first mesh are
    oriented with cal_forward_direction and lines of the second mesh with
    cal_behind_direction, the same convention used by is_connect_lane_edge,
    is_connect_lane_link and is_connect_link.

    :param topo_feature: topo point feature near the join
    :param link_feature1: link feature in the first mesh
    :param link_feature2: link feature in the second mesh
    :param mesh_id1: id of the first mesh
    :param mesh_id2: id of the second mesh
    :param connect_boundary: list that is extended in place with the midpoint
                             of the end points of every matched pair
    :param proj_info: projection parameters forwarded to blh_to_cpt
    :return: (False, []) when either side has no link edges or matching fails;
             otherwise (True, [features1, features2, match_directions,
             match_list, smoothing_count_list])
    """
    link_edge_features1 = self.get_link_edge_by_link(link_feature1, mesh_id1)
    if not link_edge_features1:
        return False, []
    link_edge_features2 = self.get_link_edge_by_link(link_feature2, mesh_id2)
    if not link_edge_features2:
        return False, []

    # First mesh: forward orientation.
    cpt_link_edge_lines1 = []
    link_edge_directions1 = []
    for link_edge_feature in link_edge_features1:
        link_edge_points = link_edge_feature.GetGeometryRef().GetPoints()
        cpt_link_edge_points = blh_to_cpt(link_edge_points, *proj_info)
        oriented_points, link_edge_direction = self.cal_forward_direction(cpt_link_edge_points, topo_feature,
                                                                          *proj_info)
        cpt_link_edge_lines1.append(oriented_points)
        link_edge_directions1.append(link_edge_direction)

    # Second mesh: behind orientation (previously cal_forward_direction was
    # used here as well, unlike every sibling is_connect_* method).
    cpt_link_edge_lines2 = []
    link_edge_directions2 = []
    for link_edge_feature in link_edge_features2:
        link_edge_points = link_edge_feature.GetGeometryRef().GetPoints()
        cpt_link_edge_points = blh_to_cpt(link_edge_points, *proj_info)
        oriented_points, link_edge_direction = self.cal_behind_direction(cpt_link_edge_points, topo_feature,
                                                                         *proj_info)
        cpt_link_edge_lines2.append(oriented_points)
        link_edge_directions2.append(link_edge_direction)

    flag, link_edge_match_list = self.match_line(cpt_link_edge_lines1, cpt_link_edge_lines2,
                                                 link_feature1[const.LINK_ID])
    if not flag:
        return False, []

    def middle_point(point1, point2):
        # Component-wise midpoint of two 3D points.
        return [(point1[0] + point2[0]) / 2, (point1[1] + point2[1]) / 2, (point1[2] + point2[2]) / 2]

    smoothing_count_list = []
    link_edge_match_directions = []
    for index1, index2 in link_edge_match_list:
        cpt_link_edge_line1 = cpt_link_edge_lines1[index1]
        cpt_link_edge_line2 = cpt_link_edge_lines2[index2]
        connect_boundary.append(middle_point(cpt_link_edge_line1[-1], cpt_link_edge_line2[-1]))
        link_edge_match_directions.append([link_edge_directions1[index1], link_edge_directions2[index2]])
        smoothing_count_list.append(self.cal_smoothing_count(cpt_link_edge_line1, cpt_link_edge_line2))
    return True, [link_edge_features1, link_edge_features2, link_edge_match_directions, link_edge_match_list,
                  smoothing_count_list]
interpolation_cpt_link_points2 = interpolation_dataset_3d(new_cpt_link_points2, 4) compare_x_list1 = [point[0] for point in interpolation_cpt_link_points1[-const.COMPARISON_NUM1:]] compare_x_list2 = [point[0] for point in interpolation_cpt_link_points2[-const.COMPARISON_NUM1:]] x_mean1, x_mean2 = np.mean(compare_x_list1), np.mean(compare_x_list2) if abs(x_mean1 - x_mean2) > const.LINK_CONNECT_INTERVAL: output_info = "id为:" + str(link_id1) + "和" + str(link_id2) + "的参考线间距太大,请手动接边" self.output_info1.append(output_info) return False, [] compare_z_list1 = [point[2] for point in interpolation_cpt_link_points1[-const.COMPARISON_NUM1:]] compare_z_list2 = [point[2] for point in interpolation_cpt_link_points2[-const.COMPARISON_NUM1:]] if abs(np.mean(compare_z_list1) - np.mean(compare_z_list2)) > const.ELEVATION_THRESHOLD: output_info = "id为:" + str(link_id1) + "和" + str(link_id2) + "的参考线的高程值差距太大,数据异常,请手动接边" self.output_info1.append(output_info) return False, [] smoothing_count = min(max(15, int(abs(x_mean1 - x_mean2) * const.SMOOTHING_COUNT)), 70) # 15到70之间 return True, [link_feature1, link_feature2, link_direction1, link_direction2, smoothing_count] @staticmethod def remove_near_points(dataset): """ 去除相隔很近的数据 :param dataset: utm坐标系下的数据仅计算x, y坐标 :return: 返回的是utm坐标系下的点集 """ dataset1 = dataset[::-1] + [] target_dataset = [] point1 = dataset1.pop() target_dataset.append(point1) while dataset1: point2 = dataset1.pop() distance = two_point_distance_2d(point1, point2) if distance < 0.03: # 两点的距离小于3cm,就删除一个点 continue target_dataset.append(point2) point1 = point2 return target_dataset @staticmethod def norm_dataset(dataset, point): """ 规范化接边的数据 :param dataset: utm坐标系下的点集 :param point: 接边边界上的一个点 :return: 返回一个点集,且在utm坐标系下不与接边边界相交 """ point1 = dataset.pop() if is_same_point_3d(point1, point): point1 = dataset.pop() while dataset: point2 = dataset.pop() if is_same_point_3d(point2, point): if dataset: point2 = dataset.pop() else: break result = three_point_dot_product(point2, point1, 
point) if result >= 0: point1 = point2 else: dataset.append(point2) dataset.append(point1) break return dataset def get_standard_dataset(self, utm_dataset1, utm_dataset2, utm_boundary, smoothing_count, source_srs, utm_srs, gps_id): """ 在utm坐标系下,将对应的接边数据标准化,操作流程: 1. 剔除相距很近的点,然后插值 2. 寻找与边界的交点 3. 规范化数据,使得接边的线不与边界相交 4. 将前后数据连接前来进行平滑,之后抽稀,之后去除相隔很近的点 :param self: :param utm_dataset1: 前向数据投影到utm坐标系 :param utm_dataset2: 后向数据投影到utm坐标系 :param utm_boundary: 接边边界,utm坐标系下 :param gps_id: 接边附近的topo点的主键,主要用来返回错误信息 :param utm_srs: utm的坐标系 :param source_srs: 图层自带的坐标系 :param smoothing_count: 平滑次数 :return: 返回标准的两个点集数据 """ # 先删除很近的点,再给数据插值,避免造成异常,然后截取部分数据,仅操作一小段数据 if not utm_dataset1 or not utm_dataset2: output_info = "gps_id为:" + str(gps_id) + "的topo点附近的接边线为空" self.output_info1.append(output_info) return False, [] utm_dataset1, utm_dataset2 = self.remove_near_points(utm_dataset1), self.remove_near_points(utm_dataset2) interpol_utm_dataset1 = interpolation_dataset_3d(utm_dataset1, 3) # 按每米3个点进行插值 interpol_utm_dataset2 = interpolation_dataset_3d(utm_dataset2, 3) compare_dataset1 = interpol_utm_dataset1[-const.COMPARISON_NUM2:] intersection_point1 = intersection_of_two_line(compare_dataset1[0], compare_dataset1[-1], utm_boundary[0], utm_boundary[-1]) if not intersection_point1: output_info = "gps_id为:" + str(gps_id) + "的topo点附近的接边线与接边边界平行,数据反常,请手动接边" self.output_info1.append(output_info) return False, [] compare_dataset2 = interpol_utm_dataset2[-const.COMPARISON_NUM2:] intersection_point2 = intersection_of_two_line(compare_dataset2[0], compare_dataset2[-1], utm_boundary[0], utm_boundary[-1]) if not intersection_point2: output_info = "gps_id为:" + str(gps_id) + "的topo点附近的接边线与接边边界平行,数据反常,请手动接边" self.output_info1.append(output_info) return False, [] new_utm_dataset1 = self.norm_dataset(interpol_utm_dataset1, intersection_point1) new_utm_dataset2 = self.norm_dataset(interpol_utm_dataset2, intersection_point2) if len(new_utm_dataset1) < 2 or len(new_utm_dataset2) < 2: output_info = "gps_id为:" 
+ str(gps_id) + "的topo点附近的接边线的大部分超越了接边边界,数据反常,请手动接边" self.output_info1.append(output_info) return False, [] # 由于前面已经插了值,50cm一个点,选取定长的数据可以直接获取 operate_point_num = 20 # 选择操作点的数量,20*0.5表示操作点的长度 # 选择20 * 0.5=10米的数据进行操作,但只取前5个点,避免两条线间距较大时获取的平滑结果有异常时两个点 new_compare_dataset1 = new_utm_dataset1[-operate_point_num:][:5] if len(new_utm_dataset1) - operate_point_num <= 0: new_utm_dataset1 = [] else: new_utm_dataset1 = new_utm_dataset1[:len(new_utm_dataset1) - operate_point_num] # 截取数据,仅操作部分数据 new_compare_dataset2 = new_utm_dataset2[-operate_point_num:][:5] if len(new_utm_dataset2) - operate_point_num <= 0: new_utm_dataset2 = [] else: new_utm_dataset2 = new_utm_dataset2[:len(new_utm_dataset2) - operate_point_num] compare_dataset = interpolation_dataset_3d(new_compare_dataset1 + new_compare_dataset2[::-1], 2) flag, info = self.smoothing_dataset(compare_dataset, smoothing_count, utm_boundary, gps_id) if not flag: return False, [] # 街边附近返回的数据已经按照指定阈值插值 target_compare_dataset1, target_compare_dataset2, intersection_point = info if len(new_utm_dataset1) > 1: new_utm_dataset1 = ImprovedDouglasPeucker(const.INTERPOLATION_COUNT4).main(new_utm_dataset1) if len(new_utm_dataset2) > 1: new_utm_dataset2 = ImprovedDouglasPeucker(const.INTERPOLATION_COUNT4).main(new_utm_dataset2) target_utm_dataset1 = new_utm_dataset1 + target_compare_dataset1 target_utm_dataset2 = new_utm_dataset2 + target_compare_dataset2 blh_intersection_point = transform_to_src([intersection_point], source_srs, utm_srs)[0] target_blh_dataset1 = transform_to_src(target_utm_dataset1, source_srs, utm_srs) target_blh_dataset2 = transform_to_src(target_utm_dataset2, source_srs, utm_srs) target_blh_dataset1.append(blh_intersection_point) target_blh_dataset2.append(blh_intersection_point) if len(target_blh_dataset1) > 1: target_blh_dataset1 = ImprovedDouglasPeucker(const.INTERPOLATION_COUNT4).main(target_blh_dataset1) if len(target_blh_dataset2) > 1: target_blh_dataset2 = 
def smoothing_dataset(self, compare_points, smoothing_count, connect_boundary, gps_id):
    """
    Smooth, thin and split the join data, then intersect it with the boundary.

    The points are passed through SavitzkyGolayFilter, ImprovedDouglasPeucker
    and remove_near_points. The first segment for which is_intersect_in_line
    reports a crossing with the boundary splits the result into two parts;
    the second part is reversed so that both parts end next to the boundary.

    :param compare_points: already interpolated points to process
    :param smoothing_count: smoothing count passed to the filter
    :param connect_boundary: boundary points; the first and last one are used
    :param gps_id: primary key of a topo point near the join, used in messages
    :return: (True, [part1, part2, intersection_point]) on success, otherwise
             (False, []) after appending a message to self.output_info1
    """
    smoothed = SavitzkyGolayFilter().main(compare_points, smoothing_count)
    thinned = ImprovedDouglasPeucker(const.INTERPOLATION_COUNT4).main(smoothed)
    cleaned = self.remove_near_points(thinned)

    front_part, back_part = [], []
    for split_at, (seg_start, seg_end) in enumerate(zip(cleaned, cleaned[1:]), start=1):
        if not self.is_intersect_in_line(seg_start, seg_end, connect_boundary[0], connect_boundary[-1]):
            continue
        front_part = cleaned[:split_at]
        back_part = cleaned[split_at:][::-1]
        break

    if not (front_part and back_part):
        message = "gps_id为:" + str(gps_id) + "的topo点附近的接边线的大部分超越了接边边界,数据反常,请手动接边"
        self.output_info1.append(message)
        return False, []

    crossing = intersection_of_two_line(front_part[-1], back_part[-1],
                                        connect_boundary[0], connect_boundary[-1])
    if not crossing:
        message = "gps_id为:" + str(gps_id) + "的topo点附近的接边线与接边边界平行,数据反常,请手动接边"
        self.output_info1.append(message)
        return False, []
    return True, [front_part, back_part, crossing]
def get_connect_boundary(self, cpt_connect_boundary, source_srs, *proj_info):
    """
    Build the shared boundary line of two roads that meet at a mesh border.

    The cpt points are converted with cpt_to_blh and transform_to_utm, a line
    is fitted through them (PosNegTrans + PolyFit, presumably degree 1), and
    the first and last utm points are mapped onto that fitted line with
    get_boundary_point.

    :param cpt_connect_boundary: points of the dividing line in the cpt system
    :param source_srs: spatial reference of the source layer
    :param proj_info: projection parameters forwarded to cpt_to_blh
    :return: (True, [point1, point2]) with the two boundary end points in utm,
             or (False, []) after logging to self.output_info1 when the fit
             returns a parameter of shape (1,)
    """
    blh_points = cpt_to_blh(cpt_connect_boundary, *proj_info)
    utm_points = transform_to_utm(blh_points, source_srs)[0]
    # Fit a straight line through the boundary points.
    positive_dataset, theta, x_mean, y_mean = PosNegTrans().positive_transformation(utm_points)
    parameter = PolyFit().poly_fitting_parameter(positive_dataset, 1)
    if parameter.shape == (1,):
        # Per the original comment, this shape signals a singular coefficient matrix.
        output_info = "接边中点点集" + str(cpt_connect_boundary) + "做直线拟合过程中,系数矩阵奇异,请修改数据后手动接边"
        self.output_info1.append(output_info)
        return False, []

    boundary = []
    for utm_point in (utm_points[0], utm_points[-1]):
        fitted_xy = self.get_boundary_point(utm_point, x_mean, y_mean, theta, parameter)
        # Each end point keeps the elevation of the point it was derived from
        # (the second end point previously reused the first point's elevation).
        boundary.append([fitted_xy[0], fitted_xy[1], utm_point[2]])
    return True, boundary
def is_intersect_by_multi_lines(self, node_layer, blh_point, mesh_id, gps_id, layer_name, source_srs):
    """
    Check whether a point is the end point of several lines (e.g. at a junction or ramp).

    Nodes are searched in a small rectangle around the point, then restricted
    to the given mesh. The point counts as shared when at least two of those
    nodes coincide with it according to is_same_point_3d in utm coordinates.
    All filters set on node_layer are cleared before returning, including
    when an exception is raised while reading the layer.

    :param node_layer: node layer to query
    :param blh_point: point to test; its first two components are lon, lat
    :param mesh_id: mesh id used for the attribute filter
    :param gps_id: primary key of a nearby topo point, used in the message
    :param layer_name: name of the related line layer, used in the message
    :param source_srs: spatial reference passed to transform_to_utm
    :return: True when at least two nodes coincide with the point (a message
             is appended to self.output_info1), otherwise False
    """
    lon, lat = blh_point[:2]
    buffer_size = 0.0000001
    node_points = []
    try:
        node_layer.SetSpatialFilterRect(lon - buffer_size, lat - buffer_size,
                                        lon + buffer_size, lat + buffer_size)
        if node_layer.GetFeatureCount() == 1:
            return False
        mesh_filter = '"' + const.MESH_ID + '" = ' + "'" + str(mesh_id) + "'"
        node_layer.SetAttributeFilter(mesh_filter)
        node_count = node_layer.GetFeatureCount()
        if node_count == 1:
            return False
        # A plain loop is kept on purpose: the original notes that a
        # comprehension over GetNextFeature() produced odd errors.
        for _ in range(node_count):
            node_feature = node_layer.GetNextFeature()
            if not node_feature:
                continue
            node_points.append(node_feature.GetGeometryRef().GetPoints()[0])
    finally:
        # Always restore the layer so later queries are not silently filtered.
        node_layer.SetSpatialFilter(None)
        node_layer.SetAttributeFilter(None)
        node_layer.ResetReading()

    utm_node_points = transform_to_utm(node_points, source_srs)[0]
    utm_point = transform_to_utm([blh_point], source_srs)[0][0]
    same_point_num = 0
    for utm_node_point in utm_node_points:
        if utm_node_point and is_same_point_3d(utm_point, utm_node_point):
            same_point_num += 1
    if same_point_num >= 2:
        output_info = "gps_id:" + str(gps_id) + ",mesh_id:" + str(mesh_id) + "的topo点附近的接边处," + \
                      str(layer_name) + "的节点坐标为:" + str(blh_point) + ",在此处有2条线共点,请修改数据再接边"
        self.output_info1.append(output_info)
        return True
    return False
match_list, smoothing_count_list = connect_info if line_layer is None: # 没有边缘图层时,直接运行结束 return False for i in range(len(match_list)): index1, index2 = match_list[i] smoothing_count = smoothing_count_list[i] feature1, feature2 = features1[index1], features2[index2] direction1, direction2 = match_directions[i] blh_points1 = feature1.GetGeometryRef().GetPoints() blh_points2 = feature2.GetGeometryRef().GetPoints() utm_points1, utm_srs = transform_to_utm(blh_points1, source_srs) utm_points2 = transform_to_utm(blh_points2, source_srs)[0] cal_line_length = lambda line: sum([two_point_distance_2d(line[j], line[j + 1]) for j in range(len(line) - 1)]) length1, length2 = cal_line_length(utm_points1), cal_line_length(utm_points2) if length1 < 3 or length2 < 3: output_info = "gps_id为:" + str(gps_id) + "附近的接边处," + \ str(layer_name) + "的两条线至少有一条长度太短,自动接边有风险,请手动接边" self.output_info1.append(output_info) return False if direction1 == -1: utm_points1 = utm_points1[::-1] blh_points1 = blh_points1[::-1] if direction2 == -1: utm_points2 = utm_points2[::-1] blh_points2 = blh_points2[::-1] flag1 = self.is_intersect_by_multi_lines(node_layer, blh_points1[-1], feature1[const.MESH_ID], gps_id, layer_name, source_srs) flag2 = self.is_intersect_by_multi_lines(node_layer, blh_points2[-1], feature2[const.MESH_ID], gps_id, layer_name, source_srs) if flag1 or flag2: return False flag, connect_info = self.get_standard_dataset(utm_points1, utm_points2, utm_boundary, smoothing_count, source_srs, utm_srs, gps_id) if not flag: return False target_blh_dataset1, target_blh_dataset2, blh_intersection_point = connect_info if direction1 == -1: target_blh_dataset1 = target_blh_dataset1[::-1] if direction2 == -1: target_blh_dataset2 = target_blh_dataset2[::-1] geom1 = create_line_geometry(target_blh_dataset1) geom2 = create_line_geometry(target_blh_dataset2) feature1.SetGeometry(geom1) feature2.SetGeometry(geom2) line_layer.SetFeature(feature1) line_layer.SetFeature(feature2) # 连接线图层对应的节点图层 if not 
def models(self, topo_features, mesh_id1, mesh_id2):
    """
    Run the edge-connection pipeline for every topo point between two meshes.

    For each topo feature the method checks that the links can be joined,
    gathers the matching lane edges, lane links and (when a link-edge layer
    is present) link edges, fits the shared boundary, and then connects the
    layers in the order link, lane edge, lane link, link edge. Successes are
    appended to self.output_info2 and failures to self.output_info1. Any
    exception raised while handling one topo feature is recorded in
    self.output_info1 and processing moves on to the next feature.

    Link ids that were fully connected are remembered across topo features,
    so a join that was already handled (e.g. repeated tracks on the same
    road) is not processed again.

    :param topo_features: iterable of topo point features to process
    :param mesh_id1: id of one mesh; may be swapped with mesh_id2 by reset_mesh_id
    :param mesh_id2: id of the other mesh
    :return: None
    """
    source_srs = self.link_layer.GetSpatialRef()  # spatial reference of the layers
    # Must live outside the loop: it previously was re-created per topo
    # feature, so the duplicate check below could never succeed.
    connected_link_ids = set()
    for topo_feature in topo_features:
        try:
            mesh_id1, mesh_id2 = self.reset_mesh_id(topo_feature, mesh_id1, mesh_id2)
            if self.is_gap_or_big_curve(topo_feature, source_srs):  # large curve or large gap
                continue
            flag, connect_info = self.is_connect_road(topo_feature, mesh_id1, mesh_id2, source_srs)
            if not flag:
                continue
            # connect_topo_feature is the topo point at the join
            connect_topo_feature, link_id1, link_id2 = connect_info
            if link_id1 in connected_link_ids and link_id2 in connected_link_ids:
                # This join was already connected; do not repeat it.
                continue
            gps_id = connect_topo_feature[const.ID]  # primary key of the topo point
            lon, lat, up = connect_topo_feature.GetGeometryRef().GetPoints()[0]
            pitch = float(connect_topo_feature[const.PITCH])
            roll = float(connect_topo_feature[const.ROLL])
            azimuth = float(connect_topo_feature[const.AZIMUTH])
            proj_info = [lon, lat, up, pitch, roll, azimuth]

            # Filled in place by the is_connect_* calls below.
            cpt_connect_boundary = []
            flag1, link_info = self.is_connect_link(connect_topo_feature, link_id1, link_id2, mesh_id1,
                                                    mesh_id2, cpt_connect_boundary, *proj_info)
            if not flag1:
                continue
            link_feature1, link_feature2, link_direction1, link_direction2, link_smoothing_count = link_info
            flag2, lane_edge_connect_info = self.is_connect_lane_edge(connect_topo_feature, link_id1, link_id2,
                                                                      mesh_id1, mesh_id2, cpt_connect_boundary,
                                                                      *proj_info)
            if not flag2:
                continue
            flag3, lane_link_connect_info = self.is_connect_lane_link(connect_topo_feature, link_id1, link_id2,
                                                                      mesh_id1, mesh_id2, cpt_connect_boundary,
                                                                      *proj_info)
            if not flag3:
                continue
            link_edge_connect_info = None
            if self.link_edge_layer:
                flag4, link_edge_connect_info = self.is_connect_link_edge(connect_topo_feature, link_feature1,
                                                                          link_feature2, mesh_id1, mesh_id2,
                                                                          cpt_connect_boundary, *proj_info)
                if not flag4:
                    continue
            if len(cpt_connect_boundary) < 2:
                output_info = "gps_id为:" + str(gps_id) + "的gps附近的接边数据的边界点太少,数据异常,请修改数据后接边"
                self.output_info1.append(output_info)
                continue
            flag5, utm_connect_boundary = self.get_connect_boundary(cpt_connect_boundary, source_srs, *proj_info)
            if not flag5:
                continue

            # Connect the links and their nodes.
            link_attribute_info = [self.link_layer, self.node_layer,
                                   [const.START_NODE_ID, const.END_NODE_ID, const.NODE_ID], const.HD_LINK]
            link_connect_info = [[link_feature1], [link_feature2], [[link_direction1, link_direction2]],
                                 [[0, 0]], [link_smoothing_count]]
            if self.connect_line(source_srs, utm_connect_boundary, gps_id, link_attribute_info,
                                 link_connect_info):
                output_info = "gps_id为:" + str(gps_id) + "的topo的接边处,参考线接边成功"
                self.output_info2.append(output_info)
            else:
                output_info = "gps_id为:" + str(gps_id) + "的topo的接边处,参考线接边失败,请手动接边"
                self.output_info1.append(output_info)
                continue

            # Connect the lane edges and their nodes.
            lane_edge_attribute_info = [self.lane_edge_layer, self.lane_edge_node_layer,
                                        [const.START_EDGE_NODE_ID, const.END_EDGE_NODE_ID,
                                         const.LANE_EDGE_NODE_ID], const.HD_LANE_EDGE]
            if self.connect_line(source_srs, utm_connect_boundary, gps_id, lane_edge_attribute_info,
                                 lane_edge_connect_info):
                output_info = "gps_id为:" + str(gps_id) + "的topo的接边处,边界线接边成功"
                self.output_info2.append(output_info)
            else:
                output_info = "gps_id为:" + str(gps_id) + "的topo的接边处,边界线接边失败,请手动接边"
                self.output_info1.append(output_info)
                continue

            # Connect the lane links (centre lines) and their nodes.
            lane_link_attribute_info = [self.lane_link_layer, self.lane_node_layer,
                                        [const.START_LANE_NODE_ID, const.END_LANE_NODE_ID,
                                         const.LANE_NODE_ID], const.HD_LANE_LINK]
            if self.connect_line(source_srs, utm_connect_boundary, gps_id, lane_link_attribute_info,
                                 lane_link_connect_info):
                output_info = "gps_id为:" + str(gps_id) + "的topo的接边处,中心线接边成功"
                self.output_info2.append(output_info)
            else:
                output_info = "gps_id为:" + str(gps_id) + "的topo的接边处,中心线接边失败,请手动接边"
                self.output_info1.append(output_info)
                continue

            # Connect the link edges (road edges) and their nodes.
            link_edge_attribute_info = [self.link_edge_layer, self.link_edge_node_layer,
                                        [const.START_EDGE_NODE_ID, const.END_EDGE_NODE_ID,
                                         const.LINK_EDGE_NODE_ID], const.HD_LINK_EDGE]
            if self.connect_line(source_srs, utm_connect_boundary, gps_id, link_edge_attribute_info,
                                 link_edge_connect_info):
                output_info = "gps_id为:" + str(gps_id) + "的topo的接边处,道路边缘接边成功"
                self.output_info2.append(output_info)
            else:
                # A failure only counts when the link-edge layer check passes;
                # otherwise there was nothing to connect for this layer.
                if self.link_edge_layer:
                    output_info = "gps_id为:" + str(gps_id) + "的topo的接边处,道路边缘接边失败,请手动接边"
                    self.output_info1.append(output_info)
                    continue

            # Reaching this point means every layer that had to be connected was connected.
            connected_link_ids.add(link_id1)
            connected_link_ids.add(link_id2)
        except Exception as e:
            # Per-feature boundary: record the failure and keep processing the rest.
            output_info = "在gps_id为:" + str(topo_feature[const.ID]) + \
                          "附近的接边处,接边程序出发异常,异常为:" + str(e) + "请手动接边"
            self.output_info1.append(output_info)