From e717ecc1a18f808aa405bfe7f8511de1d2ce5194 Mon Sep 17 00:00:00 2001 From: zhou hongbin <131335757+zzzzhhb@users.noreply.github.com> Date: Mon, 8 Apr 2024 14:43:48 +0800 Subject: [PATCH] feat(sysadvisor): add healthz check (#520) --- pkg/agent/evictionmanager/conditions.go | 6 ++ pkg/agent/evictionmanager/manager.go | 43 ++++++++++-- pkg/agent/evictionmanager/manager_test.go | 2 +- .../plugin/memory/system_pressure.go | 26 ++++--- pkg/agent/resourcemanager/fetcher/manager.go | 35 ++++++---- .../fetcher/manager_healthz.go | 69 ------------------- .../resourcemanager/fetcher/manager_test.go | 2 +- .../sysadvisor/plugin/inference/inference.go | 6 ++ .../sysadvisor/plugin/metacache/metacache.go | 6 +- .../qosaware/reporter/headroom_reporter.go | 6 ++ .../qosaware/reporter/nodemetric_reporter.go | 4 ++ .../qosaware/reporter/reporter_interface.go | 3 + .../plugin/qosaware/resource/cpu/advisor.go | 11 ++- .../qosaware/resource/memory/advisor.go | 21 ++++-- .../plugin/qosaware/server/base_server.go | 2 + .../plugin/qosaware/server/cpu_server.go | 5 ++ .../plugin/qosaware/server/memory_server.go | 5 ++ .../provisioner/malachite/provisioner.go | 43 ++++++++++-- pkg/metaserver/agent/pod/pod.go | 11 +++ 19 files changed, 193 insertions(+), 113 deletions(-) delete mode 100644 pkg/agent/resourcemanager/fetcher/manager_healthz.go diff --git a/pkg/agent/evictionmanager/conditions.go b/pkg/agent/evictionmanager/conditions.go index 92b0155bf..9fa9314f6 100644 --- a/pkg/agent/evictionmanager/conditions.go +++ b/pkg/agent/evictionmanager/conditions.go @@ -30,6 +30,7 @@ import ( pluginapi "github.com/kubewharf/katalyst-api/pkg/protocol/evictionplugin/v1alpha1" "github.com/kubewharf/katalyst-core/pkg/consts" + "github.com/kubewharf/katalyst-core/pkg/util/general" ) var ( @@ -89,6 +90,10 @@ func (m *EvictionManger) getNodeTaintsFromConditions() []v1.Taint { } func (m *EvictionManger) reportConditionsAsNodeTaints(ctx context.Context) { + var err error + defer func() { + _ = 
general.UpdateHealthzStateByError(reportTaintHealthCheckName, err) + }() node, err := m.metaGetter.GetNode(ctx) if err != nil { @@ -111,6 +116,7 @@ func (m *EvictionManger) reportConditionsAsNodeTaints(ctx context.Context) { if !controllerutil.SwapNodeControllerTaint(ctx, m.genericClient.KubeClient, taintsToAdd, taintsToDel, node) { klog.Errorf("failed to swap taints") + err = fmt.Errorf("failed to swap taints") } return diff --git a/pkg/agent/evictionmanager/manager.go b/pkg/agent/evictionmanager/manager.go index 0786a0ee9..fb1b241aa 100644 --- a/pkg/agent/evictionmanager/manager.go +++ b/pkg/agent/evictionmanager/manager.go @@ -29,6 +29,7 @@ import ( //nolint "github.com/golang/protobuf/proto" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/events" @@ -72,6 +73,11 @@ const ( UserUnknown = "unknown" MetricsPodLabelPrefix = "pod" + + evictionManagerHealthCheckName = "eviction_manager_sync" + reportTaintHealthCheckName = "eviction_manager_report_taint" + syncTolerationTurns = 3 + reportTaintToleration = 15 * time.Second ) // LatestCNRGetter returns the latest CNR resources. 
@@ -207,7 +213,10 @@ func (m *EvictionManger) getEvictionPlugins(genericClient *client.GenericClientS func (m *EvictionManger) Run(ctx context.Context) { general.Infof(" run with podKiller %v", m.podKiller.Name()) defer general.Infof(" started") - + general.RegisterHeartbeatCheck(evictionManagerHealthCheckName, syncTolerationTurns*m.conf.EvictionManagerSyncPeriod, + general.HealthzCheckStateNotReady, syncTolerationTurns*m.conf.EvictionManagerSyncPeriod) + general.RegisterHeartbeatCheck(reportTaintHealthCheckName, reportTaintToleration, + general.HealthzCheckStateNotReady, reportTaintToleration) m.podKiller.Start(ctx) for _, endpoint := range m.endpoints { endpoint.Start() @@ -220,6 +229,11 @@ func (m *EvictionManger) Run(ctx context.Context) { } func (m *EvictionManger) sync(ctx context.Context) { + var err error + defer func() { + _ = general.UpdateHealthzStateByError(evictionManagerHealthCheckName, err) + }() + activePods, err := m.metaGetter.GetPodList(ctx, native.PodIsActive) if err != nil { general.Errorf("failed to list pods from metaServer: %v", err) @@ -233,14 +247,25 @@ func (m *EvictionManger) sync(ctx context.Context) { general.Infof(" currently, there are %v candidate pods", len(pods)) _ = m.emitter.StoreInt64(MetricsNameCandidatePodCNT, int64(len(pods)), metrics.MetricTypeNameRaw) - collector := m.collectEvictionResult(pods) + errList := make([]error, 0) + collector, collectErr := m.collectEvictionResult(pods) + if collectErr != nil { + errList = append(errList, collectErr) + } - m.doEvict(collector.getSoftEvictPods(), collector.getForceEvictPods()) + evictErr := m.doEvict(collector.getSoftEvictPods(), collector.getForceEvictPods()) + if evictErr != nil { + errList = append(errList, evictErr) + } + if len(errList) > 0 { + err = errors.NewAggregate(errList) + } } -func (m *EvictionManger) collectEvictionResult(pods []*v1.Pod) *evictionRespCollector { +func (m *EvictionManger) collectEvictionResult(pods []*v1.Pod) (*evictionRespCollector, error) { 
dynamicConfig := m.conf.GetDynamicConfiguration() collector := newEvictionRespCollector(dynamicConfig.DryRun, m.conf, m.emitter) + var errList []error m.endpointLock.RLock() for pluginName, ep := range m.endpoints { @@ -252,6 +277,7 @@ func (m *EvictionManger) collectEvictionResult(pods []*v1.Pod) *evictionRespColl }) if err != nil { general.Errorf(" calling GetEvictPods of plugin: %s failed with error: %v", pluginName, err) + errList = append(errList, err) } else if getEvictResp == nil { general.Errorf(" calling GetEvictPods of plugin: %s and getting nil resp", pluginName) } else { @@ -262,6 +288,7 @@ func (m *EvictionManger) collectEvictionResult(pods []*v1.Pod) *evictionRespColl metResp, err := ep.ThresholdMet(context.Background()) if err != nil { general.Errorf(" calling ThresholdMet of plugin: %s failed with error: %v", pluginName, err) + errList = append(errList, err) continue } else if metResp == nil { general.Errorf(" calling ThresholdMet of plugin: %s and getting nil resp", pluginName) @@ -310,6 +337,7 @@ func (m *EvictionManger) collectEvictionResult(pods []*v1.Pod) *evictionRespColl m.endpointLock.RUnlock() if err != nil { general.Errorf(" calling GetTopEvictionPods of plugin: %s failed with error: %v", pluginName, err) + errList = append(errList, err) continue } else if resp == nil { general.Errorf(" calling GetTopEvictionPods of plugin: %s and getting nil resp", pluginName) @@ -322,10 +350,10 @@ func (m *EvictionManger) collectEvictionResult(pods []*v1.Pod) *evictionRespColl collector.collectTopEvictionPods(dynamicConfig.DryRun, pluginName, threshold, resp) } - return collector + return collector, errors.NewAggregate(errList) } -func (m *EvictionManger) doEvict(softEvictPods, forceEvictPods map[string]*rule.RuledEvictPod) { +func (m *EvictionManger) doEvict(softEvictPods, forceEvictPods map[string]*rule.RuledEvictPod) error { softEvictPods = filterOutCandidatePodsWithForcePods(softEvictPods, forceEvictPods) bestSuitedCandidate := 
m.getEvictPodFromCandidates(softEvictPods) if bestSuitedCandidate != nil && bestSuitedCandidate.Pod != nil { @@ -346,13 +374,14 @@ func (m *EvictionManger) doEvict(softEvictPods, forceEvictPods map[string]*rule. err := m.killWithRules(rpList) if err != nil { general.Errorf(" got err: %v in EvictPods", err) - return + return err } general.Infof(" evict %d pods in evictionmanager", len(rpList)) _ = m.emitter.StoreInt64(MetricsNameVictimPodCNT, int64(len(rpList)), metrics.MetricTypeNameRaw, metrics.MetricTag{Key: "type", Val: "total"}) metricPodsToEvict(m.emitter, rpList, m.conf.GenericConfiguration.QoSConfiguration, m.conf.GenericEvictionConfiguration.PodMetricLabels) + return nil } // ValidatePlugin validates a plugin if the version is correct and the name has the format of an extended resource diff --git a/pkg/agent/evictionmanager/manager_test.go b/pkg/agent/evictionmanager/manager_test.go index b5d3204f8..008caa5a4 100644 --- a/pkg/agent/evictionmanager/manager_test.go +++ b/pkg/agent/evictionmanager/manager_test.go @@ -302,7 +302,7 @@ func TestEvictionManger_collectEvictionResult(t *testing.T) { t.Run(tt.name, func(t *testing.T) { mgr.conf.GetDynamicConfiguration().DryRun = tt.dryrun - collector := mgr.collectEvictionResult(pods) + collector, _ := mgr.collectEvictionResult(pods) gotForceEvictPods := sets.String{} gotSoftEvictPods := sets.String{} gotConditions := sets.String{} diff --git a/pkg/agent/evictionmanager/plugin/memory/system_pressure.go b/pkg/agent/evictionmanager/plugin/memory/system_pressure.go index f9dccdab6..6fbb5b166 100644 --- a/pkg/agent/evictionmanager/plugin/memory/system_pressure.go +++ b/pkg/agent/evictionmanager/plugin/memory/system_pressure.go @@ -46,6 +46,8 @@ const ( EvictionPluginNameSystemMemoryPressure = "system-memory-pressure-eviction-plugin" EvictionScopeSystemMemory = "SystemMemory" evictionConditionMemoryPressure = "MemoryPressure" + systemMemoryPressureHealthCheck = "system_memory_pressure_eviction_detect" + 
syncTolerationTurns = 3 ) func NewSystemPressureEvictionPlugin(_ *client.GenericClientSet, _ events.EventRecorder, @@ -99,6 +101,8 @@ func (s *SystemPressureEvictionPlugin) Name() string { } func (s *SystemPressureEvictionPlugin) Start() { + general.RegisterHeartbeatCheck(systemMemoryPressureHealthCheck, syncTolerationTurns*s.syncPeriod, + general.HealthzCheckStateNotReady, syncTolerationTurns*s.syncPeriod) go wait.UntilWithContext(context.TODO(), s.detectSystemPressures, s.syncPeriod) } @@ -137,12 +141,18 @@ func (s *SystemPressureEvictionPlugin) detectSystemPressures(_ context.Context) { s.Lock() defer s.Unlock() + var err error + defer func() { + _ = general.UpdateHealthzStateByError(systemMemoryPressureHealthCheck, err) + }() s.isUnderSystemPressure = false s.systemAction = actionNoop - s.detectSystemWatermarkPressure() - s.detectSystemKswapdStealPressure() + err = s.detectSystemWatermarkPressure() + // NOTE(review): the kswapd detector must still run even when the watermark detector failed + // (both mutate s.isUnderSystemPressure / s.systemAction); the if-init runs unconditionally, + // and we keep the FIRST non-nil error instead of letting the second call clobber it. + if kswapdErr := s.detectSystemKswapdStealPressure(); err == nil { + err = kswapdErr + } switch s.systemAction { case actionReclaimedEviction: @@ -162,7 +172,7 @@ func (s *SystemPressureEvictionPlugin) detectSystemPressures(_ context.Context) } } -func (s *SystemPressureEvictionPlugin) detectSystemWatermarkPressure() { +func (s *SystemPressureEvictionPlugin) detectSystemWatermarkPressure() error { free, total, scaleFactor, err := helper.GetWatermarkMetrics(s.metaServer.MetricsFetcher, s.emitter, nonExistNumaID) if err != nil { _ = s.emitter.StoreInt64(metricsNameFetchMetricError, 1, metrics.MetricTypeNameCount, @@ -170,7 +180,7 @@ func (s *SystemPressureEvictionPlugin) detectSystemWatermarkPressure() { metricsTagKeyNumaID: strconv.Itoa(nonExistNumaID), })...)
general.Errorf("failed to getWatermarkMetrics for system, err: %v", err) - return + return err } thresholdMinimum := float64(s.dynamicConfig.GetDynamicConfiguration().SystemFreeMemoryThresholdMinimum) @@ -184,9 +192,10 @@ func (s *SystemPressureEvictionPlugin) detectSystemWatermarkPressure() { s.isUnderSystemPressure = true s.systemAction = actionReclaimedEviction } + return nil } -func (s *SystemPressureEvictionPlugin) detectSystemKswapdStealPressure() { +func (s *SystemPressureEvictionPlugin) detectSystemKswapdStealPressure() error { kswapdSteal, err := helper.GetNodeMetricWithTime(s.metaServer.MetricsFetcher, s.emitter, consts.MetricMemKswapdstealSystem) if err != nil { s.kswapdStealPreviousCycle = kswapdStealPreviousCycleMissing @@ -196,12 +205,12 @@ func (s *SystemPressureEvictionPlugin) detectSystemKswapdStealPressure() { metricsTagKeyNumaID: strconv.Itoa(nonExistNumaID), })...) general.Errorf("failed to getSystemKswapdStealMetrics, err: %v", err) - return + return err } if kswapdSteal.Time.Equal(s.kswapdStealPreviousCycleTime) { general.Warningf("getSystemKswapdStealMetrics get same result as last round,skip current round") - return + return nil } dynamicConfig := s.dynamicConfig.GetDynamicConfiguration() @@ -229,7 +238,7 @@ func (s *SystemPressureEvictionPlugin) detectSystemKswapdStealPressure() { s.kswapdStealPreviousCycleTime = *(kswapdSteal.Time) if kswapdStealPreviousCycle == kswapdStealPreviousCycleMissing { general.Warningf("kswapd steal of the previous cycle is missing") - return + return nil } if (kswapdSteal.Value-kswapdStealPreviousCycle)/(kswapdSteal.Time.Sub(kswapdStealPreviousCycleTime)).Seconds() >= float64(dynamicConfig.SystemKswapdRateThreshold) { @@ -249,6 +258,7 @@ func (s *SystemPressureEvictionPlugin) detectSystemKswapdStealPressure() { s.systemAction = actionEviction } } + return nil } func (s *SystemPressureEvictionPlugin) GetTopEvictionPods(_ context.Context, request *pluginapi.GetTopEvictionPodsRequest) 
(*pluginapi.GetTopEvictionPodsResponse, error) { diff --git a/pkg/agent/resourcemanager/fetcher/manager.go b/pkg/agent/resourcemanager/fetcher/manager.go index b73d3eae9..c7fbe7f76 100644 --- a/pkg/agent/resourcemanager/fetcher/manager.go +++ b/pkg/agent/resourcemanager/fetcher/manager.go @@ -50,7 +50,7 @@ import ( const reporterManagerCheckpoint = "reporter_manager_checkpoint" -const healthzNameReporterFetcherReady = "ReporterFetcherReady" +const reporterFetcherHealthCheckName = "reporter_fetcher_sync" const healthzGracePeriodMultiplier = 3 @@ -220,14 +220,12 @@ func (m *ReporterPluginManager) DeRegisterPlugin(pluginName string) { // Run start the reporter plugin manager func (m *ReporterPluginManager) Run(ctx context.Context) { + general.RegisterHeartbeatCheck(reporterFetcherHealthCheckName, m.reconcilePeriod*healthzGracePeriodMultiplier, + general.HealthzCheckStateReady, m.reconcilePeriod*healthzGracePeriodMultiplier) go wait.UntilWithContext(ctx, m.syncFunc, m.reconcilePeriod) klog.Infof("reporter plugin manager started") m.reporter.Run(ctx) - - general.RegisterHeartbeatCheck(healthzNameReporterFetcherReady, m.reconcilePeriod*healthzGracePeriodMultiplier, - general.HealthzCheckStateReady, m.reconcilePeriod*healthzGracePeriodMultiplier) - go wait.UntilWithContext(ctx, m.healthz, m.reconcilePeriod) } func (m *ReporterPluginManager) isVersionCompatibleWithPlugin(versions []string) bool { @@ -294,9 +292,9 @@ func (m *ReporterPluginManager) runEndpoint(pluginName string, e plugin.Endpoint // and the manager can read it from Endpoint cache to obtain content changes initiative func (m *ReporterPluginManager) genericCallback(pluginName string, _ *v1alpha1.GetReportContentResponse) { klog.Infof("genericCallback") - // get report content from each healthy Endpoint from cache, the lastly response + // get report content from each healthy Endpoint from cache, the last response // from this plugin has been already stored to its Endpoint cache before this callback called - 
reportResponses := m.getReportContent(true) + reportResponses, _ := m.getReportContent(true) err := m.pushContents(context.Background(), reportResponses) if err != nil { @@ -318,6 +316,11 @@ func (m *ReporterPluginManager) pushContents(ctx context.Context, reportResponse // genericSync periodically calls the Get function to obtain content changes func (m *ReporterPluginManager) genericSync(ctx context.Context) { klog.Infof("genericSync") + errList := make([]error, 0) + defer func() { + _ = general.UpdateHealthzStateByError(reporterFetcherHealthCheckName, errors.NewAggregate(errList)) + }() + begin := time.Now() defer func() { costs := time.Since(begin) @@ -329,15 +332,17 @@ func (m *ReporterPluginManager) genericSync(ctx context.Context) { m.clearUnhealthyPlugin() // get report content from each healthy Endpoint directly - reportResponses := m.getReportContent(false) - - err := m.pushContents(ctx, reportResponses) + reportResponses, err := m.getReportContent(false) if err != nil { + errList = append(errList, err) + } + + pushErr := m.pushContents(ctx, reportResponses) + if pushErr != nil { _ = m.emitter.StoreInt64("reporter_plugin_sync_push_failed", 1, metrics.MetricTypeNameCount) - klog.Errorf("report plugin failed with error: %v", err) + klog.Errorf("report plugin failed with error: %v", pushErr) + errList = append(errList, pushErr) } - - m.healthzSyncLoop() } // clearUnhealthyPlugin is to clear stopped plugins from cache which exceeded grace period @@ -360,8 +365,9 @@ func (m *ReporterPluginManager) clearUnhealthyPlugin() { // getReportContent is to get reportContent from plugins. if cacheFirst is true, // use plugin cache (when it is no nil), otherwise we call plugin directly.
-func (m *ReporterPluginManager) getReportContent(cacheFirst bool) map[string]*v1alpha1.GetReportContentResponse { +func (m *ReporterPluginManager) getReportContent(cacheFirst bool) (map[string]*v1alpha1.GetReportContentResponse, error) { reportResponses := make(map[string]*v1alpha1.GetReportContentResponse) + errList := make([]error, 0) begin := time.Now() m.mutex.Lock() @@ -395,6 +401,7 @@ func (m *ReporterPluginManager) getReportContent(cacheFirst bool) map[string]*v1 klog.InfoS("GetReportContent", "costs", epCosts, "pluginName", pluginName) _ = m.emitter.StoreInt64(metricsNameGetContentPluginCost, epCosts.Microseconds(), metrics.MetricTypeNameRaw, []metrics.MetricTag{{Key: "plugin", Val: pluginName}}...) if err != nil { + errList = append(errList, err) s, _ := status.FromError(err) _ = m.emitter.StoreInt64("reporter_plugin_get_content_failed", 1, metrics.MetricTypeNameCount, []metrics.MetricTag{ {Key: "code", Val: s.Code().String()}, @@ -409,7 +416,7 @@ func (m *ReporterPluginManager) getReportContent(cacheFirst bool) map[string]*v1 reportResponses[pluginName] = resp } - return reportResponses + return reportResponses, errors.NewAggregate(errList) } func (m *ReporterPluginManager) writeCheckpoint(reportResponses map[string]*v1alpha1.GetReportContentResponse) error { diff --git a/pkg/agent/resourcemanager/fetcher/manager_healthz.go b/pkg/agent/resourcemanager/fetcher/manager_healthz.go deleted file mode 100644 index a0aa943ea..000000000 --- a/pkg/agent/resourcemanager/fetcher/manager_healthz.go +++ /dev/null @@ -1,69 +0,0 @@ -/* -Copyright 2022 The Katalyst Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. 
-You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package fetcher - -import ( - "context" - "fmt" - "strings" - "time" - - "k8s.io/apimachinery/pkg/util/sets" - - "github.com/kubewharf/katalyst-core/pkg/util/general" -) - -const reporterFetcherSuccessTimeout = time.Minute * 5 - -const ( - reporterFetcherRulesSyncLoop = "SyncLoop" -) - -var reporterFetcherRules = sets.NewString( - reporterFetcherRulesSyncLoop, -) - -// healthzSyncLoop is easy, but other healthz check may be complicated, -// such.as. the dependency is crashed, the state-file is crashed to case -// the calculation logic can't work as expected, and so on. -func (m *ReporterPluginManager) healthzSyncLoop() { - m.healthzState.Store(reporterFetcherRulesSyncLoop, time.Now()) -} - -// healthz returns whether reporter manager is healthy by checking the -// healthy transition time for each rule is out-of-date. -// -// during starting period, the results may last unhealthy for a while, and -// the caller functions should handle this situation. 
-func (m *ReporterPluginManager) healthz(_ context.Context) { - - now := time.Now() - var unHealthy []string - for name := range reporterFetcherRules { - updatedTime, ok := m.healthzState.Load(name) - if !ok || now.After(updatedTime.(time.Time).Add(reporterFetcherSuccessTimeout)) { - unHealthy = append(unHealthy, name) - } - } - - if len(unHealthy) != 0 { - _ = general.UpdateHealthzState(healthzNameReporterFetcherReady, general.HealthzCheckStateNotReady, - fmt.Sprintf("the following checks timeout: %s", strings.Join(unHealthy, ","))) - } else { - _ = general.UpdateHealthzState(healthzNameReporterFetcherReady, general.HealthzCheckStateReady, "") - } -} diff --git a/pkg/agent/resourcemanager/fetcher/manager_test.go b/pkg/agent/resourcemanager/fetcher/manager_test.go index fec141e39..21a8d1e36 100644 --- a/pkg/agent/resourcemanager/fetcher/manager_test.go +++ b/pkg/agent/resourcemanager/fetcher/manager_test.go @@ -199,7 +199,7 @@ func TestHealthz(t *testing.T) { results := general.GetRegisterReadinessCheckResult() for name, response := range results { - if healthzNameReporterFetcherReady == string(name) { + if reporterFetcherHealthCheckName == string(name) { require.True(t, response.Ready) } } diff --git a/pkg/agent/sysadvisor/plugin/inference/inference.go b/pkg/agent/sysadvisor/plugin/inference/inference.go index 4bc1d4093..459c2a92e 100644 --- a/pkg/agent/sysadvisor/plugin/inference/inference.go +++ b/pkg/agent/sysadvisor/plugin/inference/inference.go @@ -104,6 +104,9 @@ func NewInferencePlugin(pluginName string, conf *config.Configuration, extraConf } func (infp *InferencePlugin) Run(ctx context.Context) { + if len(infp.modelsResultFetchers) > 0 { + general.RegisterHeartbeatCheck(borweinfetcher.BorweinModelResultFetcherName, 3*infp.period, general.HealthzCheckStateNotReady, 3*infp.period) + } wait.UntilWithContext(ctx, infp.fetchModelResult, infp.period) } @@ -125,6 +128,9 @@ func (infp *InferencePlugin) fetchModelResult(ctx context.Context) { go func(modelName 
string, fetcher modelresultfetcher.ModelResultFetcher) { defer wg.Done() err := fetcher.FetchModelResult(ctx, infp.metaReader, infp.metaWriter, infp.metaServer) + defer func() { + _ = general.UpdateHealthzStateByError(borweinfetcher.BorweinModelResultFetcherName, err) + }() if err != nil { general.Errorf("FetchModelResult for model: %s failed with error: %v", modelName, err) } diff --git a/pkg/agent/sysadvisor/plugin/metacache/metacache.go b/pkg/agent/sysadvisor/plugin/metacache/metacache.go index 1855c1da0..7e23276cf 100644 --- a/pkg/agent/sysadvisor/plugin/metacache/metacache.go +++ b/pkg/agent/sysadvisor/plugin/metacache/metacache.go @@ -30,6 +30,7 @@ import ( "github.com/kubewharf/katalyst-core/pkg/metaserver" "github.com/kubewharf/katalyst-core/pkg/metrics" metricspool "github.com/kubewharf/katalyst-core/pkg/metrics/metrics-pool" + "github.com/kubewharf/katalyst-core/pkg/util/general" ) const ( @@ -75,6 +76,7 @@ func (mcp *MetaCachePlugin) Init() error { // Run starts the metacache plugin func (mcp *MetaCachePlugin) Run(ctx context.Context) { + general.RegisterHeartbeatCheck(mcp.name, 3*mcp.period, general.HealthzCheckStateNotReady, 3*mcp.period) go wait.UntilWithContext(ctx, mcp.periodicWork, mcp.period) } @@ -84,6 +86,7 @@ func (mcp *MetaCachePlugin) periodicWork(_ context.Context) { // Fill missing container metadata from metaserver f := func(podUID string, containerName string, ci *types.ContainerInfo) bool { spec, err := mcp.metaServer.GetContainerSpec(podUID, containerName) + if err != nil { klog.Errorf("[metacache] get container spec failed: %v, %v/%v", err, podUID, containerName) return true @@ -106,5 +109,6 @@ func (mcp *MetaCachePlugin) periodicWork(_ context.Context) { } return true } - _ = mcp.MetaWriter.RangeAndUpdateContainer(f) + err := mcp.MetaWriter.RangeAndUpdateContainer(f) + _ = general.UpdateHealthzStateByError(mcp.name, err) } diff --git a/pkg/agent/sysadvisor/plugin/qosaware/reporter/headroom_reporter.go 
b/pkg/agent/sysadvisor/plugin/qosaware/reporter/headroom_reporter.go index fe7f3e255..c11e70398 100644 --- a/pkg/agent/sysadvisor/plugin/qosaware/reporter/headroom_reporter.go +++ b/pkg/agent/sysadvisor/plugin/qosaware/reporter/headroom_reporter.go @@ -38,6 +38,7 @@ import ( "github.com/kubewharf/katalyst-core/pkg/metaserver" "github.com/kubewharf/katalyst-core/pkg/metrics" "github.com/kubewharf/katalyst-core/pkg/util" + "github.com/kubewharf/katalyst-core/pkg/util/general" ) func init() { @@ -127,6 +128,7 @@ func (r *headroomReporterPlugin) Name() string { func (r *headroomReporterPlugin) Start() (err error) { r.Lock() + general.RegisterHeartbeatCheck(headroomReporterPluginName, healthCheckTolerationDuration, general.HealthzCheckStateReady, healthCheckTolerationDuration) defer func() { if err == nil { r.started = true @@ -163,6 +165,10 @@ func (r *headroomReporterPlugin) Stop() error { } func (r *headroomReporterPlugin) GetReportContent(_ context.Context, _ *v1alpha1.Empty) (*v1alpha1.GetReportContentResponse, error) { + var err error + defer func() { + _ = general.UpdateHealthzStateByError(headroomReporterPluginName, err) + }() res, err := r.getReclaimedResource() if err != nil { return nil, err diff --git a/pkg/agent/sysadvisor/plugin/qosaware/reporter/nodemetric_reporter.go b/pkg/agent/sysadvisor/plugin/qosaware/reporter/nodemetric_reporter.go index 648d73905..6d98649a5 100644 --- a/pkg/agent/sysadvisor/plugin/qosaware/reporter/nodemetric_reporter.go +++ b/pkg/agent/sysadvisor/plugin/qosaware/reporter/nodemetric_reporter.go @@ -152,6 +152,7 @@ func (p *nodeMetricsReporterPlugin) Name() string { func (p *nodeMetricsReporterPlugin) Start() (err error) { p.stop = make(chan struct{}) + general.RegisterHeartbeatCheck(nodeMetricsReporterPluginName, healthCheckTolerationDuration, general.HealthzCheckStateNotReady, healthCheckTolerationDuration) go wait.Until(p.updateNodeMetrics, p.syncPeriod, p.stop) return } @@ -166,6 +167,9 @@ func (p *nodeMetricsReporterPlugin) 
Stop() error { func (p *nodeMetricsReporterPlugin) GetReportContent(_ context.Context, _ *v1alpha1.Empty) (*v1alpha1.GetReportContentResponse, error) { general.InfoS("called") reportToCNR, err := p.getReportNodeMetricsForCNR() + defer func() { + _ = general.UpdateHealthzStateByError(nodeMetricsReporterPluginName, err) + }() if err != nil { return nil, err } diff --git a/pkg/agent/sysadvisor/plugin/qosaware/reporter/reporter_interface.go b/pkg/agent/sysadvisor/plugin/qosaware/reporter/reporter_interface.go index b0994e617..b004b819b 100644 --- a/pkg/agent/sysadvisor/plugin/qosaware/reporter/reporter_interface.go +++ b/pkg/agent/sysadvisor/plugin/qosaware/reporter/reporter_interface.go @@ -18,8 +18,11 @@ package reporter import ( "context" + "time" ) +const healthCheckTolerationDuration = 15 * time.Second + // Reporter is used to report resource type Reporter interface { Run(ctx context.Context) diff --git a/pkg/agent/sysadvisor/plugin/qosaware/resource/cpu/advisor.go b/pkg/agent/sysadvisor/plugin/qosaware/resource/cpu/advisor.go index de14471a2..b205b56f2 100644 --- a/pkg/agent/sysadvisor/plugin/qosaware/resource/cpu/advisor.go +++ b/pkg/agent/sysadvisor/plugin/qosaware/resource/cpu/advisor.go @@ -58,6 +58,9 @@ const ( metricRegionIndicatorTargetPrefix = "region_indicator_target_" metricRegionIndicatorCurrentPrefix = "region_indicator_current_" metricRegionIndicatorErrorPrefix = "region_indicator_error_" + + cpuAdvisorHealthCheckName = "cpu_advisor_update" + healthCheckTolerationDuration = 15 * time.Second ) var ( @@ -152,6 +155,7 @@ func NewCPUResourceAdvisor(conf *config.Configuration, extraConf interface{}, me } func (cra *cpuResourceAdvisor) Run(ctx context.Context) { + general.RegisterHeartbeatCheck(cpuAdvisorHealthCheckName, healthCheckTolerationDuration, general.HealthzCheckStateNotReady, healthCheckTolerationDuration) for { select { case v := <-cra.recvCh: @@ -206,10 +210,13 @@ func (cra *cpuResourceAdvisor) GetHeadroom() (resource.Quantity, error) { // 
update works in a monolithic way to maintain lifecycle and triggers update actions for all regions; // todo: re-consider whether it's efficient or we should make start individual goroutine for each region -func (cra *cpuResourceAdvisor) update() error { +func (cra *cpuResourceAdvisor) update() (err error) { cra.mutex.Lock() defer cra.mutex.Unlock() - if err := cra.updateWithIsolationGuardian(true); err != nil { + defer func() { + _ = general.UpdateHealthzStateByError(cpuAdvisorHealthCheckName, err) + }() + if err = cra.updateWithIsolationGuardian(true); err != nil { if err == errIsolationSafetyCheckFailed { klog.Warningf("[qosaware-cpu] failed to updateWithIsolationGuardian(true): %q", err) return cra.updateWithIsolationGuardian(false) diff --git a/pkg/agent/sysadvisor/plugin/qosaware/resource/memory/advisor.go b/pkg/agent/sysadvisor/plugin/qosaware/resource/memory/advisor.go index 201601970..6ffdde3a8 100644 --- a/pkg/agent/sysadvisor/plugin/qosaware/resource/memory/advisor.go +++ b/pkg/agent/sysadvisor/plugin/qosaware/resource/memory/advisor.go @@ -25,6 +25,7 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/klog/v2" @@ -68,6 +69,9 @@ const ( // multiply the scale by the criticalWaterMark to get the safe watermark criticalWaterMarkScaleFactor = 2 + + memoryHealthCheckName = "memory_advisor_update" + healthCheckTolerationDuration = 15 * time.Second ) // memoryResourceAdvisor updates memory headroom for reclaimed resource @@ -131,7 +135,7 @@ func NewMemoryResourceAdvisor(conf *config.Configuration, extraConf interface{}, func (ra *memoryResourceAdvisor) Run(ctx context.Context) { period := ra.conf.SysAdvisorPluginsConfiguration.QoSAwarePluginConfiguration.SyncPeriod - + general.RegisterHeartbeatCheck(memoryHealthCheckName, healthCheckTolerationDuration, general.HealthzCheckStateNotReady, healthCheckTolerationDuration) general.InfoS("wait to list containers") 
<-ra.recvCh general.InfoS("list containers successfully") @@ -181,7 +185,12 @@ func (ra *memoryResourceAdvisor) sendAdvices() error { } func (ra *memoryResourceAdvisor) doUpdate() { - if err := ra.update(); err != nil { + err := ra.update() + defer func() { + _ = general.UpdateHealthzStateByError(memoryHealthCheckName, err) + }() + + if err != nil { general.Warningf("failed to update memory advice: %q", err) } } @@ -227,11 +236,15 @@ func (ra *memoryResourceAdvisor) update() error { NUMAConditions: NUMAConditions, } + var errs []error for _, plugin := range ra.plugins { - _ = plugin.Reconcile(&memoryPressureStatus) + rErr := plugin.Reconcile(&memoryPressureStatus) + errs = append(errs, rErr) } - return ra.sendAdvices() + adviceErr := ra.sendAdvices() + errs = append(errs, adviceErr) + return errors.NewAggregate(errs) } func (ra *memoryResourceAdvisor) detectNUMAPressureConditions() (map[int]*types.MemoryPressureCondition, error) { diff --git a/pkg/agent/sysadvisor/plugin/qosaware/server/base_server.go b/pkg/agent/sysadvisor/plugin/qosaware/server/base_server.go index 7f563bdf6..71d0fe06e 100644 --- a/pkg/agent/sysadvisor/plugin/qosaware/server/base_server.go +++ b/pkg/agent/sysadvisor/plugin/qosaware/server/base_server.go @@ -51,6 +51,8 @@ const ( metricServerLWSendResponseFailed = "lw_send_response_failed" metricServerLWSendResponseSucceeded = "lw_send_response_succeeded" metricServerCheckpointUpdateContainerFailed = "checkpoint_update_container_failed" + + healthCheckTolerationDuration = 15 * time.Second ) type baseServer struct { diff --git a/pkg/agent/sysadvisor/plugin/qosaware/server/cpu_server.go b/pkg/agent/sysadvisor/plugin/qosaware/server/cpu_server.go index 927c1a4ed..ffaaf60a2 100644 --- a/pkg/agent/sysadvisor/plugin/qosaware/server/cpu_server.go +++ b/pkg/agent/sysadvisor/plugin/qosaware/server/cpu_server.go @@ -42,6 +42,8 @@ import ( const ( cpuServerName string = "cpu-server" + + cpuServerHealthCheckName = "cpu-server-lw" ) type cpuServer struct { @@ 
-68,6 +70,7 @@ func (cs *cpuServer) RegisterAdvisorServer() { func (cs *cpuServer) ListAndWatch(_ *advisorsvc.Empty, server cpuadvisor.CPUAdvisor_ListAndWatchServer) error { _ = cs.emitter.StoreInt64(cs.genMetricsName(metricServerLWCalled), int64(cs.period.Seconds()), metrics.MetricTypeNameCount) + general.RegisterHeartbeatCheck(cpuServerHealthCheckName, healthCheckTolerationDuration, general.HealthzCheckStateNotReady, healthCheckTolerationDuration) if !cs.getCheckpointCalled { if err := cs.startToGetCheckpointFromCPUPlugin(); err != nil { @@ -121,9 +124,11 @@ func (cs *cpuServer) ListAndWatch(_ *advisorsvc.Empty, server cpuadvisor.CPUAdvi if err := server.Send(&cpuadvisor.ListAndWatchResponse{Entries: calculationEntriesMap}); err != nil { klog.Errorf("[qosaware-server-cpu] send response failed: %v", err) _ = cs.emitter.StoreInt64(cs.genMetricsName(metricServerLWSendResponseFailed), int64(cs.period.Seconds()), metrics.MetricTypeNameCount) + _ = general.UpdateHealthzStateByError(cpuServerHealthCheckName, err) return err } klog.Infof("[qosaware-server-cpu] send calculation result: %v", general.ToString(calculationEntriesMap)) + _ = general.UpdateHealthzStateByError(cpuServerHealthCheckName, nil) _ = cs.emitter.StoreInt64(cs.genMetricsName(metricServerLWSendResponseSucceeded), int64(cs.period.Seconds()), metrics.MetricTypeNameCount) } } diff --git a/pkg/agent/sysadvisor/plugin/qosaware/server/memory_server.go b/pkg/agent/sysadvisor/plugin/qosaware/server/memory_server.go index 0c76e8cdf..abaf3bb45 100644 --- a/pkg/agent/sysadvisor/plugin/qosaware/server/memory_server.go +++ b/pkg/agent/sysadvisor/plugin/qosaware/server/memory_server.go @@ -39,6 +39,8 @@ const ( memoryServerName string = "memory-server" durationToWaitAddContainer = time.Second * 30 durationToWaitListAndWatchCalled = time.Second * 5 + + memoryServerHealthCheckName = "memory-server-lw" ) type memoryServer struct { @@ -134,6 +136,7 @@ func (ms *memoryServer) listContainers() error { func (ms 
*memoryServer) ListAndWatch(_ *advisorsvc.Empty, server advisorsvc.AdvisorService_ListAndWatchServer) error { _ = ms.emitter.StoreInt64(ms.genMetricsName(metricServerLWCalled), int64(ms.period.Seconds()), metrics.MetricTypeNameCount) + general.RegisterHeartbeatCheck(memoryServerHealthCheckName, healthCheckTolerationDuration, general.HealthzCheckStateNotReady, healthCheckTolerationDuration) recvCh, ok := ms.recvCh.(chan types.InternalMemoryCalculationResult) if !ok { @@ -163,10 +166,12 @@ func (ms *memoryServer) ListAndWatch(_ *advisorsvc.Empty, server advisorsvc.Advi if err := server.Send(resp); err != nil { klog.Warningf("[qosaware-server-memory] send response failed: %v", err) _ = ms.emitter.StoreInt64(ms.genMetricsName(metricServerLWSendResponseFailed), int64(ms.period.Seconds()), metrics.MetricTypeNameCount) + _ = general.UpdateHealthzStateByError(memoryServerHealthCheckName, err) return err } klog.Infof("[qosaware-server-memory] send calculation result: %v", general.ToString(resp)) + _ = general.UpdateHealthzStateByError(memoryServerHealthCheckName, nil) _ = ms.emitter.StoreInt64(ms.genMetricsName(metricServerLWSendResponseSucceeded), int64(ms.period.Seconds()), metrics.MetricTypeNameCount) } } diff --git a/pkg/metaserver/agent/metric/provisioner/malachite/provisioner.go b/pkg/metaserver/agent/metric/provisioner/malachite/provisioner.go index 64c87a82a..927d2caaf 100644 --- a/pkg/metaserver/agent/metric/provisioner/malachite/provisioner.go +++ b/pkg/metaserver/agent/metric/provisioner/malachite/provisioner.go @@ -20,8 +20,10 @@ import ( "context" "strconv" "strings" + "sync" "time" + "k8s.io/apimachinery/pkg/util/errors" "k8s.io/klog/v2" "github.com/kubewharf/katalyst-core/pkg/config/agent/global" @@ -45,6 +47,9 @@ const ( defaultMetricUpdateInterval = 10.0 pageShift = 12 + + malachiteProvisionerHealthCheckName = "malachite_provisioner_sample" + malachiteProvisionTolerationTime = 15 * time.Second ) // NewMalachiteMetricsProvisioner returns the default 
implementation of MetricsFetcher. @@ -63,9 +68,14 @@ type MalachiteMetricsProvisioner struct { malachiteClient *client.MalachiteClient baseConf *global.BaseConfiguration emitter metrics.MetricEmitter + startOnce sync.Once } func (m *MalachiteMetricsProvisioner) Run(ctx context.Context) { + m.startOnce.Do(func() { + general.RegisterHeartbeatCheck(malachiteProvisionerHealthCheckName, malachiteProvisionTolerationTime, + general.HealthzCheckStateNotReady, malachiteProvisionTolerationTime) + }) m.sample(ctx) } @@ -73,15 +83,24 @@ func (m *MalachiteMetricsProvisioner) sample(ctx context.Context) { klog.V(4).Infof("[malachite] heartbeat") if !m.checkMalachiteHealthy() { + _ = general.UpdateHealthzState(malachiteProvisionerHealthCheckName, general.HealthzCheckStateNotReady, "malachite is not healthy") return } + errList := make([]error, 0) // Update system data - m.updateSystemStats() + if err := m.updateSystemStats(); err != nil { + errList = append(errList, err) + } // Update pod data - m.updatePodsCgroupData(ctx) + if err := m.updatePodsCgroupData(ctx); err != nil { + errList = append(errList, err) + } // Update top level cgroup of kubepods - m.updateCgroupData() + if err := m.updateCgroupData(); err != nil { + errList = append(errList, err) + } + _ = general.UpdateHealthzStateByError(malachiteProvisionerHealthCheckName, errors.NewAggregate(errList)) } // checkMalachiteHealthy is to check whether malachite is healthy @@ -97,9 +116,11 @@ func (m *MalachiteMetricsProvisioner) checkMalachiteHealthy() bool { } // Get raw system stats by malachite sdk and set to metricStore -func (m *MalachiteMetricsProvisioner) updateSystemStats() { +func (m *MalachiteMetricsProvisioner) updateSystemStats() error { + errList := make([]error, 0) systemComputeData, err := m.malachiteClient.GetSystemComputeStats() if err != nil { + errList = append(errList, err) klog.Errorf("[malachite] get system compute stats failed, err %v", err) _ = 
m.emitter.StoreInt64(metricsNameMalachiteGetSystemStatusFailed, 1, metrics.MetricTypeNameCount, metrics.MetricTag{Key: "kind", Val: "compute"}) @@ -110,6 +131,7 @@ func (m *MalachiteMetricsProvisioner) updateSystemStats() { systemMemoryData, err := m.malachiteClient.GetSystemMemoryStats() if err != nil { + errList = append(errList, err) klog.Errorf("[malachite] get system memory stats failed, err %v", err) _ = m.emitter.StoreInt64(metricsNameMalachiteGetSystemStatusFailed, 1, metrics.MetricTypeNameCount, metrics.MetricTag{Key: "kind", Val: "memory"}) @@ -120,6 +142,7 @@ func (m *MalachiteMetricsProvisioner) updateSystemStats() { systemIOData, err := m.malachiteClient.GetSystemIOStats() if err != nil { + errList = append(errList, err) klog.Errorf("[malachite] get system io stats failed, err %v", err) _ = m.emitter.StoreInt64(metricsNameMalachiteGetSystemStatusFailed, 1, metrics.MetricTypeNameCount, metrics.MetricTag{Key: "kind", Val: "io"}) @@ -129,19 +152,24 @@ func (m *MalachiteMetricsProvisioner) updateSystemStats() { systemNetData, err := m.malachiteClient.GetSystemNetStats() if err != nil { + errList = append(errList, err) klog.Errorf("[malachite] get system net stats failed, err %v", err) _ = m.emitter.StoreInt64(metricsNameMalachiteGetSystemStatusFailed, 1, metrics.MetricTypeNameCount, metrics.MetricTag{Key: "kind", Val: "net"}) } else { m.processSystemNetData(systemNetData) } + + return errors.NewAggregate(errList) } -func (m *MalachiteMetricsProvisioner) updateCgroupData() { +func (m *MalachiteMetricsProvisioner) updateCgroupData() error { cgroupPaths := []string{m.baseConf.ReclaimRelativeRootCgroupPath, common.CgroupFsRootPathBurstable, common.CgroupFsRootPathBestEffort} + errList := make([]error, 0) for _, path := range cgroupPaths { stats, err := m.malachiteClient.GetCgroupStats(path) if err != nil { + errList = append(errList, err) general.Errorf("GetCgroupStats %v err %v", path, err) continue } @@ -151,10 +179,12 @@ func (m 
*MalachiteMetricsProvisioner) updateCgroupData() { m.processCgroupNetData(path, stats) m.processCgroupPerNumaMemoryData(path, stats) } + + return errors.NewAggregate(errList) } // Get raw cgroup data by malachite sdk and set container metrics to metricStore, GC not existed pod metrics -func (m *MalachiteMetricsProvisioner) updatePodsCgroupData(ctx context.Context) { +func (m *MalachiteMetricsProvisioner) updatePodsCgroupData(ctx context.Context) error { podsContainersStats, err := m.malachiteClient.GetAllPodContainersStats(ctx) if err != nil { klog.Errorf("[malachite] GetAllPodsContainersStats failed, error %v", err) @@ -174,6 +204,7 @@ func (m *MalachiteMetricsProvisioner) updatePodsCgroupData(ctx context.Context) } } m.metricStore.GCPodsMetric(podUIDSet) + return err } func (m *MalachiteMetricsProvisioner) processSystemComputeData(systemComputeData *malachitetypes.SystemComputeData) { diff --git a/pkg/metaserver/agent/pod/pod.go b/pkg/metaserver/agent/pod/pod.go index 034d18fc8..d1a863eb1 100644 --- a/pkg/metaserver/agent/pod/pod.go +++ b/pkg/metaserver/agent/pod/pod.go @@ -48,6 +48,10 @@ type ContextKey string const ( BypassCacheKey ContextKey = "bypass_cache" BypassCacheTrue ContextKey = "true" + + podFetcherKubeletHealthCheckName = "pod_fetcher_kubelet" + podFetcherRuntimeHealthCheckName = "pod_fetcher_runtime" + tolerationTurns = 3 ) type PodFetcher interface { @@ -145,6 +149,11 @@ func (w *podFetcherImpl) Run(ctx context.Context) { Op: fsnotify.Create, } + general.RegisterHeartbeatCheck(podFetcherKubeletHealthCheckName, tolerationTurns*w.podConf.KubeletPodCacheSyncPeriod, + general.HealthzCheckStateNotReady, tolerationTurns*w.podConf.KubeletPodCacheSyncPeriod) + general.RegisterHeartbeatCheck(podFetcherRuntimeHealthCheckName, tolerationTurns*w.podConf.RuntimePodCacheSyncPeriod, + general.HealthzCheckStateNotReady, tolerationTurns*w.podConf.RuntimePodCacheSyncPeriod) + watcherCh, err := general.RegisterFileEventWatcher(ctx.Done(), watcherInfo) if err != nil { 
klog.Fatalf("register file event watcher failed: %s", err) @@ -236,6 +245,7 @@ func (w *podFetcherImpl) syncRuntimePod(_ context.Context) { } runtimePods, err := w.runtimePodFetcher.GetPods(false) + _ = general.UpdateHealthzStateByError(podFetcherRuntimeHealthCheckName, err) if err != nil { klog.Errorf("sync runtime pod failed: %s", err) _ = w.emitter.StoreInt64(metricsNamePodCacheSync, 1, metrics.MetricTypeNameCount, @@ -266,6 +276,7 @@ func (w *podFetcherImpl) syncRuntimePod(_ context.Context) { // syncKubeletPod sync local kubelet pod cache from kubelet pod fetcher. func (w *podFetcherImpl) syncKubeletPod(ctx context.Context) { kubeletPods, err := w.kubeletPodFetcher.GetPodList(ctx, nil) + _ = general.UpdateHealthzStateByError(podFetcherKubeletHealthCheckName, err) if err != nil { klog.Errorf("sync kubelet pod failed: %s", err) _ = w.emitter.StoreInt64(metricsNamePodCacheSync, 1, metrics.MetricTypeNameCount,