Coverage for sarvey/triangulation.py: 95%
62 statements
« prev ^ index » next coverage.py v7.6.0, created at 2024-10-17 12:36 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2024-10-17 12:36 +0000
1#!/usr/bin/env python
3# SARvey - A multitemporal InSAR time series tool for the derivation of displacements.
4#
5# Copyright (C) 2021-2024 Andreas Piter (IPI Hannover, piter@ipi.uni-hannover.de)
6#
7# This software was developed together with FERN.Lab (fernlab@gfz-potsdam.de) in the context
8# of the SAR4Infra project with funds of the German Federal Ministry for Digital and
9# Transport and contributions from Landesamt fuer Vermessung und Geoinformation
10# Schleswig-Holstein and Landesbetrieb Strassenbau und Verkehr Schleswig-Holstein.
11#
12# This program is free software: you can redistribute it and/or modify it under
13# the terms of the GNU General Public License as published by the Free Software
14# Foundation, either version 3 of the License, or (at your option) any later
15# version.
16#
17# Important: This package uses PyMaxFlow. The core of PyMaxflows library is the C++
18# implementation by Vladimir Kolmogorov. It is also licensed under the GPL, but it REQUIRES that you
19# cite [BOYKOV04] (see LICENSE) in any resulting publication if you use this code for research purposes.
20# This requirement extends to SARvey.
21#
22# This program is distributed in the hope that it will be useful, but WITHOUT
23# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
24# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
25# details.
26#
27# You should have received a copy of the GNU Lesser General Public License along
28# with this program. If not, see <https://www.gnu.org/licenses/>.
30"""Triangulation module for SARvey."""
31import time
32from typing import Optional
33import numpy as np
34from scipy.spatial import Delaunay, distance_matrix, KDTree
35from scipy.sparse import lil_matrix, csr_matrix
36from scipy.sparse.csgraph import connected_components
37from logging import Logger
39from mintpy.utils import ptime
42class PointNetworkTriangulation:
43 """PointNetworkTriangulation."""
45 def __init__(self, *, coord_xy: np.ndarray, coord_utmxy: Optional[np.ndarray], logger: Logger):
46 """Triangulate points in space based on distance.
48 Parameters
49 ----------
50 coord_xy: np.ndarray
51 Radar coordinates of the points.
52 coord_utmxy: np.ndarray
53 UTM coordinates of the points.
54 logger: Logger
55 Logging handler.
56 """
57 self.coord_xy = coord_xy
58 num_points = self.coord_xy.shape[0]
59 self.logger = logger
61 # create sparse matrix with dim (num_points x num_points), add 1 if connected.
62 # create network afterwards once. reduces time.
63 self.adj_mat = lil_matrix((num_points, num_points), dtype=np.bool_)
65 if coord_utmxy is not None:
66 logger.info(msg="create distance matrix between all points...")
67 self.dist_mat = distance_matrix(coord_utmxy, coord_utmxy)
68 # todo: check out alternatives:
69 # scipy.spatial.KDTree.sparse_distance_matrix
70 else: # if only global delaunay shall be computed without memory issues
71 self.dist_mat = None
73 def getArcsFromAdjMat(self):
74 """Convert the adjacency matrix into a list of arcs.
76 Returns
77 -------
78 arcs: np.ndarray
79 List of arcs with indices of the start and end point.
80 """
81 a = self.adj_mat.copy()
82 # copy entries from lower to upper triangular matrix
83 b = (a + a.T)
84 # remove entries from diagonal and lower part of matrix
85 arc_tmp = [[i, b.indices[b.indptr[i]:b.indptr[i + 1]]] for i in range(b.shape[0])]
86 arc_tmp = [[s, e_list[np.where(e_list < s)[0]]] for s, e_list in arc_tmp]
88 arcs = list()
89 for s, e_list in arc_tmp:
90 for e in e_list:
91 arcs.append([s, e])
92 arcs = np.array(arcs)
93 return arcs
95 def removeLongArcs(self, *, max_dist: float):
96 """Remove arcs from network which are longer than given threshold.
98 Parameter
99 ---------
100 max_dist: float
101 distance threshold on arc length in [m]
102 """
103 mask = self.dist_mat > max_dist
104 self.adj_mat[mask] = False
106 def isConnected(self):
107 """Check if the network is connected."""
108 n_components = connected_components(csgraph=csr_matrix(self.adj_mat), directed=False, return_labels=False)
109 if n_components == 1:
110 return True
111 else:
112 return False
114 def triangulateGlobal(self):
115 """Connect the points with a GLOBAL delaunay triangulation."""
116 self.logger.info(msg="Triangulate points with global delaunay.")
118 network = Delaunay(points=self.coord_xy)
119 for p1, p2, p3 in network.simplices:
120 self.adj_mat[p1, p2] = True
121 self.adj_mat[p1, p3] = True
122 self.adj_mat[p2, p3] = True
124 def triangulateKnn(self, *, k: int):
125 """Connect points to the k-nearest neighbours."""
126 self.logger.info(msg="Triangulate points with {}-nearest neighbours.".format(k))
127 num_points = self.coord_xy.shape[0]
128 prog_bar = ptime.progressBar(maxValue=num_points)
129 start_time = time.time()
130 count = 0
131 tree = KDTree(data=self.coord_xy)
133 if k > num_points:
134 k = num_points
135 self.logger.info(msg="k > number of points. Connect all points with each other.")
136 for p1 in range(num_points):
137 idx = tree.query(self.coord_xy[p1, :], k)[1]
138 self.adj_mat[p1, idx] = True
139 count += 1
140 prog_bar.update(value=count + 1, every=np.int16(num_points / 250),
141 suffix='{}/{} points triangulated'.format(count + 1, num_points + 1))
142 prog_bar.close()
143 m, s = divmod(time.time() - start_time, 60)
144 self.logger.debug(msg='time used: {:02.0f} mins {:02.1f} secs.'.format(m, s))