Skip to content

Commit

Permalink
Support to set single quota for multipath in tireStore.level
Browse files Browse the repository at this point in the history
Signed-off-by: jiuyu <guotongyu.gty@alibaba-inc.com>
  • Loading branch information
jiuyu committed Jan 17, 2025
1 parent 4e199c8 commit 7ff9009
Show file tree
Hide file tree
Showing 10 changed files with 201 additions and 169 deletions.
30 changes: 18 additions & 12 deletions pkg/ddc/jindo/master_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@ package jindo
import (
"testing"

"github.com/fluid-cloudnative/fluid/pkg/common"
"k8s.io/apimachinery/pkg/api/resource"

"github.com/brahma-adshonor/gohook"
datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1"
"github.com/fluid-cloudnative/fluid/pkg/common"
"github.com/fluid-cloudnative/fluid/pkg/ddc/base"
"github.com/fluid-cloudnative/fluid/pkg/ddc/base/portallocator"
"github.com/fluid-cloudnative/fluid/pkg/utils/fake"
"github.com/fluid-cloudnative/fluid/pkg/utils/helm"
"github.com/pkg/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/net"
Expand Down Expand Up @@ -177,6 +177,18 @@ func TestGenerateJindoValueFile(t *testing.T) {

client := fake.NewFakeClientWithScheme(testScheme, testObjs...)
result := resource.MustParse("20Gi")
tiredStore := datav1alpha1.TieredStore{
Levels: []datav1alpha1.Level{{
MediumType: common.Memory,
Quota: &result,
High: "0.8",
Low: "0.1",
}},
}
runtimeInfo, err := base.BuildRuntimeInfo("hbase", "fluid", common.JindoRuntime, base.WithTieredStore(tiredStore))
if err != nil {
t.Errorf("fail to create the runtimeInfo with error %v", err)
}
engine := JindoEngine{
name: "hbase",
namespace: "fluid",
Expand All @@ -187,19 +199,13 @@ func TestGenerateJindoValueFile(t *testing.T) {
Master: datav1alpha1.JindoCompTemplateSpec{
Replicas: 2,
},
TieredStore: datav1alpha1.TieredStore{
Levels: []datav1alpha1.Level{{
MediumType: common.Memory,
Quota: &result,
High: "0.8",
Low: "0.1",
}},
},
TieredStore: tiredStore,
},
},
runtimeInfo: runtimeInfo,
}

err := portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 50}, "bitmap", GetReservedPorts)
err = portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 50}, "bitmap", GetReservedPorts)
if err != nil {
t.Fatal(err.Error())
}
Expand Down
40 changes: 10 additions & 30 deletions pkg/ddc/jindo/transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,16 @@ import (
"strings"
"time"

"github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient"
corev1 "k8s.io/api/core/v1"

datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1"
"github.com/fluid-cloudnative/fluid/pkg/common"
"github.com/fluid-cloudnative/fluid/pkg/ddc/base/portallocator"
"github.com/fluid-cloudnative/fluid/pkg/utils"
"github.com/fluid-cloudnative/fluid/pkg/utils/docker"
jindoutils "github.com/fluid-cloudnative/fluid/pkg/utils/jindo"
"github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient"
"github.com/fluid-cloudnative/fluid/pkg/utils/transformer"
corev1 "k8s.io/api/core/v1"
)

func (e *JindoEngine) transform(runtime *datav1alpha1.JindoRuntime) (value *Jindo, err error) {
Expand All @@ -52,36 +53,15 @@ func (e *JindoEngine) transform(runtime *datav1alpha1.JindoRuntime) (value *Jind
return
}

var cachePaths []string // /mnt/disk1/bigboot or /mnt/disk1/bigboot,/mnt/disk2/bigboot
storagePath := runtime.Spec.TieredStore.Levels[0].Path
originPath := strings.Split(storagePath, ",")
for _, value := range originPath {
cachePaths = append(cachePaths, strings.TrimRight(value, "/")+"/"+
e.namespace+"/"+e.name+"/bigboot")
cachePaths, originPaths, originQuotas := jindoutils.ProcessTiredStoreInfo(e.runtimeInfo)
var quotaWithJindoUnit []string // 1Gi or 1Gi,2Gi,3Gi
for _, quota := range originQuotas {
quotaWithJindoUnit = append(quotaWithJindoUnit, utils.TransformQuantityToJindoUnit(quota))
}

metaPath := cachePaths[0]
dataPath := strings.Join(cachePaths, ",")

var userSetQuota []string // 1Gi or 1Gi,2Gi,3Gi
if runtime.Spec.TieredStore.Levels[0].Quota != nil {
userSetQuota = append(userSetQuota, utils.TransformQuantityToJindoUnit(runtime.Spec.TieredStore.Levels[0].Quota))
}

if runtime.Spec.TieredStore.Levels[0].QuotaList != "" {
quotaList := runtime.Spec.TieredStore.Levels[0].QuotaList
quotas := strings.Split(quotaList, ",")
if len(quotas) != len(originPath) {
err = fmt.Errorf("the num of cache path and quota must be equal")
return
}
for _, value := range quotas {
if strings.HasSuffix(value, "Gi") {
value = strings.ReplaceAll(value, "Gi", "g")
}
userSetQuota = append(userSetQuota, value)
}
}
userQuotas := strings.Join(userSetQuota, ",") // 1g or 1g,2g
userQuotas := strings.Join(quotaWithJindoUnit, ",")

jindoSmartdataImage, smartdataTag, dnsServer := e.getSmartDataConfigs()
jindoFuseImage, fuseTag := e.parseFuseImage()
Expand Down Expand Up @@ -111,7 +91,7 @@ func (e *JindoEngine) transform(runtime *datav1alpha1.JindoRuntime) (value *Jind
},
Mounts: Mounts{
Master: e.transformMasterMountPath(metaPath),
WorkersAndClients: e.transformWorkerMountPath(originPath),
WorkersAndClients: e.transformWorkerMountPath(originPaths),
},
Owner: transformer.GenerateOwnerReferenceFromObject(runtime),
RuntimeIdentity: common.RuntimeIdentity{
Expand Down
41 changes: 31 additions & 10 deletions pkg/ddc/jindocache/master_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package jindocache

import (
"github.com/fluid-cloudnative/fluid/pkg/ddc/base"
"testing"

"github.com/fluid-cloudnative/fluid/pkg/common"
Expand Down Expand Up @@ -84,6 +85,19 @@ func TestSetupMasterInternal(t *testing.T) {
testObjs = append(testObjs, datasetInput.DeepCopy())
}
client := fake.NewFakeClientWithScheme(testScheme, testObjs...)
quota := resource.MustParse("20Gi")
tiredStore := datav1alpha1.TieredStore{
Levels: []datav1alpha1.Level{{
MediumType: common.Memory,
Quota: &quota,
High: "0.8",
Low: "0.1",
}},
}
runtimeInfo, err := base.BuildRuntimeInfo("hbase", "fluid", common.JindoRuntime, base.WithTieredStore(tiredStore))
if err != nil {
t.Errorf("fail to create the runtimeInfo with error %v", err)
}

engine := JindoCacheEngine{
name: "hbase",
Expand All @@ -97,8 +111,9 @@ func TestSetupMasterInternal(t *testing.T) {
},
},
},
runtimeInfo: runtimeInfo,
}
err := portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 100}, "bitmap", GetReservedPorts)
err = portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 100}, "bitmap", GetReservedPorts)
if err != nil {
t.Fatal(err.Error())
}
Expand Down Expand Up @@ -179,6 +194,18 @@ func TestGenerateJindoValueFile(t *testing.T) {

client := fake.NewFakeClientWithScheme(testScheme, testObjs...)
result := resource.MustParse("20Gi")
tiredStore := datav1alpha1.TieredStore{
Levels: []datav1alpha1.Level{{
MediumType: common.Memory,
Quota: &result,
High: "0.8",
Low: "0.1",
}},
}
runtimeInfo, err := base.BuildRuntimeInfo("hbase", "fluid", common.JindoRuntime, base.WithTieredStore(tiredStore))
if err != nil {
t.Errorf("fail to create the runtimeInfo with error %v", err)
}
engine := JindoCacheEngine{
name: "hbase",
namespace: "fluid",
Expand All @@ -189,19 +216,13 @@ func TestGenerateJindoValueFile(t *testing.T) {
Master: datav1alpha1.JindoCompTemplateSpec{
Replicas: 2,
},
TieredStore: datav1alpha1.TieredStore{
Levels: []datav1alpha1.Level{{
MediumType: common.Memory,
Quota: &result,
High: "0.8",
Low: "0.1",
}},
},
TieredStore: tiredStore,
},
},
runtimeInfo: runtimeInfo,
}

err := portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 50}, "bitmap", GetReservedPorts)
err = portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 50}, "bitmap", GetReservedPorts)
if err != nil {
t.Fatal(err.Error())
}
Expand Down
59 changes: 14 additions & 45 deletions pkg/ddc/jindocache/transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,21 @@ import (
"strings"
"time"

versionutil "github.com/fluid-cloudnative/fluid/pkg/utils/version"
"github.com/pkg/errors"

"github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/client-go/util/retry"

datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1"
"github.com/fluid-cloudnative/fluid/pkg/common"
"github.com/fluid-cloudnative/fluid/pkg/ddc/base/portallocator"
"github.com/fluid-cloudnative/fluid/pkg/utils"
"github.com/fluid-cloudnative/fluid/pkg/utils/docker"
jindoutils "github.com/fluid-cloudnative/fluid/pkg/utils/jindo"
"github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient"
"github.com/fluid-cloudnative/fluid/pkg/utils/transformer"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/client-go/util/retry"
versionutil "github.com/fluid-cloudnative/fluid/pkg/utils/version"
)

type smartdataConfig struct {
Expand All @@ -62,47 +62,16 @@ func (e *JindoCacheEngine) transform(runtime *datav1alpha1.JindoRuntime) (value
return
}

var cachePaths []string // /mnt/disk1/bigboot or /mnt/disk1/bigboot,/mnt/disk2/bigboot
var storagePath = "/dev/shm/"
if len(runtime.Spec.TieredStore.Levels) > 0 {
storagePath = runtime.Spec.TieredStore.Levels[0].Path
}
originPath := strings.Split(storagePath, ",")
for _, value := range originPath {
cachePaths = append(cachePaths, strings.TrimRight(value, "/")+"/"+
e.namespace+"/"+e.name+"/jindocache")
cachePaths, originPaths, originQuotas := jindoutils.ProcessTiredStoreInfo(e.runtimeInfo)
var quotaWithJindoUnit, quotaStrings []string // 1Gi or 1Gi,2Gi,3Gi
for _, quota := range originQuotas {
quotaWithJindoUnit = append(quotaWithJindoUnit, utils.TransformQuantityToJindoUnit(quota))
quotaStrings = append(quotaStrings, quota.String())
}

metaPath := cachePaths[0]
dataPath := strings.Join(cachePaths, ",")

var quotas []string
var userSetQuota []string // 1Gi or 1Gi,2Gi,3Gi
if len(runtime.Spec.TieredStore.Levels) == 0 {
userSetQuota = append(userSetQuota, "1Gi")
quotas = append(quotas, "1Gi")
} else if runtime.Spec.TieredStore.Levels[0].Quota != nil {
userSetQuota = append(userSetQuota, utils.TransformQuantityToJindoUnit(runtime.Spec.TieredStore.Levels[0].Quota))
quotas = append(quotas, runtime.Spec.TieredStore.Levels[0].Quota.String())
}

if len(runtime.Spec.TieredStore.Levels) != 0 && runtime.Spec.TieredStore.Levels[0].QuotaList != "" {
quotaList := runtime.Spec.TieredStore.Levels[0].QuotaList
quotas = strings.Split(quotaList, ",")
if len(quotas) != len(originPath) {
err = fmt.Errorf("the num of cache path and quota must be equal")
return
}
for _, value := range quotas {
if strings.HasSuffix(value, "Gi") {
value = strings.ReplaceAll(value, "Gi", "g")
}
if strings.HasSuffix(value, "Mi") {
value = strings.ReplaceAll(value, "Mi", "m")
}
userSetQuota = append(userSetQuota, value)
}
}
userQuotas := strings.Join(userSetQuota, ",") // 1g or 1g,2g
userQuotas := strings.Join(quotaWithJindoUnit, ",")

smartdataConfig := e.getSmartDataConfigs(runtime)
smartdataTag := smartdataConfig.imageTag
Expand Down Expand Up @@ -145,7 +114,7 @@ func (e *JindoCacheEngine) transform(runtime *datav1alpha1.JindoRuntime) (value
},
Mounts: Mounts{
Master: e.transformMasterMountPath(metaPath, mediumType, volumeType),
WorkersAndClients: e.transformWorkerMountPath(originPath, quotas, e.getMediumTypeFromVolumeSource(string(mediumType), runtime.Spec.TieredStore.Levels), volumeType),
WorkersAndClients: e.transformWorkerMountPath(originPaths, quotaStrings, e.getMediumTypeFromVolumeSource(string(mediumType), runtime.Spec.TieredStore.Levels), volumeType),
},
Owner: transformer.GenerateOwnerReferenceFromObject(runtime),
RuntimeIdentity: common.RuntimeIdentity{
Expand Down
52 changes: 34 additions & 18 deletions pkg/ddc/jindocache/transform_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package jindocache

import (
"github.com/fluid-cloudnative/fluid/pkg/ddc/base"
"os"
"reflect"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -636,15 +637,20 @@ func TestJindoCacheEngine_transform(t *testing.T) {
s.AddKnownTypes(datav1alpha1.GroupVersion, &datav1alpha1.DatasetList{})
_ = corev1.AddToScheme(s)
client := fake.NewFakeClientWithScheme(s, runtimeObjs...)
runtimeInfo, err := base.BuildRuntimeInfo("hbase", "fluid", common.JindoRuntime)
if err != nil {
t.Errorf("fail to create the runtimeInfo with error %v", err)
}
e := &JindoCacheEngine{
runtime: tt.fields.runtime,
name: tt.fields.name,
namespace: tt.fields.namespace,
Client: client,
Log: fake.NullLogger(),
runtime: tt.fields.runtime,
name: tt.fields.name,
namespace: tt.fields.namespace,
Client: client,
Log: fake.NullLogger(),
runtimeInfo: runtimeInfo,
}
tt.args.runtime = tt.fields.runtime
err := portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 100}, "bitmap", GetReservedPorts)
err = portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 100}, "bitmap", GetReservedPorts)
if err != nil {
t.Fatalf("failed to set up runtime port allocator due to %v", err)
}
Expand Down Expand Up @@ -1143,15 +1149,20 @@ func TestJindoCacheEngine_transformPolicy(t *testing.T) {
s.AddKnownTypes(datav1alpha1.GroupVersion, &datav1alpha1.DatasetList{})
_ = corev1.AddToScheme(s)
client := fake.NewFakeClientWithScheme(s, runtimeObjs...)
runtimeInfo, err := base.BuildRuntimeInfo("hbase", "fluid", common.JindoRuntime)
if err != nil {
t.Errorf("fail to create the runtimeInfo with error %v", err)
}
e := &JindoCacheEngine{
runtime: tt.fields.runtime,
name: tt.fields.name,
namespace: tt.fields.namespace,
Client: client,
Log: fake.NullLogger(),
runtime: tt.fields.runtime,
name: tt.fields.name,
namespace: tt.fields.namespace,
Client: client,
Log: fake.NullLogger(),
runtimeInfo: runtimeInfo,
}
tt.args.runtime = tt.fields.runtime
err := portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 100}, "bitmap", GetReservedPorts)
err = portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 100}, "bitmap", GetReservedPorts)
if err != nil {
t.Fatalf("failed to set up runtime port allocator due to %v", err)
}
Expand Down Expand Up @@ -1352,15 +1363,20 @@ func TestJindoCacheEngine_transformCacheSet(t *testing.T) {
s.AddKnownTypes(datav1alpha1.GroupVersion, &datav1alpha1.DatasetList{})
_ = corev1.AddToScheme(s)
client := fake.NewFakeClientWithScheme(s, runtimeObjs...)
runtimeInfo, err := base.BuildRuntimeInfo("hbase", "fluid", common.JindoRuntime)
if err != nil {
t.Errorf("fail to create the runtimeInfo with error %v", err)
}
e := &JindoCacheEngine{
runtime: tt.fields.runtime,
name: tt.fields.name,
namespace: tt.fields.namespace,
Client: client,
Log: fake.NullLogger(),
runtime: tt.fields.runtime,
name: tt.fields.name,
namespace: tt.fields.namespace,
Client: client,
Log: fake.NullLogger(),
runtimeInfo: runtimeInfo,
}
tt.args.runtime = tt.fields.runtime
err := portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 100}, "bitmap", GetReservedPorts)
err = portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 100}, "bitmap", GetReservedPorts)
if err != nil {
t.Fatalf("failed to set up runtime port allocator due to %v", err)
}
Expand Down
Loading

0 comments on commit 7ff9009

Please sign in to comment.