Skip to content

Commit

Permalink
feat(qrm): support low priority group in network plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
csfldf committed Nov 5, 2024
1 parent d1dc9d9 commit c3efcce
Show file tree
Hide file tree
Showing 11 changed files with 222 additions and 18 deletions.
5 changes: 5 additions & 0 deletions pkg/agent/qrm-plugins/network/state/consts.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package state

const (
NetBandwidthImplicitAnnotationKey = "resource.katalyst.kubewharf.io/net_bandwidth_implicit"
)
31 changes: 30 additions & 1 deletion pkg/agent/qrm-plugins/network/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package state
import (
"encoding/json"
"fmt"
"strconv"

info "github.com/google/cadvisor/info/v1"
pluginapi "k8s.io/kubelet/pkg/apis/resourceplugin/v1alpha1"
Expand All @@ -39,8 +40,11 @@ type AllocationInfo struct {
PodType string `json:"pod_type,omitempty"`
Egress uint32 `json:"egress"`
Ingress uint32 `json:"ingress"`
IfName string `json:"if_name"` // we do not support cross-nic bandwidth
IfName string `json:"if_name"` // we do not support cross-nic bandwidth
NSName string `json:"ns_name"`
NumaNodes machine.CPUSet `json:"numa_node"` // associated numa nodes of the socket connecting to the selected NIC
QoSLevel string `json:"qosLevel"`
NetClassID string `json:"net_class_id"`

Labels map[string]string `json:"labels"`
Annotations map[string]string `json:"annotations"`
Expand Down Expand Up @@ -109,6 +113,8 @@ func (ai *AllocationInfo) Clone() *AllocationInfo {
Ingress: ai.Ingress,
IfName: ai.IfName,
NumaNodes: ai.NumaNodes.Clone(),
QoSLevel: ai.QoSLevel,
NetClassID: ai.NetClassID,
Labels: general.DeepCopyMap(ai.Labels),
Annotations: general.DeepCopyMap(ai.Annotations),
}
Expand All @@ -126,6 +132,29 @@ func (ai *AllocationInfo) CheckSideCar() bool {
return ai.ContainerType == pluginapi.ContainerType_SIDECAR.String()
}

func (ai *AllocationInfo) GetRequestedEgress() (uint32, error) {
if ai == nil {
return 0, fmt.Errorf("nil AllocationInfo")
}

if ai.Egress > 0 && ai.Annotations[NetBandwidthImplicitAnnotationKey] != "" {
return 0, fmt.Errorf("ambiguous ai.Egress: %d, %s: %s",
ai.Egress, NetBandwidthImplicitAnnotationKey, ai.Annotations[NetBandwidthImplicitAnnotationKey])
} else if ai.Egress > 0 {
return ai.Egress, nil
} else if ai.Annotations[NetBandwidthImplicitAnnotationKey] != "" {
ret, err := strconv.Atoi(ai.Annotations[NetBandwidthImplicitAnnotationKey])
if err != nil {
return 0, fmt.Errorf("parse %s: %s failed with error: %v",
NetBandwidthImplicitAnnotationKey, ai.Annotations[NetBandwidthImplicitAnnotationKey], err)
}

return uint32(ret), nil
}

return 0, nil
}

func (pe PodEntries) Clone() PodEntries {
clone := make(PodEntries)
for podUID, containerEntries := range pe {
Expand Down
132 changes: 116 additions & 16 deletions pkg/agent/qrm-plugins/network/staticpolicy/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
"github.com/kubewharf/katalyst-core/pkg/util/machine"
"github.com/kubewharf/katalyst-core/pkg/util/native"
"github.com/kubewharf/katalyst-core/pkg/util/qos"
qrmgeneral "github.com/kubewharf/katalyst-core/pkg/util/qrm"
)

const (
Expand Down Expand Up @@ -82,6 +83,7 @@ type StaticPolicy struct {
CgroupV2Env bool
qosLevelToNetClassMap map[string]uint32
applyNetClassFunc func(podUID, containerID string, data *common.NetClsData) error
applyNetworkGroupsFunc func(map[string]*qrmgeneral.NetworkGroup) error
podLevelNetClassAnnoKey string
podLevelNetAttributesAnnoKeys []string
ipv4ResourceAllocationAnnotationKey string
Expand All @@ -94,6 +96,8 @@ type StaticPolicy struct {
podAnnotationKeptKeys []string
podLabelKeptKeys []string

lowPriorityGroups map[string]*qrmgeneral.NetworkGroup

// aliveCgroupID is used to record the alive cgroupIDs and their last alive time
aliveCgroupID map[uint64]time.Time
}
Expand Down Expand Up @@ -159,6 +163,12 @@ func NewStaticPolicy(agentCtx *agent.GenericContext, conf *config.Configuration,

policyImplement.ApplyConfig(conf.StaticAgentConfiguration)

// [TODO] generate and apply
err = policyImplement.generateLowPriorityGroup()
if err != nil {
return false, agent.ComponentStub{}, fmt.Errorf("generateLowPriorityGroup failed with error: %v", err)
}

pluginWrapper, err := skeleton.NewRegistrationPluginWrapper(policyImplement, conf.QRMPluginSocketDirs,
func(key string, value int64) {
_ = wrappedEmitter.StoreInt64(key, value, metrics.MetricTypeNameRaw)
Expand Down Expand Up @@ -514,6 +524,14 @@ func (p *StaticPolicy) Allocate(_ context.Context,
return nil, err
}

netClassID, err := p.getNetClassID(podAnnotations, p.podLevelNetClassAnnoKey, qosLevel)
if err != nil {
err = fmt.Errorf("getNetClassID for pod: %s/%s, container: %s failed with error: %v",
req.PodNamespace, req.PodName, req.ContainerName, err)
general.Errorf("%s", err.Error())
return nil, err
}

reqInt, _, err := util.GetQuantityFromResourceReq(req)
if err != nil {
return nil, fmt.Errorf("getReqQuantityFromResourceReq failed with error: %v", err)
Expand All @@ -525,7 +543,8 @@ func (p *StaticPolicy) Allocate(_ context.Context,
"containerName", req.ContainerName,
"qosLevel", qosLevel,
"reqAnnotations", req.Annotations,
"netBandwidthReq(Mbps)", reqInt)
"netBandwidthReq(Mbps)", reqInt,
"netClassID", netClassID)

p.Lock()
defer func() {
Expand Down Expand Up @@ -570,7 +589,7 @@ func (p *StaticPolicy) Allocate(_ context.Context,
"bandwidthReq(Mbps)", reqInt,
"currentResult(Mbps)", allocationInfo.Egress)

resourceAllocationAnnotations, err := p.getResourceAllocationAnnotations(podAnnotations, allocationInfo)
resourceAllocationAnnotations, err := p.getResourceAllocationAnnotations(podAnnotations, allocationInfo, netClassID)
if err != nil {
err = fmt.Errorf("getResourceAllocationAnnotations for pod: %s/%s, container: %s failed with error: %v",
req.PodNamespace, req.PodName, req.ContainerName, err)
Expand Down Expand Up @@ -660,12 +679,23 @@ func (p *StaticPolicy) Allocate(_ context.Context,
Egress: uint32(reqInt),
Ingress: uint32(reqInt),
IfName: selectedNIC.Iface,
NSName: selectedNIC.NSName,
NumaNodes: siblingNUMAs,
Labels: general.DeepCopyMap(req.Labels),
Annotations: general.DeepCopyMap(req.Annotations),
QoSLevel: qosLevel,
NetClassID: fmt.Sprintf("%d", netClassID),
}

resourceAllocationAnnotations, err := p.getResourceAllocationAnnotations(podAnnotations, newAllocation)
err = applyImplicitReq(req, newAllocation)
if err != nil {
err = fmt.Errorf("p.applyImplicitReq for pod: %s/%s, container: %s failed with error: %v",
req.PodNamespace, req.PodName, req.ContainerName, err)
general.Errorf("%s", err.Error())
return nil, err
}

resourceAllocationAnnotations, err := p.getResourceAllocationAnnotations(podAnnotations, newAllocation, netClassID)
if err != nil {
err = fmt.Errorf("getResourceAllocationAnnotations for pod: %s/%s, container: %s failed with error: %v",
req.PodNamespace, req.PodName, req.ContainerName, err)
Expand All @@ -690,6 +720,8 @@ func (p *StaticPolicy) Allocate(_ context.Context,
// update state cache
p.state.SetMachineState(machineState)

p.generateAndApplyGroups()

return packAllocationResponse(req, newAllocation, respHint, resourceAllocationAnnotations)
}

Expand Down Expand Up @@ -730,7 +762,13 @@ func (p *StaticPolicy) applyNetClass() {
continue
}

classID, err := p.getNetClassID(pod.GetAnnotations(), p.podLevelNetClassAnnoKey)
qosLevel, err := p.qosConfig.GetQoSLevel(nil, pod.Annotations)
if err != nil {
general.Errorf("get qos level for pod: %s/%s failed with err", pod.Namespace, pod.Name)
continue
}

classID, err := p.getNetClassID(pod.GetAnnotations(), p.podLevelNetClassAnnoKey, qosLevel)
if err != nil {
general.Errorf("get net class id failed, pod: %s, err: %s", native.GenerateUniqObjectNameKey(pod), err)
continue
Expand Down Expand Up @@ -944,21 +982,20 @@ func (p *StaticPolicy) selectNICsByReq(req *pluginapi.ResourceRequest) ([]machin
return candidateNICs, nil
}

func (p *StaticPolicy) getResourceAllocationAnnotations(podAnnotations map[string]string, allocation *state.AllocationInfo) (map[string]string, error) {
netClsID, err := p.getNetClassID(podAnnotations, p.podLevelNetClassAnnoKey)
if err != nil {
return nil, fmt.Errorf("getNetClassID failed with error: %v", err)
}

func (p *StaticPolicy) getResourceAllocationAnnotations(podAnnotations map[string]string,
allocation *state.AllocationInfo, netClsID uint32) (map[string]string, error) {
selectedNIC := p.getNICByName(allocation.IfName)

resourceAllocationAnnotations := map[string]string{
p.ipv4ResourceAllocationAnnotationKey: strings.Join(selectedNIC.GetNICIPs(machine.IPVersionV4), IPsSeparator),
p.ipv6ResourceAllocationAnnotationKey: strings.Join(selectedNIC.GetNICIPs(machine.IPVersionV6), IPsSeparator),
p.netInterfaceNameResourceAllocationAnnotationKey: selectedNIC.Iface,
p.netClassIDResourceAllocationAnnotationKey: fmt.Sprintf("%d", netClsID),
}

if !isImplicitReq(podAnnotations) {
// TODO: support differentiated Egress/Ingress bandwidth later
p.netBandwidthResourceAllocationAnnotationKey: strconv.Itoa(int(allocation.Egress)),
resourceAllocationAnnotations[p.netBandwidthResourceAllocationAnnotationKey] = strconv.Itoa(int(allocation.Egress))
}

if len(selectedNIC.NSAbsolutePath) > 0 {
Expand Down Expand Up @@ -1002,10 +1039,12 @@ func (p *StaticPolicy) removePod(podUID string) error {
p.state.SetPodEntries(podEntries)
p.state.SetMachineState(machineState)

p.generateAndApplyGroups()

return nil
}

func (p *StaticPolicy) getNetClassID(podAnnotations map[string]string, podLevelNetClassAnnoKey string) (uint32, error) {
func (p *StaticPolicy) getNetClassID(podAnnotations map[string]string, podLevelNetClassAnnoKey, qosLevel string) (uint32, error) {
isPodLevelNetClassExist, classID, err := qos.GetPodNetClassID(podAnnotations, podLevelNetClassAnnoKey)
if err != nil {
return 0, err
Expand All @@ -1014,10 +1053,6 @@ func (p *StaticPolicy) getNetClassID(podAnnotations map[string]string, podLevelN
return classID, nil
}

qosLevel, err := p.qosConfig.GetQoSLevel(nil, podAnnotations)
if err != nil {
return 0, err
}
return p.getNetClassIDByQoSLevel(qosLevel)
}

Expand Down Expand Up @@ -1092,3 +1127,68 @@ func (p *StaticPolicy) clearResidualNetClass(activeNetClsData map[uint64]*common
}
}
}

func (p *StaticPolicy) generateLowPriorityGroup() error {

lowPriorityGroups := make(map[string]*qrmgeneral.NetworkGroup)
machineState := p.state.GetMachineState()

for nicName, nicState := range machineState {
groupName := getGroupName(nicName, LowPriorityGroupNameSuffix)
// [TODO] since getNICByName has alreday been used,
// we also assume nic name is unique here.
// But if the assumption is broken, we should reconsider logic here.
lowPriorityGroups[groupName] = &qrmgeneral.NetworkGroup{
Egress: nicState.EgressState.Allocatable,
}

negtive := false
for podUID, containerEntries := range nicState.PodEntries {
for containerName, allocationInfo := range containerEntries {
if allocationInfo.QoSLevel == apiconsts.PodAnnotationQoSLevelReclaimedCores {
if allocationInfo.NetClassID != "" {
lowPriorityGroups[groupName].NetClassIDs = append(lowPriorityGroups[groupName].NetClassIDs, allocationInfo.NetClassID)
}

requestedEgress, err := allocationInfo.GetRequestedEgress()

if err != nil {
return fmt.Errorf("GetRequestedEgress for pod: %s, container: %s failed with error: %v", podUID, containerName, err)
}

if !negtive && lowPriorityGroups[groupName].Egress > requestedEgress {
lowPriorityGroups[groupName].Egress -= requestedEgress
} else {
negtive = true
}
}
}
}

// [TODO] make 0.05 as option
if negtive {
lowPriorityGroups[groupName].Egress = uint32(float64(nicState.EgressState.Allocatable) * 0.05)
} else {
lowPriorityGroups[groupName].Egress = general.MaxUInt32(uint32(float64(nicState.EgressState.Allocatable)*0.05), lowPriorityGroups[groupName].Egress)
}
}

general.Infof("old lowPriorityGroups: %+v, new lowPriorityGroups: %+v", p.lowPriorityGroups, lowPriorityGroups)

p.lowPriorityGroups = lowPriorityGroups
return nil
}

func (p *StaticPolicy) generateAndApplyGroups() {
err := p.generateLowPriorityGroup()

if err != nil {
general.ErrorS(err, "generateLowPriorityGroup failed")
} else {
err = p.applyNetworkGroupsFunc(p.lowPriorityGroups)

if err != nil {
general.ErrorS(err, "applyGroups failed")
}
}
}
1 change: 1 addition & 0 deletions pkg/agent/qrm-plugins/network/staticpolicy/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package staticpolicy
28 changes: 28 additions & 0 deletions pkg/agent/qrm-plugins/network/staticpolicy/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,17 @@ package staticpolicy

import (
"fmt"
"math"
"math/rand"
"time"

pluginapi "k8s.io/kubelet/pkg/apis/resourceplugin/v1alpha1"

"github.com/kubewharf/katalyst-api/pkg/consts"
apiconsts "github.com/kubewharf/katalyst-api/pkg/consts"
"github.com/kubewharf/katalyst-core/cmd/katalyst-agent/app/agent"
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/network/state"
qrmutil "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/util"
"github.com/kubewharf/katalyst-core/pkg/util/general"
"github.com/kubewharf/katalyst-core/pkg/util/machine"
)
Expand All @@ -42,6 +45,8 @@ const (
RandomOne NICSelectionPoligy = "random"
FirstOne NICSelectionPoligy = "first"
LastOne NICSelectionPoligy = "last"

LowPriorityGroupNameSuffix = "low_priority"
)

type NICFilter func(nics []machine.InterfaceInfo, req *pluginapi.ResourceRequest, agentCtx *agent.GenericContext) []machine.InterfaceInfo
Expand Down Expand Up @@ -325,3 +330,26 @@ func getResourceIdentifier(ifaceNS, ifaceName string) string {

return ifaceName
}

func applyImplicitReq(req *pluginapi.ResourceRequest, allocationInfo *state.AllocationInfo) error {
if req == nil || allocationInfo == nil {
return fmt.Errorf("nil req or allocationInfo")
}

if !isImplicitReq(req.Annotations) {
return nil
}

allocationInfo.Annotations[state.NetBandwidthImplicitAnnotationKey] =
fmt.Sprintf("%s",
general.Max(int(math.Ceil(req.ResourceRequests[string(apiconsts.ResourceNetBandwidth)])), 0))
return nil
}

func isImplicitReq(annotations map[string]string) bool {
return annotations[qrmutil.PodAnnotationQRMDeclarationKey] == qrmutil.PodAnnotationQRMDeclarationTrue
}

func getGroupName(nicName, groupSuffix string) string {
return fmt.Sprintf("%s_%s", nicName, groupSuffix)
}
6 changes: 6 additions & 0 deletions pkg/agent/qrm-plugins/util/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ const (
OCIPropertyNameMemoryLimitInBytes = "MemoryLimitInBytes"
)

const (
//[TODO]: move them to apiserver
PodAnnotationQRMDeclarationKey = "katalyst.kubewharf.io/qrm-declaration"
PodAnnotationQRMDeclarationTrue = "true"
)

const QRMTimeFormat = "2006-01-02 15:04:05.999999999 -0700 MST"

const QRMPluginPolicyTagName = "policy"
9 changes: 8 additions & 1 deletion pkg/agent/qrm-plugins/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,14 @@ func GetQuantityFromResourceReq(req *pluginapi.ResourceRequest) (int, float64, e
return general.Max(int(math.Ceil(req.ResourceRequests[key])), 0), req.ResourceRequests[key], nil
case string(apiconsts.ReclaimedResourceMilliCPU):
return general.Max(int(math.Ceil(req.ResourceRequests[key]/1000.0)), 0), req.ResourceRequests[key] / 1000.0, nil
case string(v1.ResourceMemory), string(apiconsts.ReclaimedResourceMemory), string(apiconsts.ResourceNetBandwidth):
case string(v1.ResourceMemory), string(apiconsts.ReclaimedResourceMemory):
return general.Max(int(math.Ceil(req.ResourceRequests[key])), 0), req.ResourceRequests[key], nil
case string(apiconsts.ResourceNetBandwidth):
if req.Annotations[PodAnnotationQRMDeclarationKey] != PodAnnotationQRMDeclarationTrue {
general.Infof("detect %s: %s, return %s: 0 instead of %s: %.2f",
PodAnnotationQRMDeclarationKey, PodAnnotationQRMDeclarationTrue, key, key, req.ResourceRequests[key])
return 0, 0, nil
}
return general.Max(int(math.Ceil(req.ResourceRequests[key])), 0), req.ResourceRequests[key], nil
default:
return 0, 0, fmt.Errorf("invalid request resource name: %s", key)
Expand Down
Loading

0 comments on commit c3efcce

Please sign in to comment.