Skip to content

Commit

Permalink
Refactor bailouts
Browse files Browse the repository at this point in the history
  • Loading branch information
nadove-ucsc authored and dsotirho-ucsc committed Nov 30, 2022
1 parent 6847085 commit c7bf7da
Showing 1 changed file with 153 additions and 149 deletions.
302 changes: 153 additions & 149 deletions src/azul/plugins/repository/tdr_anvil/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,176 +266,180 @@ def _upstream_from_biosamples(self,
source: TDRSourceSpec,
biosample_ids: AbstractSet[Key]
) -> Links:
if not biosample_ids:
return set()
rows = self._run_sql(f'''
SELECT b.biosample_id, b.donor_id, b.part_of_dataset_id
FROM {backtick(self._full_table_name(source, 'biosample'))} AS b
WHERE b.biosample_id IN ({', '.join(map(repr, biosample_ids))})
''')
result: Links = set()
for row in rows:
downstream_ref = KeyReference(entity_type='biosample',
key=row['biosample_id'])
result.add(Link.create(outputs=downstream_ref,
inputs=KeyReference(entity_type='dataset',
key=one(row['part_of_dataset_id']))))
for donor_id in row['donor_id']:
if biosample_ids:
rows = self._run_sql(f'''
SELECT b.biosample_id, b.donor_id, b.part_of_dataset_id
FROM {backtick(self._full_table_name(source, 'biosample'))} AS b
WHERE b.biosample_id IN ({', '.join(map(repr, biosample_ids))})
''')
result: Links = set()
for row in rows:
downstream_ref = KeyReference(entity_type='biosample',
key=row['biosample_id'])
result.add(Link.create(outputs=downstream_ref,
inputs=KeyReference(entity_type='donor',
key=donor_id)))
return result
inputs=KeyReference(entity_type='dataset',
key=one(row['part_of_dataset_id']))))
for donor_id in row['donor_id']:
result.add(Link.create(outputs=downstream_ref,
inputs=KeyReference(entity_type='donor',
key=donor_id)))
return result
else:
return set()

def _upstream_from_files(self,
source: TDRSourceSpec,
file_ids: AbstractSet[Key]
) -> Links:
if not file_ids:
if file_ids:
rows = self._run_sql(f'''
WITH file AS (
SELECT f.file_id FROM {backtick(self._full_table_name(source, 'file'))} AS f
WHERE f.file_id IN ({', '.join(map(repr, file_ids))})
)
SELECT
f.file_id AS generated_file_id,
'alignmentactivity' AS activity_table,
ama.alignmentactivity_id AS activity_id,
ama.used_file_id AS uses_file_id,
[] AS uses_biosample_id,
FROM file AS f
JOIN {backtick(self._full_table_name(source, 'alignmentactivity'))} AS ama
ON f.file_id IN UNNEST(ama.generated_file_id)
UNION ALL SELECT
f.file_id,
'assayactivity',
aya.assayactivity_id,
[],
aya.used_biosample_id,
FROM file AS f
JOIN {backtick(self._full_table_name(source, 'assayactivity'))} AS aya
ON f.file_id IN UNNEST(aya.generated_file_id)
UNION ALL SELECT
f.file_id,
'sequencingactivity',
sqa.sequencingactivity_id,
[],
sqa.used_biosample_id,
FROM file AS f
JOIN {backtick(self._full_table_name(source, 'sequencingactivity'))} AS sqa
ON f.file_id IN UNNEST(sqa.generated_file_id)
UNION ALL SELECT
f.file_id,
'activity',
a.activity_id,
a.used_file_id,
a.used_biosample_id,
FROM file AS f
JOIN {backtick(self._full_table_name(source, 'activity'))} AS a
ON f.file_id IN UNNEST(a.generated_file_id)
''')
return {
Link.create(
activity=KeyReference(entity_type=row['activity_table'], key=row['activity_id']),
# The generated link is not a complete representation of the
# upstream activity because it does not include generated files
# that are not ancestors of the downstream file
outputs=KeyReference(entity_type='file', key=row['generated_file_id']),
inputs=[
KeyReference(entity_type=entity_type, key=key)
for entity_type, column in [('file', 'uses_file_id'),
('biosample', 'uses_biosample_id')]
for key in row[column]
]
)
for row in rows
}
else:
return set()
rows = self._run_sql(f'''
WITH file AS (
SELECT f.file_id FROM {backtick(self._full_table_name(source, 'file'))} AS f
WHERE f.file_id IN ({', '.join(map(repr, file_ids))})
)
SELECT
f.file_id AS generated_file_id,
'alignmentactivity' AS activity_table,
ama.alignmentactivity_id AS activity_id,
ama.used_file_id AS uses_file_id,
[] AS uses_biosample_id,
FROM file AS f
JOIN {backtick(self._full_table_name(source, 'alignmentactivity'))} AS ama
ON f.file_id IN UNNEST(ama.generated_file_id)
UNION ALL SELECT
f.file_id,
'assayactivity',
aya.assayactivity_id,
[],
aya.used_biosample_id,
FROM file AS f
JOIN {backtick(self._full_table_name(source, 'assayactivity'))} AS aya
ON f.file_id IN UNNEST(aya.generated_file_id)
UNION ALL SELECT
f.file_id,
'sequencingactivity',
sqa.sequencingactivity_id,
[],
sqa.used_biosample_id,
FROM file AS f
JOIN {backtick(self._full_table_name(source, 'sequencingactivity'))} AS sqa
ON f.file_id IN UNNEST(sqa.generated_file_id)
UNION ALL SELECT
f.file_id,
'activity',
a.activity_id,
a.used_file_id,
a.used_biosample_id,
FROM file AS f
JOIN {backtick(self._full_table_name(source, 'activity'))} AS a
ON f.file_id IN UNNEST(a.generated_file_id)
''')
return {
Link.create(
activity=KeyReference(entity_type=row['activity_table'], key=row['activity_id']),
# The generated link is not a complete representation of the
# upstream activity because it does not include generated files
# that are not ancestors of the downstream file
outputs=KeyReference(entity_type='file', key=row['generated_file_id']),
inputs=[
KeyReference(entity_type=entity_type, key=key)
for entity_type, column in [('file', 'uses_file_id'),
('biosample', 'uses_biosample_id')]
for key in row[column]
]
)
for row in rows
}

def _downstream_from_biosamples(self,
source: TDRSourceSpec,
biosample_ids: AbstractSet[Key],
) -> Links:
if not biosample_ids:
return set()
rows = self._run_sql(f'''
WITH activities AS (
SELECT
sqa.sequencingactivity_id as activity_id,
'sequencingactivity' as activity_table,
sqa.used_biosample_id,
sqa.generated_file_id
FROM {backtick(self._full_table_name(source, 'sequencingactivity'))} AS sqa
UNION ALL
SELECT
aya.assayactivity_id,
'assayactivity',
aya.used_biosample_id,
aya.generated_file_id,
FROM {backtick(self._full_table_name(source, 'assayactivity'))} AS aya
UNION ALL
if biosample_ids:
rows = self._run_sql(f'''
WITH activities AS (
SELECT
sqa.sequencingactivity_id as activity_id,
'sequencingactivity' as activity_table,
sqa.used_biosample_id,
sqa.generated_file_id
FROM {backtick(self._full_table_name(source, 'sequencingactivity'))} AS sqa
UNION ALL
SELECT
aya.assayactivity_id,
'assayactivity',
aya.used_biosample_id,
aya.generated_file_id,
FROM {backtick(self._full_table_name(source, 'assayactivity'))} AS aya
UNION ALL
SELECT
a.activity_id,
'activity',
a.used_biosample_id,
a.generated_file_id,
FROM {backtick(self._full_table_name(source, 'activity'))} AS a
)
SELECT
biosample_id,
a.activity_id,
'activity',
a.used_biosample_id,
a.generated_file_id,
FROM {backtick(self._full_table_name(source, 'activity'))} AS a
)
SELECT
biosample_id,
a.activity_id,
a.activity_table,
a.generated_file_id
FROM activities AS a, UNNEST(a.used_biosample_id) AS biosample_id
WHERE biosample_id IN ({', '.join(map(repr, biosample_ids))})
''')
return {
Link.create(inputs={KeyReference(key=row['biosample_id'], entity_type='biosample')},
outputs=[
KeyReference(key=output_id, entity_type='file')
for output_id in row['generated_file_id']
],
activity=KeyReference(key=row['activity_id'], entity_type=row['activity_table']))
for row in rows
}
a.activity_table,
a.generated_file_id
FROM activities AS a, UNNEST(a.used_biosample_id) AS biosample_id
WHERE biosample_id IN ({', '.join(map(repr, biosample_ids))})
''')
return {
Link.create(inputs={KeyReference(key=row['biosample_id'], entity_type='biosample')},
outputs=[
KeyReference(key=output_id, entity_type='file')
for output_id in row['generated_file_id']
],
activity=KeyReference(key=row['activity_id'], entity_type=row['activity_table']))
for row in rows
}
else:
return set()

def _downstream_from_files(self,
source: TDRSourceSpec,
file_ids: AbstractSet[Key]
) -> Links:
if not file_ids:
return set()
rows = self._run_sql(f'''
WITH activities AS (
if file_ids:
rows = self._run_sql(f'''
WITH activities AS (
SELECT
ala.alignmentactivity_id,
'alignmentactivity',
ala.used_file_id,
ala.generated_file_id
FROM {backtick(self._full_table_name(source, 'alignmentactivity'))} AS ala
UNION ALL SELECT
a.activity_id,
'activity',
a.used_file_id,
a.generated_file_id
FROM {backtick(self._full_table_name(source, 'activity'))} AS a
)
SELECT
ala.alignmentactivity_id,
'alignmentactivity',
ala.used_file_id,
ala.generated_file_id
FROM {backtick(self._full_table_name(source, 'alignmentactivity'))} AS ala
UNION ALL SELECT
used_file_id,
a.generated_file_id,
a.activity_id,
'activity',
a.used_file_id,
a.generated_file_id
FROM {backtick(self._full_table_name(source, 'activity'))} AS a
)
SELECT
used_file_id,
a.generated_file_id,
a.activity_id,
a.activity_table
FROM activities AS a, UNNEST(a.used_file_id) AS used_file_id
WHERE used_file_id IN ({', '.join(map(repr, file_ids))})
''')
return {
Link.create(inputs=KeyReference(key=row['used_file_id'], entity_type='file'),
outputs=[
KeyReference(key=file_id, entity_type='file')
for file_id in row['generated_file_id']
],
activity=KeyReference(key=row['actvity_id'], entity_type=row['activity_table']))
for row in rows
}
a.activity_table
FROM activities AS a, UNNEST(a.used_file_id) AS used_file_id
WHERE used_file_id IN ({', '.join(map(repr, file_ids))})
''')
return {
Link.create(inputs=KeyReference(key=row['used_file_id'], entity_type='file'),
outputs=[
KeyReference(key=file_id, entity_type='file')
for file_id in row['generated_file_id']
],
activity=KeyReference(key=row['actvity_id'], entity_type=row['activity_table']))
for row in rows
}
else:
return set()

def _retrieve_entities(self,
source: TDRSourceSpec,
Expand Down

0 comments on commit c7bf7da

Please sign in to comment.