Skip to content

Commit

Permalink
Support to set single quota for multipath in tireStore.level
Browse files Browse the repository at this point in the history
Signed-off-by: jiuyu <guotongyu.gty@alibaba-inc.com>
  • Loading branch information
jiuyu committed Jan 17, 2025
1 parent 4e199c8 commit c57c209
Show file tree
Hide file tree
Showing 6 changed files with 102 additions and 132 deletions.
30 changes: 18 additions & 12 deletions pkg/ddc/jindo/master_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@ package jindo
import (
"testing"

"github.com/fluid-cloudnative/fluid/pkg/common"
"k8s.io/apimachinery/pkg/api/resource"

"github.com/brahma-adshonor/gohook"
datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1"
"github.com/fluid-cloudnative/fluid/pkg/common"
"github.com/fluid-cloudnative/fluid/pkg/ddc/base"
"github.com/fluid-cloudnative/fluid/pkg/ddc/base/portallocator"
"github.com/fluid-cloudnative/fluid/pkg/utils/fake"
"github.com/fluid-cloudnative/fluid/pkg/utils/helm"
"github.com/pkg/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/net"
Expand Down Expand Up @@ -177,6 +177,18 @@ func TestGenerateJindoValueFile(t *testing.T) {

client := fake.NewFakeClientWithScheme(testScheme, testObjs...)
result := resource.MustParse("20Gi")
tiredStore := datav1alpha1.TieredStore{
Levels: []datav1alpha1.Level{{
MediumType: common.Memory,
Quota: &result,
High: "0.8",
Low: "0.1",
}},
}
runtimeInfo, err := base.BuildRuntimeInfo("hbase", "fluid", common.JindoRuntime, base.WithTieredStore(tiredStore))
if err != nil {
t.Errorf("fail to create the runtimeInfo with error %v", err)
}
engine := JindoEngine{
name: "hbase",
namespace: "fluid",
Expand All @@ -187,19 +199,13 @@ func TestGenerateJindoValueFile(t *testing.T) {
Master: datav1alpha1.JindoCompTemplateSpec{
Replicas: 2,
},
TieredStore: datav1alpha1.TieredStore{
Levels: []datav1alpha1.Level{{
MediumType: common.Memory,
Quota: &result,
High: "0.8",
Low: "0.1",
}},
},
TieredStore: tiredStore,
},
},
runtimeInfo: runtimeInfo,
}

err := portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 50}, "bitmap", GetReservedPorts)
err = portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 50}, "bitmap", GetReservedPorts)
if err != nil {
t.Fatal(err.Error())
}
Expand Down
40 changes: 10 additions & 30 deletions pkg/ddc/jindo/transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,16 @@ import (
"strings"
"time"

"github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient"
corev1 "k8s.io/api/core/v1"

datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1"
"github.com/fluid-cloudnative/fluid/pkg/common"
"github.com/fluid-cloudnative/fluid/pkg/ddc/base/portallocator"
"github.com/fluid-cloudnative/fluid/pkg/utils"
"github.com/fluid-cloudnative/fluid/pkg/utils/docker"
jindoutils "github.com/fluid-cloudnative/fluid/pkg/utils/jindo"
"github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient"
"github.com/fluid-cloudnative/fluid/pkg/utils/transformer"
corev1 "k8s.io/api/core/v1"
)

func (e *JindoEngine) transform(runtime *datav1alpha1.JindoRuntime) (value *Jindo, err error) {
Expand All @@ -52,36 +53,15 @@ func (e *JindoEngine) transform(runtime *datav1alpha1.JindoRuntime) (value *Jind
return
}

var cachePaths []string // /mnt/disk1/bigboot or /mnt/disk1/bigboot,/mnt/disk2/bigboot
storagePath := runtime.Spec.TieredStore.Levels[0].Path
originPath := strings.Split(storagePath, ",")
for _, value := range originPath {
cachePaths = append(cachePaths, strings.TrimRight(value, "/")+"/"+
e.namespace+"/"+e.name+"/bigboot")
cachePaths, originPaths, originQuotas := jindoutils.ProcessTiredStoreInfo(e.runtimeInfo)
var quotaWithJindoUnit []string // 1Gi or 1Gi,2Gi,3Gi
for _, quota := range originQuotas {
quotaWithJindoUnit = append(quotaWithJindoUnit, utils.TransformQuantityToJindoUnit(quota))
}

metaPath := cachePaths[0]
dataPath := strings.Join(cachePaths, ",")

var userSetQuota []string // 1Gi or 1Gi,2Gi,3Gi
if runtime.Spec.TieredStore.Levels[0].Quota != nil {
userSetQuota = append(userSetQuota, utils.TransformQuantityToJindoUnit(runtime.Spec.TieredStore.Levels[0].Quota))
}

if runtime.Spec.TieredStore.Levels[0].QuotaList != "" {
quotaList := runtime.Spec.TieredStore.Levels[0].QuotaList
quotas := strings.Split(quotaList, ",")
if len(quotas) != len(originPath) {
err = fmt.Errorf("the num of cache path and quota must be equal")
return
}
for _, value := range quotas {
if strings.HasSuffix(value, "Gi") {
value = strings.ReplaceAll(value, "Gi", "g")
}
userSetQuota = append(userSetQuota, value)
}
}
userQuotas := strings.Join(userSetQuota, ",") // 1g or 1g,2g
userQuotas := strings.Join(quotaWithJindoUnit, ",")

jindoSmartdataImage, smartdataTag, dnsServer := e.getSmartDataConfigs()
jindoFuseImage, fuseTag := e.parseFuseImage()
Expand Down Expand Up @@ -111,7 +91,7 @@ func (e *JindoEngine) transform(runtime *datav1alpha1.JindoRuntime) (value *Jind
},
Mounts: Mounts{
Master: e.transformMasterMountPath(metaPath),
WorkersAndClients: e.transformWorkerMountPath(originPath),
WorkersAndClients: e.transformWorkerMountPath(originPaths),
},
Owner: transformer.GenerateOwnerReferenceFromObject(runtime),
RuntimeIdentity: common.RuntimeIdentity{
Expand Down
59 changes: 14 additions & 45 deletions pkg/ddc/jindocache/transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,21 @@ import (
"strings"
"time"

versionutil "github.com/fluid-cloudnative/fluid/pkg/utils/version"
"github.com/pkg/errors"

"github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/client-go/util/retry"

datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1"
"github.com/fluid-cloudnative/fluid/pkg/common"
"github.com/fluid-cloudnative/fluid/pkg/ddc/base/portallocator"
"github.com/fluid-cloudnative/fluid/pkg/utils"
"github.com/fluid-cloudnative/fluid/pkg/utils/docker"
jindoutils "github.com/fluid-cloudnative/fluid/pkg/utils/jindo"
"github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient"
"github.com/fluid-cloudnative/fluid/pkg/utils/transformer"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/client-go/util/retry"
versionutil "github.com/fluid-cloudnative/fluid/pkg/utils/version"
)

type smartdataConfig struct {
Expand All @@ -62,47 +62,16 @@ func (e *JindoCacheEngine) transform(runtime *datav1alpha1.JindoRuntime) (value
return
}

var cachePaths []string // /mnt/disk1/bigboot or /mnt/disk1/bigboot,/mnt/disk2/bigboot
var storagePath = "/dev/shm/"
if len(runtime.Spec.TieredStore.Levels) > 0 {
storagePath = runtime.Spec.TieredStore.Levels[0].Path
}
originPath := strings.Split(storagePath, ",")
for _, value := range originPath {
cachePaths = append(cachePaths, strings.TrimRight(value, "/")+"/"+
e.namespace+"/"+e.name+"/jindocache")
cachePaths, originPaths, originQuotas := jindoutils.ProcessTiredStoreInfo(e.runtimeInfo)
var quotaWithJindoUnit, quotaStrings []string // 1Gi or 1Gi,2Gi,3Gi
for _, quota := range originQuotas {
quotaWithJindoUnit = append(quotaWithJindoUnit, utils.TransformQuantityToJindoUnit(quota))
quotaStrings = append(quotaStrings, quota.String())
}

metaPath := cachePaths[0]
dataPath := strings.Join(cachePaths, ",")

var quotas []string
var userSetQuota []string // 1Gi or 1Gi,2Gi,3Gi
if len(runtime.Spec.TieredStore.Levels) == 0 {
userSetQuota = append(userSetQuota, "1Gi")
quotas = append(quotas, "1Gi")
} else if runtime.Spec.TieredStore.Levels[0].Quota != nil {
userSetQuota = append(userSetQuota, utils.TransformQuantityToJindoUnit(runtime.Spec.TieredStore.Levels[0].Quota))
quotas = append(quotas, runtime.Spec.TieredStore.Levels[0].Quota.String())
}

if len(runtime.Spec.TieredStore.Levels) != 0 && runtime.Spec.TieredStore.Levels[0].QuotaList != "" {
quotaList := runtime.Spec.TieredStore.Levels[0].QuotaList
quotas = strings.Split(quotaList, ",")
if len(quotas) != len(originPath) {
err = fmt.Errorf("the num of cache path and quota must be equal")
return
}
for _, value := range quotas {
if strings.HasSuffix(value, "Gi") {
value = strings.ReplaceAll(value, "Gi", "g")
}
if strings.HasSuffix(value, "Mi") {
value = strings.ReplaceAll(value, "Mi", "m")
}
userSetQuota = append(userSetQuota, value)
}
}
userQuotas := strings.Join(userSetQuota, ",") // 1g or 1g,2g
userQuotas := strings.Join(quotaWithJindoUnit, ",")

smartdataConfig := e.getSmartDataConfigs(runtime)
smartdataTag := smartdataConfig.imageTag
Expand Down Expand Up @@ -145,7 +114,7 @@ func (e *JindoCacheEngine) transform(runtime *datav1alpha1.JindoRuntime) (value
},
Mounts: Mounts{
Master: e.transformMasterMountPath(metaPath, mediumType, volumeType),
WorkersAndClients: e.transformWorkerMountPath(originPath, quotas, e.getMediumTypeFromVolumeSource(string(mediumType), runtime.Spec.TieredStore.Levels), volumeType),
WorkersAndClients: e.transformWorkerMountPath(originPaths, quotaStrings, e.getMediumTypeFromVolumeSource(string(mediumType), runtime.Spec.TieredStore.Levels), volumeType),
},
Owner: transformer.GenerateOwnerReferenceFromObject(runtime),
RuntimeIdentity: common.RuntimeIdentity{
Expand Down
59 changes: 14 additions & 45 deletions pkg/ddc/jindofsx/transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,20 @@ import (
"strings"
"time"

versionutil "github.com/fluid-cloudnative/fluid/pkg/utils/version"

"github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/client-go/util/retry"

datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1"
"github.com/fluid-cloudnative/fluid/pkg/common"
"github.com/fluid-cloudnative/fluid/pkg/ddc/base/portallocator"
"github.com/fluid-cloudnative/fluid/pkg/utils"
"github.com/fluid-cloudnative/fluid/pkg/utils/docker"
jindoutils "github.com/fluid-cloudnative/fluid/pkg/utils/jindo"
"github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient"
"github.com/fluid-cloudnative/fluid/pkg/utils/transformer"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/client-go/util/retry"
versionutil "github.com/fluid-cloudnative/fluid/pkg/utils/version"
)

type smartdataConfig struct {
Expand All @@ -62,47 +62,16 @@ func (e *JindoFSxEngine) transform(runtime *datav1alpha1.JindoRuntime) (value *J
return
}

var cachePaths []string // /mnt/disk1/bigboot or /mnt/disk1/bigboot,/mnt/disk2/bigboot
var storagePath = "/dev/shm/"
if len(runtime.Spec.TieredStore.Levels) > 0 {
storagePath = runtime.Spec.TieredStore.Levels[0].Path
}
originPath := strings.Split(storagePath, ",")
for _, value := range originPath {
cachePaths = append(cachePaths, strings.TrimRight(value, "/")+"/"+
e.namespace+"/"+e.name+"/jindofsx")
cachePaths, originPaths, originQuotas := jindoutils.ProcessTiredStoreInfo(e.runtimeInfo)
var quotaWithJindoUnit, quotas []string // 1Gi or 1Gi,2Gi,3Gi
for _, quota := range originQuotas {
quotaWithJindoUnit = append(quotaWithJindoUnit, utils.TransformQuantityToJindoUnit(quota))
quotas = append(quotas, quota.String())
}

metaPath := cachePaths[0]
dataPath := strings.Join(cachePaths, ",")

var quotas []string
var userSetQuota []string // 1Gi or 1Gi,2Gi,3Gi
if len(runtime.Spec.TieredStore.Levels) == 0 {
userSetQuota = append(userSetQuota, "1Gi")
quotas = append(quotas, "1Gi")
} else if runtime.Spec.TieredStore.Levels[0].Quota != nil {
userSetQuota = append(userSetQuota, utils.TransformQuantityToJindoUnit(runtime.Spec.TieredStore.Levels[0].Quota))
quotas = append(quotas, runtime.Spec.TieredStore.Levels[0].Quota.String())
}

if len(runtime.Spec.TieredStore.Levels) != 0 && runtime.Spec.TieredStore.Levels[0].QuotaList != "" {
quotaList := runtime.Spec.TieredStore.Levels[0].QuotaList
quotas = strings.Split(quotaList, ",")
if len(quotas) != len(originPath) {
err = fmt.Errorf("the num of cache path and quota must be equal")
return
}
for _, value := range quotas {
if strings.HasSuffix(value, "Gi") {
value = strings.ReplaceAll(value, "Gi", "g")
}
if strings.HasSuffix(value, "Mi") {
value = strings.ReplaceAll(value, "Mi", "m")
}
userSetQuota = append(userSetQuota, value)
}
}
userQuotas := strings.Join(userSetQuota, ",") // 1g or 1g,2g
userQuotas := strings.Join(quotaWithJindoUnit, ",")

smartdataConfig := e.getSmartDataConfigs(runtime)
smartdataTag := smartdataConfig.imageTag
Expand Down Expand Up @@ -145,7 +114,7 @@ func (e *JindoFSxEngine) transform(runtime *datav1alpha1.JindoRuntime) (value *J
},
Mounts: Mounts{
Master: e.transformMasterMountPath(metaPath, mediumType, volumeType),
WorkersAndClients: e.transformWorkerMountPath(originPath, quotas, e.getMediumTypeFromVolumeSource(string(mediumType), runtime.Spec.TieredStore.Levels), volumeType),
WorkersAndClients: e.transformWorkerMountPath(originPaths, quotas, e.getMediumTypeFromVolumeSource(string(mediumType), runtime.Spec.TieredStore.Levels), volumeType),
},
Owner: transformer.GenerateOwnerReferenceFromObject(runtime),
RuntimeIdentity: common.RuntimeIdentity{
Expand Down
43 changes: 43 additions & 0 deletions pkg/utils/jindo/jindo.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,12 @@ package jindo

import (
"os"
"strings"

"k8s.io/apimachinery/pkg/api/resource"

"github.com/fluid-cloudnative/fluid/pkg/common"
"github.com/fluid-cloudnative/fluid/pkg/ddc/base"
)

const (
Expand Down Expand Up @@ -52,3 +56,42 @@ func GetRuntimeImage() (image string) {
}
return
}

func ProcessTiredStoreInfo(runtimeInfo base.RuntimeInfoInterface) (originPaths []string, cachePaths []string, quotas []*resource.Quantity) {
tireStoreInfo := runtimeInfo.GetTieredStoreInfo()
var defaultStoragePath = "/dev/shm/"

var subPath string
if GetDefaultEngineImpl() == common.JindoFSxEngineImpl {
subPath = "jindofsx"
} else if GetDefaultEngineImpl() == common.JindoFSEngineImpl {
subPath = "bigboot"
} else if GetDefaultEngineImpl() == common.JindoCacheEngineImpl {
subPath = "jindocache"
}

if len(tireStoreInfo.Levels) > 0 {
for _, cachePath := range tireStoreInfo.Levels[0].CachePaths {
originPaths = append(originPaths, cachePath.Path)
cachePaths = append(cachePaths, strings.TrimRight(cachePath.Path, "/")+"/"+
runtimeInfo.GetNamespace()+"/"+runtimeInfo.GetName()+"/"+subPath)
}

}
if len(cachePaths) == 0 {
originPaths = append(originPaths, defaultStoragePath)
cachePaths = append(cachePaths, strings.TrimRight(defaultStoragePath, "/")+"/"+
runtimeInfo.GetNamespace()+"/"+runtimeInfo.GetName()+"/"+subPath)
}

if len(tireStoreInfo.Levels) == 0 {
Quantity1Gi, _ := resource.ParseQuantity("1Gi")
quotas = append(quotas, &Quantity1Gi)
} else {
for _, cachePath := range tireStoreInfo.Levels[0].CachePaths {
quotas = append(quotas, cachePath.Quota)
}
}

return
}
3 changes: 3 additions & 0 deletions pkg/utils/quantity.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ func TransformQuantityToAlluxioUnit(q *resource.Quantity) (value string) {
// that can be recognized by Jindo.
func TransformQuantityToJindoUnit(q *resource.Quantity) (value string) {
value = q.String()
if strings.HasSuffix(value, "Ti") {
value = strings.ReplaceAll(value, "Ti", "t")
}
if strings.HasSuffix(value, "Gi") {
value = strings.ReplaceAll(value, "Gi", "g")
}
Expand Down

0 comments on commit c57c209

Please sign in to comment.