refactor: repackage batch_common
tskir committed Dec 3, 2023
1 parent f1d6f84 commit adad24a
Showing 3 changed files with 69 additions and 126 deletions.
67 changes: 67 additions & 0 deletions src/batch/batch_common.py
@@ -0,0 +1,67 @@
"""Shared utilities for running non-Spark preprocessing on Google Batch."""

from __future__ import annotations

import json


def generate_job_config(
    step_name: str,
    number_of_tasks: int,
    cpu_per_job: int = 8,
    max_parallelism: int = 50,
) -> str:
    """Generate configuration for a Google Batch job.

    Args:
        step_name (str): Name of the module which will run the step, without the ".py" extension.
        number_of_tasks (int): Total number of tasks in the batch job.
        cpu_per_job (int): Number of CPUs to allocate to each VM worker for each job. Must be a power of 2, with a maximum value of 64.
        max_parallelism (int): Maximum number of concurrently running tasks.

    Returns:
        str: Google Batch job config in JSON format.
    """
    config = {
        "taskGroups": [
            {
                "taskSpec": {
                    "runnables": [
                        {
                            "script": {
                                "text": f"bash /mnt/share/code/runner.sh {step_name}",
                            }
                        }
                    ],
                    "computeResource": {
                        "cpuMilli": cpu_per_job * 1000,
                        "memoryMib": cpu_per_job * 3200,
                    },
                    "volumes": [
                        {
                            "gcs": {
                                "remotePath": "genetics_etl_python_playground/batch"
                            },
                            "mountPath": "/mnt/share",
                        }
                    ],
                    "maxRetryCount": 1,
                    "maxRunDuration": "3600s",
                },
                "taskCount": number_of_tasks,
                "parallelism": min(number_of_tasks, max_parallelism),
            }
        ],
        "allocationPolicy": {
            "instances": [
                {
                    "policy": {
                        "machineType": f"n2d-standard-{cpu_per_job}",
                        "provisioningModel": "SPOT",
                    }
                }
            ]
        },
        "logsPolicy": {"destination": "CLOUD_LOGGING"},
    }
    return json.dumps(config, indent=4)
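
For context, a minimal sketch of how this helper might be used locally. The step name, task count, output filename, and gcloud invocation below are illustrative assumptions, not taken from this commit:

# Hypothetical local usage of generate_job_config(); all concrete values
# below are illustrative assumptions.
from batch_common import generate_job_config

config_json = generate_job_config(step_name="eqtl_catalogue", number_of_tasks=100)
with open("job_config.json", "w") as f:
    f.write(config_json)

# The resulting file could then be submitted via the Google Cloud CLI, e.g.:
#   gcloud batch jobs submit my-batch-job --location=europe-west1 --config=job_config.json

Note that the task group's parallelism is set to min(number_of_tasks, max_parallelism), so a job with fewer tasks than the cap never requests idle VMs.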
5 changes: 2 additions & 3 deletions src/batch/eqtl_catalogue.py
@@ -5,10 +5,9 @@
 import sys
 
 import pandas as pd
+from batch_common import generate_job_config
 from spark_prep import SparkPrep
 
-from utils import generate_job_config
-
 EQTL_CATALOGUE_IMPORTED_PATH = "https://raw.githubusercontent.com/eQTL-Catalogue/eQTL-Catalogue-resources/master/tabix/tabix_ftp_paths_imported.tsv"
 EQTL_CATALOGUE_OUPUT_BASE = (
     "gs://genetics_etl_python_playground/1-smart-mirror/summary_stats"
@@ -36,7 +35,7 @@
         worker.process()
     else:
         # We are running locally. Let's generate the job config.
-        # For this, we only really need the number of jobs to run.
+        # For this, we only really need the total number of jobs to run.
         number_of_tasks = len(pd.read_table(EQTL_CATALOGUE_IMPORTED_PATH))
         print(
             generate_job_config(
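
Only the else branch of the dispatch is visible in this hunk. For context, a self-contained sketch of the worker/launcher pattern it implies. The process_task() helper, the argv-based worker detection, and the step name are hypothetical stand-ins, not taken from this commit:

# Sketch of the worker/launcher dispatch pattern implied by the hunk above;
# process_task() and the argv-based detection are illustrative assumptions.
import sys

import pandas as pd

from batch_common import generate_job_config

EQTL_CATALOGUE_IMPORTED_PATH = "https://raw.githubusercontent.com/eQTL-Catalogue/eQTL-Catalogue-resources/master/tabix/tabix_ftp_paths_imported.tsv"


def process_task(task_index: int) -> None:
    """Hypothetical worker body: process one study from the input table."""
    studies = pd.read_table(EQTL_CATALOGUE_IMPORTED_PATH)
    print(f"Would process row {task_index}: {studies.iloc[task_index, 0]}")


if __name__ == "__main__":
    if len(sys.argv) > 1:
        # Running as a Batch worker: this task's index arrives as an argument.
        process_task(int(sys.argv[1]))
    else:
        # Running locally: emit the job config for submission.
        number_of_tasks = len(pd.read_table(EQTL_CATALOGUE_IMPORTED_PATH))
        print(generate_job_config("eqtl_catalogue", number_of_tasks))

On Google Batch, each task could alternatively read its index from the documented BATCH_TASK_INDEX environment variable; the runner.sh wrapper referenced in batch_common.py presumably forwards one or the other, though that script is not part of this diff.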
123 changes: 0 additions & 123 deletions src/batch/utils.py

This file was deleted.
