[backend] dsl.ParallelFor loop: cannot resolve the upstream artifact output of a previous pod #11520

Open
zeidsolh opened this issue Jan 15, 2025 · 9 comments · May be fixed by #11627

zeidsolh commented Jan 15, 2025

Environment

  • KFP version: master (which I assume is a predecessor to 2.4.0)
  • KFP SDK version: 2.11.0

Steps to reproduce

Here is a simple pipeline that produces this error:

import kfp
import kfp.kubernetes  # requires the kfp-kubernetes package; unused in this minimal repro
from kfp import dsl
from kfp.dsl import Artifact, Input, Output


@dsl.component(base_image="python:3.10")
def split_model_ids(model_ids: str) -> list:
    return model_ids.split(",")

@dsl.component(base_image="python:3.10")
def create_file(file: Output[Artifact], content: str):
    with open(file.path, "w") as f:
        f.write(content)

@dsl.component(base_image="python:3.10")
def read_file(file: Input[Artifact]) -> str:
    with open(file.path, "r") as f:
        content = f.read()
    print(content)
    return content  # the signature declares -> str, so return what was read

@dsl.pipeline(name="Pipeline", description="Pipeline")
def export_model(
    model_ids: str = "",
):
    model_ids_split_op = split_model_ids(model_ids=model_ids)
    with dsl.ParallelFor(model_ids_split_op.output) as model_id:
        create_file_op = create_file(content=model_id)
        read_file_op = read_file(file=create_file_op.outputs["file"])
        read_file_op.after(create_file_op)  # redundant: the artifact input already orders these tasks

if __name__ == "__main__":
    kfp.compiler.Compiler().compile(export_model, "simple_pipeline.yaml")
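
For reference, the compiled YAML can then be submitted with the KFP client. A minimal sketch; the host and argument values below are placeholders:

import kfp

client = kfp.Client(host="http://localhost:8080")  # placeholder endpoint
client.create_run_from_pipeline_package(
    "simple_pipeline.yaml",
    arguments={"model_ids": "zs1,zs2"},  # example comma-separated model IDs
)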

Expected result

The pipeline should compile and run, with read_file consuming the artifact produced by create_file in each loop iteration. Instead, recent changes to ParallelFor handling on the master branch cause the new KFP driver to crash: inside a dsl.ParallelFor loop, it cannot resolve the upstream artifact output of a previous pod.

Materials and Reference

Running the pipeline gives this error (the driver panics with a nil pointer dereference in resolveUpstreamArtifacts at driver.go:1450):

I0107 16:08:18.259327      22 driver.go:984] parent DAG input parameters: map[pipelinechannel--split-model-ids-Output-loop-item:string_value:"zs1"], artifacts: map[]
panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x40 pc=0x1f1cd39]

goroutine 1 [running]:
github.com/kubeflow/pipelines/backend/src/v2/driver.resolveUpstreamArtifacts({{0x2c074f0, 0x4148aa0}, 0x0, 0xc000809e40, 0xc000c71200, 0xc00012df00, 0xc000a718f0, 0xc000befbc0, {0xc00088dde0, 0x9}, ...})
    /go/src/github.com/kubeflow/pipelines/backend/src/v2/driver/driver.go:1450 +0x4b9
github.com/kubeflow/pipelines/backend/src/v2/driver.resolveInputs({0x2c074f0, 0x4148aa0}, 0xc000c71200, 0x0, 0xc00012df00, 0xc000301b90, 0xc000809840, 0xc000a718f0, 0xc000ba77f0)
    /go/src/github.com/kubeflow/pipelines/backend/src/v2/driver/driver.go:1221 +0x1b58
github.com/kubeflow/pipelines/backend/src/v2/driver.Container({0x2c074f0, 0x4148aa0}, {{0x7ffe76d8c3f0, 0x16}, {0x7ffe76d8c410, 0x24}, 0xc0009bf310, 0xffffffffffffffff, 0x0, {0xc0009a22a0, ...}, ...}, ...)
    /go/src/github.com/kubeflow/pipelines/backend/src/v2/driver/driver.go:263 +0x3c9
main.drive()
    /go/src/github.com/kubeflow/pipelines/backend/src/v2/cmd/driver/main.go:174 +0xb1c
main.main()
    /go/src/github.com/kubeflow/pipelines/backend/src/v2/cmd/driver/main.go:77 +0x65
time="2025-01-07T16:08:18.862Z" level=info msg="sub-process exited" argo=true error="<nil>"
time="2025-01-07T16:08:18.862Z" level=error msg="cannot save parameter /tmp/outputs/pod-spec-patch" argo=true error="open /tmp/outputs/pod-spec-patch: no such file or directory"
time="2025-01-07T16:08:18.862Z" level=error msg="cannot save parameter /tmp/outputs/cached-decision" argo=true error="open /tmp/outputs/cached-decision: no such file or directory"
time="2025-01-07T16:08:18.862Z" level=error msg="cannot save parameter /tmp/outputs/condition" argo=true error="open /tmp/outputs/condition: no such file or directory"
Error: exit status 2
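
A possible interim workaround, assuming the crash is specific to the artifact-resolution path (the panic is in resolveUpstreamArtifacts) and that string parameters are resolved through a different code path: pass the file content between loop tasks as a parameter rather than an artifact. The components below are a hypothetical sketch, untested against the affected build:

@dsl.component(base_image="python:3.10")
def create_content(content: str) -> str:
    # Emit the content as a plain string parameter rather than an
    # Output[Artifact], so the downstream task resolves a parameter
    # instead of an upstream artifact.
    return content

@dsl.component(base_image="python:3.10")
def read_content(content: str):
    print(content)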

Thank you so much!

Impacted by this bug? Give it a 👍.

@zeidsolh zeidsolh changed the title [backend] <Bug Name> [backend] dsl.ParallelFor loop: cannot resolve the upstream artifact output of a previous pod Jan 15, 2025
@HumairAK HumairAK added this to the KFP 2.5.0 milestone Jan 15, 2025
@HumairAK (Collaborator):

related commits:
c5b787a
b7d8c97

cc @droctothorpe / @gmfrasca FYI you may have more insights here

@droctothorpe (Contributor):

@zazulam fyi

@droctothorpe (Contributor):

If you're building from master anyway, @zeidsolh, can you check if the error is reproducible for the following two commit SHAs on the master branch?

  1. c5b787aacc4fddeeb1ebc526a83159540cd7b311
  2. f2fead51cc0b8d9f2df2218f989e0de2faa4f142

Thanks!

@zeidsolh (Author):

Hey @droctothorpe, sorry for the delay in getting back to you. This commit worked: f2fead5.

How could I get this fix into KFP 2.4.0? Do you have any suggestions? Will it be part of 2.5.0?

c5b787a did not work.

Thank you so much!

@zeidsolh (Author):

@droctothorpe I am asking how to merge because commit f2fead5 appears to only fix the spelling of two print statements, so I am unsure what I actually need to fix/change in my code (KFP 2.4.0).

@droctothorpe (Contributor):

Thanks for testing, @zeidsolh. Based on your results, I think it's safe to say that c5b787a caused the regression. There must not be a large test that validates ParallelFor output consumption; otherwise this would have been caught by CI. My apologies for not including it in the manual validation.

@zazulam, does the PR you've been working on impact this? If not, I'll prioritize a fix (and corresponding large test) for this ASAP.
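
If it helps, a large test for this could simply run the repro pipeline end to end and assert the run succeeds. A minimal sketch, assuming a standard kfp.Client against a test cluster (the repo's actual large-test harness may differ; the host and timeout are placeholders):

import kfp

def test_parallelfor_artifact_consumption():
    # Before the fix, the driver pod panics in resolveUpstreamArtifacts
    # and the run fails; after the fix, the run should succeed.
    client = kfp.Client(host="http://localhost:8080")  # placeholder endpoint
    result = client.create_run_from_pipeline_func(
        export_model,  # the repro pipeline from the issue body
        arguments={"model_ids": "m1,m2"},
    )
    run = client.wait_for_run_completion(result.run_id, timeout=1200)
    assert run.state == "SUCCEEDED"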

@zeidsolh (Author):

Thank you so much @droctothorpe !

@zazulam (Contributor) commented Feb 12, 2025:

@droctothorpe I believe so. The work I'm doing related to #10050 involves the downstream consumption of ParallelFor outputs, but I think both consumption and production of ParallelFor task inputs/outputs should be handled with a similar design. I plan on fleshing out that PR within this sprint.

@zazulam (Contributor) commented Feb 13, 2025:

/assign @zazulam

@zazulam zazulam linked a pull request Feb 13, 2025 that will close this issue
@zazulam zazulam moved this to In Progress in KFP 2.x Release Feb 14, 2025