diff --git a/earthspy/earthspy.py b/earthspy/earthspy.py index b0d9a22..67328b3 100644 --- a/earthspy/earthspy.py +++ b/earthspy/earthspy.py @@ -1,6 +1,6 @@ """ -@author: Adrien Wehrlé, EO-IO, University of Zurich, Switzerland +@authors: Adrien Wehrlé (EO-IO), Antsalacia """ @@ -84,6 +84,7 @@ def set_query_parameters( download_mode: str = "SM", remove_splitboxes: bool = True, verbose: bool = True, + raster_compression: str = None, ) -> None: """Define a set of parameters used for the API request. @@ -144,6 +145,11 @@ def set_query_parameters( :param verbose: Whether to print processing status or not, defaults to True. :type verbose: bool, optional + + + :param raster_compression: Raster compression to apply following methods + available in rasterio, defaults to None. + :type raster_compression: Union[None, str], optional """ # set processing attributes @@ -171,6 +177,9 @@ def set_query_parameters( # set and correct resolution self.set_correct_resolution() + # set compression method + self.get_raster_compression(raster_compression) + # set post-processing attributes self.get_evaluation_script(evaluation_script) self.get_store_folder(store_folder) @@ -187,6 +196,28 @@ def set_query_parameters( return None + def get_raster_compression(self, raster_compression: Union[None, str]) -> str: + """Get raster compression based on rasterio's available methods + + :return: Raster compression method + :rtype: Union[None, str] + """ + + # list rasterio compression algorithm and exclude dunders + rasterio_compression_algorithms = [ + m for m in dir(rasterio.enums.Compression) if not m.startswith("__") + ] + + # use rasterio compression method as is + if raster_compression is None: + self.raster_compression = None + elif raster_compression.lower() in rasterio_compression_algorithms: + self.raster_compression = raster_compression + else: + raise KeyError("Compression algorithm not found") + + return self.raster_compression + def get_data_collection(self) -> shb.DataCollection: """Get Sentinel Hub DataCollection object from data collection name. @@ -294,7 +325,6 @@ def get_raw_data_collection_resolution(self) -> int: :return: Data collection resolution. :rtype: int - """ # set default satellite resolution @@ -1072,11 +1102,19 @@ def merge_rasters(self) -> None: "transform": output_transform, } ) + + # update dictionary if compression set + if self.raster_compression is not None: + output_meta.update({"compress": self.raster_compression}) + + # extract scene id id_dict = {k: self.metadata[date][0][k] for k in ["id"]} + # write mosaic with rasterio.open(date_output_filename, "w", **output_meta) as dst: dst.write(mosaic) dst.update_tags(**id_dict) + # save file name of merged raster self.output_filenames_renamed.append(date_output_filename) diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..a30424f --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,189 @@ +#!/usr/bin/env python3 +""" + +@authors: Adrien Wehrlé (EO-IO), Antsalacia + +""" + +import os + +import pytest + +import earthspy.earthspy as es + + +def pytest_addoption(parser): + """Add option to pass local path to authentication file""" + parser.addoption( + "--authfile", + action="store", + default="./auth.txt", + help="Full path to Sentinel Hub credential file containing ID and password", + ) + + +# if running in Github action +if os.getenv("CI") is not None: + + @pytest.fixture(scope="session") + def SH_CLIENT_ID() -> None: + """Create local client id from environment variable""" + # check if variable in environment variables + SH_CLIENT_ID = os.environ["SH_CLIENT_ID"] + return SH_CLIENT_ID + + @pytest.fixture(scope="session") + def SH_CLIENT_SECRET() -> None: + """Create local client secret from environment variable""" + # check if variable in environment variables + SH_CLIENT_SECRET = os.environ["SH_CLIENT_SECRET"] + return SH_CLIENT_SECRET + + # path to credential file to be created + @pytest.fixture(scope="session") + def authfile(SH_CLIENT_ID, SH_CLIENT_SECRET): + """Set credential file name and create credential file + for testing""" + authfile = "auth.txt" + with open(authfile, "w") as out: + out.write(f"{SH_CLIENT_ID}\n{SH_CLIENT_SECRET}") + return authfile + + +# if running locally +else: + + @pytest.fixture(scope="session") + def authfile(pytestconfig): + """Get option from command line call""" + return pytestconfig.getoption("authfile") + + @pytest.fixture(scope="session") + def credentials(authfile): + """Read credentials stored in text file""" + + with open(authfile) as file: + credentials = file.read().splitlines() + return credentials + + @pytest.fixture(scope="session") + def SH_CLIENT_ID(credentials) -> None: + """Extract client id from line""" + SH_CLIENT_ID = credentials[0] + return SH_CLIENT_ID + + @pytest.fixture(scope="session") + def SH_CLIENT_SECRET(credentials) -> None: + """Extract client secret from line""" + SH_CLIENT_SECRET = credentials[1] + return SH_CLIENT_SECRET + + +@pytest.fixture(scope="session") +def test_evalscript(): + """Set a test evalscript for Sentinel-2""" + test_evalscript = """ + //VERSION=3 + function setup(){ + return{ + input: ["B02", "B03", "B04", "dataMask"], + output: {bands: 4} + } + } + function evaluatePixel(sample){ + // Set gain for visualisation + let gain = 2.5; + // Return RGB + return [sample.B04 * gain, sample.B03 * gain, sample.B02 * gain, + sample.dataMask]; + } + """ + return test_evalscript + + +@pytest.fixture(scope="session") +def test_url(): + """Set a test evalscript pointing to Sentinel-2 True Color""" + test_url = ( + "https://custom-scripts.sentinel-hub.com/custom-scripts/" + + "sentinel-2/true_color/script.js" + ) + return test_url + + +@pytest.fixture(scope="session") +def test_collection(): + """Set a test data collection""" + test_collection = "SENTINEL2_L2A" + return test_collection + + +@pytest.fixture(scope="session") +def test_bounding_box(): + """Set a test footprint area (bounding box)""" + test_bounding_box = [-51.13, 69.204, -51.06, 69.225] + return test_bounding_box + + +@pytest.fixture(scope="session") +def test_area_name(): + """Set a test area available as geojson file""" + test_area_name = "Ilulissat" + return test_area_name + + +@pytest.fixture(scope="session") +def t1(authfile, test_evalscript, test_collection, test_bounding_box): + """Set a test query with default parameters""" + t1 = es.EarthSpy(authfile) + t1.set_query_parameters( + bounding_box=test_bounding_box, + time_interval=["2019-08-23"], + evaluation_script=test_evalscript, + data_collection=test_collection, + download_mode="SM", + ) + return t1 + + +@pytest.fixture(scope="session") +def t2(authfile, test_evalscript, test_collection, test_area_name): + """Set a test query with area name""" + t2 = es.EarthSpy(authfile) + t2.set_query_parameters( + bounding_box=test_area_name, + time_interval=["2019-08-23"], + evaluation_script=test_evalscript, + data_collection=test_collection, + download_mode="SM", + ) + return t2 + + +@pytest.fixture(scope="session") +def t3(authfile, test_evalscript, test_collection, test_bounding_box): + """Set a test query with direct download mode""" + t3 = es.EarthSpy(authfile) + t3.set_query_parameters( + bounding_box=test_bounding_box, + time_interval=["2019-08-23"], + evaluation_script=test_evalscript, + data_collection=test_collection, + download_mode="D", + ) + return t3 + + +@pytest.fixture(scope="session") +def t4(authfile, test_evalscript, test_collection, test_bounding_box): + """Set a test query with LZW raster compression""" + t4 = es.EarthSpy(authfile) + t4.set_query_parameters( + bounding_box=test_bounding_box, + time_interval=["2019-08-23"], + evaluation_script=test_evalscript, + data_collection=test_collection, + download_mode="SM", + raster_compression="LZW", + ) + return t4 diff --git a/tests/test_earthspy.py b/tests/test_earthspy.py index 4a9b678..8f0398c 100644 --- a/tests/test_earthspy.py +++ b/tests/test_earthspy.py @@ -1,396 +1,301 @@ #!/usr/bin/env python3 """ -@author: Adrien Wehrlé, EO-IO, University of Zurich, Switzerland +@authors: Adrien Wehrlé (EO-IO), Antsalacia """ -import glob -import json -import os - import numpy as np import pandas as pd import requests import sentinelhub as shb -import earthspy.earthspy as es - - -class TestEarthspy: - # create local variables from environment secrets for convenience - SH_CLIENT_ID = os.environ["SH_CLIENT_ID"] - SH_CLIENT_SECRET = os.environ["SH_CLIENT_SECRET"] - - # path to credential file to be created - authfile = "auth.txt" - - # create file containing credentials for testing - with open(authfile, "w") as out: - out.write(f"{SH_CLIENT_ID}\n{SH_CLIENT_SECRET}") - - # an example of custom script - test_evalscript = """ - //VERSION=3 - function setup(){ - return{ - input: ["B02", "B03", "B04", "dataMask"], - output: {bands: 4} - } - } - function evaluatePixel(sample){ - // Set gain for visualisation - let gain = 2.5; - // Return RGB - return [sample.B04 * gain, sample.B03 * gain, sample.B02 * gain, - sample.dataMask]; - } - """ - - # an example of custom script URL - test_url = ( - "https://custom-scripts.sentinel-hub.com/custom-scripts/" - + "sentinel-2/true_color/script.js" - ) - - # an example of data collection - test_collection = "SENTINEL2_L2A" - - # an example of footprint area - test_bounding_box = [-51.13, 69.204, -51.06, 69.225] - - # an example of area available as GEOJSON file - test_area_name = "Ilulissat" - - print(os.getcwd()) - - # example of query with default parameters - t1 = es.EarthSpy(authfile) - t1.set_query_parameters( - bounding_box=test_bounding_box, - time_interval=["2019-08-23"], - evaluation_script=test_evalscript, - data_collection=test_collection, - download_mode="SM", - ) - - # example of query with direct area name - t2 = es.EarthSpy(authfile) - t2.set_query_parameters( - bounding_box=test_area_name, - time_interval=["2019-08-23"], - evaluation_script=test_evalscript, - data_collection=test_collection, - download_mode="SM", - ) - - # example of query with direct mode - t3 = es.EarthSpy(authfile) - t3.set_query_parameters( - bounding_box=test_bounding_box, - time_interval=["2019-08-23"], - evaluation_script=test_evalscript, - data_collection=test_collection, - download_mode="D", - ) - - def test_init(self) -> None: - """Test auth.txt parsing and connection configuration.""" - - # check for credentials - assert self.t1.CLIENT_ID == self.SH_CLIENT_ID - assert self.t1.CLIENT_SECRET == self.SH_CLIENT_SECRET - - # check if connection was properly setup - assert isinstance(self.t1.config, shb.config.SHConfig) - assert self.t1.config.sh_client_id == self.SH_CLIENT_ID - assert self.t1.config.sh_client_secret == self.SH_CLIENT_SECRET - - def test_set_query_parameters(self) -> None: - """Test direct attribute assignment.""" - - # check if attributes were set accordingly - assert self.t1.download_mode is not None - assert self.t1.download_mode == "SM" - assert self.t1.verbose - assert self.t1.data_collection_str == self.test_collection - assert isinstance( - self.t1.user_date_range, pd.core.indexes.datetimes.DatetimeIndex - ) - assert isinstance(self.t1.evaluation_script, str) - - def test_get_data_collection(self) -> None: - """Test data collection selection.""" - - # check if data collection was set properly - assert self.t1.data_collection == shb.DataCollection[self.test_collection] - assert isinstance(self.t1.data_collection, shb.DataCollection) - - def test_get_satellite_name(self) -> None: - """Test satellite name extraction""" - - # check if satellite name was set properly - assert isinstance(self.t1.satellite, str) - assert self.t1.satellite == "SENTINEL2" - - def test_get_raw_data_collection_resolution(self) -> None: - """Test resolution selection""" - - # check if data resolution was set correctly - assert self.t1.raw_data_collection_resolution == 10 - assert self.t2.raw_data_collection_resolution == 10 - assert self.t3.raw_data_collection_resolution == 10 - - def test_set_number_of_cores(self) -> None: - """Test selection of number of cores for multiprocessing""" - - # check if number of cores was set correctly - assert isinstance(self.t1.nb_cores, int) - assert isinstance(self.t2.nb_cores, int) - assert isinstance(self.t3.nb_cores, int) - - def test_get_date_range(self) -> None: - """Test datetime object creation""" - - d1 = self.t1.get_date_range(time_interval=3) - # check if date from present was set accordingly - assert isinstance(d1, pd.DatetimeIndex) - - d2a = self.t1.get_date_range(time_interval="2019-08-01") - d2b = self.t2.get_date_range(time_interval="2019-08-01") - d2c = self.t3.get_date_range(time_interval="2019-08-01") - # check if single date (str) was set accordingly - assert d2a == pd.date_range("2019-08-01", "2019-08-01") - assert d2b == pd.date_range("2019-08-01", "2019-08-01") - assert d2c == pd.date_range("2019-08-01", "2019-08-01") - - d3a = self.t1.get_date_range(time_interval=["2019-08-01"]) - d3b = self.t2.get_date_range(time_interval=["2019-08-01"]) - d3c = self.t3.get_date_range(time_interval=["2019-08-01"]) - # check if single date (list) was set accordingly - assert d3a == pd.date_range("2019-08-01", "2019-08-01") - assert d3b == pd.date_range("2019-08-01", "2019-08-01") - assert d3c == pd.date_range("2019-08-01", "2019-08-01") - - d4a = self.t1.get_date_range( - time_interval=["2019-08-01", "2019-08-02", "2019-08-03"] - ) - d4b = self.t2.get_date_range( - time_interval=["2019-08-01", "2019-08-02", "2019-08-03"] - ) - d4c = self.t3.get_date_range( - time_interval=["2019-08-01", "2019-08-02", "2019-08-03"] - ) - # check if a list of dates was set accordingly - pd.testing.assert_index_equal(d4a, pd.date_range("2019-08-01", "2019-08-03")) - pd.testing.assert_index_equal(d4b, pd.date_range("2019-08-01", "2019-08-03")) - pd.testing.assert_index_equal(d4c, pd.date_range("2019-08-01", "2019-08-03")) - - def test_get_bounding_box(self) -> None: - """Test bounding box creation""" - - bb1 = self.t1.get_bounding_box(bounding_box=self.test_bounding_box) - # check if a Sentinel Hub bounding box was created - assert isinstance(bb1, shb.geometry.BBox) - - bb2 = self.t2.get_bounding_box(bounding_box=self.test_area_name) - # check if a Sentinel Hub bounding box was created - assert isinstance(bb2, shb.geometry.BBox) - area_coordinates = np.array(bb2.geojson["coordinates"][0]) - area_bounding_box = [ - np.nanmin(area_coordinates[:, 0]), - np.nanmin(area_coordinates[:, 1]), - np.nanmax(area_coordinates[:, 0]), - np.nanmax(area_coordinates[:, 1]), - ] - # check if setting Ilulissat bounding_box with coordinates gives - # the same bounding_box just like calling its area name - assert area_bounding_box == self.test_bounding_box - - def test_get_store_folder(self) -> None: - """Test store folder selection""" - - sf1 = self.t1.get_store_folder(None) - # # check if default store folder was set accordingly - assert isinstance(sf1, str) - - sf2 = self.t1.get_store_folder(store_folder="./test/path") - # # check if passed store folder was set accordingly - assert isinstance(sf2, str) - # # check the actual string - assert sf2 == "./test/path" - - def test_convert_bounding_box_coordinates(self) -> None: - """Test bounding box conversion""" - - self.t1.convert_bounding_box_coordinates() - # check if a new Sentinel Hub bounding box was created - assert isinstance(self.t1.bounding_box_UTM, shb.geometry.BBox) - # check if the right CRS was assigned - assert self.t1.bounding_box_UTM.crs == shb.CRS("32622") - # check if the right CRS was assigned - assert self.t1.bounding_box.crs == shb.CRS("4326") - # check if a bounding box list was created - assert isinstance(self.t1.bounding_box_UTM_list, list) - # check that all items of the list are floats - assert all(isinstance(item, float) for item in self.t1.bounding_box_UTM_list) - # check that all coordinates were included - assert len(self.t1.bounding_box_UTM_list) == 4 - - def test_get_max_resolution(self) -> None: - """Test maximum resolution computation""" - - mr1 = self.t1.get_max_resolution() - # check that maximum resolution was set correctly - assert isinstance(mr1, np.int64) - assert mr1 == 11 - - def test_set_correct_resolution(self) -> None: - """Test resolution refinement""" - - r1 = self.t1.set_correct_resolution() - # check that query resolution was set correctly - assert r1 == 10 - # check that download mode was set correctly - assert isinstance(self.t1.download_mode, str) - - r2 = self.t3.set_correct_resolution() - # check that query resolution was set correctly - assert r2 == 11 - # check that download mode was set correctly - assert isinstance(self.t3.download_mode, str) - - def test_list_requests(self) -> None: - """Test request listing""" - - lr1 = self.t1.list_requests() - # check that a list was created accordingly - assert isinstance(lr1, list) - assert len(lr1) == 4 - assert len(lr1) == len(self.t1.split_boxes) - # check that a list of Sentinel Hub requests was created - assert all(isinstance(item, shb.SentinelHubRequest) for item in lr1) - - lr2 = self.t3.list_requests() - # check that a list was created accordingly - assert isinstance(lr2, list) - assert len(lr2) == 1 - assert len(lr2) == len(self.t3.split_boxes) - # check that a list of Sentinel Hub requests was created - assert isinstance(lr2[0], shb.SentinelHubRequest) - - def test_get_split_boxes(self) -> None: - """Test split box creation""" - - sb1 = self.t1.get_split_boxes() - # check that a list of split boxes was created - assert isinstance(sb1, list) - # check that each split box is a Sentinel Hub bounding box - assert all(isinstance(item, shb.geometry.BBox) for item in sb1) - # check that each split box is in the correct projection - assert all(item.crs == shb.CRS("32622") for item in sb1) - - # check that only one box has been created - assert len(self.t3.split_boxes) == 1 - # check that the split box is in the right projection - assert self.t3.split_boxes[0].crs == shb.CRS("4326") - - def test_get_evaluation_script_from_link(self) -> None: - """Test custom script extraction from URL""" - - es1 = self.t1.get_evaluation_script_from_link(self.test_url) - # check that evalscript was set accordingly - assert es1 == requests.get(self.test_url).text - - def test_set_split_boxes_ids(self) -> None: - """Test split box ID generation""" - - sbi1 = self.t1.set_split_boxes_ids() - # check that split box ids were saved in dictionary - assert isinstance(sbi1, dict) - # check that dictionary has the right shape - assert len(sbi1) == 4 - - def test_get_evaluation_script(self) -> None: - """Test evaluation script extraction""" - - es1 = self.t1.get_evaluation_script(None) - # check that default evalscript was set accordingly - assert es1 == requests.get(self.test_url).text - - es2 = self.t1.get_evaluation_script(self.test_evalscript) - # check that passed evalscript was set correctly - assert isinstance(es2, str) - - def test_sentinelhub_request(self) -> None: - """Test API request generation""" - - sr1 = self.t1.sentinelhub_request( - self.t1.user_date_range[0], self.t1.split_boxes[0] - ) - # # check that a Sentinel Hub request was created - assert isinstance(sr1, shb.SentinelHubRequest) - - sr2 = self.t3.sentinelhub_request( - self.t3.user_date_range[0], self.t3.split_boxes[0] - ) - # # check that a Sentinel Hub request was created - assert isinstance(sr2, shb.SentinelHubRequest) - - def test_geojson_files(self) -> None: - """Test fields of GEOJSON files""" - - # list all geojson files available in earthspy - geojson_files = glob.glob("./data/*") - - # check that files were found - assert len(geojson_files) > 0 - - for file in geojson_files: - - with open(file, "r+") as f: - data = json.load(f) - - # extract coordinates of geometry nodes - geometry_coordinates = np.array( - data["features"][0]["geometry"]["coordinates"][0] - ) - - # check if feature is a polygon - assert data["features"][0]["geometry"]["type"] == "Polygon" - - # check if coordinates match a 4-point square - assert geometry_coordinates.shape == (5, 2) - - # check if the coordinates are valid longitude/latitude coordinates - assert ((geometry_coordinates >= -90) & (geometry_coordinates <= 90)).all() - - # check if first coordinate is repeated on the last element - assert geometry_coordinates[0, 0] == geometry_coordinates[-1, 0] - - def test_rename_output_files(self) -> None: - """Test output renaming""" - - # self.t4.send_sentinelhub_requests() - # # check that a list of file names was created - # assert all(isinstance(item, str) for item in self.t4.output_filenames) - # # check that one file name per split box was created - # assert len(self.t4.output_filenames) == len(self.t4.split_boxes) - def test_send_sentinelhub_requests(self) -> None: - """Test API outputs""" +def test_init(t1, SH_CLIENT_ID, SH_CLIENT_SECRET) -> None: + """Test auth.txt parsing and connection configuration.""" + + # check for credentials + assert t1.CLIENT_ID == SH_CLIENT_ID + assert t1.CLIENT_SECRET == SH_CLIENT_SECRET + + # check if connection was properly setup + assert isinstance(t1.config, shb.config.SHConfig) + assert t1.config.sh_client_id == SH_CLIENT_ID + assert t1.config.sh_client_secret == SH_CLIENT_SECRET + + +def test_set_query_parameters(t1, test_collection) -> None: + """Test direct attribute assignment.""" + + # check if attributes were set accordingly + assert t1.download_mode is not None + assert t1.download_mode == "SM" + assert t1.verbose + assert t1.data_collection_str == test_collection + assert isinstance(t1.user_date_range, pd.core.indexes.datetimes.DatetimeIndex) + assert isinstance(t1.evaluation_script, str) + + +def test_get_data_collection(t1, test_collection) -> None: + """Test data collection selection.""" + + # check if data collection was set properly + assert t1.data_collection == shb.DataCollection[test_collection] + assert isinstance(t1.data_collection, shb.DataCollection) + + +def test_get_satellite_name(t1) -> None: + """Test satellite name extraction""" + + # check if satellite name was set properly + assert isinstance(t1.satellite, str) + assert t1.satellite == "SENTINEL2" + + +def test_get_raw_data_collection_resolution(t1, t2, t3) -> None: + """Test resolution selection""" + + # check if data resolution was set correctly + assert t1.raw_data_collection_resolution == 10 + assert t2.raw_data_collection_resolution == 10 + assert t3.raw_data_collection_resolution == 10 + + +def test_set_number_of_cores(t1, t2, t3) -> None: + """Test selection of number of cores for multiprocessing""" + + # check if number of cores was set correctly + assert isinstance(t1.nb_cores, int) + assert isinstance(t2.nb_cores, int) + assert isinstance(t3.nb_cores, int) + + +def test_get_date_range(t1, t2, t3) -> None: + """Test datetime object creation""" + + d1 = t1.get_date_range(time_interval=3) + # check if date from present was set accordingly + assert isinstance(d1, pd.DatetimeIndex) + + d2a = t1.get_date_range(time_interval="2019-08-01") + d2b = t2.get_date_range(time_interval="2019-08-01") + d2c = t3.get_date_range(time_interval="2019-08-01") + # check if single date (str) was set accordingly + assert d2a == pd.date_range("2019-08-01", "2019-08-01") + assert d2b == pd.date_range("2019-08-01", "2019-08-01") + assert d2c == pd.date_range("2019-08-01", "2019-08-01") + + d3a = t1.get_date_range(time_interval=["2019-08-01"]) + d3b = t2.get_date_range(time_interval=["2019-08-01"]) + d3c = t3.get_date_range(time_interval=["2019-08-01"]) + # check if single date (list) was set accordingly + assert d3a == pd.date_range("2019-08-01", "2019-08-01") + assert d3b == pd.date_range("2019-08-01", "2019-08-01") + assert d3c == pd.date_range("2019-08-01", "2019-08-01") + + d4a = t1.get_date_range(time_interval=["2019-08-01", "2019-08-02", "2019-08-03"]) + d4b = t2.get_date_range(time_interval=["2019-08-01", "2019-08-02", "2019-08-03"]) + d4c = t3.get_date_range(time_interval=["2019-08-01", "2019-08-02", "2019-08-03"]) + # check if a list of dates was set accordingly + pd.testing.assert_index_equal(d4a, pd.date_range("2019-08-01", "2019-08-03")) + pd.testing.assert_index_equal(d4b, pd.date_range("2019-08-01", "2019-08-03")) + pd.testing.assert_index_equal(d4c, pd.date_range("2019-08-01", "2019-08-03")) + + +def test_get_bounding_box(t1, t2, test_bounding_box, test_area_name) -> None: + """Test bounding box creation""" + + bb1 = t1.get_bounding_box(bounding_box=test_bounding_box) + # check if a Sentinel Hub bounding box was created + assert isinstance(bb1, shb.geometry.BBox) + + bb2 = t2.get_bounding_box(bounding_box=test_area_name) + # check if a Sentinel Hub bounding box was created + assert isinstance(bb2, shb.geometry.BBox) + area_coordinates = np.array(bb2.geojson["coordinates"][0]) + area_bounding_box = [ + np.nanmin(area_coordinates[:, 0]), + np.nanmin(area_coordinates[:, 1]), + np.nanmax(area_coordinates[:, 0]), + np.nanmax(area_coordinates[:, 1]), + ] + # check if setting Ilulissat bounding_box with coordinates gives + # the same bounding_box just like calling its area name + assert area_bounding_box == test_bounding_box + + +def test_get_store_folder(t1) -> None: + """Test store folder selection""" + + sf1 = t1.get_store_folder(None) + # # check if default store folder was set accordingly + assert isinstance(sf1, str) + + sf2 = t1.get_store_folder(store_folder="./test/path") + # # check if passed store folder was set accordingly + assert isinstance(sf2, str) + # # check the actual string + assert sf2 == "./test/path" + + +def test_convert_bounding_box_coordinates(t1) -> None: + """Test bounding box conversion""" + + t1.convert_bounding_box_coordinates() + # check if a new Sentinel Hub bounding box was created + assert isinstance(t1.bounding_box_UTM, shb.geometry.BBox) + # check if the right CRS was assigned + assert t1.bounding_box_UTM.crs == shb.CRS("32622") + # check if the right CRS was assigned + assert t1.bounding_box.crs == shb.CRS("4326") + # check if a bounding box list was created + assert isinstance(t1.bounding_box_UTM_list, list) + # check that all items of the list are floats + assert all(isinstance(item, float) for item in t1.bounding_box_UTM_list) + # check that all coordinates were included + assert len(t1.bounding_box_UTM_list) == 4 + + +def test_get_max_resolution(t1) -> None: + """Test maximum resolution computation""" + + mr1 = t1.get_max_resolution() + # check that maximum resolution was set correctly + assert isinstance(mr1, np.int64) + assert mr1 == 11 + + +def test_set_correct_resolution(t1, t3) -> None: + """Test resolution refinement""" + + r1 = t1.set_correct_resolution() + # check that query resolution was set correctly + assert r1 == 10 + # check that download mode was set correctly + assert isinstance(t1.download_mode, str) + + r2 = t3.set_correct_resolution() + # check that query resolution was set correctly + assert r2 == 11 + # check that download mode was set correctly + assert isinstance(t3.download_mode, str) + + +def test_list_requests(t1, t3) -> None: + """Test request listing""" + + lr1 = t1.list_requests() + # check that a list was created accordingly + assert isinstance(lr1, list) + assert len(lr1) == 4 + assert len(lr1) == len(t1.split_boxes) + # check that a list of Sentinel Hub requests was created + assert all(isinstance(item, shb.SentinelHubRequest) for item in lr1) + + lr2 = t3.list_requests() + # check that a list was created accordingly + assert isinstance(lr2, list) + assert len(lr2) == 1 + assert len(lr2) == len(t3.split_boxes) + # check that a list of Sentinel Hub requests was created + assert isinstance(lr2[0], shb.SentinelHubRequest) + + +def test_get_split_boxes(t1, t3) -> None: + """Test split box creation""" + + sb1 = t1.get_split_boxes() + # check that a list of split boxes was created + assert isinstance(sb1, list) + # check that each split box is a Sentinel Hub bounding box + assert all(isinstance(item, shb.geometry.BBox) for item in sb1) + # check that each split box is in the correct projection + assert all(item.crs == shb.CRS("32622") for item in sb1) + + # check that only one box has been created + assert len(t3.split_boxes) == 1 + # check that the split box is in the right projection + assert t3.split_boxes[0].crs == shb.CRS("4326") + + +def test_get_evaluation_script_from_link(t1, test_url) -> None: + """Test custom script extraction from URL""" + + es1 = t1.get_evaluation_script_from_link(test_url) + # check that evalscript was set accordingly + assert es1 == requests.get(test_url).text + + +def test_set_split_boxes_ids(t1) -> None: + """Test split box ID generation""" + + sbi1 = t1.set_split_boxes_ids() + # check that split box ids were saved in dictionary + assert isinstance(sbi1, dict) + # check that dictionary has the right shape + assert len(sbi1) == 4 + + +def test_get_evaluation_script(t1, test_url, test_evalscript) -> None: + """Test evaluation script extraction""" + + es1 = t1.get_evaluation_script(None) + # check that default evalscript was set accordingly + assert es1 == requests.get(test_url).text + + es2 = t1.get_evaluation_script(test_evalscript) + # check that passed evalscript was set correctly + assert isinstance(es2, str) + + +def test_sentinelhub_request(t1, t3) -> None: + """Test API request generation""" + + sr1 = t1.sentinelhub_request(t1.user_date_range[0], t1.split_boxes[0]) + # # check that a Sentinel Hub request was created + assert isinstance(sr1, shb.SentinelHubRequest) + + sr2 = t3.sentinelhub_request(t3.user_date_range[0], t3.split_boxes[0]) + # # check that a Sentinel Hub request was created + assert isinstance(sr2, shb.SentinelHubRequest) + + +def test_rename_output_files() -> None: + """Test output renaming""" + + # t4.send_sentinelhub_requests() + # # check that a list of file names was created + # assert all(isinstance(item, str) for item in t4.output_filenames) + # # check that one file name per split box was created + # assert len(t4.output_filenames) == len(t4.split_boxes) + + +def test_send_sentinelhub_requests() -> None: + """Test API outputs""" + + # t5.send_sentinelhub_requests() + # # check that a list of raw file names was created + # assert all(isinstance(item, str) for item in t5.raw_filenames) + # # check that one raw file name per split box was created + # assert len(t5.raw_filenames) == len(t5.split_boxes) + + +def test_merge_rasters() -> None: + """Test raster merge""" + + # t3.send_sentinelhub_requests() + # # check that a list of renamed file names was created + # assert all(isinstance(item, str) for item in t3.output_filenames_renamed) + # # check that one output per split box was created + # assert len(t3.outputs) == len(t3.split_boxes) - # self.t5.send_sentinelhub_requests() - # # check that a list of raw file names was created - # assert all(isinstance(item, str) for item in self.t5.raw_filenames) - # # check that one raw file name per split box was created - # assert len(self.t5.raw_filenames) == len(self.t5.split_boxes) - def test_merge_rasters(self) -> None: - """Test raster merge""" +def test_get_raster_compression(t3, t4) -> None: + """Test raster compression""" + # check default raster compression + comp_def = t3.raster_compression + assert comp_def is None - # self.t3.send_sentinelhub_requests() - # # check that a list of renamed file names was created - # assert all(isinstance(item, str) for item in self.t3.output_filenames_renamed) - # # check that one output per split box was created - # assert len(self.t3.outputs) == len(self.t3.split_boxes) + # check raster compression when LZW compression is specified + comp_cust = t4.raster_compression + assert comp_cust == "LZW"