Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

differently exclude tagged job versions from being pruned #24102

Merged
merged 2 commits into from
Oct 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 13 additions & 19 deletions nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2160,8 +2160,9 @@ func (s *StateStore) upsertJobVersion(index uint64, job *structs.Job, txn *txn)
return fmt.Errorf("index update failed: %v", err)
}

// Get all the historic jobs for this ID
all, err := s.jobVersionByID(txn, nil, job.Namespace, job.ID)
// Get all the historic jobs for this ID, except those with a VersionTag,
// as they should always be kept. They are in Version order, high to low.
all, err := s.jobVersionByID(txn, nil, job.Namespace, job.ID, false)
if err != nil {
return fmt.Errorf("failed to look up job versions for %q: %v", job.ID, err)
}
Expand All @@ -2188,21 +2189,10 @@ func (s *StateStore) upsertJobVersion(index uint64, job *structs.Job, txn *txn)
all[max-1], all[max] = all[max], all[max-1]
}

// Find the oldest non-tagged version to delete
deleteIdx := -1
for i := len(all) - 1; i >= max; i-- {
if all[i].VersionTag == nil {
deleteIdx = i
break
}
}

// If we found a non-tagged version to delete, delete it
if deleteIdx != -1 {
d := all[deleteIdx]
if err := txn.Delete("job_version", d); err != nil {
return fmt.Errorf("failed to delete job %v (%d) from job_version", d.ID, d.Version)
}
// Delete the oldest one
d := all[max]
if err := txn.Delete("job_version", d); err != nil {
return fmt.Errorf("failed to delete job %v (%d) from job_version", d.ID, d.Version)
}

return nil
Expand Down Expand Up @@ -2314,7 +2304,7 @@ func (s *StateStore) jobsByIDPrefixAllNamespaces(ws memdb.WatchSet, prefix strin
func (s *StateStore) JobVersionsByID(ws memdb.WatchSet, namespace, id string) ([]*structs.Job, error) {
txn := s.db.ReadTxn()

return s.jobVersionByID(txn, ws, namespace, id)
return s.jobVersionByID(txn, ws, namespace, id, true)
}

// JobVersionByTagName returns a Job if it has a Tag with the passed name
Expand All @@ -2335,7 +2325,7 @@ func (s *StateStore) JobVersionByTagName(ws memdb.WatchSet, namespace, id string
// jobVersionByID is the underlying implementation for retrieving all tracked
// versions of a job and is called under an existing transaction. A watch set
// can optionally be passed in to add the job histories to the watch set.
func (s *StateStore) jobVersionByID(txn *txn, ws memdb.WatchSet, namespace, id string) ([]*structs.Job, error) {
func (s *StateStore) jobVersionByID(txn *txn, ws memdb.WatchSet, namespace, id string, includeTagged bool) ([]*structs.Job, error) {
// Get all the historic jobs for this ID
iter, err := txn.Get("job_version", "id_prefix", namespace, id)
if err != nil {
Expand All @@ -2357,6 +2347,10 @@ func (s *StateStore) jobVersionByID(txn *txn, ws memdb.WatchSet, namespace, id s
continue
}

if !includeTagged && j.VersionTag != nil {
continue
}

all = append(all, j)
}

Expand Down
34 changes: 18 additions & 16 deletions nomad/state/state_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2975,42 +2975,44 @@ func TestStatestore_JobVersionTag(t *testing.T) {
assertVersions(t, []uint64{2, 1, 0})

// tag 2 of them
applyTag(t, 0)
applyTag(t, 1)
applyTag(t, 2)
// nothing should change
assertVersions(t, []uint64{2, 1, 0})

// add 2 more, up to JobTrackedVersions (5)
upsertJob(t)
upsertJob(t)
assertVersions(t, []uint64{4, 3, 2, 1, 0})
// add 3 more, up to JobTrackedVersions (5) + 1 (6)
for range 3 {
upsertJob(t)
}
assertVersions(t, []uint64{5, 4, 3, 2, 1, 0})

// tag one more
applyTag(t, 2)
applyTag(t, 3)
// again nothing should change
assertVersions(t, []uint64{4, 3, 2, 1, 0})
assertVersions(t, []uint64{5, 4, 3, 2, 1, 0})
}

// removing a tag at this point should leave the version in place
// removing a tag at this point should leave the version in place,
// because we still have room within JobTrackedVersions
{
unsetTag(t, "v2")
assertVersions(t, []uint64{4, 3, 2, 1, 0})
unsetTag(t, "v3")
assertVersions(t, []uint64{5, 4, 3, 2, 1, 0})
}

// adding more versions should replace 2-4,
// and leave 0-1 in place because they are tagged
// adding more versions should replace 0,3-5
// and leave 1-2 in place because they are tagged
{
for range 10 {
upsertJob(t)
}
assertVersions(t, []uint64{14, 13, 12, 11, 10, 1, 0})
assertVersions(t, []uint64{15, 14, 13, 12, 11, 2, 1})
}

// untagging version 1 now should delete it immediately,
// since we now have more than JobTrackedVersions
{
unsetTag(t, "v1")
assertVersions(t, []uint64{14, 13, 12, 11, 10, 0})
assertVersions(t, []uint64{15, 14, 13, 12, 11, 2})
}

// test some error conditions
Expand All @@ -3034,10 +3036,10 @@ func TestStatestore_JobVersionTag(t *testing.T) {
// tag name already exists
err = state.UpdateJobVersionTag(nextIndex(state), job.Namespace, &structs.JobApplyTagRequest{
JobID: job.ID,
Tag: &structs.JobVersionTag{Name: "v0"},
Tag: &structs.JobVersionTag{Name: "v2"},
Version: 10,
})
must.ErrorContains(t, err, fmt.Sprintf(`"v0" already exists on a different version of job %q`, job.ID))
must.ErrorContains(t, err, fmt.Sprintf(`"v2" already exists on a different version of job %q`, job.ID))
}

// deleting all versions should also delete tagged versions
Expand Down