feat: implement Google Batch preprocessing
tskir committed Dec 9, 2023
1 parent 4c86f8d commit 4af10a1
Showing 3 changed files with 112 additions and 0 deletions.
17 changes: 17 additions & 0 deletions src/batch/README.md
@@ -0,0 +1,17 @@
## How to run data ingestion using Google Batch

Specify which data source to ingest:
```bash
export MODULE=eqtl_catalogue
```

Upload the code and submit the job:
```bash
# Upload code.
gsutil -m cp -r . gs://genetics_etl_python_playground/batch/code
# Submit batch job.
gcloud batch jobs submit \
"${MODULE//_/-}-$(date --utc +"%Y%m%d-%H%M%S")" \
--config <(python3 ${MODULE}.py) \
--location europe-west1
```
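
Note: the `--config <(python3 ${MODULE}.py)` process substitution assumes that each module script prints its own Google Batch job config to stdout when called with no arguments. Below is a minimal, illustrative sketch of what `eqtl_catalogue.py` might look like; the task count is an assumed value, and the real module would derive it from its input data.

```python
"""Illustrative sketch of a module script (not part of this commit)."""

import sys

from batch_common import generate_job_config

# Assumed value for illustration; the real module derives this from its input data.
NUMBER_OF_TASKS = 100

if __name__ == "__main__":
    if len(sys.argv) == 1:
        # No arguments: emit the job config for `gcloud batch jobs submit --config <(...)`.
        print(generate_job_config("eqtl_catalogue", number_of_tasks=NUMBER_OF_TASKS))
    else:
        # With a task index argument: run this task's share of the ingestion (see runner.sh below).
        task_index = int(sys.argv[1])
        ...  # ingestion logic for this task index
```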
67 changes: 67 additions & 0 deletions src/batch/batch_common.py
@@ -0,0 +1,67 @@
"""Shared utilities for running non-Spark ingestion on Google Batch."""

from __future__ import annotations

import json


def generate_job_config(
    step_name: str,
    number_of_tasks: int,
    cpu_per_job: int = 16,
    max_parallelism: int = 50,
) -> str:
    """Generate configuration for a Google Batch job.

    Args:
        step_name (str): Name of the module which will run the step, without the ".py" extension.
        number_of_tasks (int): Number of tasks in the batch job.
        cpu_per_job (int): Number of CPUs to allocate to each VM worker for each job. Must be a power of 2; the maximum value is 64.
        max_parallelism (int): Maximum number of concurrently running tasks.

    Returns:
        str: Google Batch job config in JSON format.
    """
    config = {
        "taskGroups": [
            {
                "taskSpec": {
                    "runnables": [
                        {
                            "script": {
                                "text": f"bash /mnt/share/code/runner.sh {step_name}",
                            }
                        }
                    ],
                    "computeResource": {
                        "cpuMilli": cpu_per_job * 1000,
                        "memoryMib": cpu_per_job * 3200,
                    },
                    "volumes": [
                        {
                            "gcs": {
                                "remotePath": "genetics_etl_python_playground/batch"
                            },
                            "mountPath": "/mnt/share",
                        }
                    ],
                    "maxRetryCount": 1,
                    "maxRunDuration": "3600s",
                },
                "taskCount": number_of_tasks,
                "parallelism": min(number_of_tasks, max_parallelism),
            }
        ],
        "allocationPolicy": {
            "instances": [
                {
                    "policy": {
                        "machineType": f"n2d-standard-{cpu_per_job}",
                        "provisioningModel": "SPOT",
                    }
                }
            ]
        },
        "logsPolicy": {"destination": "CLOUD_LOGGING"},
    }
    return json.dumps(config, indent=4)
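
A quick illustration (not part of the commit) of how the parameters map onto the generated config, assuming `batch_common` is importable and using example values:

```python
import json

from batch_common import generate_job_config

# Example values for illustration only.
config = json.loads(generate_job_config("eqtl_catalogue", number_of_tasks=500))

task_group = config["taskGroups"][0]
policy = config["allocationPolicy"]["instances"][0]["policy"]

assert task_group["taskCount"] == 500
assert task_group["parallelism"] == 50  # min(number_of_tasks, max_parallelism=50)
assert task_group["taskSpec"]["computeResource"]["cpuMilli"] == 16000  # cpu_per_job=16
assert policy["machineType"] == "n2d-standard-16"  # machine size follows cpu_per_job
```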
28 changes: 28 additions & 0 deletions src/batch/runner.sh
@@ -0,0 +1,28 @@
export MODULE=$1

# Create directory for logs, if necessary.
mkdir -p "/mnt/share/logs/${MODULE}"

# Redirect all subsequent logs.
# The $BATCH_TASK_INDEX variable is set by Google Batch.
exec &> "/mnt/share/logs/${MODULE}/log.${BATCH_TASK_INDEX}.log"

echo ">>> Update APT"
sudo apt-get -y update

echo ">>> Install system packages"
sudo apt-get -y install python3 python3-pip python3-setuptools

echo ">>> Update PIP and setuptools"
sudo python3 -m pip install --upgrade pip setuptools
echo $?

echo ">>> Install Python requirements"
sudo python3 -m pip install -r /mnt/share/code/requirements.txt
echo $?

echo ">>> Run script"
sudo python3 "/mnt/share/code/${MODULE}.py" "${BATCH_TASK_INDEX}"
echo $?

echo ">>> Completed"
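
On the worker, `runner.sh` passes `$BATCH_TASK_INDEX` (set by Google Batch) as the first argument to the module script, so each task can select its own shard of the work. A hedged sketch of the worker-side entry point follows; the work items and shard-selection logic are assumptions for illustration.

```python
"""Illustrative worker-side entry point for a module script (not part of this commit)."""

import sys

# In a real module, the list of work items would come from the data source.
WORK_ITEMS = ["dataset_a", "dataset_b", "dataset_c"]

if __name__ == "__main__":
    # runner.sh invokes: python3 /mnt/share/code/<module>.py $BATCH_TASK_INDEX
    task_index = int(sys.argv[1])
    item = WORK_ITEMS[task_index]
    print(f"Task {task_index}: ingesting {item}")
```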
