From 919f92c370b64e590400b72b684226781317ef77 Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Tue, 1 Oct 2024 11:41:40 -0400 Subject: [PATCH] scaling policy: use request namespace as target if unset in jobspec (#24065) When jobs are submitted with a scaling policy, the scaling policy's target only includes the job's namespace if the `namespace` field is set in the jobspec, and not when the namespace is supplied only by the request. Normally jobs are canonicalized in the RPC handler before being written to Raft. But the scaling policy targets are instead written during the conversion from `api.Job` to `structs.Job`. The HTTP handler that performs this conversion (`apiJobAndRequestToStructs`) also populates the `structs.Job` namespace from the request, but only after the conversion has occurred. Swap the order of these operations so that the conversion always happens with the correct namespace. Long-term we should not be making mutations during conversion either. But we can't remove that mutation immediately because API requests may come from any agent across upgrades. Move the scaling target creation into the `Canonicalize` method and mark it for future removal in the API conversion code path. 
Fixes: https://github.com/hashicorp/nomad/issues/24039 --- .changelog/24065.txt | 3 ++ command/agent/job_endpoint.go | 18 +++++++----- command/agent/scaling_endpoint.go | 6 +++- nomad/fsm.go | 2 +- nomad/job_endpoint_test.go | 3 +- nomad/mock/job.go | 2 +- nomad/scaling_endpoint_test.go | 6 ++-- nomad/structs/structs.go | 40 +++++++++++++------------- nomad/structs/structs_test.go | 47 +++++++++++++++++++++++++++---- 9 files changed, 88 insertions(+), 39 deletions(-) create mode 100644 .changelog/24065.txt diff --git a/.changelog/24065.txt b/.changelog/24065.txt new file mode 100644 index 000000000000..82a374f841c9 --- /dev/null +++ b/.changelog/24065.txt @@ -0,0 +1,3 @@ +```release-note:bug +scaling: Fixed a bug where scaling policies would not get created during job submission unless namespace field was set in jobspec +``` diff --git a/command/agent/job_endpoint.go b/command/agent/job_endpoint.go index 2f7b9f2c2e7f..6c74b6a48d5b 100644 --- a/command/agent/job_endpoint.go +++ b/command/agent/job_endpoint.go @@ -16,6 +16,7 @@ import ( "github.com/hashicorp/nomad/acl" api "github.com/hashicorp/nomad/api" cstructs "github.com/hashicorp/nomad/client/structs" + "github.com/hashicorp/nomad/helper/pointer" "github.com/hashicorp/nomad/jobspec" "github.com/hashicorp/nomad/jobspec2" "github.com/hashicorp/nomad/nomad/structs" @@ -938,15 +939,17 @@ func (s *HTTPServer) apiJobAndRequestToStructs(job *api.Job, req *http.Request, job, queryRegion, writeReq.Region, s.agent.GetConfig().Region, ) - sJob := ApiJobToStructJob(job) - sJob.Region = jobRegion - writeReq.Region = requestRegion - + // mutate the namespace before we convert just in case anything is expecting + // the namespace to be correct queryNamespace := req.URL.Query().Get("namespace") namespace := namespaceForJob(job.Namespace, queryNamespace, writeReq.Namespace) - sJob.Namespace = namespace + job.Namespace = pointer.Of(namespace) writeReq.Namespace = namespace + sJob := ApiJobToStructJob(job) + sJob.Region = 
jobRegion + writeReq.Region = requestRegion + return sJob, writeReq } @@ -1193,7 +1196,8 @@ func ApiTgToStructsTG(job *structs.Job, taskGroup *api.TaskGroup, tg *structs.Ta } if taskGroup.Scaling != nil { - tg.Scaling = ApiScalingPolicyToStructs(tg.Count, taskGroup.Scaling).TargetTaskGroup(job, tg) + tg.Scaling = ApiScalingPolicyToStructs( + job, tg, nil, tg.Count, taskGroup.Scaling) } tg.EphemeralDisk = &structs.EphemeralDisk{ @@ -1330,7 +1334,7 @@ func ApiTaskToStructsTask(job *structs.Job, group *structs.TaskGroup, for _, policy := range apiTask.ScalingPolicies { structsTask.ScalingPolicies = append( structsTask.ScalingPolicies, - ApiScalingPolicyToStructs(0, policy).TargetTask(job, group, structsTask)) + ApiScalingPolicyToStructs(job, group, structsTask, 0, policy)) } } diff --git a/command/agent/scaling_endpoint.go b/command/agent/scaling_endpoint.go index 18a19a1c9612..473eb1681fc5 100644 --- a/command/agent/scaling_endpoint.go +++ b/command/agent/scaling_endpoint.go @@ -81,7 +81,7 @@ func (s *HTTPServer) scalingPolicyQuery(resp http.ResponseWriter, req *http.Requ return out.Policy, nil } -func ApiScalingPolicyToStructs(count int, ap *api.ScalingPolicy) *structs.ScalingPolicy { +func ApiScalingPolicyToStructs(job *structs.Job, tg *structs.TaskGroup, task *structs.Task, count int, ap *api.ScalingPolicy) *structs.ScalingPolicy { p := structs.ScalingPolicy{ Type: ap.Type, Policy: ap.Policy, @@ -103,5 +103,9 @@ func ApiScalingPolicyToStructs(count int, ap *api.ScalingPolicy) *structs.Scalin } else { p.Min = int64(count) } + + // COMPAT(1.12.0) - canonicalization is done in Job.Register as of 1.9, + // remove this canonicalization in 1.12.0 LTS + p.Canonicalize(job, tg, task) return &p } diff --git a/nomad/fsm.go b/nomad/fsm.go index 3e7d99d7c86f..2df72d8bc7eb 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -1748,7 +1748,7 @@ func (n *nomadFSM) restoreImpl(old io.ReadCloser, filter *FSMFilter) error { if filter.Include(scalingPolicy) { // Handle upgrade path: // - 
Set policy type if empty - scalingPolicy.Canonicalize() + scalingPolicy.Canonicalize(nil, nil, nil) if err := restore.ScalingPolicyRestore(scalingPolicy); err != nil { return err } diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go index 0595a3327aae..d29a6d2c8c15 100644 --- a/nomad/job_endpoint_test.go +++ b/nomad/job_endpoint_test.go @@ -6627,7 +6627,8 @@ func TestJobEndpoint_Plan_Scaling(t *testing.T) { tg := job.TaskGroups[0] tg.Tasks[0].Resources.MemoryMB = 999999999 scaling := &structs.ScalingPolicy{Min: 1, Max: 100, Type: structs.ScalingPolicyTypeHorizontal} - tg.Scaling = scaling.TargetTaskGroup(job, tg) + scaling.Canonicalize(job, tg, nil) + tg.Scaling = scaling planReq := &structs.JobPlanRequest{ Job: job, Diff: false, diff --git a/nomad/mock/job.go b/nomad/mock/job.go index b3592dbf9b9d..d3a709042861 100644 --- a/nomad/mock/job.go +++ b/nomad/mock/job.go @@ -189,7 +189,7 @@ func JobWithScalingPolicy() (*structs.Job, *structs.ScalingPolicy) { Policy: map[string]interface{}{}, Enabled: true, } - policy.TargetTaskGroup(job, job.TaskGroups[0]) + policy.Canonicalize(job, job.TaskGroups[0], nil) job.TaskGroups[0].Scaling = policy return job, policy } diff --git a/nomad/scaling_endpoint_test.go b/nomad/scaling_endpoint_test.go index 67a7e59061e6..0d39cea2c12b 100644 --- a/nomad/scaling_endpoint_test.go +++ b/nomad/scaling_endpoint_test.go @@ -163,15 +163,15 @@ func TestScalingEndpoint_ListPolicies(t *testing.T) { j1 := mock.Job() j1polV := mock.ScalingPolicy() j1polV.Type = "vertical-cpu" - j1polV.TargetTask(j1, j1.TaskGroups[0], j1.TaskGroups[0].Tasks[0]) + j1polV.Canonicalize(j1, j1.TaskGroups[0], j1.TaskGroups[0].Tasks[0]) j1polH := mock.ScalingPolicy() j1polH.Type = "horizontal" - j1polH.TargetTaskGroup(j1, j1.TaskGroups[0]) + j1polH.Canonicalize(j1, j1.TaskGroups[0], nil) j2 := mock.Job() j2polH := mock.ScalingPolicy() j2polH.Type = "horizontal" - j2polH.TargetTaskGroup(j2, j2.TaskGroups[0]) + j2polH.Canonicalize(j2, j2.TaskGroups[0], 
nil) s1.fsm.State().UpsertJob(structs.MsgTypeTestSetup, 1000, nil, j1) s1.fsm.State().UpsertJob(structs.MsgTypeTestSetup, 1000, nil, j2) diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index d58405330d18..3a68aea1fdde 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -6251,10 +6251,25 @@ const ( ScalingPolicyTypeHorizontal = "horizontal" ) -func (p *ScalingPolicy) Canonicalize() { +func (p *ScalingPolicy) Canonicalize(job *Job, tg *TaskGroup, task *Task) { if p.Type == "" { p.Type = ScalingPolicyTypeHorizontal } + + // during restore we canonicalize to update, but these values will already + // have been populated during submit and we don't have references to the + // job, group, and task + if job != nil && tg != nil { + p.Target = map[string]string{ + ScalingTargetNamespace: job.Namespace, + ScalingTargetJob: job.ID, + ScalingTargetGroup: tg.Name, + } + + if task != nil { + p.Target[ScalingTargetTask] = task.Name + } + } } func (p *ScalingPolicy) Copy() *ScalingPolicy { @@ -6343,23 +6358,6 @@ func (p *ScalingPolicy) Diff(p2 *ScalingPolicy) bool { return !reflect.DeepEqual(*p, copy) } -// TargetTaskGroup updates a ScalingPolicy target to specify a given task group -func (p *ScalingPolicy) TargetTaskGroup(job *Job, tg *TaskGroup) *ScalingPolicy { - p.Target = map[string]string{ - ScalingTargetNamespace: job.Namespace, - ScalingTargetJob: job.ID, - ScalingTargetGroup: tg.Name, - } - return p -} - -// TargetTask updates a ScalingPolicy target to specify a given task -func (p *ScalingPolicy) TargetTask(job *Job, tg *TaskGroup, task *Task) *ScalingPolicy { - p.TargetTaskGroup(job, tg) - p.Target[ScalingTargetTask] = task.Name - return p -} - func (p *ScalingPolicy) Stub() *ScalingPolicyListStub { stub := &ScalingPolicyListStub{ ID: p.ID, @@ -6954,7 +6952,7 @@ func (tg *TaskGroup) Canonicalize(job *Job) { } if tg.Scaling != nil { - tg.Scaling.Canonicalize() + tg.Scaling.Canonicalize(job, tg, nil) } for _, service := range 
tg.Services { @@ -8052,6 +8050,10 @@ func (t *Task) Canonicalize(job *Job, tg *TaskGroup) { t.KillTimeout = DefaultKillTimeout } + for _, policy := range t.ScalingPolicies { + policy.Canonicalize(job, tg, t) + } + if t.Vault != nil { t.Vault.Canonicalize() } diff --git a/nomad/structs/structs_test.go b/nomad/structs/structs_test.go index 642bdc33100d..21cf4f26c5e9 100644 --- a/nomad/structs/structs_test.go +++ b/nomad/structs/structs_test.go @@ -6426,10 +6426,17 @@ func TestDispatchPayloadConfig_Validate(t *testing.T) { func TestScalingPolicy_Canonicalize(t *testing.T) { ci.Parallel(t) + job := &Job{Namespace: "prod", ID: "example"} + tg := &TaskGroup{Name: "web"} + task := &Task{Name: "httpd"} + cases := []struct { name string input *ScalingPolicy expected *ScalingPolicy + job *Job + tg *TaskGroup + task *Task }{ { name: "empty policy", @@ -6441,14 +6448,42 @@ func TestScalingPolicy_Canonicalize(t *testing.T) { input: &ScalingPolicy{Type: "other-type"}, expected: &ScalingPolicy{Type: "other-type"}, }, + { + name: "policy with type and task group", + input: &ScalingPolicy{Type: "other-type"}, + expected: &ScalingPolicy{ + Type: "other-type", + Target: map[string]string{ + ScalingTargetNamespace: "prod", + ScalingTargetJob: "example", + ScalingTargetGroup: "web", + }, + }, + job: job, + tg: tg, + }, + { + name: "policy with type and task", + input: &ScalingPolicy{Type: "other-type"}, + expected: &ScalingPolicy{ + Type: "other-type", + Target: map[string]string{ + ScalingTargetNamespace: "prod", + ScalingTargetJob: "example", + ScalingTargetGroup: "web", + ScalingTargetTask: "httpd", + }, + }, + job: job, + tg: tg, + task: task, + }, } - for _, c := range cases { - t.Run(c.name, func(t *testing.T) { - require := require.New(t) - - c.input.Canonicalize() - require.Equal(c.expected, c.input) + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + tc.input.Canonicalize(tc.job, tc.tg, tc.task) + must.Eq(t, tc.expected, tc.input) }) } }