Skip to content

Commit

Permalink
Some tests relied on a cache that might not exist
Browse files Browse the repository at this point in the history
unless the whole test suite is executed
  • Loading branch information
rsgoncalves committed Mar 7, 2024
1 parent a87f32c commit ddf0af8
Showing 1 changed file with 19 additions and 8 deletions.
27 changes: 19 additions & 8 deletions test/simple_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

# Show all DataFrame columns when mapping results are printed by the tests below.
pd.set_option('display.max_columns', None)


class Text2TermTestSuite(unittest.TestCase):

@classmethod
Expand Down Expand Up @@ -51,6 +52,7 @@ def test_caching_ontology_set(self):
assert len(caches) == nr_ontologies_in_registry

def test_mapping_to_cached_ontology(self):
self.ensure_cache_exists("EFO", self.EFO_URL)
# Test mapping a list of terms to EFO loaded from cache
print("Test mapping a list of terms to EFO loaded from cache...")
mappings_efo_cache = text2term.map_terms(["asthma", "disease location", "food allergy"], target_ontology="EFO",
Expand All @@ -72,23 +74,25 @@ def test_mapping_to_cached_ontology(self):
print(f"...{mappings_match}")
assert mappings_match is True

def test_mapping_to_cached_ontology_using_syntactic_mapper(self):
    """Map a list of terms to cached EFO using the Jaro-Winkler syntactic similarity mapper."""
    # The ontology must be in the local cache before use_cache=True mapping can work.
    self.ensure_cache_exists("EFO", self.EFO_URL)
    print("Test mapping a list of terms to cached ontology using Jaro-Winkler syntactic similarity metric...")
    source_terms = ["asthma", "disease location", "food allergy"]
    mappings = text2term.map_terms(source_terms, "EFO", use_cache=True,
                                   mapper=text2term.Mapper.JARO_WINKLER, term_type=OntologyTermType.ANY)
    print(f"{mappings}\n")
    # A non-empty result table means at least one term was mapped.
    assert mappings.size > 0

def test_mapping_using_ontology_acronym(self):
    """Map terms to an ontology identified only by its acronym, resolved via bioregistry."""
    # "MONDO" is an acronym, not a URL; text2term resolves it through bioregistry at mapping time.
    print(
        "Test mapping a list of terms by specifying the ontology acronym, which gets resolved by bioregistry")
    acronym_mappings = text2term.map_terms(["contains", "asthma"], "MONDO")
    print(f"{acronym_mappings}\n")
    assert acronym_mappings.size > 0

def test_mapping_tagged_terms(self):
self.ensure_cache_exists("EFO", self.EFO_URL)
# Test mapping a dictionary of tagged terms to cached EFO, and include unmapped terms in the output
print("Test mapping a dictionary of tagged terms to cached EFO, and include unmapped terms in the output...")
df3 = text2term.map_terms(
Expand All @@ -100,6 +104,7 @@ def test_mapping_tagged_terms(self):
assert df3[self.TAGS_COLUMN].str.contains("measurement").any()

def test_preprocessing_from_file(self):
self.ensure_cache_exists("EFO", self.EFO_URL)
# Test processing tagged terms where the tags are provided in a file
print("Test processing tagged terms where the tags are provided in a file...")
tagged_terms = text2term.preprocess_tagged_terms("simple_preprocess.txt")
Expand All @@ -119,8 +124,7 @@ def test_mapping_to_properties(self):

# Test mapping a list of properties to EFO loaded from cache and restrict search to properties
print("Test mapping a list of properties to EFO loaded from cache and restrict search to properties...")
if not text2term.cache_exists("EFO"):
text2term.cache_ontology(ontology_url=self.EFO_URL, ontology_acronym="EFO")
self.ensure_cache_exists("EFO", self.EFO_URL)
df6 = text2term.map_terms(source_terms=["contains", "location"], target_ontology="EFO", use_cache=True,
term_type=OntologyTermType.PROPERTY)
print(f"{df6}\n")
Expand Down Expand Up @@ -184,6 +188,7 @@ def test_term_collector_iri_limit_properties_only(self):
assert len(terms) == expected_nr_properties_with_efo_iri

def test_mapping_with_min_score_filter(self):
self.ensure_cache_exists("EFO", self.EFO_URL)
min_score = 0.6
search_terms = ["asthma attack", "location"]

Expand All @@ -203,11 +208,13 @@ def test_mapping_with_min_score_filter(self):
assert (df_leven[self.MAPPING_SCORE_COLUMN] >= min_score).all()

def test_include_unmapped_terms(self):
    """Check that terms left unmapped are kept in the output and tagged 'unmapped'."""
    self.ensure_cache_exists("EFO", self.EFO_URL)
    # "margarita" presumably fails the 0.8 score threshold, so it should come back tagged as unmapped.
    result = text2term.map_terms(["asthma", "margarita"], target_ontology="EFO", use_cache=True,
                                 mapper=Mapper.TFIDF, incl_unmapped=True, min_score=0.8)
    tags = result[self.TAGS_COLUMN]
    assert tags.str.contains("unmapped").any()

def test_include_unmapped_terms_when_mappings_df_is_empty(self):
    """Check unmapped-term tagging in the case where no input term maps at all."""
    self.ensure_cache_exists("EFO", self.EFO_URL)
    # Neither cocktail name is expected to map to EFO at min_score=0.8 — TODO confirm,
    # so the result should consist solely of rows tagged as unmapped.
    result = text2term.map_terms(["mojito", "margarita"], target_ontology="EFO", use_cache=True,
                                 mapper=Mapper.TFIDF, incl_unmapped=True, min_score=0.8)
    assert result[self.TAGS_COLUMN].str.contains("unmapped").any()
Expand All @@ -222,6 +229,10 @@ def check_df_equals(self, df, expected_df):
pd.testing.assert_frame_equal(df, expected_df, check_names=False, check_like=True)
return True

def ensure_cache_exists(self, ontology_name, ontology_url):
    """Cache the ontology at `ontology_url` under `ontology_name`, unless it is already cached.

    Lets each test run standalone instead of depending on a cache built by earlier tests.
    """
    if text2term.cache_exists(ontology_name):
        return
    text2term.cache_ontology(ontology_url=ontology_url, ontology_acronym=ontology_name)


# Allow executing this test suite directly as a script.
if __name__ == '__main__':
    unittest.main()

0 comments on commit ddf0af8

Please sign in to comment.