diff --git a/containers/sat/download_process_sat.py b/containers/sat/download_process_sat.py index 19da70f..28c963d 100644 --- a/containers/sat/download_process_sat.py +++ b/containers/sat/download_process_sat.py @@ -95,8 +95,8 @@ class Config: cadence="15min", product_id="EO:EUM:DAT:MSG:HRSEVIRI-IODC", zarr_fmtstr={ - "hrv": "%Y_hrv_iodc.zarr", - "nonhrv": "%Y_nonhrv_iodc.zarr", + "hrv": "%Y%m_hrv_iodc.zarr", + "nonhrv": "%Y%m_nonhrv_iodc.zarr", }, ), "severi": Config( @@ -104,8 +104,8 @@ class Config: cadence="5min", product_id="EO:EUM:DAT:MSG:MSG15-RSS", zarr_fmtstr={ - "hrv": "%Y_hrv.zarr", - "nonhrv": "%Y_nonhrv.zarr", + "hrv": "%Y%m_hrv.zarr", + "nonhrv": "%Y%m_nonhrv.zarr", }, ), # Optional @@ -114,8 +114,8 @@ class Config: cadence="15min", product_id="EO:EUM:DAT:MSG:HRSEVIRI", zarr_fmtstr={ - "hrv": "%Y_hrv_odegree.zarr", - "nonhrv": "%Y_nonhrv_odegree.zarr", + "hrv": "%Y%m_hrv_odegree.zarr", + "nonhrv": "%Y%m_nonhrv_odegree.zarr", }, ), } @@ -576,18 +576,11 @@ def _rewrite_zarr_times(output_name: str) -> None: type=pathlib.Path, ) parser.add_argument( - "--start_date", "-s", - help="Date to download from (YYYY-MM-DD)", - type=dt.date.fromisoformat, - required=False, - default=str(dt.datetime.now(tz=dt.UTC).date()), -) -parser.add_argument( - "--end_date", "-e", - help="Date to download to (YYYY-MM-DD)", - type=dt.date.fromisoformat, - required=False, - default=str(dt.datetime.now(tz=dt.UTC).date()), + "--month", "-m", + help="Month to download data for (YYYY-MM)", + type=str, + required=True, + default=str(dt.datetime.now(tz=dt.UTC).strftime("%Y-%m")), ) parser.add_argument( "--delete_raw", "--rm", @@ -597,6 +590,7 @@ def _rewrite_zarr_times(output_name: str) -> None: ) def run(args: argparse.Namespace) -> None: + """Run the download and processing pipeline.""" prog_start = dt.datetime.now(tz=dt.UTC) log.info(f"{prog_start!s}: Running with args: {args}") @@ -607,12 +601,16 @@ def run(args: argparse.Namespace) -> None: sat_config = CONFIGS[args.sat] # Get 
start and end times for run -    start: dt.date = args.start_date -    end: dt.date = args.end_date + dt.timedelta(days=1) if args.end_date == start else args.end_date +    start: dt.datetime = dt.datetime.strptime(args.month, "%Y-%m") +    end: dt.datetime = \ +        start.replace(month=start.month + 1) if start.month < 12 \ +        else start.replace(year=start.year + 1, month=1) +    # NOTE: no "- timedelta(days=1)" — date_range(inclusive="left") excludes `end`, so the full month (incl. Dec 31) is covered      scan_times: list[pd.Timestamp] = pd.date_range( start=start, end=end, freq=sat_config.cadence, +        inclusive="left", ).tolist() # Estimate average runtime diff --git a/local_archives/__init__.py b/local_archives/__init__.py index 4e1f105..45265eb 100644 --- a/local_archives/__init__.py +++ b/local_archives/__init__.py @@ -7,7 +7,7 @@ import resources from constants import LOCATIONS_BY_ENVIRONMENT -from . import nwp +from . import nwp, sat resources_by_env = { "leo": { @@ -36,14 +36,17 @@ all_assets: list[dg.AssetsDefinition] = [ *nwp.all_assets, + *sat.all_assets, ] all_jobs: list[dg.JobDefinition] = [ *nwp.all_jobs, + *sat.all_jobs, ] all_schedules: list[dg.ScheduleDefinition] = [ *nwp.all_schedules, + *sat.all_schedules, ] defs = dg.Definitions( diff --git a/local_archives/nwp/ceda/ceda_global.py b/local_archives/nwp/ceda/ceda_global.py index e5ae40a..a74ef59 100644 --- a/local_archives/nwp/ceda/ceda_global.py +++ b/local_archives/nwp/ceda/ceda_global.py @@ -1,9 +1,10 @@ -import dagster as dg +import datetime as dt import os from typing import Any -import datetime as dt +import dagster as dg from dagster_docker import PipesDockerClient + from constants import LOCATIONS_BY_ENVIRONMENT env = os.getenv("ENVIRONMENT", "local") diff --git a/local_archives/sat/__init__.py b/local_archives/sat/__init__.py index b2037ce..28aeb1d 100644 --- a/local_archives/sat/__init__.py +++ b/local_archives/sat/__init__.py @@ -1,11 +1,14 @@ -from dagster import Definitions, load_assets_from_modules +"""Definitions for the sat dagster code location.""" -from sat import assets, jobs +import dagster as dg
+from . import eumetsat -all_assets = load_assets_from_modules([assets]) +all_assets: list[dg.AssetsDefinition] = [ + *eumetsat.all_assets, +] + +all_jobs: list[dg.JobDefinition] = [] + +all_schedules: list[dg.ScheduleDefinition] = [] -defs = Definitions( - assets=all_assets, - schedules=jobs.schedules, -) diff --git a/local_archives/sat/assets/__init__.py b/local_archives/sat/assets/__init__.py deleted file mode 100644 index eb0e438..0000000 --- a/local_archives/sat/assets/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -from sat.assets.eumetsat.iodc import download_eumetsat_iodc_data -from sat.assets.eumetsat.rss import download_eumetsat_rss_data -from sat.assets.eumetsat.zero_deg import download_eumetsat_0_deg_data diff --git a/local_archives/sat/assets/eumetsat/__init__.py b/local_archives/sat/assets/eumetsat/__init__.py deleted file mode 100644 index f5b4ead..0000000 --- a/local_archives/sat/assets/eumetsat/__init__.py +++ /dev/null @@ -1 +0,0 @@ -from .common import download_product_range, EumetsatConfig \ No newline at end of file diff --git a/local_archives/sat/assets/eumetsat/common.py b/local_archives/sat/assets/eumetsat/common.py deleted file mode 100644 index e1cfcad..0000000 --- a/local_archives/sat/assets/eumetsat/common.py +++ /dev/null @@ -1,39 +0,0 @@ -"""EO:EUM:DAT:MSG:HRSEVIRI-IODC -""" -import os - -import pandas as pd -from dagster import Config -from satip.eumetsat import DownloadManager - - -class EumetsatConfig(Config): - api_key: str - api_secret: str - data_dir: str - start_date: str - end_date: str - -def download_product_range(api_key: str, api_secret: str, data_dir: str, product_id: str, start_date: pd.Timestamp, end_date: pd.Timestamp): - download_manager = DownloadManager(user_key=api_key, user_secret=api_secret, data_dir=data_dir) - start_str = start_date.strftime("%Y-%m-%d") - end_str = end_date.strftime("%Y-%m-%d") - date_range = pd.date_range(start=start_str, - end=end_str, - freq="30min") - filenames_downloaded = [] - for filename in 
os.listdir(data_dir): - filenames_downloaded.append(filename.split("/")[-1]) - for date in date_range: - start_date = pd.Timestamp(date) - pd.Timedelta("1min") - end_date = pd.Timestamp(date) + pd.Timedelta("1min") - datasets = download_manager.identify_available_datasets( - start_date=start_date.tz_localize(None).strftime("%Y-%m-%d-%H-%M-%S"), - end_date=end_date.tz_localize(None).strftime("%Y-%m-%d-%H-%M-%S"), - ) - filtered_datasets = [] - for dataset in datasets: - if dataset["id"] not in filenames_downloaded: - filtered_datasets.append(dataset) - datasets = filtered_datasets - download_manager.download_datasets(datasets, product_id=product_id) diff --git a/local_archives/sat/assets/eumetsat/iodc.py b/local_archives/sat/assets/eumetsat/iodc.py deleted file mode 100644 index 2591814..0000000 --- a/local_archives/sat/assets/eumetsat/iodc.py +++ /dev/null @@ -1,14 +0,0 @@ -import pandas as pd -from dagster import asset # import the `dagster` library - -from . import EumetsatConfig, download_product_range - - -@asset -def download_eumetsat_iodc_data(config: EumetsatConfig) -> None: - download_product_range(api_key=config.api_key, - api_secret=config.api_secret, - data_dir=config.data_dir, - product_id="EO:EUM:DAT:MSG:HRSEVIRI-IODC", - start_date=pd.Timestamp(config.start_date), - end_date=pd.Timestamp(config.end_date)) diff --git a/local_archives/sat/eumetsat/__init__.py b/local_archives/sat/eumetsat/__init__.py new file mode 100644 index 0000000..ca94245 --- /dev/null +++ b/local_archives/sat/eumetsat/__init__.py @@ -0,0 +1,12 @@ +import dagster as dg + +from . 
import eumetsat_iodc + + +iodc_assets = dg.load_assets_from_modules( + modules=[eumetsat_iodc], + group_name="eumetsat_iodc", +) + +all_assets: list[dg.AssetsDefinition] = [*iodc_assets] + diff --git a/local_archives/sat/eumetsat/eumetsat_iodc.py b/local_archives/sat/eumetsat/eumetsat_iodc.py new file mode 100644 index 0000000..c7310a0 --- /dev/null +++ b/local_archives/sat/eumetsat/eumetsat_iodc.py @@ -0,0 +1,65 @@ +import datetime as dt +import os +from typing import Any + +import dagster as dg +from dagster_docker import PipesDockerClient + +from constants import LOCATIONS_BY_ENVIRONMENT + +env = os.getenv("ENVIRONMENT", "local") +ZARR_FOLDER = LOCATIONS_BY_ENVIRONMENT[env].SAT_ZARR_FOLDER + +@dg.asset( + name="zarr_archive", + description="".join(( + "Zarr archive of satellite data from EUMETSAT's IODC satellite.", + "Sourced via EUMDAC from EUMETSAT: ", + "https://navigator.eumetsat.int/product/EO:EUM:DAT:MSG:OCA-IODC\n", + "This asset is updated monthly, and surfaced as a Zarr Directory Store ", + "for each month. 
It is downloaded using the sat container: ", + "https://github.com/openclimatefix/dagster-dags", + )), + key_prefix=["sat", "eumetsat", "iodc"], + metadata={ + "archive_folder": dg.MetadataValue.text(f"{ZARR_FOLDER}/sat/eumetsat/india"), + "area": dg.MetadataValue.text("india"), + "source": dg.MetadataValue.text("eumetsat"), + "expected_runtime": dg.MetadataValue.text("TBD"), + }, + compute_kind="docker", + automation_condition=dg.AutomationCondition.eager(), + tags={ + # "dagster/max_runtime": str(60 * 60 * 10), # Should take 6 ish hours + "dagster/priority": "1", + "dagster/concurrency_key": "eumetsat", + }, + partitions_def=dg.MonthlyPartitionsDefinition( + start_date="2019-01-01", + end_offset=-3, + ), +) +def iodc_monthly( + context: dg.AssetExecutionContext, + pipes_docker_client: PipesDockerClient, +) -> Any: + image: str = "ghcr.io/openclimatefix/sat-etl:main" + it: dt.datetime = context.partition_time_window.start + return pipes_docker_client.run( + image=image, + command=[ + "iodc", + "-m", + it.strftime("%Y-%m"), + "--rm", + ], + env={ + "EUMETSAT_CONSUMER_KEY": os.environ["EUMETSAT_CONSUMER_KEY"], + "EUMETSAT_CONSUMER_SECRET": os.environ["EUMETSAT_CONSUMER_SECRET"], + }, + container_kwargs={ + "volumes": [f"{ZARR_FOLDER}/sat/eumetsat/india:/mnt/disks/sat"], + }, + context=context, + ).get_results() + diff --git a/local_archives/sat/jobs.py b/local_archives/sat/jobs.py deleted file mode 100644 index 9a89fac..0000000 --- a/local_archives/sat/jobs.py +++ /dev/null @@ -1,103 +0,0 @@ -import datetime as dt -import json -from typing import Any - -import dagster -from sat.assets import ( - download_eumetsat_0_deg_data, - download_eumetsat_iodc_data, - download_eumetsat_rss_data, -) -from sat.assets.eumetsat.common import EumetsatConfig - -jobs: list[dagster.JobDefinition] = [] -schedules: list[dagster.ScheduleDefinition] = [] - -base_path = "/mnt/storage_c/IODC/" - -# --- IODC jobs and schedules ---------------------------------------------- - 
-@dagster.daily_partitioned_config(start_date=dt.datetime(2017, 1, 1)) -def IODCDailyPartitionConfig(start: dt.datetime, _end: dt.datetime) -> dict[str, Any]: - # Do one day at a time - config = EumetsatConfig( - date=start.strftime("%Y-%m-%d"), - end_date=_end.strftime("%Y-%m-%d"), - data_dir=base_path, - api_key="", - api_secret="", - - ) - config = json.loads(config.json()) - config["api_key"] = {"env": "EUMETSAT_API_KEY"} - config["api_secret"] = {"env": "EUMETSAT_API_SECRET"} - return {"ops": {"download_eumetsat_iodc_data": {"config": config}}} - - -@dagster.job( - config=IODCDailyPartitionConfig, - tags={"source": "eumetsat", dagster.MAX_RUNTIME_SECONDS_TAG: 345600}, # 4 days -) -def iodc_daily_archive() -> None: - """Download IODC data for a given day.""" - download_eumetsat_iodc_data() - -jobs.append(iodc_daily_archive) -schedules.append(dagster.build_schedule_from_partitioned_job(iodc_daily_archive, hour_of_day=23)) - - -@dagster.daily_partitioned_config(start_date=dt.datetime(2008, 1, 1)) -def RSSDailyPartitionConfig(start: dt.datetime, _end: dt.datetime) -> dict[str, Any]: - # Do one day at a time - config = EumetsatConfig( - date=start.strftime("%Y-%m-%d"), - end_date=_end.strftime("%Y-%m-%d"), - data_dir=base_path, - api_key="", - api_secret="", - - ) - config = json.loads(config.json()) - config["api_key"] = {"env": "EUMETSAT_API_KEY"} - config["api_secret"] = {"env": "EUMETSAT_API_SECRET"} - return {"ops": {"download_eumetsat_rss_data": {"config": config}}} - - -@dagster.job( - config=RSSDailyPartitionConfig, - tags={"source": "eumetsat", dagster.MAX_RUNTIME_SECONDS_TAG: 345600}, # 4 days -) -def rss_daily_archive() -> None: - """Download RSS data for a given day.""" - download_eumetsat_rss_data() - -jobs.append(rss_daily_archive) -schedules.append(dagster.build_schedule_from_partitioned_job(rss_daily_archive, hour_of_day=23)) - -@dagster.daily_partitioned_config(start_date=dt.datetime(2008, 1, 1)) -def ZeroDegDailyPartitionConfig(start: dt.datetime, 
_end: dt.datetime) -> dict[str, Any]: - # Do one day at a time - config = EumetsatConfig( - date=start.strftime("%Y-%m-%d"), - end_date=_end.strftime("%Y-%m-%d"), - data_dir=base_path, - api_key="", - api_secret="", - - ) - config = json.loads(config.json()) - config["api_key"] = {"env": "EUMETSAT_API_KEY"} - config["api_secret"] = {"env": "EUMETSAT_API_SECRET"} - return {"ops": {"download_eumetsat_0_deg_data": {"config": config}}} - - -@dagster.job( - config=ZeroDegDailyPartitionConfig, - tags={"source": "eumetsat", dagster.MAX_RUNTIME_SECONDS_TAG: 345600}, # 4 days -) -def zero_deg_daily_archive() -> None: - """Download RSS data for a given day.""" - download_eumetsat_0_deg_data() - -jobs.append(zero_deg_daily_archive) -schedules.append(dagster.build_schedule_from_partitioned_job(zero_deg_daily_archive, hour_of_day=23))