Coverage for sarvey/sarvey_mask.py: 0%
286 statements
« prev ^ index » next coverage.py v7.6.0, created at 2024-10-17 12:36 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2024-10-17 12:36 +0000
1#!/usr/bin/env python
3# SARvey - A multitemporal InSAR time series tool for the derivation of displacements.
4#
5# Copyright (C) 2021-2024 Andreas Piter (IPI Hannover, piter@ipi.uni-hannover.de)
6#
7# This software was developed together with FERN.Lab (fernlab@gfz-potsdam.de) in the context
8# of the SAR4Infra project with funds of the German Federal Ministry for Digital and
9# Transport and contributions from Landesamt fuer Vermessung und Geoinformation
10# Schleswig-Holstein and Landesbetrieb Strassenbau und Verkehr Schleswig-Holstein.
11#
12# This program is free software: you can redistribute it and/or modify it under
13# the terms of the GNU General Public License as published by the Free Software
14# Foundation, either version 3 of the License, or (at your option) any later
15# version.
16#
17# Important: This package uses PyMaxFlow. The core of PyMaxflows library is the C++
18# implementation by Vladimir Kolmogorov. It is also licensed under the GPL, but it REQUIRES that you
19# cite [BOYKOV04] (see LICENSE) in any resulting publication if you use this code for research purposes.
20# This requirement extends to SARvey.
21#
22# This program is distributed in the hope that it will be useful, but WITHOUT
23# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
24# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
25# details.
26#
27# You should have received a copy of the GNU Lesser General Public License along
28# with this program. If not, see <https://www.gnu.org/licenses/>.
30"""Generate mask from shape file."""
31import argparse
32import os
33import sys
34import time
35from os.path import join
36import PIL.Image as Image
37import PIL.ImageDraw as ImageDraw
38import matplotlib
39import matplotlib.pyplot as plt
40import numpy as np
41from scipy import spatial
42import logging
43from logging import Logger
44import geopandas as gpd
46from mintpy.utils import writefile, ptime, utils
48from sarvey.osm_utils import getSpatialExtend
50try:
51 matplotlib.use('TkAgg')
52except ImportError as e:
53 print(e)
55EXAMPLE = """Example:
56 sarvey_mask path/to/file.shp --geom ./geometryRadar.h5 --width 6 -o mask_infra.h5
57"""
60def create_parser():
61 """Create_parser."""
62 parser = argparse.ArgumentParser(
63 description='Create transport infrastructure mask from shp-file.',
64 formatter_class=argparse.RawTextHelpFormatter,
65 epilog=EXAMPLE)
67 parser.add_argument(dest='input_file', help='path to input shp-file.')
69 parser.add_argument('-w', '--work_dir', dest='work_dir', default=None,
70 help='absolute path to working directory\n' +
71 '(default: current directory).')
73 parser.add_argument('--geom', dest='geom_file', default=None,
74 help='path to existing geometryRadar.h5 file.')
76 parser.add_argument('--width', dest='width', default=6, type=int,
77 help='Width of the mask in pixel (default: 6).')
79 parser.add_argument('-o', dest='out_file_name', default='mask_infra.h5',
80 help="name of output file. (default: 'mask_infra.h5').")
82 return parser
85class Node:
86 """Define simple class for a node at a road (similar to overpy.Node)."""
88 def __init__(self, *, lat: float = None, lon: float = None):
89 """Init."""
90 self.lat = lat
91 self.lon = lon
94class CoordinateSearch:
95 """CoordinateSearch."""
97 def __init__(self):
98 """Init."""
99 self.search_tree = None
100 self.yidx = None
101 self.xidx = None
102 self.lon = None
103 self.lat = None
104 self.coord = None
106 def createSearchTree(self, *, coord: utils.coordinate, logger: Logger):
107 """Create search tree.
109 Parameters
110 ----------
111 coord: utils.coordinate
112 Coordinates
113 logger: Logger
114 Logging handler.
115 """
116 self.coord = coord
117 logger.info(msg='create kd-tree for efficient search...')
119 if self.coord.lut_y is None or self.coord.lut_x is None:
120 self.coord.open()
121 lat, lon = self.coord.read_lookup_table(print_msg=False)
122 self.lat = lat.ravel()
123 self.lon = lon.ravel()
125 # create the 2D coordinate arrays for fast indexing
126 x = np.arange(self.coord.lut_x.shape[1])
127 y = np.arange(self.coord.lut_x.shape[0])
128 xx, yy = np.meshgrid(x, y)
129 self.xidx = xx.ravel()
130 self.yidx = yy.ravel()
132 start_time = time.time()
133 self.search_tree = spatial.KDTree(data=np.array([self.lon, self.lat]).transpose())
135 logger.info(msg='... done.')
136 m, s = divmod(time.time() - start_time, 60)
137 logger.debug(msg='time used: {:02.0f} mins {:02.1f} secs.'.format(m, s))
139 def getMeanDistanceBetweenPixels(self):
140 """Compute mean distance between adjacent pixels."""
141 distances = self.search_tree.query([self.lon[0], self.lat[0]], k=10)[0]
142 mean_dist = np.mean(distances[1:])
143 return mean_dist
145 def getNearestNeighbour(self, *, node: Node):
146 """Query the kd-tree for the nearest neighbour.
148 :param node: Node object
149 """
150 # find nearest neighbour
151 dist, idx = self.search_tree.query([node.lon, node.lat])
152 found_node = Node(lat=self.lat[idx], lon=self.lon[idx])
153 # return index of NN in radar coordinates
154 return dist, (self.yidx[idx], self.xidx[idx]), found_node
157def findLastRoadPixel(*, csearch: CoordinateSearch, cur_node: Node, prev_node: Node, dist_thrsh: float):
158 """Find the index of the last road pixel that is within the image extend.
160 Idea: the pixel with the shortest distance to the current node of a road is not necessarily on the road, if the
161 current node is outside the image extend. Split the road in further linear parts and find the last road pixel
162 recursively that is still inside the image.
163 Hint: all nodes are instances from class Node
165 Parameters
166 ----------
167 csearch: CoordinateSearch
168 Search tree for efficient spatial search of the coordinate of a pixel in the radar image.
169 cur_node: Node
170 Current node of the road that is outside the image extend.
171 prev_node: Node
172 Previous node of the road that is inside the image extend.
173 dist_thrsh: float
174 Distance threshold for stop criterion (derived from average distance between two pixels in the image).
176 Returns
177 -------
178 node_idx: int
179 Node of the pixel which is the last pixel on the road inside the image.
180 """
181 # create a new node at half of the road distance between previous and current node
182 mid_lat = cur_node.lat + (cur_node.lat - prev_node.lat) / 2
183 mid_lon = cur_node.lon + (cur_node.lon - prev_node.lon) / 2
184 mid_node = Node(lat=mid_lat, lon=mid_lon)
186 dist, node_idx = csearch.getNearestNeighbour(node=mid_node)[0:2]
187 if dist < dist_thrsh:
188 return node_idx
189 else:
190 node_idx = findLastRoadPixel(csearch=csearch, cur_node=cur_node, prev_node=prev_node, dist_thrsh=dist_thrsh)
191 return node_idx
194def euclDist(*, node1: Node, node2: Node):
195 """Compute the euclidean distance between two nodes."""
196 return np.sqrt((node1.lat - node2.lat) ** 2 + (node1.lon - node2.lon) ** 2)
199def computeLastRoadPixel(*, cur_node: Node, prev_node: Node, found_node: Node):
200 """Compute the location of the pixel at the border of the radar image that is part of the road.
202 Parameters
203 ----------
204 cur_node: Node
205 Current node of the road.
206 prev_node: Node
207 Previous node of the road.
208 found_node: Node
209 Found node of the road.
211 Returns
212 -------
213 new_lon: float
214 Longitude of the pixel at the border of the radar image that is part of the road.
215 new_lat: float
216 Latitude of the pixel at the border of the radar image that is part of the road.
217 """
218 a = euclDist(node1=prev_node, node2=found_node)
219 b = euclDist(node1=cur_node, node2=found_node)
220 c = euclDist(node1=prev_node, node2=cur_node)
221 alpha = np.arccos((- a ** 2 + b ** 2 + c ** 2) / (2 * b * c))
222 d = b / np.sin(np.pi / 2 - alpha)
223 new_lat = cur_node.lat + (prev_node.lat - cur_node.lat) / c * d
224 new_lon = cur_node.lon + (prev_node.lon - cur_node.lon) / c * d
225 return new_lon, new_lat
228def convertToRadarCoordPolygon(*, gdf_infra: gpd.geodataframe, csearch: CoordinateSearch, logger: Logger):
229 """Convert Polygon to a mask in shape of radar image.
231 Parameters
232 ----------
233 gdf_infra: gpd.geodataframe
234 The queried infrastructures containing polygons.
235 csearch: CoordinateSearch
236 The coordinate search object.
237 logger: Logger
238 Logging handler.
240 Returns
241 -------
242 img_np: np.ndarray
243 Mask image.
244 """
245 # create a new image
246 logger.info(msg='create mask image...')
247 img_pil = Image.new(mode="1",
248 size=(int(csearch.coord.src_metadata['LENGTH']), int(csearch.coord.src_metadata['WIDTH'])))
249 img_pil_draw = ImageDraw.Draw(im=img_pil)
251 num_ways = gdf_infra.shape[0]
252 way_iter = 0
253 prog_bar = ptime.progressBar(maxValue=num_ways)
255 dist_thrsh = 1.3 * csearch.getMeanDistanceBetweenPixels()
256 lines = [geom.boundary.coords for geom in gdf_infra.geometry if geom is not None]
257 way_list = list()
258 for coo in lines:
259 way_list.append([Node(lat=point[1], lon=point[0]) for point in coo])
261 # plt.ion()
262 fig = plt.figure()
263 ax = fig.add_subplot()
264 ax.set_xlabel("lon")
265 ax.set_ylabel("lat")
266 lat, lon = csearch.coord.read_lookup_table(print_msg=False)
267 ax.plot([lon[0, 0], lon[-1, 0]], [lat[0, 0], lat[-1, 0]], '-k')
268 ax.plot([lon[0, 0], lon[0, -1]], [lat[0, 0], lat[0, -1]], '-k')
269 ax.plot([lon[0, -1], lon[-1, -1]], [lat[0, -1], lat[-1, -1]], '-k')
270 ax.plot([lon[-1, 0], lon[-1, -1]], [lat[-1, 0], lat[-1, -1]], '-k')
271 # ax.plot(lon.ravel(), lat.ravel(), '.k', markersize=0.5)
273 while way_iter < num_ways:
274 way = way_list[way_iter]
275 poly_line_way = []
277 # perform a preliminary search to check if polygon is partly outside image extend
278 outside = np.zeros(len(way))
279 for i in range(len(way)):
280 cur_node = way[i]
282 # convert node coordinates (lat, lon) to image coordinates
283 dist, _, _ = csearch.getNearestNeighbour(node=cur_node)
285 # check if node is outside the image
286 if dist > dist_thrsh:
287 outside[i] = 1
289 if np.sum(outside) == 0: # all road nodes inside image extend
290 for i in range(len(way)):
291 cur_node = way[i]
293 # convert node coordinates (lat, lon) to image coordinates
294 dist, node_idx, found_node = csearch.getNearestNeighbour(node=cur_node)
296 # Fill list of current way with node coordinates
297 poly_line_way.append(node_idx)
298 ax.plot(cur_node.lon, cur_node.lat, '*k')
299 ax.plot(found_node.lon, found_node.lat, 'ok')
301 else: # some polygon nodes outside image extend
302 if np.sum(outside) == outside.size: # all nodes outside, skip
303 way_iter += 1
304 continue
306 # polygon nodes partly inside and partly outside
307 prev_p = outside[-2] == 1 # last point == first point (due to closed polygon). Select second last.
308 for i in range(outside.shape[0]):
309 cur_p = outside[i] == 1
310 cur_node = way[i]
312 # convert node coordinates (lat, lon) to image coordinates
313 dist, node_idx, found_node = csearch.getNearestNeighbour(node=cur_node)
315 # check if transition happens
316 # yes: check if current point is inside or outside
317 # if outside: find transition point, but do not add current point
318 # if inside: find transition point, then add current point
319 # no: if point inside: add point
320 # if point outside: skip point
322 if not (prev_p == cur_p): # transition
323 stored_idx = None
324 if i - 1 < 0:
325 prev_node = way[-2]
326 else:
327 prev_node = way[i - 1]
329 if cur_p: # transition: in -> out
330 # find transition point, but do not add current point
331 ax.plot(cur_node.lon, cur_node.lat, '*y')
333 if prev_p: # transition: out -> in
334 # find and add transition point, then add current point.
335 stored_idx = node_idx # store current point for adding it later.
336 ax.plot(cur_node.lon, cur_node.lat, '*r') # plot now, because variables will be overwritten
337 # the 'found_node' has to be computed from the last point outside, i.e. from 'prev_node'
338 ax.plot(found_node.lon, found_node.lat, 'or')
339 _, _, found_node = csearch.getNearestNeighbour(node=prev_node)
341 new_lon, new_lat = computeLastRoadPixel(
342 cur_node=cur_node,
343 prev_node=prev_node,
344 found_node=found_node
345 )
347 dist, node_idx, found_node = csearch.getNearestNeighbour(node=Node(lon=new_lon, lat=new_lat))
348 ax.plot(cur_node.lon, cur_node.lat, '*b')
349 ax.plot(found_node.lon, found_node.lat, 'ob')
350 ax.plot(new_lon, new_lat, '+b')
352 # add the transition point
353 poly_line_way.append(node_idx)
354 if prev_p: # transition: out -> in
355 # transition point found and added, now add stored current point.
356 poly_line_way.append(stored_idx)
357 prev_p = cur_p # prepare for next iteration
359 elif cur_p: # no transition, current point is outside -> do not add point
360 ax.plot(cur_node.lon, cur_node.lat, '*y')
361 prev_p = cur_p # prepare for next iteration
363 else: # no transition, current points is inside -> add point
364 ax.plot(cur_node.lon, cur_node.lat, '*r')
365 ax.plot(found_node.lon, found_node.lat, 'or')
366 poly_line_way.append(node_idx)
367 prev_p = cur_p # prepare for next iteration
369 prog_bar.update(value=way_iter + 1, every=10, suffix='{}/{} polygons'.format(way_iter + 1, num_ways))
371 # if first point is outside image, the polygon will not be closed. However, it still works to create a polygon.
372 img_pil_draw.polygon(poly_line_way, fill=255)
373 # plt.figure()
374 # plt.imshow(np.array(img_pil.getdata()).reshape(img_pil.size[1], img_pil.size[0]).astype(int))
376 way_iter += 1
378 img_np = np.array(img_pil.getdata()).reshape(img_pil.size[1], img_pil.size[0]).astype(int)
379 return img_np
382def convertToRadarCoord(*, gdf_infra: gpd.geodataframe, csearch: CoordinateSearch, width: int, logger: Logger):
383 """Convert Polyline to a mask in shape of radar image. Apply a buffer of size 'width' in pixels.
385 Parameters
386 ----------
387 gdf_infra: gpd.geodataframe
388 The queried infrastructures containing polygons.
389 csearch: CoordinateSearch
390 The coordinate search object.
391 width: int
392 Width of the mask in pixel.
393 logger: Logger
394 Logging handler.
396 Returns
397 -------
398 img_np: np.ndarray
399 Mask image.
400 """
401 # create a new image
402 logger.info(msg='create mask image...')
403 img_pil = Image.new(mode="1",
404 size=(int(csearch.coord.src_metadata['LENGTH']), int(csearch.coord.src_metadata['WIDTH'])))
405 img_pil_draw = ImageDraw.Draw(im=img_pil)
407 num_roads = gdf_infra.shape[0]
408 prog_bar = ptime.progressBar(maxValue=num_roads)
410 dist_thrsh = 1.3 * csearch.getMeanDistanceBetweenPixels()
411 lines = [ls.coords for ls in gdf_infra.geometry if ls is not None] # enables to append to list
412 way_list = list()
413 for coo in lines:
414 way_list.append([Node(lat=point[1], lon=point[0]) for point in coo])
416 num_ways = len(way_list) # changes during iteration
417 way_iter = 0
419 # plt.ion()
420 fig = plt.figure()
421 ax = fig.add_subplot()
422 ax.set_xlabel("lon")
423 ax.set_ylabel("lat")
424 lat, lon = csearch.coord.read_lookup_table(print_msg=False)
425 ax.plot([lon[0, 0], lon[-1, 0]], [lat[0, 0], lat[-1, 0]], '-k')
426 ax.plot([lon[0, 0], lon[0, -1]], [lat[0, 0], lat[0, -1]], '-k')
427 ax.plot([lon[0, -1], lon[-1, -1]], [lat[0, -1], lat[-1, -1]], '-k')
428 ax.plot([lon[-1, 0], lon[-1, -1]], [lat[-1, 0], lat[-1, -1]], '-k')
429 # ax.plot(lon.ravel(), lat.ravel(), '.k', markersize=0.5)
431 while way_iter < num_ways:
432 way = way_list[way_iter]
433 poly_line_way = []
435 # perform a preliminary search to check if road is partly outside image extend
436 outside = np.zeros(len(way))
437 for i in range(len(way)):
438 cur_node = way[i]
440 # convert node coordinates (lat, lon) to image coordinates
441 dist, _, _ = csearch.getNearestNeighbour(node=cur_node)
443 # check if node is outside the image
444 if dist > dist_thrsh:
445 outside[i] = 1
447 if np.sum(outside) == 0: # all road nodes inside image extend
448 for i in range(len(way)):
449 cur_node = way[i]
451 # convert node coordinates (lat, lon) to image coordinates
452 dist, node_idx, found_node = csearch.getNearestNeighbour(node=cur_node)
454 # Fill list of current way with node coordinates
455 poly_line_way.append(node_idx)
456 ax.plot(cur_node.lon, cur_node.lat, '*k')
457 ax.plot(found_node.lon, found_node.lat, 'ok')
459 else: # some road nodes outside image extend
460 if np.sum(outside) == outside.size: # all nodes outside, skip
461 way_iter += 1
462 continue
463 # split the way into sub parts based on in-out / out-in transition
464 # find first node inside the image
465 first_inside_idx = np.where(outside == 0)[0][0]
466 if first_inside_idx > 0: # this is a transition into the image
467 start_idx = first_inside_idx - 1
468 else:
469 start_idx = first_inside_idx
471 # find first node which is again outside the image
472 outside_idx = np.where(outside[first_inside_idx:] == 1)[0]
473 if outside_idx.size == 0: # no more transition to outside the image
474 stop_idx = len(way)
475 else:
476 stop_idx = outside_idx[0] + first_inside_idx + 1
477 if stop_idx != len(way): # split the current way and add a new way at the end of the way_list
478 # to handle it later
479 way_list.append(way[stop_idx:])
480 num_ways += 1
482 for i in range(start_idx, stop_idx):
483 cur_node = way[i]
485 # convert node coordinates (lat, lon) to image coordinates
486 dist, node_idx, found_node = csearch.getNearestNeighbour(node=cur_node)
488 if dist > dist_thrsh:
489 if i == start_idx: # there is no previous node, but a next node.
490 prev_node = way[i + 1]
491 else:
492 prev_node = way[i - 1]
493 new_lon, new_lat = computeLastRoadPixel(cur_node=cur_node, prev_node=prev_node,
494 found_node=found_node)
495 dist, node_idx, found_node = csearch.getNearestNeighbour(node=Node(lon=new_lon, lat=new_lat))
496 ax.plot(cur_node.lon, cur_node.lat, '*b')
497 ax.plot(found_node.lon, found_node.lat, 'ob')
498 ax.plot(new_lon, new_lat, '+b')
499 else:
500 ax.plot(cur_node.lon, cur_node.lat, '*r')
501 ax.plot(found_node.lon, found_node.lat, 'or')
502 # Fill list of current way with node coordinates
503 poly_line_way.append(node_idx)
505 prog_bar.update(value=way_iter + 1, every=10, suffix='{}/{} road segments'.format(way_iter + 1, num_roads))
507 img_pil_draw.line(poly_line_way, fill=255, width=width)
508 # img_pil_draw.polygon(poly_line_way, fill=255)
510 way_iter += 1
512 img_np = np.array(img_pil.getdata()).reshape(img_pil.size[1], img_pil.size[0]).astype(int)
513 return img_np
516def saveMask(*, work_dir: str, mask: np.ndarray, atr: dict, out_file_name: str):
517 """Save the mask to 'maskRoads.h5'.
519 Parameters
520 ----------
521 work_dir: str
522 Working directory.
523 mask: np.ndarray
524 Mask image.
525 atr: dict
526 Metadata data, e.g. from the geometryRadar.h5 file.
527 out_file_name: str
528 Output file name.
529 """
530 # create the right attributes
531 ds_dict = dict()
532 ds_dict['mask'] = mask.transpose().astype('float32')
533 atr["FILE_TYPE"] = "mask"
535 writefile.write(datasetDict=ds_dict, out_file=os.path.join(work_dir, out_file_name), metadata=atr)
538def createMask(*, input_file: str, width: int, work_dir: str, out_file_name: str, geom_file: str,
539 logger: logging.Logger):
540 """Create a mask for the radar image from a shapefile containing lines or polygons.
542 Parameters
543 ----------
544 input_file: str
545 Path to input file.
546 width: int
547 Width of the mask in pixel. Applied to the lines only.
548 work_dir: str
549 Working directory.
550 out_file_name: str
551 Output file name.
552 geom_file: str
553 Path to geometryRadar.h5 file.
554 logger: logging.Logger
555 Logging handler.
556 """
557 logger.info(msg="Start creating mask file based on openstreetmap data.")
559 # get bounding box
560 _, _, _, coord, atr = getSpatialExtend(geom_file=geom_file, logger=logger)
562 # create search tree
563 csearch = CoordinateSearch()
564 csearch.createSearchTree(coord=coord, logger=logger)
566 logger.info(f"Read from input file: {input_file}")
567 gdf_infra = gpd.read_file(input_file)
569 if gdf_infra.geometry[0].geom_type == "LineString":
570 mask_img = convertToRadarCoord(gdf_infra=gdf_infra, csearch=csearch, width=width, logger=logger)
572 elif gdf_infra.geometry[0].geom_type == "Polygon":
573 mask_img = convertToRadarCoordPolygon(gdf_infra=gdf_infra, csearch=csearch, width=width, logger=logger)
574 else:
575 logger.error(msg=f"Geometry type is {gdf_infra.geometry[0].geom_type}."
576 f"Only 'LineString' and 'Polygon' supported!")
577 raise TypeError
579 if '.h5' not in out_file_name:
580 out_file_name += ".h5"
581 saveMask(work_dir=work_dir, mask=mask_img, atr=atr, out_file_name=out_file_name)
583 logger.info(msg="Masking finished.")
586def main(iargs=None):
587 """Create mask from lines or polygons given in geographic coordinates (EPSG:4326). Input as shp or gpkg."""
588 # check input
589 parser = create_parser()
590 inps = parser.parse_args(args=iargs)
592 # initiate logger
593 logging_level = logging.getLevelName('DEBUG')
595 log_format = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
596 logger = logging.getLogger(__name__)
598 current_datetime = time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime())
599 log_filename = f"sarvey_mask_{current_datetime}.log"
600 if not os.path.exists(os.path.join(os.getcwd(), "logfiles")):
601 os.mkdir(os.path.join(os.getcwd(), "logfiles"))
602 file_handler = logging.FileHandler(filename=os.path.join(os.getcwd(), "logfiles", log_filename))
603 file_handler.setFormatter(log_format)
604 logger.addHandler(file_handler)
606 console_handler = logging.StreamHandler(sys.stdout)
607 console_handler.setFormatter(log_format)
608 logger.addHandler(console_handler)
609 logger.setLevel(logging_level)
611 if inps.work_dir is None:
612 work_dir = os.getcwd()
613 else:
614 work_dir = inps.work_dir
615 if not os.path.exists(path=work_dir):
616 logger.info(msg='create output folder: ' + work_dir)
617 os.mkdir(path=work_dir)
618 logger.info(msg='working directory: {}'.format(work_dir))
620 input_file = join(work_dir, inps.input_file)
621 out_file_name = join(work_dir, inps.out_file_name)
623 createMask(
624 input_file=input_file,
625 width=inps.width,
626 work_dir=work_dir,
627 out_file_name=out_file_name,
628 logger=logger,
629 geom_file=inps.geom_file
630 )
633if __name__ == '__main__':
634 main()