Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove all non-resourcelist gogo annotations from schedulerobjects #4189

Merged
merged 22 commits into from
Feb 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ issues:
max-issues-per-linter: 0
max-same-issues: 0
exclude-rules:
- path: internal/scheduler/schedulerobjects/podutils_test.go
- path: internal/scheduler/internaltypes/podutils_test.go
linters:
- lll

Expand Down
2 changes: 1 addition & 1 deletion internal/common/eventutil/eventutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func ApiJobFromLogSubmitJob(ownerId string, groups []string, queueName string, j

PodSpec: podSpec,
PodSpecs: podSpecs,
SchedulingResourceRequirements: &schedulingResourceRequirements,
SchedulingResourceRequirements: schedulingResourceRequirements,

Created: protoutil.ToTimestamp(time),
Owner: ownerId,
Expand Down
11 changes: 7 additions & 4 deletions internal/common/ingest/testfixtures/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"k8s.io/apimachinery/pkg/api/resource"

protoutil "github.com/armadaproject/armada/internal/common/proto"
armadaslices "github.com/armadaproject/armada/internal/common/slices"
"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
"github.com/armadaproject/armada/internal/scheduler/testfixtures"
"github.com/armadaproject/armada/internal/server/configuration"
Expand Down Expand Up @@ -167,7 +168,7 @@ var Leased = &armadaevents.EventSequence_Event{
ScheduledAtPriority: 15,
UpdateSequenceNumber: 1,
PodRequirementsOverlay: &schedulerobjects.PodRequirements{
Tolerations: []v1.Toleration{
Tolerations: []*v1.Toleration{
{
Key: "whale",
Value: "true",
Expand Down Expand Up @@ -290,11 +291,13 @@ var JobRequeued = &armadaevents.EventSequence_Event{
{
Requirements: &schedulerobjects.ObjectRequirements_PodRequirements{
PodRequirements: &schedulerobjects.PodRequirements{
NodeSelector: NodeSelector,
Tolerations: Tolerations,
NodeSelector: NodeSelector,
Tolerations: armadaslices.Map(Tolerations, func(t v1.Toleration) *v1.Toleration {
return &t
}),
PreemptionPolicy: "PreemptLowerPriority",
Affinity: Affinity,
ResourceRequirements: v1.ResourceRequirements{
ResourceRequirements: &v1.ResourceRequirements{
Limits: map[v1.ResourceName]resource.Quantity{
"memory": resource.MustParse("64Mi"),
"cpu": resource.MustParse("150m"),
Expand Down
9 changes: 6 additions & 3 deletions internal/scheduler/adapters/adapters.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
v1 "k8s.io/api/core/v1"
k8sResource "k8s.io/apimachinery/pkg/api/resource"

armadaslices "github.com/armadaproject/armada/internal/common/slices"
"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
"github.com/armadaproject/armada/pkg/api"
)
Expand All @@ -17,9 +18,11 @@ func PodRequirementsFromPodSpec(podSpec *v1.PodSpec) *schedulerobjects.PodRequir
preemptionPolicy = string(*podSpec.PreemptionPolicy)
}
return &schedulerobjects.PodRequirements{
NodeSelector: podSpec.NodeSelector,
Affinity: podSpec.Affinity,
Tolerations: podSpec.Tolerations,
NodeSelector: podSpec.NodeSelector,
Affinity: podSpec.Affinity,
Tolerations: armadaslices.Map(podSpec.Tolerations, func(t v1.Toleration) *v1.Toleration {
return &t
}),
PreemptionPolicy: preemptionPolicy,
ResourceRequirements: api.SchedulingResourceRequirementsFromPodSpec(podSpec),
}
Expand Down
10 changes: 7 additions & 3 deletions internal/scheduler/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"strconv"

protoutil "github.com/armadaproject/armada/internal/common/proto"

"github.com/gogo/protobuf/proto"
"github.com/gogo/protobuf/types"
"github.com/pkg/errors"
Expand Down Expand Up @@ -296,15 +298,17 @@ func addNodeSelector(podSpec *armadaevents.PodSpecWithAvoidList, key string, val
}
}

func addTolerations(job *armadaevents.SubmitJob, tolerations []v1.Toleration) {
func addTolerations(job *armadaevents.SubmitJob, tolerations []*v1.Toleration) {
if job == nil || len(tolerations) == 0 {
return
}
if job.MainObject != nil {
switch typed := job.MainObject.Object.(type) {
case *armadaevents.KubernetesMainObject_PodSpec:
if typed.PodSpec != nil && typed.PodSpec.PodSpec != nil {
typed.PodSpec.PodSpec.Tolerations = append(typed.PodSpec.PodSpec.Tolerations, tolerations...)
for _, toleration := range tolerations {
typed.PodSpec.PodSpec.Tolerations = append(typed.PodSpec.PodSpec.Tolerations, *toleration)
}
}
}
}
Expand Down Expand Up @@ -381,7 +385,7 @@ func (srv *ExecutorApi) executorFromLeaseRequest(ctx *armadacontext.Context, req
Id: req.ExecutorId,
Pool: req.Pool,
Nodes: nodes,
LastUpdateTime: now,
LastUpdateTime: protoutil.ToTimestamp(now),
UnassignedJobRuns: req.UnassignedJobRunIds,
}
}
Expand Down
21 changes: 12 additions & 9 deletions internal/scheduler/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ import (
"github.com/armadaproject/armada/internal/common/armadaerrors"
"github.com/armadaproject/armada/internal/common/auth/permission"
"github.com/armadaproject/armada/internal/common/compress"
mocks "github.com/armadaproject/armada/internal/common/mocks"
"github.com/armadaproject/armada/internal/common/mocks"
protoutil "github.com/armadaproject/armada/internal/common/proto"
"github.com/armadaproject/armada/internal/common/slices"
armadaslices "github.com/armadaproject/armada/internal/common/slices"
"github.com/armadaproject/armada/internal/common/types"
"github.com/armadaproject/armada/internal/common/util"
schedulerconfig "github.com/armadaproject/armada/internal/scheduler/configuration"
Expand All @@ -29,7 +30,7 @@ import (
"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
"github.com/armadaproject/armada/internal/scheduler/testfixtures"
"github.com/armadaproject/armada/internal/server/configuration"
mocks2 "github.com/armadaproject/armada/internal/server/mocks"
servermocks "github.com/armadaproject/armada/internal/server/mocks"
"github.com/armadaproject/armada/internal/server/permissions"
"github.com/armadaproject/armada/pkg/api"
"github.com/armadaproject/armada/pkg/armadaevents"
Expand Down Expand Up @@ -92,11 +93,11 @@ func TestExecutorApi_LeaseJobRuns(t *testing.T) {
Resources: nil,
},
},
LastSeen: testClock.Now().UTC(),
LastSeen: protoutil.ToTimestamp(testClock.Now().UTC()),
ReportingNodeType: "node-type-1",
},
},
LastUpdateTime: testClock.Now().UTC(),
LastUpdateTime: protoutil.ToTimestamp(testClock.Now().UTC()),
UnassignedJobRuns: []string{runId3},
}

Expand Down Expand Up @@ -181,7 +182,9 @@ func TestExecutorApi_LeaseJobRuns(t *testing.T) {
SubmitMessage: compressedSubmit,
PodRequirementsOverlay: protoutil.MustMarshall(
&schedulerobjects.PodRequirements{
Tolerations: tolerations,
Tolerations: armadaslices.Map(tolerations, func(t v1.Toleration) *v1.Toleration {
return &t
}),
Annotations: map[string]string{configuration.PoolAnnotation: "test-pool", "runtime_gang_cardinality": "3"},
},
),
Expand Down Expand Up @@ -310,7 +313,7 @@ func TestExecutorApi_LeaseJobRuns(t *testing.T) {
mockJobRepository := schedulermocks.NewMockJobRepository(ctrl)
mockExecutorRepository := schedulermocks.NewMockExecutorRepository(ctrl)
mockStream := schedulermocks.NewMockExecutorApi_LeaseJobRunsServer(ctrl)
mockAuthorizer := mocks2.NewMockActionAuthorizer(ctrl)
mockAuthorizer := servermocks.NewMockActionAuthorizer(ctrl)

runIds, err := runIdsFromLeaseRequest(tc.request)
require.NoError(t, err)
Expand Down Expand Up @@ -377,7 +380,7 @@ func TestExecutorApi_LeaseJobRuns_Unauthorised(t *testing.T) {
mockJobRepository := schedulermocks.NewMockJobRepository(ctrl)
mockExecutorRepository := schedulermocks.NewMockExecutorRepository(ctrl)
mockStream := schedulermocks.NewMockExecutorApi_LeaseJobRunsServer(ctrl)
mockAuthorizer := mocks2.NewMockActionAuthorizer(ctrl)
mockAuthorizer := servermocks.NewMockActionAuthorizer(ctrl)

// set up mocks
mockStream.EXPECT().Context().Return(ctx).AnyTimes()
Expand Down Expand Up @@ -504,7 +507,7 @@ func TestExecutorApi_Publish(t *testing.T) {
mockPulsarPublisher := mocks.NewMockPublisher[*armadaevents.EventSequence](ctrl)
mockJobRepository := schedulermocks.NewMockJobRepository(ctrl)
mockExecutorRepository := schedulermocks.NewMockExecutorRepository(ctrl)
mockAuthorizer := mocks2.NewMockActionAuthorizer(ctrl)
mockAuthorizer := servermocks.NewMockActionAuthorizer(ctrl)

// capture all sent messages
var capturedEvents []*armadaevents.EventSequence
Expand Down Expand Up @@ -549,7 +552,7 @@ func TestExecutorApi_Publish_Unauthorised(t *testing.T) {
mockPulsarPublisher := mocks.NewMockPublisher[*armadaevents.EventSequence](ctrl)
mockJobRepository := schedulermocks.NewMockJobRepository(ctrl)
mockExecutorRepository := schedulermocks.NewMockExecutorRepository(ctrl)
mockAuthorizer := mocks2.NewMockActionAuthorizer(ctrl)
mockAuthorizer := servermocks.NewMockActionAuthorizer(ctrl)

sequences := []*armadaevents.EventSequence{
{
Expand Down
2 changes: 1 addition & 1 deletion internal/scheduler/database/executor_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (r *PostgresExecutorRepository) StoreExecutor(ctx *armadacontext.Context, e
err = queries.UpsertExecutor(ctx, UpsertExecutorParams{
ExecutorID: executor.Id,
LastRequest: compressed,
UpdateTime: executor.LastUpdateTime,
UpdateTime: protoutil.ToStdTime(executor.LastUpdateTime),
})
if err != nil {
return errors.WithStack(err)
Expand Down
16 changes: 10 additions & 6 deletions internal/scheduler/database/executor_repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@ import (
"golang.org/x/exp/slices"

"github.com/armadaproject/armada/internal/common/armadacontext"
protoutil "github.com/armadaproject/armada/internal/common/proto"
"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
)

func TestExecutorRepository_LoadAndSave(t *testing.T) {
t1 := time.Now().UTC().Round(1 * time.Microsecond) // postgres only stores times with micro precision
t1Proto := protoutil.ToTimestamp(t1)
tests := map[string]struct {
executors []*schedulerobjects.Executor
}{
Expand All @@ -26,10 +28,10 @@ func TestExecutorRepository_LoadAndSave(t *testing.T) {
Nodes: []*schedulerobjects.Node{
{
Id: "test-node-1",
LastSeen: t1,
LastSeen: t1Proto,
},
},
LastUpdateTime: t1,
LastUpdateTime: t1Proto,
UnassignedJobRuns: []string{"run1", "run2"},
},
{
Expand All @@ -38,10 +40,10 @@ func TestExecutorRepository_LoadAndSave(t *testing.T) {
Nodes: []*schedulerobjects.Node{
{
Id: "test-node-2",
LastSeen: t1,
LastSeen: t1Proto,
},
},
LastUpdateTime: t1,
LastUpdateTime: t1Proto,
UnassignedJobRuns: []string{"run3", "run4"},
},
},
Expand Down Expand Up @@ -86,7 +88,9 @@ func TestExecutorRepository_LoadAndSave(t *testing.T) {

func TestExecutorRepository_GetLastUpdateTimes(t *testing.T) {
t1 := time.Now().UTC().Round(1 * time.Microsecond) // postgres only stores times with micro precision
t1Proto := protoutil.ToTimestamp(t1)
t2 := t1.Add(-1 * time.Second)
t2Proto := protoutil.ToTimestamp(t2)
tests := map[string]struct {
executors []*schedulerobjects.Executor
expectedUpdateTimes map[string]time.Time
Expand All @@ -95,11 +99,11 @@ func TestExecutorRepository_GetLastUpdateTimes(t *testing.T) {
executors: []*schedulerobjects.Executor{
{
Id: "test-executor-1",
LastUpdateTime: t1,
LastUpdateTime: t1Proto,
},
{
Id: "test-executor-2",
LastUpdateTime: t2,
LastUpdateTime: t2Proto,
},
},
expectedUpdateTimes: map[string]time.Time{"test-executor-1": t1, "test-executor-2": t2},
Expand Down
2 changes: 1 addition & 1 deletion internal/scheduler/database/job_repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -589,7 +589,7 @@ func TestFetchJobRunLeases(t *testing.T) {
Pool: "test-pool",
PodRequirementsOverlay: protoutil.MustMarshall(
&schedulerobjects.PodRequirements{
Tolerations: []v1.Toleration{
Tolerations: []*v1.Toleration{
{
Key: "whale",
Value: "true",
Expand Down
34 changes: 22 additions & 12 deletions internal/scheduler/internaltypes/job_scheduling_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"golang.org/x/exp/maps"
v1 "k8s.io/api/core/v1"

protoutil "github.com/armadaproject/armada/internal/common/proto"
armadaslices "github.com/armadaproject/armada/internal/common/slices"
"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
)
Expand Down Expand Up @@ -73,17 +74,24 @@ func FromSchedulerObjectsJobSchedulingInfo(j *schedulerobjects.JobSchedulingInfo
if podRequirements == nil {
return nil, errors.Errorf("job must have pod requirements")
}
rr := podRequirements.GetResourceRequirements().DeepCopy()
if rr == nil {
rr = &v1.ResourceRequirements{}
}
return &JobSchedulingInfo{
Lifetime: j.Lifetime,
PriorityClassName: j.PriorityClassName,
SubmitTime: j.SubmitTime,
SubmitTime: protoutil.ToStdTime(j.SubmitTime),
Priority: j.Priority,
PodRequirements: &PodRequirements{
NodeSelector: podRequirements.NodeSelector,
Affinity: podRequirements.Affinity,
Tolerations: podRequirements.Tolerations,
Annotations: podRequirements.Annotations,
ResourceRequirements: podRequirements.ResourceRequirements,
NodeSelector: maps.Clone(podRequirements.NodeSelector),
Affinity: proto.Clone(podRequirements.Affinity).(*v1.Affinity),
Tolerations: armadaslices.Map(podRequirements.Tolerations, func(t *v1.Toleration) v1.Toleration {
cloned := proto.Clone(t).(*v1.Toleration)
return *cloned
}),
Annotations: maps.Clone(podRequirements.Annotations),
ResourceRequirements: *rr,
},
Version: j.Version,
}, nil
Expand All @@ -94,17 +102,19 @@ func ToSchedulerObjectsJobSchedulingInfo(j *JobSchedulingInfo) *schedulerobjects
return &schedulerobjects.JobSchedulingInfo{
Lifetime: j.Lifetime,
PriorityClassName: j.PriorityClassName,
SubmitTime: j.SubmitTime,
SubmitTime: protoutil.ToTimestamp(j.SubmitTime),
Priority: j.Priority,
ObjectRequirements: []*schedulerobjects.ObjectRequirements{
{
Requirements: &schedulerobjects.ObjectRequirements_PodRequirements{
PodRequirements: &schedulerobjects.PodRequirements{
NodeSelector: podRequirements.NodeSelector,
Affinity: podRequirements.Affinity,
Tolerations: podRequirements.Tolerations,
Annotations: podRequirements.Annotations,
ResourceRequirements: podRequirements.ResourceRequirements,
NodeSelector: maps.Clone(podRequirements.NodeSelector),
Affinity: podRequirements.Affinity.DeepCopy(),
Tolerations: armadaslices.Map(podRequirements.Tolerations, func(t v1.Toleration) *v1.Toleration {
return proto.Clone(&t).(*v1.Toleration)
}),
Annotations: maps.Clone(podRequirements.Annotations),
ResourceRequirements: podRequirements.ResourceRequirements.DeepCopy(),
},
},
},
Expand Down
9 changes: 8 additions & 1 deletion internal/scheduler/internaltypes/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,21 @@ func FromSchedulerObjectsNode(node *schedulerobjects.Node,
}
allocatableByPriority[EvictedPriority] = allocatableResources

taints := make([]v1.Taint, 0, len(node.Taints))
for _, t := range node.Taints {
if t != nil {
taints = append(taints, *t)
}
}

return CreateNodeAndType(
node.Id,
nodeIndex,
node.Executor,
node.Name,
node.Pool,
node.Unschedulable,
node.Taints,
taints,
node.Labels,
indexedTaints,
indexedNodeLabels,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package schedulerobjects
package internaltypes

import (
"crypto/rand"
Expand All @@ -16,18 +16,6 @@ type SchedulingKey [highwayhash.Size]byte

var EmptySchedulingKey SchedulingKey

func (req *PodRequirements) GetAffinityNodeSelector() *v1.NodeSelector {
affinity := req.Affinity
if affinity == nil {
return nil
}
nodeAffinity := affinity.NodeAffinity
if nodeAffinity == nil {
return nil
}
return nodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution
}

// SchedulingKeyGenerator is used to generate scheduling keys efficiently.
// A scheduling key is the canonical hash of the scheduling requirements of a job.
// All memory is allocated up-front and re-used. Thread-safe.
Expand Down
Loading