Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
223 changes: 168 additions & 55 deletions src/temporal/t.sentinel3.import/t.sentinel3.import.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
#!/usr/bin/env python3

Check failure on line 1 in src/temporal/t.sentinel3.import/t.sentinel3.import.py

View workflow job for this annotation

GitHub Actions / Python Code Quality Checks (ubuntu-22.04)

[pylint] src/temporal/t.sentinel3.import/t.sentinel3.import.py E:219,52: unsupported operand type(s) for | (unsupported-binary-operation) E:251,37: unsupported operand type(s) for | (unsupported-binary-operation) E:362,20: unsupported operand type(s) for | (unsupported-binary-operation) E:363,18: unsupported operand type(s) for | (unsupported-binary-operation) E:364,16: unsupported operand type(s) for | (unsupported-binary-operation)

"""
MODULE: t.sentinel3.import
"""MODULE: t.sentinel3.import
AUTHOR(S): Stefan Blumentrath
PURPOSE: Import and pre-process Sentinel-3 data from the Copernicus program
into a Space Time Raster Dataset (STRDS)
COPYRIGHT: (C) 2024 by Norwegian Water and Energy Directorate, Stefan Blumentrath,
and the GRASS development team
COPYRIGHT: (C) 2024-2025 by Norwegian Water and Energy Directorate,
Stefan Blumentrath, and the GRASS development team

This program is free software under the GNU General Public
License (>=v2). Read the file COPYING that comes with GRASS
Expand Down Expand Up @@ -91,6 +90,20 @@
# % required: no
# %end

# %option
# % key: swath_mask_band
# % type: string
# % description: Band to use for creating a swath mask
# % required: no
# %end

# %option
# % key: swath_mask_buffer
# % type: integer
# % description: Create a swath mask for removing border noise by buffering the swath_mask_band inwards (in number of pixels)
# % required: no
# %end

# %option
# % key: basename
# % description: Basename used as prefix for map names (default is derived from the input file(s))
Expand Down Expand Up @@ -156,12 +169,13 @@

# %rules
# % collective: title,description
# % collective: swath_mask_band,swath_mask_buffer
# % required: -e,title,description
# %end

from __future__ import annotations

import re
import sys
from datetime import datetime
from datetime import datetime, timezone
from functools import partial
from math import floor
from multiprocessing import Pool
Expand All @@ -177,11 +191,13 @@
}


def check_file_input(file_input, product_type):
"""Checks input for files to geocode.
If input is a directory (and not aSAFE), contained SAFE files are listed
def check_file_input(file_input: str, product_type: str) -> list[Path]:
"""Check input for files to geocode.

If input is a directory (and not a SAFE), contained SAFE files are listed
If input is a text file each line is assumed to be a path to a SAFE file
If input is a comma separated list of files element is assumed to be a path to a SAFE file
If input is a comma separated list of files, each element is assumed to be a
path to a SAFE file.
Returns a sanitized list of Sentinel-3 input files.
"""
file_input = file_input.split(",")
Expand All @@ -200,13 +216,46 @@
return check_files_list(file_input, product_type)


def check_files_list(file_path_list, product_type):
"""Checks if files in a list of files exist and gives a warning otherwise"""
def check_swath_mask_input(
    module_options: dict,
) -> tuple[str | None, int | None]:
    """Validate the swath mask options and return them in parsed form.

    A swath mask is only requested if the ``swath_mask_band`` option is set.

    :param module_options: dictionary with the module options. The keys
        ``swath_mask_band`` and ``swath_mask_buffer`` are read, and the keys
        ``bands``, ``anxillary_bands`` and ``flag_bands`` are consulted to
        verify that the mask band is requested for import. If a swath mask
        is requested, both swath mask keys are removed from the dictionary.
    :return: tuple with the band name and the buffer (as integer) if a swath
        mask is requested, ``(None, None)`` otherwise, so that the result can
        always be unpacked into two values.

    Invalid input is reported with ``gs.fatal``.
    """
    swath_mask_band = module_options["swath_mask_band"]
    if not swath_mask_band:
        return None, None

    try:
        swath_mask_buffer = int(module_options["swath_mask_buffer"])
    except (TypeError, ValueError):
        # ValueError: empty or non-numeric text, TypeError: value is None
        gs.fatal(_("Buffer for swath_mask_buffer must be an integer."))

    # The band is also accepted with "reflectance" replaced by "radiance"
    # NOTE(review): presumably reflectance maps are derived from requested
    # radiance bands during import - confirm against i.sentinel3.import
    band_names = (
        swath_mask_band.replace("reflectance", "radiance"),
        swath_mask_band,
    )

    def _is_requested(option_key: str) -> bool:
        """Check if the mask band is covered by the given band option."""
        # Unset options are treated as empty band selection
        requested_bands = module_options.get(option_key) or ""
        return requested_bands == "all" or any(
            band_name in requested_bands for band_name in band_names
        )

    if not any(
        _is_requested(option_key)
        for option_key in ("bands", "anxillary_bands", "flag_bands")
    ):
        gs.fatal(
            _(
                "Band {} to be used as swath mask is not requested for import in bands, anxillary_bands or flag_bands",
            ).format(swath_mask_band),
        )

    # Remove the swath mask options from the dictionary once they are parsed
    del module_options["swath_mask_band"]
    del module_options["swath_mask_buffer"]

    return swath_mask_band, swath_mask_buffer


def check_files_list(file_path_list: str | None, product_type: str) -> list[Path]:
"""Check if files in a list of files exist and gives a warning otherwise."""
if not file_path_list:
gs.fatal(_("No scenes found to process"))
existing_paths = []
file_pattern = re.compile(
f".*{S3_FILE_PATTERN[product_type].replace('*', '.*')}", re.IGNORECASE
f".*{S3_FILE_PATTERN[product_type].replace('*', '.*')}",
re.IGNORECASE,
)
for file_path in file_path_list:
file_path_object = Path(file_path)
Expand All @@ -217,24 +266,28 @@
else:
gs.warning(
_(
"File {file_path} does not match expected pattern {pattern} of product {product_type}."
"File {file_path} does not match expected pattern {pattern} of product {product_type}.",
).format(
file_path=file_path,
pattern=S3_FILE_PATTERN[product_type],
product_type=product_type,
)
),
)
else:
gs.warning(_("File {} not found").format(file_path))
return existing_paths


def parse_s3_file_name(file_name):
"""Extract info from Sentinel-3 file name according to naming convention:
def parse_s3_file_name(file_name: str) -> dict:
"""Extract info from Sentinel-3 file name according to naming convention.

Naming convention is described here:
https://sentinels.copernicus.eu/web/sentinel/user-guides/sentinel-3-slstr/naming-convention
Assumes that file name is checked to be a valid / supported Sentinel-3 file name, e.g.:
Assumes that file name is checked to be a valid / supported Sentinel-3 file name.
E.g.:
"S3B_SL_1_RBT____20240129T110139_20240129T110439_20240130T114811_0180_089_094_1800_PS2_O_NT_004.SEN3"
The suffix does not matter
The suffix does not matter.

:param file_name: string representing the file name of a Sentinel-3 scene
"""
try:
Expand All @@ -243,9 +296,16 @@
"instrument": file_name[4:6],
"level": file_name[7],
"product": file_name[9:12],
"start_time": datetime.strptime(file_name[16:31], "%Y%m%dT%H%M%S"),
"end_time": datetime.strptime(file_name[32:47], "%Y%m%dT%H%M%S"),
"ingestion_time": datetime.strptime(file_name[48:63], "%Y%m%dT%H%M%S"),
"start_time": datetime.strptime(file_name[16:31], "%Y%m%dT%H%M%S").replace(
tzinfo=timezone.utc,
),
"end_time": datetime.strptime(file_name[32:47], "%Y%m%dT%H%M%S").replace(
tzinfo=timezone.utc,
),
"ingestion_time": datetime.strptime(
file_name[48:63],
"%Y%m%dT%H%M%S",
).replace(tzinfo=timezone.utc),
"duration": file_name[64:68],
"cycle": file_name[69:72],
"relative_orbit": file_name[73:76],
Expand All @@ -256,25 +316,29 @@


def group_scenes(
s3_files,
group_variables=(
s3_files: list[Path],
group_variables: tuple[str] = (
"mission_id",
"instrument",
"level",
"product",
"cycle",
"relative_orbit",
),
):
"""
Group scenes along track and date by information from the file name:
) -> dict:
"""Group scenes along track and date by information from the file name.

The following variables can be used to group the scenes:
1. mission ID
2. product type
3. temporal granule
4. duration
2. instrument
3. level
4. product
5. cycle
6. relative orbit
: param s3_filesv: list of pathlib.Path objects with Sentinel-3 files
7. temporal granule

:param s3_files: list of pathlib.Path objects with Sentinel-3 files
:param group_variables: tuple of strings with the variables to group the scenes by
"""
groups = {}
for sfile in s3_files:
Expand All @@ -284,7 +348,7 @@
)
group_id = "_".join(
[s3_name_dict[group_var] for group_var in group_variables]
+ [s3_name_dict["start_time"].strftime("%Y%m%d")]
+ [s3_name_dict["start_time"].strftime("%Y%m%d")],
)
if group_id in groups:
groups[group_id].append(str(sfile))
Expand All @@ -293,12 +357,24 @@
return groups


def process_scene_group(scene_group, module_options=None, module_flags=None):
"""Import a group of Sentinel3 scenes"""
def process_scene_group(
scene_group: tuple,
module_options: dict | None = None,
module_flags: str | None = None,
swath_mask: tuple[str, int] | None = None,
) -> str:
"""Import a group of Sentinel3 scenes and create a swath mask if requested.

:param scene_group: tuple with group name and list of scene file paths
:param module_options: dictionary with module options
:param module_flags: list of module flags
"""
gs.verbose(_("Processing scene group {}...").format(scene_group[0]))

module_options["basename"] = module_options["basename"] or scene_group[0]

swath_mask_band, swath_mask_buffer = swath_mask

i_sentinel3_import = Module(
"i.sentinel3.import",
input=",".join(scene_group[1]),
Expand All @@ -307,22 +383,57 @@
flags=module_flags,
quiet=True,
)
return i_sentinel3_import.outputs.stdout
module_stdout = i_sentinel3_import.outputs.stdout
module_stdout = module_stdout.strip() if module_stdout else ""

if swath_mask_band and module_stdout:
if swath_mask_band not in module_stdout:
gs.warning(
_("{band} not imported for group {group}").format(
band=swath_mask_band,
group=scene_group[0],
),
)
else:
swath_mask_band_map, start_time, end_time, semantic_label = [
line.split("|")
for line in module_stdout.split("\n")
if swath_mask_band in line
][0]
swath_map_name = f"{module_options['basename']}_swath_mask"
Module(
"r.grow",
input=swath_mask_band_map,
output=swath_map_name,
radius=-swath_mask_buffer,
old=1,
new=-1,
quiet=True,
env_=i_sentinel3_import.env_,
)
module_stdout += f"\n{swath_map_name}@{gs.gisenv().get('MAPSET')}|{start_time}|{end_time}|S3_swath_mask"

return module_stdout


def distribute_cores(nprocs: int, groups_n: int) -> tuple[int, int]:
    """Distribute cores across expected processes.

    Distribute cores across inner (parallel processes within
    i.sentinel3.import) and outer (parallel runs of i.sentinel3.import)
    loop of processes. At least one core is allocated to inner
    (i.sentinel3.import) and outer (group of Sentinel-3 scenes)
    process.
    Order of returns is inner, outer.

    :param nprocs: number of available cores
    :param groups_n: number of groups to process
    :return: tuple with the number of cores for the inner and the outer loop
    """
    # Clamp both inputs to at least one: an empty set of groups would
    # otherwise cause a division by zero, and a core count below one would
    # leave the outer loop without any core
    nprocs = max(1, nprocs)
    groups_n = max(1, groups_n)
    return max(1, nprocs // groups_n), min(groups_n, nprocs)

def main():
"""Do the main work"""

def main() -> None:
"""Do the main work."""
# Get GRASS GIS environment
gisenv = dict(gs.gisenv())

Expand All @@ -334,30 +445,32 @@
tgis_strds = tgis.SpaceTimeRasterDataset(strds_long_name)

# Check if input is complete and valid
# Check if target STRDS exists and create it if not or abort if overwriting is not allowed
# Check if target STRDS exists and create it if not
# or abort if overwriting is not allowed
if tgis_strds.is_in_db():
if not gs.overwrite():
gs.fatal(
_(
"Output STRDS <{}> exists."
"Use --overwrite together with -e to modify the existing STRDS."
).format(options["output"])
"Use --overwrite together with -e to modify the existing STRDS.",
).format(options["output"]),
)
elif not options["title"] or not options["description"]:
gs.fatal(
_(
"Creation of a new STRDS <{}> requires the 'title' and 'description' option"
).format(strds_long_name)
"Creation of a new STRDS <{}> requires the 'title' and 'description' option",
).format(strds_long_name),
)

# Group input scenes
groups_to_process = group_scenes(
check_file_input(options["input"], options["product_type"])
check_file_input(options["input"], options["product_type"]),
)

# Distribute cores
nprocs_inner, nprocs_outer = distribute_cores(
int(options["nprocs"]), len(groups_to_process)
int(options["nprocs"]),
len(groups_to_process),
)

# Set up import module, and collect flags and options
Expand All @@ -377,6 +490,7 @@
process_scene_group,
module_options=module_options,
module_flags=[flag for flag in "cdnjkor" if flags[flag]],
swath_mask=check_swath_mask_input(options),
)

if nprocs_outer > 1:
Expand All @@ -388,14 +502,12 @@
]

# Remove empty results
register_strings = [
result.strip() for result in register_strings if result and result.strip()
]
register_strings = [result for result in register_strings if result]
if not register_strings:
gs.warning(
_("No valid data found in <{}>. Nothing to register in STRDS.").format(
options["input"]
)
options["input"],
),
)
sys.exit(0)

Expand All @@ -406,13 +518,14 @@
strds_long_name = f"{options['output']}@{gisenv['MAPSET']}"
tgis_strds = tgis.SpaceTimeRasterDataset(strds_long_name)

# Check if target STRDS exists and create it if not or abort if overwriting is not allowed
# Check if target STRDS exists and create it if not
# or abort if overwriting is not allowed
if tgis_strds.is_in_db() and not gs.overwrite():
gs.fatal(
_(
"Output STRDS <{}> exists."
"Use --overwrite together with -e to modify the existing STRDS."
).format(options["output"])
"Use --overwrite together with -e to modify the existing STRDS.",
).format(options["output"]),
)

# Create STRDS if needed
Expand Down
Loading