Skip to content


Address PR feedback
Browse files Browse the repository at this point in the history
Signed-off-by: zazulam <>
Co-authored-by: droctothorpe <>
  • Loading branch information
zazulam and droctothorpe committed Oct 7, 2024
1 parent 40ccd8e commit b294d8c
Show file tree
Hide file tree
Showing 9 changed files with 305 additions and 120 deletions.
11 changes: 11 additions & 0 deletions backend/src/v2/component/launcher_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,17 @@ func (l *LauncherV2) Execute(ctx context.Context) (err error) {
glog.Infof("publish success.")
// At the end of the current task, we check the statuses of all tasks in the current DAG and update the DAG's status accordingly.
// TODO: If there's a pipeline whose only components are DAGs, this launcher logic will never run and as a result the dag status will never be updated. We need to implement a mechanism to handle this edge case.
dag, err := l.metadataClient.GetDAG(ctx, execution.GetExecution().CustomProperties["parent_dag_id"].GetIntValue())
if err != nil {
glog.Errorf("DAG Status Update: failed to get DAG: %s", err.Error())
pipeline, _ := l.metadataClient.GetPipelineFromExecution(ctx, execution.GetID())
err = l.metadataClient.UpdateDAGExecutionsState(ctx, dag, pipeline)
if err != nil {
glog.Errorf("failed to update DAG state: %s", err.Error())
executedStartedTime := time.Now().Unix()
execution, err = l.prePublish(ctx)
Expand Down
194 changes: 110 additions & 84 deletions backend/src/v2/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ func Container(ctx context.Context, opts Options, mlmd *metadata.Client, cacheCl
return execution, err
if opts.KubernetesExecutorConfig != nil {
dagTasks, err := mlmd.GetExecutionsInDAG(ctx, dag, pipeline)
dagTasks, err := mlmd.GetExecutionsInDAG(ctx, dag, pipeline, true)
if err != nil {
return execution, err
Expand Down Expand Up @@ -758,29 +758,12 @@ func DAG(ctx context.Context, opts Options, mlmd *metadata.Client) (execution *E
ecfg.NotTriggered = !execution.WillTrigger()

// Handle writing output parameters to MLMD.
outputParameters := opts.Component.GetDag().GetOutputs().GetParameters()
glog.V(4).Info("outputParameters: ", outputParameters)
ecfg.OutputParameters = make(map[string]*structpb.Value)
for name, value := range outputParameters {
outputParameterKey := value.GetValueFromParameter().GetOutputParameterKey()
producerSubTask := value.GetValueFromParameter().GetProducerSubtask()
glog.V(4).Info("outputParameterKey: ", outputParameterKey)
glog.V(4).Info("producerSubtask: ", producerSubTask)

outputParameterMap := map[string]interface{}{
"output_parameter_key": outputParameterKey,
"producer_subtask": producerSubTask,

outputParameterStruct, _ := structpb.NewValue(outputParameterMap)

ecfg.OutputParameters[name] = outputParameterStruct
ecfg.OutputParameters = opts.Component.GetDag().GetOutputs().GetParameters()
glog.V(4).Info("outputParameters: ", ecfg.OutputParameters)

// Handle writing output artifacts to MLMD.
outputArtifacts := opts.Component.GetDag().GetOutputs().GetArtifacts()
glog.V(4).Info("outputArtifacts: ", outputArtifacts)
ecfg.OutputArtifacts = outputArtifacts
ecfg.OutputArtifacts = opts.Component.GetDag().GetOutputs().GetArtifacts()
glog.V(4).Info("outputArtifacts: ", ecfg.OutputArtifacts)

if opts.Task.GetArtifactIterator() != nil {
return execution, fmt.Errorf("ArtifactIterator is not implemented")
Expand Down Expand Up @@ -1256,55 +1239,136 @@ type resolveUpstreamParametersConfig struct {
func resolveUpstreamParameters(cfg resolveUpstreamParametersConfig) error {
taskOutput := cfg.paramSpec.GetTaskOutputParameter()
glog.V(4).Info("taskOutput: ", taskOutput)
if taskOutput.GetProducerTask() == "" {
return cfg.paramError(fmt.Errorf("producer task is empty"))
producerTaskName := taskOutput.GetProducerTask()
if producerTaskName == "" {
return cfg.paramError(fmt.Errorf("producerTaskName is empty"))
if taskOutput.GetOutputParameterKey() == "" {
outputParameterKey := taskOutput.GetOutputParameterKey()
if outputParameterKey == "" {
return cfg.paramError(fmt.Errorf("output parameter key is empty"))
tasks, err := getDAGTasks(cfg.ctx, cfg.dag, cfg.pipeline, cfg.mlmd, nil)
tasks, err := cfg.mlmd.GetExecutionsInDAG(cfg.ctx, cfg.dag, cfg.pipeline, false)
if err != nil {
return cfg.paramError(err)

glog.V(4).Infof("tasks: %#v", tasks)
// The producer is the task that produces the output that we need to
// consume.
producer := tasks[taskOutput.GetProducerTask()]
outputParameterKey := taskOutput.GetOutputParameterKey()
producer, ok := tasks[producerTaskName]
if !ok {
return cfg.paramError(fmt.Errorf("producer task, %v, not in tasks", producerTaskName))
glog.V(4).Info("producer: ", producer)
currentTask := producer
currentSubTaskMaybeDAG := true
// Continue looping until we reach a sub-task that is NOT a DAG.
for currentSubTaskMaybeDAG {
glog.V(4).Info("currentTask: ", currentTask.TaskName())
_, outputParametersCustomProperty, err := currentTask.GetParameters()
if err != nil {
return err
// If the current task is a DAG:
if *currentTask.GetExecution().Type == "system.DAGExecution" {
// Since currentTask is a DAG, we need to deserialize its
// output parameter map so that we can look its
// output parameter map so that we can look up its
// corresponding producer sub-task, reassign currentTask,
// and iterate through this loop again.
var outputParametersMap map[string]string
b, err := outputParametersCustomProperty[outputParameterKey].GetStructValue().MarshalJSON()
if err != nil {
return err
outputParametersCustomProperty, ok := currentTask.GetExecution().GetCustomProperties()["parameter_producer_task"]
if !ok {
return cfg.paramError(fmt.Errorf("Task, %v, does not have a parameter_producer_task custom property", currentTask.TaskName()))
glog.V(4).Infof("outputParametersCustomProperty: %#v", outputParametersCustomProperty)

dagOutputParametersMap := make(map[string]*pipelinespec.DagOutputsSpec_DagOutputParameterSpec)
glog.V(4).Infof("outputParametersCustomProperty: %v", outputParametersCustomProperty.GetStructValue())

for name, value := range outputParametersCustomProperty.GetStructValue().GetFields() {
outputSpec := &pipelinespec.DagOutputsSpec_DagOutputParameterSpec{}
err := protojson.Unmarshal([]byte(value.GetStringValue()), outputSpec)
if err != nil {
return err
dagOutputParametersMap[name] = outputSpec

glog.V(4).Infof("Deserialized dagOutputParametersMap: %v", dagOutputParametersMap)

// Support for the 2 DagOutputParameterSpec types:
// ValueFromParameter & ValueFromOneof
var subTaskName string
switch dagOutputParametersMap[outputParameterKey].Kind.(type) {
case *pipelinespec.DagOutputsSpec_DagOutputParameterSpec_ValueFromParameter:
subTaskName = dagOutputParametersMap[outputParameterKey].GetValueFromParameter().GetProducerSubtask()
outputParameterKey = dagOutputParametersMap[outputParameterKey].GetValueFromParameter().GetOutputParameterKey()
case *pipelinespec.DagOutputsSpec_DagOutputParameterSpec_ValueFromOneof:
// When OneOf is specified in a pipeline, the output of only 1 task is consumed even though there may be more than 1 task output set. In this case we will attempt to grab the first successful task output.
paramSelectors := dagOutputParametersMap[outputParameterKey].GetValueFromOneof().GetParameterSelectors()
glog.V(4).Infof("paramSelectors: %v", paramSelectors)
// Since we have the tasks map, we can iterate through the parameterSelectors if the ProducerSubTask is not present in the task map and then assign the new OutputParameterKey only if it exists.
successfulOneOfTask := false
for !successfulOneOfTask {
for _, paramSelector := range paramSelectors {
subTaskName = paramSelector.GetProducerSubtask()
glog.V(4).Infof("subTaskName from paramSelector: %v", subTaskName)
glog.V(4).Infof("outputParameterKey from paramSelector: %v", paramSelector.GetOutputParameterKey())
if subTask, ok := tasks[subTaskName]; ok {
subTaskState := subTask.GetExecution().LastKnownState.String()
glog.V(4).Infof("subTask: %w , subTaskState: %v", subTaskName, subTaskState)
if subTaskState == "CACHED" || subTaskState == "COMPLETE" {

outputParameterKey = paramSelector.GetOutputParameterKey()
successfulOneOfTask = true
return cfg.paramError(fmt.Errorf("Processing OneOf: No successful task found"))
// if reflect.TypeOf(dagOutputParametersMap[outputParameterKey].Kind).String() == "*pipelinespec.DagOutputsSpec_DagOutputParameterSpec_ValueFromParameter" {

// } else {
// // Type of dagOutputParametersMap[outputParameterKey].Kind is *pipelinespec.DagOutputsSpec_DagOutputParameterSpec_ValueFromOneof
// paramSelectors := dagOutputParametersMap[outputParameterKey].GetValueFromOneof().GetParameterSelectors()
// glog.V(4).Infof("paramSelectors: %v", paramSelectors)
// // Since we have the tasks map, we can iterate through the parameterSelectors if the ProducerSubTask is not present in the task map and then assign the new OutputParameterKey only if it exists.
// successfulOneOfTask := false
// for !successfulOneOfTask {
// for _, paramSelector := range paramSelectors {
// subTaskName = paramSelector.GetProducerSubtask()
// glog.V(4).Infof("subTaskName from paramSelector: %v", subTaskName)
// glog.V(4).Infof("outputParameterKey from paramSelector: %v", paramSelector.GetOutputParameterKey())
// if subTask, ok := tasks[subTaskName]; ok {
// subTaskState := subTask.GetExecution().LastKnownState.String()
// glog.V(4).Infof("subTask: %w , subTaskState: %v", subTaskName, subTaskState)
// if subTaskState == "CACHED" || subTaskState == "COMPLETE" {

// outputParameterKey = paramSelector.GetOutputParameterKey()
// successfulOneOfTask = true
// break
// }
// }
// }
// return cfg.paramError(fmt.Errorf("Processing OneOf: No successful task found"))
// }
// }
glog.V(4).Infof("SubTaskName from outputParams: %v", subTaskName)
glog.V(4).Infof("OutputParameterKey from outputParams: %v", outputParameterKey)
if subTaskName == "" {
return cfg.paramError(fmt.Errorf("producer_subtask not in outputParams"))
json.Unmarshal(b, &outputParametersMap)
glog.V(4).Info("Deserialized outputParametersMap: ", outputParametersMap)
subTaskName := outputParametersMap["producer_subtask"]
outputParameterKey = outputParametersMap["output_parameter_key"]
"Overriding currentTask, %v, output with currentTask's producer_subtask, %v, output.",
currentTask, ok = tasks[subTaskName]
if !ok {
return cfg.paramError(fmt.Errorf("subTaskName, %v, not in tasks", subTaskName))

// Reassign sub-task before running through the loop again.
currentTask = tasks[subTaskName]
} else {
_, outputParametersCustomProperty, err := currentTask.GetParameters()
if err != nil {
return err
cfg.inputs.ParameterValues[] = outputParametersCustomProperty[outputParameterKey]
// Exit the loop.
currentSubTaskMaybeDAG = false
Expand Down Expand Up @@ -1340,7 +1404,7 @@ func resolveUpstreamArtifacts(cfg resolveUpstreamArtifactsConfig) error {
if taskOutput.GetOutputArtifactKey() == "" {
cfg.artifactError(fmt.Errorf("output artifact key is empty"))
tasks, err := getDAGTasks(cfg.ctx, cfg.dag, cfg.pipeline, cfg.mlmd, nil)
tasks, err := cfg.mlmd.GetExecutionsInDAG(cfg.ctx, cfg.dag, cfg.pipeline, false)
if err != nil {
Expand All @@ -1361,7 +1425,7 @@ func resolveUpstreamArtifacts(cfg resolveUpstreamArtifactsConfig) error {
// If the current task is a DAG:
if *currentTask.GetExecution().Type == "system.DAGExecution" {
// Get the sub-task.
outputArtifactsCustomProperty := currentTask.GetExecution().GetCustomProperties()["output_artifacts"]
outputArtifactsCustomProperty := currentTask.GetExecution().GetCustomProperties()["artifact_producer_task"]
// Deserialize the output artifacts.
var outputArtifacts map[string]*pipelinespec.DagOutputsSpec_DagOutputArtifactSpec
err := json.Unmarshal([]byte(outputArtifactsCustomProperty.GetStringValue()), &outputArtifacts)
Expand Down Expand Up @@ -1416,44 +1480,6 @@ func resolveUpstreamArtifacts(cfg resolveUpstreamArtifactsConfig) error {
return nil

// getDAGTasks gets all the tasks associated with the specified DAG and all of
// its subDAGs.
func getDAGTasks(
ctx context.Context,
dag *metadata.DAG,
pipeline *metadata.Pipeline,
mlmd *metadata.Client,
flattenedTasks map[string]*metadata.Execution,
) (map[string]*metadata.Execution, error) {
if flattenedTasks == nil {
flattenedTasks = make(map[string]*metadata.Execution)
currentExecutionTasks, err := mlmd.GetExecutionsInDAG(ctx, dag, pipeline)
if err != nil {
return nil, err
for k, v := range currentExecutionTasks {
flattenedTasks[k] = v
for _, v := range currentExecutionTasks {
if v.GetExecution().GetType() == "system.DAGExecution" {
glog.V(4).Infof("Found a task, %v, with an execution type of system.DAGExecution. Adding its tasks to the task list.", v.TaskName())
subDAG, err := mlmd.GetDAG(ctx, v.GetExecution().GetId())
if err != nil {
return nil, err
// Pass the subDAG into a recursive call to getDAGTasks and update
// tasks to include the subDAG's tasks.
flattenedTasks, err = getDAGTasks(ctx, subDAG, pipeline, mlmd, flattenedTasks)
if err != nil {
return nil, err

return flattenedTasks, nil

func provisionOutputs(pipelineRoot, taskName string, outputsSpec *pipelinespec.ComponentOutputsSpec, outputUriSalt string) *pipelinespec.ExecutorInput_Outputs {
outputs := &pipelinespec.ExecutorInput_Outputs{
Artifacts: make(map[string]*pipelinespec.ArtifactList),
Expand Down

0 comments on commit b294d8c

Please sign in to comment.