Skip to content

Commit

Permalink
Enhancement: add hashCache for namespacedName related labels
Browse files Browse the repository at this point in the history
Signed-off-by: jiuyu <guotongyu.gty@alibaba-inc.com>
  • Loading branch information
jiuyu committed Jan 13, 2025
1 parent 58ce2ea commit cba449f
Show file tree
Hide file tree
Showing 71 changed files with 476 additions and 450 deletions.
2 changes: 2 additions & 0 deletions pkg/common/label.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ const (
// i.e. fluid.io/managed-by
LabelAnnotationManagedBy = LabelAnnotationPrefix + "managed-by"

LabelAnnotationNamespacedNameHashAlias = LabelAnnotationDataset + ".namespaced-name-alias"

// fluid adminssion webhook inject flag
// i.e. fluid.io/enable-injection
EnableFluidInjectionFlag = LabelAnnotationPrefix + "enable-injection"
Expand Down
49 changes: 48 additions & 1 deletion pkg/controllers/runtime_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ func NewRuntimeReconciler(reconciler RuntimeReconcilerInterface, client client.C

// ReconcileInternal handles the logic of reconcile runtime
func (r *RuntimeReconciler) ReconcileInternal(ctx cruntime.ReconcileRequestContext) (ctrl.Result, error) {

// 0. Set context time limit
ctxWithTimeout, cancel := context.WithTimeout(ctx.Context, reconcileTimeout)
defer cancel()
Expand Down Expand Up @@ -139,6 +138,11 @@ func (r *RuntimeReconciler) ReconcileInternal(ctx cruntime.ReconcileRequestConte
if !utils.ContainsOwners(objectMeta.GetOwnerReferences(), dataset) {
return r.AddOwnerAndRequeue(ctx, dataset)
}

if err := r.AddNamespacedNameHashAliasLabelOndemand(ctx, dataset); err != nil {
return utils.RequeueIfError(err)
}

if !dataset.CanbeBound(ctx.Name, ctx.Namespace, ctx.Category) {
ctx.Log.Info("the dataset can't be bound to the runtime, because it's already bound to another runtime ",
"dataset", dataset.Name)
Expand Down Expand Up @@ -340,6 +344,49 @@ func (r *RuntimeReconciler) AddOwnerAndRequeue(ctx cruntime.ReconcileRequestCont
return utils.RequeueImmediately()
}

// AddNamespacedNameHashAliasLabelOndemand add namesapcedNameHashAlias label on demand
func (r *RuntimeReconciler) AddNamespacedNameHashAliasLabelOndemand(ctx cruntime.ReconcileRequestContext, dataset *datav1alpha1.Dataset) error {
if dataset.GetLabels()[common.LabelAnnotationNamespacedNameHashAlias] != "" {
// hashAlias has been added before
return nil
}

return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
dataset, err := r.implement.GetDataset(ctx)
if err != nil {
return err
}
datasetLabels := dataset.GetLabels()
var namespacedNameHashAlias string

// 1. check if the length of namespacedNameHashAlias is longer than 63, which needs to calculate hash alias deterministically
if len(fmt.Sprintf("%s-%s", dataset.Namespace, dataset.Name)) > 63 {
namespacedNameHashAlias = utils.GetNamespacedNameValueWithPrefix("", dataset.Namespace, dataset.Name, "")
} else {
// 2. the length of prefixed namespaced values may longer than 63 and the hash alias has been calculated and stored in cache
found := false
namespacedNameHashAlias, found = utils.GetNamespacedNameHashValueByKey(fmt.Sprintf("%s-%s", dataset.Namespace, dataset.Name))
if !found {
return nil
}
}

if namespacedNameHashAlias == "" {
return nil
}

datasetLabels[common.LabelAnnotationNamespacedNameHashAlias] = namespacedNameHashAlias
datasetToUpdate := dataset.DeepCopy()
datasetToUpdate.Labels = datasetLabels
if err := r.Update(ctx, datasetToUpdate); err != nil {
ctx.Log.Error(err, "Failed to add NamespacedNameHashAliasLabel", "StatusUpdateError", ctx)
return err
}
utils.RemoveNamespacedNameHashValueByKey(fmt.Sprintf("%s-%s", dataset.Namespace, dataset.Name))
return nil
})
}

// GetRuntimeObjectMeta gets runtime object meta
func (r *RuntimeReconciler) GetRuntimeObjectMeta(ctx cruntime.ReconcileRequestContext) (objectMeta metav1.Object, err error) {
objectMetaAccessor, isOM := ctx.Runtime.(metav1.ObjectMetaAccessor)
Expand Down
8 changes: 6 additions & 2 deletions pkg/csi/plugins/nodeserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ func (ns *nodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag
// 4. remove label on node
// Once the label is removed, fuse pod on corresponding node will be terminated
// since node selector in the fuse daemonSet no longer matches.
fuseLabelKey := utils.GetFuseLabelName(namespace, name)
fuseLabelKey := utils.GetFuseLabelName(namespace, name, runtimeInfo.GetNamespacedNameAlias())
var labelsToModify common.LabelsToModify
labelsToModify.Delete(fuseLabelKey)

Expand Down Expand Up @@ -400,7 +400,11 @@ func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
}

// 4. Label node to launch FUSE Pod
fuseLabelKey := utils.GetFuseLabelName(namespace, name)
runtimeInfo, err := base.GetRuntimeInfo(ns.client, name, namespace)
if err != nil {
return nil, errors.Wrapf(err, "NodeStageVolume: failed to get runtime info for %s/%s", namespace, name)
}
fuseLabelKey := utils.GetFuseLabelName(namespace, name, runtimeInfo.GetNamespacedNameAlias())
var labelsToModify common.LabelsToModify
labelsToModify.Add(fuseLabelKey, "true")

Expand Down
4 changes: 2 additions & 2 deletions pkg/ctrl/fuse.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (e *Helper) CheckFuseHealthy(recorder record.EventRecorder, runtime base.Ru
func (e *Helper) CleanUpFuse() (count int, err error) {
var (
nodeList = &corev1.NodeList{}
fuseLabelKey = utils.GetFuseLabelName(e.runtimeInfo.GetNamespace(), e.runtimeInfo.GetName())
fuseLabelKey = utils.GetFuseLabelName(e.runtimeInfo.GetNamespace(), e.runtimeInfo.GetName(), e.runtimeInfo.GetNamespacedNameAlias())
)

labelNames := []string{fuseLabelKey}
Expand Down Expand Up @@ -153,7 +153,7 @@ func (e *Helper) CleanUpFuse() (count int, err error) {
func (e *Helper) GetFuseNodes() (nodes []corev1.Node, err error) {
var (
nodeList = &corev1.NodeList{}
fuseLabelKey = utils.GetFuseLabelName(e.runtimeInfo.GetNamespace(), e.runtimeInfo.GetName())
fuseLabelKey = utils.GetFuseLabelName(e.runtimeInfo.GetNamespace(), e.runtimeInfo.GetName(), e.runtimeInfo.GetNamespacedNameAlias())
)

labelNames := []string{fuseLabelKey}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddc/alluxio/deprecated_label.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (e *AlluxioEngine) HasDeprecatedCommonLabelname() (deprecated bool, err err
nodeSelectors := workers.Spec.Template.Spec.NodeSelector
e.Log.Info("The current node selectors for worker", "workerName", workerName, "nodeSelector", nodeSelectors)

deprecatedCommonLabelName := utils.GetCommonLabelName(true, e.namespace, e.name)
deprecatedCommonLabelName := utils.GetCommonLabelName(true, e.namespace, e.name, e.runtimeInfo.GetNamespacedNameAlias())
if _, deprecated = nodeSelectors[deprecatedCommonLabelName]; deprecated {
//
e.Log.Info("the deprecated node selector exists", "nodeselector", deprecatedCommonLabelName)
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddc/alluxio/deprecated_label_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TestAlluxioEngine_GetDeprecatedCommonLabelname(t *testing.T) {
},
}
for _, test := range testCases {
out := utils.GetCommonLabelName(true, test.namespace, test.name)
out := utils.GetCommonLabelName(true, test.namespace, test.name, "")
if out != test.out {
t.Errorf("input parameter is %s-%s,expected %s, got %s", test.namespace, test.name, test.out, out)
}
Expand Down
18 changes: 16 additions & 2 deletions pkg/ddc/alluxio/master_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package alluxio

import (
"fmt"
"github.com/fluid-cloudnative/fluid/pkg/ddc/base"
"testing"

"github.com/brahma-adshonor/gohook"
Expand Down Expand Up @@ -85,6 +86,11 @@ func TestSetupMasterInternal(t *testing.T) {
}
client := fake.NewFakeClientWithScheme(testScheme, testObjs...)

runtimeInfo, err := base.BuildRuntimeInfo("hbase", "fluid", "alluxio")
if err != nil {
t.Errorf("fail to create the runtimeInfo with error %v", err)
}

engine := AlluxioEngine{
name: "hbase",
namespace: "fluid",
Expand All @@ -100,8 +106,10 @@ func TestSetupMasterInternal(t *testing.T) {
},
},
},
runtimeInfo: runtimeInfo,
}
err := portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 100}, "bitmap", GetReservedPorts)

err = portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 100}, "bitmap", GetReservedPorts)
if err != nil {
t.Fatal(err.Error())
}
Expand Down Expand Up @@ -195,6 +203,11 @@ func TestGenerateAlluxioValueFile(t *testing.T) {

client := fake.NewFakeClientWithScheme(testScheme, testObjs...)

runtimeInfo, err := base.BuildRuntimeInfo("hbase", "fluid", "alluxio")
if err != nil {
t.Errorf("fail to create the runtimeInfo with error %v", err)
}

engine := AlluxioEngine{
name: "hbase",
namespace: "fluid",
Expand All @@ -210,9 +223,10 @@ func TestGenerateAlluxioValueFile(t *testing.T) {
},
},
},
runtimeInfo: runtimeInfo,
}

err := portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 50}, "bitmap", GetReservedPorts)
err = portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 50}, "bitmap", GetReservedPorts)
if err != nil {
t.Fatal(err.Error())
}
Expand Down
33 changes: 17 additions & 16 deletions pkg/ddc/alluxio/runtime_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,23 @@ func (e *AlluxioEngine) getRuntimeInfo() (base.RuntimeInfoInterface, error) {
e.runtimeInfo.SetFuseNodeSelector(runtime.Spec.Fuse.NodeSelector)

if !e.UnitTest {
// Setup with Dataset Info
dataset, err := utils.GetDataset(e.Client, e.name, e.namespace)
if err != nil {
if utils.IgnoreNotFound(err) == nil {
e.Log.Info("Dataset is notfound", "name", e.name, "namespace", e.namespace)
return e.runtimeInfo, nil
}

e.Log.Info("Failed to get dataset when getRuntimeInfo")
return e.runtimeInfo, err
}

e.runtimeInfo.SetupWithDataset(dataset)
e.Log.Info("Setup with dataset done", "exclusive", e.runtimeInfo.IsExclusive())

e.runtimeInfo.SetNamespacedNameHashAliasWithDataset(dataset)

// Check if the runtime is using deprecated labels
isLabelDeprecated, err := e.HasDeprecatedCommonLabelname()
if err != nil {
Expand All @@ -58,22 +75,6 @@ func (e *AlluxioEngine) getRuntimeInfo() (base.RuntimeInfoInterface, error) {
e.runtimeInfo.SetDeprecatedPVName(isPVNameDeprecated)

e.Log.Info("Deprecation check finished", "isLabelDeprecated", e.runtimeInfo.IsDeprecatedNodeLabel(), "isPVNameDeprecated", e.runtimeInfo.IsDeprecatedPVName())

// Setup with Dataset Info
dataset, err := utils.GetDataset(e.Client, e.name, e.namespace)
if err != nil {
if utils.IgnoreNotFound(err) == nil {
e.Log.Info("Dataset is notfound", "name", e.name, "namespace", e.namespace)
return e.runtimeInfo, nil
}

e.Log.Info("Failed to get dataset when getRuntimeInfo")
return e.runtimeInfo, err
}

e.runtimeInfo.SetupWithDataset(dataset)

e.Log.Info("Setup with dataset done", "exclusive", e.runtimeInfo.IsExclusive())
}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/ddc/alluxio/shutdown.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ func (e *AlluxioEngine) destroyWorkers(expectedWorkers int32) (currentWorkers in
labelsToModify.Delete(label)
}

exclusiveLabelValue := utils.GetExclusiveValue(e.namespace, e.name)
exclusiveLabelValue := runtimeInfo.GetExclusiveLabelValue()
if val, exist := toUpdate.Labels[labelExclusiveName]; exist && val == exclusiveLabelValue {
labelsToModify.Delete(labelExclusiveName)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddc/alluxio/transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (e *AlluxioEngine) transform(runtime *datav1alpha1.AlluxioRuntime) (value *
}

value.FullnameOverride = e.name
value.FullNamespacedNameOverride = utils.TransferFullNamespacedNameWithPrefixToLegalValue("", e.namespace, e.name)
value.FullNamespacedNameOverride = utils.GetNamespacedNameValueWithPrefix("", e.namespace, e.name, e.runtimeInfo.GetNamespacedNameAlias())

// 1.transform the common part
err = e.transformCommonPart(runtime, dataset, value)
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddc/alluxio/transform_fuse.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (e *AlluxioEngine) transformFuse(runtime *datav1alpha1.AlluxioRuntime, data
} else {
value.Fuse.NodeSelector = map[string]string{}
}
value.Fuse.NodeSelector[utils.GetFuseLabelName(runtime.Namespace, runtime.Name)] = "true"
value.Fuse.NodeSelector[utils.GetFuseLabelName(runtime.Namespace, runtime.Name, e.runtimeInfo.GetNamespacedNameAlias())] = "true"

// parse fuse container network mode
value.Fuse.HostNetwork = datav1alpha1.IsHostNetwork(runtime.Spec.Fuse.NetworkMode)
Expand Down
16 changes: 10 additions & 6 deletions pkg/ddc/base/label.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,23 @@ import (
)

func (info *RuntimeInfo) GetLabelNameForMemory() string {
return utils.GetLabelNameForMemory(info.IsDeprecatedNodeLabel(), info.runtimeType, info.namespace, info.name)
return utils.GetLabelNameForMemory(info.IsDeprecatedNodeLabel(), info.runtimeType, info.namespace, info.name, info.namespacedNameAlias)
}

func (info *RuntimeInfo) GetLabelNameForDisk() string {
return utils.GetLabelNameForDisk(info.IsDeprecatedNodeLabel(), info.runtimeType, info.namespace, info.name)
return utils.GetLabelNameForDisk(info.IsDeprecatedNodeLabel(), info.runtimeType, info.namespace, info.name, info.namespacedNameAlias)
}

func (info *RuntimeInfo) GetLabelNameForTotal() string {
return utils.GetLabelNameForTotal(info.IsDeprecatedNodeLabel(), info.runtimeType, info.namespace, info.name)
return utils.GetLabelNameForTotal(info.IsDeprecatedNodeLabel(), info.runtimeType, info.namespace, info.name, info.namespacedNameAlias)
}

func (info *RuntimeInfo) GetCommonLabelName() string {
return utils.GetCommonLabelName(info.IsDeprecatedNodeLabel(), info.namespace, info.name)
return utils.GetCommonLabelName(info.IsDeprecatedNodeLabel(), info.namespace, info.name, info.namespacedNameAlias)
}

func (info *RuntimeInfo) GetRuntimeLabelName() string {
return utils.GetRuntimeLabelName(info.IsDeprecatedNodeLabel(), info.runtimeType, info.namespace, info.name)
return utils.GetRuntimeLabelName(info.IsDeprecatedNodeLabel(), info.runtimeType, info.namespace, info.name, info.namespacedNameAlias)
}

// GetDatasetNumLabelname get the label to record how much datasets on a node
Expand All @@ -48,5 +48,9 @@ func (info *RuntimeInfo) GetDatasetNumLabelName() string {

// GetFuseLabelName gets the label indicating a fuse running on some node.
func (info *RuntimeInfo) GetFuseLabelName() string {
return utils.TransferFullNamespacedNameWithPrefixToLegalValue(common.LabelAnnotationFusePrefix, info.namespace, info.name)
return utils.GetNamespacedNameValueWithPrefix(common.LabelAnnotationFusePrefix, info.namespace, info.name, info.namespacedNameAlias)
}

func (info *RuntimeInfo) GetExclusiveLabelValue() string {
return utils.GetNamespacedNameValueWithPrefix("", info.namespace, info.name, info.namespacedNameAlias)
}
2 changes: 1 addition & 1 deletion pkg/ddc/base/label_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func TestGetStorageLabelName(t *testing.T) {
}

for _, test := range tests {
result := utils.GetStorageLabelName(common.HumanReadType, common.MemoryStorageType, test.info.IsDeprecatedNodeLabel(), test.info.runtimeType, test.info.namespace, test.info.name)
result := utils.GetStorageLabelName(common.HumanReadType, common.MemoryStorageType, test.info.IsDeprecatedNodeLabel(), test.info.runtimeType, test.info.namespace, test.info.name, "")
if test.expectedResult != result {
t.Errorf("check failure, expected %s, get %s", test.expectedResult, result)
}
Expand Down
23 changes: 20 additions & 3 deletions pkg/ddc/base/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ type Conventions interface {
GetDatasetNumLabelName() string

GetWorkerStatefulsetName() string

GetExclusiveLabelValue() string
}

// Runtime Information interface defines the interfaces that should be implemented
Expand All @@ -67,6 +69,8 @@ type RuntimeInfoInterface interface {

GetNamespace() string

GetNamespacedNameAlias() string

GetRuntimeType() string

IsExclusive() bool
Expand All @@ -77,6 +81,8 @@ type RuntimeInfoInterface interface {

SetupWithDataset(dataset *datav1alpha1.Dataset)

SetNamespacedNameHashAliasWithDataset(dataset *datav1alpha1.Dataset)

GetFuseNodeSelector() (nodeSelector map[string]string)

GetFuseCleanPolicy() datav1alpha1.FuseCleanPolicy
Expand Down Expand Up @@ -104,9 +110,10 @@ var _ RuntimeInfoInterface = &RuntimeInfo{}

// The real Runtime Info should implement
type RuntimeInfo struct {
name string
namespace string
runtimeType string
name string
namespace string
namespacedNameAlias string
runtimeType string

//tieredstore datav1alpha1.TieredStore
tieredstoreInfo TieredStoreInfo
Expand Down Expand Up @@ -272,6 +279,10 @@ func (info *RuntimeInfo) GetNamespace() string {
return info.namespace
}

func (info *RuntimeInfo) GetNamespacedNameAlias() string {
return info.namespacedNameAlias
}

// GetRuntimeType gets runtime type
func (info *RuntimeInfo) GetRuntimeType() string {
return info.runtimeType
Expand All @@ -287,6 +298,11 @@ func (info *RuntimeInfo) SetupWithDataset(dataset *datav1alpha1.Dataset) {
info.exclusive = dataset.IsExclusiveMode()
}

// SetupWithDataset determines if need to setup with the info of dataset
func (info *RuntimeInfo) SetNamespacedNameHashAliasWithDataset(dataset *datav1alpha1.Dataset) {
info.namespacedNameAlias = dataset.Labels[common.LabelAnnotationNamespacedNameHashAlias]
}

// SetFuseNodeSelector setups the fuse deploy mode
func (info *RuntimeInfo) SetFuseNodeSelector(nodeSelector map[string]string) {
info.fuse.NodeSelector = nodeSelector
Expand Down Expand Up @@ -525,6 +541,7 @@ func GetRuntimeInfo(client client.Client, name, namespace string) (runtimeInfo R

if runtimeInfo != nil {
runtimeInfo.SetClient(client)
runtimeInfo.SetNamespacedNameHashAliasWithDataset(dataset)
}
return runtimeInfo, err
}
Expand Down
Loading

0 comments on commit cba449f

Please sign in to comment.