From 95d979c9ce53f37ead64b711c1b9015dedf5b791 Mon Sep 17 00:00:00 2001 From: Zeno Dalla Valle Date: Sun, 9 Apr 2023 11:01:19 +0200 Subject: [PATCH] improved smart_from algorithm --- italy_geopop/__version__.py | 2 +- italy_geopop/_utils.py | 83 ++++++++++++++++++++++++++++---- italy_geopop/pandas_extension.py | 9 ++-- tests/test_pandas_extension.py | 27 ++++++++--- 4 files changed, 99 insertions(+), 22 deletions(-) diff --git a/italy_geopop/__version__.py b/italy_geopop/__version__.py index 3d18726..dd9b22c 100644 --- a/italy_geopop/__version__.py +++ b/italy_geopop/__version__.py @@ -1 +1 @@ -__version__ = "0.5.0" +__version__ = "0.5.1" diff --git a/italy_geopop/_utils.py b/italy_geopop/_utils.py index 412ba9b..b8e1748 100644 --- a/italy_geopop/_utils.py +++ b/italy_geopop/_utils.py @@ -2,7 +2,7 @@ from itertools import pairwise import pandas as pd import numpy as np -from typing import Any, Callable, Iterable +from typing import Any, Callable, Iterable, Optional from warnings import warn import re @@ -57,24 +57,89 @@ def wrapper(*args, **kwargs) -> Any: return wrapper -def match_single_word(words: Iterable[str], text: str) -> str | None: - """return the word, taken from a list of words, that is found in text only if it's the only match. Word is searched as "exact word match". +def _match_every_word(words: Iterable[str], text: str) -> bool: + """return True if every word in words is found in text, False otherwise. Word is searched as "exact word match" and with "case-insensitive" flag. :param words: a list or iterable of words to be searched into text. :type words: Iterable[str] :param text: the source text into words are searched. :type text: str - :return: the word, taken from a list of words, that is found in text only if it's the only match. + :return: True if every word in words is found in text, False otherwise. 
+ :rtype: bool + """ + for word in words: + if not re.search(r'\b{}\b'.format(word), text, flags=re.IGNORECASE): + return False + return True + + +def match_single_key( + keys: Iterable[str], + text: str, + _split_key: bool = False, + _return_values: Optional[Iterable[str]] = None, +) -> str | None: + """return the key, taken from a list of keys, that is found in text only if it's the only match. + Key is searched as "exact key match" and with "case-insensitive" flag. + + If no matches are found every key is split into: + - synonyms using '/' as separator and then search for every synonym in text if '/' is found in key. + - words using '\W' regex as separator and then search for every word whose length is >= 2 in text. If every word is found, the original key is returned. + + :param keys: a list or iterable of keys to be searched for in text. + :type keys: Iterable[str] + :param text: the source text in which keys are searched. + :type text: str + + :return: the key, taken from a list of keys, that is found in text only if it's the only match. 
:rtype: str | None """ + if _return_values is None: + return_values = keys + else: + return_values = _return_values + return_dict = dict(zip(keys, return_values)) n_matches = 0 match = None - for word in words: - if re.search(r'\b{}\b'.format(word), text): - n_matches += 1 - match = word + extended_keys = [] + extended_values = [] + if not _split_key: + for key in keys: + if re.search(r'\b{}\b'.format(key), text, flags=re.IGNORECASE): + n_matches += 1 + match = key + if not n_matches: + for key in keys: + if '/' not in key: + continue + for sinonim in key.split('/'): + sinonim = sinonim.strip() + if len(sinonim) < 3: + continue + extended_keys.append(sinonim) + extended_values.append(key) + if re.search(r'\b{}\b'.format(key), text, flags=re.IGNORECASE): + n_matches += 1 + match = key + + else: + for key in keys: + words = [x.strip() for x in re.split('\W', key) if len(x.strip()) >= 2] + if len(words) < 2: + continue + if _match_every_word(words, text): + n_matches += 1 + match = key + if n_matches == 1: - return match + return return_dict[match] + elif not n_matches and not _split_key: + return match_single_key( + ([*keys] + extended_keys), + text, + _split_key=True, + _return_values=([*return_values] + extended_values), + ) def aggregate_province_pop(pop_df: pd.DataFrame, geo_df: pd.DataFrame) -> pd.DataFrame: diff --git a/italy_geopop/pandas_extension.py b/italy_geopop/pandas_extension.py index 6d5b820..5371e4e 100644 --- a/italy_geopop/pandas_extension.py +++ b/italy_geopop/pandas_extension.py @@ -3,11 +3,10 @@ import re import numpy as np import pandas as pd -from warnings import warn from typing import Any -from ._utils import handle_return_cols, simple_cache, match_single_word +from ._utils import handle_return_cols, match_single_key from . 
import geopop @@ -299,7 +298,7 @@ def smart_from_municipality( @cache def get_data(x): - key = match_single_word(str_indexed.keys(), str(x).strip().lower()) + key = match_single_key(str_indexed.keys(), str(x).strip().lower()) return str_indexed.get(key, empty_serie) ret = ret.fillna(nans.apply(get_data)) @@ -345,7 +344,7 @@ def smart_from_province( @cache def get_data(x): - key = match_single_word(str_indexed.keys(), str(x).strip().lower()) + key = match_single_key(str_indexed.keys(), str(x).strip().lower()) return str_indexed.get(key, empty_serie) ret = ret.fillna(nans.apply(get_data)) @@ -391,7 +390,7 @@ def smart_from_region( @cache def get_data(x): - key = match_single_word(str_indexed.keys(), str(x).strip().lower()) + key = match_single_key(str_indexed.keys(), str(x).strip().lower()) return str_indexed.get(key, empty_serie) ret = ret.fillna(nans.apply(get_data)) diff --git a/tests/test_pandas_extension.py b/tests/test_pandas_extension.py index 0f47761..228d1f3 100644 --- a/tests/test_pandas_extension.py +++ b/tests/test_pandas_extension.py @@ -300,7 +300,18 @@ def region_name_complex() -> pd.Series: """ Returns a pd.Series with a valid complex region name. """ - return pd.Series(['Regione del Veneto']) + return pd.Series( + [ + 'Regione del Veneto', + "Valle d'Aosta", + 'Valle d Aosta', + 'Valle di Aosta', + 'Valle Aosta', + 'Trentino-Alto Adige', + 'Friuli Venezia Giulia', + 'Friuli-Venezia-Giulia', + ] + ) @pytest.fixture @@ -308,7 +319,7 @@ def region_name_complex_to_simple() -> pd.Series: """ Returns a pd.Series with a the right simple name for region_name_complex above. """ - return pd.Series(['veneto']) + return pd.Series(['veneto', 2, 2, 2, 2, 4, 6, 6]) @pytest.fixture @@ -316,7 +327,9 @@ def not_unequivocal_region_name_complex() -> pd.Series: """ Returns a pd.Series with a non-unequivocal complex region name. 
""" - return pd.Series(['Piemonte o Lombardia']) + return pd.Series( + ['Piemonte o Lombardia', 'Trentino Venezia Giulia', 'Valle Alto Adige'] + ) # Ensure same index as input series index @@ -654,11 +667,11 @@ def test_pandas_extension_find_correct_region_information_from_complex_region_na region_name_complex, region_name_complex_to_simple, include_geometry ): with pandas_activate_context(include_geometry=include_geometry): - expected = region_name_complex_to_simple.italy_geopop.from_region().drop( - ['provinces'], axis=1 + expected = region_name_complex_to_simple.italy_geopop.from_region( + return_cols=['region'] ) - output = region_name_complex.italy_geopop.smart_from_region().drop( - ['provinces'], axis=1 + output = region_name_complex.italy_geopop.smart_from_region( + return_cols=['region'] ) assert (output != expected).sum().sum() == 0