Skip to content

Commit

Permalink
refine metric-fetcher to support specified collecting intervals
Browse files Browse the repository at this point in the history
  • Loading branch information
waynepeking348 committed May 10, 2024
1 parent 6e4a3b0 commit 74b648c
Show file tree
Hide file tree
Showing 7 changed files with 316 additions and 92 deletions.
36 changes: 12 additions & 24 deletions cmd/katalyst-agent/app/options/metaserver/metaserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@ const (
defaultServiceProfileCacheTTL = 1 * time.Minute
)

const defaultMetricInsurancePeriod = 0 * time.Second

const (
defaultKubeletPodCacheSyncPeriod = 30 * time.Second
defaultKubeletPodCacheSyncMaxRate = 5
Expand All @@ -56,12 +54,11 @@ const defaultCustomNodeResourceCacheTTL = 15 * time.Second

const defaultCustomNodeConfigCacheTTL = 15 * time.Second

const defaultRodanServerPort = 9102

// MetaServerOptions holds all the configurations for metaserver.
// we will not try to separate this structure into several individual
// structures since it will not be used directly by other components; instead,
// we will only separate them with blanks in a single structure.
// todo separate this option-structure into individual structures
type MetaServerOptions struct {
// generic configurations for metaserver
CheckpointManagerDir string
Expand All @@ -78,11 +75,6 @@ type MetaServerOptions struct {
ServiceProfileSkipCorruptionError bool
ServiceProfileCacheTTL time.Duration

// configurations for metric-fetcher
MetricInsurancePeriod time.Duration
MetricProvisions []string
RodanServerPort int

// configurations for pod-cache
KubeletPodCacheSyncPeriod time.Duration
KubeletPodCacheSyncMaxRate int
Expand All @@ -94,6 +86,9 @@ type MetaServerOptions struct {

// configurations for cnc
CustomNodeConfigCacheTTL time.Duration

// configurations for metric-fetcher
*MetricFetcherOptions
}

func NewMetaServerOptions() *MetaServerOptions {
Expand All @@ -110,10 +105,6 @@ func NewMetaServerOptions() *MetaServerOptions {
ServiceProfileSkipCorruptionError: defaultServiceProfileSkipCorruptionError,
ServiceProfileCacheTTL: defaultServiceProfileCacheTTL,

MetricInsurancePeriod: defaultMetricInsurancePeriod,
MetricProvisions: []string{metaserver.MetricProvisionerMalachite, metaserver.MetricProvisionerKubelet},
RodanServerPort: defaultRodanServerPort,

KubeletPodCacheSyncPeriod: defaultKubeletPodCacheSyncPeriod,
KubeletPodCacheSyncMaxRate: defaultKubeletPodCacheSyncMaxRate,
KubeletPodCacheSyncBurstBulk: defaultKubeletPodCacheSyncBurstBulk,
Expand All @@ -122,6 +113,8 @@ func NewMetaServerOptions() *MetaServerOptions {
CNRCacheTTL: defaultCustomNodeResourceCacheTTL,

CustomNodeConfigCacheTTL: defaultCustomNodeConfigCacheTTL,

MetricFetcherOptions: NewMetricFetcherOptions(),
}
}

Expand Down Expand Up @@ -150,13 +143,6 @@ func (o *MetaServerOptions) AddFlags(fss *cliflag.NamedFlagSets) {
fs.DurationVar(&o.ServiceProfileCacheTTL, "service-profile-cache-ttl", o.ServiceProfileCacheTTL,
"The ttl of service profile manager cache remote spd")

fs.DurationVar(&o.MetricInsurancePeriod, "metric-insurance-period", o.MetricInsurancePeriod,
"The meta server return metric data and MetricDataExpired if the update time of metric data is earlier than this period.")
fs.StringSliceVar(&o.MetricProvisions, "metric-provisioners", o.MetricProvisions,
"The provisioners that should be enabled by default")
fs.IntVar(&o.RodanServerPort, "rodan-server-port", o.RodanServerPort,
"The rodan metric provisioner server port")

fs.DurationVar(&o.KubeletPodCacheSyncPeriod, "kubelet-pod-cache-sync-period", o.KubeletPodCacheSyncPeriod,
"The period of meta server to sync pod from kubelet 10255 port")
fs.IntVar(&o.KubeletPodCacheSyncMaxRate, "kubelet-pod-cache-sync-max-rate", o.KubeletPodCacheSyncMaxRate,
Expand All @@ -171,6 +157,8 @@ func (o *MetaServerOptions) AddFlags(fss *cliflag.NamedFlagSets) {

fs.DurationVar(&o.CustomNodeConfigCacheTTL, "custom-node-config-cache-ttl", o.CustomNodeConfigCacheTTL,
"The ttl of custom node config fetcher cache remote cnc")

o.MetricFetcherOptions.AddFlags(fss)
}

// ApplyTo fills up config with options
Expand All @@ -187,10 +175,6 @@ func (o *MetaServerOptions) ApplyTo(c *metaserver.MetaServerConfiguration) error
c.ServiceProfileSkipCorruptionError = o.ServiceProfileSkipCorruptionError
c.ServiceProfileCacheTTL = o.ServiceProfileCacheTTL

c.MetricInsurancePeriod = o.MetricInsurancePeriod
c.MetricProvisions = o.MetricProvisions
c.RodanServerPort = o.RodanServerPort

c.KubeletPodCacheSyncPeriod = o.KubeletPodCacheSyncPeriod
c.KubeletPodCacheSyncMaxRate = rate.Limit(o.KubeletPodCacheSyncMaxRate)
c.KubeletPodCacheSyncBurstBulk = o.KubeletPodCacheSyncBurstBulk
Expand All @@ -200,5 +184,9 @@ func (o *MetaServerOptions) ApplyTo(c *metaserver.MetaServerConfiguration) error

c.CustomNodeConfigCacheTTL = o.CustomNodeConfigCacheTTL

if err := o.MetricFetcherOptions.ApplyTo(c.MetricConfiguration); err != nil {
return err
}

return nil
}
102 changes: 102 additions & 0 deletions cmd/katalyst-agent/app/options/metaserver/metric.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
Copyright 2022 The Katalyst Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package metaserver

import (
"time"

cliflag "k8s.io/component-base/cli/flag"

"github.com/kubewharf/katalyst-core/pkg/config/agent/metaserver"
)

const defaultMetricInsurancePeriod = 0 * time.Second

const defaultRodanServerPort = 9102

type MetricFetcherOptions struct {
MetricInsurancePeriod time.Duration
MetricProvisions []string

DefaultInterval time.Duration
ProvisionerIntervalSecs map[string]int

*MalachiteOptions
*CgroupOptions
*KubeletOptions
*RodanOptions
}

type (
MalachiteOptions struct{}
CgroupOptions struct{}
KubeletOptions struct{}
RodanOptions struct {
ServerPort int
}
)

func NewMetricFetcherOptions() *MetricFetcherOptions {
return &MetricFetcherOptions{
MetricInsurancePeriod: defaultMetricInsurancePeriod,
MetricProvisions: []string{metaserver.MetricProvisionerMalachite, metaserver.MetricProvisionerKubelet},

DefaultInterval: time.Second * 5,
ProvisionerIntervalSecs: make(map[string]int),

MalachiteOptions: &MalachiteOptions{},
CgroupOptions: &CgroupOptions{},
KubeletOptions: &KubeletOptions{},
RodanOptions: &RodanOptions{
ServerPort: defaultRodanServerPort,
},
}
}

// AddFlags adds flags to the specified FlagSet.
func (o *MetricFetcherOptions) AddFlags(fss *cliflag.NamedFlagSets) {
fs := fss.FlagSet("metric-server")

fs.DurationVar(&o.MetricInsurancePeriod, "metric-insurance-period", o.MetricInsurancePeriod,
"The meta server return metric data and MetricDataExpired if the update time of metric data is earlier than this period.")
fs.StringSliceVar(&o.MetricProvisions, "metric-provisioners", o.MetricProvisions,
"The provisioners that should be enabled by default")

fs.DurationVar(&o.DefaultInterval, "metric-interval", o.DefaultInterval,
"The default metric provisioner collecting interval")
fs.StringToIntVar(&o.ProvisionerIntervalSecs, "metric-provisioner-intervals", o.ProvisionerIntervalSecs,
"The metric provisioner collecting intervals for each individual provisioner")

fs.IntVar(&o.RodanOptions.ServerPort, "rodan-server-port", o.RodanOptions.ServerPort,
"The rodan metric provisioner server port")
}

// ApplyTo fills up config with options
func (o *MetricFetcherOptions) ApplyTo(c *metaserver.MetricConfiguration) error {
c.MetricInsurancePeriod = o.MetricInsurancePeriod
c.MetricProvisions = o.MetricProvisions

c.DefaultInterval = o.DefaultInterval
c.ProvisionerIntervals = make(map[string]time.Duration)
for name, secs := range o.ProvisionerIntervalSecs {
c.ProvisionerIntervals[name] = time.Second * time.Duration(secs)
}

c.RodanServerPort = o.RodanOptions.ServerPort

return nil
}
19 changes: 18 additions & 1 deletion pkg/config/agent/metaserver/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,22 @@ const (
type MetricConfiguration struct {
MetricInsurancePeriod time.Duration
MetricProvisions []string

DefaultInterval time.Duration
ProvisionerIntervals map[string]time.Duration

*MalachiteMetricConfiguration
*CgroupMetricConfiguration
*KubeletMetricConfiguration
*RodanMetricConfiguration
}

type MalachiteMetricConfiguration struct{}

type CgroupMetricConfiguration struct{}

type KubeletMetricConfiguration struct{}

type RodanMetricConfiguration struct {
RodanServerPort int
}
Expand Down Expand Up @@ -71,7 +84,11 @@ type AgentConfiguration struct {
func NewAgentConfiguration() *AgentConfiguration {
return &AgentConfiguration{
MetricConfiguration: &MetricConfiguration{
RodanMetricConfiguration: &RodanMetricConfiguration{},
ProvisionerIntervals: make(map[string]time.Duration),
MalachiteMetricConfiguration: &MalachiteMetricConfiguration{},
CgroupMetricConfiguration: &CgroupMetricConfiguration{},
KubeletMetricConfiguration: &KubeletMetricConfiguration{},
RodanMetricConfiguration: &RodanMetricConfiguration{},
},
PodConfiguration: &PodConfiguration{},
NodeConfiguration: &NodeConfiguration{},
Expand Down
Loading

0 comments on commit 74b648c

Please sign in to comment.