diff --git a/maps4fs/generator/dtm/dtm.py b/maps4fs/generator/dtm/dtm.py index e1113d9f..22b0cd7e 100644 --- a/maps4fs/generator/dtm/dtm.py +++ b/maps4fs/generator/dtm/dtm.py @@ -11,7 +11,10 @@ import numpy as np import osmnx as ox # type: ignore import rasterio # type: ignore -import requests +from rasterio.warp import calculate_default_transform, reproject +from rasterio.enums import Resampling +from rasterio.merge import merge + from pydantic import BaseModel from maps4fs.logger import Logger @@ -23,6 +26,9 @@ class DTMProviderSettings(BaseModel): """Base class for DTM provider settings models.""" + easy_mode: bool = True + power_factor: int = 0 + # pylint: disable=too-many-public-methods class DTMProvider(ABC): @@ -44,6 +50,20 @@ class DTMProvider(ABC): _instructions: str | None = None + _base_instructions = ( + "ℹ️ Using **Easy mode** is recommended, as it automatically adjusts the values in the image, " + "so the terrain elevation in Giants Editor will match real world elevation in meters. \n" + "ℹ️ If the terrain height difference in the real world is bigger than 255 meters, " + "the [Height scale](https://github.com/iwatkot/maps4fs/blob/main/docs/dem.md#height-scale)" + " parameter in the **map.i3d** file will be automatically adjusted. \n" + "⚡ If the **Easy mode** option is disabled, you will probably get completely flat " + "terrain, unless you adjust the DEM Multiplier Setting or the Height scale parameter in " + "the Giants Editor. \n" + "💡 You can use the **Power factor** setting to make the difference between heights " + "bigger. Be extremely careful with this setting, and use only low values, otherwise your " + "terrain may be completely broken. \n" + ) + # pylint: disable=R0913, R0917 def __init__( self, @@ -152,7 +172,7 @@ def is_community(cls) -> bool: return cls._is_community @classmethod - def settings(cls) -> Type[DTMProviderSettings] | None: + def settings(cls) -> Type[DTMProviderSettings]: """Settings model of the provider. Returns: @@ -169,6 +189,15 @@ def instructions(cls) -> str | None: """ return cls._instructions + @classmethod + def base_instructions(cls) -> str | None: + """Instructions for using any provider. + + Returns: + str: Instructions for using any provider. + """ + return cls._base_instructions + @property def user_settings(self) -> DTMProviderSettings | None: """User settings of the provider. @@ -212,63 +241,116 @@ def get_provider_descriptions(cls) -> dict[str, str]: """ providers = {} for provider in cls.__subclasses__(): - if not provider.is_base(): - providers[provider._code] = provider.description() # pylint: disable=W0212 + providers[provider._code] = provider.description() # pylint: disable=W0212 return providers # type: ignore - def download_tile(self, output_path: str, **kwargs) -> bool: - """Download a tile from the provider. - - Arguments: - output_path (str): Path to save the downloaded tile. + @abstractmethod + def download_tiles(self) -> list[str]: + """Download tiles from the provider. Returns: - bool: True if the tile was downloaded successfully, False otherwise. + list: List of paths to the downloaded tiles. """ - url = self.formatted_url(**kwargs) - response = requests.get(url, stream=True, timeout=10) - if response.status_code == 200: - with open(output_path, "wb") as file: - for chunk in response.iter_content(chunk_size=1024): - file.write(chunk) - return True - return False - - def get_or_download_tile(self, output_path: str, **kwargs) -> str | None: - """Get or download a tile from the provider. + raise NotImplementedError - Arguments: - output_path (str): Path to save the downloaded tile. + def get_numpy(self) -> np.ndarray: + """Get numpy array of the tile. + Resulting array must be 16 bit (signed or unsigned) integer and it should be already + windowed to the bounding box of ROI. It also must have only one channel. Returns: - str: Path to the downloaded tile or None if the tile not exists and was - not downloaded. + np.ndarray: Numpy array of the tile. """ - if not os.path.exists(output_path): - if not self.download_tile(output_path, **kwargs): - return None - return output_path + # download tiles using DTM provider implementation + tiles = self.download_tiles() + self.logger.debug(f"Downloaded {len(tiles)} DEM tiles") + + # merge tiles if necessary + if len(tiles) > 1: + self.logger.debug("Multiple tiles downloaded. Merging tiles") + tile, _ = self.merge_geotiff(tiles) + else: + tile = tiles[0] + + # determine CRS of the resulting tile and reproject if necessary + with rasterio.open(tile) as src: + crs = src.crs + if crs != "EPSG:4326": + self.logger.debug(f"Reprojecting GeoTIFF from {crs} to EPSG:4326...") + tile = self.reproject_geotiff(tile) + + # extract region of interest from the tile + data = self.extract_roi(tile) + + # process elevation data to be compatible with the game + data = self.process_elevation(data) - def get_tile_parameters(self, *args, **kwargs) -> dict: - """Get parameters for the tile, that will be used to format the URL. - Must be implemented in subclasses. + return data + + def process_elevation(self, data: np.ndarray) -> np.ndarray: + """Process elevation data. + + Arguments: + data (np.ndarray): Elevation data. Returns: - dict: Tile parameters to format the URL. + np.ndarray: Processed elevation data. """ - raise NotImplementedError + self.data_info = {} + self.add_numpy_params(data, "original") - @abstractmethod - def get_numpy(self) -> np.ndarray: - """Get numpy array of the tile. - Resulting array must be 16 bit (signed or unsigned) integer and it should be already - windowed to the bounding box of ROI. It also must have only one channel. + data = self.signed_to_unsigned(data) + self.add_numpy_params(data, "grounded") + + original_deviation = int(self.data_info["original_deviation"]) + in_game_maximum_height = 65535 // 255 + if original_deviation > in_game_maximum_height: + suggested_height_scale_multiplier = ( + original_deviation / in_game_maximum_height # type: ignore + ) + suggested_height_scale_value = int(255 * suggested_height_scale_multiplier) + else: + suggested_height_scale_multiplier = 1 + suggested_height_scale_value = 255 + + self.data_info["suggested_height_scale_multiplier"] = suggested_height_scale_multiplier + self.data_info["suggested_height_scale_value"] = suggested_height_scale_value + + self.map.shared_settings.height_scale_multiplier = ( # type: ignore + suggested_height_scale_multiplier + ) + self.map.shared_settings.height_scale_value = suggested_height_scale_value # type: ignore + + if self.user_settings.easy_mode: # type: ignore + try: + data = self.normalize_dem(data) + self.add_numpy_params(data, "normalized") + + normalized_deviation = self.data_info["normalized_deviation"] + z_scaling_factor = normalized_deviation / original_deviation # type: ignore + self.data_info["z_scaling_factor"] = z_scaling_factor + + self.map.shared_settings.mesh_z_scaling_factor = z_scaling_factor # type: ignore + self.map.shared_settings.change_height_scale = True # type: ignore + + except Exception as e: # pylint: disable=W0718 + self.logger.error( + "Failed to normalize DEM data. Error: %s. Using original data.", e + ) + + return data + + def info_sequence(self) -> dict[str, int | str | float] | None: + """Returns the information sequence for the component. Must be implemented in the child + class. If the component does not have an information sequence, an empty dictionary must be + returned. Returns: - np.ndarray: Numpy array of the tile. + dict[str, int | str | float] | None: Information sequence for the component. """ - raise NotImplementedError + return self.data_info + # region helpers def get_bbox(self) -> tuple[float, float, float, float]: """Get bounding box of the tile based on the center point and size. @@ -281,6 +363,83 @@ def get_bbox(self) -> tuple[float, float, float, float]: bbox = north, south, east, west return bbox + def reproject_geotiff(self, input_tiff: str) -> str: + """Reproject a GeoTIFF file to a new coordinate reference system (CRS). + + Arguments: + input_tiff (str): Path to the input GeoTIFF file. + target_crs (str): Target CRS (e.g., EPSG:4326 for CRS:84). + + Returns: + str: Path to the reprojected GeoTIFF file. + """ + output_tiff = os.path.join(self._tile_directory, "merged.tif") + + # Open the source GeoTIFF + self.logger.debug("Reprojecting GeoTIFF to %s CRS...", "EPSG:4326") + with rasterio.open(input_tiff) as src: + # Get the transform, width, and height of the target CRS + transform, width, height = calculate_default_transform( + src.crs, "EPSG:4326", src.width, src.height, *src.bounds + ) + + # Update the metadata for the target GeoTIFF + kwargs = src.meta.copy() + kwargs.update( + {"crs": "EPSG:4326", "transform": transform, "width": width, "height": height} + ) + + # Open the destination GeoTIFF file and reproject + with rasterio.open(output_tiff, "w", **kwargs) as dst: + for i in range(1, src.count + 1): # Iterate over all raster bands + reproject( + source=rasterio.band(src, i), + destination=rasterio.band(dst, i), + src_transform=src.transform, + src_crs=src.crs, + dst_transform=transform, + dst_crs="EPSG:4326", + resampling=Resampling.nearest, # Choose resampling method + ) + + self.logger.debug("Reprojected GeoTIFF saved to %s", output_tiff) + return output_tiff + + def merge_geotiff(self, input_files: list[str]) -> tuple[str, str]: + """Merge multiple GeoTIFF files into a single GeoTIFF file. + + Arguments: + input_files (list): List of input GeoTIFF files to merge. + output_file (str): Path to save the merged GeoTIFF file. + """ + output_file = os.path.join(self._tile_directory, "merged.tif") + # Open all input GeoTIFF files as datasets + self.logger.debug("Merging tiff files...") + datasets = [rasterio.open(file) for file in input_files] + + # Merge datasets + crs = datasets[0].crs + mosaic, out_transform = merge(datasets, nodata=0) + + # Get metadata from the first file and update it for the output + out_meta = datasets[0].meta.copy() + out_meta.update( + { + "driver": "GTiff", + "height": mosaic.shape[1], + "width": mosaic.shape[2], + "transform": out_transform, + "count": mosaic.shape[0], # Number of bands + } + ) + + # Write merged GeoTIFF to the output file + with rasterio.open(output_file, "w", **out_meta) as dest: + dest.write(mosaic) + + self.logger.debug("GeoTIFF images merged successfully into %s", output_file) + return output_file, crs + def extract_roi(self, tile_path: str) -> np.ndarray: """Extract region of interest (ROI) from the GeoTIFF file. @@ -304,18 +463,86 @@ def extract_roi(self, tile_path: str) -> np.ndarray: window.width, window.height, ) - data = src.read(1, window=window) + data = src.read(1, window=window, masked=True) if not data.size > 0: raise ValueError("No data in the tile.") return data - def info_sequence(self) -> dict[str, int | str | float] | None: - """Returns the information sequence for the component. Must be implemented in the child - class. If the component does not have an information sequence, an empty dictionary must be - returned. + def normalize_dem(self, data: np.ndarray) -> np.ndarray: + """Normalize DEM data to 16-bit unsigned integer using max height from settings. + + Arguments: + data (np.ndarray): DEM data from SRTM file after cropping. Returns: - dict[str, int | str | float] | None: Information sequence for the component. + np.ndarray: Normalized DEM data. """ - return self.data_info + maximum_height = int(data.max()) + minimum_height = int(data.min()) + deviation = maximum_height - minimum_height + self.logger.debug( + "Maximum height: %s. Minimum height: %s. Deviation: %s.", + maximum_height, + minimum_height, + deviation, + ) + self.logger.debug("Number of unique values in original DEM data: %s.", np.unique(data).size) + + adjusted_maximum_height = maximum_height * 255 + adjusted_maximum_height = min(adjusted_maximum_height, 65535) + scaling_factor = adjusted_maximum_height / maximum_height + self.logger.debug( + "Adjusted maximum height: %s. Scaling factor: %s.", + adjusted_maximum_height, + scaling_factor, + ) + + if self.user_settings.power_factor: # type: ignore + power_factor = 1 + self.user_settings.power_factor / 10 # type: ignore + self.logger.debug( + "Applying power factor: %s to the DEM data.", + power_factor, + ) + data = np.power(data, power_factor).astype(np.uint16) + + normalized_data = np.round(data * scaling_factor).astype(np.uint16) + self.logger.debug( + "Normalized data maximum height: %s. Minimum height: %s. Number of unique values: %s.", + normalized_data.max(), + normalized_data.min(), + np.unique(normalized_data).size, + ) + return normalized_data + + def signed_to_unsigned(self, data: np.ndarray, add_one: bool = True) -> np.ndarray: + """Convert signed 16-bit integer to unsigned 16-bit integer. + + Arguments: + data (np.ndarray): DEM data from SRTM file after cropping. + + Returns: + np.ndarray: Unsigned DEM data. + """ + data = data - data.min() + if add_one: + data = data + 1 + return data.astype(np.uint16) + + def add_numpy_params( + self, + data: np.ndarray, + prefix: str, + ) -> None: + """Add numpy array parameters to the data_info dictionary. + + Arguments: + data (np.ndarray): Numpy array of the tile. + prefix (str): Prefix for the parameters. + """ + self.data_info[f"{prefix}_minimum_height"] = int(data.min()) # type: ignore + self.data_info[f"{prefix}_maximum_height"] = int(data.max()) # type: ignore + self.data_info[f"{prefix}_deviation"] = int(data.max() - data.min()) # type: ignore + self.data_info[f"{prefix}_unique_values"] = int(np.unique(data).size) # type: ignore + + # endregion diff --git a/maps4fs/generator/dtm/nrw.py b/maps4fs/generator/dtm/nrw.py index 66272757..f9bfcf4d 100644 --- a/maps4fs/generator/dtm/nrw.py +++ b/maps4fs/generator/dtm/nrw.py @@ -2,18 +2,11 @@ import os from datetime import datetime -from zipfile import ZipFile import numpy as np -from pyproj import Transformer -import rasterio -import requests -from rasterio.enums import Resampling -from rasterio.merge import merge -from rasterio.warp import calculate_default_transform, reproject -from rasterio.windows import from_bounds from owslib.wcs import WebCoverageService from owslib.util import Authentication +from pyproj import Transformer from maps4fs.generator.dtm.dtm import DTMProvider, DTMProviderSettings @@ -21,9 +14,6 @@ class NRWProviderSettings(DTMProviderSettings): """Settings for the USGS provider.""" - max_local_elevation: int = 255 - # wcs_url: str = '' - class NRWProvider(DTMProvider): """Generic provider of WCS sources.""" @@ -37,259 +27,21 @@ class NRWProvider(DTMProvider): _settings = NRWProviderSettings _author = "[kbrandwijk](https://github.com/kbrandwijk)" _is_community = True - _instructions = ( - "ℹ️ Set the max local elevation to approx the local max elevation for your area in" - " meters. This will allow you to use heightScale 255 in GE with minimal tweaking." - " Setting this value too low can cause a flat map!" - ) + _instructions = None def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) - timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") self.shared_tiff_path = os.path.join(self._tile_directory, "shared") os.makedirs(self.shared_tiff_path, exist_ok=True) - self.output_path = os.path.join(self._tile_directory, f"timestamp_{timestamp}") - os.makedirs(self.output_path, exist_ok=True) - - def download_tif_files(self, urls: list[str]) -> list[str]: - """Download GeoTIFF files from the given URLs. - - Arguments: - urls (list): List of URLs to download GeoTIFF files from. - - Returns: - list: List of paths to the downloaded GeoTIFF files. - """ - tif_files = [] - for url in urls: - file_name = os.path.basename(url) - self.logger.debug("Retrieving TIFF: %s", file_name) - file_path = os.path.join(self.shared_tiff_path, file_name) - if not os.path.exists(file_path): - try: - # Send a GET request to the file URL - response = requests.get(url, stream=True) # pylint: disable=W3101 - response.raise_for_status() # Raise an error for HTTP status codes 4xx/5xx - - # Write the content of the response to the file - with open(file_path, "wb") as file: - for chunk in response.iter_content(chunk_size=8192): # Download in chunks - file.write(chunk) - self.logger.info("File downloaded successfully: %s", file_path) - if file_name.endswith('.zip'): - with ZipFile(file_path, "r") as f_in: - f_in.extract(file_name.replace('.zip', '.img'), self.shared_tiff_path) - tif_files.append(file_path.replace('.zip', '.img')) - else: - tif_files.append(file_path) - except requests.exceptions.RequestException as e: - self.logger.error("Failed to download file: %s", e) - else: - self.logger.debug("File already exists: %s", file_name) - if file_name.endswith('.zip'): - if not os.path.exists(file_path.replace('.zip', '.img')): - with ZipFile(file_path, "r") as f_in: - f_in.extract(file_name.replace('.zip', '.img'), self.shared_tiff_path) - tif_files.append(file_path.replace('.zip', '.img')) - else: - tif_files.append(file_path) - - return tif_files - - def merge_geotiff(self, input_files: list[str], output_file: str) -> None: - """Merge multiple GeoTIFF files into a single GeoTIFF file. - - Arguments: - input_files (list): List of input GeoTIFF files to merge. - output_file (str): Path to save the merged GeoTIFF file. - """ - # Open all input GeoTIFF files as datasets - self.logger.debug("Merging tiff files...") - datasets = [rasterio.open(file) for file in input_files] - - # Merge datasets - mosaic, out_transform = merge(datasets, nodata=0) - - # Get metadata from the first file and update it for the output - out_meta = datasets[0].meta.copy() - out_meta.update( - { - "driver": "GTiff", - "height": mosaic.shape[1], - "width": mosaic.shape[2], - "transform": out_transform, - "count": mosaic.shape[0], # Number of bands - } - ) - - # Write merged GeoTIFF to the output file - with rasterio.open(output_file, "w", **out_meta) as dest: - dest.write(mosaic) - - self.logger.debug("GeoTIFF images merged successfully into %s", output_file) - - def reproject_geotiff(self, input_tiff: str, output_tiff: str, target_crs: str) -> None: - """Reproject a GeoTIFF file to a new coordinate reference system (CRS). - - Arguments: - input_tiff (str): Path to the input GeoTIFF file. - output_tiff (str): Path to save the reprojected GeoTIFF file. - target_crs (str): Target CRS (e.g., EPSG:4326 for CRS:84). - """ - # Open the source GeoTIFF - self.logger.debug("Reprojecting GeoTIFF to %s CRS...", target_crs) - with rasterio.open(input_tiff) as src: - # Get the transform, width, and height of the target CRS - transform, width, height = calculate_default_transform( - src.crs, target_crs, src.width, src.height, *src.bounds - ) - - # Update the metadata for the target GeoTIFF - kwargs = src.meta.copy() - kwargs.update( - {"crs": target_crs, "transform": transform, "width": width, "height": height} - ) - - # Open the destination GeoTIFF file and reproject - with rasterio.open(output_tiff, "w", **kwargs) as dst: - for i in range(1, src.count + 1): # Iterate over all raster bands - reproject( - source=rasterio.band(src, i), - destination=rasterio.band(dst, i), - src_transform=src.transform, - src_crs=src.crs, - dst_transform=transform, - dst_crs=target_crs, - resampling=Resampling.nearest, # Choose resampling method - ) - self.logger.debug("Reprojected GeoTIFF saved to %s", output_tiff) - - def extract_roi(self, input_tiff: str) -> np.ndarray: # pylint: disable=W0237 - """ - Crop a GeoTIFF based on given geographic bounding box and save to a new file. - - Arguments: - input_tiff (str): Path to the input GeoTIFF file. - - Returns: - np.ndarray: Numpy array of the cropped GeoTIFF. - """ - self.logger.debug("Extracting ROI...") - # Open the input GeoTIFF - with rasterio.open(input_tiff) as src: - - # Create a rasterio window from the bounding box - (north, south, east, west) = self.get_bbox() - window = from_bounds(west, south, east, north, transform=src.transform) - - data = src.read(1, window=window) - self.logger.debug("Extracted ROI") - return data - - # pylint: disable=R0914, R0917, R0913 - def convert_geotiff_to_geotiff( - self, - input_tiff: str, - output_tiff: str, - min_height: float, - max_height: float, - target_crs: str, - ) -> None: - """ - Convert a GeoTIFF to a scaled GeoTIFF with UInt16 values using a specific coordinate - system and output size. - - Arguments: - input_tiff (str): Path to the input GeoTIFF file. - output_tiff (str): Path to save the output GeoTIFF file. - min_height (float): Minimum terrain height (input range). - max_height (float): Maximum terrain height (input range). - target_crs (str): Target CRS (e.g., EPSG:4326 for CRS:84). - """ - # Open the input GeoTIFF file - self.logger.debug("Converting to uint16") - with rasterio.open(input_tiff) as src: - # Ensure the input CRS matches the target CRS (reprojection may be required) - if str(src.crs) != str(target_crs): - raise ValueError( - f"The GeoTIFF CRS is {src.crs}, but the target CRS is {target_crs}. " - "Reprojection may be required." - ) - - # Read the data from the first band - data = src.read(1) # Assuming the input GeoTIFF has only a single band - - # Identify the input file's NoData value - input_nodata = src.nodata - if input_nodata is None: - input_nodata = -999999.0 # Default fallback if no NoData value is defined - nodata_value = 0 - # Replace NoData values (e.g., -999999.0) with the new NoData value - # (e.g., 65535 for UInt16) - data[data == input_nodata] = nodata_value - - # Scale the data to the 0–65535 range (UInt16), avoiding NoData areas - scaled_data = np.clip( - (data - min_height) * (65535 / (max_height - min_height)), 0, 65535 - ).astype(np.uint16) - scaled_data[data == nodata_value] = ( - nodata_value # Preserve NoData value in the scaled array - ) - - # Compute the proper transform to ensure consistency - # Get the original transform, width, and height - transform = src.transform - width = src.width - height = src.height - left, bottom, right, top = src.bounds - - # Adjust the transform matrix to make sure bounds and transform align correctly - transform = rasterio.transform.from_bounds(left, bottom, right, top, width, height) - - # Prepare metadata for the output GeoTIFF - metadata = src.meta.copy() - metadata.update( - { - "dtype": rasterio.uint16, # Update dtype for uint16 - "crs": target_crs, # Update CRS if needed - "nodata": nodata_value, # Set the new NoData value - "transform": transform, # Use the updated, consistent transform - } - ) - # Write the scaled data to the output GeoTIFF - with rasterio.open(output_tiff, "w", **metadata) as dst: - dst.write(scaled_data, 1) # Write the first band + def download_tiles(self) -> list[str]: + bbox = self.get_bbox() + bbox = self.transform_bbox(bbox, "EPSG:25832") - self.logger.debug( - "GeoTIFF successfully converted and saved to %s, with nodata value: %s.", - output_tiff, - nodata_value, - ) + tiles = self.tile_bbox(bbox, 1000) - def transform_bbox(self, bbox: tuple[float, float, float, float], to_crs: str) -> tuple[float, float, float, float]: - """Transform the bounding box to a different coordinate reference system (CRS). - - Arguments: - bbox (tuple): Bounding box to transform (north, south, east, west). - to_crs (str): Target CRS (e.g., EPSG:4326 for CRS:84). - - Returns: - tuple: Transformed bounding box (north, south, east, west). - """ - transformer = Transformer.from_crs("epsg:4326", to_crs) - north, south, east, west = bbox - bottom_left_x, bottom_left_y = transformer.transform(xx=south, yy=west) - top_left_x, top_left_y = transformer.transform(xx=north, yy=west) - top_right_x, top_right_y = transformer.transform(xx=north, yy=east) - bottom_right_x, bottom_right_y = transformer.transform(xx=south, yy=east) - - west = min(bottom_left_y, bottom_right_y) - east = max(top_left_y, top_right_y) - south = min(bottom_left_x, top_left_x) - north = max(bottom_right_x, top_right_x) - - return north, south, east, west + all_tif_files = self.download_all_tiles(tiles) + return all_tif_files def tile_bbox(self, bbox: tuple[float, float, float, float], tile_size: int) -> list[tuple[float, float, float, float]]: """Tile the bounding box into smaller bounding boxes of a specified size. @@ -314,7 +66,7 @@ def tile_bbox(self, bbox: tuple[float, float, float, float], tile_size: int) -> return tiles - def download_tiles(self, tiles: list[tuple[float, float, float, float]]) -> list[str]: + def download_all_tiles(self, tiles: list[tuple[float, float, float, float]]) -> list[str]: """Download tiles from the NRW provider. Arguments: @@ -324,6 +76,10 @@ def download_tiles(self, tiles: list[tuple[float, float, float, float]]) -> list list: List of paths to the downloaded GeoTIFF files. """ all_tif_files = [] + wcs = WebCoverageService( + 'https://www.wcs.nrw.de/geobasis/wcs_nw_dgm', + auth=Authentication(verify=False), + timeout=600) for tile in tiles: file_name = '_'.join(map(str, tile)) + '.tif' file_path = os.path.join(self.shared_tiff_path, file_name) @@ -340,49 +96,26 @@ def download_tiles(self, tiles: list[tuple[float, float, float, float]]) -> list all_tif_files.append(file_path) return all_tif_files - def generate_data(self) -> np.ndarray: - """Generate data from the NRW provider. + def transform_bbox(self, bbox: tuple[float, float, float, float], to_crs: str) -> tuple[float, float, float, float]: + """Transform the bounding box to a different coordinate reference system (CRS). + + Arguments: + bbox (tuple): Bounding box to transform (north, south, east, west). + to_crs (str): Target CRS (e.g., EPSG:4326 for CRS:84). Returns: - np.ndarray: Numpy array of the data. + tuple: Transformed bounding box (north, south, east, west). """ - north, south, east, west = self.get_bbox() - - north, south, east, west = self.transform_bbox((north, south, east, west), "epsg:25832") - - tiles = self.tile_bbox((north, south, east, west), 1000) - - all_tif_files = self.download_tiles(tiles) - - self.merge_geotiff(all_tif_files, os.path.join(self.output_path, "merged.tif")) - - self.reproject_geotiff( - os.path.join(self.output_path, "merged.tif"), - os.path.join(self.output_path, "reprojected.tif"), - "EPSG:4326", - ) - self.convert_geotiff_to_geotiff( - os.path.join(self.output_path, "reprojected.tif"), - os.path.join(self.output_path, "translated.tif"), - min_height=0, - max_height=self.user_settings.max_local_elevation, # type: ignore - target_crs="EPSG:4326", - ) - return self.extract_roi(os.path.join(self.output_path, "translated.tif")) + transformer = Transformer.from_crs("epsg:4326", to_crs) + north, south, east, west = bbox + bottom_left_x, bottom_left_y = transformer.transform(xx=south, yy=west) + top_left_x, top_left_y = transformer.transform(xx=north, yy=west) + top_right_x, top_right_y = transformer.transform(xx=north, yy=east) + bottom_right_x, bottom_right_y = transformer.transform(xx=south, yy=east) - def get_numpy(self) -> np.ndarray: - """Get numpy array of the tile. + west = min(bottom_left_y, bottom_right_y) + east = max(top_left_y, top_right_y) + south = min(bottom_left_x, top_left_x) + north = max(bottom_right_x, top_right_x) - Returns: - np.ndarray: Numpy array of the tile. - """ - if not self.user_settings: - raise ValueError("user_settings is 'none'") - if self.user_settings.max_local_elevation <= 0: # type: ignore - raise ValueError( - "Entered 'max_local_elevation' value is unable to be used. " - "Use a value greater than 0." - ) - if not self._data: - self._data = self.generate_data() - return self._data + return north, south, east, west diff --git a/maps4fs/generator/dtm/srtm.py b/maps4fs/generator/dtm/srtm.py index 5ac286d9..f322b070 100644 --- a/maps4fs/generator/dtm/srtm.py +++ b/maps4fs/generator/dtm/srtm.py @@ -7,7 +7,7 @@ import os import shutil -import numpy as np +import requests from maps4fs.generator.dtm.dtm import DTMProvider, DTMProviderSettings @@ -15,9 +15,6 @@ class SRTM30ProviderSettings(DTMProviderSettings): """Settings for SRTM 30m provider.""" - easy_mode: bool = True - power_factor: int = 0 - class SRTM30Provider(DTMProvider): """Provider of Shuttle Radar Topography Mission (SRTM) 30m data.""" @@ -32,22 +29,6 @@ class SRTM30Provider(DTMProvider): _author = "[iwatkot](https://github.com/iwatkot)" - _instructions = ( - "ℹ️ If you don't know how to work with DEM data, it is recommended to use the " - "**Easy mode** option. It will automatically change the values in the image, so the " - "terrain will be visible in the Giants Editor. If you're an experienced modder, it's " - "recommended to disable this option and work with the DEM data in a usual way. \n" - "ℹ️ If the terrain height difference in the real world is bigger than 255 meters, " - "the [Height scale](https://github.com/iwatkot/maps4fs/blob/main/docs/dem.md#height-scale)" - " parameter in the **map.i3d** file will be changed automatically. \n" - "⚡ If the **Easy mode** option is disabled, you will probably get completely flat " - "terrain, unless you adjust the DEM Multiplier Setting or the Height scale parameter in " - "the Giants Editor. \n" - "💡 You can use the **Power factor** setting to make the difference between heights " - "bigger. Be extremely careful with this setting, and use only low values, otherwise your " - "terrain may be completely broken. \n" - ) - _settings = SRTM30ProviderSettings def __init__(self, *args, **kwargs): @@ -58,169 +39,86 @@ def __init__(self, *args, **kwargs): os.makedirs(self.gz_directory, exist_ok=True) self.data_info: dict[str, int | str | float] | None = None # type: ignore - def get_tile_parameters(self, *args, **kwargs) -> dict[str, str]: - """Returns latitude band and tile name for SRTM tile from coordinates. + def download_tiles(self): + """Download SRTM tiles.""" + north, south, east, west = self.get_bbox() - Arguments: - lat (float): Latitude. - lon (float): Longitude. + tiles = [] + # Look at each corner of the bbox in case the bbox spans across multiple tiles + for pair in [(north, east), (south, west), (south, east), (north, west)]: + tile_parameters = self.get_tile_parameters(*pair) + tile_name = tile_parameters["tile_name"] + decompressed_tile_path = os.path.join(self.hgt_directory, f"{tile_name}.hgt") - Returns: - dict: Tile parameters. - """ - lat, lon = args + if not os.path.isfile(decompressed_tile_path): + compressed_tile_path = os.path.join(self.gz_directory, f"{tile_name}.hgt.gz") + if not self.get_or_download_tile(compressed_tile_path, **tile_parameters): + raise FileNotFoundError(f"Tile {tile_name} not found.") - tile_latitude = math.floor(lat) - tile_longitude = math.floor(lon) + with gzip.open(compressed_tile_path, "rb") as f_in: + with open(decompressed_tile_path, "wb") as f_out: + shutil.copyfileobj(f_in, f_out) + tiles.append(decompressed_tile_path) - latitude_band = f"N{abs(tile_latitude):02d}" if lat >= 0 else f"S{abs(tile_latitude):02d}" - if lon < 0: - tile_name = f"{latitude_band}W{abs(tile_longitude):03d}" - else: - tile_name = f"{latitude_band}E{abs(tile_longitude):03d}" + return tiles - self.logger.debug( - "Detected tile name: %s for coordinates: lat %s, lon %s.", tile_name, lat, lon - ) - return {"latitude_band": latitude_band, "tile_name": tile_name} + # region provider specific helpers + def download_tile(self, output_path: str, **kwargs) -> bool: + """Download a tile from the provider. - def get_numpy(self) -> np.ndarray: - """Get numpy array of the tile. + Arguments: + output_path (str): Path to save the downloaded tile. Returns: - np.ndarray: Numpy array of the tile. - """ - tile_parameters = self.get_tile_parameters(*self.coordinates) - tile_name = tile_parameters["tile_name"] - decompressed_tile_path = os.path.join(self.hgt_directory, f"{tile_name}.hgt") - - if not os.path.isfile(decompressed_tile_path): - compressed_tile_path = os.path.join(self.gz_directory, f"{tile_name}.hgt.gz") - if not self.get_or_download_tile(compressed_tile_path, **tile_parameters): - raise FileNotFoundError(f"Tile {tile_name} not found.") - - with gzip.open(compressed_tile_path, "rb") as f_in: - with open(decompressed_tile_path, "wb") as f_out: - shutil.copyfileobj(f_in, f_out) - - data = self.extract_roi(decompressed_tile_path) - - self.data_info = {} - self.add_numpy_params(data, "original") - - data = self.signed_to_unsigned(data) - self.add_numpy_params(data, "grounded") - - original_deviation = int(self.data_info["original_deviation"]) - in_game_maximum_height = 65535 // 255 - if original_deviation > in_game_maximum_height: - suggested_height_scale_multiplier = ( - original_deviation / in_game_maximum_height # type: ignore - ) - suggested_height_scale_value = int(255 * suggested_height_scale_multiplier) - else: - suggested_height_scale_multiplier = 1 - suggested_height_scale_value = 255 - - self.data_info["suggested_height_scale_multiplier"] = suggested_height_scale_multiplier - self.data_info["suggested_height_scale_value"] = suggested_height_scale_value - - self.map.shared_settings.height_scale_multiplier = ( # type: ignore - suggested_height_scale_multiplier - ) - self.map.shared_settings.height_scale_value = suggested_height_scale_value # type: ignore - - if self.user_settings.easy_mode: # type: ignore - try: - data = self.normalize_dem(data) - self.add_numpy_params(data, "normalized") - - normalized_deviation = self.data_info["normalized_deviation"] - z_scaling_factor = normalized_deviation / original_deviation # type: ignore - self.data_info["z_scaling_factor"] = z_scaling_factor - - self.map.shared_settings.mesh_z_scaling_factor = z_scaling_factor # type: ignore - self.map.shared_settings.change_height_scale = True # type: ignore - - except Exception as e: # pylint: disable=W0718 - self.logger.error( - "Failed to normalize DEM data. Error: %s. Using original data.", e - ) - - return data - - def add_numpy_params( - self, - data: np.ndarray, - prefix: str, - ) -> None: - """Add numpy array parameters to the data_info dictionary. - - Arguments: - data (np.ndarray): Numpy array of the tile. - prefix (str): Prefix for the parameters. + bool: True if the tile was downloaded successfully, False otherwise. """ - self.data_info[f"{prefix}_minimum_height"] = int(data.min()) # type: ignore - self.data_info[f"{prefix}_maximum_height"] = int(data.max()) # type: ignore - self.data_info[f"{prefix}_deviation"] = int(data.max() - data.min()) # type: ignore - self.data_info[f"{prefix}_unique_values"] = int(np.unique(data).size) # type: ignore - - def signed_to_unsigned(self, data: np.ndarray, add_one: bool = True) -> np.ndarray: - """Convert signed 16-bit integer to unsigned 16-bit integer. + url = self.formatted_url(**kwargs) + response = requests.get(url, stream=True, timeout=10) + if response.status_code == 200: + with open(output_path, "wb") as file: + for chunk in response.iter_content(chunk_size=1024): + file.write(chunk) + return True + return False + + def get_or_download_tile(self, output_path: str, **kwargs) -> str | None: + """Get or download a tile from the provider. Arguments: - data (np.ndarray): DEM data from SRTM file after cropping. + output_path (str): Path to save the downloaded tile. Returns: - np.ndarray: Unsigned DEM data. + str: Path to the downloaded tile or None if the tile not exists and was + not downloaded. """ - data = data - data.min() - if add_one: - data = data + 1 - return data.astype(np.uint16) + if not os.path.exists(output_path): + if not self.download_tile(output_path, **kwargs): + return None + return output_path - def normalize_dem(self, data: np.ndarray) -> np.ndarray: - """Normalize DEM data to 16-bit unsigned integer using max height from settings. + def get_tile_parameters(self, *args, **kwargs) -> dict[str, str]: + """Returns latitude band and tile name for SRTM tile from coordinates. Arguments: - data (np.ndarray): DEM data from SRTM file after cropping. + lat (float): Latitude. + lon (float): Longitude. Returns: - np.ndarray: Normalized DEM data. + dict: Tile parameters. """ - maximum_height = int(data.max()) - minimum_height = int(data.min()) - deviation = maximum_height - minimum_height - self.logger.debug( - "Maximum height: %s. Minimum height: %s. Deviation: %s.", - maximum_height, - minimum_height, - deviation, - ) - self.logger.debug("Number of unique values in original DEM data: %s.", np.unique(data).size) + lat, lon = args - adjusted_maximum_height = maximum_height * 255 - adjusted_maximum_height = min(adjusted_maximum_height, 65535) - scaling_factor = adjusted_maximum_height / maximum_height - self.logger.debug( - "Adjusted maximum height: %s. Scaling factor: %s.", - adjusted_maximum_height, - scaling_factor, - ) + tile_latitude = math.floor(lat) + tile_longitude = math.floor(lon) - if self.user_settings.power_factor: # type: ignore - power_factor = 1 + self.user_settings.power_factor / 10 # type: ignore - self.logger.debug( - "Applying power factor: %s to the DEM data.", - power_factor, - ) - data = np.power(data, power_factor).astype(np.uint16) + latitude_band = f"N{abs(tile_latitude):02d}" if lat >= 0 else f"S{abs(tile_latitude):02d}" + if lon < 0: + tile_name = f"{latitude_band}W{abs(tile_longitude):03d}" + else: + tile_name = f"{latitude_band}E{abs(tile_longitude):03d}" - normalized_data = np.round(data * scaling_factor).astype(np.uint16) self.logger.debug( - "Normalized data maximum height: %s. Minimum height: %s. Number of unique values: %s.", - normalized_data.max(), - normalized_data.min(), - np.unique(normalized_data).size, + "Detected tile name: %s for coordinates: lat %s, lon %s.", tile_name, lat, lon ) - return normalized_data + return {"latitude_band": latitude_band, "tile_name": tile_name} + # endregion diff --git a/maps4fs/generator/dtm/usgs.py b/maps4fs/generator/dtm/usgs.py index 2741e94a..df2449d1 100644 --- a/maps4fs/generator/dtm/usgs.py +++ b/maps4fs/generator/dtm/usgs.py @@ -54,6 +54,11 @@ class USGSProvider(DTMProvider): ) + def download_tiles(self): + download_urls = self.get_download_urls() + all_tif_files = self.download_tif_files(download_urls) + return all_tif_files + def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") @@ -138,214 +143,3 @@ def download_tif_files(self, urls: list[str]) -> list[str]: tif_files.append(file_path) return tif_files - - def merge_geotiff(self, input_files: list[str], output_file: str) -> None: - """Merge multiple GeoTIFF files into a single GeoTIFF file. - - Arguments: - input_files (list): List of input GeoTIFF files to merge. - output_file (str): Path to save the merged GeoTIFF file. - """ - # Open all input GeoTIFF files as datasets - self.logger.debug("Merging tiff files...") - datasets = [rasterio.open(file) for file in input_files] - - # Merge datasets - mosaic, out_transform = merge(datasets, nodata=0) - - # Get metadata from the first file and update it for the output - out_meta = datasets[0].meta.copy() - out_meta.update( - { - "driver": "GTiff", - "height": mosaic.shape[1], - "width": mosaic.shape[2], - "transform": out_transform, - "count": mosaic.shape[0], # Number of bands - } - ) - - # Write merged GeoTIFF to the output file - with rasterio.open(output_file, "w", **out_meta) as dest: - dest.write(mosaic) - - self.logger.debug("GeoTIFF images merged successfully into %s", output_file) - - def reproject_geotiff(self, input_tiff: str, output_tiff: str, target_crs: str) -> None: - """Reproject a GeoTIFF file to a new coordinate reference system (CRS). - - Arguments: - input_tiff (str): Path to the input GeoTIFF file. - output_tiff (str): Path to save the reprojected GeoTIFF file. - target_crs (str): Target CRS (e.g., EPSG:4326 for CRS:84). - """ - # Open the source GeoTIFF - self.logger.debug("Reprojecting GeoTIFF to %s CRS...", target_crs) - with rasterio.open(input_tiff) as src: - # Get the transform, width, and height of the target CRS - transform, width, height = calculate_default_transform( - src.crs, target_crs, src.width, src.height, *src.bounds - ) - - # Update the metadata for the target GeoTIFF - kwargs = src.meta.copy() - kwargs.update( - {"crs": target_crs, "transform": transform, "width": width, "height": height} - ) - - # Open the destination GeoTIFF file and reproject - with rasterio.open(output_tiff, "w", **kwargs) as dst: - for i in range(1, src.count + 1): # Iterate over all raster bands - reproject( - source=rasterio.band(src, i), - destination=rasterio.band(dst, i), - src_transform=src.transform, - src_crs=src.crs, - dst_transform=transform, - dst_crs=target_crs, - resampling=Resampling.nearest, # Choose resampling method - ) - self.logger.debug("Reprojected GeoTIFF saved to %s", output_tiff) - - def extract_roi(self, input_tiff: str) -> np.ndarray: # pylint: disable=W0237 - """ - Crop a GeoTIFF based on given geographic bounding box and save to a new file. - - Arguments: - input_tiff (str): Path to the input GeoTIFF file. - - Returns: - np.ndarray: Numpy array of the cropped GeoTIFF. - """ - self.logger.debug("Extracting ROI...") - # Open the input GeoTIFF - with rasterio.open(input_tiff) as src: - - # Create a rasterio window from the bounding box - (north, south, east, west) = self.get_bbox() - window = from_bounds(west, south, east, north, transform=src.transform) - - data = src.read(1, window=window) - self.logger.debug("Extracted ROI") - return data - - # pylint: disable=R0914, R0917, R0913 - def convert_geotiff_to_geotiff( - self, - input_tiff: str, - output_tiff: str, - min_height: float, - max_height: float, - target_crs: str, - ) -> None: - """ - Convert a GeoTIFF to a scaled GeoTIFF with UInt16 values using a specific coordinate - system and output size. - - Arguments: - input_tiff (str): Path to the input GeoTIFF file. - output_tiff (str): Path to save the output GeoTIFF file. - min_height (float): Minimum terrain height (input range). - max_height (float): Maximum terrain height (input range). - target_crs (str): Target CRS (e.g., EPSG:4326 for CRS:84). - """ - # Open the input GeoTIFF file - self.logger.debug("Converting to uint16") - with rasterio.open(input_tiff) as src: - # Ensure the input CRS matches the target CRS (reprojection may be required) - if str(src.crs) != str(target_crs): - raise ValueError( - f"The GeoTIFF CRS is {src.crs}, but the target CRS is {target_crs}. " - "Reprojection may be required." - ) - - # Read the data from the first band - data = src.read(1) # Assuming the input GeoTIFF has only a single band - - # Identify the input file's NoData value - input_nodata = src.nodata - if input_nodata is None: - input_nodata = -999999.0 # Default fallback if no NoData value is defined - nodata_value = 0 - # Replace NoData values (e.g., -999999.0) with the new NoData value - # (e.g., 65535 for UInt16) - data[data == input_nodata] = nodata_value - - # Scale the data to the 0–65535 range (UInt16), avoiding NoData areas - scaled_data = np.clip( - (data - min_height) * (65535 / (max_height - min_height)), 0, 65535 - ).astype(np.uint16) - scaled_data[data == nodata_value] = ( - nodata_value # Preserve NoData value in the scaled array - ) - - # Compute the proper transform to ensure consistency - # Get the original transform, width, and height - transform = src.transform - width = src.width - height = src.height - left, bottom, right, top = src.bounds - - # Adjust the transform matrix to make sure bounds and transform align correctly - transform = rasterio.transform.from_bounds(left, bottom, right, top, width, height) - - # Prepare metadata for the output GeoTIFF - metadata = src.meta.copy() - metadata.update( - { - "dtype": rasterio.uint16, # Update dtype for uint16 - "crs": target_crs, # Update CRS if needed - "nodata": nodata_value, # Set the new NoData value - "transform": transform, # Use the updated, consistent transform - } - ) - - # Write the scaled data to the output GeoTIFF - with rasterio.open(output_tiff, "w", **metadata) as dst: - dst.write(scaled_data, 1) # Write the first band - - self.logger.debug( - "GeoTIFF successfully converted and saved to %s, with nodata value: %s.", - output_tiff, - nodata_value, - ) - - def generate_data(self) -> np.ndarray: - """Generate data from the USGS 1m provider. - - Returns: - np.ndarray: Numpy array of the data. - """ - download_urls = self.get_download_urls() - all_tif_files = self.download_tif_files(download_urls) - self.merge_geotiff(all_tif_files, os.path.join(self.output_path, "merged.tif")) - self.reproject_geotiff( - os.path.join(self.output_path, "merged.tif"), - os.path.join(self.output_path, "reprojected.tif"), - "EPSG:4326", - ) - self.convert_geotiff_to_geotiff( - os.path.join(self.output_path, "reprojected.tif"), - os.path.join(self.output_path, "translated.tif"), - min_height=0, - max_height=self.user_settings.max_local_elevation, # type: ignore - target_crs="EPSG:4326", - ) - return self.extract_roi(os.path.join(self.output_path, "translated.tif")) - - def get_numpy(self) -> np.ndarray: - """Get numpy array of the tile. - - Returns: - np.ndarray: Numpy array of the tile. - """ - if not self.user_settings: - raise ValueError("user_settings is 'none'") - if self.user_settings.max_local_elevation <= 0: # type: ignore - raise ValueError( - "Entered 'max_local_elevation' value is unable to be used. " - "Use a value greater than 0." - ) - if not self._data: - self._data = self.generate_data() - return self._data diff --git a/webui/generator.py b/webui/generator.py index d75436cb..9dd2342a 100644 --- a/webui/generator.py +++ b/webui/generator.py @@ -233,6 +233,9 @@ def provider_info(self) -> None: provider_code = self.dtm_provider_code provider = mfs.DTMProvider.get_provider_by_code(provider_code) + if provider is None: + return + self.provider_settings = None self.provider_info_container.empty() sleep(0.1) @@ -245,6 +248,9 @@ def provider_info(self) -> None: if provider.contributors() is not None: st.write(f"Contributors: {provider.contributors()}") + if provider.base_instructions() is not None: + st.write(provider.base_instructions()) + if provider.instructions() is not None: st.write(provider.instructions()) @@ -336,7 +342,7 @@ def add_left_widgets(self) -> None: on_change=self.provider_info, ) self.provider_settings = None - self.provider_info_container = st.empty() + self.provider_info_container: st.delta_generator.DeltaGenerator = st.empty() self.provider_info() # Rotation input.