From 092dee0b666a139cd8e6d26843b515087327b8bc Mon Sep 17 00:00:00 2001 From: Kirill Tsukanov Date: Thu, 23 Nov 2023 13:05:19 +0000 Subject: [PATCH] chore: add Beam pipeline requirements --- src/beam/README.md | 3 ++- src/beam/eqtl_catalogue.py | 4 ++-- src/beam/requirements.txt | 3 +++ 3 files changed, 7 insertions(+), 3 deletions(-) create mode 100644 src/beam/requirements.txt diff --git a/src/beam/README.md b/src/beam/README.md index acbb9f47f3..752df38deb 100644 --- a/src/beam/README.md +++ b/src/beam/README.md @@ -5,7 +5,8 @@ python -m eqtl_catalogue \ --project open-targets-genetics-dev \ --staging_location gs://genetics-portal-dev-staging/beam \ --template_location gs://genetics_etl_python_playground/beam/eqtl_catalogue \ - --region europe-west1 + --region europe-west1 \ + --requirements_file requirements.txt ``` To run a pipeline: diff --git a/src/beam/eqtl_catalogue.py b/src/beam/eqtl_catalogue.py index 1b0c8709be..b766d9a6ad 100644 --- a/src/beam/eqtl_catalogue.py +++ b/src/beam/eqtl_catalogue.py @@ -180,7 +180,7 @@ def process( # Skip header. continue data = row.split("\t") - if i == 100000: + if i == 1000000: break # Perform actions depending on the chromosome. chromosome = data[chromosome_index] @@ -242,7 +242,7 @@ def run_pipeline() -> None: with beam.Pipeline(options=PipelineOptions()) as pipeline: ( pipeline - | "List input files" >> beam.Create(get_input_files()[:1]) + | "List input files" >> beam.Create(get_input_files()) | "Parse data" >> beam.ParDo(ParseData()) | "Write to Parquet" >> beam.ParDo(WriteData()) ) diff --git a/src/beam/requirements.txt b/src/beam/requirements.txt new file mode 100644 index 0000000000..14eaff7146 --- /dev/null +++ b/src/beam/requirements.txt @@ -0,0 +1,3 @@ +pandas +fsspec +gcsfs