Skip to content

Commit

Permalink
Enhancement: add hashCache for namespacedName related labels
Browse files Browse the repository at this point in the history
Signed-off-by: jiuyu <guotongyu.gty@alibaba-inc.com>
  • Loading branch information
jiuyu committed Jan 13, 2025
1 parent 58ce2ea commit 0709682
Show file tree
Hide file tree
Showing 86 changed files with 719 additions and 518 deletions.
2 changes: 2 additions & 0 deletions pkg/common/label.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ const (
// i.e. fluid.io/managed-by
LabelAnnotationManagedBy = LabelAnnotationPrefix + "managed-by"

LabelAnnotationNamespacedNameHashAlias = LabelAnnotationDataset + ".namespaced-name-alias"

// fluid adminssion webhook inject flag
// i.e. fluid.io/enable-injection
EnableFluidInjectionFlag = LabelAnnotationPrefix + "enable-injection"
Expand Down
49 changes: 48 additions & 1 deletion pkg/controllers/runtime_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ func NewRuntimeReconciler(reconciler RuntimeReconcilerInterface, client client.C

// ReconcileInternal handles the logic of reconcile runtime
func (r *RuntimeReconciler) ReconcileInternal(ctx cruntime.ReconcileRequestContext) (ctrl.Result, error) {

// 0. Set context time limit
ctxWithTimeout, cancel := context.WithTimeout(ctx.Context, reconcileTimeout)
defer cancel()
Expand Down Expand Up @@ -139,6 +138,11 @@ func (r *RuntimeReconciler) ReconcileInternal(ctx cruntime.ReconcileRequestConte
if !utils.ContainsOwners(objectMeta.GetOwnerReferences(), dataset) {
return r.AddOwnerAndRequeue(ctx, dataset)
}

if err := r.AddNamespacedNameHashAliasLabelOndemand(ctx, dataset); err != nil {
return utils.RequeueIfError(err)
}

if !dataset.CanbeBound(ctx.Name, ctx.Namespace, ctx.Category) {
ctx.Log.Info("the dataset can't be bound to the runtime, because it's already bound to another runtime ",
"dataset", dataset.Name)
Expand Down Expand Up @@ -340,6 +344,49 @@ func (r *RuntimeReconciler) AddOwnerAndRequeue(ctx cruntime.ReconcileRequestCont
return utils.RequeueImmediately()
}

// AddNamespacedNameHashAliasLabelOndemand add namesapcedNameHashAlias label on demand
func (r *RuntimeReconciler) AddNamespacedNameHashAliasLabelOndemand(ctx cruntime.ReconcileRequestContext, dataset *datav1alpha1.Dataset) error {
if dataset.GetLabels()[common.LabelAnnotationNamespacedNameHashAlias] != "" {
// hashAlias has been added before
return nil
}

return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
dataset, err := r.implement.GetDataset(ctx)
if err != nil {
return err
}
datasetLabels := dataset.GetLabels()
var namespacedNameHashAlias string

// 1. check if the length of namespacedNameHashAlias is longer than 63, which needs to calculate hash alias deterministically
if len(fmt.Sprintf("%s-%s", dataset.Namespace, dataset.Name)) > 63 {
namespacedNameHashAlias = utils.GetNamespacedNameValueWithPrefix("", dataset.Namespace, dataset.Name, "")
} else {
// 2. the length of prefixed namespaced values may longer than 63 and the hash alias has been calculated and stored in cache
found := false
namespacedNameHashAlias, found = utils.GetNamespacedNameHashValueByKey(fmt.Sprintf("%s-%s", dataset.Namespace, dataset.Name))
if !found {
return nil
}
}

if namespacedNameHashAlias == "" {
return nil
}

datasetLabels[common.LabelAnnotationNamespacedNameHashAlias] = namespacedNameHashAlias
datasetToUpdate := dataset.DeepCopy()
datasetToUpdate.Labels = datasetLabels
if err := r.Update(ctx, datasetToUpdate); err != nil {
ctx.Log.Error(err, "Failed to add NamespacedNameHashAliasLabel", "StatusUpdateError", ctx)
return err
}
utils.RemoveNamespacedNameHashValueByKey(fmt.Sprintf("%s-%s", dataset.Namespace, dataset.Name))
return nil
})
}

// GetRuntimeObjectMeta gets runtime object meta
func (r *RuntimeReconciler) GetRuntimeObjectMeta(ctx cruntime.ReconcileRequestContext) (objectMeta metav1.Object, err error) {
objectMetaAccessor, isOM := ctx.Runtime.(metav1.ObjectMetaAccessor)
Expand Down
8 changes: 6 additions & 2 deletions pkg/csi/plugins/nodeserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ func (ns *nodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag
// 4. remove label on node
// Once the label is removed, fuse pod on corresponding node will be terminated
// since node selector in the fuse daemonSet no longer matches.
fuseLabelKey := utils.GetFuseLabelName(namespace, name)
fuseLabelKey := utils.GetFuseLabelName(namespace, name, runtimeInfo.GetNamespacedNameAlias())
var labelsToModify common.LabelsToModify
labelsToModify.Delete(fuseLabelKey)

Expand Down Expand Up @@ -400,7 +400,11 @@ func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
}

// 4. Label node to launch FUSE Pod
fuseLabelKey := utils.GetFuseLabelName(namespace, name)
runtimeInfo, err := base.GetRuntimeInfo(ns.client, name, namespace)
if err != nil {
return nil, errors.Wrapf(err, "NodeStageVolume: failed to get runtime info for %s/%s", namespace, name)
}
fuseLabelKey := utils.GetFuseLabelName(namespace, name, runtimeInfo.GetNamespacedNameAlias())
var labelsToModify common.LabelsToModify
labelsToModify.Add(fuseLabelKey, "true")

Expand Down
4 changes: 2 additions & 2 deletions pkg/ctrl/fuse.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (e *Helper) CheckFuseHealthy(recorder record.EventRecorder, runtime base.Ru
func (e *Helper) CleanUpFuse() (count int, err error) {
var (
nodeList = &corev1.NodeList{}
fuseLabelKey = utils.GetFuseLabelName(e.runtimeInfo.GetNamespace(), e.runtimeInfo.GetName())
fuseLabelKey = utils.GetFuseLabelName(e.runtimeInfo.GetNamespace(), e.runtimeInfo.GetName(), e.runtimeInfo.GetNamespacedNameAlias())
)

labelNames := []string{fuseLabelKey}
Expand Down Expand Up @@ -153,7 +153,7 @@ func (e *Helper) CleanUpFuse() (count int, err error) {
func (e *Helper) GetFuseNodes() (nodes []corev1.Node, err error) {
var (
nodeList = &corev1.NodeList{}
fuseLabelKey = utils.GetFuseLabelName(e.runtimeInfo.GetNamespace(), e.runtimeInfo.GetName())
fuseLabelKey = utils.GetFuseLabelName(e.runtimeInfo.GetNamespace(), e.runtimeInfo.GetName(), e.runtimeInfo.GetNamespacedNameAlias())
)

labelNames := []string{fuseLabelKey}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddc/alluxio/deprecated_label.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (e *AlluxioEngine) HasDeprecatedCommonLabelname() (deprecated bool, err err
nodeSelectors := workers.Spec.Template.Spec.NodeSelector
e.Log.Info("The current node selectors for worker", "workerName", workerName, "nodeSelector", nodeSelectors)

deprecatedCommonLabelName := utils.GetCommonLabelName(true, e.namespace, e.name)
deprecatedCommonLabelName := utils.GetCommonLabelName(true, e.namespace, e.name, e.runtimeInfo.GetNamespacedNameAlias())
if _, deprecated = nodeSelectors[deprecatedCommonLabelName]; deprecated {
//
e.Log.Info("the deprecated node selector exists", "nodeselector", deprecatedCommonLabelName)
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddc/alluxio/deprecated_label_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TestAlluxioEngine_GetDeprecatedCommonLabelname(t *testing.T) {
},
}
for _, test := range testCases {
out := utils.GetCommonLabelName(true, test.namespace, test.name)
out := utils.GetCommonLabelName(true, test.namespace, test.name, "")
if out != test.out {
t.Errorf("input parameter is %s-%s,expected %s, got %s", test.namespace, test.name, test.out, out)
}
Expand Down
18 changes: 16 additions & 2 deletions pkg/ddc/alluxio/master_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package alluxio

import (
"fmt"
"github.com/fluid-cloudnative/fluid/pkg/ddc/base"
"testing"

"github.com/brahma-adshonor/gohook"
Expand Down Expand Up @@ -85,6 +86,11 @@ func TestSetupMasterInternal(t *testing.T) {
}
client := fake.NewFakeClientWithScheme(testScheme, testObjs...)

runtimeInfo, err := base.BuildRuntimeInfo("hbase", "fluid", "alluxio")
if err != nil {
t.Errorf("fail to create the runtimeInfo with error %v", err)
}

engine := AlluxioEngine{
name: "hbase",
namespace: "fluid",
Expand All @@ -100,8 +106,10 @@ func TestSetupMasterInternal(t *testing.T) {
},
},
},
runtimeInfo: runtimeInfo,
}
err := portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 100}, "bitmap", GetReservedPorts)

err = portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 100}, "bitmap", GetReservedPorts)
if err != nil {
t.Fatal(err.Error())
}
Expand Down Expand Up @@ -195,6 +203,11 @@ func TestGenerateAlluxioValueFile(t *testing.T) {

client := fake.NewFakeClientWithScheme(testScheme, testObjs...)

runtimeInfo, err := base.BuildRuntimeInfo("hbase", "fluid", "alluxio")
if err != nil {
t.Errorf("fail to create the runtimeInfo with error %v", err)
}

engine := AlluxioEngine{
name: "hbase",
namespace: "fluid",
Expand All @@ -210,9 +223,10 @@ func TestGenerateAlluxioValueFile(t *testing.T) {
},
},
},
runtimeInfo: runtimeInfo,
}

err := portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 50}, "bitmap", GetReservedPorts)
err = portallocator.SetupRuntimePortAllocator(client, &net.PortRange{Base: 10, Size: 50}, "bitmap", GetReservedPorts)
if err != nil {
t.Fatal(err.Error())
}
Expand Down
33 changes: 17 additions & 16 deletions pkg/ddc/alluxio/runtime_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,23 @@ func (e *AlluxioEngine) getRuntimeInfo() (base.RuntimeInfoInterface, error) {
e.runtimeInfo.SetFuseNodeSelector(runtime.Spec.Fuse.NodeSelector)

if !e.UnitTest {
// Setup with Dataset Info
dataset, err := utils.GetDataset(e.Client, e.name, e.namespace)
if err != nil {
if utils.IgnoreNotFound(err) == nil {
e.Log.Info("Dataset is notfound", "name", e.name, "namespace", e.namespace)
return e.runtimeInfo, nil
}

e.Log.Info("Failed to get dataset when getRuntimeInfo")
return e.runtimeInfo, err
}

e.runtimeInfo.SetupWithDataset(dataset)
e.Log.Info("Setup with dataset done", "exclusive", e.runtimeInfo.IsExclusive())

e.runtimeInfo.SetNamespacedNameHashAliasWithDataset(dataset)

// Check if the runtime is using deprecated labels
isLabelDeprecated, err := e.HasDeprecatedCommonLabelname()
if err != nil {
Expand All @@ -58,22 +75,6 @@ func (e *AlluxioEngine) getRuntimeInfo() (base.RuntimeInfoInterface, error) {
e.runtimeInfo.SetDeprecatedPVName(isPVNameDeprecated)

e.Log.Info("Deprecation check finished", "isLabelDeprecated", e.runtimeInfo.IsDeprecatedNodeLabel(), "isPVNameDeprecated", e.runtimeInfo.IsDeprecatedPVName())

// Setup with Dataset Info
dataset, err := utils.GetDataset(e.Client, e.name, e.namespace)
if err != nil {
if utils.IgnoreNotFound(err) == nil {
e.Log.Info("Dataset is notfound", "name", e.name, "namespace", e.namespace)
return e.runtimeInfo, nil
}

e.Log.Info("Failed to get dataset when getRuntimeInfo")
return e.runtimeInfo, err
}

e.runtimeInfo.SetupWithDataset(dataset)

e.Log.Info("Setup with dataset done", "exclusive", e.runtimeInfo.IsExclusive())
}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/ddc/alluxio/shutdown.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ func (e *AlluxioEngine) destroyWorkers(expectedWorkers int32) (currentWorkers in
labelsToModify.Delete(label)
}

exclusiveLabelValue := utils.GetExclusiveValue(e.namespace, e.name)
exclusiveLabelValue := runtimeInfo.GetExclusiveLabelValue()
if val, exist := toUpdate.Labels[labelExclusiveName]; exist && val == exclusiveLabelValue {
labelsToModify.Delete(labelExclusiveName)
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/ddc/alluxio/shutdown_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,8 @@ func TestDestroyWorker(t *testing.T) {
}

if len(newNode.Labels) != len(test.wantedNodeLabels[node.Name]) {
t.Log(newNode.Labels)
t.Log(test.wantedNodeLabels[node.Name])
t.Errorf("fail to decrease the labels")
}
if len(newNode.Labels) != 0 && !reflect.DeepEqual(newNode.Labels, test.wantedNodeLabels[node.Name]) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddc/alluxio/transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (e *AlluxioEngine) transform(runtime *datav1alpha1.AlluxioRuntime) (value *
}

value.FullnameOverride = e.name
value.FullNamespacedNameOverride = utils.TransferFullNamespacedNameWithPrefixToLegalValue("", e.namespace, e.name)
value.FullNamespacedNameOverride = utils.GetNamespacedNameValueWithPrefix("", e.namespace, e.name, e.runtimeInfo.GetNamespacedNameAlias())

// 1.transform the common part
err = e.transformCommonPart(runtime, dataset, value)
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddc/alluxio/transform_fuse.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (e *AlluxioEngine) transformFuse(runtime *datav1alpha1.AlluxioRuntime, data
} else {
value.Fuse.NodeSelector = map[string]string{}
}
value.Fuse.NodeSelector[utils.GetFuseLabelName(runtime.Namespace, runtime.Name)] = "true"
value.Fuse.NodeSelector[utils.GetFuseLabelName(runtime.Namespace, runtime.Name, e.runtimeInfo.GetNamespacedNameAlias())] = "true"

// parse fuse container network mode
value.Fuse.HostNetwork = datav1alpha1.IsHostNetwork(runtime.Spec.Fuse.NetworkMode)
Expand Down
42 changes: 33 additions & 9 deletions pkg/ddc/alluxio/transform_fuse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package alluxio

import (
"github.com/fluid-cloudnative/fluid/pkg/ddc/base"
"reflect"
"testing"

Expand Down Expand Up @@ -62,11 +63,18 @@ func TestTransformFuseWithNoArgs(t *testing.T) {
}}, &Alluxio{}, []string{"fuse", "--fuse-opts=kernel_cache,rw,allow_other", "/alluxio/default/test/alluxio-fuse", "/"}, false},
}
for _, test := range tests {
runtimeInfo, err := base.BuildRuntimeInfo("test", "default", "alluxio")
if err != nil {
t.Errorf("fail to create the runtimeInfo with error %v", err)
}
engine := &AlluxioEngine{
name: "test",
namespace: "default",
Log: fake.NullLogger()}
err := engine.transformFuse(test.runtime, test.dataset, test.alluxioValue)
name: "test",
namespace: "default",
Log: fake.NullLogger(),
runtimeInfo: runtimeInfo,
Client: fake.NewFakeClientWithScheme(testScheme),
}
err = engine.transformFuse(test.runtime, test.dataset, test.alluxioValue)
if err != nil {
t.Errorf("Got err %v", err)
}
Expand Down Expand Up @@ -153,11 +161,18 @@ func TestTransformFuseWithArgs(t *testing.T) {
}}, &Alluxio{}, []string{"fuse", "--fuse-opts=kernel_cache,allow_other", "/alluxio/default/test/alluxio-fuse", "/"}, false},
}
for _, test := range tests {
runtimeInfo, err := base.BuildRuntimeInfo("test", "fluid", "alluxio")
if err != nil {
t.Errorf("fail to create the runtimeInfo with error %v", err)
}
engine := &AlluxioEngine{
name: "test",
namespace: "default",
Log: fake.NullLogger()}
err := engine.transformFuse(test.runtime, test.dataset, test.alluxioValue)
name: "test",
namespace: "default",
Log: fake.NullLogger(),
runtimeInfo: runtimeInfo,
Client: fake.NewFakeClientWithScheme(testScheme),
}
err = engine.transformFuse(test.runtime, test.dataset, test.alluxioValue)
if err != nil {
t.Errorf("Got err %v", err)
}
Expand Down Expand Up @@ -230,7 +245,16 @@ func TestTransformFuseWithNetwork(t *testing.T) {
},
}

engine := &AlluxioEngine{Log: fake.NullLogger()}
runtimeInfo, err := base.BuildRuntimeInfo("hbase", "fluid", "alluxio")
if err != nil {
t.Errorf("fail to create the runtimeInfo with error %v", err)
}

engine := &AlluxioEngine{
Log: fake.NullLogger(),
runtimeInfo: runtimeInfo,
Client: fake.NewFakeClientWithScheme(testScheme),
}
ds := &datav1alpha1.Dataset{}
for k, v := range testCases {
gotValue := &Alluxio{}
Expand Down
22 changes: 19 additions & 3 deletions pkg/ddc/alluxio/transform_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,16 @@ func TestTransformFuse(t *testing.T) {
}, &Alluxio{}, []string{"fuse", "--fuse-opts=kernel_cache,rw,uid=1000,gid=1000,allow_other"}},
}
for _, test := range tests {
engine := &AlluxioEngine{}
runtimeInfo, err := base.BuildRuntimeInfo("hbase", "fluid", "alluxio")
if err != nil {
t.Errorf("fail to create the runtimeInfo with error %v", err)
}
engine := &AlluxioEngine{
runtimeInfo: runtimeInfo,
Client: fake.NewFakeClientWithScheme(testScheme),
}
engine.Log = ctrl.Log
err := engine.transformFuse(test.runtime, test.dataset, test.value)
err = engine.transformFuse(test.runtime, test.dataset, test.value)
if err != nil {
t.Errorf("error %v", err)
}
Expand Down Expand Up @@ -1012,7 +1019,16 @@ func TestTransformWorkerProperties(t *testing.T) {
}

func TestTransformFuseProperties(t *testing.T) {
engine := &AlluxioEngine{Log: fake.NullLogger()}
runtimeInfo, err := base.BuildRuntimeInfo("test", "fluid", "alluxio")
if err != nil {
t.Errorf("fail to create the runtimeInfo with error %v", err)
}

engine := &AlluxioEngine{
Log: fake.NullLogger(),
Client: fake.NewFakeClientWithScheme(testScheme),
runtimeInfo: runtimeInfo,
}
var x int64 = 1000
ctrl.SetLogger(zap.New(func(o *zap.Options) {
o.Development = true
Expand Down
Loading

0 comments on commit 0709682

Please sign in to comment.