Skip to content

Commit

Permalink
Use projected token volume for hostNetwork pods.
Browse files Browse the repository at this point in the history
  • Loading branch information
siyanshen committed Jan 15, 2025
1 parent f1588b0 commit 3b2b99f
Show file tree
Hide file tree
Showing 13 changed files with 340 additions and 28 deletions.
150 changes: 148 additions & 2 deletions cmd/sidecar_mounter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,26 +19,41 @@ package main

import (
"context"
"encoding/json"
"flag"
"fmt"
"io/ioutil"
"net"
"net/http"
"net/url"
"os"
"os/signal"
"path"
"path/filepath"
"strconv"
"strings"
"syscall"
"time"

"cloud.google.com/go/compute/metadata"

credentials "cloud.google.com/go/iam/credentials/apiv1"
sidecarmounter "github.com/googlecloudplatform/gcs-fuse-csi-driver/pkg/sidecar_mounter"
"github.com/googlecloudplatform/gcs-fuse-csi-driver/pkg/webhook"
"golang.org/x/oauth2"
"google.golang.org/api/option"
"google.golang.org/api/sts/v1"
"k8s.io/klog/v2"
)

var (
hostNetwork = flag.Bool("host-network", false, "pod hostNetwork configuration")
gcsfusePath = flag.String("gcsfuse-path", "/gcsfuse", "gcsfuse path")
volumeBasePath = flag.String("volume-base-path", webhook.SidecarContainerTmpVolumeMountPath+"/.volumes", "volume base path")
_ = flag.Int("grace-period", 0, "grace period for gcsfuse termination. This flag has been deprecated, has no effect and will be removed in the future.")
// This is set at compile time.
version = "unknown"
version = "unknown"
tokenURLSocketPath = "/gcsfuse-tmp/.volumes/gcs-fuse-csi-ephemeral/token.sock" // #nosec G101
)

func main() {
Expand All @@ -55,12 +70,17 @@ func main() {
mounter := sidecarmounter.New(*gcsfusePath)
ctx, cancel := context.WithCancel(context.Background())

if *hostNetwork {
klog.Info("Pod has hostNetwork enabled. Starting Token Server.")
go startTokenServer(ctx)
}

for _, sp := range socketPaths {
// sleep 1.5 seconds before launch the next gcsfuse to avoid
// 1. different gcsfuse logs mixed together.
// 2. memory usage peak.
time.Sleep(1500 * time.Millisecond)
mc := sidecarmounter.NewMountConfig(sp)
mc := sidecarmounter.NewMountConfig(sp, *hostNetwork)
if mc != nil {
if err := mounter.Mount(ctx, mc); err != nil {
mc.ErrWriter.WriteMsg(fmt.Sprintf("failed to mount bucket %q for volume %q: %v\n", mc.BucketName, mc.VolumeName, err))
Expand Down Expand Up @@ -117,3 +137,129 @@ func main() {

klog.Info("exiting sidecar mounter...")
}

func getK8sToken(tokenPath string) (string, error) {
token, err := ioutil.ReadFile(tokenPath)
if err != nil {
return "", fmt.Errorf("error reading token file: %w", err)
}

return strings.TrimSpace(string(token)), nil
}

func fetchIdentityBindingToken(ctx context.Context, k8sSAToken string) (*oauth2.Token, error) {
stsService, err := sts.NewService(ctx, option.WithHTTPClient(&http.Client{}))
if err != nil {
return nil, fmt.Errorf("new STS service error: %w", err)
}

audience, err := getAudienceFromContext(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get audience from the context: %w", err)
}

stsRequest := &sts.GoogleIdentityStsV1ExchangeTokenRequest{
Audience: audience,
GrantType: "urn:ietf:params:oauth:grant-type:token-exchange",
Scope: credentials.DefaultAuthScopes()[0],
RequestedTokenType: "urn:ietf:params:oauth:token-type:access_token",
SubjectTokenType: "urn:ietf:params:oauth:token-type:jwt",
SubjectToken: k8sSAToken,
}

stsResponse, err := stsService.V1.Token(stsRequest).Do()
if err != nil {
return nil, fmt.Errorf("IdentityBindingToken exchange error with audience %q: %w", audience, err)
}

return &oauth2.Token{
AccessToken: stsResponse.AccessToken,
TokenType: stsResponse.TokenType,
Expiry: time.Now().Add(time.Second * time.Duration(stsResponse.ExpiresIn)),
}, nil
}

func getAudienceFromContext(ctx context.Context) (string, error) {
projectID, err := metadata.ProjectIDWithContext(ctx)
if err != nil {
return "", fmt.Errorf("failed to get project ID: %w", err)
}
// Get all instance metadata attributes
clusterLocation, err := metadata.InstanceAttributeValueWithContext(ctx, "cluster-location")
if err != nil {
return "", fmt.Errorf("failed to get clusterLocation: %w", err)
}
clusterName, err := metadata.InstanceAttributeValueWithContext(ctx, "cluster-name")
if err != nil {
return "", fmt.Errorf("failed to get clusterName: %w", err)
}

klog.Infof("projectID: %s, clusterName: %s, clusterLocation: %s", projectID, clusterName, clusterLocation)
onePlatformClusterResourceURL := &url.URL{
Scheme: "https",
Host: "container.googleapis.com",
Path: path.Join("v1", "projects", projectID, "locations", clusterLocation, "clusters", clusterName)}

audience := fmt.Sprintf(
"identitynamespace:%s.svc.id.goog:%s",
projectID,
onePlatformClusterResourceURL,
)
klog.Infof("audience: %s", audience)

return audience, nil
}

func startTokenServer(ctx context.Context) {
// Create a unix domain socket and listen for incoming connections.
socket, err := net.Listen("unix", tokenURLSocketPath)
if err != nil {
klog.Errorf("failed to create socket %q: %v", tokenURLSocketPath, err)

return
}
klog.Infof("created a listener using the socket path %s", tokenURLSocketPath)
mux := http.NewServeMux()
mux.HandleFunc("/", func(w http.ResponseWriter, _ *http.Request) {
klog.Infof("got request on socket path %s", tokenURLSocketPath)
ctx, cancel := context.WithCancel(ctx)
defer cancel()

k8stoken, err := getK8sToken("/gcsfuse-sa-token/token")
var stsToken *oauth2.Token
if err != nil {
klog.Errorf("failed to get k8s token from path %v", err)
w.WriteHeader(http.StatusInternalServerError)

return
}
stsToken, err = fetchIdentityBindingToken(ctx, k8stoken)
if err != nil {
klog.Errorf("failed to get sts token from path %v", err)
w.WriteHeader(http.StatusInternalServerError)

return
}
// Marshal the oauth2.Token object to JSON
jsonToken, err := json.Marshal(stsToken)
if err != nil {
klog.Errorf("failed to marshal token to JSON: %v", err)
w.WriteHeader(http.StatusInternalServerError)

return
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
fmt.Fprint(w, string(jsonToken))
})

server := http.Server{
Handler: mux,
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
}

if err := server.Serve(socket); err != nil {
klog.Errorf("failed to start the token server for %q: %v", tokenURLSocketPath, err)
}
}
2 changes: 1 addition & 1 deletion cmd/webhook/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ var (
certDir = flag.String("cert-dir", "/etc/tls-certs", "The directory that contains the server key and certificate.")
certName = flag.String("cert-name", "cert.pem", "The server certificate name.")
keyName = flag.String("key-name", "key.pem", "The server key name.")
imagePullPolicy = flag.String("sidecar-image-pull-policy", "IfNotPresent", "The default image pull policy for gcsfuse sidecar container.")
imagePullPolicy = flag.String("sidecar-image-pull-policy", "Always", "The default image pull policy for gcsfuse sidecar container.")
cpuRequest = flag.String("sidecar-cpu-request", "250m", "The default CPU request for gcsfuse sidecar container.")
cpuLimit = flag.String("sidecar-cpu-limit", "250m", "The default CPU limit for gcsfuse sidecar container.")
memoryRequest = flag.String("sidecar-memory-request", "256Mi", "The default memory request for gcsfuse sidecar container.")
Expand Down
2 changes: 1 addition & 1 deletion deploy/base/node/node.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ spec:
privileged: true
readOnlyRootFilesystem: true
image: gke.gcr.io/gcs-fuse-csi-driver
imagePullPolicy: IfNotPresent
imagePullPolicy: Always
args:
- --v=5
- --endpoint=unix:/csi/csi.sock
Expand Down
2 changes: 1 addition & 1 deletion deploy/base/node/node_setup.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,4 @@ roleRef:
name: gcs-fuse-csi-publisher-role
subjects:
- kind: ServiceAccount
name: gcsfusecsi-node-sa
name: gcsfusecsi-node-sa
6 changes: 3 additions & 3 deletions deploy/base/webhook/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ spec:
drop:
- ALL
image: gke.gcr.io/gcs-fuse-csi-driver-webhook
imagePullPolicy: IfNotPresent
imagePullPolicy: Always
args:
- --sidecar-cpu-limit=0
- --sidecar-cpu-request=250m
Expand All @@ -61,7 +61,7 @@ spec:
- --health-probe-bind-address=:22031
env:
- name: SIDECAR_IMAGE_PULL_POLICY
value: "IfNotPresent"
value: "Always"
- name: SIDECAR_IMAGE
valueFrom:
configMapKeyRef:
Expand Down Expand Up @@ -117,4 +117,4 @@ spec:
- name: metrics
protocol: TCP
port: 8080
targetPort: 22032
targetPort: 22032
1 change: 1 addition & 0 deletions pkg/cloud_provider/auth/token_sources.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ func (ts *GCPTokenSource) fetchK8sSAToken(ctx context.Context) (*oauth2.Token, e
return nil, fmt.Errorf("could not find token for the identity pool %q", ts.meta.GetIdentityPool())
}

klog.Infof("Calling Create SA token for SA %s/%s", ts.k8sSANamespace, ts.k8sSAName)
ttl := int64(10 * time.Minute.Seconds())
resp, err := ts.k8sClients.CreateServiceAccountToken(
ctx,
Expand Down
23 changes: 15 additions & 8 deletions pkg/sidecar_mounter/sidecar_mounter_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type MountConfig struct {
ErrWriter stderrWriterInterface `json:"-"`
FlagMap map[string]string `json:"-"`
ConfigFileFlagMap map[string]string `json:"-"`
HostNetwork bool `json:"-"`
}

var prometheusPort = 8080
Expand Down Expand Up @@ -91,17 +92,18 @@ var boolFlags = map[string]bool{
// 2. The file descriptor
// 3. GCS bucket name
// 4. Mount options passing to gcsfuse (passed by the csi mounter).
func NewMountConfig(sp string) *MountConfig {
func NewMountConfig(sp string, hnw bool) *MountConfig {
// socket path pattern: /gcsfuse-tmp/.volumes/<volume-name>/socket
tempDir := filepath.Dir(sp)
volumeName := filepath.Base(tempDir)
mc := MountConfig{
VolumeName: volumeName,
BufferDir: filepath.Join(webhook.SidecarContainerBufferVolumeMountPath, ".volumes", volumeName),
CacheDir: filepath.Join(webhook.SidecarContainerCacheVolumeMountPath, ".volumes", volumeName),
TempDir: tempDir,
ConfigFile: filepath.Join(webhook.SidecarContainerTmpVolumeMountPath, ".volumes", volumeName, "config.yaml"),
ErrWriter: NewErrorWriter(filepath.Join(tempDir, "error")),
VolumeName: volumeName,
BufferDir: filepath.Join(webhook.SidecarContainerBufferVolumeMountPath, ".volumes", volumeName),
CacheDir: filepath.Join(webhook.SidecarContainerCacheVolumeMountPath, ".volumes", volumeName),
TempDir: tempDir,
ConfigFile: filepath.Join(webhook.SidecarContainerTmpVolumeMountPath, ".volumes", volumeName, "config.yaml"),
ErrWriter: NewErrorWriter(filepath.Join(tempDir, "error")),
HostNetwork: hnw,
}

klog.Infof("connecting to socket %q", sp)
Expand Down Expand Up @@ -234,7 +236,7 @@ func (mc *MountConfig) prepareMountArgs() {
klog.Warningf("got invalid arguments for volume %q: %v. Will discard invalid args and continue to mount.",
invalidArgs, mc.VolumeName)
}

klog.Infof("configFileFlagMap: %+v", mc.ConfigFileFlagMap)
mc.FlagMap, mc.ConfigFileFlagMap = flagMap, configFileFlagMap
}

Expand Down Expand Up @@ -276,6 +278,11 @@ func (mc *MountConfig) prepareConfigFile() error {
}
}
}
if mc.HostNetwork {
configMap["gcs-auth"] = map[string]interface{}{
"token-url": "unix:///gcsfuse-tmp/.volumes/gcs-fuse-csi-ephemeral/token.sock",
}
}

yamlData, err := yaml.Marshal(&configMap)
if err != nil {
Expand Down
38 changes: 38 additions & 0 deletions pkg/sidecar_mounter/sidecar_mounter_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,44 @@ func TestPrepareConfigFile(t *testing.T) {
"cache-dir": "/gcsfuse-cache/.volumes/volume-name",
},
},
{
name: "should create valid config file when hostnetwork is enabled",
mc: &MountConfig{
ConfigFile: "./test-config-file.yaml",
ConfigFileFlagMap: map[string]string{
"logging:file-path": "/dev/fd/1",
"logging:format": "json",
"logging:severity": "error",
"write:create-empty-file": "true",
"file-cache:max-size-mb": "10000",
"file-cache:cache-file-for-range-read": "true",
"metadata-cache:stat-cache-max-size-mb": "1000",
"metadata-cache:type-cache-max-size-mb": "-1",
"cache-dir": "/gcsfuse-cache/.volumes/volume-name",
},
HostNetwork: true,
},
expectedConfig: map[string]interface{}{
"logging": map[string]interface{}{
"file-path": "/dev/fd/1",
"format": "json",
"severity": "error",
},
"write": map[string]interface{}{
"create-empty-file": true,
},
"file-cache": map[string]interface{}{
"max-size-mb": 10000,
"cache-file-for-range-read": true,
},
"metadata-cache": map[string]interface{}{
"stat-cache-max-size-mb": 1000,
"type-cache-max-size-mb": -1,
},
"cache-dir": "/gcsfuse-cache/.volumes/volume-name",
"gcs-auth": map[string]interface{}{"token-url": "unix:///gcsfuse-tmp/.volumes/gcs-fuse-csi-ephemeral/token.sock"},
},
},
{
name: "should throw error when incorrect flag is passed",
mc: &MountConfig{
Expand Down
38 changes: 38 additions & 0 deletions pkg/sidecar_mounter/token_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
Copyright 2025 The Kubernetes Authors.
Copyright 2025 Google LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package sidecarmounter

import (
"golang.org/x/oauth2"
)

type TokenManager interface {
GetTokenSource(token *oauth2.Token) oauth2.TokenSource
}

type tokenManager struct{}

func NewTokenManager() TokenManager {
return &tokenManager{}
}

func (tm *tokenManager) GetTokenSource(token *oauth2.Token) oauth2.TokenSource {
return &TokenSource{
token: token,
}
}
Loading

0 comments on commit 3b2b99f

Please sign in to comment.