Skip to content

Commit

Permalink
chore(deps): Update client-go and controller-runtime
Browse files Browse the repository at this point in the history
This updates k8s.io/client-go to v0.30.1 and
sigs.k8s.io/controller-runtime to v0.18.6. Updating any higher requires
updating github.com/argoproj/argo-workflows/v3 which requires updating
to Go 1.23 which isn't widely available in base container images yet.

Some other dependencies were updated for dependency resolution to
succeed.

Signed-off-by: mprahl <mprahl@users.noreply.github.com>
  • Loading branch information
mprahl committed Jan 2, 2025
1 parent 2ebb853 commit a1a3dec
Show file tree
Hide file tree
Showing 31 changed files with 247 additions and 2,788 deletions.
8 changes: 4 additions & 4 deletions .github/workflows/backend.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ jobs:
run-go-unittests:
runs-on: ubuntu-latest
steps:
- name: Install Go
uses: actions/setup-go@v4
with:
go-version: 1.21.x
- name: Checkout code
uses: actions/checkout@v4
- name: Install Go
uses: actions/setup-go@v5
with:
go-version-file: go.mod
- name: "run go unit tests"
run: go test -v -cover ./backend/...
backend-integration:
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/presubmit-backend.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ jobs:
uses: actions/checkout@v2

- name: Set up Go
uses: actions/setup-go@v3
uses: actions/setup-go@v5
with:
go-version: '1.21'
go-version-file: go.mod

- name: Run Backend Tests
run: ./test/presubmit-backend-test.sh
2 changes: 1 addition & 1 deletion backend/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# limitations under the License.

# 1. Build api server application
FROM golang:1.21.7-bookworm as builder
FROM golang:1.22.10-bookworm as builder
RUN apt-get update && apt-get install -y cmake clang musl-dev openssl
WORKDIR /go/src/github.com/kubeflow/pipelines

Expand Down
2 changes: 1 addition & 1 deletion backend/Dockerfile.cacheserver
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# limitations under the License.

# Dockerfile for building the source code of cache_server
FROM golang:1.21.7-alpine3.19 as builder
FROM golang:1.22.10-alpine3.21 as builder

RUN apk update && apk upgrade && \
apk add --no-cache bash git openssh gcc musl-dev
Expand Down
2 changes: 1 addition & 1 deletion backend/Dockerfile.conformance
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# limitations under the License.

# Dockerfile for building the source code of conformance tests
FROM golang:1.21.7-alpine3.19 as builder
FROM golang:1.22.10-alpine3.21 as builder

RUN apk update && apk upgrade && \
apk add --no-cache bash git openssh gcc musl-dev
Expand Down
2 changes: 1 addition & 1 deletion backend/Dockerfile.driver
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

FROM golang:1.21.7-alpine3.19 as builder
FROM golang:1.22.10-alpine3.21 as builder

WORKDIR /go/src/github.com/kubeflow/pipelines

Expand Down
2 changes: 1 addition & 1 deletion backend/Dockerfile.launcher
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

FROM golang:1.21.7-alpine3.19 as builder
FROM golang:1.22.10-alpine3.21 as builder

WORKDIR /go/src/github.com/kubeflow/pipelines

Expand Down
2 changes: 1 addition & 1 deletion backend/Dockerfile.persistenceagent
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

FROM golang:1.21.7-alpine3.19 as builder
FROM golang:1.22.10-alpine3.21 as builder

WORKDIR /go/src/github.com/kubeflow/pipelines

Expand Down
2 changes: 1 addition & 1 deletion backend/Dockerfile.scheduledworkflow
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

FROM golang:1.21.7-alpine3.19 as builder
FROM golang:1.22.10-alpine3.21 as builder

WORKDIR /go/src/github.com/kubeflow/pipelines

Expand Down
2 changes: 1 addition & 1 deletion backend/Dockerfile.viewercontroller
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

FROM golang:1.21.7-alpine3.19 as builder
FROM golang:1.22.10-alpine3.21 as builder

RUN apk update && apk upgrade
RUN apk add --no-cache git gcc musl-dev
Expand Down
2 changes: 1 addition & 1 deletion backend/api/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# limitations under the License.

# Generate client code (go & json) from API protocol buffers
FROM golang:1.21 as generator
FROM golang:1.22 as generator
ENV GRPC_GATEWAY_VERSION v1.9.6
ENV GO_SWAGGER_VERSION v0.18.0
ENV GOLANG_PROTOBUF_VERSION v1.5.1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ func NewScheduledWorkflowClient(informer v1beta1.ScheduledWorkflowInformer) *Sch
}

// AddEventHandler adds an event handler.
func (c *ScheduledWorkflowClient) AddEventHandler(funcs *cache.ResourceEventHandlerFuncs) {
c.informer.Informer().AddEventHandler(funcs)
func (c *ScheduledWorkflowClient) AddEventHandler(funcs *cache.ResourceEventHandlerFuncs) (cache.ResourceEventHandlerRegistration, error) {
return c.informer.Informer().AddEventHandler(funcs)
}

// HasSynced returns true if the shared informer's store has synced.
Expand Down
4 changes: 2 additions & 2 deletions backend/src/agent/persistence/client/workflow_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ func NewWorkflowClient(informer util.ExecutionInformer) *WorkflowClient {
}

// AddEventHandler adds an event handler.
func (c *WorkflowClient) AddEventHandler(funcs *cache.ResourceEventHandlerFuncs) {
c.informer.AddEventHandler(funcs)
func (c *WorkflowClient) AddEventHandler(funcs *cache.ResourceEventHandlerFuncs) (cache.ResourceEventHandlerRegistration, error) {
return c.informer.AddEventHandler(funcs)
}

// HasSynced returns true if the shared informer's store has synced.
Expand Down
5 changes: 4 additions & 1 deletion backend/src/agent/persistence/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,14 @@ func main() {
log.Fatalf("Error creating ML pipeline API Server client: %v", err)
}

controller := NewPersistenceAgent(
controller, err := NewPersistenceAgent(
swfInformerFactory,
execInformer,
pipelineClient,
util.NewRealTime())
if err != nil {
log.Fatalf("Failed to instantiate the controller: %v", err)
}

go swfInformerFactory.Start(stopCh)
go execInformer.InformerFactoryStart(stopCh)
Expand Down
15 changes: 11 additions & 4 deletions backend/src/agent/persistence/persistence_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ func NewPersistenceAgent(
swfInformerFactory swfinformers.SharedInformerFactory,
execInformer util.ExecutionInformer,
pipelineClient *client.PipelineClient,
time util.TimeInterface) *PersistenceAgent {
time util.TimeInterface,
) (*PersistenceAgent, error) {
// obtain references to shared informers
swfInformer := swfInformerFactory.Scheduledworkflow().V1beta1().ScheduledWorkflows()

Expand All @@ -57,12 +58,18 @@ func NewPersistenceAgent(
swfClient := client.NewScheduledWorkflowClient(swfInformer)
workflowClient := client.NewWorkflowClient(execInformer)

swfWorker := worker.NewPersistenceWorker(time, swfregister.Kind, swfInformer.Informer(), true,
swfWorker, err := worker.NewPersistenceWorker(time, swfregister.Kind, swfInformer.Informer(), true,
worker.NewScheduledWorkflowSaver(swfClient, pipelineClient))
if err != nil {
return nil, err
}

workflowWorker := worker.NewPersistenceWorker(time, workflowregister.WorkflowKind,
workflowWorker, err := worker.NewPersistenceWorker(time, workflowregister.WorkflowKind,
execInformer, true,
worker.NewWorkflowSaver(workflowClient, pipelineClient, ttlSecondsAfterWorkflowFinish))
if err != nil {
return nil, err
}

agent := &PersistenceAgent{
swfClient: swfClient,
Expand All @@ -73,7 +80,7 @@ func NewPersistenceAgent(

log.Info("Setting up event handlers")

return agent
return agent, nil
}

// Run will set up the event handlers for types we are interested in, as well
Expand Down
10 changes: 7 additions & 3 deletions backend/src/agent/persistence/worker/persistence_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ func NewPersistenceWorker(
name string,
eventHandler util.ExecutionInformerEventHandler,
enforceRequeueDelays bool,
saver Saver) *PersistenceWorker {
saver Saver,
) (*PersistenceWorker, error) {
worker := &PersistenceWorker{
workqueue: workqueue.NewNamedRateLimitingQueue(
workqueue.NewItemExponentialFailureRateLimiter(DefaultJobBackOff, MaxJobBackOff), name),
Expand All @@ -74,15 +75,18 @@ func NewPersistenceWorker(
log.Info("Setting up event handlers")

// Set up an event handler for when the Scheduled Workflow changes
eventHandler.AddEventHandler(&cache.ResourceEventHandlerFuncs{
_, err := eventHandler.AddEventHandler(&cache.ResourceEventHandlerFuncs{
AddFunc: worker.enqueue,
UpdateFunc: func(old, new interface{}) {
worker.enqueue(new)
},
DeleteFunc: worker.enqueueForDelete,
})
if err != nil {
return nil, err
}

return worker
return worker, nil
}

func (p *PersistenceWorker) Shutdown() {
Expand Down
16 changes: 11 additions & 5 deletions backend/src/agent/persistence/worker/persistence_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
client "github.com/kubeflow/pipelines/backend/src/agent/persistence/client"
"github.com/kubeflow/pipelines/backend/src/common/util"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
"k8s.io/client-go/tools/cache"
Expand Down Expand Up @@ -68,12 +69,13 @@ func TestPersistenceWorker_Success(t *testing.T) {
// Set up peristence worker
saver := NewWorkflowSaver(workflowClient, pipelineClient, 100)
eventHandler := NewFakeEventHandler()
worker := NewPersistenceWorker(
worker, err := NewPersistenceWorker(
util.NewFakeTimeForEpoch(),
"PERSISTENCE_WORKER",
eventHandler,
false,
saver)
require.NoError(t, err)

// Test
eventHandler.handler.OnAdd(workflow, true)
Expand All @@ -98,12 +100,13 @@ func TestPersistenceWorker_NotFoundError(t *testing.T) {
// Set up peristence worker
saver := NewWorkflowSaver(workflowClient, pipelineClient, 100)
eventHandler := NewFakeEventHandler()
worker := NewPersistenceWorker(
worker, err := NewPersistenceWorker(
util.NewFakeTimeForEpoch(),
"PERSISTENCE_WORKER",
eventHandler,
false,
saver)
require.NoError(t, err)

// Test
eventHandler.handler.OnAdd(workflow, true)
Expand All @@ -129,12 +132,13 @@ func TestPersistenceWorker_GetWorklowError(t *testing.T) {
// Set up peristence worker
saver := NewWorkflowSaver(workflowClient, pipelineClient, 100)
eventHandler := NewFakeEventHandler()
worker := NewPersistenceWorker(
worker, err := NewPersistenceWorker(
util.NewFakeTimeForEpoch(),
"PERSISTENCE_WORKER",
eventHandler,
false,
saver)
require.NoError(t, err)

// Test
eventHandler.handler.OnAdd(workflow, true)
Expand Down Expand Up @@ -163,12 +167,13 @@ func TestPersistenceWorker_ReportWorkflowRetryableError(t *testing.T) {
// Set up peristence worker
saver := NewWorkflowSaver(workflowClient, pipelineClient, 100)
eventHandler := NewFakeEventHandler()
worker := NewPersistenceWorker(
worker, err := NewPersistenceWorker(
util.NewFakeTimeForEpoch(),
"PERSISTENCE_WORKER",
eventHandler,
false,
saver)
require.NoError(t, err)

// Test
eventHandler.handler.OnAdd(workflow, true)
Expand Down Expand Up @@ -196,12 +201,13 @@ func TestPersistenceWorker_ReportWorkflowNonRetryableError(t *testing.T) {
// Set up peristence worker
saver := NewWorkflowSaver(workflowClient, pipelineClient, 100)
eventHandler := NewFakeEventHandler()
worker := NewPersistenceWorker(
worker, err := NewPersistenceWorker(
util.NewFakeTimeForEpoch(),
"PERSISTENCE_WORKER",
eventHandler,
false,
saver)
require.NoError(t, err)

// Test
eventHandler.handler.OnAdd(workflow, true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ func NewScheduledWorkflowClient(clientSet swfclientset.Interface,
}

// AddEventHandler adds an event handler.
func (p *ScheduledWorkflowClient) AddEventHandler(funcs *cache.ResourceEventHandlerFuncs) {
p.informer.Informer().AddEventHandler(funcs)
func (p *ScheduledWorkflowClient) AddEventHandler(funcs *cache.ResourceEventHandlerFuncs) (cache.ResourceEventHandlerRegistration, error) {
return p.informer.Informer().AddEventHandler(funcs)
}

// HasSynced returns true if the shared informer's store has synced.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ func NewWorkflowClient(clientSet commonutil.ExecutionClient,
}

// AddEventHandler adds an event handler.
func (p *WorkflowClient) AddEventHandler(funcs *cache.ResourceEventHandlerFuncs) {
p.informer.AddEventHandler(funcs)
func (p *WorkflowClient) AddEventHandler(funcs *cache.ResourceEventHandlerFuncs) (cache.ResourceEventHandlerRegistration, error) {
return p.informer.AddEventHandler(funcs)
}

// HasSynced returns true if the shared informer's store has synced.
Expand Down
16 changes: 11 additions & 5 deletions backend/src/crd/controller/scheduledworkflow/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ func NewController(
swfInformerFactory swfinformers.SharedInformerFactory,
executionInformer commonutil.ExecutionInformer,
time commonutil.TimeInterface,
location *time.Location) *Controller {

location *time.Location,
) (*Controller, error) {
// obtain references to shared informers
swfInformer := swfInformerFactory.Scheduledworkflow().V1beta1().ScheduledWorkflows()

Expand Down Expand Up @@ -112,21 +112,24 @@ func NewController(
log.Info("Setting up event handlers")

// Set up an event handler for when the Scheduled Workflow changes
controller.swfClient.AddEventHandler(&cache.ResourceEventHandlerFuncs{
_, err := controller.swfClient.AddEventHandler(&cache.ResourceEventHandlerFuncs{
AddFunc: controller.enqueueScheduledWorkflow,
UpdateFunc: func(old, new interface{}) {
controller.enqueueScheduledWorkflow(new)
},
DeleteFunc: controller.enqueueScheduledWorkflowForDelete,
})
if err != nil {
return nil, err
}

// Set up an event handler for when WorkflowHistory resources change. This
// handler will lookup the owner of the given WorkflowHistory, and if it is
// owned by a ScheduledWorkflow, it will enqueue that ScheduledWorkflow for
// processing. This way, we don't need to implement custom logic for
// handling WorkflowHistory resources. More info on this pattern:
// https://github.com/kubernetes/community/blob/8cafef897a22026d42f5e5bb3f104febe7e29830/contributors/devel/controllers.md
controller.workflowClient.AddEventHandler(&cache.ResourceEventHandlerFuncs{
_, err = controller.workflowClient.AddEventHandler(&cache.ResourceEventHandlerFuncs{
AddFunc: controller.handleWorkflow,
UpdateFunc: func(old, new interface{}) {
newWorkflow := new.(*workflowapi.Workflow)
Expand All @@ -140,8 +143,11 @@ func NewController(
},
DeleteFunc: controller.handleWorkflow,
})
if err != nil {
return nil, err
}

return controller
return controller, nil
}

// Run will set up the event handlers for types we are interested in, as well
Expand Down
5 changes: 4 additions & 1 deletion backend/src/crd/controller/scheduledworkflow/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,17 @@ func main() {
scheduleInformerFactory = swfinformers.NewFilteredSharedInformerFactory(scheduleClient, time.Second*30, namespace, nil)
}

controller := NewController(
controller, err := NewController(
kubeClient,
scheduleClient,
execClient,
scheduleInformerFactory,
execInformer,
commonutil.NewRealTime(),
location)
if err != nil {
log.Fatalf("Failed to instantiate the controller: %v", err)
}

go scheduleInformerFactory.Start(stopCh)
go execInformer.InformerFactoryStart(stopCh)
Expand Down
Loading

0 comments on commit a1a3dec

Please sign in to comment.