Skip to content

Commit

Permalink
spd support cnc cache
Browse files Browse the repository at this point in the history
  • Loading branch information
luomingmeng committed Apr 10, 2024
1 parent a0246f3 commit cd34065
Show file tree
Hide file tree
Showing 15 changed files with 875 additions and 85 deletions.
5 changes: 5 additions & 0 deletions cmd/katalyst-controller/app/options/spd.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type SPDOptions struct {
ResyncPeriod time.Duration
SPDWorkloadGVResources []string
SPDPodLabelIndexerKeys []string
EnableCNCCache bool
IndicatorPlugins []string
BaselinePercent map[string]int64
}
Expand All @@ -38,6 +39,7 @@ type SPDOptions struct {
func NewSPDOptions() *SPDOptions {
	// Start from the zero value and fill in each default explicitly.
	opts := new(SPDOptions)

	// Default resync period is thirty seconds.
	opts.ResyncPeriod = 30 * time.Second

	// The cnc cache is opt-in, so it stays disabled unless the flag is set.
	opts.EnableCNCCache = false

	// Use an empty, non-nil map so that the flag parser can store into it directly.
	opts.BaselinePercent = make(map[string]int64)

	return opts
}
Expand All @@ -53,6 +55,8 @@ func (o *SPDOptions) AddFlags(fss *cliflag.NamedFlagSets) {
"SPDWorkloadGVResources should be in the format of `resource.version.group.com` like 'deployments.v1.apps'.")
fs.StringSliceVar(&o.SPDPodLabelIndexerKeys, "spd-pod-label-indexers", o.SPDPodLabelIndexerKeys, ""+
"A list of pod label keys to be used as indexers for pod informer")
fs.BoolVar(&o.EnableCNCCache, "spd-enable-cnc-cache", o.EnableCNCCache, ""+
"Whether enable cnc cache to reduce agent api-server remote request")
fs.StringSliceVar(&o.IndicatorPlugins, "spd-indicator-plugins", o.IndicatorPlugins,
"A list of indicator plugins to be used")
fs.StringToInt64Var(&o.BaselinePercent, "spd-qos-baseline-percent", o.BaselinePercent, ""+
Expand All @@ -64,6 +68,7 @@ func (o *SPDOptions) ApplyTo(c *controller.SPDConfig) error {
c.ReSyncPeriod = o.ResyncPeriod
c.SPDWorkloadGVResources = o.SPDWorkloadGVResources
c.SPDPodLabelIndexerKeys = o.SPDPodLabelIndexerKeys
c.EnableCNCCache = o.EnableCNCCache
c.IndicatorPlugins = o.IndicatorPlugins
c.BaselinePercent = o.BaselinePercent
return nil
Expand Down
3 changes: 3 additions & 0 deletions pkg/config/controller/spd.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ type SPDConfig struct {
// SPDPodLabelIndexerKeys are the pod label keys used as indexers for the pod informer
SPDPodLabelIndexerKeys []string

// EnableCNCCache enables syncing the spd cnc target config (the cnc cache),
// which reduces the agent's remote requests to the api-server
EnableCNCCache bool

IndicatorPlugins []string

BaselinePercent map[string]int64
Expand Down
22 changes: 12 additions & 10 deletions pkg/controller/kcc/cnc.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

apisv1alpha1 "github.com/kubewharf/katalyst-api/pkg/apis/config/v1alpha1"
configapi "github.com/kubewharf/katalyst-api/pkg/apis/config/v1alpha1"
configinformers "github.com/kubewharf/katalyst-api/pkg/client/informers/externalversions/config/v1alpha1"
"github.com/kubewharf/katalyst-api/pkg/client/listers/config/v1alpha1"
kcclient "github.com/kubewharf/katalyst-core/pkg/client"
Expand All @@ -44,6 +44,7 @@ import (
kcctarget "github.com/kubewharf/katalyst-core/pkg/controller/kcc/target"
"github.com/kubewharf/katalyst-core/pkg/controller/kcc/util"
"github.com/kubewharf/katalyst-core/pkg/metrics"
katalystutil "github.com/kubewharf/katalyst-core/pkg/util"
"github.com/kubewharf/katalyst-core/pkg/util/native"
)

Expand Down Expand Up @@ -219,7 +220,7 @@ func (c *CustomNodeConfigController) enqueueAllRelatedCNCForTargetConfig(target
}

func (c *CustomNodeConfigController) addCustomNodeConfigEventHandle(obj interface{}) {
t, ok := obj.(*apisv1alpha1.CustomNodeConfig)
t, ok := obj.(*configapi.CustomNodeConfig)
if !ok {
klog.Errorf("[cnc] cannot convert obj to *CustomNodeConfig: %v", obj)
return
Expand All @@ -230,7 +231,7 @@ func (c *CustomNodeConfigController) addCustomNodeConfigEventHandle(obj interfac
}

func (c *CustomNodeConfigController) updateCustomNodeConfigEventHandle(_, new interface{}) {
newCNC, ok := new.(*apisv1alpha1.CustomNodeConfig)
newCNC, ok := new.(*configapi.CustomNodeConfig)
if !ok {
klog.Errorf("[cnc] cannot convert obj to *CustomNodeConfig: %v", new)
return
Expand All @@ -240,7 +241,7 @@ func (c *CustomNodeConfigController) updateCustomNodeConfigEventHandle(_, new in
c.enqueueCustomNodeConfig(newCNC)
}

func (c *CustomNodeConfigController) enqueueCustomNodeConfig(cnc *apisv1alpha1.CustomNodeConfig) {
func (c *CustomNodeConfigController) enqueueCustomNodeConfig(cnc *configapi.CustomNodeConfig) {
if cnc == nil {
klog.Warning("[cnc] trying to enqueue a nil cnc")
return
Expand Down Expand Up @@ -304,7 +305,8 @@ func (c *CustomNodeConfigController) syncCustomNodeConfig(key string) error {
return nil
}

func (c *CustomNodeConfigController) patchCNC(cnc *apisv1alpha1.CustomNodeConfig, setFunc func(*apisv1alpha1.CustomNodeConfig)) (*apisv1alpha1.CustomNodeConfig, error) {
func (c *CustomNodeConfigController) patchCNC(cnc *configapi.CustomNodeConfig,
setFunc func(*configapi.CustomNodeConfig)) (*configapi.CustomNodeConfig, error) {
cncCopy := cnc.DeepCopy()
setFunc(cncCopy)
if apiequality.Semantic.DeepEqual(cnc, cncCopy) {
Expand All @@ -314,7 +316,7 @@ func (c *CustomNodeConfigController) patchCNC(cnc *apisv1alpha1.CustomNodeConfig
return c.cncControl.PatchCNCStatus(c.ctx, cnc.Name, cnc, cncCopy)
}

func (c *CustomNodeConfigController) updateCustomNodeConfig(cnc *apisv1alpha1.CustomNodeConfig) {
func (c *CustomNodeConfigController) updateCustomNodeConfig(cnc *configapi.CustomNodeConfig) {
c.targetHandler.RangeGVRTargetAccessor(func(gvr metav1.GroupVersionResource, targetAccessor kcctarget.KatalystCustomConfigTargetAccessor) bool {
matchedTarget, err := util.FindMatchedKCCTargetConfigForNode(cnc, targetAccessor)
if err != nil {
Expand All @@ -341,16 +343,16 @@ func (c *CustomNodeConfigController) clearUnusedConfig() {
return true
})

needToDeleteFunc := func(gvr metav1.GroupVersionResource) bool {
if _, ok := configGVRSet[gvr.String()]; !ok {
needToDeleteFunc := func(config configapi.TargetConfig) bool {
if _, ok := configGVRSet[config.ConfigType.String()]; !ok {
return true
}
return false
}

// setFunc clears the cnc configs whose gvr config no longer exists
setFunc := func(cnc *apisv1alpha1.CustomNodeConfig) {
util.ClearUnNeededConfigForNode(cnc, needToDeleteFunc)
setFunc := func(cnc *configapi.CustomNodeConfig) {
cnc.Status.KatalystCustomConfigList = katalystutil.RemoveUnusedTargetConfig(cnc.Status.KatalystCustomConfigList, needToDeleteFunc)
}

clearCNCConfigs := func(i int) {
Expand Down
17 changes: 0 additions & 17 deletions pkg/controller/kcc/util/kcct.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,23 +144,6 @@ func FindMatchedKCCTargetConfigForNode(cnc *apisv1alpha1.CustomNodeConfig, targe
return kccTargetList[0], nil
}

// ClearUnNeededConfigForNode rebuilds the KatalystCustomConfigList held in the
// status of cnc, keeping only the entries for which needToDelete returns false
// when given the entry's ConfigType. A nil cnc is left untouched.
func ClearUnNeededConfigForNode(cnc *apisv1alpha1.CustomNodeConfig, needToDelete func(metav1.GroupVersionResource) bool) {
	if cnc == nil {
		return
	}

	current := cnc.Status.KatalystCustomConfigList

	// Collect survivors into a freshly allocated slice so the backing array of
	// the existing list is never written to.
	kept := make([]apisv1alpha1.TargetConfig, 0, len(current))
	for i := range current {
		if !needToDelete(current[i].ConfigType) {
			kept = append(kept, current[i])
		}
	}

	cnc.Status.KatalystCustomConfigList = kept
}

// filterAvailableKCCTargetConfigs returns those available configurations from kcc target list
func filterAvailableKCCTargetConfigs(kccTargetList []*unstructured.Unstructured) ([]*unstructured.Unstructured, error) {
availableKCCTargetList := make([]*unstructured.Unstructured, 0, len(kccTargetList))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/kubewharf/katalyst-core/pkg/controller/lifecycle/agent-healthz/helper"
"github.com/kubewharf/katalyst-core/pkg/metrics"
"github.com/kubewharf/katalyst-core/pkg/util"
"github.com/kubewharf/katalyst-core/pkg/util/native"
)

const AgentHandlerGeneric = "generic"
Expand Down Expand Up @@ -129,7 +130,7 @@ func (g *GenericAgentHandler) GetCNRTaintInfo(nodeName string) (*helper.CNRTaint
// getNodeReclaimedPods returns reclaimed pods contained in the given node,
// only those nodes with reclaimed pods should be triggered with eviction/taint logic for generic agents
func (g *GenericAgentHandler) getNodeReclaimedPods(node *corev1.Node) (names []string) {
pods, err := helper.GetPodsAssignedToNode(node.Name, g.podIndexer)
pods, err := native.GetPodsAssignedToNode(node.Name, g.podIndexer)
if err != nil {
utilruntime.HandleError(fmt.Errorf("unable to list pods from node %q: %v", node.Name, err))
return
Expand Down
14 changes: 2 additions & 12 deletions pkg/controller/lifecycle/agent-healthz/healthz_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,18 +123,8 @@ func NewHealthzController(ctx context.Context,

ec.podListerSynced = podInformer.Informer().HasSynced
podIndexer := podInformer.Informer().GetIndexer()
if err := podInformer.Informer().AddIndexers(cache.Indexers{
helper.NodeNameKeyIndex: func(obj interface{}) ([]string, error) {
pod, ok := obj.(*corev1.Pod)
if !ok {
return []string{}, nil
}
if len(pod.Spec.NodeName) == 0 {
return []string{}, nil
}
return []string{pod.Spec.NodeName}, nil
},
}); err != nil {

if err := native.AddNodeNameIndexerForPod(podInformer); err != nil {
return nil, err
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func (h *HealthzHelper) syncHeartBeatMap() {
currentNodes := sets.String{}
for _, node := range nodes {
baseTags := []metrics.MetricTag{{Key: metricsTagKeyNodeName, Val: node.Name}}
pods, err := GetPodsAssignedToNode(node.Name, h.podIndexer)
pods, err := native.GetPodsAssignedToNode(node.Name, h.podIndexer)
if err != nil {
utilruntime.HandleError(fmt.Errorf("unable to list pods from node %q: %v", node.Name, err))
continue
Expand Down
Loading

0 comments on commit cd34065

Please sign in to comment.