Skip to content

Commit

Permalink
fix: fix google storage paths and filter manifests
Browse files Browse the repository at this point in the history
  • Loading branch information
Szymon Szyszkowski committed Jul 22, 2024
1 parent 4a75267 commit 00fbb1f
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 8 deletions.
28 changes: 26 additions & 2 deletions src/ot_orchestration/task_groups/gwas_catalog/harmonisation.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@
from airflow.providers.google.cloud.operators.cloud_batch import (
CloudBatchSubmitJobOperator,
)
from ot_orchestration.types import Manifest_Object
from airflow.operators.python import get_current_context
from airflow.utils.helpers import chain
from ot_orchestration.utils import create_batch_job, create_task_spec
from ot_orchestration.utils.common import GCP_REGION, GCP_PROJECT
from ot_orchestration.utils import GCSPath, GCSIOManager


@task_group(group_id="gwas_catalog_harmonisation")
Expand All @@ -26,6 +28,28 @@ def gwas_catalog_harmonisation() -> None:
match_glob="**/manifest.json",
)

@task(task_id="filter_manifests_by_step")
def filter_manifests_by_step(
    manifest_paths: list[GCSPath],
) -> list[str]:
    """Select the manifests that have not passed harmonisation yet.

    Each incoming path is prefixed with ``gs://{staging_bucket}/``, the
    manifests are loaded from those locations, and only the ones whose
    ``passHarmonisation`` entry is falsy are kept.

    Args:
        manifest_paths: Manifest locations as produced by the upstream
            listing task (presumably bucket-relative object names; confirm
            against the listing operator).

    Returns:
        The ``manifestPath`` value of every manifest that still needs
        harmonisation.
    """
    import logging

    params = get_gwas_catalog_dag_params()
    staging_bucket: str = params["staging_bucket"]
    # Rebuild full URIs: the loader is given gs://{bucket_name}/... paths.
    manifest_uris = [
        f"gs://{staging_bucket}/{manifest_path}" for manifest_path in manifest_paths
    ]
    logging.getLogger(__name__).info("Loading manifests: %s", manifest_uris)
    manifests = GCSIOManager().load_many(manifest_uris)
    # NOTE(review): the values returned here are whatever is stored under
    # "manifestPath"; the downstream harmonisation job prepends
    # gs://{staging_bucket}/ to each entry again - confirm the stored value is
    # bucket-relative, otherwise the prefix is applied twice.
    return [
        manifest["manifestPath"]
        for manifest in manifests
        if not manifest["passHarmonisation"]
    ]

manifest_paths = filter_manifests_by_step(existing_manifest_paths.output)

@task(task_id="create_harmonisation_job")
def harmonisation_job(manifest_paths: list[str]) -> CloudBatchSubmitJobOperator:
"""Create a harmonisation batch job."""
Expand All @@ -37,7 +61,7 @@ def harmonisation_job(manifest_paths: list[str]) -> CloudBatchSubmitJobOperator:
policy_specs = google_batch_params["policy_specs"]
dag_params = get_gwas_catalog_dag_params()
image = dag_params["genetics_etl_image"]
commands = ["echo", "$MANIFEST_PATH"]
commands = ["tasks/harmonise.sh"]
task_env = [
Environment(variables={"MANIFEST_PATH": f"gs://{staging_bucket}/{mp}"})
for mp in manifest_paths
Expand All @@ -59,7 +83,7 @@ def harmonisation_job(manifest_paths: list[str]) -> CloudBatchSubmitJobOperator:
deferrable=False,
).execute(context=get_current_context())

harmonise = harmonisation_job(existing_manifest_paths.output)
harmonise = harmonisation_job(manifest_paths)
chain(existing_manifest_paths, harmonise)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,9 @@ def read_existing_manifests(manifest_paths: list[GCSPath]) -> list[Manifest_Obje
"""Read manifests that already exist in staging bucket."""
# recreate the paths with the gs://{bucket_name}/
params = get_gwas_catalog_dag_params()
raw_sumstat_bucket: str = params["staging_bucket"]
staging_bucket: str = params["staging_bucket"]
manifest_paths = [
f"gs://{raw_sumstat_bucket}/{manifest_path}"
for manifest_path in manifest_paths
f"gs://{staging_bucket}/{manifest_path}" for manifest_path in manifest_paths
]
return GCSIOManager().load_many(manifest_paths)

Expand Down Expand Up @@ -63,7 +62,6 @@ def generate_new_manifests(
staging_prefix: str = params["staging_prefix"]
harmonised_prefix: str = params["harmonised_prefix"]
raw_sumstat_bucket: str = params["raw_sumstats_bucket"]
raw_sumstat_prefix: str = params["raw_sumstats_prefix"]
qc_prefix: str = params["qc_prefix"]
staging_dir = f"{staging_bucket}/{staging_prefix}"
studies_in_staging_bucket = [
Expand All @@ -76,7 +74,7 @@ def generate_new_manifests(
staging_path = f"{staging_dir}/{study_id}"
partial_manifest = {
"studyId": study_id,
"rawPath": f"gs://{raw_sumstat_bucket}/{raw_sumstat_prefix}",
"rawPath": f"gs://{raw_sumstat_bucket}/{raw_path}",
"manifestPath": f"gs://{staging_path}/manifest.json",
"harmonisedPath": f"gs://{staging_path}/{harmonised_prefix}",
"qcPath": f"gs://{staging_path}/{qc_prefix}",
Expand Down Expand Up @@ -121,7 +119,7 @@ def amend_curation_metadata(
curated_manifests = curated_manifest.replace({float("nan"): None}).to_dict(
"records"
)
return curated_manifests[0:10]
return curated_manifests

@task(task_id="save_manifests")
def save_manifests(manifests: list[Manifest_Object]) -> None:
Expand Down

0 comments on commit 00fbb1f

Please sign in to comment.