Use projected token volume for hostNetwork pods. #428

Merged
merged 1 commit into from
Feb 5, 2025
3 changes: 2 additions & 1 deletion cmd/sidecar_mounter/main.go
@@ -34,6 +34,7 @@ import (
)

var (
hostNetwork = flag.Bool("host-network", false, "pod hostNetwork configuration")
Collaborator

If this goes inside mounter.Mount, do we need this flag? I think we pass (or can pass) hostNetwork flag through mountConfig.

Collaborator

This can simplify the sidecar injection logic too.

Collaborator Author

We still need this flag to pass the value to the mount config.

Collaborator

@hime hime Feb 1, 2025

You are using the new HostNetwork flag to:

  1. set mc.HostNetwork
  2. set up gcs-auth:token-url:...

To avoid creating flags, we can determine whether the Pod supports this feature in the gcs-fuse-csi-driver container by checking that hostNetwork is enabled on the pod and that the volume we need is injected. We have a pod informer that has that information.

pod, err := s.k8sClients.GetPod(vc[VolumeContextKeyPodNamespace], vc[VolumeContextKeyPodName])

We can then pass the mountConfig from gcs-fuse-csi-driver -> gke-gcsfuse-sidecar

driver sending mc to sidecar:

mc := sidecarmounter.MountConfig{

sidecar receiving mc from driver:

if err := json.Unmarshal(msg, &mc); err != nil {

After that, we can process mc to set the configmap with the uds path

func (mc *MountConfig) prepareMountArgs() {
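
To make the suggestion above concrete, here is a minimal sketch of a JSON round trip that carries a hostNetwork value from the driver to the sidecar. The `mountConfig` type, its JSON tags, and both helper functions are hypothetical stand-ins and are not the project's code; note that the `HostNetwork` field added in this PR is tagged `json:"-"`, so it is not serialized, and the thread below ends up keeping the flag-based design.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// mountConfig is a hypothetical, trimmed-down stand-in for
// sidecarmounter.MountConfig. The field set and JSON tags are assumptions.
type mountConfig struct {
	BucketName  string   `json:"bucketName,omitempty"`
	Options     []string `json:"options,omitempty"`
	HostNetwork bool     `json:"hostNetwork,omitempty"`
}

// encodeForSidecar mimics the driver side: serialize the config before
// writing it to the socket.
func encodeForSidecar(mc mountConfig) ([]byte, error) {
	return json.Marshal(mc)
}

// decodeFromDriver mimics the sidecar side: parse the bytes read from the socket.
func decodeFromDriver(msg []byte) (*mountConfig, error) {
	var mc mountConfig
	if err := json.Unmarshal(msg, &mc); err != nil {
		return nil, fmt.Errorf("failed to unmarshal mount config: %w", err)
	}
	return &mc, nil
}

func main() {
	msg, err := encodeForSidecar(mountConfig{
		BucketName:  "example-bucket",
		Options:     []string{"implicit-dirs"},
		HostNetwork: true,
	})
	if err != nil {
		panic(err)
	}
	mc, err := decodeFromDriver(msg)
	if err != nil {
		panic(err)
	}
	fmt.Printf("bucket=%s hostNetwork=%t\n", mc.BucketName, mc.HostNetwork)
}
```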

Collaborator Author

@siyanshen siyanshen Feb 5, 2025

Hey Jaime, thanks for the suggestion. Somehow the pod returned by k8sclients.GetPod(...) does not preserve the HostNetwork value: it is "false" even though my test pod sets hostNetwork to true. It might be worthwhile to derive HostNetwork from pod.Annotations["kubectl.kubernetes.io/last-applied-configuration"], but that could get very messy.

This is the last-applied annotation:
{"apiVersion":"v1","kind":"Pod","metadata":{"annotations":{"gke-gcsfuse/volumes":"true"},"name":"test-pv-pod-hnw","namespace":"ns1"},"spec":{"containers":[{"command":["sleep","3600"],"image":"busybox","name":"busybox","volumeMounts":[{"mountPath":"/data","name":"gcp-cloud-storage-pvc"},{"mountPath":"/dataEph","name":"gcs-fuse-csi-ephemeral"}]}],"hostNetwork":true,"serviceAccountName":"test-ksa-ns1","volumes":[{"name":"gcp-cloud-storage-pvc","persistentVolumeClaim":{"claimName":"gcp-cloud-storage-csi-static-pvc"}},{"csi":{"driver":"gcsfuse.csi.storage.gke.io","volumeAttributes":{"bucketName":"test-wi-host-network-2","mountOptions":"implicit-dirs"}},"name":"gcs-fuse-csi-ephemeral"}]}}

So far the cleanest and safest solution is passing the bool flag from the webhook. What do you think? Let me know.
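
For reference, the annotation-based fallback mentioned above could look roughly like the sketch below. The function name `hostNetworkFromLastApplied` is hypothetical. One reason this route is fragile: the annotation is written by `kubectl apply`, so pods created by other means (controllers, `kubectl create`) would not carry it.

```go
package main

import (
	"encoding/json"
	"fmt"
)

const lastAppliedAnnotation = "kubectl.kubernetes.io/last-applied-configuration"

// hostNetworkFromLastApplied (hypothetical helper) reads spec.hostNetwork out
// of the JSON manifest stored in the last-applied-configuration annotation.
func hostNetworkFromLastApplied(annotations map[string]string) (bool, error) {
	raw, ok := annotations[lastAppliedAnnotation]
	if !ok {
		return false, fmt.Errorf("annotation %q is not set", lastAppliedAnnotation)
	}

	// Only the field we care about is decoded; everything else is ignored.
	var applied struct {
		Spec struct {
			HostNetwork bool `json:"hostNetwork"`
		} `json:"spec"`
	}
	if err := json.Unmarshal([]byte(raw), &applied); err != nil {
		return false, fmt.Errorf("failed to parse %s: %w", lastAppliedAnnotation, err)
	}

	return applied.Spec.HostNetwork, nil
}

func main() {
	annotations := map[string]string{
		lastAppliedAnnotation: `{"apiVersion":"v1","kind":"Pod","spec":{"hostNetwork":true}}`,
	}
	hnw, err := hostNetworkFromLastApplied(annotations)
	if err != nil {
		panic(err)
	}
	fmt.Println("hostNetwork:", hnw)
}
```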

Collaborator Author

Synced with Jaime offline. PodSpec was filtered in this PR: #413

This approach proves infeasible even after adding hostNetwork back to the podSpec. In the CSI driver, the msg is sent to the listener on the CSI path, which looks like "/var/lib/kubelet/pods/c9a5236b-cf41-49c0-bcfe-f61988440b8f/volumes/kubernetes.io~csi/gcs-fuse-csi-ephemeral/mount". The msg received in the sidecar mounter arrives on the volume socket path, as in the log line: connecting to socket "/gcsfuse-tmp/.volumes/gcs-fuse-csi-ephemeral/socket". They are not the same msg.

Will stick to the original design.

Collaborator

@siyanshen Do we know how all the other options make their way from the node-server to the sidecar? Regarding the different paths being used, I believe we are using a symbolic link for socket communication. Could you clarify this, so we know whether to follow a different approach going forward?

// Create socket base path.
// Need to create symbolic link of emptyDirBasePath to socketBasePath,
// because the socket absolute path is longer than 104 characters,
// which will cause "bind: invalid argument" errors.
socketBasePath := util.GetSocketBasePath(target, m.fuseSocketDir)
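
The symbolic-link workaround described in that code comment can be sketched as follows. This is an illustration only, not the driver's implementation: `listenViaSymlink` and `demo` are hypothetical names, and the 120-character directory name is chosen simply to exceed the unix socket path limit (104 bytes on some platforms, 108 on Linux).

```go
package main

import (
	"fmt"
	"net"
	"os"
	"path/filepath"
	"strings"
)

// listenViaSymlink creates linkPath as a symbolic link to longDir and binds
// the unix socket through the link, so the path passed to bind(2) is short.
func listenViaSymlink(longDir, linkPath, socketName string) (net.Listener, error) {
	if err := os.Symlink(longDir, linkPath); err != nil {
		return nil, fmt.Errorf("failed to link %q to %q: %w", linkPath, longDir, err)
	}

	return net.Listen("unix", filepath.Join(linkPath, socketName))
}

// demo builds an overly long directory, reports whether binding there
// directly fails, then binds through a short symlink instead.
func demo() (bool, error) {
	base, err := os.MkdirTemp("", "sock")
	if err != nil {
		return false, err
	}
	defer os.RemoveAll(base)

	longDir := filepath.Join(base, strings.Repeat("d", 120))
	if err := os.Mkdir(longDir, 0o755); err != nil {
		return false, err
	}

	directFailed := false
	if l, derr := net.Listen("unix", filepath.Join(longDir, "socket")); derr != nil {
		directFailed = true
	} else {
		l.Close()
	}

	l, err := listenViaSymlink(longDir, filepath.Join(base, "s"), "socket")
	if err != nil {
		return directFailed, err
	}
	defer l.Close()

	return directFailed, nil
}

func main() {
	directFailed, err := demo()
	if err != nil {
		panic(err)
	}
	fmt.Println("direct bind on long path failed:", directFailed)
	fmt.Println("bind through symlink succeeded")
}
```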

gcsfusePath = flag.String("gcsfuse-path", "/gcsfuse", "gcsfuse path")
volumeBasePath = flag.String("volume-base-path", webhook.SidecarContainerTmpVolumeMountPath+"/.volumes", "volume base path")
_ = flag.Int("grace-period", 0, "grace period for gcsfuse termination. This flag has been deprecated, has no effect and will be removed in the future.")
@@ -60,7 +61,7 @@ func main() {
// 1. different gcsfuse logs mixed together.
// 2. memory usage peak.
time.Sleep(1500 * time.Millisecond)
- mc := sidecarmounter.NewMountConfig(sp)
+ mc := sidecarmounter.NewMountConfig(sp, *hostNetwork)
if mc != nil {
if err := mounter.Mount(ctx, mc); err != nil {
mc.ErrWriter.WriteMsg(fmt.Sprintf("failed to mount bucket %q for volume %q: %v\n", mc.BucketName, mc.VolumeName, err))
2 changes: 1 addition & 1 deletion deploy/base/node/node_setup.yaml
@@ -50,4 +50,4 @@ roleRef:
name: gcs-fuse-csi-publisher-role
subjects:
- kind: ServiceAccount
name: gcsfusecsi-node-sa
name: gcsfusecsi-node-sa
2 changes: 1 addition & 1 deletion deploy/base/webhook/deployment.yaml
@@ -117,4 +117,4 @@ spec:
- name: metrics
protocol: TCP
port: 8080
targetPort: 22032
targetPort: 22032
143 changes: 143 additions & 0 deletions pkg/sidecar_mounter/sidecar_mounter.go
@@ -20,19 +20,29 @@ package sidecarmounter
import (
"bufio"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net"
"net/http"
"net/url"
"os"
"os/exec"
"path"
"path/filepath"
"strings"
"sync"
"syscall"
"time"

"cloud.google.com/go/compute/metadata"
credentials "cloud.google.com/go/iam/credentials/apiv1"
"github.com/googlecloudplatform/gcs-fuse-csi-driver/pkg/metrics"
"github.com/googlecloudplatform/gcs-fuse-csi-driver/pkg/webhook"
"golang.org/x/oauth2"
"google.golang.org/api/option"
"google.golang.org/api/sts/v1"
"k8s.io/klog/v2"
)

@@ -53,6 +63,13 @@ func New(mounterPath string) *Mounter {
}

func (m *Mounter) Mount(ctx context.Context, mc *MountConfig) error {
// Start the token server for HostNetwork enabled pods.
if mc.HostNetwork {
tp := filepath.Join(mc.TempDir, TokenFileName)
klog.Infof("Pod has hostNetwork enabled. Starting Token Server on %s.", tp)
go StartTokenServer(ctx, tp)
}

klog.Infof("start to mount bucket %q for volume %q", mc.BucketName, mc.VolumeName)

if err := os.MkdirAll(mc.BufferDir+TempDir, os.ModePerm); err != nil {
@@ -275,3 +292,129 @@ func scrapeMetrics(ctx context.Context, metricEndpoint string, w http.ResponseWr

return nil
}

func getK8sTokenFromFile(tokenPath string) (string, error) {
token, err := os.ReadFile(tokenPath)
if err != nil {
return "", fmt.Errorf("error reading token file: %w", err)
}

return strings.TrimSpace(string(token)), nil
}

func fetchIdentityBindingToken(ctx context.Context, k8sSAToken string) (*oauth2.Token, error) {
stsService, err := sts.NewService(ctx, option.WithHTTPClient(&http.Client{}))
if err != nil {
return nil, fmt.Errorf("new STS service error: %w", err)
}

audience, err := getAudienceFromContext(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get audience from the context: %w", err)
}

stsRequest := &sts.GoogleIdentityStsV1ExchangeTokenRequest{
Audience: audience,
GrantType: "urn:ietf:params:oauth:grant-type:token-exchange",
Scope: credentials.DefaultAuthScopes()[0],
RequestedTokenType: "urn:ietf:params:oauth:token-type:access_token",
SubjectTokenType: "urn:ietf:params:oauth:token-type:jwt",
SubjectToken: k8sSAToken,
}

stsResponse, err := stsService.V1.Token(stsRequest).Do()
if err != nil {
return nil, fmt.Errorf("IdentityBindingToken exchange error with audience %q: %w", audience, err)
}

return &oauth2.Token{
AccessToken: stsResponse.AccessToken,
TokenType: stsResponse.TokenType,
Expiry: time.Now().Add(time.Second * time.Duration(stsResponse.ExpiresIn)),
}, nil
}

func getAudienceFromContext(ctx context.Context) (string, error) {
projectID, err := metadata.ProjectIDWithContext(ctx)
if err != nil {
return "", fmt.Errorf("failed to get project ID: %w", err)
}
// Get all instance metadata attributes
clusterLocation, err := metadata.InstanceAttributeValueWithContext(ctx, "cluster-location")
if err != nil {
return "", fmt.Errorf("failed to get clusterLocation: %w", err)
}
clusterName, err := metadata.InstanceAttributeValueWithContext(ctx, "cluster-name")
if err != nil {
return "", fmt.Errorf("failed to get clusterName: %w", err)
}

klog.Infof("projectID: %s, clusterName: %s, clusterLocation: %s", projectID, clusterName, clusterLocation)
onePlatformClusterResourceURL := &url.URL{
Scheme: "https",
Host: "container.googleapis.com",
Path: path.Join("v1", "projects", projectID, "locations", clusterLocation, "clusters", clusterName),
}

audience := fmt.Sprintf(
"identitynamespace:%s.svc.id.goog:%s",
projectID,
onePlatformClusterResourceURL,
)
klog.Infof("audience: %s", audience)

return audience, nil
}

func StartTokenServer(ctx context.Context, tokenURLSocketPath string) {
// Create a unix domain socket and listen for incoming connections.
tokenSocketListener, err := net.Listen("unix", tokenURLSocketPath)
if err != nil {
klog.Errorf("failed to create socket %q: %v", tokenURLSocketPath, err)

return
}
klog.Infof("created a listener using the socket path %s", tokenURLSocketPath)
mux := http.NewServeMux()
mux.HandleFunc("/", func(w http.ResponseWriter, _ *http.Request) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

k8stoken, err := getK8sTokenFromFile(webhook.SidecarContainerSATokenVolumeMountPath + "/" + webhook.K8STokenPath)
var stsToken *oauth2.Token
if err != nil {
klog.Errorf("failed to get k8s token from path %v", err)
w.WriteHeader(http.StatusInternalServerError)

return
}
stsToken, err = fetchIdentityBindingToken(ctx, k8stoken)
if err != nil {
klog.Errorf("failed to get sts token from path %v", err)
w.WriteHeader(http.StatusInternalServerError)

return
}
// Marshal the oauth2.Token object to JSON
jsonToken, err := json.Marshal(stsToken)
if err != nil {
klog.Errorf("failed to marshal token to JSON: %v", err)
w.WriteHeader(http.StatusInternalServerError)

return
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
fmt.Fprint(w, string(jsonToken))
})

server := http.Server{
Handler: mux,
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
}

if err := server.Serve(tokenSocketListener); !errors.Is(err, http.ErrServerClosed) {
klog.Errorf("Server for %q returns unexpected error: %v", tokenURLSocketPath, err)
}
}
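
For readers unfamiliar with serving HTTP over a unix domain socket, the sketch below shows how a client could fetch the JSON token from a server like `StartTokenServer`. It is an assumption-laden illustration: `fetchToken`, `serveStubToken`, and `demo` are hypothetical names, the stub server stands in for the real STS exchange, and nothing here claims to show how gcsfuse itself consumes `token-url`. The `access_token` and `token_type` keys follow the JSON tags of `oauth2.Token`.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"net"
	"net/http"
	"os"
	"path/filepath"
	"time"
)

// fetchToken sends GET / to an HTTP server listening on a unix socket and
// decodes the JSON body. The URL host is a placeholder; the dialer ignores it.
func fetchToken(ctx context.Context, socketPath string) (map[string]any, error) {
	client := &http.Client{
		Timeout: 10 * time.Second,
		Transport: &http.Transport{
			DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) {
				var d net.Dialer
				return d.DialContext(ctx, "unix", socketPath)
			},
		},
	}
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, "http://unix/", nil)
	if err != nil {
		return nil, err
	}
	resp, err := client.Do(req)
	if err != nil {
		return nil, fmt.Errorf("token request failed: %w", err)
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("token server returned status %d", resp.StatusCode)
	}
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, err
	}
	var tok map[string]any
	if err := json.Unmarshal(body, &tok); err != nil {
		return nil, fmt.Errorf("failed to decode token: %w", err)
	}
	return tok, nil
}

// serveStubToken starts a stand-in token server that returns a fixed token.
func serveStubToken(socketPath string) (func(), error) {
	l, err := net.Listen("unix", socketPath)
	if err != nil {
		return nil, err
	}
	srv := &http.Server{Handler: http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		w.Header().Set("Content-Type", "application/json")
		fmt.Fprint(w, `{"access_token":"stub-token","token_type":"Bearer"}`)
	})}
	go srv.Serve(l)
	return func() { srv.Close() }, nil
}

// demo wires the stub server and the client together on a temporary socket.
func demo() (string, error) {
	dir, err := os.MkdirTemp("", "tok")
	if err != nil {
		return "", err
	}
	defer os.RemoveAll(dir)
	sock := filepath.Join(dir, "token.sock")
	stop, err := serveStubToken(sock)
	if err != nil {
		return "", err
	}
	defer stop()
	tok, err := fetchToken(context.Background(), sock)
	if err != nil {
		return "", err
	}
	accessToken, _ := tok["access_token"].(string)
	return accessToken, nil
}

func main() {
	accessToken, err := demo()
	if err != nil {
		panic(err)
	}
	fmt.Println("access token:", accessToken)
}
```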
28 changes: 18 additions & 10 deletions pkg/sidecar_mounter/sidecar_mounter_config.go
@@ -35,8 +35,10 @@ import (
)

const (
- GCSFuseAppName = "gke-gcs-fuse-csi"
- TempDir = "/temp-dir"
+ GCSFuseAppName = "gke-gcs-fuse-csi"
+ TempDir = "/temp-dir"
+ unixSocketBasePath = "unix://"
+ TokenFileName = "token.sock" // #nosec G101
)

// MountConfig contains the information gcsfuse needs.
@@ -52,6 +54,7 @@ type MountConfig struct {
ErrWriter stderrWriterInterface `json:"-"`
FlagMap map[string]string `json:"-"`
ConfigFileFlagMap map[string]string `json:"-"`
HostNetwork bool `json:"-"`
}

var prometheusPort = 8080
@@ -91,17 +94,18 @@ var boolFlags = map[string]bool{
// 2. The file descriptor
// 3. GCS bucket name
// 4. Mount options passing to gcsfuse (passed by the csi mounter).
- func NewMountConfig(sp string) *MountConfig {
+ func NewMountConfig(sp string, hnw bool) *MountConfig {
// socket path pattern: /gcsfuse-tmp/.volumes/<volume-name>/socket
tempDir := filepath.Dir(sp)
volumeName := filepath.Base(tempDir)
mc := MountConfig{
- VolumeName: volumeName,
- BufferDir: filepath.Join(webhook.SidecarContainerBufferVolumeMountPath, ".volumes", volumeName),
- CacheDir: filepath.Join(webhook.SidecarContainerCacheVolumeMountPath, ".volumes", volumeName),
- TempDir: tempDir,
- ConfigFile: filepath.Join(webhook.SidecarContainerTmpVolumeMountPath, ".volumes", volumeName, "config.yaml"),
- ErrWriter: NewErrorWriter(filepath.Join(tempDir, "error")),
+ VolumeName: volumeName,
+ BufferDir: filepath.Join(webhook.SidecarContainerBufferVolumeMountPath, ".volumes", volumeName),
+ CacheDir: filepath.Join(webhook.SidecarContainerCacheVolumeMountPath, ".volumes", volumeName),
+ TempDir: tempDir,
+ ConfigFile: filepath.Join(webhook.SidecarContainerTmpVolumeMountPath, ".volumes", volumeName, "config.yaml"),
+ ErrWriter: NewErrorWriter(filepath.Join(tempDir, "error")),
+ HostNetwork: hnw,
}

klog.Infof("connecting to socket %q", sp)
@@ -234,7 +238,6 @@ func (mc *MountConfig) prepareMountArgs() {
klog.Warningf("got invalid arguments for volume %q: %v. Will discard invalid args and continue to mount.",
invalidArgs, mc.VolumeName)
}

mc.FlagMap, mc.ConfigFileFlagMap = flagMap, configFileFlagMap
}

@@ -276,6 +279,11 @@ func (mc *MountConfig) prepareConfigFile() error {
}
}
}
if mc.HostNetwork {
configMap["gcs-auth"] = map[string]interface{}{
"token-url": unixSocketBasePath + filepath.Join(mc.TempDir, TokenFileName),
}
}

yamlData, err := yaml.Marshal(&configMap)
if err != nil {
39 changes: 39 additions & 0 deletions pkg/sidecar_mounter/sidecar_mounter_config_test.go
@@ -311,6 +311,45 @@ func TestPrepareConfigFile(t *testing.T) {
"cache-dir": "/gcsfuse-cache/.volumes/volume-name",
},
},
{
name: "should create valid config file when hostnetwork is enabled",
mc: &MountConfig{
ConfigFile: "./test-config-file.yaml",
TempDir: "/gcsfuse-tmp/.volumes/vol1",
ConfigFileFlagMap: map[string]string{
"logging:file-path": "/dev/fd/1",
"logging:format": "json",
"logging:severity": "error",
"write:create-empty-file": "true",
"file-cache:max-size-mb": "10000",
"file-cache:cache-file-for-range-read": "true",
"metadata-cache:stat-cache-max-size-mb": "1000",
"metadata-cache:type-cache-max-size-mb": "-1",
"cache-dir": "/gcsfuse-cache/.volumes/volume-name",
},
HostNetwork: true,
},
expectedConfig: map[string]interface{}{
"logging": map[string]interface{}{
"file-path": "/dev/fd/1",
"format": "json",
"severity": "error",
},
"write": map[string]interface{}{
"create-empty-file": true,
},
"file-cache": map[string]interface{}{
"max-size-mb": 10000,
"cache-file-for-range-read": true,
},
"metadata-cache": map[string]interface{}{
"stat-cache-max-size-mb": 1000,
"type-cache-max-size-mb": -1,
},
"cache-dir": "/gcsfuse-cache/.volumes/volume-name",
"gcs-auth": map[string]interface{}{"token-url": "unix:///gcsfuse-tmp/.volumes/vol1/token.sock"},
},
},
{
name: "should throw error when incorrect flag is passed",
mc: &MountConfig{
1 change: 1 addition & 0 deletions pkg/webhook/config.go
@@ -27,6 +27,7 @@ import (
)

type Config struct {
HostNetwork bool `json:"-"`
ContainerImage string `json:"-"`
MetadataContainerImage string `json:"-"`
ImagePullPolicy string `json:"-"`
14 changes: 13 additions & 1 deletion pkg/webhook/mutatingwebhook.go
@@ -23,6 +23,7 @@ import (
"fmt"
"net/http"

"cloud.google.com/go/compute/metadata"
admissionv1 "k8s.io/api/admission/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/version"
@@ -55,7 +56,7 @@ type SidecarInjector struct {
}

// Handle injects a gcsfuse sidecar container and a emptyDir to incoming qualified pods.
- func (si *SidecarInjector) Handle(_ context.Context, req admission.Request) admission.Response {
+ func (si *SidecarInjector) Handle(ctx context.Context, req admission.Request) admission.Response {
pod := &corev1.Pod{}

if err := si.Decoder.Decode(req, pod); err != nil {
@@ -94,6 +95,8 @@ func (si *SidecarInjector) Handle(_ context.Context, req admission.Request) admi
return admission.Errored(http.StatusBadRequest, err)
}

config.HostNetwork = pod.Spec.HostNetwork

if userProvidedGcsFuseSidecarImage, err := ExtractImageAndDeleteContainer(&pod.Spec, GcsFuseSidecarName); err == nil {
if userProvidedGcsFuseSidecarImage != "" {
config.ContainerImage = userProvidedGcsFuseSidecarImage
@@ -110,6 +113,15 @@ func (si *SidecarInjector) Handle(_ context.Context, req admission.Request) admi

// Inject container.
injectSidecarContainer(pod, config, injectAsNativeSidecar)

if pod.Spec.HostNetwork {
projectID, err := metadata.ProjectIDWithContext(ctx)
if err != nil {
return admission.Errored(http.StatusInternalServerError, fmt.Errorf("failed to get project id: %w", err))
}
pod.Spec.Volumes = append(pod.Spec.Volumes, GetSATokenVolume(projectID))
}

pod.Spec.Volumes = append(GetSidecarContainerVolumeSpec(pod.Spec.Volumes...), pod.Spec.Volumes...)

// Log pod mutation.