From 0c2b3bfb5278352c4698eb45d6fe47f19a39c540 Mon Sep 17 00:00:00 2001 From: Fabian Ruff Date: Fri, 8 Jan 2021 18:46:49 +0100 Subject: [PATCH] fix: race when tunnel endpoint becomes unavailable (#58) The current implementation using an index on the endpoint cache by pod name was flawed. A pod could be targeted by multiple services, so when processing the endpoints update for a service the pod name could be still present in an endpoints object of a different service. This could lead to missing the event that the tunnel endoint has became unavailable leaving the tunnel for this service broken until the next endpoints update. This commit removes the index entirely and explicitly searches the current endpoints object for the pod name of the current port forward. This is simpler and leaves no room for a race. Signed-off-by: Fabian Ruff --- internal/proxier/proxy.go | 35 +++++++++++++---------------------- 1 file changed, 13 insertions(+), 22 deletions(-) diff --git a/internal/proxier/proxy.go b/internal/proxier/proxy.go index 7c13e34..ba94d58 100644 --- a/internal/proxier/proxy.go +++ b/internal/proxier/proxy.go @@ -20,7 +20,6 @@ import ( "github.com/jaredallard/localizer/internal/kevents" "github.com/jaredallard/localizer/internal/kube" - "github.com/pkg/errors" "github.com/sirupsen/logrus" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/wait" @@ -75,23 +74,6 @@ type ProxyOpts struct { IPCidr string } -func IndexEndpointsByActivePod(obj interface{}) ([]string, error) { - endpoints, ok := obj.(*corev1.Endpoints) - if !ok { - return nil, errors.New("indexer only works on Endpoints") - } - var pods []string - for _, subset := range endpoints.Subsets { - for _, address := range subset.Addresses { - if address.TargetRef == nil || address.TargetRef.Kind != PodKind { - continue - } - pods = append(pods, address.TargetRef.Name) - } - } - return pods, nil -} - // NewProxier creates a new proxier instance func NewProxier(ctx context.Context, k kubernetes.Interface, kconf *rest.Config, log logrus.FieldLogger, opts *ProxyOpts) (*Proxier, error) { //nolint:lll svcInformer := kevents.GlobalCache.Core().V1().Services().Informer() @@ -137,9 +119,7 @@ func NewProxier(ctx context.Context, k kubernetes.Interface, kconf *rest.Config, } }, }) - return p, endpointsInformer.AddIndexers(cache.Indexers{ - "pods": IndexEndpointsByActivePod, - }) + return p, nil } // Start starts the proxier @@ -259,7 +239,7 @@ func (p *Proxier) reconcile(key string) error { } case PortForwardStatusRunning: - if endpoints, err := p.endpointsInformer.GetIndexer().ByIndex("pods", existingForward.Pod.Name); err == nil && len(endpoints) == 0 { + if !isActiveEndpoint(existingForward.Pod.Name, endpoints) { p.createPortforward(svc, fmt.Sprintf("endpoints '%s' was removed", existingForward.Pod.Key())) } case PortForwardStatusRecreating: @@ -350,3 +330,14 @@ func (p *Proxier) List(ctx context.Context) ([]ServiceStatus, error) { return statuses, nil } + +func isActiveEndpoint(podName string, endpoints *corev1.Endpoints) bool { + for _, subset := range endpoints.Subsets { + for _, address := range subset.Addresses { + if address.TargetRef != nil && address.TargetRef.Kind == PodKind && address.TargetRef.Name == podName { + return true + } + } + } + return false +}