diff --git a/pkg/common/constants.go b/pkg/common/constants.go index e715dd5e888..5b40c31a1e5 100644 --- a/pkg/common/constants.go +++ b/pkg/common/constants.go @@ -225,3 +225,8 @@ const ( SkipPrecheckAnnotationKey = "sidecar.fluid.io/skip-precheck" HostMountPathModeOnDefaultPlatformKey = "default.fuse-sidecar.fluid.io/host-mount-path-mode" ) + +const ( + // DatasetPolicyAutoCreate indicates that a Dataset should be auto-created for the Runtime. + DatasetPolicyAutoCreate = "auto-create" +) diff --git a/pkg/common/label.go b/pkg/common/label.go index f806d9489b4..1fa5b78f834 100644 --- a/pkg/common/label.go +++ b/pkg/common/label.go @@ -77,6 +77,10 @@ const ( // "Sidecar": for only sidecar to skip check mount ready, AnnotationSkipCheckMountReadyTarget = LabelAnnotationPrefix + "skip-check-mount-ready-target" + // AnnotationDatasetPolicy is a runtime annotation that controls how Dataset is handled. + // i.e. fluid.io/dataset-policy + AnnotationDatasetPolicy = LabelAnnotationPrefix + "dataset-policy" + // AnnotationDisableRuntimeHelmValueConfig is a runtime label indicates the configmap contains helm value will not be created in setup. AnnotationDisableRuntimeHelmValueConfig = "runtime." + LabelAnnotationPrefix + "disable-helm-value-config" diff --git a/pkg/controllers/runtime_controller.go b/pkg/controllers/runtime_controller.go index 887176200aa..98802df1e4f 100644 --- a/pkg/controllers/runtime_controller.go +++ b/pkg/controllers/runtime_controller.go @@ -23,6 +23,7 @@ import ( "strings" "time" + apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/validation" "k8s.io/apimachinery/pkg/util/validation/field" @@ -110,14 +111,27 @@ func (r *RuntimeReconciler) ReconcileInternal(ctx cruntime.ReconcileRequestConte return utils.RequeueIfError(err) } + var datasetPolicy string + if annotations := objectMeta.GetAnnotations(); annotations != nil { + datasetPolicy = annotations[common.AnnotationDatasetPolicy] + } + // 5.Get the dataset dataset, err := r.GetDataset(ctx) if err != nil { - // r.Recorder.Eventf(ctx.Dataset, corev1.EventTypeWarning, common.ErrorProcessRuntimeReason, "Process Runtime error %v", err) if utils.IgnoreNotFound(err) == nil { - ctx.Log.Info("The dataset is not found", "dataset", ctx.NamespacedName) - dataset = nil - // return ctrl.Result{}, nil + if datasetPolicy == common.DatasetPolicyAutoCreate { + ctx.Log.Info("The dataset is not found, auto-creating according to policy", "dataset", ctx.NamespacedName) + dataset, err = r.ensureDatasetForRuntime(ctx, objectMeta) + if err != nil { + ctx.Log.Error(err, "Failed to auto-create the dataset") + r.Recorder.Eventf(runtime, corev1.EventTypeWarning, common.ErrorCreateDataset, "Failed to auto-create dataset: %v", err) + return utils.RequeueAfterInterval(5 * time.Second) + } + } else { + ctx.Log.Info("The dataset is not found", "dataset", ctx.NamespacedName) + dataset = nil + } } else { ctx.Log.Error(err, "Failed to get the ddc dataset") return utils.RequeueIfError(errors.Wrap(err, "Unable to get dataset")) @@ -172,7 +186,11 @@ func (r *RuntimeReconciler) ReconcileInternal(ctx cruntime.ReconcileRequestConte } } else { // If dataset is nil, need to wait because the user may have not created dataset - ctx.Log.Info("No dataset can be bound to the runtime, waiting.") + if datasetPolicy == common.DatasetPolicyAutoCreate { + ctx.Log.Info("No dataset is available for the runtime after auto-create, waiting.", "dataset", ctx.NamespacedName) + } else { + ctx.Log.Info("No dataset can be bound to the runtime, waiting.") + } r.Recorder.Event(runtime, corev1.EventTypeWarning, common.ErrorProcessRuntimeReason, "No dataset can be bound to the runtime, waiting.") return utils.RequeueAfterInterval(time.Duration(5 * time.Second)) } @@ -380,6 +398,56 @@ func (r *RuntimeReconciler) GetDataset(ctx cruntime.ReconcileRequestContext) (*d return &dataset, nil } +func (r *RuntimeReconciler) ensureDatasetForRuntime(ctx cruntime.ReconcileRequestContext, objectMeta metav1.Object) (*datav1alpha1.Dataset, error) { + runtime := ctx.Runtime + if runtime == nil { + return nil, fmt.Errorf("runtime is nil") + } + + isController := true + blockOwnerDeletion := true + gvk := runtime.GetObjectKind().GroupVersionKind() + + dataset := &datav1alpha1.Dataset{ + TypeMeta: metav1.TypeMeta{ + Kind: datav1alpha1.Datasetkind, + APIVersion: datav1alpha1.GroupVersion.Group + "/" + datav1alpha1.GroupVersion.Version, + }, + ObjectMeta: metav1.ObjectMeta{ + Name: objectMeta.GetName(), + Namespace: objectMeta.GetNamespace(), + OwnerReferences: []metav1.OwnerReference{{ + APIVersion: gvk.GroupVersion().String(), + Kind: gvk.Kind, + Name: objectMeta.GetName(), + UID: objectMeta.GetUID(), + Controller: &isController, + BlockOwnerDeletion: &blockOwnerDeletion, + }}, + }, + } + + annotations := make(map[string]string) + for k, v := range objectMeta.GetAnnotations() { + annotations[k] = v + } + annotations[common.AnnotationDatasetPolicy] = common.DatasetPolicyAutoCreate + dataset.ObjectMeta.Annotations = annotations + + err := r.Create(ctx, dataset) + if err != nil && !apierrors.IsAlreadyExists(err) { + return nil, err + } + + if err := r.Get(ctx, ctx.NamespacedName, dataset); err != nil { + return nil, err + } + + r.Recorder.Eventf(runtime, corev1.EventTypeNormal, common.Succeed, "Auto-created Dataset %s for Runtime", dataset.Name) + + return dataset, nil +} + func (r *RuntimeReconciler) CheckIfReferenceDatasetIsSupported(ctx cruntime.ReconcileRequestContext) (bool, string) { mounted := base.GetPhysicalDatasetFromMounts(ctx.Dataset.Spec.Mounts)