diff --git a/ESDC/README.md b/ESDC/README.md
index 0084c74..6b32764 100644
--- a/ESDC/README.md
+++ b/ESDC/README.md
@@ -153,7 +153,7 @@ The cube generation process is divided in four phases:
 
 ### 1. Downloading required raw datasets
 
-These datasets are the input data. Each dataset can be a set of `.nc`, `.hdf`, or `.tif` files. These files contains data with its original configuration. The downloading code for each dataset is found at the `inputs-collect` folder. Note that some datasets can be acquired via `xcube-cci` and don't require to be downloaded. Additional datasets were acquired va ftp (e.g. GLEAM) or sftp (e.g. GFED4) and don't have a download program. Other datasets were provided by their original providers (e.g. FLUXCOM) and don't have a download program neither.
+These datasets are the input data. Each dataset can be a set of `.nc`, `.hdf`, or `.tif` files. These files contain the data in its original configuration. The download code for each dataset is found in the `inputs-collect` folder. Note that some datasets can be acquired via `xcube-cci` and do not need to be downloaded separately. Additional datasets are acquired via SFTP (e.g. GLEAM) or authenticated HTTPS (e.g. GFED4); download scripts for these are listed below, but they require user accounts, so please follow the registration instructions on the providers' websites. On the [GLEAM](https://www.gleam.eu/) homepage you can register under Downloads. For ERA5, refer to the [Copernicus Climate Data Store (CDS) API How-To](https://cds.climate.copernicus.eu/api-how-to) for more information. For GFED4, register via the EOSDIS [Earthdata Login](https://urs.earthdata.nasa.gov/oauth/authorize?client_id=YQOhivHfMTau88rjbMOVyg&response_type=code&redirect_uri=https://daac.ornl.gov/cgi-bin/urs/urs_logon_proc.pl&state=https%3A%2F%2Fdaac.ornl.gov%2F). Other datasets were provided directly by their original providers (e.g. FLUXCOM) and have no download program.
 
 ```
 # MODIS: Download daily .hdf files
@@ -166,8 +166,20 @@ inputs-collect/download-GOME2-SIF.py
 inputs-collect/download-GOSIF.py
 inputs-collect/extract-gz-gosif.py
 
+# RTSIF: Download 8-day .tif files
+inputs-collect/download-RTSIF.py
+
 # CCI-SM: Download daily .nc files
 inputs-collect/download-cci-sm.py
+
+# ERA5: Download hourly .nc files
+inputs-collect/download-ERA5.py
+
+# GLEAM: Download daily .nc files
+inputs-collect/download-GLEAM.py
+
+# GFED4: Download monthly .hdf files
+inputs-collect/download-GFED4.py
 ```
 
 ### 2. Preprocessing datasets
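The CDS registration mentioned above is needed because `download-ERA5.py` (next file in this patch) uses the `cdsapi` client, which reads its credentials from `~/.cdsapirc`. A minimal sketch of that file, with placeholder values to be replaced by the UID and API key shown on your CDS profile page per the how-to linked above:

```
url: https://cds.climate.copernicus.eu/api/v2
key: <UID>:<API-KEY>
```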
diff --git a/ESDC/inputs-collect/download-ERA5.py b/ESDC/inputs-collect/download-ERA5.py
new file mode 100644
index 0000000..43ec9dd
--- /dev/null
+++ b/ESDC/inputs-collect/download-ERA5.py
@@ -0,0 +1,89 @@
+from tqdm import tqdm
+import os
+import cdsapi
+from multiprocessing import Pool, Manager
+
+def download_data(args):
+    pathOut, variable, year, month, progress_queue = args
+    directory = os.path.join(pathOut, variable, year)
+
+    if not os.path.exists(directory):
+        try:
+            os.makedirs(directory)
+        except FileExistsError:
+            pass
+
+    filename = f"{variable}.hh.*.era5.{month}.{year}.nc"
+    filepath = os.path.join(directory, filename)
+
+    c = cdsapi.Client()
+    c.retrieve(
+        'reanalysis-era5-single-levels',
+        {
+            'product_type': 'reanalysis',
+            'format': 'netcdf',
+            'year': year,
+            'month': month,
+            'day': [
+                '01', '02', '03',
+                '04', '05', '06',
+                '07', '08', '09',
+                '10', '11', '12',
+                '13', '14', '15',
+                '16', '17', '18',
+                '19', '20', '21',
+                '22', '23', '24',
+                '25', '26', '27',
+                '28', '29', '30',
+                '31',
+            ],
+            'time': [
+                '00:00', '01:00', '02:00',
+                '03:00', '04:00', '05:00',
+                '06:00', '07:00', '08:00',
+                '09:00', '10:00', '11:00',
+                '12:00', '13:00', '14:00',
+                '15:00', '16:00', '17:00',
+                '18:00', '19:00', '20:00',
+                '21:00', '22:00', '23:00',
+            ],
+            'variable': variable,
+        },
+        filepath
+    )
+
+    progress_queue.put(1)
+    return f"Downloaded {filepath}"
+
+def main():
+    pathOut = "~/data/ERA5/source"
+    pathOut = os.path.expanduser(pathOut)
+
+    if not os.path.exists(pathOut):
+        os.makedirs(pathOut)
+
+    years = [str(year) for year in range(1971, 2023)]
+    months = ['1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '12']
+    variables = ['2m_temperature', 'evaporation', 'maximum_2m_temperature_since_previous_post_processing', 'minimum_2m_temperature_since_previous_post_processing', 'total_precipitation', 'surface_solar_radiation_downwards']
+
+    args_list = []
+    for variable in variables:
+        for year in years:
+            for month in months:
+                args_list.append((pathOut, variable, year, month))
+
+    with Pool() as pool, Manager() as manager:
+        progress_queue = manager.Queue()
+        total_tasks = len(args_list)
+        results = []
+
+        with tqdm(total=total_tasks) as pbar:
+            for result in pool.imap_unordered(download_data, [(args + (progress_queue,)) for args in args_list]):
+                results.append(result)
+                pbar.update(progress_queue.get())
+
+    for result in results:
+        print(result)
+
+if __name__ == '__main__':
+    main()
diff --git a/ESDC/inputs-collect/download-GFED4.py b/ESDC/inputs-collect/download-GFED4.py
new file mode 100644
index 0000000..e308807
--- /dev/null
+++ b/ESDC/inputs-collect/download-GFED4.py
@@ -0,0 +1,76 @@
+import requests
+from tqdm import tqdm
+import os
+
+path_out = "~/data/GFED4/source"
+path_out = os.path.expanduser(path_out)
+
+if not os.path.exists(path_out):
+    os.makedirs(path_out)
+
+base_url = 'https://daac.ornl.gov/daacdata/global_vegetation/fire_emissions_v4_R1/data/Monthly/'
+file_prefix = 'GFED4.0_MQ_'
+
+years = range(1995, 2017)
+months = ['{:02d}'.format(m) for m in range(1, 13)]
+
+session = requests.Session()
+auth_url = 'https://urs.earthdata.nasa.gov/oauth/authorize?app_type=401&client_id=QyeRbBJg8YuY_WBh-KBztA&response_type=code&redirect_uri=https%3A%2F%2Fdaac.ornl.gov%2Fdaacdata%2Fdoesntmater&state=aHR0cHM6Ly9kYWFjLm9ybmwuZ292L2RhYWNkYXRhL2dsb2JhbF92ZWdldGF0aW9uL2ZpcmVfZW1pc3Npb25zX3Y0X1IxL2RhdGEvTW9udGhseS8'
+
+username = input("Enter your username: ")
+password = input("Enter your password: ")
+
+auth_response = session.get(auth_url, auth=(username, password))
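+# Note (assumption): Earthdata/URS normally completes authentication via a redirect
+# to the login service; keeping the credentials on the session as well lets the
+# per-file requests below resend them if the DAAC asks for authentication again.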
+session.auth = (username, password)
+
+if auth_response.status_code != 200:
+    print('Authentication failed:', auth_response.status_code)
+    exit()
+
+
+def handle_data_url_response(response, month, year):
+    if response.status_code != 200:
+        if response.status_code == 404:
+            print(f'Data not available for {month}/{year}')
+        else:
+            print('Failed to access the data URL:', response.status_code)
+
+
+def handle_file_url_response(response, file_url):
+    if response.status_code != 200:
+        if response.status_code == 404:
+            print("---")
+        else:
+            print(f'Failed to download: {file_url}')
+
+
+for year in years:
+    for month in months:
+        file_name = file_prefix + str(year) + month + '_BA.hdf'
+        data_url = base_url + file_name
+
+        response = session.get(data_url, auth=(username, password), allow_redirects=False)
+        print(response.status_code)
+
+        handle_data_url_response(response, month, year)
+
+        file_url = base_url + file_name
+        response = session.get(file_url, auth=(username, password), allow_redirects=False, stream=True)
+
+        handle_file_url_response(response, file_url)
+
+        if response.status_code != 200:
+            continue
+
+        file_path = os.path.join(path_out, file_name)
+        total_size = int(response.headers.get('content-length', 0))
+        progress_bar = tqdm(total=total_size, unit='B', unit_scale=True, desc=file_name)
+
+        with open(file_path, 'wb') as file:
+            for data in response.iter_content(chunk_size=1024):
+                file.write(data)
+                progress_bar.update(len(data))
+
+        progress_bar.close()
+        print(f'Downloaded: {file_name}')
+        print("---")
diff --git a/ESDC/inputs-collect/download-GLEAM.py b/ESDC/inputs-collect/download-GLEAM.py
new file mode 100644
index 0000000..52a5e47
--- /dev/null
+++ b/ESDC/inputs-collect/download-GLEAM.py
@@ -0,0 +1,69 @@
+import os
+import paramiko
+from tqdm import tqdm
+from multiprocessing import Pool
+
+def download_file(args):
+    host, port, username, password, remote_file, local_file = args
+    transport = paramiko.Transport((host, port))
+    transport.connect(username=username, password=password)
+    sftp = paramiko.SFTPClient.from_transport(transport)
+
+    sftp.get(remote_file, local_file)
+
+    sftp.close()
+    transport.close()
+
+def download_files(host, port, username, password, remote_dir, local_dir):
+    transport = paramiko.Transport((host, port))
+    transport.connect(username=username, password=password)
+    sftp = paramiko.SFTPClient.from_transport(transport)
+
+    remote_years = sftp.listdir(remote_dir)
+
+    tasks = []
+
+    for year in range(1980, 2023):
+        str_year = str(year)
+        if str_year in remote_years:
+            remote_year_dir = os.path.join(remote_dir, str_year)
+            local_year_dir = os.path.join(local_dir, str_year)
+
+            if not os.path.exists(local_year_dir):
+                os.makedirs(local_year_dir)
+
+            remote_files = sftp.listdir(remote_year_dir)
+
+            for file in remote_files:
+                remote_file = os.path.join(remote_year_dir, file)
+                local_file = os.path.join(local_year_dir, file)
+
+                tasks.append((host, port, username, password, remote_file, local_file))
+
+    sftp.close()
+    transport.close()
+
+    # Server restriction: at most 8 simultaneous downloads (?)
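+    # Each download_file() call opens its own SFTP session, so the pool size below
+    # also caps the number of simultaneous connections to the GLEAM server.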
+    with Pool(8) as pool:
+        with tqdm(total=len(tasks), desc="Downloading files") as pbar:
+            for _ in pool.imap_unordered(download_file, tasks):
+                pbar.update(1)
+
+def main():
+    print("Please enter the credentials you got per mail from GLEAM")
+    host = input("Enter the host (without 'sftp://'): ")
+    port = int(input("Enter the port: "))
+    username = input("Enter your username: ")
+    password = input("Enter your password: ")
+    remote_dir = "./data/v3.7a/daily"
+    local_dir = "~/data/GLEAM/source"
+
+    local_dir = os.path.expanduser(local_dir)
+
+    if not os.path.exists(local_dir):
+        os.makedirs(local_dir)
+
+    download_files(host, port, username, password, remote_dir, local_dir)
+
+if __name__ == '__main__':
+    main()
diff --git a/ESDC/inputs-collect/download-RTSIF.py b/ESDC/inputs-collect/download-RTSIF.py
new file mode 100644
index 0000000..39dce92
--- /dev/null
+++ b/ESDC/inputs-collect/download-RTSIF.py
@@ -0,0 +1,50 @@
+import requests
+import os
+import patoolib
+from tqdm import tqdm
+
+### There is no native package for extracting .rar files
+### 7-Zip or WinRAR are needed for extracting .rar files
+
+url = "https://figshare.com/ndownloader/articles/19336346/versions/3"
+filename = "cache.zip"
+extract_path = os.path.expanduser("~/data/SIF/RTSIF/source")
+
+zip_filename = os.path.join(extract_path, filename)
+
+if not os.path.exists(extract_path):
+    os.makedirs(extract_path)
+    print("Extract path created.")
+
+response = requests.get(url, stream=True)
+total_size = int(response.headers.get('content-length', 0))
+
+with open(zip_filename, 'wb') as file, tqdm(
+    desc=filename,
+    total=total_size,
+    unit='iB',
+    unit_scale=True,
+    unit_divisor=1024,
+) as progress_bar:
+    for data in response.iter_content(chunk_size=1024):
+        size = file.write(data)
+        progress_bar.update(size)
+
+print("File downloaded successfully.")
+
+patoolib.extract_archive(zip_filename, outdir=extract_path)
+print("Extraction completed.")
+
+for filename in os.listdir(extract_path):
+    if filename.endswith(".rar"):
+        rar_file = os.path.join(extract_path, filename)
+        patoolib.extract_archive(rar_file, outdir=extract_path)
+        print(f"Extracted {filename} to {extract_path}")
+
+for file in os.listdir(extract_path):
+    file_path = os.path.join(extract_path, file)
+    if file.endswith((".rar", ".zip")):
+        os.remove(file_path)
+        print(f"Deleted {file}")
+
+print("All .rar and .zip files deleted.")
diff --git a/ESDC/inputs-collect/download-cci-sm.py b/ESDC/inputs-collect/download-cci-sm.py
index 23f9354..2018203 100644
--- a/ESDC/inputs-collect/download-cci-sm.py
+++ b/ESDC/inputs-collect/download-cci-sm.py
@@ -45,9 +45,9 @@ def download_file(url):
     else:
         print(f"File {filename} already exists!")
 
-years = np.arange(1979,2021)
+years = np.arange(1979,2022)
 
 for year in tqdm(years):
-    urls = get_url_paths(f"https://dap.ceda.ac.uk/neodc/esacci/soil_moisture/data/daily_files/COMBINED/v06.1/{year}/","nc")
+    urls = get_url_paths(f"https://dap.ceda.ac.uk/neodc/esacci/soil_moisture/data/daily_files/COMBINED/v07.1/{year}/","nc")
     for url in urls:
         download_file(url)
diff --git a/ESDC/inputs-preprocess/GLEAM/gleam-data-cube.py b/ESDC/inputs-preprocess/GLEAM/gleam-data-cube.py
index 4a4fa2e..7394648 100644
--- a/ESDC/inputs-preprocess/GLEAM/gleam-data-cube.py
+++ b/ESDC/inputs-preprocess/GLEAM/gleam-data-cube.py
@@ -10,7 +10,7 @@
 # if not os.path.exists(pathOut):
 #    os.mkdir(pathOut)
 
-pathIn = "path-to-GLEAM-folder"
+pathIn = "~/data/GLEAM/source"
 pathOut = "~/data/GLEAM/preprocess"
 pathOut = os.path.expanduser(pathOut)
 
@@ -24,7 +24,7 @@ for year in tqdm(years):
 
-    files = glob.glob(f"{pathIn}/data/v3.6a/daily/{year}/*.nc")
+    files = glob.glob(f"{pathIn}/v3.7a/daily/{year}/*.nc")
     files.sort()
 
     datasets = [xr.open_dataset(file,chunks = {'time':512,'lat':128,'lon':128}) for file in files]
diff --git a/black-sea/inputs-preprocess/black-sea-zarr2s3.py b/black-sea/inputs-preprocess/black-sea-zarr2s3.py
new file mode 100644
index 0000000..c86d731
--- /dev/null
+++ b/black-sea/inputs-preprocess/black-sea-zarr2s3.py
@@ -0,0 +1,93 @@
+from tqdm import tqdm
+from datetime import datetime
+import yaml
+import xarray as xr
+import numpy as np
+
+from xcube.core.store import find_data_store_extensions
+from xcube.core.store import get_data_store_params_schema
+from xcube.core.store import new_data_store
+
+with tqdm(total=80) as pbar:
+
+    FINAL_CHUNKS = dict(time=256, lat=256, lon=256)
+
+    store = new_data_store("s3", root="deep-esdl-input")
+
+    def sort_and_cast_by_time(ds):
+        ds = ds.sortby(ds.time)
+        ds["time"] = ds.time.astype("datetime64[D]")
+        return ds
+
+    pbar.update(10)
+    pbar.set_description("Processing reference dataset (Chl)")
+
+    chl = store.open_data("EO4SIBS-black-sea-chl.zarr")
+    chl = sort_and_cast_by_time(chl)
+    chl = chl * 0.01
+    chl_spatial = dict(lat=chl.lat, lon=chl.lon)
+    chl = chl.chunk(FINAL_CHUNKS)
+
+    pbar.update(10)
+    pbar.set_description("Processing additional dataset (wave height)")
+
+    bswh = store.open_data("CMEMS-black-sea-wave-height.zarr")
+    bswh_time_aggregated = bswh.sortby(bswh.time).resample(time="1D").mean()
+    bswh_time_aggregated = bswh_time_aggregated.sel(
+        time=slice("2016-01-01", "2017-12-31")
+    )
+    bswh_spatial_resampled = bswh_time_aggregated.interp(
+        coords=chl_spatial, method="nearest"
+    )
+    bswh_time_aggregated = bswh_time_aggregated.chunk(FINAL_CHUNKS)
+
+    pbar.update(10)
+    pbar.set_description("Processing additional dataset (altimetry)")
+
+    bsa = store.open_data("EO4SIBS-black-sea-altimetry.zarr")
+    bsa_time_sorted = sort_and_cast_by_time(bsa)
+    bsa_spatial_resampled = bsa_time_sorted.interp(coords=chl_spatial, method="nearest")
+    bsa_spatial_resampled = bsa_spatial_resampled.chunk(FINAL_CHUNKS)
+
+    pbar.update(10)
+    pbar.set_description("Processing additional dataset (SSS)")
+
+    sss = store.open_data("EO4SIBS-black-sea-sss.zarr")
+    sss_time_sorted = sort_and_cast_by_time(sss).drop_duplicates(dim="time")
+    sss_spatial_resampled = sss_time_sorted.interp(coords=chl_spatial, method="nearest")
+    sss_spatial_resampled = sss_spatial_resampled.chunk(FINAL_CHUNKS)
+
+    pbar.update(10)
+    pbar.set_description("Processing additional dataset (SST)")
+
+    sst = store.open_data("CMEMS-black-sea-sst.zarr")
+    sst_time_sorted = sort_and_cast_by_time(sst)
+    sst_spatial_resampled = sst_time_sorted.interp(coords=chl_spatial, method="nearest")
+    sst_spatial_resampled = sst_spatial_resampled.chunk(FINAL_CHUNKS)
+
+    pbar.update(20)
+    pbar.set_description("Writing datasets")
+
+    store.write_data(
+        chl, "EO4SIBS-black-sea-chl-256x256x256.zarr", replace=True
+    )
+
+    store.write_data(
+        bswh_time_aggregated, "CMEMS-black-sea-wave-height-256x256x256.zarr", replace=True
+    )
+
+    store.write_data(
+        bsa_spatial_resampled, "EO4SIBS-black-sea-altimetry-256x256x256.zarr", replace=True
+    )
+
+    store.write_data(
+        sss_spatial_resampled, "EO4SIBS-black-sea-sss-256x256x256.zarr", replace=True
+    )
+
+    store.write_data(
+        sst_spatial_resampled, "CMEMS-black-sea-sst-256x256x256.zarr", replace=True
+    )
+
+    pbar.update(10)
+    pbar.set_description("Done")
+
\ No newline at end of file
diff --git a/black-sea/output-merge/black-sea-datacube-merge.py b/black-sea/output-merge/black-sea-datacube-merge.py
new file mode 100644
index 0000000..08c3939
--- /dev/null
+++ b/black-sea/output-merge/black-sea-datacube-merge.py
@@ -0,0 +1,87 @@
+from tqdm import tqdm
+from datetime import datetime
+import yaml
+import rioxarray
+import xarray as xr
+import numpy as np
+
+from xcube.core.store import find_data_store_extensions
+from xcube.core.store import get_data_store_params_schema
+from xcube.core.store import new_data_store
+
+
+with tqdm(total=80) as pbar:
+
+    with open("../output-postprocess/black-sea-metadata.yaml", "r") as stream:
+        try:
+            metadata = yaml.safe_load(stream)
+        except yaml.YAMLError as exc:
+            print(exc)
+
+    store = new_data_store("s3", root="deep-esdl-input")
+    store_output = new_data_store("s3", root="deep-esdl-output")
+
+    pbar.update(10)
+    pbar.set_description("Loading datasets")
+
+    chl = store.open_data("EO4SIBS-black-sea-chl-256x256x256.zarr")
+    bswh = store.open_data("CMEMS-black-sea-wave-height-256x256x256.zarr")
+    bsa = store.open_data("EO4SIBS-black-sea-altimetry-256x256x256.zarr")
+    sss = store.open_data("EO4SIBS-black-sea-sss-256x256x256.zarr")
+    sst = store.open_data("CMEMS-black-sea-sst-256x256x256.zarr")
+
+    pbar.update(10)
+    pbar.set_description("Merging datasets")
+
+    black_sea_datacube = xr.merge(
+        [
+            chl,
+            bswh,
+            bsa,
+            sss,
+            sst,
+        ]
+    )
+
+    pbar.update(10)
+    pbar.set_description("Adding attributes")
+
+    black_sea_datacube = black_sea_datacube.rio.write_crs(
+        "epsg:4326", grid_mapping_name="crs"
+    ).reset_coords()
+    del black_sea_datacube.crs.attrs["spatial_ref"]
+
+    black_sea_datacube.attrs = metadata["global"]
+
+    for variable, attributes in metadata["local"].items():
+        black_sea_datacube[variable].attrs = dict(sorted(attributes.items()))
+
+    additional_attrs = {
+        "date_modified": str(datetime.now()),
+        "geospatial_lat_max": float(black_sea_datacube.lat.max().values),
+        "geospatial_lat_min": float(black_sea_datacube.lat.min().values),
+        "geospatial_lat_resolution": float(
+            black_sea_datacube.lat[1] - black_sea_datacube.lat[0]
+        ),
+        "geospatial_lon_max": float(black_sea_datacube.lon.max().values),
+        "geospatial_lon_min": float(black_sea_datacube.lon.min().values),
+        "geospatial_lon_resolution": float(
+            black_sea_datacube.lon[1] - black_sea_datacube.lon[0]
+        ),
+        "time_coverage_start": str(black_sea_datacube.time[0].values),
+        "time_coverage_end": str(black_sea_datacube.time[-1].values),
+    }
+
+    black_sea_datacube.attrs = dict(
+        sorted({**black_sea_datacube.attrs, **additional_attrs}.items())
+    )
+
+    pbar.update(20)
+    pbar.set_description("Writing dataset")
+
+    store_output.write_data(
+        black_sea_datacube, f'{black_sea_datacube.attrs["id"]}.zarr', replace=True
+    )
+
+    pbar.update(10)
+    pbar.set_description("Done")
\ No newline at end of file
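Once the merge script has run, the written cube can be opened back from the output store for a quick check. A minimal sketch, assuming the same `deep-esdl-output` bucket and the dataset `id` taken from `black-sea-metadata.yaml` (the identifier below is only a placeholder):

```python
from xcube.core.store import new_data_store

# Open the merged Black Sea cube from the output store used above.
# The data_id must match black_sea_datacube.attrs["id"] + ".zarr";
# "<cube-id>.zarr" is a placeholder for that value.
store = new_data_store("s3", root="deep-esdl-output")
cube = store.open_data("<cube-id>.zarr")
print(cube)
```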