diff --git a/Data_Processing_NSMv2.0/.ipynb_checkpoints/DataProcessing_Pipeline-checkpoint.ipynb b/Data_Processing_NSMv2.0/.ipynb_checkpoints/DataProcessing_Pipeline-checkpoint.ipynb new file mode 100644 index 0000000..b803420 --- /dev/null +++ b/Data_Processing_NSMv2.0/.ipynb_checkpoints/DataProcessing_Pipeline-checkpoint.ipynb @@ -0,0 +1,887 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "id": "9582b19b-a266-4a82-9f9d-7d5ad1e3a391", + "metadata": {}, + "outputs": [], + "source": [ + "# Import packages\n", + "# Dataframe Packages\n", + "import numpy as np\n", + "import xarray as xr\n", + "import pandas as pd\n", + "\n", + "# Vector Packages\n", + "import geopandas as gpd\n", + "import shapely\n", + "from shapely import wkt\n", + "from shapely.geometry import Point, Polygon\n", + "from pyproj import CRS, Transformer\n", + "\n", + "# Raster Packages\n", + "import rioxarray as rxr\n", + "import rasterio\n", + "from rasterio.mask import mask\n", + "from rioxarray.merge import merge_arrays\n", + "import rasterstats as rs\n", + "import gdal\n", + "from gdal import gdalconst\n", + "\n", + "# Data Access Packages\n", + "import earthaccess as ea\n", + "import h5py\n", + "import pickle\n", + "from tensorflow.keras.models import load_model\n", + "from pystac_client import Client\n", + "import richdem as rd\n", + "import planetary_computer\n", + "from planetary_computer import sign\n", + "\n", + "# General Packages\n", + "import os\n", + "import re\n", + "import shutil\n", + "import math\n", + "from datetime import datetime\n", + "import glob\n", + "from pprint import pprint\n", + "from typing import Union\n", + "from pathlib import Path\n", + "from tqdm import tqdm\n", + "import time\n", + "import requests\n", + "from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed\n", + "import dask\n", + "import dask.dataframe as dd\n", + "from dask.distributed import progress\n", + "from dask.distributed import Client\n", + "from dask.diagnostics import ProgressBar\n", + "from retrying import retry\n", + "import fiona\n", + "import re\n", + "import s3fs" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "7ab6edbc-c701-42b3-89c9-4649ac4dccba", + "metadata": {}, + "outputs": [], + "source": [ + "import NSIDC_Data\n", + "\n", + "class ASODataTool:\n", + " def __init__(self, short_name, version, polygon='', filename_filter=''):\n", + " self.short_name = short_name\n", + " self.version = version\n", + " self.polygon = polygon\n", + " self.filename_filter = filename_filter\n", + " self.url_list = []\n", + " self.CMR_URL = 'https://cmr.earthdata.nasa.gov'\n", + " self.CMR_PAGE_SIZE = 2000\n", + " self.CMR_FILE_URL = ('{0}/search/granules.json?provider=NSIDC_ECS'\n", + " '&sort_key[]=start_date&sort_key[]=producer_granule_id'\n", + " '&scroll=true&page_size={1}'.format(self.CMR_URL, self.CMR_PAGE_SIZE))\n", + "\n", + " def cmr_search(self, time_start, time_end, bounding_box):\n", + " try:\n", + " if not self.url_list:\n", + " self.url_list = NSIDC_Data.cmr_search(\n", + " self.short_name, self.version, time_start, time_end,\n", + " bounding_box=self.bounding_box, polygon=self.polygon,\n", + " filename_filter=self.filename_filter, quiet=False)\n", + " return self.url_list\n", + " except KeyboardInterrupt:\n", + " quit()\n", + "\n", + " def cmr_download(self, directory):\n", + " if not os.path.exists(directory):\n", + " os.makedirs(directory, exist_ok=True)\n", + "\n", + " NSIDC_Data.cmr_download(self.url_list, directory, False)\n", + "\n", + 
" @staticmethod\n", + " def get_bounding_box(region):\n", + " regions = pd.read_pickle(\"C:\\\\Users\\\\VISH NU\\\\National_snow_model\\\\National-Snow-Model\\\\Data\\\\Processed\\\\RegionVal.pkl\")\n", + " superset = []\n", + "\n", + " superset.append(regions[region])\n", + " superset = pd.concat(superset)\n", + " superset = gpd.GeoDataFrame(superset, geometry=gpd.points_from_xy(superset.Long, superset.Lat, crs=\"EPSG:4326\"))\n", + " bounding_box = list(superset.total_bounds)\n", + "\n", + " return f\"{bounding_box[0]},{bounding_box[1]},{bounding_box[2]},{bounding_box[3]}\"\n", + "\n", + "class ASODownload(ASODataTool):\n", + " def __init__(self, short_name, version, polygon='', filename_filter=''):\n", + " super().__init__(short_name, version, polygon, filename_filter)\n", + " self.region_list = [ 'N_Sierras',\n", + " 'S_Sierras',\n", + " 'Greater_Yellowstone',\n", + " 'N_Co_Rockies',\n", + " 'SW_Mont',\n", + " 'SW_Co_Rockies',\n", + " 'GBasin',\n", + " 'N_Wasatch',\n", + " 'N_Cascade',\n", + " 'S_Wasatch',\n", + " 'SW_Mtns',\n", + " 'E_WA_N_Id_W_Mont',\n", + " 'S_Wyoming',\n", + " 'SE_Co_Rockies',\n", + " 'Sawtooth',\n", + " 'Ca_Coast',\n", + " 'E_Or',\n", + " 'N_Yellowstone',\n", + " 'S_Cascade',\n", + " 'Wa_Coast',\n", + " 'Greater_Glacier',\n", + " 'Or_Coast' ]\n", + "\n", + " def select_region(self):\n", + " print(\"Select a region by entering its index:\")\n", + " for i, region in enumerate(self.region_list, start=1):\n", + " print(f\"{i}. {region}\")\n", + "\n", + " try:\n", + " user_input = int(input(\"Enter the index of the region: \"))\n", + " if 1 <= user_input <= len(self.region_list):\n", + " selected_region = self.region_list[user_input - 1]\n", + " self.bounding_box = self.get_bounding_box(selected_region)\n", + " print(f\"You selected: {selected_region}\")\n", + " print(f\"Bounding Box: {self.bounding_box}\")\n", + " else:\n", + " print(\"Invalid index. Please select a valid index.\")\n", + " except ValueError:\n", + " print(\"Invalid input. Please enter a valid index.\")\n", + " \n", + "if __name__ == \"__main__\":\n", + " short_name = 'ASO_50M_SWE'\n", + " version = '1'\n", + "\n", + " data_tool = ASODownload(short_name, version)\n", + " time_start = '2013-04-02T00:00:00Z'\n", + " time_end = '2019-07-19T23:59:59Z'\n", + " \n", + " selected_region = data_tool.select_region() # Call select_region on the instance\n", + " directory = \"SWE_Data\"\n", + "\n", + " print(f\"Fetching file URLs in progress for {selected_region} from {time_start} to {time_end}\")\n", + " url_list = data_tool.cmr_search(time_start, time_end, data_tool.bounding_box)\n", + " data_tool.cmr_download(directory)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "968b8ea0-b7c7-4c08-8cf7-69398a1493f4", + "metadata": {}, + "outputs": [], + "source": [ + "class ASODataProcessing:\n", + " \n", + " @staticmethod\n", + " def processing_tiff(input_file, output_res):\n", + " try:\n", + " date = os.path.splitext(input_file)[0].split(\"_\")[-1]\n", + " \n", + " # Define the output file path\n", + " output_folder = os.path.join(os.getcwd(), \"Processed_Data\")\n", + " os.makedirs(output_folder, exist_ok=True)\n", + " output_file = os.path.join(output_folder, f\"ASO_100M_{date}.tif\")\n", + " \n", + " ds = gdal.Open(input_file)\n", + " if ds is None:\n", + " print(f\"Failed to open '{input_file}'. 
Make sure the file is a valid GeoTIFF file.\")\n", + " return None\n", + " \n", + " # Reproject and resample\n", + " gdal.Warp(output_file, ds, dstSRS=\"EPSG:4326\", xRes=output_res, yRes=-output_res, resampleAlg=\"bilinear\")\n", + " \n", + " # Read the processed TIFF file using rasterio\n", + " rds = rxr.open_rasterio(output_file)\n", + " rds = rds.squeeze().drop(\"spatial_ref\").drop(\"band\")\n", + " rds.name = \"data\"\n", + " df = rds.to_dataframe().reset_index()\n", + " return df\n", + " \n", + " except Exception as e:\n", + " print(f\"An error occurred: {str(e)}\")\n", + " return None\n", + " \n", + " @staticmethod\n", + " def convert_tiff_to_csv(input_folder, output_res):\n", + "\n", + " curr_dir = os.getcwd()\n", + " folder_path = os.path.join(curr_dir, input_folder)\n", + " \n", + " # Check if the folder exists and is not empty\n", + " if not os.path.exists(folder_path) or not os.path.isdir(folder_path):\n", + " print(f\"The folder '{input_folder}' does not exist.\")\n", + " return\n", + " \n", + " if not os.listdir(folder_path):\n", + " print(f\"The folder '{input_folder}' is empty.\")\n", + " return\n", + " \n", + " tiff_files = [filename for filename in os.listdir(folder_path) if filename.endswith(\".tif\")]\n", + " \n", + " # Create CSV files from TIFF files\n", + " for tiff_filename in tiff_files:\n", + " \n", + " # Open the TIFF file\n", + " tiff_filepath = os.path.join(folder_path, tiff_filename)\n", + " df = ASODataProcessing.processing_tiff(tiff_filepath, output_res)\n", + " \n", + " if df is not None:\n", + " # Get the date from the TIFF filename\n", + " date = os.path.splitext(tiff_filename)[0].split(\"_\")[-1]\n", + " \n", + " # Define the CSV filename and folder\n", + " csv_filename = f\"ASO_SWE_{date}.csv\"\n", + " csv_folder = os.path.join(curr_dir, \"Processed_Data\", \"SWE_csv\")\n", + " os.makedirs(csv_folder, exist_ok=True)\n", + " csv_filepath = os.path.join(csv_folder, csv_filename)\n", + " \n", + " # Save the DataFrame as a CSV file\n", + " df.to_csv(csv_filepath, index=False)\n", + " \n", + " print(f\"Converted '{tiff_filename}' to '{csv_filename}'\")\n", + " \n", + " def create_polygon(self, row):\n", + " return Polygon([(row['BL_Coord_Long'], row['BL_Coord_Lat']),\n", + " (row['BR_Coord_Long'], row['BR_Coord_Lat']),\n", + " (row['UR_Coord_Long'], row['UR_Coord_Lat']),\n", + " (row['UL_Coord_Long'], row['UL_Coord_Lat'])])\n", + "\n", + " def process_folder(self, input_folder, metadata_path, output_folder):\n", + " # Import the metadata into a pandas DataFrame\n", + " pred_obs_metadata_df = pd.read_csv(metadata_path)\n", + " \n", + " # Assuming create_polygon is defined elsewhere, we add a column with polygon geometries\n", + " pred_obs_metadata_df = pred_obs_metadata_df.drop(columns=['Unnamed: 0'], axis=1)\n", + " pred_obs_metadata_df['geometry'] = pred_obs_metadata_df.apply(self.create_polygon, axis=1)\n", + " \n", + " # Convert the DataFrame to a GeoDataFrame\n", + " metadata = gpd.GeoDataFrame(pred_obs_metadata_df, geometry='geometry')\n", + " \n", + " # Drop coordinates columns\n", + " metadata_df = metadata.drop(columns=['BL_Coord_Long', 'BL_Coord_Lat', \n", + " 'BR_Coord_Long', 'BR_Coord_Lat', \n", + " 'UR_Coord_Long', 'UR_Coord_Lat', \n", + " 'UL_Coord_Long', 'UL_Coord_Lat'], axis=1)\n", + " \n", + " # List all CSV files in the input folder\n", + " csv_files = [f for f in os.listdir(input_folder) if f.endswith('.csv')]\n", + " \n", + " for csv_file in csv_files:\n", + " input_aso_path = os.path.join(input_folder, csv_file)\n", + " output_aso_path = 
os.path.join(output_folder, csv_file)\n", + " \n", + " # Check if the output file already exists\n", + " if os.path.exists(output_aso_path):\n", + " print(f\"CSV file {csv_file} already exists in the output folder.\")\n", + " continue\n", + " \n", + " # Process each CSV file\n", + " aso_swe_df = pd.read_csv(input_aso_path)\n", + " \n", + " # Convert the \"aso_swe_df\" into a GeoDataFrame with point geometries\n", + " geometry = [Point(xy) for xy in zip(aso_swe_df['x'], aso_swe_df['y'])]\n", + " aso_swe_geo = gpd.GeoDataFrame(aso_swe_df, geometry=geometry)\n", + "\n", + " result = gpd.sjoin(aso_swe_geo, metadata_df, how='left', predicate='within', op = 'intersects')\n", + " \n", + " # Select specific columns for the final DataFrame\n", + " Final_df = result[['y', 'x', 'data', 'cell_id']]\n", + " Final_df.rename(columns={'data': 'swe'}, inplace=True)\n", + " \n", + " # Drop rows where 'cell_id' is NaN\n", + " if Final_df['cell_id'].isnull().values.any():\n", + " Final_df = Final_df.dropna(subset=['cell_id'])\n", + " \n", + " # Save the processed DataFrame to a CSV file\n", + " Final_df.to_csv(output_aso_path, index=False)\n", + " print(f\"Processed {csv_file}\")\n", + " \n", + " def converting_ASO_to_standardized_format(self, input_folder, output_csv):\n", + " \n", + " # Initialize an empty DataFrame to store the final transformed data\n", + " final_df = pd.DataFrame()\n", + " \n", + " # Iterate through all CSV files in the directory\n", + " for filename in os.listdir(input_folder):\n", + " if filename.endswith(\".csv\"):\n", + " file_path = os.path.join(input_folder, filename)\n", + " \n", + " # Extract the time frame from the filename\n", + " time_frame = filename.split('_')[-1].split('.')[0]\n", + " \n", + " # Read the CSV file into a DataFrame\n", + " df = pd.read_csv(file_path)\n", + " \n", + " # Rename the 'SWE' column to the time frame for clarity\n", + " df = df.rename(columns={'SWE': time_frame})\n", + " \n", + " # Merge or concatenate the data into the final DataFrame\n", + " if final_df.empty:\n", + " final_df = df\n", + " else:\n", + " final_df = pd.merge(final_df, df, on='cell_id', how='outer')\n", + " \n", + " # Save the final transformed DataFrame to a single CSV file\n", + " final_df.to_csv(output_csv, index=False)\n", + " \n", + "if __name__ == \"__main__\":\n", + " \n", + " #data_processor = ASODataProcessing()\n", + " #folder_name = \"SWE_Data\"\n", + " #output_res = 0.001\n", + " data_processor.convert_tiff_to_csv(folder_name, output_res)\n", + " input_folder = r\"/home/vgindi/Processed_Data/SWE_csv\"\n", + " metadata_path = r\"/home/vgindi/Provided_Data/grid_cells_meta.csv\"\n", + " output_folder = r\"/home/vgindi/Processed_SWE\"\n", + "\n", + " data_processor.process_folder(input_folder, metadata_path, output_folder)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "f4c91c7a-d327-4dbf-8669-1e667c0e4428", + "metadata": {}, + "outputs": [], + "source": [ + "def load_aso_snotel_geometry(aso_swe_file, folder_path):\n", + " \n", + " aso_file = pd.read_csv(os.path.join(folder_path, aso_swe_file))\n", + " aso_file.set_index('cell_id', inplace=True)\n", + " aso_geometry = [Point(xy) for xy in zip(aso_file['x'], aso_file['y'])]\n", + " aso_gdf = gpd.GeoDataFrame(aso_file, geometry=aso_geometry)\n", + " \n", + " return aso_gdf\n", + "\n", + "def haversine_vectorized(lat1, lon1, lat2, lon2):\n", + " \n", + " lon1 = np.radians(lon1)\n", + " lon2 = np.radians(lon2)\n", + " lat1 = np.radians(lat1)\n", + " lat2 = np.radians(lat2)\n", + "\n", + " # Haversine 
formula\n", + " dlat = lat2 - lat1\n", + " dlon = lon2 - lon1\n", + " a = np.sin(dlat/2)**2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon/2)**2\n", + " c = 2 * np.arcsin(np.sqrt(a))\n", + " \n", + " r = 6371.0\n", + " # Distance calculation\n", + " distances = r * c\n", + "\n", + " return distances\n", + "\n", + "def calculate_nearest_snotel(aso_gdf, snotel_gdf, n=6, distance_cache=None):\n", + "\n", + " if distance_cache is None:\n", + " distance_cache = {}\n", + "\n", + " nearest_snotel = {}\n", + " for idx, aso_row in aso_gdf.iterrows():\n", + " cell_id = idx\n", + "\n", + " # Check if distances for this cell_id are already calculated and cached\n", + " if cell_id in distance_cache:\n", + " nearest_snotel[idx] = distance_cache[cell_id]\n", + " else:\n", + " # Calculate Haversine distances between the grid cell and all SNOTEL locations\n", + " distances = haversine_vectorized(\n", + " aso_row.geometry.y, aso_row.geometry.x,\n", + " snotel_gdf.geometry.y.values, snotel_gdf.geometry.x.values)\n", + "\n", + " # Store the nearest stations in the cache\n", + " nearest_snotel[idx] = list(snotel_gdf['station_id'].iloc[distances.argsort()[:n]])\n", + " distance_cache[cell_id] = nearest_snotel[idx]\n", + "\n", + " return nearest_snotel, distance_cache\n", + "\n", + "def calculate_distances_for_cell(aso_row, snotel_gdf, n=6):\n", + " \n", + " distances = haversine_vectorized(\n", + " aso_row.geometry.y, aso_row.geometry.x,\n", + " snotel_gdf.geometry.y.values, snotel_gdf.geometry.x.values)\n", + " \n", + " nearest_sites = list(snotel_gdf['station_id'].iloc[distances.argsort()[:n]])\n", + " \n", + " return nearest_sites\n", + "\n", + "def calculate_nearest_snotel_parallel(aso_gdf, snotel_gdf, n = 6, distance_cache = None):\n", + " \n", + " if distance_cache is None:\n", + " distance_cache = {}\n", + "\n", + " nearest_snotel = {}\n", + " with ProcessPoolExecutor(max_workers = 16) as executor:\n", + " futures = []\n", + " \n", + " for idx, aso_row in aso_gdf.iterrows():\n", + " if idx not in distance_cache:\n", + " # Submit the task for parallel execution\n", + " futures.append(executor.submit(calculate_distances_for_cell, aso_row, snotel_gdf, n))\n", + " else:\n", + " nearest_snotel[idx] = distance_cache[idx]\n", + "\n", + " # Retrieve results as they are completed\n", + " for future in tqdm(futures):\n", + " result = future.result()\n", + " \n", + " cell_id = result[0] \n", + " nearest_snotel[cell_id] = result[1]\n", + " distance_cache[cell_id] = result[1]\n", + "\n", + " return nearest_snotel, distance_cache\n", + "\n", + "def fetch_snotel_sites_for_cellids(aso_swe_files_folder_path, metadata_path, snotel_data_path):\n", + " \n", + " metadata_df = pd.read_csv(metadata_path)\n", + " #metadata_df['geometry'] = metadata_df['geometry'].apply(wkt.loads)\n", + " \n", + " def create_polygon(row):\n", + " return Polygon([(row['BL_Coord_Long'], row['BL_Coord_Lat']),\n", + " (row['BR_Coord_Long'], row['BR_Coord_Lat']),\n", + " (row['UR_Coord_Long'], row['UR_Coord_Lat']),\n", + " (row['UL_Coord_Long'], row['UL_Coord_Lat'])])\n", + " \n", + " metadata_df = metadata_df.drop(columns=['Unnamed: 0'], axis=1)\n", + " metadata_df['geometry'] = metadata_df.apply(create_polygon, axis=1)\n", + " \n", + " metadata = gpd.GeoDataFrame(metadata_df, geometry='geometry')\n", + " snotel_data = pd.read_csv(snotel_data_path)\n", + "\n", + " date_columns = snotel_data.columns[1:]\n", + " new_column_names = {col: pd.to_datetime(col, format='%Y-%m-%d').strftime('%Y%m%d') for col in date_columns}\n", + " snotel_data_f = 
snotel_data.rename(columns=new_column_names)\n", + "\n", + " snotel_file = pd.read_csv(\"/home/vgindi/Provided_Data/ground_measures_metadata.csv\")\n", + " snotel_geometry = [Point(xy) for xy in zip(snotel_file['longitude'], snotel_file['latitude'])]\n", + " snotel_gdf = gpd.GeoDataFrame(snotel_file, geometry=snotel_geometry)\n", + "\n", + " final_df = pd.DataFrame()\n", + "\n", + " for aso_swe_file in os.listdir(aso_swe_files_folder_path):\n", + "\n", + " if os.path.isdir(os.path.join(aso_swe_files_folder_path, aso_swe_file)):\n", + " continue\n", + "\n", + " timestamp = aso_swe_file.split('_')[-1].split('.')[0]\n", + " print(f\"Processing file with timestamp: {timestamp}\")\n", + "\n", + " aso_gdf = load_aso_snotel_geometry(aso_swe_file, aso_swe_files_folder_path)\n", + " aso_swe_data = pd.read_csv(os.path.join(aso_swe_files_folder_path, aso_swe_file))\n", + "\n", + " # Calculating nearest SNOTEL sites\n", + " nearest_snotel, distance_cache = calculate_nearest_snotel(aso_gdf, snotel_gdf, n=6)\n", + " print(f\"calculated nearest snotel for file with timestamp {timestamp}\")\n", + "\n", + " transposed_data = {}\n", + "\n", + " if timestamp in new_column_names.values():\n", + " for idx, aso_row in aso_gdf.iterrows(): \n", + " cell_id = idx\n", + " station_ids = nearest_snotel[cell_id]\n", + " selected_snotel_data = snotel_data_f[['station_id', timestamp]].loc[snotel_data_f['station_id'].isin(station_ids)]\n", + " station_mapping = {old_id: f\"nearest site {i+1}\" for i, old_id in enumerate(station_ids)}\n", + " \n", + " # Rename the station IDs in the selected SNOTEL data\n", + " selected_snotel_data['station_id'] = selected_snotel_data['station_id'].map(station_mapping)\n", + "\n", + " # Transpose and set the index correctly\n", + " transposed_data[cell_id] = selected_snotel_data.set_index('station_id').T\n", + "\n", + " transposed_df = pd.concat(transposed_data, axis=0)\n", + " \n", + " # Reset index and rename columns\n", + " transposed_df = transposed_df.reset_index()\n", + " transposed_df.rename(columns={'level_0': 'cell_id', 'level_1': 'Date'}, inplace = True)\n", + " transposed_df['Date'] = pd.to_datetime(transposed_df['Date'])\n", + " \n", + " aso_swe_data['Date'] = pd.to_datetime(timestamp)\n", + " aso_swe_data = aso_swe_data[['cell_id', 'Date', 'swe']]\n", + " merged_df = pd.merge(aso_swe_data, transposed_df, how='left', on=['cell_id', 'Date'])\n", + " \n", + " final_df = pd.concat([final_df, merged_df], ignore_index=True)\n", + " \n", + " else:\n", + " aso_swe_data['Date'] = pd.to_datetime(timestamp)\n", + " aso_swe_data = aso_swe_data[['cell_id', 'Date', 'swe']]\n", + " \n", + " # No need to merge in this case, directly concatenate\n", + " final_df = pd.concat([final_df, aso_swe_data], ignore_index=True)\n", + "\n", + "\n", + " # Merge with metadata\n", + " req_cols = ['cell_id', 'lat', 'lon', 'BR_Coord_Long', 'BR_Coord_Lat', 'UR_Coord_Long', 'UR_Coord_Lat',\n", + " 'UL_Coord_Long', 'UL_Coord_Lat', 'BL_Coord_Long', 'BL_Coord_Lat', 'geometry']\n", + " Result = final_df.merge(metadata[req_cols], how='left', on='cell_id')\n", + "\n", + " # Column renaming and ordering\n", + " Result.rename(columns={'swe': 'ASO_SWE_in'}, inplace=True)\n", + " Result = Result[['cell_id', 'Date', 'ASO_SWE_in', 'lat', 'lon', 'nearest site 1', 'nearest site 2',\n", + " 'nearest site 3', 'nearest site 4', 'nearest site 5', 'nearest site 6',\n", + " 'BR_Coord_Long', 'BR_Coord_Lat', 'UR_Coord_Long', 'UR_Coord_Lat',\n", + " 'UL_Coord_Long', 'UL_Coord_Lat', 'BL_Coord_Long', 'BL_Coord_Lat']]\n", + "\n", + " # 
Save the merged data to a new file\n", + " output_filename = r\"/home/vgindi/Provided_Data/Merged_aso_snotel_data.csv\"\n", + " Result.to_csv(output_filename, index=False)\n", + " print(\"Processed and saved data\")\n", + " \n", + "def main():\n", + " aso_swe_files_folder_path = r\"/home/vgindi/Processed_SWE\"\n", + " metadata_path = r\"/home/vgindi/Provided_Data/grid_cells_meta_idx.csv\"\n", + " snotel_data_path = r\"/home/vgindi/Provided_Data/ground_measures_train_featuresALLDATES.parquet\"\n", + " fetch_snotel_sites_for_cellids(aso_swe_files_folder_path, metadata_path, snotel_data_path)\n", + "\n", + "if __name__ == \"__main__\":\n", + " main()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "a6c69770-62f8-4f8f-88ee-ffaac26aaa44", + "metadata": {}, + "outputs": [], + "source": [ + "Result = pd.read_csv(r'/home/vgindi/Provided_Data/Merged_aso_snotel_data.csv')\n", + "Result.head(10)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "29cb0f3a-7713-45d2-881f-574037b3a5fd", + "metadata": {}, + "outputs": [], + "source": [ + "\"\"\"\n", + "A Simple implementation of parallel processing using concurrency it takes so long to execute,\n", + "Explore terrain_daskconcurrency and terrain-processing_cluster python for more optimized implementations.\n", + "\"\"\"\n", + "\n", + "def process_single_location(args):\n", + " lat, lon, regions, tiles = args\n", + "\n", + " if (lat, lon) in elevation_cache:\n", + " elev, slop, asp = elevation_cache[(lat, lon)]\n", + " return elev, slop, asp\n", + "\n", + " tile_id = 'Copernicus_DSM_COG_30_N' + str(math.floor(lon)) + '_00_W' + str(math.ceil(abs(lat))) + '_00_DEM'\n", + " index_id = regions.loc[tile_id]['sliceID']\n", + "\n", + " signed_asset = planetary_computer.sign(tiles[index_id].assets[\"data\"])\n", + " #print(signed_asset)\n", + " elevation = rxr.open_rasterio(signed_asset.href)\n", + " \n", + " slope = elevation.copy()\n", + " aspect = elevation.copy()\n", + "\n", + " transformer = Transformer.from_crs(\"EPSG:4326\", elevation.rio.crs, always_xy=True)\n", + " xx, yy = transformer.transform(lon, lat)\n", + "\n", + " tilearray = np.around(elevation.values[0]).astype(int)\n", + " #print(tilearray)\n", + " geo = (math.floor(float(lon)), 90, 0.0, math.ceil(float(lat)), 0.0, -90)\n", + "\n", + " no_data_value = -9999\n", + " driver = gdal.GetDriverByName('MEM')\n", + " temp_ds = driver.Create('', tilearray.shape[1], tilearray.shape[0], 1, gdalconst.GDT_Float32)\n", + "\n", + " temp_ds.GetRasterBand(1).WriteArray(tilearray)\n", + " temp_ds.GetRasterBand(1).SetNoDataValue(no_data_value)\n", + " temp_ds.SetProjection('EPSG:4326')\n", + " temp_ds.SetGeoTransform(geo)\n", + "\n", + " tilearray_np = temp_ds.GetRasterBand(1).ReadAsArray()\n", + " slope_arr, aspect_arr = np.gradient(tilearray_np)\n", + " aspect_arr = np.rad2deg(np.arctan2(aspect_arr[0], aspect_arr[1]))\n", + " \n", + " slope.values[0] = slope_arr\n", + " aspect.values[0] = aspect_arr\n", + "\n", + " elev = round(elevation.sel(x=xx, y=yy, method=\"nearest\").values[0])\n", + " slop = round(slope.sel(x=xx, y=yy, method=\"nearest\").values[0])\n", + " asp = round(aspect.sel(x=xx, y=yy, method=\"nearest\").values[0])\n", + "\n", + " elevation_cache[(lat, lon)] = (elev, slop, asp) \n", + " return elev, slop, asp\n", + "\n", + "def extract_terrain_data_threaded(metadata_df, bounding_box, max_workers=10):\n", + " global elevation_cache \n", + "\n", + " elevation_cache = {} \n", + " min_x, min_y, max_x, max_y = *bounding_box[0], *bounding_box[1]\n", + " \n", 
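+    "    # Note: 'Client.open' below assumes pystac_client.Client; the dask.distributed Client imported later in the first cell shadows that name.\n",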
+ " client = Client.open(\n", + " \"https://planetarycomputer.microsoft.com/api/stac/v1\",\n", + " ignore_conformance=True,\n", + " )\n", + "\n", + " search = client.search(\n", + " collections=[\"cop-dem-glo-90\"],\n", + " intersects = {\n", + " \"type\": \"Polygon\",\n", + " \"coordinates\": [[\n", + " [min_x, min_y],\n", + " [max_x, min_y],\n", + " [max_x, max_y],\n", + " [min_x, max_y],\n", + " [min_x, min_y] \n", + " ]]})\n", + "\n", + " tiles = list(search.items())\n", + "\n", + " regions = []\n", + "\n", + " print(\"Retrieving Copernicus 90m DEM tiles\")\n", + " for i in tqdm(range(0, len(tiles))):\n", + " row = [i, tiles[i].id]\n", + " regions.append(row)\n", + " regions = pd.DataFrame(columns = ['sliceID', 'tileID'], data = regions)\n", + " regions = regions.set_index(regions['tileID'])\n", + " del regions['tileID']\n", + "\n", + " print(\"Interpolating Grid Cell Spatial Features\")\n", + "\n", + " with ThreadPoolExecutor(max_workers=max_workers) as executor:\n", + " futures = [executor.submit(process_single_location, (metadata_df.iloc[i]['cen_lat'], metadata_df.iloc[i]['cen_lon'], regions, tiles))\n", + " for i in tqdm(range(len(metadata_df)))]\n", + " \n", + " results = []\n", + " for future in tqdm(as_completed(futures), total=len(futures)):\n", + " results.append(future.result())\n", + " \n", + " metadata_df['Elevation_m'], metadata_df['Slope_Deg'], metadata_df['Aspect_L'] = zip(*results)\n", + "\n", + "metadata_df = pd.read_csv(r\"/home/vgindi/Provided_Data/Merged_aso_nearest_sites1.csv\")\n", + "metadata_df= metadata_df.head(20)\n", + "bounding_box = ((-120.3763448720203, 36.29256774541929), (-118.292253412863, 38.994985247736324)) \n", + " \n", + "extract_terrain_data_threaded(metadata_df, bounding_box)\n", + "\n", + "# Display the results\n", + "metadata_df.head(10)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "f714b0f0-1c38-4ba3-8aed-1ca6b97c2d10", + "metadata": {}, + "outputs": [], + "source": [ + "\"\"\"\n", + "This code block crops the global coverage VIIRS data to south sierras subregion. 
\n", + "\"\"\"\n", + "\n", + "def crop_sierras(input_file_path, output_file_path, shapes):\n", + " with rasterio.open(input_file_path) as src:\n", + " out_image, out_transform = rasterio.mask.mask(src, shapes, crop=True)\n", + " out_meta = src.out_meta\n", + " out_meta.update({\"driver\": \"GTiff\",\n", + " \"height\": out_image.shape[1],\n", + " \"width\": out_image.shape[2],\n", + " \"transform\": out_transform})\n", + " \n", + " with rasterio.open(output_file_path, \"w\", **out_meta) as dest:\n", + " dest.write(out_image)\n", + "\n", + "def download_viirs_sca(input_dir, output_dir, shapefile_path):\n", + " \n", + " # Load shapes from the shapefile\n", + " with fiona.open(shapefile_path, 'r') as shapefile:\n", + " shapes = [feature[\"geometry\"] for feature in shapefile]\n", + " \n", + " # Iterate through each year directory in the input directory\n", + " for year_folder in os.listdir(input_dir):\n", + " year_folder_path = os.path.join(input_dir, year_folder)\n", + " if os.path.isdir(year_folder_path):\n", + " # Extract year from the folder name (assuming folder names like 'WY2013')\n", + " year = re.search(r'\\d{4}', year_folder).group()\n", + " output_year_folder = os.path.join(output_dir, year)\n", + " os.makedirs(output_year_folder, exist_ok=True)\n", + " \n", + " for file_name in os.listdir(year_folder_path): \n", + " if file_name.endswith('.tif'): \n", + " parts = file_name.split('_')\n", + " output_file_name = '_'.join(parts[:3]) + '.tif'\n", + " output_file_path = os.path.join(output_year_folder, output_file_name)\n", + " input_file_path = os.path.join(year_folder_path, file_name)\n", + " crop_sierras(input_file_path, output_file_path, shapes)\n", + " print(f\"Processed and saved {output_file_path}\")\n", + "\n", + "if __name__ == \"__main__\":\n", + " \n", + " input_directory = r\"/home/vgindi/VIIRS_Data\"\n", + " output_directory = r\"/home/vgindi/VIIRS_Sierras\"\n", + " shapefile_path = r\"/home/vgindi/Provided_Data/low_sierras_points.shp\"\n", + " download_viirs_sca(input_directory, output_directory, shapefile_path)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "d7ab2744-b080-48c8-bb77-f4b9c14ca774", + "metadata": {}, + "outputs": [], + "source": [ + "\"\"\"\n", + "This code cell transforms the raw VIIRS tiff files to 100m resolution and saves each file in .csv format\n", + "\"\"\"\n", + "def processing_VIIRS(input_file, output_res):\n", + " try:\n", + " # Define the output file path for TIFFs using the original file name\n", + " output_folder_tiff = os.path.join(\"/home/vgindi/Processed_VIIRS\", os.path.basename(os.path.dirname(input_file)))\n", + " os.makedirs(output_folder_tiff, exist_ok=True)\n", + " output_file = os.path.join(output_folder_tiff, os.path.basename(input_file))\n", + "\n", + " # Reproject and resample\n", + " ds = gdal.Open(input_file)\n", + " if ds is None:\n", + " print(f\"Failed to open '{input_file}'. 
Make sure the file is a valid GeoTIFF file.\")\n", + " return None\n", + " \n", + " gdal.Warp(output_file, ds, dstSRS=\"EPSG:4326\", xRes=output_res, yRes=-output_res, resampleAlg=\"bilinear\")\n", + "\n", + " # Read the processed TIFF file using rasterio\n", + " rds = rxr.open_rasterio(output_file)\n", + " rds = rds.squeeze().drop(\"spatial_ref\").drop(\"band\")\n", + " rds.name = \"data\"\n", + " df = rds.to_dataframe().reset_index()\n", + " return df\n", + " except Exception as e:\n", + " print(f\"An error occurred: {str(e)}\")\n", + " return None\n", + "\n", + "def process_and_convert_viirs(input_dir, output_res):\n", + " # Iterate over subdirectories in the input directory\n", + " for year in os.listdir(input_dir):\n", + " year_dir = os.path.join(input_dir, year)\n", + " \n", + " if os.path.isdir(year_dir):\n", + " for file_name in os.listdir(year_dir):\n", + " if file_name.endswith('.tif'):\n", + " input_file_path = os.path.join(year_dir, file_name)\n", + " df = processing_VIIRS(input_file_path, output_res)\n", + " \n", + " if df is not None:\n", + " csv_folder = os.path.join(\"/home/vgindi/Processed_VIIRS\", \"VIIRS_csv\")\n", + " os.makedirs(csv_folder, exist_ok=True)\n", + " csv_file_path = os.path.join(csv_folder, file_name.replace('.tif', '.csv'))\n", + " \n", + " df.to_csv(csv_file_path, index=False)\n", + " print(f\"Processed and saved {csv_file_path}\")\n", + "\n", + "if __name__ == \"__main__\":\n", + " input_directory = \"/home/vgindi/VIIRS_Sierras\"\n", + " output_res = 100 # Desired resolution in meters\n", + " process_and_convert_viirs(input_directory, output_res)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c3e2831c-817d-40b5-a2e0-d9ebff8a5672", + "metadata": {}, + "outputs": [], + "source": [ + "\"\"\"\n", + "This code cell fetches the cell id using grid_cells_meta_idx metadata for each lat/lon pair for VIIRS csv file\n", + "\"\"\"\n", + "def create_polygon(self, row):\n", + " return Polygon([(row['BL_Coord_Long'], row['BL_Coord_Lat']),\n", + " (row['BR_Coord_Long'], row['BR_Coord_Lat']),\n", + " (row['UR_Coord_Long'], row['UR_Coord_Lat']),\n", + " (row['UL_Coord_Long'], row['UL_Coord_Lat'])])\n", + " \n", + "def process_folder(self, input_folder, metadata_path, output_folder):\n", + " # Import the metadata into a pandas DataFrame\n", + " pred_obs_metadata_df = pd.read_csv(metadata_path)\n", + "\n", + " # Assuming create_polygon is defined elsewhere, we add a column with polygon geometries\n", + " pred_obs_metadata_df = pred_obs_metadata_df.drop(columns=['Unnamed: 0'], axis=1)\n", + " pred_obs_metadata_df['geometry'] = pred_obs_metadata_df.apply(self.create_polygon, axis=1)\n", + "\n", + " # Convert the DataFrame to a GeoDataFrame\n", + " metadata = gpd.GeoDataFrame(pred_obs_metadata_df, geometry='geometry')\n", + "\n", + " # Drop coordinates columns\n", + " metadata = metadata.drop(columns=['BL_Coord_Long', 'BL_Coord_Lat', \n", + " 'BR_Coord_Long', 'BR_Coord_Lat', \n", + " 'UR_Coord_Long', 'UR_Coord_Lat', \n", + " 'UL_Coord_Long', 'UL_Coord_Lat'], axis=1)\n", + "\n", + " # List all CSV files in the input folder\n", + " csv_files = [f for f in os.listdir(input_folder) if f.endswith('.csv')]\n", + "\n", + " for csv_file in csv_files:\n", + " input_path = os.path.join(input_folder, csv_file)\n", + " output_path = os.path.join(output_folder, csv_file)\n", + "\n", + " # Check if the output file already exists\n", + " if os.path.exists(output_path):\n", + " print(f\"CSV file {csv_file} already exists in the output folder.\")\n", + " continue\n", 
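+    "        # The steps below load each VIIRS CSV (columns 'x', 'y', 'data'), build point geometries, and spatially join them to the grid-cell polygons to attach a cell_id.\n",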
+ "\n", + " # Process each CSV file\n", + " viirs_sca_df = pd.read_csv(input_path)\n", + "\n", + " # Convert the \"aso_swe_df\" into a GeoDataFrame with point geometries\n", + " geometry = [Point(xy) for xy in zip(viirs_sca_df['x'], viirs_sca_df['y'])]\n", + " viirs_sca_geo = gpd.GeoDataFrame(viirs_sca_df, geometry=geometry)\n", + " result = gpd.sjoin(viirs_sca_geo, metadata, how='left', predicate='within', op = 'intersects')\n", + "\n", + " # Select specific columns for the final DataFrame\n", + " Final_df = result[['y', 'x', 'data', 'cell_id']]\n", + " Final_df.rename(columns={'data': 'VIIRS_SCA'}, inplace=True)\n", + "\n", + " # Drop rows where 'cell_id' is NaN\n", + " if Final_df['cell_id'].isnull().values.any():\n", + " Final_df = Final_df.dropna(subset=['cell_id'])\n", + "\n", + " # Save the processed DataFrame to a CSV file\n", + " Final_df.to_csv(output_path, index=False)\n", + " print(f\"Processed {csv_file}\")\n", + "\n", + "if __name__ == \"__main__\":\n", + " input_folder = r\"\"\n", + " metadata_path = r\"\"\n", + " output_folder = r\"\"\n", + " process_folder(input_folder, metadata_path, output_folder)" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "NSM", + "language": "python", + "name": "nsm" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.9.18" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/Data_Processing_NSMv2.0/.ipynb_checkpoints/NSIDC_Data-checkpoint.py b/Data_Processing_NSMv2.0/.ipynb_checkpoints/NSIDC_Data-checkpoint.py new file mode 100644 index 0000000..f69ae85 --- /dev/null +++ b/Data_Processing_NSMv2.0/.ipynb_checkpoints/NSIDC_Data-checkpoint.py @@ -0,0 +1,350 @@ +from __future__ import print_function + +import base64 +import getopt +import itertools +import json +import math +import netrc +import os.path +import ssl +import sys +import time +import getpass +import shutil + +try: + from urllib.parse import urlparse + from urllib.request import urlopen, Request, build_opener, HTTPCookieProcessor + from urllib.error import HTTPError, URLError +except ImportError: + from urlparse import urlparse + from urllib2 import urlopen, Request, HTTPError, URLError, build_opener, HTTPCookieProcessor + +import netrc +from urllib.parse import urlparse + +"""" +short_name = 'ASO_50M_SWE' +version = '1' +time_start = '2013-04-03T00:00:00Z' +time_end = '2019-07-16T23:59:59Z' +bounding_box = '-123.34078531,33.35825379,-105.07803558,48.97106571' +polygon = '' +filename_filter = '' +url_list = [] """ + +CMR_URL = 'https://cmr.earthdata.nasa.gov' +URS_URL = 'https://urs.earthdata.nasa.gov' +CMR_PAGE_SIZE = 2000 +CMR_FILE_URL = ('{0}/search/granules.json?provider=NSIDC_ECS' + '&sort_key[]=start_date&sort_key[]=producer_granule_id' + '&scroll=true&page_size={1}'.format(CMR_URL, CMR_PAGE_SIZE)) + +def get_login_credentials(): + try: + info = netrc.netrc() + username, password = info.authenticators(URS_URL) + credentials = f'{username}:{password}' + credentials = base64.b64encode(credentials.encode('ascii')).decode('ascii') + except Exception: + username = input("Earthdata Login Username: ") + password = getpass.getpass("Earthdata Login Password: ") + credentials = f'{username}:{password}' + credentials = base64.b64encode(credentials.encode('ascii')).decode('ascii') + return credentials + +def build_version_query_params(version): + desired_pad_length = 3 + 
if len(version) > desired_pad_length: + print('Version string too long: "{0}"'.format(version)) + quit() + + version = str(int(version)) # Strip off any leading zeros + query_params = '' + + while len(version) <= desired_pad_length: + padded_version = version.zfill(desired_pad_length) + query_params += '&version={0}'.format(padded_version) + desired_pad_length -= 1 + return query_params + +def filter_add_wildcards(filter): + if not filter.startswith('*'): + filter = '*' + filter + if not filter.endswith('*'): + filter = filter + '*' + return filter + +def build_filename_filter(filename_filter): + filters = filename_filter.split(',') + result = '&options[producer_granule_id][pattern]=true' + for filter in filters: + result += '&producer_granule_id[]=' + filter_add_wildcards(filter) + return result + +def build_cmr_query_url(short_name, version, time_start, time_end, + bounding_box, polygon=None, + filename_filter=None): + params = '&short_name={0}'.format(short_name) + params += build_version_query_params(version) + params += '&temporal[]={0},{1}'.format(time_start, time_end) + if polygon: + params += '&polygon={0}'.format(polygon) + elif bounding_box: + params += '&bounding_box={0}'.format(bounding_box) + if filename_filter: + params += build_filename_filter(filename_filter) + return CMR_FILE_URL + params + +def get_speed(time_elapsed, chunk_size): + if time_elapsed <= 0: + return '' + speed = chunk_size / time_elapsed + if speed <= 0: + speed = 1 + size_name = ('', 'k', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y') + i = int(math.floor(math.log(speed, 1000))) + p = math.pow(1000, i) + return '{0:.1f}{1}B/s'.format(speed / p, size_name[i]) + +def output_progress(count, total, status='', bar_len=60): + if total <= 0: + return + fraction = min(max(count / float(total), 0), 1) + filled_len = int(round(bar_len * fraction)) + percents = int(round(100.0 * fraction)) + bar = '=' * filled_len + ' ' * (bar_len - filled_len) + fmt = ' [{0}] {1:3d}% {2} '.format(bar, percents, status) + print('\b' * (len(fmt) + 4), end='') # clears the line + sys.stdout.write(fmt) + sys.stdout.flush() + +def cmr_read_in_chunks(file_object, chunk_size=1024 * 1024): + """Read a file in chunks using a generator. 
Default chunk size: 1Mb.""" + while True: + data = file_object.read(chunk_size) + if not data: + break + yield data + +def get_login_response(url, credentials): + opener = build_opener(HTTPCookieProcessor()) + req = Request(url) + + if credentials: + req.add_header('Authorization', 'Basic {0}'.format(credentials)) + + try: + response = opener.open(req) + except HTTPError as e: + print('HTTP error {0}, {1}'.format(e.code, e.reason)) + sys.exit(1) + except URLError as e: + print('URL error: {0}'.format(e.reason)) + sys.exit(1) + + return response + +def cmr_download(urls, folder, quiet=False): + if not urls: + return + + # Create the target directory if it doesn't exist + if not os.path.exists(folder): + os.makedirs(folder) + + if not quiet: + print('Downloading {0} files to {1}...'.format(len(urls), folder)) + + credentials = get_login_credentials() + print(credentials) + + for index, url in enumerate(urls, start=1): + filename = os.path.join(folder, url.split('/')[-1]) # Specify the full path to the file + if not quiet: + print('{0}/{1}: {2}'.format(str(index).zfill(len(str(len(urls)))), len(urls), filename)) + + try: + response = get_login_response(url, credentials) + length = int(response.headers['content-length']) + count = 0 + chunk_size = min(max(length, 1), 1024 * 1024) + time_initial = time.time() + with open(filename, 'wb') as out_file: + for data in cmr_read_in_chunks(response, chunk_size=chunk_size): + out_file.write(data) + if not quiet: + count = count + 1 + time_elapsed = time.time() - time_initial + download_speed = get_speed(time_elapsed, count * chunk_size) + output_progress(count, math.ceil(length / chunk_size), status=download_speed) + + if not quiet: + print() + except HTTPError as e: + print('HTTP error {0}, {1}'.format(e.code, e.reason)) + except URLError as e: + print('URL error: {0}'.format(e.reason)) + except IOError: + raise + + orgnl_directory = folder + destination_directory = 'SWE_Data_xml' + + if not os.path.exists(destination_directory): + os.makedirs(destination_directory) + + files = os.listdir(orgnl_directory) + + for file in files: + if file.endswith('.xml'): + source_file_path = os.path.join(orgnl_directory, file) + destination_file_path = os.path.join(destination_directory, file) + + # Move the file to the destination directory + shutil.move(source_file_path, destination_file_path) + + print("Files with .xml extension moved to the destination folder.") + + +def cmr_filter_urls(search_results): + """Select only the desired data files from CMR response.""" + if 'feed' not in search_results or 'entry' not in search_results['feed']: + return [] + + entries = [e['links'] + for e in search_results['feed']['entry'] + if 'links' in e] + # Flatten "entries" to a simple list of links + links = list(itertools.chain(*entries)) + + urls = [] + unique_filenames = set() + for link in links: + if 'href' not in link: + # Exclude links with nothing to download + continue + if 'inherited' in link and link['inherited'] is True: + # Why are we excluding these links? 
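+            # Inherited links are copied down from the parent collection's metadata rather than pointing at this granule's files, so they are skipped.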
+ continue + if 'rel' in link and 'data#' not in link['rel']: + # Exclude links which are not classified by CMR as "data" or "metadata" + continue + + if 'title' in link and 'opendap' in link['title'].lower(): + # Exclude OPeNDAP links--they are responsible for many duplicates + # This is a hack; when the metadata is updated to properly identify + # non-datapool links, we should be able to do this in a non-hack way + continue + + filename = link['href'].split('/')[-1] + if filename in unique_filenames: + # Exclude links with duplicate filenames (they would overwrite) + continue + unique_filenames.add(filename) + + urls.append(link['href']) + + return urls + +def cmr_search(short_name, version, time_start, time_end, + bounding_box, polygon='', filename_filter='', quiet=False): + cmr_query_url = build_cmr_query_url(short_name=short_name, version=version, + time_start=time_start, time_end=time_end, + bounding_box=bounding_box, + polygon=polygon, filename_filter=filename_filter) + if not quiet: + print('Querying for data:\n\t{0}\n'.format(cmr_query_url)) + + cmr_scroll_id = None + ctx = ssl.create_default_context() + ctx.check_hostname = False + ctx.verify_mode = ssl.CERT_NONE + + urls = [] + hits = 0 + while True: + req = Request(cmr_query_url) + if cmr_scroll_id: + req.add_header('cmr-scroll-id', cmr_scroll_id) + try: + response = urlopen(req, context=ctx) + except Exception as e: + print('Error: ' + str(e)) + sys.exit(1) + if not cmr_scroll_id: + # Python 2 and 3 have different case for the http headers + headers = {k.lower(): v for k, v in dict(response.info()).items()} + cmr_scroll_id = headers['cmr-scroll-id'] + hits = int(headers['cmr-hits']) + if not quiet: + if hits > 0: + print('Found {0} matches.'.format(hits)) + else: + print('Found no matches.') + search_page = response.read() + search_page = json.loads(search_page.decode('utf-8')) + url_scroll_results = cmr_filter_urls(search_page) + if not url_scroll_results: + break + if not quiet and hits > CMR_PAGE_SIZE: + print('.', end='') + sys.stdout.flush() + urls += url_scroll_results + print(urls) + + if not quiet and hits > CMR_PAGE_SIZE: + print() + return urls + +def main(argv=None): + global short_name, version, time_start, time_end, bounding_box, \ + polygon, filename_filter, url_list + + if argv is None: + argv = sys.argv[1:] + + force = False + quiet = False + usage = 'usage: nsidc-download_***.py [--help, -h] [--force, -f] [--quiet, -q]' + + try: + opts, args = getopt.getopt(argv, 'hfq', ['help', 'force', 'quiet']) + for opt, _arg in opts: + if opt in ('-f', '--force'): + force = True + elif opt in ('-q', '--quiet'): + quiet = True + elif opt in ('-h', '--help'): + print(usage) + sys.exit(0) + except getopt.GetoptError as e: + print(e.args[0]) + print(usage) + sys.exit(1) + + # Supply some default search parameters, just for testing purposes. + # These are only used if the parameters aren't filled in up above. 
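+    # The ATL06 defaults below are applied only when the search parameters were not defined at module level.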
+    if 'short_name' not in globals():
+        short_name = 'ATL06'
+        version = '003'
+        time_start = '2018-10-14T00:00:00Z'
+        time_end = '2021-01-08T21:48:13Z'
+        bounding_box = '-123.34078531,33.35825379,-105.07803558,48.97106571'
+        polygon = ''
+        filename_filter = '*ATL06_2020111121*'
+        url_list = []
+
+    try:
+        if not url_list:
+            url_list = cmr_search(short_name, version, time_start, time_end,
+                                  bounding_box=bounding_box, polygon=polygon,
+                                  filename_filter=filename_filter, quiet=quiet)
+
+        #cmr_download(url_list, force=force, quiet=quiet)
+    except KeyboardInterrupt:
+        quit()
+
+if __name__ == '__main__':
+    main()
\ No newline at end of file
diff --git a/Data_Processing_NSMv2.0/.ipynb_checkpoints/fetch_terrain_with_dask-checkpoint.py b/Data_Processing_NSMv2.0/.ipynb_checkpoints/fetch_terrain_with_dask-checkpoint.py
new file mode 100644
index 0000000..805064d
--- /dev/null
+++ b/Data_Processing_NSMv2.0/.ipynb_checkpoints/fetch_terrain_with_dask-checkpoint.py
@@ -0,0 +1,119 @@
+from dask.distributed import Client, LocalCluster
+from concurrent.futures import ProcessPoolExecutor
+from dask import delayed
+import dask.dataframe as dd
+import pandas as pd
+import math
+from pyproj import Transformer
+import rioxarray as rxr
+import numpy as np
+from osgeo import gdal, gdalconst
+from math import floor, ceil, sqrt, atan2
+from numpy import rad2deg, arctan2, gradient
+import dask_jobqueue
+import pystac_client
+import planetary_computer
+from tqdm import tqdm
+from dask.diagnostics import ProgressBar
+from retrying import retry
+
+def process_single_location(args):
+    lat, lon, elevation = args
+
+    slope = elevation.copy()
+    aspect = elevation.copy()
+
+    transformer = Transformer.from_crs("EPSG:4326", elevation.rio.crs, always_xy=True)
+    xx, yy = transformer.transform(lon, lat)
+
+    tilearray = np.around(elevation.values[0]).astype(int)
+    geo = (math.floor(float(lon)), 90, 0.0, math.ceil(float(lat)), 0.0, -90)
+
+    driver = gdal.GetDriverByName('MEM')
+    temp_ds = driver.Create('', tilearray.shape[1], tilearray.shape[0], 1, gdalconst.GDT_Float32)
+
+    temp_ds.GetRasterBand(1).WriteArray(tilearray)
+
+    tilearray_np = temp_ds.GetRasterBand(1).ReadAsArray()
+    grad_y, grad_x = gradient(tilearray_np)
+
+    # Calculate slope and aspect
+    slope_arr = np.sqrt(grad_x**2 + grad_y**2)
+    aspect_arr = rad2deg(arctan2(-grad_y, grad_x)) % 360
+
+    slope.values[0] = slope_arr
+    aspect.values[0] = aspect_arr
+
+    elev = round(elevation.sel(x=xx, y=yy, method="nearest").values[0])
+    slop = round(slope.sel(x=xx, y=yy, method="nearest").values[0])
+    asp = round(aspect.sel(x=xx, y=yy, method="nearest").values[0])
+
+    return elev, slop, asp
+
+def process_data_in_chunks(tile_group, tiles):
+    results = []
+
+    index_id = int(tile_group['index_id'].iloc[0])
+    tile = tiles[index_id]
+
+    @retry(stop_max_attempt_number=3, wait_fixed=2000, retry_on_exception=lambda exception: isinstance(exception, IOError))
+    def fetch_and_process_elevation():
+        signed_asset = planetary_computer.sign(tile.assets["data"])
+        elevation = rxr.open_rasterio(signed_asset.href)
+
+        for lat, lon in zip(tile_group['lat'], tile_group['lon']):
+            try:
+                result = process_single_location((lat, lon, elevation))
+                results.append(result)
+            except Exception as e:
+                print(f"Error processing location (lat: {lat}, lon: {lon}) due to {e}. 
Skipping...") + + fetch_and_process_elevation() + return pd.DataFrame(results, columns=['Elevation_m', 'Slope_Deg', 'Aspect_L']) + +def process_data_in_chunks_dask(tile_group, tiles): + + index_id = int(tile_group['index_id'].iloc[0]) + tile = tiles[index_id] + + signed_asset = planetary_computer.sign(tile.assets["data"]) + elevation = rxr.open_rasterio(signed_asset.href) + + results = [delayed(process_single_location)((lat, lon, elevation)) for lat, lon in zip(tile_group['lat'], tile_group['lon'])] + return pd.DataFrame(results, columns=['Elevation_m', 'Slope_Deg', 'Aspect_L']) + +def extract_terrain_data(df): + + + #cluster = dask_jobqueue.SLURMCluster(local_directory=r'/home/vgindi/slurmcluster') + #cluster.scale(num_workers) + #dask_client = Client(cluster) + + client = pystac_client.Client.open("https://planetarycomputer.microsoft.com/api/stac/v1", ignore_conformance=True) + + area_of_interest = {'type': 'Polygon', + 'coordinates': [[[-120.37693519839556, 36.29213061937931], + [-120.37690215328962, 38.8421802805432], + [-118.29165268221286, 38.84214595220293], + [-118.2917116398743, 36.29209713778364], + [-120.37693519839556, 36.29213061937931]]]} + + search = client.search(collections=["cop-dem-glo-90"], intersects=area_of_interest) + + tiles = list(search.items()) + df = df[['lat', 'lon', 'index_id']] + + # Convert the DataFrame to a Dask DataFrame for distributed computing + dask_df = dd.from_pandas(df, npartitions = df['index_id'].nunique()) + + # Process each partition (grouped by 'index_id') in parallel + results = dask_df.groupby('index_id').apply(lambda group: process_data_in_chunks(group, tiles), + meta=pd.DataFrame(columns=['Elevation_m', 'Slope_Deg', 'Aspect_L'])) + with ProgressBar(): + result_df = results.compute() + + #dask_client.close() + #cluster.close() + + final_df = pd.concat([df.reset_index(drop=True), result_df.reset_index(drop=True)], axis=1) + return result_df diff --git a/Data_Processing_NSMv2.0/.ipynb_checkpoints/generate_gridcells_sierras-checkpoint.py b/Data_Processing_NSMv2.0/.ipynb_checkpoints/generate_gridcells_sierras-checkpoint.py new file mode 100644 index 0000000..d7b7d63 --- /dev/null +++ b/Data_Processing_NSMv2.0/.ipynb_checkpoints/generate_gridcells_sierras-checkpoint.py @@ -0,0 +1,190 @@ +import math +import numpy as np +import pandas as pd +import geopandas as gpd +from pyproj import CRS, Transformer +import planetary_computer +from pystac_client import Client +import planetary_computer +import gdal +from gdal import gdalconst +from shapely import Point, Polygon +from shapely.geometry import box +from concurrent.futures import ThreadPoolExecutor, as_completed + +def generate_grids(bounding_box): + """ + Generates 100m resolution grid cells for low sierras regions uisng bounding box coordinates. Do not run this code if you have the file downloaded + this might take longer to run this. 
+ """ + #gdf_shapefile = gpd.read_file(shapefile_path) + ## Get bounding box coordinates + #minx, miny, maxx, maxy = gdf_shapefile.total_bounds + + minx, miny, maxx, maxy = *bounding_box[0], *bounding_box[1] + bbox_polygon = box(minx, miny, maxx, maxy) + + #buffer the bounds + minx = minx-1 + maxx = maxx+1 + miny = miny-1 + maxy = maxy+1 + + # Define the source and target coordinate reference systems + src_crs = CRS('EPSG:4326') # WGS84 + + if -126 < minx < -120: + # transformer = Transformer.from_crs(src_crs, 'epsg:32610', always_xy=True) + target_crs = CRS('EPSG:32610') #UTM zone 10 + elif -120 < minx < -114: + # transformer = Transformer.from_crs(src_crs, 'epsg:32611', always_xy=True) + target_crs = CRS('EPSG:32611') #UTM zone 11 + elif -114 < minx < -108: + # transformer = Transformer.from_crs(src_crs, 'epsg:32612', always_xy=True) + target_crs = CRS('EPSG:32612') #UTM zone 12 + elif -108 < minx < -102: + # transformer = Transformer.from_crs(src_crs, 'epsg:32613', always_xy=True) + target_crs = CRS('EPSG:32613') #UTM zone 13 + else: + # transformer = Transformer.from_crs(src_crs, target_crs, always_xy=True) + target_crs = CRS('EPSG:3857') #Web Mercator + + transformer = Transformer.from_crs(src_crs, target_crs, always_xy=True) + + # Convert the bounding box coordinates to Web Mercator + minx, miny = transformer.transform(minx, miny) + maxx, maxy = transformer.transform(maxx, maxy) + + # set the grid cell size in meters + cell_size = 100 + + # Calculate the number of cells in x and y directions + num_cells_x = int((maxx-minx)/cell_size) + num_cells_y = int((maxy-miny)/cell_size) + + # Calculate the total grid width and height + grid_width = num_cells_x*cell_size + grid_height = num_cells_y*cell_size + + # Calculate the offset to center the grid + offset_x = ((maxx-minx)-grid_width)/2 + offset_y = ((maxy-miny)-grid_height)/2 + + # Generate latitude and longitude ranges + lon_range = np.linspace(minx + offset_x, maxx - offset_x, num=num_cells_x) + lat_range = np.linspace(miny + offset_y, maxy - offset_y, num=num_cells_y) + + # Create a grid of points + points = [] + for lon in lon_range: + for lat in lat_range: + points.append((lon, lat)) + + # Convert the coordinate pairs back to WGS84 + back_transformer = Transformer.from_crs(target_crs, src_crs, always_xy=True) + target_coordinates = [back_transformer.transform(lon, lat) for lon, lat in points] + + # Create a list of Shapely Point geometries + coords = [Point(lon, lat) for lon, lat in target_coordinates] + + # Create a GeoDataFrame from the points + gdf_points = gpd.GeoDataFrame(geometry=coords) + + # set CRS to WGS84 + gdf_points=gdf_points.set_crs('epsg:4326') + # Clip the points to the shapefile boundary + gdf_clipped_points = gpd.clip(gdf_points, bbox_polygon) + # Specify the output points shapefile path + output_shapefile = r'/home/vgindi/Provided_Data/low_sierras_points.shp'####### + # Export the clipped points to a shapefile + gdf_clipped_points.to_file(output_shapefile) + print("Regional Grid Created") + + #Create Submission format .csv for SWE predictions + gdf_clipped_points.index.names = ['cell_id'] + Geospatial_df = pd.DataFrame() + Geospatial_df['lon']= gdf_clipped_points['geometry'].x + Geospatial_df['lat']= gdf_clipped_points['geometry'].y + + ### Begin process to import geospatial features into DF + min_lon = min(Geospatial_df['lon']) + min_lat = min(Geospatial_df['lat']) + + # Define the source and target coordinate reference systems + src_crs = CRS('EPSG:4326') # WGS84 + if -126 < min_lon < -120: + # transformer = 
Transformer.from_crs(src_crs, 'epsg:32610', always_xy=True) + target_crs = CRS('EPSG:32610') #UTM zone 10 + elif -120 < min_lon < -114: + # transformer = Transformer.from_crs(src_crs, 'epsg:32611', always_xy=True) + target_crs = CRS('EPSG:32611') #UTM zone 11 + elif -114 < min_lon < -108: + # transformer = Transformer.from_crs(src_crs, 'epsg:32612', always_xy=True) + target_crs = CRS('EPSG:32612') #UTM zone 12 + elif -108 < min_lon < -102: + # transformer = Transformer.from_crs(src_crs, 'epsg:32613', always_xy=True) + target_crs = CRS('EPSG:32613') #UTM zone 13 + else: + # transformer = Transformer.from_crs(src_crs, target_crs, always_xy=True) + target_crs = CRS('EPSG:3857') #Web Mercator + + transformer = Transformer.from_crs(src_crs, target_crs, always_xy=True) + + # Convert the bounding box coordinates to Web Mercator + Geospatial_df['lon_m'], Geospatial_df['lat_m'] = transformer.transform(Geospatial_df['lon'].to_numpy(), Geospatial_df['lat'].to_numpy()) + geocols=['BR_Coord_Long', 'BR_Coord_Lat', 'UR_Coord_Long', 'UR_Coord_Lat', + 'UL_Coord_Long', 'UL_Coord_Lat', 'BL_Coord_Long', 'BL_Coord_Lat'] + + Geospatial_df = Geospatial_df.reindex(columns=[*Geospatial_df.columns.tolist(), *geocols], fill_value=0) + Geospatial_df.reset_index(drop=True, inplace=True) + Geospatial_df = Geospatial_df.assign(BR_Coord_Long=lambda x: x.lon_m + 50, + BR_Coord_Lat=lambda x: x.lat_m - 50, + UR_Coord_Long=lambda x: x.lon_m + 50, + UR_Coord_Lat=lambda x: x.lat_m + 50, + UL_Coord_Long=lambda x: x.lon_m - 50, + UL_Coord_Lat=lambda x: x.lat_m + 50, + BL_Coord_Long=lambda x: x.lon_m - 50, + BL_Coord_Lat=lambda x: x.lat_m - 50,) + + transformer = Transformer.from_crs(target_crs, src_crs, always_xy=True) + #Geospatial_df['lon_m'], Geospatial_df['lat_m'] = transformer.transform(Geospatial_df['lon'].to_numpy(), Geospatial_df['lat'].to_numpy()) + Geospatial_df['BR_Coord_Long'], Geospatial_df['BR_Coord_Lat']=transformer.transform(Geospatial_df['BR_Coord_Long'].to_numpy(), Geospatial_df['BR_Coord_Lat'].to_numpy()) + Geospatial_df['UR_Coord_Long'], Geospatial_df['UR_Coord_Lat']=transformer.transform(Geospatial_df['UR_Coord_Long'].to_numpy(), Geospatial_df['UR_Coord_Lat'].to_numpy()) + Geospatial_df['UL_Coord_Long'], Geospatial_df['UL_Coord_Lat']=transformer.transform(Geospatial_df['UL_Coord_Long'].to_numpy(), Geospatial_df['UL_Coord_Lat'].to_numpy()) + Geospatial_df['BL_Coord_Long'], Geospatial_df['BL_Coord_Lat']=transformer.transform(Geospatial_df['BL_Coord_Long'].to_numpy(), Geospatial_df['BL_Coord_Lat'].to_numpy()) + print(Geospatial_df.columns) + Geospatial_df['cell_id'] = Geospatial_df.apply(lambda x: f"11N_cell_{x['lon']}_{x['lat']}", axis=1) + + client = Client.open( + "https://planetarycomputer.microsoft.com/api/stac/v1", + ignore_conformance=True, + ) + + search = client.search( + collections=["cop-dem-glo-90"], + intersects = { + "type": "Polygon", + "coordinates": [[ + [min_x, min_y], + [max_x, min_y], + [max_x, max_y], + [min_x, max_y], + [min_x, min_y] + ]]}) + + tiles = list(search.items()) + regions = pd.DataFrame([(i, tile.id) for i, tile in enumerate(tiles)], columns=['sliceID', 'tileID']).set_index('tileID') + + Geospatial_df['tile_id'] = Geospatial_df.apply(lambda x: 'Copernicus_DSM_COG_30_N' + str(math.floor(x['lat'])) + '_00_W' + str(math.ceil(abs(x['lon']))) + '_00_DEM', axis=1) + + regions_dict = regions['sliceID'].to_dict() + Geospatial_df['index_id'] = Geospatial_df['tile_id'].map(regions_dict) + Geospatial_df = Geospatial_df.drop(columns = ['tile_id'], axis = 1) + + return 
Geospatial_df[['cell_id', 'lon', 'lat', 'BR_Coord_Long', 'BR_Coord_Lat', 'UR_Coord_Long', 'UR_Coord_Lat', + 'UL_Coord_Long', 'UL_Coord_Lat', 'BL_Coord_Long', 'BL_Coord_Lat', 'index_id']] + +if __name__ == "__main__": + bounding_box = ((-120.3763448720203, 36.29256774541929), (-118.292253412863, 38.994985247736324)) + grid_cells_meta = generate_grids(bounding_box) + grid_cells_meta.to_csv(r'/home/vgindi/Provided_Data/grid_cells_meta.csv') \ No newline at end of file diff --git a/Data_Processing_NSMv2.0/.ipynb_checkpoints/terrain_daskconcurrency-checkpoint.py b/Data_Processing_NSMv2.0/.ipynb_checkpoints/terrain_daskconcurrency-checkpoint.py new file mode 100644 index 0000000..fc8acd4 --- /dev/null +++ b/Data_Processing_NSMv2.0/.ipynb_checkpoints/terrain_daskconcurrency-checkpoint.py @@ -0,0 +1,177 @@ +# Import packages +# Dataframe Packages +import numpy as np +from numpy import gradient, rad2deg, arctan2 +import xarray as xr +import pandas as pd + +# Vector Packages +import geopandas as gpd +import shapely +from shapely.geometry import Point, Polygon +from pyproj import CRS, Transformer + +# Raster Packages +import rioxarray as rxr +from rioxarray.merge import merge_arrays +#import rasterstats as rs +from osgeo import gdal +from osgeo import gdalconst + +# Data Access Packages +import pystac_client +import planetary_computer + +# General Packages +import os +import re +import shutil +import math +from datetime import datetime +import glob +from pprint import pprint +from typing import Union +from pathlib import Path +from tqdm import tqdm +import time +import requests +import concurrent.futures +from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed, TimeoutError +import dask +import dask.dataframe as dd +from dask.diagnostics import ProgressBar +from tqdm import tqdm +from requests.exceptions import HTTPError +import dask_jobqueue +from dask.distributed import Client + +#Processing using gdal +#The gdal-based version appears to run faster than the richdem one +def process_single_location(args): + lat, lon, index_id, tiles = args + + signed_asset = planetary_computer.sign(tiles[int(index_id)].assets["data"]) + elevation = rxr.open_rasterio(signed_asset.href) + + slope = elevation.copy() + aspect = elevation.copy() + + transformer = Transformer.from_crs("EPSG:4326", elevation.rio.crs, always_xy=True) + xx, yy = transformer.transform(lon, lat) + + tilearray = np.around(elevation.values[0]).astype(int) + geo = (math.floor(float(lon)), 90, 0.0, math.ceil(float(lat)), 0.0, -90) + + driver = gdal.GetDriverByName('MEM') + temp_ds = driver.Create('', tilearray.shape[1], tilearray.shape[0], 1, gdalconst.GDT_Float32) + + temp_ds.GetRasterBand(1).WriteArray(tilearray) + + tilearray_np = temp_ds.GetRasterBand(1).ReadAsArray() + grad_y, grad_x = gradient(tilearray_np) + + # Calculate slope and aspect + slope_arr = np.sqrt(grad_x**2 + grad_y**2) + aspect_arr = rad2deg(arctan2(-grad_y, grad_x)) % 360 + + slope.values[0] = slope_arr + aspect.values[0] = aspect_arr + + elev = round(elevation.sel(x=xx, y=yy, method="nearest").values[0]) + slop = round(slope.sel(x=xx, y=yy, method="nearest").values[0]) + asp = round(aspect.sel(x=xx, y=yy, method="nearest").values[0]) + + return elev, slop, asp + +#Processing using RichDEM (note: this second definition of process_single_location overrides the gdal version above and requires richdem imported as rd) +def process_single_location(args): + lat, lon, index_id, tiles = args + + signed_asset = planetary_computer.sign(tiles[index_id].assets["data"]) + elevation = rxr.open_rasterio(signed_asset.href, masked=True) + + slope = elevation.copy() + aspect = elevation.copy() + + transformer = 
Transformer.from_crs("EPSG:4326", elevation.rio.crs, always_xy=True) + xx, yy = transformer.transform(lon, lat) + tilearray = np.around(elevation.values[0]).astype(int) + + #set tile geo to get slope and set at rdarray + geo = (math.floor(float(lon)), 90, 0.0, math.ceil(float(lat)), 0.0, -90) + + tilearray = rd.rdarray(tilearray, no_data = -9999) + tilearray.projection = 'EPSG:4326' + tilearray.geotransform = geo + + slope_arr = rd.TerrainAttribute(tilearray, attrib='slope_degrees') + aspect_arr = rd.TerrainAttribute(tilearray, attrib='aspect') + + slope.values[0] = slope_arr + aspect.values[0] = aspect_arr + + elev = round(elevation.sel(x=xx, y=yy, method="nearest").values[0]) + slop = round(slope.sel(x=xx, y=yy, method="nearest").values[0]) + asp = round(aspect.sel(x=xx, y=yy, method="nearest").values[0]) + + return elev, slop, asp + +def process_data_in_chunks(df, tiles, num_workers = 16): + chunk_results = [] + + with ProcessPoolExecutor(max_workers=num_workers) as executor: + futures = [executor.submit(process_single_location, (row.lat, row.lon, row.index_id, tiles)) for index, row in df.iterrows()] + + #for future in tqdm(as_completed(futures), total=len(futures)): + for future in tqdm(futures): + try: + chunk_results.append(future.result(timeout=10)) + except TimeoutError: + print("Processing location timed out.") + continue + except HTTPError as e: + print(f"Failed to process a location due to HTTPError: {e}") + continue + + return pd.DataFrame(chunk_results, columns=['Elevation_m', 'Slope_Deg', 'Aspect_L']) + +###Dask implementation to process the dataframe in chunks### +def extract_terrain_data(geospatial_df, num_workers=16): + + cores_per_worker = 1 + memory_per_worker = "3GB" #Adjust based on the necessity + + cluster = dask_jobqueue.SLURMCluster(cores=cores_per_worker, + memory=memory_per_worker, + local_directory=r'/home/vgindi/slurmcluster') + + # Scale the cluster to workers based on the requirement + cluster.scale(num_workers) + dask_client = Client(cluster) + dask_client + + client = pystac_client.Client.open("https://planetarycomputer.microsoft.com/api/stac/v1", ignore_conformance=True) + + #coordinates obtained from the grid_cells_meta data + area_of_interest = {'type': 'Polygon', + 'coordinates': [[[-120.37693519839556, 36.29213061937931], + [-120.37690215328962, 38.8421802805432], + [-118.29165268221286, 38.84214595220293], + [-118.2917116398743, 36.29209713778364], + [-120.37693519839556, 36.29213061937931]]]} + + search = client.search(collections=["cop-dem-glo-90"], intersects=area_of_interest) + + tiles = list(search.items()) + geospatial_df = geospatial_df[['lat', 'lon', 'index_id']] + + dask_df = dd.from_pandas(geospatial_df, npartitions=5) + results = dask_df.map_partitions(process_data_in_chunks, tiles=tiles, num_workers=num_workers, + meta=pd.DataFrame(columns=['Elevation_m', 'Slope_Deg', 'Aspect_L'])) + + + final_result = results.compute(scheduler='processes') + + #result_df = pd.concat([geospatial_df.reset_index(drop=True), final_result.reset_index(drop=True)], axis=1) + return final_result + diff --git a/Data_Processing_NSMv2.0/DataProcessing_Pipeline.ipynb b/Data_Processing_NSMv2.0/DataProcessing_Pipeline.ipynb new file mode 100644 index 0000000..b803420 --- /dev/null +++ b/Data_Processing_NSMv2.0/DataProcessing_Pipeline.ipynb @@ -0,0 +1,887 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "id": "9582b19b-a266-4a82-9f9d-7d5ad1e3a391", + "metadata": {}, + "outputs": [], + "source": [ + "# Import packages\n", + "# Dataframe 
Packages\n", + "import numpy as np\n", + "import xarray as xr\n", + "import pandas as pd\n", + "\n", + "# Vector Packages\n", + "import geopandas as gpd\n", + "import shapely\n", + "from shapely import wkt\n", + "from shapely.geometry import Point, Polygon\n", + "from pyproj import CRS, Transformer\n", + "\n", + "# Raster Packages\n", + "import rioxarray as rxr\n", + "import rasterio\n", + "from rasterio.mask import mask\n", + "from rioxarray.merge import merge_arrays\n", + "import rasterstats as rs\n", + "import gdal\n", + "from gdal import gdalconst\n", + "\n", + "# Data Access Packages\n", + "import earthaccess as ea\n", + "import h5py\n", + "import pickle\n", + "from tensorflow.keras.models import load_model\n", + "from pystac_client import Client\n", + "import richdem as rd\n", + "import planetary_computer\n", + "from planetary_computer import sign\n", + "\n", + "# General Packages\n", + "import os\n", + "import re\n", + "import shutil\n", + "import math\n", + "from datetime import datetime\n", + "import glob\n", + "from pprint import pprint\n", + "from typing import Union\n", + "from pathlib import Path\n", + "from tqdm import tqdm\n", + "import time\n", + "import requests\n", + "from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed\n", + "import dask\n", + "import dask.dataframe as dd\n", + "from dask.distributed import progress\n", + "from dask.distributed import Client\n", + "from dask.diagnostics import ProgressBar\n", + "from retrying import retry\n", + "import fiona\n", + "import re\n", + "import s3fs" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "7ab6edbc-c701-42b3-89c9-4649ac4dccba", + "metadata": {}, + "outputs": [], + "source": [ + "import NSIDC_Data\n", + "\n", + "class ASODataTool:\n", + " def __init__(self, short_name, version, polygon='', filename_filter=''):\n", + " self.short_name = short_name\n", + " self.version = version\n", + " self.polygon = polygon\n", + " self.filename_filter = filename_filter\n", + " self.url_list = []\n", + " self.CMR_URL = 'https://cmr.earthdata.nasa.gov'\n", + " self.CMR_PAGE_SIZE = 2000\n", + " self.CMR_FILE_URL = ('{0}/search/granules.json?provider=NSIDC_ECS'\n", + " '&sort_key[]=start_date&sort_key[]=producer_granule_id'\n", + " '&scroll=true&page_size={1}'.format(self.CMR_URL, self.CMR_PAGE_SIZE))\n", + "\n", + " def cmr_search(self, time_start, time_end, bounding_box):\n", + " try:\n", + " if not self.url_list:\n", + " self.url_list = NSIDC_Data.cmr_search(\n", + " self.short_name, self.version, time_start, time_end,\n", + " bounding_box=self.bounding_box, polygon=self.polygon,\n", + " filename_filter=self.filename_filter, quiet=False)\n", + " return self.url_list\n", + " except KeyboardInterrupt:\n", + " quit()\n", + "\n", + " def cmr_download(self, directory):\n", + " if not os.path.exists(directory):\n", + " os.makedirs(directory, exist_ok=True)\n", + "\n", + " NSIDC_Data.cmr_download(self.url_list, directory, False)\n", + "\n", + " @staticmethod\n", + " def get_bounding_box(region):\n", + " regions = pd.read_pickle(\"C:\\\\Users\\\\VISH NU\\\\National_snow_model\\\\National-Snow-Model\\\\Data\\\\Processed\\\\RegionVal.pkl\")\n", + " superset = []\n", + "\n", + " superset.append(regions[region])\n", + " superset = pd.concat(superset)\n", + " superset = gpd.GeoDataFrame(superset, geometry=gpd.points_from_xy(superset.Long, superset.Lat, crs=\"EPSG:4326\"))\n", + " bounding_box = list(superset.total_bounds)\n", + "\n", + " return 
f\"{bounding_box[0]},{bounding_box[1]},{bounding_box[2]},{bounding_box[3]}\"\n", + "\n", + "class ASODownload(ASODataTool):\n", + " def __init__(self, short_name, version, polygon='', filename_filter=''):\n", + " super().__init__(short_name, version, polygon, filename_filter)\n", + " self.region_list = [ 'N_Sierras',\n", + " 'S_Sierras',\n", + " 'Greater_Yellowstone',\n", + " 'N_Co_Rockies',\n", + " 'SW_Mont',\n", + " 'SW_Co_Rockies',\n", + " 'GBasin',\n", + " 'N_Wasatch',\n", + " 'N_Cascade',\n", + " 'S_Wasatch',\n", + " 'SW_Mtns',\n", + " 'E_WA_N_Id_W_Mont',\n", + " 'S_Wyoming',\n", + " 'SE_Co_Rockies',\n", + " 'Sawtooth',\n", + " 'Ca_Coast',\n", + " 'E_Or',\n", + " 'N_Yellowstone',\n", + " 'S_Cascade',\n", + " 'Wa_Coast',\n", + " 'Greater_Glacier',\n", + " 'Or_Coast' ]\n", + "\n", + " def select_region(self):\n", + " print(\"Select a region by entering its index:\")\n", + " for i, region in enumerate(self.region_list, start=1):\n", + " print(f\"{i}. {region}\")\n", + "\n", + " try:\n", + " user_input = int(input(\"Enter the index of the region: \"))\n", + " if 1 <= user_input <= len(self.region_list):\n", + " selected_region = self.region_list[user_input - 1]\n", + " self.bounding_box = self.get_bounding_box(selected_region)\n", + " print(f\"You selected: {selected_region}\")\n", + " print(f\"Bounding Box: {self.bounding_box}\")\n", + " else:\n", + " print(\"Invalid index. Please select a valid index.\")\n", + " except ValueError:\n", + " print(\"Invalid input. Please enter a valid index.\")\n", + " \n", + "if __name__ == \"__main__\":\n", + " short_name = 'ASO_50M_SWE'\n", + " version = '1'\n", + "\n", + " data_tool = ASODownload(short_name, version)\n", + " time_start = '2013-04-02T00:00:00Z'\n", + " time_end = '2019-07-19T23:59:59Z'\n", + " \n", + " selected_region = data_tool.select_region() # Call select_region on the instance\n", + " directory = \"SWE_Data\"\n", + "\n", + " print(f\"Fetching file URLs in progress for {selected_region} from {time_start} to {time_end}\")\n", + " url_list = data_tool.cmr_search(time_start, time_end, data_tool.bounding_box)\n", + " data_tool.cmr_download(directory)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "968b8ea0-b7c7-4c08-8cf7-69398a1493f4", + "metadata": {}, + "outputs": [], + "source": [ + "class ASODataProcessing:\n", + " \n", + " @staticmethod\n", + " def processing_tiff(input_file, output_res):\n", + " try:\n", + " date = os.path.splitext(input_file)[0].split(\"_\")[-1]\n", + " \n", + " # Define the output file path\n", + " output_folder = os.path.join(os.getcwd(), \"Processed_Data\")\n", + " os.makedirs(output_folder, exist_ok=True)\n", + " output_file = os.path.join(output_folder, f\"ASO_100M_{date}.tif\")\n", + " \n", + " ds = gdal.Open(input_file)\n", + " if ds is None:\n", + " print(f\"Failed to open '{input_file}'. 
Make sure the file is a valid GeoTIFF file.\")\n", + " return None\n", + " \n", + " # Reproject and resample\n", + " gdal.Warp(output_file, ds, dstSRS=\"EPSG:4326\", xRes=output_res, yRes=-output_res, resampleAlg=\"bilinear\")\n", + " \n", + " # Read the processed TIFF file using rasterio\n", + " rds = rxr.open_rasterio(output_file)\n", + " rds = rds.squeeze().drop(\"spatial_ref\").drop(\"band\")\n", + " rds.name = \"data\"\n", + " df = rds.to_dataframe().reset_index()\n", + " return df\n", + " \n", + " except Exception as e:\n", + " print(f\"An error occurred: {str(e)}\")\n", + " return None\n", + " \n", + " @staticmethod\n", + " def convert_tiff_to_csv(input_folder, output_res):\n", + "\n", + " curr_dir = os.getcwd()\n", + " folder_path = os.path.join(curr_dir, input_folder)\n", + " \n", + " # Check if the folder exists and is not empty\n", + " if not os.path.exists(folder_path) or not os.path.isdir(folder_path):\n", + " print(f\"The folder '{input_folder}' does not exist.\")\n", + " return\n", + " \n", + " if not os.listdir(folder_path):\n", + " print(f\"The folder '{input_folder}' is empty.\")\n", + " return\n", + " \n", + " tiff_files = [filename for filename in os.listdir(folder_path) if filename.endswith(\".tif\")]\n", + " \n", + " # Create CSV files from TIFF files\n", + " for tiff_filename in tiff_files:\n", + " \n", + " # Open the TIFF file\n", + " tiff_filepath = os.path.join(folder_path, tiff_filename)\n", + " df = ASODataProcessing.processing_tiff(tiff_filepath, output_res)\n", + " \n", + " if df is not None:\n", + " # Get the date from the TIFF filename\n", + " date = os.path.splitext(tiff_filename)[0].split(\"_\")[-1]\n", + " \n", + " # Define the CSV filename and folder\n", + " csv_filename = f\"ASO_SWE_{date}.csv\"\n", + " csv_folder = os.path.join(curr_dir, \"Processed_Data\", \"SWE_csv\")\n", + " os.makedirs(csv_folder, exist_ok=True)\n", + " csv_filepath = os.path.join(csv_folder, csv_filename)\n", + " \n", + " # Save the DataFrame as a CSV file\n", + " df.to_csv(csv_filepath, index=False)\n", + " \n", + " print(f\"Converted '{tiff_filename}' to '{csv_filename}'\")\n", + " \n", + " def create_polygon(self, row):\n", + " return Polygon([(row['BL_Coord_Long'], row['BL_Coord_Lat']),\n", + " (row['BR_Coord_Long'], row['BR_Coord_Lat']),\n", + " (row['UR_Coord_Long'], row['UR_Coord_Lat']),\n", + " (row['UL_Coord_Long'], row['UL_Coord_Lat'])])\n", + "\n", + " def process_folder(self, input_folder, metadata_path, output_folder):\n", + " # Import the metadata into a pandas DataFrame\n", + " pred_obs_metadata_df = pd.read_csv(metadata_path)\n", + " \n", + " # Assuming create_polygon is defined elsewhere, we add a column with polygon geometries\n", + " pred_obs_metadata_df = pred_obs_metadata_df.drop(columns=['Unnamed: 0'], axis=1)\n", + " pred_obs_metadata_df['geometry'] = pred_obs_metadata_df.apply(self.create_polygon, axis=1)\n", + " \n", + " # Convert the DataFrame to a GeoDataFrame\n", + " metadata = gpd.GeoDataFrame(pred_obs_metadata_df, geometry='geometry')\n", + " \n", + " # Drop coordinates columns\n", + " metadata_df = metadata.drop(columns=['BL_Coord_Long', 'BL_Coord_Lat', \n", + " 'BR_Coord_Long', 'BR_Coord_Lat', \n", + " 'UR_Coord_Long', 'UR_Coord_Lat', \n", + " 'UL_Coord_Long', 'UL_Coord_Lat'], axis=1)\n", + " \n", + " # List all CSV files in the input folder\n", + " csv_files = [f for f in os.listdir(input_folder) if f.endswith('.csv')]\n", + " \n", + " for csv_file in csv_files:\n", + " input_aso_path = os.path.join(input_folder, csv_file)\n", + " output_aso_path = 
os.path.join(output_folder, csv_file)\n", + " \n", + " # Check if the output file already exists\n", + " if os.path.exists(output_aso_path):\n", + " print(f\"CSV file {csv_file} already exists in the output folder.\")\n", + " continue\n", + " \n", + " # Process each CSV file\n", + " aso_swe_df = pd.read_csv(input_aso_path)\n", + " \n", + " # Convert the \"aso_swe_df\" into a GeoDataFrame with point geometries\n", + " geometry = [Point(xy) for xy in zip(aso_swe_df['x'], aso_swe_df['y'])]\n", + " aso_swe_geo = gpd.GeoDataFrame(aso_swe_df, geometry=geometry)\n", + "\n", + " result = gpd.sjoin(aso_swe_geo, metadata_df, how='left', predicate='within', op = 'intersects')\n", + " \n", + " # Select specific columns for the final DataFrame\n", + " Final_df = result[['y', 'x', 'data', 'cell_id']]\n", + " Final_df.rename(columns={'data': 'swe'}, inplace=True)\n", + " \n", + " # Drop rows where 'cell_id' is NaN\n", + " if Final_df['cell_id'].isnull().values.any():\n", + " Final_df = Final_df.dropna(subset=['cell_id'])\n", + " \n", + " # Save the processed DataFrame to a CSV file\n", + " Final_df.to_csv(output_aso_path, index=False)\n", + " print(f\"Processed {csv_file}\")\n", + " \n", + " def converting_ASO_to_standardized_format(self, input_folder, output_csv):\n", + " \n", + " # Initialize an empty DataFrame to store the final transformed data\n", + " final_df = pd.DataFrame()\n", + " \n", + " # Iterate through all CSV files in the directory\n", + " for filename in os.listdir(input_folder):\n", + " if filename.endswith(\".csv\"):\n", + " file_path = os.path.join(input_folder, filename)\n", + " \n", + " # Extract the time frame from the filename\n", + " time_frame = filename.split('_')[-1].split('.')[0]\n", + " \n", + " # Read the CSV file into a DataFrame\n", + " df = pd.read_csv(file_path)\n", + " \n", + " # Rename the 'SWE' column to the time frame for clarity\n", + " df = df.rename(columns={'SWE': time_frame})\n", + " \n", + " # Merge or concatenate the data into the final DataFrame\n", + " if final_df.empty:\n", + " final_df = df\n", + " else:\n", + " final_df = pd.merge(final_df, df, on='cell_id', how='outer')\n", + " \n", + " # Save the final transformed DataFrame to a single CSV file\n", + " final_df.to_csv(output_csv, index=False)\n", + " \n", + "if __name__ == \"__main__\":\n", + " \n", + " #data_processor = ASODataProcessing()\n", + " #folder_name = \"SWE_Data\"\n", + " #output_res = 0.001\n", + " data_processor.convert_tiff_to_csv(folder_name, output_res)\n", + " input_folder = r\"/home/vgindi/Processed_Data/SWE_csv\"\n", + " metadata_path = r\"/home/vgindi/Provided_Data/grid_cells_meta.csv\"\n", + " output_folder = r\"/home/vgindi/Processed_SWE\"\n", + "\n", + " data_processor.process_folder(input_folder, metadata_path, output_folder)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "f4c91c7a-d327-4dbf-8669-1e667c0e4428", + "metadata": {}, + "outputs": [], + "source": [ + "def load_aso_snotel_geometry(aso_swe_file, folder_path):\n", + " \n", + " aso_file = pd.read_csv(os.path.join(folder_path, aso_swe_file))\n", + " aso_file.set_index('cell_id', inplace=True)\n", + " aso_geometry = [Point(xy) for xy in zip(aso_file['x'], aso_file['y'])]\n", + " aso_gdf = gpd.GeoDataFrame(aso_file, geometry=aso_geometry)\n", + " \n", + " return aso_gdf\n", + "\n", + "def haversine_vectorized(lat1, lon1, lat2, lon2):\n", + " \n", + " lon1 = np.radians(lon1)\n", + " lon2 = np.radians(lon2)\n", + " lat1 = np.radians(lat1)\n", + " lat2 = np.radians(lat2)\n", + "\n", + " # Haversine 
formula\n", + " dlat = lat2 - lat1\n", + " dlon = lon2 - lon1\n", + " a = np.sin(dlat/2)**2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon/2)**2\n", + " c = 2 * np.arcsin(np.sqrt(a))\n", + " \n", + " r = 6371.0\n", + " # Distance calculation\n", + " distances = r * c\n", + "\n", + " return distances\n", + "\n", + "def calculate_nearest_snotel(aso_gdf, snotel_gdf, n=6, distance_cache=None):\n", + "\n", + " if distance_cache is None:\n", + " distance_cache = {}\n", + "\n", + " nearest_snotel = {}\n", + " for idx, aso_row in aso_gdf.iterrows():\n", + " cell_id = idx\n", + "\n", + " # Check if distances for this cell_id are already calculated and cached\n", + " if cell_id in distance_cache:\n", + " nearest_snotel[idx] = distance_cache[cell_id]\n", + " else:\n", + " # Calculate Haversine distances between the grid cell and all SNOTEL locations\n", + " distances = haversine_vectorized(\n", + " aso_row.geometry.y, aso_row.geometry.x,\n", + " snotel_gdf.geometry.y.values, snotel_gdf.geometry.x.values)\n", + "\n", + " # Store the nearest stations in the cache\n", + " nearest_snotel[idx] = list(snotel_gdf['station_id'].iloc[distances.argsort()[:n]])\n", + " distance_cache[cell_id] = nearest_snotel[idx]\n", + "\n", + " return nearest_snotel, distance_cache\n", + "\n", + "def calculate_distances_for_cell(aso_row, snotel_gdf, n=6):\n", + " \n", + " distances = haversine_vectorized(\n", + " aso_row.geometry.y, aso_row.geometry.x,\n", + " snotel_gdf.geometry.y.values, snotel_gdf.geometry.x.values)\n", + " \n", + " nearest_sites = list(snotel_gdf['station_id'].iloc[distances.argsort()[:n]])\n", + " \n", + " return nearest_sites\n", + "\n", + "def calculate_nearest_snotel_parallel(aso_gdf, snotel_gdf, n = 6, distance_cache = None):\n", + " \n", + " if distance_cache is None:\n", + " distance_cache = {}\n", + "\n", + " nearest_snotel = {}\n", + " with ProcessPoolExecutor(max_workers = 16) as executor:\n", + " futures = []\n", + " \n", + " for idx, aso_row in aso_gdf.iterrows():\n", + " if idx not in distance_cache:\n", + " # Submit the task for parallel execution\n", + " futures.append(executor.submit(calculate_distances_for_cell, aso_row, snotel_gdf, n))\n", + " else:\n", + " nearest_snotel[idx] = distance_cache[idx]\n", + "\n", + " # Retrieve results as they are completed\n", + " for future in tqdm(futures):\n", + " result = future.result()\n", + " \n", + " cell_id = result[0] \n", + " nearest_snotel[cell_id] = result[1]\n", + " distance_cache[cell_id] = result[1]\n", + "\n", + " return nearest_snotel, distance_cache\n", + "\n", + "def fetch_snotel_sites_for_cellids(aso_swe_files_folder_path, metadata_path, snotel_data_path):\n", + " \n", + " metadata_df = pd.read_csv(metadata_path)\n", + " #metadata_df['geometry'] = metadata_df['geometry'].apply(wkt.loads)\n", + " \n", + " def create_polygon(row):\n", + " return Polygon([(row['BL_Coord_Long'], row['BL_Coord_Lat']),\n", + " (row['BR_Coord_Long'], row['BR_Coord_Lat']),\n", + " (row['UR_Coord_Long'], row['UR_Coord_Lat']),\n", + " (row['UL_Coord_Long'], row['UL_Coord_Lat'])])\n", + " \n", + " metadata_df = metadata_df.drop(columns=['Unnamed: 0'], axis=1)\n", + " metadata_df['geometry'] = metadata_df.apply(create_polygon, axis=1)\n", + " \n", + " metadata = gpd.GeoDataFrame(metadata_df, geometry='geometry')\n", + " snotel_data = pd.read_csv(snotel_data_path)\n", + "\n", + " date_columns = snotel_data.columns[1:]\n", + " new_column_names = {col: pd.to_datetime(col, format='%Y-%m-%d').strftime('%Y%m%d') for col in date_columns}\n", + " snotel_data_f = 
snotel_data.rename(columns=new_column_names)\n", + "\n", + " snotel_file = pd.read_csv(\"/home/vgindi/Provided_Data/ground_measures_metadata.csv\")\n", + " snotel_geometry = [Point(xy) for xy in zip(snotel_file['longitude'], snotel_file['latitude'])]\n", + " snotel_gdf = gpd.GeoDataFrame(snotel_file, geometry=snotel_geometry)\n", + "\n", + " final_df = pd.DataFrame()\n", + "\n", + " for aso_swe_file in os.listdir(aso_swe_files_folder_path):\n", + "\n", + " if os.path.isdir(os.path.join(aso_swe_files_folder_path, aso_swe_file)):\n", + " continue\n", + "\n", + " timestamp = aso_swe_file.split('_')[-1].split('.')[0]\n", + " print(f\"Processing file with timestamp: {timestamp}\")\n", + "\n", + " aso_gdf = load_aso_snotel_geometry(aso_swe_file, aso_swe_files_folder_path)\n", + " aso_swe_data = pd.read_csv(os.path.join(aso_swe_files_folder_path, aso_swe_file))\n", + "\n", + " # Calculating nearest SNOTEL sites\n", + " nearest_snotel, distance_cache = calculate_nearest_snotel(aso_gdf, snotel_gdf, n=6)\n", + " print(f\"calculated nearest snotel for file with timestamp {timestamp}\")\n", + "\n", + " transposed_data = {}\n", + "\n", + " if timestamp in new_column_names.values():\n", + " for idx, aso_row in aso_gdf.iterrows(): \n", + " cell_id = idx\n", + " station_ids = nearest_snotel[cell_id]\n", + " selected_snotel_data = snotel_data_f[['station_id', timestamp]].loc[snotel_data_f['station_id'].isin(station_ids)]\n", + " station_mapping = {old_id: f\"nearest site {i+1}\" for i, old_id in enumerate(station_ids)}\n", + " \n", + " # Rename the station IDs in the selected SNOTEL data\n", + " selected_snotel_data['station_id'] = selected_snotel_data['station_id'].map(station_mapping)\n", + "\n", + " # Transpose and set the index correctly\n", + " transposed_data[cell_id] = selected_snotel_data.set_index('station_id').T\n", + "\n", + " transposed_df = pd.concat(transposed_data, axis=0)\n", + " \n", + " # Reset index and rename columns\n", + " transposed_df = transposed_df.reset_index()\n", + " transposed_df.rename(columns={'level_0': 'cell_id', 'level_1': 'Date'}, inplace = True)\n", + " transposed_df['Date'] = pd.to_datetime(transposed_df['Date'])\n", + " \n", + " aso_swe_data['Date'] = pd.to_datetime(timestamp)\n", + " aso_swe_data = aso_swe_data[['cell_id', 'Date', 'swe']]\n", + " merged_df = pd.merge(aso_swe_data, transposed_df, how='left', on=['cell_id', 'Date'])\n", + " \n", + " final_df = pd.concat([final_df, merged_df], ignore_index=True)\n", + " \n", + " else:\n", + " aso_swe_data['Date'] = pd.to_datetime(timestamp)\n", + " aso_swe_data = aso_swe_data[['cell_id', 'Date', 'swe']]\n", + " \n", + " # No need to merge in this case, directly concatenate\n", + " final_df = pd.concat([final_df, aso_swe_data], ignore_index=True)\n", + "\n", + "\n", + " # Merge with metadata\n", + " req_cols = ['cell_id', 'lat', 'lon', 'BR_Coord_Long', 'BR_Coord_Lat', 'UR_Coord_Long', 'UR_Coord_Lat',\n", + " 'UL_Coord_Long', 'UL_Coord_Lat', 'BL_Coord_Long', 'BL_Coord_Lat', 'geometry']\n", + " Result = final_df.merge(metadata[req_cols], how='left', on='cell_id')\n", + "\n", + " # Column renaming and ordering\n", + " Result.rename(columns={'swe': 'ASO_SWE_in'}, inplace=True)\n", + " Result = Result[['cell_id', 'Date', 'ASO_SWE_in', 'lat', 'lon', 'nearest site 1', 'nearest site 2',\n", + " 'nearest site 3', 'nearest site 4', 'nearest site 5', 'nearest site 6',\n", + " 'BR_Coord_Long', 'BR_Coord_Lat', 'UR_Coord_Long', 'UR_Coord_Lat',\n", + " 'UL_Coord_Long', 'UL_Coord_Lat', 'BL_Coord_Long', 'BL_Coord_Lat']]\n", + "\n", + " # 
Save the merged data to a new file\n", + " output_filename = r\"/home/vgindi/Provided_Data/Merged_aso_snotel_data.csv\"\n", + " Result.to_csv(output_filename, index=False)\n", + " print(\"Processed and saved data\")\n", + " \n", + "def main():\n", + " aso_swe_files_folder_path = r\"/home/vgindi/Processed_SWE\"\n", + " metadata_path = r\"/home/vgindi/Provided_Data/grid_cells_meta_idx.csv\"\n", + " snotel_data_path = r\"/home/vgindi/Provided_Data/ground_measures_train_featuresALLDATES.parquet\"\n", + " fetch_snotel_sites_for_cellids(aso_swe_files_folder_path, metadata_path, snotel_data_path)\n", + "\n", + "if __name__ == \"__main__\":\n", + " main()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "a6c69770-62f8-4f8f-88ee-ffaac26aaa44", + "metadata": {}, + "outputs": [], + "source": [ + "Result = pd.read_csv(r'/home/vgindi/Provided_Data/Merged_aso_snotel_data.csv')\n", + "Result.head(10)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "29cb0f3a-7713-45d2-881f-574037b3a5fd", + "metadata": {}, + "outputs": [], + "source": [ + "\"\"\"\n", + "A Simple implementation of parallel processing using concurrency it takes so long to execute,\n", + "Explore terrain_daskconcurrency and terrain-processing_cluster python for more optimized implementations.\n", + "\"\"\"\n", + "\n", + "def process_single_location(args):\n", + " lat, lon, regions, tiles = args\n", + "\n", + " if (lat, lon) in elevation_cache:\n", + " elev, slop, asp = elevation_cache[(lat, lon)]\n", + " return elev, slop, asp\n", + "\n", + " tile_id = 'Copernicus_DSM_COG_30_N' + str(math.floor(lon)) + '_00_W' + str(math.ceil(abs(lat))) + '_00_DEM'\n", + " index_id = regions.loc[tile_id]['sliceID']\n", + "\n", + " signed_asset = planetary_computer.sign(tiles[index_id].assets[\"data\"])\n", + " #print(signed_asset)\n", + " elevation = rxr.open_rasterio(signed_asset.href)\n", + " \n", + " slope = elevation.copy()\n", + " aspect = elevation.copy()\n", + "\n", + " transformer = Transformer.from_crs(\"EPSG:4326\", elevation.rio.crs, always_xy=True)\n", + " xx, yy = transformer.transform(lon, lat)\n", + "\n", + " tilearray = np.around(elevation.values[0]).astype(int)\n", + " #print(tilearray)\n", + " geo = (math.floor(float(lon)), 90, 0.0, math.ceil(float(lat)), 0.0, -90)\n", + "\n", + " no_data_value = -9999\n", + " driver = gdal.GetDriverByName('MEM')\n", + " temp_ds = driver.Create('', tilearray.shape[1], tilearray.shape[0], 1, gdalconst.GDT_Float32)\n", + "\n", + " temp_ds.GetRasterBand(1).WriteArray(tilearray)\n", + " temp_ds.GetRasterBand(1).SetNoDataValue(no_data_value)\n", + " temp_ds.SetProjection('EPSG:4326')\n", + " temp_ds.SetGeoTransform(geo)\n", + "\n", + " tilearray_np = temp_ds.GetRasterBand(1).ReadAsArray()\n", + " slope_arr, aspect_arr = np.gradient(tilearray_np)\n", + " aspect_arr = np.rad2deg(np.arctan2(aspect_arr[0], aspect_arr[1]))\n", + " \n", + " slope.values[0] = slope_arr\n", + " aspect.values[0] = aspect_arr\n", + "\n", + " elev = round(elevation.sel(x=xx, y=yy, method=\"nearest\").values[0])\n", + " slop = round(slope.sel(x=xx, y=yy, method=\"nearest\").values[0])\n", + " asp = round(aspect.sel(x=xx, y=yy, method=\"nearest\").values[0])\n", + "\n", + " elevation_cache[(lat, lon)] = (elev, slop, asp) \n", + " return elev, slop, asp\n", + "\n", + "def extract_terrain_data_threaded(metadata_df, bounding_box, max_workers=10):\n", + " global elevation_cache \n", + "\n", + " elevation_cache = {} \n", + " min_x, min_y, max_x, max_y = *bounding_box[0], *bounding_box[1]\n", + " \n", 
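+ "\n",
+ "    # Hedged sketch (added, not part of the original cell): the `Client.open` call just\n",
+ "    # below assumes `Client` still refers to pystac_client.Client; if dask.distributed.Client\n",
+ "    # is imported later under the same name, that attribute lookup fails. Binding the STAC\n",
+ "    # client to an explicit alias sidesteps the shadowing, e.g.:\n",
+ "    #     from pystac_client import Client as StacClient\n",
+ "    #     stac_client = StacClient.open(\"https://planetarycomputer.microsoft.com/api/stac/v1\",\n",
+ "    #                                   ignore_conformance=True)\n",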
+ " client = Client.open(\n", + " \"https://planetarycomputer.microsoft.com/api/stac/v1\",\n", + " ignore_conformance=True,\n", + " )\n", + "\n", + " search = client.search(\n", + " collections=[\"cop-dem-glo-90\"],\n", + " intersects = {\n", + " \"type\": \"Polygon\",\n", + " \"coordinates\": [[\n", + " [min_x, min_y],\n", + " [max_x, min_y],\n", + " [max_x, max_y],\n", + " [min_x, max_y],\n", + " [min_x, min_y] \n", + " ]]})\n", + "\n", + " tiles = list(search.items())\n", + "\n", + " regions = []\n", + "\n", + " print(\"Retrieving Copernicus 90m DEM tiles\")\n", + " for i in tqdm(range(0, len(tiles))):\n", + " row = [i, tiles[i].id]\n", + " regions.append(row)\n", + " regions = pd.DataFrame(columns = ['sliceID', 'tileID'], data = regions)\n", + " regions = regions.set_index(regions['tileID'])\n", + " del regions['tileID']\n", + "\n", + " print(\"Interpolating Grid Cell Spatial Features\")\n", + "\n", + " with ThreadPoolExecutor(max_workers=max_workers) as executor:\n", + " futures = [executor.submit(process_single_location, (metadata_df.iloc[i]['cen_lat'], metadata_df.iloc[i]['cen_lon'], regions, tiles))\n", + " for i in tqdm(range(len(metadata_df)))]\n", + " \n", + " results = []\n", + " for future in tqdm(as_completed(futures), total=len(futures)):\n", + " results.append(future.result())\n", + " \n", + " metadata_df['Elevation_m'], metadata_df['Slope_Deg'], metadata_df['Aspect_L'] = zip(*results)\n", + "\n", + "metadata_df = pd.read_csv(r\"/home/vgindi/Provided_Data/Merged_aso_nearest_sites1.csv\")\n", + "metadata_df= metadata_df.head(20)\n", + "bounding_box = ((-120.3763448720203, 36.29256774541929), (-118.292253412863, 38.994985247736324)) \n", + " \n", + "extract_terrain_data_threaded(metadata_df, bounding_box)\n", + "\n", + "# Display the results\n", + "metadata_df.head(10)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "f714b0f0-1c38-4ba3-8aed-1ca6b97c2d10", + "metadata": {}, + "outputs": [], + "source": [ + "\"\"\"\n", + "This code block crops the global coverage VIIRS data to south sierras subregion. 
\n", + "\"\"\"\n", + "\n", + "def crop_sierras(input_file_path, output_file_path, shapes):\n", + " with rasterio.open(input_file_path) as src:\n", + " out_image, out_transform = rasterio.mask.mask(src, shapes, crop=True)\n", + " out_meta = src.out_meta\n", + " out_meta.update({\"driver\": \"GTiff\",\n", + " \"height\": out_image.shape[1],\n", + " \"width\": out_image.shape[2],\n", + " \"transform\": out_transform})\n", + " \n", + " with rasterio.open(output_file_path, \"w\", **out_meta) as dest:\n", + " dest.write(out_image)\n", + "\n", + "def download_viirs_sca(input_dir, output_dir, shapefile_path):\n", + " \n", + " # Load shapes from the shapefile\n", + " with fiona.open(shapefile_path, 'r') as shapefile:\n", + " shapes = [feature[\"geometry\"] for feature in shapefile]\n", + " \n", + " # Iterate through each year directory in the input directory\n", + " for year_folder in os.listdir(input_dir):\n", + " year_folder_path = os.path.join(input_dir, year_folder)\n", + " if os.path.isdir(year_folder_path):\n", + " # Extract year from the folder name (assuming folder names like 'WY2013')\n", + " year = re.search(r'\\d{4}', year_folder).group()\n", + " output_year_folder = os.path.join(output_dir, year)\n", + " os.makedirs(output_year_folder, exist_ok=True)\n", + " \n", + " for file_name in os.listdir(year_folder_path): \n", + " if file_name.endswith('.tif'): \n", + " parts = file_name.split('_')\n", + " output_file_name = '_'.join(parts[:3]) + '.tif'\n", + " output_file_path = os.path.join(output_year_folder, output_file_name)\n", + " input_file_path = os.path.join(year_folder_path, file_name)\n", + " crop_sierras(input_file_path, output_file_path, shapes)\n", + " print(f\"Processed and saved {output_file_path}\")\n", + "\n", + "if __name__ == \"__main__\":\n", + " \n", + " input_directory = r\"/home/vgindi/VIIRS_Data\"\n", + " output_directory = r\"/home/vgindi/VIIRS_Sierras\"\n", + " shapefile_path = r\"/home/vgindi/Provided_Data/low_sierras_points.shp\"\n", + " download_viirs_sca(input_directory, output_directory, shapefile_path)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "d7ab2744-b080-48c8-bb77-f4b9c14ca774", + "metadata": {}, + "outputs": [], + "source": [ + "\"\"\"\n", + "This code cell transforms the raw VIIRS tiff files to 100m resolution and saves each file in .csv format\n", + "\"\"\"\n", + "def processing_VIIRS(input_file, output_res):\n", + " try:\n", + " # Define the output file path for TIFFs using the original file name\n", + " output_folder_tiff = os.path.join(\"/home/vgindi/Processed_VIIRS\", os.path.basename(os.path.dirname(input_file)))\n", + " os.makedirs(output_folder_tiff, exist_ok=True)\n", + " output_file = os.path.join(output_folder_tiff, os.path.basename(input_file))\n", + "\n", + " # Reproject and resample\n", + " ds = gdal.Open(input_file)\n", + " if ds is None:\n", + " print(f\"Failed to open '{input_file}'. 
Make sure the file is a valid GeoTIFF file.\")\n", + " return None\n", + " \n", + " gdal.Warp(output_file, ds, dstSRS=\"EPSG:4326\", xRes=output_res, yRes=-output_res, resampleAlg=\"bilinear\")\n", + "\n", + " # Read the processed TIFF file using rasterio\n", + " rds = rxr.open_rasterio(output_file)\n", + " rds = rds.squeeze().drop(\"spatial_ref\").drop(\"band\")\n", + " rds.name = \"data\"\n", + " df = rds.to_dataframe().reset_index()\n", + " return df\n", + " except Exception as e:\n", + " print(f\"An error occurred: {str(e)}\")\n", + " return None\n", + "\n", + "def process_and_convert_viirs(input_dir, output_res):\n", + " # Iterate over subdirectories in the input directory\n", + " for year in os.listdir(input_dir):\n", + " year_dir = os.path.join(input_dir, year)\n", + " \n", + " if os.path.isdir(year_dir):\n", + " for file_name in os.listdir(year_dir):\n", + " if file_name.endswith('.tif'):\n", + " input_file_path = os.path.join(year_dir, file_name)\n", + " df = processing_VIIRS(input_file_path, output_res)\n", + " \n", + " if df is not None:\n", + " csv_folder = os.path.join(\"/home/vgindi/Processed_VIIRS\", \"VIIRS_csv\")\n", + " os.makedirs(csv_folder, exist_ok=True)\n", + " csv_file_path = os.path.join(csv_folder, file_name.replace('.tif', '.csv'))\n", + " \n", + " df.to_csv(csv_file_path, index=False)\n", + " print(f\"Processed and saved {csv_file_path}\")\n", + "\n", + "if __name__ == \"__main__\":\n", + " input_directory = \"/home/vgindi/VIIRS_Sierras\"\n", + " output_res = 100 # Desired resolution in meters\n", + " process_and_convert_viirs(input_directory, output_res)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c3e2831c-817d-40b5-a2e0-d9ebff8a5672", + "metadata": {}, + "outputs": [], + "source": [ + "\"\"\"\n", + "This code cell fetches the cell id using grid_cells_meta_idx metadata for each lat/lon pair for VIIRS csv file\n", + "\"\"\"\n", + "def create_polygon(self, row):\n", + " return Polygon([(row['BL_Coord_Long'], row['BL_Coord_Lat']),\n", + " (row['BR_Coord_Long'], row['BR_Coord_Lat']),\n", + " (row['UR_Coord_Long'], row['UR_Coord_Lat']),\n", + " (row['UL_Coord_Long'], row['UL_Coord_Lat'])])\n", + " \n", + "def process_folder(self, input_folder, metadata_path, output_folder):\n", + " # Import the metadata into a pandas DataFrame\n", + " pred_obs_metadata_df = pd.read_csv(metadata_path)\n", + "\n", + " # Assuming create_polygon is defined elsewhere, we add a column with polygon geometries\n", + " pred_obs_metadata_df = pred_obs_metadata_df.drop(columns=['Unnamed: 0'], axis=1)\n", + " pred_obs_metadata_df['geometry'] = pred_obs_metadata_df.apply(self.create_polygon, axis=1)\n", + "\n", + " # Convert the DataFrame to a GeoDataFrame\n", + " metadata = gpd.GeoDataFrame(pred_obs_metadata_df, geometry='geometry')\n", + "\n", + " # Drop coordinates columns\n", + " metadata = metadata.drop(columns=['BL_Coord_Long', 'BL_Coord_Lat', \n", + " 'BR_Coord_Long', 'BR_Coord_Lat', \n", + " 'UR_Coord_Long', 'UR_Coord_Lat', \n", + " 'UL_Coord_Long', 'UL_Coord_Lat'], axis=1)\n", + "\n", + " # List all CSV files in the input folder\n", + " csv_files = [f for f in os.listdir(input_folder) if f.endswith('.csv')]\n", + "\n", + " for csv_file in csv_files:\n", + " input_path = os.path.join(input_folder, csv_file)\n", + " output_path = os.path.join(output_folder, csv_file)\n", + "\n", + " # Check if the output file already exists\n", + " if os.path.exists(output_path):\n", + " print(f\"CSV file {csv_file} already exists in the output folder.\")\n", + " continue\n", 
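+ "\n",
+ "        # Hedged sketch (illustrative only, not the author's code): the spatial join a few\n",
+ "        # lines below passes both `predicate` and `op`; `op` is the pre-0.10 geopandas\n",
+ "        # spelling of the same keyword, so only one should be supplied, and 'within' and\n",
+ "        # 'intersects' are different tests. A minimal point-in-polygon join against the\n",
+ "        # grid-cell polygons with the newer keyword looks like this:\n",
+ "        _demo_points = gpd.GeoDataFrame({'VIIRS_SCA': [42.0]},\n",
+ "                                        geometry=[Point(-119.5, 37.5)])  # made-up coordinate\n",
+ "        _demo_join = gpd.sjoin(_demo_points, metadata, how='left', predicate='within')\n",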
+ "\n", + " # Process each CSV file\n", + " viirs_sca_df = pd.read_csv(input_path)\n", + "\n", + " # Convert the \"aso_swe_df\" into a GeoDataFrame with point geometries\n", + " geometry = [Point(xy) for xy in zip(viirs_sca_df['x'], viirs_sca_df['y'])]\n", + " viirs_sca_geo = gpd.GeoDataFrame(viirs_sca_df, geometry=geometry)\n", + " result = gpd.sjoin(viirs_sca_geo, metadata, how='left', predicate='within', op = 'intersects')\n", + "\n", + " # Select specific columns for the final DataFrame\n", + " Final_df = result[['y', 'x', 'data', 'cell_id']]\n", + " Final_df.rename(columns={'data': 'VIIRS_SCA'}, inplace=True)\n", + "\n", + " # Drop rows where 'cell_id' is NaN\n", + " if Final_df['cell_id'].isnull().values.any():\n", + " Final_df = Final_df.dropna(subset=['cell_id'])\n", + "\n", + " # Save the processed DataFrame to a CSV file\n", + " Final_df.to_csv(output_path, index=False)\n", + " print(f\"Processed {csv_file}\")\n", + "\n", + "if __name__ == \"__main__\":\n", + " input_folder = r\"\"\n", + " metadata_path = r\"\"\n", + " output_folder = r\"\"\n", + " process_folder(input_folder, metadata_path, output_folder)" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "NSM", + "language": "python", + "name": "nsm" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.9.18" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/Data_Processing_NSMv2.0/NSIDC_Data.py b/Data_Processing_NSMv2.0/NSIDC_Data.py new file mode 100644 index 0000000..f69ae85 --- /dev/null +++ b/Data_Processing_NSMv2.0/NSIDC_Data.py @@ -0,0 +1,350 @@ +from __future__ import print_function + +import base64 +import getopt +import itertools +import json +import math +import netrc +import os.path +import ssl +import sys +import time +import getpass +import shutil + +try: + from urllib.parse import urlparse + from urllib.request import urlopen, Request, build_opener, HTTPCookieProcessor + from urllib.error import HTTPError, URLError +except ImportError: + from urlparse import urlparse + from urllib2 import urlopen, Request, HTTPError, URLError, build_opener, HTTPCookieProcessor + +import netrc +from urllib.parse import urlparse + +"""" +short_name = 'ASO_50M_SWE' +version = '1' +time_start = '2013-04-03T00:00:00Z' +time_end = '2019-07-16T23:59:59Z' +bounding_box = '-123.34078531,33.35825379,-105.07803558,48.97106571' +polygon = '' +filename_filter = '' +url_list = [] """ + +CMR_URL = 'https://cmr.earthdata.nasa.gov' +URS_URL = 'https://urs.earthdata.nasa.gov' +CMR_PAGE_SIZE = 2000 +CMR_FILE_URL = ('{0}/search/granules.json?provider=NSIDC_ECS' + '&sort_key[]=start_date&sort_key[]=producer_granule_id' + '&scroll=true&page_size={1}'.format(CMR_URL, CMR_PAGE_SIZE)) + +def get_login_credentials(): + try: + info = netrc.netrc() + username, password = info.authenticators(URS_URL) + credentials = f'{username}:{password}' + credentials = base64.b64encode(credentials.encode('ascii')).decode('ascii') + except Exception: + username = input("Earthdata Login Username: ") + password = getpass.getpass("Earthdata Login Password: ") + credentials = f'{username}:{password}' + credentials = base64.b64encode(credentials.encode('ascii')).decode('ascii') + return credentials + +def build_version_query_params(version): + desired_pad_length = 3 + if len(version) > desired_pad_length: + print('Version string too long: 
"{0}"'.format(version)) + quit() + + version = str(int(version)) # Strip off any leading zeros + query_params = '' + + while len(version) <= desired_pad_length: + padded_version = version.zfill(desired_pad_length) + query_params += '&version={0}'.format(padded_version) + desired_pad_length -= 1 + return query_params + +def filter_add_wildcards(filter): + if not filter.startswith('*'): + filter = '*' + filter + if not filter.endswith('*'): + filter = filter + '*' + return filter + +def build_filename_filter(filename_filter): + filters = filename_filter.split(',') + result = '&options[producer_granule_id][pattern]=true' + for filter in filters: + result += '&producer_granule_id[]=' + filter_add_wildcards(filter) + return result + +def build_cmr_query_url(short_name, version, time_start, time_end, + bounding_box, polygon=None, + filename_filter=None): + params = '&short_name={0}'.format(short_name) + params += build_version_query_params(version) + params += '&temporal[]={0},{1}'.format(time_start, time_end) + if polygon: + params += '&polygon={0}'.format(polygon) + elif bounding_box: + params += '&bounding_box={0}'.format(bounding_box) + if filename_filter: + params += build_filename_filter(filename_filter) + return CMR_FILE_URL + params + +def get_speed(time_elapsed, chunk_size): + if time_elapsed <= 0: + return '' + speed = chunk_size / time_elapsed + if speed <= 0: + speed = 1 + size_name = ('', 'k', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y') + i = int(math.floor(math.log(speed, 1000))) + p = math.pow(1000, i) + return '{0:.1f}{1}B/s'.format(speed / p, size_name[i]) + +def output_progress(count, total, status='', bar_len=60): + if total <= 0: + return + fraction = min(max(count / float(total), 0), 1) + filled_len = int(round(bar_len * fraction)) + percents = int(round(100.0 * fraction)) + bar = '=' * filled_len + ' ' * (bar_len - filled_len) + fmt = ' [{0}] {1:3d}% {2} '.format(bar, percents, status) + print('\b' * (len(fmt) + 4), end='') # clears the line + sys.stdout.write(fmt) + sys.stdout.flush() + +def cmr_read_in_chunks(file_object, chunk_size=1024 * 1024): + """Read a file in chunks using a generator. 
Default chunk size: 1Mb.""" + while True: + data = file_object.read(chunk_size) + if not data: + break + yield data + +def get_login_response(url, credentials): + opener = build_opener(HTTPCookieProcessor()) + req = Request(url) + + if credentials: + req.add_header('Authorization', 'Basic {0}'.format(credentials)) + + try: + response = opener.open(req) + except HTTPError as e: + print('HTTP error {0}, {1}'.format(e.code, e.reason)) + sys.exit(1) + except URLError as e: + print('URL error: {0}'.format(e.reason)) + sys.exit(1) + + return response + +def cmr_download(urls, folder, quiet=False): + if not urls: + return + + # Create the target directory if it doesn't exist + if not os.path.exists(folder): + os.makedirs(folder) + + if not quiet: + print('Downloading {0} files to {1}...'.format(len(urls), folder)) + + credentials = get_login_credentials() + print(credentials) + + for index, url in enumerate(urls, start=1): + filename = os.path.join(folder, url.split('/')[-1]) # Specify the full path to the file + if not quiet: + print('{0}/{1}: {2}'.format(str(index).zfill(len(str(len(urls)))), len(urls), filename)) + + try: + response = get_login_response(url, credentials) + length = int(response.headers['content-length']) + count = 0 + chunk_size = min(max(length, 1), 1024 * 1024) + time_initial = time.time() + with open(filename, 'wb') as out_file: + for data in cmr_read_in_chunks(response, chunk_size=chunk_size): + out_file.write(data) + if not quiet: + count = count + 1 + time_elapsed = time.time() - time_initial + download_speed = get_speed(time_elapsed, count * chunk_size) + output_progress(count, math.ceil(length / chunk_size), status=download_speed) + + if not quiet: + print() + except HTTPError as e: + print('HTTP error {0}, {1}'.format(e.code, e.reason)) + except URLError as e: + print('URL error: {0}'.format(e.reason)) + except IOError: + raise + + orgnl_directory = folder + destination_directory = 'SWE_Data_xml' + + if not os.path.exists(destination_directory): + os.makedirs(destination_directory) + + files = os.listdir(orgnl_directory) + + for file in files: + if file.endswith('.xml'): + source_file_path = os.path.join(orgnl_directory, file) + destination_file_path = os.path.join(destination_directory, file) + + # Move the file to the destination directory + shutil.move(source_file_path, destination_file_path) + + print("Files with .xml extension moved to the destination folder.") + + +def cmr_filter_urls(search_results): + """Select only the desired data files from CMR response.""" + if 'feed' not in search_results or 'entry' not in search_results['feed']: + return [] + + entries = [e['links'] + for e in search_results['feed']['entry'] + if 'links' in e] + # Flatten "entries" to a simple list of links + links = list(itertools.chain(*entries)) + + urls = [] + unique_filenames = set() + for link in links: + if 'href' not in link: + # Exclude links with nothing to download + continue + if 'inherited' in link and link['inherited'] is True: + # Why are we excluding these links? 
+ continue + if 'rel' in link and 'data#' not in link['rel']: + # Exclude links which are not classified by CMR as "data" or "metadata" + continue + + if 'title' in link and 'opendap' in link['title'].lower(): + # Exclude OPeNDAP links--they are responsible for many duplicates + # This is a hack; when the metadata is updated to properly identify + # non-datapool links, we should be able to do this in a non-hack way + continue + + filename = link['href'].split('/')[-1] + if filename in unique_filenames: + # Exclude links with duplicate filenames (they would overwrite) + continue + unique_filenames.add(filename) + + urls.append(link['href']) + + return urls + +def cmr_search(short_name, version, time_start, time_end, + bounding_box, polygon='', filename_filter='', quiet=False): + cmr_query_url = build_cmr_query_url(short_name=short_name, version=version, + time_start=time_start, time_end=time_end, + bounding_box=bounding_box, + polygon=polygon, filename_filter=filename_filter) + if not quiet: + print('Querying for data:\n\t{0}\n'.format(cmr_query_url)) + + cmr_scroll_id = None + ctx = ssl.create_default_context() + ctx.check_hostname = False + ctx.verify_mode = ssl.CERT_NONE + + urls = [] + hits = 0 + while True: + req = Request(cmr_query_url) + if cmr_scroll_id: + req.add_header('cmr-scroll-id', cmr_scroll_id) + try: + response = urlopen(req, context=ctx) + except Exception as e: + print('Error: ' + str(e)) + sys.exit(1) + if not cmr_scroll_id: + # Python 2 and 3 have different case for the http headers + headers = {k.lower(): v for k, v in dict(response.info()).items()} + cmr_scroll_id = headers['cmr-scroll-id'] + hits = int(headers['cmr-hits']) + if not quiet: + if hits > 0: + print('Found {0} matches.'.format(hits)) + else: + print('Found no matches.') + search_page = response.read() + search_page = json.loads(search_page.decode('utf-8')) + url_scroll_results = cmr_filter_urls(search_page) + if not url_scroll_results: + break + if not quiet and hits > CMR_PAGE_SIZE: + print('.', end='') + sys.stdout.flush() + urls += url_scroll_results + print(urls) + + if not quiet and hits > CMR_PAGE_SIZE: + print() + return urls + +def main(argv=None): + global short_name, version, time_start, time_end, bounding_box, \ + polygon, filename_filter, url_list + + if argv is None: + argv = sys.argv[1:] + + force = False + quiet = False + usage = 'usage: nsidc-download_***.py [--help, -h] [--force, -f] [--quiet, -q]' + + try: + opts, args = getopt.getopt(argv, 'hfq', ['help', 'force', 'quiet']) + for opt, _arg in opts: + if opt in ('-f', '--force'): + force = True + elif opt in ('-q', '--quiet'): + quiet = True + elif opt in ('-h', '--help'): + print(usage) + sys.exit(0) + except getopt.GetoptError as e: + print(e.args[0]) + print(usage) + sys.exit(1) + + # Supply some default search parameters, just for testing purposes. + # These are only used if the parameters aren't filled in up above. 
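+    # Hedged sketch (added for illustration): the membership check just below assumes
+    # `short_name` is already bound to a string at module level and raises NameError when
+    # it is not. One defensive pattern, reusing the example values from the docstring at
+    # the top of this file, is to fall back only when the globals are genuinely unset:
+    if 'short_name' not in globals():
+        short_name = 'ASO_50M_SWE'
+        version = '1'
+        time_start = '2013-04-03T00:00:00Z'
+        time_end = '2019-07-16T23:59:59Z'
+        bounding_box = '-123.34078531,33.35825379,-105.07803558,48.97106571'
+        polygon = ''
+        filename_filter = ''
+        url_list = []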
+ if 'short_name' in short_name: + short_name = 'ATL06' + version = '003' + time_start = '2018-10-14T00:00:00Z' + time_end = '2021-01-08T21:48:13Z' + bounding_box = '-123.34078531,33.35825379,-105.07803558,48.97106571' + polygon = '' + filename_filter = '*ATL06_2020111121*' + url_list = [] + + try: + if not url_list: + url_list = cmr_search(short_name, version, time_start, time_end, + bounding_box=bounding_box, polygon=polygon, + filename_filter=filename_filter, quiet=quiet) + + #cmr_download(url_list, force=force, quiet=quiet) + except KeyboardInterrupt: + quit() + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/Data_Processing_NSMv2.0/fetch_terrain_with_dask.py b/Data_Processing_NSMv2.0/fetch_terrain_with_dask.py new file mode 100644 index 0000000..805064d --- /dev/null +++ b/Data_Processing_NSMv2.0/fetch_terrain_with_dask.py @@ -0,0 +1,119 @@ +from dask.distributed import Client, LocalCluster +from concurrent.futures import ProcessPoolExecutor +from dask import delayed +import dask.dataframe as dd +import pandas as pd +import math +from pyproj import Transformer +import rioxarray as rxr +import numpy as np +from osgeo import gdal, gdalconst +from math import floor, ceil, sqrt, atan2, rad2deg +from numpy import rad2deg, arctan2, gradient +import dask_jobqueue +import pystac_client +import planetary_computer +from tqdm import tqdm +from dask.diagnostics import ProgressBar +from retrying import retry + +def process_single_location(args): + lat, lon, elevation = args + + slope = elevation.copy() + aspect = elevation.copy() + + transformer = Transformer.from_crs("EPSG:4326", elevation.rio.crs, always_xy=True) + xx, yy = transformer.transform(lon, lat) + + tilearray = np.around(elevation.values[0]).astype(int) + geo = (math.floor(float(lon)), 90, 0.0, math.ceil(float(lat)), 0.0, -90) + + driver = gdal.GetDriverByName('MEM') + temp_ds = driver.Create('', tilearray.shape[1], tilearray.shape[0], 1, gdalconst.GDT_Float32) + + temp_ds.GetRasterBand(1).WriteArray(tilearray) + + tilearray_np = temp_ds.GetRasterBand(1).ReadAsArray() + grad_y, grad_x = gradient(tilearray_np) + + # Calculate slope and aspect + slope_arr = np.sqrt(grad_x**2 + grad_y**2) + aspect_arr = rad2deg(arctan2(-grad_y, grad_x)) % 360 + + slope.values[0] = slope_arr + aspect.values[0] = aspect_arr + + elev = round(elevation.sel(x=xx, y=yy, method="nearest").values[0]) + slop = round(slope.sel(x=xx, y=yy, method="nearest").values[0]) + asp = round(aspect.sel(x=xx, y=yy, method="nearest").values[0]) + + return elev, slop, asp + +def process_data_in_chunks(tile_group, tiles): + results = [] + + index_id = int(tile_group['index_id'].iloc[0]) + tile = tiles[index_id] + + @retry(stop_max_attempt_number=3, wait_fixed=2000, retry_on_exception=lambda exception: isinstance(exception, IOError)) + def fetch_and_process_elevation(): + signed_asset = planetary_computer.sign(tile.assets["data"]) + elevation = rxr.open_rasterio(signed_asset.href) + + for lat, lon in zip(tile_group['lat'], tile_group['lon']): + try: + result = process_single_location((lat, lon, elevation)) + results.append(result) + except Exception as e: + print(f"Error processing location (lat: {lat}, lon: {lon}) due to {e}. 
Skipping...") + + fetch_and_process_elevation() + return pd.DataFrame(results, columns=['Elevation_m', 'Slope_Deg', 'Aspect_L']) + +def process_data_in_chunks_dask(tile_group, tiles): + + index_id = int(tile_group['index_id'].iloc[0]) + tile = tiles[index_id] + + signed_asset = planetary_computer.sign(tile.assets["data"]) + elevation = rxr.open_rasterio(signed_asset.href) + + results = [delayed(process_single_location)((lat, lon, elevation)) for lat, lon in zip(tile_group['lat'], tile_group['lon'])] + return pd.DataFrame(results, columns=['Elevation_m', 'Slope_Deg', 'Aspect_L']) + +def extract_terrain_data(df): + + + #cluster = dask_jobqueue.SLURMCluster(local_directory=r'/home/vgindi/slurmcluster') + #cluster.scale(num_workers) + #dask_client = Client(cluster) + + client = pystac_client.Client.open("https://planetarycomputer.microsoft.com/api/stac/v1", ignore_conformance=True) + + area_of_interest = {'type': 'Polygon', + 'coordinates': [[[-120.37693519839556, 36.29213061937931], + [-120.37690215328962, 38.8421802805432], + [-118.29165268221286, 38.84214595220293], + [-118.2917116398743, 36.29209713778364], + [-120.37693519839556, 36.29213061937931]]]} + + search = client.search(collections=["cop-dem-glo-90"], intersects=area_of_interest) + + tiles = list(search.items()) + df = df[['lat', 'lon', 'index_id']] + + # Convert the DataFrame to a Dask DataFrame for distributed computing + dask_df = dd.from_pandas(df, npartitions = df['index_id'].nunique()) + + # Process each partition (grouped by 'index_id') in parallel + results = dask_df.groupby('index_id').apply(lambda group: process_data_in_chunks(group, tiles), + meta=pd.DataFrame(columns=['Elevation_m', 'Slope_Deg', 'Aspect_L'])) + with ProgressBar(): + result_df = results.compute() + + #dask_client.close() + #cluster.close() + + final_df = pd.concat([df.reset_index(drop=True), result_df.reset_index(drop=True)], axis=1) + return result_df diff --git a/Data_Processing_NSMv2.0/generate_gridcells_sierras.py b/Data_Processing_NSMv2.0/generate_gridcells_sierras.py new file mode 100644 index 0000000..d7b7d63 --- /dev/null +++ b/Data_Processing_NSMv2.0/generate_gridcells_sierras.py @@ -0,0 +1,190 @@ +import math +import numpy as np +import pandas as pd +import geopandas as gpd +from pyproj import CRS, Transformer +import planetary_computer +from pystac_client import Client +import planetary_computer +import gdal +from gdal import gdalconst +from shapely import Point, Polygon +from shapely.geometry import box +from concurrent.futures import ThreadPoolExecutor, as_completed + +def generate_grids(bounding_box): + """ + Generates 100m resolution grid cells for low sierras regions uisng bounding box coordinates. Do not run this code if you have the file downloaded + this might take longer to run this. 
+ """ + #gdf_shapefile = gpd.read_file(shapefile_path) + ## Get bounding box coordinates + #minx, miny, maxx, maxy = gdf_shapefile.total_bounds + + minx, miny, maxx, maxy = *bounding_box[0], *bounding_box[1] + bbox_polygon = box(minx, miny, maxx, maxy) + + #buffer the bounds + minx = minx-1 + maxx = maxx+1 + miny = miny-1 + maxy = maxy+1 + + # Define the source and target coordinate reference systems + src_crs = CRS('EPSG:4326') # WGS84 + + if -126 < minx < -120: + # transformer = Transformer.from_crs(src_crs, 'epsg:32610', always_xy=True) + target_crs = CRS('EPSG:32610') #UTM zone 10 + elif -120 < minx < -114: + # transformer = Transformer.from_crs(src_crs, 'epsg:32611', always_xy=True) + target_crs = CRS('EPSG:32611') #UTM zone 11 + elif -114 < minx < -108: + # transformer = Transformer.from_crs(src_crs, 'epsg:32612', always_xy=True) + target_crs = CRS('EPSG:32612') #UTM zone 12 + elif -108 < minx < -102: + # transformer = Transformer.from_crs(src_crs, 'epsg:32613', always_xy=True) + target_crs = CRS('EPSG:32613') #UTM zone 13 + else: + # transformer = Transformer.from_crs(src_crs, target_crs, always_xy=True) + target_crs = CRS('EPSG:3857') #Web Mercator + + transformer = Transformer.from_crs(src_crs, target_crs, always_xy=True) + + # Convert the bounding box coordinates to Web Mercator + minx, miny = transformer.transform(minx, miny) + maxx, maxy = transformer.transform(maxx, maxy) + + # set the grid cell size in meters + cell_size = 100 + + # Calculate the number of cells in x and y directions + num_cells_x = int((maxx-minx)/cell_size) + num_cells_y = int((maxy-miny)/cell_size) + + # Calculate the total grid width and height + grid_width = num_cells_x*cell_size + grid_height = num_cells_y*cell_size + + # Calculate the offset to center the grid + offset_x = ((maxx-minx)-grid_width)/2 + offset_y = ((maxy-miny)-grid_height)/2 + + # Generate latitude and longitude ranges + lon_range = np.linspace(minx + offset_x, maxx - offset_x, num=num_cells_x) + lat_range = np.linspace(miny + offset_y, maxy - offset_y, num=num_cells_y) + + # Create a grid of points + points = [] + for lon in lon_range: + for lat in lat_range: + points.append((lon, lat)) + + # Convert the coordinate pairs back to WGS84 + back_transformer = Transformer.from_crs(target_crs, src_crs, always_xy=True) + target_coordinates = [back_transformer.transform(lon, lat) for lon, lat in points] + + # Create a list of Shapely Point geometries + coords = [Point(lon, lat) for lon, lat in target_coordinates] + + # Create a GeoDataFrame from the points + gdf_points = gpd.GeoDataFrame(geometry=coords) + + # set CRS to WGS84 + gdf_points=gdf_points.set_crs('epsg:4326') + # Clip the points to the shapefile boundary + gdf_clipped_points = gpd.clip(gdf_points, bbox_polygon) + # Specify the output points shapefile path + output_shapefile = r'/home/vgindi/Provided_Data/low_sierras_points.shp'####### + # Export the clipped points to a shapefile + gdf_clipped_points.to_file(output_shapefile) + print("Regional Grid Created") + + #Create Submission format .csv for SWE predictions + gdf_clipped_points.index.names = ['cell_id'] + Geospatial_df = pd.DataFrame() + Geospatial_df['lon']= gdf_clipped_points['geometry'].x + Geospatial_df['lat']= gdf_clipped_points['geometry'].y + + ### Begin process to import geospatial features into DF + min_lon = min(Geospatial_df['lon']) + min_lat = min(Geospatial_df['lat']) + + # Define the source and target coordinate reference systems + src_crs = CRS('EPSG:4326') # WGS84 + if -126 < min_lon < -120: + # transformer = 
Transformer.from_crs(src_crs, 'epsg:32610', always_xy=True) + target_crs = CRS('EPSG:32610') #UTM zone 10 + elif -120 < min_lon < -114: + # transformer = Transformer.from_crs(src_crs, 'epsg:32611', always_xy=True) + target_crs = CRS('EPSG:32611') #UTM zone 11 + elif -114 < min_lon < -108: + # transformer = Transformer.from_crs(src_crs, 'epsg:32612', always_xy=True) + target_crs = CRS('EPSG:32612') #UTM zone 12 + elif -108 < min_lon < -102: + # transformer = Transformer.from_crs(src_crs, 'epsg:32613', always_xy=True) + target_crs = CRS('EPSG:32613') #UTM zone 13 + else: + # transformer = Transformer.from_crs(src_crs, target_crs, always_xy=True) + target_crs = CRS('EPSG:3857') #Web Mercator + + transformer = Transformer.from_crs(src_crs, target_crs, always_xy=True) + + # Convert the bounding box coordinates to Web Mercator + Geospatial_df['lon_m'], Geospatial_df['lat_m'] = transformer.transform(Geospatial_df['lon'].to_numpy(), Geospatial_df['lat'].to_numpy()) + geocols=['BR_Coord_Long', 'BR_Coord_Lat', 'UR_Coord_Long', 'UR_Coord_Lat', + 'UL_Coord_Long', 'UL_Coord_Lat', 'BL_Coord_Long', 'BL_Coord_Lat'] + + Geospatial_df = Geospatial_df.reindex(columns=[*Geospatial_df.columns.tolist(), *geocols], fill_value=0) + Geospatial_df.reset_index(drop=True, inplace=True) + Geospatial_df = Geospatial_df.assign(BR_Coord_Long=lambda x: x.lon_m + 50, + BR_Coord_Lat=lambda x: x.lat_m - 50, + UR_Coord_Long=lambda x: x.lon_m + 50, + UR_Coord_Lat=lambda x: x.lat_m + 50, + UL_Coord_Long=lambda x: x.lon_m - 50, + UL_Coord_Lat=lambda x: x.lat_m + 50, + BL_Coord_Long=lambda x: x.lon_m - 50, + BL_Coord_Lat=lambda x: x.lat_m - 50,) + + transformer = Transformer.from_crs(target_crs, src_crs, always_xy=True) + #Geospatial_df['lon_m'], Geospatial_df['lat_m'] = transformer.transform(Geospatial_df['lon'].to_numpy(), Geospatial_df['lat'].to_numpy()) + Geospatial_df['BR_Coord_Long'], Geospatial_df['BR_Coord_Lat']=transformer.transform(Geospatial_df['BR_Coord_Long'].to_numpy(), Geospatial_df['BR_Coord_Lat'].to_numpy()) + Geospatial_df['UR_Coord_Long'], Geospatial_df['UR_Coord_Lat']=transformer.transform(Geospatial_df['UR_Coord_Long'].to_numpy(), Geospatial_df['UR_Coord_Lat'].to_numpy()) + Geospatial_df['UL_Coord_Long'], Geospatial_df['UL_Coord_Lat']=transformer.transform(Geospatial_df['UL_Coord_Long'].to_numpy(), Geospatial_df['UL_Coord_Lat'].to_numpy()) + Geospatial_df['BL_Coord_Long'], Geospatial_df['BL_Coord_Lat']=transformer.transform(Geospatial_df['BL_Coord_Long'].to_numpy(), Geospatial_df['BL_Coord_Lat'].to_numpy()) + print(Geospatial_df.columns) + Geospatial_df['cell_id'] = Geospatial_df.apply(lambda x: f"11N_cell_{x['lon']}_{x['lat']}", axis=1) + + client = Client.open( + "https://planetarycomputer.microsoft.com/api/stac/v1", + ignore_conformance=True, + ) + + search = client.search( + collections=["cop-dem-glo-90"], + intersects = { + "type": "Polygon", + "coordinates": [[ + [min_x, min_y], + [max_x, min_y], + [max_x, max_y], + [min_x, max_y], + [min_x, min_y] + ]]}) + + tiles = list(search.items()) + regions = pd.DataFrame([(i, tile.id) for i, tile in enumerate(tiles)], columns=['sliceID', 'tileID']).set_index('tileID') + + Geospatial_df['tile_id'] = Geospatial_df.apply(lambda x: 'Copernicus_DSM_COG_30_N' + str(math.floor(x['lat'])) + '_00_W' + str(math.ceil(abs(x['lon']))) + '_00_DEM', axis=1) + + regions_dict = regions['sliceID'].to_dict() + Geospatial_df['index_id'] = Geospatial_df['tile_id'].map(regions_dict) + Geospatial_df = Geospatial_df.drop(columns = ['tile_id'], axis = 1) + + return 
Geospatial_df[['cell_id', 'lon', 'lat', 'BR_Coord_Long', 'BR_Coord_Lat', 'UR_Coord_Long', 'UR_Coord_Lat',
+                          'UL_Coord_Long', 'UL_Coord_Lat', 'BL_Coord_Long', 'BL_Coord_Lat', 'index_id']]
+
+if __name__ == "__main__":
+    bounding_box = ((-120.3763448720203, 36.29256774541929), (-118.292253412863, 38.994985247736324))
+    grid_cells_meta = generate_grids(bounding_box)
+    grid_cells_meta.to_csv(r'/home/vgindi/Provided_Data/grid_cells_meta.csv')
\ No newline at end of file
diff --git a/Data_Processing_NSMv2.0/terrain_daskconcurrency.py b/Data_Processing_NSMv2.0/terrain_daskconcurrency.py
new file mode 100644
index 0000000..fc8acd4
--- /dev/null
+++ b/Data_Processing_NSMv2.0/terrain_daskconcurrency.py
@@ -0,0 +1,177 @@
+# Import packages
+# Dataframe Packages
+import numpy as np
+from numpy import gradient, rad2deg, arctan2
+import xarray as xr
+import pandas as pd
+
+# Vector Packages
+import geopandas as gpd
+import shapely
+from shapely.geometry import Point, Polygon
+from pyproj import CRS, Transformer
+
+# Raster Packages
+import rioxarray as rxr
+from rioxarray.merge import merge_arrays
+#import rasterstats as rs
+from osgeo import gdal
+from osgeo import gdalconst
+import richdem as rd
+
+# Data Access Packages
+import pystac_client
+import planetary_computer
+
+# General Packages
+import os
+import re
+import shutil
+import math
+from datetime import datetime
+import glob
+from pprint import pprint
+from typing import Union
+from pathlib import Path
+from tqdm import tqdm
+import time
+import requests
+import concurrent.futures
+from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed, TimeoutError
+import dask
+import dask.dataframe as dd
+from dask.diagnostics import ProgressBar
+from requests.exceptions import HTTPError
+import dask_jobqueue
+from dask.distributed import Client
+
+# Terrain processing with GDAL; in practice this runs faster than the RichDEM
+# variant below, so it keeps the name used by process_data_in_chunks.
+def process_single_location(args):
+    lat, lon, index_id, tiles = args
+
+    signed_asset = planetary_computer.sign(tiles[int(index_id)].assets["data"])
+    elevation = rxr.open_rasterio(signed_asset.href)
+
+    slope = elevation.copy()
+    aspect = elevation.copy()
+
+    transformer = Transformer.from_crs("EPSG:4326", elevation.rio.crs, always_xy=True)
+    xx, yy = transformer.transform(lon, lat)
+
+    tilearray = np.around(elevation.values[0]).astype(int)
+    geo = (math.floor(float(lon)), 90, 0.0, math.ceil(float(lat)), 0.0, -90)
+
+    driver = gdal.GetDriverByName('MEM')
+    temp_ds = driver.Create('', tilearray.shape[1], tilearray.shape[0], 1, gdalconst.GDT_Float32)
+
+    temp_ds.GetRasterBand(1).WriteArray(tilearray)
+
+    tilearray_np = temp_ds.GetRasterBand(1).ReadAsArray()
+    grad_y, grad_x = gradient(tilearray_np)
+
+    # Calculate slope and aspect
+    slope_arr = np.sqrt(grad_x**2 + grad_y**2)
+    aspect_arr = rad2deg(arctan2(-grad_y, grad_x)) % 360
+
+    slope.values[0] = slope_arr
+    aspect.values[0] = aspect_arr
+
+    elev = round(elevation.sel(x=xx, y=yy, method="nearest").values[0])
+    slop = round(slope.sel(x=xx, y=yy, method="nearest").values[0])
+    asp = round(aspect.sel(x=xx, y=yy, method="nearest").values[0])
+
+    return elev, slop, asp
+
+# Alternative processing using RichDEM (kept for reference; not wired into process_data_in_chunks)
+def process_single_location_richdem(args):
+    lat, lon, index_id, tiles = args
+
+    signed_asset = planetary_computer.sign(tiles[index_id].assets["data"])
+    elevation = rxr.open_rasterio(signed_asset.href, masked=True)
+
+    slope = elevation.copy()
+    aspect = elevation.copy()
+
+    transformer = Transformer.from_crs("EPSG:4326", elevation.rio.crs, always_xy=True)
+    xx, yy = 
transformer.transform(lon, lat) + tilearray = np.around(elevation.values[0]).astype(int) + + #set tile geo to get slope and set at rdarray + geo = (math.floor(float(lon)), 90, 0.0, math.ceil(float(lat)), 0.0, -90) + + tilearray = rd.rdarray(tilearray, no_data = -9999) + tilearray.projection = 'EPSG:4326' + tilearray.geotransform = geo + + slope_arr = rd.TerrainAttribute(tilearray, attrib='slope_degrees') + aspect_arr = rd.TerrainAttribute(tilearray, attrib='aspect') + + slope.values[0] = slope_arr + aspect.values[0] = aspect_arr + + elev = round(elevation.sel(x=xx, y=yy, method="nearest").values[0]) + slop = round(slope.sel(x=xx, y=yy, method="nearest").values[0]) + asp = round(aspect.sel(x=xx, y=yy, method="nearest").values[0]) + + return elev, slop, asp + +def process_data_in_chunks(df, tiles, num_workers = 16): + chunk_results = [] + + with ProcessPoolExecutor(max_workers=num_workers) as executor: + futures = [executor.submit(process_single_location, (row.lat, row.lon, row.index_id, tiles)) for index, row in df.iterrows()] + + #for future in tqdm(as_completed(futures), total=len(futures)): + for future in tqdm(futures): + try: + chunk_results.append(future.result(timeout=10)) + except TimeoutError: + print("Processing location timed out.") + continue + except HTTPError as e: + print(f"Failed to process a location due to HTTPError: {e}") + continue + + return pd.DataFrame(chunk_results, columns=['Elevation_m', 'Slope_Deg', 'Aspect_L']) + +###Dask implementation to process the dataframe in chunks### +def extract_terrain_data(geospatial_df, num_workers=16): + + cores_per_worker = 1 + memory_per_worker = "3GB" #Adjust based on the necessity + + cluster = dask_jobqueue.SLURMCluster(cores=cores_per_worker, + memory=memory_per_worker, + local_directory=r'/home/vgindi/slurmcluster') + + # Scale the cluster to workers based on the requirement + cluster.scale(num_workers) + dask_client = Client(cluster) + dask_client + + client = pystac_client.Client.open("https://planetarycomputer.microsoft.com/api/stac/v1", ignore_conformance=True) + + #coordinates obtained from the grid_cells_meta data + area_of_interest = {'type': 'Polygon', + 'coordinates': [[[-120.37693519839556, 36.29213061937931], + [-120.37690215328962, 38.8421802805432], + [-118.29165268221286, 38.84214595220293], + [-118.2917116398743, 36.29209713778364], + [-120.37693519839556, 36.29213061937931]]]} + + search = client.search(collections=["cop-dem-glo-90"], intersects=area_of_interest) + + tiles = list(search.items()) + geospatial_df = geospatial_df[['lat', 'lon', 'index_id']] + + dask_df = dd.from_pandas(geospatial_df, npartitions=5) + results = dask_df.map_partitions(process_data_in_chunks, tiles=tiles, num_workers=num_workers, + meta=pd.DataFrame(columns=['Elevation_m', 'Slope_Deg', 'Aspect_L'])) + + + final_result = results.compute(scheduler='processes') + + #result_df = pd.concat([geospatial_df.reset_index(drop=True), final_result.reset_index(drop=True)], axis=1) + return final_result +
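
Note on the terrain math: the helpers above store their results in columns named Slope_Deg and Aspect_L, but np.gradient is called without a pixel spacing, so the "slope" value is the unscaled gradient magnitude of the integer-rounded tile rather than an angle. A minimal sketch that yields slope in degrees is below; the 90 m spacing is an assumption based on the cop-dem-glo-90 tiles and only holds once the tile is in a metric projection, and the aspect line simply mirrors the convention already used in process_single_location.

import numpy as np

def slope_aspect_degrees(dem, pixel_size_m=90.0):
    # Gradients in elevation units per metre; np.gradient returns d/drow, d/dcol.
    grad_y, grad_x = np.gradient(np.asarray(dem, dtype=float), pixel_size_m)
    # Slope angle in degrees from the rise/run magnitude.
    slope_deg = np.degrees(np.arctan(np.hypot(grad_x, grad_y)))
    # Aspect mapped to [0, 360), same expression as in the functions above.
    aspect_deg = np.degrees(np.arctan2(-grad_y, grad_x)) % 360.0
    return slope_deg, aspect_deg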
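extract_terrain_data supplies a meta frame to groupby('index_id').apply because Dask cannot infer the output schema of an arbitrary Python callable and would otherwise run it on dummy data to guess. A self-contained toy version of the same pattern (synthetic rows and a stand-in summarize function, both hypothetical) looks like this:

import pandas as pd
import dask.dataframe as dd

# Toy stand-in for the grid-cell table; index_id plays the role of the DEM tile index.
pdf = pd.DataFrame({"index_id": [0, 0, 1, 1],
                    "lat": [36.3, 36.4, 38.1, 38.2],
                    "lon": [-120.1, -120.0, -118.4, -118.3]})
ddf = dd.from_pandas(pdf, npartitions=pdf["index_id"].nunique())

def summarize(group):
    # Stand-in for process_data_in_chunks: one result row per input row.
    return pd.DataFrame({"Elevation_m": [0.0] * len(group)})

# meta declares the output columns/dtypes so Dask can build the task graph lazily.
meta = pd.DataFrame({"Elevation_m": pd.Series(dtype="float64")})
result = ddf.groupby("index_id").apply(summarize, meta=meta).compute()
print(result)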
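Taken together, the intended flow appears to be: generate_grids builds the 100 m cell table (cell_id, lon, lat, corner coordinates, index_id), and extract_terrain_data attaches Elevation_m, Slope_Deg, and Aspect_L per cell. A hypothetical driver (the wiring and the output filename are assumptions, not part of this diff) could look like:

import pandas as pd
from generate_gridcells_sierras import generate_grids
from fetch_terrain_with_dask import extract_terrain_data

if __name__ == "__main__":
    # Same Low Sierras bounding box used in generate_gridcells_sierras.py
    bounding_box = ((-120.3763448720203, 36.29256774541929), (-118.292253412863, 38.994985247736324))
    grid_cells_meta = generate_grids(bounding_box)        # cell_id, lon, lat, corner coords, index_id
    terrain_df = extract_terrain_data(grid_cells_meta)    # adds Elevation_m, Slope_Deg, Aspect_L
    terrain_df.to_csv("grid_cells_terrain.csv", index=False)  # placeholder output path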