From 616645c34113221530bba9017cc7bcea2c993652 Mon Sep 17 00:00:00 2001 From: mattiagiupponi <51856725+mattiagiupponi@users.noreply.github.com> Date: Thu, 5 Oct 2023 13:12:49 +0200 Subject: [PATCH] Test fix CircleCi test (#11560) * Test fix CircleCi test * Test fix CircleCi test * Test fix CircleCi test --- geonode/upload/api/tests.py | 80 ++++++++++++++++++++++++++----------- 1 file changed, 57 insertions(+), 23 deletions(-) diff --git a/geonode/upload/api/tests.py b/geonode/upload/api/tests.py index 8f84fec6a92..81cd4a3ea3f 100644 --- a/geonode/upload/api/tests.py +++ b/geonode/upload/api/tests.py @@ -17,7 +17,9 @@ # ######################################################################### +from geonode.base.models import ResourceBase from geonode.resource.models import ExecutionRequest +from geonode.geoserver.helpers import gs_catalog import os import shutil import logging @@ -233,35 +235,67 @@ def test_rest_uploads(self): """ Ensure we can access the Local Server Uploads list. """ - # Try to upload a good raster file and check the session IDs - fname = os.path.join(GOOD_DATA, "raster", "relief_san_andres.tif") - resp, data = rest_upload_by_path(fname, self.client) - self.assertEqual(resp.status_code, 201) - - url = reverse("uploads-list") - # Anonymous - self.client.logout() - response = self.client.get(url, format="json") - self.assertEqual(response.status_code, 200) - self.assertEqual(len(response.data), 5) - self.assertEqual(response.data["total"], 0) - # Pagination - self.assertEqual(len(response.data["uploads"]), 0) - logger.debug(response.data) + resp = None + layer_name = "relief_san_andres" + try: + self._cleanup_layer(layer_name=layer_name) + # Try to upload a good raster file and check the session IDs + fname = os.path.join(GOOD_DATA, "raster", "relief_san_andres.tif") + resp, data = rest_upload_by_path(fname, self.client) + self.assertEqual(resp.status_code, 201) + + url = reverse("uploads-list") + # Anonymous + self.client.logout() + response = self.client.get(url, format="json") + self.assertEqual(response.status_code, 200) + self.assertEqual(len(response.data), 5) + self.assertEqual(response.data["total"], 0) + # Pagination + self.assertEqual(len(response.data["uploads"]), 0) + logger.debug(response.data) + except Exception: + if resp.json().get("errors"): + layer_name = resp.json().get("errors")[0].split("for : ")[1].split(",")[0] + finally: + self._cleanup_layer(layer_name) @override_settings(CELERY_TASK_ALWAYS_EAGER=True) def test_rest_uploads_non_interactive(self): """ Ensure we can access the Local Server Uploads list. """ - # Try to upload a good raster file and check the session IDs - fname = os.path.join(GOOD_DATA, "raster", "relief_san_andres.tif") - resp, data = rest_upload_by_path(fname, self.client, non_interactive=True) - self.assertEqual(resp.status_code, 201) - - exec_id = data.get("execution_id", None) - _exec = ExecutionRequest.objects.get(exec_id=exec_id) - self.assertEqual(_exec.status, "finished") + resp = None + layer_name = "relief_san_andres" + try: + self._cleanup_layer(layer_name=layer_name) + # Try to upload a good raster file and check the session IDs + fname = os.path.join(GOOD_DATA, "raster", "relief_san_andres.tif") + resp, data = rest_upload_by_path(fname, self.client, non_interactive=True) + self.assertEqual(resp.status_code, 201) + exec_id = data.get("execution_id", None) + _exec = ExecutionRequest.objects.get(exec_id=exec_id) + self.assertEqual(_exec.status, "finished") + except Exception: + if resp.json().get("errors"): + layer_name = resp.json().get("errors")[0].split("for : ")[1].split(",")[0] + finally: + self._cleanup_layer(layer_name) + + def _cleanup_layer(self, layer_name): + # removing the layer from geonode + x = ResourceBase.objects.filter(alternate__icontains=layer_name) + if x.exists(): + for el in x.iterator(): + el.delete() + # removing the layer from geoserver + dataset = gs_catalog.get_layer(layer_name) + if dataset: + gs_catalog.delete(dataset, purge="all", recurse=True) + # removing the layer from geoserver + store = gs_catalog.get_store(layer_name, workspace="geonode") + if store: + gs_catalog.delete(store, purge="all", recurse=True) @mock.patch("geonode.upload.uploadhandler.SimpleUploadedFile") def test_rest_uploads_with_size_limit(self, mocked_uploaded_file):