[feature] Parse Iterables for PipelineParameterChannels #10840

Open

zazulam opened this issue May 22, 2024 · 13 comments · May be fixed by #11219 or #11502

@zazulam
Contributor

zazulam commented May 22, 2024

Feature Area

/area sdk

What feature would you like to see?

During the build stage of the pipeline spec, the kfp.compiler.pipeline_spec_builder.build_task_spec_for_task method loops through the task's inputs and applies different logic depending on each input's data type. I would like to see additional logic that checks for PipelineParameterChannels/PipelineArtifactChannels inside the common Python iterables used as kfp inputs, i.e. list and dict. The current implementation only checks for those types on the object it is looping over, rather than going one level deep, before sending the input to the to_protobuf_value method.
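For illustration, here is a minimal sketch of the pattern this request is about (the component and pipeline names are made up); today this fails to compile because the channels are nested inside a dict:

from kfp import dsl

@dsl.component
def producer() -> str:
    return 'hello'

@dsl.component
def consumer(config: dict):
    print(config)

@dsl.pipeline(name='channels-inside-iterables')
def my_pipeline():
    a = producer()
    b = producer()
    # At compile time the dict values are PipelineParameterChannels; the spec
    # builder only detects channels at the top level of an input, so this dict
    # is sent straight to to_protobuf_value and rejected.
    consumer(config={'a': a.output, 'b': b.output})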

What is the use case or pain point?

My team currently uses a custom mapping of our components to allow our end users to dynamically build pipeline definitions in v1. We store each component's output dictionary within another dictionary and reference those outputs as we iterate over the components required to run. This feature would assist with dynamic pipeline definitions and also help minimize code/design changes for migration to v2.

Is there a workaround currently?

I am not aware of any workaround at this moment.


Love this idea? Give it a 👍.


This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

@github-actions github-actions bot added the lifecycle/stale The issue / pull request is stale, any activities remove this label. label Jul 22, 2024

This issue has been automatically closed because it has not had recent activity. Please comment "/reopen" to reopen it.

@zazulam
Contributor Author

zazulam commented Aug 28, 2024

/reopen

@google-oss-prow google-oss-prow bot reopened this Aug 28, 2024

@zazulam: Reopened this issue.

In response to this:

/reopen

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository.

@zazulam
Contributor Author

zazulam commented Aug 28, 2024

I created some examples of what I've attempted in v2.

  1. Attempt using kfp Artifacts / List[Dataset]
import kfp
from typing import List
from kfp.dsl import Dataset, component, pipeline, Output, Input

@component(base_image="python:3.9",)
def create_dataset_paths(name:str, out_dfs: Output[Dataset], input_dfs:List[Dataset]=None):
    
    if input_dfs:
        input_df_paths = {input_df.name:input_df.metadata for input_df in input_dfs}
        print(input_df_paths)
    
    dataset_paths = {
        'wine': 's3://my-bucket/datasets/wine_dataset.csv',
        'iris': 's3://my-bucket/datasets/iris_dataset.csv',
        'cancer': 's3://my-bucket/datasets/cancer_dataset.csv'
    }

    out_dfs.name = f'{name}_dfs'
    out_dfs.metadata = dataset_paths


@component(base_image="python:3.9",)
def process_datasets(dataset_artifact: Input[Dataset]):

    dataset_paths = dataset_artifact.metadata
    
    for name, path in dataset_paths.items():
        print(f"Looking at {name} dataset at S3 path: {path}")
        

@pipeline(name="dynamic-pipeline-example")
def dynamic_pipeline():
    fruits = {
        'apple': ['banana', 'orange'],
        'banana': ['orange'],
        'orange': [],
    }
    sorted_fruits = dict(sorted(fruits.items(), key=lambda item: len(item[1])))
    output_pool = {}
    for fruit, children in sorted_fruits.items():
        if children:
            current_task = create_dataset_paths(name=fruit, input_dfs=[output_pool[child] for child in children])
        else:
            current_task = create_dataset_paths(name=fruit)
        output_pool[fruit] = current_task.outputs["out_dfs"]
        process_datasets(dataset_artifact=current_task.outputs["out_dfs"])

endpoint = 'http://localhost:80'
kfp_client = kfp.client.Client(host=endpoint)
run = kfp_client.create_run_from_pipeline_func(
    dynamic_pipeline,
    arguments={},
)

That results in the following error:

Traceback (most recent call last):
  File "/mnt/c/Users/zaz/Projects/Python/KFP/dataset_usage.py", line 30, in <module>
    @pipeline(name="dynamic-pipeline-example")
     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/mnt/c/Users/zaz/miniconda3/envs/kfpv2/lib/python3.11/site-packages/kfp/dsl/pipeline_context.py", line 65, in pipeline
    return component_factory.create_graph_component_from_func(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/mnt/c/Users/zaz/miniconda3/envs/kfpv2/lib/python3.11/site-packages/kfp/dsl/component_factory.py", line 673, in create_graph_component_from_func
    return graph_component.GraphComponent(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/mnt/c/Users/zaz/miniconda3/envs/kfpv2/lib/python3.11/site-packages/kfp/dsl/graph_component.py", line 58, in __init__
    pipeline_outputs = pipeline_func(*args_list)
                       ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/mnt/c/Users/zaz/Projects/Python/KFP/dataset_usage.py", line 41, in dynamic_pipeline
    current_task = create_dataset_paths(name=fruit, input_dfs=[output_pool[child] for child in children])
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/mnt/c/Users/zaz/miniconda3/envs/kfpv2/lib/python3.11/site-packages/kfp/dsl/base_component.py", line 101, in __call__
    return pipeline_task.PipelineTask(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/mnt/c/Users/zaz/miniconda3/envs/kfpv2/lib/python3.11/site-packages/kfp/dsl/pipeline_task.py", line 118, in __init__
    type_utils.verify_type_compatibility(
  File "/mnt/c/Users/zaz/miniconda3/envs/kfpv2/lib/python3.11/site-packages/kfp/dsl/types/type_utils.py", line 330, in verify_type_compatibility
    raise InconsistentTypeException(error_text)
kfp.dsl.types.type_utils.InconsistentTypeException: Incompatible argument passed to the input 'input_dfs' of component 'create-dataset-paths': Argument type 'LIST' is incompatible with the input type 'List[system.Dataset@0.0.1]'

  2. Using a PipelineArtifactChannel for those components results in this error:
Traceback (most recent call last):
  File "/mnt/c/Users/zaz/Projects/Python/KFP/artifact_collections.py", line 95, in <module>
    @pipeline(name="dynamic-pipeline-example")
     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/mnt/c/Users/zaz/miniconda3/envs/kfpv2/lib/python3.11/site-packages/kfp/dsl/pipeline_context.py", line 65, in pipeline
    return component_factory.create_graph_component_from_func(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/mnt/c/Users/zaz/miniconda3/envs/kfpv2/lib/python3.11/site-packages/kfp/dsl/component_factory.py", line 673, in create_graph_component_from_func
    return graph_component.GraphComponent(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/mnt/c/Users/zaz/miniconda3/envs/kfpv2/lib/python3.11/site-packages/kfp/dsl/graph_component.py", line 68, in __init__
    pipeline_spec, platform_spec = builder.create_pipeline_spec(
                                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/mnt/c/Users/zaz/miniconda3/envs/kfpv2/lib/python3.11/site-packages/kfp/compiler/pipeline_spec_builder.py", line 1919, in create_pipeline_spec
    build_spec_by_group(
  File "/mnt/c/Users/zaz/miniconda3/envs/kfpv2/lib/python3.11/site-packages/kfp/compiler/pipeline_spec_builder.py", line 1272, in build_spec_by_group
    subgroup_task_spec = build_task_spec_for_task(
                         ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/mnt/c/Users/zaz/miniconda3/envs/kfpv2/lib/python3.11/site-packages/kfp/compiler/pipeline_spec_builder.py", line 309, in build_task_spec_for_task
    to_protobuf_value(input_value))
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/mnt/c/Users/zaz/miniconda3/envs/kfpv2/lib/python3.11/site-packages/kfp/compiler/pipeline_spec_builder.py", line 78, in to_protobuf_value
    values=[to_protobuf_value(v) for v in value]))
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/mnt/c/Users/zaz/miniconda3/envs/kfpv2/lib/python3.11/site-packages/kfp/compiler/pipeline_spec_builder.py", line 78, in <listcomp>
    values=[to_protobuf_value(v) for v in value]))
            ^^^^^^^^^^^^^^^^^^^^
  File "/mnt/c/Users/zaz/miniconda3/envs/kfpv2/lib/python3.11/site-packages/kfp/compiler/pipeline_spec_builder.py", line 80, in to_protobuf_value
    raise ValueError('Value must be one of the following types: '
ValueError: Value must be one of the following types: str, int, float, bool, dict, and list. Got: "{{channel:task=create-dataset-paths;name=orange;type=Dataset;}}" of type "<class 'kfp.dsl.pipeline_channel.PipelineArtifactChannel'>".

This ValueError is the same error I would receive when passing a dictionary input to a component, where the dict was built dynamically during client-side compilation by setting its values to the outputs of previous components. In that case, however, the type of the argument received was a PipelineParameterChannel.

When I modified kfp.compiler.pipeline_spec_builder.build_task_spec_for_task to check lists/dicts for PipelineParameterChannels and PipelineArtifactChannels, I was able to compile successfully and have a properly connected DAG render in the UI, driven by the component outputs fed in via that dictionary.
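Roughly, the check I added looks something like this (a sketch, not the exact patch; the helper name is made up, and both PipelineParameterChannel and PipelineArtifactChannel subclass PipelineChannel):

from kfp.dsl import pipeline_channel

def contains_channel_one_level_deep(value) -> bool:
    # Treat an input as channel-bearing if it is a channel itself, or if any
    # element one level deep inside a list/dict is a channel.
    if isinstance(value, pipeline_channel.PipelineChannel):
        return True
    if isinstance(value, list):
        return any(isinstance(v, pipeline_channel.PipelineChannel) for v in value)
    if isinstance(value, dict):
        return any(isinstance(v, pipeline_channel.PipelineChannel) for v in value.values())
    return False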

It seems to me that a List[Dataset] input can only be populated from a component output, by having a component return an object of that type, rather than by declaring such a list within the pipeline during compilation.

Is there an existing solution or method of collecting component outputs and feeding them into downstream components programmatically? My team uses this capability in v1 as we generate dynamic DAGs using the same pipeline definition function.

@github-actions github-actions bot removed the lifecycle/stale The issue / pull request is stale, any activities remove this label. label Aug 29, 2024
@chensun
Member

chensun commented Oct 10, 2024

PipelineParameterChannel and PipelineArtifactChannel are compile-time placeholders that will be replaced either with constant values or, more commonly, with runtime placeholders (e.g.: {{$inputs.parameters[...]}}) which can only be resolved at runtime. So the type checking you're proposing is only applicable to the constant value case.

I'm still a bit unclear on what real-world use case you're proposing to solve. Can you maybe give an example and show how you would achieve it in kfp v1?

@zazulam
Contributor Author

zazulam commented Oct 11, 2024

Thanks for the response, Chen! The main reason for all of this is that we have teams running large pipelines who built the ability to use slightly different configurations to determine which components run and which do not. These teams have pipelines where the number of components/nodes can span from 5 to 120, and at that scale, modifying the pipeline definition each time and "rewiring" the components to skip or select which ones run makes it rough on the end user. That is the root issue: large-scale pipeline users need to manipulate their workflow without rewiring their DAG manually.

Since v1 did not have the sub-DAG feature, the team determined that they could reuse their existing yaml structure and essentially generate the pipeline definition dynamically, leveraging the placeholder values to connect the DAG for them at runtime, without having to statically type out each component instantiation in the pipeline function. The fact that they can funnel the outputs of varying components into a collection and then feed that collection into a downstream node in v1 is what makes generating the DAG structure possible.

However, that ability is not available in v2.

I'll share an example of how a team does this in v1:

Here is an example of the yaml structure that would be used.

operation_a:
 operator_name: operator_a
 required_args:
  arg1: value1
  arg2: value2
 operator_inputs:
  {}

operation_b:
 operator_name: operator_b
 required_args:
  arg1: value1
  arg2: value2
 operator_inputs:
  input_x:
   operation_c: output_key

operation_c:
 operator_name: operator_c
 required_args:
  arg1: value1
  arg2: value2
 operator_inputs:
  input_x:
   operation_d: output_key

operation_d:
 operator_name: operator_d
 required_args: {}
 operator_inputs:
  input_x:
   operation_a: output_key

operation_e:
 operator_name: operator_e
 required_args:
  arg1: value1
  arg2: value2
 operator_inputs:
  input_x:
   operation_d: output_key
   operation_b: output_key

Here is an example of parsing an OrderedDict, workflow_components, built from that yaml after performing a topological sort on it (a sketch of that sort step follows the pipeline code below).

import kfp
from kfp import Client

# workflow_components is the topologically sorted OrderedDict parsed from the yaml above.

@kfp.dsl.pipeline(name='operation flow')
def sample_pipeline():

    comp_outputs = {}
    for comp in workflow_components.keys():
        comp_details = workflow_components[comp]

        # simple example of checking for starting nodes in the dag
        if len(comp_details["operator_inputs"]) == 0:
            operator_task = operator_component(operator_name=comp, required_args=comp_details["required_args"])
        else:
            op_inputs = {}

            for key, value in comp_details["operator_inputs"].items():
                op = list(value.keys())[0]
                op_inputs[key] = comp_outputs[op]["output_key"]

            operator_task = operator_component(operator_name=comp, required_args=comp_details["required_args"], operator_inputs=op_inputs)

        operator_task.set_display_name(comp)
        comp_outputs[comp] = operator_task.outputs

kfp.compiler.Compiler().compile(sample_pipeline, 'sample_pipeline.yaml')

kfp_client = Client()
run = kfp_client.create_run_from_pipeline_func(
    sample_pipeline,
    arguments={},
)
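For reference, the workflow_components OrderedDict above can be produced with something like this (a sketch; it assumes the yaml is saved as workflow.yaml, a hypothetical filename, and Python 3.9+ for graphlib):

from collections import OrderedDict
from graphlib import TopologicalSorter

import yaml

with open('workflow.yaml') as f:  # hypothetical filename
    operations = yaml.safe_load(f)

# Map each operation to the upstream operations whose outputs it consumes.
graph = {
    name: [dep for deps in details['operator_inputs'].values() for dep in deps]
    for name, details in operations.items()
}

# Order operations so that producers always come before their consumers.
workflow_components = OrderedDict(
    (name, operations[name]) for name in TopologicalSorter(graph).static_order()
)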

[image: rendered DAG for the compiled sample pipeline in the KFP UI]

@zazulam
Contributor Author

zazulam commented Oct 11, 2024

Here is also a snip of the dag spec in the argo wf manifest:

  templates:
  - dag:
      tasks:
      - arguments: {}
        name: operator-comp
        template: operator-comp
      - arguments:
          parameters:
          - name: operator-comp-output_key
            value: '{{tasks.operator-comp.outputs.parameters.operator-comp-output_key}}'
        dependencies:
        - operator-comp
        name: operator-comp-2
        template: operator-comp-2
      - arguments:
          parameters:
          - name: operator-comp-2-output_key
            value: '{{tasks.operator-comp-2.outputs.parameters.operator-comp-2-output_key}}'
        dependencies:
        - operator-comp-2
        name: operator-comp-3
        template: operator-comp-3
      - arguments:
          parameters:
          - name: operator-comp-3-output_key
            value: '{{tasks.operator-comp-3.outputs.parameters.operator-comp-3-output_key}}'
        dependencies:
        - operator-comp-3
        name: operator-comp-4
        template: operator-comp-4
      - arguments:
          parameters:
          - name: operator-comp-2-output_key
            value: '{{tasks.operator-comp-2.outputs.parameters.operator-comp-2-output_key}}'
          - name: operator-comp-4-output_key
            value: '{{tasks.operator-comp-4.outputs.parameters.operator-comp-4-output_key}}'
        dependencies:
        - operator-comp-2
        - operator-comp-4
        name: operator-comp-5
        template: operator-comp-5

@gregsheremeta
Contributor

@zazulam your team has implemented a DSL on top of a DSL 😑

🤯 because here we have a yaml abstraction on top of python so we can write pipelines in yaml instead of python

and the python being abstracted is itself an abstraction on top of a way to write pipelines using yaml

@droctothorpe
Contributor

DSLs all the way down 🤣. It's def unorthodox (our users / data scientists can be quite...creative), but it was supported in V1, is not supported in V2, is trivial to add support for, and the work is already complete.


This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

@github-actions github-actions bot added the lifecycle/stale The issue / pull request is stale, any activities remove this label. label Dec 11, 2024
@zazulam
Contributor Author

zazulam commented Dec 11, 2024

After diving into it more, I found that changes to the PipelineSpec are needed to appropriately handle the downstream usage of the component outputs. I'll be updating the associated PR soon.

@stale stale bot removed the lifecycle/stale The issue / pull request is stale, any activities remove this label. label Dec 11, 2024
@chensun
Member

chensun commented Jan 7, 2025

The so-called dynamic pipeline is kind of a misuse of the KFP DSL. A pipeline has a static definition by the time it is sent to the API. All the "dynamic" logic shown in this sample--for loops, if/else conditions, etc.--happens locally at pipeline compilation time.

@kfp.dsl.pipeline(name='operation flow')
def sample_pipeline():

   comp_outputs = {}
   for comp in workflow_components.keys():
      comp_details = workflow_components[comp]

      # simple example of checking for starting nodes in the dag
      if len(comp_details["operator_inputs"]) == 0:
         operator_task = operator_component(operator_name=comp, required_args=comp_details["required_args"], )
      else:
         op_inputs = {}

         for key, value in comp_details["operator_inputs"].items():
            op = list(value.keys())[0]
            op_inputs[key] = comp_outputs[op]["output_key"]

         operator_task = operator_component(operator_name=comp, required_args=comp_details["required_args"], operator_inputs=op_inputs)

      operator_task.set_display_name(comp)
      comp_outputs[comp] = operator_task.outputs

I strongly suggest users avoid non-DSL Python code in pipeline definitions, as many users get confused about why their code didn't run as expected.
Likely all dynamic logic can, and possibly should, be captured within components and executed remotely at runtime.
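For example, a runtime-driven fan-out can be expressed with dsl.ParallelFor over a component's output; here is a minimal sketch (component and pipeline names are made up):

from typing import List
from kfp import dsl

@dsl.component
def list_names() -> List[str]:
    # Decide at runtime which items need to be processed.
    return ['wine', 'iris', 'cancer']

@dsl.component
def process(name: str):
    print(f'processing {name}')

@dsl.pipeline(name='runtime-fanout')
def runtime_fanout():
    names = list_names()
    # The loop is expanded by the backend at runtime, not during local compilation.
    with dsl.ParallelFor(items=names.output) as name:
        process(name=name)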
