Skip to content

Commit

Permalink
internal change: policies can reference upstreams (#10612)
Browse files Browse the repository at this point in the history
  • Loading branch information
yuval-k authored Feb 12, 2025
1 parent 186bee4 commit d68bd4d
Show file tree
Hide file tree
Showing 8 changed files with 53 additions and 52 deletions.
2 changes: 2 additions & 0 deletions internal/kgateway/extensions2/common/krt.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ type CommonCollections struct {
Client kube.Client
KrtOpts krtutil.KrtOptions
Secrets *krtcollections.SecretIndex
Upstreams *krtcollections.UpstreamIndex

Pods krt.Collection[krtcollections.LocalityPod]
RefGrants *krtcollections.RefGrantIndex

Expand Down
52 changes: 26 additions & 26 deletions internal/kgateway/krtcollections/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,19 @@ type UpstreamIndex struct {
availableUpstreams map[schema.GroupKind]krt.Collection[ir.Upstream]
backendRefExtension []extensionsplug.GetBackendForRefPlugin
policies *PolicyIndex
refgrants *RefGrantIndex
krtopts krtutil.KrtOptions
}

func NewUpstreamIndex(
krtopts krtutil.KrtOptions,
backendRefExtension []extensionsplug.GetBackendForRefPlugin,
policies *PolicyIndex,
refgrants *RefGrantIndex,
) *UpstreamIndex {
return &UpstreamIndex{
policies: policies,
refgrants: refgrants,
availableUpstreams: map[schema.GroupKind]krt.Collection[ir.Upstream]{},
krtopts: krtopts,
backendRefExtension: backendRefExtension,
Expand All @@ -57,6 +60,9 @@ func (s *UpstreamIndex) HasSynced() bool {
if !s.policies.HasSynced() {
return false
}
if !s.refgrants.HasSynced() {
return false
}
for _, col := range s.availableUpstreams {
if !col.Synced().HasSynced() {
return false
Expand Down Expand Up @@ -145,6 +151,22 @@ func (i *UpstreamIndex) getUpstreamFromRef(kctx krt.HandlerContext, localns stri
return i.getUpstream(kctx, resolved.GetGroupKind(), types.NamespacedName{Namespace: resolved.Namespace, Name: resolved.Name}, ref.Port)
}

func (i *UpstreamIndex) GetUpstreamFromRef(kctx krt.HandlerContext, src ir.ObjectSource, ref gwv1.BackendObjectReference) (*ir.Upstream, error) {
fromns := src.Namespace

fromgk := schema.GroupKind{
Group: src.Group,
Kind: src.Kind,
}
to := toFromBackendRef(fromns, ref)

if i.refgrants.ReferenceAllowed(kctx, fromgk, fromns, to) {
return i.getUpstreamFromRef(kctx, src.Namespace, ref)
} else {
return nil, ErrMissingReferenceGrant
}
}

type GatewayIndex struct {
policies *PolicyIndex
gwClass krt.Collection[gwv1.GatewayClass]
Expand Down Expand Up @@ -655,17 +677,8 @@ func (h *RoutesIndex) getBackends(kctx krt.HandlerContext, src ir.ObjectSource,
continue
}

var upstream *ir.Upstream
fromgk := schema.GroupKind{
Group: src.Group,
Kind: src.Kind,
}
var err error
if h.refgrants.ReferenceAllowed(kctx, fromgk, fromns, to) {
upstream, err = h.upstreams.getUpstreamFromRef(kctx, src.Namespace, ref.BackendRef.BackendObjectReference)
} else {
err = ErrMissingReferenceGrant
}
upstream, err := h.upstreams.GetUpstreamFromRef(kctx, src, ref.BackendRef.BackendObjectReference)

// TODO: if we can't find the upstream, should we
// still use its cluster name in case it comes up later?
// if so we need to think about the way create cluster names,
Expand All @@ -692,25 +705,12 @@ func (h *RoutesIndex) getBackends(kctx krt.HandlerContext, src ir.ObjectSource,
func (h *RoutesIndex) getTcpBackends(kctx krt.HandlerContext, src ir.ObjectSource, i []gwv1.BackendRef) []ir.Backend {
backends := make([]ir.Backend, 0, len(i))
for _, ref := range i {
fromns := src.Namespace

to := toFromBackendRef(fromns, ref.BackendObjectReference)
var upstream *ir.Upstream
fromgk := schema.GroupKind{
Group: src.Group,
Kind: src.Kind,
}
var err error
if h.refgrants.ReferenceAllowed(kctx, fromgk, fromns, to) {
upstream, err = h.upstreams.getUpstreamFromRef(kctx, src.Namespace, ref.BackendObjectReference)
} else {
err = ErrMissingReferenceGrant
}
upstream, err := h.upstreams.GetUpstreamFromRef(kctx, src, ref.BackendObjectReference)
clusterName := "blackhole-cluster"
if upstream != nil {
clusterName = upstream.ClusterName()
} else if err == nil {
err = &NotFoundError{NotFoundObj: to}
err = &NotFoundError{NotFoundObj: toFromBackendRef(src.Namespace, ref.BackendObjectReference)}
}
backends = append(backends, ir.Backend{
Upstream: upstream,
Expand Down
4 changes: 2 additions & 2 deletions internal/kgateway/krtcollections/policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,9 +335,9 @@ func preRouteIndex(t *testing.T, inputs []any) *RoutesIndex {
services := krttest.GetMockCollection[*corev1.Service](mock)

policies := NewPolicyIndex(krtutil.KrtOptions{}, extensionsplug.ContributesPolicies{})
upstreams := NewUpstreamIndex(krtutil.KrtOptions{}, nil, policies)
upstreams.AddUpstreams(SvcGk, k8sUpstreams(services))
refgrants := NewRefGrantIndex(krttest.GetMockCollection[*gwv1beta1.ReferenceGrant](mock))
upstreams := NewUpstreamIndex(krtutil.KrtOptions{}, nil, policies, refgrants)
upstreams.AddUpstreams(SvcGk, k8sUpstreams(services))

httproutes := krttest.GetMockCollection[*gwv1.HTTPRoute](mock)
tcpproutes := krttest.GetMockCollection[*gwv1a2.TCPRoute](mock)
Expand Down
16 changes: 7 additions & 9 deletions internal/kgateway/krtcollections/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func InitCollections(
isOurGw func(gw *gwv1.Gateway) bool,
refgrants *RefGrantIndex,
krtopts krtutil.KrtOptions,
) (*GatewayIndex, *RoutesIndex, krt.Collection[ir.Upstream], krt.Collection[ir.EndpointsForUpstream]) {
) (*GatewayIndex, *RoutesIndex, *UpstreamIndex, krt.Collection[ir.EndpointsForUpstream]) {
registerTypes()

httpRoutes := krt.WrapClient(kclient.New[*gwv1.HTTPRoute](istioClient), krtopts.ToOptions("HTTPRoute")...)
Expand All @@ -79,7 +79,7 @@ func initCollectionsWithGateways(
refgrants *RefGrantIndex,
extensions extensionsplug.Plugin,
krtopts krtutil.KrtOptions,
) (*GatewayIndex, *RoutesIndex, krt.Collection[ir.Upstream], krt.Collection[ir.EndpointsForUpstream]) {
) (*GatewayIndex, *RoutesIndex, *UpstreamIndex, krt.Collection[ir.EndpointsForUpstream]) {

policies := NewPolicyIndex(krtopts, extensions.ContributesPolicies)

Expand All @@ -90,20 +90,20 @@ func initCollectionsWithGateways(
}
}

upstreamIndex := NewUpstreamIndex(krtopts, backendRefPlugins, policies)
finalUpstreams, endpointIRs := initUpstreams(extensions, upstreamIndex, krtopts)
upstreamIndex := NewUpstreamIndex(krtopts, backendRefPlugins, policies, refgrants)
endpointIRs := initUpstreams(extensions, upstreamIndex, krtopts)

kubeGateways := NewGatewayIndex(krtopts, isOurGw, policies, kubeRawGateways)

routes := NewRoutesIndex(krtopts, httpRoutes, tcproutes, policies, upstreamIndex, refgrants)
return kubeGateways, routes, finalUpstreams, endpointIRs
return kubeGateways, routes, upstreamIndex, endpointIRs
}

func initUpstreams(
extensions extensionsplug.Plugin,
upstreamIndex *UpstreamIndex,
krtopts krtutil.KrtOptions,
) (krt.Collection[ir.Upstream], krt.Collection[ir.EndpointsForUpstream]) {
) krt.Collection[ir.EndpointsForUpstream] {

allEndpoints := []krt.Collection[ir.EndpointsForUpstream]{}
for k, col := range extensions.ContributesUpstreams {
Expand All @@ -115,10 +115,8 @@ func initUpstreams(
}
}

finalUpstreams := krt.JoinCollection(upstreamIndex.Upstreams(), krtopts.ToOptions("FinalUpstreams")...)

// build Endpoint intermediate representation from kubernetes service and extensions
// TODO move kube service to be an extension
endpointIRs := krt.JoinCollection(allEndpoints, krtopts.ToOptions("EndpointIRs")...)
return finalUpstreams, endpointIRs
return endpointIRs
}
12 changes: 9 additions & 3 deletions internal/kgateway/proxy_syncer/proxy_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,9 +193,7 @@ func (s *ProxySyncer) Init(ctx context.Context, isOurGw func(gw *gwv1.Gateway) b
ctx = contextutils.WithLogger(ctx, "k8s-gw-proxy-syncer")
logger := contextutils.LoggerFrom(ctx)

s.translatorSyncer.Init(ctx, isOurGw)

kubeGateways, routes, finalUpstreams, endpointIRs := krtcollections.InitCollections(
kubeGateways, routes, upstreamIndex, endpointIRs := krtcollections.InitCollections(
ctx,
s.extensions,
s.istioClient,
Expand All @@ -204,6 +202,13 @@ func (s *ProxySyncer) Init(ctx context.Context, isOurGw func(gw *gwv1.Gateway) b
krtopts,
)

finalUpstreams := krt.JoinCollection(upstreamIndex.Upstreams(), krtopts.ToOptions("FinalUpstreams")...)

// add the upstreams to the common collections, so they are available for policies.
s.commonCols.Upstreams = upstreamIndex

s.translatorSyncer.Init(ctx, routes)

s.mostXdsSnapshots = krt.NewCollection(kubeGateways.Gateways, func(kctx krt.HandlerContext, gw ir.Gateway) *GatewayXdsResources {
logger.Debugf("building proxy for kube gw %s version %s", client.ObjectKeyFromObject(gw.Obj), gw.Obj.GetResourceVersion())

Expand Down Expand Up @@ -279,6 +284,7 @@ func (s *ProxySyncer) Init(ctx context.Context, isOurGw func(gw *gwv1.Gateway) b
s.waitForSync = []cache.InformerSynced{
endpointIRs.Synced().HasSynced,
endpointIRs.Synced().HasSynced,
upstreamIndex.HasSynced,
finalUpstreams.Synced().HasSynced,
kubeGateways.Gateways.Synced().HasSynced,
s.perclientSnapCollection.Synced().HasSynced,
Expand Down
4 changes: 2 additions & 2 deletions internal/kgateway/query/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -754,11 +754,11 @@ func newQueries(initObjs ...client.Object) query.GatewayQueries {
}
mock := krttest.NewMock(GinkgoT(), anys)
services := krttest.GetMockCollection[*corev1.Service](mock)
refgrants := krtcollections.NewRefGrantIndex(krttest.GetMockCollection[*apiv1beta1.ReferenceGrant](mock))

policies := krtcollections.NewPolicyIndex(krtutil.KrtOptions{}, extensionsplug.ContributesPolicies{})
upstreams := krtcollections.NewUpstreamIndex(krtutil.KrtOptions{}, nil, policies)
upstreams := krtcollections.NewUpstreamIndex(krtutil.KrtOptions{}, nil, policies, refgrants)
upstreams.AddUpstreams(SvcGk, k8sUpstreams(services))
refgrants := krtcollections.NewRefGrantIndex(krttest.GetMockCollection[*apiv1beta1.ReferenceGrant](mock))

httproutes := krttest.GetMockCollection[*gwv1.HTTPRoute](mock)
tcpproutes := krttest.GetMockCollection[*gwv1a2.TCPRoute](mock)
Expand Down
7 changes: 4 additions & 3 deletions internal/kgateway/translator/gateway/translator_case_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,17 +195,18 @@ func (tc TestCase) Run(t test.Failer, ctx context.Context) (map[types.Namespaced
return true
}

gi, ri, ui, ei := krtcollections.InitCollections(ctx, extensions, cli, isOurGw, commoncol.RefGrants, krtOpts)

translator := translator.NewCombinedTranslator(ctx, extensions, commoncol)
translator.Init(ctx, isOurGw)
translator.Init(ctx, ri)

gi, ri, ui, ei := krtcollections.InitCollections(ctx, extensions, cli, isOurGw, commoncol.RefGrants, krtOpts)
cli.RunAndWait(ctx.Done())
gi.Gateways.Synced().WaitUntilSynced(ctx.Done())
kubeclient.WaitForCacheSync("routes", ctx.Done(), ri.HasSynced)
kubeclient.WaitForCacheSync("extensions", ctx.Done(), extensions.HasSynced)
kubeclient.WaitForCacheSync("commoncol", ctx.Done(), commoncol.HasSynced)
kubeclient.WaitForCacheSync("translator", ctx.Done(), translator.HasSynced)
kubeclient.WaitForCacheSync("upstreams", ctx.Done(), ui.Synced().HasSynced)
kubeclient.WaitForCacheSync("upstreams", ctx.Done(), ui.HasSynced)
kubeclient.WaitForCacheSync("endpoints", ctx.Done(), ei.Synced().HasSynced)

results := make(map[types.NamespacedName]ActualTestResult)
Expand Down
8 changes: 1 addition & 7 deletions internal/kgateway/translator/translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
envoy_config_endpoint_v3 "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
"github.com/solo-io/go-utils/contextutils"
"sigs.k8s.io/controller-runtime/pkg/client"
gwv1 "sigs.k8s.io/gateway-api/apis/v1"

"github.com/kgateway-dev/kgateway/v2/internal/kgateway/endpoints"
"github.com/kgateway-dev/kgateway/v2/internal/kgateway/extensions2/common"
Expand Down Expand Up @@ -62,12 +61,11 @@ func NewCombinedTranslator(
}

// Note: isOurGw is shared between us and the deployer.
func (s *CombinedTranslator) Init(ctx context.Context, isOurGw func(gw *gwv1.Gateway) bool) error {
func (s *CombinedTranslator) Init(ctx context.Context, routes *krtcollections.RoutesIndex) error {
ctx = contextutils.WithLogger(ctx, "k8s-gw-proxy-syncer")

nsCol := krtcollections.NewNamespaceCollection(ctx, s.commonCols.Client, s.commonCols.KrtOpts)

kubeGateways, routes, finalUpstreams, endpointIRs := krtcollections.InitCollections(ctx, s.extensions, s.commonCols.Client, isOurGw, s.commonCols.RefGrants, s.commonCols.KrtOpts)
queries := query.NewData(
routes,
s.commonCols.Secrets,
Expand All @@ -86,11 +84,7 @@ func (s *CombinedTranslator) Init(ctx context.Context, isOurGw func(gw *gwv1.Gat
}

s.waitForSync = append(s.waitForSync,
endpointIRs.Synced().HasSynced,
endpointIRs.Synced().HasSynced,
s.commonCols.HasSynced,
finalUpstreams.Synced().HasSynced,
kubeGateways.Gateways.Synced().HasSynced,
s.extensions.HasSynced,
routes.HasSynced,
)
Expand Down

0 comments on commit d68bd4d

Please sign in to comment.