Skip to content

Commit

Permalink
Kubevirt: cdiupload retry, thin metrics
Browse files Browse the repository at this point in the history
- Implement retries of CDI upload to pvc to handle
intermittent timeouts to k8s api.
- Switch the kubevirt memory metric name, as the "_total" suffix
was removed from "kubevirt_vmi_memory_domain_bytes" upstream.
- Create DiskMetric with names for sdX and pvc name to help
match them in system debug.
- Fix AppDiskMetric thin-allocation reporting by changing
csihandler GetVolumeDetails to read allocated space from the
Longhorn volume object, and by having Populate() fill in FileLocation.
- Use thin allocation check when dir has prefix of
types.VolumeEncryptedDirName or types.VolumeClearDirName
to handle subdirs like kubevirt /persist/vault/volumes/replicas/...
- waitForPVCUploadComplete needs to use caller's context
to allow quicker exit if volume create is cancelled

Signed-off-by: Andrew Durbin <andrewd@zededa.com>
  • Loading branch information
andrewd-zededa authored and eriknordmark committed Feb 7, 2025
1 parent 15b6b30 commit 15144fd
Show file tree
Hide file tree
Showing 13 changed files with 302 additions and 96 deletions.
7 changes: 5 additions & 2 deletions pkg/pillar/cmd/volumemgr/handlediskmetrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ func diskMetricsTimerTask(ctx *volumemgrContext, handleChannel chan interface{})
case <-diskMetricTicker.C:
start := time.Now()
createOrUpdateDiskMetrics(ctx, wdName)
createOrUpdatePvcDiskMetrics(ctx)
generateAndPublishVolumeMgrStatus(ctx)
ctx.ps.CheckMaxTimeTopic(wdName, "createOrUpdateDiskMetrics", start,
warningTime, errorTime)
Expand Down Expand Up @@ -309,13 +310,13 @@ func createOrUpdateDiskMetrics(ctx *volumemgrContext, wdName string) {
publishDiskMetrics(ctx, diskMetricList...)
for _, volumeStatus := range getAllVolumeStatus(ctx) {
ctx.ps.StillRunning(wdName, warningTime, errorTime)
if err := createOrUpdateAppDiskMetrics(ctx, volumeStatus); err != nil {
if err := createOrUpdateAppDiskMetrics(ctx, wdName, volumeStatus); err != nil {
log.Errorf("CreateOrUpdateCommonDiskMetrics: exception while publishing diskmetric. %s", err.Error())
}
}
}

func createOrUpdateAppDiskMetrics(ctx *volumemgrContext, volumeStatus *types.VolumeStatus) error {
func createOrUpdateAppDiskMetrics(ctx *volumemgrContext, wdName string, volumeStatus *types.VolumeStatus) error {
log.Functionf("createOrUpdateAppDiskMetrics(%s, %s)", volumeStatus.VolumeID, volumeStatus.FileLocation)
if volumeStatus.FileLocation == "" {
if !ctx.hvTypeKube {
Expand All @@ -326,6 +327,8 @@ func createOrUpdateAppDiskMetrics(ctx *volumemgrContext, volumeStatus *types.Vol
volumeStatus.FileLocation = volumeStatus.GetPVCName()
}
}
// Some handlers (csi) can have http timeouts, update the watchdog
ctx.ps.StillRunning(wdName, warningTime, errorTime)
actualSize, maxSize, diskType, dirtyFlag, err := volumehandlers.GetVolumeHandler(log, ctx, volumeStatus).GetVolumeDetails()
if err != nil {
err = fmt.Errorf("createOrUpdateAppDiskMetrics(%s, %s): exception while getting volume size. %s",
Expand Down
78 changes: 78 additions & 0 deletions pkg/pillar/cmd/volumemgr/handlepvcdiskmetrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright (c) 2025 Zededa, Inc.
// SPDX-License-Identifier: Apache-2.0

//go:build kubevirt

package volumemgr

import (
"github.com/lf-edge/eve/pkg/pillar/kubeapi"
"github.com/lf-edge/eve/pkg/pillar/types"
)

// createOrUpdatePvcDiskMetrics creates or updates metrics for all kubevirt PVCs.
// PVC mknod will match one of existing sdX devices, create copies for convenience
// so the PVC name and the sdX device can be correlated in system debug output.
func createOrUpdatePvcDiskMetrics(ctx *volumemgrContext) {
	log.Functionf("createOrUpdatePvcDiskMetrics")
	var diskMetricList []*types.DiskMetric

	// sdX device name lookup keyed by "major:minor"
	sdMajMinToNameMap, _, err := kubeapi.SCSIGetMajMinMaps()
	if err != nil {
		log.Errorf("Failed to get SCSI device maps: %v", err)
		return
	}
	// longhorn PV name -> "major:minor"
	_, pvNameToMajMin, err := kubeapi.LonghornGetMajorMinorMaps()
	if err != nil {
		log.Errorf("Failed to get Longhorn device maps: %v", err)
		return
	}
	// PVC name -> PV name
	_, pvcToPvMap, err := kubeapi.PvPvcMaps()
	if err != nil {
		log.Errorf("Failed to get PVC-PV maps: %v", err)
		return
	}

	// Drop previously-published metrics for PVCs that no longer exist.
	kubeapi.CleanupDetachedDiskMetrics(ctx.pubDiskMetric, pvcToPvMap)

	for pvcName, pvName := range pvcToPvMap {
		// pv-name will be of format "pvc-<uuid>"
		// pvc-name will be of format "<uuid>-pvc-0"
		// pvc-name uuid prefix will show in VolumeStatus
		// full pvc-name will be in VolumeStatus.FileLocation

		if pvName == "" {
			continue
		}

		pvMajMinStr, ok := pvNameToMajMin[pvName]
		if !ok {
			continue
		}

		sdName, ok := sdMajMinToNameMap[pvMajMinStr]
		if !ok {
			continue
		}

		// The PVC metric is a copy of the matching sdX device metric;
		// skip PVCs whose device metric has not been published yet.
		metric := lookupDiskMetric(ctx, sdName)
		if metric == nil {
			continue
		}

		pvcMetric := lookupDiskMetric(ctx, sdName+"-"+pvcName)
		if pvcMetric == nil {
			pvcMetric = &types.DiskMetric{DiskPath: sdName + "-" + pvcName, IsDir: false}
		}
		pvcMetric.ReadBytes = metric.ReadBytes
		pvcMetric.WriteBytes = metric.WriteBytes
		pvcMetric.ReadCount = metric.ReadCount
		pvcMetric.WriteCount = metric.WriteCount
		pvcMetric.TotalBytes = metric.TotalBytes
		pvcMetric.UsedBytes = metric.UsedBytes
		pvcMetric.FreeBytes = metric.FreeBytes
		pvcMetric.IsDir = false
		diskMetricList = append(diskMetricList, pvcMetric)
	}
	publishDiskMetrics(ctx, diskMetricList...)
}
12 changes: 6 additions & 6 deletions pkg/pillar/cmd/volumemgr/handlevolume.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func handleVolumeModify(ctxArg interface{}, key string,
status.SetError(errStr, time.Now())
publishVolumeStatus(ctx, status)
updateVolumeRefStatus(ctx, status)
if err := createOrUpdateAppDiskMetrics(ctx, status); err != nil {
if err := createOrUpdateAppDiskMetrics(ctx, agentName, status); err != nil {
log.Errorf("handleVolumeModify(%s): exception while publishing diskmetric. %s", key, err.Error())
}
return
Expand All @@ -66,7 +66,7 @@ func handleVolumeModify(ctxArg interface{}, key string,
updateVolumeStatusRefCount(ctx, status)
publishVolumeStatus(ctx, status)
updateVolumeRefStatus(ctx, status)
if err := createOrUpdateAppDiskMetrics(ctx, status); err != nil {
if err := createOrUpdateAppDiskMetrics(ctx, agentName, status); err != nil {
log.Errorf("handleVolumeModify(%s): exception while publishing diskmetric. %s", key, err.Error())
}
}
Expand Down Expand Up @@ -135,7 +135,7 @@ func handleDeferredVolumeCreate(ctx *volumemgrContext, key string, config *types
status.SetError(err.Error(), time.Now())
publishVolumeStatus(ctx, status)
updateVolumeRefStatus(ctx, status)
if err := createOrUpdateAppDiskMetrics(ctx, status); err != nil {
if err := createOrUpdateAppDiskMetrics(ctx, agentName, status); err != nil {
log.Errorf("handleDeferredVolumeCreate(%s): exception while publishing diskmetric. %s", key, err.Error())
}
return
Expand Down Expand Up @@ -175,7 +175,7 @@ func handleDeferredVolumeCreate(ctx *volumemgrContext, key string, config *types
}
publishVolumeStatus(ctx, status)
updateVolumeRefStatus(ctx, status)
if err := createOrUpdateAppDiskMetrics(ctx, status); err != nil {
if err := createOrUpdateAppDiskMetrics(ctx, agentName, status); err != nil {
log.Errorf("handleDeferredVolumeCreate(%s): exception while publishing diskmetric. %s", key, err.Error())
}
return
Expand All @@ -186,7 +186,7 @@ func handleDeferredVolumeCreate(ctx *volumemgrContext, key string, config *types
publishVolumeStatus(ctx, status)
updateVolumeRefStatus(ctx, status)
}
if err := createOrUpdateAppDiskMetrics(ctx, status); err != nil {
if err := createOrUpdateAppDiskMetrics(ctx, agentName, status); err != nil {
log.Errorf("handleDeferredVolumeCreate(%s): exception while publishing diskmetric. %s", key, err.Error())
}
log.Tracef("handleDeferredVolumeCreate(%s) done", key)
Expand Down Expand Up @@ -335,7 +335,7 @@ func maybeSpaceAvailable(ctx *volumemgrContext) {
if changed {
publishVolumeStatus(ctx, &status)
updateVolumeRefStatus(ctx, &status)
if err := createOrUpdateAppDiskMetrics(ctx, &status); err != nil {
if err := createOrUpdateAppDiskMetrics(ctx, agentName, &status); err != nil {
log.Errorf("maybeSpaceAvailable(%s): exception while publishing diskmetric. %s", status.Key(), err.Error())
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/pillar/cmd/volumemgr/handlevolumeref.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func handleVolumeRefCreate(ctxArg interface{}, key string,
if changed {
publishVolumeStatus(ctx, vs)
updateVolumeRefStatus(ctx, vs)
if err := createOrUpdateAppDiskMetrics(ctx, vs); err != nil {
if err := createOrUpdateAppDiskMetrics(ctx, agentName, vs); err != nil {
log.Errorf("handleVolumeRefCreate(%s): exception while publishing diskmetric. %s",
status.Key(), err.Error())
}
Expand Down Expand Up @@ -95,7 +95,7 @@ func handleVolumeRefModify(ctxArg interface{}, key string,
if changed {
publishVolumeStatus(ctx, vs)
updateVolumeRefStatus(ctx, vs)
if err := createOrUpdateAppDiskMetrics(ctx, vs); err != nil {
if err := createOrUpdateAppDiskMetrics(ctx, agentName, vs); err != nil {
log.Errorf("handleVolumeRefModify(%s): exception while publishing diskmetric. %s",
status.Key(), err.Error())
}
Expand Down
11 changes: 11 additions & 0 deletions pkg/pillar/cmd/volumemgr/nokube.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
// Copyright (c) 2025 Zededa, Inc.
// SPDX-License-Identifier: Apache-2.0

//go:build !kubevirt

package volumemgr

// createOrUpdatePvcDiskMetrics has no work in non kubevirt builds.
func createOrUpdatePvcDiskMetrics(*volumemgrContext) {}
4 changes: 2 additions & 2 deletions pkg/pillar/cmd/volumemgr/updatestatus.go
Original file line number Diff line number Diff line change
Expand Up @@ -919,7 +919,7 @@ func updateVolumeStatus(ctx *volumemgrContext, volumeID uuid.UUID) bool {
publishVolumeStatus(ctx, &status)
updateVolumeRefStatus(ctx, &status)
}
if err := createOrUpdateAppDiskMetrics(ctx, &status); err != nil {
if err := createOrUpdateAppDiskMetrics(ctx, agentName, &status); err != nil {
log.Errorf("updateVolumeStatus(%s): exception while publishing diskmetric. %s",
status.Key(), err.Error())
}
Expand Down Expand Up @@ -948,7 +948,7 @@ func updateVolumeStatusFromContentID(ctx *volumemgrContext, contentID uuid.UUID)
if changed {
publishVolumeStatus(ctx, &status)
updateVolumeRefStatus(ctx, &status)
if err := createOrUpdateAppDiskMetrics(ctx, &status); err != nil {
if err := createOrUpdateAppDiskMetrics(ctx, agentName, &status); err != nil {
log.Errorf("updateVolumeStatus(%s): exception while publishing diskmetric. %s",
status.Key(), err.Error())
}
Expand Down
35 changes: 19 additions & 16 deletions pkg/pillar/cmd/zfsmanager/zfsstoragemetrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

libzfs "github.com/andrewd-zededa/go-libzfs"
"github.com/lf-edge/eve/pkg/pillar/base"
"github.com/lf-edge/eve/pkg/pillar/types"
"github.com/lf-edge/eve/pkg/pillar/utils/persist"
"github.com/lf-edge/eve/pkg/pillar/zfs"
Expand Down Expand Up @@ -49,23 +50,25 @@ func collectAndPublishStorageMetrics(ctxPtr *zfsContext) {

zfsPoolMetrics := zfs.GetZpoolMetrics(vdevs)

// Fill metrics for zvols
for _, vs := range ctxPtr.subVolumeStatus.GetAll() {
volumeStatus := vs.(types.VolumeStatus)
if volumeStatus.State < types.CREATING_VOLUME {
// we did not go to creating of volume, nothing to measure
continue
if !base.IsHVTypeKube() {
// Fill metrics for zvols
for _, vs := range ctxPtr.subVolumeStatus.GetAll() {
volumeStatus := vs.(types.VolumeStatus)
if volumeStatus.State < types.CREATING_VOLUME {
// we did not go to creating of volume, nothing to measure
continue
}
if !volumeStatus.UseZVolDisk(persist.ReadPersistType()) {
// we do not create zvol for that volumeStatus
continue
}
zVolMetric, err := zfs.GetZvolMetrics(volumeStatus, zfsPoolMetrics.PoolName)
if err != nil {
// It is possible that the logical volume belongs to another zpool
continue
}
zfsPoolMetrics.ZVols = append(zfsPoolMetrics.ZVols, zVolMetric)
}
if !volumeStatus.UseZVolDisk(persist.ReadPersistType()) {
// we do not create zvol for that volumeStatus
continue
}
zVolMetric, err := zfs.GetZvolMetrics(volumeStatus, zfsPoolMetrics.PoolName)
if err != nil {
// It is possible that the logical volume belongs to another zpool
continue
}
zfsPoolMetrics.ZVols = append(zfsPoolMetrics.ZVols, zVolMetric)
}

if err := ctxPtr.storageMetricsPub.Publish(zfsPoolMetrics.Key(), *zfsPoolMetrics); err != nil {
Expand Down
3 changes: 2 additions & 1 deletion pkg/pillar/diskmetrics/usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ func SizeFromDir(log *base.LogObject, dirname string) (uint64, error) {
// will be in the clear and vault volumes base directories so a lot of compute time
// can be saved by not checking detailed allocated bytes information in deeper
// directories.
if dirname == types.VolumeEncryptedDirName || dirname == types.VolumeClearDirName {
if strings.HasPrefix(dirname, types.VolumeEncryptedDirName) ||
strings.HasPrefix(dirname, types.VolumeClearDirName) {
// FileInfo.Size() returns the provisioned size
// Sparse files will have a smaller allocated size than provisioned
// Use full syscall.Stat_t to get the allocated size
Expand Down
17 changes: 12 additions & 5 deletions pkg/pillar/hypervisor/kubevirt.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (metrics *kubevirtMetrics) fill(domainName, metricName string, value interf
return
}

BytesInMegabyte := uint32(1024 * 1024)
BytesInMegabyte := int64(1024 * 1024)
switch metricName {
// add all the cpus to be Total, seconds should be from VM startup time
case "kubevirt_vmi_cpu_system_usage_seconds":
Expand All @@ -128,11 +128,18 @@ func (metrics *kubevirtMetrics) fill(domainName, metricName string, value interf
cpuNs := assignToInt64(value) * int64(time.Second)
r.CPUTotalNs = r.CPUTotalNs + uint64(cpuNs)
case "kubevirt_vmi_memory_usable_bytes":
r.AvailableMemory = uint32(assignToInt64(value)) / BytesInMegabyte
case "kubevirt_vmi_memory_domain_bytes_total":
r.AllocatedMB = uint32(assignToInt64(value)) / BytesInMegabyte
// The amount of memory which can be reclaimed by balloon without pushing the guest system to swap,
// corresponds to ‘Available’ in /proc/meminfo
// https://kubevirt.io/monitoring/metrics.html#kubevirt
r.AvailableMemory = uint32(assignToInt64(value) / BytesInMegabyte)
case "kubevirt_vmi_memory_domain_bytes":
// The amount of memory in bytes allocated to the domain.
// https://kubevirt.io/monitoring/metrics.html#kubevirt
r.AllocatedMB = uint32(assignToInt64(value) / BytesInMegabyte)
case "kubevirt_vmi_memory_available_bytes": // save this temp for later
r.UsedMemory = uint32(assignToInt64(value)) / BytesInMegabyte
// Amount of usable memory as seen by the domain.
// https://kubevirt.io/monitoring/metrics.html#kubevirt
r.UsedMemory = uint32(assignToInt64(value) / BytesInMegabyte)
default:
}
(*metrics)[domainName] = r
Expand Down
79 changes: 79 additions & 0 deletions pkg/pillar/kubeapi/cdiupload.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright (c) 2025 Zededa, Inc.
// SPDX-License-Identifier: Apache-2.0

//go:build kubevirt

package kubeapi

import (
"context"
"fmt"
"time"

"github.com/lf-edge/eve/pkg/pillar/base"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// waitForPVCUploadComplete: Loop until PVC upload annotations show upload complete
// At this time the only caller of this is kubeapi.RolloutDiskToPVC() which runs in the
// volumecreate worker context. This can wait for several minutes
// but does not need to bump a watchdog as the worker does not have one.
// Returns nil on upload completion; an error on clientset failure, context
// cancellation, or poll exhaustion.
func waitForPVCUploadComplete(ctx context.Context, pvcName string, log *base.LogObject) error {
	clientset, err := GetClientSet()
	if err != nil {
		log.Errorf("waitForPVCUploadComplete failed to get clientset err %v", err)
		return err
	}

	// Poll every 5 seconds, up to 100 attempts.
	for attempt := 0; attempt < 100; attempt++ {
		// Wait inside the select so cancellation interrupts the delay
		// immediately instead of only being noticed after a full sleep.
		select {
		case <-ctx.Done():
			return fmt.Errorf("waitForPVCUploadComplete: context done: %w", ctx.Err())
		case <-time.After(5 * time.Second):
		}

		// Use the caller's context so a cancelled volume create also
		// aborts an in-flight k8s API request, not just the loop.
		pvc, err := clientset.CoreV1().PersistentVolumeClaims(EVEKubeNameSpace).
			Get(ctx, pvcName, metav1.GetOptions{})
		if err != nil {
			// Intermittent k8s api timeouts are expected; retry.
			log.Errorf("waitForPVCUploadComplete failed to get pvc info err %v", err)
			continue
		}
		if cdiUploadIsComplete(log, pvc) {
			return nil
		}
	}

	return fmt.Errorf("waitForPVCUploadComplete: time expired")
}

// cdiUploadIsComplete reports whether the CDI upload annotations on the given
// PVC indicate that the upload pod finished and the data transfer completed.
// Logs and returns false while either annotation is missing or not yet at its
// expected value.
func cdiUploadIsComplete(log *base.LogObject, pvc *corev1.PersistentVolumeClaim) bool {
	// Each annotation must be present and hold its expected value.
	checks := []struct {
		key      string
		expected string
	}{
		{"cdi.kubevirt.io/storage.pod.phase", "Succeeded"},
		{"cdi.kubevirt.io/storage.condition.running.message", "Upload Complete"},
	}
	for _, check := range checks {
		foundVal, ok := pvc.Annotations[check.key]
		if !ok {
			log.Errorf("pvc %s annotation %s is missing", pvc.Name, check.key)
			return false
		}
		if foundVal != check.expected {
			log.Warnf("pvc %s annotation %s is %s, waiting for %s", pvc.Name, check.key, foundVal, check.expected)
			return false
		}
	}
	return true
}
Loading

0 comments on commit 15144fd

Please sign in to comment.