From e9f49e4dc5f0f474a439e2db3fbd2d22bb0a12df Mon Sep 17 00:00:00 2001 From: Evan Morris Date: Fri, 23 Aug 2024 17:25:33 -0400 Subject: [PATCH] fixed some logic for subgraphs allowing explicit versioning, improved error messages, general clean up --- Common/build_manager.py | 109 +++++++++++++++++++++++----------------- 1 file changed, 62 insertions(+), 47 deletions(-) diff --git a/Common/build_manager.py b/Common/build_manager.py index 05668d3..059284f 100644 --- a/Common/build_manager.py +++ b/Common/build_manager.py @@ -25,6 +25,12 @@ REDUNDANT_EDGES_FILENAME = 'redundant_edges.jsonl' +class GraphSpecError(Exception): + def __init__(self, error_message: str, actual_error: Exception = None): + self.error_message = error_message + self.actual_error = actual_error + + class GraphBuilder: def __init__(self): @@ -33,9 +39,9 @@ def __init__(self): line_format='medium', log_file_path=os.environ['ORION_LOGS']) - #This dictionary determines the graph versions of already parsed graphs. - #This is more tempermental than it seems because the only way to get - #the version name for many subgraphs is to download it. + # This dictionary determines the graph versions of already parsed graphs. + # This is more temperamental than it seems because the only way to get + # the version name for many subgraphs is to download it. self.graph_id_to_version = {} self.graphs_dir = self.init_graphs_dir() # path to the graphs output directory @@ -138,32 +144,49 @@ def build_graph(self, graph_id: str): redundant_filepath = edges_filepath.replace(EDGES_FILENAME, REDUNDANT_EDGES_FILENAME) generate_redundant_kg(edges_filepath, redundant_filepath) - def get_graph_version(self, graph_id:str) -> str: - if(graph_id not in self.graph_id_to_version): + def get_graph_version(self, graph_id: str) -> str: + if graph_id not in self.graph_id_to_version: graph_spec = self.get_graph_spec(graph_id) + if not graph_spec: + raise GraphSpecError(error_message=f'Tried to dynamically determine the version of a ' + f'graph that was not found in the Graph Spec.') graph_version = self.generate_graph_version(graph_spec) self.graph_id_to_version[graph_id] = graph_version return self.graph_id_to_version[graph_id] - def build_dependencies(self, graph_spec: GraphSpec): for subgraph_source in graph_spec.subgraphs: subgraph_id = subgraph_source.id - subgraph_version = self.get_graph_version(subgraph_id) + subgraph_version = subgraph_source.version + # Get the subgraph version from the subgraph source spec, + # which will either be one specified in the graph spec or None. + if not subgraph_version: + try: + # if one was not specified, retrieve or generate it like we would any graph version + subgraph_version = self.get_graph_version(subgraph_id) + except GraphSpecError: + self.logger.error(f'Could not determine version of subgraph {subgraph_id}. ' + f'Either specify an existing version of the graph, or the subgraph must ' + f'be defined in the same Graph Spec.') + return False if self.check_for_existing_graph_dir(subgraph_id, subgraph_version): - # load previous metadata + # load previous metadata if the specified subgraph version was already built graph_metadata = self.get_graph_metadata(subgraph_id, subgraph_version) subgraph_source.graph_metadata = graph_metadata.metadata - elif self.graphid_to_version_name[subgraph_id] == subgraph_version: - self.logger.warning(f'For graph {graph_spec.graph_id} subgraph dependency ' - f'{subgraph_id} version {subgraph_version} is not ready. Building now...') - self.build_graph(subgraph_id) - #With changes here, this line should never run... else: - self.logger.warning(f'Building graph {graph_spec.graph_id} failed, ' - f'subgraph {subgraph_id} had version {subgraph_version} specified, ' - f'but that version of the graph was not found in the graphs directory.') - return False + # If the subgraph doesn't already exist, we need to make sure it matches the current version of the + # subgraph as generated by the current graph spec, otherwise we won't be able to build it. + current_subgraph_version = self.get_graph_version(subgraph_id) + if subgraph_version == current_subgraph_version: + self.logger.warning(f'For graph {graph_spec.graph_id} subgraph dependency ' + f'{subgraph_id} is not ready. Building now...') + self.build_graph(subgraph_id) + else: + self.logger.error(f'Subgraph ({subgraph_id}) version ({subgraph_version}) was specified, but that ' + f'version of the graph could not be found. It can not be built now because the ' + f'current version is {current_subgraph_version}. Either specify a version that ' + f'is already built, or leave the subgraph version blank to automatically ' + f'build the new one.') graph_metadata = self.get_graph_metadata(subgraph_id, subgraph_version) if graph_metadata.get_build_status() == Metadata.STABLE: @@ -370,7 +393,7 @@ def parse_graph_spec(self, graph_spec_yaml): graph_wide_normalization_code_version = graph_yaml['normalization_code_version'] \ if 'normalization_code_version' in graph_yaml else None - # apply them to all of the data sources, this will overwrite anything defined at the source level + # apply them to all the data sources, this will overwrite anything defined at the source level for data_source in data_sources: if graph_wide_node_norm_version is not None: data_source.normalization_scheme.node_normalization_version = graph_wide_node_norm_version @@ -385,13 +408,13 @@ def parse_graph_spec(self, graph_spec_yaml): graph_output_format = graph_yaml['output_format'] if 'output_format' in graph_yaml else "" graph_spec = GraphSpec(graph_id=graph_id, - graph_name=graph_name, - graph_description=graph_description, - graph_url=graph_url, - graph_version=None, # this will get populated later - graph_output_format=graph_output_format, - subgraphs=subgraph_sources, - sources=data_sources) + graph_name=graph_name, + graph_description=graph_description, + graph_url=graph_url, + graph_version=None, # this will get populated later + graph_output_format=graph_output_format, + subgraphs=subgraph_sources, + sources=data_sources) graph_specs.append(graph_spec) except Exception as e: self.logger.error(f'Error parsing Graph Spec ({graph_id}), formatting error or missing information: {repr(e)}') @@ -400,17 +423,7 @@ def parse_graph_spec(self, graph_spec_yaml): def parse_subgraph_spec(self, subgraph_yml): subgraph_id = subgraph_yml['graph_id'] - - if 'graph_version' in subgraph_yml: subgraph_version = subgraph_yml['graph_version'] - else: subgraph_version= 'latest' - -# if subgraph_version == 'current': -# if subgraph_id in self.current_graph_versions: -# subgraph_version = self.current_graph_versions[subgraph_id] -# else: -# raise Exception(f'Graph Spec Error - Could not determine version of subgraph {subgraph_id}. ' -# f'Either specify an existing version, already built in your graphs directory, ' -# f'or the subgraph must be defined previously in the same Graph Spec.') + subgraph_version = subgraph_yml['graph_version'] if 'graph_version' in subgraph_yml else None merge_strategy = subgraph_yml['merge_strategy'] if 'merge_strategy' in subgraph_yml else 'default' subgraph_source = SubGraphSource(id=subgraph_id, version=subgraph_version, @@ -425,10 +438,15 @@ def parse_data_source_spec(self, source_yml): f'Valid sources are: {", ".join(get_available_data_sources())}') raise Exception(error_message) - if 'source_version' not in source_yml or source_yml['source_version']=='latest': + # The DataSource() will get initialized with either a specific source version, if specified, + # or a callable function which can determine the latest source version. This is for a lazy initialization + # technique, so that we don't call get_latest_source_version until we need to, if at all. + if 'source_version' not in source_yml or source_yml['source_version'] == 'latest': get_source_version = self.source_data_manager.get_latest_source_version + source_version = None else: - get_source_version = lambda source_id=None : str(source_yml['source_version']) + source_version = str(source_yml['source_version']) + get_source_version = None parsing_version = source_yml['parsing_version'] if 'parsing_version' in source_yml \ else self.source_data_manager.get_latest_parsing_version(source_id) @@ -452,13 +470,12 @@ def parse_data_source_spec(self, source_yml): conflation=conflation) supplementation_version = SequenceVariantSupplementation.SUPPLEMENTATION_VERSION data_source = DataSource(id=source_id, - version=None, # this will get populated later in build_dependencies - get_source_version=get_source_version, -# source_version=source_version, - merge_strategy=merge_strategy, - normalization_scheme=normalization_scheme, - parsing_version=parsing_version, - supplementation_version=supplementation_version) + source_version=source_version, + get_source_version=get_source_version, + merge_strategy=merge_strategy, + normalization_scheme=normalization_scheme, + parsing_version=parsing_version, + supplementation_version=supplementation_version) return data_source def get_graph_spec(self, graph_id: str): @@ -491,8 +508,6 @@ def check_for_existing_graph_dir(self, graph_id: str, graph_version: str): def get_graph_metadata(self, graph_id: str, graph_version: str): # make sure the output directory exists (where we check for existing GraphMetadata) graph_output_dir = self.get_graph_dir_path(graph_id, graph_version) - if not os.path.isdir(graph_output_dir): - os.makedirs(graph_output_dir) # load existing or create new metadata file return GraphMetadata(graph_id, graph_output_dir)