Skip to content

Commit

Permalink
juicefs: make juicefsruntime.spec.fuse configurale (#3539)
Browse files Browse the repository at this point in the history
Signed-off-by: zwwhdls <zww@hdls.me>
  • Loading branch information
zwwhdls authored Nov 15, 2023
1 parent d4e2847 commit 95f46e3
Show file tree
Hide file tree
Showing 2 changed files with 139 additions and 19 deletions.
103 changes: 84 additions & 19 deletions pkg/ddc/juicefs/sync_runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,37 +189,102 @@ func (j *JuiceFSEngine) syncWorkerSpec(ctx cruntime.ReconcileRequestContext, run

func (j *JuiceFSEngine) syncFuseSpec(ctx cruntime.ReconcileRequestContext, runtime *datav1alpha1.JuiceFSRuntime, value *JuiceFS) (changed bool, err error) {
j.Log.V(1).Info("syncFuseSpec")
var cmdChanged bool
err = retry.RetryOnConflict(retry.DefaultBackoff, func() error {
fuses, err := kubeclient.GetDaemonset(j.Client, j.getFuseDaemonsetName(), j.namespace)
if err != nil {
return err
}

fusesToUpdate := fuses.DeepCopy()

// nodeSelector
if nodeSelectorChanged, newSelector := j.isNodeSelectorChanged(fusesToUpdate.Spec.Template.Spec.NodeSelector, value.Fuse.NodeSelector); nodeSelectorChanged {
fusesToUpdate.Spec.Template.Spec.NodeSelector = newSelector
changed = true
}

// volumes
if volumeChanged, newVolumes := j.isVolumesChanged(fusesToUpdate.Spec.Template.Spec.Volumes, value.Fuse.Volumes); volumeChanged {
fusesToUpdate.Spec.Template.Spec.Volumes = newVolumes
changed = true
}

// labels
if labelChanged, newLabels := j.isLabelsChanged(fusesToUpdate.Spec.Template.ObjectMeta.Labels, value.Fuse.Labels); labelChanged {
fusesToUpdate.Spec.Template.ObjectMeta.Labels = newLabels
changed = true
}

// annotations
if annoChanged, newAnnos := j.isAnnotationsChanged(fusesToUpdate.Spec.Template.ObjectMeta.Annotations, value.Fuse.Annotations); annoChanged {
fusesToUpdate.Spec.Template.ObjectMeta.Annotations = newAnnos
changed = true
}

// options -> configmap
fuseCommand, err := j.getFuseCommand()
if err != nil || fuseCommand == "" {
j.Log.Error(err, "Failed to get fuse command")
return err
}
cmdChanged, _ = j.isCommandChanged(fuseCommand, value.Fuse.Command)

if len(fusesToUpdate.Spec.Template.Spec.Containers) == 1 {
fuseResource := runtime.Spec.Fuse.Resources
if !utils.ResourceRequirementsEqual(fusesToUpdate.Spec.Template.Spec.Containers[0].Resources, fuseResource) {
j.Log.Info("The resource requirement is different.", "fuse ds", fuses.Spec.Template.Spec.Containers[0].Resources, "runtime", fuseResource)
fusesToUpdate.Spec.Template.Spec.Containers[0].Resources = fuseResource
// resource
if resourcesChanged, newResources := j.isResourcesChanged(fusesToUpdate.Spec.Template.Spec.Containers[0].Resources, runtime.Spec.Fuse.Resources); resourcesChanged {
fusesToUpdate.Spec.Template.Spec.Containers[0].Resources = newResources
changed = true
} else {
j.Log.V(1).Info("The resource requirement of fuse is the same, skip")
}

if changed {
if reflect.DeepEqual(fuses, fusesToUpdate) {
changed = false
j.Log.V(1).Info("The resource requirement of fuse is not changed, skip")
return nil
}
j.Log.Info("The resource requirement of fuse is updated")
err = j.Client.Update(context.TODO(), fusesToUpdate)
if err != nil {
j.Log.Error(err, "Failed to update the sts spec")
}
} else {
j.Log.V(1).Info("The resource requirement of fuse is not set, skip")
// env
if envChanged, newEnvs := j.isEnvsChanged(fusesToUpdate.Spec.Template.Spec.Containers[0].Env, value.Fuse.Envs); envChanged {
fusesToUpdate.Spec.Template.Spec.Containers[0].Env = newEnvs
changed = true
}

// volumeMounts
if volumeMountChanged, newVolumeMounts := j.isVolumeMountsChanged(fusesToUpdate.Spec.Template.Spec.Containers[0].VolumeMounts, value.Fuse.VolumeMounts); volumeMountChanged {
fusesToUpdate.Spec.Template.Spec.Containers[0].VolumeMounts = newVolumeMounts
changed = true
}

// image
fuseImage := value.Fuse.Image
if value.ImageTag != "" {
fuseImage = fuseImage + ":" + value.Fuse.ImageTag
}
if imageChanged, newImage := j.isImageChanged(fusesToUpdate.Spec.Template.Spec.Containers[0].Image, fuseImage); imageChanged {
fusesToUpdate.Spec.Template.Spec.Containers[0].Image = newImage
changed = true
}
}

if cmdChanged {
j.Log.Info("The fuse config is updated")
err = j.updateFuseScript(value.Fuse.Command)
if err != nil {
j.Log.Error(err, "Failed to update the ds config")
return err
}
} else {
j.Log.V(1).Info("The fuse config is not changed")
}

if changed {
if reflect.DeepEqual(fuses, fusesToUpdate) {
changed = false
j.Log.V(1).Info("The fuse is not changed, skip")
return nil
}
j.Log.Info("The fuse is updated")

err = j.Client.Update(context.TODO(), fusesToUpdate)
if err != nil {
j.Log.Error(err, "Failed to update the ds spec")
}
} else {
j.Log.V(1).Info("The fuse is not changed")
}

return err
Expand Down
55 changes: 55 additions & 0 deletions pkg/ddc/juicefs/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,29 @@ func (j *JuiceFSEngine) getWorkerCommand() (command string, err error) {
return "", nil
}

func (j *JuiceFSEngine) getFuseCommand() (command string, err error) {
cm, err := kubeclient.GetConfigmapByName(j.Client, j.getFuseScriptName(), j.namespace)
if err != nil {
return "", err
}
if cm == nil {
j.Log.Info("value configMap not found")
return "", nil
}
data := cm.Data
script := data["script.sh"]
scripts := strings.Split(script, "\n")
j.Log.V(1).Info("get fuse script", "script", script)

// mount command is the last one
for i := len(scripts) - 1; i >= 0; i-- {
if scripts[i] != "" {
return scripts[i], nil
}
}
return "", nil
}

func (j JuiceFSEngine) updateWorkerScript(command string) error {
cm, err := kubeclient.GetConfigmapByName(j.Client, j.getWorkerScriptName(), j.namespace)
if err != nil {
Expand Down Expand Up @@ -391,6 +414,38 @@ func (j JuiceFSEngine) updateWorkerScript(command string) error {
return j.Client.Update(context.Background(), cm)
}

func (j JuiceFSEngine) updateFuseScript(command string) error {
cm, err := kubeclient.GetConfigmapByName(j.Client, j.getFuseScriptName(), j.namespace)
if err != nil {
return err
}
if cm == nil {
j.Log.Info("value configMap not found")
return nil
}
data := cm.Data
script := data["script.sh"]

newScript := script
newScripts := strings.Split(newScript, "\n")
// mount command is the last one, replace it
for i := len(newScripts) - 1; i >= 0; i-- {
if newScripts[i] != "" {
newScripts[i] = command
break
}
}

newValues := make(map[string]string)
newValues["script.sh"] = strings.Join(newScripts, "\n")
cm.Data = newValues
return j.Client.Update(context.Background(), cm)
}

func (j *JuiceFSEngine) getWorkerScriptName() string {
return fmt.Sprintf("%s-worker-script", j.name)
}

func (j *JuiceFSEngine) getFuseScriptName() string {
return fmt.Sprintf("%s-fuse-script", j.name)
}

0 comments on commit 95f46e3

Please sign in to comment.