Coverage for sarvey/coherence.py: 84%
63 statements
« prev ^ index » next coverage.py v7.6.0, created at 2024-10-17 12:36 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2024-10-17 12:36 +0000
1#!/usr/bin/env python
3# SARvey - A multitemporal InSAR time series tool for the derivation of displacements.
4#
5# Copyright (C) 2021-2024 Andreas Piter (IPI Hannover, piter@ipi.uni-hannover.de)
6#
7# This software was developed together with FERN.Lab (fernlab@gfz-potsdam.de) in the context
8# of the SAR4Infra project with funds of the German Federal Ministry for Digital and
9# Transport and contributions from Landesamt fuer Vermessung und Geoinformation
10# Schleswig-Holstein and Landesbetrieb Strassenbau und Verkehr Schleswig-Holstein.
11#
12# This program is free software: you can redistribute it and/or modify it under
13# the terms of the GNU General Public License as published by the Free Software
14# Foundation, either version 3 of the License, or (at your option) any later
15# version.
16#
17# Important: This package uses PyMaxFlow. The core of PyMaxflows library is the C++
18# implementation by Vladimir Kolmogorov. It is also licensed under the GPL, but it REQUIRES that you
19# cite [BOYKOV04] (see LICENSE) in any resulting publication if you use this code for research purposes.
20# This requirement extends to SARvey.
21#
22# This program is distributed in the hope that it will be useful, but WITHOUT
23# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
24# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
25# details.
26#
27# You should have received a copy of the GNU Lesser General Public License along
28# with this program. If not, see <https://www.gnu.org/licenses/>.
30"""Coherence module for SARvey."""
31import multiprocessing
32import time
33import numpy as np
34from numba import jit
35from scipy.signal import convolve2d
36from logging import Logger
37from miaplpy.objects.slcStack import slcStack
38from sarvey.objects import BaseStack
39from sarvey.utils import convertBboxToBlock
42def computeIfgsAndTemporalCoherence(*, path_temp_coh: str, path_ifgs: str, path_slc: str, ifg_array: np.ndarray,
43 time_mask: np.ndarray, wdw_size: int, num_boxes: int, box_list: list,
44 num_cores: int, logger: Logger):
45 """ComputeIfgsAndTemporalCoherence.
47 Compute the interferograms and temporal coherence from the SLC stack for a given set of (spatial) patches.
49 Parameters
50 ----------
51 path_temp_coh : str
52 Path to the temporary coherence stack. The data will be stored in this file during processing.
53 path_ifgs : str
54 Path to the interferograms stack. The data will be stored in this file during processing.
55 path_slc : str
56 Path to the SLC stack. The data will be read from this file.
57 ifg_array : np.ndarray
58 Array containing the indices of the reference and secondary images which are used to compute the interferograms.
59 time_mask : np.ndarray
60 Binary mask indicating the selected images from the SLC stack.
61 wdw_size : int
62 Size of the filter window. Has to be odd.
63 num_boxes : int
64 Number of patches to enable reading and processing of larger SLC stacks.
65 box_list : list
66 List containing the indices of each patch.
67 num_cores : int
68 Number of cores for parallel processing.
69 logger : Logger
70 Logger object.
72 Returns
73 -------
74 mean_amp_img : np.ndarray
75 Mean amplitude image.
76 """
77 start_time = time.time()
78 filter_kernel = np.ones((wdw_size, wdw_size), dtype=np.float64)
79 filter_kernel[wdw_size // 2, wdw_size // 2] = 0
81 slc_stack_obj = slcStack(path_slc)
82 slc_stack_obj.open()
83 temp_coh_obj = BaseStack(file=path_temp_coh, logger=logger)
84 ifg_stack_obj = BaseStack(file=path_ifgs, logger=logger)
86 mean_amp_img = np.zeros((slc_stack_obj.length, slc_stack_obj.width), dtype=np.float32)
87 num_ifgs = ifg_array.shape[0]
89 for idx in range(num_boxes):
90 bbox = box_list[idx]
91 block2d = convertBboxToBlock(bbox=bbox)
93 # read slc
94 slc = slc_stack_obj.read(datasetName='slc', box=bbox, print_msg=False)
95 slc = slc[time_mask, :, :]
97 mean_amp = np.mean(np.abs(slc), axis=0)
98 mean_amp[mean_amp == 0] = np.nan
99 mean_amp_img[bbox[1]:bbox[3], bbox[0]:bbox[2]] = np.log10(mean_amp)
101 # compute ifgs
102 ifgs = computeIfgs(slc=slc, ifg_array=ifg_array)
103 ifg_stack_obj.writeToFileBlock(data=ifgs, dataset_name="ifgs", block=block2d, print_msg=False)
104 del slc
106 # filter ifgs
107 avg_neighbours = np.zeros_like(ifgs)
108 if num_cores == 1:
109 for i in range(num_ifgs):
110 avg_neighbours[:, :, i] = convolve2d(in1=ifgs[:, :, i], in2=filter_kernel, mode='same', boundary="symm")
111 else:
112 pool = multiprocessing.Pool(processes=num_cores)
114 args = [(
115 idx,
116 ifgs[:, :, idx],
117 filter_kernel) for idx in range(num_ifgs)]
119 results = pool.map(func=launchConvolve2d, iterable=args)
121 # retrieve results
122 for j, avg_neigh in results:
123 avg_neighbours[:, :, j] = avg_neigh
124 del results, args, avg_neigh
126 # compute temporal coherence
127 residual_phase = np.angle(ifgs * np.conjugate(avg_neighbours))
128 del ifgs, avg_neighbours
129 temp_coh = np.abs(np.mean(np.exp(1j * residual_phase), axis=2))
130 temp_coh_obj.writeToFileBlock(data=temp_coh, dataset_name="temp_coh", block=block2d, print_msg=False)
131 del residual_phase, temp_coh
132 logger.info(msg="Patches processed:\t {}/{}".format(idx + 1, num_boxes))
134 m, s = divmod(time.time() - start_time, 60)
135 logger.debug(msg='\ntime used: {:02.0f} mins {:02.1f} secs.\n'.format(m, s))
136 return mean_amp_img
139@jit(nopython=True)
140def computeIfgs(*, slc: np.ndarray, ifg_array: np.ndarray):
141 """ComputeIfgs.
143 Parameters
144 ----------
145 slc : np.ndarray
146 SLC stack.
147 ifg_array : np.ndarray
148 Array containing the indices of the reference and secondary images which are used to compute the interferograms.
150 Returns
151 -------
152 ifgs : np.ndarray
153 Interferograms.
154 """
155 t, length, width = slc.shape
156 num_ifgs = ifg_array.shape[0]
157 ifgs = np.zeros((length, width, num_ifgs), dtype=np.complex64)
159 c = 0
160 for i, j in ifg_array:
161 ifgs[:, :, c] = slc[i, :, :] * np.conjugate(slc[j, :, :])
162 c += 1
163 return ifgs
166def launchConvolve2d(args: tuple):
167 """LaunchConvolve2d.
169 Parameters
170 ----------
171 args : tuple
172 Tuple containing the arguments for the convolution.
173 Tuple contains:
175 idx : int
176 Index of the processed interferogram.
177 ifg : np.ndarray
178 Interferogram.
179 filter_kernel : np.ndarray
180 Filter kernel.
182 Returns
183 -------
184 idx : int
185 Index of the processed interferogram.
186 avg_neighbours : np.ndarray
187 Low-pass filtered phase derived as average of neighbours.
188 """
189 (idx, ifg, filter_kernel) = args
190 avg_neighbours = convolve2d(in1=ifg, in2=filter_kernel, mode='same', boundary="symm")
191 return idx, avg_neighbours