diff --git a/controllers/resources/bentorequest_controller.go b/controllers/resources/bentorequest_controller.go index eea4151..bb7c984 100644 --- a/controllers/resources/bentorequest_controller.go +++ b/controllers/resources/bentorequest_controller.go @@ -18,7 +18,6 @@ package resources import ( "context" - "path" // nolint: gosec "crypto/md5" @@ -218,19 +217,6 @@ func (r *BentoRequestReconciler) Reconcile(ctx context.Context, req ctrl.Request } } - separateModels := isSeparateModels(bentoRequest) - - modelsExists := false - var modelsExistsResult ctrl.Result - var modelsExistsErr error - - if separateModels { - bentoRequest, modelsExists, modelsExistsResult, modelsExistsErr = r.ensureModelsExists(ctx, ensureModelsExistsOption{ - bentoRequest: bentoRequest, - req: req, - }) - } - bentoRequest, imageInfo, imageExists, imageExistsResult, err := r.ensureImageExists(ctx, ensureImageExistsOption{ bentoRequest: bentoRequest, req: req, @@ -263,47 +249,6 @@ func (r *BentoRequestReconciler) Reconcile(ctx context.Context, req ctrl.Request // } } - if modelsExistsErr != nil { - err = errors.Wrap(modelsExistsErr, "ensure model exists") - return - } - - if separateModels && modelsExists { - // Delete PVCs for all seeded models - for _, model := range bentoRequest.Spec.Models { - model := model - // Delete model seeder jobs - if err = r.deleteModelSeederJobs(ctx, bentoRequest, &model); err != nil { - r.Recorder.Eventf(bentoRequest, corev1.EventTypeWarning, "DeleteModelSeederJobs", "Failed to delete model seeder jobs: %v", err) - log.FromContext(ctx).Error(err, "Failed to delete model seeder jobs") - // We don't return here to allow the reconciliation to continue - } - - err = r.deleteModelPVC(ctx, bentoRequest, &model) - if err != nil { - r.Recorder.Eventf(bentoRequest, corev1.EventTypeWarning, "DeletePVC", "Failed to delete PVC for model %s: %v", model.Tag, err) - // Log the error but continue with other models - log.FromContext(ctx).Error(err, "Failed to delete PVC", "model", model.Tag) - } - } - } - - if separateModels && !modelsExists { - result = modelsExistsResult - bentoRequest, err = r.setStatusConditions(ctx, req, - metav1.Condition{ - Type: resourcesv1alpha1.BentoRequestConditionTypeBentoAvailable, - Status: metav1.ConditionUnknown, - Reason: "Reconciling", - Message: "Model is seeding", - }, - ) - if err != nil { - return - } - return - } - bentoCR := &resourcesv1alpha1.Bento{ ObjectMeta: metav1.ObjectMeta{ Name: bentoRequest.Name, @@ -319,17 +264,6 @@ func (r *BentoRequestReconciler) Reconcile(ctx context.Context, req ctrl.Request }, } - if separateModels { - bentoCR.Annotations = map[string]string{ - commonconsts.KubeAnnotationYataiImageBuilderSeparateModels: commonconsts.KubeLabelValueTrue, - commonconsts.KubeAnnotationAWSAccessKeySecretName: bentoRequest.Annotations[commonconsts.KubeAnnotationAWSAccessKeySecretName], - } - if isAddNamespacePrefix() { // deprecated - bentoCR.Annotations[commonconsts.KubeAnnotationIsMultiTenancy] = commonconsts.KubeLabelValueTrue - } - bentoCR.Annotations[KubeAnnotationModelStorageNS] = bentoRequest.Annotations[KubeAnnotationModelStorageNS] - } - if isImageStoredInS3(bentoRequest) { if bentoCR.Annotations == nil { bentoCR.Annotations = map[string]string{} @@ -523,11 +457,7 @@ func (r *BentoRequestReconciler) ensureImageExists(ctx context.Context, opt ensu commonconsts.KubeLabelIsBentoImageBuilder: commonconsts.KubeLabelValueTrue, } - if isSeparateModels(opt.bentoRequest) { - jobLabels[KubeLabelYataiImageBuilderSeparateModels] = commonconsts.KubeLabelValueTrue - } else { - jobLabels[KubeLabelYataiImageBuilderSeparateModels] = commonconsts.KubeLabelValueFalse - } + jobLabels[KubeLabelYataiImageBuilderSeparateModels] = commonconsts.KubeLabelValueFalse jobs := &batchv1.JobList{} err = r.List(ctx, jobs, client.InNamespace(req.Namespace), client.MatchingLabels(jobLabels)) @@ -718,275 +648,6 @@ func (r *BentoRequestReconciler) ensureImageExists(ctx context.Context, opt ensu return } -type ensureModelsExistsOption struct { - bentoRequest *resourcesv1alpha1.BentoRequest - req ctrl.Request -} - -func (r *BentoRequestReconciler) ensureModelsExists(ctx context.Context, opt ensureModelsExistsOption) (bentoRequest *resourcesv1alpha1.BentoRequest, modelsExists bool, result ctrl.Result, err error) { // nolint: unparam - bentoRequest = opt.bentoRequest - modelTags := make([]string, 0) - for _, model := range bentoRequest.Spec.Models { - modelTags = append(modelTags, model.Tag) - } - - modelsExistsCondition := meta.FindStatusCondition(bentoRequest.Status.Conditions, resourcesv1alpha1.BentoRequestConditionTypeModelsExists) - r.Recorder.Eventf(bentoRequest, corev1.EventTypeNormal, "SeparateModels", "Separate models are enabled") - if modelsExistsCondition == nil || modelsExistsCondition.Status == metav1.ConditionUnknown { - r.Recorder.Eventf(bentoRequest, corev1.EventTypeNormal, "ModelsExists", "Models are not ready") - modelsExistsCondition = &metav1.Condition{ - Type: resourcesv1alpha1.BentoRequestConditionTypeModelsExists, - Status: metav1.ConditionFalse, - Reason: "Reconciling", - Message: "Models are not ready", - } - bentoRequest, err = r.setStatusConditions(ctx, opt.req, *modelsExistsCondition) - if err != nil { - return - } - } - - modelsExists = modelsExistsCondition != nil && modelsExistsCondition.Status == metav1.ConditionTrue && modelsExistsCondition.Message == fmt.Sprintf("%s:%s", getJuiceFSStorageClassName(), strings.Join(modelTags, ", ")) - if modelsExists { - return - } - - modelsMap := make(map[string]*resourcesv1alpha1.BentoModel) - for _, model := range bentoRequest.Spec.Models { - model := model - modelsMap[model.Tag] = &model - } - - jobLabels := map[string]string{ - commonconsts.KubeLabelBentoRequest: bentoRequest.Name, - commonconsts.KubeLabelIsModelSeeder: "true", - } - - jobs := &batchv1.JobList{} - err = r.List(ctx, jobs, client.InNamespace(bentoRequest.Namespace), client.MatchingLabels(jobLabels)) - if err != nil { - err = errors.Wrap(err, "list jobs") - return - } - - var bentoRequestHashStr string - bentoRequestHashStr, err = r.getHashStr(bentoRequest) - if err != nil { - err = errors.Wrapf(err, "get BentoRequest %s/%s hash string", bentoRequest.Namespace, bentoRequest.Name) - return - } - - existingJobModelTags := make(map[string]struct{}) - for _, job_ := range jobs.Items { - job_ := job_ - - oldHash := job_.Annotations[KubeAnnotationBentoRequestHash] - if oldHash != bentoRequestHashStr { - r.Recorder.Eventf(bentoRequest, corev1.EventTypeNormal, "DeleteJob", "Because hash changed, delete old job %s, oldHash: %s, newHash: %s", job_.Name, oldHash, bentoRequestHashStr) - // --cascade=foreground - err = r.Delete(ctx, &job_, &client.DeleteOptions{ - PropagationPolicy: &[]metav1.DeletionPropagation{metav1.DeletePropagationForeground}[0], - }) - if err != nil { - err = errors.Wrapf(err, "delete job %s", job_.Name) - return - } - // clean all the events status to unknown - bentoRequest, err = r.setStatusConditionsToUnknow(ctx, opt.req, "Bento request hash changed, recreating the jobs") - if err != nil { - return - } - continue - } - - modelTag := fmt.Sprintf("%s:%s", job_.Labels[commonconsts.KubeLabelYataiModelRepository], job_.Labels[commonconsts.KubeLabelYataiModel]) - _, ok := modelsMap[modelTag] - - if !ok { - r.Recorder.Eventf(bentoRequest, corev1.EventTypeNormal, "DeleteJob", "Due to the nonexistence of the model %s, job %s has been deleted.", modelTag, job_.Name) - // --cascade=foreground - err = r.Delete(ctx, &job_, &client.DeleteOptions{ - PropagationPolicy: &[]metav1.DeletionPropagation{metav1.DeletePropagationForeground}[0], - }) - if err != nil { - err = errors.Wrapf(err, "delete job %s", job_.Name) - return - } - } else { - existingJobModelTags[modelTag] = struct{}{} - } - } - - for _, model := range bentoRequest.Spec.Models { - if _, ok := existingJobModelTags[model.Tag]; ok { - continue - } - model := model - pvc := &corev1.PersistentVolumeClaim{} - pvcName := r.getModelPVCName(bentoRequest, &model) - err = r.Get(ctx, client.ObjectKey{ - Namespace: bentoRequest.Namespace, - Name: pvcName, - }, pvc) - isPVCNotFound := k8serrors.IsNotFound(err) - if err != nil && !isPVCNotFound { - err = errors.Wrapf(err, "get PVC %s/%s", bentoRequest.Namespace, pvcName) - return - } - if isPVCNotFound { - pvc = r.generateModelPVC(GenerateModelPVCOption{ - BentoRequest: bentoRequest, - Model: &model, - }) - err = r.Create(ctx, pvc) - isPVCAlreadyExists := k8serrors.IsAlreadyExists(err) - if err != nil && !isPVCAlreadyExists { - err = errors.Wrapf(err, "create model %s/%s pvc", bentoRequest.Namespace, model.Tag) - return - } - } - var job *batchv1.Job - job, err = r.generateModelSeederJob(ctx, GenerateModelSeederJobOption{ - BentoRequest: bentoRequest, - Model: &model, - }) - if err != nil { - err = errors.Wrap(err, "generate model seeder job") - return - } - oldJob := &batchv1.Job{} - err = r.Get(ctx, client.ObjectKeyFromObject(job), oldJob) - oldJobIsNotFound := k8serrors.IsNotFound(err) - if err != nil && !oldJobIsNotFound { - err = errors.Wrap(err, "get job") - return - } - if oldJobIsNotFound { - err = r.Create(ctx, job) - if err != nil { - err = errors.Wrap(err, "create job") - return - } - r.Recorder.Eventf(bentoRequest, corev1.EventTypeNormal, "CreateJob", "Job %s has been created.", job.Name) - } else if !reflect.DeepEqual(job.Labels, oldJob.Labels) || !reflect.DeepEqual(job.Annotations, oldJob.Annotations) { - job.Labels = oldJob.Labels - job.Annotations = oldJob.Annotations - err = r.Update(ctx, job) - if err != nil { - err = errors.Wrap(err, "update job") - return - } - r.Recorder.Eventf(bentoRequest, corev1.EventTypeNormal, "UpdateJob", "Job %s has been updated.", job.Name) - } - } - - jobs = &batchv1.JobList{} - err = r.List(ctx, jobs, client.InNamespace(bentoRequest.Namespace), client.MatchingLabels(jobLabels)) - if err != nil { - err = errors.Wrap(err, "list jobs") - return - } - - succeedModelTags := make(map[string]struct{}) - failedJobNames := make([]string, 0) - notReadyJobNames := make([]string, 0) - for _, job_ := range jobs.Items { - if job_.Spec.Completions != nil && job_.Status.Succeeded == *job_.Spec.Completions { - modelTag := fmt.Sprintf("%s:%s", job_.Labels[commonconsts.KubeLabelYataiModelRepository], job_.Labels[commonconsts.KubeLabelYataiModel]) - succeedModelTags[modelTag] = struct{}{} - continue - } - if job_.Status.Failed > 0 { - failedJobNames = append(failedJobNames, job_.Name) - } - notReadyJobNames = append(notReadyJobNames, job_.Name) - } - - if len(failedJobNames) > 0 { - msg := "Model seeder jobs failed: " + strings.Join(failedJobNames, ", ") - pods := &corev1.PodList{} - err = r.List(ctx, pods, client.InNamespace(bentoRequest.Namespace), client.MatchingLabels(jobLabels)) - if err != nil { - err = errors.Wrap(err, "list pods") - return - } - hfValidateErr := false - for _, pod_ := range pods.Items { - for _, condition := range pod_.Status.Conditions { - if condition.Type == corev1.PodInitialized && condition.Status == corev1.ConditionFalse { - hfValidateErr = true - break - } - } - } - if hfValidateErr { - msg += ": no validate HF_TOKEN for seeding huggingface model" - } - r.Recorder.Event(bentoRequest, corev1.EventTypeNormal, "ModelsExists", msg) - bentoRequest, err = r.setStatusConditions(ctx, opt.req, - metav1.Condition{ - Type: resourcesv1alpha1.BentoRequestConditionTypeModelsExists, - Status: metav1.ConditionFalse, - Reason: "Reconciling", - Message: msg, - }, - metav1.Condition{ - Type: resourcesv1alpha1.BentoRequestConditionTypeBentoAvailable, - Status: metav1.ConditionFalse, - Reason: "Reconciling", - Message: msg, - }, - ) - if err != nil { - return - } - err = errors.New(msg) - return - } - - modelsExists = true - - for _, model := range bentoRequest.Spec.Models { - if _, ok := succeedModelTags[model.Tag]; !ok { - modelsExists = false - break - } - } - - if modelsExists { - bentoRequest, err = r.setStatusConditions(ctx, opt.req, - metav1.Condition{ - Type: resourcesv1alpha1.BentoRequestConditionTypeModelsExists, - Status: metav1.ConditionTrue, - Reason: "Reconciling", - Message: fmt.Sprintf("%s:%s", getJuiceFSStorageClassName(), strings.Join(modelTags, ", ")), - }, - metav1.Condition{ - Type: resourcesv1alpha1.BentoRequestConditionTypeModelsSeeding, - Status: metav1.ConditionFalse, - Reason: "Reconciling", - Message: "All models have been seeded.", - }, - ) - if err != nil { - return - } - } else { - bentoRequest, err = r.setStatusConditions(ctx, opt.req, - metav1.Condition{ - Type: resourcesv1alpha1.BentoRequestConditionTypeModelsSeeding, - Status: metav1.ConditionTrue, - Reason: "Reconciling", - Message: fmt.Sprintf("Model seeder jobs are not ready: %s.", strings.Join(notReadyJobNames, ", ")), - }, - ) - if err != nil { - return - } - } - return -} - func (r *BentoRequestReconciler) deleteImageBuilderJobs(ctx context.Context, bentoRequest *resourcesv1alpha1.BentoRequest) error { // nolint:unused jobLabels := r.getImageBuilderJobLabels(bentoRequest) @@ -1006,48 +667,6 @@ func (r *BentoRequestReconciler) deleteImageBuilderJobs(ctx context.Context, ben return nil } -func (r *BentoRequestReconciler) deleteModelSeederJobs(ctx context.Context, bentoRequest *resourcesv1alpha1.BentoRequest, model *resourcesv1alpha1.BentoModel) error { - jobLabels := r.getModelSeederJobLabels(bentoRequest, model) - - jobs := &batchv1.JobList{} - if err := r.List(ctx, jobs, client.InNamespace(bentoRequest.Namespace), client.MatchingLabels(jobLabels)); err != nil { - return errors.Wrap(err, "list model seeder jobs") - } - - for _, job := range jobs.Items { - job := job - if err := r.Delete(ctx, &job, client.PropagationPolicy(metav1.DeletePropagationBackground)); err != nil { - return errors.Wrapf(err, "delete model seeder job %s", job.Name) - } - r.Recorder.Eventf(bentoRequest, corev1.EventTypeNormal, "DeleteModelSeederJob", "Deleted model seeder job %s", job.Name) - } - - return nil -} - -// deleteModelPVC deletes the PVC associated with a seeded model -func (r *BentoRequestReconciler) deleteModelPVC(ctx context.Context, bentoRequest *resourcesv1alpha1.BentoRequest, model *resourcesv1alpha1.BentoModel) error { - pvcName := r.getModelPVCName(bentoRequest, model) - pvc := &corev1.PersistentVolumeClaim{ - ObjectMeta: metav1.ObjectMeta{ - Name: pvcName, - Namespace: bentoRequest.Namespace, - }, - } - - err := r.Delete(ctx, pvc) - if err != nil { - if k8serrors.IsNotFound(err) { - // PVC already deleted, ignore the error - return nil - } - return errors.Wrapf(err, "failed to delete PVC %s", pvcName) - } - - r.Recorder.Eventf(bentoRequest, corev1.EventTypeNormal, "DeletePVC", "Successfully deleted PVC %s for model %s", pvcName, model.Tag) - return nil -} - func (r *BentoRequestReconciler) setStatusConditionsToUnknow(ctx context.Context, req ctrl.Request, msg string) (bentoRequest *resourcesv1alpha1.BentoRequest, err error) { bentoRequest = &resourcesv1alpha1.BentoRequest{} /* @@ -1432,20 +1051,6 @@ func getBentoImagePrefix(bentoRequest *resourcesv1alpha1.BentoRequest) string { return "" } -func getModelNamespace(bentoRequest *resourcesv1alpha1.BentoRequest) string { - if bentoRequest == nil { - return "" - } - prefix := bentoRequest.Annotations[KubeAnnotationModelStorageNS] - if prefix != "" { - return prefix - } - if isAddNamespacePrefix() { - return bentoRequest.Namespace - } - return "" -} - func getBentoImageName(bentoRequest *resourcesv1alpha1.BentoRequest, dockerRegistry modelschemas.DockerRegistrySchema, bentoRepositoryName, bentoVersion string, inCluster bool) string { if bentoRequest != nil && bentoRequest.Spec.Image != "" { return bentoRequest.Spec.Image @@ -1457,10 +1062,6 @@ func getBentoImageName(bentoRequest *resourcesv1alpha1.BentoRequest, dockerRegis uri = dockerRegistry.BentosRepositoryURI } tail := fmt.Sprintf("%s.%s", bentoRepositoryName, bentoVersion) - separateModels := isSeparateModels(bentoRequest) - if separateModels { - tail += ".nomodels" - } if isEstargzEnabled() { tail += ".esgz" } @@ -1480,10 +1081,6 @@ func getBentoImageName(bentoRequest *resourcesv1alpha1.BentoRequest, dockerRegis return fmt.Sprintf("%s:%s", uri, tag) } -func isSeparateModels(bentoRequest *resourcesv1alpha1.BentoRequest) (separateModels bool) { - return bentoRequest.Annotations[commonconsts.KubeAnnotationYataiImageBuilderSeparateModels] == commonconsts.KubeLabelValueTrue -} - func isImageStoredInS3(bentoRequest *resourcesv1alpha1.BentoRequest) (storedInS3 bool) { return bentoRequest.Annotations[commonconsts.KubeAnnotationImageStoredInS3] == commonconsts.KubeLabelValueTrue } @@ -1678,38 +1275,6 @@ func (r *BentoRequestReconciler) getImageBuilderJobName() string { return "yatai-bento-image-builder-" + guid.String() } -func (r *BentoRequestReconciler) getModelSeederJobName() string { - guid := xid.New() - return "yatai-model-seeder-" + guid.String() -} - -func (r *BentoRequestReconciler) getModelSeederJobLabels(bentoRequest *resourcesv1alpha1.BentoRequest, model *resourcesv1alpha1.BentoModel) map[string]string { - bentoRepositoryName, _, bentoVersion := xstrings.Partition(bentoRequest.Spec.BentoTag, ":") - modelRepositoryName, _, modelVersion := xstrings.Partition(model.Tag, ":") - return map[string]string{ - commonconsts.KubeLabelBentoRequest: bentoRequest.Name, - commonconsts.KubeLabelIsModelSeeder: "true", - commonconsts.KubeLabelYataiModelRepository: modelRepositoryName, - commonconsts.KubeLabelYataiModel: modelVersion, - commonconsts.KubeLabelYataiBentoRepository: bentoRepositoryName, - commonconsts.KubeLabelYataiBento: bentoVersion, - } -} - -func (r *BentoRequestReconciler) getModelSeederPodLabels(bentoRequest *resourcesv1alpha1.BentoRequest, model *resourcesv1alpha1.BentoModel) map[string]string { - bentoRepositoryName, _, bentoVersion := xstrings.Partition(bentoRequest.Spec.BentoTag, ":") - modelRepositoryName, _, modelVersion := xstrings.Partition(model.Tag, ":") - return map[string]string{ - commonconsts.KubeLabelBentoRequest: bentoRequest.Name, - commonconsts.KubeLabelIsModelSeeder: "true", - commonconsts.KubeLabelIsBentoImageBuilder: "true", - commonconsts.KubeLabelYataiModelRepository: modelRepositoryName, - commonconsts.KubeLabelYataiModel: modelVersion, - commonconsts.KubeLabelYataiBentoRepository: bentoRepositoryName, - commonconsts.KubeLabelYataiBento: bentoVersion, - } -} - func (r *BentoRequestReconciler) getImageBuilderJobLabels(bentoRequest *resourcesv1alpha1.BentoRequest) map[string]string { bentoRepositoryName, _, bentoVersion := xstrings.Partition(bentoRequest.Spec.BentoTag, ":") labels := map[string]string{ @@ -1719,11 +1284,7 @@ func (r *BentoRequestReconciler) getImageBuilderJobLabels(bentoRequest *resource commonconsts.KubeLabelYataiBento: bentoVersion, } - if isSeparateModels(bentoRequest) { - labels[KubeLabelYataiImageBuilderSeparateModels] = commonconsts.KubeLabelValueTrue - } else { - labels[KubeLabelYataiImageBuilderSeparateModels] = commonconsts.KubeLabelValueFalse - } + labels[KubeLabelYataiImageBuilderSeparateModels] = commonconsts.KubeLabelValueFalse return labels } @@ -1744,496 +1305,6 @@ func hash(text string) string { return hex.EncodeToString(hasher.Sum(nil)) } -func (r *BentoRequestReconciler) getModelPVCName(bentoRequest *resourcesv1alpha1.BentoRequest, model *resourcesv1alpha1.BentoModel) string { - storageClassName := getJuiceFSStorageClassName() - var hashStr string - ns := getModelNamespace(bentoRequest) - if ns == "" { - hashStr = hash(fmt.Sprintf("%s:%s", storageClassName, model.Tag)) - } else { - hashStr = hash(fmt.Sprintf("%s:%s:%s", storageClassName, ns, model.Tag)) - } - pvcName := "model-seeder-" + hashStr - if len(pvcName) > 63 { - pvcName = pvcName[:63] - } - return pvcName -} - -func (r *BentoRequestReconciler) getJuiceFSModelPath(bentoRequest *resourcesv1alpha1.BentoRequest, model *resourcesv1alpha1.BentoModel) string { - modelRepositoryName, _, modelVersion := xstrings.Partition(model.Tag, ":") - ns := getModelNamespace(bentoRequest) - if isHuggingfaceModel(model) { - modelVersion = "all" - } - var path string - if ns == "" { - path = fmt.Sprintf("models/.shared/%s/%s", modelRepositoryName, modelVersion) - } else { - path = fmt.Sprintf("models/%s/%s/%s", ns, modelRepositoryName, modelVersion) - } - return path -} - -func isHuggingfaceModel(model *resourcesv1alpha1.BentoModel) bool { - return strings.HasPrefix(model.DownloadURL, "hf://") -} - -type GenerateModelPVCOption struct { - BentoRequest *resourcesv1alpha1.BentoRequest - Model *resourcesv1alpha1.BentoModel -} - -func (r *BentoRequestReconciler) generateModelPVC(opt GenerateModelPVCOption) (pvc *corev1.PersistentVolumeClaim) { - modelRepositoryName, _, modelVersion := xstrings.Partition(opt.Model.Tag, ":") - storageSize := resource.MustParse("100Gi") - if opt.Model.Size != nil { - storageSize = *opt.Model.Size - minStorageSize := resource.MustParse("1Gi") - if storageSize.Value() < minStorageSize.Value() { - storageSize = minStorageSize - } - storageSize.Set(storageSize.Value() * 2) - } - path := r.getJuiceFSModelPath(opt.BentoRequest, opt.Model) - pvcName := r.getModelPVCName(opt.BentoRequest, opt.Model) - pvc = &corev1.PersistentVolumeClaim{ - ObjectMeta: metav1.ObjectMeta{ - Name: pvcName, - Namespace: opt.BentoRequest.Namespace, - Annotations: map[string]string{ - "path": path, - }, - Labels: map[string]string{ - commonconsts.KubeLabelYataiModelRepository: modelRepositoryName, - commonconsts.KubeLabelYataiModel: modelVersion, - }, - }, - Spec: corev1.PersistentVolumeClaimSpec{ - AccessModes: []corev1.PersistentVolumeAccessMode{corev1.ReadWriteMany}, - Resources: corev1.VolumeResourceRequirements{ - Requests: corev1.ResourceList{ - corev1.ResourceStorage: storageSize, - }, - }, - StorageClassName: ptr.To(getJuiceFSStorageClassName()), - }, - } - return -} - -type GenerateModelSeederJobOption struct { - BentoRequest *resourcesv1alpha1.BentoRequest - Model *resourcesv1alpha1.BentoModel -} - -func (r *BentoRequestReconciler) generateModelSeederJob(ctx context.Context, opt GenerateModelSeederJobOption) (job *batchv1.Job, err error) { - // nolint: gosimple - podTemplateSpec, err := r.generateModelSeederPodTemplateSpec(ctx, GenerateModelSeederPodTemplateSpecOption{ - BentoRequest: opt.BentoRequest, - Model: opt.Model, - }) - if err != nil { - err = errors.Wrap(err, "generate model seeder pod template spec") - return - } - kubeAnnotations := make(map[string]string) - hashStr, err := r.getHashStr(opt.BentoRequest) - if err != nil { - err = errors.Wrap(err, "failed to get hash string") - return - } - kubeAnnotations[KubeAnnotationBentoRequestHash] = hashStr - job = &batchv1.Job{ - ObjectMeta: metav1.ObjectMeta{ - Name: r.getModelSeederJobName(), - Namespace: opt.BentoRequest.Namespace, - Labels: r.getModelSeederJobLabels(opt.BentoRequest, opt.Model), - Annotations: kubeAnnotations, - }, - Spec: batchv1.JobSpec{ - Completions: ptr.To(int32(1)), - Parallelism: ptr.To(int32(1)), - BackoffLimit: ptr.To(int32(1)), - PodFailurePolicy: &batchv1.PodFailurePolicy{ - Rules: []batchv1.PodFailurePolicyRule{ - { - Action: batchv1.PodFailurePolicyActionFailJob, - OnExitCodes: &batchv1.PodFailurePolicyOnExitCodesRequirement{ - ContainerName: ptr.To(ModelSeederContainerName), - Operator: batchv1.PodFailurePolicyOnExitCodesOpIn, - Values: []int32{ModelSeederJobFailedExitCode}, - }, - }, - }, - }, - Template: *podTemplateSpec, - }, - } - err = ctrl.SetControllerReference(opt.BentoRequest, job, r.Scheme) - if err != nil { - err = errors.Wrapf(err, "set controller reference for job %s", job.Name) - return - } - return -} - -type GenerateModelSeederPodTemplateSpecOption struct { - BentoRequest *resourcesv1alpha1.BentoRequest - Model *resourcesv1alpha1.BentoModel -} - -func (r *BentoRequestReconciler) generateModelSeederPodTemplateSpec(ctx context.Context, opt GenerateModelSeederPodTemplateSpecOption) (pod *corev1.PodTemplateSpec, err error) { - kubeLabels := r.getModelSeederPodLabels(opt.BentoRequest, opt.Model) - - volumes := make([]corev1.Volume, 0) - - volumeMounts := make([]corev1.VolumeMount, 0) - - yataiAPITokenSecretName := "" - - // nolint: gosec - awsAccessKeySecretName := opt.BentoRequest.Annotations[commonconsts.KubeAnnotationAWSAccessKeySecretName] - if awsAccessKeySecretName == "" { - awsAccessKeyID := os.Getenv(commonconsts.EnvAWSAccessKeyID) - awsSecretAccessKey := os.Getenv(commonconsts.EnvAWSSecretAccessKey) - if awsAccessKeyID != "" && awsSecretAccessKey != "" { - // nolint: gosec - awsAccessKeySecretName = YataiImageBuilderAWSAccessKeySecretName - stringData := map[string]string{ - commonconsts.EnvAWSAccessKeyID: awsAccessKeyID, - commonconsts.EnvAWSSecretAccessKey: awsSecretAccessKey, - } - awsAccessKeySecret := &corev1.Secret{ - ObjectMeta: metav1.ObjectMeta{ - Name: awsAccessKeySecretName, - Namespace: opt.BentoRequest.Namespace, - }, - StringData: stringData, - } - r.Recorder.Eventf(opt.BentoRequest, corev1.EventTypeNormal, "GenerateImageBuilderPod", "Creating or updating secret %s in namespace %s", awsAccessKeySecretName, opt.BentoRequest.Namespace) - _, err = controllerutil.CreateOrUpdate(ctx, r.Client, awsAccessKeySecret, func() error { - awsAccessKeySecret.StringData = stringData - return nil - }) - if err != nil { - err = errors.Wrapf(err, "failed to create or update secret %s", awsAccessKeySecretName) - return - } - r.Recorder.Eventf(opt.BentoRequest, corev1.EventTypeNormal, "GenerateImageBuilderPod", "Secret %s is created or updated in namespace %s", awsAccessKeySecretName, opt.BentoRequest.Namespace) - } - } - - internalImages := commonconfig.GetInternalImages() - logrus.Infof("Model seeder is using the images %v", *internalImages) - - downloaderContainerResources := corev1.ResourceRequirements{ - Limits: corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("1000m"), - corev1.ResourceMemory: resource.MustParse("3000Mi"), - }, - Requests: corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("100m"), - corev1.ResourceMemory: resource.MustParse("1000Mi"), - }, - } - - downloaderContainerEnvFrom := opt.BentoRequest.Spec.DownloaderContainerEnvFrom - - if yataiAPITokenSecretName != "" { - downloaderContainerEnvFrom = append(downloaderContainerEnvFrom, corev1.EnvFromSource{ - SecretRef: &corev1.SecretEnvSource{ - LocalObjectReference: corev1.LocalObjectReference{ - Name: yataiAPITokenSecretName, - }, - }, - }) - } - - if awsAccessKeySecretName != "" { - downloaderContainerEnvFrom = append(downloaderContainerEnvFrom, corev1.EnvFromSource{ - SecretRef: &corev1.SecretEnvSource{ - LocalObjectReference: corev1.LocalObjectReference{ - Name: awsAccessKeySecretName, - }, - }, - }) - } - - containers := make([]corev1.Container, 0) - - model := opt.Model - modelRepositoryName, _, modelVersion := xstrings.Partition(model.Tag, ":") - modelDownloadURL := model.DownloadURL - modelDownloadHeader := "" - if modelDownloadURL == "" { - var yataiClient_ **yataiclient.YataiClient - var yataiConf_ **commonconfig.YataiConfig - - yataiClient_, yataiConf_, err = r.getYataiClient(ctx) - if err != nil { - err = errors.Wrap(err, "get yatai client") - return - } - - if yataiClient_ == nil || yataiConf_ == nil { - err = errors.New("can't get yatai client, please check yatai configuration") - return - } - - yataiClient := *yataiClient_ - yataiConf := *yataiConf_ - - var model_ *schemasv1.ModelFullSchema - r.Recorder.Eventf(opt.BentoRequest, corev1.EventTypeNormal, "GenerateImageBuilderPod", "Getting model %s from yatai service", model.Tag) - model_, err = yataiClient.GetModel(ctx, modelRepositoryName, modelVersion) - if err != nil { - err = errors.Wrap(err, "get model") - return - } - r.Recorder.Eventf(opt.BentoRequest, corev1.EventTypeNormal, "GenerateImageBuilderPod", "Model %s is got from yatai service", model.Tag) - - if model_.TransmissionStrategy != nil && *model_.TransmissionStrategy == modelschemas.TransmissionStrategyPresignedURL { - var model0 *schemasv1.ModelSchema - r.Recorder.Eventf(opt.BentoRequest, corev1.EventTypeNormal, "GenerateImageBuilderPod", "Getting presigned url for model %s from yatai service", model.Tag) - model0, err = yataiClient.PresignModelDownloadURL(ctx, modelRepositoryName, modelVersion) - if err != nil { - err = errors.Wrap(err, "presign model download url") - return - } - r.Recorder.Eventf(opt.BentoRequest, corev1.EventTypeNormal, "GenerateImageBuilderPod", "Presigned url for model %s is got from yatai service", model.Tag) - modelDownloadURL = model0.PresignedDownloadUrl - } else { - modelDownloadURL = fmt.Sprintf("%s/api/v1/model_repositories/%s/models/%s/download", yataiConf.Endpoint, modelRepositoryName, modelVersion) - modelDownloadHeader = fmt.Sprintf("%s: %s:%s:$%s", commonconsts.YataiApiTokenHeaderName, commonconsts.YataiImageBuilderComponentName, yataiConf.ClusterName, commonconsts.EnvYataiApiToken) - } - } - - initContainers := make([]corev1.Container, 0) - if strings.HasPrefix(modelDownloadURL, "hf://") { - url := strings.TrimPrefix(modelDownloadURL, "hf://") - arr := strings.Split(url, "@") - if len(arr) != 3 { - err = errors.Errorf("invalid Huggingface model download URL %s", modelDownloadURL) - return - } - modelID := arr[0] - modelRevsion := arr[1] - modelEndpoint := arr[2] - modelURL := path.Join(modelEndpoint, modelID, "resolve/main/config.json") - - var hfTokenValidatorCommandOutput bytes.Buffer - err = template.Must(template.New("script").Parse(` -set -e - -if curl -XHEAD -sIL -H "Authorization: Bearer $HF_TOKEN" "{{.ModelURL}}" | grep -iq 'x-error-code:.*GatedRepo'; then - echo "Error: Model is gated. No access permission. \nPlease check your huggingface token and model access permission: {{.ModelURL}}" - exit 1 -else - echo "Model access granted." - exit 0 -fi -`)).Execute(&hfTokenValidatorCommandOutput, map[string]interface{}{ - "ModelURL": fmt.Sprintf("%s?revision=%s", modelURL, modelRevsion), - }) - if err != nil { - err = errors.Wrap(err, "failed to generate huggingface validation command") - return - } - - initContainers = append(initContainers, corev1.Container{ - Name: "hf-token-validator", - Image: internalImages.BentoDownloader, - Command: []string{ - "bash", - "-c", - hfTokenValidatorCommandOutput.String(), - }, - VolumeMounts: volumeMounts, - Resources: downloaderContainerResources, - EnvFrom: downloaderContainerEnvFrom, - }) - } - - modelDirPath := "/juicefs-workspace" - var modelSeedCommandOutput bytes.Buffer - err = template.Must(template.New("script").Parse(` -set -e - -mkdir -p {{.ModelDirPath}} -url="{{.ModelDownloadURL}}" - -if [[ ${url} == hf://* ]]; then - if [ -f "{{.ModelDirPath}}/{{.ModelVersion}}.exists" ]; then - echo "Model {{.ModelDirPath}}/{{.ModelVersion}}.exists already exists, skip downloading" - exit 0 - fi -else - if [ -f "{{.ModelDirPath}}/.exists" ]; then - echo "Model {{.ModelDirPath}} already exists, skip downloading" - exit 0 - fi -fi - -cleanup() { - echo "Cleaning up..." - rm -rf /tmp/model - rm -f /tmp/downloaded.tar -} - -trap cleanup EXIT - -if [[ ${url} == hf://* ]]; then - mkdir -p /tmp/model - hf_url="${url:5}" - model_id=$(echo "$hf_url" | awk -F '@' '{print $1}') - revision=$(echo "$hf_url" | awk -F '@' '{print $2}') - endpoint=$(echo "$hf_url" | awk -F '@' '{print $3}') - export HF_ENDPOINT=${endpoint} - - echo "Downloading model ${model_id} (endpoint=${endpoint}, revision=${revision}) from Huggingface..." - huggingface-cli download ${model_id} --revision ${revision} --cache-dir {{.ModelDirPath}} -else - echo "Downloading model {{.ModelRepositoryName}}:{{.ModelVersion}} to /tmp/downloaded.tar..." - if [[ ${url} == s3://* ]]; then - echo "Downloading from s3..." - aws s3 cp ${url} /tmp/downloaded.tar - elif [[ ${url} == gs://* ]]; then - echo "Downloading from GCS..." - gsutil cp ${url} /tmp/downloaded.tar - else - curl --fail -L -H "{{.ModelDownloadHeader}}" ${url} --output /tmp/downloaded.tar --progress-bar - fi - cd {{.ModelDirPath}} - echo "Extracting model tar file..." - tar -xvf /tmp/downloaded.tar -fi - -if [[ ${url} == hf://* ]]; then - echo "Creating {{.ModelDirPath}}/{{.ModelVersion}}.exists file..." - touch {{.ModelDirPath}}/{{.ModelVersion}}.exists -else - echo "Creating {{.ModelDirPath}}/.exists file..." - touch {{.ModelDirPath}}/.exists -fi - -echo "Done" -`)).Execute(&modelSeedCommandOutput, map[string]interface{}{ - "ModelDirPath": modelDirPath, - "ModelDownloadURL": modelDownloadURL, - "ModelDownloadHeader": modelDownloadHeader, - "ModelRepositoryName": modelRepositoryName, - "ModelVersion": modelVersion, - "HuggingfaceModelDir": "models--" + strings.ReplaceAll(modelRepositoryName, "/", "--"), - }) - if err != nil { - err = errors.Wrap(err, "failed to generate download command") - return - } - modelSeedCommand := modelSeedCommandOutput.String() - pvcName := r.getModelPVCName(opt.BentoRequest, model) - volumes = append(volumes, corev1.Volume{ - Name: pvcName, - VolumeSource: corev1.VolumeSource{ - PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ - ClaimName: pvcName, - }, - }, - }) - containers = append(containers, corev1.Container{ - Name: ModelSeederContainerName, - Image: internalImages.BentoDownloader, - Command: []string{ - "bash", - "-c", - modelSeedCommand, - }, - VolumeMounts: append(volumeMounts, corev1.VolumeMount{ - Name: pvcName, - MountPath: modelDirPath, - }), - Resources: downloaderContainerResources, - EnvFrom: downloaderContainerEnvFrom, - Env: []corev1.EnvVar{ - { - Name: "AWS_EC2_METADATA_DISABLED", - Value: "true", - }, - }, - }) - - kubeAnnotations := make(map[string]string) - kubeAnnotations[KubeAnnotationBentoRequestModelSeederHash] = opt.BentoRequest.Annotations[KubeAnnotationBentoRequestModelSeederHash] - - pod = &corev1.PodTemplateSpec{ - ObjectMeta: metav1.ObjectMeta{ - Labels: kubeLabels, - Annotations: kubeAnnotations, - }, - Spec: corev1.PodSpec{ - RestartPolicy: corev1.RestartPolicyNever, - Volumes: volumes, - Containers: containers, - InitContainers: initContainers, - }, - } - - var globalExtraPodSpec *resourcesv1alpha1.ExtraPodSpec - - configNamespace, err := commonconfig.GetYataiImageBuilderNamespace(ctx, func(ctx context.Context, namespace, name string) (*corev1.Secret, error) { - secret := &corev1.Secret{} - err := r.Get(ctx, types.NamespacedName{ - Namespace: namespace, - Name: name, - }, secret) - return secret, errors.Wrap(err, "get secret") - }) - if err != nil { - err = errors.Wrap(err, "failed to get Yatai image builder namespace") - return - } - - r.Recorder.Eventf(opt.BentoRequest, corev1.EventTypeNormal, "GenerateModelSeederPod", "Getting configmap %s from namespace %s", configCmName, configNamespace) - configCm := &corev1.ConfigMap{} - err = r.Get(ctx, types.NamespacedName{Name: configCmName, Namespace: configNamespace}, configCm) - configCmIsNotFound := k8serrors.IsNotFound(err) - if err != nil && !configCmIsNotFound { - err = errors.Wrap(err, "failed to get configmap") - return - } - err = nil - - if !configCmIsNotFound { - r.Recorder.Eventf(opt.BentoRequest, corev1.EventTypeNormal, "GenerateModelSeederPod", "Configmap %s is got from namespace %s", configCmName, configNamespace) - - globalExtraPodSpec = &resourcesv1alpha1.ExtraPodSpec{} - - if val, ok := configCm.Data["extra_pod_spec"]; ok { - err = yaml.Unmarshal([]byte(val), globalExtraPodSpec) - if err != nil { - err = errors.Wrapf(err, "failed to yaml unmarshal extra_pod_spec, please check the configmap %s in namespace %s", configCmName, configNamespace) - return - } - } - } else { - r.Recorder.Eventf(opt.BentoRequest, corev1.EventTypeNormal, "GenerateModelSeederPod", "Configmap %s is not found in namespace %s", configCmName, configNamespace) - } - - if globalExtraPodSpec != nil { - pod.Spec.PriorityClassName = globalExtraPodSpec.PriorityClassName - pod.Spec.SchedulerName = globalExtraPodSpec.SchedulerName - pod.Spec.NodeSelector = globalExtraPodSpec.NodeSelector - pod.Spec.Affinity = globalExtraPodSpec.Affinity - pod.Spec.Tolerations = globalExtraPodSpec.Tolerations - pod.Spec.TopologySpreadConstraints = globalExtraPodSpec.TopologySpreadConstraints - pod.Spec.ServiceAccountName = globalExtraPodSpec.ServiceAccountName - } - - injectPodAffinity(&pod.Spec, opt.BentoRequest) - - return -} - type GenerateImageBuilderJobOption struct { ImageInfo ImageInfo BentoRequest *resourcesv1alpha1.BentoRequest @@ -2622,8 +1693,6 @@ echo "Done" containers := make([]corev1.Container, 0) - separateModels := isSeparateModels(opt.BentoRequest) - models := opt.BentoRequest.Spec.Models modelsSeen := map[string]struct{}{} for _, model := range models { @@ -2640,122 +1709,6 @@ echo "Done" } } - for idx, model := range models { - if separateModels { - continue - } - modelRepositoryName, _, modelVersion := xstrings.Partition(model.Tag, ":") - modelDownloadURL := model.DownloadURL - modelDownloadHeader := "" - if modelDownloadURL == "" { - if bento == nil { - continue - } - - var yataiClient_ **yataiclient.YataiClient - var yataiConf_ **commonconfig.YataiConfig - - yataiClient_, yataiConf_, err = r.getYataiClient(ctx) - if err != nil { - err = errors.Wrap(err, "get yatai client") - return - } - - if yataiClient_ == nil || yataiConf_ == nil { - err = errors.New("can't get yatai client, please check yatai configuration") - return - } - - yataiClient := *yataiClient_ - yataiConf := *yataiConf_ - - var model_ *schemasv1.ModelFullSchema - r.Recorder.Eventf(opt.BentoRequest, corev1.EventTypeNormal, "GenerateImageBuilderPod", "Getting model %s from yatai service", model.Tag) - model_, err = yataiClient.GetModel(ctx, modelRepositoryName, modelVersion) - if err != nil { - err = errors.Wrap(err, "get model") - return - } - r.Recorder.Eventf(opt.BentoRequest, corev1.EventTypeNormal, "GenerateImageBuilderPod", "Model %s is got from yatai service", model.Tag) - - if model_.TransmissionStrategy != nil && *model_.TransmissionStrategy == modelschemas.TransmissionStrategyPresignedURL { - var model0 *schemasv1.ModelSchema - r.Recorder.Eventf(opt.BentoRequest, corev1.EventTypeNormal, "GenerateImageBuilderPod", "Getting presigned url for model %s from yatai service", model.Tag) - model0, err = yataiClient.PresignModelDownloadURL(ctx, modelRepositoryName, modelVersion) - if err != nil { - err = errors.Wrap(err, "presign model download url") - return - } - r.Recorder.Eventf(opt.BentoRequest, corev1.EventTypeNormal, "GenerateImageBuilderPod", "Presigned url for model %s is got from yatai service", model.Tag) - modelDownloadURL = model0.PresignedDownloadUrl - } else { - modelDownloadURL = fmt.Sprintf("%s/api/v1/model_repositories/%s/models/%s/download", yataiConf.Endpoint, modelRepositoryName, modelVersion) - modelDownloadHeader = fmt.Sprintf("%s: %s:%s:$%s", commonconsts.YataiApiTokenHeaderName, commonconsts.YataiImageBuilderComponentName, yataiConf.ClusterName, commonconsts.EnvYataiApiToken) - } - } - modelRepositoryDirPath := "/workspace/buildcontext/models/" + modelRepositoryName - modelDirPath := filepath.Join(modelRepositoryDirPath, modelVersion) - var modelDownloadCommandOutput bytes.Buffer - err = template.Must(template.New("script").Parse(` -set -e - -mkdir -p {{.ModelDirPath}} -url="{{.ModelDownloadURL}}" -echo "Downloading model {{.ModelRepositoryName}}:{{.ModelVersion}} to /tmp/downloaded.tar..." -if [[ ${url} == s3://* ]]; then - echo "Downloading from s3..." - aws s3 cp ${url} /tmp/downloaded.tar -elif [[ ${url} == gs://* ]]; then - echo "Downloading from GCS..." - gsutil cp ${url} /tmp/downloaded.tar -else - curl --fail -L -H "{{.ModelDownloadHeader}}" ${url} --output /tmp/downloaded.tar --progress-bar -fi -cd {{.ModelDirPath}} -echo "Extracting model tar file..." -tar -xvf /tmp/downloaded.tar -echo -n '{{.ModelVersion}}' > {{.ModelRepositoryDirPath}}/latest -echo "Removing model tar file..." -rm /tmp/downloaded.tar -{{if not .Privileged}} -echo "Changing directory permission..." -chown -R 1000:1000 /workspace -{{end}} -echo "Done" -`)).Execute(&modelDownloadCommandOutput, map[string]interface{}{ - "ModelDirPath": modelDirPath, - "ModelDownloadURL": modelDownloadURL, - "ModelDownloadHeader": modelDownloadHeader, - "ModelRepositoryDirPath": modelRepositoryDirPath, - "ModelRepositoryName": modelRepositoryName, - "ModelVersion": modelVersion, - "Privileged": privileged, - }) - if err != nil { - err = errors.Wrap(err, "failed to generate download command") - return - } - modelDownloadCommand := modelDownloadCommandOutput.String() - initContainers = append(initContainers, corev1.Container{ - Name: fmt.Sprintf("model-downloader-%d", idx), - Image: internalImages.BentoDownloader, - Command: []string{ - "bash", - "-c", - modelDownloadCommand, - }, - VolumeMounts: volumeMounts, - Resources: downloaderContainerResources, - EnvFrom: downloaderContainerEnvFrom, - Env: []corev1.EnvVar{ - { - Name: "AWS_EC2_METADATA_DISABLED", - Value: "true", - }, - }, - }) - } - var globalExtraPodMetadata *resourcesv1alpha1.ExtraPodMetadata var globalExtraPodSpec *resourcesv1alpha1.ExtraPodSpec var globalExtraContainerEnv []corev1.EnvVar @@ -3096,8 +2049,13 @@ echo "Done" cmd := shquot.POSIXShell(append(command, args...)) + type ModelSpec struct { + Tag string `json:"tag"` + DownloadURL string `json:"download_url"` // nolint:tagliatelle + } + if imageStoredInS3 { - builderImage = "quay.io/bentoml/bento-image-builder:0.0.13" + builderImage = "quay.io/bentoml/bento-image-builder:0.0.15" extraFlags := "" for _, buildArg := range buildArgs { extraFlags = fmt.Sprintf("%s --build-arg %s", extraFlags, strings.Replace(buildArg, "=", ":", 1)) @@ -3108,6 +2066,24 @@ echo "Done" if containerImageS3EnableStargz { extraFlags += " --enable-stargz" } + models := make([]ModelSpec, 0, len(opt.BentoRequest.Spec.Models)) + for _, model := range opt.BentoRequest.Spec.Models { + models = append(models, ModelSpec{ + Tag: model.Tag, + DownloadURL: model.DownloadURL, + }) + } + var modelsJSON []byte + modelsJSON, err = json.Marshal(models) + if err != nil { + err = errors.Wrap(err, "failed to marshal models") + return + } + + if len(models) > 0 { + extraFlags += " --models " + base64.StdEncoding.EncodeToString(modelsJSON) + } + var cmdOutput bytes.Buffer err = template.Must(template.New("script").Parse(` set -ex @@ -3368,13 +2344,6 @@ func (r *BentoRequestReconciler) registerYataiComponent() { } } -func getJuiceFSStorageClassName() string { - if v := os.Getenv("JUICEFS_STORAGE_CLASS_NAME"); v != "" { - return v - } - return "juicefs-sc" -} - const ( trueStr = "true" )