From 789ce7e49a5d7d74d4a36f748746e914668ddde1 Mon Sep 17 00:00:00 2001 From: Xinyu Ma Date: Fri, 11 Sep 2020 21:18:37 +0000 Subject: [PATCH] Add workload daemon --- Dockerfile.workload-daemon | 19 + Makefile | 3 +- cmd/workload-daemon/main.go | 76 +++ pkg/experimental/metadata/metadata.go | 518 ++++++++++++++++++ pkg/experimental/workload/daemon/daemon.go | 199 +++++++ .../workload/daemon/provider/gce/vm.go | 298 ++++++++++ .../workload/daemon/utils/interface.go | 52 ++ .../workload/daemon/utils/kube-config.go | 72 +++ .../workload/daemon/utils/template.go | 63 +++ 9 files changed, 1299 insertions(+), 1 deletion(-) create mode 100644 Dockerfile.workload-daemon create mode 100644 cmd/workload-daemon/main.go create mode 100644 pkg/experimental/metadata/metadata.go create mode 100644 pkg/experimental/workload/daemon/daemon.go create mode 100644 pkg/experimental/workload/daemon/provider/gce/vm.go create mode 100644 pkg/experimental/workload/daemon/utils/interface.go create mode 100644 pkg/experimental/workload/daemon/utils/kube-config.go create mode 100644 pkg/experimental/workload/daemon/utils/template.go diff --git a/Dockerfile.workload-daemon b/Dockerfile.workload-daemon new file mode 100644 index 0000000000..f9de13ea1c --- /dev/null +++ b/Dockerfile.workload-daemon @@ -0,0 +1,19 @@ +# Copyright 2020 The Kubernetes Authors. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# This image requires ca-certificate, which is pre-installed in distroless. +FROM gcr.io/distroless/static:latest + +ADD bin/ARG_ARCH/ARG_BIN /ARG_BIN +ENTRYPOINT ["/ARG_BIN"] diff --git a/Makefile b/Makefile index 70bfa056ed..da3d4233f5 100644 --- a/Makefile +++ b/Makefile @@ -29,7 +29,8 @@ CONTAINER_BINARIES ?= \ echo \ fuzzer \ glbc \ - workload-controller + workload-controller \ + workload-daemon # Latest commit hash for current branch. GIT_COMMIT := $(shell git rev-parse HEAD) diff --git a/cmd/workload-daemon/main.go b/cmd/workload-daemon/main.go new file mode 100644 index 0000000000..dc45af80a8 --- /dev/null +++ b/cmd/workload-daemon/main.go @@ -0,0 +1,76 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "flag" + "fmt" + "os" + "time" + + daemon "k8s.io/ingress-gce/pkg/experimental/workload/daemon" + gce "k8s.io/ingress-gce/pkg/experimental/workload/daemon/provider/gce" + "k8s.io/klog" + + // GCP Authentication + _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" +) + +func main() { + cmdSet := flag.NewFlagSet("cmd", flag.ExitOnError) + provider := cmdSet.String("provider", "gce", "The provider of this external workload.") + updateInterval := cmdSet.Duration("interval", 30*time.Second, "The resync interval.") + + if len(os.Args) < 2 { + outputHelp() + return + } + cmdSet.Parse(os.Args[2:]) + if *provider != "gce" { + klog.Fatalf("Current implementation only supports gce provider.") + } + + switch os.Args[1] { + case "get-credentials": + vm, err := gce.NewVM() + if err != nil { + klog.Fatalf("unable to initialize GCE VM: %+v", err) + } + credentials, err := vm.Credentials() + if err != nil { + klog.Fatalf("unable to get credentials: %+v", err) + } + daemon.OutputCredentials(credentials) + return + case "start": + klog.V(0).Infof("Workload daemon started") + vm, err := gce.NewVM() + if err != nil { + klog.Fatalf("unable to initialize GCE VM: %+v", err) + } + daemon.RunDaemon(vm, vm, *updateInterval) + return + default: + outputHelp() + return + } +} + +func outputHelp() { + fmt.Printf("Usage: %v [command]\n", os.Args[0]) + fmt.Printf("command:\n start\n get-credentials\n") +} diff --git a/pkg/experimental/metadata/metadata.go b/pkg/experimental/metadata/metadata.go new file mode 100644 index 0000000000..70b014573f --- /dev/null +++ b/pkg/experimental/metadata/metadata.go @@ -0,0 +1,518 @@ +// Copyright 2014 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package metadata provides access to Google Compute Engine (GCE) +// metadata and API service accounts. +// +// This package is a wrapper around the GCE metadata service, +// as documented at https://developers.google.com/compute/docs/metadata. + +// TODO: this is copied from "cloud.google.com/go/compute/metadata" +// Because we want to use metadata server without depending on the module cloud.google.com/go. +// Need to consider how to handle this when writing up an official version. + +package metadata // import "cloud.google.com/go/compute/metadata" + +import ( + "context" + "encoding/json" + "fmt" + "io/ioutil" + "net" + "net/http" + "net/url" + "os" + "runtime" + "strings" + "sync" + "time" +) + +const ( + // metadataIP is the documented metadata server IP address. + metadataIP = "169.254.169.254" + + // metadataHostEnv is the environment variable specifying the + // GCE metadata hostname. If empty, the default value of + // metadataIP ("169.254.169.254") is used instead. + // This is variable name is not defined by any spec, as far as + // I know; it was made up for the Go package. + metadataHostEnv = "GCE_METADATA_HOST" + + userAgent = "gcloud-golang/0.1" +) + +type cachedValue struct { + k string + trim bool + mu sync.Mutex + v string +} + +var ( + projID = &cachedValue{k: "project/project-id", trim: true} + projNum = &cachedValue{k: "project/numeric-project-id", trim: true} + instID = &cachedValue{k: "instance/id", trim: true} +) + +var ( + defaultClient = &Client{hc: &http.Client{ + Transport: &http.Transport{ + Dial: (&net.Dialer{ + Timeout: 2 * time.Second, + KeepAlive: 30 * time.Second, + }).Dial, + ResponseHeaderTimeout: 2 * time.Second, + }, + }} + subscribeClient = &Client{hc: &http.Client{ + Transport: &http.Transport{ + Dial: (&net.Dialer{ + Timeout: 2 * time.Second, + KeepAlive: 30 * time.Second, + }).Dial, + }, + }} +) + +// NotDefinedError is returned when requested metadata is not defined. +// +// The underlying string is the suffix after "/computeMetadata/v1/". +// +// This error is not returned if the value is defined to be the empty +// string. +type NotDefinedError string + +func (suffix NotDefinedError) Error() string { + return fmt.Sprintf("metadata: GCE metadata %q not defined", string(suffix)) +} + +func (c *cachedValue) get(cl *Client) (v string, err error) { + defer c.mu.Unlock() + c.mu.Lock() + if c.v != "" { + return c.v, nil + } + if c.trim { + v, err = cl.getTrimmed(c.k) + } else { + v, err = cl.Get(c.k) + } + if err == nil { + c.v = v + } + return +} + +var ( + onGCEOnce sync.Once + onGCE bool +) + +// OnGCE reports whether this process is running on Google Compute Engine. +func OnGCE() bool { + onGCEOnce.Do(initOnGCE) + return onGCE +} + +func initOnGCE() { + onGCE = testOnGCE() +} + +func testOnGCE() bool { + // The user explicitly said they're on GCE, so trust them. + if os.Getenv(metadataHostEnv) != "" { + return true + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + resc := make(chan bool, 2) + + // Try two strategies in parallel. + // See https://github.com/googleapis/google-cloud-go/issues/194 + go func() { + req, _ := http.NewRequest("GET", "http://"+metadataIP, nil) + req.Header.Set("User-Agent", userAgent) + res, err := defaultClient.hc.Do(req.WithContext(ctx)) + if err != nil { + resc <- false + return + } + defer res.Body.Close() + resc <- res.Header.Get("Metadata-Flavor") == "Google" + }() + + go func() { + addrs, err := net.LookupHost("metadata.google.internal") + if err != nil || len(addrs) == 0 { + resc <- false + return + } + resc <- strsContains(addrs, metadataIP) + }() + + tryHarder := systemInfoSuggestsGCE() + if tryHarder { + res := <-resc + if res { + // The first strategy succeeded, so let's use it. + return true + } + // Wait for either the DNS or metadata server probe to + // contradict the other one and say we are running on + // GCE. Give it a lot of time to do so, since the system + // info already suggests we're running on a GCE BIOS. + timer := time.NewTimer(5 * time.Second) + defer timer.Stop() + select { + case res = <-resc: + return res + case <-timer.C: + // Too slow. Who knows what this system is. + return false + } + } + + // There's no hint from the system info that we're running on + // GCE, so use the first probe's result as truth, whether it's + // true or false. The goal here is to optimize for speed for + // users who are NOT running on GCE. We can't assume that + // either a DNS lookup or an HTTP request to a blackholed IP + // address is fast. Worst case this should return when the + // metaClient's Transport.ResponseHeaderTimeout or + // Transport.Dial.Timeout fires (in two seconds). + return <-resc +} + +// systemInfoSuggestsGCE reports whether the local system (without +// doing network requests) suggests that we're running on GCE. If this +// returns true, testOnGCE tries a bit harder to reach its metadata +// server. +func systemInfoSuggestsGCE() bool { + if runtime.GOOS != "linux" { + // We don't have any non-Linux clues available, at least yet. + return false + } + slurp, _ := ioutil.ReadFile("/sys/class/dmi/id/product_name") + name := strings.TrimSpace(string(slurp)) + return name == "Google" || name == "Google Compute Engine" +} + +// Subscribe calls Client.Subscribe on a client designed for subscribing (one with no +// ResponseHeaderTimeout). +func Subscribe(suffix string, fn func(v string, ok bool) error) error { + return subscribeClient.Subscribe(suffix, fn) +} + +// Get calls Client.Get on the default client. +func Get(suffix string) (string, error) { return defaultClient.Get(suffix) } + +// ProjectID returns the current instance's project ID string. +func ProjectID() (string, error) { return defaultClient.ProjectID() } + +// NumericProjectID returns the current instance's numeric project ID. +func NumericProjectID() (string, error) { return defaultClient.NumericProjectID() } + +// InternalIP returns the instance's primary internal IP address. +func InternalIP() (string, error) { return defaultClient.InternalIP() } + +// ExternalIP returns the instance's primary external (public) IP address. +func ExternalIP() (string, error) { return defaultClient.ExternalIP() } + +// Hostname returns the instance's hostname. This will be of the form +// ".c..internal". +func Hostname() (string, error) { return defaultClient.Hostname() } + +// InstanceTags returns the list of user-defined instance tags, +// assigned when initially creating a GCE instance. +func InstanceTags() ([]string, error) { return defaultClient.InstanceTags() } + +// InstanceID returns the current VM's numeric instance ID. +func InstanceID() (string, error) { return defaultClient.InstanceID() } + +// InstanceName returns the current VM's instance ID string. +func InstanceName() (string, error) { return defaultClient.InstanceName() } + +// Zone returns the current VM's zone, such as "us-central1-b". +func Zone() (string, error) { return defaultClient.Zone() } + +// InstanceAttributes calls Client.InstanceAttributes on the default client. +func InstanceAttributes() ([]string, error) { return defaultClient.InstanceAttributes() } + +// ProjectAttributes calls Client.ProjectAttributes on the default client. +func ProjectAttributes() ([]string, error) { return defaultClient.ProjectAttributes() } + +// InstanceAttributeValue calls Client.InstanceAttributeValue on the default client. +func InstanceAttributeValue(attr string) (string, error) { + return defaultClient.InstanceAttributeValue(attr) +} + +// ProjectAttributeValue calls Client.ProjectAttributeValue on the default client. +func ProjectAttributeValue(attr string) (string, error) { + return defaultClient.ProjectAttributeValue(attr) +} + +// Scopes calls Client.Scopes on the default client. +func Scopes(serviceAccount string) ([]string, error) { return defaultClient.Scopes(serviceAccount) } + +func strsContains(ss []string, s string) bool { + for _, v := range ss { + if v == s { + return true + } + } + return false +} + +// A Client provides metadata. +type Client struct { + hc *http.Client +} + +// NewClient returns a Client that can be used to fetch metadata. All HTTP requests +// will use the given http.Client instead of the default client. +func NewClient(c *http.Client) *Client { + return &Client{hc: c} +} + +// getETag returns a value from the metadata service as well as the associated ETag. +// This func is otherwise equivalent to Get. +func (c *Client) getETag(suffix string) (value, etag string, err error) { + // Using a fixed IP makes it very difficult to spoof the metadata service in + // a container, which is an important use-case for local testing of cloud + // deployments. To enable spoofing of the metadata service, the environment + // variable GCE_METADATA_HOST is first inspected to decide where metadata + // requests shall go. + host := os.Getenv(metadataHostEnv) + if host == "" { + // Using 169.254.169.254 instead of "metadata" here because Go + // binaries built with the "netgo" tag and without cgo won't + // know the search suffix for "metadata" is + // ".google.internal", and this IP address is documented as + // being stable anyway. + host = metadataIP + } + u := "http://" + host + "/computeMetadata/v1/" + suffix + req, _ := http.NewRequest("GET", u, nil) + req.Header.Set("Metadata-Flavor", "Google") + req.Header.Set("User-Agent", userAgent) + res, err := c.hc.Do(req) + if err != nil { + return "", "", err + } + defer res.Body.Close() + if res.StatusCode == http.StatusNotFound { + return "", "", NotDefinedError(suffix) + } + all, err := ioutil.ReadAll(res.Body) + if err != nil { + return "", "", err + } + if res.StatusCode != 200 { + return "", "", &Error{Code: res.StatusCode, Message: string(all)} + } + return string(all), res.Header.Get("Etag"), nil +} + +// Get returns a value from the metadata service. +// The suffix is appended to "http://${GCE_METADATA_HOST}/computeMetadata/v1/". +// +// If the GCE_METADATA_HOST environment variable is not defined, a default of +// 169.254.169.254 will be used instead. +// +// If the requested metadata is not defined, the returned error will +// be of type NotDefinedError. +func (c *Client) Get(suffix string) (string, error) { + val, _, err := c.getETag(suffix) + return val, err +} + +func (c *Client) getTrimmed(suffix string) (s string, err error) { + s, err = c.Get(suffix) + s = strings.TrimSpace(s) + return +} + +func (c *Client) lines(suffix string) ([]string, error) { + j, err := c.Get(suffix) + if err != nil { + return nil, err + } + s := strings.Split(strings.TrimSpace(j), "\n") + for i := range s { + s[i] = strings.TrimSpace(s[i]) + } + return s, nil +} + +// ProjectID returns the current instance's project ID string. +func (c *Client) ProjectID() (string, error) { return projID.get(c) } + +// NumericProjectID returns the current instance's numeric project ID. +func (c *Client) NumericProjectID() (string, error) { return projNum.get(c) } + +// InstanceID returns the current VM's numeric instance ID. +func (c *Client) InstanceID() (string, error) { return instID.get(c) } + +// InternalIP returns the instance's primary internal IP address. +func (c *Client) InternalIP() (string, error) { + return c.getTrimmed("instance/network-interfaces/0/ip") +} + +// ExternalIP returns the instance's primary external (public) IP address. +func (c *Client) ExternalIP() (string, error) { + return c.getTrimmed("instance/network-interfaces/0/access-configs/0/external-ip") +} + +// Hostname returns the instance's hostname. This will be of the form +// ".c..internal". +func (c *Client) Hostname() (string, error) { + return c.getTrimmed("instance/hostname") +} + +// InstanceTags returns the list of user-defined instance tags, +// assigned when initially creating a GCE instance. +func (c *Client) InstanceTags() ([]string, error) { + var s []string + j, err := c.Get("instance/tags") + if err != nil { + return nil, err + } + if err := json.NewDecoder(strings.NewReader(j)).Decode(&s); err != nil { + return nil, err + } + return s, nil +} + +// InstanceName returns the current VM's instance ID string. +func (c *Client) InstanceName() (string, error) { + host, err := c.Hostname() + if err != nil { + return "", err + } + return strings.Split(host, ".")[0], nil +} + +// Zone returns the current VM's zone, such as "us-central1-b". +func (c *Client) Zone() (string, error) { + zone, err := c.getTrimmed("instance/zone") + // zone is of the form "projects//zones/". + if err != nil { + return "", err + } + return zone[strings.LastIndex(zone, "/")+1:], nil +} + +// InstanceAttributes returns the list of user-defined attributes, +// assigned when initially creating a GCE VM instance. The value of an +// attribute can be obtained with InstanceAttributeValue. +func (c *Client) InstanceAttributes() ([]string, error) { return c.lines("instance/attributes/") } + +// ProjectAttributes returns the list of user-defined attributes +// applying to the project as a whole, not just this VM. The value of +// an attribute can be obtained with ProjectAttributeValue. +func (c *Client) ProjectAttributes() ([]string, error) { return c.lines("project/attributes/") } + +// InstanceAttributeValue returns the value of the provided VM +// instance attribute. +// +// If the requested attribute is not defined, the returned error will +// be of type NotDefinedError. +// +// InstanceAttributeValue may return ("", nil) if the attribute was +// defined to be the empty string. +func (c *Client) InstanceAttributeValue(attr string) (string, error) { + return c.Get("instance/attributes/" + attr) +} + +// ProjectAttributeValue returns the value of the provided +// project attribute. +// +// If the requested attribute is not defined, the returned error will +// be of type NotDefinedError. +// +// ProjectAttributeValue may return ("", nil) if the attribute was +// defined to be the empty string. +func (c *Client) ProjectAttributeValue(attr string) (string, error) { + return c.Get("project/attributes/" + attr) +} + +// Scopes returns the service account scopes for the given account. +// The account may be empty or the string "default" to use the instance's +// main account. +func (c *Client) Scopes(serviceAccount string) ([]string, error) { + if serviceAccount == "" { + serviceAccount = "default" + } + return c.lines("instance/service-accounts/" + serviceAccount + "/scopes") +} + +// Subscribe subscribes to a value from the metadata service. +// The suffix is appended to "http://${GCE_METADATA_HOST}/computeMetadata/v1/". +// The suffix may contain query parameters. +// +// Subscribe calls fn with the latest metadata value indicated by the provided +// suffix. If the metadata value is deleted, fn is called with the empty string +// and ok false. Subscribe blocks until fn returns a non-nil error or the value +// is deleted. Subscribe returns the error value returned from the last call to +// fn, which may be nil when ok == false. +func (c *Client) Subscribe(suffix string, fn func(v string, ok bool) error) error { + const failedSubscribeSleep = time.Second * 5 + + // First check to see if the metadata value exists at all. + val, lastETag, err := c.getETag(suffix) + if err != nil { + return err + } + + if err := fn(val, true); err != nil { + return err + } + + ok := true + if strings.ContainsRune(suffix, '?') { + suffix += "&wait_for_change=true&last_etag=" + } else { + suffix += "?wait_for_change=true&last_etag=" + } + for { + val, etag, err := c.getETag(suffix + url.QueryEscape(lastETag)) + if err != nil { + if _, deleted := err.(NotDefinedError); !deleted { + time.Sleep(failedSubscribeSleep) + continue // Retry on other errors. + } + ok = false + } + lastETag = etag + + if err := fn(val, ok); err != nil || !ok { + return err + } + } +} + +// Error contains an error response from the server. +type Error struct { + // Code is the HTTP response status code. + Code int + // Message is the server response message. + Message string +} + +func (e *Error) Error() string { + return fmt.Sprintf("compute: Received %d `%s`", e.Code, e.Message) +} diff --git a/pkg/experimental/workload/daemon/daemon.go b/pkg/experimental/workload/daemon/daemon.go new file mode 100644 index 0000000000..0c94fc634e --- /dev/null +++ b/pkg/experimental/workload/daemon/daemon.go @@ -0,0 +1,199 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package daemon + +import ( + "context" + "encoding/json" + "fmt" + "os" + "os/signal" + "syscall" + "time" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + workloadv1a1 "k8s.io/ingress-gce/pkg/experimental/apis/workload/v1alpha1" + workloadclient "k8s.io/ingress-gce/pkg/experimental/workload/client/clientset/versioned" + daemonutils "k8s.io/ingress-gce/pkg/experimental/workload/daemon/utils" + "k8s.io/ingress-gce/pkg/utils" + "k8s.io/klog" +) + +// RunDaemon executes the workload daemon +func RunDaemon( + workload daemonutils.WorkloadInfo, + connHelper daemonutils.ConnectionHelper, + updateInterval time.Duration, +) { + name, nameExist := workload.Name() + if !nameExist { + klog.Fatalf("Workload must have a name") + } + + // Generate KubeConfig and connect to it + config, err := connHelper.KubeConfig() + if err != nil { + klog.Fatalf("unable to create KubeConfig: %+v", err) + } + var clientset workloadclient.Interface + clientset, err = workloadclient.NewForConfig(config) + if err != nil { + klog.Fatalf("unable to connect to the cluster: %+v", err) + } + + // Create the workload resource + client := clientset.NetworkingV1alpha1().Workloads(corev1.NamespaceDefault) + wlcr := getWorkloadCR(workload) + _, err = client.Create(context.Background(), wlcr, metav1.CreateOptions{}) + if err != nil { + klog.Fatalf("unable to create the workload resource: %+v", err) + } + klog.V(2).Infof("workload resource created: %s", name) + + // Update the heartbeat regularly + ticker := time.NewTicker(updateInterval) + quit := make(chan interface{}) + sigs := make(chan os.Signal, 1) + go updateCR(wlcr, clientset, ticker, sigs, quit) + + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) + <-quit + klog.V(0).Infof("receiving quit signal, try to delete the workload resource") + + err = client.Delete(context.Background(), name, metav1.DeleteOptions{}) + if err != nil { + klog.Errorf("unable to delete the workload resource: %+v", err) + } else { + klog.V(2).Infof("workload resource deleted") + } +} + +func updateCR( + workload *workloadv1a1.Workload, + clientset workloadclient.Interface, + ticker *time.Ticker, + sigs chan os.Signal, + quit chan interface{}, +) { + oldStatus := workload.Status + for { + select { + case <-ticker.C: + newStatus := generateHeartbeatStatus() + patch, err := preparePatchBytesforWorkloadStatus(oldStatus, newStatus) + if err != nil { + klog.Errorf("failed to prepare the patch for workload resource: %+v", err) + continue + } + oldStatus = newStatus + + vmInstClient := clientset.NetworkingV1alpha1().Workloads(corev1.NamespaceDefault) + // WARNING: This patch does not work with Ping and Ready, since it will overwrite the whole conditions list + // TODO: The following options are considered: + // - Get and Update. The problem is that it may be too heavy for Heartbeat. + // - Fix the index of Heartbeat in the list, and do a JSON patch. Maybe too hacky. + // - StrategicMerge does not work for CRD now, but hopefully it may work after + // we switch to metav1.Condition. Not been tested yet. + // - Use server-side Apply Patch. This only works for 1.18+. + _, err = vmInstClient.Patch(context.Background(), workload.Name, types.MergePatchType, + patch, metav1.PatchOptions{}) + if err != nil { + klog.Errorf("failed to update the workload resource: %+v", err) + } else { + klog.V(2).Infof("workload resource updated") + } + case <-sigs: + ticker.Stop() + quit <- true + return + } + } +} + +func getWorkloadCR(workload daemonutils.WorkloadInfo) *workloadv1a1.Workload { + ret := workloadv1a1.Workload{ + ObjectMeta: metav1.ObjectMeta{ + Name: stringOrEmpty(workload.Name()), + Labels: workload.Labels(), + }, + Spec: workloadv1a1.WorkloadSpec{ + EnableHeartbeat: true, + EnablePing: true, + }, + Status: generateHeartbeatStatus(), + } + if region, exist := workload.Region(); exist { + ret.ObjectMeta.Labels["topology.kubernetes.io/region"] = region + } + if zone, exist := workload.Zone(); exist { + ret.ObjectMeta.Labels["topology.kubernetes.io/region"] = zone + } + if hostname, exist := workload.Hostname(); exist { + ret.Spec.Hostname = &hostname + } + if ip, exist := workload.IP(); exist { + ret.Spec.Addresses = []workloadv1a1.ExternalWorkloadAddress{ + { + Address: ip, + AddressType: workloadv1a1.AddressTypeIPv4, + }, + } + } + return &ret +} + +func stringOrEmpty(str string, exist bool) string { + if exist { + return str + } else { + return "" + } +} + +// OutputCredentials prints the credentials to stdout +func OutputCredentials(credentials daemonutils.ClusterCredentials) { + ret, err := json.Marshal(credentials) + if err != nil { + klog.Fatalf("unable to serialize credentials: %+v", err) + } + fmt.Println(string(ret)) +} + +// preparePatchBytesforWorkloadStatus generates patch bytes based on the old and new workload status +func preparePatchBytesforWorkloadStatus(oldStatus, newStatus workloadv1a1.WorkloadStatus) ([]byte, error) { + patchBytes, err := utils.StrategicMergePatchBytes( + workloadv1a1.Workload{Status: oldStatus}, + workloadv1a1.Workload{Status: newStatus}, + workloadv1a1.Workload{}, + ) + return patchBytes, err +} + +func generateHeartbeatStatus() workloadv1a1.WorkloadStatus { + return workloadv1a1.WorkloadStatus{ + Conditions: []workloadv1a1.Condition{ + { + Type: workloadv1a1.WorkloadConditionHeartbeat, + Status: workloadv1a1.ConditionStatusTrue, + LastTransitionTime: metav1.Now(), + Reason: "Heartbeat", + }, + }, + } +} diff --git a/pkg/experimental/workload/daemon/provider/gce/vm.go b/pkg/experimental/workload/daemon/provider/gce/vm.go new file mode 100644 index 0000000000..02854ccf1d --- /dev/null +++ b/pkg/experimental/workload/daemon/provider/gce/vm.go @@ -0,0 +1,298 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package gce + +import ( + "context" + "encoding/json" + "os" + "path/filepath" + "strings" + "time" + + gkev1 "google.golang.org/api/container/v1" + "google.golang.org/api/option" + "google.golang.org/api/transport" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" + "k8s.io/client-go/util/homedir" + "k8s.io/ingress-gce/pkg/experimental/metadata" + daemonutils "k8s.io/ingress-gce/pkg/experimental/workload/daemon/utils" + "k8s.io/klog" +) + +// VM represents a VM instance running on Google Cloud. +// It uses the metadata server to fetch all required information. +type VM struct { + instanceName string + hostname string + internalIP string + externalIP string + projectID string + region string + zone string + + // The following fields are used to access Kubernetes cluster and create resources. + // They are stored as custom metadata, if any. + + // clusterName is the Kubernetes cluster name, stored as "k8s-cluster-name". + clusterName string + // clusterZone is the zone the Kubernetes cluster locates, stored as "k8s-cluster-zone". + clusterZone string + // ksaName is the Kubernetes service account (KSA) name used to access the cluster. + // Blank in the case when gcloud IAM service account (GSA) is used. + ksaName string + // ksaToken is the access token of Kubernetes service account + // Blank if not applicable. + ksaToken string + + // vmLabels are labels to use in the workload resource + // Metadata startwith "k8s-label-" are used to create it. + // For example, metadata "k8s-label-foo:bar" leads to the label "foo:bar" + vmLabels map[string]string +} + +// gsaAccessToken is an OAuth2 access token fetched from metadata server +type gsaAccessToken struct { + AccessToken string `json:"access_token"` + ExpiresIn int `json:"expires_in"` + TokenType string `json:"token_type,omitempty"` +} + +// Name is the name of the workload +func (vm *VM) Name() (string, bool) { + return vm.instanceName, true +} + +// Hostname is the hostname or DNS address of the workload +func (vm *VM) Hostname() (string, bool) { + return vm.hostname, true +} + +// IP is the IP used to access this workload from the cluster +func (vm *VM) IP() (string, bool) { + return vm.internalIP, true +} + +// Labels are one or more labels associated with the workload +func (vm *VM) Labels() map[string]string { + return vm.vmLabels +} + +// Region associated with the endpoint. +func (vm *VM) Region() (string, bool) { + return vm.region, true +} + +// Zone associated with the endpoint. +func (vm *VM) Zone() (string, bool) { + return vm.zone, true +} + +func decodeGsaAccessToken(jsonData string) (token gsaAccessToken, err error) { + token = gsaAccessToken{} + err = json.Unmarshal([]byte(jsonData), &token) + if err != nil { + klog.Errorf("malformed service account access token: %+v", err) + return + } + return +} + +// Credentials contain the credentials used for the daemon to access the cluster +func (vm *VM) Credentials() (daemonutils.ClusterCredentials, error) { + jsonData, err := metadata.Get("instance/service-accounts/default/token") + if err != nil { + klog.Errorf("failed to get service account access token: %+v", err) + return daemonutils.ClusterCredentials{}, err + } + + token, err := decodeGsaAccessToken(jsonData) + if err != nil { + klog.Errorf("failed to decode service account access token: %+v", err) + return daemonutils.ClusterCredentials{}, err + } + expiryTime := time.Now().Add(time.Duration(token.ExpiresIn) * time.Second) + ret := daemonutils.ClusterCredentials{ + AccessToken: token.AccessToken, + TokenExpiry: expiryTime.UTC().Format(time.RFC3339), + } + return ret, nil +} + +// getCluster returns the cluster info +func (vm *VM) getCluster() (cluster *gkev1.Cluster, err error) { + // These fields should be fetched in NewVM(), but the error info was ignored, + // as they are optional fields when "~/.kube/config" is used. + // Therefore, if they are not present, try to fetch again to return the exact error. + if vm.clusterName == "" { + vm.clusterName, err = metadata.InstanceAttributeValue("k8s-cluster-name") + if err != nil { + klog.Errorf("failed to get k8s-cluster-name from metadata server: %+v", err) + return + } + } + if vm.clusterZone == "" { + vm.clusterZone, err = metadata.InstanceAttributeValue("k8s-cluster-zone") + if err != nil { + klog.Errorf("failed to get k8s-cluster-zone from metadata server: %+v", err) + return + } + } + + // Use GCE container APIs to get IP and CA + // Should be available for "Kubernetes Engine Cluster Viewer" role + oauthClient, _, err := transport.NewHTTPClient(context.Background(), + option.WithScopes(gkev1.CloudPlatformScope)) + if err != nil { + klog.Errorf("failed to initalize http client: %+v", err) + return + } + gkeSvc, err := gkev1.New(oauthClient) + if err != nil { + klog.Errorf("failed to initialize gke client: %+v", err) + return + } + clusterSvc := gkev1.NewProjectsZonesClustersService(gkeSvc) + cluster, err = clusterSvc.Get(vm.projectID, vm.clusterZone, vm.clusterName).Do() + if err != nil { + klog.Errorf("failed to get gke cluster: %+v", err) + return + } + return +} + +// KubeConfig yields the config used to create Kubernetes clientset. +// It tries the following ways in order: +// - Use ~/.kube/config file. +// - Use Kubernetes service account (KSA) specified by metadata. +// - Use gcloud IAM service account (GSA) associated with this instance. +func (vm *VM) KubeConfig() (config *rest.Config, err error) { + // CASE1: If there is a kubeConfig file, use that file. E.g. testing on a cloudtop. + configFile := filepath.Join(homedir.HomeDir(), ".kube", "config") + config, err = clientcmd.BuildConfigFromFlags("", configFile) + if err == nil { + return + } + if !os.IsNotExist(err) { + klog.Errorf("unable to build config from kubeConfig file: %+v", err) + return + } + + // Get contianer master address and CA + cluster, err := vm.getCluster() + if err != nil { + klog.Errorf("unable to get the cluster info: %+v", err) + return + } + + var kubeConfig []byte + if vm.ksaName != "" && vm.ksaToken != "" { + // CASE2: If there is a KSA specified as metadata, use it + kubeConfig = daemonutils.GenKubeConfigForKSA(cluster.MasterAuth.ClusterCaCertificate, cluster.Endpoint, + cluster.Name, vm.ksaName, vm.ksaToken) + } else { + // CASE3: Use gcloud SA to authenticate as a Kubernetes user + kubeConfig = daemonutils.GenKubeConfigForUser(cluster.MasterAuth.ClusterCaCertificate, cluster.Endpoint, + cluster.Name, "gcp") + } + + config, err = clientcmd.RESTConfigFromKubeConfig([]byte(kubeConfig)) + if err != nil { + klog.Errorf("failed to create kubeconfig: %+v", err) + return + } + + return +} + +func getAttrOrPanic(getter func() (string, error), name string) string { + ret, err := getter() + if err != nil { + klog.Errorf("failed to get %s from metadata server: %+v", name, err) + // This will be recovered by the NewVM function. + panic(err) + } + return ret +} + +func getOptionalMetadata(attr string) string { + ret, err := metadata.InstanceAttributeValue(attr) + if err != nil { + ret = "" + } + return ret +} + +// NewVM fetches all data needed from the metadata server to create VM +func NewVM() (vm *VM, err error) { + // Catch the error in getAttrOrPanic + defer func() { + e := recover() + if e != nil { + err = e.(error) + } + }() + vm = &VM{ + // Fetch basic info that every GCP Instance has + instanceName: getAttrOrPanic(metadata.InstanceName, "InstanceName"), + hostname: getAttrOrPanic(metadata.Hostname, "Hostname"), + internalIP: getAttrOrPanic(metadata.InternalIP, "InternalIP"), + externalIP: getAttrOrPanic(metadata.ExternalIP, "ExternalIP"), + projectID: getAttrOrPanic(metadata.ProjectID, "ProjectID"), + zone: getAttrOrPanic(metadata.Zone, "Zone"), + // Fetch the cluster name and zone + // Not specified if the user want to use ~/.kube/config file + clusterName: getOptionalMetadata("k8s-cluster-name"), + clusterZone: getOptionalMetadata("k8s-cluster-zone"), + // Fetch the KSA name and token if existing + ksaName: getOptionalMetadata("k8s-sa-name"), + ksaToken: getOptionalMetadata("k8s-sa-token"), + // Labels to use in the workload resource + vmLabels: make(map[string]string), + } + + lastDash := strings.LastIndex(vm.zone, "-") + if lastDash >= 0 { + vm.region = vm.zone[:lastDash] + } else { + vm.region = "" + } + + const ( + labelPrefix = "k8s-label-" + prefixLen = len(labelPrefix) + ) + + // Fetch labels + attrs, err := metadata.InstanceAttributes() + if err != nil { + klog.Errorf("failed to get attribute list from metadata server: %+v", err) + return nil, err + } + for _, name := range attrs { + if strings.HasPrefix(name, labelPrefix) { + val, err := metadata.InstanceAttributeValue(name) + if err != nil { + klog.Errorf("faild to fetch label %s: %+v", name, err) + } + vm.vmLabels[name[prefixLen:]] = val + } + } + + return +} diff --git a/pkg/experimental/workload/daemon/utils/interface.go b/pkg/experimental/workload/daemon/utils/interface.go new file mode 100644 index 0000000000..5d074aa582 --- /dev/null +++ b/pkg/experimental/workload/daemon/utils/interface.go @@ -0,0 +1,52 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package utils + +import "k8s.io/client-go/rest" + +// WorkloadInfo represents basic information of this workload +type WorkloadInfo interface { + // Name is the name of the workload + Name() (string, bool) + // Hostname is the hostname or DNS address of the workload + Hostname() (string, bool) + // IP is the IP used to access this workload from the cluster + IP() (string, bool) + // Labels are one or more labels associated with the workload + Labels() map[string]string + // Region associated with the endpoint. + Region() (string, bool) + // Zone associated with the endpoint. + Zone() (string, bool) +} + +// ConnectionHelper provides the identity and config used to connect to the cluster +type ConnectionHelper interface { + // Credentials contain the credentials used for the deamon to access the cluster. + // This is output to stdout, for Kubernetes clients to use. + Credentials() (ClusterCredentials, error) + // KubeConfig yields the config used to create Kubernetes clientset. + KubeConfig() (*rest.Config, error) +} + +// ClusterCredentials contains the access token to the cluster and its expiry time +type ClusterCredentials struct { + // AccessToken is the access token to access the cluster + AccessToken string `json:"access_token"` + // TokenExpiry is the expiry time of the access token, in RFC3339 format + TokenExpiry string `json:"token_expiry"` +} diff --git a/pkg/experimental/workload/daemon/utils/kube-config.go b/pkg/experimental/workload/daemon/utils/kube-config.go new file mode 100644 index 0000000000..7da8d6d17d --- /dev/null +++ b/pkg/experimental/workload/daemon/utils/kube-config.go @@ -0,0 +1,72 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package utils + +import ( + "bytes" + "os" + "path/filepath" + "text/template" + + "k8s.io/klog" +) + +// GenKubeConfigForKSA generates a KubeConfig to access the cluster using a Kubernetes service account +func GenKubeConfigForKSA(clusterCa, clusterIP, clusterName, saName, accessToken string) []byte { + var kubeConfig bytes.Buffer + t, err := template.New("user").Parse(kubeConfigUserTemp) + if err != nil { + klog.Fatalf("unablt to create KubeConfig template: %+v", err) + } + err = t.Execute(&kubeConfig, map[string]string{ + "clusterCa": clusterCa, + "clusterIP": clusterIP, + "clusterName": clusterName, + "saName": saName, + "accessToken": accessToken, + }) + if err != nil { + klog.Fatalf("unablt to execute KubeConfig template: %+v", err) + } + return kubeConfig.Bytes() +} + +// GenKubeConfigForUser generates a KubeConfig to access the cluster using a third-party identity +func GenKubeConfigForUser(clusterCa, clusterIP, clusterName, authProvider string) []byte { + pwd, err := os.Getwd() + if err != nil { + klog.Fatalf("failed to get current dir: %+v", err) + } + path := filepath.Join(pwd, os.Args[0]) + + var kubeConfig bytes.Buffer + t, err := template.New("user").Parse(kubeConfigUserTemp) + if err != nil { + klog.Fatalf("unablt to create KubeConfig template: %+v", err) + } + err = t.Execute(&kubeConfig, map[string]string{ + "clusterCa": clusterCa, + "clusterIP": clusterIP, + "clusterName": clusterName, + "path": path, + "authProvider": authProvider, + }) + if err != nil { + klog.Fatalf("unablt to execute KubeConfig template: %+v", err) + } + return kubeConfig.Bytes() +} diff --git a/pkg/experimental/workload/daemon/utils/template.go b/pkg/experimental/workload/daemon/utils/template.go new file mode 100644 index 0000000000..66f3b65479 --- /dev/null +++ b/pkg/experimental/workload/daemon/utils/template.go @@ -0,0 +1,63 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package utils + +const kubeConfigUserTemp = ` +apiVersion: v1 +clusters: +- cluster: + certificate-authority-data: {{.clusterCa}} + server: https://{{.clusterIP}} + name: {{.clusterName}} +contexts: +- context: + cluster: {{.clusterName}} + user: {{.clusterName}} + name: {{.clusterName}} +current-context: {{.clusterName}} +kind: Config +preferences: {} +users: +- name: {{.clusterName}} + user: + auth-provider: + config: + cmd-args: get-credentials + cmd-path: {{.path}} + expiry-key: '{.token_expiry}' + token-key: '{.access_token}' + name: {{.authProvider}}` + +const kubeConfigKsaTemp = ` +apiVersion: v1 +clusters: +- cluster: + certificate-authority-data: {{.clusterCa}} + server: https://{{.clusterIP}} + name: {{.clusterName}} +contexts: +- context: + cluster: {{.clusterName}} + user: {{.saName}} + name: {{.clusterName}} +current-context: {{.clusterName}} +kind: Config +preferences: {} +users: +- name: {{.saName}} + user: + token: {{.accessToken}}`