Skip to content

feat(kuma-cp): deduplicate dataplane inbounds by address and port combination #12760

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 20 commits into from
Apr 8, 2025
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 61 additions & 1 deletion pkg/plugins/runtime/k8s/controllers/inbound_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package controllers
import (
"context"
"fmt"
"sort"
"strconv"
"strings"

Expand Down Expand Up @@ -131,7 +132,10 @@ func inboundForServiceless(zone string, pod *kube_core.Pod, name string, nodeLab
}
}

func (i *InboundConverter) InboundInterfacesFor(ctx context.Context, zone string, pod *kube_core.Pod, services []*kube_core.Service) ([]*mesh_proto.Dataplane_Networking_Inbound, error) {
// Deprecated: LegacyInboundInterfacesFor is currently only used for delegated gateway to not change order of inbounds
// generated for gateway, as we pick first to take tags from. Delegated gateway identity relies on this.
// TODO: We should revisit this when we rework identity
func (i *InboundConverter) LegacyInboundInterfacesFor(ctx context.Context, zone string, pod *kube_core.Pod, services []*kube_core.Service) ([]*mesh_proto.Dataplane_Networking_Inbound, error) {
nodeLabels, err := i.getNodeLabelsToCopy(ctx, pod.Spec.NodeName)
if err != nil {
return nil, err
Expand All @@ -158,9 +162,65 @@ func (i *InboundConverter) InboundInterfacesFor(ctx context.Context, zone string

ifaces = append(ifaces, inboundForServiceless(zone, pod, name, nodeLabels))
}

return ifaces, nil
}

func (i *InboundConverter) InboundInterfacesFor(ctx context.Context, zone string, pod *kube_core.Pod, services []*kube_core.Service) ([]*mesh_proto.Dataplane_Networking_Inbound, error) {
nodeLabels, err := i.getNodeLabelsToCopy(ctx, pod.Spec.NodeName)
if err != nil {
return nil, err
}

var ifaces []*mesh_proto.Dataplane_Networking_Inbound
for _, svc := range services {
// Services of ExternalName type should not have any selectors.
// Kubernetes does not validate this, so in rare cases, a service of
// ExternalName type could point to a workload inside the mesh. If this
// happens, we would incorrectly generate inbounds including
// ExternalName service. We do not currently support ExternalName
// services, so we can safely skip them from processing.
if svc.Spec.Type != kube_core.ServiceTypeExternalName {
ifaces = append(ifaces, inboundForService(zone, pod, svc, nodeLabels)...)
}
}

if len(ifaces) == 0 {
name, _, err := i.NameExtractor.Name(ctx, pod)
if err != nil {
return nil, err
}

ifaces = append(ifaces, inboundForServiceless(zone, pod, name, nodeLabels))
}

// Right now we will build multiple inbounds for each service selecting port, but later on
// we will only create one listener for last inbound. Ignoring other inbounds from dataplane.
// Because of this we can safely deduplicate them here. This needs to change when we get rid of kuma.io/service
return deduplicateInboundsByAddressAndPort(ifaces), nil
}

func deduplicateInboundsByAddressAndPort(ifaces []*mesh_proto.Dataplane_Networking_Inbound) []*mesh_proto.Dataplane_Networking_Inbound {
inboundKey := func(iface *mesh_proto.Dataplane_Networking_Inbound) string {
return fmt.Sprintf("%s:%d", iface.Address, iface.Port)
}

inboundsPerName := map[string]*mesh_proto.Dataplane_Networking_Inbound{}
for _, iface := range ifaces {
if inboundsPerName[inboundKey(iface)] == nil {
inboundsPerName[inboundKey(iface)] = iface
}
}
var deduplicatedInbounds []*mesh_proto.Dataplane_Networking_Inbound
for _, v := range inboundsPerName {
deduplicatedInbounds = append(deduplicatedInbounds, v)
}
sort.Slice(deduplicatedInbounds, func(i, j int) bool {
return inboundKey(deduplicatedInbounds[i]) < inboundKey(deduplicatedInbounds[j])
})
return deduplicatedInbounds
}

func (i *InboundConverter) getNodeLabelsToCopy(ctx context.Context, nodeName string) (map[string]string, error) {
if len(i.NodeLabelsToCopy) == 0 || nodeName == "" {
return map[string]string{}, nil
Expand Down
178 changes: 89 additions & 89 deletions pkg/plugins/runtime/k8s/controllers/pod_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -635,64 +635,64 @@ var _ = Describe("PodReconciler", func() {
Expect(err).ToNot(HaveOccurred())
// and
Expect(actual).To(MatchYAML(`
mesh: poc
metadata:
creationTimestamp: null
labels:
app: sample
k8s.kuma.io/namespace: demo
kuma.io/env: kubernetes
kuma.io/mesh: poc
kuma.io/origin: zone
kuma.io/proxy-type: sidecar
kuma.io/zone: zone-1
name: pod-with-kuma-sidecar-and-ip
namespace: demo
ownerReferences:
- apiVersion: v1
blockOwnerDeletion: true
controller: true
kind: Pod
name: pod-with-kuma-sidecar-and-ip
uid: pod-with-kuma-sidecar-and-ip-demo
resourceVersion: "1"
spec:
networking:
address: 192.168.0.1
inbound:
- state: NotReady
health: {}
port: 8080
mesh: poc
metadata:
creationTimestamp: null
labels:
app: sample
k8s.kuma.io/namespace: demo
kuma.io/env: kubernetes
kuma.io/mesh: poc
kuma.io/origin: zone
kuma.io/proxy-type: sidecar
kuma.io/zone: zone-1
name: pod-with-kuma-sidecar-and-ip
namespace: demo
ownerReferences:
- apiVersion: v1
blockOwnerDeletion: true
controller: true
kind: Pod
name: pod-with-kuma-sidecar-and-ip
uid: pod-with-kuma-sidecar-and-ip-demo
resourceVersion: "1"
spec:
networking:
address: 192.168.0.1
inbound:
- health: {}
name: metrics
port: 6060
state: NotReady
tags:
app: sample
kuma.io/protocol: http
kuma.io/service: example_demo_svc_80
k8s.kuma.io/service-name: example
k8s.kuma.io/service-port: "80"
k8s.kuma.io/namespace: demo
k8s.kuma.io/service-name: example
k8s.kuma.io/service-port: "6061"
kuma.io/protocol: tcp
kuma.io/service: example_demo_svc_6061
kuma.io/zone: zone-1
- state: NotReady
health: {}
port: 6060
name: metrics
- health: {}
port: 8080
state: NotReady
tags:
app: sample
kuma.io/service: example_demo_svc_6061
kuma.io/protocol: tcp
k8s.kuma.io/service-name: example
k8s.kuma.io/service-port: "6061"
k8s.kuma.io/namespace: demo
k8s.kuma.io/service-name: example
k8s.kuma.io/service-port: "80"
kuma.io/protocol: http
kuma.io/service: example_demo_svc_80
kuma.io/zone: zone-1
- state: NotReady
health: {}
- health: {}
port: 9090
state: NotReady
tags:
app: sample
kuma.io/service: manual-example_demo_svc_90
kuma.io/protocol: http
k8s.kuma.io/namespace: demo
k8s.kuma.io/service-name: manual-example
k8s.kuma.io/service-port: "90"
k8s.kuma.io/namespace: demo
kuma.io/protocol: http
kuma.io/service: manual-example_demo_svc_90
kuma.io/zone: zone-1
`))
})
Expand Down Expand Up @@ -737,64 +737,64 @@ var _ = Describe("PodReconciler", func() {
Expect(err).ToNot(HaveOccurred())
// and
Expect(actual).To(MatchYAML(`
mesh: poc
metadata:
creationTimestamp: null
labels:
app: sample
k8s.kuma.io/namespace: demo
kuma.io/env: kubernetes
kuma.io/mesh: poc
kuma.io/origin: zone
kuma.io/proxy-type: sidecar
kuma.io/zone: zone-1
mesh: poc
metadata:
creationTimestamp: null
labels:
app: sample
k8s.kuma.io/namespace: demo
kuma.io/env: kubernetes
kuma.io/mesh: poc
kuma.io/origin: zone
kuma.io/proxy-type: sidecar
kuma.io/zone: zone-1
name: pod-with-kuma-sidecar-and-ip
namespace: demo
ownerReferences:
- apiVersion: v1
blockOwnerDeletion: true
controller: true
kind: Pod
name: pod-with-kuma-sidecar-and-ip
namespace: demo
ownerReferences:
- apiVersion: v1
blockOwnerDeletion: true
controller: true
kind: Pod
name: pod-with-kuma-sidecar-and-ip
uid: "pod-with-kuma-sidecar-and-ip-demo"
resourceVersion: "2"
spec:
networking:
address: 192.168.0.1
inbound:
- state: NotReady
health: {}
port: 8080
uid: pod-with-kuma-sidecar-and-ip-demo
resourceVersion: "2"
spec:
networking:
address: 192.168.0.1
inbound:
- health: {}
name: metrics
port: 6060
state: NotReady
tags:
app: sample
kuma.io/protocol: http
kuma.io/service: example_demo_svc_80
k8s.kuma.io/service-name: example
k8s.kuma.io/service-port: "80"
k8s.kuma.io/namespace: demo
k8s.kuma.io/service-name: example
k8s.kuma.io/service-port: "6061"
kuma.io/protocol: tcp
kuma.io/service: example_demo_svc_6061
kuma.io/zone: zone-1
- state: NotReady
health: {}
port: 6060
name: metrics
- health: {}
port: 8080
state: NotReady
tags:
app: sample
kuma.io/service: example_demo_svc_6061
kuma.io/protocol: tcp
k8s.kuma.io/service-name: example
k8s.kuma.io/service-port: "6061"
k8s.kuma.io/namespace: demo
k8s.kuma.io/service-name: example
k8s.kuma.io/service-port: "80"
kuma.io/protocol: http
kuma.io/service: example_demo_svc_80
kuma.io/zone: zone-1
- state: NotReady
health: {}
- health: {}
port: 9090
state: NotReady
tags:
app: sample
kuma.io/service: manual-example_demo_svc_90
kuma.io/protocol: http
k8s.kuma.io/namespace: demo
k8s.kuma.io/service-name: manual-example
k8s.kuma.io/service-port: "90"
k8s.kuma.io/namespace: demo
kuma.io/protocol: http
kuma.io/service: manual-example_demo_svc_90
kuma.io/zone: zone-1
`))
})
Expand Down
4 changes: 2 additions & 2 deletions pkg/plugins/runtime/k8s/controllers/pod_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,13 +309,13 @@ func (p *PodConverter) dataplaneFor(
}

func (p *PodConverter) GatewayByServiceFor(ctx context.Context, clusterName string, pod *kube_core.Pod, services []*kube_core.Service) (*mesh_proto.Dataplane_Networking_Gateway, error) {
interfaces, err := p.InboundConverter.InboundInterfacesFor(ctx, clusterName, pod, services)
interfaces, err := p.InboundConverter.LegacyInboundInterfacesFor(ctx, clusterName, pod, services)
if err != nil {
return nil, err
}
return &mesh_proto.Dataplane_Networking_Gateway{
Type: mesh_proto.Dataplane_Networking_Gateway_DELEGATED,
Tags: interfaces[0].Tags, // InboundInterfacesFor() returns either a non-empty list or an error
Tags: interfaces[0].Tags,
}, nil
}

Expand Down
5 changes: 5 additions & 0 deletions pkg/plugins/runtime/k8s/controllers/pod_converter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,11 @@ var _ = Describe("PodToDataplane(..)", func() {
otherServices: "update-dataplane.other-services.yaml",
dataplane: "update-dataplane.dataplane.yaml",
}),
Entry("Multiples services selecting single port", testCase{
pod: "duplicated-inbounds.pod.yaml",
servicesForPod: "duplicated-inbounds.services-for-pod.yaml",
dataplane: "duplicated-inbounds.dataplane.yaml",
}),
)

DescribeTable("should convert Ingress Pod into an Ingress Dataplane YAML version",
Expand Down
Loading
Loading