diff --git a/cmd/sidecar_mounter/main.go b/cmd/sidecar_mounter/main.go index 86a58d38..093e4acf 100644 --- a/cmd/sidecar_mounter/main.go +++ b/cmd/sidecar_mounter/main.go @@ -34,6 +34,7 @@ import ( ) var ( + hostNetwork = flag.Bool("host-network", false, "pod hostNetwork configuration") gcsfusePath = flag.String("gcsfuse-path", "/gcsfuse", "gcsfuse path") volumeBasePath = flag.String("volume-base-path", webhook.SidecarContainerTmpVolumeMountPath+"/.volumes", "volume base path") _ = flag.Int("grace-period", 0, "grace period for gcsfuse termination. This flag has been deprecated, has no effect and will be removed in the future.") @@ -60,7 +61,7 @@ func main() { // 1. different gcsfuse logs mixed together. // 2. memory usage peak. time.Sleep(1500 * time.Millisecond) - mc := sidecarmounter.NewMountConfig(sp) + mc := sidecarmounter.NewMountConfig(sp, *hostNetwork) if mc != nil { if err := mounter.Mount(ctx, mc); err != nil { mc.ErrWriter.WriteMsg(fmt.Sprintf("failed to mount bucket %q for volume %q: %v\n", mc.BucketName, mc.VolumeName, err)) diff --git a/deploy/base/node/node_setup.yaml b/deploy/base/node/node_setup.yaml index e949353a..53a19084 100755 --- a/deploy/base/node/node_setup.yaml +++ b/deploy/base/node/node_setup.yaml @@ -50,4 +50,4 @@ roleRef: name: gcs-fuse-csi-publisher-role subjects: - kind: ServiceAccount - name: gcsfusecsi-node-sa \ No newline at end of file + name: gcsfusecsi-node-sa diff --git a/deploy/base/webhook/deployment.yaml b/deploy/base/webhook/deployment.yaml index e1248169..f6abdd9f 100644 --- a/deploy/base/webhook/deployment.yaml +++ b/deploy/base/webhook/deployment.yaml @@ -117,4 +117,4 @@ spec: - name: metrics protocol: TCP port: 8080 - targetPort: 22032 \ No newline at end of file + targetPort: 22032 diff --git a/pkg/sidecar_mounter/sidecar_mounter.go b/pkg/sidecar_mounter/sidecar_mounter.go index 3b64d683..e04e1365 100644 --- a/pkg/sidecar_mounter/sidecar_mounter.go +++ b/pkg/sidecar_mounter/sidecar_mounter.go @@ -20,19 +20,29 @@ package sidecarmounter import ( "bufio" "context" + "encoding/json" + "errors" "fmt" "io" "net" "net/http" + "net/url" "os" "os/exec" + "path" "path/filepath" "strings" "sync" "syscall" "time" + "cloud.google.com/go/compute/metadata" + credentials "cloud.google.com/go/iam/credentials/apiv1" "github.com/googlecloudplatform/gcs-fuse-csi-driver/pkg/metrics" + "github.com/googlecloudplatform/gcs-fuse-csi-driver/pkg/webhook" + "golang.org/x/oauth2" + "google.golang.org/api/option" + "google.golang.org/api/sts/v1" "k8s.io/klog/v2" ) @@ -53,6 +63,13 @@ func New(mounterPath string) *Mounter { } func (m *Mounter) Mount(ctx context.Context, mc *MountConfig) error { + // Start the token server for HostNetwork enabled pods. + if mc.HostNetwork { + tp := filepath.Join(mc.TempDir, TokenFileName) + klog.Infof("Pod has hostNetwork enabled. Starting Token Server on %s.", tp) + go StartTokenServer(ctx, tp) + } + klog.Infof("start to mount bucket %q for volume %q", mc.BucketName, mc.VolumeName) if err := os.MkdirAll(mc.BufferDir+TempDir, os.ModePerm); err != nil { @@ -275,3 +292,129 @@ func scrapeMetrics(ctx context.Context, metricEndpoint string, w http.ResponseWr return nil } + +func getK8sTokenFromFile(tokenPath string) (string, error) { + token, err := os.ReadFile(tokenPath) + if err != nil { + return "", fmt.Errorf("error reading token file: %w", err) + } + + return strings.TrimSpace(string(token)), nil +} + +func fetchIdentityBindingToken(ctx context.Context, k8sSAToken string) (*oauth2.Token, error) { + stsService, err := sts.NewService(ctx, option.WithHTTPClient(&http.Client{})) + if err != nil { + return nil, fmt.Errorf("new STS service error: %w", err) + } + + audience, err := getAudienceFromContext(ctx) + if err != nil { + return nil, fmt.Errorf("failed to get audience from the context: %w", err) + } + + stsRequest := &sts.GoogleIdentityStsV1ExchangeTokenRequest{ + Audience: audience, + GrantType: "urn:ietf:params:oauth:grant-type:token-exchange", + Scope: credentials.DefaultAuthScopes()[0], + RequestedTokenType: "urn:ietf:params:oauth:token-type:access_token", + SubjectTokenType: "urn:ietf:params:oauth:token-type:jwt", + SubjectToken: k8sSAToken, + } + + stsResponse, err := stsService.V1.Token(stsRequest).Do() + if err != nil { + return nil, fmt.Errorf("IdentityBindingToken exchange error with audience %q: %w", audience, err) + } + + return &oauth2.Token{ + AccessToken: stsResponse.AccessToken, + TokenType: stsResponse.TokenType, + Expiry: time.Now().Add(time.Second * time.Duration(stsResponse.ExpiresIn)), + }, nil +} + +func getAudienceFromContext(ctx context.Context) (string, error) { + projectID, err := metadata.ProjectIDWithContext(ctx) + if err != nil { + return "", fmt.Errorf("failed to get project ID: %w", err) + } + // Get all instance metadata attributes + clusterLocation, err := metadata.InstanceAttributeValueWithContext(ctx, "cluster-location") + if err != nil { + return "", fmt.Errorf("failed to get clusterLocation: %w", err) + } + clusterName, err := metadata.InstanceAttributeValueWithContext(ctx, "cluster-name") + if err != nil { + return "", fmt.Errorf("failed to get clusterName: %w", err) + } + + klog.Infof("projectID: %s, clusterName: %s, clusterLocation: %s", projectID, clusterName, clusterLocation) + onePlatformClusterResourceURL := &url.URL{ + Scheme: "https", + Host: "container.googleapis.com", + Path: path.Join("v1", "projects", projectID, "locations", clusterLocation, "clusters", clusterName), + } + + audience := fmt.Sprintf( + "identitynamespace:%s.svc.id.goog:%s", + projectID, + onePlatformClusterResourceURL, + ) + klog.Infof("audience: %s", audience) + + return audience, nil +} + +func StartTokenServer(ctx context.Context, tokenURLSocketPath string) { + // Create a unix domain socket and listen for incoming connections. + tokenSocketListener, err := net.Listen("unix", tokenURLSocketPath) + if err != nil { + klog.Errorf("failed to create socket %q: %v", tokenURLSocketPath, err) + + return + } + klog.Infof("created a listener using the socket path %s", tokenURLSocketPath) + mux := http.NewServeMux() + mux.HandleFunc("/", func(w http.ResponseWriter, _ *http.Request) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + k8stoken, err := getK8sTokenFromFile(webhook.SidecarContainerSATokenVolumeMountPath + "/" + webhook.K8STokenPath) + var stsToken *oauth2.Token + if err != nil { + klog.Errorf("failed to get k8s token from path %v", err) + w.WriteHeader(http.StatusInternalServerError) + + return + } + stsToken, err = fetchIdentityBindingToken(ctx, k8stoken) + if err != nil { + klog.Errorf("failed to get sts token from path %v", err) + w.WriteHeader(http.StatusInternalServerError) + + return + } + // Marshal the oauth2.Token object to JSON + jsonToken, err := json.Marshal(stsToken) + if err != nil { + klog.Errorf("failed to marshal token to JSON: %v", err) + w.WriteHeader(http.StatusInternalServerError) + + return + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + fmt.Fprint(w, string(jsonToken)) + }) + + server := http.Server{ + Handler: mux, + ReadTimeout: 10 * time.Second, + WriteTimeout: 10 * time.Second, + } + + if err := server.Serve(tokenSocketListener); !errors.Is(err, http.ErrServerClosed) { + klog.Errorf("Server for %q returns unexpected error: %v", tokenURLSocketPath, err) + } +} diff --git a/pkg/sidecar_mounter/sidecar_mounter_config.go b/pkg/sidecar_mounter/sidecar_mounter_config.go index 66562139..22a51c43 100644 --- a/pkg/sidecar_mounter/sidecar_mounter_config.go +++ b/pkg/sidecar_mounter/sidecar_mounter_config.go @@ -35,8 +35,10 @@ import ( ) const ( - GCSFuseAppName = "gke-gcs-fuse-csi" - TempDir = "/temp-dir" + GCSFuseAppName = "gke-gcs-fuse-csi" + TempDir = "/temp-dir" + unixSocketBasePath = "unix://" + TokenFileName = "token.sock" // #nosec G101 ) // MountConfig contains the information gcsfuse needs. @@ -52,6 +54,7 @@ type MountConfig struct { ErrWriter stderrWriterInterface `json:"-"` FlagMap map[string]string `json:"-"` ConfigFileFlagMap map[string]string `json:"-"` + HostNetwork bool `json:"-"` } var prometheusPort = 8080 @@ -91,17 +94,18 @@ var boolFlags = map[string]bool{ // 2. The file descriptor // 3. GCS bucket name // 4. Mount options passing to gcsfuse (passed by the csi mounter). -func NewMountConfig(sp string) *MountConfig { +func NewMountConfig(sp string, hnw bool) *MountConfig { // socket path pattern: /gcsfuse-tmp/.volumes//socket tempDir := filepath.Dir(sp) volumeName := filepath.Base(tempDir) mc := MountConfig{ - VolumeName: volumeName, - BufferDir: filepath.Join(webhook.SidecarContainerBufferVolumeMountPath, ".volumes", volumeName), - CacheDir: filepath.Join(webhook.SidecarContainerCacheVolumeMountPath, ".volumes", volumeName), - TempDir: tempDir, - ConfigFile: filepath.Join(webhook.SidecarContainerTmpVolumeMountPath, ".volumes", volumeName, "config.yaml"), - ErrWriter: NewErrorWriter(filepath.Join(tempDir, "error")), + VolumeName: volumeName, + BufferDir: filepath.Join(webhook.SidecarContainerBufferVolumeMountPath, ".volumes", volumeName), + CacheDir: filepath.Join(webhook.SidecarContainerCacheVolumeMountPath, ".volumes", volumeName), + TempDir: tempDir, + ConfigFile: filepath.Join(webhook.SidecarContainerTmpVolumeMountPath, ".volumes", volumeName, "config.yaml"), + ErrWriter: NewErrorWriter(filepath.Join(tempDir, "error")), + HostNetwork: hnw, } klog.Infof("connecting to socket %q", sp) @@ -234,7 +238,6 @@ func (mc *MountConfig) prepareMountArgs() { klog.Warningf("got invalid arguments for volume %q: %v. Will discard invalid args and continue to mount.", invalidArgs, mc.VolumeName) } - mc.FlagMap, mc.ConfigFileFlagMap = flagMap, configFileFlagMap } @@ -276,6 +279,11 @@ func (mc *MountConfig) prepareConfigFile() error { } } } + if mc.HostNetwork { + configMap["gcs-auth"] = map[string]interface{}{ + "token-url": unixSocketBasePath + filepath.Join(mc.TempDir, TokenFileName), + } + } yamlData, err := yaml.Marshal(&configMap) if err != nil { diff --git a/pkg/sidecar_mounter/sidecar_mounter_config_test.go b/pkg/sidecar_mounter/sidecar_mounter_config_test.go index 27d59f59..177977cc 100644 --- a/pkg/sidecar_mounter/sidecar_mounter_config_test.go +++ b/pkg/sidecar_mounter/sidecar_mounter_config_test.go @@ -311,6 +311,45 @@ func TestPrepareConfigFile(t *testing.T) { "cache-dir": "/gcsfuse-cache/.volumes/volume-name", }, }, + { + name: "should create valid config file when hostnetwork is enabled", + mc: &MountConfig{ + ConfigFile: "./test-config-file.yaml", + TempDir: "/gcsfuse-tmp/.volumes/vol1", + ConfigFileFlagMap: map[string]string{ + "logging:file-path": "/dev/fd/1", + "logging:format": "json", + "logging:severity": "error", + "write:create-empty-file": "true", + "file-cache:max-size-mb": "10000", + "file-cache:cache-file-for-range-read": "true", + "metadata-cache:stat-cache-max-size-mb": "1000", + "metadata-cache:type-cache-max-size-mb": "-1", + "cache-dir": "/gcsfuse-cache/.volumes/volume-name", + }, + HostNetwork: true, + }, + expectedConfig: map[string]interface{}{ + "logging": map[string]interface{}{ + "file-path": "/dev/fd/1", + "format": "json", + "severity": "error", + }, + "write": map[string]interface{}{ + "create-empty-file": true, + }, + "file-cache": map[string]interface{}{ + "max-size-mb": 10000, + "cache-file-for-range-read": true, + }, + "metadata-cache": map[string]interface{}{ + "stat-cache-max-size-mb": 1000, + "type-cache-max-size-mb": -1, + }, + "cache-dir": "/gcsfuse-cache/.volumes/volume-name", + "gcs-auth": map[string]interface{}{"token-url": "unix:///gcsfuse-tmp/.volumes/vol1/token.sock"}, + }, + }, { name: "should throw error when incorrect flag is passed", mc: &MountConfig{ diff --git a/pkg/webhook/config.go b/pkg/webhook/config.go index 0b69743c..38628fb2 100644 --- a/pkg/webhook/config.go +++ b/pkg/webhook/config.go @@ -27,6 +27,7 @@ import ( ) type Config struct { + HostNetwork bool `json:"-"` ContainerImage string `json:"-"` MetadataContainerImage string `json:"-"` ImagePullPolicy string `json:"-"` diff --git a/pkg/webhook/mutatingwebhook.go b/pkg/webhook/mutatingwebhook.go index 83d01f4b..089ed7f1 100644 --- a/pkg/webhook/mutatingwebhook.go +++ b/pkg/webhook/mutatingwebhook.go @@ -23,6 +23,7 @@ import ( "fmt" "net/http" + "cloud.google.com/go/compute/metadata" admissionv1 "k8s.io/api/admission/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/version" @@ -55,7 +56,7 @@ type SidecarInjector struct { } // Handle injects a gcsfuse sidecar container and a emptyDir to incoming qualified pods. -func (si *SidecarInjector) Handle(_ context.Context, req admission.Request) admission.Response { +func (si *SidecarInjector) Handle(ctx context.Context, req admission.Request) admission.Response { pod := &corev1.Pod{} if err := si.Decoder.Decode(req, pod); err != nil { @@ -94,6 +95,8 @@ func (si *SidecarInjector) Handle(_ context.Context, req admission.Request) admi return admission.Errored(http.StatusBadRequest, err) } + config.HostNetwork = pod.Spec.HostNetwork + if userProvidedGcsFuseSidecarImage, err := ExtractImageAndDeleteContainer(&pod.Spec, GcsFuseSidecarName); err == nil { if userProvidedGcsFuseSidecarImage != "" { config.ContainerImage = userProvidedGcsFuseSidecarImage @@ -110,6 +113,15 @@ func (si *SidecarInjector) Handle(_ context.Context, req admission.Request) admi // Inject container. injectSidecarContainer(pod, config, injectAsNativeSidecar) + + if pod.Spec.HostNetwork { + projectID, err := metadata.ProjectIDWithContext(ctx) + if err != nil { + return admission.Errored(http.StatusInternalServerError, fmt.Errorf("failed to get project id: %w", err)) + } + pod.Spec.Volumes = append(pod.Spec.Volumes, GetSATokenVolume(projectID)) + } + pod.Spec.Volumes = append(GetSidecarContainerVolumeSpec(pod.Spec.Volumes...), pod.Spec.Volumes...) // Log pod mutation. diff --git a/pkg/webhook/sidecar_spec.go b/pkg/webhook/sidecar_spec.go index a75bc232..d8dd884f 100644 --- a/pkg/webhook/sidecar_spec.go +++ b/pkg/webhook/sidecar_spec.go @@ -19,6 +19,7 @@ package webhook import ( "path/filepath" + "strconv" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" @@ -27,21 +28,25 @@ import ( ) const ( - GcsFuseSidecarName = "gke-gcsfuse-sidecar" - MetadataPrefetchSidecarName = "gke-gcsfuse-metadata-prefetch" - SidecarContainerTmpVolumeName = "gke-gcsfuse-tmp" - SidecarContainerTmpVolumeMountPath = "/gcsfuse-tmp" - SidecarContainerBufferVolumeName = "gke-gcsfuse-buffer" - SidecarContainerBufferVolumeMountPath = "/gcsfuse-buffer" - SidecarContainerCacheVolumeName = "gke-gcsfuse-cache" - SidecarContainerCacheVolumeMountPath = "/gcsfuse-cache" + GcsFuseSidecarName = "gke-gcsfuse-sidecar" + MetadataPrefetchSidecarName = "gke-gcsfuse-metadata-prefetch" + SidecarContainerTmpVolumeName = "gke-gcsfuse-tmp" + SidecarContainerTmpVolumeMountPath = "/gcsfuse-tmp" + SidecarContainerBufferVolumeName = "gke-gcsfuse-buffer" + SidecarContainerBufferVolumeMountPath = "/gcsfuse-buffer" + SidecarContainerCacheVolumeName = "gke-gcsfuse-cache" + SidecarContainerCacheVolumeMountPath = "/gcsfuse-cache" + SidecarContainerSATokenVolumeName = "gcsfuse-sa-token" // #nosec G101 + SidecarContainerSATokenVolumeMountPath = "/gcsfuse-sa-token" // #nosec G101 + K8STokenPath = "token" // #nosec G101 // Webhook relevant volume attributes. gcsFuseMetadataPrefetchOnMountVolumeAttribute = "gcsfuseMetadataPrefetchOnMount" // See the nonroot user discussion: https://github.com/GoogleContainerTools/distroless/issues/443 - NobodyUID = 65534 - NobodyGID = 65534 + NobodyUID = 65534 + NobodyGID = 65534 + tokenExpiryDuration = 3600 ) var ( @@ -90,6 +95,11 @@ var ( Name: SidecarContainerCacheVolumeName, MountPath: SidecarContainerCacheVolumeMountPath, } + + saTokenVolumeMount = corev1.VolumeMount{ + Name: SidecarContainerSATokenVolumeName, + MountPath: SidecarContainerSATokenVolumeMountPath, + } ) func GetNativeSidecarContainerSpec(c *Config) corev1.Container { @@ -103,6 +113,11 @@ func GetNativeSidecarContainerSpec(c *Config) corev1.Container { func GetSidecarContainerSpec(c *Config) corev1.Container { limits, requests := prepareResourceList(c) + volumeMounts := []corev1.VolumeMount{TmpVolumeMount, buffVolumeMount, cacheVolumeMount} + if c.HostNetwork { + volumeMounts = append(volumeMounts, saTokenVolumeMount) + } + // The sidecar container follows Restricted Pod Security Standard, // see https://kubernetes.io/docs/concepts/security/pod-security-standards/#restricted container := corev1.Container{ @@ -112,12 +127,13 @@ func GetSidecarContainerSpec(c *Config) corev1.Container { SecurityContext: GetSecurityContext(), Args: []string{ "--v=5", + "--host-network=" + strconv.FormatBool(c.HostNetwork), }, Resources: corev1.ResourceRequirements{ Limits: limits, Requests: requests, }, - VolumeMounts: []corev1.VolumeMount{TmpVolumeMount, buffVolumeMount, cacheVolumeMount}, + VolumeMounts: volumeMounts, } return container @@ -218,6 +234,27 @@ func (si *SidecarInjector) GetMetadataPrefetchSidecarContainerSpec(pod *corev1.P return container } +func GetSATokenVolume(projectID string) corev1.Volume { + saTokenVolume := corev1.Volume{ + Name: SidecarContainerSATokenVolumeName, + VolumeSource: corev1.VolumeSource{ + Projected: &corev1.ProjectedVolumeSource{ + Sources: []corev1.VolumeProjection{ + { + ServiceAccountToken: &corev1.ServiceAccountTokenProjection{ + Audience: projectID + ".svc.id.goog", + ExpirationSeconds: &[]int64{tokenExpiryDuration}[0], + Path: K8STokenPath, + }, + }, + }, + }, + }, + } + + return saTokenVolume +} + // GetSidecarContainerVolumeSpec returns volumes required by the sidecar container, // skipping the existing custom volumes. func GetSidecarContainerVolumeSpec(existingVolumes ...corev1.Volume) []corev1.Volume {