Skip to content

Commit

Permalink
fix: race when tunnel endpoint becomes unavailable (#58)
Browse files Browse the repository at this point in the history
The current implementation using an index on the endpoint cache by pod name was flawed.
A pod could be targeted by multiple services, so when processing the endpoints update for a service the pod name could be still present in an endpoints object of a different service.
This could lead to missing the event that the tunnel endoint has became unavailable leaving the tunnel for this service broken until the next endpoints update.
This commit removes the index entirely and explicitly searches the current endpoints object for the pod name of the current port forward. This is simpler and leaves no room for a race.

Signed-off-by: Fabian Ruff <fabian.ruff@sap.com>
  • Loading branch information
databus23 authored Jan 8, 2021
1 parent 9e85d49 commit 0c2b3bf
Showing 1 changed file with 13 additions and 22 deletions.
35 changes: 13 additions & 22 deletions internal/proxier/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (

"github.com/jaredallard/localizer/internal/kevents"
"github.com/jaredallard/localizer/internal/kube"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/wait"
Expand Down Expand Up @@ -75,23 +74,6 @@ type ProxyOpts struct {
IPCidr string
}

func IndexEndpointsByActivePod(obj interface{}) ([]string, error) {
endpoints, ok := obj.(*corev1.Endpoints)
if !ok {
return nil, errors.New("indexer only works on Endpoints")
}
var pods []string
for _, subset := range endpoints.Subsets {
for _, address := range subset.Addresses {
if address.TargetRef == nil || address.TargetRef.Kind != PodKind {
continue
}
pods = append(pods, address.TargetRef.Name)
}
}
return pods, nil
}

// NewProxier creates a new proxier instance
func NewProxier(ctx context.Context, k kubernetes.Interface, kconf *rest.Config, log logrus.FieldLogger, opts *ProxyOpts) (*Proxier, error) { //nolint:lll
svcInformer := kevents.GlobalCache.Core().V1().Services().Informer()
Expand Down Expand Up @@ -137,9 +119,7 @@ func NewProxier(ctx context.Context, k kubernetes.Interface, kconf *rest.Config,
}
},
})
return p, endpointsInformer.AddIndexers(cache.Indexers{
"pods": IndexEndpointsByActivePod,
})
return p, nil
}

// Start starts the proxier
Expand Down Expand Up @@ -259,7 +239,7 @@ func (p *Proxier) reconcile(key string) error {
}

case PortForwardStatusRunning:
if endpoints, err := p.endpointsInformer.GetIndexer().ByIndex("pods", existingForward.Pod.Name); err == nil && len(endpoints) == 0 {
if !isActiveEndpoint(existingForward.Pod.Name, endpoints) {
p.createPortforward(svc, fmt.Sprintf("endpoints '%s' was removed", existingForward.Pod.Key()))
}
case PortForwardStatusRecreating:
Expand Down Expand Up @@ -350,3 +330,14 @@ func (p *Proxier) List(ctx context.Context) ([]ServiceStatus, error) {

return statuses, nil
}

func isActiveEndpoint(podName string, endpoints *corev1.Endpoints) bool {
for _, subset := range endpoints.Subsets {
for _, address := range subset.Addresses {
if address.TargetRef != nil && address.TargetRef.Kind == PodKind && address.TargetRef.Name == podName {
return true
}
}
}
return false
}

0 comments on commit 0c2b3bf

Please sign in to comment.