Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

scaling policy: use request namespace as target if unset in jobspec #24065

Merged
merged 2 commits into from
Oct 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/24065.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
scaling: Fixed a bug where scaling policies would not get created during job submission unless namespace field was set in jobspec
```
18 changes: 11 additions & 7 deletions command/agent/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/hashicorp/nomad/acl"
api "github.com/hashicorp/nomad/api"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/helper/pointer"
"github.com/hashicorp/nomad/jobspec2"
"github.com/hashicorp/nomad/nomad/structs"
)
Expand Down Expand Up @@ -1008,15 +1009,17 @@ func (s *HTTPServer) apiJobAndRequestToStructs(job *api.Job, req *http.Request,
job, queryRegion, writeReq.Region, s.agent.GetConfig().Region,
)

sJob := ApiJobToStructJob(job)
sJob.Region = jobRegion
writeReq.Region = requestRegion

// mutate the namespace before we convert just in case anything is expecting
// the namespace to be correct
queryNamespace := req.URL.Query().Get("namespace")
namespace := namespaceForJob(job.Namespace, queryNamespace, writeReq.Namespace)
sJob.Namespace = namespace
job.Namespace = pointer.Of(namespace)
writeReq.Namespace = namespace

sJob := ApiJobToStructJob(job)
sJob.Region = jobRegion
writeReq.Region = requestRegion

return sJob, writeReq
}

Expand Down Expand Up @@ -1264,7 +1267,8 @@ func ApiTgToStructsTG(job *structs.Job, taskGroup *api.TaskGroup, tg *structs.Ta
}

if taskGroup.Scaling != nil {
tg.Scaling = ApiScalingPolicyToStructs(tg.Count, taskGroup.Scaling).TargetTaskGroup(job, tg)
tg.Scaling = ApiScalingPolicyToStructs(
job, tg, nil, tg.Count, taskGroup.Scaling)
}

tg.EphemeralDisk = &structs.EphemeralDisk{
Expand Down Expand Up @@ -1401,7 +1405,7 @@ func ApiTaskToStructsTask(job *structs.Job, group *structs.TaskGroup,
for _, policy := range apiTask.ScalingPolicies {
structsTask.ScalingPolicies = append(
structsTask.ScalingPolicies,
ApiScalingPolicyToStructs(0, policy).TargetTask(job, group, structsTask))
ApiScalingPolicyToStructs(job, group, structsTask, 0, policy))
}
}

Expand Down
6 changes: 5 additions & 1 deletion command/agent/scaling_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (s *HTTPServer) scalingPolicyQuery(resp http.ResponseWriter, req *http.Requ
return out.Policy, nil
}

func ApiScalingPolicyToStructs(count int, ap *api.ScalingPolicy) *structs.ScalingPolicy {
func ApiScalingPolicyToStructs(job *structs.Job, tg *structs.TaskGroup, task *structs.Task, count int, ap *api.ScalingPolicy) *structs.ScalingPolicy {
p := structs.ScalingPolicy{
Type: ap.Type,
Policy: ap.Policy,
Expand All @@ -103,5 +103,9 @@ func ApiScalingPolicyToStructs(count int, ap *api.ScalingPolicy) *structs.Scalin
} else {
p.Min = int64(count)
}

// COMPAT(1.12.0) - canonicalization is done in Job.Register as of 1.9,
// remove this canonicalization in 1.12.0 LTS
p.Canonicalize(job, tg, task)
return &p
}
2 changes: 1 addition & 1 deletion nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -1776,7 +1776,7 @@ func (n *nomadFSM) restoreImpl(old io.ReadCloser, filter *FSMFilter) error {
if filter.Include(scalingPolicy) {
// Handle upgrade path:
// - Set policy type if empty
scalingPolicy.Canonicalize()
scalingPolicy.Canonicalize(nil, nil, nil)
if err := restore.ScalingPolicyRestore(scalingPolicy); err != nil {
return err
}
Expand Down
3 changes: 2 additions & 1 deletion nomad/job_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6628,7 +6628,8 @@ func TestJobEndpoint_Plan_Scaling(t *testing.T) {
tg := job.TaskGroups[0]
tg.Tasks[0].Resources.MemoryMB = 999999999
scaling := &structs.ScalingPolicy{Min: 1, Max: 100, Type: structs.ScalingPolicyTypeHorizontal}
tg.Scaling = scaling.TargetTaskGroup(job, tg)
scaling.Canonicalize(job, tg, nil)
tg.Scaling = scaling
planReq := &structs.JobPlanRequest{
Job: job,
Diff: false,
Expand Down
2 changes: 1 addition & 1 deletion nomad/mock/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func JobWithScalingPolicy() (*structs.Job, *structs.ScalingPolicy) {
Policy: map[string]interface{}{},
Enabled: true,
}
policy.TargetTaskGroup(job, job.TaskGroups[0])
policy.Canonicalize(job, job.TaskGroups[0], nil)
job.TaskGroups[0].Scaling = policy
return job, policy
}
Expand Down
6 changes: 3 additions & 3 deletions nomad/scaling_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,15 +163,15 @@ func TestScalingEndpoint_ListPolicies(t *testing.T) {
j1 := mock.Job()
j1polV := mock.ScalingPolicy()
j1polV.Type = "vertical-cpu"
j1polV.TargetTask(j1, j1.TaskGroups[0], j1.TaskGroups[0].Tasks[0])
j1polV.Canonicalize(j1, j1.TaskGroups[0], j1.TaskGroups[0].Tasks[0])
j1polH := mock.ScalingPolicy()
j1polH.Type = "horizontal"
j1polH.TargetTaskGroup(j1, j1.TaskGroups[0])
j1polH.Canonicalize(j1, j1.TaskGroups[0], nil)

j2 := mock.Job()
j2polH := mock.ScalingPolicy()
j2polH.Type = "horizontal"
j2polH.TargetTaskGroup(j2, j2.TaskGroups[0])
j2polH.Canonicalize(j2, j2.TaskGroups[0], nil)

s1.fsm.State().UpsertJob(structs.MsgTypeTestSetup, 1000, nil, j1)
s1.fsm.State().UpsertJob(structs.MsgTypeTestSetup, 1000, nil, j2)
Expand Down
40 changes: 21 additions & 19 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -6347,10 +6347,25 @@ const (
ScalingPolicyTypeHorizontal = "horizontal"
)

func (p *ScalingPolicy) Canonicalize() {
func (p *ScalingPolicy) Canonicalize(job *Job, tg *TaskGroup, task *Task) {
if p.Type == "" {
p.Type = ScalingPolicyTypeHorizontal
}

// during restore we canonicalize to update, but these values will already
// have been populated during submit and we don't have references to the
// job, group, and task
if job != nil && tg != nil {
p.Target = map[string]string{
ScalingTargetNamespace: job.Namespace,
ScalingTargetJob: job.ID,
ScalingTargetGroup: tg.Name,
}

if task != nil {
p.Target[ScalingTargetTask] = task.Name
}
}
}

func (p *ScalingPolicy) Copy() *ScalingPolicy {
Expand Down Expand Up @@ -6439,23 +6454,6 @@ func (p *ScalingPolicy) Diff(p2 *ScalingPolicy) bool {
return !reflect.DeepEqual(*p, copy)
}

// TargetTaskGroup updates a ScalingPolicy target to specify a given task group
func (p *ScalingPolicy) TargetTaskGroup(job *Job, tg *TaskGroup) *ScalingPolicy {
p.Target = map[string]string{
ScalingTargetNamespace: job.Namespace,
ScalingTargetJob: job.ID,
ScalingTargetGroup: tg.Name,
}
return p
}

// TargetTask updates a ScalingPolicy target to specify a given task
func (p *ScalingPolicy) TargetTask(job *Job, tg *TaskGroup, task *Task) *ScalingPolicy {
p.TargetTaskGroup(job, tg)
p.Target[ScalingTargetTask] = task.Name
return p
}

func (p *ScalingPolicy) Stub() *ScalingPolicyListStub {
stub := &ScalingPolicyListStub{
ID: p.ID,
Expand Down Expand Up @@ -7050,7 +7048,7 @@ func (tg *TaskGroup) Canonicalize(job *Job) {
}

if tg.Scaling != nil {
tg.Scaling.Canonicalize()
tg.Scaling.Canonicalize(job, tg, nil)
}

for _, service := range tg.Services {
Expand Down Expand Up @@ -8155,6 +8153,10 @@ func (t *Task) Canonicalize(job *Job, tg *TaskGroup) {
t.KillTimeout = DefaultKillTimeout
}

for _, policy := range t.ScalingPolicies {
policy.Canonicalize(job, tg, t)
}

if t.Vault != nil {
t.Vault.Canonicalize()
}
Expand Down
47 changes: 41 additions & 6 deletions nomad/structs/structs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6511,10 +6511,17 @@ func TestDispatchPayloadConfig_Validate(t *testing.T) {
func TestScalingPolicy_Canonicalize(t *testing.T) {
ci.Parallel(t)

job := &Job{Namespace: "prod", ID: "example"}
tg := &TaskGroup{Name: "web"}
task := &Task{Name: "httpd"}

cases := []struct {
name string
input *ScalingPolicy
expected *ScalingPolicy
job *Job
tg *TaskGroup
task *Task
}{
{
name: "empty policy",
Expand All @@ -6526,14 +6533,42 @@ func TestScalingPolicy_Canonicalize(t *testing.T) {
input: &ScalingPolicy{Type: "other-type"},
expected: &ScalingPolicy{Type: "other-type"},
},
{
name: "policy with type and task group",
input: &ScalingPolicy{Type: "other-type"},
expected: &ScalingPolicy{
Type: "other-type",
Target: map[string]string{
ScalingTargetNamespace: "prod",
ScalingTargetJob: "example",
ScalingTargetGroup: "web",
},
},
job: job,
tg: tg,
},
{
name: "policy with type and task",
input: &ScalingPolicy{Type: "other-type"},
expected: &ScalingPolicy{
Type: "other-type",
Target: map[string]string{
ScalingTargetNamespace: "prod",
ScalingTargetJob: "example",
ScalingTargetGroup: "web",
ScalingTargetTask: "httpd",
},
},
job: job,
tg: tg,
task: task,
},
}

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
require := require.New(t)

c.input.Canonicalize()
require.Equal(c.expected, c.input)
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
tc.input.Canonicalize(tc.job, tc.tg, tc.task)
must.Eq(t, tc.expected, tc.input)
})
}
}
Expand Down