Coverage for sarvey/triangulation.py: 95%

62 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2024-10-17 12:36 +0000

1#!/usr/bin/env python 

2 

3# SARvey - A multitemporal InSAR time series tool for the derivation of displacements. 

4# 

5# Copyright (C) 2021-2024 Andreas Piter (IPI Hannover, piter@ipi.uni-hannover.de) 

6# 

7# This software was developed together with FERN.Lab (fernlab@gfz-potsdam.de) in the context 

8# of the SAR4Infra project with funds of the German Federal Ministry for Digital and 

9# Transport and contributions from Landesamt fuer Vermessung und Geoinformation 

10# Schleswig-Holstein and Landesbetrieb Strassenbau und Verkehr Schleswig-Holstein. 

11# 

12# This program is free software: you can redistribute it and/or modify it under 

13# the terms of the GNU General Public License as published by the Free Software 

14# Foundation, either version 3 of the License, or (at your option) any later 

15# version. 

16# 

17# Important: This package uses PyMaxFlow. The core of PyMaxflows library is the C++ 

18# implementation by Vladimir Kolmogorov. It is also licensed under the GPL, but it REQUIRES that you 

19# cite [BOYKOV04] (see LICENSE) in any resulting publication if you use this code for research purposes. 

20# This requirement extends to SARvey. 

21# 

22# This program is distributed in the hope that it will be useful, but WITHOUT 

23# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS 

24# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more 

25# details. 

26# 

27# You should have received a copy of the GNU Lesser General Public License along 

28# with this program. If not, see <https://www.gnu.org/licenses/>. 

29 

30"""Triangulation module for SARvey.""" 

31import time 

32from typing import Optional 

33import numpy as np 

34from scipy.spatial import Delaunay, distance_matrix, KDTree 

35from scipy.sparse import lil_matrix, csr_matrix 

36from scipy.sparse.csgraph import connected_components 

37from logging import Logger 

38 

39from mintpy.utils import ptime 

40 

41 

42class PointNetworkTriangulation: 

43 """PointNetworkTriangulation.""" 

44 

45 def __init__(self, *, coord_xy: np.ndarray, coord_utmxy: Optional[np.ndarray], logger: Logger): 

46 """Triangulate points in space based on distance. 

47 

48 Parameters 

49 ---------- 

50 coord_xy: np.ndarray 

51 Radar coordinates of the points. 

52 coord_utmxy: np.ndarray 

53 UTM coordinates of the points. 

54 logger: Logger 

55 Logging handler. 

56 """ 

57 self.coord_xy = coord_xy 

58 num_points = self.coord_xy.shape[0] 

59 self.logger = logger 

60 

61 # create sparse matrix with dim (num_points x num_points), add 1 if connected. 

62 # create network afterwards once. reduces time. 

63 self.adj_mat = lil_matrix((num_points, num_points), dtype=np.bool_) 

64 

65 if coord_utmxy is not None: 

66 logger.info(msg="create distance matrix between all points...") 

67 self.dist_mat = distance_matrix(coord_utmxy, coord_utmxy) 

68 # todo: check out alternatives: 

69 # scipy.spatial.KDTree.sparse_distance_matrix 

70 else: # if only global delaunay shall be computed without memory issues 

71 self.dist_mat = None 

72 

73 def getArcsFromAdjMat(self): 

74 """Convert the adjacency matrix into a list of arcs. 

75 

76 Returns 

77 ------- 

78 arcs: np.ndarray 

79 List of arcs with indices of the start and end point. 

80 """ 

81 a = self.adj_mat.copy() 

82 # copy entries from lower to upper triangular matrix 

83 b = (a + a.T) 

84 # remove entries from diagonal and lower part of matrix 

85 arc_tmp = [[i, b.indices[b.indptr[i]:b.indptr[i + 1]]] for i in range(b.shape[0])] 

86 arc_tmp = [[s, e_list[np.where(e_list < s)[0]]] for s, e_list in arc_tmp] 

87 

88 arcs = list() 

89 for s, e_list in arc_tmp: 

90 for e in e_list: 

91 arcs.append([s, e]) 

92 arcs = np.array(arcs) 

93 return arcs 

94 

95 def removeLongArcs(self, *, max_dist: float): 

96 """Remove arcs from network which are longer than given threshold. 

97 

98 Parameter 

99 --------- 

100 max_dist: float 

101 distance threshold on arc length in [m] 

102 """ 

103 mask = self.dist_mat > max_dist 

104 self.adj_mat[mask] = False 

105 

106 def isConnected(self): 

107 """Check if the network is connected.""" 

108 n_components = connected_components(csgraph=csr_matrix(self.adj_mat), directed=False, return_labels=False) 

109 if n_components == 1: 

110 return True 

111 else: 

112 return False 

113 

114 def triangulateGlobal(self): 

115 """Connect the points with a GLOBAL delaunay triangulation.""" 

116 self.logger.info(msg="Triangulate points with global delaunay.") 

117 

118 network = Delaunay(points=self.coord_xy) 

119 for p1, p2, p3 in network.simplices: 

120 self.adj_mat[p1, p2] = True 

121 self.adj_mat[p1, p3] = True 

122 self.adj_mat[p2, p3] = True 

123 

124 def triangulateKnn(self, *, k: int): 

125 """Connect points to the k-nearest neighbours.""" 

126 self.logger.info(msg="Triangulate points with {}-nearest neighbours.".format(k)) 

127 num_points = self.coord_xy.shape[0] 

128 prog_bar = ptime.progressBar(maxValue=num_points) 

129 start_time = time.time() 

130 count = 0 

131 tree = KDTree(data=self.coord_xy) 

132 

133 if k > num_points: 

134 k = num_points 

135 self.logger.info(msg="k > number of points. Connect all points with each other.") 

136 for p1 in range(num_points): 

137 idx = tree.query(self.coord_xy[p1, :], k)[1] 

138 self.adj_mat[p1, idx] = True 

139 count += 1 

140 prog_bar.update(value=count + 1, every=np.int16(num_points / 250), 

141 suffix='{}/{} points triangulated'.format(count + 1, num_points + 1)) 

142 prog_bar.close() 

143 m, s = divmod(time.time() - start_time, 60) 

144 self.logger.debug(msg='time used: {:02.0f} mins {:02.1f} secs.'.format(m, s))