diff --git a/.changelog/24065.txt b/.changelog/24065.txt new file mode 100644 index 000000000000..82a374f841c9 --- /dev/null +++ b/.changelog/24065.txt @@ -0,0 +1,3 @@ +```release-note:bug +scaling: Fixed a bug where scaling policies would not get created during job submission unless namespace field was set in jobspec +``` diff --git a/command/agent/job_endpoint.go b/command/agent/job_endpoint.go index 2f7b9f2c2e7f..6c74b6a48d5b 100644 --- a/command/agent/job_endpoint.go +++ b/command/agent/job_endpoint.go @@ -16,6 +16,7 @@ import ( "github.com/hashicorp/nomad/acl" api "github.com/hashicorp/nomad/api" cstructs "github.com/hashicorp/nomad/client/structs" + "github.com/hashicorp/nomad/helper/pointer" "github.com/hashicorp/nomad/jobspec" "github.com/hashicorp/nomad/jobspec2" "github.com/hashicorp/nomad/nomad/structs" @@ -938,15 +939,17 @@ func (s *HTTPServer) apiJobAndRequestToStructs(job *api.Job, req *http.Request, job, queryRegion, writeReq.Region, s.agent.GetConfig().Region, ) - sJob := ApiJobToStructJob(job) - sJob.Region = jobRegion - writeReq.Region = requestRegion - + // mutate the namespace before we convert just in case anything is expecting + // the namespace to be correct queryNamespace := req.URL.Query().Get("namespace") namespace := namespaceForJob(job.Namespace, queryNamespace, writeReq.Namespace) - sJob.Namespace = namespace + job.Namespace = pointer.Of(namespace) writeReq.Namespace = namespace + sJob := ApiJobToStructJob(job) + sJob.Region = jobRegion + writeReq.Region = requestRegion + return sJob, writeReq } @@ -1193,7 +1196,8 @@ func ApiTgToStructsTG(job *structs.Job, taskGroup *api.TaskGroup, tg *structs.Ta } if taskGroup.Scaling != nil { - tg.Scaling = ApiScalingPolicyToStructs(tg.Count, taskGroup.Scaling).TargetTaskGroup(job, tg) + tg.Scaling = ApiScalingPolicyToStructs( + job, tg, nil, tg.Count, taskGroup.Scaling) } tg.EphemeralDisk = &structs.EphemeralDisk{ @@ -1330,7 +1334,7 @@ func ApiTaskToStructsTask(job *structs.Job, group *structs.TaskGroup, for _, policy := range apiTask.ScalingPolicies { structsTask.ScalingPolicies = append( structsTask.ScalingPolicies, - ApiScalingPolicyToStructs(0, policy).TargetTask(job, group, structsTask)) + ApiScalingPolicyToStructs(job, group, structsTask, 0, policy)) } } diff --git a/command/agent/scaling_endpoint.go b/command/agent/scaling_endpoint.go index 18a19a1c9612..473eb1681fc5 100644 --- a/command/agent/scaling_endpoint.go +++ b/command/agent/scaling_endpoint.go @@ -81,7 +81,7 @@ func (s *HTTPServer) scalingPolicyQuery(resp http.ResponseWriter, req *http.Requ return out.Policy, nil } -func ApiScalingPolicyToStructs(count int, ap *api.ScalingPolicy) *structs.ScalingPolicy { +func ApiScalingPolicyToStructs(job *structs.Job, tg *structs.TaskGroup, task *structs.Task, count int, ap *api.ScalingPolicy) *structs.ScalingPolicy { p := structs.ScalingPolicy{ Type: ap.Type, Policy: ap.Policy, @@ -103,5 +103,9 @@ func ApiScalingPolicyToStructs(count int, ap *api.ScalingPolicy) *structs.Scalin } else { p.Min = int64(count) } + + // COMPAT(1.12.0) - canonicalization is done in Job.Register as of 1.9, + // remove this canonicalization in 1.12.0 LTS + p.Canonicalize(job, tg, task) return &p } diff --git a/nomad/fsm.go b/nomad/fsm.go index 3e7d99d7c86f..2df72d8bc7eb 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -1748,7 +1748,7 @@ func (n *nomadFSM) restoreImpl(old io.ReadCloser, filter *FSMFilter) error { if filter.Include(scalingPolicy) { // Handle upgrade path: // - Set policy type if empty - scalingPolicy.Canonicalize() + scalingPolicy.Canonicalize(nil, nil, nil) if err := restore.ScalingPolicyRestore(scalingPolicy); err != nil { return err } diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go index 0595a3327aae..d29a6d2c8c15 100644 --- a/nomad/job_endpoint_test.go +++ b/nomad/job_endpoint_test.go @@ -6627,7 +6627,8 @@ func TestJobEndpoint_Plan_Scaling(t *testing.T) { tg := job.TaskGroups[0] tg.Tasks[0].Resources.MemoryMB = 999999999 scaling := &structs.ScalingPolicy{Min: 1, Max: 100, Type: structs.ScalingPolicyTypeHorizontal} - tg.Scaling = scaling.TargetTaskGroup(job, tg) + scaling.Canonicalize(job, tg, nil) + tg.Scaling = scaling planReq := &structs.JobPlanRequest{ Job: job, Diff: false, diff --git a/nomad/mock/job.go b/nomad/mock/job.go index b3592dbf9b9d..d3a709042861 100644 --- a/nomad/mock/job.go +++ b/nomad/mock/job.go @@ -189,7 +189,7 @@ func JobWithScalingPolicy() (*structs.Job, *structs.ScalingPolicy) { Policy: map[string]interface{}{}, Enabled: true, } - policy.TargetTaskGroup(job, job.TaskGroups[0]) + policy.Canonicalize(job, job.TaskGroups[0], nil) job.TaskGroups[0].Scaling = policy return job, policy } diff --git a/nomad/scaling_endpoint_test.go b/nomad/scaling_endpoint_test.go index 67a7e59061e6..0d39cea2c12b 100644 --- a/nomad/scaling_endpoint_test.go +++ b/nomad/scaling_endpoint_test.go @@ -163,15 +163,15 @@ func TestScalingEndpoint_ListPolicies(t *testing.T) { j1 := mock.Job() j1polV := mock.ScalingPolicy() j1polV.Type = "vertical-cpu" - j1polV.TargetTask(j1, j1.TaskGroups[0], j1.TaskGroups[0].Tasks[0]) + j1polV.Canonicalize(j1, j1.TaskGroups[0], j1.TaskGroups[0].Tasks[0]) j1polH := mock.ScalingPolicy() j1polH.Type = "horizontal" - j1polH.TargetTaskGroup(j1, j1.TaskGroups[0]) + j1polH.Canonicalize(j1, j1.TaskGroups[0], nil) j2 := mock.Job() j2polH := mock.ScalingPolicy() j2polH.Type = "horizontal" - j2polH.TargetTaskGroup(j2, j2.TaskGroups[0]) + j2polH.Canonicalize(j2, j2.TaskGroups[0], nil) s1.fsm.State().UpsertJob(structs.MsgTypeTestSetup, 1000, nil, j1) s1.fsm.State().UpsertJob(structs.MsgTypeTestSetup, 1000, nil, j2) diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index d58405330d18..3a68aea1fdde 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -6251,10 +6251,25 @@ const ( ScalingPolicyTypeHorizontal = "horizontal" ) -func (p *ScalingPolicy) Canonicalize() { +func (p *ScalingPolicy) Canonicalize(job *Job, tg *TaskGroup, task *Task) { if p.Type == "" { p.Type = ScalingPolicyTypeHorizontal } + + // during restore we canonicalize to update, but these values will already + // have been populated during submit and we don't have references to the + // job, group, and task + if job != nil && tg != nil { + p.Target = map[string]string{ + ScalingTargetNamespace: job.Namespace, + ScalingTargetJob: job.ID, + ScalingTargetGroup: tg.Name, + } + + if task != nil { + p.Target[ScalingTargetTask] = task.Name + } + } } func (p *ScalingPolicy) Copy() *ScalingPolicy { @@ -6343,23 +6358,6 @@ func (p *ScalingPolicy) Diff(p2 *ScalingPolicy) bool { return !reflect.DeepEqual(*p, copy) } -// TargetTaskGroup updates a ScalingPolicy target to specify a given task group -func (p *ScalingPolicy) TargetTaskGroup(job *Job, tg *TaskGroup) *ScalingPolicy { - p.Target = map[string]string{ - ScalingTargetNamespace: job.Namespace, - ScalingTargetJob: job.ID, - ScalingTargetGroup: tg.Name, - } - return p -} - -// TargetTask updates a ScalingPolicy target to specify a given task -func (p *ScalingPolicy) TargetTask(job *Job, tg *TaskGroup, task *Task) *ScalingPolicy { - p.TargetTaskGroup(job, tg) - p.Target[ScalingTargetTask] = task.Name - return p -} - func (p *ScalingPolicy) Stub() *ScalingPolicyListStub { stub := &ScalingPolicyListStub{ ID: p.ID, @@ -6954,7 +6952,7 @@ func (tg *TaskGroup) Canonicalize(job *Job) { } if tg.Scaling != nil { - tg.Scaling.Canonicalize() + tg.Scaling.Canonicalize(job, tg, nil) } for _, service := range tg.Services { @@ -8052,6 +8050,10 @@ func (t *Task) Canonicalize(job *Job, tg *TaskGroup) { t.KillTimeout = DefaultKillTimeout } + for _, policy := range t.ScalingPolicies { + policy.Canonicalize(job, tg, t) + } + if t.Vault != nil { t.Vault.Canonicalize() } diff --git a/nomad/structs/structs_test.go b/nomad/structs/structs_test.go index 642bdc33100d..21cf4f26c5e9 100644 --- a/nomad/structs/structs_test.go +++ b/nomad/structs/structs_test.go @@ -6426,10 +6426,17 @@ func TestDispatchPayloadConfig_Validate(t *testing.T) { func TestScalingPolicy_Canonicalize(t *testing.T) { ci.Parallel(t) + job := &Job{Namespace: "prod", ID: "example"} + tg := &TaskGroup{Name: "web"} + task := &Task{Name: "httpd"} + cases := []struct { name string input *ScalingPolicy expected *ScalingPolicy + job *Job + tg *TaskGroup + task *Task }{ { name: "empty policy", @@ -6441,14 +6448,42 @@ func TestScalingPolicy_Canonicalize(t *testing.T) { input: &ScalingPolicy{Type: "other-type"}, expected: &ScalingPolicy{Type: "other-type"}, }, + { + name: "policy with type and task group", + input: &ScalingPolicy{Type: "other-type"}, + expected: &ScalingPolicy{ + Type: "other-type", + Target: map[string]string{ + ScalingTargetNamespace: "prod", + ScalingTargetJob: "example", + ScalingTargetGroup: "web", + }, + }, + job: job, + tg: tg, + }, + { + name: "policy with type and task", + input: &ScalingPolicy{Type: "other-type"}, + expected: &ScalingPolicy{ + Type: "other-type", + Target: map[string]string{ + ScalingTargetNamespace: "prod", + ScalingTargetJob: "example", + ScalingTargetGroup: "web", + ScalingTargetTask: "httpd", + }, + }, + job: job, + tg: tg, + task: task, + }, } - for _, c := range cases { - t.Run(c.name, func(t *testing.T) { - require := require.New(t) - - c.input.Canonicalize() - require.Equal(c.expected, c.input) + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + tc.input.Canonicalize(tc.job, tc.tg, tc.task) + must.Eq(t, tc.expected, tc.input) }) } }