Skip to content

Commit f8729c8

Browse files
Szymon SzyszkowskiSzymonSzyszkowski
authored andcommitted
chore: cleanup
1 parent 24774b8 commit f8729c8

File tree

3 files changed

+13
-8
lines changed

3 files changed

+13
-8
lines changed

src/ot_orchestration/dags/gwas_catalog_harmonisation.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,9 @@
99
from airflow.models.baseoperator import chain
1010
from airflow.models.dag import DAG
1111

12-
from ot_orchestration.operators.batch.harmonisation import (
12+
from ot_orchestration.operators.batch.generic import (
1313
BatchIndexOperator,
14-
GeneticsBatchJobOperator,
14+
BatchJobOperator,
1515
)
1616
from ot_orchestration.utils import (
1717
find_node_in_config,
@@ -50,7 +50,7 @@ def end():
5050
batch_index_specs=node_config["google_batch_index_specs"],
5151
)
5252
node_config = find_node_in_config(config["nodes"], "gwas_catalog_harmonisation")
53-
harmonisation_batch_job = GeneticsBatchJobOperator.partial(
53+
harmonisation_batch_job = BatchJobOperator.partial(
5454
task_id=node_config["id"],
5555
job_name="harmonisation",
5656
google_batch=node_config["google_batch"],

src/ot_orchestration/operators/batch/harmonisation.py renamed to src/ot_orchestration/operators/batch/generic.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,9 @@ def execute(self, context) -> list[BatchIndexRow]:
6767
return rows
6868

6969

70-
class GeneticsBatchJobOperator(CloudBatchSubmitJobOperator):
70+
class BatchJobOperator(CloudBatchSubmitJobOperator):
71+
"""Generic Batch Job operator."""
72+
7173
def __init__(
7274
self,
7375
job_name: str,

src/ot_orchestration/utils/batch.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -149,22 +149,25 @@ def create_batch_job(
149149
return job
150150

151151

152-
def create_task_env(var_list: list[dict[str, Any]]):
152+
def create_task_env(var_list: list[dict[str, str]]):
153153
"""This function creates list of batch_v1.Environment objects from provided list of dictionaries."""
154-
print(f"{var_list=}")
155154
environments = [Environment(variables=variables) for variables in var_list]
156155
return environments
157156

158157

159158
def create_task_commands(
160-
commands: list[str] | None, params: dict[str, dict[str, Any] | None]
159+
commands: list[str] | None, params: dict[str, str] | None
161160
) -> list[str]:
162161
"""This function prepares list of commands for google batch job from the step configuration."""
163-
args = convert_params_to_hydra_positional_arg(params=params, dataproc=False)
164162
task_commands = []
163+
args: list[str] = []
164+
if params:
165+
args = convert_params_to_hydra_positional_arg(params=params, dataproc=False)
165166
if commands:
166167
task_commands.extend(commands)
167168
task_commands.extend(args)
169+
# ensure all are string values
170+
task_commands = [str(t) for t in task_commands]
168171

169172
if len(task_commands) > 1 and task_commands[0] == "-c":
170173
task_commands = ["-c", " ".join(task_commands[1:])]

0 commit comments

Comments
 (0)