Application Failover support
1) See pkg/pillar/docs/failover.md for more details.
2) zedagent parses the designated node id in the config
   and sets the corresponding boolean flags in AppInstanceConfig, VolumeConfig and ContentTreeConfig
   (the check is sketched below).
3) The designated node starts the app and publishes AppInstanceStatus.
4) A non-designated node processes the config; if content needs to be downloaded, it downloads it and
   publishes AppInstanceStatus, but AppInstanceStatus.Activated will be false.
5) zedmanager subscribes to ENClusterAppStatus and handles the app schedule and deschedule on that node.
6) zedagent also sets the flag if the app is of native container type.
   domainmgr launches the native container as a Kubernetes pod; that code already exists in domainmgr.
7) zedagent reports app metrics to the controller only if the app is activated on that node.
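The designated-node check that items 2)-4) rely on follows one pattern across the parse functions in this diff. A minimal standalone sketch of that pattern (the isDesignatedNode helper is hypothetical, not code from this commit):

```go
package main

import "fmt"

// isDesignatedNode sketches the core comparison used by parseContentInfoConfig,
// parseVolumeConfig and parseAppInstanceConfig in this diff: an empty designated
// node ID from the controller (single-node EVE, kvm- or kubevirt-based) means
// this node is always the designated node; otherwise the node is designated
// only if the ID matches its own device UUID.
func isDesignatedNode(controllerDNID, devUUID string) bool {
	return controllerDNID == "" || controllerDNID == devUUID
}

func main() {
	fmt.Println(isDesignatedNode("", "node-a"))       // true: single-node EVE
	fmt.Println(isDesignatedNode("node-a", "node-a")) // true: this node is designated
	fmt.Println(isDesignatedNode("node-b", "node-a")) // false: non-designated cluster node
}
```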

Signed-off-by: Pramodh Pallapothu <pramodh@zededa.com>
Pramodh Pallapothu authored and eriknordmark committed Jan 28, 2025
1 parent 4b979bf commit df51e7a
Showing 16 changed files with 371 additions and 40 deletions.
21 changes: 21 additions & 0 deletions pkg/pillar/cmd/volumemgr/handlevolume.go
@@ -7,6 +7,7 @@ import (
"fmt"
"time"

zconfig "github.com/lf-edge/eve-api/go/config"
"github.com/lf-edge/eve/pkg/pillar/types"
"github.com/lf-edge/eve/pkg/pillar/vault"
"github.com/lf-edge/eve/pkg/pillar/volumehandlers"
@@ -35,6 +36,7 @@ func handleVolumeModify(ctxArg interface{}, key string,
log.Functionf("handleVolumeModify(%s)", key)
config := configArg.(types.VolumeConfig)
ctx := ctxArg.(*volumemgrContext)

if _, deferred := ctx.volumeConfigCreateDeferredMap[key]; deferred {
//update deferred creation if exists
ctx.volumeConfigCreateDeferredMap[key] = &config
@@ -77,6 +79,7 @@ func handleVolumeDelete(ctxArg interface{}, key string,
log.Functionf("handleVolumeDelete(%s)", key)
config := configArg.(types.VolumeConfig)
ctx := ctxArg.(*volumemgrContext)

if _, deferred := ctx.volumeConfigCreateDeferredMap[key]; deferred {
//remove deferred creation if exists
delete(ctx.volumeConfigCreateDeferredMap, key)
@@ -98,6 +101,10 @@ func handleDeferredVolumeCreate(ctx *volumemgrContext, key string, config *types
log.Tracef("handleDeferredVolumeCreate(%s)", key)
status := ctx.LookupVolumeStatus(config.Key())
if status != nil {
if config.IsReplicated {
// Objects are replicated across cluster nodes, just exit.
return
}
log.Fatalf("status exists at handleVolumeCreate for %s", config.Key())
}
status = &types.VolumeStatus{
@@ -116,6 +123,8 @@ func handleDeferredVolumeCreate(ctx *volumemgrContext, key string, config *types
LastRefCountChangeTime: time.Now(),
LastUse: time.Now(),
State: types.INITIAL,
IsReplicated: config.IsReplicated,
IsNativeContainer: config.IsNativeContainer,
}
updateVolumeStatusRefCount(ctx, status)
log.Noticef("handleDeferredVolumeCreate(%s) setting contentFormat to %s", key, volumeFormat[status.Key()])
@@ -152,6 +161,18 @@ func handleDeferredVolumeCreate(ctx *volumemgrContext, key string, config *types
status.TotalSize = int64(actualSize)
status.CurrentSize = int64(actualSize)
}

// Fill the ReferenceName which will be used by domainmgr to launch native containers.
ctStatus := ctx.LookupContentTreeStatus(status.ContentID.String())

if ctStatus != nil {
status.ReferenceName = ctStatus.ReferenceID()
// On kubevirt EVE, even though we created the PVC from a container image, we still set the content format to container.
// This helps domainmgr load the external boot kernel (to support the shim-VM container).
if ctStatus.Format == zconfig.Format_CONTAINER {
status.ContentFormat = zconfig.Format_CONTAINER
}
}
publishVolumeStatus(ctx, status)
updateVolumeRefStatus(ctx, status)
if err := createOrUpdateAppDiskMetrics(ctx, status); err != nil {
18 changes: 18 additions & 0 deletions pkg/pillar/cmd/zedagent/handlecontent.go
@@ -96,6 +96,24 @@ func parseContentInfoConfig(ctx *getconfigContext,
contentConfig.MaxDownloadSize = cfgContentTree.GetMaxSizeBytes()
contentConfig.DisplayName = cfgContentTree.GetDisplayName()
contentConfig.CustomMeta = cfgContentTree.GetCustomMetaData()
contentConfig.IsLocal = true
controllerDNID := cfgContentTree.GetDesignatedNodeId()
// If this node is not the designated node, set IsLocal to false.
// Content is downloaded only to the designated node of that content tree,
// so on the other nodes in the cluster mark the content tree as non-local.
// On single-node EVE, whether kvm- or kubevirt-based, this node is always the designated node.
// But if this is the content tree of a container, we download it to all nodes
// of the cluster, so set IsLocal to true in that case.
if controllerDNID != "" && controllerDNID != devUUID.String() {
if contentConfig.Format == zconfig.Format_CONTAINER {
contentConfig.IsLocal = true
} else {
contentConfig.IsLocal = false
}
}

log.Noticef("parseContentInfo designated ID copy from volume config: %v, contentid %v, url %s", controllerDNID, contentConfig.ContentID, contentConfig.RelativeURL)

publishContentTreeConfig(ctx, *contentConfig)
}
ctx.pubContentTreeConfig.SignalRestarted()
5 changes: 5 additions & 0 deletions pkg/pillar/cmd/zedagent/handlemetrics.go
@@ -1040,6 +1040,7 @@ func PublishAppInfoToZedCloud(ctx *zedagentContext, uuid string,

ReportAppInfo.AppID = uuid
ReportAppInfo.SystemApp = false
ReportAppInfo.ClusterAppRunning = false

if aiStatus != nil {
// In cluster mode, if ENClusterAppStatus reports the app is not scheduled on the node,
@@ -1152,6 +1153,10 @@ func PublishAppInfoToZedCloud(ctx *zedagentContext, uuid string,
snapInfo.SnapErr = encodeErrorInfo(snap.Error)
ReportAppInfo.Snapshots = append(ReportAppInfo.Snapshots, snapInfo)
}

// For clustered apps on HV=kubevirt, 'ClusterAppRunning' indicates that
// the app is running on this node, either normally or after a failover event.
ReportAppInfo.ClusterAppRunning = aiStatus.Activated
}

ReportInfo.InfoContent = new(info.ZInfoMsg_Ainfo)
23 changes: 23 additions & 0 deletions pkg/pillar/cmd/zedagent/handlevolume.go
@@ -106,6 +106,29 @@ func parseVolumeConfig(ctx *getconfigContext,
// Add config submitted via local profile server.
addLocalVolumeConfig(ctx, volumeConfig)

controllerDNID := cfgVolume.GetDesignatedNodeId()
// If this node is the designated node, set IsReplicated to false.
// On single-node EVE, whether kvm- or kubevirt-based, this node is always the designated node.
if controllerDNID != "" && controllerDNID != devUUID.String() {
volumeConfig.IsReplicated = true
} else {
volumeConfig.IsReplicated = false
}

// Iterate through the app configs and check whether this volume belongs to a
// native container deployment, indicated by the NOHYPER VirtualizationMode.
appInstanceList := config.GetApps()
for _, ai := range appInstanceList {
if ai.Fixedresources.VirtualizationMode == zconfig.VmMode_NOHYPER {
for _, vr := range ai.VolumeRefList {
if vr.Uuid == volumeConfig.VolumeID.String() && volumeConfig.ContentID != uuid.Nil {
volumeConfig.IsNativeContainer = true
log.Noticef("parseVolumeConfig: setting IsNativeContainer for %s", volumeConfig.VolumeID.String())
break
}
}
}
}
publishVolumeConfig(ctx, *volumeConfig)
}

13 changes: 10 additions & 3 deletions pkg/pillar/cmd/zedagent/parseconfig.go
@@ -651,6 +651,8 @@ func parseAppInstanceConfig(getconfigCtx *getconfigContext,
appinstancePrevConfigHash, configHash, Apps)
appinstancePrevConfigHash = configHash

devUUIDStr := config.GetId().Uuid

// First look for deleted ones
items := getconfigCtx.pubAppInstanceConfig.GetAll()
for uuidStr := range items {
@@ -768,9 +770,14 @@
// Add config submitted via local profile server.
addLocalAppConfig(getconfigCtx, &appInstance)

// XXX add Designated ID to the appInstance
// XXX Keep this here for now to allow the kubevirt single-node working, the later PR to EVE main will remove this
appInstance.DesignatedNodeID = devUUID
controllerDNID := cfgApp.GetDesignatedNodeId()
// If this node is the designated node, set IsDesignatedNodeID to true, else false.
// On single-node EVE, whether kvm- or kubevirt-based, this node is always the designated node.
if controllerDNID != "" && controllerDNID != devUUIDStr {
appInstance.IsDesignatedNodeID = false
} else {
appInstance.IsDesignatedNodeID = true
}

// Verify that it fits and if not publish with error
checkAndPublishAppInstanceConfig(getconfigCtx, appInstance)
11 changes: 7 additions & 4 deletions pkg/pillar/cmd/zedagent/reportinfo.go
@@ -128,10 +128,13 @@ func objectInfoTask(ctxPtr *zedagentContext, triggerInfo <-chan infoForObjectKey
sub := ctxPtr.getconfigCtx.subContentTreeStatus
if c, err = sub.Get(infoForKeyMessage.objectKey); err == nil {
ctStatus := c.(types.ContentTreeStatus)
uuidStr := ctStatus.Key()
PublishContentInfoToZedCloud(ctxPtr, uuidStr, &ctStatus,
ctxPtr.iteration, infoDest)
ctxPtr.iteration++
// Publish the info to zedcloud only if this is a local content tree
if ctStatus.IsLocal {
uuidStr := ctStatus.Key()
PublishContentInfoToZedCloud(ctxPtr, uuidStr, &ctStatus,
ctxPtr.iteration, infoDest)
ctxPtr.iteration++
}
}
case info.ZInfoTypes_ZiBlobList:
// publish blob info
12 changes: 3 additions & 9 deletions pkg/pillar/cmd/zedkube/applogs.go
@@ -17,7 +17,6 @@ import (
"github.com/lf-edge/eve/pkg/pillar/base"
"github.com/lf-edge/eve/pkg/pillar/kubeapi"
"github.com/lf-edge/eve/pkg/pillar/types"
uuid "github.com/satori/go.uuid"
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -48,7 +47,7 @@ func (z *zedkube) collectAppLogs() {
if aiconfig.FixedResources.VirtualizationMode != types.NOHYPER {
continue
}
if aiconfig.DesignatedNodeID != uuid.Nil && aiconfig.DesignatedNodeID.String() != z.nodeuuid {
if !aiconfig.IsDesignatedNodeID {
continue
}
kubeName := base.GetAppKubeName(aiconfig.DisplayName, aiconfig.UUIDandVersion.UUID)
@@ -121,11 +120,6 @@ func (z *zedkube) checkAppsStatus() {
return
}

u, err := uuid.FromString(z.nodeuuid)
if err != nil {
return
}

clientset, err := getKubeClientSet()
if err != nil {
log.Errorf("checkAppsStatus: can't get clientset %v", err)
@@ -146,12 +140,12 @@
var oldStatus *types.ENClusterAppStatus
for _, item := range items {
aiconfig := item.(types.AppInstanceConfig)
if aiconfig.DesignatedNodeID == uuid.Nil { // if not for cluster app, skip
if !aiconfig.IsDesignatedNodeID { // if not for cluster app, skip
continue
}
encAppStatus := types.ENClusterAppStatus{
AppUUID: aiconfig.UUIDandVersion.UUID,
IsDNidNode: aiconfig.DesignatedNodeID == u,
IsDNidNode: aiconfig.IsDesignatedNodeID,
}
contName := base.GetAppKubeName(aiconfig.DisplayName, aiconfig.UUIDandVersion.UUID)

9 changes: 5 additions & 4 deletions pkg/pillar/cmd/zedmanager/handleclusterapp.go
@@ -40,12 +40,11 @@ func handleENClusterAppStatusImpl(ctx *zedmanagerContext, key string, status *ty
log.Errorf("handleENClusterAppStatusImpl(%s) AppInstanceConfig missing for app", key)
return
}
// XXX this will be handled in later PR in clustering and zedmanager code
//handleCreateAppInstanceStatus(ctx, *aiConfig)
handleCreateAppInstanceStatus(ctx, *aiConfig)
} else {
// re-publish the aiStatus, in case the cluster status has changed.

activateAIStatusUUID(ctx, key)
log.Functionf("handleENClusterAppStatusImpl(%s) for app-status %v aiStatus %v", key, status, aiStatus)
publishAppInstanceStatus(ctx, aiStatus)
return
}
} else { // not scheduled here.
@@ -54,7 +53,9 @@ func handleENClusterAppStatusImpl(ctx *zedmanagerContext, key string, status *ty
if aiStatus != nil {
// If I am not scheduled here, modify and publish the aiStatus with NoUploadStatsToController set.
publishAppInstanceStatus(ctx, aiStatus)
publishAppInstanceSummary(ctx)
}

}

}
7 changes: 1 addition & 6 deletions pkg/pillar/cmd/zedmanager/handledomainmgr.go
@@ -11,7 +11,6 @@ import (
"strings"

"github.com/lf-edge/eve/pkg/pillar/types"
uuid "github.com/satori/go.uuid"
)

const (
@@ -48,10 +47,6 @@ func MaybeAddDomainConfig(ctx *zedmanagerContext,
AppNum = ns.AppNum
}

isDNiDnode := false
if aiConfig.DesignatedNodeID != uuid.Nil && aiConfig.DesignatedNodeID == ctx.nodeUUID {
isDNiDnode = true
}
effectiveActivate := effectiveActivateCombined(aiConfig, ctx)
dc := types.DomainConfig{
UUIDandVersion: aiConfig.UUIDandVersion,
Expand All @@ -68,7 +63,7 @@ func MaybeAddDomainConfig(ctx *zedmanagerContext,
CloudInitVersion: aiConfig.CloudInitVersion,
// IsDNidNode will be set to true even if the App is not in cluster mode;
// zedagent's parseConfig sets it for the single node/device App case.
IsDNidNode: isDNiDnode,
IsDNidNode: aiConfig.IsDesignatedNodeID,
}

dc.DiskConfigList = make([]types.DiskConfig, 0, len(aiStatus.VolumeRefStatusList))
25 changes: 25 additions & 0 deletions pkg/pillar/cmd/zedmanager/updatestatus.go
@@ -39,6 +39,31 @@ func updateAIStatusUUID(ctx *zedmanagerContext, uuidStr string) {
}
}

// Activate this AppInstanceStatus and generate config updates to
// the microservices
func activateAIStatusUUID(ctx *zedmanagerContext, uuidStr string) {

log.Functionf("activateAIStatusUUID(%s)", uuidStr)
status := lookupAppInstanceStatus(ctx, uuidStr)
if status == nil {
log.Functionf("activateAIStatusUUID for %s: Missing AppInstanceStatus",
uuidStr)
return
}
config := lookupAppInstanceConfig(ctx, uuidStr, true)
if config == nil || (status.PurgeInprogress == types.BringDown) {
removeAIStatus(ctx, status)
return
}
doActivate(ctx, uuidStr, *config, status)

log.Functionf("activateAIStatusUUID status %d for %s",
status.State, uuidStr)
publishAppInstanceStatus(ctx, status)
publishAppInstanceSummary(ctx)

}

// Remove this AppInstanceStatus and generate config removes for
// the microservices
func removeAIStatusUUID(ctx *zedmanagerContext, uuidStr string) {
32 changes: 20 additions & 12 deletions pkg/pillar/cmd/zedmanager/zedmanager.go
@@ -1098,11 +1098,19 @@ func handleCreate(ctxArg interface{}, key string,
log.Functionf("handleCreate(%v) for %s",
config.UUIDandVersion, config.DisplayName)

handleCreateAppInstanceStatus(ctx, config)
}

func handleCreateAppInstanceStatus(ctx *zedmanagerContext, config types.AppInstanceConfig) {
log.Functionf("handleCreateAppInstanceStatus(%v) for %s",
config.UUIDandVersion, config.DisplayName)

status := types.AppInstanceStatus{
UUIDandVersion: config.UUIDandVersion,
DisplayName: config.DisplayName,
FixedResources: config.FixedResources,
State: types.INITIAL,
UUIDandVersion: config.UUIDandVersion,
DisplayName: config.DisplayName,
FixedResources: config.FixedResources,
State: types.INITIAL,
IsDesignatedNodeID: config.IsDesignatedNodeID,
}

// Calculate the moment when the application should start, taking into account the configured delay
@@ -1119,10 +1127,10 @@ func handleCreate(ctxArg interface{}, key string,
configCounter := int(config.PurgeCmd.Counter + config.LocalPurgeCmd.Counter)
if err == nil {
if persistedCounter == configCounter {
log.Functionf("handleCreate(%v) for %s found matching purge counter %d",
log.Functionf("handleCreateAppInstanceStatus(%v) for %s found matching purge counter %d",
config.UUIDandVersion, config.DisplayName, persistedCounter)
} else {
log.Warnf("handleCreate(%v) for %s found different purge counter %d vs. %d",
log.Warnf("handleCreateAppInstanceStatus(%v) for %s found different purge counter %d vs. %d",
config.UUIDandVersion, config.DisplayName, persistedCounter, configCounter)
status.PurgeInprogress = types.DownloadAndVerify
status.PurgeStartedAt = time.Now()
@@ -1131,7 +1139,7 @@ func handleCreate(ctxArg interface{}, key string,
}
} else {
// Save this PurgeCmd.Counter as the baseline
log.Functionf("handleCreate(%v) for %s saving purge counter %d",
log.Functionf("handleCreateAppInstanceStatus(%v) for %s saving purge counter %d",
config.UUIDandVersion, config.DisplayName, configCounter)
err = ctx.appToPurgeCounterMap.Assign(mapKey, configCounter, true)
if err != nil {
@@ -1194,7 +1202,7 @@ func handleCreate(ctxArg interface{}, key string,
config.DisplayName, config.UUIDandVersion.UUID)
publishAppInstanceStatus(ctx, &status)
}
log.Functionf("handleCreate done for %s", config.DisplayName)
log.Functionf("handleCreateAppInstanceStatus done for %s", config.DisplayName)
}

func handleModify(ctxArg interface{}, key string,
@@ -1664,7 +1672,7 @@ func effectiveActivateCurrentProfile(config types.AppInstanceConfig, currentProf

func getKubeAppActivateStatus(ctx *zedmanagerContext, aiConfig types.AppInstanceConfig, effectiveActivate bool) bool {

if !ctx.hvTypeKube || aiConfig.DesignatedNodeID == uuid.Nil {
if !ctx.hvTypeKube {
return effectiveActivate
}

@@ -1693,9 +1701,9 @@
}
}

log.Functionf("getKubeAppActivateStatus: ai %s, node %s, onTheDevice %v, statusRunning %v",
aiConfig.DesignatedNodeID.String(), ctx.nodeUUID, onTheDevice, statusRunning)
if aiConfig.DesignatedNodeID == ctx.nodeUUID {
log.Functionf("getKubeAppActivateStatus: is designated node %v, node %s, onTheDevice %v, statusRunning %v",
aiConfig.IsDesignatedNodeID, ctx.nodeUUID, onTheDevice, statusRunning)
if aiConfig.IsDesignatedNodeID {
if statusRunning && !onTheDevice {
return false
}