Skip to content

Commit

Permalink
Merge pull request #1164 from MartinForReal/shafan/token
Browse files Browse the repository at this point in the history
Refactor: Extract kubeclient from cloud provider
andyzhangx authored Dec 13, 2023
2 parents dedcb67 + 486e50b commit 9bd2958
Showing 9 changed files with 196 additions and 184 deletions.
42 changes: 5 additions & 37 deletions pkg/blob/azure.go
Original file line number Diff line number Diff line change
@@ -17,7 +17,6 @@ limitations under the License.
package blob

import (
"errors"
"fmt"
"os"
"strings"
@@ -27,9 +26,7 @@ import (
"github.com/Azure/azure-sdk-for-go/storage"
"github.com/Azure/go-autorest/autorest"
"golang.org/x/net/context"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
"k8s.io/utils/pointer"
"sigs.k8s.io/cloud-provider-azure/pkg/azclient/configloader"
@@ -45,36 +42,20 @@ var (

// IsAzureStackCloud decides whether the driver is running on Azure Stack Cloud.
func IsAzureStackCloud(cloud *azure.Cloud) bool {
return !cloud.Config.DisableAzureStackCloud && strings.EqualFold(cloud.Config.Cloud, "AZURESTACKCLOUD")
return !cloud.DisableAzureStackCloud && strings.EqualFold(cloud.Cloud, "AZURESTACKCLOUD")
}

// getCloudProvider get Azure Cloud Provider
func GetCloudProvider(ctx context.Context, kubeconfig, nodeID, secretName, secretNamespace, userAgent string, allowEmptyCloudConfig bool, kubeAPIQPS float64, kubeAPIBurst int) (*azure.Cloud, error) {
func GetCloudProvider(ctx context.Context, kubeClient kubernetes.Interface, nodeID, secretName, secretNamespace, userAgent string, allowEmptyCloudConfig bool) (*azure.Cloud, error) {
var (
config *azure.Config
kubeClient *clientset.Clientset
fromSecret bool
err error
)

az := &azure.Cloud{}
az.Environment.StorageEndpointSuffix = storage.DefaultBaseURL

kubeCfg, err := getKubeConfig(kubeconfig)
if err == nil && kubeCfg != nil {
// set QPS and QPS Burst as higher values
klog.V(2).Infof("set QPS(%f) and QPS Burst(%d) for driver kubeClient", float32(kubeAPIQPS), kubeAPIBurst)
kubeCfg.QPS = float32(kubeAPIQPS)
kubeCfg.Burst = kubeAPIBurst
if kubeClient, err = clientset.NewForConfig(kubeCfg); err != nil {
klog.Warningf("NewForConfig failed with error: %v", err)
}
} else {
klog.Warningf("get kubeconfig(%s) failed with error: %v", kubeconfig, err)
if !os.IsNotExist(err) && !errors.Is(err, rest.ErrNotInCluster) {
return az, fmt.Errorf("failed to get KubeClient: %w", err)
}
}

if kubeClient != nil {
az.KubeClient = kubeClient
klog.V(2).Infof("reading cloud config from secret %s/%s", secretNamespace, secretName)
@@ -180,7 +161,7 @@ func (d *Driver) initializeKvClient() (*kv.BaseClient, error) {
func (d *Driver) getKeyvaultToken() (authorizer autorest.Authorizer, err error) {
env := d.cloud.Environment
kvEndPoint := strings.TrimSuffix(env.KeyVaultEndpoint, "/")
servicePrincipalToken, err := providerconfig.GetServicePrincipalToken(&d.cloud.Config.AzureAuthConfig, &env, kvEndPoint)
servicePrincipalToken, err := providerconfig.GetServicePrincipalToken(&d.cloud.AzureAuthConfig, &env, kvEndPoint)
if err != nil {
return nil, err
}
@@ -255,16 +236,3 @@ func (d *Driver) updateSubnetServiceEndpoints(ctx context.Context, vnetResourceG

return nil
}

func getKubeConfig(kubeconfig string) (config *rest.Config, err error) {
if kubeconfig != "" {
if config, err = clientcmd.BuildConfigFromFlags("", kubeconfig); err != nil {
return nil, err
}
} else {
if config, err = rest.InClusterConfig(); err != nil {
return nil, err
}
}
return config, err
}
95 changes: 10 additions & 85 deletions pkg/blob/azure_test.go
Original file line number Diff line number Diff line change
@@ -31,6 +31,7 @@ import (
"github.com/Azure/go-autorest/autorest/azure"
"github.com/stretchr/testify/assert"

"sigs.k8s.io/blob-csi-driver/pkg/util"
"sigs.k8s.io/cloud-provider-azure/pkg/azureclients/subnetclient/mocksubnetclient"
azureprovider "sigs.k8s.io/cloud-provider-azure/pkg/provider"

@@ -114,7 +115,7 @@ users:
kubeconfig: emptyKubeConfig,
nodeID: "",
allowEmptyCloudConfig: true,
expectedErr: fmt.Errorf("failed to get KubeClient: invalid configuration: no configuration has been provided, try setting KUBERNETES_MASTER environment variable"),
expectedErr: fmt.Errorf("invalid configuration: no configuration has been provided, try setting KUBERNETES_MASTER environment variable"),
},
{
desc: "[failure] out of cluster & in cluster, specify a fake kubeconfig, no credential file",
@@ -168,7 +169,14 @@ users:
}
os.Setenv(DefaultAzureCredentialFileEnv, fakeCredFile)
}
cloud, err := GetCloudProvider(context.Background(), test.kubeconfig, test.nodeID, "", "", test.userAgent, test.allowEmptyCloudConfig, 25.0, 50)
kubeClient, err := util.GetKubeClient(test.kubeconfig, 25.0, 50, "")
if err != nil {
if !reflect.DeepEqual(err, test.expectedErr) && test.expectedErr != nil && !strings.Contains(err.Error(), test.expectedErr.Error()) {
t.Errorf("desc: %s,\n input: %q, GetCloudProvider err: %v, expectedErr: %v", test.desc, test.kubeconfig, err, test.expectedErr)
}
continue
}
cloud, err := GetCloudProvider(context.Background(), kubeClient, test.nodeID, "", "", test.userAgent, test.allowEmptyCloudConfig)
if !reflect.DeepEqual(err, test.expectedErr) && test.expectedErr != nil && !strings.Contains(err.Error(), test.expectedErr.Error()) {
t.Errorf("desc: %s,\n input: %q, GetCloudProvider err: %v, expectedErr: %v", test.desc, test.kubeconfig, err, test.expectedErr)
}
@@ -374,86 +382,3 @@ func TestUpdateSubnetServiceEndpoints(t *testing.T) {
t.Run(tc.name, tc.testFunc)
}
}

func TestGetKubeConfig(t *testing.T) {
emptyKubeConfig := "empty-Kube-Config"
validKubeConfig := "valid-Kube-Config"
fakeContent := `
apiVersion: v1
clusters:
- cluster:
server: https://localhost:8080
name: foo-cluster
contexts:
- context:
cluster: foo-cluster
user: foo-user
namespace: bar
name: foo-context
current-context: foo-context
kind: Config
users:
- name: foo-user
user:
exec:
apiVersion: client.authentication.k8s.io/v1beta1
args:
- arg-1
- arg-2
command: foo-command
`
err := createTestFile(emptyKubeConfig)
if err != nil {
t.Error(err)
}
defer func() {
if err := os.Remove(emptyKubeConfig); err != nil {
t.Error(err)
}
}()

err = createTestFile(validKubeConfig)
if err != nil {
t.Error(err)
}
defer func() {
if err := os.Remove(validKubeConfig); err != nil {
t.Error(err)
}
}()

if err := os.WriteFile(validKubeConfig, []byte(fakeContent), 0666); err != nil {
t.Error(err)
}

tests := []struct {
desc string
kubeconfig string
expectError bool
envVariableHasConfig bool
envVariableConfigIsValid bool
}{
{
desc: "[success] valid kube config passed",
kubeconfig: validKubeConfig,
expectError: false,
envVariableHasConfig: false,
envVariableConfigIsValid: false,
},
{
desc: "[failure] invalid kube config passed",
kubeconfig: emptyKubeConfig,
expectError: true,
envVariableHasConfig: false,
envVariableConfigIsValid: false,
},
}

for _, test := range tests {
_, err := getKubeConfig(test.kubeconfig)
receiveError := (err != nil)
if test.expectError != receiveError {
t.Errorf("desc: %s,\n input: %q, GetCloudProvider err: %v, expectErr: %v", test.desc, test.kubeconfig, err, test.expectError)
}
}
}
30 changes: 25 additions & 5 deletions pkg/blob/blob.go
Original file line number Diff line number Diff line change
@@ -17,6 +17,8 @@ limitations under the License.
package blob

import (
"context"
"flag"
"fmt"
"os"
"strconv"
@@ -29,8 +31,6 @@ import (
az "github.com/Azure/go-autorest/autorest/azure"
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/pborman/uuid"
"golang.org/x/net/context"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -43,6 +43,7 @@ import (
csicommon "sigs.k8s.io/blob-csi-driver/pkg/csi-common"
"sigs.k8s.io/blob-csi-driver/pkg/util"
azcache "sigs.k8s.io/cloud-provider-azure/pkg/cache"
"sigs.k8s.io/cloud-provider-azure/pkg/provider"
azure "sigs.k8s.io/cloud-provider-azure/pkg/provider"
)

@@ -167,11 +168,29 @@ type DriverOptions struct {
SasTokenExpirationMinutes int
}

func (option *DriverOptions) AddFlags() {
flag.StringVar(&option.BlobfuseProxyEndpoint, "blobfuse-proxy-endpoint", "unix://tmp/blobfuse-proxy.sock", "blobfuse-proxy endpoint")
flag.StringVar(&option.NodeID, "nodeid", "", "node id")
flag.StringVar(&option.DriverName, "drivername", DefaultDriverName, "name of the driver")
flag.BoolVar(&option.EnableBlobfuseProxy, "enable-blobfuse-proxy", false, "using blobfuse proxy for mounts")
flag.IntVar(&option.BlobfuseProxyConnTimout, "blobfuse-proxy-connect-timeout", 5, "blobfuse proxy connection timeout(seconds)")
flag.BoolVar(&option.EnableBlobMockMount, "enable-blob-mock-mount", false, "enable mock mount(only for testing)")
flag.BoolVar(&option.EnableGetVolumeStats, "enable-get-volume-stats", false, "allow GET_VOLUME_STATS on agent node")
flag.BoolVar(&option.AppendTimeStampInCacheDir, "append-timestamp-cache-dir", false, "append timestamp into cache directory on agent node")
flag.Uint64Var(&option.MountPermissions, "mount-permissions", 0777, "mounted folder permissions")
flag.BoolVar(&option.AllowInlineVolumeKeyAccessWithIdentity, "allow-inline-volume-key-access-with-idenitity", false, "allow accessing storage account key using cluster identity for inline volume")
flag.BoolVar(&option.AppendMountErrorHelpLink, "append-mount-error-help-link", true, "Whether to include a link for help with mount errors when a mount error occurs.")
flag.BoolVar(&option.EnableAznfsMount, "enable-aznfs-mount", false, "replace nfs mount with aznfs mount")
flag.IntVar(&option.VolStatsCacheExpireInMinutes, "vol-stats-cache-expire-in-minutes", 10, "The cache expire time in minutes for volume stats cache")
flag.IntVar(&option.SasTokenExpirationMinutes, "sas-token-expiration-minutes", 1440, "sas token expiration minutes during volume cloning")
}

// Driver implements all interfaces of CSI drivers
type Driver struct {
csicommon.CSIDriver

cloud *azure.Cloud
KubeClient kubernetes.Interface
blobfuseProxyEndpoint string
// enableBlobMockMount is only for testing, DO NOT set as true in non-testing scenario
enableBlobMockMount bool
@@ -206,7 +225,8 @@ type Driver struct {

// NewDriver Creates a NewCSIDriver object. Assumes vendor version is equal to driver version &
// does not support optional driver plugin info manifest field. Refer to CSI spec for more details.
func NewDriver(options *DriverOptions, cloud *azure.Cloud) *Driver {
func NewDriver(options *DriverOptions, kubeClient kubernetes.Interface, cloud *provider.Cloud) *Driver {
var err error
d := Driver{
volLockMap: util.NewLockMap(),
subnetLockMap: util.NewLockMap(),
@@ -222,12 +242,13 @@ func NewDriver(options *DriverOptions, cloud *azure.Cloud) *Driver {
enableAznfsMount: options.EnableAznfsMount,
sasTokenExpirationMinutes: options.SasTokenExpirationMinutes,
azcopy: &util.Azcopy{},
KubeClient: kubeClient,
cloud: cloud,
}
d.Name = options.DriverName
d.Version = driverVersion
d.NodeID = options.NodeID

var err error
getter := func(key string) (interface{}, error) { return nil, nil }
if d.accountSearchCache, err = azcache.NewTimedCache(time.Minute, getter, false); err != nil {
klog.Fatalf("%v", err)
@@ -242,7 +263,6 @@ func NewDriver(options *DriverOptions, cloud *azure.Cloud) *Driver {
if d.volStatsCache, err = azcache.NewTimedCache(time.Duration(options.VolStatsCacheExpireInMinutes)*time.Minute, getter, false); err != nil {
klog.Fatalf("%v", err)
}
d.cloud = cloud
d.mounter = &mount.SafeFormatAndMount{
Interface: mount.New(""),
Exec: utilexec.New(),
33 changes: 25 additions & 8 deletions pkg/blob/blob_test.go
Original file line number Diff line number Diff line change
@@ -19,6 +19,7 @@ package blob
import (
"context"
"errors"
"flag"
"fmt"
"os"
"reflect"
@@ -29,12 +30,10 @@ import (
"github.com/Azure/azure-sdk-for-go/services/storage/mgmt/2021-09-01/storage"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"

v1api "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"

"sigs.k8s.io/blob-csi-driver/pkg/util"
"sigs.k8s.io/cloud-provider-azure/pkg/azureclients/storageaccountclient/mockstorageaccountclient"
azure "sigs.k8s.io/cloud-provider-azure/pkg/provider"
@@ -56,7 +55,7 @@ func NewFakeDriver() *Driver {
BlobfuseProxyConnTimout: 5,
EnableBlobMockMount: false,
}
driver := NewDriver(&driverOptions, &azure.Cloud{})
driver := NewDriver(&driverOptions, nil, &azure.Cloud{})
driver.Name = fakeDriverName
driver.Version = vendorVersion
driver.subnetLockMap = util.NewLockMap()
@@ -72,7 +71,7 @@ func TestNewFakeDriver(t *testing.T) {
BlobfuseProxyConnTimout: 5,
EnableBlobMockMount: false,
}
d := NewDriver(&driverOptions, &azure.Cloud{})
d := NewDriver(&driverOptions, nil, &azure.Cloud{})
assert.NotNil(t, d)
}

@@ -85,7 +84,7 @@ func TestNewDriver(t *testing.T) {
BlobfuseProxyConnTimout: 5,
EnableBlobMockMount: false,
}
driver := NewDriver(&driverOptions, &azure.Cloud{})
driver := NewDriver(&driverOptions, nil, &azure.Cloud{})
fakedriver := NewFakeDriver()
fakedriver.Name = DefaultDriverName
fakedriver.Version = driverVersion
@@ -1231,7 +1230,7 @@ func TestGetSubnetResourceID(t *testing.T) {
d.cloud.VnetResourceGroup = "foo"
actualOutput := d.getSubnetResourceID("", "", "")
expectedOutput := fmt.Sprintf(subnetTemplate, d.cloud.SubscriptionID, "foo", d.cloud.VnetName, d.cloud.SubnetName)
assert.Equal(t, actualOutput, expectedOutput, "cloud.SubscriptionID should be used as the SubID")
assert.Equal(t, actualOutput, expectedOutput, "config.SubscriptionID should be used as the SubID")
},
},
{
@@ -1259,7 +1258,7 @@ func TestGetSubnetResourceID(t *testing.T) {
d.cloud.VnetResourceGroup = ""
actualOutput := d.getSubnetResourceID("", "", "")
expectedOutput := fmt.Sprintf(subnetTemplate, "bar", d.cloud.ResourceGroup, d.cloud.VnetName, d.cloud.SubnetName)
assert.Equal(t, actualOutput, expectedOutput, "cloud.Resourcegroup should be used as the rg")
assert.Equal(t, actualOutput, expectedOutput, "config.ResourceGroup should be used as the rg")
},
},
{
@@ -1273,7 +1272,7 @@ func TestGetSubnetResourceID(t *testing.T) {
d.cloud.VnetResourceGroup = "fakeVnetResourceGroup"
actualOutput := d.getSubnetResourceID("", "", "")
expectedOutput := fmt.Sprintf(subnetTemplate, "bar", d.cloud.VnetResourceGroup, d.cloud.VnetName, d.cloud.SubnetName)
assert.Equal(t, actualOutput, expectedOutput, "cloud.VnetResourceGroup should be used as the rg")
assert.Equal(t, actualOutput, expectedOutput, "config.VnetResourceGroup should be used as the rg")
},
},
{
@@ -1703,3 +1702,21 @@ func TestIsNFSProtocol(t *testing.T) {
}
}
}

func TestDriverOptions_AddFlags(t *testing.T) {
t.Run("test options", func(t *testing.T) {
option := DriverOptions{}
option.AddFlags()
typeInfo := reflect.TypeOf(option)
numOfExpectedOptions := typeInfo.NumField()
count := 0
flag.CommandLine.VisitAll(func(f *flag.Flag) {
if !strings.Contains(f.Name, "test") {
count++
}
})
if numOfExpectedOptions != count {
t.Errorf("expected %d flags, but found %d flag in DriverOptions", numOfExpectedOptions, count)
}
})
}
4 changes: 2 additions & 2 deletions pkg/blob/controllerserver_test.go
Original file line number Diff line number Diff line change
@@ -546,8 +546,8 @@ func TestCreateVolume(t *testing.T) {
testFunc: func(t *testing.T) {
d := NewFakeDriver()
d.cloud = &azure.Cloud{}
d.cloud.Config.DisableAzureStackCloud = false
d.cloud.Config.Cloud = "AZURESTACKCLOUD"
d.cloud.DisableAzureStackCloud = false
d.cloud.Cloud = "AZURESTACKCLOUD"
d.cloud.SubscriptionID = "subID"
mp := make(map[string]string)
mp[storeAccountKeyField] = falseValue
70 changes: 25 additions & 45 deletions pkg/blobplugin/main.go
Original file line number Diff line number Diff line change
@@ -26,45 +26,37 @@ import (
"strings"

"sigs.k8s.io/blob-csi-driver/pkg/blob"
"sigs.k8s.io/blob-csi-driver/pkg/util"

"k8s.io/component-base/metrics/legacyregistry"
"k8s.io/klog/v2"
)

var driverOptions blob.DriverOptions
var (
endpoint = flag.String("endpoint", "unix://tmp/csi.sock", "CSI endpoint")
blobfuseProxyEndpoint = flag.String("blobfuse-proxy-endpoint", "unix://tmp/blobfuse-proxy.sock", "blobfuse-proxy endpoint")
nodeID = flag.String("nodeid", "", "node id")
version = flag.Bool("version", false, "Print the version and exit.")
metricsAddress = flag.String("metrics-address", "", "export the metrics")
kubeconfig = flag.String("kubeconfig", "", "Absolute path to the kubeconfig file. Required only when running out of cluster.")
driverName = flag.String("drivername", blob.DefaultDriverName, "name of the driver")
enableBlobfuseProxy = flag.Bool("enable-blobfuse-proxy", false, "using blobfuse proxy for mounts")
blobfuseProxyConnTimout = flag.Int("blobfuse-proxy-connect-timeout", 5, "blobfuse proxy connection timeout(seconds)")
enableBlobMockMount = flag.Bool("enable-blob-mock-mount", false, "enable mock mount(only for testing)")
cloudConfigSecretName = flag.String("cloud-config-secret-name", "azure-cloud-provider", "secret name of cloud config")
cloudConfigSecretNamespace = flag.String("cloud-config-secret-namespace", "kube-system", "secret namespace of cloud config")
customUserAgent = flag.String("custom-user-agent", "", "custom userAgent")
userAgentSuffix = flag.String("user-agent-suffix", "", "userAgent suffix")
allowEmptyCloudConfig = flag.Bool("allow-empty-cloud-config", true, "allow running driver without cloud config")
enableGetVolumeStats = flag.Bool("enable-get-volume-stats", false, "allow GET_VOLUME_STATS on agent node")
appendTimeStampInCacheDir = flag.Bool("append-timestamp-cache-dir", false, "append timestamp into cache directory on agent node")
mountPermissions = flag.Uint64("mount-permissions", 0777, "mounted folder permissions")
allowInlineVolumeKeyAccessWithIdentity = flag.Bool("allow-inline-volume-key-access-with-idenitity", false, "allow accessing storage account key using cluster identity for inline volume")
kubeAPIQPS = flag.Float64("kube-api-qps", 25.0, "QPS to use while communicating with the kubernetes apiserver.")
kubeAPIBurst = flag.Int("kube-api-burst", 50, "Burst to use while communicating with the kubernetes apiserver.")
appendMountErrorHelpLink = flag.Bool("append-mount-error-help-link", true, "Whether to include a link for help with mount errors when a mount error occurs.")
enableAznfsMount = flag.Bool("enable-aznfs-mount", false, "replace nfs mount with aznfs mount")
volStatsCacheExpireInMinutes = flag.Int("vol-stats-cache-expire-in-minutes", 10, "The cache expire time in minutes for volume stats cache")
sasTokenExpirationMinutes = flag.Int("sas-token-expiration-minutes", 1440, "sas token expiration minutes during volume cloning")
metricsAddress = flag.String("metrics-address", "", "export the metrics")
version = flag.Bool("version", false, "Print the version and exit.")
endpoint = flag.String("endpoint", "unix://tmp/csi.sock", "CSI endpoint")
kubeconfig = flag.String("kubeconfig", "", "Absolute path to the kubeconfig file. Required only when running out of cluster.")
cloudConfigSecretName = flag.String("cloud-config-secret-name", "azure-cloud-provider", "secret name of cloud config")
cloudConfigSecretNamespace = flag.String("cloud-config-secret-namespace", "kube-system", "secret namespace of cloud config")
customUserAgent = flag.String("custom-user-agent", "", "custom userAgent")
userAgentSuffix = flag.String("user-agent-suffix", "", "userAgent suffix")
allowEmptyCloudConfig = flag.Bool("allow-empty-cloud-config", true, "allow running driver without cloud config")
kubeAPIQPS = flag.Float64("kube-api-qps", 25.0, "QPS to use while communicating with the kubernetes apiserver.")
kubeAPIBurst = flag.Int("kube-api-burst", 50, "Burst to use while communicating with the kubernetes apiserver.")
)

func init() {
driverOptions.AddFlags()
}

func main() {
klog.InitFlags(nil)
_ = flag.Set("logtostderr", "true")
flag.Parse()
if *version {
info, err := blob.GetVersionYAML(*driverName)
info, err := blob.GetVersionYAML(driverOptions.DriverName)
if err != nil {
klog.Fatalln(err)
}
@@ -78,33 +70,21 @@ func main() {
}

func handle() {
driverOptions := blob.DriverOptions{
NodeID: *nodeID,
DriverName: *driverName,
BlobfuseProxyEndpoint: *blobfuseProxyEndpoint,
EnableBlobfuseProxy: *enableBlobfuseProxy,
BlobfuseProxyConnTimout: *blobfuseProxyConnTimout,
EnableBlobMockMount: *enableBlobMockMount,
EnableGetVolumeStats: *enableGetVolumeStats,
AppendTimeStampInCacheDir: *appendTimeStampInCacheDir,
MountPermissions: *mountPermissions,
AllowInlineVolumeKeyAccessWithIdentity: *allowInlineVolumeKeyAccessWithIdentity,
AppendMountErrorHelpLink: *appendMountErrorHelpLink,
EnableAznfsMount: *enableAznfsMount,
VolStatsCacheExpireInMinutes: *volStatsCacheExpireInMinutes,
SasTokenExpirationMinutes: *sasTokenExpirationMinutes,
}

userAgent := blob.GetUserAgent(driverOptions.DriverName, *customUserAgent, *userAgentSuffix)
klog.V(2).Infof("driver userAgent: %s", userAgent)

cloud, err := blob.GetCloudProvider(context.Background(), *kubeconfig, driverOptions.NodeID, *cloudConfigSecretName, *cloudConfigSecretNamespace, userAgent, *allowEmptyCloudConfig, *kubeAPIQPS, *kubeAPIBurst)
kubeClient, err := util.GetKubeClient(*kubeconfig, *kubeAPIQPS, *kubeAPIBurst, userAgent)
if err != nil {
klog.Warningf("failed to get kubeClient, error: %v", err)
}

cloud, err := blob.GetCloudProvider(context.Background(), kubeClient, driverOptions.NodeID, *cloudConfigSecretName, *cloudConfigSecretNamespace, userAgent, *allowEmptyCloudConfig)
if err != nil {
klog.Fatalf("failed to get Azure Cloud Provider, error: %v", err)
}
klog.V(2).Infof("cloud: %s, location: %s, rg: %s, VnetName: %s, VnetResourceGroup: %s, SubnetName: %s", cloud.Cloud, cloud.Location, cloud.ResourceGroup, cloud.VnetName, cloud.VnetResourceGroup, cloud.SubnetName)

driver := blob.NewDriver(&driverOptions, cloud)
driver := blob.NewDriver(&driverOptions, kubeClient, cloud)
if driver == nil {
klog.Fatalln("Failed to initialize Azure Blob Storage CSI driver")
}
23 changes: 23 additions & 0 deletions pkg/util/util.go
Original file line number Diff line number Diff line change
@@ -26,6 +26,9 @@ import (

"github.com/go-ini/ini"
"github.com/pkg/errors"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/klog/v2"
)

@@ -305,3 +308,23 @@ func parseAzcopyJobShow(jobshow string) (AzcopyJobState, string, error) {
}
return AzcopyJobRunning, strings.ReplaceAll(segments[1], "\n", ""), nil
}

func GetKubeClient(kubeconfig string, kubeAPIQPS float64, kubeAPIBurst int, userAgent string) (kubernetes.Interface, error) {
var err error
var kubeCfg *rest.Config
if kubeCfg, err = clientcmd.BuildConfigFromFlags("", kubeconfig); err != nil {
return nil, err
}
if kubeCfg == nil {
if kubeCfg, err = rest.InClusterConfig(); err != nil {
return nil, err
}
}
//kubeCfg should not be nil
// set QPS and QPS Burst as higher values
klog.V(2).Infof("set QPS(%f) and QPS Burst(%d) for driver kubeClient", float32(kubeAPIQPS), kubeAPIBurst)
kubeCfg.QPS = float32(kubeAPIQPS)
kubeCfg.Burst = kubeAPIBurst
kubeCfg.UserAgent = userAgent
return kubernetes.NewForConfig(kubeCfg)
}
76 changes: 76 additions & 0 deletions pkg/util/util_test.go
Original file line number Diff line number Diff line change
@@ -524,3 +524,79 @@ func TestParseAzcopyJobShow(t *testing.T) {
}
}
}

func TestGetKubeConfig(t *testing.T) {
emptyKubeConfig := "empty-Kube-Config"
validKubeConfig := "valid-Kube-Config"
nonexistingConfig := "nonexisting-config"
fakeContent := `
apiVersion: v1
clusters:
- cluster:
server: https://localhost:8080
name: foo-cluster
contexts:
- context:
cluster: foo-cluster
user: foo-user
namespace: bar
name: foo-context
current-context: foo-context
kind: Config
users:
- name: foo-user
user:
exec:
apiVersion: client.authentication.k8s.io/v1beta1
args:
- arg-1
- arg-2
command: foo-command
`
if err := os.WriteFile(validKubeConfig, []byte(""), 0666); err != nil {
t.Error(err)
}
defer os.Remove(emptyKubeConfig)
if err := os.WriteFile(validKubeConfig, []byte(fakeContent), 0666); err != nil {
t.Error(err)
}
defer os.Remove(validKubeConfig)

tests := []struct {
desc string
kubeconfig string
expectError bool
envVariableHasConfig bool
envVariableConfigIsValid bool
}{
{
desc: "[success] valid kube config passed",
kubeconfig: validKubeConfig,
expectError: false,
envVariableHasConfig: false,
envVariableConfigIsValid: false,
},
{
desc: "[failure] invalid kube config passed",
kubeconfig: emptyKubeConfig,
expectError: true,
envVariableHasConfig: false,
envVariableConfigIsValid: false,
},
{
desc: "[failure] invalid kube config passed",
kubeconfig: nonexistingConfig,
expectError: true,
envVariableHasConfig: false,
envVariableConfigIsValid: false,
},
}

for _, test := range tests {
_, err := GetKubeClient(test.kubeconfig, 25.0, 50, "")
receiveError := (err != nil)
if test.expectError != receiveError {
t.Errorf("desc: %s,\n input: %q, GetCloudProvider err: %v, expectErr: %v", test.desc, test.kubeconfig, err, test.expectError)
}
}
}
7 changes: 5 additions & 2 deletions test/e2e/suite_test.go
Original file line number Diff line number Diff line change
@@ -39,6 +39,7 @@ import (
"k8s.io/kubernetes/test/e2e/framework/config"
_ "k8s.io/kubernetes/test/e2e/framework/debug/init"
"sigs.k8s.io/blob-csi-driver/pkg/blob"
"sigs.k8s.io/blob-csi-driver/pkg/util"
"sigs.k8s.io/blob-csi-driver/test/utils/azure"
"sigs.k8s.io/blob-csi-driver/test/utils/credentials"
"sigs.k8s.io/blob-csi-driver/test/utils/testutil"
@@ -151,9 +152,11 @@ var _ = ginkgo.SynchronizedBeforeSuite(func(ctx ginkgo.SpecContext) []byte {
BlobfuseProxyConnTimout: 5,
EnableBlobMockMount: false,
}
cloud, err := blob.GetCloudProvider(context.Background(), kubeconfig, driverOptions.NodeID, "", "", "", false, 0, 0)
kubeClient, err := util.GetKubeClient(kubeconfig, 25.0, 50, "")
gomega.Expect(err).NotTo(gomega.HaveOccurred())
blobDriver = blob.NewDriver(&driverOptions, cloud)
cloud, err := blob.GetCloudProvider(context.Background(), kubeClient, driverOptions.NodeID, "", "", "", false)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
blobDriver = blob.NewDriver(&driverOptions, kubeClient, cloud)
go func() {
blobDriver.Run(fmt.Sprintf("unix:///tmp/csi-%s.sock", uuid.NewUUID().String()), false)
}()

0 comments on commit 9bd2958

Please sign in to comment.