From 1b219b9848298ce85a711e0df9b3da4fa0bf1b11 Mon Sep 17 00:00:00 2001 From: ninsbl Date: Mon, 24 Mar 2025 08:39:54 +0100 Subject: [PATCH] swath_mask and ruff --- .../t.sentinel3.import/t.sentinel3.import.py | 223 +++++++++++++----- 1 file changed, 168 insertions(+), 55 deletions(-) diff --git a/src/temporal/t.sentinel3.import/t.sentinel3.import.py b/src/temporal/t.sentinel3.import/t.sentinel3.import.py index 7ae47fb..d51fa72 100644 --- a/src/temporal/t.sentinel3.import/t.sentinel3.import.py +++ b/src/temporal/t.sentinel3.import/t.sentinel3.import.py @@ -1,12 +1,11 @@ #!/usr/bin/env python3 -""" -MODULE: t.sentinel3.import +"""MODULE: t.sentinel3.import AUTHOR(S): Stefan Blumentrath PURPOSE: Import and pre-process Sentinel-3 data from the Copernicus program into a Space Time Raster Dataset (STRDS) -COPYRIGHT: (C) 2024 by Norwegian Water and Energy Directorate, Stefan Blumentrath, - and the GRASS development team +COPYRIGHT: (C) 2024-2025 by Norwegian Water and Energy Directorate, + Stefan Blumentrath, and the GRASS development team This program is free software under the GNU General Public License (>=v2). Read the file COPYING that comes with GRASS @@ -91,6 +90,20 @@ # % required: no # %end +# %option +# % key: swath_mask_band +# % type: string +# % description: Band to use for creating a swath mask +# % required: no +# %end + +# %option +# % key: swath_mask_buffer +# % type: integer +# % description: Create a swath mask for removing border noise by buffering the swath_mask_band inwards (in number of pixels) +# % required: no +# %end + # %option # % key: basename # % description: Basename used as prefix for map names (default is derived from the input file(s)) @@ -156,12 +169,13 @@ # %rules # % collective: title,description +# % collective: swath_mask_band,swath_mask_buffer # % required: -e,title,description # %end import re import sys -from datetime import datetime +from datetime import datetime, timezone from functools import partial from math import floor from multiprocessing import Pool @@ -177,11 +191,13 @@ } -def check_file_input(file_input, product_type): - """Checks input for files to geocode. - If input is a directory (and not aSAFE), contained SAFE files are listed +def check_file_input(file_input: str, product_type: str) -> list[Path]: + """Check input for files to geocode. + + If input is a directory (and not a SAFE), contained SAFE files are listed If input is a text file each line is assumed to be a path to a SAFE file - If input is a comma separated list of files element is assumed to be a path to a SAFE file + If input is a comma separated list of files element is assumed to be a + path to a SAFE file. Returns a sanetized list of Sentinel-1 input files. """ file_input = file_input.split(",") @@ -200,13 +216,46 @@ def check_file_input(file_input, product_type): return check_files_list(file_input, product_type) -def check_files_list(file_path_list, product_type): - """Checks if files in a list of files exist and gives a warning otherwise""" +def check_swath_mask_input(module_options: dict) -> tuple[str, int] | None: + """Check input for producing swath masks. + + Returns parsed dict values if requested, None if not, + and raises errors for invalid input. + """ + if not module_options["swath_mask_band"]: + return None, None + + swath_mask_band = module_options["swath_mask_band"] + try: + swath_mask_buffer = int(module_options["swath_mask_buffer"]) + except ValueError: + gs.fatal(_("Buffer for swath_mask_buffer must be an integer.")) + + if not any( + swath_mask_band.replace("reflectance", "radiance") in module_options[bands] + or swath_mask_band in module_options[bands] + or module_options[bands] == "all" + for bands in ("bands", "anxillary_bands", "flag_bands") + ): + gs.fatal( + _( + "Band {} to be used as swath mask is not requested for import in bands, anxillary_bands or flag_bands", + ).format(swath_mask_band), + ) + del module_options["swath_mask_band"] + del module_options["swath_mask_buffer"] + + return swath_mask_band, swath_mask_buffer + + +def check_files_list(file_path_list: str | None, product_type: str) -> list[Path]: + """Check if files in a list of files exist and gives a warning otherwise.""" if not file_path_list: gs.fatal(_("No scenes found to process")) existing_paths = [] file_pattern = re.compile( - f".*{S3_FILE_PATTERN[product_type].replace('*', '.*')}", re.IGNORECASE + f".*{S3_FILE_PATTERN[product_type].replace('*', '.*')}", + re.IGNORECASE, ) for file_path in file_path_list: file_path_object = Path(file_path) @@ -217,24 +266,28 @@ def check_files_list(file_path_list, product_type): else: gs.warning( _( - "File {file_path} does not match expected pattern {pattern} of product {product_type}." + "File {file_path} does not match expected pattern {pattern} of product {product_type}.", ).format( file_path=file_path, pattern=S3_FILE_PATTERN[product_type], product_type=product_type, - ) + ), ) else: gs.warning(_("File {} not found").format(file_path)) return existing_paths -def parse_s3_file_name(file_name): - """Extract info from Sentinel-3 file name according to naming convention: +def parse_s3_file_name(file_name: str) -> dict: + """Extract info from Sentinel-3 file name according to naming convention. + + Naming convention is described here: https://sentinels.copernicus.eu/web/sentinel/user-guides/sentinel-3-slstr/naming-convention - Assumes that file name is checked to be a valid / supported Sentinel-3 file name, e.g.: + Assumes that file name is checked to be a valid / supported Sentinel-3 file name. + E.g.: "S3B_SL_1_RBT____20240129T110139_20240129T110439_20240130T114811_0180_089_094_1800_PS2_O_NT_004.SEN3" - The suffix does not matter + The suffix does not matter. + :param file_name: string representing the file name of a Senintel-3 scene """ try: @@ -243,9 +296,16 @@ def parse_s3_file_name(file_name): "instrument": file_name[4:6], "level": file_name[7], "product": file_name[9:12], - "start_time": datetime.strptime(file_name[16:31], "%Y%m%dT%H%M%S"), - "end_time": datetime.strptime(file_name[32:47], "%Y%m%dT%H%M%S"), - "ingestion_time": datetime.strptime(file_name[48:63], "%Y%m%dT%H%M%S"), + "start_time": datetime.strptime(file_name[16:31], "%Y%m%dT%H%M%S").replace( + tzinfo=timezone.utc, + ), + "end_time": datetime.strptime(file_name[32:47], "%Y%m%dT%H%M%S").replace( + tzinfo=timezone.utc, + ), + "ingestion_time": datetime.strptime( + file_name[48:63], + "%Y%m%dT%H%M%S", + ).replace(tzinfo=timezone.utc), "duration": file_name[64:68], "cycle": file_name[69:72], "relative_orbit": file_name[73:76], @@ -256,8 +316,8 @@ def parse_s3_file_name(file_name): def group_scenes( - s3_files, - group_variables=( + s3_files: list[Path], + group_variables: tuple[str] = ( "mission_id", "instrument", "level", @@ -265,16 +325,20 @@ def group_scenes( "cycle", "relative_orbit", ), -): - """ - Group scenes along track and date by information from the file name: +) -> dict: + """Group scenes along track and date by information from the file name. + + The following variables can be used to group the scenes: 1. mission ID - 2. product type - 3. temporal granule - 4. duration + 2. instrument + 3. level + 4. product 5. cycle 6. relative orbit - : param s3_filesv: list of pathlib.Path objects with Sentinel-3 files + 7. temporal granule + + :param s3_files: list of pathlib.Path objects with Sentinel-3 files + :param group_variables: tuple of strings with the variables to group the scenes by """ groups = {} for sfile in s3_files: @@ -284,7 +348,7 @@ def group_scenes( ) group_id = "_".join( [s3_name_dict[group_var] for group_var in group_variables] - + [s3_name_dict["start_time"].strftime("%Y%m%d")] + + [s3_name_dict["start_time"].strftime("%Y%m%d")], ) if group_id in groups: groups[group_id].append(str(sfile)) @@ -293,12 +357,24 @@ def group_scenes( return groups -def process_scene_group(scene_group, module_options=None, module_flags=None): - """Import a group of Sentinel3 scenes""" +def process_scene_group( + scene_group: tuple, + module_options: dict | None = None, + module_flags: str | None = None, + swath_mask: tuple[str, int] | None = None, +) -> str: + """Import a group of Sentinel3 scenes and create a swath mask if requested. + + :param scene_group: tuple with group name and list of scene file paths + :param module_options: dictionary with module options + :param module_flags: list of module flags + """ gs.verbose(_("Processing scene group {}...").format(scene_group[0])) module_options["basename"] = module_options["basename"] or scene_group[0] + swath_mask_band, swath_mask_buffer = swath_mask + i_sentinel3_import = Module( "i.sentinel3.import", input=",".join(scene_group[1]), @@ -307,22 +383,57 @@ def process_scene_group(scene_group, module_options=None, module_flags=None): flags=module_flags, quiet=True, ) - return i_sentinel3_import.outputs.stdout + module_stdout = i_sentinel3_import.outputs.stdout + module_stdout = module_stdout.strip() if module_stdout else "" + + if swath_mask_band and module_stdout: + if swath_mask_band not in module_stdout: + gs.warning( + _("{band} not imported for group {group}").format( + band=swath_mask_band, + group=scene_group[0], + ), + ) + else: + swath_mask_band_map, start_time, end_time, semantic_label = [ + line.split("|") + for line in module_stdout.split("\n") + if swath_mask_band in line + ][0] + swath_map_name = f"{module_options['basename']}_swath_mask" + Module( + "r.grow", + input=swath_mask_band_map, + output=swath_map_name, + radius=-swath_mask_buffer, + old=1, + new=-1, + quiet=True, + env_=i_sentinel3_import.env_, + ) + module_stdout += f"\n{swath_map_name}@{gs.gisenv().get('MAPSET')}|{start_time}|{end_time}|S3_swath_mask" + + return module_stdout + +def distribute_cores(nprocs: int, groups_n: int) -> tuple[int, int]: + """Distribute cores across expected processes. -def distribute_cores(nprocs, groups_n): - """Distribute cores across inner (parallel processes within + Distribute cores across inner (parallel processes within i.sentinel3.import) and outer (parallel runs of i.sentinel3.import) loop of processes. At least one core is allocated to inner (i.sentinel3.import) and outer (group of Sentinel-3 scenes) process. - Order if returns is inner, outer.""" - return max(1, floor(nprocs / groups_n)), min(groups_n, nprocs) + Order if returns is inner, outer. + :param nprocs: number of available cores + :param groups_n: number of groups to process + """ + return max(1, floor(nprocs / groups_n)), min(groups_n, nprocs) -def main(): - """Do the main work""" +def main() -> None: + """Do the main work.""" # Get GRASS GIS environment gisenv = dict(gs.gisenv()) @@ -334,30 +445,32 @@ def main(): tgis_strds = tgis.SpaceTimeRasterDataset(strds_long_name) # Check if input is complete and valid - # Check if target STRDS exists and create it if not or abort if overwriting is not allowed + # Check if target STRDS exists and create it if not + # or abort if overwriting is not allowed if tgis_strds.is_in_db(): if not gs.overwrite(): gs.fatal( _( "Output STRDS <{}> exists." - "Use --overwrite together with -e to modify the existing STRDS." - ).format(options["output"]) + "Use --overwrite together with -e to modify the existing STRDS.", + ).format(options["output"]), ) elif not options["title"] or not options["description"]: gs.fatal( _( - "Creation of a new STRDS <{}> requires the 'title' and 'description' option" - ).format(strds_long_name) + "Creation of a new STRDS <{}> requires the 'title' and 'description' option", + ).format(strds_long_name), ) # Group input scenes groups_to_process = group_scenes( - check_file_input(options["input"], options["product_type"]) + check_file_input(options["input"], options["product_type"]), ) # Distribute cores nprocs_inner, nprocs_outer = distribute_cores( - int(options["nprocs"]), len(groups_to_process) + int(options["nprocs"]), + len(groups_to_process), ) # Setup import module, and collect flags amd options @@ -377,6 +490,7 @@ def main(): process_scene_group, module_options=module_options, module_flags=[flag for flag in "cdnjkor" if flags[flag]], + swath_mask=check_swath_mask_input(options), ) if nprocs_outer > 1: @@ -388,14 +502,12 @@ def main(): ] # Remove empty results - register_strings = [ - result.strip() for result in register_strings if result and result.strip() - ] + register_strings = [result for result in register_strings if result] if not register_strings: gs.warning( _("No valid data found in <{}>. Nothing to register in STRDS.").format( - options["input"] - ) + options["input"], + ), ) sys.exit(0) @@ -406,13 +518,14 @@ def main(): strds_long_name = f"{options['output']}@{gisenv['MAPSET']}" tgis_strds = tgis.SpaceTimeRasterDataset(strds_long_name) - # Check if target STRDS exists and create it if not or abort if overwriting is not allowed + # Check if target STRDS exists and create it if not + # or abort if overwriting is not allowed if tgis_strds.is_in_db() and not gs.overwrite(): gs.fatal( _( "Output STRDS <{}> exists." - "Use --overwrite together with -e to modify the existing STRDS." - ).format(options["output"]) + "Use --overwrite together with -e to modify the existing STRDS.", + ).format(options["output"]), ) # Create STRDS if needed