Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 8232: ensure the ending event sinked before shutdown #8237

Merged
merged 1 commit into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion pkg/datamover/backup_micro_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@
}

func (r *BackupMicroService) Init() error {
r.eventRecorder = kube.NewEventRecorder(r.kubeClient, r.client.Scheme(), r.dataUploadName, r.nodeName)
r.eventRecorder = kube.NewEventRecorder(r.kubeClient, r.client.Scheme(), r.dataUploadName, r.nodeName, r.logger)

Check warning on line 98 in pkg/datamover/backup_micro_service.go

View check run for this annotation

Codecov / codecov/patch

pkg/datamover/backup_micro_service.go#L98

Added line #L98 was not covered by tests

handler, err := r.duInformer.AddEventHandler(
cachetool.ResourceEventHandlerFuncs{
Expand Down Expand Up @@ -222,6 +222,8 @@
log.WithError(err).Error("Async fs backup was not completed")
}

r.eventRecorder.EndingEvent(du, false, datapath.EventReasonStopped, "Data path for %s stopped", du.Name)

return result, err
}

Expand Down
15 changes: 12 additions & 3 deletions pkg/datamover/backup_micro_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,15 @@ func (bt *backupMsTestHelper) Event(_ runtime.Object, _ bool, reason string, mes
bt.eventReason = reason
bt.eventMsg = fmt.Sprintf(message, a...)
}

func (bt *backupMsTestHelper) EndingEvent(_ runtime.Object, _ bool, reason string, message string, a ...any) {
bt.eventLock.Lock()
defer bt.eventLock.Unlock()

bt.withEvent = true
bt.eventReason = reason
bt.eventMsg = fmt.Sprintf(message, a...)
}
func (bt *backupMsTestHelper) Shutdown() {}

func (bt *backupMsTestHelper) Marshal(v any) ([]byte, error) {
Expand Down Expand Up @@ -336,7 +345,7 @@ func TestRunCancelableDataPath(t *testing.T) {
ctx: ctxTimeout,
kubeClientObj: []runtime.Object{duInProgress},
dataPathStarted: true,
expectedEventMsg: fmt.Sprintf("Data path for %s started", dataUploadName),
expectedEventMsg: fmt.Sprintf("Data path for %s stopped", dataUploadName),
expectedErr: "timed out waiting for fs backup to complete",
},
{
Expand All @@ -347,7 +356,7 @@ func TestRunCancelableDataPath(t *testing.T) {
result: &dataPathResult{
err: errors.New("fake-data-path-error"),
},
expectedEventMsg: fmt.Sprintf("Data path for %s started", dataUploadName),
expectedEventMsg: fmt.Sprintf("Data path for %s stopped", dataUploadName),
expectedErr: "fake-data-path-error",
},
{
Expand All @@ -358,7 +367,7 @@ func TestRunCancelableDataPath(t *testing.T) {
result: &dataPathResult{
result: "fake-succeed-result",
},
expectedEventMsg: fmt.Sprintf("Data path for %s started", dataUploadName),
expectedEventMsg: fmt.Sprintf("Data path for %s stopped", dataUploadName),
},
}

Expand Down
4 changes: 3 additions & 1 deletion pkg/datamover/restore_micro_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@
}

func (r *RestoreMicroService) Init() error {
r.eventRecorder = kube.NewEventRecorder(r.kubeClient, r.client.Scheme(), r.dataDownloadName, r.nodeName)
r.eventRecorder = kube.NewEventRecorder(r.kubeClient, r.client.Scheme(), r.dataDownloadName, r.nodeName, r.logger)

Check warning on line 87 in pkg/datamover/restore_micro_service.go

View check run for this annotation

Codecov / codecov/patch

pkg/datamover/restore_micro_service.go#L87

Added line #L87 was not covered by tests

handler, err := r.ddInformer.AddEventHandler(
cachetool.ResourceEventHandlerFuncs{
Expand Down Expand Up @@ -199,6 +199,8 @@
log.WithError(err).Error("Async fs restore was not completed")
}

r.eventRecorder.EndingEvent(dd, false, datapath.EventReasonStopped, "Data path for %s stopped", dd.Name)

return result, err
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/datamover/restore_micro_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ func TestRunCancelableRestore(t *testing.T) {
ctx: ctxTimeout,
kubeClientObj: []runtime.Object{ddInProgress},
dataPathStarted: true,
expectedEventMsg: fmt.Sprintf("Data path for %s started", dataDownloadName),
expectedEventMsg: fmt.Sprintf("Data path for %s stopped", dataDownloadName),
expectedErr: "timed out waiting for fs restore to complete",
},
{
Expand All @@ -300,7 +300,7 @@ func TestRunCancelableRestore(t *testing.T) {
result: &dataPathResult{
err: errors.New("fake-data-path-error"),
},
expectedEventMsg: fmt.Sprintf("Data path for %s started", dataDownloadName),
expectedEventMsg: fmt.Sprintf("Data path for %s stopped", dataDownloadName),
expectedErr: "fake-data-path-error",
},
{
Expand All @@ -311,7 +311,7 @@ func TestRunCancelableRestore(t *testing.T) {
result: &dataPathResult{
result: "fake-succeed-result",
},
expectedEventMsg: fmt.Sprintf("Data path for %s started", dataDownloadName),
expectedEventMsg: fmt.Sprintf("Data path for %s stopped", dataDownloadName),
},
}

Expand Down
7 changes: 4 additions & 3 deletions pkg/datapath/micro_service_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ const (
EventReasonCancelled = "Data-Path-Canceled"
EventReasonProgress = "Data-Path-Progress"
EventReasonCancelling = "Data-Path-Canceling"
EventReasonStopped = "Data-Path-Stopped"
)

type microServiceBRWatcher struct {
Expand Down Expand Up @@ -340,15 +341,15 @@ func (ms *microServiceBRWatcher) onEvent(evt *v1.Event) {
ms.callbacks.OnProgress(ms.ctx, ms.namespace, ms.taskName, funcGetProgressFromMessage(evt.Message, ms.log))
case EventReasonCompleted:
ms.log.Infof("Received data path completed message: %v", funcGetResultFromMessage(ms.taskType, evt.Message, ms.log))
ms.terminatedFromEvent = true
case EventReasonCancelled:
ms.log.Infof("Received data path canceled message: %s", evt.Message)
ms.terminatedFromEvent = true
case EventReasonFailed:
ms.log.Infof("Received data path failed message: %s", evt.Message)
ms.terminatedFromEvent = true
case EventReasonCancelling:
ms.log.Infof("Received data path canceling message: %s", evt.Message)
case EventReasonStopped:
ms.terminatedFromEvent = true
ms.log.Infof("Received data path stop message: %s", evt.Message)
default:
ms.log.Infof("Received event for data path %s, reason: %s, message: %s", ms.taskName, evt.Reason, evt.Message)
}
Expand Down
23 changes: 22 additions & 1 deletion pkg/datapath/micro_service_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,9 @@ func TestStartWatch(t *testing.T) {
},
{
event: &v1.Event{Reason: EventReasonCompleted},
},
{
event: &v1.Event{Reason: EventReasonStopped},
delay: time.Second,
},
},
Expand All @@ -214,6 +217,9 @@ func TestStartWatch(t *testing.T) {
{
event: &v1.Event{Reason: EventReasonCompleted},
},
{
event: &v1.Event{Reason: EventReasonStopped},
},
},
expectStartEvent: true,
expectTerminateEvent: true,
Expand All @@ -231,6 +237,9 @@ func TestStartWatch(t *testing.T) {
{
event: &v1.Event{Reason: EventReasonCompleted},
},
{
event: &v1.Event{Reason: EventReasonStopped},
},
},
redirectLogErr: errors.New("fake-error"),
expectStartEvent: true,
Expand Down Expand Up @@ -269,7 +278,10 @@ func TestStartWatch(t *testing.T) {
insertEventsAfter: []insertEvent{
{
event: &v1.Event{Reason: EventReasonCompleted},
after: time.Second,
},
{
event: &v1.Event{Reason: EventReasonStopped},
delay: time.Second,
},
},
expectStartEvent: true,
Expand All @@ -293,6 +305,9 @@ func TestStartWatch(t *testing.T) {
},
{
event: &v1.Event{Reason: EventReasonCompleted},
},
{
event: &v1.Event{Reason: EventReasonStopped},
delay: time.Second,
},
},
Expand All @@ -313,6 +328,9 @@ func TestStartWatch(t *testing.T) {
{
event: &v1.Event{Reason: EventReasonCancelled},
},
{
event: &v1.Event{Reason: EventReasonStopped},
},
},
terminationMessage: "fake-termination-message-1",
expectStartEvent: true,
Expand All @@ -339,6 +357,9 @@ func TestStartWatch(t *testing.T) {
{
event: &v1.Event{Reason: EventReasonCancelled},
},
{
event: &v1.Event{Reason: EventReasonStopped},
},
},
terminationMessage: ErrCancelled,
expectStartEvent: true,
Expand Down
130 changes: 123 additions & 7 deletions pkg/util/kube/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@
package kube

import (
"sync"
"time"

"github.com/google/uuid"
"github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
Expand All @@ -27,16 +30,34 @@

type EventRecorder interface {
Event(object runtime.Object, warning bool, reason string, message string, a ...any)
EndingEvent(object runtime.Object, warning bool, reason string, message string, a ...any)
Shutdown()
}

type eventRecorder struct {
broadcaster record.EventBroadcaster
recorder record.EventRecorder
broadcaster record.EventBroadcaster
recorder record.EventRecorder
lock sync.Mutex
endingSentinel *eventElement
log logrus.FieldLogger
}

func NewEventRecorder(kubeClient kubernetes.Interface, scheme *runtime.Scheme, eventSource string, eventNode string) EventRecorder {
res := eventRecorder{}
type eventElement struct {
t string
r string
m string
sinked chan struct{}
}

type eventSink struct {
recorder *eventRecorder
sink typedcorev1.EventInterface
}

func NewEventRecorder(kubeClient kubernetes.Interface, scheme *runtime.Scheme, eventSource string, eventNode string, log logrus.FieldLogger) EventRecorder {
res := eventRecorder{
log: log,
}

res.broadcaster = record.NewBroadcasterWithCorrelatorOptions(record.CorrelatorOptions{
MaxEvents: 1,
Expand All @@ -45,7 +66,11 @@
},
})

res.broadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
res.broadcaster.StartRecordingToSink(&eventSink{
recorder: &res,
sink: kubeClient.CoreV1().Events(""),
})

res.recorder = res.broadcaster.NewRecorder(scheme, v1.EventSource{
Component: eventSource,
Host: eventNode,
Expand All @@ -55,6 +80,10 @@
}

func (er *eventRecorder) Event(object runtime.Object, warning bool, reason string, message string, a ...any) {
if er.broadcaster == nil {
return

Check warning on line 84 in pkg/util/kube/event.go

View check run for this annotation

Codecov / codecov/patch

pkg/util/kube/event.go#L84

Added line #L84 was not covered by tests
}

eventType := v1.EventTypeNormal
if warning {
eventType = v1.EventTypeWarning
Expand All @@ -67,8 +96,95 @@
}
}

func (er *eventRecorder) EndingEvent(object runtime.Object, warning bool, reason string, message string, a ...any) {
if er.broadcaster == nil {
return

Check warning on line 101 in pkg/util/kube/event.go

View check run for this annotation

Codecov / codecov/patch

pkg/util/kube/event.go#L101

Added line #L101 was not covered by tests
}

er.Event(object, warning, reason, message, a...)

var sentinelEvent string

er.lock.Lock()
if er.endingSentinel == nil {
sentinelEvent = uuid.NewString()
er.endingSentinel = &eventElement{
t: v1.EventTypeNormal,
r: sentinelEvent,
m: sentinelEvent,
sinked: make(chan struct{}),
}
}
er.lock.Unlock()

if sentinelEvent != "" {
er.Event(object, false, sentinelEvent, sentinelEvent)
} else {
er.log.Warn("More than one ending events, ignore")
}
}

var shutdownTimeout time.Duration = time.Minute

func (er *eventRecorder) Shutdown() {
// StartEventWatcher doesn't wait for writing all buffered events to API server when Shutdown is called, so have to hardcode a sleep time
time.Sleep(2 * time.Second)
var wait chan struct{}
er.lock.Lock()
if er.endingSentinel != nil {
wait = er.endingSentinel.sinked
}
er.lock.Unlock()

if wait != nil {
er.log.Info("Waiting sentinel before shutdown")

waitloop:
for {
select {
case <-wait:
break waitloop
case <-time.After(shutdownTimeout):
er.log.Warn("Timeout waiting for assured events processed")
break waitloop

Check warning on line 147 in pkg/util/kube/event.go

View check run for this annotation

Codecov / codecov/patch

pkg/util/kube/event.go#L145-L147

Added lines #L145 - L147 were not covered by tests
}
}
}

er.broadcaster.Shutdown()
er.broadcaster = nil

er.lock.Lock()
er.endingSentinel = nil
er.lock.Unlock()
}

func (er *eventRecorder) sentinelWatch(event *v1.Event) bool {
er.lock.Lock()
defer er.lock.Unlock()

if er.endingSentinel == nil {
return false

Check warning on line 165 in pkg/util/kube/event.go

View check run for this annotation

Codecov / codecov/patch

pkg/util/kube/event.go#L165

Added line #L165 was not covered by tests
}

if er.endingSentinel.m == event.Message && er.endingSentinel.r == event.Reason && er.endingSentinel.t == event.Type {
close(er.endingSentinel.sinked)
return true
}

return false
}

func (es *eventSink) Create(event *v1.Event) (*v1.Event, error) {
if es.recorder.sentinelWatch(event) {
return event, nil
}

return es.sink.CreateWithEventNamespace(event)
}

func (es *eventSink) Update(event *v1.Event) (*v1.Event, error) {
return es.sink.UpdateWithEventNamespace(event)

Check warning on line 185 in pkg/util/kube/event.go

View check run for this annotation

Codecov / codecov/patch

pkg/util/kube/event.go#L184-L185

Added lines #L184 - L185 were not covered by tests
}

func (es *eventSink) Patch(event *v1.Event, data []byte) (*v1.Event, error) {
return es.sink.PatchWithEventNamespace(event, data)
}
Loading
Loading