From 2fc72939dccd2b6db40cadac24d5103b57817666 Mon Sep 17 00:00:00 2001 From: Matthew Penner Date: Sun, 26 May 2024 21:34:40 -0600 Subject: [PATCH] feat: add support for `TCPRoute` and `UDPRoute` Signed-off-by: Matthew Penner --- README.md | 5 +- internal/caddy/caddy.go | 96 +++++++----- internal/caddy/tcp.go | 86 +++++++++++ internal/caddy/udp.go | 86 +++++++++++ internal/controller/route_tcp.go | 244 +++++++++++++++++++++++++++++- internal/controller/route_udp.go | 244 +++++++++++++++++++++++++++++- internal/gateway.go | 5 + internal/layer4/handler.go | 8 + internal/layer4/l4proxy/proxy.go | 127 ++++++++++++++++ internal/layer4/layer4.go | 12 ++ internal/layer4/routes.go | 31 ++++ internal/layer4/server.go | 22 +++ internal/routechecks/route_tcp.go | 139 +++++++++++++++++ internal/routechecks/route_udp.go | 139 +++++++++++++++++ main.go | 40 +++-- 15 files changed, 1229 insertions(+), 55 deletions(-) create mode 100644 internal/caddy/tcp.go create mode 100644 internal/caddy/udp.go create mode 100644 internal/layer4/handler.go create mode 100644 internal/layer4/l4proxy/proxy.go create mode 100644 internal/layer4/layer4.go create mode 100644 internal/layer4/routes.go create mode 100644 internal/layer4/server.go create mode 100644 internal/routechecks/route_tcp.go create mode 100644 internal/routechecks/route_udp.go diff --git a/README.md b/README.md index 1c00828..993f84a 100644 --- a/README.md +++ b/README.md @@ -49,12 +49,13 @@ Support for missing resources is planned but not yet implemented. 
- [x] [GatewayClass](https://gateway-api.sigs.k8s.io/api-types/gatewayclass/) - [x] [Gateway](https://gateway-api.sigs.k8s.io/api-types/gateway/) - [x] [ReferenceGrant](https://gateway-api.sigs.k8s.io/api-types/referencegrant/) +- [ ] [BackendLBPolicy](https://gateway-api.sigs.k8s.io/geps/gep-1619/) - [x] [BackendTLSPolicy](https://gateway-api.sigs.k8s.io/api-types/backendtlspolicy/) - [x] [HTTPRoute](https://gateway-api.sigs.k8s.io/api-types/httproute/) - [ ] [GRPCRoute](https://gateway-api.sigs.k8s.io/api-types/grpcroute/) - [ ] [TLSRoute](https://gateway-api.sigs.k8s.io/concepts/api-overview/#tlsroute) -- [ ] [TCPRoute](https://gateway-api.sigs.k8s.io/concepts/api-overview/#tcproute-and-udproute) -- [ ] [UDPRoute](https://gateway-api.sigs.k8s.io/concepts/api-overview/#tcproute-and-udproute) +- [x] [TCPRoute](https://gateway-api.sigs.k8s.io/concepts/api-overview/#tcproute-and-udproute) +- [x] [UDPRoute](https://gateway-api.sigs.k8s.io/concepts/api-overview/#tcproute-and-udproute) The [Ingress](https://kubernetes.io/docs/concepts/services-networking/ingress/) resource is not supported and support is not planned, sorry. diff --git a/internal/caddy/caddy.go b/internal/caddy/caddy.go index 7af5e3f..4b72507 100644 --- a/internal/caddy/caddy.go +++ b/internal/caddy/caddy.go @@ -20,6 +20,7 @@ import ( caddyv2 "github.com/caddyserver/gateway/internal/caddyv2" "github.com/caddyserver/gateway/internal/caddyv2/caddyhttp" "github.com/caddyserver/gateway/internal/caddyv2/caddytls" + "github.com/caddyserver/gateway/internal/layer4" ) // Config represents the configuration for a Caddy server. @@ -31,10 +32,9 @@ type Config struct { // Apps is the configuration for "apps" on a Caddy server. type Apps struct { - HTTP *caddyhttp.App `json:"http,omitempty"` - TLS *caddytls.TLS `json:"tls,omitempty"` - // TODO: replace the layer4 package with our own definitions. 
- // Layer4 *layer4.App `json:"layer4,omitempty"` + HTTP *caddyhttp.App `json:"http,omitempty"` + TLS *caddytls.TLS `json:"tls,omitempty"` + Layer4 *layer4.App `json:"layer4,omitempty"` } // Input is provided to us by the Gateway Controller and is used to @@ -56,16 +56,16 @@ type Input struct { Client client.Client - httpServers map[string]*caddyhttp.Server - // layer4Servers map[string]*layer4.Server - config *Config - loadPems []caddytls.CertKeyPEMPair + httpServers map[string]*caddyhttp.Server + layer4Servers map[string]*layer4.Server + config *Config + loadPems []caddytls.CertKeyPEMPair } // Config generates a JSON config for use with a Caddy server. func (i *Input) Config() ([]byte, error) { i.httpServers = map[string]*caddyhttp.Server{} - // i.layer4Servers = map[string]*layer4.Server{} + i.layer4Servers = map[string]*layer4.Server{} i.config = &Config{ Admin: &caddyv2.AdminConfig{Listen: ":2019"}, Apps: &Apps{}, @@ -104,11 +104,11 @@ func (i *Input) Config() ([]byte, error) { GracePeriod: caddyv2.Duration(15 * time.Second), } } - //if len(i.layer4Servers) > 0 { - // i.config.Apps.Layer4 = &layer4.App{ - // Servers: i.layer4Servers, - // } - //} + if len(i.layer4Servers) > 0 { + i.config.Apps.Layer4 = &layer4.App{ + Servers: i.layer4Servers, + } + } if len(i.loadPems) > 0 { i.config.Apps.TLS = &caddytls.TLS{ Certificates: &caddytls.Certificates{ @@ -123,32 +123,26 @@ func (i *Input) Config() ([]byte, error) { func (i *Input) handleListener(l gatewayv1.Listener) error { switch l.Protocol { case gatewayv1.HTTPProtocolType: - break + return i.handleHTTPListener(l) case gatewayv1.HTTPSProtocolType: - break + return i.handleHTTPListener(l) case gatewayv1.TLSProtocolType: - break + // If TLS mode is set to Terminate, treat it as an HTTP server. + if l.TLS == nil || l.TLS.Mode == nil || *l.TLS.Mode == gatewayv1.TLSModeTerminate { + return i.handleHTTPListener(l) + } + // Otherwise we need TLS passthrough, which is more complicated. 
+ return i.handleLayer4Listener(l) case gatewayv1.TCPProtocolType: - // TODO: implement - return nil + return i.handleLayer4Listener(l) case gatewayv1.UDPProtocolType: - // TODO: implement - return nil + return i.handleLayer4Listener(l) default: return nil } +} - // Defaults to Terminate which is fine, we do need to handle Passthrough - // differently. - if l.TLS != nil && l.TLS.Mode != nil && *l.TLS.Mode == gatewayv1.TLSModePassthrough { - //server, err := i.getTLSServer(l) - //if err != nil { - // return err - //} - //i.layer4Servers[string(l.Name)] = server - return nil - } - +func (i *Input) handleHTTPListener(l gatewayv1.Listener) error { key := strconv.Itoa(int(l.Port)) s, ok := i.httpServers[key] if !ok { @@ -176,8 +170,6 @@ func (i *Input) handleListener(l gatewayv1.Listener) error { Body: "{http.error.status_code} {http.error.status_text}\n\n{http.error.message}\n", Headers: http.Header{ "Caddy-Instance": {"{system.hostname}"}, - // TODO: remove - // "Trace-ID": {"{http.vars.trace_id}"}, }, }, }, @@ -195,6 +187,42 @@ func (i *Input) handleListener(l gatewayv1.Listener) error { return nil } +func (i *Input) handleLayer4Listener(l gatewayv1.Listener) error { + proto := "tcp" + if l.Protocol == gatewayv1.UDPProtocolType { + proto = "udp" + } + key := proto + "/" + strconv.Itoa(int(l.Port)) + s, ok := i.layer4Servers[key] + if !ok { + s = &layer4.Server{ + Listen: []string{proto + "/" + ":" + strconv.Itoa(int(l.Port))}, + } + } + + var ( + server *layer4.Server + err error + ) + switch l.Protocol { + case gatewayv1.TLSProtocolType: + // TODO: implement + // This TLS protocol is for passthrough, not terminate. 
+ break + case gatewayv1.TCPProtocolType: + server, err = i.getTCPServer(s, l) + case gatewayv1.UDPProtocolType: + server, err = i.getUDPServer(s, l) + default: + return nil + } + if err != nil { + return err + } + i.layer4Servers[key] = server + return nil +} + func isRouteForListener(gw *gatewayv1.Gateway, l gatewayv1.Listener, rNS string, rs gatewayv1.RouteStatus) bool { for _, p := range rs.Parents { if !gateway.MatchesControllerName(p.ControllerName) { diff --git a/internal/caddy/tcp.go b/internal/caddy/tcp.go new file mode 100644 index 0000000..6ca28b3 --- /dev/null +++ b/internal/caddy/tcp.go @@ -0,0 +1,86 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright (c) 2024 Matthew Penner + +package caddy + +import ( + "net" + "strconv" + + gateway "github.com/caddyserver/gateway/internal" + "github.com/caddyserver/gateway/internal/layer4" + "github.com/caddyserver/gateway/internal/layer4/l4proxy" + corev1 "k8s.io/api/core/v1" + gatewayv1 "sigs.k8s.io/gateway-api/apis/v1" +) + +func (i *Input) getTCPServer(s *layer4.Server, l gatewayv1.Listener) (*layer4.Server, error) { + routes := []*layer4.Route{} + for _, tr := range i.TCPRoutes { + if !isRouteForListener(i.Gateway, l, tr.Namespace, tr.Status.RouteStatus) { + continue + } + + handlers := []layer4.Handler{} + for _, rule := range tr.Spec.Rules { + // We only support a single backend ref as we don't support weights for layer4 proxy. + if len(rule.BackendRefs) != 1 { + continue + } + + bf := rule.BackendRefs[0] + bor := bf.BackendObjectReference + if !gateway.IsService(bor) { + continue + } + + // Safeguard against nil-pointer dereference. + if bor.Port == nil { + continue + } + + // Get the service. + // + // TODO: is there a more efficient way to do this? + // We currently list all services and forward them to the input, + // then iterate over them. + // + // Should we just use the Kubernetes client instead? 
+ var service corev1.Service + for _, s := range i.Services { + if s.Namespace != gateway.NamespaceDerefOr(bor.Namespace, tr.Namespace) { + continue + } + if s.Name != string(bor.Name) { + continue + } + service = s + break + } + if service.Name == "" { + // Invalid service reference. + continue + } + + handlers = append(handlers, &l4proxy.Handler{ + Upstreams: l4proxy.UpstreamPool{ + &l4proxy.Upstream{ + Dial: []string{net.JoinHostPort(service.Spec.ClusterIP, strconv.Itoa(int(*bor.Port)))}, + }, + }, + // HealthChecks: &l4proxy.HealthChecks{}, + // LoadBalancing: &l4proxy.LoadBalancing{}, + // ProxyProtocol: "", + }) + } + + // Add the route. + routes = append(routes, &layer4.Route{ + Handlers: handlers, + }) + } + + // Update the routes on the server. + s.Routes = append(s.Routes, routes...) + return s, nil +} diff --git a/internal/caddy/udp.go b/internal/caddy/udp.go new file mode 100644 index 0000000..a18c71c --- /dev/null +++ b/internal/caddy/udp.go @@ -0,0 +1,86 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright (c) 2024 Matthew Penner + +package caddy + +import ( + "net" + "strconv" + + gateway "github.com/caddyserver/gateway/internal" + "github.com/caddyserver/gateway/internal/layer4" + "github.com/caddyserver/gateway/internal/layer4/l4proxy" + corev1 "k8s.io/api/core/v1" + gatewayv1 "sigs.k8s.io/gateway-api/apis/v1" +) + +func (i *Input) getUDPServer(s *layer4.Server, l gatewayv1.Listener) (*layer4.Server, error) { + routes := []*layer4.Route{} + for _, tr := range i.UDPRoutes { + if !isRouteForListener(i.Gateway, l, tr.Namespace, tr.Status.RouteStatus) { + continue + } + + handlers := []layer4.Handler{} + for _, rule := range tr.Spec.Rules { + // We only support a single backend ref as we don't support weights for layer4 proxy. 
+ if len(rule.BackendRefs) != 1 { + continue + } + + bf := rule.BackendRefs[0] + bor := bf.BackendObjectReference + if !gateway.IsService(bor) { + continue + } + + // Safeguard against nil-pointer dereference. + if bor.Port == nil { + continue + } + + // Get the service. + // + // TODO: is there a more efficient way to do this? + // We currently list all services and forward them to the input, + // then iterate over them. + // + // Should we just use the Kubernetes client instead? + var service corev1.Service + for _, s := range i.Services { + if s.Namespace != gateway.NamespaceDerefOr(bor.Namespace, tr.Namespace) { + continue + } + if s.Name != string(bor.Name) { + continue + } + service = s + break + } + if service.Name == "" { + // Invalid service reference. + continue + } + + handlers = append(handlers, &l4proxy.Handler{ + Upstreams: l4proxy.UpstreamPool{ + &l4proxy.Upstream{ + Dial: []string{net.JoinHostPort(service.Spec.ClusterIP, strconv.Itoa(int(*bor.Port)))}, + }, + }, + // HealthChecks: &l4proxy.HealthChecks{}, + // LoadBalancing: &l4proxy.LoadBalancing{}, + // ProxyProtocol: "", + }) + } + + // Add the route. + routes = append(routes, &layer4.Route{ + Handlers: handlers, + }) + } + + // Update the routes on the server. + s.Routes = append(s.Routes, routes...) 
+ return s, nil +} diff --git a/internal/controller/route_tcp.go b/internal/controller/route_tcp.go index 41295fe..af73b53 100644 --- a/internal/controller/route_tcp.go +++ b/internal/controller/route_tcp.go @@ -5,14 +5,29 @@ package controller import ( "context" + "fmt" + gateway "github.com/caddyserver/gateway/internal" + "github.com/caddyserver/gateway/internal/routechecks" + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" + gatewayv1 "sigs.k8s.io/gateway-api/apis/v1" gatewayv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2" + gatewayv1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1" ) // +kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=tcproutes,verbs=get;list;watch @@ -29,16 +44,241 @@ var _ reconcile.Reconciler = (*TCPRouteReconciler)(nil) // SetupWithManager sets up the controller with the Manager. 
func (r *TCPRouteReconciler) SetupWithManager(mgr ctrl.Manager) error { + ctx := context.Background() + + if err := mgr.GetFieldIndexer().IndexField(ctx, &gatewayv1alpha2.TCPRoute{}, backendServiceIndex, func(o client.Object) []string { + route, ok := o.(*gatewayv1alpha2.TCPRoute) + if !ok { + return nil + } + var backendServices []string + for _, rule := range route.Spec.Rules { + for _, backend := range rule.BackendRefs { + backendServiceName, err := gateway.GetBackendServiceName(backend.BackendObjectReference) + if err != nil { + mgr.GetLogger().WithValues( + "controller", "tcp-route", + "resource", client.ObjectKeyFromObject(o), + ).Error(err, "Failed to get backend service name") + continue + } + + backendServices = append(backendServices, types.NamespacedName{ + Namespace: gateway.NamespaceDerefOr(backend.Namespace, route.Namespace), + Name: backendServiceName, + }.String()) + } + } + return backendServices + }); err != nil { + return err + } + + if err := mgr.GetFieldIndexer().IndexField(ctx, &gatewayv1alpha2.TCPRoute{}, gatewayIndex, func(o client.Object) []string { + hr := o.(*gatewayv1alpha2.TCPRoute) + var gateways []string + for _, parent := range hr.Spec.ParentRefs { + if !gateway.IsGateway(parent) { + continue + } + gateways = append(gateways, types.NamespacedName{ + Namespace: gateway.NamespaceDerefOr(parent.Namespace, hr.Namespace), + Name: string(parent.Name), + }.String()) + } + return gateways + }); err != nil { + return err + } + return ctrl.NewControllerManagedBy(mgr). For(&gatewayv1alpha2.TCPRoute{}). + Watches(&corev1.Service{}, r.enqueueRequestForBackendService()). + Watches(&gatewayv1beta1.ReferenceGrant{}, r.enqueueRequestForReferenceGrant()). + Watches( + &gatewayv1.Gateway{}, + r.enqueueRequestForGateway(), + builder.WithPredicates(predicate.NewPredicateFuncs(r.hasMatchingController(ctx))), + ). 
Complete(r) } func (r *TCPRouteReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { log := log.FromContext(ctx) - _ = log - // TODO: implement + original := &gatewayv1alpha2.TCPRoute{} + if err := r.Client.Get(ctx, req.NamespacedName, original); err != nil { + if apierrors.IsNotFound(err) { + return ctrl.Result{}, nil + } + log.Error(err, "Unable to get TCPRoute") + return ctrl.Result{}, err + } + + // Check if the TCPRoute is being deleted. + if original.GetDeletionTimestamp() != nil { + return ctrl.Result{}, nil + } + + route := original.DeepCopy() + + grants := &gatewayv1beta1.ReferenceGrantList{} + if err := r.Client.List(ctx, grants); err != nil { + return r.handleReconcileErrorWithStatus(ctx, fmt.Errorf("failed to retrieve reference grants: %w", err), original, route) + } + + // input for the validators + i := &routechecks.TCPRouteInput{ + Ctx: ctx, + Client: r.Client, + Grants: grants, + TCPRoute: route, + } + + // gateway validators + for _, parent := range route.Spec.ParentRefs { + // set acceptance to okay, this wil be overwritten in checks if needed + i.SetParentCondition(parent, metav1.Condition{ + Type: string(gatewayv1.RouteConditionAccepted), + Status: metav1.ConditionTrue, + Reason: string(gatewayv1.RouteReasonAccepted), + Message: "Accepted TCPRoute", + }) + + // set status to okay, this wil be overwritten in checks if needed + i.SetAllParentCondition(metav1.Condition{ + Type: string(gatewayv1.RouteConditionResolvedRefs), + Status: metav1.ConditionTrue, + Reason: string(gatewayv1.RouteReasonResolvedRefs), + Message: "Service reference is valid", + }) + // run the actual validators + for _, fn := range []routechecks.CheckGatewayFunc{ + routechecks.CheckGatewayAllowedForNamespace, + routechecks.CheckGatewayRouteKindAllowed, + routechecks.CheckGatewayMatchingPorts, + routechecks.CheckGatewayMatchingSection, + } { + continueCheck, err := fn(i, parent) + if err != nil { + return r.handleReconcileErrorWithStatus(ctx, 
fmt.Errorf("failed to apply Gateway check: %w", err), original, route) + } + + if !continueCheck { + break + } + } + } + + for _, fn := range []routechecks.CheckRuleFunc{ + routechecks.CheckAgainstCrossNamespaceBackendReferences, + routechecks.CheckBackend, + routechecks.CheckBackendIsExistingService, + } { + continueCheck, err := fn(i) + if err != nil { + return r.handleReconcileErrorWithStatus(ctx, fmt.Errorf("failed to apply Backend check: %w", err), original, route) + } + + if !continueCheck { + break + } + } + + if err := r.updateStatus(ctx, original, route); err != nil { + return ctrl.Result{}, fmt.Errorf("failed to update TCPRoute status: %w", err) + } + + log.Info("Reconciled TCPRoute") return ctrl.Result{}, nil } + +// enqueueRequestForBackendService . +// TODO: document +func (r *TCPRouteReconciler) enqueueRequestForBackendService() handler.EventHandler { + return handler.EnqueueRequestsFromMapFunc(r.enqueueFromIndex(backendServiceIndex)) +} + +// enqueueRequestForGateway . +// TODO: document +func (r *TCPRouteReconciler) enqueueRequestForGateway() handler.EventHandler { + return handler.EnqueueRequestsFromMapFunc(r.enqueueFromIndex(gatewayIndex)) +} + +// enqueueRequestForReferenceGrant . +// TODO: document +func (r *TCPRouteReconciler) enqueueRequestForReferenceGrant() handler.EventHandler { + return handler.EnqueueRequestsFromMapFunc(r.enqueueAll()) +} + +// enqueueFromIndex . +// TODO: document +func (r *TCPRouteReconciler) enqueueFromIndex(index string) handler.MapFunc { + return func(ctx context.Context, o client.Object) []reconcile.Request { + return r.enqueue(ctx, &client.ListOptions{ + FieldSelector: fields.OneTermEqualSelector(index, client.ObjectKeyFromObject(o).String()), + }) + } +} + +// enqueueAll . +// TODO +func (r *TCPRouteReconciler) enqueueAll() handler.MapFunc { + return func(ctx context.Context, _ client.Object) []reconcile.Request { + return r.enqueue(ctx) + } +} + +// enqueue . 
+// TODO +func (r *TCPRouteReconciler) enqueue(ctx context.Context, opts ...client.ListOption) []reconcile.Request { + log := log.FromContext(ctx) + + list := &gatewayv1alpha2.TCPRouteList{} + if err := r.Client.List(ctx, list, opts...); err != nil { + log.Error(err, "Failed to get TCPRoute") + return []reconcile.Request{} + } + + requests := make([]reconcile.Request, len(list.Items)) + for i, item := range list.Items { + route := types.NamespacedName{ + Namespace: item.GetNamespace(), + Name: item.GetName(), + } + requests[i] = reconcile.Request{ + NamespacedName: route, + } + log.Info("Enqueued TCPRoute for resource", "route", route) + } + return requests +} + +// hasMatchingController . +// TODO +func (r *TCPRouteReconciler) hasMatchingController(ctx context.Context) func(object client.Object) bool { + return hasMatchingController(ctx, r.Client) +} + +// updateStatus . +// TODO +func (r *TCPRouteReconciler) updateStatus(ctx context.Context, original, new *gatewayv1alpha2.TCPRoute) error { + oldStatus := original.Status.DeepCopy() + newStatus := new.Status.DeepCopy() + + opts := cmpopts.IgnoreFields(metav1.Condition{}, "LastTransitionTime") + if cmp.Equal(oldStatus, newStatus, opts) { + return nil + } + return r.Client.Status().Update(ctx, new) +} + +// handleReconcileErrorWithStatus . 
+// TODO +func (r *TCPRouteReconciler) handleReconcileErrorWithStatus(ctx context.Context, reconcileErr error, original, modified *gatewayv1alpha2.TCPRoute) (ctrl.Result, error) { + if err := r.updateStatus(ctx, original, modified); err != nil { + return ctrl.Result{}, fmt.Errorf("failed to update TCPRoute status while handling the reconcile error %w: %w", reconcileErr, err) + } + return ctrl.Result{}, reconcileErr +} diff --git a/internal/controller/route_udp.go b/internal/controller/route_udp.go index caa94a2..52ff245 100644 --- a/internal/controller/route_udp.go +++ b/internal/controller/route_udp.go @@ -5,14 +5,29 @@ package controller import ( "context" + "fmt" + gateway "github.com/caddyserver/gateway/internal" + "github.com/caddyserver/gateway/internal/routechecks" + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" + gatewayv1 "sigs.k8s.io/gateway-api/apis/v1" gatewayv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2" + gatewayv1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1" ) // +kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=udproutes,verbs=get;list;watch @@ -29,14 +44,241 @@ var _ reconcile.Reconciler = (*UDPRouteReconciler)(nil) // SetupWithManager sets up the controller with the Manager. 
func (r *UDPRouteReconciler) SetupWithManager(mgr ctrl.Manager) error { + ctx := context.Background() + + if err := mgr.GetFieldIndexer().IndexField(ctx, &gatewayv1alpha2.UDPRoute{}, backendServiceIndex, func(o client.Object) []string { + route, ok := o.(*gatewayv1alpha2.UDPRoute) + if !ok { + return nil + } + var backendServices []string + for _, rule := range route.Spec.Rules { + for _, backend := range rule.BackendRefs { + backendServiceName, err := gateway.GetBackendServiceName(backend.BackendObjectReference) + if err != nil { + mgr.GetLogger().WithValues( + "controller", "udp-route", + "resource", client.ObjectKeyFromObject(o), + ).Error(err, "Failed to get backend service name") + continue + } + + backendServices = append(backendServices, types.NamespacedName{ + Namespace: gateway.NamespaceDerefOr(backend.Namespace, route.Namespace), + Name: backendServiceName, + }.String()) + } + } + return backendServices + }); err != nil { + return err + } + + if err := mgr.GetFieldIndexer().IndexField(ctx, &gatewayv1alpha2.UDPRoute{}, gatewayIndex, func(o client.Object) []string { + hr := o.(*gatewayv1alpha2.UDPRoute) + var gateways []string + for _, parent := range hr.Spec.ParentRefs { + if !gateway.IsGateway(parent) { + continue + } + gateways = append(gateways, types.NamespacedName{ + Namespace: gateway.NamespaceDerefOr(parent.Namespace, hr.Namespace), + Name: string(parent.Name), + }.String()) + } + return gateways + }); err != nil { + return err + } + return ctrl.NewControllerManagedBy(mgr). For(&gatewayv1alpha2.UDPRoute{}). + Watches(&corev1.Service{}, r.enqueueRequestForBackendService()). + Watches(&gatewayv1beta1.ReferenceGrant{}, r.enqueueRequestForReferenceGrant()). + Watches( + &gatewayv1.Gateway{}, + r.enqueueRequestForGateway(), + builder.WithPredicates(predicate.NewPredicateFuncs(r.hasMatchingController(ctx))), + ). 
Complete(r) } func (r *UDPRouteReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { log := log.FromContext(ctx) - _ = log + original := &gatewayv1alpha2.UDPRoute{} + if err := r.Client.Get(ctx, req.NamespacedName, original); err != nil { + if apierrors.IsNotFound(err) { + return ctrl.Result{}, nil + } + log.Error(err, "Unable to get UDPRoute") + return ctrl.Result{}, err + } + + // Check if the UDPRoute is being deleted. + if original.GetDeletionTimestamp() != nil { + return ctrl.Result{}, nil + } + + route := original.DeepCopy() + + grants := &gatewayv1beta1.ReferenceGrantList{} + if err := r.Client.List(ctx, grants); err != nil { + return r.handleReconcileErrorWithStatus(ctx, fmt.Errorf("failed to retrieve reference grants: %w", err), original, route) + } + + // input for the validators + i := &routechecks.UDPRouteInput{ + Ctx: ctx, + Client: r.Client, + Grants: grants, + UDPRoute: route, + } + + // gateway validators + for _, parent := range route.Spec.ParentRefs { + // set acceptance to okay, this wil be overwritten in checks if needed + i.SetParentCondition(parent, metav1.Condition{ + Type: string(gatewayv1.RouteConditionAccepted), + Status: metav1.ConditionTrue, + Reason: string(gatewayv1.RouteReasonAccepted), + Message: "Accepted UDPRoute", + }) + + // set status to okay, this wil be overwritten in checks if needed + i.SetAllParentCondition(metav1.Condition{ + Type: string(gatewayv1.RouteConditionResolvedRefs), + Status: metav1.ConditionTrue, + Reason: string(gatewayv1.RouteReasonResolvedRefs), + Message: "Service reference is valid", + }) + + // run the actual validators + for _, fn := range []routechecks.CheckGatewayFunc{ + routechecks.CheckGatewayAllowedForNamespace, + routechecks.CheckGatewayRouteKindAllowed, + routechecks.CheckGatewayMatchingPorts, + routechecks.CheckGatewayMatchingSection, + } { + continueCheck, err := fn(i, parent) + if err != nil { + return r.handleReconcileErrorWithStatus(ctx, fmt.Errorf("failed to 
apply Gateway check: %w", err), original, route) + } + + if !continueCheck { + break + } + } + } + + for _, fn := range []routechecks.CheckRuleFunc{ + routechecks.CheckAgainstCrossNamespaceBackendReferences, + routechecks.CheckBackend, + routechecks.CheckBackendIsExistingService, + } { + continueCheck, err := fn(i) + if err != nil { + return r.handleReconcileErrorWithStatus(ctx, fmt.Errorf("failed to apply Backend check: %w", err), original, route) + } + + if !continueCheck { + break + } + } + + if err := r.updateStatus(ctx, original, route); err != nil { + return ctrl.Result{}, fmt.Errorf("failed to update UDPRoute status: %w", err) + } + + log.Info("Reconciled UDPRoute") return ctrl.Result{}, nil } + +// enqueueRequestForBackendService . +// TODO: document +func (r *UDPRouteReconciler) enqueueRequestForBackendService() handler.EventHandler { + return handler.EnqueueRequestsFromMapFunc(r.enqueueFromIndex(backendServiceIndex)) +} + +// enqueueRequestForGateway . +// TODO: document +func (r *UDPRouteReconciler) enqueueRequestForGateway() handler.EventHandler { + return handler.EnqueueRequestsFromMapFunc(r.enqueueFromIndex(gatewayIndex)) +} + +// enqueueRequestForReferenceGrant . +// TODO: document +func (r *UDPRouteReconciler) enqueueRequestForReferenceGrant() handler.EventHandler { + return handler.EnqueueRequestsFromMapFunc(r.enqueueAll()) +} + +// enqueueFromIndex . +// TODO: document +func (r *UDPRouteReconciler) enqueueFromIndex(index string) handler.MapFunc { + return func(ctx context.Context, o client.Object) []reconcile.Request { + return r.enqueue(ctx, &client.ListOptions{ + FieldSelector: fields.OneTermEqualSelector(index, client.ObjectKeyFromObject(o).String()), + }) + } +} + +// enqueueAll . +// TODO +func (r *UDPRouteReconciler) enqueueAll() handler.MapFunc { + return func(ctx context.Context, _ client.Object) []reconcile.Request { + return r.enqueue(ctx) + } +} + +// enqueue . 
+// TODO +func (r *UDPRouteReconciler) enqueue(ctx context.Context, opts ...client.ListOption) []reconcile.Request { + log := log.FromContext(ctx) + + list := &gatewayv1alpha2.UDPRouteList{} + if err := r.Client.List(ctx, list, opts...); err != nil { + log.Error(err, "Failed to get UDPRoute") + return []reconcile.Request{} + } + + requests := make([]reconcile.Request, len(list.Items)) + for i, item := range list.Items { + route := types.NamespacedName{ + Namespace: item.GetNamespace(), + Name: item.GetName(), + } + requests[i] = reconcile.Request{ + NamespacedName: route, + } + log.Info("Enqueued UDPRoute for resource", "route", route) + } + return requests +} + +// hasMatchingController . +// TODO +func (r *UDPRouteReconciler) hasMatchingController(ctx context.Context) func(object client.Object) bool { + return hasMatchingController(ctx, r.Client) +} + +// updateStatus . +// TODO +func (r *UDPRouteReconciler) updateStatus(ctx context.Context, original, new *gatewayv1alpha2.UDPRoute) error { + oldStatus := original.Status.DeepCopy() + newStatus := new.Status.DeepCopy() + + opts := cmpopts.IgnoreFields(metav1.Condition{}, "LastTransitionTime") + if cmp.Equal(oldStatus, newStatus, opts) { + return nil + } + return r.Client.Status().Update(ctx, new) +} + +// handleReconcileErrorWithStatus . 
+// TODO +func (r *UDPRouteReconciler) handleReconcileErrorWithStatus(ctx context.Context, reconcileErr error, original, modified *gatewayv1alpha2.UDPRoute) (ctrl.Result, error) { + if err := r.updateStatus(ctx, original, modified); err != nil { + return ctrl.Result{}, fmt.Errorf("failed to update UDPRoute status while handling the reconcile error %w: %w", reconcileErr, err) + } + return ctrl.Result{}, reconcileErr +} diff --git a/internal/gateway.go b/internal/gateway.go index c7a8ea0..72f1a6e 100644 --- a/internal/gateway.go +++ b/internal/gateway.go @@ -45,6 +45,11 @@ func IsService(be gatewayv1.BackendObjectReference) bool { return (be.Group == nil || *be.Group == corev1.GroupName) && (be.Kind == nil || *be.Kind == "Service") } +// // IsPolicyTargetService checks if the given PolicyTargetReference references a Service resource. +// func IsPolicyTargetService(be gatewayv1alpha2.PolicyTargetReference) bool { +// return be.Group == corev1.GroupName && be.Kind == "Service" +// } + // IsPolicyTargetService checks if the given PolicyTargetReference references a Service resource. 
func IsLocalPolicyTargetService(be gatewayv1alpha2.LocalPolicyTargetReference) bool { return be.Group == corev1.GroupName && be.Kind == "Service" diff --git a/internal/layer4/handler.go b/internal/layer4/handler.go new file mode 100644 index 0000000..ac3d161 --- /dev/null +++ b/internal/layer4/handler.go @@ -0,0 +1,8 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright (c) 2024 Matthew Penner + +package layer4 + +type Handler interface { + IAmAHandler() +} diff --git a/internal/layer4/l4proxy/proxy.go b/internal/layer4/l4proxy/proxy.go new file mode 100644 index 0000000..1decbac --- /dev/null +++ b/internal/layer4/l4proxy/proxy.go @@ -0,0 +1,127 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright (c) 2024 Matthew Penner + +package l4proxy + +import ( + caddy "github.com/caddyserver/gateway/internal/caddyv2" + "github.com/caddyserver/gateway/internal/caddyv2/caddyhttp/reverseproxy" +) + +type HandlerName string + +func (HandlerName) MarshalJSON() ([]byte, error) { + return []byte(`"proxy"`), nil +} + +// Handler is a handler that can proxy connections. +type Handler struct { + // Handler is the name of this handler for the JSON config. + // DO NOT USE this. This is a special value to represent this handler. + // It will be overwritten when we are marshalled. + Handler HandlerName `json:"handler"` + + // Upstreams is the list of backends to proxy to. + Upstreams UpstreamPool `json:"upstreams,omitempty"` + + // Health checks update the status of backends, whether they are + // up or down. Down backends will not be proxied to. + HealthChecks *HealthChecks `json:"health_checks,omitempty"` + + // Load balancing distributes load/connections between backends. + LoadBalancing *LoadBalancing `json:"load_balancing,omitempty"` + + // Specifies the version of the Proxy Protocol header to add, either "v1" or "v2". 
+ // Ref: https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt + ProxyProtocol string `json:"proxy_protocol,omitempty"` +} + +func (Handler) IAmAHandler() {} + +// UpstreamPool is a collection of upstreams. +type UpstreamPool []*Upstream + +// Upstream represents a proxy upstream. +type Upstream struct { + // The network addresses to dial. Supports placeholders, but not port + // ranges currently (each address must be exactly 1 socket). + Dial []string `json:"dial,omitempty"` + + // Set this field to enable TLS to the upstream. + TLS *reverseproxy.TLSConfig `json:"tls,omitempty"` + + // How many connections this upstream is allowed to + // have before being marked as unhealthy (if > 0). + MaxConnections int `json:"max_connections,omitempty"` +} + +// HealthChecks configures active and passive health checks. +type HealthChecks struct { + // Active health checks run in the background on a timer. To + // minimally enable active health checks, set either path or + // port (or both). + Active *ActiveHealthChecks `json:"active,omitempty"` + + // Passive health checks monitor proxied connections for errors or timeouts. + // To minimally enable passive health checks, specify at least an empty + // config object. + Passive *PassiveHealthChecks `json:"passive,omitempty"` +} + +// ActiveHealthChecks holds configuration related to active health +// checks (that is, health checks which occur independently in a +// background goroutine). +type ActiveHealthChecks struct { + // The port to use (if different from the upstream's dial + // address) for health checks. + Port int `json:"port,omitempty"` + + // How frequently to perform active health checks (default 30s). + Interval caddy.Duration `json:"interval,omitempty"` + + // How long to wait for a connection to be established with + // peer before considering it unhealthy (default 5s). 
+ Timeout caddy.Duration `json:"timeout,omitempty"` +} + +// PassiveHealthChecks holds configuration related to passive +// health checks (that is, health checks which occur during +// the normal flow of connection proxying). +type PassiveHealthChecks struct { + // How long to remember a failed connection to a backend. A + // duration > 0 enables passive health checking. Default 0. + FailDuration caddy.Duration `json:"fail_duration,omitempty"` + + // The number of failed connections within the FailDuration window to + // consider a backend as "down". Must be >= 1; default is 1. Requires + // that FailDuration be > 0. + MaxFails int `json:"max_fails,omitempty"` + + // Limits the number of simultaneous connections to a backend by + // marking the backend as "down" if it has this many or more + // concurrent connections. + UnhealthyConnectionCount int `json:"unhealthy_connection_count,omitempty"` +} + +// LoadBalancing has parameters related to load balancing. +type LoadBalancing struct { + // A selection policy is how to choose an available backend. + // The default policy is random selection. + // TODO: implement + SelectionPolicy any `json:"selection,omitempty"` + // SelectionPolicyRaw json.RawMessage `json:"selection,omitempty" caddy:"namespace=layer4.proxy.selection_policies inline_key=policy"` + + // How long to try selecting available backends for each connection + // if the next available host is down. By default, this retry is + // disabled. Clients will wait for up to this long while the load + // balancer tries to find an available upstream host. + TryDuration caddy.Duration `json:"try_duration,omitempty"` + + // How long to wait between selecting the next host from the pool. Default + // is 250ms. Only relevant when a connection to an upstream host fails. Be + // aware that setting this to 0 with a non-zero try_duration can cause the + // CPU to spin if all backends are down and latency is very low. 
+ TryInterval caddy.Duration `json:"try_interval,omitempty"` + + // SelectionPolicy Selector `json:"-"` +} diff --git a/internal/layer4/layer4.go b/internal/layer4/layer4.go new file mode 100644 index 0000000..de5d173 --- /dev/null +++ b/internal/layer4/layer4.go @@ -0,0 +1,12 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright (c) 2024 Matthew Penner + +package layer4 + +// App is a Caddy app that operates closest to layer 4 of the OSI model. +type App struct { + // Servers are the servers to create. The key of each server must be + // a unique name identifying the server for your own convenience; + // the order of servers does not matter. + Servers map[string]*Server `json:"servers,omitempty"` +} diff --git a/internal/layer4/routes.go b/internal/layer4/routes.go new file mode 100644 index 0000000..9e83382 --- /dev/null +++ b/internal/layer4/routes.go @@ -0,0 +1,31 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright (c) 2024 Matthew Penner + +package layer4 + +// Route represents a collection of handlers that are gated by +// matching logic. A route is invoked if its matchers match +// the byte stream. In an equivalent "if...then" statement, +// matchers are like the "if" clause and handlers are the "then" +// clause: if the matchers match, then the handlers will be +// executed. +type Route struct { + // Matchers define the conditions upon which to execute the handlers. + // All matchers within the same set must match, and at least one set + // must match; in other words, matchers are AND'ed together within a + // set, but multiple sets are OR'ed together. No matchers matches all. + MatcherSets []any `json:"match,omitempty"` + // MatcherSetsRaw []caddy.ModuleMap `json:"match,omitempty" caddy:"namespace=layer4.matchers"` + + // Handlers define the behavior for handling the stream. They are + // executed in sequential order if the route's matchers match. 
+ Handlers []Handler `json:"handle,omitempty"` + // HandlersRaw []json.RawMessage `json:"handle,omitempty" caddy:"namespace=layer4.handlers inline_key=handler"` +} + +// RouteList is a list of connection routes that can create +// a middleware chain. Routes are evaluated in sequential +// order: for the first route, the matchers will be evaluated, +// and if matched, the handlers invoked; and so on for the +// second route, etc. +type RouteList []*Route diff --git a/internal/layer4/server.go b/internal/layer4/server.go new file mode 100644 index 0000000..abd7075 --- /dev/null +++ b/internal/layer4/server.go @@ -0,0 +1,22 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright (c) 2024 Matthew Penner + +package layer4 + +import ( + caddy "github.com/caddyserver/gateway/internal/caddyv2" +) + +// Server represents a Caddy layer4 server. +type Server struct { + // The network address to bind to. Any Caddy network address + // is an acceptable value: + // https://caddyserver.com/docs/conventions#network-addresses + Listen []string `json:"listen,omitempty"` + + // Routes express composable logic for handling byte streams. + Routes RouteList `json:"routes,omitempty"` + + // Maximum time connections have to complete the matching phase (the first terminal handler is matched). Default: 3s. 
+ MatchingTimeout caddy.Duration `json:"matching_timeout,omitempty"` +} diff --git a/internal/routechecks/route_tcp.go b/internal/routechecks/route_tcp.go new file mode 100644 index 0000000..c0acd8e --- /dev/null +++ b/internal/routechecks/route_tcp.go @@ -0,0 +1,139 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright (c) 2024 Matthew Penner + +package routechecks + +import ( + "context" + "fmt" + "reflect" + "time" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "sigs.k8s.io/controller-runtime/pkg/client" + gatewayv1 "sigs.k8s.io/gateway-api/apis/v1" + gatewayv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2" + gatewayv1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1" + + gateway "github.com/caddyserver/gateway/internal" +) + +type TCPRouteInput struct { + Ctx context.Context + Client client.Client + Grants *gatewayv1beta1.ReferenceGrantList + TCPRoute *gatewayv1alpha2.TCPRoute + + gateways map[gatewayv1.ParentReference]*gatewayv1.Gateway +} + +func (h *TCPRouteInput) SetParentCondition(ref gatewayv1.ParentReference, condition metav1.Condition) { + // fill in the condition + condition.LastTransitionTime = metav1.NewTime(time.Now()) + condition.ObservedGeneration = h.TCPRoute.GetGeneration() + + h.mergeStatusConditions(ref, []metav1.Condition{ + condition, + }) +} + +func (h *TCPRouteInput) SetAllParentCondition(condition metav1.Condition) { + // fill in the condition + condition.LastTransitionTime = metav1.NewTime(time.Now()) + condition.ObservedGeneration = h.TCPRoute.GetGeneration() + + for _, parent := range h.TCPRoute.Spec.ParentRefs { + h.mergeStatusConditions(parent, []metav1.Condition{ + condition, + }) + } +} + +func (h *TCPRouteInput) mergeStatusConditions(parentRef gatewayv1.ParentReference, updates []metav1.Condition) { + index := -1 + for i, parent := range h.TCPRoute.Status.RouteStatus.Parents { + if 
reflect.DeepEqual(parent.ParentRef, parentRef) { + index = i + break + } + } + if index != -1 { + h.TCPRoute.Status.RouteStatus.Parents[index].Conditions = merge(h.TCPRoute.Status.RouteStatus.Parents[index].Conditions, updates...) + return + } + h.TCPRoute.Status.RouteStatus.Parents = append(h.TCPRoute.Status.RouteStatus.Parents, gatewayv1.RouteParentStatus{ + ParentRef: parentRef, + ControllerName: gateway.ControllerName, + Conditions: updates, + }) +} + +func (h *TCPRouteInput) GetGrants() []gatewayv1beta1.ReferenceGrant { + return h.Grants.Items +} + +func (h *TCPRouteInput) GetNamespace() string { + return h.TCPRoute.GetNamespace() +} + +func (h *TCPRouteInput) GetGVK() schema.GroupVersionKind { + return gatewayv1.SchemeGroupVersion.WithKind("TCPRoute") +} + +func (h *TCPRouteInput) GetRules() []GenericRule { + var rules []GenericRule + for _, rule := range h.TCPRoute.Spec.Rules { + rules = append(rules, &TCPRouteRule{rule}) + } + return rules +} + +func (h *TCPRouteInput) GetClient() client.Client { + return h.Client +} + +func (h *TCPRouteInput) GetContext() context.Context { + return h.Ctx +} + +func (h *TCPRouteInput) GetHostnames() []gatewayv1.Hostname { + return nil +} + +func (h *TCPRouteInput) GetGateway(parent gatewayv1.ParentReference) (*gatewayv1.Gateway, error) { + if h.gateways == nil { + h.gateways = make(map[gatewayv1.ParentReference]*gatewayv1.Gateway) + } + + if gw, exists := h.gateways[parent]; exists { + return gw, nil + } + + ns := gateway.NamespaceDerefOr(parent.Namespace, h.GetNamespace()) + gw := &gatewayv1.Gateway{} + + if err := h.Client.Get(h.Ctx, client.ObjectKey{Namespace: ns, Name: string(parent.Name)}, gw); err != nil { + if !apierrors.IsNotFound(err) { + // if it is not just a not found error, we should return the error as something is bad + return nil, fmt.Errorf("error while getting gateway: %w", err) + } + + // Gateway does not exist skip further checks + return nil, fmt.Errorf("gateway %q does not exist: %w", parent.Name, err) 
+	}
+
+	h.gateways[parent] = gw
+
+	return gw, nil
+}
+
+// TCPRouteRule is used to implement the GenericRule interface for TCPRoute
+type TCPRouteRule struct {
+	Rule gatewayv1alpha2.TCPRouteRule
+}
+
+func (t *TCPRouteRule) GetBackendRefs() []gatewayv1.BackendRef {
+	return t.Rule.BackendRefs
+}
diff --git a/internal/routechecks/route_udp.go b/internal/routechecks/route_udp.go
new file mode 100644
index 0000000..977ca8f
--- /dev/null
+++ b/internal/routechecks/route_udp.go
@@ -0,0 +1,139 @@
+// SPDX-License-Identifier: Apache-2.0
+// SPDX-FileCopyrightText: Copyright (c) 2024 Matthew Penner
+
+package routechecks
+
+import (
+	"context"
+	"fmt"
+	"reflect"
+	"time"
+
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+	gatewayv1 "sigs.k8s.io/gateway-api/apis/v1"
+	gatewayv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2"
+	gatewayv1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1"
+
+	gateway "github.com/caddyserver/gateway/internal"
+)
+
+type UDPRouteInput struct {
+	Ctx      context.Context
+	Client   client.Client
+	Grants   *gatewayv1beta1.ReferenceGrantList
+	UDPRoute *gatewayv1alpha2.UDPRoute
+
+	gateways map[gatewayv1.ParentReference]*gatewayv1.Gateway
+}
+
+func (h *UDPRouteInput) SetParentCondition(ref gatewayv1.ParentReference, condition metav1.Condition) {
+	// fill in the condition
+	condition.LastTransitionTime = metav1.NewTime(time.Now())
+	condition.ObservedGeneration = h.UDPRoute.GetGeneration()
+
+	h.mergeStatusConditions(ref, []metav1.Condition{
+		condition,
+	})
+}
+
+func (h *UDPRouteInput) SetAllParentCondition(condition metav1.Condition) {
+	// fill in the condition
+	condition.LastTransitionTime = metav1.NewTime(time.Now())
+	condition.ObservedGeneration = h.UDPRoute.GetGeneration()
+
+	for _, parent := range h.UDPRoute.Spec.ParentRefs {
+		h.mergeStatusConditions(parent, []metav1.Condition{
+			condition,
+		})
+	}
+}
+
+func 
(h *UDPRouteInput) mergeStatusConditions(parentRef gatewayv1.ParentReference, updates []metav1.Condition) { + index := -1 + for i, parent := range h.UDPRoute.Status.RouteStatus.Parents { + if reflect.DeepEqual(parent.ParentRef, parentRef) { + index = i + break + } + } + if index != -1 { + h.UDPRoute.Status.RouteStatus.Parents[index].Conditions = merge(h.UDPRoute.Status.RouteStatus.Parents[index].Conditions, updates...) + return + } + h.UDPRoute.Status.RouteStatus.Parents = append(h.UDPRoute.Status.RouteStatus.Parents, gatewayv1.RouteParentStatus{ + ParentRef: parentRef, + ControllerName: gateway.ControllerName, + Conditions: updates, + }) +} + +func (h *UDPRouteInput) GetGrants() []gatewayv1beta1.ReferenceGrant { + return h.Grants.Items +} + +func (h *UDPRouteInput) GetNamespace() string { + return h.UDPRoute.GetNamespace() +} + +func (h *UDPRouteInput) GetGVK() schema.GroupVersionKind { + return gatewayv1.SchemeGroupVersion.WithKind("UDPRoute") +} + +func (h *UDPRouteInput) GetRules() []GenericRule { + var rules []GenericRule + for _, rule := range h.UDPRoute.Spec.Rules { + rules = append(rules, &UDPRouteRule{rule}) + } + return rules +} + +func (h *UDPRouteInput) GetClient() client.Client { + return h.Client +} + +func (h *UDPRouteInput) GetContext() context.Context { + return h.Ctx +} + +func (h *UDPRouteInput) GetHostnames() []gatewayv1.Hostname { + return nil +} + +func (h *UDPRouteInput) GetGateway(parent gatewayv1.ParentReference) (*gatewayv1.Gateway, error) { + if h.gateways == nil { + h.gateways = make(map[gatewayv1.ParentReference]*gatewayv1.Gateway) + } + + if gw, exists := h.gateways[parent]; exists { + return gw, nil + } + + ns := gateway.NamespaceDerefOr(parent.Namespace, h.GetNamespace()) + gw := &gatewayv1.Gateway{} + + if err := h.Client.Get(h.Ctx, client.ObjectKey{Namespace: ns, Name: string(parent.Name)}, gw); err != nil { + if !apierrors.IsNotFound(err) { + // if it is not just a not found error, we should return the error as something is bad + 
return nil, fmt.Errorf("error while getting gateway: %w", err)
+		}
+
+		// Gateway does not exist skip further checks
+		return nil, fmt.Errorf("gateway %q does not exist: %w", parent.Name, err)
+	}
+
+	h.gateways[parent] = gw
+
+	return gw, nil
+}
+
+// UDPRouteRule is used to implement the GenericRule interface for UDPRoute
+type UDPRouteRule struct {
+	Rule gatewayv1alpha2.UDPRouteRule
+}
+
+func (t *UDPRouteRule) GetBackendRefs() []gatewayv1.BackendRef {
+	return t.Rule.BackendRefs
+}
diff --git a/main.go b/main.go
index bfb7234..37950a5 100644
--- a/main.go
+++ b/main.go
@@ -115,6 +115,7 @@ func main() {
 	if err != nil {
 		setupLog.Error(err, "unable to start manager")
 		os.Exit(1)
+		return
 	}
 
 	client := mgr.GetClient()
@@ -128,6 +129,7 @@ func main() {
 	}).SetupWithManager(mgr); err != nil {
 		setupLog.Error(err, "unable to create controller", "controller", "Gateway")
 		os.Exit(1)
+		return
 	}
 	if err = (&controller.GatewayClassReconciler{
 		Client:   client,
@@ -136,6 +138,7 @@ func main() {
 	}).SetupWithManager(mgr); err != nil {
 		setupLog.Error(err, "unable to create controller", "controller", "GatewayClass")
 		os.Exit(1)
+		return
 	}
 	//if err = (&controller.GRPCRouteReconciler{
 	//	Client: client,
@@ -144,6 +147,7 @@ func main() {
 	//}).SetupWithManager(mgr); err != nil {
 	//	setupLog.Error(err, "unable to create controller", "controller", "GRPCRoute")
 	//	os.Exit(1)
+	//	return
 	//}
 	if err = (&controller.HTTPRouteReconciler{
 		Client:   client,
@@ -152,15 +156,17 @@ func main() {
 	}).SetupWithManager(mgr); err != nil {
 		setupLog.Error(err, "unable to create controller", "controller", "HTTPRoute")
 		os.Exit(1)
+		return
+	}
+	if err = (&controller.TCPRouteReconciler{
+		Client:   client,
+		Scheme:   scheme,
+		Recorder: recorder,
+	}).SetupWithManager(mgr); err != nil {
+		setupLog.Error(err, "unable to create controller", "controller", "TCPRoute")
+		os.Exit(1)
+		return
 	}
-	//if err = (&controller.TCPRouteReconciler{
-	//	Client: client,
-	//	Scheme: scheme,
-	//	Recorder: recorder,
-	
//}).SetupWithManager(mgr); err != nil { - // setupLog.Error(err, "unable to create controller", "controller", "TCPRoute") - // os.Exit(1) - //} //if err = (&controller.TLSRouteReconciler{ // Client: client, // Scheme: scheme, @@ -168,15 +174,17 @@ func main() { //}).SetupWithManager(mgr); err != nil { // setupLog.Error(err, "unable to create controller", "controller", "TLSRoute") // os.Exit(1) + // return //} - //if err = (&controller.UDPRouteReconciler{ - // Client: client, - // Scheme: scheme, - // Recorder: recorder, - //}).SetupWithManager(mgr); err != nil { - // setupLog.Error(err, "unable to create controller", "controller", "UDPRoute") - // os.Exit(1) - //} + if err = (&controller.UDPRouteReconciler{ + Client: client, + Scheme: scheme, + Recorder: recorder, + }).SetupWithManager(mgr); err != nil { + setupLog.Error(err, "unable to create controller", "controller", "UDPRoute") + os.Exit(1) + return + } //+kubebuilder:scaffold:builder if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {