diff --git a/cmd/katalyst-agent/app/options/orm/orm_base.go b/cmd/katalyst-agent/app/options/orm/orm_base.go index c39c55532..5a9ad2a32 100644 --- a/cmd/katalyst-agent/app/options/orm/orm_base.go +++ b/cmd/katalyst-agent/app/options/orm/orm_base.go @@ -22,10 +22,11 @@ import ( cliflag "k8s.io/component-base/cli/flag" ormconfig "github.com/kubewharf/katalyst-core/pkg/config/agent/orm" + "github.com/kubewharf/katalyst-core/pkg/consts" ) type GenericORMPluginOptions struct { - ORMWorkMode string + ORMWorkMode consts.WorkMode ORMReconcilePeriod time.Duration ORMResourceNamesMap map[string]string ORMPodNotifyChanLen int @@ -42,7 +43,7 @@ type GenericORMPluginOptions struct { func NewGenericORMPluginOptions() *GenericORMPluginOptions { return &GenericORMPluginOptions{ - ORMWorkMode: "bypass", + ORMWorkMode: consts.WorkModeBypass, ORMReconcilePeriod: time.Second * 5, ORMResourceNamesMap: map[string]string{}, ORMPodNotifyChanLen: 10, @@ -61,7 +62,7 @@ func NewGenericORMPluginOptions() *GenericORMPluginOptions { func (o *GenericORMPluginOptions) AddFlags(fss *cliflag.NamedFlagSets) { fs := fss.FlagSet("orm") - fs.StringVar(&o.ORMWorkMode, "orm-work-mode", o.ORMWorkMode, "orm work mode, nri or bypass") + fs.StringVar((*string)(&o.ORMWorkMode), "orm-work-mode", string(o.ORMWorkMode), "orm work mode, nri or bypass") fs.DurationVar(&o.ORMReconcilePeriod, "orm-reconcile-period", o.ORMReconcilePeriod, "orm resource reconcile period") fs.StringToStringVar(&o.ORMResourceNamesMap, "orm-resource-names-map", o.ORMResourceNamesMap, diff --git a/pkg/agent/orm/manager.go b/pkg/agent/orm/manager.go index 615c4f897..455d9e4ac 100644 --- a/pkg/agent/orm/manager.go +++ b/pkg/agent/orm/manager.go @@ -26,11 +26,9 @@ import ( "sync" "time" - "github.com/containerd/nri/pkg/api" "github.com/containerd/nri/pkg/stub" "github.com/opencontainers/selinux/go-selinux" "google.golang.org/grpc" - "gopkg.in/yaml.v3" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/klog/v2" @@ -46,9 +44,9 @@ import ( "github.com/kubewharf/katalyst-core/pkg/agent/orm/server" "github.com/kubewharf/katalyst-core/pkg/agent/orm/server/podresources" "github.com/kubewharf/katalyst-core/pkg/agent/orm/topology" - "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/util" "github.com/kubewharf/katalyst-core/pkg/config" "github.com/kubewharf/katalyst-core/pkg/config/generic" + "github.com/kubewharf/katalyst-core/pkg/consts" "github.com/kubewharf/katalyst-core/pkg/metaserver" metaserverpod "github.com/kubewharf/katalyst-core/pkg/metaserver/agent/pod" "github.com/kubewharf/katalyst-core/pkg/metrics" @@ -58,14 +56,10 @@ import ( "github.com/kubewharf/katalyst-core/pkg/util/native" ) -type nriConfig struct { - Events []string `json:"events"` -} - type ManagerImpl struct { ctx context.Context - mode workMode + mode consts.WorkMode socketname string socketdir string @@ -122,7 +116,7 @@ func NewManager(socketPath string, emitter metrics.MetricEmitter, metaServer *me } m := &ManagerImpl{ - mode: workMode(config.ORMWorkMode), + mode: config.ORMWorkMode, socketdir: dir, socketname: file, @@ -164,42 +158,17 @@ func NewManager(socketPath string, emitter metrics.MetricEmitter, metaServer *me } func (m *ManagerImpl) initORMWorkMode(config *config.Configuration) { - if !m.validateNRIMode(config) { - m.mode = workModeBypass + if m.validateNRIMode(config) { + m.mode = consts.WorkModeNri + klog.Infof("[ORM] init ORM work mode with nri mode") + } else { + m.mode = consts.WorkModeBypass klog.Infof("[ORM] init ORM work mode with bypass mode") m.resourceExecutor = executor.NewExecutor(cgroupmgr.GetManager()) - return } - klog.Infof("[ORM] init ORM work mode with nri mode") return } -func (m *ManagerImpl) validateNRIMode(config *config.Configuration) bool { - var err error - if config.ORMWorkMode != string(workModeNri) { - return false - } - if _, err := os.Stat(config.ORMNRISocketPath); os.IsNotExist(err) { - klog.Errorf("[ORM] nri socket path %q does not exist", config.ORMNRISocketPath) - return false - } - var opts []stub.Option - opts = append(opts, stub.WithPluginName(config.ORMNRIPluginName)) - opts = append(opts, stub.WithPluginIdx(config.ORMNRIPluginIndex)) - opts = append(opts, stub.WithSocketPath(config.ORMNRISocketPath)) - m.nriOptions = opts - - if m.nriMask, err = api.ParseEventMask(config.ORMNRIHandleEvents); err != nil { - klog.Errorf("[ORM] parse nri handle events fail: %v", err) - return false - } - if m.nriStub, err = stub.New(m, append(opts, stub.WithOnClose(m.onClose))...); err != nil { - klog.Errorf("[ORM] create nri stub fail: %v", err) - return false - } - return true -} - func (m *ManagerImpl) Run(ctx context.Context) { klog.V(2).Infof("[ORM] running...") m.ctx = ctx @@ -248,7 +217,7 @@ func (m *ManagerImpl) Run(ctx context.Context) { klog.V(5).Infof("[ORM] start serve socketPath %v", socketPath) - if m.mode == workModeBypass { + if m.mode == consts.WorkModeBypass { go func() { m.process() }() @@ -370,7 +339,7 @@ func (m *ManagerImpl) Allocate(pod *v1.Pod, container *v1.Container) error { // allocate resources for current pod, return after resource allocate when run in NRIMode err := m.addContainer(pod, container) - if err != nil || m.mode == workModeNri { + if err != nil || m.mode == consts.WorkModeNri { return err } @@ -452,7 +421,7 @@ func (m *ManagerImpl) processAddPod(podUID string) error { pod *v1.Pod err error ) - if m.mode == workModeNri { + if m.mode == consts.WorkModeNri { nriQueryCtx := context.WithValue(m.ctx, metaserverpod.BypassCacheKey, metaserverpod.BypassCacheTrue) pod, err = m.metaManager.GetPod(nriQueryCtx, podUID) } else { @@ -683,7 +652,7 @@ func (m *ManagerImpl) reconcile() { } } - if m.mode == workModeNri { + if m.mode == consts.WorkModeNri { containerId, err := native.GetContainerID(pod, container.Name) if err != nil { klog.Errorf("[ORM] pod: %s/%s/%s, container: %s, get container id fail: %v", @@ -762,135 +731,6 @@ func (m *ManagerImpl) IsContainerRequestResource(container *v1.Container, resour return false, nil } -// ************************************NRI Plugin Interface implement ************************************************** - -func (m *ManagerImpl) Configure(_ context.Context, config, runtime, version string) (stub.EventMask, error) { - klog.V(4).Infof("got configuration data: %q from runtime %s %s", config, runtime, version) - if config == "" { - return m.nriMask, nil - } - - err := yaml.Unmarshal([]byte(config), &m.nriConf) - if err != nil { - return 0, fmt.Errorf("failed to parse provided configuration: %w", err) - } - - m.nriMask, err = api.ParseEventMask(m.nriConf.Events...) - if err != nil { - return 0, fmt.Errorf("failed to parse events in configuration: %w", err) - } - - klog.V(6).Infof("handle NRI Configure successfully, config %s, runtime %s, version %s", - config, runtime, version) - return m.nriMask, nil -} - -func (m *ManagerImpl) Synchronize(_ context.Context, pods []*api.PodSandbox, containers []*api.Container) ( - []*api.ContainerUpdate, error, -) { - // todo: update existed containers resources if orm stared after the Pod create events - return nil, nil -} - -func (m *ManagerImpl) RunPodSandbox(_ context.Context, pod *api.PodSandbox) error { - klog.Infof("[ORM] RunPodSandbox, pod: %s/%s/%s", pod.Namespace, pod.Name, pod.Uid) - klog.V(6).Infof("[ORM] RunPodSandbox, pod annotations: %v", pod.Annotations) - err := m.processAddPod(pod.Uid) - if err != nil { - klog.Errorf("[ORM] RunPodSandbox processAddPod fail, pod: %s/%s/%s, err: %v", - pod.Namespace, pod.Name, pod.Uid, err) - } - return err -} - -func (m *ManagerImpl) CreateContainer(_ context.Context, pod *api.PodSandbox, container *api.Container) ( - *api.ContainerAdjustment, []*api.ContainerUpdate, error, -) { - klog.Infof("[ORM] CreateContainer, pod: %s/%s/%s, container: %v", pod.Namespace, pod.Name, pod.Uid, container.Name) - containerAllResources := m.podResources.containerAllResources(pod.Uid, container.Name) - if containerAllResources == nil { - klog.V(5).Infof("[ORM] CreateContainer process failed, pod: %s/%s/%s, container: %v, resources nil", - pod.Namespace, pod.Name, pod.Uid, container.Name) - return nil, nil, nil - } - - adjust := &api.ContainerAdjustment{} - for _, resourceAllocationInfo := range containerAllResources { - switch resourceAllocationInfo.OciPropertyName { - case util.OCIPropertyNameCPUSetCPUs: - if resourceAllocationInfo.AllocationResult != "" { - adjust.SetLinuxCPUSetCPUs(resourceAllocationInfo.AllocationResult) - } - case util.OCIPropertyNameCPUSetMems: - if resourceAllocationInfo.AllocationResult != "" { - adjust.SetLinuxCPUSetMems(resourceAllocationInfo.AllocationResult) - } - } - } - klog.V(5).Infof("[ORM] handle NRI CreateContainer successfully, pod: %s/%s/%s, container: %s, adjust: %v", - pod.Namespace, pod.Name, pod.Uid, container.Name, adjust) - return adjust, nil, nil -} - -func (m *ManagerImpl) UpdateContainer(_ context.Context, pod *api.PodSandbox, container *api.Container, r *api.LinuxResources, -) ([]*api.ContainerUpdate, error) { - // todo: hook this method to update container resources - return nil, nil - // containerUpdate := m.getNRIContainerUpdate(pod.Uid, container.Id, container.Name) - // klog.V(5).Infof("[ORM] handle NRI UpdateContainer successfully, pod: %s/%s/%s, container: %s, update: %v", - // pod.Namespace, pod.Name, pod.Uid, container.Name, containerUpdate) - // return []*api.ContainerUpdate{containerUpdate}, nil -} - -func (m *ManagerImpl) RemovePodSandbox(_ context.Context, pod *api.PodSandbox) error { - klog.Infof("[ORM] RemovePodSandbox, pod: %s/%s/%s", pod.Namespace, pod.Name, pod.Uid) - err := m.processDeletePod(pod.Uid) - if err != nil { - klog.Errorf("[ORM] RemovePodSandbox processDeletePod fail, pod: %s/%s/%s, err: %v", - pod.Namespace, pod.Name, pod.Uid, err) - } - return err -} - -func (m *ManagerImpl) onClose() { - m.nriStub.Stop() - klog.V(6).Infof("NRI server closes") -} - -func (m *ManagerImpl) updateContainerByNRI(podUID, containerId, containerName string) { - klog.V(2).Infof("[ORM] updateContainerByNRI, pod: %v, container: %v", podUID, containerName) - containerUpdate := m.getNRIContainerUpdate(podUID, containerId, containerName) - _, err := m.nriStub.UpdateContainers([]*api.ContainerUpdate{containerUpdate}) - if err != nil { - klog.Errorf("[ORM] updateContainerByNRI fail, pod %v container %v,resource %v, err: %v", podUID, containerName, err) - } -} - -func (m *ManagerImpl) getNRIContainerUpdate(podUID, containerId, containerName string) *api.ContainerUpdate { - containerUpdate := &api.ContainerUpdate{ - ContainerId: containerId, - Linux: &api.LinuxContainerUpdate{Resources: &api.LinuxResources{Cpu: &api.LinuxCPU{Cpus: "", Mems: ""}}}, - } - containerAllResources := m.podResources.containerAllResources(podUID, containerName) - for _, resourceAllocationInfo := range containerAllResources { - switch resourceAllocationInfo.OciPropertyName { - case util.OCIPropertyNameCPUSetCPUs: - if resourceAllocationInfo.AllocationResult != "" { - containerUpdate.Linux.Resources.Cpu.Cpus = resourceAllocationInfo.AllocationResult - } - case util.OCIPropertyNameCPUSetMems: - if resourceAllocationInfo.AllocationResult != "" { - containerUpdate.Linux.Resources.Cpu.Mems = resourceAllocationInfo.AllocationResult - } - default: - - } - } - return containerUpdate -} - -// ********************************************************************************************************************* - func GetContainerTypeAndIndex(pod *v1.Pod, container *v1.Container) (containerType pluginapi.ContainerType, containerIndex uint64, err error) { if pod == nil || container == nil { err = fmt.Errorf("got nil pod: %v or container: %v", pod, container) diff --git a/pkg/agent/orm/manager_nri.go b/pkg/agent/orm/manager_nri.go new file mode 100644 index 000000000..ffc5653de --- /dev/null +++ b/pkg/agent/orm/manager_nri.go @@ -0,0 +1,188 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package orm + +import ( + "context" + "fmt" + "os" + + "gopkg.in/yaml.v3" + + "github.com/containerd/nri/pkg/api" + "github.com/containerd/nri/pkg/stub" + "k8s.io/klog/v2" + + "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/util" + "github.com/kubewharf/katalyst-core/pkg/config" + "github.com/kubewharf/katalyst-core/pkg/consts" +) + +type nriConfig struct { + Events []string `json:"events"` +} + +func (m *ManagerImpl) validateNRIMode(config *config.Configuration) bool { + var err error + if config.ORMWorkMode != consts.WorkModeNri { + return false + } + if _, err := os.Stat(config.ORMNRISocketPath); os.IsNotExist(err) { + klog.Errorf("[ORM] nri socket path %q does not exist", config.ORMNRISocketPath) + return false + } + var opts []stub.Option + opts = append(opts, stub.WithPluginName(config.ORMNRIPluginName)) + opts = append(opts, stub.WithPluginIdx(config.ORMNRIPluginIndex)) + opts = append(opts, stub.WithSocketPath(config.ORMNRISocketPath)) + m.nriOptions = opts + + if m.nriMask, err = api.ParseEventMask(config.ORMNRIHandleEvents); err != nil { + klog.Errorf("[ORM] parse nri handle events fail: %v", err) + return false + } + if m.nriStub, err = stub.New(m, append(opts, stub.WithOnClose(m.onClose))...); err != nil { + klog.Errorf("[ORM] create nri stub fail: %v", err) + return false + } + return true +} + +func (m *ManagerImpl) Configure(_ context.Context, config, runtime, version string) (stub.EventMask, error) { + klog.V(4).Infof("got configuration data: %q from runtime %s %s", config, runtime, version) + if config == "" { + return m.nriMask, nil + } + + err := yaml.Unmarshal([]byte(config), &m.nriConf) + if err != nil { + return 0, fmt.Errorf("failed to parse provided configuration: %w", err) + } + + m.nriMask, err = api.ParseEventMask(m.nriConf.Events...) + if err != nil { + return 0, fmt.Errorf("failed to parse events in configuration: %w", err) + } + + klog.V(6).Infof("handle NRI Configure successfully, config %s, runtime %s, version %s", + config, runtime, version) + return m.nriMask, nil +} + +func (m *ManagerImpl) Synchronize(_ context.Context, pods []*api.PodSandbox, containers []*api.Container) ( + []*api.ContainerUpdate, error, +) { + // todo: update existed containers resources if orm stared after the Pod create events + return nil, nil +} + +func (m *ManagerImpl) RunPodSandbox(_ context.Context, pod *api.PodSandbox) error { + klog.Infof("[ORM] RunPodSandbox, pod: %s/%s/%s", pod.Namespace, pod.Name, pod.Uid) + klog.V(6).Infof("[ORM] RunPodSandbox, pod annotations: %v", pod.Annotations) + err := m.processAddPod(pod.Uid) + if err != nil { + klog.Errorf("[ORM] RunPodSandbox processAddPod fail, pod: %s/%s/%s, err: %v", + pod.Namespace, pod.Name, pod.Uid, err) + } + return err +} + +func (m *ManagerImpl) CreateContainer(_ context.Context, pod *api.PodSandbox, container *api.Container) ( + *api.ContainerAdjustment, []*api.ContainerUpdate, error, +) { + klog.Infof("[ORM] CreateContainer, pod: %s/%s/%s, container: %v", pod.Namespace, pod.Name, pod.Uid, container.Name) + containerAllResources := m.podResources.containerAllResources(pod.Uid, container.Name) + if containerAllResources == nil { + klog.V(5).Infof("[ORM] CreateContainer process failed, pod: %s/%s/%s, container: %v, resources nil", + pod.Namespace, pod.Name, pod.Uid, container.Name) + return nil, nil, nil + } + + adjust := &api.ContainerAdjustment{} + for _, resourceAllocationInfo := range containerAllResources { + switch resourceAllocationInfo.OciPropertyName { + case util.OCIPropertyNameCPUSetCPUs: + if resourceAllocationInfo.AllocationResult != "" { + adjust.SetLinuxCPUSetCPUs(resourceAllocationInfo.AllocationResult) + } + case util.OCIPropertyNameCPUSetMems: + if resourceAllocationInfo.AllocationResult != "" { + adjust.SetLinuxCPUSetMems(resourceAllocationInfo.AllocationResult) + } + } + } + klog.V(5).Infof("[ORM] handle NRI CreateContainer successfully, pod: %s/%s/%s, container: %s, adjust: %v", + pod.Namespace, pod.Name, pod.Uid, container.Name, adjust) + return adjust, nil, nil +} + +func (m *ManagerImpl) UpdateContainer(_ context.Context, pod *api.PodSandbox, container *api.Container, r *api.LinuxResources, +) ([]*api.ContainerUpdate, error) { + // todo: hook this method to update container resources + return nil, nil + // containerUpdate := m.getNRIContainerUpdate(pod.Uid, container.Id, container.Name) + // klog.V(5).Infof("[ORM] handle NRI UpdateContainer successfully, pod: %s/%s/%s, container: %s, update: %v", + // pod.Namespace, pod.Name, pod.Uid, container.Name, containerUpdate) + // return []*api.ContainerUpdate{containerUpdate}, nil +} + +func (m *ManagerImpl) RemovePodSandbox(_ context.Context, pod *api.PodSandbox) error { + klog.Infof("[ORM] RemovePodSandbox, pod: %s/%s/%s", pod.Namespace, pod.Name, pod.Uid) + err := m.processDeletePod(pod.Uid) + if err != nil { + klog.Errorf("[ORM] RemovePodSandbox processDeletePod fail, pod: %s/%s/%s, err: %v", + pod.Namespace, pod.Name, pod.Uid, err) + } + return err +} + +func (m *ManagerImpl) onClose() { + m.nriStub.Stop() + klog.V(6).Infof("NRI server closes") +} + +func (m *ManagerImpl) updateContainerByNRI(podUID, containerId, containerName string) { + klog.V(2).Infof("[ORM] updateContainerByNRI, pod: %v, container: %v", podUID, containerName) + containerUpdate := m.getNRIContainerUpdate(podUID, containerId, containerName) + _, err := m.nriStub.UpdateContainers([]*api.ContainerUpdate{containerUpdate}) + if err != nil { + klog.Errorf("[ORM] updateContainerByNRI fail, pod %v container %v,resource %v, err: %v", podUID, containerName, err) + } +} + +func (m *ManagerImpl) getNRIContainerUpdate(podUID, containerId, containerName string) *api.ContainerUpdate { + containerUpdate := &api.ContainerUpdate{ + ContainerId: containerId, + Linux: &api.LinuxContainerUpdate{Resources: &api.LinuxResources{Cpu: &api.LinuxCPU{Cpus: "", Mems: ""}}}, + } + containerAllResources := m.podResources.containerAllResources(podUID, containerName) + for _, resourceAllocationInfo := range containerAllResources { + switch resourceAllocationInfo.OciPropertyName { + case util.OCIPropertyNameCPUSetCPUs: + if resourceAllocationInfo.AllocationResult != "" { + containerUpdate.Linux.Resources.Cpu.Cpus = resourceAllocationInfo.AllocationResult + } + case util.OCIPropertyNameCPUSetMems: + if resourceAllocationInfo.AllocationResult != "" { + containerUpdate.Linux.Resources.Cpu.Mems = resourceAllocationInfo.AllocationResult + } + default: + + } + } + return containerUpdate +} diff --git a/pkg/agent/orm/manager_nri_test.go b/pkg/agent/orm/manager_nri_test.go new file mode 100644 index 000000000..120ccefc9 --- /dev/null +++ b/pkg/agent/orm/manager_nri_test.go @@ -0,0 +1,293 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package orm + +import ( + "context" + "fmt" + "io/ioutil" + "os" + "testing" + + "github.com/containerd/nri/pkg/api" + "github.com/containerd/nri/pkg/stub" + cadvisorapi "github.com/google/cadvisor/info/v1" + "github.com/stretchr/testify/assert" + + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" + + "github.com/kubewharf/katalyst-core/pkg/agent/orm/endpoint" + "github.com/kubewharf/katalyst-core/pkg/agent/orm/metamanager" + "github.com/kubewharf/katalyst-core/pkg/agent/orm/topology" + "github.com/kubewharf/katalyst-core/pkg/metrics" +) + +type fakeNRIStub struct{} + +func (f *fakeNRIStub) Run(ctx context.Context) error { + return nil +} + +func (f *fakeNRIStub) Start(ctx context.Context) error { + return nil +} + +func (f *fakeNRIStub) Stop() { + return +} + +func (f *fakeNRIStub) Wait() { + return +} + +func (f *fakeNRIStub) UpdateContainers(_ []*api.ContainerUpdate) ([]*api.ContainerUpdate, error) { + return nil, nil +} + +func TestManagerImpl_Configure(t *testing.T) { + t.Parallel() + conf := "{\"events\":[\"RunPodSandbox\",\"CreateContainer\",\"UpdateContainer\",\"RemovePodSandbox\"]}" + m := &ManagerImpl{ + nriStub: &fakeNRIStub{}, + } + eventMask, err := m.Configure(context.TODO(), conf, "", "") + res := stub.EventMask(141) + assert.NoError(t, err) + assert.Equal(t, eventMask, res) +} + +func TestManagerImpl_Synchronize(t *testing.T) { + t.Parallel() + m := &ManagerImpl{ + nriStub: &fakeNRIStub{}, + } + update, err := m.Synchronize(context.TODO(), []*api.PodSandbox{}, []*api.Container{}) + assert.NoError(t, err) + assert.Nil(t, update) +} + +func TestManagerImpl_RunPodSandbox(t *testing.T) { + t.Parallel() + + pods := []*v1.Pod{ + makePod("testPod1", v1.ResourceList{ + "cpu": *resource.NewQuantity(2, resource.DecimalSI), + "memory": *resource.NewQuantity(2, resource.DecimalSI), + }), + makePod("testPod2", v1.ResourceList{ + "cpu": *resource.NewQuantity(2, resource.DecimalSI), + "memory": *resource.NewQuantity(2, resource.DecimalSI), + }), + } + + m := &ManagerImpl{ + nriStub: &fakeNRIStub{}, + endpoints: map[string]endpoint.EndpointInfo{}, + podResources: newPodResourcesChk(), + } + topologyManager, _ := topology.NewManager([]cadvisorapi.Node{ + { + Id: 0, + }, + }, "none", nil) + topologyManager.AddHintProvider(m) + m.topologyManager = topologyManager + + ckDir, err := ioutil.TempDir("", "checkpoint-Test") + assert.NoError(t, err) + defer func() { _ = os.RemoveAll(ckDir) }() + + conf := generateTestConfiguration(ckDir) + metaServer, err := generateTestMetaServer(conf, pods) + assert.NoError(t, err) + metaManager := metamanager.NewManager(metrics.DummyMetrics{}, m.podResources.pods, metaServer) + m.metaManager = metaManager + + checkpointManager, err := checkpointmanager.NewCheckpointManager("/tmp/process") + assert.NoError(t, err) + + m.checkpointManager = checkpointManager + podUID1 := "testPodUID1" + err = m.RunPodSandbox(context.TODO(), &api.PodSandbox{Uid: podUID1}) + assert.Error(t, err, fmt.Errorf("failed to find pod by uid testPod1")) +} + +func TestManagerImpl_CreateContainer(t *testing.T) { + t.Parallel() + m := &ManagerImpl{ + podResources: newPodResourcesChk(), + } + + // test CpuSetCpus + resourceCpuSetCpusAllocationInfo := generateCpuSetCpusAllocationInfo() + podUID1 := "testPodUID1" + podName1 := "testPodName1" + containerName1 := "testContainer1" + containerID1 := "03edb7b6b6becaba276d2c8f5557927661774a69c0cb2230a1fe1f297ca4d4f6" + m.podResources.insert(podUID1, containerName1, "cpu", resourceCpuSetCpusAllocationInfo) + pod1 := &api.PodSandbox{ + Name: podName1, + Uid: podUID1, + } + container1 := &api.Container{ + Id: containerID1, + Name: containerName1, + } + containerAdjust1, _, err := m.CreateContainer(context.TODO(), pod1, container1) + assert.NoError(t, err) + res1 := &api.ContainerAdjustment{ + Linux: &api.LinuxContainerAdjustment{Resources: &api.LinuxResources{Cpu: &api.LinuxCPU{ + Cpus: "5-6,10", + }}}, + } + assert.Equal(t, containerAdjust1, res1) + + // test CpuSetMems + resourceCpuSetMemsAllocationInfo := generateCpuSetMemsAllocationInfo() + podUID2 := "testPodUID2" + podName2 := "testPodName2" + containerName2 := "testContainer2" + containerID2 := "03edb7b6b6becaba276d2c8f5557927661774a69c0cb2230a1fe1f297ca4d4f5" + m.podResources.insert(podUID2, containerName2, "cpu", resourceCpuSetMemsAllocationInfo) + pod2 := &api.PodSandbox{ + Name: podName2, + Uid: podUID2, + } + container2 := &api.Container{ + Id: containerID2, + Name: containerName2, + } + containerAdjust2, _, err := m.CreateContainer(context.TODO(), pod2, container2) + assert.NoError(t, err) + res2 := &api.ContainerAdjustment{ + Linux: &api.LinuxContainerAdjustment{Resources: &api.LinuxResources{Cpu: &api.LinuxCPU{ + Mems: "7-8,11", + }}}, + } + assert.Equal(t, containerAdjust2, res2) + + // test pod not exist + podNotExist := &api.PodSandbox{ + Name: "PodNotExist", + Uid: "PodUIDNotExist", + } + containerNotExist := &api.Container{ + Id: "ContainerIDNotExist", + Name: "ContainerNotExist", + } + containerAdjust, _, err := m.CreateContainer(context.TODO(), podNotExist, containerNotExist) + assert.Nil(t, containerAdjust) + assert.NoError(t, err) +} + +func TestManagerImpl_UpdateContainer(t *testing.T) { + t.Parallel() + m := &ManagerImpl{ + nriStub: &fakeNRIStub{}, + } + update, err := m.UpdateContainer(context.TODO(), &api.PodSandbox{}, &api.Container{}, &api.LinuxResources{}) + assert.NoError(t, err) + assert.Nil(t, update) +} + +func TestManagerImpl_RemovePodSandbox(t *testing.T) { + t.Parallel() + m := &ManagerImpl{ + nriStub: &fakeNRIStub{}, + endpoints: map[string]endpoint.EndpointInfo{}, + podResources: newPodResourcesChk(), + } + topologyManager, _ := topology.NewManager([]cadvisorapi.Node{ + { + Id: 0, + }, + }, "none", nil) + topologyManager.AddHintProvider(m) + m.topologyManager = topologyManager + + checkpointManager, err := checkpointmanager.NewCheckpointManager("/tmp/process") + assert.NoError(t, err) + + m.checkpointManager = checkpointManager + podUID := "testPodUID" + err = m.RemovePodSandbox(context.TODO(), &api.PodSandbox{Uid: podUID}) + assert.NoError(t, err) +} + +func TestManagerImpl_onClose(t *testing.T) { + t.Parallel() + m := &ManagerImpl{ + nriStub: &fakeNRIStub{}, + } + m.onClose() +} + +func TestManagerImpl_updateContainerByNRI(t *testing.T) { + t.Parallel() + m := &ManagerImpl{ + podResources: newPodResourcesChk(), + nriStub: &fakeNRIStub{}, + } + + resourceCpuSetCpusAllocationInfo := generateCpuSetCpusAllocationInfo() + podUID1 := "testPodUID1" + containerName1 := "testContainer1" + containerID1 := "03edb7b6b6becaba276d2c8f5557927661774a69c0cb2230a1fe1f297ca4d4f6" + m.podResources.insert(podUID1, containerName1, "cpu", resourceCpuSetCpusAllocationInfo) + m.updateContainerByNRI(podUID1, containerID1, containerName1) +} + +func TestManagerImpl_getNRIContainerUpdate(t *testing.T) { + t.Parallel() + m := &ManagerImpl{ + podResources: newPodResourcesChk(), + } + + resourceCpuSetCpusAllocationInfo := generateCpuSetCpusAllocationInfo() + podUID1 := "testPodUID1" + containerName1 := "testContainer1" + containerID1 := "03edb7b6b6becaba276d2c8f5557927661774a69c0cb2230a1fe1f297ca4d4f6" + m.podResources.insert(podUID1, containerName1, "cpu", resourceCpuSetCpusAllocationInfo) + containerUpdate1 := m.getNRIContainerUpdate(podUID1, containerID1, containerName1) + res1 := &api.ContainerUpdate{ + ContainerId: containerID1, + Linux: &api.LinuxContainerUpdate{Resources: &api.LinuxResources{ + Cpu: &api.LinuxCPU{ + Cpus: "5-6,10", + }, + }}, + } + assert.Equal(t, containerUpdate1, res1) + + resourceCpuSetMemsAllocationInfo := generateCpuSetMemsAllocationInfo() + podUID2 := "testPodUID2" + containerName2 := "testContainer2" + containerID2 := "03edb7b6b6becaba276d2c8f5557927661774a69c0cb2230a1fe1f297ca4d4f5" + m.podResources.insert(podUID2, containerName2, "cpu", resourceCpuSetMemsAllocationInfo) + containerUpdate2 := m.getNRIContainerUpdate(podUID2, containerID2, containerName2) + res2 := &api.ContainerUpdate{ + ContainerId: containerID2, + Linux: &api.LinuxContainerUpdate{Resources: &api.LinuxResources{ + Cpu: &api.LinuxCPU{ + Mems: "7-8,11", + }, + }}, + } + assert.Equal(t, containerUpdate2, res2) +} diff --git a/pkg/agent/orm/manager_test.go b/pkg/agent/orm/manager_test.go index a1e9a776d..9501d6a2b 100644 --- a/pkg/agent/orm/manager_test.go +++ b/pkg/agent/orm/manager_test.go @@ -24,10 +24,6 @@ import ( "testing" "time" - "github.com/containerd/nri/pkg/stub" - - "github.com/containerd/nri/pkg/api" - cadvisorapi "github.com/google/cadvisor/info/v1" "github.com/stretchr/testify/assert" "google.golang.org/grpc" @@ -47,6 +43,7 @@ import ( "github.com/kubewharf/katalyst-core/pkg/agent/orm/topology" "github.com/kubewharf/katalyst-core/pkg/config" "github.com/kubewharf/katalyst-core/pkg/config/generic" + "github.com/kubewharf/katalyst-core/pkg/consts" "github.com/kubewharf/katalyst-core/pkg/metaserver" "github.com/kubewharf/katalyst-core/pkg/metaserver/agent/pod" "github.com/kubewharf/katalyst-core/pkg/metrics" @@ -103,7 +100,7 @@ func TestProcess(t *testing.T) { m := &ManagerImpl{ ctx: ctx, - mode: workModeBypass, + mode: consts.WorkModeBypass, endpoints: map[string]endpoint.EndpointInfo{}, socketdir: "/tmp/process", metaManager: metamanager, @@ -187,7 +184,7 @@ func TestReconcile(t *testing.T) { assert.NoError(t, err) m := &ManagerImpl{ - mode: workModeBypass, + mode: consts.WorkModeBypass, endpoints: map[string]endpoint.EndpointInfo{}, socketdir: "/tmp/reconcile", metaManager: metamanager, @@ -372,7 +369,7 @@ func TestRun(t *testing.T) { assert.NoError(t, err) m := &ManagerImpl{ - mode: workModeBypass, + mode: consts.WorkModeBypass, reconcilePeriod: 2 * time.Second, endpoints: map[string]endpoint.EndpointInfo{}, socketdir: "/tmp/run", @@ -675,257 +672,3 @@ func (m *MockEndpoint) GetTopologyAwareAllocatableResources(c context.Context, r } return nil, nil } - -type fakeNRIStub struct{} - -func (f *fakeNRIStub) Run(ctx context.Context) error { - return nil -} - -func (f *fakeNRIStub) Start(ctx context.Context) error { - return nil -} - -func (f *fakeNRIStub) Stop() { - return -} - -func (f *fakeNRIStub) Wait() { - return -} - -func (f *fakeNRIStub) UpdateContainers(_ []*api.ContainerUpdate) ([]*api.ContainerUpdate, error) { - return nil, nil -} - -func TestManagerImpl_Configure(t *testing.T) { - t.Parallel() - conf := "{\"events\":[\"RunPodSandbox\",\"CreateContainer\",\"UpdateContainer\",\"RemovePodSandbox\"]}" - m := &ManagerImpl{ - nriStub: &fakeNRIStub{}, - } - eventMask, err := m.Configure(context.TODO(), conf, "", "") - res := stub.EventMask(141) - assert.NoError(t, err) - assert.Equal(t, eventMask, res) -} - -func TestManagerImpl_Synchronize(t *testing.T) { - t.Parallel() - m := &ManagerImpl{ - nriStub: &fakeNRIStub{}, - } - update, err := m.Synchronize(context.TODO(), []*api.PodSandbox{}, []*api.Container{}) - assert.NoError(t, err) - assert.Nil(t, update) -} - -func TestManagerImpl_RunPodSandbox(t *testing.T) { - t.Parallel() - - pods := []*v1.Pod{ - makePod("testPod1", v1.ResourceList{ - "cpu": *resource.NewQuantity(2, resource.DecimalSI), - "memory": *resource.NewQuantity(2, resource.DecimalSI), - }), - makePod("testPod2", v1.ResourceList{ - "cpu": *resource.NewQuantity(2, resource.DecimalSI), - "memory": *resource.NewQuantity(2, resource.DecimalSI), - }), - } - - m := &ManagerImpl{ - nriStub: &fakeNRIStub{}, - endpoints: map[string]endpoint.EndpointInfo{}, - podResources: newPodResourcesChk(), - } - topologyManager, _ := topology.NewManager([]cadvisorapi.Node{ - { - Id: 0, - }, - }, "none", nil) - topologyManager.AddHintProvider(m) - m.topologyManager = topologyManager - - ckDir, err := ioutil.TempDir("", "checkpoint-Test") - assert.NoError(t, err) - defer func() { _ = os.RemoveAll(ckDir) }() - - conf := generateTestConfiguration(ckDir) - metaServer, err := generateTestMetaServer(conf, pods) - assert.NoError(t, err) - metaManager := metamanager.NewManager(metrics.DummyMetrics{}, m.podResources.pods, metaServer) - m.metaManager = metaManager - - checkpointManager, err := checkpointmanager.NewCheckpointManager("/tmp/process") - assert.NoError(t, err) - - m.checkpointManager = checkpointManager - podUID1 := "testPodUID1" - err = m.RunPodSandbox(context.TODO(), &api.PodSandbox{Uid: podUID1}) - assert.Error(t, err, fmt.Errorf("failed to find pod by uid testPod1")) -} - -func TestManagerImpl_CreateContainer(t *testing.T) { - t.Parallel() - m := &ManagerImpl{ - podResources: newPodResourcesChk(), - } - - // test CpuSetCpus - resourceCpuSetCpusAllocationInfo := generateCpuSetCpusAllocationInfo() - podUID1 := "testPodUID1" - podName1 := "testPodName1" - containerName1 := "testContainer1" - containerID1 := "03edb7b6b6becaba276d2c8f5557927661774a69c0cb2230a1fe1f297ca4d4f6" - m.podResources.insert(podUID1, containerName1, "cpu", resourceCpuSetCpusAllocationInfo) - pod1 := &api.PodSandbox{ - Name: podName1, - Uid: podUID1, - } - container1 := &api.Container{ - Id: containerID1, - Name: containerName1, - } - containerAdjust1, _, err := m.CreateContainer(context.TODO(), pod1, container1) - assert.NoError(t, err) - res1 := &api.ContainerAdjustment{ - Linux: &api.LinuxContainerAdjustment{Resources: &api.LinuxResources{Cpu: &api.LinuxCPU{ - Cpus: "5-6,10", - }}}, - } - assert.Equal(t, containerAdjust1, res1) - - // test CpuSetMems - resourceCpuSetMemsAllocationInfo := generateCpuSetMemsAllocationInfo() - podUID2 := "testPodUID2" - podName2 := "testPodName2" - containerName2 := "testContainer2" - containerID2 := "03edb7b6b6becaba276d2c8f5557927661774a69c0cb2230a1fe1f297ca4d4f5" - m.podResources.insert(podUID2, containerName2, "cpu", resourceCpuSetMemsAllocationInfo) - pod2 := &api.PodSandbox{ - Name: podName2, - Uid: podUID2, - } - container2 := &api.Container{ - Id: containerID2, - Name: containerName2, - } - containerAdjust2, _, err := m.CreateContainer(context.TODO(), pod2, container2) - assert.NoError(t, err) - res2 := &api.ContainerAdjustment{ - Linux: &api.LinuxContainerAdjustment{Resources: &api.LinuxResources{Cpu: &api.LinuxCPU{ - Mems: "7-8,11", - }}}, - } - assert.Equal(t, containerAdjust2, res2) - - // test pod not exist - podNotExist := &api.PodSandbox{ - Name: "PodNotExist", - Uid: "PodUIDNotExist", - } - containerNotExist := &api.Container{ - Id: "ContainerIDNotExist", - Name: "ContainerNotExist", - } - containerAdjust, _, err := m.CreateContainer(context.TODO(), podNotExist, containerNotExist) - assert.Nil(t, containerAdjust) - assert.NoError(t, err) -} - -func TestManagerImpl_UpdateContainer(t *testing.T) { - t.Parallel() - m := &ManagerImpl{ - nriStub: &fakeNRIStub{}, - } - update, err := m.UpdateContainer(context.TODO(), &api.PodSandbox{}, &api.Container{}, &api.LinuxResources{}) - assert.NoError(t, err) - assert.Nil(t, update) -} - -func TestManagerImpl_RemovePodSandbox(t *testing.T) { - t.Parallel() - m := &ManagerImpl{ - nriStub: &fakeNRIStub{}, - endpoints: map[string]endpoint.EndpointInfo{}, - podResources: newPodResourcesChk(), - } - topologyManager, _ := topology.NewManager([]cadvisorapi.Node{ - { - Id: 0, - }, - }, "none", nil) - topologyManager.AddHintProvider(m) - m.topologyManager = topologyManager - - checkpointManager, err := checkpointmanager.NewCheckpointManager("/tmp/process") - assert.NoError(t, err) - - m.checkpointManager = checkpointManager - podUID := "testPodUID" - err = m.RemovePodSandbox(context.TODO(), &api.PodSandbox{Uid: podUID}) - assert.NoError(t, err) -} - -func TestManagerImpl_onClose(t *testing.T) { - t.Parallel() - m := &ManagerImpl{ - nriStub: &fakeNRIStub{}, - } - m.onClose() -} - -func TestManagerImpl_updateContainerByNRI(t *testing.T) { - t.Parallel() - m := &ManagerImpl{ - podResources: newPodResourcesChk(), - nriStub: &fakeNRIStub{}, - } - - resourceCpuSetCpusAllocationInfo := generateCpuSetCpusAllocationInfo() - podUID1 := "testPodUID1" - containerName1 := "testContainer1" - containerID1 := "03edb7b6b6becaba276d2c8f5557927661774a69c0cb2230a1fe1f297ca4d4f6" - m.podResources.insert(podUID1, containerName1, "cpu", resourceCpuSetCpusAllocationInfo) - m.updateContainerByNRI(podUID1, containerID1, containerName1) -} - -func TestManagerImpl_getNRIContainerUpdate(t *testing.T) { - t.Parallel() - m := &ManagerImpl{ - podResources: newPodResourcesChk(), - } - - resourceCpuSetCpusAllocationInfo := generateCpuSetCpusAllocationInfo() - podUID1 := "testPodUID1" - containerName1 := "testContainer1" - containerID1 := "03edb7b6b6becaba276d2c8f5557927661774a69c0cb2230a1fe1f297ca4d4f6" - m.podResources.insert(podUID1, containerName1, "cpu", resourceCpuSetCpusAllocationInfo) - containerUpdate1 := m.getNRIContainerUpdate(podUID1, containerID1, containerName1) - res1 := &api.ContainerUpdate{ - ContainerId: containerID1, - Linux: &api.LinuxContainerUpdate{Resources: &api.LinuxResources{ - Cpu: &api.LinuxCPU{ - Cpus: "5-6,10", - }, - }}, - } - assert.Equal(t, containerUpdate1, res1) - - resourceCpuSetMemsAllocationInfo := generateCpuSetMemsAllocationInfo() - podUID2 := "testPodUID2" - containerName2 := "testContainer2" - containerID2 := "03edb7b6b6becaba276d2c8f5557927661774a69c0cb2230a1fe1f297ca4d4f5" - m.podResources.insert(podUID2, containerName2, "cpu", resourceCpuSetMemsAllocationInfo) - containerUpdate2 := m.getNRIContainerUpdate(podUID2, containerID2, containerName2) - res2 := &api.ContainerUpdate{ - ContainerId: containerID2, - Linux: &api.LinuxContainerUpdate{Resources: &api.LinuxResources{ - Cpu: &api.LinuxCPU{ - Mems: "7-8,11", - }, - }}, - } - assert.Equal(t, containerUpdate2, res2) -} diff --git a/pkg/agent/orm/types.go b/pkg/agent/orm/types.go index e69962247..cba34043d 100644 --- a/pkg/agent/orm/types.go +++ b/pkg/agent/orm/types.go @@ -48,10 +48,3 @@ const ( NoneDevicesProvider = "" ) - -type workMode string - -const ( - workModeNri workMode = "nri" - workModeBypass workMode = "bypass" -) diff --git a/pkg/config/agent/orm/orm_base.go b/pkg/config/agent/orm/orm_base.go index eb7b2d647..ddde9deb3 100644 --- a/pkg/config/agent/orm/orm_base.go +++ b/pkg/config/agent/orm/orm_base.go @@ -16,10 +16,14 @@ limitations under the License. package orm -import "time" +import ( + "time" + + "github.com/kubewharf/katalyst-core/pkg/consts" +) type GenericORMConfiguration struct { - ORMWorkMode string + ORMWorkMode consts.WorkMode ORMReconcilePeriod time.Duration ORMResourceNamesMap map[string]string ORMPodNotifyChanLen int @@ -36,7 +40,7 @@ type GenericORMConfiguration struct { func NewGenericORMConfiguration() *GenericORMConfiguration { return &GenericORMConfiguration{ - ORMWorkMode: "bypass", + ORMWorkMode: consts.WorkModeBypass, ORMReconcilePeriod: time.Second * 5, ORMResourceNamesMap: map[string]string{}, ORMPodNotifyChanLen: 10, diff --git a/pkg/consts/orm.go b/pkg/consts/orm.go new file mode 100644 index 000000000..dc30bcd0e --- /dev/null +++ b/pkg/consts/orm.go @@ -0,0 +1,24 @@ +/* +Copyright 2022 The Katalyst Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package consts + +type WorkMode string + +const ( + WorkModeNri WorkMode = "nri" + WorkModeBypass WorkMode = "bypass" +)