Conversation

@rushirajnenuji (Member)

This PR adds parallel task execution to ogdc-runner, allowing workflows to process large datasets faster by running tasks concurrently in Argo.

rel: #136

Add core parallel execution model
…models

Add parallel execution config field to ShellWorkflow and VizWorkflow models
Rename module to parallel_config
Update parallel_config.py
[WIP] Add partitioning and parallel exec modules
Align parallel execution with parallel_config model
Remove feature based batching for viz-workflow
Add support for rasterization and 3dtiles
initial work on this PR #114
Utilize parallel orchestration for shell type workflows
resolve issues with circular imports
[WIP] fix issues with adding exec steps to a DAG
marks first successful test for running the shell workflow in parallel.
[WIP] clean up
Decided to handle these changes in a new branch
[WIP] update parallel partitioning approach and clean up
Removing filesystem type from recipe inputs
Within K8s, I wonder if we're ever going to access the file paths directly from the local system. I think the K8s way is to access the object via a PVC.
[WIP] use PVC instead of artifacts for parallel execution
[WIP] Clean up and refactor parallel execution logic
Fix PVC mount name and file iteration
Make max parallel limit configurable via env-var
Update documentation
Fix issues with mypy
@trey-stafford (Member) left a comment

I realize this is still in draft form, but wanted to get some initial feedback in before the holidays.

Lots of good work here - nice job! I do have some suggested changes and have raised a couple of questions that we may want to discuss further. Overall, though, this is a great feature to have!

Do you have any "real-world" recipes for this feature drafted? It would be nice to see how, e.g., the PDG plans to use this. Maybe we could use that as an example?

# Each command will be executed in parallel across partitions

# Step 1: Process files - add header and line numbers
cat "$INPUT_FILE" | awk 'BEGIN {print "--- Processed File ---"} {print NR": "$0}' > "$OUTPUT_FILE"

This does not follow the shell recipe convention of using /input_dir/ and /output_dir/ to represent the input/output directory for each step.

E.g., see: https://ogdc-runner.readthedocs.io/en/latest/recipes.html#shell-workflow

  • It is expected that each command in the recipe.sh place data in /output_dir/
  • The input data for each step is always assumed to be in /input_dir/. The previous step’s /output_dir/ becomes the next step’s /input_dir/. The first step’s /input_dir/ contains the data specified in the meta.yaml’s input.
  • Each command is executed in isolation. Do not expect envvars (e.g., export ENVVAR=foo) to persist between lines.

We should try to make this consistent across sequential and parallel shell recipes. Maybe we could consolidate around $INPUT and $OUTPUT? Alternatively, this might be a reason to separate the parallel shell recipe into its own type (parallel-shell), to clearly distinguish it from the sequential case.

Still think we need to address this. At a minimum, let's create an issue to return to once this PR is complete.

@field_validator("function", mode="before")
@classmethod
def validate_function(cls, v: Any) -> Any:
"""Validate that function is callable if provided.

Does pydantic not automatically validate that this is a callable based on the typing above? Or does exclude prevent that? What's the use-case here?

Thoughts on this?
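
For what it's worth, a minimal sketch (hypothetical model, not the PR's actual code) suggesting pydantic v2 already rejects non-callable values for a field annotated as Callable, and that exclude=True only affects serialization, not validation:

from typing import Any, Callable

from pydantic import BaseModel, Field, ValidationError

class ExampleFunc(BaseModel):
    name: str
    # exclude=True only omits the field from serialization; validation still applies
    function: Callable[..., Any] | None = Field(default=None, exclude=True)

ExampleFunc(name="demo", function=len)  # accepted: len is callable

try:
    ExampleFunc(name="demo", function="not-a-callable")
except ValidationError as err:
    print(err)  # pydantic reports "Input should be callable"

If that holds, the custom validator may be redundant unless it does something beyond the callable check.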

msg = f"ExecutionFunction '{func.name}' must have 'command' or 'function'"
raise ValueError(msg)

def _create_shell_template(self, func: ExecutionFunction) -> Container:

Consider moving this (and other shell-specific) logic to the shell module? I would strive to have this module (and the other partition/parallelization-related modules) define high-level abstractions that can be used by recipe-specific implementations. This could look like defining an abstract class here, and then creating a ShellParallelExecutionOrchestrator class specifically for shell workflows that inherits from it.

That said, this might make more sense to tackle as part of supporting parallelization for viz workflows, or in another follow-on PR.

Thoughts on this?
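
To sketch what that split could look like (class and method names here are hypothetical, not the PR's actual API):

from abc import ABC, abstractmethod

class BaseParallelExecutionOrchestrator(ABC):
    """Recipe-agnostic orchestration of parallel task creation."""

    def __init__(self, max_parallelism: int) -> None:
        self.max_parallelism = max_parallelism

    @abstractmethod
    def create_task_template(self, func):
        """Build the Argo template for one unit of parallel work."""

    def build_tasks(self, funcs):
        # Shared fan-out logic lives here, independent of recipe type.
        return [self.create_task_template(func) for func in funcs]

class ShellParallelExecutionOrchestrator(BaseParallelExecutionOrchestrator):
    """Shell-specific implementation, kept in the shell module."""

    def create_task_template(self, func):
        # Shell-specific container construction (e.g. what _create_shell_template
        # does today) would live here instead of in the generic module.
        ...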


Each parallel task:

- Receives a partition of input files via workflow parameters

Is it expected that each task have the same number of files?

docs/recipes.md Outdated
Each parallel task:

- Receives a partition of input files via workflow parameters
- Executes the same command/function independently

Does this mean that each command will be run once per file? Or do the underlying commands need to be capable of handling the partition of files passed to them?
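
To make the question concrete, here is a hypothetical sketch of one way the partitioning could behave (not necessarily what this PR implements):

def partition_files(files: list[str], num_partitions: int) -> list[list[str]]:
    """Split files into roughly equal groups, one group per parallel task."""
    partitions: list[list[str]] = [[] for _ in range(num_partitions)]
    for i, path in enumerate(files):
        partitions[i % num_partitions].append(path)
    return partitions

# 10 files across 3 tasks -> partitions of size 4, 3, and 3
for partition in partition_files([f"file_{i}.txt" for i in range(10)], 3):
    # In this reading, the recipe command runs once per file within a task.
    print(partition)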

Fix inputs for subsequent shell cmds
fix mypy errors
Add more fixes
remove get_max_parallelism
update documentation
Move test_parallel_recipe to unit test
Update recipe
@rushirajnenuji marked this pull request as ready for review January 14, 2026 16:56
Resolve type checking issues
remove version specific decorators
@trey-stafford (Member) left a comment

Getting close! Nice work on this - it's a big change!

2. **Creates** independent Argo tasks for each partition
3. **Orchestrates** parallel execution with configurable maximum parallelism

The `ParallelExecutionOrchestrator` class manages this process, creating Argo

Suggested change
The `ParallelExecutionOrchestrator` class manages this process, creating Argo
The {class}`ogdc_runner.parallel.ParallelExecutionOrchestrator` class manages this process, creating Argo

Comment on lines 65 to 67
- {mod} `ogdc_runner.parallel`: Orchestration logic for parallel task creation
- {mod} `ogdc_runner.partitioning`: Partitioning strategies for dividing work
- {mod} `ogdc_runner.models.parallel_config`: Configuration models for parallel

Suggested change
- {mod} `ogdc_runner.parallel`: Orchestration logic for parallel task creation
- {mod} `ogdc_runner.partitioning`: Partitioning strategies for dividing work
- {mod} `ogdc_runner.models.parallel_config`: Configuration models for parallel
- {mod}`ogdc_runner.parallel`: Orchestration logic for parallel task creation
- {mod}`ogdc_runner.partitioning`: Partitioning strategies for dividing work
- {mod}`ogdc_runner.models.parallel_config`: Configuration models for parallel

"""
if use_input_as_output:

def get_recipe_inputs_path(recipe_id: str) -> str:

This function appears to be unused and should be removed.

artifact storage.

Returns:
Output directory path

Suggested change
Output directory path
Output directory as a string.

Comment on lines 62 to 64
use_input_as_output: If True, store inputs in PVC at a path that can be
referenced as output for downstream recipes. If False, use temporary
artifact storage.

Suggested change
use_input_as_output: If True, store inputs in PVC at a path that can be
referenced as output for downstream recipes. If False, use temporary
artifact storage.
use_input_as_output: If True, return `"/mnt/workflow/{recipe_id}/inputs"`. Otherwise `/output_dir`.

It is not accurate to say that this function stores inputs on a PVC or artifact storage. It just returns a str.
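
In other words, something like this (a sketch; the function name is hypothetical, the paths come from the suggested wording above):

def get_output_path(recipe_id: str, use_input_as_output: bool) -> str:
    # Only returns a path string; nothing is written to the PVC or artifact storage.
    if use_input_as_output:
        return f"/mnt/workflow/{recipe_id}/inputs"
    return "/output_dir"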

Complete shell script as a string
"""
return f"""
set -e

Maybe we could move this string into a shell file that gets read here instead of inlining it? This would make it easier for editors to pick up on shell syntax issues and generally make it easier to read/edit.
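
Something along these lines, perhaps (a sketch; the package path and file name are assumptions):

from importlib.resources import files

def load_partition_script() -> str:
    """Read the partition shell script shipped with the package."""
    return files("ogdc_runner").joinpath("scripts/partition.sh").read_text()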

command = "rsync --progress /input_dir/* /output_dir/"
volume_mounts = [
models.VolumeMount(
name=OGDC_WORKFLOW_PVC.name, mount_path="/output_dir/", sub_path=recipe_id

Suggested change
name=OGDC_WORKFLOW_PVC.name, mount_path="/output_dir/", sub_path=recipe_id
name=OGDC_WORKFLOW_PVC.name,
mount_path="/output_dir/",
sub_path=recipe_id,

Bit of a nitpick here: passed kwargs are easier to read when each is on its own line (and it's easier to add new kwargs without going over the line-length limit).

Feedback suggestions and clean up
Move partition script to a separate module
refactor: abstract ParallelExecutionOrchestrator
- abstract ParallelExecutionOrchestrator module defines the interface
- move shell specific logic to shell module
Fix shellcheck issues and update documentation
Fix failing test