Skip to content

Commit

Permalink
feat(sysadvisor): add healthz check (#520)
Browse files Browse the repository at this point in the history
  • Loading branch information
zzzzhhb committed Apr 8, 2024
1 parent 490f9fa commit e717ecc
Show file tree
Hide file tree
Showing 19 changed files with 193 additions and 113 deletions.
6 changes: 6 additions & 0 deletions pkg/agent/evictionmanager/conditions.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (

pluginapi "github.com/kubewharf/katalyst-api/pkg/protocol/evictionplugin/v1alpha1"
"github.com/kubewharf/katalyst-core/pkg/consts"
"github.com/kubewharf/katalyst-core/pkg/util/general"
)

var (
Expand Down Expand Up @@ -89,6 +90,10 @@ func (m *EvictionManger) getNodeTaintsFromConditions() []v1.Taint {
}

func (m *EvictionManger) reportConditionsAsNodeTaints(ctx context.Context) {
var err error
defer func() {
_ = general.UpdateHealthzStateByError(reportTaintHealthCheckName, err)
}()
node, err := m.metaGetter.GetNode(ctx)

if err != nil {
Expand All @@ -111,6 +116,7 @@ func (m *EvictionManger) reportConditionsAsNodeTaints(ctx context.Context) {

if !controllerutil.SwapNodeControllerTaint(ctx, m.genericClient.KubeClient, taintsToAdd, taintsToDel, node) {
klog.Errorf("failed to swap taints")
err = fmt.Errorf("failed to swap taints")
}

return
Expand Down
43 changes: 36 additions & 7 deletions pkg/agent/evictionmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
//nolint
"github.com/golang/protobuf/proto"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/events"
Expand Down Expand Up @@ -72,6 +73,11 @@ const (
UserUnknown = "unknown"

MetricsPodLabelPrefix = "pod"

evictionManagerHealthCheckName = "eviction_manager_sync"
reportTaintHealthCheckName = "eviction_manager_report_taint"
syncTolerationTurns = 3
reportTaintToleration = 15 * time.Second
)

// LatestCNRGetter returns the latest CNR resources.
Expand Down Expand Up @@ -207,7 +213,10 @@ func (m *EvictionManger) getEvictionPlugins(genericClient *client.GenericClientS
func (m *EvictionManger) Run(ctx context.Context) {
general.Infof(" run with podKiller %v", m.podKiller.Name())
defer general.Infof(" started")

general.RegisterHeartbeatCheck(evictionManagerHealthCheckName, syncTolerationTurns*m.conf.EvictionManagerSyncPeriod,
general.HealthzCheckStateNotReady, syncTolerationTurns*m.conf.EvictionManagerSyncPeriod)
general.RegisterHeartbeatCheck(reportTaintHealthCheckName, reportTaintToleration,
general.HealthzCheckStateNotReady, reportTaintToleration)
m.podKiller.Start(ctx)
for _, endpoint := range m.endpoints {
endpoint.Start()
Expand All @@ -220,6 +229,11 @@ func (m *EvictionManger) Run(ctx context.Context) {
}

func (m *EvictionManger) sync(ctx context.Context) {
var err error
defer func() {
_ = general.UpdateHealthzStateByError(evictionManagerHealthCheckName, err)
}()

activePods, err := m.metaGetter.GetPodList(ctx, native.PodIsActive)
if err != nil {
general.Errorf("failed to list pods from metaServer: %v", err)
Expand All @@ -233,14 +247,25 @@ func (m *EvictionManger) sync(ctx context.Context) {
general.Infof(" currently, there are %v candidate pods", len(pods))
_ = m.emitter.StoreInt64(MetricsNameCandidatePodCNT, int64(len(pods)), metrics.MetricTypeNameRaw)

collector := m.collectEvictionResult(pods)
errList := make([]error, 0)
collector, collectErr := m.collectEvictionResult(pods)
if collectErr != nil {
errList = append(errList, collectErr)
}

m.doEvict(collector.getSoftEvictPods(), collector.getForceEvictPods())
evictErr := m.doEvict(collector.getSoftEvictPods(), collector.getForceEvictPods())
if evictErr != nil {
errList = append(errList, evictErr)
}
if len(errList) > 0 {
err = errors.NewAggregate(errList)
}
}

func (m *EvictionManger) collectEvictionResult(pods []*v1.Pod) *evictionRespCollector {
func (m *EvictionManger) collectEvictionResult(pods []*v1.Pod) (*evictionRespCollector, error) {
dynamicConfig := m.conf.GetDynamicConfiguration()
collector := newEvictionRespCollector(dynamicConfig.DryRun, m.conf, m.emitter)
var errList []error

m.endpointLock.RLock()
for pluginName, ep := range m.endpoints {
Expand All @@ -252,6 +277,7 @@ func (m *EvictionManger) collectEvictionResult(pods []*v1.Pod) *evictionRespColl
})
if err != nil {
general.Errorf(" calling GetEvictPods of plugin: %s failed with error: %v", pluginName, err)
errList = append(errList, err)
} else if getEvictResp == nil {
general.Errorf(" calling GetEvictPods of plugin: %s and getting nil resp", pluginName)
} else {
Expand All @@ -262,6 +288,7 @@ func (m *EvictionManger) collectEvictionResult(pods []*v1.Pod) *evictionRespColl
metResp, err := ep.ThresholdMet(context.Background())
if err != nil {
general.Errorf(" calling ThresholdMet of plugin: %s failed with error: %v", pluginName, err)
errList = append(errList, err)
continue
} else if metResp == nil {
general.Errorf(" calling ThresholdMet of plugin: %s and getting nil resp", pluginName)
Expand Down Expand Up @@ -310,6 +337,7 @@ func (m *EvictionManger) collectEvictionResult(pods []*v1.Pod) *evictionRespColl
m.endpointLock.RUnlock()
if err != nil {
general.Errorf(" calling GetTopEvictionPods of plugin: %s failed with error: %v", pluginName, err)
errList = append(errList, err)
continue
} else if resp == nil {
general.Errorf(" calling GetTopEvictionPods of plugin: %s and getting nil resp", pluginName)
Expand All @@ -322,10 +350,10 @@ func (m *EvictionManger) collectEvictionResult(pods []*v1.Pod) *evictionRespColl
collector.collectTopEvictionPods(dynamicConfig.DryRun, pluginName, threshold, resp)
}

return collector
return collector, errors.NewAggregate(errList)
}

func (m *EvictionManger) doEvict(softEvictPods, forceEvictPods map[string]*rule.RuledEvictPod) {
func (m *EvictionManger) doEvict(softEvictPods, forceEvictPods map[string]*rule.RuledEvictPod) error {
softEvictPods = filterOutCandidatePodsWithForcePods(softEvictPods, forceEvictPods)
bestSuitedCandidate := m.getEvictPodFromCandidates(softEvictPods)
if bestSuitedCandidate != nil && bestSuitedCandidate.Pod != nil {
Expand All @@ -346,13 +374,14 @@ func (m *EvictionManger) doEvict(softEvictPods, forceEvictPods map[string]*rule.
err := m.killWithRules(rpList)
if err != nil {
general.Errorf(" got err: %v in EvictPods", err)
return
return err
}

general.Infof(" evict %d pods in evictionmanager", len(rpList))
_ = m.emitter.StoreInt64(MetricsNameVictimPodCNT, int64(len(rpList)), metrics.MetricTypeNameRaw,
metrics.MetricTag{Key: "type", Val: "total"})
metricPodsToEvict(m.emitter, rpList, m.conf.GenericConfiguration.QoSConfiguration, m.conf.GenericEvictionConfiguration.PodMetricLabels)
return nil
}

// ValidatePlugin validates a plugin if the version is correct and the name has the format of an extended resource
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/evictionmanager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ func TestEvictionManger_collectEvictionResult(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
mgr.conf.GetDynamicConfiguration().DryRun = tt.dryrun

collector := mgr.collectEvictionResult(pods)
collector, _ := mgr.collectEvictionResult(pods)
gotForceEvictPods := sets.String{}
gotSoftEvictPods := sets.String{}
gotConditions := sets.String{}
Expand Down
26 changes: 18 additions & 8 deletions pkg/agent/evictionmanager/plugin/memory/system_pressure.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ const (
EvictionPluginNameSystemMemoryPressure = "system-memory-pressure-eviction-plugin"
EvictionScopeSystemMemory = "SystemMemory"
evictionConditionMemoryPressure = "MemoryPressure"
systemMemoryPressureHealthCheck = "system_memory_pressure_eviction_detect"
syncTolerationTurns = 3
)

func NewSystemPressureEvictionPlugin(_ *client.GenericClientSet, _ events.EventRecorder,
Expand Down Expand Up @@ -99,6 +101,8 @@ func (s *SystemPressureEvictionPlugin) Name() string {
}

// Start registers the heartbeat health check for system memory pressure
// detection and then launches the periodic detection loop in a background
// goroutine. Both duration arguments handed to the heartbeat registration are
// syncTolerationTurns sync periods.
func (s *SystemPressureEvictionPlugin) Start() {
	// The same toleration window is used for both duration arguments.
	toleration := syncTolerationTurns * s.syncPeriod
	general.RegisterHeartbeatCheck(
		systemMemoryPressureHealthCheck,
		toleration,
		general.HealthzCheckStateNotReady,
		toleration,
	)

	// Run detectSystemPressures once per sync period.
	go wait.UntilWithContext(context.TODO(), s.detectSystemPressures, s.syncPeriod)
}

Expand Down Expand Up @@ -137,12 +141,16 @@ func (s *SystemPressureEvictionPlugin) ThresholdMet(_ context.Context) (*plugina
func (s *SystemPressureEvictionPlugin) detectSystemPressures(_ context.Context) {
s.Lock()
defer s.Unlock()
var err error
defer func() {
_ = general.UpdateHealthzStateByError(systemMemoryPressureHealthCheck, err)
}()

s.isUnderSystemPressure = false
s.systemAction = actionNoop

s.detectSystemWatermarkPressure()
s.detectSystemKswapdStealPressure()
err = s.detectSystemWatermarkPressure()
err = s.detectSystemKswapdStealPressure()

switch s.systemAction {
case actionReclaimedEviction:
Expand All @@ -162,15 +170,15 @@ func (s *SystemPressureEvictionPlugin) detectSystemPressures(_ context.Context)
}
}

func (s *SystemPressureEvictionPlugin) detectSystemWatermarkPressure() {
func (s *SystemPressureEvictionPlugin) detectSystemWatermarkPressure() error {
free, total, scaleFactor, err := helper.GetWatermarkMetrics(s.metaServer.MetricsFetcher, s.emitter, nonExistNumaID)
if err != nil {
_ = s.emitter.StoreInt64(metricsNameFetchMetricError, 1, metrics.MetricTypeNameCount,
metrics.ConvertMapToTags(map[string]string{
metricsTagKeyNumaID: strconv.Itoa(nonExistNumaID),
})...)
general.Errorf("failed to getWatermarkMetrics for system, err: %v", err)
return
return err
}

thresholdMinimum := float64(s.dynamicConfig.GetDynamicConfiguration().SystemFreeMemoryThresholdMinimum)
Expand All @@ -184,9 +192,10 @@ func (s *SystemPressureEvictionPlugin) detectSystemWatermarkPressure() {
s.isUnderSystemPressure = true
s.systemAction = actionReclaimedEviction
}
return nil
}

func (s *SystemPressureEvictionPlugin) detectSystemKswapdStealPressure() {
func (s *SystemPressureEvictionPlugin) detectSystemKswapdStealPressure() error {
kswapdSteal, err := helper.GetNodeMetricWithTime(s.metaServer.MetricsFetcher, s.emitter, consts.MetricMemKswapdstealSystem)
if err != nil {
s.kswapdStealPreviousCycle = kswapdStealPreviousCycleMissing
Expand All @@ -196,12 +205,12 @@ func (s *SystemPressureEvictionPlugin) detectSystemKswapdStealPressure() {
metricsTagKeyNumaID: strconv.Itoa(nonExistNumaID),
})...)
general.Errorf("failed to getSystemKswapdStealMetrics, err: %v", err)
return
return err
}

if kswapdSteal.Time.Equal(s.kswapdStealPreviousCycleTime) {
general.Warningf("getSystemKswapdStealMetrics get same result as last round,skip current round")
return
return nil
}

dynamicConfig := s.dynamicConfig.GetDynamicConfiguration()
Expand Down Expand Up @@ -229,7 +238,7 @@ func (s *SystemPressureEvictionPlugin) detectSystemKswapdStealPressure() {
s.kswapdStealPreviousCycleTime = *(kswapdSteal.Time)
if kswapdStealPreviousCycle == kswapdStealPreviousCycleMissing {
general.Warningf("kswapd steal of the previous cycle is missing")
return
return nil
}

if (kswapdSteal.Value-kswapdStealPreviousCycle)/(kswapdSteal.Time.Sub(kswapdStealPreviousCycleTime)).Seconds() >= float64(dynamicConfig.SystemKswapdRateThreshold) {
Expand All @@ -249,6 +258,7 @@ func (s *SystemPressureEvictionPlugin) detectSystemKswapdStealPressure() {
s.systemAction = actionEviction
}
}
return nil
}

func (s *SystemPressureEvictionPlugin) GetTopEvictionPods(_ context.Context, request *pluginapi.GetTopEvictionPodsRequest) (*pluginapi.GetTopEvictionPodsResponse, error) {
Expand Down
35 changes: 21 additions & 14 deletions pkg/agent/resourcemanager/fetcher/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ import (

const reporterManagerCheckpoint = "reporter_manager_checkpoint"

const healthzNameReporterFetcherReady = "ReporterFetcherReady"
const reporterFetcherHealthCheckName = "reporter_fetcher_sync"

const healthzGracePeriodMultiplier = 3

Expand Down Expand Up @@ -220,14 +220,12 @@ func (m *ReporterPluginManager) DeRegisterPlugin(pluginName string) {

// Run start the reporter plugin manager
func (m *ReporterPluginManager) Run(ctx context.Context) {
general.RegisterHeartbeatCheck(reporterFetcherHealthCheckName, m.reconcilePeriod*healthzGracePeriodMultiplier,
general.HealthzCheckStateReady, m.reconcilePeriod*healthzGracePeriodMultiplier)
go wait.UntilWithContext(ctx, m.syncFunc, m.reconcilePeriod)

klog.Infof("reporter plugin manager started")
m.reporter.Run(ctx)

general.RegisterHeartbeatCheck(healthzNameReporterFetcherReady, m.reconcilePeriod*healthzGracePeriodMultiplier,
general.HealthzCheckStateReady, m.reconcilePeriod*healthzGracePeriodMultiplier)
go wait.UntilWithContext(ctx, m.healthz, m.reconcilePeriod)
}

func (m *ReporterPluginManager) isVersionCompatibleWithPlugin(versions []string) bool {
Expand Down Expand Up @@ -294,9 +292,9 @@ func (m *ReporterPluginManager) runEndpoint(pluginName string, e plugin.Endpoint
// and the manager can read it from Endpoint cache to obtain content changes initiative
func (m *ReporterPluginManager) genericCallback(pluginName string, _ *v1alpha1.GetReportContentResponse) {
klog.Infof("genericCallback")
// get report content from each healthy Endpoint from cache, the lastly response
// get report content from each healthy Endpoint from cache, the last response
// from this plugin has been already stored to its Endpoint cache before this callback called
reportResponses := m.getReportContent(true)
reportResponses, _ := m.getReportContent(true)

err := m.pushContents(context.Background(), reportResponses)
if err != nil {
Expand All @@ -318,6 +316,11 @@ func (m *ReporterPluginManager) pushContents(ctx context.Context, reportResponse
// genericSync periodically calls the Get function to obtain content changes
func (m *ReporterPluginManager) genericSync(ctx context.Context) {
klog.Infof("genericSync")
errList := make([]error, 0)
defer func() {
_ = general.UpdateHealthzStateByError(reporterFetcherHealthCheckName, errors.NewAggregate(errList))
}()

begin := time.Now()
defer func() {
costs := time.Since(begin)
Expand All @@ -329,15 +332,17 @@ func (m *ReporterPluginManager) genericSync(ctx context.Context) {
m.clearUnhealthyPlugin()

// get report content from each healthy Endpoint directly
reportResponses := m.getReportContent(false)

err := m.pushContents(ctx, reportResponses)
reportResponses, err := m.getReportContent(false)
if err != nil {
errList = append(errList, err)
}

pushErr := m.pushContents(ctx, reportResponses)
if pushErr != nil {
_ = m.emitter.StoreInt64("reporter_plugin_sync_push_failed", 1, metrics.MetricTypeNameCount)
klog.Errorf("report plugin failed with error: %v", err)
errList = append(errList, pushErr)
}

m.healthzSyncLoop()
}

// clearUnhealthyPlugin is to clear stopped plugins from cache which exceeded grace period
Expand All @@ -360,8 +365,9 @@ func (m *ReporterPluginManager) clearUnhealthyPlugin() {

// getReportContent is to get reportContent from plugins. if cacheFirst is true,
// use plugin cache (when it is not nil), otherwise we call plugin directly.
func (m *ReporterPluginManager) getReportContent(cacheFirst bool) map[string]*v1alpha1.GetReportContentResponse {
func (m *ReporterPluginManager) getReportContent(cacheFirst bool) (map[string]*v1alpha1.GetReportContentResponse, error) {
reportResponses := make(map[string]*v1alpha1.GetReportContentResponse)
errList := make([]error, 0)

begin := time.Now()
m.mutex.Lock()
Expand Down Expand Up @@ -395,6 +401,7 @@ func (m *ReporterPluginManager) getReportContent(cacheFirst bool) map[string]*v1
klog.InfoS("GetReportContent", "costs", epCosts, "pluginName", pluginName)
_ = m.emitter.StoreInt64(metricsNameGetContentPluginCost, epCosts.Microseconds(), metrics.MetricTypeNameRaw, []metrics.MetricTag{{Key: "plugin", Val: pluginName}}...)
if err != nil {
errList = append(errList, err)
s, _ := status.FromError(err)
_ = m.emitter.StoreInt64("reporter_plugin_get_content_failed", 1, metrics.MetricTypeNameCount, []metrics.MetricTag{
{Key: "code", Val: s.Code().String()},
Expand All @@ -409,7 +416,7 @@ func (m *ReporterPluginManager) getReportContent(cacheFirst bool) map[string]*v1
reportResponses[pluginName] = resp
}

return reportResponses
return reportResponses, errors.NewAggregate(errList)
}

func (m *ReporterPluginManager) writeCheckpoint(reportResponses map[string]*v1alpha1.GetReportContentResponse) error {
Expand Down
Loading

0 comments on commit e717ecc

Please sign in to comment.