Skip to content

Commit

Permalink
Kubevirt: cdiupload retry, thin metrics
Browse files Browse the repository at this point in the history
- Implement retries of CDI upload to pvc to handle
intermittent timeouts to k8s api.
- Switch the kubevirt memory metric name, as the "_total" suffix
was removed from "kubevirt_vmi_memory_domain_bytes".
- Create DiskMetric with names for sdX and pvc name to help
match them in system debug.
- Fix AppDiskMetric thin allocation reporting by moving
csihandler GetVolumeDetails to reading allocated space
from lh volume object and Populate() filling in FileLocation.
- syscall.Stat_t.Blocks is always reported in 512-byte units
(libc struct stat st_blocks). Don't multiply by stat.Blksize, which
can return 4k and overinflate sizes.
- Use thin allocation check when dir has prefix of
types.VolumeEncryptedDirName or types.VolumeClearDirName
to handle subdirs like kubevirt /persist/vault/volumes/replicas/...

Signed-off-by: Andrew Durbin <andrewd@zededa.com>
  • Loading branch information
andrewd-zededa committed Jan 31, 2025
1 parent abf4f4f commit 43665e9
Show file tree
Hide file tree
Showing 13 changed files with 277 additions and 92 deletions.
7 changes: 5 additions & 2 deletions pkg/pillar/cmd/volumemgr/handlediskmetrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ func diskMetricsTimerTask(ctx *volumemgrContext, handleChannel chan interface{})
case <-diskMetricTicker.C:
start := time.Now()
createOrUpdateDiskMetrics(ctx, wdName)
createOrUpdatePvcDiskMetrics(ctx)
generateAndPublishVolumeMgrStatus(ctx)
ctx.ps.CheckMaxTimeTopic(wdName, "createOrUpdateDiskMetrics", start,
warningTime, errorTime)
Expand Down Expand Up @@ -309,13 +310,13 @@ func createOrUpdateDiskMetrics(ctx *volumemgrContext, wdName string) {
publishDiskMetrics(ctx, diskMetricList...)
for _, volumeStatus := range getAllVolumeStatus(ctx) {
ctx.ps.StillRunning(wdName, warningTime, errorTime)
if err := createOrUpdateAppDiskMetrics(ctx, volumeStatus); err != nil {
if err := createOrUpdateAppDiskMetrics(ctx, wdName, volumeStatus); err != nil {
log.Errorf("CreateOrUpdateCommonDiskMetrics: exception while publishing diskmetric. %s", err.Error())
}
}
}

func createOrUpdateAppDiskMetrics(ctx *volumemgrContext, volumeStatus *types.VolumeStatus) error {
func createOrUpdateAppDiskMetrics(ctx *volumemgrContext, wdName string, volumeStatus *types.VolumeStatus) error {
log.Functionf("createOrUpdateAppDiskMetrics(%s, %s)", volumeStatus.VolumeID, volumeStatus.FileLocation)
if volumeStatus.FileLocation == "" {
if !ctx.hvTypeKube {
Expand All @@ -326,6 +327,8 @@ func createOrUpdateAppDiskMetrics(ctx *volumemgrContext, volumeStatus *types.Vol
volumeStatus.FileLocation = volumeStatus.GetPVCName()
}
}
// Some handlers (csi) can have http timeouts, update the watchdog
ctx.ps.StillRunning(wdName, warningTime, errorTime)
actualSize, maxSize, diskType, dirtyFlag, err := volumehandlers.GetVolumeHandler(log, ctx, volumeStatus).GetVolumeDetails()
if err != nil {
err = fmt.Errorf("createOrUpdateAppDiskMetrics(%s, %s): exception while getting volume size. %s",
Expand Down
66 changes: 66 additions & 0 deletions pkg/pillar/cmd/volumemgr/handlepvcdiskmetrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright (c) 2024 Zededa, Inc.
// SPDX-License-Identifier: Apache-2.0

//go:build kubevirt

package volumemgr

import (
"github.com/lf-edge/eve/pkg/pillar/kubeapi"
"github.com/lf-edge/eve/pkg/pillar/types"
)

// createOrUpdatePvcDiskMetrics creates or updates metrics for all kubevirt PVCs.
// PVC mknod will match one of existing sdX devices, create copies for convenience:
// each PVC whose PV resolves (via major:minor) to an sdX device that already has
// a DiskMetric gets a copy of that metric published with the DiskPath
// "<sdX>-<pvc-name>".
//
// Lookup failures are logged instead of being silently dropped. When the
// PVC -> PV map can't be read the function returns before the cleanup step.
func createOrUpdatePvcDiskMetrics(ctx *volumemgrContext) {
	log.Functionf("createOrUpdatePvcDiskMetrics")

	// A failed sdX or longhorn lookup only means the loop below finds no
	// match for the affected PVCs, so record the reason and carry on.
	sdMajMinToNameMap, _, err := kubeapi.SCSIGetMajMinMaps()
	if err != nil {
		log.Errorf("createOrUpdatePvcDiskMetrics: can't get scsi maj:min map: %v", err)
	}
	_, pvNameToMajMin, err := kubeapi.LonghornGetMajorMinorMaps()
	if err != nil {
		log.Errorf("createOrUpdatePvcDiskMetrics: can't get longhorn maj:min map: %v", err)
	}
	_, pvcToPvMap, err := kubeapi.PvPvcMaps()
	if err != nil {
		// NOTE(review): presumably CleanupUnmountedDiskMetrics unpublishes
		// metrics for PVCs missing from the map; handing it an incomplete map
		// after a transient k8s api error would drop valid metrics - confirm.
		log.Errorf("createOrUpdatePvcDiskMetrics: can't get pvc to pv map: %v", err)
		return
	}

	kubeapi.CleanupUnmountedDiskMetrics(ctx.pubDiskMetric, pvcToPvMap)

	var diskMetricList []*types.DiskMetric
	for pvcName, pvName := range pvcToPvMap {
		// pv-name will be of format "pvc-<uuid>"
		// pvc-name will be of format "<uuid>-pvc-0"
		// pvc-name uuid prefix will show in VolumeStatus
		// full pvc-name will be in VolumeStatus.FileLocation

		if pvName == "" {
			continue
		}

		pvMajMinStr, ok := pvNameToMajMin[pvName]
		if !ok {
			continue
		}

		sdName, ok := sdMajMinToNameMap[pvMajMinStr]
		if !ok {
			continue
		}

		// The sdX metric is the source of truth; without it there is
		// nothing to copy.
		metric := lookupDiskMetric(ctx, sdName)
		if metric == nil {
			continue
		}

		pvcDiskPath := sdName + "-" + pvcName
		pvcMetric := lookupDiskMetric(ctx, pvcDiskPath)
		if pvcMetric == nil {
			pvcMetric = &types.DiskMetric{DiskPath: pvcDiskPath}
		}
		pvcMetric.ReadBytes = metric.ReadBytes
		pvcMetric.WriteBytes = metric.WriteBytes
		pvcMetric.ReadCount = metric.ReadCount
		pvcMetric.WriteCount = metric.WriteCount
		pvcMetric.TotalBytes = metric.TotalBytes
		pvcMetric.UsedBytes = metric.UsedBytes
		pvcMetric.FreeBytes = metric.FreeBytes
		pvcMetric.IsDir = false
		diskMetricList = append(diskMetricList, pvcMetric)
	}
	publishDiskMetrics(ctx, diskMetricList...)
}
12 changes: 6 additions & 6 deletions pkg/pillar/cmd/volumemgr/handlevolume.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func handleVolumeModify(ctxArg interface{}, key string,
status.SetError(errStr, time.Now())
publishVolumeStatus(ctx, status)
updateVolumeRefStatus(ctx, status)
if err := createOrUpdateAppDiskMetrics(ctx, status); err != nil {
if err := createOrUpdateAppDiskMetrics(ctx, agentName, status); err != nil {
log.Errorf("handleVolumeModify(%s): exception while publishing diskmetric. %s", key, err.Error())
}
return
Expand All @@ -66,7 +66,7 @@ func handleVolumeModify(ctxArg interface{}, key string,
updateVolumeStatusRefCount(ctx, status)
publishVolumeStatus(ctx, status)
updateVolumeRefStatus(ctx, status)
if err := createOrUpdateAppDiskMetrics(ctx, status); err != nil {
if err := createOrUpdateAppDiskMetrics(ctx, agentName, status); err != nil {
log.Errorf("handleVolumeModify(%s): exception while publishing diskmetric. %s", key, err.Error())
}
}
Expand Down Expand Up @@ -135,7 +135,7 @@ func handleDeferredVolumeCreate(ctx *volumemgrContext, key string, config *types
status.SetError(err.Error(), time.Now())
publishVolumeStatus(ctx, status)
updateVolumeRefStatus(ctx, status)
if err := createOrUpdateAppDiskMetrics(ctx, status); err != nil {
if err := createOrUpdateAppDiskMetrics(ctx, agentName, status); err != nil {
log.Errorf("handleDeferredVolumeCreate(%s): exception while publishing diskmetric. %s", key, err.Error())
}
return
Expand Down Expand Up @@ -175,7 +175,7 @@ func handleDeferredVolumeCreate(ctx *volumemgrContext, key string, config *types
}
publishVolumeStatus(ctx, status)
updateVolumeRefStatus(ctx, status)
if err := createOrUpdateAppDiskMetrics(ctx, status); err != nil {
if err := createOrUpdateAppDiskMetrics(ctx, agentName, status); err != nil {
log.Errorf("handleDeferredVolumeCreate(%s): exception while publishing diskmetric. %s", key, err.Error())
}
return
Expand All @@ -186,7 +186,7 @@ func handleDeferredVolumeCreate(ctx *volumemgrContext, key string, config *types
publishVolumeStatus(ctx, status)
updateVolumeRefStatus(ctx, status)
}
if err := createOrUpdateAppDiskMetrics(ctx, status); err != nil {
if err := createOrUpdateAppDiskMetrics(ctx, agentName, status); err != nil {
log.Errorf("handleDeferredVolumeCreate(%s): exception while publishing diskmetric. %s", key, err.Error())
}
log.Tracef("handleDeferredVolumeCreate(%s) done", key)
Expand Down Expand Up @@ -335,7 +335,7 @@ func maybeSpaceAvailable(ctx *volumemgrContext) {
if changed {
publishVolumeStatus(ctx, &status)
updateVolumeRefStatus(ctx, &status)
if err := createOrUpdateAppDiskMetrics(ctx, &status); err != nil {
if err := createOrUpdateAppDiskMetrics(ctx, agentName, &status); err != nil {
log.Errorf("maybeSpaceAvailable(%s): exception while publishing diskmetric. %s", status.Key(), err.Error())
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/pillar/cmd/volumemgr/handlevolumeref.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func handleVolumeRefCreate(ctxArg interface{}, key string,
if changed {
publishVolumeStatus(ctx, vs)
updateVolumeRefStatus(ctx, vs)
if err := createOrUpdateAppDiskMetrics(ctx, vs); err != nil {
if err := createOrUpdateAppDiskMetrics(ctx, agentName, vs); err != nil {
log.Errorf("handleVolumeRefCreate(%s): exception while publishing diskmetric. %s",
status.Key(), err.Error())
}
Expand Down Expand Up @@ -95,7 +95,7 @@ func handleVolumeRefModify(ctxArg interface{}, key string,
if changed {
publishVolumeStatus(ctx, vs)
updateVolumeRefStatus(ctx, vs)
if err := createOrUpdateAppDiskMetrics(ctx, vs); err != nil {
if err := createOrUpdateAppDiskMetrics(ctx, agentName, vs); err != nil {
log.Errorf("handleVolumeRefModify(%s): exception while publishing diskmetric. %s",
status.Key(), err.Error())
}
Expand Down
11 changes: 11 additions & 0 deletions pkg/pillar/cmd/volumemgr/nokube.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
// Copyright (c) 2025 Zededa, Inc.
// SPDX-License-Identifier: Apache-2.0

//go:build !kubevirt

package volumemgr

// createOrUpdatePvcDiskMetrics is a no-op placeholder for builds without the
// kubevirt tag, so callers can invoke it unconditionally.
func createOrUpdatePvcDiskMetrics(_ *volumemgrContext) {}
4 changes: 2 additions & 2 deletions pkg/pillar/cmd/volumemgr/updatestatus.go
Original file line number Diff line number Diff line change
Expand Up @@ -919,7 +919,7 @@ func updateVolumeStatus(ctx *volumemgrContext, volumeID uuid.UUID) bool {
publishVolumeStatus(ctx, &status)
updateVolumeRefStatus(ctx, &status)
}
if err := createOrUpdateAppDiskMetrics(ctx, &status); err != nil {
if err := createOrUpdateAppDiskMetrics(ctx, agentName, &status); err != nil {
log.Errorf("updateVolumeStatus(%s): exception while publishing diskmetric. %s",
status.Key(), err.Error())
}
Expand Down Expand Up @@ -948,7 +948,7 @@ func updateVolumeStatusFromContentID(ctx *volumemgrContext, contentID uuid.UUID)
if changed {
publishVolumeStatus(ctx, &status)
updateVolumeRefStatus(ctx, &status)
if err := createOrUpdateAppDiskMetrics(ctx, &status); err != nil {
if err := createOrUpdateAppDiskMetrics(ctx, agentName, &status); err != nil {
log.Errorf("updateVolumeStatus(%s): exception while publishing diskmetric. %s",
status.Key(), err.Error())
}
Expand Down
35 changes: 19 additions & 16 deletions pkg/pillar/cmd/zfsmanager/zfsstoragemetrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

libzfs "github.com/andrewd-zededa/go-libzfs"
"github.com/lf-edge/eve/pkg/pillar/base"
"github.com/lf-edge/eve/pkg/pillar/types"
"github.com/lf-edge/eve/pkg/pillar/utils/persist"
"github.com/lf-edge/eve/pkg/pillar/zfs"
Expand Down Expand Up @@ -49,23 +50,25 @@ func collectAndPublishStorageMetrics(ctxPtr *zfsContext) {

zfsPoolMetrics := zfs.GetZpoolMetrics(vdevs)

// Fill metrics for zvols
for _, vs := range ctxPtr.subVolumeStatus.GetAll() {
volumeStatus := vs.(types.VolumeStatus)
if volumeStatus.State < types.CREATING_VOLUME {
// we did not go to creating of volume, nothing to measure
continue
if !base.IsHVTypeKube() {
// Fill metrics for zvols
for _, vs := range ctxPtr.subVolumeStatus.GetAll() {
volumeStatus := vs.(types.VolumeStatus)
if volumeStatus.State < types.CREATING_VOLUME {
// we did not go to creating of volume, nothing to measure
continue
}
if !volumeStatus.UseZVolDisk(persist.ReadPersistType()) {
// we do not create zvol for that volumeStatus
continue
}
zVolMetric, err := zfs.GetZvolMetrics(volumeStatus, zfsPoolMetrics.PoolName)
if err != nil {
// It is possible that the logical volume belongs to another zpool
continue
}
zfsPoolMetrics.ZVols = append(zfsPoolMetrics.ZVols, zVolMetric)
}
if !volumeStatus.UseZVolDisk(persist.ReadPersistType()) {
// we do not create zvol for that volumeStatus
continue
}
zVolMetric, err := zfs.GetZvolMetrics(volumeStatus, zfsPoolMetrics.PoolName)
if err != nil {
// It is possible that the logical volume belongs to another zpool
continue
}
zfsPoolMetrics.ZVols = append(zfsPoolMetrics.ZVols, zVolMetric)
}

if err := ctxPtr.storageMetricsPub.Publish(zfsPoolMetrics.Key(), *zfsPoolMetrics); err != nil {
Expand Down
6 changes: 4 additions & 2 deletions pkg/pillar/diskmetrics/usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ func StatAllocatedBytes(path string) (uint64, error) {
if err != nil {
return uint64(0), err
}
return uint64(stat.Blocks * int64(stat.Blksize)), nil
// stat.Blocks is always 512-byte blocks
return uint64(stat.Blocks * 512), nil
}

// SizeFromDir performs a du -s equivalent operation.
Expand Down Expand Up @@ -82,7 +83,8 @@ func SizeFromDir(log *base.LogObject, dirname string) (uint64, error) {
// will be in the clear and vault volumes base directories so a lot of compute time
// can be saved by not checking detailed allocated bytes information in deeper
// directories.
if dirname == types.VolumeEncryptedDirName || dirname == types.VolumeClearDirName {
if strings.HasPrefix(dirname, types.VolumeEncryptedDirName) ||
strings.HasPrefix(dirname, types.VolumeClearDirName) {
// FileInfo.Size() returns the provisioned size
// Sparse files will have a smaller allocated size than provisioned
// Use full syscall.Stat_t to get the allocated size
Expand Down
9 changes: 8 additions & 1 deletion pkg/pillar/hypervisor/kubevirt.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,17 @@ func (metrics *kubevirtMetrics) fill(domainName, metricName string, value interf
cpuNs := assignToInt64(value) * int64(time.Second)
r.CPUTotalNs = r.CPUTotalNs + uint64(cpuNs)
case "kubevirt_vmi_memory_usable_bytes":
// The amount of memory which can be reclaimed by balloon without pushing the guest system to swap,
// corresponds to ‘Available’ in /proc/meminfo
// https://kubevirt.io/monitoring/metrics.html#kubevirt
r.AvailableMemory = uint32(assignToInt64(value)) / BytesInMegabyte
case "kubevirt_vmi_memory_domain_bytes_total":
case "kubevirt_vmi_memory_domain_bytes":
// The amount of memory in bytes allocated to the domain.
// https://kubevirt.io/monitoring/metrics.html#kubevirt
r.AllocatedMB = uint32(assignToInt64(value)) / BytesInMegabyte
case "kubevirt_vmi_memory_available_bytes": // save this temp for later
// Amount of usable memory as seen by the domain.
// https://kubevirt.io/monitoring/metrics.html#kubevirt
r.UsedMemory = uint32(assignToInt64(value)) / BytesInMegabyte
default:
}
Expand Down
69 changes: 69 additions & 0 deletions pkg/pillar/kubeapi/cdiupload.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright (c) 2025 Zededa, Inc.
// SPDX-License-Identifier: Apache-2.0

//go:build kubevirt

package kubeapi

import (
"context"
"fmt"
"time"

"github.com/lf-edge/eve/pkg/pillar/base"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// waitForPVCUploadComplete polls the PVC pvcName in EVEKubeNameSpace until
// cdiUploadIsComplete reports that the upload has finished.
//
// It makes up to 100 attempts, sleeping 5 seconds before each one. Every API
// request carries its own timeout so that a hung k8s api call can't stall the
// loop forever. A failed request is logged and retried on the next attempt.
//
// Returns nil once the upload is complete, the GetClientSet error if no
// clientset is available, or a "time expired" error (wrapping the last API
// error, if the final attempt failed) when all attempts are used up.
func waitForPVCUploadComplete(pvcName string, log *base.LogObject) error {
	const (
		maxAttempts  = 100
		pollInterval = 5 * time.Second
		getTimeout   = 10 * time.Second
	)

	clientset, err := GetClientSet()
	if err != nil {
		log.Errorf("waitForPVCUploadComplete failed to get clientset err %v", err)
		return err
	}

	var lastErr error
	for attempt := 0; attempt < maxAttempts; attempt++ {
		time.Sleep(pollInterval)

		// Cancel right after the call instead of deferring: a defer inside
		// the loop would keep every context alive until the function returns.
		getCtx, cancel := context.WithTimeout(context.Background(), getTimeout)
		pvc, err := clientset.CoreV1().PersistentVolumeClaims(EVEKubeNameSpace).
			Get(getCtx, pvcName, metav1.GetOptions{})
		cancel()
		if err != nil {
			lastErr = err
			log.Errorf("waitForPVCUploadComplete failed to get pvc info err %v", err)
			continue
		}
		lastErr = nil
		if cdiUploadIsComplete(log, pvc) {
			return nil
		}
	}

	if lastErr != nil {
		return fmt.Errorf("waitForPVCUploadComplete: time expired for pvc %s, last err: %w",
			pvcName, lastErr)
	}
	return fmt.Errorf("waitForPVCUploadComplete: time expired for pvc %s", pvcName)
}

// cdiUploadIsComplete reports whether the annotations on pvc show a finished
// CDI upload. Both annotations listed below must be present with the expected
// value; the first one that is missing (logged as error) or different (logged
// as warning) makes the function return false.
func cdiUploadIsComplete(log *base.LogObject, pvc *corev1.PersistentVolumeClaim) bool {
	// Checked in order; the order decides which log message is emitted first.
	required := [...]struct{ key, want string }{
		{"cdi.kubevirt.io/storage.pod.phase", "Succeeded"},
		{"cdi.kubevirt.io/storage.condition.running.message", "Upload Complete"},
	}

	for _, r := range required {
		got, present := pvc.Annotations[r.key]
		switch {
		case !present:
			log.Errorf("pvc %s annotation %s is missing", pvc.Name, r.key)
			return false
		case got != r.want:
			log.Warnf("pvc %s annotation %s is %s, waiting for %s", pvc.Name, r.key, got, r.want)
			return false
		}
	}
	return true
}
26 changes: 26 additions & 0 deletions pkg/pillar/kubeapi/longhorninfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,38 @@ import (
"context"
"fmt"
"strings"
"time"

lhv1beta2 "github.com/longhorn/longhorn-manager/k8s/pkg/apis/longhorn/v1beta2"
"github.com/longhorn/longhorn-manager/k8s/pkg/client/clientset/versioned"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// LonghornVolumeSizeDetails returns the provisionedBytes and allocatedBytes size
// values for a longhorn volume.
//
// provisionedBytes is taken from the volume object's Spec.Size and
// allocatedBytes from its Status.ActualSize. The volume is looked up by
// longhornVolumeName in the "longhorn-system" namespace. On any failure both
// sizes are 0 and err describes the step that failed, wrapping the underlying
// error where there is one.
func LonghornVolumeSizeDetails(longhornVolumeName string) (provisionedBytes uint64, allocatedBytes uint64, err error) {
	config, err := GetKubeConfig()
	if err != nil {
		return 0, 0, fmt.Errorf("LonghornVolumeSizeDetails can't get kubeconfig: %w", err)
	}

	lhClient, err := versioned.NewForConfig(config)
	if err != nil {
		return 0, 0, fmt.Errorf("LonghornVolumeSizeDetails can't get versioned config: %w", err)
	}

	// Don't let a k8s api timeout keep us waiting forever; set the timeout
	// explicitly here because this call is used in the metrics path.
	shortContext, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	lhVol, err := lhClient.LonghornV1beta2().Volumes("longhorn-system").Get(shortContext, longhornVolumeName, metav1.GetOptions{})
	if err != nil {
		return 0, 0, fmt.Errorf("LonghornVolumeSizeDetails can't get lh vol %s: %w", longhornVolumeName, err)
	}
	// Kept separate from the error case so the message never reads "err:<nil>".
	if lhVol == nil {
		return 0, 0, fmt.Errorf("LonghornVolumeSizeDetails lh vol %s not found", longhornVolumeName)
	}

	return uint64(lhVol.Spec.Size), uint64(lhVol.Status.ActualSize), nil
}

// LonghornReplicaList returns the replica for a given longhorn volume which is hosted on a given kubernetes node
func LonghornReplicaList(ownerNodeName string, longhornVolName string) (*lhv1beta2.ReplicaList, error) {
config, err := GetKubeConfig()
if err != nil {
Expand Down
Loading

0 comments on commit 43665e9

Please sign in to comment.