diff --git a/pkg/ddc/jindo/master_internal_test.go b/pkg/ddc/jindo/master_internal_test.go index 8154c16e802..f8e9ce1f1d0 100644 --- a/pkg/ddc/jindo/master_internal_test.go +++ b/pkg/ddc/jindo/master_internal_test.go @@ -19,15 +19,15 @@ package jindo import ( "testing" - "github.com/fluid-cloudnative/fluid/pkg/common" - "k8s.io/apimachinery/pkg/api/resource" - "github.com/brahma-adshonor/gohook" datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" + "github.com/fluid-cloudnative/fluid/pkg/common" + "github.com/fluid-cloudnative/fluid/pkg/ddc/base" "github.com/fluid-cloudnative/fluid/pkg/ddc/base/portallocator" "github.com/fluid-cloudnative/fluid/pkg/utils/fake" "github.com/fluid-cloudnative/fluid/pkg/utils/helm" "github.com/pkg/errors" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/net" @@ -177,6 +177,18 @@ func TestGenerateJindoValueFile(t *testing.T) { client := fake.NewFakeClientWithScheme(testScheme, testObjs...) result := resource.MustParse("20Gi") + tiredStore := datav1alpha1.TieredStore{ + Levels: []datav1alpha1.Level{{ + MediumType: common.Memory, + Quota: &result, + High: "0.8", + Low: "0.1", + }}, + } + runtimeInfo, err := base.BuildRuntimeInfo("hbase", "fluid", common.JindoRuntime, base.WithTieredStore(tiredStore)) + if err != nil { + t.Errorf("fail to create the runtimeInfo with error %v", err) + } engine := JindoEngine{ name: "hbase", namespace: "fluid", @@ -187,19 +199,13 @@ func TestGenerateJindoValueFile(t *testing.T) { Master: datav1alpha1.JindoCompTemplateSpec{ Replicas: 2, }, - TieredStore: datav1alpha1.TieredStore{ - Levels: []datav1alpha1.Level{{ - MediumType: common.Memory, - Quota: &result, - High: "0.8", - Low: "0.1", - }}, - }, + TieredStore: tiredStore, }, }, + runtimeInfo: runtimeInfo, } - err := portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 50}, "bitmap", GetReservedPorts) + err = portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 50}, "bitmap", GetReservedPorts) if err != nil { t.Fatal(err.Error()) } diff --git a/pkg/ddc/jindo/transform.go b/pkg/ddc/jindo/transform.go index 69248db038d..a27d09ba8e9 100644 --- a/pkg/ddc/jindo/transform.go +++ b/pkg/ddc/jindo/transform.go @@ -24,15 +24,16 @@ import ( "strings" "time" - "github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient" + corev1 "k8s.io/api/core/v1" datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" "github.com/fluid-cloudnative/fluid/pkg/common" "github.com/fluid-cloudnative/fluid/pkg/ddc/base/portallocator" "github.com/fluid-cloudnative/fluid/pkg/utils" "github.com/fluid-cloudnative/fluid/pkg/utils/docker" + jindoutils "github.com/fluid-cloudnative/fluid/pkg/utils/jindo" + "github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient" "github.com/fluid-cloudnative/fluid/pkg/utils/transformer" - corev1 "k8s.io/api/core/v1" ) func (e *JindoEngine) transform(runtime *datav1alpha1.JindoRuntime) (value *Jindo, err error) { @@ -52,36 +53,15 @@ func (e *JindoEngine) transform(runtime *datav1alpha1.JindoRuntime) (value *Jind return } - var cachePaths []string // /mnt/disk1/bigboot or /mnt/disk1/bigboot,/mnt/disk2/bigboot - storagePath := runtime.Spec.TieredStore.Levels[0].Path - originPath := strings.Split(storagePath, ",") - for _, value := range originPath { - cachePaths = append(cachePaths, strings.TrimRight(value, "/")+"/"+ - e.namespace+"/"+e.name+"/bigboot") + cachePaths, originPaths, originQuotas := jindoutils.ProcessTiredStoreInfo(e.runtimeInfo) + var quotaWithJindoUnit []string // 1Gi or 1Gi,2Gi,3Gi + for _, quota := range originQuotas { + quotaWithJindoUnit = append(quotaWithJindoUnit, utils.TransformQuantityToJindoUnit(quota)) } + metaPath := cachePaths[0] dataPath := strings.Join(cachePaths, ",") - - var userSetQuota []string // 1Gi or 1Gi,2Gi,3Gi - if runtime.Spec.TieredStore.Levels[0].Quota != nil { - userSetQuota = append(userSetQuota, utils.TransformQuantityToJindoUnit(runtime.Spec.TieredStore.Levels[0].Quota)) - } - - if runtime.Spec.TieredStore.Levels[0].QuotaList != "" { - quotaList := runtime.Spec.TieredStore.Levels[0].QuotaList - quotas := strings.Split(quotaList, ",") - if len(quotas) != len(originPath) { - err = fmt.Errorf("the num of cache path and quota must be equal") - return - } - for _, value := range quotas { - if strings.HasSuffix(value, "Gi") { - value = strings.ReplaceAll(value, "Gi", "g") - } - userSetQuota = append(userSetQuota, value) - } - } - userQuotas := strings.Join(userSetQuota, ",") // 1g or 1g,2g + userQuotas := strings.Join(quotaWithJindoUnit, ",") jindoSmartdataImage, smartdataTag, dnsServer := e.getSmartDataConfigs() jindoFuseImage, fuseTag := e.parseFuseImage() @@ -111,7 +91,7 @@ func (e *JindoEngine) transform(runtime *datav1alpha1.JindoRuntime) (value *Jind }, Mounts: Mounts{ Master: e.transformMasterMountPath(metaPath), - WorkersAndClients: e.transformWorkerMountPath(originPath), + WorkersAndClients: e.transformWorkerMountPath(originPaths), }, Owner: transformer.GenerateOwnerReferenceFromObject(runtime), RuntimeIdentity: common.RuntimeIdentity{ diff --git a/pkg/ddc/jindocache/master_internal_test.go b/pkg/ddc/jindocache/master_internal_test.go index 7f4588ed216..05c7d363f3e 100644 --- a/pkg/ddc/jindocache/master_internal_test.go +++ b/pkg/ddc/jindocache/master_internal_test.go @@ -17,6 +17,7 @@ limitations under the License. package jindocache import ( + "github.com/fluid-cloudnative/fluid/pkg/ddc/base" "testing" "github.com/fluid-cloudnative/fluid/pkg/common" @@ -84,6 +85,19 @@ func TestSetupMasterInternal(t *testing.T) { testObjs = append(testObjs, datasetInput.DeepCopy()) } client := fake.NewFakeClientWithScheme(testScheme, testObjs...) + quota := resource.MustParse("20Gi") + tiredStore := datav1alpha1.TieredStore{ + Levels: []datav1alpha1.Level{{ + MediumType: common.Memory, + Quota: "a, + High: "0.8", + Low: "0.1", + }}, + } + runtimeInfo, err := base.BuildRuntimeInfo("hbase", "fluid", common.JindoRuntime, base.WithTieredStore(tiredStore)) + if err != nil { + t.Errorf("fail to create the runtimeInfo with error %v", err) + } engine := JindoCacheEngine{ name: "hbase", @@ -97,8 +111,9 @@ func TestSetupMasterInternal(t *testing.T) { }, }, }, + runtimeInfo: runtimeInfo, } - err := portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 100}, "bitmap", GetReservedPorts) + err = portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 100}, "bitmap", GetReservedPorts) if err != nil { t.Fatal(err.Error()) } @@ -179,6 +194,18 @@ func TestGenerateJindoValueFile(t *testing.T) { client := fake.NewFakeClientWithScheme(testScheme, testObjs...) result := resource.MustParse("20Gi") + tiredStore := datav1alpha1.TieredStore{ + Levels: []datav1alpha1.Level{{ + MediumType: common.Memory, + Quota: &result, + High: "0.8", + Low: "0.1", + }}, + } + runtimeInfo, err := base.BuildRuntimeInfo("hbase", "fluid", common.JindoRuntime, base.WithTieredStore(tiredStore)) + if err != nil { + t.Errorf("fail to create the runtimeInfo with error %v", err) + } engine := JindoCacheEngine{ name: "hbase", namespace: "fluid", @@ -189,19 +216,13 @@ func TestGenerateJindoValueFile(t *testing.T) { Master: datav1alpha1.JindoCompTemplateSpec{ Replicas: 2, }, - TieredStore: datav1alpha1.TieredStore{ - Levels: []datav1alpha1.Level{{ - MediumType: common.Memory, - Quota: &result, - High: "0.8", - Low: "0.1", - }}, - }, + TieredStore: tiredStore, }, }, + runtimeInfo: runtimeInfo, } - err := portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 50}, "bitmap", GetReservedPorts) + err = portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 50}, "bitmap", GetReservedPorts) if err != nil { t.Fatal(err.Error()) } diff --git a/pkg/ddc/jindocache/transform.go b/pkg/ddc/jindocache/transform.go index 323452aa827..e249f4098a8 100644 --- a/pkg/ddc/jindocache/transform.go +++ b/pkg/ddc/jindocache/transform.go @@ -26,21 +26,21 @@ import ( "strings" "time" - versionutil "github.com/fluid-cloudnative/fluid/pkg/utils/version" "github.com/pkg/errors" - - "github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient" + corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/client-go/util/retry" datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" "github.com/fluid-cloudnative/fluid/pkg/common" "github.com/fluid-cloudnative/fluid/pkg/ddc/base/portallocator" "github.com/fluid-cloudnative/fluid/pkg/utils" "github.com/fluid-cloudnative/fluid/pkg/utils/docker" + jindoutils "github.com/fluid-cloudnative/fluid/pkg/utils/jindo" + "github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient" "github.com/fluid-cloudnative/fluid/pkg/utils/transformer" - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/resource" - "k8s.io/client-go/util/retry" + versionutil "github.com/fluid-cloudnative/fluid/pkg/utils/version" ) type smartdataConfig struct { @@ -62,47 +62,16 @@ func (e *JindoCacheEngine) transform(runtime *datav1alpha1.JindoRuntime) (value return } - var cachePaths []string // /mnt/disk1/bigboot or /mnt/disk1/bigboot,/mnt/disk2/bigboot - var storagePath = "/dev/shm/" - if len(runtime.Spec.TieredStore.Levels) > 0 { - storagePath = runtime.Spec.TieredStore.Levels[0].Path - } - originPath := strings.Split(storagePath, ",") - for _, value := range originPath { - cachePaths = append(cachePaths, strings.TrimRight(value, "/")+"/"+ - e.namespace+"/"+e.name+"/jindocache") + cachePaths, originPaths, originQuotas := jindoutils.ProcessTiredStoreInfo(e.runtimeInfo) + var quotaWithJindoUnit, quotaStrings []string // 1Gi or 1Gi,2Gi,3Gi + for _, quota := range originQuotas { + quotaWithJindoUnit = append(quotaWithJindoUnit, utils.TransformQuantityToJindoUnit(quota)) + quotaStrings = append(quotaStrings, quota.String()) } + metaPath := cachePaths[0] dataPath := strings.Join(cachePaths, ",") - - var quotas []string - var userSetQuota []string // 1Gi or 1Gi,2Gi,3Gi - if len(runtime.Spec.TieredStore.Levels) == 0 { - userSetQuota = append(userSetQuota, "1Gi") - quotas = append(quotas, "1Gi") - } else if runtime.Spec.TieredStore.Levels[0].Quota != nil { - userSetQuota = append(userSetQuota, utils.TransformQuantityToJindoUnit(runtime.Spec.TieredStore.Levels[0].Quota)) - quotas = append(quotas, runtime.Spec.TieredStore.Levels[0].Quota.String()) - } - - if len(runtime.Spec.TieredStore.Levels) != 0 && runtime.Spec.TieredStore.Levels[0].QuotaList != "" { - quotaList := runtime.Spec.TieredStore.Levels[0].QuotaList - quotas = strings.Split(quotaList, ",") - if len(quotas) != len(originPath) { - err = fmt.Errorf("the num of cache path and quota must be equal") - return - } - for _, value := range quotas { - if strings.HasSuffix(value, "Gi") { - value = strings.ReplaceAll(value, "Gi", "g") - } - if strings.HasSuffix(value, "Mi") { - value = strings.ReplaceAll(value, "Mi", "m") - } - userSetQuota = append(userSetQuota, value) - } - } - userQuotas := strings.Join(userSetQuota, ",") // 1g or 1g,2g + userQuotas := strings.Join(quotaWithJindoUnit, ",") smartdataConfig := e.getSmartDataConfigs(runtime) smartdataTag := smartdataConfig.imageTag @@ -145,7 +114,7 @@ func (e *JindoCacheEngine) transform(runtime *datav1alpha1.JindoRuntime) (value }, Mounts: Mounts{ Master: e.transformMasterMountPath(metaPath, mediumType, volumeType), - WorkersAndClients: e.transformWorkerMountPath(originPath, quotas, e.getMediumTypeFromVolumeSource(string(mediumType), runtime.Spec.TieredStore.Levels), volumeType), + WorkersAndClients: e.transformWorkerMountPath(originPaths, quotaStrings, e.getMediumTypeFromVolumeSource(string(mediumType), runtime.Spec.TieredStore.Levels), volumeType), }, Owner: transformer.GenerateOwnerReferenceFromObject(runtime), RuntimeIdentity: common.RuntimeIdentity{ diff --git a/pkg/ddc/jindocache/transform_test.go b/pkg/ddc/jindocache/transform_test.go index 76cacbd13fd..d393502d949 100644 --- a/pkg/ddc/jindocache/transform_test.go +++ b/pkg/ddc/jindocache/transform_test.go @@ -17,6 +17,7 @@ limitations under the License. package jindocache import ( + "github.com/fluid-cloudnative/fluid/pkg/ddc/base" "os" "reflect" ctrl "sigs.k8s.io/controller-runtime" @@ -636,15 +637,20 @@ func TestJindoCacheEngine_transform(t *testing.T) { s.AddKnownTypes(datav1alpha1.GroupVersion, &datav1alpha1.DatasetList{}) _ = corev1.AddToScheme(s) client := fake.NewFakeClientWithScheme(s, runtimeObjs...) + runtimeInfo, err := base.BuildRuntimeInfo("hbase", "fluid", common.JindoRuntime) + if err != nil { + t.Errorf("fail to create the runtimeInfo with error %v", err) + } e := &JindoCacheEngine{ - runtime: tt.fields.runtime, - name: tt.fields.name, - namespace: tt.fields.namespace, - Client: client, - Log: fake.NullLogger(), + runtime: tt.fields.runtime, + name: tt.fields.name, + namespace: tt.fields.namespace, + Client: client, + Log: fake.NullLogger(), + runtimeInfo: runtimeInfo, } tt.args.runtime = tt.fields.runtime - err := portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 100}, "bitmap", GetReservedPorts) + err = portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 100}, "bitmap", GetReservedPorts) if err != nil { t.Fatalf("failed to set up runtime port allocator due to %v", err) } @@ -1143,15 +1149,20 @@ func TestJindoCacheEngine_transformPolicy(t *testing.T) { s.AddKnownTypes(datav1alpha1.GroupVersion, &datav1alpha1.DatasetList{}) _ = corev1.AddToScheme(s) client := fake.NewFakeClientWithScheme(s, runtimeObjs...) + runtimeInfo, err := base.BuildRuntimeInfo("hbase", "fluid", common.JindoRuntime) + if err != nil { + t.Errorf("fail to create the runtimeInfo with error %v", err) + } e := &JindoCacheEngine{ - runtime: tt.fields.runtime, - name: tt.fields.name, - namespace: tt.fields.namespace, - Client: client, - Log: fake.NullLogger(), + runtime: tt.fields.runtime, + name: tt.fields.name, + namespace: tt.fields.namespace, + Client: client, + Log: fake.NullLogger(), + runtimeInfo: runtimeInfo, } tt.args.runtime = tt.fields.runtime - err := portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 100}, "bitmap", GetReservedPorts) + err = portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 100}, "bitmap", GetReservedPorts) if err != nil { t.Fatalf("failed to set up runtime port allocator due to %v", err) } @@ -1352,15 +1363,20 @@ func TestJindoCacheEngine_transformCacheSet(t *testing.T) { s.AddKnownTypes(datav1alpha1.GroupVersion, &datav1alpha1.DatasetList{}) _ = corev1.AddToScheme(s) client := fake.NewFakeClientWithScheme(s, runtimeObjs...) + runtimeInfo, err := base.BuildRuntimeInfo("hbase", "fluid", common.JindoRuntime) + if err != nil { + t.Errorf("fail to create the runtimeInfo with error %v", err) + } e := &JindoCacheEngine{ - runtime: tt.fields.runtime, - name: tt.fields.name, - namespace: tt.fields.namespace, - Client: client, - Log: fake.NullLogger(), + runtime: tt.fields.runtime, + name: tt.fields.name, + namespace: tt.fields.namespace, + Client: client, + Log: fake.NullLogger(), + runtimeInfo: runtimeInfo, } tt.args.runtime = tt.fields.runtime - err := portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 100}, "bitmap", GetReservedPorts) + err = portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 100}, "bitmap", GetReservedPorts) if err != nil { t.Fatalf("failed to set up runtime port allocator due to %v", err) } diff --git a/pkg/ddc/jindofsx/master_internal_test.go b/pkg/ddc/jindofsx/master_internal_test.go index 088236498ac..db50be443a2 100644 --- a/pkg/ddc/jindofsx/master_internal_test.go +++ b/pkg/ddc/jindofsx/master_internal_test.go @@ -17,6 +17,7 @@ limitations under the License. package jindofsx import ( + "github.com/fluid-cloudnative/fluid/pkg/ddc/base" "testing" "github.com/fluid-cloudnative/fluid/pkg/common" @@ -84,7 +85,19 @@ func TestSetupMasterInternal(t *testing.T) { testObjs = append(testObjs, datasetInput.DeepCopy()) } client := fake.NewFakeClientWithScheme(testScheme, testObjs...) - + quota := resource.MustParse("20Gi") + tiredStore := datav1alpha1.TieredStore{ + Levels: []datav1alpha1.Level{{ + MediumType: common.Memory, + Quota: "a, + High: "0.8", + Low: "0.1", + }}, + } + runtimeInfo, err := base.BuildRuntimeInfo("hbase", "fluid", common.JindoRuntime, base.WithTieredStore(tiredStore)) + if err != nil { + t.Errorf("fail to create the runtimeInfo with error %v", err) + } engine := JindoFSxEngine{ name: "hbase", namespace: "fluid", @@ -97,8 +110,9 @@ func TestSetupMasterInternal(t *testing.T) { }, }, }, + runtimeInfo: runtimeInfo, } - err := portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 100}, "bitmap", GetReservedPorts) + err = portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 100}, "bitmap", GetReservedPorts) if err != nil { t.Fatal(err.Error()) } @@ -176,6 +190,10 @@ func TestGenerateJindoValueFile(t *testing.T) { client := fake.NewFakeClientWithScheme(testScheme, testObjs...) result := resource.MustParse("20Gi") + runtimeInfo, err := base.BuildRuntimeInfo("hbase", "fluid", common.JindoRuntime) + if err != nil { + t.Errorf("fail to create the runtimeInfo with error %v", err) + } engine := JindoFSxEngine{ name: "hbase", namespace: "fluid", @@ -196,9 +214,10 @@ func TestGenerateJindoValueFile(t *testing.T) { }, }, }, + runtimeInfo: runtimeInfo, } - err := portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 50}, "bitmap", GetReservedPorts) + err = portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 50}, "bitmap", GetReservedPorts) if err != nil { t.Fatal(err.Error()) } diff --git a/pkg/ddc/jindofsx/transform.go b/pkg/ddc/jindofsx/transform.go index 8310dd0a817..c92a3e5c2c9 100644 --- a/pkg/ddc/jindofsx/transform.go +++ b/pkg/ddc/jindofsx/transform.go @@ -27,20 +27,20 @@ import ( "strings" "time" - versionutil "github.com/fluid-cloudnative/fluid/pkg/utils/version" - - "github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient" + corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/client-go/util/retry" datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" "github.com/fluid-cloudnative/fluid/pkg/common" "github.com/fluid-cloudnative/fluid/pkg/ddc/base/portallocator" "github.com/fluid-cloudnative/fluid/pkg/utils" "github.com/fluid-cloudnative/fluid/pkg/utils/docker" + jindoutils "github.com/fluid-cloudnative/fluid/pkg/utils/jindo" + "github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient" "github.com/fluid-cloudnative/fluid/pkg/utils/transformer" - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/resource" - "k8s.io/client-go/util/retry" + versionutil "github.com/fluid-cloudnative/fluid/pkg/utils/version" ) type smartdataConfig struct { @@ -62,47 +62,16 @@ func (e *JindoFSxEngine) transform(runtime *datav1alpha1.JindoRuntime) (value *J return } - var cachePaths []string // /mnt/disk1/bigboot or /mnt/disk1/bigboot,/mnt/disk2/bigboot - var storagePath = "/dev/shm/" - if len(runtime.Spec.TieredStore.Levels) > 0 { - storagePath = runtime.Spec.TieredStore.Levels[0].Path - } - originPath := strings.Split(storagePath, ",") - for _, value := range originPath { - cachePaths = append(cachePaths, strings.TrimRight(value, "/")+"/"+ - e.namespace+"/"+e.name+"/jindofsx") + cachePaths, originPaths, originQuotas := jindoutils.ProcessTiredStoreInfo(e.runtimeInfo) + var quotaWithJindoUnit, quotas []string // 1Gi or 1Gi,2Gi,3Gi + for _, quota := range originQuotas { + quotaWithJindoUnit = append(quotaWithJindoUnit, utils.TransformQuantityToJindoUnit(quota)) + quotas = append(quotas, quota.String()) } + metaPath := cachePaths[0] dataPath := strings.Join(cachePaths, ",") - - var quotas []string - var userSetQuota []string // 1Gi or 1Gi,2Gi,3Gi - if len(runtime.Spec.TieredStore.Levels) == 0 { - userSetQuota = append(userSetQuota, "1Gi") - quotas = append(quotas, "1Gi") - } else if runtime.Spec.TieredStore.Levels[0].Quota != nil { - userSetQuota = append(userSetQuota, utils.TransformQuantityToJindoUnit(runtime.Spec.TieredStore.Levels[0].Quota)) - quotas = append(quotas, runtime.Spec.TieredStore.Levels[0].Quota.String()) - } - - if len(runtime.Spec.TieredStore.Levels) != 0 && runtime.Spec.TieredStore.Levels[0].QuotaList != "" { - quotaList := runtime.Spec.TieredStore.Levels[0].QuotaList - quotas = strings.Split(quotaList, ",") - if len(quotas) != len(originPath) { - err = fmt.Errorf("the num of cache path and quota must be equal") - return - } - for _, value := range quotas { - if strings.HasSuffix(value, "Gi") { - value = strings.ReplaceAll(value, "Gi", "g") - } - if strings.HasSuffix(value, "Mi") { - value = strings.ReplaceAll(value, "Mi", "m") - } - userSetQuota = append(userSetQuota, value) - } - } - userQuotas := strings.Join(userSetQuota, ",") // 1g or 1g,2g + userQuotas := strings.Join(quotaWithJindoUnit, ",") smartdataConfig := e.getSmartDataConfigs(runtime) smartdataTag := smartdataConfig.imageTag @@ -145,7 +114,7 @@ func (e *JindoFSxEngine) transform(runtime *datav1alpha1.JindoRuntime) (value *J }, Mounts: Mounts{ Master: e.transformMasterMountPath(metaPath, mediumType, volumeType), - WorkersAndClients: e.transformWorkerMountPath(originPath, quotas, e.getMediumTypeFromVolumeSource(string(mediumType), runtime.Spec.TieredStore.Levels), volumeType), + WorkersAndClients: e.transformWorkerMountPath(originPaths, quotas, e.getMediumTypeFromVolumeSource(string(mediumType), runtime.Spec.TieredStore.Levels), volumeType), }, Owner: transformer.GenerateOwnerReferenceFromObject(runtime), RuntimeIdentity: common.RuntimeIdentity{ diff --git a/pkg/ddc/jindofsx/transform_test.go b/pkg/ddc/jindofsx/transform_test.go index 45029f1c018..7e61513c2ef 100644 --- a/pkg/ddc/jindofsx/transform_test.go +++ b/pkg/ddc/jindofsx/transform_test.go @@ -17,6 +17,7 @@ limitations under the License. package jindofsx import ( + "github.com/fluid-cloudnative/fluid/pkg/ddc/base" "os" "reflect" "testing" @@ -634,15 +635,20 @@ func TestJindoFSxEngine_transform(t *testing.T) { s.AddKnownTypes(datav1alpha1.GroupVersion, &datav1alpha1.DatasetList{}) _ = corev1.AddToScheme(s) client := fake.NewFakeClientWithScheme(s, runtimeObjs...) + runtimeInfo, err := base.BuildRuntimeInfo("hbase", "fluid", common.JindoRuntime) + if err != nil { + t.Errorf("fail to create the runtimeInfo with error %v", err) + } e := &JindoFSxEngine{ - runtime: tt.fields.runtime, - name: tt.fields.name, - namespace: tt.fields.namespace, - Client: client, - Log: fake.NullLogger(), + runtime: tt.fields.runtime, + name: tt.fields.name, + namespace: tt.fields.namespace, + Client: client, + Log: fake.NullLogger(), + runtimeInfo: runtimeInfo, } tt.args.runtime = tt.fields.runtime - err := portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 100}, "bitmap", GetReservedPorts) + err = portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 100}, "bitmap", GetReservedPorts) if err != nil { t.Fatalf("failed to set up runtime port allocator due to %v", err) } diff --git a/pkg/utils/jindo/jindo.go b/pkg/utils/jindo/jindo.go index 4c15f4133ab..23c7615366c 100644 --- a/pkg/utils/jindo/jindo.go +++ b/pkg/utils/jindo/jindo.go @@ -18,8 +18,12 @@ package jindo import ( "os" + "strings" + + "k8s.io/apimachinery/pkg/api/resource" "github.com/fluid-cloudnative/fluid/pkg/common" + "github.com/fluid-cloudnative/fluid/pkg/ddc/base" ) const ( @@ -52,3 +56,42 @@ func GetRuntimeImage() (image string) { } return } + +func ProcessTiredStoreInfo(runtimeInfo base.RuntimeInfoInterface) (originPaths []string, cachePaths []string, quotas []*resource.Quantity) { + tireStoreInfo := runtimeInfo.GetTieredStoreInfo() + var defaultStoragePath = "/dev/shm/" + + var subPath string + if GetDefaultEngineImpl() == common.JindoFSxEngineImpl { + subPath = "jindofsx" + } else if GetDefaultEngineImpl() == common.JindoFSEngineImpl { + subPath = "bigboot" + } else if GetDefaultEngineImpl() == common.JindoCacheEngineImpl { + subPath = "jindocache" + } + + if len(tireStoreInfo.Levels) > 0 { + for _, cachePath := range tireStoreInfo.Levels[0].CachePaths { + originPaths = append(originPaths, cachePath.Path) + cachePaths = append(cachePaths, strings.TrimRight(cachePath.Path, "/")+"/"+ + runtimeInfo.GetNamespace()+"/"+runtimeInfo.GetName()+"/"+subPath) + } + + } + if len(cachePaths) == 0 { + originPaths = append(originPaths, defaultStoragePath) + cachePaths = append(cachePaths, strings.TrimRight(defaultStoragePath, "/")+"/"+ + runtimeInfo.GetNamespace()+"/"+runtimeInfo.GetName()+"/"+subPath) + } + + if len(tireStoreInfo.Levels) == 0 { + Quantity1Gi, _ := resource.ParseQuantity("1Gi") + quotas = append(quotas, &Quantity1Gi) + } else { + for _, cachePath := range tireStoreInfo.Levels[0].CachePaths { + quotas = append(quotas, cachePath.Quota) + } + } + + return +} diff --git a/pkg/utils/quantity.go b/pkg/utils/quantity.go index e6170e450de..5d53c0e9b0b 100644 --- a/pkg/utils/quantity.go +++ b/pkg/utils/quantity.go @@ -41,6 +41,9 @@ func TransformQuantityToAlluxioUnit(q *resource.Quantity) (value string) { // that can be recognized by Jindo. func TransformQuantityToJindoUnit(q *resource.Quantity) (value string) { value = q.String() + if strings.HasSuffix(value, "Ti") { + value = strings.ReplaceAll(value, "Ti", "t") + } if strings.HasSuffix(value, "Gi") { value = strings.ReplaceAll(value, "Gi", "g") }