Skip to content

Commit

Permalink
Merge pull request #697 from xu282934741/qrm-state-optimize
Browse files Browse the repository at this point in the history
refactor(qrm): refactor the state module for enhanced extensibility
  • Loading branch information
xu282934741 authored Sep 24, 2024
2 parents d1dc9d9 + 8cbe21e commit 7104feb
Show file tree
Hide file tree
Showing 13 changed files with 98 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func makeState(topo *machine.CPUTopology) (qrmstate.State, error) {
if err != nil {
return nil, fmt.Errorf("make tmp dir for checkpoint failed with error: %v", err)
}
return qrmstate.NewCheckpointState(tmpDir, "test", "test", topo, false)
return qrmstate.NewCheckpointState(tmpDir, "test", "test", topo, false, qrmstate.GenerateMachineStateFromPodEntries)
}

func TestNewCPUPressureEviction(t *testing.T) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func makeState(topo *machine.CPUTopology) (qrmstate.State, error) {
if err != nil {
return nil, fmt.Errorf("make tmp dir for checkpoint failed with error: %v", err)
}
return qrmstate.NewCheckpointState(tmpDir, "test", "test", topo, false)
return qrmstate.NewCheckpointState(tmpDir, "test", "test", topo, false, qrmstate.GenerateMachineStateFromPodEntries)
}

func TestNewCPUPressureLoadEviction(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func NewDynamicPolicy(agentCtx *agent.GenericContext, conf *config.Configuration
}

stateImpl, stateErr := state.NewCheckpointState(conf.GenericQRMPluginConfiguration.StateFileDirectory, cpuPluginStateFileName,
cpuconsts.CPUResourcePluginPolicyNameDynamic, agentCtx.CPUTopology, conf.SkipCPUStateCorruption)
cpuconsts.CPUResourcePluginPolicyNameDynamic, agentCtx.CPUTopology, conf.SkipCPUStateCorruption, state.GenerateMachineStateFromPodEntries)
if stateErr != nil {
return false, agent.ComponentStub{}, fmt.Errorf("NewCheckpointState failed with error: %v", stateErr)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/qrm-plugins/cpu/dynamicpolicy/policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func getTestDynamicPolicyWithInitialization(topology *machine.CPUTopology, state
}

func getTestDynamicPolicyWithoutInitialization(topology *machine.CPUTopology, stateFileDirectory string) (*DynamicPolicy, error) {
stateImpl, err := state.NewCheckpointState(stateFileDirectory, cpuPluginStateFileName, cpuconsts.CPUResourcePluginPolicyNameDynamic, topology, false)
stateImpl, err := state.NewCheckpointState(stateFileDirectory, cpuPluginStateFileName, cpuconsts.CPUResourcePluginPolicyNameDynamic, topology, false, state.GenerateMachineStateFromPodEntries)
if err != nil {
return nil, err
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/agent/qrm-plugins/cpu/dynamicpolicy/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -749,3 +749,5 @@ type State interface {
type ReadonlyState interface {
reader
}

// GenerateMachineStateFromPodEntriesFunc builds a NUMANodeMap from the given CPU
// topology and the existing pod entries. Implementations are policy-specific
// (e.g. dynamic vs. native policy decide differently which allocations count as
// "allocated" on a NUMA node), which is why the state checkpoint takes this as
// an injected dependency instead of calling a fixed helper.
type GenerateMachineStateFromPodEntriesFunc func(topology *machine.CPUTopology, podEntries PodEntries) (NUMANodeMap, error)
22 changes: 12 additions & 10 deletions pkg/agent/qrm-plugins/cpu/dynamicpolicy/state/state_checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,35 +40,37 @@ type stateCheckpoint struct {
checkpointName string
// when we add new properties to checkpoint,
// it will cause checkpoint corruption, and we should skip it
skipStateCorruption bool
skipStateCorruption bool
GenerateMachineStateFromPodEntries GenerateMachineStateFromPodEntriesFunc
}

var _ State = &stateCheckpoint{}

func NewCheckpointState(stateDir, checkpointName, policyName string,
topology *machine.CPUTopology, skipStateCorruption bool,
topology *machine.CPUTopology, skipStateCorruption bool, generateMachineStateFunc GenerateMachineStateFromPodEntriesFunc,
) (State, error) {
checkpointManager, err := checkpointmanager.NewCheckpointManager(stateDir)
if err != nil {
return nil, fmt.Errorf("failed to initialize checkpoint manager: %v", err)
}

sc := &stateCheckpoint{
cache: NewCPUPluginState(topology),
policyName: policyName,
checkpointManager: checkpointManager,
checkpointName: checkpointName,
skipStateCorruption: skipStateCorruption,
cache: NewCPUPluginState(topology),
policyName: policyName,
checkpointManager: checkpointManager,
checkpointName: checkpointName,
skipStateCorruption: skipStateCorruption,
GenerateMachineStateFromPodEntries: generateMachineStateFunc,
}

if err := sc.restoreState(topology); err != nil {
if err := sc.RestoreState(topology); err != nil {
return nil, fmt.Errorf("could not restore state from checkpoint: %v, please drain this node and delete "+
"the cpu plugin checkpoint file %q before restarting Kubelet", err, path.Join(stateDir, checkpointName))
}
return sc, nil
}

func (sc *stateCheckpoint) restoreState(topology *machine.CPUTopology) error {
func (sc *stateCheckpoint) RestoreState(topology *machine.CPUTopology) error {
sc.Lock()
defer sc.Unlock()
var err error
Expand All @@ -94,7 +96,7 @@ func (sc *stateCheckpoint) restoreState(topology *machine.CPUTopology) error {
return fmt.Errorf("[cpu_plugin] configured policy %q differs from state checkpoint policy %q", sc.policyName, checkpoint.PolicyName)
}

generatedMachineState, err := GenerateMachineStateFromPodEntries(topology, checkpoint.PodEntries, sc.policyName)
generatedMachineState, err := sc.GenerateMachineStateFromPodEntries(topology, checkpoint.PodEntries)
if err != nil {
return fmt.Errorf("GenerateMachineStateFromPodEntries failed with error: %v", err)
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/agent/qrm-plugins/cpu/dynamicpolicy/state/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1449,15 +1449,15 @@ func TestNewCheckpointState(t *testing.T) {
require.NoError(t, cpm.CreateCheckpoint(cpuPluginStateFileName, checkpoint), "could not create testing checkpoint")
}

restoredState, err := NewCheckpointState(testingDir, cpuPluginStateFileName, policyName, cpuTopology, false)
restoredState, err := NewCheckpointState(testingDir, cpuPluginStateFileName, policyName, cpuTopology, false, GenerateMachineStateFromPodEntries)
if strings.TrimSpace(tc.expectedError) != "" {
require.Error(t, err)
require.Contains(t, err.Error(), "could not restore state from checkpoint:")
require.Contains(t, err.Error(), tc.expectedError)

// test skip corruption
if strings.Contains(err.Error(), "checkpoint is corrupted") {
_, err = NewCheckpointState(testingDir, cpuPluginStateFileName, policyName, cpuTopology, true)
_, err = NewCheckpointState(testingDir, cpuPluginStateFileName, policyName, cpuTopology, true, GenerateMachineStateFromPodEntries)
require.Nil(t, err)
}
} else {
Expand Down Expand Up @@ -1945,15 +1945,15 @@ func TestClearState(t *testing.T) {
}
defer os.RemoveAll(testingDir)

state1, err := NewCheckpointState(testingDir, cpuPluginStateFileName, policyName, tc.cpuTopology, false)
state1, err := NewCheckpointState(testingDir, cpuPluginStateFileName, policyName, tc.cpuTopology, false, GenerateMachineStateFromPodEntries)
as.Nil(err)

state1.ClearState()

state1.SetMachineState(tc.machineState)
state1.SetPodEntries(tc.podEntries)

state2, err := NewCheckpointState(testingDir, cpuPluginStateFileName, policyName, tc.cpuTopology, false)
state2, err := NewCheckpointState(testingDir, cpuPluginStateFileName, policyName, tc.cpuTopology, false, GenerateMachineStateFromPodEntries)
as.Nil(err)
assertStateEqual(t, state2, state1)
})
Expand Down Expand Up @@ -2438,7 +2438,7 @@ func TestCheckpointStateHelpers(t *testing.T) {
t.Run(tc.description, func(t *testing.T) {
t.Parallel()

state, err := NewCheckpointState(testingDir, cpuPluginStateFileName, policyName, tc.cpuTopology, false)
state, err := NewCheckpointState(testingDir, cpuPluginStateFileName, policyName, tc.cpuTopology, false, GenerateMachineStateFromPodEntries)
as.Nil(err)

state.ClearState()
Expand Down
131 changes: 68 additions & 63 deletions pkg/agent/qrm-plugins/cpu/dynamicpolicy/state/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"k8s.io/klog/v2"

apiconsts "github.com/kubewharf/katalyst-api/pkg/consts"
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/consts"
cpuconsts "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/consts"
"github.com/kubewharf/katalyst-core/pkg/util/general"
"github.com/kubewharf/katalyst-core/pkg/util/machine"
Expand Down Expand Up @@ -187,68 +186,9 @@ func GetNonBindingSharedRequestedQuantityFromPodEntries(podEntries PodEntries, n
return CPUPreciseCeil(reqFloat64)
}

// GenerateMachineStateFromPodEntries returns NUMANodeMap for given resource based on
// machine info and reserved resources along with existed pod entries
func GenerateMachineStateFromPodEntries(topology *machine.CPUTopology, podEntries PodEntries, policyName string) (NUMANodeMap, error) {
if topology == nil {
return nil, fmt.Errorf("GenerateMachineStateFromPodEntries got nil topology")
}

machineState := make(NUMANodeMap)
for _, numaNode := range topology.CPUDetails.NUMANodes().ToSliceInt64() {
numaNodeState := &NUMANodeState{}
numaNodeAllCPUs := topology.CPUDetails.CPUsInNUMANodes(int(numaNode)).Clone()
allocatedCPUsInNumaNode := machine.NewCPUSet()

for podUID, containerEntries := range podEntries {
if containerEntries.IsPoolEntry() {
continue
}
for containerName, allocationInfo := range containerEntries {
if allocationInfo == nil {
general.Warningf("nil allocationInfo in podEntries")
continue
}

// the container hasn't cpuset assignment in the current NUMA node
if allocationInfo.OriginalTopologyAwareAssignments[int(numaNode)].Size() == 0 &&
allocationInfo.TopologyAwareAssignments[int(numaNode)].Size() == 0 {
continue
}

switch policyName {
case consts.CPUResourcePluginPolicyNameDynamic:
// only modify allocated and default properties in NUMA node state if the policy is dynamic and the entry indicates numa_binding.
// shared_cores with numa_binding also contributes to numaNodeState.AllocatedCPUSet,
// it's convenient that we can skip NUMA with AllocatedCPUSet > 0 when allocating CPUs for dedicated_cores with numa_binding.
if CheckNUMABinding(allocationInfo) {
allocatedCPUsInNumaNode = allocatedCPUsInNumaNode.Union(allocationInfo.OriginalTopologyAwareAssignments[int(numaNode)])
}
case consts.CPUResourcePluginPolicyNameNative:
// only modify allocated and default properties in NUMA node state if the policy is native and the QoS class is Guaranteed
if CheckDedicatedPool(allocationInfo) {
allocatedCPUsInNumaNode = allocatedCPUsInNumaNode.Union(allocationInfo.OriginalTopologyAwareAssignments[int(numaNode)])
}
}

topologyAwareAssignments, _ := machine.GetNumaAwareAssignments(topology, allocationInfo.AllocationResult.Intersection(numaNodeAllCPUs))
originalTopologyAwareAssignments, _ := machine.GetNumaAwareAssignments(topology, allocationInfo.OriginalAllocationResult.Intersection(numaNodeAllCPUs))

numaNodeAllocationInfo := allocationInfo.Clone()
numaNodeAllocationInfo.AllocationResult = allocationInfo.AllocationResult.Intersection(numaNodeAllCPUs)
numaNodeAllocationInfo.OriginalAllocationResult = allocationInfo.OriginalAllocationResult.Intersection(numaNodeAllCPUs)
numaNodeAllocationInfo.TopologyAwareAssignments = topologyAwareAssignments
numaNodeAllocationInfo.OriginalTopologyAwareAssignments = originalTopologyAwareAssignments

numaNodeState.SetAllocationInfo(podUID, containerName, numaNodeAllocationInfo)
}
}

numaNodeState.AllocatedCPUSet = allocatedCPUsInNumaNode.Clone()
numaNodeState.DefaultCPUSet = numaNodeAllCPUs.Difference(numaNodeState.AllocatedCPUSet)
machineState[int(numaNode)] = numaNodeState
}
return machineState, nil
// GenerateMachineStateFromPodEntries returns the NUMANodeMap for the dynamic
// policy. It is a thin wrapper that delegates to
// GenerateMachineStateFromPodEntriesByPolicy with the dynamic policy name, and
// matches the GenerateMachineStateFromPodEntriesFunc signature so it can be
// injected into NewCheckpointState.
func GenerateMachineStateFromPodEntries(topology *machine.CPUTopology, podEntries PodEntries) (NUMANodeMap, error) {
return GenerateMachineStateFromPodEntriesByPolicy(topology, podEntries, cpuconsts.CPUResourcePluginPolicyNameDynamic)
}

func IsIsolationPool(poolName string) bool {
Expand Down Expand Up @@ -490,3 +430,68 @@ func checkCPUSetMap(map1, map2 map[int]machine.CPUSet) bool {
func CPUPreciseCeil(request float64) int {
return int(math.Ceil(float64(int(request*1000)) / 1000))
}

// GenerateMachineStateFromPodEntriesByPolicy returns a NUMANodeMap built from
// the machine topology and the existing pod entries, interpreted according to
// the given policy name. For each NUMA node it records which CPUs are
// considered allocated (policy-dependent, see the switch below) and which
// remain in the default set, plus per-container allocation info restricted to
// that node.
// todo: extracting entire state package as a common standalone utility
func GenerateMachineStateFromPodEntriesByPolicy(topology *machine.CPUTopology, podEntries PodEntries, policyName string) (NUMANodeMap, error) {
if topology == nil {
return nil, fmt.Errorf("GenerateMachineStateFromPodEntriesByPolicy got nil topology")
}

machineState := make(NUMANodeMap)
for _, numaNode := range topology.CPUDetails.NUMANodes().ToSliceInt64() {
numaNodeState := &NUMANodeState{}
numaNodeAllCPUs := topology.CPUDetails.CPUsInNUMANodes(int(numaNode)).Clone()
allocatedCPUsInNumaNode := machine.NewCPUSet()

for podUID, containerEntries := range podEntries {
// pool entries are skipped: only per-container entries contribute to
// per-NUMA machine state
if containerEntries.IsPoolEntry() {
continue
}
for containerName, allocationInfo := range containerEntries {
if allocationInfo == nil {
general.Warningf("nil allocationInfo in podEntries")
continue
}

// the container hasn't cpuset assignment in the current NUMA node
if allocationInfo.OriginalTopologyAwareAssignments[int(numaNode)].Size() == 0 &&
allocationInfo.TopologyAwareAssignments[int(numaNode)].Size() == 0 {
continue
}

// which containers count toward AllocatedCPUSet is the only
// policy-dependent part of this function
switch policyName {
case cpuconsts.CPUResourcePluginPolicyNameDynamic:
// only modify allocated and default properties in NUMA node state if the policy is dynamic and the entry indicates numa_binding.
// shared_cores with numa_binding also contributes to numaNodeState.AllocatedCPUSet,
// it's convenient that we can skip NUMA with AllocatedCPUSet > 0 when allocating CPUs for dedicated_cores with numa_binding.
if CheckNUMABinding(allocationInfo) {
allocatedCPUsInNumaNode = allocatedCPUsInNumaNode.Union(allocationInfo.OriginalTopologyAwareAssignments[int(numaNode)])
}
case cpuconsts.CPUResourcePluginPolicyNameNative:
// only modify allocated and default properties in NUMA node state if the policy is native and the QoS class is Guaranteed
if CheckDedicatedPool(allocationInfo) {
allocatedCPUsInNumaNode = allocatedCPUsInNumaNode.Union(allocationInfo.OriginalTopologyAwareAssignments[int(numaNode)])
}
}

// errors from GetNumaAwareAssignments are deliberately ignored here
// (pre-existing behavior) — NOTE(review): confirm they cannot fail
// for CPU sets already intersected with this node's CPUs
topologyAwareAssignments, _ := machine.GetNumaAwareAssignments(topology, allocationInfo.AllocationResult.Intersection(numaNodeAllCPUs))
originalTopologyAwareAssignments, _ := machine.GetNumaAwareAssignments(topology, allocationInfo.OriginalAllocationResult.Intersection(numaNodeAllCPUs))

// store a per-NUMA view of the allocation: clone the container's info
// and restrict every CPU set to the CPUs of this node
numaNodeAllocationInfo := allocationInfo.Clone()
numaNodeAllocationInfo.AllocationResult = allocationInfo.AllocationResult.Intersection(numaNodeAllCPUs)
numaNodeAllocationInfo.OriginalAllocationResult = allocationInfo.OriginalAllocationResult.Intersection(numaNodeAllCPUs)
numaNodeAllocationInfo.TopologyAwareAssignments = topologyAwareAssignments
numaNodeAllocationInfo.OriginalTopologyAwareAssignments = originalTopologyAwareAssignments

numaNodeState.SetAllocationInfo(podUID, containerName, numaNodeAllocationInfo)
}
}

// default set = node CPUs not claimed by policy-relevant allocations
numaNodeState.AllocatedCPUSet = allocatedCPUsInNumaNode.Clone()
numaNodeState.DefaultCPUSet = numaNodeAllCPUs.Difference(numaNodeState.AllocatedCPUSet)
machineState[int(numaNode)] = numaNodeState
}
return machineState, nil
}
2 changes: 1 addition & 1 deletion pkg/agent/qrm-plugins/cpu/dynamicpolicy/state/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,7 @@ func TestGenerateCPUMachineStateByPodEntries(t *testing.T) {
tmpDir, err := ioutil.TempDir("", "checkpoint-TestGenerateCPUMachineStateByPodEntries")
as.Nil(err)

machineState, err := GenerateMachineStateFromPodEntries(tc.cpuTopology, tc.podEntries, cpuconsts.CPUResourcePluginPolicyNameDynamic)
machineState, err := GenerateMachineStateFromPodEntries(tc.cpuTopology, tc.podEntries)
as.Nil(err)

as.Equalf(tc.expectedMachineState, machineState, "failed in test case: %s", tc.description)
Expand Down
3 changes: 1 addition & 2 deletions pkg/agent/qrm-plugins/cpu/dynamicpolicy/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
pluginapi "k8s.io/kubelet/pkg/apis/resourceplugin/v1alpha1"

apiconsts "github.com/kubewharf/katalyst-api/pkg/consts"
cpuconsts "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/consts"
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state"
"github.com/kubewharf/katalyst-core/pkg/util/general"
"github.com/kubewharf/katalyst-core/pkg/util/machine"
Expand All @@ -38,7 +37,7 @@ func getProportionalSize(oldPoolSize, oldTotalSize, newTotalSize int, ceil bool)
}

func generateMachineStateFromPodEntries(topology *machine.CPUTopology, podEntries state.PodEntries) (state.NUMANodeMap, error) {
return state.GenerateMachineStateFromPodEntries(topology, podEntries, cpuconsts.CPUResourcePluginPolicyNameDynamic)
return state.GenerateMachineStateFromPodEntries(topology, podEntries)
}

// updateAllocationInfoByReq updates allocationInfo by latest req when admitting active pod,
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/qrm-plugins/cpu/nativepolicy/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func NewNativePolicy(agentCtx *agent.GenericContext, conf *config.Configuration,
general.Infof("new native policy")

stateImpl, stateErr := state.NewCheckpointState(conf.GenericQRMPluginConfiguration.StateFileDirectory, cpuPluginStateFileName,
cpuconsts.CPUResourcePluginPolicyNameNative, agentCtx.CPUTopology, conf.SkipCPUStateCorruption)
cpuconsts.CPUResourcePluginPolicyNameNative, agentCtx.CPUTopology, conf.SkipCPUStateCorruption, nativepolicyutil.GenerateMachineStateFromPodEntries)
if stateErr != nil {
return false, agent.ComponentStub{}, fmt.Errorf("NewCheckpointState failed with error: %v", stateErr)
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/agent/qrm-plugins/cpu/nativepolicy/policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (

cpuconsts "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/consts"
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state"
nativepolicyutil "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/nativepolicy/util"
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/util"
"github.com/kubewharf/katalyst-core/pkg/config/agent/dynamic"
"github.com/kubewharf/katalyst-core/pkg/metrics"
Expand All @@ -42,7 +43,7 @@ const (

func getTestNativePolicy(topology *machine.CPUTopology, stateFileDirectory string) (*NativePolicy, error) {
stateImpl, err := state.NewCheckpointState(stateFileDirectory, cpuPluginStateFileName,
cpuconsts.CPUResourcePluginPolicyNameNative, topology, false)
cpuconsts.CPUResourcePluginPolicyNameNative, topology, false, nativepolicyutil.GenerateMachineStateFromPodEntries)
if err != nil {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/agent/qrm-plugins/cpu/nativepolicy/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/kubewharf/katalyst-core/pkg/util/machine"
)

// GenerateMachineStateFromPodEntries for native policy
func GenerateMachineStateFromPodEntries(topology *machine.CPUTopology, podEntries state.PodEntries) (state.NUMANodeMap, error) {
return state.GenerateMachineStateFromPodEntries(topology, podEntries, cpuconsts.CPUResourcePluginPolicyNameNative)
return state.GenerateMachineStateFromPodEntriesByPolicy(topology, podEntries, cpuconsts.CPUResourcePluginPolicyNameNative)
}

0 comments on commit 7104feb

Please sign in to comment.