Coverage for sarvey/preparation.py: 60%
95 statements
« prev ^ index » next coverage.py v7.6.0, created at 2024-10-17 12:36 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2024-10-17 12:36 +0000
1#!/usr/bin/env python
3# SARvey - A multitemporal InSAR time series tool for the derivation of displacements.
4#
5# Copyright (C) 2021-2024 Andreas Piter (IPI Hannover, piter@ipi.uni-hannover.de)
6#
7# This software was developed together with FERN.Lab (fernlab@gfz-potsdam.de) in the context
8# of the SAR4Infra project with funds of the German Federal Ministry for Digital and
9# Transport and contributions from Landesamt fuer Vermessung und Geoinformation
10# Schleswig-Holstein and Landesbetrieb Strassenbau und Verkehr Schleswig-Holstein.
11#
12# This program is free software: you can redistribute it and/or modify it under
13# the terms of the GNU General Public License as published by the Free Software
14# Foundation, either version 3 of the License, or (at your option) any later
15# version.
16#
17# Important: This package uses PyMaxFlow. The core of PyMaxflows library is the C++
18# implementation by Vladimir Kolmogorov. It is also licensed under the GPL, but it REQUIRES that you
19# cite [BOYKOV04] (see LICENSE) in any resulting publication if you use this code for research purposes.
20# This requirement extends to SARvey.
21#
22# This program is distributed in the hope that it will be useful, but WITHOUT
23# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
24# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
25# details.
26#
27# You should have received a copy of the GNU Lesser General Public License along
28# with this program. If not, see <https://www.gnu.org/licenses/>.
30"""Preparation module for SARvey."""
31import datetime
32import matplotlib.pyplot as plt
33import numpy as np
34from logging import Logger
35from os.path import join
37import mintpy.utils.readfile as readfile
39from sarvey import viewer
40import sarvey.utils as ut
41from sarvey.objects import CoordinatesUTM, AmplitudeImage, BaseStack, Points
42from sarvey.triangulation import PointNetworkTriangulation
45def createTimeMaskFromDates(*, start_date: str, stop_date: str, date_list: list, logger: Logger):
46 """Create a mask with selected dates within given time frame.
48 Parameters
49 ----------
50 start_date: str
51 Start date.
52 stop_date: str
53 Stop date.
54 date_list: list
55 all avaiable dates in the slcStack.h5.
56 logger: Logger
57 Logging handler.
59 Returns
60 -------
61 time_mask: np.ndarray
62 mask with True for selected dates.
63 num_slc: int
64 number of selected images.
65 result_date_list: list
66 list of selected dates.
67 """
68 time_mask = np.ones((len(date_list)), dtype=np.bool_)
69 date_list = [datetime.date(year=int(d[:4]), month=int(d[4:6]), day=int(d[6:])) for d in date_list]
71 if (start_date is None) and (stop_date is None):
72 # use all images.
73 result_date_list = [date.isoformat() for date in date_list]
74 return time_mask, time_mask.shape[0], result_date_list
76 if start_date is None:
77 start_date = min(date_list)
78 else:
79 start_date = datetime.date.fromisoformat(start_date)
81 if stop_date is None:
82 stop_date = max(date_list)
83 else:
84 stop_date = datetime.date.fromisoformat(stop_date)
86 if start_date >= stop_date:
87 logger.error(msg="Choose start date < stop date!")
88 raise ValueError
90 if stop_date < min(date_list):
91 logger.error(msg="Stop date is before the first acquired image. Choose a later stop date!")
92 raise ValueError
94 if start_date > max(date_list):
95 logger.error(msg="Start date is after the last acquired image. Choose an earlier start date!")
96 raise ValueError
98 shift = " "
99 logger.debug(msg=shift + "{:>10} {:>10}".format(" Date ", "Selected"))
100 logger.debug(msg=shift + "{:>10} {:>10}".format("__________", "________"))
102 result_date_list = list()
103 for i, date in enumerate(date_list):
104 if (date < start_date) or (date > stop_date):
105 time_mask[i] = False
106 else:
107 result_date_list.append(date.isoformat())
108 val = " x" if time_mask[i] else ""
109 logger.debug(msg=shift + "{:>10} {:>3}".format(date.isoformat(), val))
111 num_slc = time_mask[time_mask].shape[0]
112 return time_mask, num_slc, result_date_list
115def readSlcFromMiaplpy(*, path: str, box: tuple = None, logger: Logger) -> np.ndarray:
116 """Read SLC data from phase-linking results of Miaplpy.
118 Parameters
119 ----------
120 path: str
121 Path to the phase_series.h5 file.
122 box: tuple
123 Bounding Box to read from.
124 logger: Logger
125 Logging handler.
127 Returns
128 -------
129 slc: np.ndarray
130 slc stack created from phase-linking results.
131 """
132 logger.info(msg="read phase from MiaplPy results...")
133 phase = readfile.read(path, datasetName='phase', box=box)[0]
135 logger.info(msg="read amplitude from MiaplPy results...")
136 amp = readfile.read(path, datasetName='amplitude', box=box)[0]
138 logger.info(msg="combine phase and amplitude to slc...")
139 slc = amp * np.exp(phase * 1j)
140 return slc
143def readCoherenceFromMiaplpy(*, path: str, box: tuple = None, logger: Logger) -> tuple[np.ndarray, dict]:
144 """Read the coherence image from phase-linking of MiaplPy.
146 Parameters
147 ----------
148 path: str
149 Path to phase_series.h5 file.
150 box: tuple
151 Bounding Box to read from.
152 logger: Logger
153 Logging handler.
155 Returns
156 -------
157 temp_coh: np.ndarray
158 temporal coherence image from phase-linking results of MiaplPy.
159 """
160 logger.info(msg="read quality from MiaplPy results...")
161 temp_coh = readfile.read(path, datasetName='temporalCoherence', box=box)[0][1, :, :]
162 return temp_coh
165def selectPixels(*, path: str, selection_method: str, thrsh: float,
166 grid_size: int = None, bool_plot: bool = False, logger: Logger):
167 """Select pixels based on temporal coherence.
169 Parameters
170 ----------
171 path: str
172 Path to the directory with the temporal_coherence.h5 file.
173 selection_method: str
174 Pixel selection method. Currently, only "temp_coh" is implemented.
175 thrsh: float
176 Threshold for pixel selection.
177 grid_size: int
178 Grid size for sparse pixel selection.
179 bool_plot: bool
180 Plot the selected pixels.
181 logger: Logger
182 Logging handler.
184 Returns
185 -------
186 cand_mask: np.ndarray
187 Mask with selected pixels.
188 """
189 quality = None
190 grid_min_val = None
191 cand_mask = None
192 unit = None
193 cmap = None
194 # compute candidates
195 if selection_method == "temp_coh":
196 temp_coh_obj = BaseStack(file=join(path, "temporal_coherence.h5"), logger=logger)
197 quality = temp_coh_obj.read(dataset_name="temp_coh")
198 cand_mask = quality >= thrsh
199 grid_min_val = False
200 unit = "Temporal\nCoherence [ ]"
201 cmap = "autumn"
203 if selection_method == "miaplpy":
204 raise NotImplementedError("This part is not developed yet. MiaplPy data is read in another way.")
205 # pl_coherence = readCoherenceFromMiaplpy(path=join(path, 'inverted', 'phase_series.h5'), box=None,
206 # logger=logger)
207 # cand_mask = pl_coherence >= thrsh
208 # quality = pl_coherence
209 # grid_min_val = False
210 # unit = "Phase-Linking\nCoherence [ ]"
211 # cmap = "autumn"
213 if grid_size is not None: # -> sparse pixel selection
214 coord_utm_obj = CoordinatesUTM(file_path=join(path, "coordinates_utm.h5"), logger=logger)
215 coord_utm_obj.open()
216 box_list = ut.createSpatialGrid(coord_utm_img=coord_utm_obj.coord_utm,
217 length=coord_utm_obj.coord_utm.shape[1],
218 width=coord_utm_obj.coord_utm.shape[2],
219 grid_size=grid_size)[0]
220 cand_mask_sparse = ut.selectBestPointsInGrid(box_list=box_list, quality=quality, sel_min=grid_min_val)
221 cand_mask &= cand_mask_sparse
223 if bool_plot:
224 coord_xy = np.array(np.where(cand_mask)).transpose()
225 bmap_obj = AmplitudeImage(file_path=join(path, "background_map.h5"))
226 viewer.plotScatter(value=quality[cand_mask], coord=coord_xy, bmap_obj=bmap_obj, ttl="Selected pixels",
227 unit=unit, s=2, cmap=cmap, vmin=0, vmax=1, logger=logger)
228 # if grid_size is not None:
229 # psViewer.plotGridFromBoxList(box_list, ax=ax, edgecolor="k", linewidth=0.2)
230 plt.tight_layout()
231 plt.gcf().savefig(join(path, "pic", "selected_pixels_{}_{}.png".format(selection_method, thrsh)),
232 dpi=300)
233 plt.close(plt.gcf())
235 return cand_mask
238def createArcsBetweenPoints(*, point_obj: Points, knn: int = None, max_arc_length: float = np.inf,
239 logger: Logger) -> np.ndarray:
240 """Create a spatial network of arcs to triangulate the points.
242 All points are triangulated with a Delaunay triangulation. If knn is given, the triangulation is done with the k
243 nearest neighbors. Too long arcs are removed from the network. If, afterward, the network is not connected, a
244 delaunay triangulation is performed again to ensure connectivity in the network.
246 Parameters
247 ----------
248 point_obj: Points
249 Point object.
250 knn: int
251 Number of nearest neighbors to consider (default: None).
252 max_arc_length: float
253 Maximum length of an arc. Longer arcs will be removed. Default: np.inf.
254 logger: Logger
255 Logging handler.
257 Returns
258 -------
259 arcs: np.ndarray
260 Arcs of the triangulation containing the indices of the points for each arc.
261 """
262 triang_obj = PointNetworkTriangulation(coord_xy=point_obj.coord_xy, coord_utmxy=point_obj.coord_utm, logger=logger)
264 if knn is not None:
265 triang_obj.triangulateKnn(k=knn)
267 triang_obj.triangulateGlobal()
269 logger.info(msg="remove arcs with length > {}.".format(max_arc_length))
270 triang_obj.removeLongArcs(max_dist=max_arc_length)
272 if not triang_obj.isConnected():
273 triang_obj.triangulateGlobal()
275 logger.info(msg="retrieve arcs from adjacency matrix.")
276 arcs = triang_obj.getArcsFromAdjMat()
277 return arcs