From a375d7a91e9fc9de3da0e676685f88dea0094d55 Mon Sep 17 00:00:00 2001
From: stijnvanhoey
Date: Mon, 18 Sep 2023 21:28:38 +0200
Subject: [PATCH 1/5] Add grouping function for s3-path based search

---
 src/vptstools/s3.py | 27 ++++++++++++++++++++++++++-
 1 file changed, 26 insertions(+), 1 deletion(-)

diff --git a/src/vptstools/s3.py b/src/vptstools/s3.py
index 11605d9..c7f3ffa 100644
--- a/src/vptstools/s3.py
+++ b/src/vptstools/s3.py
@@ -68,7 +68,7 @@ def from_s3fs_enlisting(cls, h5_file_path):
         return cls(
             h5_file_path.split("/")[1],
             *cls.parse_file_name(str(h5_file_path)),
-            h5_file_path.split("/")[1],
+            h5_file_path.split("/")[2],
         )
 
     @staticmethod
@@ -199,6 +199,31 @@ def extract_daily_group_from_inventory(file_path):
         path_info.day,
     )
 
+
+def extract_daily_group_from_path(file_path):
+    """Extract file name components to define a group
+
+    The coverage file counts the number of files available
+    per group (e.g. daily files per radar). This function is passed
+    to the Pandas ``groupby`` to translate the file path to a
+    countable set (e.g. source, radar-code, year, month and day for
+    daily files per radar).
+
+    Parameters
+    ----------
+    file_path : str
+        File path of the ODIM HDF5 file. Both the folder path
+        and the file name are taken into account.
+    """
+    path_info = OdimFilePath.from_s3fs_enlisting(file_path)
+    return (
+        path_info.source,
+        path_info.file_type,
+        path_info.radar_code,
+        path_info.year,
+        path_info.month,
+        path_info.day,
+    )
+
 
 def _last_modified_from_inventory(df, modified_days_ago="2day"):
     """Filter manifest files on last modified
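For reference, a minimal sketch (not part of the patch) of how `extract_daily_group_from_path` is meant to plug into a pandas ``groupby``. The bucket name and file paths below are hypothetical: real keys must follow the ``<bucket>/<source>/hdf5/<radar>/<year>/<month>/<day>/<file>`` layout and a file name pattern that ``OdimFilePath.parse_file_name`` accepts.

```python
import pandas as pd

from vptstools.s3 import extract_daily_group_from_path

# Hypothetical s3fs-style listing (bucket name and file names invented):
odim5_files = [
    "aloft-bucket/baltrad/hdf5/bejab/2022/10/03/bejab_vp_20221003T000000Z_0x9.h5",
    "aloft-bucket/baltrad/hdf5/bejab/2022/10/03/bejab_vp_20221003T001500Z_0x9.h5",
]

# groupby(callable) applies the function to every index label, so each file
# path collapses to a (source, file_type, radar_code, year, month, day)
# tuple and size() returns the number of files per radar-day.
file_count_per_day = (
    pd.DataFrame(odim5_files, columns=["file"])
    .set_index("file")
    .groupby(extract_daily_group_from_path)
    .size()
)
print(file_count_per_day)
```

Passing a callable to ``groupby`` only works on index labels, which is why the file path is set as index first; the CLI change in the next patch relies on exactly this pattern.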
From 7da453ccd204f1f7d6b71a97c50a541519e9a98d Mon Sep 17 00:00:00 2001
From: stijnvanhoey
Date: Mon, 18 Sep 2023 21:29:12 +0200
Subject: [PATCH 2/5] Add new cli option to use an s3 path instead of the
 modified date

---
 src/vptstools/bin/vph5_to_vpts.py | 93 +++++++++++++++++++++----------
 1 file changed, 63 insertions(+), 30 deletions(-)

diff --git a/src/vptstools/bin/vph5_to_vpts.py b/src/vptstools/bin/vph5_to_vpts.py
index 08b4011..1ea8e38 100644
--- a/src/vptstools/bin/vph5_to_vpts.py
+++ b/src/vptstools/bin/vph5_to_vpts.py
@@ -12,7 +12,7 @@
 import pandas as pd
 
 from vptstools.vpts import vpts, vpts_to_csv
-from vptstools.s3 import handle_manifest, OdimFilePath
+from vptstools.s3 import handle_manifest, OdimFilePath, extract_daily_group_from_path
 from vptstools.bin.click_exception import catch_all_exceptions, report_click_exception_to_sns
 
 # Load environmental variables from file in dev
@@ -48,7 +48,14 @@
     help="Range of HDF5 VP files to include, i.e. files modified between now and N"
     "modified-days-ago. If 0, all HDF5 files in the bucket will be included.",
 )
-def cli(modified_days_ago):
+@click.option(
+    "--path-s3-folder",
+    "path_s3_folder",
+    type=str,
+    help="Apply the conversion to VPTS to all files within an S3 sub-folder instead "
+    "of using the modified date of the files. This option does not use the inventory files."
+)
+def cli(modified_days_ago, path_s3_folder=None):
     """Convert and aggregate HDF5 VP files to daily and monthly VPTS CSV files on S3 bucket
 
     Check the latest modified
@@ -59,12 +66,17 @@ def cli(modified_days_ago, path_s3_folder=None):
     HDF5 files were recently added and convert those files from ODIM bird profile to the
     `VPTS CSV format `_. Finally, upload the generated daily/monthly VPTS files to
     S3.
+    When using the `path_s3_folder` option, the modified date is not used, but e recursive search within the given s3
+    path is applied to define the daily/monthly files to recreate.
+    E.g. `vph5_to_vpts --path-s3-folder uva/hdf5/nldhl/2019` or
+    `vph5_to_vpts --path-s3-folder baltrad/hdf5/bejab/2022/10`.
+
     Besides, while scanning the S3 inventory to define the files to convert, the
     CLI routine creates the ``coverage.csv`` file and uploads it to the bucket.
 
     Configuration is loaded from the following environmental variables:
 
-    - ``S3_BUCKET``: AWS S3 bucket to read and write data to
+    - ``DESTINATION_BUCKET``: AWS S3 bucket to read and write data to
     - ``INVENTORY_BUCKET``: AWS S3 bucket configured as `s3 inventory bucket `_ for the S3_BUCKET.
     - ``SNS_TOPIC``: AWS SNS topic to report when routine fails
     - ``AWS_REGION``: AWS region where the SNS alerting is defined
     - ``AWS_PROFILE``: AWS profile (mainly useful for local development when working with multiple AWS profiles)
     """
     if AWS_PROFILE:
         storage_options = {"profile": AWS_PROFILE}
@@ -77,36 +89,57 @@ def cli(modified_days_ago, path_s3_folder=None):
         storage_options = dict()
         boto3_options = dict()
 
-    # Load the S3 manifest of today
-    click.echo(f"Load the S3 manifest of {date.today()}.")
-
-    manifest_parent_key = (
-        pd.Timestamp.now(tz="utc").date() - pd.Timedelta("1day")
-    ).strftime(f"%Y-%m-%dT{MANIFEST_HOUR_OF_DAY}Z")
-    # define manifest of today
-    s3_url = f"{MANIFEST_URL}/{manifest_parent_key}/manifest.json"
-
-    click.echo(f"Extract coverage and days to recreate from manifest {s3_url}.")
-    if modified_days_ago == 0:
-        modified_days_ago = (pd.Timestamp.now(tz="utc") - S3_BUCKET_CREATION).days + 1
-        click.echo(
-            f"Recreate the full set of bucket files (files "
-            f"modified since {modified_days_ago}days). "
-            f"This will take a while!"
-        )
+    if path_s3_folder:
+        click.echo(f"Applying the vpts conversion to all files within {path_s3_folder}. "
+                   f"Ignoring the modified date of the files.")
+
+        inbo_s3 = s3fs.S3FileSystem(**storage_options)
+        odim5_files = inbo_s3.glob(f"{S3_BUCKET}/{path_s3_folder}/**/*.h5")
+
+        days_to_create_vpts = (
+            pd.DataFrame(odim5_files, columns=["file"])
+            .set_index("file")
+            .groupby(extract_daily_group_from_path).size().reset_index()
+            .rename(
+                columns={
+                    "index": "directory",
+                    0: "file_count",
+                }
+            )
+        )
+    else:
+        # Load the S3 manifest of today
+        click.echo(f"Load the S3 manifest of {date.today()} to rerun only files modified "
+                   f"since {modified_days_ago} days ago.")
+
+        manifest_parent_key = (
+            pd.Timestamp.now(tz="utc").date() - pd.Timedelta("1day")
+        ).strftime(f"%Y-%m-%dT{MANIFEST_HOUR_OF_DAY}Z")
+        # define manifest of today
+        s3_url = f"{MANIFEST_URL}/{manifest_parent_key}/manifest.json"
+
+        click.echo(f"Extract coverage and days to recreate from manifest {s3_url}.")
+        if modified_days_ago == 0:
+            modified_days_ago = (pd.Timestamp.now(tz="utc") - S3_BUCKET_CREATION).days + 1
+            click.echo(
+                f"Recreate the full set of bucket files (files "
+                f"modified since {modified_days_ago}days). "
+                f"This will take a while!"
+            )
 
-    df_cov, days_to_create_vpts = handle_manifest(
-        s3_url,
-        modified_days_ago=f"{modified_days_ago}day",
-        storage_options=storage_options,
-    )
+        df_cov, days_to_create_vpts = handle_manifest(
+            s3_url,
+            modified_days_ago=f"{modified_days_ago}day",
+            storage_options=storage_options,
+        )
 
-    # Save coverage file to S3 bucket
-    click.echo("Save coverage file to S3.")
-    df_cov["directory"] = df_cov["directory"].str.join("/")
-    df_cov.to_csv(
-        f"s3://{S3_BUCKET}/coverage.csv", index=False, storage_options=storage_options
-    )
+        # Save coverage file to S3 bucket
+        click.echo("Save coverage file to S3.")
+        df_cov["directory"] = df_cov["directory"].str.join("/")
+        df_cov.to_csv(
+            f"s3://{S3_BUCKET}/coverage.csv", index=False, storage_options=storage_options
+        )
 
     # Run VPTS daily conversion for each radar-day with modified files
     inbo_s3 = s3fs.S3FileSystem(**storage_options)
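The manifest-key arithmetic kept in the ``else`` branch is easy to miss in the diff; here it is isolated as a runnable sketch. ``MANIFEST_HOUR_OF_DAY`` and ``MANIFEST_URL`` are placeholder values for illustration — the real ones are module-level constants in the CLI.

```python
import pandas as pd

MANIFEST_HOUR_OF_DAY = "01-00"  # placeholder; real value is a CLI module constant
MANIFEST_URL = "s3://inventory-bucket/aloft"  # placeholder

# S3 inventory publishes one manifest per day; yesterday's manifest is the
# most recent complete one, hence the one-day offset before formatting.
manifest_parent_key = (
    pd.Timestamp.now(tz="utc").date() - pd.Timedelta("1day")
).strftime(f"%Y-%m-%dT{MANIFEST_HOUR_OF_DAY}Z")
s3_url = f"{MANIFEST_URL}/{manifest_parent_key}/manifest.json"
print(s3_url)  # e.g. s3://inventory-bucket/aloft/2023-09-17T01-00Z/manifest.json
```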
From 2e475680535ca2918f3f7496e63730d4582187b1 Mon Sep 17 00:00:00 2001
From: Peter Desmet
Date: Fri, 8 Dec 2023 22:42:25 +0100
Subject: [PATCH 3/5] Correct typo

---
 src/vptstools/bin/vph5_to_vpts.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/vptstools/bin/vph5_to_vpts.py b/src/vptstools/bin/vph5_to_vpts.py
index 1ea8e38..b92a132 100644
--- a/src/vptstools/bin/vph5_to_vpts.py
+++ b/src/vptstools/bin/vph5_to_vpts.py
@@ -66,7 +66,7 @@ def cli(modified_days_ago, path_s3_folder=None):
     HDF5 files were recently added and convert those files from ODIM bird profile to the
     `VPTS CSV format `_. Finally, upload the generated daily/monthly VPTS files to
     S3.
-    When using the `path_s3_folder` option, the modified date is not used, but e recursive search within the given s3
+    When using the `path_s3_folder` option, the modified date is not used, but a recursive search within the given s3
     path is applied to define the daily/monthly files to recreate.
     E.g. `vph5_to_vpts --path-s3-folder uva/hdf5/nldhl/2019` or
     `vph5_to_vpts --path-s3-folder baltrad/hdf5/bejab/2022/10`.

From 23f5a1600358d9de9c7654e286c8b9b67efb06de Mon Sep 17 00:00:00 2001
From: stijnvanhoey
Date: Mon, 11 Dec 2023 16:44:35 +0100
Subject: [PATCH 4/5] Pin s3fs dependency

---
 setup.cfg | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/setup.cfg b/setup.cfg
index 19011c1..b282cbb 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -59,7 +59,7 @@ exclude =
 [options.extras_require]
 # Requirements to work with the transfer functionalities (FTP/S3)
 transfer =
-    s3fs[boto3]
+    s3fs[boto3]==2023.5.0
     paramiko
     fsspec
     pyarrow
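The commit message does not state the motivation, but pinning s3fs is commonly done because each s3fs release is tightly coupled to a particular aiobotocore/botocore range. A small sanity check one could add to a test suite, with the version string taken from the pin above:

```python
from importlib.metadata import version

# The transfer extra pins s3fs to 2023.5.0; fail fast if the environment
# resolved a different release.
installed = version("s3fs")
assert installed == "2023.5.0", f"expected s3fs 2023.5.0, found {installed}"
```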
From 99a3e97860078df39806353388ae9bbdb602a057 Mon Sep 17 00:00:00 2001
From: stijnvanhoey
Date: Mon, 11 Dec 2023 16:51:37 +0100
Subject: [PATCH 5/5] Improve cli documentation rendering in terminal

---
 src/vptstools/bin/vph5_to_vpts.py | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/src/vptstools/bin/vph5_to_vpts.py b/src/vptstools/bin/vph5_to_vpts.py
index b92a132..9da77a8 100644
--- a/src/vptstools/bin/vph5_to_vpts.py
+++ b/src/vptstools/bin/vph5_to_vpts.py
@@ -76,11 +76,15 @@ def cli(modified_days_ago, path_s3_folder=None):
 
     Configuration is loaded from the following environmental variables:
 
+    \b
     - ``DESTINATION_BUCKET``: AWS S3 bucket to read and write data to
-    - ``INVENTORY_BUCKET``: AWS S3 bucket configured as `s3 inventory bucket `_ for the S3_BUCKET.
+    - ``INVENTORY_BUCKET``: AWS S3 bucket configured as `s3 inventory bucket
+      `_
+      for the S3_BUCKET.
     - ``SNS_TOPIC``: AWS SNS topic to report when routine fails
     - ``AWS_REGION``: AWS region where the SNS alerting is defined
-    - ``AWS_PROFILE``: AWS profile (mainly useful for local development when working with multiple AWS profiles)
+    - ``AWS_PROFILE``: AWS profile (mainly useful for local development when
+      working with multiple AWS profiles)
     """
     if AWS_PROFILE:
         storage_options = {"profile": AWS_PROFILE}
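The ``\b`` escape added above is standard click behavior: a docstring paragraph consisting of a single ``\b`` marks the following paragraph as pre-formatted, so click does not rewrap the bullet list in the ``--help`` output. A self-contained illustration (command name and bullet text are invented):

```python
import click


@click.command()
def demo():
    """Demonstrate click's no-rewrap escape.

    \b
    - ``FIRST_OPTION``: kept on its own line in --help output
    - ``SECOND_OPTION``: kept on its own line as well
    """


if __name__ == "__main__":
    demo()  # run with --help; without the \b line, click merges the bullets
```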