Skip to content

Commit

Permalink
chore(sysadvisor): refine some healthcheck rules (#543)
Browse files Browse the repository at this point in the history
  • Loading branch information
zzzzhhb authored Apr 11, 2024
1 parent 76d9f46 commit 2fe7633
Show file tree
Hide file tree
Showing 6 changed files with 13 additions and 37 deletions.
18 changes: 2 additions & 16 deletions pkg/agent/resourcemanager/fetcher/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,6 @@ import (

const reporterManagerCheckpoint = "reporter_manager_checkpoint"

const reporterFetcherHealthCheckName = "reporter_fetcher_sync"

const healthzGracePeriodMultiplier = 3

const (
metricsNameGetContentCost = "reporter_get_content_cost"
metricsNameGetContentPluginCost = "reporter_get_content_plugin_cost"
Expand Down Expand Up @@ -220,8 +216,6 @@ func (m *ReporterPluginManager) DeRegisterPlugin(pluginName string) {

// Run start the reporter plugin manager
func (m *ReporterPluginManager) Run(ctx context.Context) {
general.RegisterHeartbeatCheck(reporterFetcherHealthCheckName, m.reconcilePeriod*healthzGracePeriodMultiplier,
general.HealthzCheckStateReady, m.reconcilePeriod*healthzGracePeriodMultiplier)
go wait.UntilWithContext(ctx, m.syncFunc, m.reconcilePeriod)

klog.Infof("reporter plugin manager started")
Expand Down Expand Up @@ -316,10 +310,6 @@ func (m *ReporterPluginManager) pushContents(ctx context.Context, reportResponse
// genericSync periodically calls the Get function to obtain content changes
func (m *ReporterPluginManager) genericSync(ctx context.Context) {
klog.Infof("genericSync")
errList := make([]error, 0)
defer func() {
_ = general.UpdateHealthzStateByError(reporterFetcherHealthCheckName, errors.NewAggregate(errList))
}()

begin := time.Now()
defer func() {
Expand All @@ -332,16 +322,12 @@ func (m *ReporterPluginManager) genericSync(ctx context.Context) {
m.clearUnhealthyPlugin()

// get report content from each healthy Endpoint directly
reportResponses, err := m.getReportContent(false)
if err != nil {
errList = append(errList, err)
}
reportResponses, _ := m.getReportContent(false)

pushErr := m.pushContents(ctx, reportResponses)
if pushErr != nil {
_ = m.emitter.StoreInt64("reporter_plugin_sync_push_failed", 1, metrics.MetricTypeNameCount)
klog.Errorf("report plugin failed with error: %v", err)
errList = append(errList, pushErr)
klog.Errorf("report plugin failed with error: %v", pushErr)
}
}

Expand Down
8 changes: 0 additions & 8 deletions pkg/agent/resourcemanager/fetcher/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ import (
"github.com/kubewharf/katalyst-core/pkg/config/agent/metaserver"
reporterconfig "github.com/kubewharf/katalyst-core/pkg/config/agent/reporter"
"github.com/kubewharf/katalyst-core/pkg/metrics"
"github.com/kubewharf/katalyst-core/pkg/util/general"
)

const (
Expand Down Expand Up @@ -197,13 +196,6 @@ func TestHealthz(t *testing.T) {
t.Fatalf("timeout while waiting for manager update")
}

results := general.GetRegisterReadinessCheckResult()
for name, response := range results {
if reporterFetcherHealthCheckName == string(name) {
require.True(t, response.Ready)
}
}

_ = p.Stop()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import (
"github.com/kubewharf/katalyst-core/pkg/metaserver"
"github.com/kubewharf/katalyst-core/pkg/metrics"
"github.com/kubewharf/katalyst-core/pkg/util"
"github.com/kubewharf/katalyst-core/pkg/util/general"
)

func init() {
Expand Down Expand Up @@ -128,7 +127,6 @@ func (r *headroomReporterPlugin) Name() string {

func (r *headroomReporterPlugin) Start() (err error) {
r.Lock()
general.RegisterHeartbeatCheck(headroomReporterPluginName, healthCheckTolerationDuration, general.HealthzCheckStateReady, healthCheckTolerationDuration)
defer func() {
if err == nil {
r.started = true
Expand Down Expand Up @@ -166,9 +164,6 @@ func (r *headroomReporterPlugin) Stop() error {

func (r *headroomReporterPlugin) GetReportContent(_ context.Context, _ *v1alpha1.Empty) (*v1alpha1.GetReportContentResponse, error) {
var err error
defer func() {
_ = general.UpdateHealthzStateByError(headroomReporterPluginName, err)
}()
res, err := r.getReclaimedResource()
if err != nil {
return nil, err
Expand Down
15 changes: 9 additions & 6 deletions pkg/agent/sysadvisor/plugin/qosaware/resource/cpu/advisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ const (
metricRegionIndicatorErrorPrefix = "region_indicator_error_"

cpuAdvisorHealthCheckName = "cpu_advisor_update"
healthCheckTolerationDuration = 15 * time.Second
healthCheckTolerationDuration = 30 * time.Second
)

var (
Expand Down Expand Up @@ -112,6 +112,7 @@ type cpuResourceAdvisor struct {
metaCache metacache.MetaCache
metaServer *metaserver.MetaServer
emitter metrics.MetricEmitter
doOnce sync.Once
}

// NewCPUResourceAdvisor returns a cpuResourceAdvisor instance
Expand Down Expand Up @@ -155,10 +156,13 @@ func NewCPUResourceAdvisor(conf *config.Configuration, extraConf interface{}, me
}

func (cra *cpuResourceAdvisor) Run(ctx context.Context) {
general.RegisterHeartbeatCheck(cpuAdvisorHealthCheckName, healthCheckTolerationDuration, general.HealthzCheckStateNotReady, healthCheckTolerationDuration)
for {
select {
case v := <-cra.recvCh:
cra.doOnce.Do(func() {
general.RegisterReportCheck(cpuAdvisorHealthCheckName, healthCheckTolerationDuration)
})

lag := time.Since(v.TimeStamp)
klog.Infof("[qosaware-cpu] receive update trigger, checkpoint at %v", v.TimeStamp)
cra.emitter.StoreFloat64(metricCPUAdvisorUpdateLag, float64(lag/time.Millisecond), metrics.MetricTypeNameRaw)
Expand All @@ -168,7 +172,9 @@ func (cra *cpuResourceAdvisor) Run(ctx context.Context) {
klog.Errorf("[qosaware-cpu] skip update: checkpoint is outdated, lag %v", lag)
continue
}
if err := cra.update(); err != nil {
err := cra.update()
_ = general.UpdateHealthzStateByError(cpuAdvisorHealthCheckName, err)
if err != nil {
klog.Errorf("[qosaware-cpu] failed to do update: %q", err)
continue
}
Expand Down Expand Up @@ -213,9 +219,6 @@ func (cra *cpuResourceAdvisor) GetHeadroom() (resource.Quantity, error) {
func (cra *cpuResourceAdvisor) update() (err error) {
cra.mutex.Lock()
defer cra.mutex.Unlock()
defer func() {
_ = general.UpdateHealthzStateByError(cpuAdvisorHealthCheckName, err)
}()
if err = cra.updateWithIsolationGuardian(true); err != nil {
if err == errIsolationSafetyCheckFailed {
klog.Warningf("[qosaware-cpu] failed to updateWithIsolationGuardian(true): %q", err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,9 +135,9 @@ func NewMemoryResourceAdvisor(conf *config.Configuration, extraConf interface{},

func (ra *memoryResourceAdvisor) Run(ctx context.Context) {
period := ra.conf.SysAdvisorPluginsConfiguration.QoSAwarePluginConfiguration.SyncPeriod
general.RegisterHeartbeatCheck(memoryHealthCheckName, healthCheckTolerationDuration, general.HealthzCheckStateNotReady, healthCheckTolerationDuration)
general.InfoS("wait to list containers")
<-ra.recvCh
general.RegisterHeartbeatCheck(memoryHealthCheckName, healthCheckTolerationDuration, general.HealthzCheckStateNotReady, healthCheckTolerationDuration)
general.InfoS("list containers successfully")

go wait.Until(ra.doUpdate, period, ctx.Done())
Expand Down
2 changes: 1 addition & 1 deletion pkg/metaserver/metaserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func TestMetaServer_Run(t *testing.T) {

go meta.Run(context.Background())

time.Sleep(3 * time.Millisecond)
time.Sleep(10 * time.Millisecond)

meta.Lock()
assert.True(t, meta.start)
Expand Down

0 comments on commit 2fe7633

Please sign in to comment.