Skip to content

Commit

Permalink
Merge pull request #757 from traPtitech/impr/retry
Browse files (browse the repository at this point in the history)
Improve backend watch retry
  • Loading branch information
motoki317 authored Oct 31, 2023
2 parents f4a2d2f + 92187e2 commit 670caa8
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 37 deletions.
26 changes: 5 additions & 21 deletions pkg/infrastructure/backend/dockerimpl/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/base64"
"encoding/json"
"fmt"
"github.com/traPtitech/neoshowcase/pkg/util/retry"
"sync"
"time"

Expand All @@ -13,8 +14,6 @@ import (
"github.com/docker/docker/api/types/filters"
"github.com/docker/docker/client"
"github.com/friendsofgo/errors"
log "github.com/sirupsen/logrus"

"github.com/traPtitech/neoshowcase/pkg/domain"
"github.com/traPtitech/neoshowcase/pkg/domain/builder"
"github.com/traPtitech/neoshowcase/pkg/util/ds"
Expand All @@ -40,7 +39,7 @@ type Backend struct {
image builder.ImageConfig

eventSubs domain.PubSub[*domain.ContainerEvent]
eventCancel func()
stopWatcher func()

reloadLock sync.Mutex
}
Expand Down Expand Up @@ -78,27 +77,12 @@ func (b *Backend) Start(ctx context.Context) error {
}

eventCtx, eventCancel := context.WithCancel(context.Background())
b.eventCancel = eventCancel
go b.eventListenerLoop(eventCtx)
b.stopWatcher = eventCancel
go retry.Do(eventCtx, b.eventListener, "container watcher")

return nil
}

// eventListenerLoop keeps calling b.eventListener until it returns nil.
// Each non-nil error is logged, after which the loop waits one second before
// calling the listener again. If ctx is done during that wait, the loop
// returns without retrying.
func (b *Backend) eventListenerLoop(ctx context.Context) {
	for listenErr := b.eventListener(ctx); listenErr != nil; listenErr = b.eventListener(ctx) {
		log.Errorf("docker event listner errored, retrying in 1s: %+v", listenErr)

		backoff := time.After(time.Second)
		select {
		case <-backoff:
			// Wait elapsed; the loop's post statement invokes the listener again.
		case <-ctx.Done():
			return
		}
	}
}

func (b *Backend) eventListener(ctx context.Context) error {
// https://docs.docker.com/engine/reference/commandline/events/
ch, errCh := b.c.Events(ctx, types.EventsOptions{Filters: filters.NewArgs(filters.Arg("type", "container"))})
Expand Down Expand Up @@ -126,7 +110,7 @@ func (b *Backend) eventListener(ctx context.Context) error {
}

// Dispose cancels the context that Start handed to the background event
// listener. The context argument is ignored and the method always returns nil.
func (b *Backend) Dispose(_ context.Context) error {
// NOTE(review): this listing looks like a rendered diff, so the next two calls
// are presumably the before/after forms of a single statement — confirm
// against the real file. Start stores the same cancel function in both fields.
b.eventCancel()
b.stopWatcher()
return nil
}

Expand Down
27 changes: 15 additions & 12 deletions pkg/infrastructure/backend/k8simpl/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package k8simpl
import (
"context"
"fmt"
"github.com/traPtitech/neoshowcase/pkg/util/retry"
"strings"
"sync"

Expand All @@ -12,7 +13,6 @@ import (
traefikv1alpha1 "github.com/traefik/traefik/v2/pkg/provider/kubernetes/crd/generated/clientset/versioned/typed/traefikio/v1alpha1"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"

Expand Down Expand Up @@ -43,9 +43,9 @@ type Backend struct {
certManagerClient *certmanagerv1.Clientset
config Config

eventSubs domain.PubSub[*domain.ContainerEvent]
eventSubs domain.PubSub[*domain.ContainerEvent]
stopWatcher func()

podWatcher watch.Interface
reloadLock sync.Mutex
}

Expand All @@ -71,22 +71,24 @@ func NewK8SBackend(
}

func (b *Backend) Start(_ context.Context) error {
var err error
b.podWatcher, err = b.client.CoreV1().Pods(b.config.Namespace).Watch(context.Background(), metav1.ListOptions{
ctx, cancel := context.WithCancel(context.Background())
b.stopWatcher = cancel
go retry.Do(ctx, b.eventListener, "pod watcher")
return nil
}

func (b *Backend) eventListener(ctx context.Context) error {
podWatcher, err := b.client.CoreV1().Pods(b.config.Namespace).Watch(ctx, metav1.ListOptions{
LabelSelector: metav1.FormatLabelSelector(&metav1.LabelSelector{MatchLabels: map[string]string{
managedLabel: "true",
}}),
})
if err != nil {
return errors.Wrap(err, "failed to watch pods")
}
go b.eventListener()
defer podWatcher.Stop()

return nil
}

func (b *Backend) eventListener() {
for ev := range b.podWatcher.ResultChan() {
for ev := range podWatcher.ResultChan() {
p, ok := ev.Object.(*apiv1.Pod)
if !ok {
log.Warnf("unexpected type: %v", ev)
Expand All @@ -99,10 +101,11 @@ func (b *Backend) eventListener() {
}
b.eventSubs.Publish(&domain.ContainerEvent{ApplicationID: appID})
}
return nil
}

// Dispose shuts down pod watching. The context argument is ignored and the
// method always returns nil.
func (b *Backend) Dispose(_ context.Context) error {
// NOTE(review): this listing looks like a rendered diff, so the next two calls
// are presumably the before/after forms of a single statement — confirm
// against the real file. podWatcher.Stop() stops the stored watch, while
// stopWatcher is the cancel function Start created for the watcher's context.
b.podWatcher.Stop()
b.stopWatcher()
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/usecase/builder/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (s *builderService) Start(_ context.Context) error {

go retry.Do(ctx, func(ctx context.Context) error {
return s.client.ConnectBuilder(ctx, s.onRequest, response)
})
}, "connect to controller")
go loop.Loop(ctx, s.prune, 1*time.Hour, false)

return nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/usecase/ssgen/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (s *generatorService) Start(_ context.Context) error {

go retry.Do(ctx, func(ctx context.Context) error {
return s.client.ConnectSSGen(ctx, s.onRequest)
})
}, "connect to controller")
go func() {
for i := 0; i < 300; i++ {
s.reload()
Expand Down
8 changes: 6 additions & 2 deletions pkg/util/retry/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ const (
successThreshold = 60 * time.Second
)

func Do(ctx context.Context, fn func(ctx context.Context) error) {
func Do(ctx context.Context, fn func(ctx context.Context) error, msg string) {
backoff := initialBackoff
for {
start := time.Now()
Expand All @@ -26,7 +26,11 @@ func Do(ctx context.Context, fn func(ctx context.Context) error) {
if time.Since(start) >= successThreshold || err == nil {
backoff = initialBackoff
}
log.Infof("Lost connection, retrying in %v", backoff)
if err == nil {
log.Infof("Retrier: retrying in %v: %v", backoff, msg)
} else {
log.Errorf("Retrier: retrying in %v: %v: %v", backoff, msg, err)
}
select {
case <-time.After(backoff):
case <-ctx.Done():
Expand Down

0 comments on commit 670caa8

Please sign in to comment.