Skip to content

Commit d21fca6

Browse files
authored
fix(backend): Synced ScheduledWorkflow CRs on apiserver startup (#11469)
Signed-off-by: Helber Belmiro <helber.belmiro@gmail.com>
1 parent 2686e01 commit d21fca6

File tree

6 files changed

+145
-7
lines changed

6 files changed

+145
-7
lines changed

backend/src/apiserver/client/scheduled_workflow_fake.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,9 +66,9 @@ func (c *FakeScheduledWorkflowClient) Get(ctx context.Context, name string, opti
6666
return nil, k8errors.NewNotFound(k8schema.ParseGroupResource("scheduledworkflows.kubeflow.org"), name)
6767
}
6868

69-
func (c *FakeScheduledWorkflowClient) Update(context.Context, *v1beta1.ScheduledWorkflow) (*v1beta1.ScheduledWorkflow, error) {
70-
glog.Error("This fake method is not yet implemented.")
71-
return nil, nil
69+
func (c *FakeScheduledWorkflowClient) Update(_ context.Context, scheduledWorkflow *v1beta1.ScheduledWorkflow) (*v1beta1.ScheduledWorkflow, error) {
70+
c.scheduledWorkflows[scheduledWorkflow.Name] = scheduledWorkflow
71+
return scheduledWorkflow, nil
7272
}
7373

7474
func (c *FakeScheduledWorkflowClient) DeleteCollection(ctx context.Context, options *v1.DeleteOptions, listOptions v1.ListOptions) error {

backend/src/apiserver/list/list.go

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"encoding/base64"
2323
"encoding/json"
2424
"fmt"
25+
"math"
2526
"reflect"
2627
"strings"
2728

@@ -97,6 +98,13 @@ type Options struct {
9798
*token
9899
}
99100

101+
func EmptyOptions() *Options {
102+
return &Options{
103+
math.MaxInt32,
104+
&token{},
105+
}
106+
}
107+
100108
// Matches returns true if the sorting and filtering criteria in o matches that
101109
// of the one supplied in opts.
102110
func (o *Options) Matches(opts *Options) bool {
@@ -213,9 +221,14 @@ func (o *Options) AddSortingToSelect(sqlBuilder sq.SelectBuilder) sq.SelectBuild
213221
if o.IsDesc {
214222
order = "DESC"
215223
}
216-
sqlBuilder = sqlBuilder.
217-
OrderBy(fmt.Sprintf("%v %v", o.SortByFieldPrefix+o.SortByFieldName, order)).
218-
OrderBy(fmt.Sprintf("%v %v", o.KeyFieldPrefix+o.KeyFieldName, order))
224+
225+
if o.SortByFieldName != "" {
226+
sqlBuilder = sqlBuilder.OrderBy(fmt.Sprintf("%v %v", o.SortByFieldPrefix+o.SortByFieldName, order))
227+
}
228+
229+
if o.KeyFieldName != "" {
230+
sqlBuilder = sqlBuilder.OrderBy(fmt.Sprintf("%v %v", o.KeyFieldPrefix+o.KeyFieldName, order))
231+
}
219232

220233
return sqlBuilder
221234
}

backend/src/apiserver/list/list_test.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
package list
1616

1717
import (
18+
"fmt"
19+
"math"
1820
"reflect"
1921
"strings"
2022
"testing"
@@ -645,6 +647,11 @@ func TestAddPaginationAndFilterToSelect(t *testing.T) {
645647
wantSQL: "SELECT * FROM MyTable ORDER BY SortField DESC, KeyField DESC LIMIT 124",
646648
wantArgs: nil,
647649
},
650+
{
651+
in: EmptyOptions(),
652+
wantSQL: fmt.Sprintf("SELECT * FROM MyTable LIMIT %d", math.MaxInt32+1),
653+
wantArgs: nil,
654+
},
648655
{
649656
in: &Options{
650657
PageSize: 123,

backend/src/apiserver/main.go

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"os"
2929
"strconv"
3030
"strings"
31+
"sync"
3132
"time"
3233

3334
"github.com/fsnotify/fsnotify"
@@ -106,10 +107,25 @@ func main() {
106107
}
107108
log.SetLevel(level)
108109

110+
backgroundCtx, backgroundCancel := context.WithCancel(context.Background())
111+
defer backgroundCancel()
112+
wg := sync.WaitGroup{}
113+
wg.Add(1)
114+
go reconcileSwfCrs(resourceManager, backgroundCtx, &wg)
109115
go startRpcServer(resourceManager)
116+
// This is blocking
110117
startHttpProxy(resourceManager)
111-
118+
backgroundCancel()
112119
clientManager.Close()
120+
wg.Wait()
121+
}
122+
123+
func reconcileSwfCrs(resourceManager *resource.ResourceManager, ctx context.Context, wg *sync.WaitGroup) {
124+
defer wg.Done()
125+
err := resourceManager.ReconcileSwfCrs(ctx)
126+
if err != nil {
127+
log.Errorf("Could not reconcile the ScheduledWorkflow Kubernetes resources: %v", err)
128+
}
113129
}
114130

115131
// A custom http request header matcher to pass on the user identity

backend/src/apiserver/resource/resource_manager.go

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,10 @@ import (
1818
"context"
1919
"encoding/json"
2020
"fmt"
21+
scheduledworkflow "github.com/kubeflow/pipelines/backend/src/crd/pkg/apis/scheduledworkflow/v1beta1"
2122
"io"
2223
"net"
24+
"reflect"
2325
"strconv"
2426

2527
"github.com/cenkalti/backoff"
@@ -567,6 +569,77 @@ func (r *ResourceManager) CreateRun(ctx context.Context, run *model.Run) (*model
567569
return newRun, nil
568570
}
569571

572+
// ReconcileSwfCrs reconciles the ScheduledWorkflow CRs based on existing jobs.
573+
func (r *ResourceManager) ReconcileSwfCrs(ctx context.Context) error {
574+
filterContext := &model.FilterContext{
575+
ReferenceKey: &model.ReferenceKey{Type: model.NamespaceResourceType, ID: common.GetPodNamespace()},
576+
}
577+
578+
opts := list.EmptyOptions()
579+
580+
jobs, _, _, err := r.jobStore.ListJobs(filterContext, opts)
581+
582+
if err != nil {
583+
return util.Wrap(err, "Failed to reconcile ScheduledWorkflow Kubernetes resources")
584+
}
585+
586+
for i := range jobs {
587+
select {
588+
case <-ctx.Done():
589+
return nil
590+
default:
591+
}
592+
593+
tmpl, _, err := r.fetchTemplateFromPipelineSpec(&jobs[i].PipelineSpec)
594+
if err != nil {
595+
return failedToReconcileSwfCrsError(err)
596+
}
597+
598+
newScheduledWorkflow, err := tmpl.ScheduledWorkflow(jobs[i])
599+
if err != nil {
600+
return failedToReconcileSwfCrsError(err)
601+
}
602+
603+
for {
604+
currentScheduledWorkflow, err := r.getScheduledWorkflowClient(jobs[i].Namespace).Get(ctx, jobs[i].K8SName, v1.GetOptions{})
605+
if err != nil {
606+
if util.IsNotFound(err) {
607+
break
608+
}
609+
return failedToReconcileSwfCrsError(err)
610+
}
611+
612+
if !reflect.DeepEqual(currentScheduledWorkflow.Spec, newScheduledWorkflow.Spec) {
613+
currentScheduledWorkflow.Spec = newScheduledWorkflow.Spec
614+
err = r.updateSwfCrSpec(ctx, jobs[i].Namespace, currentScheduledWorkflow)
615+
if err != nil {
616+
if apierrors.IsConflict(errors.Unwrap(err)) {
617+
continue
618+
} else if util.IsNotFound(errors.Cause(err)) {
619+
break
620+
}
621+
return failedToReconcileSwfCrsError(err)
622+
}
623+
}
624+
break
625+
}
626+
}
627+
628+
return nil
629+
}
630+
631+
func failedToReconcileSwfCrsError(err error) error {
632+
return util.Wrap(err, "Failed to reconcile ScheduledWorkflow Kubernetes resources")
633+
}
634+
635+
func (r *ResourceManager) updateSwfCrSpec(ctx context.Context, k8sNamespace string, scheduledWorkflow *scheduledworkflow.ScheduledWorkflow) error {
636+
_, err := r.getScheduledWorkflowClient(k8sNamespace).Update(ctx, scheduledWorkflow)
637+
if err != nil {
638+
return util.Wrap(err, "Failed to update ScheduledWorkflow")
639+
}
640+
return nil
641+
}
642+
570643
// Fetches a run with a given id.
571644
func (r *ResourceManager) GetRun(runId string) (*model.Run, error) {
572645
run, err := r.runStore.GetRun(runId)

backend/src/apiserver/resource/resource_manager_test.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3146,6 +3146,35 @@ func TestReportScheduledWorkflowResource_Success_withRuntimeParamsV2(t *testing.
31463146
assert.Equal(t, expectedJob.ToV1(), actualJob.ToV1())
31473147
}
31483148

3149+
func TestReconcileSwfCrs(t *testing.T) {
3150+
store, manager, job := initWithJobV2(t)
3151+
defer store.Close()
3152+
3153+
fetchedJob, err := manager.GetJob(job.UUID)
3154+
require.Nil(t, err)
3155+
require.NotNil(t, fetchedJob)
3156+
3157+
swfClient := store.SwfClient().ScheduledWorkflow("ns1")
3158+
3159+
options := v1.GetOptions{}
3160+
ctx := context.Background()
3161+
3162+
swf, err := swfClient.Get(ctx, "job-", options)
3163+
require.Nil(t, err)
3164+
3165+
// emulates an invalid/outdated spec
3166+
swf.Spec.Workflow.Spec = nil
3167+
swf, err = swfClient.Update(ctx, swf)
3168+
require.Nil(t, swf.Spec.Workflow.Spec)
3169+
3170+
err = manager.ReconcileSwfCrs(ctx)
3171+
require.Nil(t, err)
3172+
3173+
swf, err = swfClient.Get(ctx, "job-", options)
3174+
require.Nil(t, err)
3175+
require.NotNil(t, swf.Spec.Workflow.Spec)
3176+
}
3177+
31493178
func TestReportScheduledWorkflowResource_Error(t *testing.T) {
31503179
store := NewFakeClientManagerOrFatal(util.NewFakeTimeForEpoch())
31513180
defer store.Close()

0 commit comments

Comments
 (0)