Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[flytepropeller][flyteadmin] Streaming Decks V2 #6053

Open
wants to merge 28 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
54aa165
add tests from Yi-Cheng
Future-Outlier Nov 27, 2024
9ed6b6e
helped by Kevin and Yi-Cheng
Future-Outlier Nov 27, 2024
4b4f6bd
lint
Future-Outlier Nov 27, 2024
dd774cb
nit
Future-Outlier Nov 28, 2024
0bb8e91
add comments
Future-Outlier Dec 13, 2024
25fea29
add comments and better solution for backward compativle
Future-Outlier Dec 17, 2024
4e24e91
better comments
Future-Outlier Dec 17, 2024
8d1d0e4
DeckStatus
Future-Outlier Dec 18, 2024
31853bb
rename GetDeckStatus
Future-Outlier Dec 18, 2024
4068043
comments
Future-Outlier Dec 18, 2024
65b6efe
lint
Future-Outlier Jan 2, 2025
137579f
fix
Future-Outlier Jan 9, 2025
04f7fbc
Merge branch 'master' into streaming-deck-v2
Future-Outlier Jan 9, 2025
aa56d64
Merge branch 'master' into streaming-deck-v2
Future-Outlier Jan 13, 2025
a16851f
use BoolValue as IDL, suggested by Eduardo
Future-Outlier Jan 13, 2025
7314455
change commennts
Future-Outlier Jan 13, 2025
19498f5
update
Future-Outlier Jan 13, 2025
74f595f
fix
Future-Outlier Jan 13, 2025
3bd3336
fix
Future-Outlier Jan 14, 2025
f6d8493
Merge branch 'master' into streaming-deck-v2
Future-Outlier Jan 14, 2025
4b56e52
fix
Future-Outlier Jan 14, 2025
db4b19e
remove unused ogic
Future-Outlier Jan 14, 2025
2737251
Update flyteidl/protos/flyteidl/core/tasks.proto
Future-Outlier Jan 16, 2025
564dc5f
Update flyteidl/protos/flyteidl/core/tasks.proto
Future-Outlier Jan 16, 2025
69ba94e
Merge remote-tracking branch 'origin' into streaming-deck-v2
eapolinario Jan 16, 2025
c992eae
Merge branch 'master' into streaming-deck-v2
Future-Outlier Jan 17, 2025
0b91b5c
Update by Kevin's advice
Future-Outlier Jan 17, 2025
1d18265
update
Future-Outlier Jan 17, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions flyteadmin/pkg/repositories/transformers/node_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func addNodeRunningState(request *admin.NodeExecutionEventRequest, nodeExecution
"failed to marshal occurredAt into a timestamp proto with error: %v", err)
}
closure.StartedAt = startedAtProto
closure.DeckUri = request.GetEvent().GetDeckUri()
return nil
}

Expand Down
35 changes: 35 additions & 0 deletions flyteadmin/pkg/repositories/transformers/node_execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ var childExecutionID = &core.WorkflowExecutionIdentifier{
const dynamicWorkflowClosureRef = "s3://bucket/admin/metadata/workflow"

const testInputURI = "fake://bucket/inputs.pb"
const DeckURI = "fake://bucket/deck.html"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a test here showing that the deck uri also shows up in the case of failed executions?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


var testInputs = &core.LiteralMap{
Literals: map[string]*core.Literal{
Expand All @@ -65,6 +66,7 @@ func TestAddRunningState(t *testing.T) {
Event: &event.NodeExecutionEvent{
Phase: core.NodeExecution_RUNNING,
OccurredAt: startedAtProto,
DeckUri: DeckURI,
},
}
nodeExecutionModel := models.NodeExecution{}
Expand All @@ -73,6 +75,7 @@ func TestAddRunningState(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, startedAt, *nodeExecutionModel.StartedAt)
assert.True(t, proto.Equal(startedAtProto, closure.GetStartedAt()))
assert.Equal(t, DeckURI, closure.GetDeckUri())
}

func TestAddTerminalState_OutputURI(t *testing.T) {
Expand All @@ -84,6 +87,7 @@ func TestAddTerminalState_OutputURI(t *testing.T) {
OutputUri: outputURI,
},
OccurredAt: occurredAtProto,
DeckUri: DeckURI,
},
}
startedAt := occurredAt.Add(-time.Minute)
Expand All @@ -99,6 +103,7 @@ func TestAddTerminalState_OutputURI(t *testing.T) {
assert.Nil(t, err)
assert.EqualValues(t, outputURI, closure.GetOutputUri())
assert.Equal(t, time.Minute, nodeExecutionModel.Duration)
assert.Equal(t, DeckURI, closure.GetDeckUri())
}

func TestAddTerminalState_OutputData(t *testing.T) {
Expand Down Expand Up @@ -193,6 +198,36 @@ func TestAddTerminalState_Error(t *testing.T) {
assert.Equal(t, time.Minute, nodeExecutionModel.Duration)
}

func TestAddTerminalState_DeckURIInFailedExecution(t *testing.T) {
error := &core.ExecutionError{
Code: "foo",
}
request := admin.NodeExecutionEventRequest{
Event: &event.NodeExecutionEvent{
Phase: core.NodeExecution_FAILED,
OutputResult: &event.NodeExecutionEvent_Error{
Error: error,
Comment on lines +202 to +209
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Variable name shadows built-in error type

Consider using error as a variable name may shadow the built-in error type. Consider renaming to execError or similar.

Code suggestion
Check the AI-generated fix before applying
 -	error := &core.ExecutionError{
 +	execError := &core.ExecutionError{
 		Code: "foo",
 	}
 -			Error: error,
 +			Error: execError,

Code Review Run #dc455e


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

},
OccurredAt: occurredAtProto,
DeckUri: DeckURI,
},
}
startedAt := occurredAt.Add(-time.Minute)
startedAtProto, _ := ptypes.TimestampProto(startedAt)
nodeExecutionModel := models.NodeExecution{
StartedAt: &startedAt,
}
closure := admin.NodeExecutionClosure{
StartedAt: startedAtProto,
}
err := addTerminalState(context.TODO(), &request, &nodeExecutionModel, &closure,
interfaces.InlineEventDataPolicyStoreInline, commonMocks.GetMockStorageClient())
assert.Nil(t, err)
assert.True(t, proto.Equal(error, closure.GetError()))
assert.Equal(t, time.Minute, nodeExecutionModel.Duration)
assert.Equal(t, DeckURI, closure.GetDeckUri())
}

func TestCreateNodeExecutionModel(t *testing.T) {
parentTaskExecID := uint(8)
request := &admin.NodeExecutionEventRequest{
Expand Down
127 changes: 108 additions & 19 deletions flytepropeller/pkg/controller/nodes/task/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,14 @@

const pluginContextKey = contextutils.Key("plugin")

type DeckStatus int

const (
DeckUnknown DeckStatus = iota
DeckEnabled
DeckDisabled
)

type metrics struct {
pluginPanics labeled.Counter
unsupportedTaskType labeled.Counter
Expand Down Expand Up @@ -71,10 +79,42 @@
return taskType + "_" + pluginID
}

func (p *pluginRequestedTransition) CacheHit(outputPath storage.DataReference, deckPath *storage.DataReference, entry catalog.Entry) {
func (p *pluginRequestedTransition) AddDeckURI(tCtx *taskExecutionContext) {
var deckURI *storage.DataReference
deckURIValue := tCtx.ow.GetDeckPath()
deckURI = &deckURIValue

if p.execInfo.OutputInfo == nil {
p.execInfo.OutputInfo = &handler.OutputInfo{}
}

p.execInfo.OutputInfo.DeckURI = deckURI
}

func (p *pluginRequestedTransition) RemoveDeckURIIfDeckNotExists(ctx context.Context, tCtx *taskExecutionContext) error {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider clearer function name

The function name RemoveDeckURIIfDeckNotExists seems to have a double negative and may be confusing. Consider renaming to ClearDeckURIIfMissing or similar for better clarity.

Code suggestion
Check the AI-generated fix before applying
Suggested change
func (p *pluginRequestedTransition) RemoveDeckURIIfDeckNotExists(ctx context.Context, tCtx *taskExecutionContext) error {
func (p *pluginRequestedTransition) ClearDeckURIIfMissing(ctx context.Context, tCtx *taskExecutionContext) error {

Code Review Run #dc455e


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

reader := tCtx.ow.GetReader()
if reader == nil {
return nil
}

Check warning on line 98 in flytepropeller/pkg/controller/nodes/task/handler.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/task/handler.go#L97-L98

Added lines #L97 - L98 were not covered by tests

exists, err := reader.DeckExists(ctx)
if err != nil {
logger.Errorf(ctx, "Failed to check deck file existence. Error: %v", err)
p.execInfo.OutputInfo.DeckURI = nil
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider nil check for OutputInfo

Consider initializing OutputInfo if it's nil before attempting to set DeckURI to avoid potential nil pointer dereference in RemoveDeckURIIfDeckNotExists

Code suggestion
Check the AI-generated fix before applying
 @@ -102,2 +102,5 @@
 		logger.Errorf(ctx, "Failed to check deck file existence. Error: %v", err)
 +		if p.execInfo.OutputInfo == nil {
 +			p.execInfo.OutputInfo = &handler.OutputInfo{}
 +		}
 		p.execInfo.OutputInfo.DeckURI = nil

Code Review Run #dc455e


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

return regErrors.Wrapf(err, "failed to check existence of deck file")
}

Check warning on line 105 in flytepropeller/pkg/controller/nodes/task/handler.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/task/handler.go#L102-L105

Added lines #L102 - L105 were not covered by tests

if !exists && p.execInfo.OutputInfo != nil {
p.execInfo.OutputInfo.DeckURI = nil
}

return nil
}

func (p *pluginRequestedTransition) CacheHit(outputPath storage.DataReference, entry catalog.Entry) {

Check warning on line 114 in flytepropeller/pkg/controller/nodes/task/handler.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/task/handler.go#L114

Added line #L114 was not covered by tests
eapolinario marked this conversation as resolved.
Show resolved Hide resolved
p.ttype = handler.TransitionTypeEphemeral
p.pInfo = pluginCore.PhaseInfoSuccess(nil)
p.ObserveSuccess(outputPath, deckPath, &event.TaskNodeMetadata{CacheStatus: entry.GetStatus().GetCacheStatus(), CatalogKey: entry.GetStatus().GetMetadata()})
p.ObserveSuccess(outputPath, &event.TaskNodeMetadata{CacheStatus: entry.GetStatus().GetCacheStatus(), CatalogKey: entry.GetStatus().GetMetadata()})

Check warning on line 117 in flytepropeller/pkg/controller/nodes/task/handler.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/task/handler.go#L117

Added line #L117 was not covered by tests
}

func (p *pluginRequestedTransition) PopulateCacheInfo(entry catalog.Entry) {
Expand Down Expand Up @@ -144,10 +184,13 @@
return ToTaskExecutionEvent(input)
}

func (p *pluginRequestedTransition) ObserveSuccess(outputPath storage.DataReference, deckPath *storage.DataReference, taskMetadata *event.TaskNodeMetadata) {
p.execInfo.OutputInfo = &handler.OutputInfo{
OutputURI: outputPath,
DeckURI: deckPath,
func (p *pluginRequestedTransition) ObserveSuccess(outputPath storage.DataReference, taskMetadata *event.TaskNodeMetadata) {
if p.execInfo.OutputInfo == nil {
p.execInfo.OutputInfo = &handler.OutputInfo{
OutputURI: outputPath,
}

Check warning on line 191 in flytepropeller/pkg/controller/nodes/task/handler.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/task/handler.go#L189-L191

Added lines #L189 - L191 were not covered by tests
} else {
p.execInfo.OutputInfo.OutputURI = outputPath
Comment on lines +187 to +193
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider validating OutputURI before update

Consider checking if the OutputURI is empty before updating it. The current implementation may overwrite an existing valid output URI with an empty one.

Code suggestion
Check the AI-generated fix before applying
 -	if p.execInfo.OutputInfo == nil {
 +	if p.execInfo.OutputInfo == nil && len(outputPath) > 0 {
  		p.execInfo.OutputInfo = &handler.OutputInfo{
  			OutputURI: outputPath,
  		}
  	} else {
 -		p.execInfo.OutputInfo.OutputURI = outputPath
 +		if len(outputPath) > 0 {
 +			p.execInfo.OutputInfo.OutputURI = outputPath
 +		}
  	}

Code Review Run #ecb1b4


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

}

p.execInfo.TaskNodeInfo = &handler.TaskNodeInfo{
Expand All @@ -171,7 +214,8 @@
}

logger.Debugf(ctx, "Task still running")
return handler.DoTransition(p.ttype, handler.PhaseInfoRunning(nil)), nil
// Here will send the deck uri to flyteadmin
return handler.DoTransition(p.ttype, handler.PhaseInfoRunning(&p.execInfo)), nil
}

// The plugin interface available especially for testing.
Expand Down Expand Up @@ -380,6 +424,40 @@
return t.taskMetricsMap[metricNameKey], nil
}

func GetDeckStatus(ctx context.Context, tCtx *taskExecutionContext) (DeckStatus, error) {
// GetDeckStatus determines whether a task generates a deck based on its execution context.
//
// This function evaluates the current condition of the task to determine the deck status:
//
// | Condition Description | Has Deck |
// |--------------------------------|----------|
// | Enabled and Running | Yes |
// | Unknown State with Deck | Yes |
// | Unknown State without Deck | No |
// | Enabled and Succeeded | Yes |
// | Enabled but Memory Exceeded | No |
// | Disabled | No |
//
// The lifecycle of deck generation is as follows:
// - During task execution, the condition is checked to determine if a deck should be generated.
// - In terminal states, if the status is DeckUnknown or DeckEnabled, a HEAD request can be made to verify the existence of the deck file.
template, err := tCtx.tr.Read(ctx)
if err != nil {
return DeckUnknown, regErrors.Wrapf(err, "failed to read task template")
}

Check warning on line 447 in flytepropeller/pkg/controller/nodes/task/handler.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/task/handler.go#L446-L447

Added lines #L446 - L447 were not covered by tests

deckValue := template.GetMetadata().GetGeneratesDeck()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add nil check for metadata access

Consider adding error handling for the case when GetMetadata() returns nil. Currently, if template.GetMetadata() returns nil, the code will panic when calling GetGeneratesDeck() on a nil pointer.

Code suggestion
Check the AI-generated fix before applying
Suggested change
deckValue := template.GetMetadata().GetGeneratesDeck()
metadata := template.GetMetadata()
if metadata == nil {
return DeckUnknown, nil
}
deckValue := metadata.GetGeneratesDeck()

Code Review Run #9d753e


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

if deckValue == nil {
return DeckUnknown, nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this correct in older versions of flytekit? didn't tasks in the past also have this field? this means that this function will always return Disabled right for older versions of flytekit. meaning the condition on line 567 won't get triggered cuz it'll be disabled instead of unknown.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh yes, you are right, thinking solution.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

older versions of flytekit will return DeckUnknown, which ends up calling AddDeckURIIfDeckExists (which is the old code path where we would hit the blob store to check for the deck file before adding it to the events).

}

if deckValue.GetValue() {
return DeckEnabled, nil
}

Check warning on line 456 in flytepropeller/pkg/controller/nodes/task/handler.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/task/handler.go#L454-L456

Added lines #L454 - L456 were not covered by tests

return DeckDisabled, nil

Check warning on line 458 in flytepropeller/pkg/controller/nodes/task/handler.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/task/handler.go#L458

Added line #L458 was not covered by tests
}

func (t Handler) invokePlugin(ctx context.Context, p pluginCore.Plugin, tCtx *taskExecutionContext, ts handler.TaskNodeState) (*pluginRequestedTransition, error) {
pluginTrns := &pluginRequestedTransition{}

Expand Down Expand Up @@ -464,8 +542,30 @@
}
}

// Regardless of the observed phase, we always add the DeckUri to support real-time deck functionality.
// The deck should be accessible even if the task is still running or has failed.
deckStatus, err := GetDeckStatus(ctx, tCtx)
if err != nil {
return nil, err
}

Check warning on line 550 in flytepropeller/pkg/controller/nodes/task/handler.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/task/handler.go#L549-L550

Added lines #L549 - L550 were not covered by tests

if deckStatus == DeckEnabled {
pluginTrns.AddDeckURI(tCtx)
}

Check warning on line 554 in flytepropeller/pkg/controller/nodes/task/handler.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/task/handler.go#L553-L554

Added lines #L553 - L554 were not covered by tests

defer func() {
if (deckStatus == DeckUnknown || deckStatus == DeckEnabled) && pluginTrns.pInfo.Phase().IsTerminal() {
if err := pluginTrns.RemoveDeckURIIfDeckNotExists(ctx, tCtx); err != nil {
logger.Errorf(ctx, "Failed to remove deck URI if deck does not exist. Error: %v", err)
}

Check warning on line 560 in flytepropeller/pkg/controller/nodes/task/handler.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/task/handler.go#L559-L560

Added lines #L559 - L560 were not covered by tests
}
}()
Comment on lines +557 to +562
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider proper error handling for RemoveDeckURIIfDeckNotExists

Consider checking for errors from RemoveDeckURIIfDeckNotExists before proceeding with the task completion. The current implementation only logs the error but continues execution which could lead to inconsistent state.

Code suggestion
Check the AI-generated fix before applying
Suggested change
if (deckStatus == DeckUnknown || deckStatus == DeckEnabled) && pluginTrns.pInfo.Phase().IsTerminal() {
if err := pluginTrns.RemoveDeckURIIfDeckNotExists(ctx, tCtx); err != nil {
logger.Errorf(ctx, "Failed to remove deck URI if deck does not exist. Error: %v", err)
}
}
}()
if (deckStatus == DeckUnknown || deckStatus == DeckEnabled) && pluginTrns.pInfo.Phase().IsTerminal() {
if err := pluginTrns.RemoveDeckURIIfDeckNotExists(ctx, tCtx); err != nil {
logger.Errorf(ctx, "Failed to remove deck URI if deck does not exist. Error: %v", err)
// Return error to allow proper handling at higher levels
return pluginTrns, err
}
}
}
}()

Code Review Run #dc455e


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

Comment on lines +556 to +562
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider extracting deck cleanup logic

Consider moving the deck URI cleanup logic to a separate function for better code organization and reusability. The deferred function could be simplified by extracting the logic into a named function.

Code suggestion
Check the AI-generated fix before applying
Suggested change
defer func() {
if (deckStatus == DeckUnknown || deckStatus == DeckEnabled) && pluginTrns.pInfo.Phase().IsTerminal() {
if err := pluginTrns.RemoveDeckURIIfDeckNotExists(ctx, tCtx); err != nil {
logger.Errorf(ctx, "Failed to remove deck URI if deck does not exist. Error: %v", err)
}
}
}()
defer cleanupDeckURI(ctx, tCtx, deckStatus, pluginTrns)
func cleanupDeckURI(ctx context.Context, tCtx *taskExecutionContext, deckStatus DeckStatus, pluginTrns *pluginRequestedTransition) {
if (deckStatus == DeckUnknown || deckStatus == DeckEnabled) && pluginTrns.pInfo.Phase().IsTerminal() {
if err := pluginTrns.RemoveDeckURIIfDeckNotExists(ctx, tCtx); err != nil {
logger.Errorf(ctx, "Failed to remove deck URI if deck does not exist. Error: %v", err)
}
}
}

Code Review Run #dc455e


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged


switch pluginTrns.pInfo.Phase() {
case pluginCore.PhaseSuccess:
if deckStatus == DeckUnknown {
pluginTrns.AddDeckURI(tCtx)
}
// -------------------------------------
// TODO: @kumare create Issue# Remove the code after we use closures to handle dynamic nodes
// This code only exists to support Dynamic tasks. Eventually dynamic tasks will use closure nodes to execute
Expand Down Expand Up @@ -501,18 +601,7 @@
CheckpointUri: tCtx.ow.GetCheckpointPrefix().String(),
})
} else {
var deckURI *storage.DataReference
if tCtx.ow.GetReader() != nil {
exists, err := tCtx.ow.GetReader().DeckExists(ctx)
if err != nil {
logger.Errorf(ctx, "Failed to check deck file existence. Error: %v", err)
return pluginTrns, regErrors.Wrapf(err, "failed to check existence of deck file")
} else if exists {
deckURIValue := tCtx.ow.GetDeckPath()
deckURI = &deckURIValue
}
}
pluginTrns.ObserveSuccess(tCtx.ow.GetOutputPath(), deckURI,
pluginTrns.ObserveSuccess(tCtx.ow.GetOutputPath(),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider impact of method signature change

The ObserveSuccess method signature has been modified to remove the deckURI parameter, but the deck URI is now being added through separate methods. Consider if this change maintains backward compatibility with existing code.

Code suggestion
Check the AI-generated fix before applying
Suggested change
pluginTrns.ObserveSuccess(tCtx.ow.GetOutputPath(),
// TODO: Deprecated - deckURI parameter will be removed in future versions
pluginTrns.ObserveSuccess(tCtx.ow.GetOutputPath(), nil,

Code Review Run #ecb1b4


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

&event.TaskNodeMetadata{
CheckpointUri: tCtx.ow.GetCheckpointPrefix().String(),
})
Expand Down
Loading