Skip to content

Commit

Permalink
spd support extended indicator sdk and extended baseline
Browse files Browse the repository at this point in the history
  • Loading branch information
luomingmeng committed Mar 7, 2024
1 parent fbb326a commit 77b958e
Show file tree
Hide file tree
Showing 12 changed files with 340 additions and 121 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ require (
)

replace (
github.com/kubewharf/katalyst-api => github.com/luomingmeng/katalyst-api v0.0.0-20240307163059-4bef1b20e09d
k8s.io/api => k8s.io/api v0.24.6
k8s.io/apiextensions-apiserver => k8s.io/apiextensions-apiserver v0.24.6
k8s.io/apimachinery => k8s.io/apimachinery v0.24.6
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -548,8 +548,6 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kubewharf/katalyst-api v0.4.1-0.20240222122824-be538f641f58 h1:D9dCR5EIR0k0Qil2A5biZjrubagRkEr7fyov6fb2ApY=
github.com/kubewharf/katalyst-api v0.4.1-0.20240222122824-be538f641f58/go.mod h1:Y2IeIorxQamF2a3oa0+URztl5QCSty6Jj3zD83R8J9k=
github.com/kubewharf/kubelet v1.24.6-kubewharf.8 h1:2e89T/nZTgzaVhyRsZuwEdRk8V8kJXs4PRkgfeG4Ai4=
github.com/kubewharf/kubelet v1.24.6-kubewharf.8/go.mod h1:MxbSZUx3wXztFneeelwWWlX7NAAStJ6expqq7gY2J3c=
github.com/kyoh86/exportloopref v0.1.7/go.mod h1:h1rDl2Kdj97+Kwh4gdz3ujE7XHmH51Q0lUiZ1z4NLj8=
Expand All @@ -561,6 +559,8 @@ github.com/lightstep/lightstep-tracer-go v0.18.1/go.mod h1:jlF1pusYV4pidLvZ+XD0U
github.com/lithammer/dedent v1.1.0/go.mod h1:jrXYCQtgg0nJiN+StA2KgR7w6CiQNv9Fd/Z9BP0jIOc=
github.com/logrusorgru/aurora v0.0.0-20181002194514-a7b3b318ed4e/go.mod h1:7rIyQOR62GCctdiQpZ/zOJlFyk6y+94wXzv6RNZgaR4=
github.com/lpabon/godbc v0.1.1/go.mod h1:Jo9QV0cf3U6jZABgiJ2skINAXb9j8m51r07g4KI92ZA=
github.com/luomingmeng/katalyst-api v0.0.0-20240307163059-4bef1b20e09d h1:rIc4hfUGC9I/rTVSaAOlG9bEeOyMfcLeH97eCa5ArxY=
github.com/luomingmeng/katalyst-api v0.0.0-20240307163059-4bef1b20e09d/go.mod h1:Y2IeIorxQamF2a3oa0+URztl5QCSty6Jj3zD83R8J9k=
github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ=
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
Expand Down
22 changes: 21 additions & 1 deletion pkg/controller/spd/indicator-plugin/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ const (
// IndicatorUpdater is used by IndicatorPlugin as a unified implementation
// to trigger indicator updating logic.
type IndicatorUpdater interface {
// UpdateBusinessIndicatorSpec + UpdateSystemIndicatorSpec + UpdateBusinessIndicatorStatus
// UpdateExtendedIndicatorSpec + UpdateBusinessIndicatorSpec + UpdateSystemIndicatorSpec + UpdateBusinessIndicatorStatus
// for indicator add functions, IndicatorUpdater will try to merge them in local stores.
UpdateExtendedIndicatorSpec(_ types.NamespacedName, _ []apiworkload.ServiceExtendedIndicatorSpec)
UpdateBusinessIndicatorSpec(_ types.NamespacedName, _ []apiworkload.ServiceBusinessIndicatorSpec)
UpdateSystemIndicatorSpec(_ types.NamespacedName, _ []apiworkload.ServiceSystemIndicatorSpec)
UpdateBusinessIndicatorStatus(_ types.NamespacedName, _ []apiworkload.ServiceBusinessIndicatorStatus)
Expand Down Expand Up @@ -78,6 +79,24 @@ func NewIndicatorManager() *IndicatorManager {
}
}

func (u *IndicatorManager) UpdateExtendedIndicatorSpec(nn types.NamespacedName, indicators []apiworkload.ServiceExtendedIndicatorSpec) {
u.specMtx.Lock()

insert := false
if _, ok := u.specMap[nn]; !ok {
insert = true
u.specMap[nn] = initServiceProfileDescriptorSpec()
}
for _, indicator := range indicators {
util.InsertSPDExtendedIndicatorSpec(u.specMap[nn], &indicator)
}
u.specMtx.Unlock()

if insert {
u.specQueue <- nn
}
}

func (u *IndicatorManager) UpdateBusinessIndicatorSpec(nn types.NamespacedName, indicators []apiworkload.ServiceBusinessIndicatorSpec) {
u.specMtx.Lock()

Expand Down Expand Up @@ -173,6 +192,7 @@ func (u *IndicatorManager) GetIndicatorStatus(nn types.NamespacedName) *apiworkl

func initServiceProfileDescriptorSpec() *apiworkload.ServiceProfileDescriptorSpec {
return &apiworkload.ServiceProfileDescriptorSpec{
ExtendedIndicator: []apiworkload.ServiceExtendedIndicatorSpec{},
BusinessIndicator: []apiworkload.ServiceBusinessIndicatorSpec{},
SystemIndicator: []apiworkload.ServiceSystemIndicatorSpec{},
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/controller/spd/indicator-plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,15 @@ type IndicatorPlugin interface {
// is not supported by any indicator plugin, the controller will clear in CR.
GetSupportedBusinessIndicatorSpec() []apiworkload.ServiceBusinessIndicatorName
GetSupportedSystemIndicatorSpec() []apiworkload.ServiceSystemIndicatorName
GetSupportedExtendedIndicatorSpec() []string
GetSupportedBusinessIndicatorStatus() []apiworkload.ServiceBusinessIndicatorName
}

type DummyIndicatorPlugin struct {
SystemSpecNames []apiworkload.ServiceSystemIndicatorName
BusinessSpecNames []apiworkload.ServiceBusinessIndicatorName
BusinessStatusNames []apiworkload.ServiceBusinessIndicatorName
ExtendedSpecNames []string
}

var _ IndicatorPlugin = DummyIndicatorPlugin{}
Expand All @@ -60,6 +62,9 @@ func (d DummyIndicatorPlugin) GetSupportedBusinessIndicatorSpec() []apiworkload.
func (d DummyIndicatorPlugin) GetSupportedSystemIndicatorSpec() []apiworkload.ServiceSystemIndicatorName {
return d.SystemSpecNames
}
func (d DummyIndicatorPlugin) GetSupportedExtendedIndicatorSpec() []string {
return d.ExtendedSpecNames
}
func (d DummyIndicatorPlugin) GetSupportedBusinessIndicatorStatus() []apiworkload.ServiceBusinessIndicatorName {
return d.BusinessStatusNames
}
Expand Down
11 changes: 10 additions & 1 deletion pkg/controller/spd/spd.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ type SPDController struct {
indicatorManager *indicator_plugin.IndicatorManager
indicatorPlugins map[string]indicator_plugin.IndicatorPlugin
indicatorsSpecBusiness map[apiworkload.ServiceBusinessIndicatorName]interface{}
indicatorsSpecExtended map[string]interface{}
indicatorsSpecSystem map[apiworkload.ServiceSystemIndicatorName]interface{}
indicatorsStatusBusiness map[apiworkload.ServiceBusinessIndicatorName]interface{}
}
Expand Down Expand Up @@ -247,6 +248,7 @@ func (sc *SPDController) initializeIndicatorPlugins(controlCtx *katalystbase.Gen
sc.indicatorPlugins = make(map[string]indicator_plugin.IndicatorPlugin)
sc.indicatorsSpecBusiness = make(map[apiworkload.ServiceBusinessIndicatorName]interface{})
sc.indicatorsSpecSystem = make(map[apiworkload.ServiceSystemIndicatorName]interface{})
sc.indicatorsSpecExtended = make(map[string]interface{})
sc.indicatorsStatusBusiness = make(map[apiworkload.ServiceBusinessIndicatorName]interface{})

initializers := indicator_plugin.GetPluginInitializers()
Expand All @@ -266,6 +268,9 @@ func (sc *SPDController) initializeIndicatorPlugins(controlCtx *katalystbase.Gen
for _, name := range plugin.GetSupportedSystemIndicatorSpec() {
sc.indicatorsSpecSystem[name] = struct{}{}
}
for _, name := range plugin.GetSupportedExtendedIndicatorSpec() {
sc.indicatorsSpecExtended[name] = struct{}{}
}
for _, name := range plugin.GetSupportedBusinessIndicatorStatus() {
sc.indicatorsStatusBusiness[name] = struct{}{}
}
Expand Down Expand Up @@ -596,7 +601,11 @@ func (sc *SPDController) getOrCreateSPDForWorkload(workload *unstructured.Unstru
AggMetrics: []apiworkload.AggPodMetrics{},
},
}
sc.updateBaselineSentinel(spd)

err := sc.updateBaselineSentinel(spd)
if err != nil {
return nil, err
}

return sc.spdControl.CreateSPD(sc.ctx, spd, metav1.CreateOptions{})
}
Expand Down
64 changes: 49 additions & 15 deletions pkg/controller/spd/spd_baseline.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,46 +37,80 @@ func (sc *SPDController) updateBaselineSentinel(spd *v1alpha1.ServiceProfileDesc
return nil
}

if spd.Spec.BaselinePercent == nil || *spd.Spec.BaselinePercent >= consts.SPDBaselinePercentMax || *spd.Spec.BaselinePercent <= consts.SPDBaselinePercentMin {
// delete baseline sentinel annotation if baseline percent or extended indicator not set
if spd.Spec.BaselinePercent == nil && len(spd.Spec.ExtendedIndicator) == 0 {
util.SetSPDBaselineSentinel(spd, nil)
util.SetSPDExtendedBaselineSentinel(spd, nil)
return nil
}

podMeta, err := sc.calculateBaselineSentinel(spd)
podMetaList, err := sc.getSPDPodMetaList(spd)
if err != nil {
return err
}
util.SetSPDBaselineSentinel(spd, &podMeta)

// calculate baseline sentinel
baselineSentinel := calculateBaselineSentinel(podMetaList, spd.Spec.BaselinePercent)

// calculate extended baseline sentinel for each extended indicator
extendedBaselineSentinel := make(map[string]util.SPDBaselinePodMeta)
for _, indicator := range spd.Spec.ExtendedIndicator {
sentinel := calculateBaselineSentinel(podMetaList, indicator.BaselinePercent)
if sentinel == nil {
continue
}

extendedBaselineSentinel[indicator.Name] = *sentinel
}

util.SetSPDBaselineSentinel(spd, baselineSentinel)
util.SetSPDExtendedBaselineSentinel(spd, extendedBaselineSentinel)
return nil
}

// calculateBaselineSentinel returns the sentinel one for a list of pods
// referenced by the SPD. If one pod's createTime is less than the sentinel pod
func (sc *SPDController) calculateBaselineSentinel(spd *v1alpha1.ServiceProfileDescriptor) (util.SPDBaselinePodMeta, error) {
// getSPDPodMetaList get spd pod meta list in order
func (sc *SPDController) getSPDPodMetaList(spd *v1alpha1.ServiceProfileDescriptor) ([]util.SPDBaselinePodMeta, error) {
gvr, _ := meta.UnsafeGuessKindToResource(schema.FromAPIVersionAndKind(spd.Spec.TargetRef.APIVersion, spd.Spec.TargetRef.Kind))
workloadLister, ok := sc.workloadLister[gvr]
if !ok {
return util.SPDBaselinePodMeta{}, fmt.Errorf("without workload lister for gvr %v", gvr)
return nil, fmt.Errorf("without workload lister for gvr %v", gvr)
}

podList, err := util.GetPodListForSPD(spd, sc.podIndexer, sc.conf.SPDPodLabelIndexerKeys, workloadLister, sc.podLister)
if err != nil {
return util.SPDBaselinePodMeta{}, err
return nil, err
}

podList = native.FilterPods(podList, func(pod *v1.Pod) (bool, error) {
return native.PodIsActive(pod), nil
})
if len(podList) == 0 {
return util.SPDBaselinePodMeta{}, nil
return nil, nil
}

bcList := make([]util.SPDBaselinePodMeta, 0, len(podList))
podMetaList := make([]util.SPDBaselinePodMeta, 0, len(podList))
for _, p := range podList {
bcList = append(bcList, util.GetPodMeta(p))
podMetaList = append(podMetaList, util.GetPodMeta(p))
}
sort.SliceStable(bcList, func(i, j int) bool {
return bcList[i].Cmp(bcList[j]) < 0
sort.SliceStable(podMetaList, func(i, j int) bool {
return podMetaList[i].Cmp(podMetaList[j]) < 0
})
baselineIndex := int(math.Floor(float64(len(bcList)-1) * float64(*spd.Spec.BaselinePercent) / 100))
return bcList[baselineIndex], nil

return podMetaList, nil
}

// calculateBaselineSentinel returns the sentinel one for a list of pods
// referenced by the SPD. If one pod's createTime is less than the sentinel pod
func calculateBaselineSentinel(podMetaList []util.SPDBaselinePodMeta, baselinePercent *int32) *util.SPDBaselinePodMeta {
if baselinePercent == nil || *baselinePercent >= consts.SPDBaselinePercentMax ||
*baselinePercent <= consts.SPDBaselinePercentMin {
return nil
}

if len(podMetaList) == 0 {
return nil
}

baselineIndex := int(math.Floor(float64(len(podMetaList)-1) * float64(*baselinePercent) / 100))
return &podMetaList[baselineIndex]
}
3 changes: 0 additions & 3 deletions pkg/controller/spd/spd_baseline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,9 +177,6 @@ func TestSPDController_updateBaselinePercentile(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "spd1",
Annotations: map[string]string{
consts.SPDAnnotationBaselineSentinelKey: util.SPDBaselinePodMeta{}.String(),
},
},
Spec: apiworkload.ServiceProfileDescriptorSpec{
TargetRef: apis.CrossVersionObjectReference{
Expand Down
10 changes: 10 additions & 0 deletions pkg/controller/spd/spd_indicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,9 @@ func (sc *SPDController) mergeIndicatorSpec(spd *apiworkload.ServiceProfileDescr
for _, indicator := range expected.SystemIndicator {
util.InsertSPDSystemIndicatorSpec(&spd.Spec, &indicator)
}
for _, indicator := range expected.ExtendedIndicator {
util.InsertSPDExtendedIndicatorSpec(&spd.Spec, &indicator)
}

for i := 0; i < len(spd.Spec.BusinessIndicator); i++ {
if _, ok := sc.indicatorsSpecBusiness[spd.Spec.BusinessIndicator[i].Name]; !ok {
Expand All @@ -200,6 +203,13 @@ func (sc *SPDController) mergeIndicatorSpec(spd *apiworkload.ServiceProfileDescr
spd.Spec.SystemIndicator = append(spd.Spec.SystemIndicator[:i], spd.Spec.SystemIndicator[i+1:]...)
}
}

for i := 0; i < len(spd.Spec.ExtendedIndicator); i++ {
if _, ok := sc.indicatorsSpecExtended[spd.Spec.ExtendedIndicator[i].Name]; !ok {
klog.Infof("skip spec extended %v for spd %v", spd.Spec.ExtendedIndicator[i].Name, spd.Name)
spd.Spec.ExtendedIndicator = append(spd.Spec.ExtendedIndicator[:i], spd.Spec.ExtendedIndicator[i+1:]...)
}
}
}

func (sc *SPDController) mergeIndicatorStatus(spd *apiworkload.ServiceProfileDescriptor, expected apiworkload.ServiceProfileDescriptorStatus) {
Expand Down
66 changes: 61 additions & 5 deletions pkg/metaserver/spd/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,15 @@ package spd
import (
"context"
"fmt"
"reflect"
"strings"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"

workloadapis "github.com/kubewharf/katalyst-api/pkg/apis/workload/v1alpha1"
"github.com/kubewharf/katalyst-core/pkg/util"
)

Expand Down Expand Up @@ -57,6 +61,9 @@ type ServiceProfilingManager interface {
// ServiceBaseline returns whether this pod is baseline
ServiceBaseline(ctx context.Context, pod *v1.Pod) (bool, error)

// ServiceExtendedIndicator load the extended indicators and return whether the pod is baseline for the extended indicators
ServiceExtendedIndicator(ctx context.Context, pod *v1.Pod, indicators interface{}) (bool, error)

// Run starts the service profiling manager
Run(ctx context.Context)
}
Expand All @@ -70,7 +77,11 @@ type DummyServiceProfilingManager struct {
podProfiles map[types.UID]DummyPodServiceProfile
}

func (d *DummyServiceProfilingManager) ServiceBaseline(ctx context.Context, pod *v1.Pod) (bool, error) {
func (d *DummyServiceProfilingManager) ServiceExtendedIndicator(_ context.Context, _ *v1.Pod, _ interface{}) (bool, error) {
return false, nil
}

func (d *DummyServiceProfilingManager) ServiceBaseline(_ context.Context, _ *v1.Pod) (bool, error) {
return false, nil
}

Expand Down Expand Up @@ -106,6 +117,50 @@ type serviceProfilingManager struct {
fetcher SPDFetcher
}

func (m *serviceProfilingManager) ServiceExtendedIndicator(ctx context.Context, pod *v1.Pod, indicators interface{}) (bool, error) {
spd, err := m.fetcher.GetSPD(ctx, pod)
if err != nil {
return false, err
}

extendedBaselineSentinel, err := util.GetSPDExtendedBaselineSentinel(spd)
if err != nil {
return false, err
}

name, o, err := util.GetExtendedIndicator(indicators)
if err != nil {
return false, err
}

for _, indicator := range spd.Spec.ExtendedIndicator {
if indicator.Name != name {
continue
}

object := indicator.Indicators.Object
if object == nil {
return false, fmt.Errorf("%s inidators object is nil", name)
}

t := reflect.TypeOf(indicators)
if t.Kind() != reflect.Ptr {
return false, fmt.Errorf("indicators must be pointers to structs")
}

v := reflect.ValueOf(object)
if !v.CanConvert(t) {
return false, fmt.Errorf("%s indicators object cannot convert to %v", name, t.Name())
}

reflect.ValueOf(indicators).Elem().Set(v.Convert(t).Elem())
return util.IsExtendedBaselinePod(pod, indicator.BaselinePercent, extendedBaselineSentinel, name)
}

return false, errors.NewNotFound(schema.GroupResource{Group: workloadapis.GroupName,
Resource: strings.ToLower(o.GetObjectKind().GroupVersionKind().Kind)}, name)
}

func (m *serviceProfilingManager) ServiceBaseline(ctx context.Context, pod *v1.Pod) (bool, error) {
spd, err := m.fetcher.GetSPD(ctx, pod)
if err != nil && !errors.IsNotFound(err) {
Expand All @@ -114,16 +169,17 @@ func (m *serviceProfilingManager) ServiceBaseline(ctx context.Context, pod *v1.P
return false, nil
}

baselinePod, enable, err := util.IsBaselinePod(pod, spd)
baselineSentinel, err := util.GetSPDBaselineSentinel(spd)
if err != nil {
return false, err
}

if enable {
return baselinePod, nil
isBaseline, err := util.IsBaselinePod(pod, spd.Spec.BaselinePercent, baselineSentinel)
if err != nil {
return false, err
}

return false, nil
return isBaseline, nil
}

func NewServiceProfilingManager(fetcher SPDFetcher) ServiceProfilingManager {
Expand Down
Loading

0 comments on commit 77b958e

Please sign in to comment.