Skip to content

Commit

Permalink
report topology provider policy in agent
Browse files Browse the repository at this point in the history
  • Loading branch information
WangZzzhe committed Feb 7, 2024
1 parent d2c069a commit 60ff01e
Show file tree
Hide file tree
Showing 30 changed files with 2,442 additions and 144 deletions.
3 changes: 3 additions & 0 deletions cmd/base/context_fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"reflect"
"strconv"

nodev1alpha1 "github.com/kubewharf/katalyst-api/pkg/apis/node/v1alpha1"

"github.com/pkg/errors"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -241,6 +243,7 @@ func GenerateFakeGenericContext(objects ...[]runtime.Object) (*GenericContext, e
utilruntime.Must(v1alpha1.AddToScheme(scheme))
utilruntime.Must(overcommitapis.AddToScheme(scheme))
utilruntime.Must(apiregistration.AddToScheme(scheme))
utilruntime.Must(nodev1alpha1.AddToScheme(scheme))

fakeMetaClient := metaFake.NewSimpleMetadataClient(scheme, nilObjectFilter(metaObjects)...)
fakeInternalClient := externalfake.NewSimpleClientset(nilObjectFilter(internalObjects)...)
Expand Down
5 changes: 3 additions & 2 deletions cmd/katalyst-scheduler/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,8 +357,9 @@ func Setup(ctx context.Context, opts *options.Options, outOfTreeRegistryOptions
return nil, nil, err
}

eventhandlers.AddCNREventHandler(cc.InformerFactory, cc.InternalInformerFactory)
eventhandlers.AddPodEventHandler(cc.InformerFactory, cc.InternalInformerFactory)
for _, handlerFunc := range eventhandlers.ListEventHandlerFunc() {
handlerFunc(cc.InformerFactory, cc.InternalInformerFactory)
}

return &cc, sched, nil
}
2 changes: 2 additions & 0 deletions cmd/katalyst-scheduler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"k8s.io/component-base/logs"

"github.com/kubewharf/katalyst-core/cmd/katalyst-scheduler/app"
"github.com/kubewharf/katalyst-core/pkg/scheduler/plugins/nodeovercommitment"
"github.com/kubewharf/katalyst-core/pkg/scheduler/plugins/noderesourcetopology"
"github.com/kubewharf/katalyst-core/pkg/scheduler/plugins/qosawarenoderesources"

Expand All @@ -38,6 +39,7 @@ func main() {
app.WithPlugin(qosawarenoderesources.FitName, qosawarenoderesources.NewFit),
app.WithPlugin(qosawarenoderesources.BalancedAllocationName, qosawarenoderesources.NewBalancedAllocation),
app.WithPlugin(noderesourcetopology.TopologyMatchName, noderesourcetopology.New),
app.WithPlugin(nodeovercommitment.Name, nodeovercommitment.New),
)

if err := runCommand(command); err != nil {
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ require (
)

replace (
github.com/kubewharf/katalyst-api => github.com/WangZzzhe/katalyst-api v0.0.0-20240207080647-ddea2f8a9aa7
k8s.io/api => k8s.io/api v0.24.6
k8s.io/apiextensions-apiserver => k8s.io/apiextensions-apiserver v0.24.6
k8s.io/apimachinery => k8s.io/apimachinery v0.24.6
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWX
github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI=
github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
github.com/VividCortex/gohistogram v1.0.0/go.mod h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/4+TcAqDqk/vUH7g=
github.com/WangZzzhe/katalyst-api v0.0.0-20240207080647-ddea2f8a9aa7 h1:cWsFBKMwiHgmiUb5r1kkapnd2RoEsG3gYXVj25F7drY=
github.com/WangZzzhe/katalyst-api v0.0.0-20240207080647-ddea2f8a9aa7/go.mod h1:YJJwV7ucsCM9np4KR3FXQTpNVlQS4ceaMl5lxIXXZfo=
github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5/go.mod h1:SkGFH1ia65gfNATL8TAiHDNxPzPdmEL5uirI2Uyuz6c=
github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
Expand Down Expand Up @@ -546,8 +548,6 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kubewharf/katalyst-api v0.4.1-0.20240118125832-edab146c0c0c h1:z8aEJTcfu8mv7AuVAQK+uVMjUcR1eSeE1ELoA9hmKGw=
github.com/kubewharf/katalyst-api v0.4.1-0.20240118125832-edab146c0c0c/go.mod h1:YJJwV7ucsCM9np4KR3FXQTpNVlQS4ceaMl5lxIXXZfo=
github.com/kubewharf/kubelet v1.24.6-kubewharf.8 h1:2e89T/nZTgzaVhyRsZuwEdRk8V8kJXs4PRkgfeG4Ai4=
github.com/kubewharf/kubelet v1.24.6-kubewharf.8/go.mod h1:MxbSZUx3wXztFneeelwWWlX7NAAStJ6expqq7gY2J3c=
github.com/kyoh86/exportloopref v0.1.7/go.mod h1:h1rDl2Kdj97+Kwh4gdz3ujE7XHmH51Q0lUiZ1z4NLj8=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func NewOvercommitmentAwarePlugin(

realtimeOvercommitmentAdvisor := realtime.NewRealtimeOvercommitmentAdvisor(conf, metaServer, emitter)

overcommitRatioReporter, err := reporter.NewOvercommitRatioReporter(emitter, conf, realtimeOvercommitmentAdvisor)
overcommitRatioReporter, err := reporter.NewOvercommitRatioReporter(emitter, conf, realtimeOvercommitmentAdvisor, metaServer)
if err != nil {
return nil, err
}
Expand Down
123 changes: 114 additions & 9 deletions pkg/agent/sysadvisor/plugin/overcommitmentaware/reporter/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,24 @@ import (
"context"
"encoding/json"
"fmt"
"strconv"
"sync"

"github.com/pkg/errors"
v1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"
kubeletconfigv1beta1 "k8s.io/kubelet/config/v1beta1"
"k8s.io/kubernetes/pkg/features"

"github.com/kubewharf/katalyst-api/pkg/consts"
"github.com/kubewharf/katalyst-api/pkg/plugins/registration"
"github.com/kubewharf/katalyst-api/pkg/plugins/skeleton"
"github.com/kubewharf/katalyst-api/pkg/protocol/reporterplugin/v1alpha1"
"github.com/kubewharf/katalyst-core/pkg/config"
"github.com/kubewharf/katalyst-core/pkg/metaserver"
"github.com/kubewharf/katalyst-core/pkg/metrics"
"github.com/kubewharf/katalyst-core/pkg/util"
"github.com/kubewharf/katalyst-core/pkg/util/native"
)

const (
Expand All @@ -55,8 +61,9 @@ func NewOvercommitRatioReporter(
emitter metrics.MetricEmitter,
conf *config.Configuration,
manager OvercommitManager,
metaServer *metaserver.MetaServer,
) (OvercommitRatioReporter, error) {
plugin, err := newOvercommitRatioReporterPlugin(emitter, conf, manager)
plugin, err := newOvercommitRatioReporterPlugin(emitter, conf, manager, metaServer)
if err != nil {
return nil, fmt.Errorf("[overcommit-reporter] create reporter failed: %v", err)
}
Expand All @@ -80,7 +87,8 @@ func (o *overcommitRatioReporterImpl) Run(ctx context.Context) {
type OvercommitRatioReporterPlugin struct {
sync.Mutex

manager OvercommitManager
manager OvercommitManager
metaServer *metaserver.MetaServer

ctx context.Context
cancel context.CancelFunc
Expand All @@ -91,10 +99,12 @@ func newOvercommitRatioReporterPlugin(
emitter metrics.MetricEmitter,
conf *config.Configuration,
overcommitManager OvercommitManager,
metaserver *metaserver.MetaServer,
) (skeleton.GenericPlugin, error) {

reporter := &OvercommitRatioReporterPlugin{
manager: overcommitManager,
manager: overcommitManager,
metaServer: metaserver,
}

return skeleton.NewRegistrationPluginWrapper(reporter, []string{conf.PluginRegistrationDir},
Expand Down Expand Up @@ -146,6 +156,10 @@ func (o *OvercommitRatioReporterPlugin) Stop() error {
// Since the metrics collected by Manager are already an average within a time period,
// we expect a faster response to node load fluctuations to avoid excessive overcommit of online resources.
func (o *OvercommitRatioReporterPlugin) GetReportContent(_ context.Context, _ *v1alpha1.Empty) (*v1alpha1.GetReportContentResponse, error) {
response := &v1alpha1.GetReportContentResponse{
Content: []*v1alpha1.ReportContent{},
}

overcommitRatioMap, err := o.manager.GetOvercommitRatio()
if err != nil {
klog.Errorf("OvercommitRatioReporterPlugin GetOvercommitMent fail: %v", err)
Expand All @@ -154,16 +168,20 @@ func (o *OvercommitRatioReporterPlugin) GetReportContent(_ context.Context, _ *v
klog.V(6).Infof("reporter get overcommit ratio: %v", overcommitRatioMap)

// overcommit data to CNR data
reportToCNR, err := o.overcommitRatioToCNRAnnotation(overcommitRatioMap)
overcommitRatioContent, err := o.overcommitRatioToCNRAnnotation(overcommitRatioMap)
if err != nil {
return nil, err
}
response.Content = append(response.Content, overcommitRatioContent)

return &v1alpha1.GetReportContentResponse{
Content: []*v1alpha1.ReportContent{
reportToCNR,
},
}, nil
// get topologyProvider and guaranteed cpus
topologyProviderContent, err := o.getTopologyProviderReportContent(o.ctx)
if err != nil {
return nil, err
}
response.Content = append(response.Content, topologyProviderContent)

return response, nil
}

func (o *OvercommitRatioReporterPlugin) ListAndWatchReportContent(_ *v1alpha1.Empty, server v1alpha1.ReporterPlugin_ListAndWatchReportContentServer) error {
Expand Down Expand Up @@ -211,3 +229,90 @@ func (o *OvercommitRatioReporterPlugin) overcommitRatioToCNRAnnotation(overcommi
},
}, nil
}

func (o *OvercommitRatioReporterPlugin) getTopologyProviderReportContent(ctx context.Context) (*v1alpha1.ReportContent, error) {
annotations, err := o.getTopologyProvider(ctx)
if err != nil {
return nil, fmt.Errorf("get topology provider from adapter failed: %v", err)
}

guaranteedCPUs := "0"
if annotations[consts.KCNRAnnotationCPUManager] != string(consts.CPUManagerPolicyNone) {
guaranteedCPUs, err = o.getGuaranteedCPUs(ctx)
if err != nil {
return nil, err
}
}
annotations[consts.KCNRAnnotationGuaranteedCPUs] = guaranteedCPUs

value, err := json.Marshal(&annotations)
if err != nil {
return nil, errors.Wrap(err, "marshal topology provider failed")
}

return &v1alpha1.ReportContent{
GroupVersionKind: &util.CNRGroupVersionKind,
Field: []*v1alpha1.ReportField{
{
FieldType: v1alpha1.FieldType_Metadata,
FieldName: util.CNRFieldNameAnnotations,
Value: value,
},
},
}, nil
}

func (o *OvercommitRatioReporterPlugin) getTopologyProvider(ctx context.Context) (map[string]string, error) {
klConfig, err := o.metaServer.GetKubeletConfig(ctx)
if err != nil {
return nil, fmt.Errorf("get kubelet config fail: %v", err)
}

return generateProviderPolicies(klConfig), nil
}

func (o *OvercommitRatioReporterPlugin) getGuaranteedCPUs(ctx context.Context) (string, error) {
podList, err := o.metaServer.GetPodList(ctx, func(pod *v1.Pod) bool {
return true
})

if err != nil {
return "", errors.Wrap(err, "get pod list from metaserver failed")
}

cpus := 0
for _, pod := range podList {
cpus += native.PodGuaranteedCPUs(pod)
}

return strconv.Itoa(cpus), nil
}

func generateProviderPolicies(kubeletConfig *kubeletconfigv1beta1.KubeletConfiguration) map[string]string {
klog.V(5).Infof("generateProviderPolicies featureGates: %v, cpuManagerPolicy: %v, memoryManagerPolicy: %v",
kubeletConfig.FeatureGates, features.CPUManager, features.MemoryManager)

featureGates := kubeletConfig.FeatureGates

res := map[string]string{
consts.KCNRAnnotationCPUManager: string(consts.CPUManagerOff),
consts.KCNRAnnotationMemoryManager: string(consts.MemoryManagerOff),
}

on, ok := featureGates[string(features.CPUManager)]
// default true
if (ok && on) || (!ok) {
if kubeletConfig.CPUManagerPolicy != "" {
res[consts.KCNRAnnotationCPUManager] = kubeletConfig.CPUManagerPolicy
}
}

on, ok = featureGates[string(features.MemoryManager)]
if (ok && on) || (!ok) {
if kubeletConfig.MemoryManagerPolicy != "" {
res[consts.KCNRAnnotationMemoryManager] = kubeletConfig.MemoryManagerPolicy
}
}

return res
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,39 @@ import (
"fmt"
"testing"

"github.com/kubewharf/katalyst-api/pkg/consts"

"github.com/stretchr/testify/assert"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
v12 "k8s.io/apimachinery/pkg/apis/meta/v1"
kubeletconfigv1beta1 "k8s.io/kubelet/config/v1beta1"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"

"github.com/kubewharf/katalyst-api/pkg/consts"
"github.com/kubewharf/katalyst-core/pkg/metaserver"
"github.com/kubewharf/katalyst-core/pkg/metaserver/agent"
"github.com/kubewharf/katalyst-core/pkg/metaserver/agent/kubeletconfig"
"github.com/kubewharf/katalyst-core/pkg/metaserver/agent/pod"
)

func TestGetReportContent(t *testing.T) {
t.Parallel()

fakeKubeletConfig := kubeletconfigv1beta1.KubeletConfiguration{
FeatureGates: map[string]bool{
string(features.CPUManager): true,
string(features.MemoryManager): false,
},
CPUManagerPolicy: string(cpumanager.PolicyStatic),
}

p := &OvercommitRatioReporterPlugin{
manager: NewFakeOvercommitManager(map[v1.ResourceName]float64{}),
metaServer: &metaserver.MetaServer{
MetaAgent: &agent.MetaAgent{
KubeletConfigFetcher: kubeletconfig.NewFakeKubeletConfigFetcher(fakeKubeletConfig),
},
},
}

_, err := p.GetReportContent(context.TODO(), nil)
Expand All @@ -43,17 +66,53 @@ func TestGetReportContent(t *testing.T) {
v1.ResourceMemory: 1.2678,
v1.ResourceStorage: 1.0,
}),
metaServer: &metaserver.MetaServer{
MetaAgent: &agent.MetaAgent{
KubeletConfigFetcher: kubeletconfig.NewFakeKubeletConfigFetcher(fakeKubeletConfig),
PodFetcher: &pod.PodFetcherStub{
PodList: []*v1.Pod{
{
ObjectMeta: v12.ObjectMeta{
Name: "p1",
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "c1",
Resources: v1.ResourceRequirements{
Limits: map[v1.ResourceName]resource.Quantity{
v1.ResourceCPU: resource.MustParse("1"),
v1.ResourceMemory: resource.MustParse("4Gi"),
},
Requests: map[v1.ResourceName]resource.Quantity{
v1.ResourceCPU: resource.MustParse("1"),
v1.ResourceMemory: resource.MustParse("4Gi"),
},
},
},
},
},
},
},
},
},
},
}

res, err := p.GetReportContent(context.TODO(), nil)
assert.NoError(t, err)
assert.Equal(t, 1, len(res.Content))
assert.Equal(t, 2, len(res.Content))

ratio := map[string]string{}
err = json.Unmarshal(res.Content[0].Field[0].Value, &ratio)
assert.NoError(t, err)
assert.Equal(t, "1.51", ratio[consts.NodeAnnotationCPUOvercommitRatioKey])
assert.Equal(t, "1.27", ratio[consts.NodeAnnotationMemoryOvercommitRatioKey])

anno := map[string]string{}
err = json.Unmarshal(res.Content[1].Field[0].Value, &anno)
assert.NoError(t, err)
assert.Equal(t, "static", anno[string(consts.KCNRAnnotationCPUManager)])
}

func TestStart(t *testing.T) {
Expand Down
Loading

0 comments on commit 60ff01e

Please sign in to comment.