From 25c305f7d91a4144fc4d2699fc1ef0b405ab4892 Mon Sep 17 00:00:00 2001 From: Daniel Bennett Date: Tue, 1 Oct 2024 12:56:51 -0400 Subject: [PATCH 1/2] test bug: tagged versions count against limit specifically tagged versions that are not the latest --- nomad/state/state_store_test.go | 34 +++++++++++++++++---------------- 1 file changed, 18 insertions(+), 16 deletions(-) diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index bc9a328fa5c..abb59857669 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -2975,42 +2975,44 @@ func TestStatestore_JobVersionTag(t *testing.T) { assertVersions(t, []uint64{2, 1, 0}) // tag 2 of them - applyTag(t, 0) applyTag(t, 1) + applyTag(t, 2) // nothing should change assertVersions(t, []uint64{2, 1, 0}) - // add 2 more, up to JobTrackedVersions (5) - upsertJob(t) - upsertJob(t) - assertVersions(t, []uint64{4, 3, 2, 1, 0}) + // add 3 more, up to JobTrackedVersions (5) + 1 (6) + for range 3 { + upsertJob(t) + } + assertVersions(t, []uint64{5, 4, 3, 2, 1, 0}) // tag one more - applyTag(t, 2) + applyTag(t, 3) // again nothing should change - assertVersions(t, []uint64{4, 3, 2, 1, 0}) + assertVersions(t, []uint64{5, 4, 3, 2, 1, 0}) } - // removing a tag at this point should leave the version in place + // removing a tag at this point should leave the version in place, + // because we still have room within JobTrackedVersions { - unsetTag(t, "v2") - assertVersions(t, []uint64{4, 3, 2, 1, 0}) + unsetTag(t, "v3") + assertVersions(t, []uint64{5, 4, 3, 2, 1, 0}) } - // adding more versions should replace 2-4, - // and leave 0-1 in place because they are tagged + // adding more versions should replace 0,3-5 + // and leave 1-2 in place because they are tagged { for range 10 { upsertJob(t) } - assertVersions(t, []uint64{14, 13, 12, 11, 10, 1, 0}) + assertVersions(t, []uint64{15, 14, 13, 12, 11, 2, 1}) } // untagging version 1 now should delete it immediately, // since we now have more than JobTrackedVersions { unsetTag(t, "v1") - assertVersions(t, []uint64{14, 13, 12, 11, 10, 0}) + assertVersions(t, []uint64{15, 14, 13, 12, 11, 2}) } // test some error conditions @@ -3034,10 +3036,10 @@ func TestStatestore_JobVersionTag(t *testing.T) { // tag name already exists err = state.UpdateJobVersionTag(nextIndex(state), job.Namespace, &structs.JobApplyTagRequest{ JobID: job.ID, - Tag: &structs.JobVersionTag{Name: "v0"}, + Tag: &structs.JobVersionTag{Name: "v2"}, Version: 10, }) - must.ErrorContains(t, err, fmt.Sprintf(`"v0" already exists on a different version of job %q`, job.ID)) + must.ErrorContains(t, err, fmt.Sprintf(`"v2" already exists on a different version of job %q`, job.ID)) } // deleting all versions should also delete tagged versions From b79c39fe8520522c451d987bbe0cdf0412d6cc26 Mon Sep 17 00:00:00 2001 From: Daniel Bennett Date: Tue, 1 Oct 2024 13:50:24 -0400 Subject: [PATCH 2/2] fix: use original logic, sans tagged versions --- nomad/state/state_store.go | 32 +++++++++++++------------------- 1 file changed, 13 insertions(+), 19 deletions(-) diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index c9bfbcbffe7..f24fb7a84a4 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -2160,8 +2160,9 @@ func (s *StateStore) upsertJobVersion(index uint64, job *structs.Job, txn *txn) return fmt.Errorf("index update failed: %v", err) } - // Get all the historic jobs for this ID - all, err := s.jobVersionByID(txn, nil, job.Namespace, job.ID) + // Get all the historic jobs for this ID, except those with a VersionTag, + // as they should always be kept. They are in Version order, high to low. + all, err := s.jobVersionByID(txn, nil, job.Namespace, job.ID, false) if err != nil { return fmt.Errorf("failed to look up job versions for %q: %v", job.ID, err) } @@ -2188,21 +2189,10 @@ func (s *StateStore) upsertJobVersion(index uint64, job *structs.Job, txn *txn) all[max-1], all[max] = all[max], all[max-1] } - // Find the oldest non-tagged version to delete - deleteIdx := -1 - for i := len(all) - 1; i >= max; i-- { - if all[i].VersionTag == nil { - deleteIdx = i - break - } - } - - // If we found a non-tagged version to delete, delete it - if deleteIdx != -1 { - d := all[deleteIdx] - if err := txn.Delete("job_version", d); err != nil { - return fmt.Errorf("failed to delete job %v (%d) from job_version", d.ID, d.Version) - } + // Delete the oldest one + d := all[max] + if err := txn.Delete("job_version", d); err != nil { + return fmt.Errorf("failed to delete job %v (%d) from job_version", d.ID, d.Version) } return nil @@ -2314,7 +2304,7 @@ func (s *StateStore) jobsByIDPrefixAllNamespaces(ws memdb.WatchSet, prefix strin func (s *StateStore) JobVersionsByID(ws memdb.WatchSet, namespace, id string) ([]*structs.Job, error) { txn := s.db.ReadTxn() - return s.jobVersionByID(txn, ws, namespace, id) + return s.jobVersionByID(txn, ws, namespace, id, true) } // JobVersionByTagName returns a Job if it has a Tag with the passed name @@ -2335,7 +2325,7 @@ func (s *StateStore) JobVersionByTagName(ws memdb.WatchSet, namespace, id string // jobVersionByID is the underlying implementation for retrieving all tracked // versions of a job and is called under an existing transaction. A watch set // can optionally be passed in to add the job histories to the watch set. -func (s *StateStore) jobVersionByID(txn *txn, ws memdb.WatchSet, namespace, id string) ([]*structs.Job, error) { +func (s *StateStore) jobVersionByID(txn *txn, ws memdb.WatchSet, namespace, id string, includeTagged bool) ([]*structs.Job, error) { // Get all the historic jobs for this ID iter, err := txn.Get("job_version", "id_prefix", namespace, id) if err != nil { @@ -2357,6 +2347,10 @@ func (s *StateStore) jobVersionByID(txn *txn, ws memdb.WatchSet, namespace, id s continue } + if !includeTagged && j.VersionTag != nil { + continue + } + all = append(all, j) }