Skip to content

Commit 7c7e595

Browse files
author
Szymon Szyszkowski
committed
chore: cleanup
1 parent aeab7fa commit 7c7e595

File tree

4 files changed

+14
-9
lines changed

4 files changed

+14
-9
lines changed

src/ot_orchestration/dags/gwas_catalog_harmonisation.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,9 @@
99
from airflow.models.baseoperator import chain
1010
from airflow.models.dag import DAG
1111

12-
from ot_orchestration.operators.batch.harmonisation import (
12+
from ot_orchestration.operators.batch.generic import (
1313
BatchIndexOperator,
14-
GeneticsBatchJobOperator,
14+
BatchJobOperator,
1515
)
1616
from ot_orchestration.utils import (
1717
find_node_in_config,
@@ -50,7 +50,7 @@ def end():
5050
batch_index_specs=node_config["google_batch_index_specs"],
5151
)
5252
node_config = find_node_in_config(config["nodes"], "gwas_catalog_harmonisation")
53-
harmonisation_batch_job = GeneticsBatchJobOperator.partial(
53+
harmonisation_batch_job = BatchJobOperator.partial(
5454
task_id=node_config["id"],
5555
job_name="harmonisation",
5656
google_batch=node_config["google_batch"],

src/ot_orchestration/operators/batch/harmonisation.py renamed to src/ot_orchestration/operators/batch/generic.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,9 @@ def execute(self, context) -> list[BatchIndexRow]:
6767
return rows
6868

6969

70-
class GeneticsBatchJobOperator(CloudBatchSubmitJobOperator):
70+
class BatchJobOperator(CloudBatchSubmitJobOperator):
71+
"""Generic Batch Job operator."""
72+
7173
def __init__(
7274
self,
7375
job_name: str,

src/ot_orchestration/utils/batch.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -143,22 +143,25 @@ def create_batch_job(
143143
return job
144144

145145

146-
def create_task_env(var_list: list[dict[str, Any]]):
146+
def create_task_env(var_list: list[dict[str, str]]):
147147
"""This function creates list of batch_v1.Environment objects from provided list of dictionaries."""
148-
print(f"{var_list=}")
149148
environments = [Environment(variables=variables) for variables in var_list]
150149
return environments
151150

152151

153152
def create_task_commands(
154-
commands: list[str] | None, params: dict[str, dict[str, Any] | None]
153+
commands: list[str] | None, params: dict[str, str] | None
155154
) -> list[str]:
156155
"""This function prepares list of commands for google batch job from the step configuration."""
157-
args = convert_params_to_hydra_positional_arg(params=params, dataproc=False)
158156
task_commands = []
157+
args: list[str] = []
158+
if params:
159+
args = convert_params_to_hydra_positional_arg(params=params, dataproc=False)
159160
if commands:
160161
task_commands.extend(commands)
161162
task_commands.extend(args)
163+
# ensure all are string values
164+
task_commands = [str(t) for t in task_commands]
162165

163166
if len(task_commands) > 1 and task_commands[0] == "-c":
164167
task_commands = ["-c", " ".join(task_commands[1:])]

src/ot_orchestration/utils/utils.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ def chain_dependencies(nodes: list[ConfigNode], tasks_or_task_groups: dict[str,
138138

139139

140140
def convert_params_to_hydra_positional_arg(
141-
params: dict[str, Any] | None, dataproc: bool = False
141+
params: dict[str, str] | None, dataproc: bool = False
142142
) -> list[str]:
143143
"""Convert configuration parameters to form that can be passed to hydra step positional arguments.
144144

0 commit comments

Comments
 (0)