From fb084abec3789123b82869503279c1289d5e127f Mon Sep 17 00:00:00 2001 From: hc-github-team-nomad-core <82989552+hc-github-team-nomad-core@users.noreply.github.com> Date: Tue, 1 Oct 2024 12:03:07 -0400 Subject: [PATCH] Backport of scaling policy: use request namespace as target if unset in jobspec (#24065) (#24096) When jobs are submitted with a scaling policy, the scaling policy's target only includes the job's namespace if the `namespace` field is set in the jobspec and not from the request. Normally jobs are canonicalized in the RPC handler before being written to Raft. But the scaling policy targets are instead written during the conversion from `api.Job` to `structs.Job`. We populate the `structs.Job` namespace from the request here as well, but only after the conversion has occurred. Swap the order of these operations so that the conversion is always happening with a correct namespace. Long-term we should not be making mutations during conversion either. But we can't remove it immediately because API requests may come from any agent across upgrades. Move the scaling target creation into the `Canonicalize` method and mark it for future removal in the API conversion code path. Fixes: https://github.com/hashicorp/nomad/issues/24039 Co-authored-by: Tim Gross --- .changelog/24065.txt | 3 ++ command/agent/job_endpoint.go | 18 +++++++----- command/agent/scaling_endpoint.go | 6 +++- nomad/fsm.go | 2 +- nomad/job_endpoint_test.go | 3 +- nomad/mock/job.go | 2 +- nomad/scaling_endpoint_test.go | 6 ++-- nomad/structs/structs.go | 40 +++++++++++++------------- nomad/structs/structs_test.go | 47 +++++++++++++++++++++++++++---- 9 files changed, 88 insertions(+), 39 deletions(-) create mode 100644 .changelog/24065.txt diff --git a/.changelog/24065.txt b/.changelog/24065.txt new file mode 100644 index 00000000000..82a374f841c --- /dev/null +++ b/.changelog/24065.txt @@ -0,0 +1,3 @@ +```release-note:bug +scaling: Fixed a bug where scaling policies would not get created during job submission unless namespace field was set in jobspec +``` diff --git a/command/agent/job_endpoint.go b/command/agent/job_endpoint.go index 2f7b9f2c2e7..6c74b6a48d5 100644 --- a/command/agent/job_endpoint.go +++ b/command/agent/job_endpoint.go @@ -16,6 +16,7 @@ import ( "github.com/hashicorp/nomad/acl" api "github.com/hashicorp/nomad/api" cstructs "github.com/hashicorp/nomad/client/structs" + "github.com/hashicorp/nomad/helper/pointer" "github.com/hashicorp/nomad/jobspec" "github.com/hashicorp/nomad/jobspec2" "github.com/hashicorp/nomad/nomad/structs" @@ -938,15 +939,17 @@ func (s *HTTPServer) apiJobAndRequestToStructs(job *api.Job, req *http.Request, job, queryRegion, writeReq.Region, s.agent.GetConfig().Region, ) - sJob := ApiJobToStructJob(job) - sJob.Region = jobRegion - writeReq.Region = requestRegion - + // mutate the namespace before we convert just in case anything is expecting + // the namespace to be correct queryNamespace := req.URL.Query().Get("namespace") namespace := namespaceForJob(job.Namespace, queryNamespace, writeReq.Namespace) - sJob.Namespace = namespace + job.Namespace = pointer.Of(namespace) writeReq.Namespace = namespace + sJob := ApiJobToStructJob(job) + sJob.Region = jobRegion + writeReq.Region = requestRegion + return sJob, writeReq } @@ -1193,7 +1196,8 @@ func ApiTgToStructsTG(job *structs.Job, taskGroup *api.TaskGroup, tg *structs.Ta } if taskGroup.Scaling != nil { - tg.Scaling = ApiScalingPolicyToStructs(tg.Count, taskGroup.Scaling).TargetTaskGroup(job, tg) + tg.Scaling = ApiScalingPolicyToStructs( + job, tg, nil, tg.Count, taskGroup.Scaling) } tg.EphemeralDisk = &structs.EphemeralDisk{ @@ -1330,7 +1334,7 @@ func ApiTaskToStructsTask(job *structs.Job, group *structs.TaskGroup, for _, policy := range apiTask.ScalingPolicies { structsTask.ScalingPolicies = append( structsTask.ScalingPolicies, - ApiScalingPolicyToStructs(0, policy).TargetTask(job, group, structsTask)) + ApiScalingPolicyToStructs(job, group, structsTask, 0, policy)) } } diff --git a/command/agent/scaling_endpoint.go b/command/agent/scaling_endpoint.go index 18a19a1c961..473eb1681fc 100644 --- a/command/agent/scaling_endpoint.go +++ b/command/agent/scaling_endpoint.go @@ -81,7 +81,7 @@ func (s *HTTPServer) scalingPolicyQuery(resp http.ResponseWriter, req *http.Requ return out.Policy, nil } -func ApiScalingPolicyToStructs(count int, ap *api.ScalingPolicy) *structs.ScalingPolicy { +func ApiScalingPolicyToStructs(job *structs.Job, tg *structs.TaskGroup, task *structs.Task, count int, ap *api.ScalingPolicy) *structs.ScalingPolicy { p := structs.ScalingPolicy{ Type: ap.Type, Policy: ap.Policy, @@ -103,5 +103,9 @@ func ApiScalingPolicyToStructs(count int, ap *api.ScalingPolicy) *structs.Scalin } else { p.Min = int64(count) } + + // COMPAT(1.12.0) - canonicalization is done in Job.Register as of 1.9, + // remove this canonicalization in 1.12.0 LTS + p.Canonicalize(job, tg, task) return &p } diff --git a/nomad/fsm.go b/nomad/fsm.go index 3e7d99d7c86..2df72d8bc7e 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -1748,7 +1748,7 @@ func (n *nomadFSM) restoreImpl(old io.ReadCloser, filter *FSMFilter) error { if filter.Include(scalingPolicy) { // Handle upgrade path: // - Set policy type if empty - scalingPolicy.Canonicalize() + scalingPolicy.Canonicalize(nil, nil, nil) if err := restore.ScalingPolicyRestore(scalingPolicy); err != nil { return err } diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go index 0595a3327aa..d29a6d2c8c1 100644 --- a/nomad/job_endpoint_test.go +++ b/nomad/job_endpoint_test.go @@ -6627,7 +6627,8 @@ func TestJobEndpoint_Plan_Scaling(t *testing.T) { tg := job.TaskGroups[0] tg.Tasks[0].Resources.MemoryMB = 999999999 scaling := &structs.ScalingPolicy{Min: 1, Max: 100, Type: structs.ScalingPolicyTypeHorizontal} - tg.Scaling = scaling.TargetTaskGroup(job, tg) + scaling.Canonicalize(job, tg, nil) + tg.Scaling = scaling planReq := &structs.JobPlanRequest{ Job: job, Diff: false, diff --git a/nomad/mock/job.go b/nomad/mock/job.go index b3592dbf9b9..d3a70904286 100644 --- a/nomad/mock/job.go +++ b/nomad/mock/job.go @@ -189,7 +189,7 @@ func JobWithScalingPolicy() (*structs.Job, *structs.ScalingPolicy) { Policy: map[string]interface{}{}, Enabled: true, } - policy.TargetTaskGroup(job, job.TaskGroups[0]) + policy.Canonicalize(job, job.TaskGroups[0], nil) job.TaskGroups[0].Scaling = policy return job, policy } diff --git a/nomad/scaling_endpoint_test.go b/nomad/scaling_endpoint_test.go index 67a7e59061e..0d39cea2c12 100644 --- a/nomad/scaling_endpoint_test.go +++ b/nomad/scaling_endpoint_test.go @@ -163,15 +163,15 @@ func TestScalingEndpoint_ListPolicies(t *testing.T) { j1 := mock.Job() j1polV := mock.ScalingPolicy() j1polV.Type = "vertical-cpu" - j1polV.TargetTask(j1, j1.TaskGroups[0], j1.TaskGroups[0].Tasks[0]) + j1polV.Canonicalize(j1, j1.TaskGroups[0], j1.TaskGroups[0].Tasks[0]) j1polH := mock.ScalingPolicy() j1polH.Type = "horizontal" - j1polH.TargetTaskGroup(j1, j1.TaskGroups[0]) + j1polH.Canonicalize(j1, j1.TaskGroups[0], nil) j2 := mock.Job() j2polH := mock.ScalingPolicy() j2polH.Type = "horizontal" - j2polH.TargetTaskGroup(j2, j2.TaskGroups[0]) + j2polH.Canonicalize(j2, j2.TaskGroups[0], nil) s1.fsm.State().UpsertJob(structs.MsgTypeTestSetup, 1000, nil, j1) s1.fsm.State().UpsertJob(structs.MsgTypeTestSetup, 1000, nil, j2) diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index d58405330d1..3a68aea1fdd 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -6251,10 +6251,25 @@ const ( ScalingPolicyTypeHorizontal = "horizontal" ) -func (p *ScalingPolicy) Canonicalize() { +func (p *ScalingPolicy) Canonicalize(job *Job, tg *TaskGroup, task *Task) { if p.Type == "" { p.Type = ScalingPolicyTypeHorizontal } + + // during restore we canonicalize to update, but these values will already + // have been populated during submit and we don't have references to the + // job, group, and task + if job != nil && tg != nil { + p.Target = map[string]string{ + ScalingTargetNamespace: job.Namespace, + ScalingTargetJob: job.ID, + ScalingTargetGroup: tg.Name, + } + + if task != nil { + p.Target[ScalingTargetTask] = task.Name + } + } } func (p *ScalingPolicy) Copy() *ScalingPolicy { @@ -6343,23 +6358,6 @@ func (p *ScalingPolicy) Diff(p2 *ScalingPolicy) bool { return !reflect.DeepEqual(*p, copy) } -// TargetTaskGroup updates a ScalingPolicy target to specify a given task group -func (p *ScalingPolicy) TargetTaskGroup(job *Job, tg *TaskGroup) *ScalingPolicy { - p.Target = map[string]string{ - ScalingTargetNamespace: job.Namespace, - ScalingTargetJob: job.ID, - ScalingTargetGroup: tg.Name, - } - return p -} - -// TargetTask updates a ScalingPolicy target to specify a given task -func (p *ScalingPolicy) TargetTask(job *Job, tg *TaskGroup, task *Task) *ScalingPolicy { - p.TargetTaskGroup(job, tg) - p.Target[ScalingTargetTask] = task.Name - return p -} - func (p *ScalingPolicy) Stub() *ScalingPolicyListStub { stub := &ScalingPolicyListStub{ ID: p.ID, @@ -6954,7 +6952,7 @@ func (tg *TaskGroup) Canonicalize(job *Job) { } if tg.Scaling != nil { - tg.Scaling.Canonicalize() + tg.Scaling.Canonicalize(job, tg, nil) } for _, service := range tg.Services { @@ -8052,6 +8050,10 @@ func (t *Task) Canonicalize(job *Job, tg *TaskGroup) { t.KillTimeout = DefaultKillTimeout } + for _, policy := range t.ScalingPolicies { + policy.Canonicalize(job, tg, t) + } + if t.Vault != nil { t.Vault.Canonicalize() } diff --git a/nomad/structs/structs_test.go b/nomad/structs/structs_test.go index 642bdc33100..21cf4f26c5e 100644 --- a/nomad/structs/structs_test.go +++ b/nomad/structs/structs_test.go @@ -6426,10 +6426,17 @@ func TestDispatchPayloadConfig_Validate(t *testing.T) { func TestScalingPolicy_Canonicalize(t *testing.T) { ci.Parallel(t) + job := &Job{Namespace: "prod", ID: "example"} + tg := &TaskGroup{Name: "web"} + task := &Task{Name: "httpd"} + cases := []struct { name string input *ScalingPolicy expected *ScalingPolicy + job *Job + tg *TaskGroup + task *Task }{ { name: "empty policy", @@ -6441,14 +6448,42 @@ func TestScalingPolicy_Canonicalize(t *testing.T) { input: &ScalingPolicy{Type: "other-type"}, expected: &ScalingPolicy{Type: "other-type"}, }, + { + name: "policy with type and task group", + input: &ScalingPolicy{Type: "other-type"}, + expected: &ScalingPolicy{ + Type: "other-type", + Target: map[string]string{ + ScalingTargetNamespace: "prod", + ScalingTargetJob: "example", + ScalingTargetGroup: "web", + }, + }, + job: job, + tg: tg, + }, + { + name: "policy with type and task", + input: &ScalingPolicy{Type: "other-type"}, + expected: &ScalingPolicy{ + Type: "other-type", + Target: map[string]string{ + ScalingTargetNamespace: "prod", + ScalingTargetJob: "example", + ScalingTargetGroup: "web", + ScalingTargetTask: "httpd", + }, + }, + job: job, + tg: tg, + task: task, + }, } - for _, c := range cases { - t.Run(c.name, func(t *testing.T) { - require := require.New(t) - - c.input.Canonicalize() - require.Equal(c.expected, c.input) + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + tc.input.Canonicalize(tc.job, tc.tg, tc.task) + must.Eq(t, tc.expected, tc.input) }) } }