From be36e85f16c4a24ebdf809032946263ef27dbb07 Mon Sep 17 00:00:00 2001 From: Keegan Smith Date: Wed, 10 Jul 2024 08:50:20 +0800 Subject: [PATCH] Fix/metadata filename regex (#243) --- dags/oaebu_workflows/onix_utils.py | 18 ++++++++++++--- tests/test_onix_utils.py | 37 ++++++++++++++++++++++++++---- 2 files changed, 48 insertions(+), 7 deletions(-) diff --git a/dags/oaebu_workflows/onix_utils.py b/dags/oaebu_workflows/onix_utils.py index f02cee03..490b5e85 100644 --- a/dags/oaebu_workflows/onix_utils.py +++ b/dags/oaebu_workflows/onix_utils.py @@ -177,7 +177,7 @@ def _intermediate_file_path(self, file_name): return os.path.join(dir_, file_name) def _save_metadata(self, metadata: Union[List[dict], Mapping[str, Any]], file_path: str): - format = re.search(r"\.(.*)$", file_path).group(1) + format = get_file_ext(file_path) if format == "xml": if not isinstance(metadata, Mapping): raise TypeError(f"Metadata must be of type Mapping, instead got type {type(metadata)}") @@ -200,8 +200,7 @@ def _save_metadata(self, metadata: Union[List[dict], Mapping[str, Any]], file_pa self._current_md_path = file_path def _load_metadata(self, file_path: str): - fname = file_path.split("/")[-1] - format = re.search(r"\.(.*)$", fname).group(1) + format = get_file_ext(file_path) if format == "xml": with open(file_path, "rb") as f: metadata = xmltodict.parse(f) @@ -284,6 +283,19 @@ def _collapse_subjects(self): self._save_metadata(metadata, self._intermediate_file_path("collapsed.jsonl")) +def get_file_ext(file_path: str) -> str: + """Given a path to a file (or just the file name itself), extracts the file extension. + E.g. if the input is some/path/my_file.txt, the extension returned will be "txt".
+ + :param file_path: The path to the file or the filename itself + :return: The file extension (the text after the first "." in the file name), or an empty string if there is none""" + fname = file_path.split("/")[-1] + try: + return re.search(r"\.(.*)$", fname).group(1) + except AttributeError: # No regex match + return "" + + def onix_parser_download(download_dir: str = observatory_home("bin")) -> Tuple[bool, str]: """Downloads the ONIX parser from Github diff --git a/tests/test_onix_utils.py b/tests/test_onix_utils.py index beed424b..8712cf2d 100644 --- a/tests/test_onix_utils.py +++ b/tests/test_onix_utils.py @@ -25,15 +25,16 @@ OnixTransformer, collapse_subjects, create_personname_fields, - onix_parser_download, - onix_parser_execute, + deduplicate_related_products, elevate_product_identifiers, - normalise_related_products, elevate_related_products, find_onix_product, filter_through_schema, + get_file_ext, + normalise_related_products, + onix_parser_download, + onix_parser_execute, remove_invalid_products, - deduplicate_related_products, ) from oaebu_workflows.config import test_fixtures_folder, schema_folder from observatory_platform.files import load_jsonl @@ -369,6 +370,34 @@ def test_duplicate_entries(self): self.assertEqual(result, expected_result) +class TestGetFileExt(unittest.TestCase): + """Tests the get_file_ext function""" + + def test_path_provided(self): + """Should return the extension if a filepath is passed""" + input = "my.path/to/my_file.txt" + expected_output = "txt" + actual_output = get_file_ext(input) + + self.assertEqual(expected_output, actual_output) + + def test_name_provided(self): + """Should return the extension if a file name is passed""" + input = "my_file.txt" + expected_output = "txt" + actual_output = get_file_ext(input) + + self.assertEqual(expected_output, actual_output) + + def test_no_ext(self): + """Should return an empty string if there is no extension""" + input = "my/path/to/my_file" + expected_output = "" + actual_output = get_file_ext(input) + + self.assertEqual(expected_output, actual_output) + + 
class TestElevateRelatedProducts(unittest.TestCase): """Tests the elevate_related_products function"""