From c7bf7daf8ceb6fb8d6e5721806d32442dc75ee71 Mon Sep 17 00:00:00 2001 From: Noah Dove Date: Mon, 28 Nov 2022 16:28:20 -0800 Subject: [PATCH] Refactor bailouts --- .../plugins/repository/tdr_anvil/__init__.py | 302 +++++++++--------- 1 file changed, 153 insertions(+), 149 deletions(-) diff --git a/src/azul/plugins/repository/tdr_anvil/__init__.py b/src/azul/plugins/repository/tdr_anvil/__init__.py index 52636e8abd..edb68d9b38 100644 --- a/src/azul/plugins/repository/tdr_anvil/__init__.py +++ b/src/azul/plugins/repository/tdr_anvil/__init__.py @@ -266,176 +266,180 @@ def _upstream_from_biosamples(self, source: TDRSourceSpec, biosample_ids: AbstractSet[Key] ) -> Links: - if not biosample_ids: - return set() - rows = self._run_sql(f''' - SELECT b.biosample_id, b.donor_id, b.part_of_dataset_id - FROM {backtick(self._full_table_name(source, 'biosample'))} AS b - WHERE b.biosample_id IN ({', '.join(map(repr, biosample_ids))}) - ''') - result: Links = set() - for row in rows: - downstream_ref = KeyReference(entity_type='biosample', - key=row['biosample_id']) - result.add(Link.create(outputs=downstream_ref, - inputs=KeyReference(entity_type='dataset', - key=one(row['part_of_dataset_id'])))) - for donor_id in row['donor_id']: + if biosample_ids: + rows = self._run_sql(f''' + SELECT b.biosample_id, b.donor_id, b.part_of_dataset_id + FROM {backtick(self._full_table_name(source, 'biosample'))} AS b + WHERE b.biosample_id IN ({', '.join(map(repr, biosample_ids))}) + ''') + result: Links = set() + for row in rows: + downstream_ref = KeyReference(entity_type='biosample', + key=row['biosample_id']) result.add(Link.create(outputs=downstream_ref, - inputs=KeyReference(entity_type='donor', - key=donor_id))) - return result + inputs=KeyReference(entity_type='dataset', + key=one(row['part_of_dataset_id'])))) + for donor_id in row['donor_id']: + result.add(Link.create(outputs=downstream_ref, + inputs=KeyReference(entity_type='donor', + key=donor_id))) + return result + else: + return set() def _upstream_from_files(self, source: TDRSourceSpec, file_ids: AbstractSet[Key] ) -> Links: - if not file_ids: + if file_ids: + rows = self._run_sql(f''' + WITH file AS ( + SELECT f.file_id FROM {backtick(self._full_table_name(source, 'file'))} AS f + WHERE f.file_id IN ({', '.join(map(repr, file_ids))}) + ) + SELECT + f.file_id AS generated_file_id, + 'alignmentactivity' AS activity_table, + ama.alignmentactivity_id AS activity_id, + ama.used_file_id AS uses_file_id, + [] AS uses_biosample_id, + FROM file AS f + JOIN {backtick(self._full_table_name(source, 'alignmentactivity'))} AS ama + ON f.file_id IN UNNEST(ama.generated_file_id) + UNION ALL SELECT + f.file_id, + 'assayactivity', + aya.assayactivity_id, + [], + aya.used_biosample_id, + FROM file AS f + JOIN {backtick(self._full_table_name(source, 'assayactivity'))} AS aya + ON f.file_id IN UNNEST(aya.generated_file_id) + UNION ALL SELECT + f.file_id, + 'sequencingactivity', + sqa.sequencingactivity_id, + [], + sqa.used_biosample_id, + FROM file AS f + JOIN {backtick(self._full_table_name(source, 'sequencingactivity'))} AS sqa + ON f.file_id IN UNNEST(sqa.generated_file_id) + UNION ALL SELECT + f.file_id, + 'activity', + a.activity_id, + a.used_file_id, + a.used_biosample_id, + FROM file AS f + JOIN {backtick(self._full_table_name(source, 'activity'))} AS a + ON f.file_id IN UNNEST(a.generated_file_id) + ''') + return { + Link.create( + activity=KeyReference(entity_type=row['activity_table'], key=row['activity_id']), + # The generated link is not a complete representation of the + # upstream activity because it does not include generated files + # that are not ancestors of the downstream file + outputs=KeyReference(entity_type='file', key=row['generated_file_id']), + inputs=[ + KeyReference(entity_type=entity_type, key=key) + for entity_type, column in [('file', 'uses_file_id'), + ('biosample', 'uses_biosample_id')] + for key in row[column] + ] + ) + for row in rows + } + else: return set() - rows = self._run_sql(f''' - WITH file AS ( - SELECT f.file_id FROM {backtick(self._full_table_name(source, 'file'))} AS f - WHERE f.file_id IN ({', '.join(map(repr, file_ids))}) - ) - SELECT - f.file_id AS generated_file_id, - 'alignmentactivity' AS activity_table, - ama.alignmentactivity_id AS activity_id, - ama.used_file_id AS uses_file_id, - [] AS uses_biosample_id, - FROM file AS f - JOIN {backtick(self._full_table_name(source, 'alignmentactivity'))} AS ama - ON f.file_id IN UNNEST(ama.generated_file_id) - UNION ALL SELECT - f.file_id, - 'assayactivity', - aya.assayactivity_id, - [], - aya.used_biosample_id, - FROM file AS f - JOIN {backtick(self._full_table_name(source, 'assayactivity'))} AS aya - ON f.file_id IN UNNEST(aya.generated_file_id) - UNION ALL SELECT - f.file_id, - 'sequencingactivity', - sqa.sequencingactivity_id, - [], - sqa.used_biosample_id, - FROM file AS f - JOIN {backtick(self._full_table_name(source, 'sequencingactivity'))} AS sqa - ON f.file_id IN UNNEST(sqa.generated_file_id) - UNION ALL SELECT - f.file_id, - 'activity', - a.activity_id, - a.used_file_id, - a.used_biosample_id, - FROM file AS f - JOIN {backtick(self._full_table_name(source, 'activity'))} AS a - ON f.file_id IN UNNEST(a.generated_file_id) - ''') - return { - Link.create( - activity=KeyReference(entity_type=row['activity_table'], key=row['activity_id']), - # The generated link is not a complete representation of the - # upstream activity because it does not include generated files - # that are not ancestors of the downstream file - outputs=KeyReference(entity_type='file', key=row['generated_file_id']), - inputs=[ - KeyReference(entity_type=entity_type, key=key) - for entity_type, column in [('file', 'uses_file_id'), - ('biosample', 'uses_biosample_id')] - for key in row[column] - ] - ) - for row in rows - } def _downstream_from_biosamples(self, source: TDRSourceSpec, biosample_ids: AbstractSet[Key], ) -> Links: - if not biosample_ids: - return set() - rows = self._run_sql(f''' - WITH activities AS ( - SELECT - sqa.sequencingactivity_id as activity_id, - 'sequencingactivity' as activity_table, - sqa.used_biosample_id, - sqa.generated_file_id - FROM {backtick(self._full_table_name(source, 'sequencingactivity'))} AS sqa - UNION ALL - SELECT - aya.assayactivity_id, - 'assayactivity', - aya.used_biosample_id, - aya.generated_file_id, - FROM {backtick(self._full_table_name(source, 'assayactivity'))} AS aya - UNION ALL + if biosample_ids: + rows = self._run_sql(f''' + WITH activities AS ( + SELECT + sqa.sequencingactivity_id as activity_id, + 'sequencingactivity' as activity_table, + sqa.used_biosample_id, + sqa.generated_file_id + FROM {backtick(self._full_table_name(source, 'sequencingactivity'))} AS sqa + UNION ALL + SELECT + aya.assayactivity_id, + 'assayactivity', + aya.used_biosample_id, + aya.generated_file_id, + FROM {backtick(self._full_table_name(source, 'assayactivity'))} AS aya + UNION ALL + SELECT + a.activity_id, + 'activity', + a.used_biosample_id, + a.generated_file_id, + FROM {backtick(self._full_table_name(source, 'activity'))} AS a + ) SELECT + biosample_id, a.activity_id, - 'activity', - a.used_biosample_id, - a.generated_file_id, - FROM {backtick(self._full_table_name(source, 'activity'))} AS a - ) - SELECT - biosample_id, - a.activity_id, - a.activity_table, - a.generated_file_id - FROM activities AS a, UNNEST(a.used_biosample_id) AS biosample_id - WHERE biosample_id IN ({', '.join(map(repr, biosample_ids))}) - ''') - return { - Link.create(inputs={KeyReference(key=row['biosample_id'], entity_type='biosample')}, - outputs=[ - KeyReference(key=output_id, entity_type='file') - for output_id in row['generated_file_id'] - ], - activity=KeyReference(key=row['activity_id'], entity_type=row['activity_table'])) - for row in rows - } + a.activity_table, + a.generated_file_id + FROM activities AS a, UNNEST(a.used_biosample_id) AS biosample_id + WHERE biosample_id IN ({', '.join(map(repr, biosample_ids))}) + ''') + return { + Link.create(inputs={KeyReference(key=row['biosample_id'], entity_type='biosample')}, + outputs=[ + KeyReference(key=output_id, entity_type='file') + for output_id in row['generated_file_id'] + ], + activity=KeyReference(key=row['activity_id'], entity_type=row['activity_table'])) + for row in rows + } + else: + return set() def _downstream_from_files(self, source: TDRSourceSpec, file_ids: AbstractSet[Key] ) -> Links: - if not file_ids: - return set() - rows = self._run_sql(f''' - WITH activities AS ( + if file_ids: + rows = self._run_sql(f''' + WITH activities AS ( + SELECT + ala.alignmentactivity_id, + 'alignmentactivity', + ala.used_file_id, + ala.generated_file_id + FROM {backtick(self._full_table_name(source, 'alignmentactivity'))} AS ala + UNION ALL SELECT + a.activity_id, + 'activity', + a.used_file_id, + a.generated_file_id + FROM {backtick(self._full_table_name(source, 'activity'))} AS a + ) SELECT - ala.alignmentactivity_id, - 'alignmentactivity', - ala.used_file_id, - ala.generated_file_id - FROM {backtick(self._full_table_name(source, 'alignmentactivity'))} AS ala - UNION ALL SELECT + used_file_id, + a.generated_file_id, a.activity_id, - 'activity', - a.used_file_id, - a.generated_file_id - FROM {backtick(self._full_table_name(source, 'activity'))} AS a - ) - SELECT - used_file_id, - a.generated_file_id, - a.activity_id, - a.activity_table - FROM activities AS a, UNNEST(a.used_file_id) AS used_file_id - WHERE used_file_id IN ({', '.join(map(repr, file_ids))}) - ''') - return { - Link.create(inputs=KeyReference(key=row['used_file_id'], entity_type='file'), - outputs=[ - KeyReference(key=file_id, entity_type='file') - for file_id in row['generated_file_id'] - ], - activity=KeyReference(key=row['actvity_id'], entity_type=row['activity_table'])) - for row in rows - } + a.activity_table + FROM activities AS a, UNNEST(a.used_file_id) AS used_file_id + WHERE used_file_id IN ({', '.join(map(repr, file_ids))}) + ''') + return { + Link.create(inputs=KeyReference(key=row['used_file_id'], entity_type='file'), + outputs=[ + KeyReference(key=file_id, entity_type='file') + for file_id in row['generated_file_id'] + ], + activity=KeyReference(key=row['actvity_id'], entity_type=row['activity_table'])) + for row in rows + } + else: + return set() def _retrieve_entities(self, source: TDRSourceSpec,