Fix for get_latest_source_version being called unnecessarily. #237

Merged · 15 commits · Jan 21, 2025
11 changes: 6 additions & 5 deletions .github/workflows/test.yml
@@ -16,12 +16,13 @@ jobs:

- name: create env params
run: |
echo "PYTHONPATH=$PWD:$PWD/robokop-genetics" >> $GITHUB_ENV
echo "ROBOKOP_HOME=$PWD" >> $GITHUB_ENV
mkdir -p $PWD/tests/logs
mkdir -p $PWD/tests/storage
echo "ORION_LOGS=$PWD/tests/logs" >> $GITHUB_ENV
echo "ORION_STORAGE=$PWD/tests/storage" >> $GITHUB_ENV
mkdir -p $PWD/tests/workspace/logs
mkdir -p $PWD/tests/workspace/storage
mkdir -p $PWD/tests/workspace/graphs
echo "ORION_LOGS=$PWD/tests/workspace/logs" >> $GITHUB_ENV
echo "ORION_STORAGE=$PWD/tests/workspace/storage" >> $GITHUB_ENV
echo "ORION_GRAPHS=$PWD/tests/workspace/graphs" >> $GITHUB_ENV

- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v4
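For anyone reproducing the CI layout locally: a minimal sketch, assuming a hypothetical helper that is not part of this PR, of how test code might resolve the new tests/workspace directories the workflow exports:

```python
import os
from pathlib import Path

def get_workspace_dir(env_var: str, default_subdir: str) -> Path:
    # Fall back to the tests/workspace layout the workflow step creates
    # when the environment variable is unset.
    base = Path(os.environ.get("ROBOKOP_HOME", ".")) / "tests" / "workspace"
    path = Path(os.environ.get(env_var, base / default_subdir))
    path.mkdir(parents=True, exist_ok=True)
    return path

logs_dir = get_workspace_dir("ORION_LOGS", "logs")
storage_dir = get_workspace_dir("ORION_STORAGE", "storage")
graphs_dir = get_workspace_dir("ORION_GRAPHS", "graphs")
```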
471 changes: 251 additions & 220 deletions Common/build_manager.py

Large diffs are not rendered by default.

17 changes: 17 additions & 0 deletions Common/exceptions.py
@@ -0,0 +1,17 @@


class GraphSpecError(Exception):
def __init__(self, error_message: str, actual_error: Exception = None):
self.error_message = error_message
self.actual_error = actual_error

def __str__(self):
return self.error_message


class DataVersionError(Exception):
def __init__(self, error_message: str):
self.error_message = error_message

def __str__(self):
return self.error_message
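A quick illustration of how the new exception types can be used; load_graph_spec below is hypothetical, only the exception classes come from this PR:

```python
from Common.exceptions import GraphSpecError

def load_graph_spec(spec: dict):
    try:
        return spec["graph_id"]
    except KeyError as k:
        # Wrap the underlying error so callers can report both messages.
        raise GraphSpecError(error_message="graph_id missing from graph spec",
                             actual_error=k)

try:
    load_graph_spec({})
except GraphSpecError as e:
    print(e)                     # graph_id missing from graph spec
    print(repr(e.actual_error))  # the original KeyError
```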
9 changes: 6 additions & 3 deletions Common/kgx_file_merger.py
@@ -43,10 +43,12 @@ def merge(self,
primary_sources = []
secondary_sources = []
for graph_source in chain(graph_spec.sources, graph_spec.subgraphs):
if graph_source.merge_strategy == 'default':
if not graph_source.merge_strategy:
primary_sources.append(graph_source)
elif graph_source.merge_strategy == 'connected_edge_subset':
secondary_sources.append(graph_source)
else:
return {'merge_error': f'Unsupported merge strategy specified: {graph_source.merge_strategy}'}

# TODO we should be able to process a single primary source more efficiently (ie copy and paste it)
# if len(primary_sources) == 1:
@@ -73,8 +75,9 @@ def merge(self,
all_source_ids = [graph_source.id for graph_source in chain(graph_spec.sources, graph_spec.subgraphs)]
missing_data_sets = [source_id for source_id in all_source_ids if
source_id not in merge_metadata['sources'].keys()]
self.logger.error(f"Error merging graph {graph_spec.graph_id}! could not merge: {missing_data_sets}")

error_message = f"Error merging graph {graph_spec.graph_id}! could not merge: {missing_data_sets}"
self.logger.error(error_message)
merge_metadata["merge_error"] = error_message
return merge_metadata

def merge_primary_sources(self,
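A condensed sketch of the new routing logic, using a hypothetical GraphSourceStub in place of the real GraphSource; in merge() an unsupported strategy short-circuits by returning a {'merge_error': ...} dict, which this standalone version models with an exception:

```python
from dataclasses import dataclass

@dataclass
class GraphSourceStub:
    id: str
    merge_strategy: str = None  # None now selects the default merge path

def route_sources(graph_sources):
    primary_sources, secondary_sources = [], []
    for graph_source in graph_sources:
        if not graph_source.merge_strategy:
            primary_sources.append(graph_source)
        elif graph_source.merge_strategy == 'connected_edge_subset':
            secondary_sources.append(graph_source)
        else:
            # Unknown strategies now surface as an error instead of being
            # silently ignored.
            raise ValueError(f'Unsupported merge strategy specified: '
                             f'{graph_source.merge_strategy}')
    return primary_sources, secondary_sources

primary, secondary = route_sources([GraphSourceStub('ctd'),
                                    GraphSourceStub('hgnc', 'connected_edge_subset')])
```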
48 changes: 37 additions & 11 deletions Common/kgxmodel.py
@@ -1,8 +1,9 @@
from dataclasses import dataclass
from Common.biolink_constants import NAMED_THING
from Common.metadata import GraphMetadata
from Common.metadata import GraphMetadata, get_source_release_version
from Common.normalization import NormalizationScheme


class kgxnode:
def __init__(self,
identifier,
@@ -60,21 +61,31 @@ def get_metadata_representation(self):
@dataclass
class GraphSource:
id: str
version: str = None
merge_strategy: str = 'default'
merge_strategy: str = None
file_paths: list = None

# The version is generated on demand; each subclass of GraphSource defines its own generate_version().
def __getattribute__(self, name):
if name == "version":
return self.generate_version()
else:
return object.__getattribute__(self, name)


@dataclass
class SubGraphSource(GraphSource):
graph_version: str = None
graph_metadata: GraphMetadata = None

def get_metadata_representation(self):
return {'graph_id': self.id,
'release_version': self.version,
'graph_version': self.graph_version,
'merge_strategy': self.merge_strategy,
'graph_metadata': self.graph_metadata.metadata if self.graph_metadata else None}

def generate_version(self):
return self.graph_version


@dataclass
class DataSource(GraphSource):
@@ -86,14 +97,29 @@ class DataSource(GraphSource):

def get_metadata_representation(self):
metadata = {'source_id': self.id,
'source_version': self.source_version,
'release_version': self.version,
'parsing_version': self.parsing_version,
'supplementation_version': self.supplementation_version,
'normalization_scheme': self.normalization_scheme.get_metadata_representation(),
'merge_strategy': self.merge_strategy}
'source_version': self.source_version,
'parsing_version': self.parsing_version,
'supplementation_version': self.supplementation_version,
'normalization_scheme': self.normalization_scheme.get_metadata_representation(),
'release_version': self.generate_version(),
'merge_strategy': self.merge_strategy}
if self.release_info:
metadata.update(self.release_info)
return metadata


# generate_version doubles as a check for whether source_version has been set: without it, an overall
# version cannot be computed, so None is returned. Typical usage is lazy: instantiate with
# source_version=None, check for None here, retrieve and set source_version when needed, and only then
# generate the overall version.
#
# get_source_release_version builds the version the same deterministic way the data source pipeline does,
# so a version generated from a graph spec will match the release version produced by previous pipeline runs.
def generate_version(self):
if self.source_version is None:
return None
return get_source_release_version(self.id,
self.source_version,
self.parsing_version,
self.normalization_scheme.get_composite_normalization_version(),
self.supplementation_version)
41 changes: 20 additions & 21 deletions Common/load_manager.py
@@ -1,9 +1,11 @@
import os
import argparse
import datetime
import time
from collections import defaultdict

from Common.data_sources import SourceDataLoaderClassFactory, RESOURCE_HOGS, get_available_data_sources
from Common.exceptions import DataVersionError
from Common.utils import LoggingUtil, GetDataPullError
from Common.kgx_file_normalizer import KGXFileNormalizer
from Common.normalization import NormalizationScheme, NodeNormalizer, EdgeNormalizer, NormalizationFailedError
@@ -124,7 +126,7 @@ def run_fetch_stage(self, source_id: str, source_version: str):
self.logger.info(f"Fetching source data for {source_id} (version: {source_version})...")
return self.fetch_source(source_id, source_version=source_version)

def get_latest_source_version(self, source_id: str, retries: int=0):
def get_latest_source_version(self, source_id: str, retries: int = 0):
if source_id in self.latest_source_version_lookup:
return self.latest_source_version_lookup[source_id]

@@ -136,19 +138,18 @@ def get_latest_source_version(self, source_id: str, retries: int=0):
self.latest_source_version_lookup[source_id] = latest_source_version
return latest_source_version
except GetDataPullError as failed_error:
self.logger.error(
f"Error while checking for latest source version for {source_id}: {failed_error.error_message}")
error_message = f"Error while checking for latest source version for {source_id}: " \
f"{failed_error.error_message}"
self.logger.error(error_message)
if retries < 2:
time.sleep(3)
return self.get_latest_source_version(source_id, retries=retries+1)
else:
# TODO what should we do here?
# no great place to write an error in metadata because metadata is specific to source versions
# source_metadata.set_version_checking_error(failed_error.error_message)
return None
raise DataVersionError(error_message=error_message)
except Exception as e:
self.logger.error(
f"Error while checking for latest source version for {source_id}: {repr(e)}-{str(e)}")
return None
error_message = f"Error while checking for latest source version for {source_id}: {repr(e)}-{str(e)}"
self.logger.error(error_message)
raise DataVersionError(error_message=error_message)

def fetch_source(self, source_id: str, source_version: str='latest', retries: int=0):

@@ -503,20 +504,18 @@ def run_qc_and_metadata_stage(self,
parsing_version: str,
supplementation_version: str,
normalization_scheme: NormalizationScheme):
# source data QC here
source_metadata = self.get_source_metadata(source_id, source_version)
normalization_version = normalization_scheme.get_composite_normalization_version()
# source data QC should go here

self.logger.info(f'Generating release for {source_id}')
source_metadata = self.get_source_metadata(source_id, source_version)
loader = SOURCE_DATA_LOADER_CLASSES[source_id](test_mode=self.test_mode)
source_meta_information = loader.get_source_meta_information()
source_metadata.generate_release_metadata(parsing_version=parsing_version,
supplementation_version=supplementation_version,
normalization_version=normalization_version,
source_meta_information=source_meta_information)
return source_metadata.get_release_version(parsing_version=parsing_version,
supplementation_version=supplementation_version,
normalization_version=normalization_version)
normalization_version = normalization_scheme.get_composite_normalization_version()
release_version = source_metadata.generate_release_metadata(parsing_version=parsing_version,
supplementation_version=supplementation_version,
normalization_version=normalization_version,
source_meta_information=source_meta_information)
self.logger.info(f'Generating release version for {source_id}: {release_version}')
return release_version

def get_source_metadata(self, source_id: str, source_version):
if source_id not in self.source_metadata or source_version not in self.source_metadata[source_id]:
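A trimmed, standalone sketch of the new retry behavior; the exception classes are redefined locally so it runs on its own, and fetch_version/cache stand in for the loader call and self.latest_source_version_lookup:

```python
import time

class GetDataPullError(Exception):
    def __init__(self, error_message):
        self.error_message = error_message

class DataVersionError(Exception):
    def __init__(self, error_message):
        self.error_message = error_message

def get_latest_source_version(source_id, fetch_version, cache, retries=0):
    if source_id in cache:
        return cache[source_id]
    try:
        latest = fetch_version(source_id)
        cache[source_id] = latest
        return latest
    except GetDataPullError as failed_error:
        if retries < 2:
            time.sleep(3)  # brief pause before retrying a transient pull failure
            return get_latest_source_version(source_id, fetch_version, cache,
                                             retries=retries + 1)
        # After three attempts, raise instead of returning None so callers
        # can no longer mistake a failed check for a missing version.
        raise DataVersionError(error_message=failed_error.error_message)

calls = {"n": 0}
def flaky_fetch(source_id):
    calls["n"] += 1
    if calls["n"] < 3:
        raise GetDataPullError("transient pull failure")
    return "2025-01"

print(get_latest_source_version("ctd", flaky_fetch, {}))  # succeeds on the third try
```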
60 changes: 20 additions & 40 deletions Common/metadata.py
@@ -30,6 +30,8 @@ def init_metadata(self):
raise NotImplementedError()

def save_metadata(self):
if not os.path.isdir(os.path.dirname(self.metadata_file_path)):
os.makedirs(os.path.dirname(self.metadata_file_path))
with open(self.metadata_file_path, 'w') as meta_json_file:
json.dump(self.metadata, meta_json_file, indent=4)

@@ -295,31 +297,18 @@ def has_supplemental_data(self, parsing_version: str, normalization_version: str
except KeyError:
return False

def get_release_version(self,
parsing_version: str,
normalization_version: str,
supplementation_version: str):
if "releases" in self.metadata:
for release_version, release in self.metadata["releases"].items():
if ((release["parsing_version"] == parsing_version) and
(release["normalization_version"] == normalization_version) and
(release["supplementation_version"] == supplementation_version)):
return release_version
return None

def generate_release_metadata(self,
parsing_version: str,
normalization_version: str,
supplementation_version: str,
source_meta_information: dict):
if "releases" not in self.metadata:
self.metadata["releases"] = {}
release_info = "".join([self.source_id,
self.source_version,
parsing_version,
normalization_version,
supplementation_version])
release_version = xxh64_hexdigest(release_info)
release_version = get_source_release_version(self.source_id,
self.source_version,
parsing_version,
normalization_version,
supplementation_version)
if release_version not in self.metadata["releases"]:
self.metadata["releases"][release_version] = {
"source_version": self.source_version,
@@ -329,31 +318,22 @@ }
}
self.metadata["releases"][release_version].update(source_meta_information)
self.save_metadata()
return release_version

def get_release_info(self, release_version: str):
if 'releases' in self.metadata and release_version in self.metadata['releases']:
return self.metadata['releases'][release_version]
return None

'''
these need to be updated for the new versioning format, but we may not need them
def get_final_node_count(self):
try:
node_count = 0
node_count += self.metadata['normalization_info']['final_normalized_nodes']
if self.has_supplemental_data():
node_count += self.metadata['supplementation_info']['normalization_info']['final_normalized_nodes']
return node_count
except KeyError as k:
raise RuntimeError(f'Required metadata was not available: {k}')

def get_final_edge_count(self):
try:
node_count = 0
node_count += self.metadata['normalization_info']['final_normalized_edges']
if self.has_supplemental_data():
node_count += self.metadata['supplementation_info']['normalization_info']['final_normalized_edges']
return node_count
except KeyError as k:
raise RuntimeError(f'Required metadata was not available: {k}')
'''

def get_source_release_version(source_id,
source_version,
parsing_version,
normalization_version,
supplementation_version):
release_string = "_".join([source_id,
source_version,
parsing_version,
normalization_version,
supplementation_version])
return xxh64_hexdigest(release_string)
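With release versions now derived in one place, a quick check of the determinism the graph spec relies on; the source ids and version strings below are made up, and xxhash is the library the module already uses:

```python
from xxhash import xxh64_hexdigest

def get_source_release_version(source_id, source_version, parsing_version,
                               normalization_version, supplementation_version):
    # Same scheme as Common/metadata.py: hash the joined version components.
    release_string = "_".join([source_id, source_version, parsing_version,
                               normalization_version, supplementation_version])
    return xxh64_hexdigest(release_string)

v1 = get_source_release_version("ctd", "2025-01", "1.0", "node_norm_2.3", "1.0")
v2 = get_source_release_version("ctd", "2025-01", "1.0", "node_norm_2.3", "1.0")
assert v1 == v2  # identical inputs always yield the same release version
```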