diff --git a/Makefile b/Makefile index 0307db9c8..b7ea1ba95 100644 --- a/Makefile +++ b/Makefile @@ -24,7 +24,7 @@ ALL_ARCH ?= amd64 arm arm64 ppc64le s390x # The output type could either be docker (local), or registry. OUTPUT_TYPE ?= docker GO_TOOLCHAIN ?= golang -GO_VERSION ?= 1.22.2 +GO_VERSION ?= 1.22.3 BASEIMAGE ?= gcr.io/distroless/static-debian11:nonroot ifeq ($(GOPATH),) @@ -80,6 +80,10 @@ test: test-integration: build go test -mod=vendor -race ./tests -agent-path $(PWD)/bin/proxy-agent +.PHONY: test-e2e +test-e2e: docker-build + go test -mod=vendor ./e2e -agent-image ${AGENT_FULL_IMAGE}-$(TARGETARCH):${TAG} -server-image ${SERVER_FULL_IMAGE}-$(TARGETARCH):${TAG} + ## -------------------------------------- ## Binaries ## -------------------------------------- diff --git a/cmd/agent/app/options/options.go b/cmd/agent/app/options/options.go index 541cb8ad4..d06cbdc78 100644 --- a/cmd/agent/app/options/options.go +++ b/cmd/agent/app/options/options.go @@ -27,6 +27,7 @@ import ( "github.com/google/uuid" "github.com/spf13/pflag" "google.golang.org/grpc" + "k8s.io/apimachinery/pkg/labels" "k8s.io/klog/v2" "sigs.k8s.io/apiserver-network-proxy/pkg/agent" @@ -80,6 +81,16 @@ type GrpcProxyAgentOptions struct { SyncForever bool XfrChannelSize int + + // Providing a label selector enables updating the server count by counting the + // number of valid leases matching the selector. + ServerCountLeaseSelector string + // Initial fallback server count to use if proxy server lease listing fails. + ServerCount uint + // Lease informer resync period. + InformerResync time.Duration + // Path to kubeconfig (used by kubernetes client for lease listing) + KubeconfigPath string } func (o *GrpcProxyAgentOptions) ClientSetConfig(dialOptions ...grpc.DialOption) *agent.ClientSetConfig { @@ -122,6 +133,10 @@ func (o *GrpcProxyAgentOptions) Flags() *pflag.FlagSet { flags.BoolVar(&o.WarnOnChannelLimit, "warn-on-channel-limit", o.WarnOnChannelLimit, "Turns on a warning if the system is going to push to a full channel. The check involves an unsafe read.") flags.BoolVar(&o.SyncForever, "sync-forever", o.SyncForever, "If true, the agent continues syncing, in order to support server count changes.") flags.IntVar(&o.XfrChannelSize, "xfr-channel-size", 150, "Set the size of the channel for transferring data between the agent and the proxy server.") + flags.UintVar(&o.ServerCount, "server-count", o.ServerCount, "Static server count, also used as fallback server count if proxy server lease listing fails.") + flags.StringVar(&o.ServerCountLeaseSelector, "server-count-lease-selector", o.ServerCountLeaseSelector, "Providing a label selector enables updating the server count by counting the number of valid leases matching the selector.") + flags.DurationVar(&o.InformerResync, "informer-resync", o.InformerResync, "Lease informer resync period") + flags.StringVar(&o.KubeconfigPath, "kubeconfig", o.KubeconfigPath, "absolute path to the kubeconfig file (used with agent-namespace, agent-service-account, authentication-audience).") return flags } @@ -198,6 +213,17 @@ func (o *GrpcProxyAgentOptions) Validate() error { if err := validateAgentIdentifiers(o.AgentIdentifiers); err != nil { return fmt.Errorf("agent address is invalid: %v", err) } + if o.ServerCountLeaseSelector != "" { + if _, err := labels.Parse(o.ServerCountLeaseSelector); err != nil { + return fmt.Errorf("invalid server count lease selector: %w", err) + } + } + if o.KubeconfigPath != "" { + if _, err := os.Stat(o.KubeconfigPath); os.IsNotExist(err) { + return fmt.Errorf("error checking KubeconfigPath %q, got %v", o.KubeconfigPath, err) + } + } + return nil } @@ -243,6 +269,10 @@ func NewGrpcProxyAgentOptions() *GrpcProxyAgentOptions { WarnOnChannelLimit: false, SyncForever: false, XfrChannelSize: 150, + ServerCount: 1, + ServerCountLeaseSelector: "", + InformerResync: 10 * time.Second, + KubeconfigPath: "", } return &o } diff --git a/cmd/agent/app/server.go b/cmd/agent/app/server.go index 8c087de53..f52d64512 100644 --- a/cmd/agent/app/server.go +++ b/cmd/agent/app/server.go @@ -38,7 +38,11 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/keepalive" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/clientcmd" "k8s.io/klog/v2" + "sigs.k8s.io/apiserver-network-proxy/pkg/servercounter" "sigs.k8s.io/apiserver-network-proxy/cmd/agent/app/options" "sigs.k8s.io/apiserver-network-proxy/pkg/agent" @@ -133,7 +137,34 @@ func (a *Agent) runProxyConnection(o *options.GrpcProxyAgentOptions, drainCh, st }), } cc := o.ClientSetConfig(dialOptions...) - cs := cc.NewAgentClientSet(drainCh, stopCh) + + var serverCounter servercounter.ServerCounter + if o.ServerCountLeaseSelector != "" { + var k8sClient *kubernetes.Clientset + config, err := clientcmd.BuildConfigFromFlags("", o.KubeconfigPath) + if err != nil { + return nil, fmt.Errorf("failed to load kubernetes client config: %v", err) + } + k8sClient, err = kubernetes.NewForConfig(config) + if err != nil { + return nil, fmt.Errorf("failed to create kubernetes clientset: %v", err) + } + sharedInformerFactory := informers.NewSharedInformerFactory(k8sClient, o.InformerResync) + serverLeaseCounter, err := servercounter.NewServerLeaseCounter( + sharedInformerFactory.Coordination().V1().Leases().Lister(), + o.ServerCountLeaseSelector, + int(o.ServerCount), + ) + if err != nil { + return nil, fmt.Errorf("failed to create server lease counter: %w", err) + } + serverCounter = serverLeaseCounter + sharedInformerFactory.Start(context.Background().Done()) + } else { + serverCounter = servercounter.StaticServerCounter(o.ServerCount) + } + + cs := cc.NewAgentClientSet(drainCh, stopCh, serverCounter) cs.Serve() return cs, nil diff --git a/cmd/server/app/options/options.go b/cmd/server/app/options/options.go index fd6d468c2..631b9c55b 100644 --- a/cmd/server/app/options/options.go +++ b/cmd/server/app/options/options.go @@ -23,6 +23,7 @@ import ( "github.com/google/uuid" "github.com/spf13/pflag" + "k8s.io/apimachinery/pkg/labels" "k8s.io/klog/v2" "sigs.k8s.io/apiserver-network-proxy/pkg/server" @@ -73,7 +74,13 @@ type ProxyRunOptions struct { // ID of this proxy server. ServerID string // Number of proxy server instances, should be 1 unless it is a HA proxy server. + // Serves as an initial fallback count when server counts are performed using leases. ServerCount uint + // Providing a label selector enables updating the server count by counting the + // number of valid leases matching the selector. + ServerCountLeaseSelector string + // Lease informer resync period. + InformerResync time.Duration // Agent pod's namespace for token-based agent authentication AgentNamespace string // Agent pod's service account for token-based agent authentication @@ -128,7 +135,9 @@ func (o *ProxyRunOptions) Flags() *pflag.FlagSet { flags.BoolVar(&o.EnableProfiling, "enable-profiling", o.EnableProfiling, "enable pprof at host:admin-port/debug/pprof") flags.BoolVar(&o.EnableContentionProfiling, "enable-contention-profiling", o.EnableContentionProfiling, "enable contention profiling at host:admin-port/debug/pprof/block. \"--enable-profiling\" must also be set.") flags.StringVar(&o.ServerID, "server-id", o.ServerID, "The unique ID of this server. Can also be set by the 'PROXY_SERVER_ID' environment variable.") - flags.UintVar(&o.ServerCount, "server-count", o.ServerCount, "The number of proxy server instances, should be 1 unless it is an HA server.") + flags.UintVar(&o.ServerCount, "server-count", o.ServerCount, "The number of proxy server instances, should be 1 unless it is an HA server. Used as an initial fallback count when lease-based server counting is enabled.") + flags.StringVar(&o.ServerCountLeaseSelector, "server-count-lease-selector", o.ServerCountLeaseSelector, "Providing a label selector enables updating the server count by counting the number of valid leases matching the selector.") + flags.DurationVar(&o.InformerResync, "informer-resync", o.InformerResync, "Lease informer resync period") flags.StringVar(&o.AgentNamespace, "agent-namespace", o.AgentNamespace, "Expected agent's namespace during agent authentication (used with agent-service-account, authentication-audience, kubeconfig).") flags.StringVar(&o.AgentServiceAccount, "agent-service-account", o.AgentServiceAccount, "Expected agent's service account during agent authentication (used with agent-namespace, authentication-audience, kubeconfig).") flags.StringVar(&o.KubeconfigPath, "kubeconfig", o.KubeconfigPath, "absolute path to the kubeconfig file (used with agent-namespace, agent-service-account, authentication-audience).") @@ -314,6 +323,13 @@ func (o *ProxyRunOptions) Validate() error { } } + // validate server count lease selector + if o.ServerCountLeaseSelector != "" { + if _, err := labels.Parse(o.ServerCountLeaseSelector); err != nil { + return fmt.Errorf("invalid server count lease selector: %w", err) + } + } + return nil } @@ -342,6 +358,8 @@ func NewProxyRunOptions() *ProxyRunOptions { EnableContentionProfiling: false, ServerID: defaultServerID(), ServerCount: 1, + ServerCountLeaseSelector: "", + InformerResync: time.Second * 10, AgentNamespace: "", AgentServiceAccount: "", KubeconfigPath: "", diff --git a/cmd/server/app/server.go b/cmd/server/app/server.go index d684ff999..7e357e5ac 100644 --- a/cmd/server/app/server.go +++ b/cmd/server/app/server.go @@ -39,9 +39,11 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/keepalive" + "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/clientcmd" "k8s.io/klog/v2" + "sigs.k8s.io/apiserver-network-proxy/pkg/servercounter" "sigs.k8s.io/apiserver-network-proxy/cmd/server/app/options" "sigs.k8s.io/apiserver-network-proxy/konnectivity-client/proto/client" @@ -132,7 +134,24 @@ func (p *Proxy) Run(o *options.ProxyRunOptions, stopCh <-chan struct{}) error { if err != nil { return err } - p.server = server.NewProxyServer(o.ServerID, ps, int(o.ServerCount), authOpt, o.XfrChannelSize) + + var serverCounter servercounter.ServerCounter + if o.ServerCountLeaseSelector != "" { + sharedInformerFactory := informers.NewSharedInformerFactory(k8sClient, o.InformerResync) + serverLeaseCounter, err := servercounter.NewServerLeaseCounter( + sharedInformerFactory.Coordination().V1().Leases().Lister(), + o.ServerCountLeaseSelector, + int(o.ServerCount), + ) + if err != nil { + return fmt.Errorf("failed to create server lease counter: %w", err) + } + serverCounter = serverLeaseCounter + } else { + serverCounter = servercounter.StaticServerCounter(o.ServerCount) + } + + p.server = server.NewProxyServer(o.ServerID, ps, serverCounter, authOpt, o.XfrChannelSize) frontendStop, err := p.runFrontendServer(ctx, o, p.server) if err != nil { diff --git a/e2e/main_test.go b/e2e/main_test.go new file mode 100644 index 000000000..0572bf036 --- /dev/null +++ b/e2e/main_test.go @@ -0,0 +1,104 @@ +package e2e + +import ( + "context" + "flag" + "log" + "os" + "testing" + + "k8s.io/client-go/kubernetes/scheme" + "sigs.k8s.io/e2e-framework/pkg/env" + "sigs.k8s.io/e2e-framework/pkg/envconf" + "sigs.k8s.io/e2e-framework/pkg/envfuncs" + "sigs.k8s.io/e2e-framework/support/kind" +) + +var ( + testenv env.Environment + agentImage = flag.String("agent-image", "", "The proxy agent's docker image.") + serverImage = flag.String("server-image", "", "The proxy server's docker image.") +) + +func TestMain(m *testing.M) { + flag.Parse() + if *agentImage == "" { + log.Fatalf("must provide agent image with -agent-image") + } + if *serverImage == "" { + log.Fatalf("must provide server image with -server-image") + } + + scheme.AddToScheme(scheme.Scheme) + + testenv = env.New() + kindClusterName := "kind-test" + kindCluster := kind.NewCluster(kindClusterName) + + testenv.Setup( + envfuncs.CreateCluster(kindCluster, kindClusterName), + envfuncs.LoadImageToCluster(kindClusterName, *agentImage), + envfuncs.LoadImageToCluster(kindClusterName, *serverImage), + func(ctx context.Context, cfg *envconf.Config) (context.Context, error) { + client := cfg.Client() + + // Render agent RBAC templates. + agentServiceAccount, _, err := renderAgentTemplate("serviceaccount.yaml", struct{}{}) + if err != nil { + return nil, err + } + agentClusterRole, _, err := renderAgentTemplate("clusterrole.yaml", struct{}{}) + if err != nil { + return nil, err + } + agentClusterRoleBinding, _, err := renderAgentTemplate("clusterrolebinding.yaml", struct{}{}) + if err != nil { + return ctx, err + } + + // Submit agent RBAC templates to k8s. + err = client.Resources().Create(ctx, agentServiceAccount) + if err != nil { + return ctx, err + } + err = client.Resources().Create(ctx, agentClusterRole) + if err != nil { + return ctx, err + } + err = client.Resources().Create(ctx, agentClusterRoleBinding) + if err != nil { + return ctx, err + } + + // Render server RBAC and Service templates. + serverClusterRoleBinding, _, err := renderServerTemplate("clusterrolebinding.yaml", struct{}{}) + if err != nil { + return ctx, err + } + serverService, _, err := renderServerTemplate("service.yaml", struct{}{}) + if err != nil { + return ctx, err + } + + // Submit server templates to k8s. + err = client.Resources().Create(ctx, serverClusterRoleBinding) + if err != nil { + return ctx, err + } + err = client.Resources().Create(ctx, serverService) + if err != nil { + return ctx, err + } + + return ctx, nil + }, + ) + + testenv.Finish(envfuncs.DestroyCluster(kindClusterName)) + + os.Exit(testenv.Run(m)) +} + +func TestSingleAgentAndServer(t *testing.T) { + +} diff --git a/e2e/singleserver_singleagent_test.go b/e2e/singleserver_singleagent_test.go new file mode 100644 index 000000000..0b4aa93c0 --- /dev/null +++ b/e2e/singleserver_singleagent_test.go @@ -0,0 +1,130 @@ +package e2e + +import ( + "context" + "fmt" + "net/http" + "strconv" + "testing" + "time" + + "github.com/prometheus/common/expfmt" + "sigs.k8s.io/e2e-framework/klient/wait" + "sigs.k8s.io/e2e-framework/klient/wait/conditions" + "sigs.k8s.io/e2e-framework/pkg/envconf" + "sigs.k8s.io/e2e-framework/pkg/features" +) + +func TestSingleServer_SingleAgent_StaticCount(t *testing.T) { + serviceHost := "konnectivity-server.kube-system.svc.cluster.local" + adminPort := 8093 + + serverDeploymentCfg := DeploymentConfig{ + Replicas: 1, + Image: *serverImage, + Args: []KeyValue{ + {Key: "log-file", Value: "/var/log/konnectivity-server.log"}, + {Key: "logtostderr", Value: "true"}, + {Key: "log-file-max-size", Value: "0"}, + {Key: "uds-name", Value: "/etc/kubernetes/konnectivity-server/konnectivity-server.socket"}, + {Key: "delete-existing-uds-file"}, + {Key: "cluster-cert", Value: "/etc/kubernetes/pki/apiserver.crt"}, + {Key: "cluster-key", Value: "/etc/kubernetes/pki/apiserver.key"}, + {Key: "server-port", Value: "8090"}, + {Key: "agent-port", Value: "8091"}, + {Key: "health-port", Value: "8092"}, + {Key: "admin-port", Value: strconv.Itoa(adminPort)}, + {Key: "keepalive-time", Value: "1h"}, + {Key: "mode", Value: "grpc"}, + {Key: "agent-namespace", Value: "kube-system"}, + {Key: "agent-service-account", Value: "konnectivity-agent"}, + {Key: "kubeconfig", Value: "/etc/kubernetes/admin.conf"}, + {Key: "authentication-audience", Value: "system:konnectivity-server"}, + {Key: "server-count", Value: "1"}, + }, + } + serverDeployment, _, err := renderServerTemplate("deployment.yaml", serverDeploymentCfg) + if err != nil { + t.Fatalf("could not render server deployment: %v", err) + } + + agentDeploymentCfg := DeploymentConfig{ + Replicas: 1, + Image: *agentImage, + Args: []KeyValue{ + {Key: "logtostderr", Value: "true"}, + {Key: "ca-cert", Value: "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"}, + {Key: "proxy-server-host", Value: serviceHost}, + {Key: "proxy-server-port", Value: "8091"}, + {Key: "sync-interval", Value: "1s"}, + {Key: "sync-interval-cap", Value: "10s"}, + {Key: "sync-forever"}, + {Key: "probe-interval", Value: "1s"}, + {Key: "service-account-token-path", Value: "/var/run/secrets/tokens/konnectivity-agent-token"}, + {Key: "server-count", Value: "1"}, + }, + } + agentDeployment, _, err := renderAgentTemplate("deployment.yaml", agentDeploymentCfg) + if err != nil { + t.Fatalf("could not render agent deployment: %v", err) + } + + feature := features.New("konnectivity server and agent deployment with single replica for each") + feature.Setup(func(ctx context.Context, t *testing.T, cfg *envconf.Config) context.Context { + client := cfg.Client() + err := client.Resources().Create(ctx, serverDeployment) + if err != nil { + t.Fatalf("could not create server deployment: %v", err) + } + + err = client.Resources().Create(ctx, agentDeployment) + if err != nil { + t.Fatalf("could not create agent deployment: %v", err) + } + + err = wait.For( + conditions.New(client.Resources()).DeploymentAvailable(agentDeployment.GetName(), agentDeployment.GetNamespace()), + wait.WithTimeout(1*time.Minute), + wait.WithInterval(10*time.Second), + ) + if err != nil { + t.Fatalf("waiting for agent deployment failed: %v", err) + } + + err = wait.For( + conditions.New(client.Resources()).DeploymentAvailable(serverDeployment.GetName(), serverDeployment.GetNamespace()), + wait.WithTimeout(1*time.Minute), + wait.WithInterval(10*time.Second), + ) + if err != nil { + t.Fatalf("waiting for server deployment failed: %v", err) + } + + return ctx + }) + feature.Assess("konnectivity server has a connected client", func(ctx context.Context, t *testing.T, cfg *envconf.Config) context.Context { + resp, err := http.Get(fmt.Sprintf("%v:%v/metrics", serviceHost, adminPort)) + if err != nil { + t.Fatalf("could not read server metrics: %v", err) + } + + metricsParser := &expfmt.TextParser{} + metricsFamilies, err := metricsParser.TextToMetricFamilies(resp.Body) + defer resp.Body.Close() + if err != nil { + t.Fatalf("could not parse server metrics: %v", err) + } + + connectionsMetric, exists := metricsFamilies["konnectivity_network_proxy_server_ready_backend_connections"] + if !exists { + t.Fatalf("couldn't find number of ready backend connections in metrics: %v", metricsFamilies) + } + + numConnections := int(connectionsMetric.GetMetric()[0].GetGauge().GetValue()) + if numConnections != 1 { + t.Errorf("incorrect number of connected agents (want: 1, got: %v)", numConnections) + } + + return ctx + }) +} diff --git a/e2e/templates/agent/clusterrole.yaml b/e2e/templates/agent/clusterrole.yaml new file mode 100644 index 000000000..85ee7302a --- /dev/null +++ b/e2e/templates/agent/clusterrole.yaml @@ -0,0 +1,10 @@ +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: system:konnectivity-agent + labels: + kubernetes.io/cluster-service: "true" +rules: +- apiGroups: ["coordination.k8s.io"] + resources: ["leases"] + verbs: ["get", "watch", "list"] diff --git a/e2e/templates/agent/clusterrolebinding.yaml b/e2e/templates/agent/clusterrolebinding.yaml new file mode 100644 index 000000000..57a3120c0 --- /dev/null +++ b/e2e/templates/agent/clusterrolebinding.yaml @@ -0,0 +1,14 @@ +apiVersion: rbac.authorization.k8s.io/rbacv1 +kind: ClusterRoleBinding +metadata: + name: system:konnectivity-agent + labels: + kubernetes.io/cluster-service: "true" +subjects: +- kind: ServiceAccount + name: konnectivity-agent + namespace: kube-system +roleRef: + kind: ClusterRole + name: system:konnectivity-agent + apiGroup: rbac.authorization.k8s.io diff --git a/e2e/templates/agent/deployment.yaml b/e2e/templates/agent/deployment.yaml new file mode 100644 index 000000000..57df57532 --- /dev/null +++ b/e2e/templates/agent/deployment.yaml @@ -0,0 +1,69 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + labels: + k8s-app: konnectivity-agent + namespace: kube-system + name: konnectivity-agent +spec: + replicas: {{ .Replicas }} + selector: + matchLabels: + k8s-app: konnectivity-agent + template: + metadata: + labels: + k8s-app: konnectivity-agent + spec: + containers: + - name: konnectivity-agent-container + image: {{ .Image }} + resources: + requests: + cpu: 50m + limits: + memory: 30Mi + command: [ "/proxy-agent"] + args: [ + {{ range .Args }} + "--{{ .Key }}{{if ne .Value ""}}={{ .Value }}{{ end }}", + {{ end }} + ] + env: + - name: POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: POD_NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + - name: HOST_IP + valueFrom: + fieldRef: + fieldPath: status.hostIP + livenessProbe: + httpGet: + scheme: HTTP + port: 8093 + path: /healthz + initialDelaySeconds: 15 + timeoutSeconds: 15 + readinessProbe: + httpGet: + scheme: HTTP + port: 8093 + path: /readyz + initialDelaySeconds: 15 + timeoutSeconds: 15 + volumeMounts: + - mountPath: /var/run/secrets/tokens + name: konnectivity-agent-token + serviceAccountName: konnectivity-agent + volumes: + - name: konnectivity-agent-token + projected: + sources: + - serviceAccountToken: + path: konnectivity-agent-token + audience: system:konnectivity-server diff --git a/e2e/templates/agent/serviceaccount.yaml b/e2e/templates/agent/serviceaccount.yaml new file mode 100644 index 000000000..4876d12e5 --- /dev/null +++ b/e2e/templates/agent/serviceaccount.yaml @@ -0,0 +1,7 @@ +apiVersion: v1 +kind: ServiceAccount +metadata: + name: konnectivity-agent + namespace: kube-system + labels: + kubernetes.io/cluster-service: "true" diff --git a/e2e/templates/server/clusterrolebinding.yaml b/e2e/templates/server/clusterrolebinding.yaml new file mode 100644 index 000000000..1dc4fa928 --- /dev/null +++ b/e2e/templates/server/clusterrolebinding.yaml @@ -0,0 +1,14 @@ +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: system:konnectivity-server + labels: + kubernetes.io/cluster-service: "true" +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: system:auth-delegator +subjects: +- apiGroup: rbac.authorization.k8s.io + kind: User + name: system:konnectivity-server diff --git a/e2e/templates/server/deployment.yaml b/e2e/templates/server/deployment.yaml new file mode 100644 index 000000000..e1bec710b --- /dev/null +++ b/e2e/templates/server/deployment.yaml @@ -0,0 +1,70 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + labels: + k8s-app: konnectivity-server + namespace: kube-system + name: konnectivity-server +spec: + replicas: {{ .Replicas }} + selector: + matchLabels: + k8s-app: konnectivity-server + template: + metadata: + labels: + k8s-app: konnectivity-server + containers: + - name: konnectivity-server-container + image: {{ .Image }} + resources: + requests: + cpu: 1m + command: [ "/proxy-server"] + args: [ + {{ range .Args }} + "--{{ .Key }}{{if ne .Value ""}}={{ .Value }}{{ end }}", + {{ end }} + ] + livenessProbe: + httpGet: + scheme: HTTP + host: 127.0.0.1 + port: 8092 + path: /healthz + initialDelaySeconds: 10 + timeoutSeconds: 60 + ports: + - name: serverport + containerPort: 8090 + hostPort: 8090 + - name: agentport + containerPort: 8091 + hostPort: 8091 + - name: healthport + containerPort: 8092 + hostPort: 8092 + - name: adminport + containerPort: 8093 + hostPort: 8093 + volumeMounts: + - name: varlogkonnectivityserver + mountPath: /var/log/konnectivity-server.log + readOnly: false + - name: kubernetes + mountPath: /etc/kubernetes + readOnly: true + - name: konnectivity-home + mountPath: /etc/kubernetes/konnectivity-server + volumes: + - name: varlogkonnectivityserver + hostPath: + path: /var/log/konnectivity-server.log + type: FileOrCreate + - name: kubernetes + hostPath: + path: /etc/kubernetes + - name: konnectivity-home + hostPath: + path: /etc/kubernetes/konnectivity-server + type: DirectoryOrCreate diff --git a/e2e/templates/server/service.yaml b/e2e/templates/server/service.yaml new file mode 100644 index 000000000..9517cb6b2 --- /dev/null +++ b/e2e/templates/server/service.yaml @@ -0,0 +1,13 @@ +apiVersion: v1 +kind: Service +metadata: + name: konnectivity-server + namespace: kube-system +spec: + selector: + k8s-app: konnectivity-server + clusterIP: None + ports: + - protocol: TCP + port: 8091 + targetPort: 8091 diff --git a/e2e/templates_test.go b/e2e/templates_test.go new file mode 100644 index 000000000..9cba68cf7 --- /dev/null +++ b/e2e/templates_test.go @@ -0,0 +1,65 @@ +package e2e + +import ( + "bytes" + "fmt" + "text/template" + + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/kubernetes/scheme" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +type KeyValue struct { + Key string + Value string +} + +type DeploymentConfig struct { + Replicas int + Image string + Args []KeyValue +} + +var ( + serverTemplates = template.Must(template.ParseGlob("templates/server/*")) + agentTemplates = template.Must(template.ParseGlob("templates/agent/*")) +) + +// renderServerTemplate renders a template from e2e/templates/server into a kubernetes object. +func renderServerTemplate(file string, params any) (client.Object, *schema.GroupVersionKind, error) { + b := &bytes.Buffer{} + + err := serverTemplates.ExecuteTemplate(b, file, params) + if err != nil { + return nil, nil, fmt.Errorf("could not render server template %v: %w", file, err) + } + + decoder := scheme.Codecs.UniversalDeserializer() + + obj, gvk, err := decoder.Decode(b.Bytes(), nil, nil) + if err != nil { + return nil, nil, fmt.Errorf("could not decode rendered yaml into kubernetes object: %w", err) + } + + return obj.(client.Object), gvk, nil +} + +// renderAgentTemplate renders a template from e2e/templates/agent into a kubernetes object. +func renderAgentTemplate(file string, params any) (client.Object, *schema.GroupVersionKind, error) { + b := &bytes.Buffer{} + + err := agentTemplates.ExecuteTemplate(b, file, params) + if err != nil { + return nil, nil, fmt.Errorf("could not render server template %v: %w", file, err) + } + + decoder := scheme.Codecs.UniversalDeserializer() + + obj, gvk, err := decoder.Decode(b.Bytes(), nil, nil) + if err != nil { + return nil, nil, fmt.Errorf("could not decode rendered yaml into kubernetes object: %w", err) + } + + return obj.(client.Object), gvk, nil +} diff --git a/examples/kind-multinode/templates/k8s/konnectivity-agent-ds.yaml b/examples/kind-multinode/templates/k8s/konnectivity-agent-ds.yaml index 779fdeb85..5030e5e03 100644 --- a/examples/kind-multinode/templates/k8s/konnectivity-agent-ds.yaml +++ b/examples/kind-multinode/templates/k8s/konnectivity-agent-ds.yaml @@ -1,3 +1,28 @@ +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: system:konnectivity-agent + labels: + kubernetes.io/cluster-service: "true" +rules: +- apiGroups: ["coordination.k8s.io"] + resources: ["leases"] + verbs: ["get", "watch", "list"] +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: system:konnectivity-agent + labels: + kubernetes.io/cluster-service: "true" +subjects: +- kind: ServiceAccount + name: konnectivity-agent + namespace: kube-system +roleRef: + kind: ClusterRole + name: system:konnectivity-agent + apiGroup: rbac.authorization.k8s.io --- apiVersion: v1 kind: ServiceAccount @@ -53,7 +78,8 @@ spec: "--sync-forever", "--probe-interval=5s", "--service-account-token-path=/var/run/secrets/tokens/konnectivity-agent-token", - "--agent-identifiers=ipv4=${HOST_IP}" + "--agent-identifiers=ipv4=${HOST_IP}", + "--server-count-lease-selector=apiserver.kubernetes.io/identity=kube-apiserver", ] env: - name: POD_NAME diff --git a/go.mod b/go.mod index c196d846e..13e9c52e1 100644 --- a/go.mod +++ b/go.mod @@ -1,7 +1,8 @@ module sigs.k8s.io/apiserver-network-proxy go 1.22 -toolchain go1.22.4 + +toolchain go1.22 require ( github.com/google/uuid v1.6.0 @@ -21,7 +22,9 @@ require ( k8s.io/client-go v0.30.2 k8s.io/component-base v0.30.2 k8s.io/klog/v2 v2.130.0 - sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.0 + sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.29.0 + sigs.k8s.io/controller-runtime v0.18.2 + sigs.k8s.io/e2e-framework v0.4.0 ) require ( @@ -31,6 +34,7 @@ require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/emicklei/go-restful/v3 v3.11.0 // indirect github.com/evanphx/json-patch v4.12.0+incompatible // indirect + github.com/evanphx/json-patch/v5 v5.9.0 // indirect github.com/go-logr/logr v1.4.1 // indirect github.com/go-openapi/jsonpointer v0.19.6 // indirect github.com/go-openapi/jsonreference v0.20.2 // indirect @@ -38,18 +42,23 @@ require ( github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.4 // indirect github.com/google/gnostic-models v0.6.8 // indirect + github.com/google/go-cmp v0.6.0 // indirect github.com/google/gofuzz v1.2.0 // indirect - github.com/imdario/mergo v0.3.6 // indirect + github.com/gorilla/websocket v1.5.0 // indirect + github.com/imdario/mergo v0.3.15 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/mailru/easyjson v0.7.7 // indirect + github.com/moby/spdystream v0.2.0 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/procfs v0.12.0 // indirect + github.com/vladimirvivien/gexe v0.2.0 // indirect golang.org/x/oauth2 v0.19.0 // indirect golang.org/x/sys v0.21.0 // indirect golang.org/x/term v0.21.0 // indirect @@ -63,7 +72,7 @@ require ( k8s.io/utils v0.0.0-20230726121419-3b25d923346b // indirect sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect - sigs.k8s.io/yaml v1.3.0 // indirect + sigs.k8s.io/yaml v1.4.0 // indirect ) replace sigs.k8s.io/apiserver-network-proxy/konnectivity-client => ./konnectivity-client diff --git a/go.sum b/go.sum index 5381e5c70..37407c802 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio= +github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/blang/semver/v4 v4.0.0 h1:1PFHFE6yCCTv8C1TeyNNarDzntLi7wMI5i/pzqYIsAM= @@ -13,8 +15,12 @@ github.com/emicklei/go-restful/v3 v3.11.0 h1:rAQeMHw1c7zTmncogyy8VvRZwtkmkZ4FxER github.com/emicklei/go-restful/v3 v3.11.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= github.com/evanphx/json-patch v4.12.0+incompatible h1:4onqiflcdA9EOZ4RxV643DvftH5pOlLGNtQ5lPWQu84= github.com/evanphx/json-patch v4.12.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= +github.com/evanphx/json-patch/v5 v5.9.0 h1:kcBlZQbplgElYIlo/n1hJbls2z/1awpXxpRi0/FOJfg= +github.com/evanphx/json-patch/v5 v5.9.0/go.mod h1:VNkHZ/282BpEyt/tObQO8s5CMPmYYq14uClGH4abBuQ= github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/zapr v1.3.0 h1:XGdV8XW8zdwFiwOA2Dryh1gj2KRQyOOoNmBy4EplIcQ= +github.com/go-logr/zapr v1.3.0/go.mod h1:YKepepNBd1u/oyhd/yQmtjVXmm9uML4IXUgMOwR8/Gg= github.com/go-openapi/jsonpointer v0.19.6 h1:eCs3fxoIi3Wh6vtgmLTOjdhSpiqphQ+DaPn38N2ZdrE= github.com/go-openapi/jsonpointer v0.19.6/go.mod h1:osyAmYz/mB/C3I+WsTTSgw1ONzaLJoLCyoi6/zppojs= github.com/go-openapi/jsonreference v0.20.2 h1:3sVjiK66+uXK/6oQ8xgcRKcFgQ5KXa2KvnJRumpMGbE= @@ -39,8 +45,11 @@ github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1 h1:K6RDEckDVWvDI9JAJY github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/imdario/mergo v0.3.6 h1:xTNEAn+kxVO7dTZGu0CegyqKZmoWFI0rF8UxjlB2d28= -github.com/imdario/mergo v0.3.6/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= +github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= +github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/imdario/mergo v0.3.15 h1:M8XP7IuFNsqUx6VPK2P9OSmsYsI/YFaGil0uD21V3dM= +github.com/imdario/mergo v0.3.15/go.mod h1:WBLT9ZmE3lPoWsEzCh9LPo3TiwVN+ZKEjmz+hD27ysY= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= @@ -58,6 +67,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= +github.com/moby/spdystream v0.2.0 h1:cjW1zVyyoiM0T7b6UoySUFqzXMoqRckQtXwGPiBhOM8= +github.com/moby/spdystream v0.2.0/go.mod h1:f7i0iNDQJ059oMTcWxx8MA/zKFIuD/lY+0GqbN2Wy8c= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= @@ -65,10 +76,12 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= -github.com/onsi/ginkgo/v2 v2.15.0 h1:79HwNRBAZHOEwrczrgSOPy+eFTTlIGELKy5as+ClttY= -github.com/onsi/ginkgo/v2 v2.15.0/go.mod h1:HlxMHtYF57y6Dpf+mc5529KKmSq9h2FpCF+/ZkwUxKM= -github.com/onsi/gomega v1.31.0 h1:54UJxxj6cPInHS3a35wm6BK/F9nHYueZ1NVujHDrnXE= -github.com/onsi/gomega v1.31.0/go.mod h1:DW9aCi7U6Yi40wNVAvT6kzFnEVEI5n3DloYBiKiT6zk= +github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f h1:y5//uYreIhSUg3J1GEMiLbxo1LJaP8RfCpH6pymGZus= +github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw= +github.com/onsi/ginkgo/v2 v2.17.1 h1:V++EzdbhI4ZV4ev0UTIj0PzhzOcReJFyJaLjtSF55M8= +github.com/onsi/ginkgo/v2 v2.17.1/go.mod h1:llBI3WDLL9Z6taip6f33H76YcWtJv+7R3HigUjbIBOs= +github.com/onsi/gomega v1.32.0 h1:JRYU78fJ1LPxlckP6Txi/EYqJvjtMrDC04/MM5XRHPk= +github.com/onsi/gomega v1.32.0/go.mod h1:a4x4gW6Pz2yK1MAmvluYme5lvYTn61afQ2ETw/8n4Lg= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -97,15 +110,23 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/vladimirvivien/gexe v0.2.0 h1:nbdAQ6vbZ+ZNsolCgSVb9Fno60kzSuvtzVh6Ytqi/xY= +github.com/vladimirvivien/gexe v0.2.0/go.mod h1:LHQL00w/7gDUKIak24n801ABp8C+ni6eBht9vGVst8w= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/mock v0.4.0 h1:VcM4ZOtdbR4f6VXfiOpwpVJDL6lCReaZ6mw31wqh7KU= go.uber.org/mock v0.4.0/go.mod h1:a6FSlNadKUHUa9IP5Vyt1zh4fC7uAwxMutEAscFbkZc= +go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= +go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo= +go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/exp v0.0.0-20220722155223-a9213eeb770e h1:+WEEuIdZHnUeJJmEUjyYC2gfUMj69yZXw17EnHg/otA= +golang.org/x/exp v0.0.0-20220722155223-a9213eeb770e/go.mod h1:Kr81I6Kryrl9sr8s2FK3vxD90NdsKWRuOIl2O4CvYbA= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= @@ -161,6 +182,8 @@ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= k8s.io/api v0.30.2 h1:+ZhRj+28QT4UOH+BKznu4CBgPWgkXO7XAvMcMl0qKvI= k8s.io/api v0.30.2/go.mod h1:ULg5g9JvOev2dG0u2hig4Z7tQ2hHIuS+m8MNZ+X6EmI= +k8s.io/apiextensions-apiserver v0.30.0 h1:jcZFKMqnICJfRxTgnC4E+Hpcq8UEhT8B2lhBcQ+6uAs= +k8s.io/apiextensions-apiserver v0.30.0/go.mod h1:N9ogQFGcrbWqAY9p2mUAL5mGxsLqwgtUce127VtRX5Y= k8s.io/apimachinery v0.30.2 h1:fEMcnBj6qkzzPGSVsAZtQThU62SmQ4ZymlXRC5yFSCg= k8s.io/apimachinery v0.30.2/go.mod h1:iexa2somDaxdnj7bha06bhb43Zpa6eWH8N8dbqVjTUc= k8s.io/client-go v0.30.2 h1:sBIVJdojUNPDU/jObC+18tXWcTJVcwyqS9diGdWHk50= @@ -173,9 +196,13 @@ k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 h1:BZqlfIlq5YbRMFko6/PM7F k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340/go.mod h1:yD4MZYeKMBwQKVht279WycxKyM84kkAx2DPrTXaeb98= k8s.io/utils v0.0.0-20230726121419-3b25d923346b h1:sgn3ZU783SCgtaSJjpcVVlRqd6GSnlTLKgpAAttJvpI= k8s.io/utils v0.0.0-20230726121419-3b25d923346b/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= +sigs.k8s.io/controller-runtime v0.18.2 h1:RqVW6Kpeaji67CY5nPEfRz6ZfFMk0lWQlNrLqlNpx+Q= +sigs.k8s.io/controller-runtime v0.18.2/go.mod h1:tuAt1+wbVsXIT8lPtk5RURxqAnq7xkpv2Mhttslg7Hw= +sigs.k8s.io/e2e-framework v0.4.0 h1:4yYmFDNNoTnazqmZJXQ6dlQF1vrnDbutmxlyvBpC5rY= +sigs.k8s.io/e2e-framework v0.4.0/go.mod h1:JilFQPF1OL1728ABhMlf9huse7h+uBJDXl9YeTs49A8= sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd h1:EDPBXCAspyGV4jQlpZSudPeMmr1bNJefnuqLsRAsHZo= sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd/go.mod h1:B8JuhiUyNFVKdsE8h686QcCxMaH6HrOAZj4vswFpcB0= sigs.k8s.io/structured-merge-diff/v4 v4.4.1 h1:150L+0vs/8DA78h1u02ooW1/fFq/Lwr+sGiqlzvrtq4= sigs.k8s.io/structured-merge-diff/v4 v4.4.1/go.mod h1:N8hJocpFajUSSeSJ9bOZ77VzejKZaXsTtZo4/u7Io08= -sigs.k8s.io/yaml v1.3.0 h1:a2VclLzOGrwOHDiV8EfBGhvjHvP46CtW5j6POvhYGGo= -sigs.k8s.io/yaml v1.3.0/go.mod h1:GeOyir5tyXNByN85N/dRIT9es5UQNerPYEKK56eTBm8= +sigs.k8s.io/yaml v1.4.0 h1:Mk1wCc2gy/F0THH0TAp1QYyJNzRm2KCLy3o5ASXVI5E= +sigs.k8s.io/yaml v1.4.0/go.mod h1:Ejl7/uTz7PSA4eKMyQCUTnhZYNmLIl+5c2lQPGR2BPY= diff --git a/pkg/agent/clientset.go b/pkg/agent/clientset.go index 7d2a10145..c6457f1bf 100644 --- a/pkg/agent/clientset.go +++ b/pkg/agent/clientset.go @@ -28,6 +28,7 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "k8s.io/klog/v2" "sigs.k8s.io/apiserver-network-proxy/pkg/agent/metrics" + "sigs.k8s.io/apiserver-network-proxy/pkg/servercounter" ) // ClientSet consists of clients connected to each instance of an HA proxy server. @@ -36,9 +37,9 @@ type ClientSet struct { clients map[string]*Client // map between serverID and the client // connects to this server. - agentID string // ID of this agent - address string // proxy server address. Assuming HA proxy server - serverCount int // number of proxy server instances, should be 1 + agentID string // ID of this agent + address string // proxy server address. Assuming HA proxy server + serverCounter servercounter.ServerCounter // counts number of proxy servers // unless it is an HA server. Initialized when the ClientSet creates // the first client. When syncForever is set, it will be the most recently seen. syncInterval time.Duration // The interval by which the agent @@ -64,6 +65,8 @@ type ClientSet struct { xfrChannelSize int syncForever bool // Continue syncing (support dynamic server count). + + respectReceivedServerCount bool // Respect server count received from proxy server rather than relying on the agent's own server counter } func (cs *ClientSet) ClientsCount() int { @@ -145,22 +148,24 @@ type ClientSetConfig struct { XfrChannelSize int } -func (cc *ClientSetConfig) NewAgentClientSet(drainCh, stopCh <-chan struct{}) *ClientSet { +func (cc *ClientSetConfig) NewAgentClientSet(drainCh, stopCh <-chan struct{}, serverCounter servercounter.ServerCounter) *ClientSet { return &ClientSet{ - clients: make(map[string]*Client), - agentID: cc.AgentID, - agentIdentifiers: cc.AgentIdentifiers, - address: cc.Address, - syncInterval: cc.SyncInterval, - probeInterval: cc.ProbeInterval, - syncIntervalCap: cc.SyncIntervalCap, - dialOptions: cc.DialOptions, - serviceAccountTokenPath: cc.ServiceAccountTokenPath, - warnOnChannelLimit: cc.WarnOnChannelLimit, - syncForever: cc.SyncForever, - drainCh: drainCh, - xfrChannelSize: cc.XfrChannelSize, - stopCh: stopCh, + clients: make(map[string]*Client), + agentID: cc.AgentID, + agentIdentifiers: cc.AgentIdentifiers, + address: cc.Address, + syncInterval: cc.SyncInterval, + probeInterval: cc.ProbeInterval, + syncIntervalCap: cc.SyncIntervalCap, + dialOptions: cc.DialOptions, + serviceAccountTokenPath: cc.ServiceAccountTokenPath, + warnOnChannelLimit: cc.WarnOnChannelLimit, + syncForever: cc.SyncForever, + drainCh: drainCh, + xfrChannelSize: cc.XfrChannelSize, + stopCh: stopCh, + respectReceivedServerCount: false, + serverCounter: serverCounter, } } @@ -187,8 +192,9 @@ func (cs *ClientSet) sync() { if err := cs.connectOnce(); err != nil { if dse, ok := err.(*DuplicateServerError); ok { clientsCount := cs.ClientsCount() - klog.V(4).InfoS("duplicate server", "serverID", dse.ServerID, "serverCount", cs.serverCount, "clientsCount", clientsCount) - if cs.serverCount != 0 && clientsCount >= cs.serverCount { + serverCount := cs.serverCounter.CountServers() + klog.V(4).InfoS("duplicate server", "serverID", dse.ServerID, "serverCount", serverCount, "clientsCount", clientsCount) + if serverCount != 0 && clientsCount >= serverCount { duration = backoff.Step() } else { backoff = cs.resetBackoff() @@ -212,19 +218,24 @@ func (cs *ClientSet) sync() { } func (cs *ClientSet) connectOnce() error { - if !cs.syncForever && cs.serverCount != 0 && cs.ClientsCount() >= cs.serverCount { + agentServerCount := cs.serverCounter.CountServers() + if !cs.syncForever && agentServerCount != 0 && cs.ClientsCount() >= agentServerCount { return nil } - c, serverCount, err := cs.newAgentClient() + c, newServerCount, err := cs.newAgentClient() if err != nil { return err } - if cs.serverCount != 0 && cs.serverCount != serverCount { + if agentServerCount != 0 && agentServerCount != newServerCount { klog.V(2).InfoS("Server count change suggestion by server", - "current", cs.serverCount, "serverID", c.serverID, "actual", serverCount) - + "current", agentServerCount, "serverID", c.serverID, "actual", newServerCount) + if cs.respectReceivedServerCount { + cs.serverCounter = servercounter.StaticServerCounter(newServerCount) + klog.V(2).Infof("respecting server count change suggestion, new count: %v", newServerCount) + } else { + klog.V(2).Infof("ignoring server count change suggestion of %v", newServerCount) + } } - cs.serverCount = serverCount if err := cs.AddClient(c.serverID, c); err != nil { c.Close() return err diff --git a/pkg/server/server.go b/pkg/server/server.go index e0d47dda9..412ef0314 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -37,9 +37,9 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" "k8s.io/klog/v2" - commonmetrics "sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/common/metrics" "sigs.k8s.io/apiserver-network-proxy/konnectivity-client/proto/client" + "sigs.k8s.io/apiserver-network-proxy/pkg/servercounter" "sigs.k8s.io/apiserver-network-proxy/pkg/server/metrics" "sigs.k8s.io/apiserver-network-proxy/pkg/util" @@ -208,8 +208,8 @@ type ProxyServer struct { PendingDial *PendingDialManager - serverID string // unique ID of this server - serverCount int // Number of proxy server instances, should be 1 unless it is a HA server. + serverID string // unique ID of this server + serverCounter servercounter.ServerCounter // provides number of proxy servers // agent authentication AgentAuthenticationOptions *AgentTokenAuthenticationOptions @@ -375,7 +375,7 @@ func (s *ProxyServer) removeEstablishedForStream(streamUID string) []*ProxyClien } // NewProxyServer creates a new ProxyServer instance -func NewProxyServer(serverID string, proxyStrategies []ProxyStrategy, serverCount int, agentAuthenticationOptions *AgentTokenAuthenticationOptions, channelSize int) *ProxyServer { +func NewProxyServer(serverID string, proxyStrategies []ProxyStrategy, serverCounter servercounter.ServerCounter, agentAuthenticationOptions *AgentTokenAuthenticationOptions, channelSize int) *ProxyServer { var bms []BackendManager for _, ps := range proxyStrategies { switch ps { @@ -394,7 +394,7 @@ func NewProxyServer(serverID string, proxyStrategies []ProxyStrategy, serverCoun established: make(map[string](map[int64]*ProxyClientConnection)), PendingDial: NewPendingDialManager(), serverID: serverID, - serverCount: serverCount, + serverCounter: serverCounter, BackendManagers: bms, AgentAuthenticationOptions: agentAuthenticationOptions, // use the first backend-manager as the Readiness Manager @@ -441,7 +441,7 @@ func (s *ProxyServer) Proxy(stream client.ProxyService_ProxyServer) error { }() labels := runpprof.Labels( - "serverCount", strconv.Itoa(s.serverCount), + "serverCount", strconv.Itoa(s.serverCounter.CountServers()), "userAgent", strings.Join(userAgent, ", "), ) // Start goroutine to receive packets from frontend and push to recvCh @@ -722,7 +722,7 @@ func (s *ProxyServer) Connect(stream agent.AgentService_ConnectServer) error { klog.V(5).InfoS("Connect request from agent", "agentID", agentID, "serverID", s.serverID) labels := runpprof.Labels( - "serverCount", strconv.Itoa(s.serverCount), + "serverCount", strconv.Itoa(s.serverCounter.CountServers()), "agentID", agentID, ) ctx := runpprof.WithLabels(context.Background(), labels) @@ -735,7 +735,7 @@ func (s *ProxyServer) Connect(stream agent.AgentService_ConnectServer) error { } } - h := metadata.Pairs(header.ServerID, s.serverID, header.ServerCount, strconv.Itoa(s.serverCount)) + h := metadata.Pairs(header.ServerID, s.serverID, header.ServerCount, strconv.Itoa(s.serverCounter.CountServers())) if err := stream.SendHeader(h); err != nil { klog.ErrorS(err, "Failed to send server count back to agent", "agentID", agentID) return err diff --git a/pkg/server/server_test.go b/pkg/server/server_test.go index f5c548b3d..10658f3e2 100644 --- a/pkg/server/server_test.go +++ b/pkg/server/server_test.go @@ -28,6 +28,7 @@ import ( "github.com/google/uuid" "go.uber.org/mock/gomock" "google.golang.org/grpc/metadata" + "sigs.k8s.io/apiserver-network-proxy/pkg/servercounter" promtest "github.com/prometheus/client_golang/prometheus/testutil" authv1 "k8s.io/api/authentication/v1" @@ -169,7 +170,7 @@ func TestAgentTokenAuthenticationErrorsToken(t *testing.T) { conn.EXPECT().Recv().Return(nil, io.EOF) } - p := NewProxyServer("", []ProxyStrategy{ProxyStrategyDefault}, 1, &AgentTokenAuthenticationOptions{ + p := NewProxyServer("", []ProxyStrategy{ProxyStrategyDefault}, servercounter.StaticServerCounter(1), &AgentTokenAuthenticationOptions{ Enabled: true, KubernetesClient: kcs, AgentNamespace: tc.wantNamespace, @@ -197,7 +198,7 @@ func TestRemovePendingDialForStream(t *testing.T) { pending3 := &ProxyClientConnection{frontend: &GrpcFrontend{streamUID: streamUID}} pending4 := &ProxyClientConnection{frontend: &GrpcFrontend{streamUID: "different-uid"}} pending5 := &ProxyClientConnection{frontend: &GrpcFrontend{streamUID: ""}} - p := NewProxyServer("", []ProxyStrategy{ProxyStrategyDefault}, 1, nil, xfrChannelSize) + p := NewProxyServer("", []ProxyStrategy{ProxyStrategyDefault}, servercounter.StaticServerCounter(1), nil, xfrChannelSize) p.PendingDial.Add(1, pending1) p.PendingDial.Add(2, pending2) p.PendingDial.Add(3, pending3) @@ -225,7 +226,7 @@ func TestAddRemoveFrontends(t *testing.T) { agent2ConnID2 := new(ProxyClientConnection) agent3ConnID1 := new(ProxyClientConnection) - p := NewProxyServer("", []ProxyStrategy{ProxyStrategyDefault}, 1, nil, xfrChannelSize) + p := NewProxyServer("", []ProxyStrategy{ProxyStrategyDefault}, servercounter.StaticServerCounter(1), nil, xfrChannelSize) p.addEstablished("agent1", int64(1), agent1ConnID1) p.removeEstablished("agent1", int64(1)) expectedFrontends := make(map[string]map[int64]*ProxyClientConnection) @@ -233,7 +234,7 @@ func TestAddRemoveFrontends(t *testing.T) { t.Errorf("expected %v, got %v", e, a) } - p = NewProxyServer("", []ProxyStrategy{ProxyStrategyDefault}, 1, nil, xfrChannelSize) + p = NewProxyServer("", []ProxyStrategy{ProxyStrategyDefault}, servercounter.StaticServerCounter(1), nil, xfrChannelSize) p.addEstablished("agent1", int64(1), agent1ConnID1) p.addEstablished("agent1", int64(2), agent1ConnID2) p.addEstablished("agent2", int64(1), agent2ConnID1) @@ -263,7 +264,7 @@ func TestAddRemoveBackends_DefaultStrategy(t *testing.T) { backend2, _ := NewBackend(mockAgentConn(ctrl, "agent2", []string{})) backend3, _ := NewBackend(mockAgentConn(ctrl, "agent3", []string{})) - p := NewProxyServer("", []ProxyStrategy{ProxyStrategyDefault}, 1, nil, xfrChannelSize) + p := NewProxyServer("", []ProxyStrategy{ProxyStrategyDefault}, servercounter.StaticServerCounter(1), nil, xfrChannelSize) p.addBackend(backend1) @@ -295,7 +296,7 @@ func TestAddRemoveBackends_DefaultRouteStrategy(t *testing.T) { backend2, _ := NewBackend(mockAgentConn(ctrl, "agent2", []string{"default-route=false"})) backend3, _ := NewBackend(mockAgentConn(ctrl, "agent3", []string{"default-route=true"})) - p := NewProxyServer("", []ProxyStrategy{ProxyStrategyDefaultRoute}, 1, nil, xfrChannelSize) + p := NewProxyServer("", []ProxyStrategy{ProxyStrategyDefaultRoute}, servercounter.StaticServerCounter(1), nil, xfrChannelSize) p.addBackend(backend1) @@ -337,7 +338,7 @@ func TestAddRemoveBackends_DestHostStrategy(t *testing.T) { backend2, _ := NewBackend(mockAgentConn(ctrl, "agent2", []string{"default-route=true"})) backend3, _ := NewBackend(mockAgentConn(ctrl, "agent3", []string{"host=node2.mydomain.com&ipv4=5.6.7.8&ipv6=::"})) - p := NewProxyServer("", []ProxyStrategy{ProxyStrategyDestHost}, 1, nil, xfrChannelSize) + p := NewProxyServer("", []ProxyStrategy{ProxyStrategyDestHost}, servercounter.StaticServerCounter(1), nil, xfrChannelSize) p.addBackend(backend1) p.addBackend(backend2) @@ -384,7 +385,7 @@ func TestAddRemoveBackends_DestHostSanitizeRequest(t *testing.T) { backend1, _ := NewBackend(mockAgentConn(ctrl, "agent1", []string{"host=localhost&host=node1.mydomain.com&ipv4=1.2.3.4&ipv6=9878::7675:1292:9183:7562"})) backend2, _ := NewBackend(mockAgentConn(ctrl, "agent2", []string{"host=node2.mydomain.com&ipv4=5.6.7.8&ipv6=::"})) - p := NewProxyServer("", []ProxyStrategy{ProxyStrategyDestHost}, 1, nil, xfrChannelSize) + p := NewProxyServer("", []ProxyStrategy{ProxyStrategyDestHost}, servercounter.StaticServerCounter(1), nil, xfrChannelSize) p.addBackend(backend1) p.addBackend(backend2) @@ -408,7 +409,7 @@ func TestAddRemoveBackends_DestHostWithDefault(t *testing.T) { backend2, _ := NewBackend(mockAgentConn(ctrl, "agent2", []string{"default-route=false"})) backend3, _ := NewBackend(mockAgentConn(ctrl, "agent3", []string{"host=node2.mydomain.com&ipv4=5.6.7.8&ipv6=::"})) - p := NewProxyServer("", []ProxyStrategy{ProxyStrategyDestHost, ProxyStrategyDefault}, 1, nil, xfrChannelSize) + p := NewProxyServer("", []ProxyStrategy{ProxyStrategyDestHost, ProxyStrategyDefault}, servercounter.StaticServerCounter(1), nil, xfrChannelSize) p.addBackend(backend1) p.addBackend(backend2) @@ -461,7 +462,7 @@ func TestAddRemoveBackends_DestHostWithDuplicateIdents(t *testing.T) { backend2, _ := NewBackend(mockAgentConn(ctrl, "agent2", []string{"host=localhost&host=node1.mydomain.com&ipv4=1.2.3.4&ipv6=9878::7675:1292:9183:7562"})) backend3, _ := NewBackend(mockAgentConn(ctrl, "agent3", []string{"host=localhost&host=node2.mydomain.com&ipv4=5.6.7.8&ipv6=::"})) - p := NewProxyServer("", []ProxyStrategy{ProxyStrategyDestHost, ProxyStrategyDefault}, 1, nil, xfrChannelSize) + p := NewProxyServer("", []ProxyStrategy{ProxyStrategyDestHost, ProxyStrategyDefault}, servercounter.StaticServerCounter(1), nil, xfrChannelSize) p.addBackend(backend1) p.addBackend(backend2) @@ -518,7 +519,7 @@ func TestEstablishedConnsMetric(t *testing.T) { agent2ConnID2 := new(ProxyClientConnection) agent3ConnID1 := new(ProxyClientConnection) - p := NewProxyServer("", []ProxyStrategy{ProxyStrategyDefault}, 1, nil, xfrChannelSize) + p := NewProxyServer("", []ProxyStrategy{ProxyStrategyDefault}, servercounter.StaticServerCounter(1), nil, xfrChannelSize) p.addEstablished("agent1", int64(1), agent1ConnID1) assertEstablishedConnsMetric(t, 1) p.addEstablished("agent1", int64(2), agent1ConnID2) @@ -550,7 +551,7 @@ func TestRemoveEstablishedForBackendConn(t *testing.T) { agent2ConnID1 := &ProxyClientConnection{backend: backend2} agent2ConnID2 := &ProxyClientConnection{backend: backend2} agent3ConnID1 := &ProxyClientConnection{backend: backend3} - p := NewProxyServer("", []ProxyStrategy{ProxyStrategyDefault}, 1, nil, xfrChannelSize) + p := NewProxyServer("", []ProxyStrategy{ProxyStrategyDefault}, servercounter.StaticServerCounter(1), nil, xfrChannelSize) p.addEstablished("agent1", int64(1), agent1ConnID1) p.addEstablished("agent1", int64(2), agent1ConnID2) p.addEstablished("agent2", int64(1), agent2ConnID1) @@ -581,7 +582,7 @@ func TestRemoveEstablishedForStream(t *testing.T) { agent2ConnID1 := &ProxyClientConnection{backend: backend2, frontend: &GrpcFrontend{streamUID: streamUID}} agent2ConnID2 := &ProxyClientConnection{backend: backend2} agent3ConnID1 := &ProxyClientConnection{backend: backend3, frontend: &GrpcFrontend{streamUID: streamUID}} - p := NewProxyServer("", []ProxyStrategy{ProxyStrategyDefault}, 1, nil, xfrChannelSize) + p := NewProxyServer("", []ProxyStrategy{ProxyStrategyDefault}, servercounter.StaticServerCounter(1), nil, xfrChannelSize) p.addEstablished("agent1", int64(1), agent1ConnID1) p.addEstablished("agent1", int64(2), agent1ConnID2) p.addEstablished("agent2", int64(1), agent2ConnID1) @@ -641,7 +642,7 @@ func baseServerProxyTestWithoutBackend(t *testing.T, validate func(*agentmock.Mo defer ctrl.Finish() frontendConn := prepareFrontendConn(ctrl) - proxyServer := NewProxyServer(uuid.New().String(), []ProxyStrategy{ProxyStrategyDefault}, 1, &AgentTokenAuthenticationOptions{}, xfrChannelSize) + proxyServer := NewProxyServer(uuid.New().String(), []ProxyStrategy{ProxyStrategyDefault}, servercounter.StaticServerCounter(1), &AgentTokenAuthenticationOptions{}, xfrChannelSize) validate(frontendConn) @@ -655,7 +656,7 @@ func baseServerProxyTestWithBackend(t *testing.T, validate func(*agentmock.MockA frontendConn := prepareFrontendConn(ctrl) // prepare proxy server - proxyServer := NewProxyServer(uuid.New().String(), []ProxyStrategy{ProxyStrategyDefault}, 1, &AgentTokenAuthenticationOptions{}, xfrChannelSize) + proxyServer := NewProxyServer(uuid.New().String(), []ProxyStrategy{ProxyStrategyDefault}, servercounter.StaticServerCounter(1), &AgentTokenAuthenticationOptions{}, xfrChannelSize) agentConn, _ := prepareAgentConnMD(t, ctrl, proxyServer) @@ -861,7 +862,7 @@ func TestReadyBackendsMetric(t *testing.T) { metrics.Metrics.Reset() - p := NewProxyServer(uuid.New().String(), []ProxyStrategy{ProxyStrategyDefault}, 1, &AgentTokenAuthenticationOptions{}, xfrChannelSize) + p := NewProxyServer(uuid.New().String(), []ProxyStrategy{ProxyStrategyDefault}, servercounter.StaticServerCounter(1), &AgentTokenAuthenticationOptions{}, xfrChannelSize) assertReadyBackendsMetric(t, 0) _, backend := prepareAgentConnMD(t, ctrl, p) diff --git a/pkg/servercounter/counter.go b/pkg/servercounter/counter.go new file mode 100644 index 000000000..96fd44dc6 --- /dev/null +++ b/pkg/servercounter/counter.go @@ -0,0 +1,16 @@ +// Package servercounter provides utilities for getting the current count of +// proxy servers. +package servercounter + +// A ServerCounter counts the number of available proxy servers. +type ServerCounter interface { + CountServers() int +} + +// A StaticServerCounter stores a static server count. +type StaticServerCounter int + +// CountServers returns the current (static) proxy server count. +func (sc StaticServerCounter) CountServers() int { + return int(sc) +} diff --git a/pkg/servercounter/lease_counter.go b/pkg/servercounter/lease_counter.go new file mode 100644 index 000000000..6fc71e3f4 --- /dev/null +++ b/pkg/servercounter/lease_counter.go @@ -0,0 +1,92 @@ +package servercounter + +import ( + "fmt" + "time" + + "k8s.io/apimachinery/pkg/labels" + "k8s.io/klog/v2" + + coordinationv1api "k8s.io/api/coordination/v1" + coordinationv1listers "k8s.io/client-go/listers/coordination/v1" +) + +var timeNow = time.Now + +// A ServerLeaseCounter counts leases in the k8s apiserver to determine the +// current proxy server count. +type ServerLeaseCounter struct { + lister coordinationv1listers.LeaseLister + selector labels.Selector + fallbackCount int +} + +// NewServerLeaseCounter creates a server counter that counts valid leases that match the label +// selector and provides the fallback count if this fails. +func NewServerLeaseCounter(lister coordinationv1listers.LeaseLister, labelSelector string, fallbackCount int) (*ServerLeaseCounter, error) { + selector, err := labels.Parse(labelSelector) + if err != nil { + return nil, fmt.Errorf("could not parse label selector %v: %w", labelSelector, err) + } + return &ServerLeaseCounter{ + lister: lister, + selector: selector, + fallbackCount: fallbackCount, + }, nil +} + +// CountServers counts the number of leases in the apiserver matching the provided +// label selector. +// +// In the event that no valid leases are found or lease listing fails, the +// fallback count is returned. This fallback count is updated upon successful +// discovery of valid leases. +func (lc *ServerLeaseCounter) CountServers() int { + // Since the number of proxy servers is generally small (1-10), we opted against + // using a LIST and WATCH pattern and instead list all leases in the informer. + // The informer still uses LIST and WATCH under the hood, so this doesn't result + // in additional calls in the apiserver, and checking whether a lease is valid + // is cheap. + leases, err := lc.lister.List(lc.selector) + if err != nil { + klog.Errorf("could not list leases to update server count, using fallback count (%v): %v", lc.fallbackCount, err) + return lc.fallbackCount + } + + count := 0 + for _, lease := range leases { + if isLeaseValid(lease) { + count++ + } else { + klog.InfoS("excluding expired lease from server count", "selector", lc.selector, "lease", lease) + } + } + + if count == 0 { + klog.Infof("no valid leases found, using fallback count (%v)", lc.fallbackCount) + return lc.fallbackCount + } + + if count != lc.fallbackCount { + klog.Infof("found %v valid leases, updating fallback count", count) + lc.fallbackCount = count + } + + return count +} + +func isLeaseValid(lease *coordinationv1api.Lease) bool { + var lastRenewTime time.Time + if lease.Spec.RenewTime != nil { + lastRenewTime = lease.Spec.RenewTime.Time + } else if lease.Spec.AcquireTime != nil { + lastRenewTime = lease.Spec.AcquireTime.Time + } else { + klog.Warningf("lease %v has neither a renew time or an acquire time, marking as expired: %v", lease.Name, lease) + return false + } + + duration := time.Duration(*lease.Spec.LeaseDurationSeconds) * time.Second + + return lastRenewTime.Add(duration).After(timeNow()) // renewTime+duration > time.Now() +} diff --git a/pkg/servercounter/lease_counter_test.go b/pkg/servercounter/lease_counter_test.go new file mode 100644 index 000000000..c5f5e1cb9 --- /dev/null +++ b/pkg/servercounter/lease_counter_test.go @@ -0,0 +1,316 @@ +package servercounter + +import ( + "fmt" + "testing" + "time" + + coordinationv1 "k8s.io/api/coordination/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + coordinationv1listers "k8s.io/client-go/listers/coordination/v1" +) + +type leaseTemplate struct { + durationSecs int32 + timeSinceAcquire time.Duration + timeSinceRenew time.Duration + labels map[string]string +} + +type controlledTime struct { + t time.Time +} + +func (ct *controlledTime) Now() time.Time { + return ct.t +} + +func (ct *controlledTime) Advance(d time.Duration) { + ct.t = ct.t.Add(d) +} + +func newLeaseFromTemplate(template leaseTemplate) *coordinationv1.Lease { + lease := &coordinationv1.Lease{ + TypeMeta: metav1.TypeMeta{}, + ObjectMeta: metav1.ObjectMeta{ + Labels: template.labels, + }, + Spec: coordinationv1.LeaseSpec{}, + } + + if template.durationSecs != 0 { + lease.Spec.LeaseDurationSeconds = &template.durationSecs + } + if template.timeSinceAcquire != time.Duration(0) { + acquireTime := metav1.NewMicroTime(timeNow().Add(-template.timeSinceAcquire)) + lease.Spec.AcquireTime = &acquireTime + } + if template.timeSinceRenew != time.Duration(0) { + renewTime := metav1.NewMicroTime(timeNow().Add(-template.timeSinceRenew)) + lease.Spec.RenewTime = &renewTime + } + + return lease +} + +func TestIsLeaseValid(t *testing.T) { + testCases := []struct { + name string + template leaseTemplate + want bool + }{ + { + name: "freshly acquired lease is valid", + template: leaseTemplate{ + durationSecs: 1000, + timeSinceAcquire: time.Second, + }, + want: true, + }, { + name: "freshly renewed lease is valid", + template: leaseTemplate{ + durationSecs: 1000, + timeSinceAcquire: 10000 * time.Second, + timeSinceRenew: time.Second, + }, + want: true, + }, { + name: "lease with neither acquisition nor renewal time is invalid", + template: leaseTemplate{ + durationSecs: 1000, + }, + want: false, + }, { + name: "expired lease (only acquired) is invalid", + template: leaseTemplate{ + durationSecs: 1000, + timeSinceAcquire: 10000 * time.Second, + }, + want: false, + }, { + name: "expired lease (acquired and renewed) is invalid", + template: leaseTemplate{ + durationSecs: 1000, + timeSinceAcquire: 10000 * time.Second, + timeSinceRenew: 9000 * time.Second, + }, + want: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + lease := newLeaseFromTemplate(tc.template) + + got := isLeaseValid(lease) + if got != tc.want { + t.Errorf("incorrect lease validity (got: %v, want: %v)", got, tc.want) + } + }) + } +} + +type fakeLeaseLister struct { + leases []*coordinationv1.Lease + calls []labels.Selector + err error +} + +type labelMap map[string]string + +func (l labelMap) Has(label string) bool { + _, exists := l[label] + return exists +} +func (l labelMap) Get(label string) string { + value, exists := l[label] + if !exists { + return "" + } + + return value +} + +func (lister *fakeLeaseLister) List(selector labels.Selector) ([]*coordinationv1.Lease, error) { + lister.calls = append(lister.calls, selector) + + if lister.err != nil { + return nil, lister.err + } + + res := []*coordinationv1.Lease{} + for _, lease := range lister.leases { + if selector.Matches(labelMap(lease.Labels)) { + res = append(res, lease) + } + } + + return res, nil +} +func (lister *fakeLeaseLister) Leases(_ string) coordinationv1listers.LeaseNamespaceLister { + panic("not implemented") +} + +func TestServerLeaseCounter(t *testing.T) { + testCases := []struct { + name string + + templates []leaseTemplate + leaseListerError error + + labelSelector string + fallbackCount int + + want int + }{ + { + name: "returns fallback count when no leases exist", + templates: []leaseTemplate{}, + labelSelector: "label=value", + fallbackCount: 999, + want: 999, + }, { + name: "returns fallback count when no leases matching selector exist", + templates: []leaseTemplate{ + { + durationSecs: 1000, + timeSinceAcquire: time.Second, + labels: labelMap{"label": "wrong_value"}, + }, + { + durationSecs: 1000, + timeSinceAcquire: time.Second, + labels: labelMap{"wrong_label": "value"}, + }, + }, + labelSelector: "label=value", + fallbackCount: 999, + want: 999, + }, { + name: "returns fallback count when no leases matching selector are still valid", + templates: []leaseTemplate{ + { + durationSecs: 1000, + timeSinceAcquire: 10000 * time.Second, + labels: labelMap{"label": "value"}, + }, + { + durationSecs: 1000, + timeSinceAcquire: time.Second, + labels: labelMap{"wrong_label": "value"}, + }, + }, + labelSelector: "label=value", + fallbackCount: 999, + want: 999, + }, { + name: "returns fallbackCount when LeaseLister returns an error", + templates: []leaseTemplate{}, + labelSelector: "label=value", + fallbackCount: 999, + leaseListerError: fmt.Errorf("test error"), + want: 999, + }, { + name: "counts only valid leases matching label selector", + templates: []leaseTemplate{ + { + durationSecs: 1000, + timeSinceAcquire: time.Second, + labels: labelMap{"label": "value"}, + }, + { + durationSecs: 1000, + timeSinceAcquire: time.Second, + labels: labelMap{"label": "value"}, + }, + { + durationSecs: 1000, + timeSinceAcquire: time.Second, + labels: labelMap{"label": "wrong_value"}, + }, + }, + labelSelector: "label=value", + fallbackCount: 999, + want: 2, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ct := &controlledTime{t: time.Unix(10000000, 0)} + timeNow = ct.Now + leases := make([]*coordinationv1.Lease, len(tc.templates)) + for i, template := range tc.templates { + leases[i] = newLeaseFromTemplate(template) + } + lister := &fakeLeaseLister{ + leases: leases, + err: tc.leaseListerError, + } + ct.Advance(time.Millisecond) + + counter, err := NewServerLeaseCounter(lister, tc.labelSelector, tc.fallbackCount) + if err != nil { + t.Fatalf("server counter creation failed: %v", err) + } + + got := counter.CountServers() + if tc.want != got { + t.Errorf("incorrect server count (got: %v, want: %v)", got, tc.want) + } + }) + } +} + +func TestServerLeaseCounter_FallbackCount(t *testing.T) { + validLease := leaseTemplate{ + durationSecs: 1000, + timeSinceAcquire: time.Second, + labels: map[string]string{"label": "value"}, + } + invalidLease := leaseTemplate{ + durationSecs: 1000, + timeSinceAcquire: time.Second * 10000, + labels: map[string]string{"label": "value"}, + } + + ct := &controlledTime{t: time.Unix(1000, 0)} + timeNow = ct.Now + leases := []*coordinationv1.Lease{} + leases = append(leases, newLeaseFromTemplate(validLease), newLeaseFromTemplate(validLease), newLeaseFromTemplate(validLease), newLeaseFromTemplate(invalidLease)) + ct.Advance(time.Millisecond) + + lister := &fakeLeaseLister{ + leases: leases, + err: fmt.Errorf("dummy lister error"), + } + + initialFallback := 999 + counter, err := NewServerLeaseCounter(lister, "label=value", initialFallback) + if err != nil { + t.Fatalf("server counter creation failed: %v", err) + } + + // First call should return fallback count of 999 because of lister error. + got := counter.CountServers() + if got != initialFallback { + t.Errorf("lease counter did not return fallback count on lister error (got: %v, want: %v)", got, initialFallback) + } + + // Second call should return the actual count (3) upon lister success. + actualCount := 3 + lister.err = nil + got = counter.CountServers() + if got != actualCount { + t.Errorf("lease counter did not return actual count on lister success (got: %v, want: %v)", got, actualCount) + } + + // Third call should return updated fallback count (3) upon lister failure. + lister.err = fmt.Errorf("dummy lister error") + lister.leases = append(lister.leases, newLeaseFromTemplate(validLease)) // Change actual count just in case. + got = counter.CountServers() + if got != actualCount { + t.Errorf("lease counter did not update fallback count after lister success, returned incorrect count on subsequent lister error (got: %v, want: %v)", got, actualCount) + } +} diff --git a/vendor/github.com/evanphx/json-patch/v5/LICENSE b/vendor/github.com/evanphx/json-patch/v5/LICENSE new file mode 100644 index 000000000..df76d7d77 --- /dev/null +++ b/vendor/github.com/evanphx/json-patch/v5/LICENSE @@ -0,0 +1,25 @@ +Copyright (c) 2014, Evan Phoenix +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + +* Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. +* Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. +* Neither the name of the Evan Phoenix nor the names of its contributors + may be used to endorse or promote products derived from this software + without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/vendor/github.com/evanphx/json-patch/v5/errors.go b/vendor/github.com/evanphx/json-patch/v5/errors.go new file mode 100644 index 000000000..75304b443 --- /dev/null +++ b/vendor/github.com/evanphx/json-patch/v5/errors.go @@ -0,0 +1,38 @@ +package jsonpatch + +import "fmt" + +// AccumulatedCopySizeError is an error type returned when the accumulated size +// increase caused by copy operations in a patch operation has exceeded the +// limit. +type AccumulatedCopySizeError struct { + limit int64 + accumulated int64 +} + +// NewAccumulatedCopySizeError returns an AccumulatedCopySizeError. +func NewAccumulatedCopySizeError(l, a int64) *AccumulatedCopySizeError { + return &AccumulatedCopySizeError{limit: l, accumulated: a} +} + +// Error implements the error interface. +func (a *AccumulatedCopySizeError) Error() string { + return fmt.Sprintf("Unable to complete the copy, the accumulated size increase of copy is %d, exceeding the limit %d", a.accumulated, a.limit) +} + +// ArraySizeError is an error type returned when the array size has exceeded +// the limit. +type ArraySizeError struct { + limit int + size int +} + +// NewArraySizeError returns an ArraySizeError. +func NewArraySizeError(l, s int) *ArraySizeError { + return &ArraySizeError{limit: l, size: s} +} + +// Error implements the error interface. +func (a *ArraySizeError) Error() string { + return fmt.Sprintf("Unable to create array of size %d, limit is %d", a.size, a.limit) +} diff --git a/vendor/github.com/evanphx/json-patch/v5/internal/json/decode.go b/vendor/github.com/evanphx/json-patch/v5/internal/json/decode.go new file mode 100644 index 000000000..e9bb0efe7 --- /dev/null +++ b/vendor/github.com/evanphx/json-patch/v5/internal/json/decode.go @@ -0,0 +1,1385 @@ +// Copyright 2010 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Represents JSON data structure using native Go types: booleans, floats, +// strings, arrays, and maps. + +package json + +import ( + "encoding" + "encoding/base64" + "fmt" + "reflect" + "strconv" + "strings" + "sync" + "unicode" + "unicode/utf16" + "unicode/utf8" +) + +// Unmarshal parses the JSON-encoded data and stores the result +// in the value pointed to by v. If v is nil or not a pointer, +// Unmarshal returns an InvalidUnmarshalError. +// +// Unmarshal uses the inverse of the encodings that +// Marshal uses, allocating maps, slices, and pointers as necessary, +// with the following additional rules: +// +// To unmarshal JSON into a pointer, Unmarshal first handles the case of +// the JSON being the JSON literal null. In that case, Unmarshal sets +// the pointer to nil. Otherwise, Unmarshal unmarshals the JSON into +// the value pointed at by the pointer. If the pointer is nil, Unmarshal +// allocates a new value for it to point to. +// +// To unmarshal JSON into a value implementing the Unmarshaler interface, +// Unmarshal calls that value's UnmarshalJSON method, including +// when the input is a JSON null. +// Otherwise, if the value implements encoding.TextUnmarshaler +// and the input is a JSON quoted string, Unmarshal calls that value's +// UnmarshalText method with the unquoted form of the string. +// +// To unmarshal JSON into a struct, Unmarshal matches incoming object +// keys to the keys used by Marshal (either the struct field name or its tag), +// preferring an exact match but also accepting a case-insensitive match. By +// default, object keys which don't have a corresponding struct field are +// ignored (see Decoder.DisallowUnknownFields for an alternative). +// +// To unmarshal JSON into an interface value, +// Unmarshal stores one of these in the interface value: +// +// bool, for JSON booleans +// float64, for JSON numbers +// string, for JSON strings +// []interface{}, for JSON arrays +// map[string]interface{}, for JSON objects +// nil for JSON null +// +// To unmarshal a JSON array into a slice, Unmarshal resets the slice length +// to zero and then appends each element to the slice. +// As a special case, to unmarshal an empty JSON array into a slice, +// Unmarshal replaces the slice with a new empty slice. +// +// To unmarshal a JSON array into a Go array, Unmarshal decodes +// JSON array elements into corresponding Go array elements. +// If the Go array is smaller than the JSON array, +// the additional JSON array elements are discarded. +// If the JSON array is smaller than the Go array, +// the additional Go array elements are set to zero values. +// +// To unmarshal a JSON object into a map, Unmarshal first establishes a map to +// use. If the map is nil, Unmarshal allocates a new map. Otherwise Unmarshal +// reuses the existing map, keeping existing entries. Unmarshal then stores +// key-value pairs from the JSON object into the map. The map's key type must +// either be any string type, an integer, implement json.Unmarshaler, or +// implement encoding.TextUnmarshaler. +// +// If the JSON-encoded data contain a syntax error, Unmarshal returns a SyntaxError. +// +// If a JSON value is not appropriate for a given target type, +// or if a JSON number overflows the target type, Unmarshal +// skips that field and completes the unmarshaling as best it can. +// If no more serious errors are encountered, Unmarshal returns +// an UnmarshalTypeError describing the earliest such error. In any +// case, it's not guaranteed that all the remaining fields following +// the problematic one will be unmarshaled into the target object. +// +// The JSON null value unmarshals into an interface, map, pointer, or slice +// by setting that Go value to nil. Because null is often used in JSON to mean +// “not present,” unmarshaling a JSON null into any other Go type has no effect +// on the value and produces no error. +// +// When unmarshaling quoted strings, invalid UTF-8 or +// invalid UTF-16 surrogate pairs are not treated as an error. +// Instead, they are replaced by the Unicode replacement +// character U+FFFD. +func Unmarshal(data []byte, v any) error { + // Check for well-formedness. + // Avoids filling out half a data structure + // before discovering a JSON syntax error. + d := ds.Get().(*decodeState) + defer ds.Put(d) + //var d decodeState + d.useNumber = true + err := checkValid(data, &d.scan) + if err != nil { + return err + } + + d.init(data) + return d.unmarshal(v) +} + +var ds = sync.Pool{ + New: func() any { + return new(decodeState) + }, +} + +func UnmarshalWithKeys(data []byte, v any) ([]string, error) { + // Check for well-formedness. + // Avoids filling out half a data structure + // before discovering a JSON syntax error. + + d := ds.Get().(*decodeState) + defer ds.Put(d) + //var d decodeState + d.useNumber = true + err := checkValid(data, &d.scan) + if err != nil { + return nil, err + } + + d.init(data) + err = d.unmarshal(v) + if err != nil { + return nil, err + } + + return d.lastKeys, nil +} + +func UnmarshalValid(data []byte, v any) error { + // Check for well-formedness. + // Avoids filling out half a data structure + // before discovering a JSON syntax error. + d := ds.Get().(*decodeState) + defer ds.Put(d) + //var d decodeState + d.useNumber = true + + d.init(data) + return d.unmarshal(v) +} + +func UnmarshalValidWithKeys(data []byte, v any) ([]string, error) { + // Check for well-formedness. + // Avoids filling out half a data structure + // before discovering a JSON syntax error. + + d := ds.Get().(*decodeState) + defer ds.Put(d) + //var d decodeState + d.useNumber = true + + d.init(data) + err := d.unmarshal(v) + if err != nil { + return nil, err + } + + return d.lastKeys, nil +} + +// Unmarshaler is the interface implemented by types +// that can unmarshal a JSON description of themselves. +// The input can be assumed to be a valid encoding of +// a JSON value. UnmarshalJSON must copy the JSON data +// if it wishes to retain the data after returning. +// +// By convention, to approximate the behavior of Unmarshal itself, +// Unmarshalers implement UnmarshalJSON([]byte("null")) as a no-op. +type Unmarshaler interface { + UnmarshalJSON([]byte) error +} + +// An UnmarshalTypeError describes a JSON value that was +// not appropriate for a value of a specific Go type. +type UnmarshalTypeError struct { + Value string // description of JSON value - "bool", "array", "number -5" + Type reflect.Type // type of Go value it could not be assigned to + Offset int64 // error occurred after reading Offset bytes + Struct string // name of the struct type containing the field + Field string // the full path from root node to the field +} + +func (e *UnmarshalTypeError) Error() string { + if e.Struct != "" || e.Field != "" { + return "json: cannot unmarshal " + e.Value + " into Go struct field " + e.Struct + "." + e.Field + " of type " + e.Type.String() + } + return "json: cannot unmarshal " + e.Value + " into Go value of type " + e.Type.String() +} + +// An UnmarshalFieldError describes a JSON object key that +// led to an unexported (and therefore unwritable) struct field. +// +// Deprecated: No longer used; kept for compatibility. +type UnmarshalFieldError struct { + Key string + Type reflect.Type + Field reflect.StructField +} + +func (e *UnmarshalFieldError) Error() string { + return "json: cannot unmarshal object key " + strconv.Quote(e.Key) + " into unexported field " + e.Field.Name + " of type " + e.Type.String() +} + +// An InvalidUnmarshalError describes an invalid argument passed to Unmarshal. +// (The argument to Unmarshal must be a non-nil pointer.) +type InvalidUnmarshalError struct { + Type reflect.Type +} + +func (e *InvalidUnmarshalError) Error() string { + if e.Type == nil { + return "json: Unmarshal(nil)" + } + + if e.Type.Kind() != reflect.Pointer { + return "json: Unmarshal(non-pointer " + e.Type.String() + ")" + } + return "json: Unmarshal(nil " + e.Type.String() + ")" +} + +func (d *decodeState) unmarshal(v any) error { + rv := reflect.ValueOf(v) + if rv.Kind() != reflect.Pointer || rv.IsNil() { + return &InvalidUnmarshalError{reflect.TypeOf(v)} + } + + d.scan.reset() + d.scanWhile(scanSkipSpace) + // We decode rv not rv.Elem because the Unmarshaler interface + // test must be applied at the top level of the value. + err := d.value(rv) + if err != nil { + return d.addErrorContext(err) + } + return d.savedError +} + +// A Number represents a JSON number literal. +type Number string + +// String returns the literal text of the number. +func (n Number) String() string { return string(n) } + +// Float64 returns the number as a float64. +func (n Number) Float64() (float64, error) { + return strconv.ParseFloat(string(n), 64) +} + +// Int64 returns the number as an int64. +func (n Number) Int64() (int64, error) { + return strconv.ParseInt(string(n), 10, 64) +} + +// An errorContext provides context for type errors during decoding. +type errorContext struct { + Struct reflect.Type + FieldStack []string +} + +// decodeState represents the state while decoding a JSON value. +type decodeState struct { + data []byte + off int // next read offset in data + opcode int // last read result + scan scanner + errorContext *errorContext + savedError error + useNumber bool + disallowUnknownFields bool + lastKeys []string +} + +// readIndex returns the position of the last byte read. +func (d *decodeState) readIndex() int { + return d.off - 1 +} + +// phasePanicMsg is used as a panic message when we end up with something that +// shouldn't happen. It can indicate a bug in the JSON decoder, or that +// something is editing the data slice while the decoder executes. +const phasePanicMsg = "JSON decoder out of sync - data changing underfoot?" + +func (d *decodeState) init(data []byte) *decodeState { + d.data = data + d.off = 0 + d.savedError = nil + if d.errorContext != nil { + d.errorContext.Struct = nil + // Reuse the allocated space for the FieldStack slice. + d.errorContext.FieldStack = d.errorContext.FieldStack[:0] + } + return d +} + +// saveError saves the first err it is called with, +// for reporting at the end of the unmarshal. +func (d *decodeState) saveError(err error) { + if d.savedError == nil { + d.savedError = d.addErrorContext(err) + } +} + +// addErrorContext returns a new error enhanced with information from d.errorContext +func (d *decodeState) addErrorContext(err error) error { + if d.errorContext != nil && (d.errorContext.Struct != nil || len(d.errorContext.FieldStack) > 0) { + switch err := err.(type) { + case *UnmarshalTypeError: + err.Struct = d.errorContext.Struct.Name() + err.Field = strings.Join(d.errorContext.FieldStack, ".") + } + } + return err +} + +// skip scans to the end of what was started. +func (d *decodeState) skip() { + s, data, i := &d.scan, d.data, d.off + depth := len(s.parseState) + for { + op := s.step(s, data[i]) + i++ + if len(s.parseState) < depth { + d.off = i + d.opcode = op + return + } + } +} + +// scanNext processes the byte at d.data[d.off]. +func (d *decodeState) scanNext() { + if d.off < len(d.data) { + d.opcode = d.scan.step(&d.scan, d.data[d.off]) + d.off++ + } else { + d.opcode = d.scan.eof() + d.off = len(d.data) + 1 // mark processed EOF with len+1 + } +} + +// scanWhile processes bytes in d.data[d.off:] until it +// receives a scan code not equal to op. +func (d *decodeState) scanWhile(op int) { + s, data, i := &d.scan, d.data, d.off + for i < len(data) { + newOp := s.step(s, data[i]) + i++ + if newOp != op { + d.opcode = newOp + d.off = i + return + } + } + + d.off = len(data) + 1 // mark processed EOF with len+1 + d.opcode = d.scan.eof() +} + +// rescanLiteral is similar to scanWhile(scanContinue), but it specialises the +// common case where we're decoding a literal. The decoder scans the input +// twice, once for syntax errors and to check the length of the value, and the +// second to perform the decoding. +// +// Only in the second step do we use decodeState to tokenize literals, so we +// know there aren't any syntax errors. We can take advantage of that knowledge, +// and scan a literal's bytes much more quickly. +func (d *decodeState) rescanLiteral() { + data, i := d.data, d.off +Switch: + switch data[i-1] { + case '"': // string + for ; i < len(data); i++ { + switch data[i] { + case '\\': + i++ // escaped char + case '"': + i++ // tokenize the closing quote too + break Switch + } + } + case '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '-': // number + for ; i < len(data); i++ { + switch data[i] { + case '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', + '.', 'e', 'E', '+', '-': + default: + break Switch + } + } + case 't': // true + i += len("rue") + case 'f': // false + i += len("alse") + case 'n': // null + i += len("ull") + } + if i < len(data) { + d.opcode = stateEndValue(&d.scan, data[i]) + } else { + d.opcode = scanEnd + } + d.off = i + 1 +} + +// value consumes a JSON value from d.data[d.off-1:], decoding into v, and +// reads the following byte ahead. If v is invalid, the value is discarded. +// The first byte of the value has been read already. +func (d *decodeState) value(v reflect.Value) error { + switch d.opcode { + default: + panic(phasePanicMsg) + + case scanBeginArray: + if v.IsValid() { + if err := d.array(v); err != nil { + return err + } + } else { + d.skip() + } + d.scanNext() + + case scanBeginObject: + if v.IsValid() { + if err := d.object(v); err != nil { + return err + } + } else { + d.skip() + } + d.scanNext() + + case scanBeginLiteral: + // All bytes inside literal return scanContinue op code. + start := d.readIndex() + d.rescanLiteral() + + if v.IsValid() { + if err := d.literalStore(d.data[start:d.readIndex()], v, false); err != nil { + return err + } + } + } + return nil +} + +type unquotedValue struct{} + +// valueQuoted is like value but decodes a +// quoted string literal or literal null into an interface value. +// If it finds anything other than a quoted string literal or null, +// valueQuoted returns unquotedValue{}. +func (d *decodeState) valueQuoted() any { + switch d.opcode { + default: + panic(phasePanicMsg) + + case scanBeginArray, scanBeginObject: + d.skip() + d.scanNext() + + case scanBeginLiteral: + v := d.literalInterface() + switch v.(type) { + case nil, string: + return v + } + } + return unquotedValue{} +} + +// indirect walks down v allocating pointers as needed, +// until it gets to a non-pointer. +// If it encounters an Unmarshaler, indirect stops and returns that. +// If decodingNull is true, indirect stops at the first settable pointer so it +// can be set to nil. +func indirect(v reflect.Value, decodingNull bool) (Unmarshaler, encoding.TextUnmarshaler, reflect.Value) { + // Issue #24153 indicates that it is generally not a guaranteed property + // that you may round-trip a reflect.Value by calling Value.Addr().Elem() + // and expect the value to still be settable for values derived from + // unexported embedded struct fields. + // + // The logic below effectively does this when it first addresses the value + // (to satisfy possible pointer methods) and continues to dereference + // subsequent pointers as necessary. + // + // After the first round-trip, we set v back to the original value to + // preserve the original RW flags contained in reflect.Value. + v0 := v + haveAddr := false + + // If v is a named type and is addressable, + // start with its address, so that if the type has pointer methods, + // we find them. + if v.Kind() != reflect.Pointer && v.Type().Name() != "" && v.CanAddr() { + haveAddr = true + v = v.Addr() + } + for { + // Load value from interface, but only if the result will be + // usefully addressable. + if v.Kind() == reflect.Interface && !v.IsNil() { + e := v.Elem() + if e.Kind() == reflect.Pointer && !e.IsNil() && (!decodingNull || e.Elem().Kind() == reflect.Pointer) { + haveAddr = false + v = e + continue + } + } + + if v.Kind() != reflect.Pointer { + break + } + + if decodingNull && v.CanSet() { + break + } + + // Prevent infinite loop if v is an interface pointing to its own address: + // var v interface{} + // v = &v + if v.Elem().Kind() == reflect.Interface && v.Elem().Elem() == v { + v = v.Elem() + break + } + if v.IsNil() { + v.Set(reflect.New(v.Type().Elem())) + } + if v.Type().NumMethod() > 0 && v.CanInterface() { + if u, ok := v.Interface().(Unmarshaler); ok { + return u, nil, reflect.Value{} + } + if !decodingNull { + if u, ok := v.Interface().(encoding.TextUnmarshaler); ok { + return nil, u, reflect.Value{} + } + } + } + + if haveAddr { + v = v0 // restore original value after round-trip Value.Addr().Elem() + haveAddr = false + } else { + v = v.Elem() + } + } + return nil, nil, v +} + +// array consumes an array from d.data[d.off-1:], decoding into v. +// The first byte of the array ('[') has been read already. +func (d *decodeState) array(v reflect.Value) error { + // Check for unmarshaler. + u, ut, pv := indirect(v, false) + if u != nil { + start := d.readIndex() + d.skip() + return u.UnmarshalJSON(d.data[start:d.off]) + } + if ut != nil { + d.saveError(&UnmarshalTypeError{Value: "array", Type: v.Type(), Offset: int64(d.off)}) + d.skip() + return nil + } + v = pv + + // Check type of target. + switch v.Kind() { + case reflect.Interface: + if v.NumMethod() == 0 { + // Decoding into nil interface? Switch to non-reflect code. + ai := d.arrayInterface() + v.Set(reflect.ValueOf(ai)) + return nil + } + // Otherwise it's invalid. + fallthrough + default: + d.saveError(&UnmarshalTypeError{Value: "array", Type: v.Type(), Offset: int64(d.off)}) + d.skip() + return nil + case reflect.Array, reflect.Slice: + break + } + + i := 0 + for { + // Look ahead for ] - can only happen on first iteration. + d.scanWhile(scanSkipSpace) + if d.opcode == scanEndArray { + break + } + + // Get element of array, growing if necessary. + if v.Kind() == reflect.Slice { + // Grow slice if necessary + if i >= v.Cap() { + newcap := v.Cap() + v.Cap()/2 + if newcap < 4 { + newcap = 4 + } + newv := reflect.MakeSlice(v.Type(), v.Len(), newcap) + reflect.Copy(newv, v) + v.Set(newv) + } + if i >= v.Len() { + v.SetLen(i + 1) + } + } + + if i < v.Len() { + // Decode into element. + if err := d.value(v.Index(i)); err != nil { + return err + } + } else { + // Ran out of fixed array: skip. + if err := d.value(reflect.Value{}); err != nil { + return err + } + } + i++ + + // Next token must be , or ]. + if d.opcode == scanSkipSpace { + d.scanWhile(scanSkipSpace) + } + if d.opcode == scanEndArray { + break + } + if d.opcode != scanArrayValue { + panic(phasePanicMsg) + } + } + + if i < v.Len() { + if v.Kind() == reflect.Array { + // Array. Zero the rest. + z := reflect.Zero(v.Type().Elem()) + for ; i < v.Len(); i++ { + v.Index(i).Set(z) + } + } else { + v.SetLen(i) + } + } + if i == 0 && v.Kind() == reflect.Slice { + v.Set(reflect.MakeSlice(v.Type(), 0, 0)) + } + return nil +} + +var nullLiteral = []byte("null") +var textUnmarshalerType = reflect.TypeOf((*encoding.TextUnmarshaler)(nil)).Elem() + +// object consumes an object from d.data[d.off-1:], decoding into v. +// The first byte ('{') of the object has been read already. +func (d *decodeState) object(v reflect.Value) error { + // Check for unmarshaler. + u, ut, pv := indirect(v, false) + if u != nil { + start := d.readIndex() + d.skip() + return u.UnmarshalJSON(d.data[start:d.off]) + } + if ut != nil { + d.saveError(&UnmarshalTypeError{Value: "object", Type: v.Type(), Offset: int64(d.off)}) + d.skip() + return nil + } + v = pv + t := v.Type() + + // Decoding into nil interface? Switch to non-reflect code. + if v.Kind() == reflect.Interface && v.NumMethod() == 0 { + oi := d.objectInterface() + v.Set(reflect.ValueOf(oi)) + return nil + } + + var fields structFields + + // Check type of target: + // struct or + // map[T1]T2 where T1 is string, an integer type, + // or an encoding.TextUnmarshaler + switch v.Kind() { + case reflect.Map: + // Map key must either have string kind, have an integer kind, + // or be an encoding.TextUnmarshaler. + switch t.Key().Kind() { + case reflect.String, + reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64, + reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64, reflect.Uintptr: + default: + if !reflect.PointerTo(t.Key()).Implements(textUnmarshalerType) { + d.saveError(&UnmarshalTypeError{Value: "object", Type: t, Offset: int64(d.off)}) + d.skip() + return nil + } + } + if v.IsNil() { + v.Set(reflect.MakeMap(t)) + } + case reflect.Struct: + fields = cachedTypeFields(t) + // ok + default: + d.saveError(&UnmarshalTypeError{Value: "object", Type: t, Offset: int64(d.off)}) + d.skip() + return nil + } + + var mapElem reflect.Value + var origErrorContext errorContext + if d.errorContext != nil { + origErrorContext = *d.errorContext + } + + var keys []string + + for { + // Read opening " of string key or closing }. + d.scanWhile(scanSkipSpace) + if d.opcode == scanEndObject { + // closing } - can only happen on first iteration. + break + } + if d.opcode != scanBeginLiteral { + panic(phasePanicMsg) + } + + // Read key. + start := d.readIndex() + d.rescanLiteral() + item := d.data[start:d.readIndex()] + key, ok := unquoteBytes(item) + if !ok { + panic(phasePanicMsg) + } + + keys = append(keys, string(key)) + + // Figure out field corresponding to key. + var subv reflect.Value + destring := false // whether the value is wrapped in a string to be decoded first + + if v.Kind() == reflect.Map { + elemType := t.Elem() + if !mapElem.IsValid() { + mapElem = reflect.New(elemType).Elem() + } else { + mapElem.Set(reflect.Zero(elemType)) + } + subv = mapElem + } else { + var f *field + if i, ok := fields.nameIndex[string(key)]; ok { + // Found an exact name match. + f = &fields.list[i] + } else { + // Fall back to the expensive case-insensitive + // linear search. + for i := range fields.list { + ff := &fields.list[i] + if ff.equalFold(ff.nameBytes, key) { + f = ff + break + } + } + } + if f != nil { + subv = v + destring = f.quoted + for _, i := range f.index { + if subv.Kind() == reflect.Pointer { + if subv.IsNil() { + // If a struct embeds a pointer to an unexported type, + // it is not possible to set a newly allocated value + // since the field is unexported. + // + // See https://golang.org/issue/21357 + if !subv.CanSet() { + d.saveError(fmt.Errorf("json: cannot set embedded pointer to unexported struct: %v", subv.Type().Elem())) + // Invalidate subv to ensure d.value(subv) skips over + // the JSON value without assigning it to subv. + subv = reflect.Value{} + destring = false + break + } + subv.Set(reflect.New(subv.Type().Elem())) + } + subv = subv.Elem() + } + subv = subv.Field(i) + } + if d.errorContext == nil { + d.errorContext = new(errorContext) + } + d.errorContext.FieldStack = append(d.errorContext.FieldStack, f.name) + d.errorContext.Struct = t + } else if d.disallowUnknownFields { + d.saveError(fmt.Errorf("json: unknown field %q", key)) + } + } + + // Read : before value. + if d.opcode == scanSkipSpace { + d.scanWhile(scanSkipSpace) + } + if d.opcode != scanObjectKey { + panic(phasePanicMsg) + } + d.scanWhile(scanSkipSpace) + + if destring { + switch qv := d.valueQuoted().(type) { + case nil: + if err := d.literalStore(nullLiteral, subv, false); err != nil { + return err + } + case string: + if err := d.literalStore([]byte(qv), subv, true); err != nil { + return err + } + default: + d.saveError(fmt.Errorf("json: invalid use of ,string struct tag, trying to unmarshal unquoted value into %v", subv.Type())) + } + } else { + if err := d.value(subv); err != nil { + return err + } + } + + // Write value back to map; + // if using struct, subv points into struct already. + if v.Kind() == reflect.Map { + kt := t.Key() + var kv reflect.Value + switch { + case reflect.PointerTo(kt).Implements(textUnmarshalerType): + kv = reflect.New(kt) + if err := d.literalStore(item, kv, true); err != nil { + return err + } + kv = kv.Elem() + case kt.Kind() == reflect.String: + kv = reflect.ValueOf(key).Convert(kt) + default: + switch kt.Kind() { + case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64: + s := string(key) + n, err := strconv.ParseInt(s, 10, 64) + if err != nil || reflect.Zero(kt).OverflowInt(n) { + d.saveError(&UnmarshalTypeError{Value: "number " + s, Type: kt, Offset: int64(start + 1)}) + break + } + kv = reflect.ValueOf(n).Convert(kt) + case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64, reflect.Uintptr: + s := string(key) + n, err := strconv.ParseUint(s, 10, 64) + if err != nil || reflect.Zero(kt).OverflowUint(n) { + d.saveError(&UnmarshalTypeError{Value: "number " + s, Type: kt, Offset: int64(start + 1)}) + break + } + kv = reflect.ValueOf(n).Convert(kt) + default: + panic("json: Unexpected key type") // should never occur + } + } + if kv.IsValid() { + v.SetMapIndex(kv, subv) + } + } + + // Next token must be , or }. + if d.opcode == scanSkipSpace { + d.scanWhile(scanSkipSpace) + } + if d.errorContext != nil { + // Reset errorContext to its original state. + // Keep the same underlying array for FieldStack, to reuse the + // space and avoid unnecessary allocs. + d.errorContext.FieldStack = d.errorContext.FieldStack[:len(origErrorContext.FieldStack)] + d.errorContext.Struct = origErrorContext.Struct + } + if d.opcode == scanEndObject { + break + } + if d.opcode != scanObjectValue { + panic(phasePanicMsg) + } + } + + if v.Kind() == reflect.Map { + d.lastKeys = keys + } + return nil +} + +// convertNumber converts the number literal s to a float64 or a Number +// depending on the setting of d.useNumber. +func (d *decodeState) convertNumber(s string) (any, error) { + if d.useNumber { + return Number(s), nil + } + f, err := strconv.ParseFloat(s, 64) + if err != nil { + return nil, &UnmarshalTypeError{Value: "number " + s, Type: reflect.TypeOf(0.0), Offset: int64(d.off)} + } + return f, nil +} + +var numberType = reflect.TypeOf(Number("")) + +// literalStore decodes a literal stored in item into v. +// +// fromQuoted indicates whether this literal came from unwrapping a +// string from the ",string" struct tag option. this is used only to +// produce more helpful error messages. +func (d *decodeState) literalStore(item []byte, v reflect.Value, fromQuoted bool) error { + // Check for unmarshaler. + if len(item) == 0 { + //Empty string given + d.saveError(fmt.Errorf("json: invalid use of ,string struct tag, trying to unmarshal %q into %v", item, v.Type())) + return nil + } + isNull := item[0] == 'n' // null + u, ut, pv := indirect(v, isNull) + if u != nil { + return u.UnmarshalJSON(item) + } + if ut != nil { + if item[0] != '"' { + if fromQuoted { + d.saveError(fmt.Errorf("json: invalid use of ,string struct tag, trying to unmarshal %q into %v", item, v.Type())) + return nil + } + val := "number" + switch item[0] { + case 'n': + val = "null" + case 't', 'f': + val = "bool" + } + d.saveError(&UnmarshalTypeError{Value: val, Type: v.Type(), Offset: int64(d.readIndex())}) + return nil + } + s, ok := unquoteBytes(item) + if !ok { + if fromQuoted { + return fmt.Errorf("json: invalid use of ,string struct tag, trying to unmarshal %q into %v", item, v.Type()) + } + panic(phasePanicMsg) + } + return ut.UnmarshalText(s) + } + + v = pv + + switch c := item[0]; c { + case 'n': // null + // The main parser checks that only true and false can reach here, + // but if this was a quoted string input, it could be anything. + if fromQuoted && string(item) != "null" { + d.saveError(fmt.Errorf("json: invalid use of ,string struct tag, trying to unmarshal %q into %v", item, v.Type())) + break + } + switch v.Kind() { + case reflect.Interface, reflect.Pointer, reflect.Map, reflect.Slice: + v.Set(reflect.Zero(v.Type())) + // otherwise, ignore null for primitives/string + } + case 't', 'f': // true, false + value := item[0] == 't' + // The main parser checks that only true and false can reach here, + // but if this was a quoted string input, it could be anything. + if fromQuoted && string(item) != "true" && string(item) != "false" { + d.saveError(fmt.Errorf("json: invalid use of ,string struct tag, trying to unmarshal %q into %v", item, v.Type())) + break + } + switch v.Kind() { + default: + if fromQuoted { + d.saveError(fmt.Errorf("json: invalid use of ,string struct tag, trying to unmarshal %q into %v", item, v.Type())) + } else { + d.saveError(&UnmarshalTypeError{Value: "bool", Type: v.Type(), Offset: int64(d.readIndex())}) + } + case reflect.Bool: + v.SetBool(value) + case reflect.Interface: + if v.NumMethod() == 0 { + v.Set(reflect.ValueOf(value)) + } else { + d.saveError(&UnmarshalTypeError{Value: "bool", Type: v.Type(), Offset: int64(d.readIndex())}) + } + } + + case '"': // string + s, ok := unquoteBytes(item) + if !ok { + if fromQuoted { + return fmt.Errorf("json: invalid use of ,string struct tag, trying to unmarshal %q into %v", item, v.Type()) + } + panic(phasePanicMsg) + } + switch v.Kind() { + default: + d.saveError(&UnmarshalTypeError{Value: "string", Type: v.Type(), Offset: int64(d.readIndex())}) + case reflect.Slice: + if v.Type().Elem().Kind() != reflect.Uint8 { + d.saveError(&UnmarshalTypeError{Value: "string", Type: v.Type(), Offset: int64(d.readIndex())}) + break + } + b := make([]byte, base64.StdEncoding.DecodedLen(len(s))) + n, err := base64.StdEncoding.Decode(b, s) + if err != nil { + d.saveError(err) + break + } + v.SetBytes(b[:n]) + case reflect.String: + if v.Type() == numberType && !isValidNumber(string(s)) { + return fmt.Errorf("json: invalid number literal, trying to unmarshal %q into Number", item) + } + v.SetString(string(s)) + case reflect.Interface: + if v.NumMethod() == 0 { + v.Set(reflect.ValueOf(string(s))) + } else { + d.saveError(&UnmarshalTypeError{Value: "string", Type: v.Type(), Offset: int64(d.readIndex())}) + } + } + + default: // number + if c != '-' && (c < '0' || c > '9') { + if fromQuoted { + return fmt.Errorf("json: invalid use of ,string struct tag, trying to unmarshal %q into %v", item, v.Type()) + } + panic(phasePanicMsg) + } + s := string(item) + switch v.Kind() { + default: + if v.Kind() == reflect.String && v.Type() == numberType { + // s must be a valid number, because it's + // already been tokenized. + v.SetString(s) + break + } + if fromQuoted { + return fmt.Errorf("json: invalid use of ,string struct tag, trying to unmarshal %q into %v", item, v.Type()) + } + d.saveError(&UnmarshalTypeError{Value: "number", Type: v.Type(), Offset: int64(d.readIndex())}) + case reflect.Interface: + n, err := d.convertNumber(s) + if err != nil { + d.saveError(err) + break + } + if v.NumMethod() != 0 { + d.saveError(&UnmarshalTypeError{Value: "number", Type: v.Type(), Offset: int64(d.readIndex())}) + break + } + v.Set(reflect.ValueOf(n)) + + case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64: + n, err := strconv.ParseInt(s, 10, 64) + if err != nil || v.OverflowInt(n) { + d.saveError(&UnmarshalTypeError{Value: "number " + s, Type: v.Type(), Offset: int64(d.readIndex())}) + break + } + v.SetInt(n) + + case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64, reflect.Uintptr: + n, err := strconv.ParseUint(s, 10, 64) + if err != nil || v.OverflowUint(n) { + d.saveError(&UnmarshalTypeError{Value: "number " + s, Type: v.Type(), Offset: int64(d.readIndex())}) + break + } + v.SetUint(n) + + case reflect.Float32, reflect.Float64: + n, err := strconv.ParseFloat(s, v.Type().Bits()) + if err != nil || v.OverflowFloat(n) { + d.saveError(&UnmarshalTypeError{Value: "number " + s, Type: v.Type(), Offset: int64(d.readIndex())}) + break + } + v.SetFloat(n) + } + } + return nil +} + +// The xxxInterface routines build up a value to be stored +// in an empty interface. They are not strictly necessary, +// but they avoid the weight of reflection in this common case. + +// valueInterface is like value but returns interface{} +func (d *decodeState) valueInterface() (val any) { + switch d.opcode { + default: + panic(phasePanicMsg) + case scanBeginArray: + val = d.arrayInterface() + d.scanNext() + case scanBeginObject: + val = d.objectInterface() + d.scanNext() + case scanBeginLiteral: + val = d.literalInterface() + } + return +} + +// arrayInterface is like array but returns []interface{}. +func (d *decodeState) arrayInterface() []any { + var v = make([]any, 0) + for { + // Look ahead for ] - can only happen on first iteration. + d.scanWhile(scanSkipSpace) + if d.opcode == scanEndArray { + break + } + + v = append(v, d.valueInterface()) + + // Next token must be , or ]. + if d.opcode == scanSkipSpace { + d.scanWhile(scanSkipSpace) + } + if d.opcode == scanEndArray { + break + } + if d.opcode != scanArrayValue { + panic(phasePanicMsg) + } + } + return v +} + +// objectInterface is like object but returns map[string]interface{}. +func (d *decodeState) objectInterface() map[string]any { + m := make(map[string]any) + for { + // Read opening " of string key or closing }. + d.scanWhile(scanSkipSpace) + if d.opcode == scanEndObject { + // closing } - can only happen on first iteration. + break + } + if d.opcode != scanBeginLiteral { + panic(phasePanicMsg) + } + + // Read string key. + start := d.readIndex() + d.rescanLiteral() + item := d.data[start:d.readIndex()] + key, ok := unquote(item) + if !ok { + panic(phasePanicMsg) + } + + // Read : before value. + if d.opcode == scanSkipSpace { + d.scanWhile(scanSkipSpace) + } + if d.opcode != scanObjectKey { + panic(phasePanicMsg) + } + d.scanWhile(scanSkipSpace) + + // Read value. + m[key] = d.valueInterface() + + // Next token must be , or }. + if d.opcode == scanSkipSpace { + d.scanWhile(scanSkipSpace) + } + if d.opcode == scanEndObject { + break + } + if d.opcode != scanObjectValue { + panic(phasePanicMsg) + } + } + return m +} + +// literalInterface consumes and returns a literal from d.data[d.off-1:] and +// it reads the following byte ahead. The first byte of the literal has been +// read already (that's how the caller knows it's a literal). +func (d *decodeState) literalInterface() any { + // All bytes inside literal return scanContinue op code. + start := d.readIndex() + d.rescanLiteral() + + item := d.data[start:d.readIndex()] + + switch c := item[0]; c { + case 'n': // null + return nil + + case 't', 'f': // true, false + return c == 't' + + case '"': // string + s, ok := unquote(item) + if !ok { + panic(phasePanicMsg) + } + return s + + default: // number + if c != '-' && (c < '0' || c > '9') { + panic(phasePanicMsg) + } + n, err := d.convertNumber(string(item)) + if err != nil { + d.saveError(err) + } + return n + } +} + +// getu4 decodes \uXXXX from the beginning of s, returning the hex value, +// or it returns -1. +func getu4(s []byte) rune { + if len(s) < 6 || s[0] != '\\' || s[1] != 'u' { + return -1 + } + var r rune + for _, c := range s[2:6] { + switch { + case '0' <= c && c <= '9': + c = c - '0' + case 'a' <= c && c <= 'f': + c = c - 'a' + 10 + case 'A' <= c && c <= 'F': + c = c - 'A' + 10 + default: + return -1 + } + r = r*16 + rune(c) + } + return r +} + +// unquote converts a quoted JSON string literal s into an actual string t. +// The rules are different than for Go, so cannot use strconv.Unquote. +func unquote(s []byte) (t string, ok bool) { + s, ok = unquoteBytes(s) + t = string(s) + return +} + +func unquoteBytes(s []byte) (t []byte, ok bool) { + if len(s) < 2 || s[0] != '"' || s[len(s)-1] != '"' { + return + } + s = s[1 : len(s)-1] + + // Check for unusual characters. If there are none, + // then no unquoting is needed, so return a slice of the + // original bytes. + r := 0 + for r < len(s) { + c := s[r] + if c == '\\' || c == '"' || c < ' ' { + break + } + if c < utf8.RuneSelf { + r++ + continue + } + rr, size := utf8.DecodeRune(s[r:]) + if rr == utf8.RuneError && size == 1 { + break + } + r += size + } + if r == len(s) { + return s, true + } + + b := make([]byte, len(s)+2*utf8.UTFMax) + w := copy(b, s[0:r]) + for r < len(s) { + // Out of room? Can only happen if s is full of + // malformed UTF-8 and we're replacing each + // byte with RuneError. + if w >= len(b)-2*utf8.UTFMax { + nb := make([]byte, (len(b)+utf8.UTFMax)*2) + copy(nb, b[0:w]) + b = nb + } + switch c := s[r]; { + case c == '\\': + r++ + if r >= len(s) { + return + } + switch s[r] { + default: + return + case '"', '\\', '/', '\'': + b[w] = s[r] + r++ + w++ + case 'b': + b[w] = '\b' + r++ + w++ + case 'f': + b[w] = '\f' + r++ + w++ + case 'n': + b[w] = '\n' + r++ + w++ + case 'r': + b[w] = '\r' + r++ + w++ + case 't': + b[w] = '\t' + r++ + w++ + case 'u': + r-- + rr := getu4(s[r:]) + if rr < 0 { + return + } + r += 6 + if utf16.IsSurrogate(rr) { + rr1 := getu4(s[r:]) + if dec := utf16.DecodeRune(rr, rr1); dec != unicode.ReplacementChar { + // A valid pair; consume. + r += 6 + w += utf8.EncodeRune(b[w:], dec) + break + } + // Invalid surrogate; fall back to replacement rune. + rr = unicode.ReplacementChar + } + w += utf8.EncodeRune(b[w:], rr) + } + + // Quote, control characters are invalid. + case c == '"', c < ' ': + return + + // ASCII + case c < utf8.RuneSelf: + b[w] = c + r++ + w++ + + // Coerce to well-formed UTF-8. + default: + rr, size := utf8.DecodeRune(s[r:]) + r += size + w += utf8.EncodeRune(b[w:], rr) + } + } + return b[0:w], true +} diff --git a/vendor/github.com/evanphx/json-patch/v5/internal/json/encode.go b/vendor/github.com/evanphx/json-patch/v5/internal/json/encode.go new file mode 100644 index 000000000..2e6eca448 --- /dev/null +++ b/vendor/github.com/evanphx/json-patch/v5/internal/json/encode.go @@ -0,0 +1,1486 @@ +// Copyright 2010 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package json implements encoding and decoding of JSON as defined in +// RFC 7159. The mapping between JSON and Go values is described +// in the documentation for the Marshal and Unmarshal functions. +// +// See "JSON and Go" for an introduction to this package: +// https://golang.org/doc/articles/json_and_go.html +package json + +import ( + "bytes" + "encoding" + "encoding/base64" + "fmt" + "math" + "reflect" + "sort" + "strconv" + "strings" + "sync" + "unicode" + "unicode/utf8" +) + +// Marshal returns the JSON encoding of v. +// +// Marshal traverses the value v recursively. +// If an encountered value implements the Marshaler interface +// and is not a nil pointer, Marshal calls its MarshalJSON method +// to produce JSON. If no MarshalJSON method is present but the +// value implements encoding.TextMarshaler instead, Marshal calls +// its MarshalText method and encodes the result as a JSON string. +// The nil pointer exception is not strictly necessary +// but mimics a similar, necessary exception in the behavior of +// UnmarshalJSON. +// +// Otherwise, Marshal uses the following type-dependent default encodings: +// +// Boolean values encode as JSON booleans. +// +// Floating point, integer, and Number values encode as JSON numbers. +// +// String values encode as JSON strings coerced to valid UTF-8, +// replacing invalid bytes with the Unicode replacement rune. +// So that the JSON will be safe to embed inside HTML " that closes the next token. If + // non-empty, the subsequent call to Next will return a raw or RCDATA text + // token: one that treats "
" as text instead of an element. + // rawTag's contents are lower-cased. + rawTag string + // textIsRaw is whether the current text token's data is not escaped. + textIsRaw bool + // convertNUL is whether NUL bytes in the current token's data should + // be converted into \ufffd replacement characters. + convertNUL bool + // allowCDATA is whether CDATA sections are allowed in the current context. + allowCDATA bool +} + +// AllowCDATA sets whether or not the tokenizer recognizes as +// the text "foo". The default value is false, which means to recognize it as +// a bogus comment "" instead. +// +// Strictly speaking, an HTML5 compliant tokenizer should allow CDATA if and +// only if tokenizing foreign content, such as MathML and SVG. However, +// tracking foreign-contentness is difficult to do purely in the tokenizer, +// as opposed to the parser, due to HTML integration points: an