Coverage for sarvey/preparation.py: 60%

95 statements  

coverage.py v7.6.0, created at 2024-10-17 12:36 +0000

#!/usr/bin/env python

# SARvey - A multitemporal InSAR time series tool for the derivation of displacements.
#
# Copyright (C) 2021-2024 Andreas Piter (IPI Hannover, piter@ipi.uni-hannover.de)
#
# This software was developed together with FERN.Lab (fernlab@gfz-potsdam.de) in the context
# of the SAR4Infra project with funds of the German Federal Ministry for Digital and
# Transport and contributions from Landesamt fuer Vermessung und Geoinformation
# Schleswig-Holstein and Landesbetrieb Strassenbau und Verkehr Schleswig-Holstein.
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# Important: This package uses PyMaxFlow. The core of the PyMaxflow library is the C++
# implementation by Vladimir Kolmogorov. It is also licensed under the GPL, but it REQUIRES that you
# cite [BOYKOV04] (see LICENSE) in any resulting publication if you use this code for research purposes.
# This requirement extends to SARvey.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with this program. If not, see <https://www.gnu.org/licenses/>.

"""Preparation module for SARvey."""
import datetime
import matplotlib.pyplot as plt
import numpy as np
from logging import Logger
from os.path import join

import mintpy.utils.readfile as readfile

from sarvey import viewer
import sarvey.utils as ut
from sarvey.objects import CoordinatesUTM, AmplitudeImage, BaseStack, Points
from sarvey.triangulation import PointNetworkTriangulation


def createTimeMaskFromDates(*, start_date: str, stop_date: str, date_list: list, logger: Logger):
    """Create a mask with the selected dates within a given time frame.

    Parameters
    ----------
    start_date: str
        Start date.
    stop_date: str
        Stop date.
    date_list: list
        All available dates in the slcStack.h5.
    logger: Logger
        Logging handler.

    Returns
    -------
    time_mask: np.ndarray
        Mask with True for the selected dates.
    num_slc: int
        Number of selected images.
    result_date_list: list
        List of selected dates.
    """
    time_mask = np.ones((len(date_list)), dtype=np.bool_)
    date_list = [datetime.date(year=int(d[:4]), month=int(d[4:6]), day=int(d[6:])) for d in date_list]

    if (start_date is None) and (stop_date is None):
        # use all images.
        result_date_list = [date.isoformat() for date in date_list]
        return time_mask, time_mask.shape[0], result_date_list

    if start_date is None:
        start_date = min(date_list)
    else:
        start_date = datetime.date.fromisoformat(start_date)

    if stop_date is None:
        stop_date = max(date_list)
    else:
        stop_date = datetime.date.fromisoformat(stop_date)

    if start_date >= stop_date:
        logger.error(msg="Choose start date < stop date!")
        raise ValueError

    if stop_date < min(date_list):
        logger.error(msg="Stop date is before the first acquired image. Choose a later stop date!")
        raise ValueError

    if start_date > max(date_list):
        logger.error(msg="Start date is after the last acquired image. Choose an earlier start date!")
        raise ValueError

    shift = " "
    logger.debug(msg=shift + "{:>10} {:>10}".format(" Date ", "Selected"))
    logger.debug(msg=shift + "{:>10} {:>10}".format("__________", "________"))

    result_date_list = list()
    for i, date in enumerate(date_list):
        if (date < start_date) or (date > stop_date):
            time_mask[i] = False
        else:
            result_date_list.append(date.isoformat())
        val = " x" if time_mask[i] else ""
        logger.debug(msg=shift + "{:>10} {:>3}".format(date.isoformat(), val))

    num_slc = time_mask[time_mask].shape[0]
    return time_mask, num_slc, result_date_list
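
# Illustrative usage sketch (comment only, not part of the original module): the logger name,
# the acquisition dates and the time frame below are placeholders. Note that date_list entries
# are expected in "YYYYMMDD" format, while start/stop dates are ISO strings.
#
#     import logging
#     logger = logging.getLogger("sarvey")
#     dates = ["20230101", "20230113", "20230125", "20230206"]
#     time_mask, num_slc, sel_dates = createTimeMaskFromDates(
#         start_date="2023-01-10", stop_date="2023-02-01", date_list=dates, logger=logger)
#     # -> num_slc == 2 and sel_dates == ["2023-01-13", "2023-01-25"]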


def readSlcFromMiaplpy(*, path: str, box: tuple = None, logger: Logger) -> np.ndarray:
    """Read SLC data from the phase-linking results of MiaplPy.

    Parameters
    ----------
    path: str
        Path to the phase_series.h5 file.
    box: tuple
        Bounding box to read from.
    logger: Logger
        Logging handler.

    Returns
    -------
    slc: np.ndarray
        SLC stack created from the phase-linking results.
    """
    logger.info(msg="read phase from MiaplPy results...")
    phase = readfile.read(path, datasetName='phase', box=box)[0]

    logger.info(msg="read amplitude from MiaplPy results...")
    amp = readfile.read(path, datasetName='amplitude', box=box)[0]

    logger.info(msg="combine phase and amplitude to slc...")
    slc = amp * np.exp(phase * 1j)
    return slc
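
# Illustrative usage sketch (comment only): the file path and the pixel bounding box below are
# placeholders; the box is passed through to mintpy.utils.readfile.read (here assumed to be
# (x0, y0, x1, y1) pixel coordinates).
#
#     slc = readSlcFromMiaplpy(path="inverted/phase_series.h5", box=(0, 0, 500, 500), logger=logger)
#     # slc is complex-valued: amplitude * exp(1j * phase)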


def readCoherenceFromMiaplpy(*, path: str, box: tuple = None, logger: Logger) -> np.ndarray:
    """Read the temporal coherence image from the phase-linking results of MiaplPy.

    Parameters
    ----------
    path: str
        Path to the phase_series.h5 file.
    box: tuple
        Bounding box to read from.
    logger: Logger
        Logging handler.

    Returns
    -------
    temp_coh: np.ndarray
        Temporal coherence image from the phase-linking results of MiaplPy.
    """
    logger.info(msg="read quality from MiaplPy results...")
    temp_coh = readfile.read(path, datasetName='temporalCoherence', box=box)[0][1, :, :]
    return temp_coh
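
# Illustrative usage sketch (comment only): same placeholder path and logger as above; only the
# temporal coherence band of the phase-linking result is returned.
#
#     temp_coh = readCoherenceFromMiaplpy(path="inverted/phase_series.h5", box=None, logger=logger)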


def selectPixels(*, path: str, selection_method: str, thrsh: float,
                 grid_size: int = None, bool_plot: bool = False, logger: Logger):
    """Select pixels based on temporal coherence.

    Parameters
    ----------
    path: str
        Path to the directory with the temporal_coherence.h5 file.
    selection_method: str
        Pixel selection method. Currently, only "temp_coh" is implemented.
    thrsh: float
        Threshold for pixel selection.
    grid_size: int
        Grid size for sparse pixel selection (default: None, i.e. no sparse selection).
    bool_plot: bool
        If True, plot the selected pixels (default: False).
    logger: Logger
        Logging handler.

    Returns
    -------
    cand_mask: np.ndarray
        Mask with selected pixels.
    """
    quality = None
    grid_min_val = None
    cand_mask = None
    unit = None
    cmap = None
    # compute candidates
    if selection_method == "temp_coh":
        temp_coh_obj = BaseStack(file=join(path, "temporal_coherence.h5"), logger=logger)
        quality = temp_coh_obj.read(dataset_name="temp_coh")
        cand_mask = quality >= thrsh
        grid_min_val = False
        unit = "Temporal\nCoherence [ ]"
        cmap = "autumn"

    if selection_method == "miaplpy":
        raise NotImplementedError("This part is not developed yet. MiaplPy data is read in another way.")
        # pl_coherence = readCoherenceFromMiaplpy(path=join(path, 'inverted', 'phase_series.h5'), box=None,
        #                                         logger=logger)
        # cand_mask = pl_coherence >= thrsh
        # quality = pl_coherence
        # grid_min_val = False
        # unit = "Phase-Linking\nCoherence [ ]"
        # cmap = "autumn"

    if grid_size is not None:  # -> sparse pixel selection
        coord_utm_obj = CoordinatesUTM(file_path=join(path, "coordinates_utm.h5"), logger=logger)
        coord_utm_obj.open()
        box_list = ut.createSpatialGrid(coord_utm_img=coord_utm_obj.coord_utm,
                                        length=coord_utm_obj.coord_utm.shape[1],
                                        width=coord_utm_obj.coord_utm.shape[2],
                                        grid_size=grid_size)[0]
        cand_mask_sparse = ut.selectBestPointsInGrid(box_list=box_list, quality=quality, sel_min=grid_min_val)
        cand_mask &= cand_mask_sparse

    if bool_plot:
        coord_xy = np.array(np.where(cand_mask)).transpose()
        bmap_obj = AmplitudeImage(file_path=join(path, "background_map.h5"))
        viewer.plotScatter(value=quality[cand_mask], coord=coord_xy, bmap_obj=bmap_obj, ttl="Selected pixels",
                           unit=unit, s=2, cmap=cmap, vmin=0, vmax=1, logger=logger)
        # if grid_size is not None:
        #     psViewer.plotGridFromBoxList(box_list, ax=ax, edgecolor="k", linewidth=0.2)
        plt.tight_layout()
        plt.gcf().savefig(join(path, "pic", "selected_pixels_{}_{}.png".format(selection_method, thrsh)),
                          dpi=300)
        plt.close(plt.gcf())

    return cand_mask
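
# Illustrative usage sketch (comment only): the directory and parameter values below are
# placeholders. "path" is expected to contain temporal_coherence.h5 and, for the optional sparse
# selection and plotting, coordinates_utm.h5, background_map.h5 and a "pic" subdirectory.
#
#     cand_mask = selectPixels(path="outputs", selection_method="temp_coh", thrsh=0.8,
#                              grid_size=200, bool_plot=False, logger=logger)
#     # cand_mask is a boolean image; True marks pixels with temporal coherence >= 0.8 that are
#     # additionally the best point within their grid cell.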


def createArcsBetweenPoints(*, point_obj: Points, knn: int = None, max_arc_length: float = np.inf,
                            logger: Logger) -> np.ndarray:
    """Create a spatial network of arcs to triangulate the points.

    All points are triangulated with a Delaunay triangulation. If knn is given, the triangulation is done with
    the k nearest neighbors. Arcs that are too long are removed from the network. If the network is not
    connected afterwards, a Delaunay triangulation is performed again to ensure connectivity in the network.

    Parameters
    ----------
    point_obj: Points
        Point object.
    knn: int
        Number of nearest neighbors to consider (default: None).
    max_arc_length: float
        Maximum length of an arc. Longer arcs will be removed. Default: np.inf.
    logger: Logger
        Logging handler.

    Returns
    -------
    arcs: np.ndarray
        Arcs of the triangulation containing the indices of the points for each arc.
    """
    triang_obj = PointNetworkTriangulation(coord_xy=point_obj.coord_xy, coord_utmxy=point_obj.coord_utm, logger=logger)

    if knn is not None:
        triang_obj.triangulateKnn(k=knn)

    triang_obj.triangulateGlobal()

    logger.info(msg="remove arcs with length > {}.".format(max_arc_length))
    triang_obj.removeLongArcs(max_dist=max_arc_length)

    if not triang_obj.isConnected():
        triang_obj.triangulateGlobal()

    logger.info(msg="retrieve arcs from adjacency matrix.")
    arcs = triang_obj.getArcsFromAdjMat()
    return arcs
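
# Illustrative usage sketch (comment only): "point_obj" is assumed to be an already opened
# sarvey.objects.Points instance; the knn and max_arc_length values below are placeholders.
#
#     arcs = createArcsBetweenPoints(point_obj=point_obj, knn=30, max_arc_length=1000, logger=logger)
#     # arcs contains one row of two point indices per arc of the spatial network.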