diff --git a/pkg/pillar/cmd/volumemgr/handlediskmetrics.go b/pkg/pillar/cmd/volumemgr/handlediskmetrics.go
index d1cd21be23..28f28c2614 100644
--- a/pkg/pillar/cmd/volumemgr/handlediskmetrics.go
+++ b/pkg/pillar/cmd/volumemgr/handlediskmetrics.go
@@ -123,6 +123,7 @@ func diskMetricsTimerTask(ctx *volumemgrContext, handleChannel chan interface{})
 case <-diskMetricTicker.C:
 start := time.Now()
 createOrUpdateDiskMetrics(ctx, wdName)
+ createOrUpdatePvcDiskMetrics(ctx)
 generateAndPublishVolumeMgrStatus(ctx)
 ctx.ps.CheckMaxTimeTopic(wdName, "createOrUpdateDiskMetrics", start, warningTime, errorTime)
@@ -309,13 +310,13 @@ func createOrUpdateDiskMetrics(ctx *volumemgrContext, wdName string) {
 publishDiskMetrics(ctx, diskMetricList...)
 for _, volumeStatus := range getAllVolumeStatus(ctx) {
 ctx.ps.StillRunning(wdName, warningTime, errorTime)
- if err := createOrUpdateAppDiskMetrics(ctx, volumeStatus); err != nil {
+ if err := createOrUpdateAppDiskMetrics(ctx, wdName, volumeStatus); err != nil {
 log.Errorf("CreateOrUpdateCommonDiskMetrics: exception while publishing diskmetric. %s", err.Error())
 }
 }
 }
-func createOrUpdateAppDiskMetrics(ctx *volumemgrContext, volumeStatus *types.VolumeStatus) error {
+func createOrUpdateAppDiskMetrics(ctx *volumemgrContext, wdName string, volumeStatus *types.VolumeStatus) error {
 log.Functionf("createOrUpdateAppDiskMetrics(%s, %s)", volumeStatus.VolumeID, volumeStatus.FileLocation)
 if volumeStatus.FileLocation == "" {
 if !ctx.hvTypeKube {
@@ -326,6 +327,8 @@ func createOrUpdateAppDiskMetrics(ctx *volumemgrContext, volumeStatus *types.Vol
 volumeStatus.FileLocation = volumeStatus.GetPVCName()
 }
 }
+ // Some handlers (CSI) can block on HTTP timeouts, so update the watchdog before the call
+ ctx.ps.StillRunning(wdName, warningTime, errorTime)
 actualSize, maxSize, diskType, dirtyFlag, err := volumehandlers.GetVolumeHandler(log, ctx, volumeStatus).GetVolumeDetails()
 if err != nil {
 err = fmt.Errorf("createOrUpdateAppDiskMetrics(%s, %s): exception while getting volume size. %s",
diff --git a/pkg/pillar/cmd/volumemgr/handlepvcdiskmetrics.go b/pkg/pillar/cmd/volumemgr/handlepvcdiskmetrics.go
new file mode 100644
index 0000000000..0ed904414b
--- /dev/null
+++ b/pkg/pillar/cmd/volumemgr/handlepvcdiskmetrics.go
@@ -0,0 +1,65 @@
+// Copyright (c) 2024 Zededa, Inc.
+// SPDX-License-Identifier: Apache-2.0
+
+//go:build kubevirt
+
+package volumemgr
+
+import (
+ "github.com/lf-edge/eve/pkg/pillar/kubeapi"
+ "github.com/lf-edge/eve/pkg/pillar/types"
+)
+
+// createOrUpdatePvcDiskMetrics creates or updates metrics for all kubevirt PVCs.
+// A PVC's mknod device will match one of the existing sdX devices; create copies keyed by PVC name for convenience.
+func createOrUpdatePvcDiskMetrics(ctx *volumemgrContext) {
+ log.Functionf("createOrUpdatePvcDiskMetrics")
+ var diskMetricList []*types.DiskMetric
+
+ sdMajMinToNameMap, _, _ := kubeapi.SCSIGetMajMinMaps()
+ _, pvNameToMajMin, _ := kubeapi.LonghornGetMajorMinorMaps()
+ _, pvcToPvMap, _ := kubeapi.PvPvcMaps()
+
+ kubeapi.CleanupUnmountedDiskMetrics(ctx.pubDiskMetric, pvcToPvMap)
+
+ for pvcName, pvName := range pvcToPvMap {
+ // pv-name will be of format "pvc-<uuid>"
+ // pvc-name will be of format "<uuid>-pvc-0"
+ // the pvc-name's uuid prefix will show in VolumeStatus
+ // the full pvc-name will be in VolumeStatus.FileLocation
+
+ if pvName == "" {
+ continue
+ }
+
+ pvMajMinStr, ok := pvNameToMajMin[pvName]
+ if !ok {
+ continue
+ }
+
+ sdName, ok := sdMajMinToNameMap[pvMajMinStr]
+ if !ok {
+ continue
+ }
+
+ metric := lookupDiskMetric(ctx, sdName)
+ if metric == nil {
+ continue
+ }
+
+ pvcMetric := lookupDiskMetric(ctx, sdName+"-"+pvcName)
+ if pvcMetric == nil {
+ pvcMetric = &types.DiskMetric{DiskPath: sdName + "-" + pvcName, IsDir: false}
+ }
+ pvcMetric.ReadBytes = metric.ReadBytes
+ pvcMetric.WriteBytes = metric.WriteBytes
+ pvcMetric.ReadCount = metric.ReadCount
+ pvcMetric.WriteCount = metric.WriteCount
+ pvcMetric.TotalBytes = metric.TotalBytes
+ pvcMetric.UsedBytes = metric.UsedBytes
+ pvcMetric.FreeBytes = metric.FreeBytes
+ pvcMetric.IsDir = false
+ diskMetricList = append(diskMetricList, pvcMetric)
+ }
+ publishDiskMetrics(ctx, diskMetricList...)
+}
diff --git a/pkg/pillar/cmd/volumemgr/handlevolume.go b/pkg/pillar/cmd/volumemgr/handlevolume.go
index bd9a6d86f0..fa2e3d1217 100644
--- a/pkg/pillar/cmd/volumemgr/handlevolume.go
+++ b/pkg/pillar/cmd/volumemgr/handlevolume.go
@@ -53,7 +53,7 @@ func handleVolumeModify(ctxArg interface{}, key string,
 status.SetError(errStr, time.Now())
 publishVolumeStatus(ctx, status)
 updateVolumeRefStatus(ctx, status)
- if err := createOrUpdateAppDiskMetrics(ctx, status); err != nil {
+ if err := createOrUpdateAppDiskMetrics(ctx, agentName, status); err != nil {
 log.Errorf("handleVolumeModify(%s): exception while publishing diskmetric. %s", key, err.Error())
 }
 return
@@ -66,7 +66,7 @@ func handleVolumeModify(ctxArg interface{}, key string,
 updateVolumeStatusRefCount(ctx, status)
 publishVolumeStatus(ctx, status)
 updateVolumeRefStatus(ctx, status)
- if err := createOrUpdateAppDiskMetrics(ctx, status); err != nil {
+ if err := createOrUpdateAppDiskMetrics(ctx, agentName, status); err != nil {
 log.Errorf("handleVolumeModify(%s): exception while publishing diskmetric. %s", key, err.Error())
 }
 }
@@ -135,7 +135,7 @@ func handleDeferredVolumeCreate(ctx *volumemgrContext, key string, config *types
 status.SetError(err.Error(), time.Now())
 publishVolumeStatus(ctx, status)
 updateVolumeRefStatus(ctx, status)
- if err := createOrUpdateAppDiskMetrics(ctx, status); err != nil {
+ if err := createOrUpdateAppDiskMetrics(ctx, agentName, status); err != nil {
 log.Errorf("handleDeferredVolumeCreate(%s): exception while publishing diskmetric. %s", key, err.Error())
 }
 return
@@ -175,7 +175,7 @@ func handleDeferredVolumeCreate(ctx *volumemgrContext, key string, config *types
 }
 publishVolumeStatus(ctx, status)
 updateVolumeRefStatus(ctx, status)
- if err := createOrUpdateAppDiskMetrics(ctx, status); err != nil {
+ if err := createOrUpdateAppDiskMetrics(ctx, agentName, status); err != nil {
 log.Errorf("handleDeferredVolumeCreate(%s): exception while publishing diskmetric. %s", key, err.Error())
 }
 return
@@ -186,7 +186,7 @@ func handleDeferredVolumeCreate(ctx *volumemgrContext, key string, config *types
 publishVolumeStatus(ctx, status)
 updateVolumeRefStatus(ctx, status)
 }
- if err := createOrUpdateAppDiskMetrics(ctx, status); err != nil {
+ if err := createOrUpdateAppDiskMetrics(ctx, agentName, status); err != nil {
 log.Errorf("handleDeferredVolumeCreate(%s): exception while publishing diskmetric. %s", key, err.Error())
 }
 log.Tracef("handleDeferredVolumeCreate(%s) done", key)
@@ -335,7 +335,7 @@ func maybeSpaceAvailable(ctx *volumemgrContext) {
 if changed {
 publishVolumeStatus(ctx, &status)
 updateVolumeRefStatus(ctx, &status)
- if err := createOrUpdateAppDiskMetrics(ctx, &status); err != nil {
+ if err := createOrUpdateAppDiskMetrics(ctx, agentName, &status); err != nil {
 log.Errorf("maybeSpaceAvailable(%s): exception while publishing diskmetric. %s", status.Key(), err.Error())
 }
 }
diff --git a/pkg/pillar/cmd/volumemgr/handlevolumeref.go b/pkg/pillar/cmd/volumemgr/handlevolumeref.go
index 6528dccbde..bdce9c060c 100644
--- a/pkg/pillar/cmd/volumemgr/handlevolumeref.go
+++ b/pkg/pillar/cmd/volumemgr/handlevolumeref.go
@@ -63,7 +63,7 @@ func handleVolumeRefCreate(ctxArg interface{}, key string,
 if changed {
 publishVolumeStatus(ctx, vs)
 updateVolumeRefStatus(ctx, vs)
- if err := createOrUpdateAppDiskMetrics(ctx, vs); err != nil {
+ if err := createOrUpdateAppDiskMetrics(ctx, agentName, vs); err != nil {
 log.Errorf("handleVolumeRefCreate(%s): exception while publishing diskmetric. %s", status.Key(), err.Error())
 }
@@ -95,7 +95,7 @@ func handleVolumeRefModify(ctxArg interface{}, key string,
 if changed {
 publishVolumeStatus(ctx, vs)
 updateVolumeRefStatus(ctx, vs)
- if err := createOrUpdateAppDiskMetrics(ctx, vs); err != nil {
+ if err := createOrUpdateAppDiskMetrics(ctx, agentName, vs); err != nil {
 log.Errorf("handleVolumeRefModify(%s): exception while publishing diskmetric. %s", status.Key(), err.Error())
 }
diff --git a/pkg/pillar/cmd/volumemgr/nokube.go b/pkg/pillar/cmd/volumemgr/nokube.go
new file mode 100644
index 0000000000..6c3f3efe8f
--- /dev/null
+++ b/pkg/pillar/cmd/volumemgr/nokube.go
@@ -0,0 +1,11 @@
+// Copyright (c) 2025 Zededa, Inc.
+// SPDX-License-Identifier: Apache-2.0
+
+//go:build !kubevirt
+
+package volumemgr
+
+// createOrUpdatePvcDiskMetrics has no work to do in non-kubevirt builds
+func createOrUpdatePvcDiskMetrics(*volumemgrContext) {
+ return
+}
diff --git a/pkg/pillar/cmd/volumemgr/updatestatus.go b/pkg/pillar/cmd/volumemgr/updatestatus.go
index 64a65823d2..1957c64779 100644
--- a/pkg/pillar/cmd/volumemgr/updatestatus.go
+++ b/pkg/pillar/cmd/volumemgr/updatestatus.go
@@ -919,7 +919,7 @@ func updateVolumeStatus(ctx *volumemgrContext, volumeID uuid.UUID) bool {
 publishVolumeStatus(ctx, &status)
 updateVolumeRefStatus(ctx, &status)
 }
- if err := createOrUpdateAppDiskMetrics(ctx, &status); err != nil {
+ if err := createOrUpdateAppDiskMetrics(ctx, agentName, &status); err != nil {
 log.Errorf("updateVolumeStatus(%s): exception while publishing diskmetric. %s", status.Key(), err.Error())
 }
@@ -948,7 +948,7 @@ func updateVolumeStatusFromContentID(ctx *volumemgrContext, contentID uuid.UUID)
 if changed {
 publishVolumeStatus(ctx, &status)
 updateVolumeRefStatus(ctx, &status)
- if err := createOrUpdateAppDiskMetrics(ctx, &status); err != nil {
+ if err := createOrUpdateAppDiskMetrics(ctx, agentName, &status); err != nil {
 log.Errorf("updateVolumeStatus(%s): exception while publishing diskmetric. %s", status.Key(), err.Error())
 }
diff --git a/pkg/pillar/cmd/zfsmanager/zfsstoragemetrics.go b/pkg/pillar/cmd/zfsmanager/zfsstoragemetrics.go
index fb1849ce95..b6b96044b9 100644
--- a/pkg/pillar/cmd/zfsmanager/zfsstoragemetrics.go
+++ b/pkg/pillar/cmd/zfsmanager/zfsstoragemetrics.go
@@ -7,6 +7,7 @@ import (
 "time"
 libzfs "github.com/andrewd-zededa/go-libzfs"
+ "github.com/lf-edge/eve/pkg/pillar/base"
 "github.com/lf-edge/eve/pkg/pillar/types"
 "github.com/lf-edge/eve/pkg/pillar/utils/persist"
 "github.com/lf-edge/eve/pkg/pillar/zfs"
@@ -49,23 +50,25 @@ func collectAndPublishStorageMetrics(ctxPtr *zfsContext) {
 zfsPoolMetrics := zfs.GetZpoolMetrics(vdevs)
- // Fill metrics for zvols
- for _, vs := range ctxPtr.subVolumeStatus.GetAll() {
- volumeStatus := vs.(types.VolumeStatus)
- if volumeStatus.State < types.CREATING_VOLUME {
- // we did not go to creating of volume, nothing to measure
- continue
+ if !base.IsHVTypeKube() {
+ // Fill metrics for zvols
+ for _, vs := range ctxPtr.subVolumeStatus.GetAll() {
+ volumeStatus := vs.(types.VolumeStatus)
+ if volumeStatus.State < types.CREATING_VOLUME {
+ // we did not go to creating of volume, nothing to measure
+ continue
+ }
+ if !volumeStatus.UseZVolDisk(persist.ReadPersistType()) {
+ // we do not create zvol for that volumeStatus
+ continue
+ }
+ zVolMetric, err := zfs.GetZvolMetrics(volumeStatus, zfsPoolMetrics.PoolName)
+ if err != nil {
+ // It is possible that the logical volume belongs to another zpool
+ continue
+ }
+ zfsPoolMetrics.ZVols = append(zfsPoolMetrics.ZVols, zVolMetric)
 }
- if !volumeStatus.UseZVolDisk(persist.ReadPersistType()) {
- // we do not create zvol for that volumeStatus
- continue
- }
- zVolMetric, err := zfs.GetZvolMetrics(volumeStatus, zfsPoolMetrics.PoolName)
- if err != nil {
- // It is possible that the logical volume belongs to another zpool
- continue
- }
- zfsPoolMetrics.ZVols = append(zfsPoolMetrics.ZVols, zVolMetric)
 }
 if err := ctxPtr.storageMetricsPub.Publish(zfsPoolMetrics.Key(), *zfsPoolMetrics); err != nil {
diff --git a/pkg/pillar/diskmetrics/usage.go b/pkg/pillar/diskmetrics/usage.go
index 69951a6e4c..a33432e7eb 100644
--- a/pkg/pillar/diskmetrics/usage.go
+++ b/pkg/pillar/diskmetrics/usage.go
@@ -29,7 +29,8 @@ func StatAllocatedBytes(path string) (uint64, error) {
 if err != nil {
 return uint64(0), err
 }
- return uint64(stat.Blocks * int64(stat.Blksize)), nil
+ // stat.Blocks is always counted in 512-byte units, regardless of stat.Blksize
+ return uint64(stat.Blocks * 512), nil
 }
 // SizeFromDir performs a du -s equivalent operation.
@@ -82,7 +83,8 @@ func SizeFromDir(log *base.LogObject, dirname string) (uint64, error) {
 // will be in the clear and vault volumes base directories so a lot of compute time
 // can be saved by not checking detailed allocated bytes information in deeper
 // directories.
- if dirname == types.VolumeEncryptedDirName || dirname == types.VolumeClearDirName {
+ if strings.HasPrefix(dirname, types.VolumeEncryptedDirName) ||
+ strings.HasPrefix(dirname, types.VolumeClearDirName) {
 // FileInfo.Size() returns the provisioned size
 // Sparse files will have a smaller allocated size than provisioned
 // Use full syscall.Stat_t to get the allocated size
diff --git a/pkg/pillar/hypervisor/kubevirt.go b/pkg/pillar/hypervisor/kubevirt.go
index 7006741b6d..39847207ac 100644
--- a/pkg/pillar/hypervisor/kubevirt.go
+++ b/pkg/pillar/hypervisor/kubevirt.go
@@ -128,10 +128,17 @@ func (metrics *kubevirtMetrics) fill(domainName, metricName string, value interf
 cpuNs := assignToInt64(value) * int64(time.Second)
 r.CPUTotalNs = r.CPUTotalNs + uint64(cpuNs)
 case "kubevirt_vmi_memory_usable_bytes":
+ // The amount of memory which can be reclaimed by the balloon without pushing the guest system to swap;
+ // corresponds to 'Available' in /proc/meminfo
+ // https://kubevirt.io/monitoring/metrics.html#kubevirt
 r.AvailableMemory = uint32(assignToInt64(value)) / BytesInMegabyte
- case "kubevirt_vmi_memory_domain_bytes_total":
+ case "kubevirt_vmi_memory_domain_bytes":
+ // The amount of memory in bytes allocated to the domain.
+ // https://kubevirt.io/monitoring/metrics.html#kubevirt
 r.AllocatedMB = uint32(assignToInt64(value)) / BytesInMegabyte
 case "kubevirt_vmi_memory_available_bytes": // save this temp for later
+ // Amount of usable memory as seen by the domain.
+ // https://kubevirt.io/monitoring/metrics.html#kubevirt
 r.UsedMemory = uint32(assignToInt64(value)) / BytesInMegabyte
 default:
 }
diff --git a/pkg/pillar/kubeapi/cdiupload.go b/pkg/pillar/kubeapi/cdiupload.go
new file mode 100644
index 0000000000..8952a1ecc4
--- /dev/null
+++ b/pkg/pillar/kubeapi/cdiupload.go
@@ -0,0 +1,70 @@
+// Copyright (c) 2025 Zededa, Inc.
+// SPDX-License-Identifier: Apache-2.0
+
+//go:build kubevirt
+
+package kubeapi
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ "github.com/lf-edge/eve/pkg/pillar/base"
+ corev1 "k8s.io/api/core/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+)
+
+// waitForPVCUploadComplete: Loop until PVC upload annotations show upload complete
+func waitForPVCUploadComplete(pvcName string, log *base.LogObject) error {
+ clientset, err := GetClientSet()
+ if err != nil {
+ log.Errorf("waitForPVCUploadComplete failed to get clientset err %v", err)
+ return err
+ }
+
+ // Poll every 5 seconds, up to 100 attempts.
+ i := 100
+ for i > 0 {
+ i--
+ time.Sleep(5 * time.Second)
+ pvc, err := clientset.CoreV1().PersistentVolumeClaims(EVEKubeNameSpace).
+ Get(context.Background(), pvcName, metav1.GetOptions{})
+ if err != nil {
+ log.Errorf("waitForPVCUploadComplete failed to get pvc info err %v", err)
+ continue
+ }
+ if cdiUploadIsComplete(log, pvc) {
+ return nil
+ }
+ }
+
+ return fmt.Errorf("waitForPVCUploadComplete(%s): timed out waiting for upload to complete", pvcName)
+}
+
+func cdiUploadIsComplete(log *base.LogObject, pvc *corev1.PersistentVolumeClaim) bool {
+ annotationKey := "cdi.kubevirt.io/storage.pod.phase"
+ annotationExpectedVal := "Succeeded"
+ foundVal, ok := pvc.Annotations[annotationKey]
+ if !ok {
+ log.Errorf("pvc %s annotation %s is missing", pvc.Name, annotationKey)
+ return false
+ }
+ if foundVal != annotationExpectedVal {
+ log.Warnf("pvc %s annotation %s is %s, waiting for %s", pvc.Name, annotationKey, foundVal, annotationExpectedVal)
+ return false
+ }
+
+ annotationKey = "cdi.kubevirt.io/storage.condition.running.message"
+ annotationExpectedVal = "Upload Complete"
+ foundVal, ok = pvc.Annotations[annotationKey]
+ if !ok {
+ log.Errorf("pvc %s annotation %s is missing", pvc.Name, annotationKey)
+ return false
+ }
+ if foundVal != annotationExpectedVal {
+ log.Warnf("pvc %s annotation %s is %s, waiting for %s", pvc.Name, annotationKey, foundVal, annotationExpectedVal)
+ return false
+ }
+ return true
+}
diff --git a/pkg/pillar/kubeapi/longhorninfo.go b/pkg/pillar/kubeapi/longhorninfo.go
index 3642bb393d..437e081080 100644
--- a/pkg/pillar/kubeapi/longhorninfo.go
+++ b/pkg/pillar/kubeapi/longhorninfo.go
@@ -9,12 +9,38 @@ import (
 "context"
 "fmt"
 "strings"
+ "time"
 lhv1beta2 "github.com/longhorn/longhorn-manager/k8s/pkg/apis/longhorn/v1beta2"
 "github.com/longhorn/longhorn-manager/k8s/pkg/client/clientset/versioned"
 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 )
+// LonghornVolumeSizeDetails returns the provisionedBytes and allocatedBytes size values for a longhorn volume
+func LonghornVolumeSizeDetails(longhornVolumeName string) (provisionedBytes uint64, allocatedBytes uint64, err error) {
+ config, err := GetKubeConfig()
+ if err != nil {
+ return 0, 0, fmt.Errorf("LonghornVolumeSizeDetails can't get kubeconfig %v", err)
+ }
+
+ lhClient, err := versioned.NewForConfig(config)
+ if err != nil {
+ return 0, 0, fmt.Errorf("LonghornVolumeSizeDetails can't get versioned config: %v", err)
+ }
+
+ // Don't let a k8s API timeout keep us waiting forever; set one explicitly here since this path is used for metrics
+ shortContext, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancel()
+
+ lhVol, err := lhClient.LonghornV1beta2().Volumes("longhorn-system").Get(shortContext, longhornVolumeName, metav1.GetOptions{})
+ if err != nil || lhVol == nil {
+ return 0, 0, fmt.Errorf("LonghornVolumeSizeDetails can't get lh vol %s: %v", longhornVolumeName, err)
+ }
+
+ return uint64(lhVol.Spec.Size), uint64(lhVol.Status.ActualSize), nil
+}
+
+// LonghornReplicaList returns the replicas of a given longhorn volume that are hosted on a given kubernetes node
 func LonghornReplicaList(ownerNodeName string, longhornVolName string) (*lhv1beta2.ReplicaList, error) {
 config, err := GetKubeConfig()
 if err != nil {
diff --git a/pkg/pillar/kubeapi/vitoapiserver.go b/pkg/pillar/kubeapi/vitoapiserver.go
index 318044a3f6..cc6902529e 100644
--- a/pkg/pillar/kubeapi/vitoapiserver.go
+++ b/pkg/pillar/kubeapi/vitoapiserver.go
@@ -14,7 +14,6 @@ import (
 zconfig "github.com/lf-edge/eve-api/go/config"
 "github.com/lf-edge/eve/pkg/pillar/base"
- "github.com/lf-edge/eve/pkg/pillar/diskmetrics"
 "github.com/lf-edge/eve/pkg/pillar/types"
 corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource" @@ -130,42 +129,34 @@ func GetPVCInfo(pvcName string, log *base.LogObject) (*types.ImgInfo, error) { return nil, err } - fmt := zconfig.Format_name[int32(zconfig.Format_PVC)] + imgFmt := zconfig.Format_name[int32(zconfig.Format_PVC)] imgInfo := types.ImgInfo{ - Format: fmt, + Format: imgFmt, Filename: pvcName, DirtyFlag: false, } - // Get the actual and used size of the PVC. - actualSizeBytes, usedSizeBytes := getPVCSizes(pvc) - - imgInfo.ActualSize = actualSizeBytes - imgInfo.VirtualSize = usedSizeBytes + // PVC asks for a minimum size, spec may be less than actual (status) provisioned + imgInfo.VirtualSize = getPVCSize(pvc) + // Ask longhorn for the PVCs backing-volume allocated space + _, imgInfo.ActualSize, err = LonghornVolumeSizeDetails(pvc.Spec.VolumeName) + if err != nil { + err = fmt.Errorf("GetPVCInfo failed to get info for pvc %s volume %s: %v", pvcName, pvc.Spec.VolumeName, err) + log.Error(err) + return &imgInfo, err + } return &imgInfo, nil } -// Returns the actual and used size of the PVC in bytes -func getPVCSizes(pvc *corev1.PersistentVolumeClaim) (actualSizeBytes, usedSizeBytes uint64) { - // Extract the actual size of the PVC from its spec. - actualSizeBytes = 0 - usedSizeBytes = 0 - - if pvc.Spec.Resources.Requests != nil { - if quantity, ok := pvc.Spec.Resources.Requests[corev1.ResourceStorage]; ok { - actualSizeBytes = uint64(quantity.Value()) - } - } - - // Extract the used size of the PVC from its status. +// Returns the provisioned size of the PVC in bytes +func getPVCSize(pvc *corev1.PersistentVolumeClaim) (provisionedSizeBytes uint64) { + // Status field contains the size of the volume which actually bound to the claim if pvc.Status.Phase == corev1.ClaimBound { if quantity, ok := pvc.Status.Capacity[corev1.ResourceStorage]; ok { - usedSizeBytes = uint64(quantity.Value()) + return uint64(quantity.Value()) } } - - return actualSizeBytes, usedSizeBytes - + return 0 } // longhorn PVC deals with Ki Mi not KB, MB @@ -213,7 +204,7 @@ func NewPVCDefinition(pvcName string, size string, annotations, // diskfile can be in qcow or raw format // If pvc does not exist, the command will create PVC and copies the data. 
func RolloutDiskToPVC(ctx context.Context, log *base.LogObject, exists bool,
- diskfile string, pvcName string, filemode bool) error {
+ diskfile string, pvcName string, filemode bool, pvcSize uint64) error {
 // Get the Kubernetes clientset
 clientset, err := GetClientSet()
@@ -248,23 +239,6 @@ func RolloutDiskToPVC(ctx context.Context, log *base.LogObject, exists bool,
 clusterIP := service.Spec.ClusterIP
 uploadproxyURL := "https://" + clusterIP + ":443"
 log.Noticef("RolloutDiskToPVC diskfile %s pvc %s URL %s", diskfile, pvcName, uploadproxyURL)
- volSize, err := diskmetrics.GetDiskVirtualSize(log, diskfile)
- if err != nil {
- err = fmt.Errorf("failed to get virtual size of disk %s: %v", diskfile, err)
- log.Error(err)
- return err
- }
-
- // ActualSize can be larger than VirtualSize for fully-allocated/not-thin QCOW2 files
- actualVolSize, err := diskmetrics.GetDiskActualSize(log, diskfile)
- if err != nil {
- err = fmt.Errorf("failed to get actual size of disk %s: %v", diskfile, err)
- log.Error(err)
- return err
- }
- if actualVolSize > volSize {
- volSize = actualVolSize
- }
 // Sample virtctl command
 // virtctl image-upload -n eve-kube-app pvc pvcname --no-create --storage-class longhorn --image-path=
@@ -288,29 +262,52 @@ func RolloutDiskToPVC(ctx context.Context, log *base.LogObject, exists bool,
 args = append(args, "--no-create")
 } else {
 // Add size
- args = append(args, "--size", fmt.Sprint(volSize))
+ args = append(args, "--size", fmt.Sprint(pvcSize))
 }
 log.Noticef("virtctl args %v", args)
- // Wait for long long time since some volumes could be in TBs
- output, err := base.Exec(log, "/containers/services/kube/rootfs/usr/bin/virtctl", args...).
- WithContext(ctx).WithUnlimitedTimeout(432000 * time.Second).CombinedOutput()
+ uploadTry := 0
+ maxRetries := 10
+ timeoutBaseSeconds := int64(300) // 5 min
+ volSizeGB := int64(pvcSize / 1024 / 1024 / 1024)
+ timeoutPer1GBSeconds := int64(120)
+ timeoutSeconds := timeoutBaseSeconds + (volSizeGB * timeoutPer1GBSeconds)
+ log.Noticef("RolloutDiskToPVC calculated timeout of %d seconds due to volume size %d GB", timeoutSeconds, volSizeGB)
- if err != nil {
- err = fmt.Errorf("RolloutDiskToPVC: Failed to convert qcow to PVC %s: %v", output, err)
- log.Error(err)
- return err
- }
- err = waitForPVCReady(ctx, log, pvcName)
+ startTimeOverall := time.Now()
- if err != nil {
- err = fmt.Errorf("RolloutDiskToPVC: error wait for PVC %v", err)
- log.Error(err)
- return err
- }
+ //
+ // CDI upload can fail quickly on short-lived k8s API errors during its own upload-wait status loop,
+ // so try the upload again.
+ //
+ for uploadTry < maxRetries {
+ uploadTry++
- return nil
+ startTimeThisUpload := time.Now()
+ output, err := base.Exec(log, "/containers/services/kube/rootfs/usr/bin/virtctl", args...).
+ WithContext(ctx).WithUnlimitedTimeout(time.Duration(timeoutSeconds) * time.Second).CombinedOutput()
+
+ uploadDuration := time.Since(startTimeThisUpload)
+ if err != nil {
+ err = fmt.Errorf("RolloutDiskToPVC: Failed after %f seconds to convert qcow to PVC %s: %v", uploadDuration.Seconds(), output, err)
+ log.Error(err)
+ time.Sleep(5 * time.Second)
+ continue
+ }
+ // Eventually the command should return something like:
+ // PVC 688b9728-6f21-4bb6-b2f7-4928813fefdc-pvc-0 already successfully imported/cloned/updated
+ overallDuration := time.Since(startTimeOverall)
+ log.Noticef("RolloutDiskToPVC image upload completed on try:%d after %f seconds, total elapsed time %f seconds", uploadTry, uploadDuration.Seconds(), overallDuration.Seconds())
+ err = waitForPVCUploadComplete(pvcName, log)
+ if err != nil {
+ err = fmt.Errorf("RolloutDiskToPVC: error waiting for PVC %v", err)
+ log.Error(err)
+ return err
+ }
+ return nil
+ }
+ return fmt.Errorf("RolloutDiskToPVC: all %d attempts to upload the image failed", maxRetries)
 }
 // GetPVFromPVC : Returns volume name (PV) from the PVC name
diff --git a/pkg/pillar/volumehandlers/csihandler.go b/pkg/pillar/volumehandlers/csihandler.go
index 3f82c24868..fc720bec96 100644
--- a/pkg/pillar/volumehandlers/csihandler.go
+++ b/pkg/pillar/volumehandlers/csihandler.go
@@ -174,7 +174,7 @@ func (handler *volumeHandlerCSI) CreateVolume() (string, error) {
 }
 // Convert to PVC
- pvcerr := kubeapi.RolloutDiskToPVC(createContext, handler.log, false, rawImgFile, pvcName, false)
+ pvcerr := kubeapi.RolloutDiskToPVC(createContext, handler.log, false, rawImgFile, pvcName, false, pvcSize)
 // Since we succeeded or failed to create PVC above, no point in keeping the rawImgFile.
 // Delete it to save space.
@@ -199,7 +199,7 @@ func (handler *volumeHandlerCSI) CreateVolume() (string, error) {
 return pvcName, errors.New(errStr)
 }
 // Convert qcow2 to PVC
- err = kubeapi.RolloutDiskToPVC(createContext, handler.log, false, qcowFile, pvcName, false)
+ err = kubeapi.RolloutDiskToPVC(createContext, handler.log, false, qcowFile, pvcName, false, pvcSize)
 if err != nil {
 errStr := fmt.Sprintf("Error converting %s to PVC %s: %v",
@@ -239,6 +239,8 @@ func (handler *volumeHandlerCSI) DestroyVolume() (string, error) {
 func (handler *volumeHandlerCSI) Populate() (bool, error) {
 pvcName := handler.status.GetPVCName()
+ // For PVC-backed volumes the PVC name doubles as the volume's file location
+ handler.status.FileLocation = pvcName
 handler.log.Noticef("Populate called for PVC %s", pvcName)
 _, err := kubeapi.FindPVC(pvcName, handler.log)
 if err != nil {
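Finally, a tiny sketch of the upload-timeout heuristic introduced in `RolloutDiskToPVC` above: a 5-minute floor plus 2 minutes per GiB of PVC size, truncating any partial GiB. The helper name here is ours, not part of the patch:

```go
package main

import (
	"fmt"
	"time"
)

// uploadTimeout mirrors the patch's arithmetic: 300s base plus 120s per GiB.
func uploadTimeout(pvcSize uint64) time.Duration {
	baseSeconds := int64(300)
	perGiBSeconds := int64(120)
	gib := int64(pvcSize / 1024 / 1024 / 1024) // truncates, like the patch
	return time.Duration(baseSeconds+gib*perGiBSeconds) * time.Second
}

func main() {
	for _, size := range []uint64{1 << 30, 100 << 30} {
		fmt.Printf("%3d GiB -> %v\n", size>>30, uploadTimeout(size))
	}
}
```

With these numbers a 1 GiB volume gets 7 minutes per attempt and a 100 GiB volume gets roughly 3.4 hours, so with up to 10 retries the worst case stays bounded instead of the previous fixed 432000-second (5-day) ceiling.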