Skip to content

Commit

Permalink
Validate completion keys.
Browse files Browse the repository at this point in the history
  • Loading branch information
seizethedave committed Jan 2, 2025
1 parent 0062ff8 commit 4e8dd7a
Showing 1 changed file with 33 additions and 5 deletions.
38 changes: 33 additions & 5 deletions pkg/blockbuilder/blockbuilder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,11 @@ func TestBlockBuilder_StartWithExistingCommit_PullMode(t *testing.T) {
return completeJobCalls > 0
}, 5*time.Second, 100*time.Millisecond, "expected job completion")

require.EqualValues(t,
[]schedulerpb.JobKey{{Id: "test-job-4898", Epoch: 90000}},
scheduler.completeJobCalls,
)

// Because there is a commit, on startup, block-builder must consume samples only after the commit.
expSamples := producedSamples[1+(len(producedSamples)/2):]

Expand Down Expand Up @@ -387,6 +392,11 @@ func TestBlockBuilder_StartWithLookbackOnNoCommit_PullMode(t *testing.T) {
return bb.jobIteration.Load() > 0
}, 5*time.Second, 100*time.Millisecond, "expected job completion")

require.EqualValues(t,
[]schedulerpb.JobKey{{Id: "test-job-4898", Epoch: 90001}},
scheduler.completeJobCalls,
)

bucketDir := path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, "1")
db, err := tsdb.Open(bucketDir, promslog.NewNopLogger(), nil, nil, nil)
require.NoError(t, err)
Expand Down Expand Up @@ -565,7 +575,14 @@ func TestBlockBuilder_ReachHighWatermarkBeforeLastCycleSection_PullMode(t *testi
})

// Wait for both jobs to complete.
require.Eventually(t, func() bool { return bb.jobIteration.Load() >= 2 }, 5*time.Second, 100*time.Millisecond, "expected job completion")
require.Eventually(t, func() bool {
return bb.jobIteration.Load() >= 2
}, 5*time.Second, 100*time.Millisecond, "expected job completion")

require.EqualValues(t,
[]schedulerpb.JobKey{{Id: "test-job-p0-4898", Epoch: 90002}, {Id: "test-job-p1-4899", Epoch: 90070}},
scheduler.completeJobCalls,
)

bucketDir := path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, "1")
db, err := tsdb.Open(bucketDir, promslog.NewNopLogger(), nil, nil, nil)
Expand Down Expand Up @@ -704,6 +721,11 @@ func TestBlockBuilder_WithMultipleTenants_PullMode(t *testing.T) {
return completeJobCalls > 0
}, 5*time.Second, 100*time.Millisecond, "expected job completion")

require.EqualValues(t,
[]schedulerpb.JobKey{{Id: "test-job-4898", Epoch: 90003}},
scheduler.completeJobCalls,
)

for _, tenant := range tenants {
bucketDir := path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, tenant)
db, err := tsdb.Open(bucketDir, promslog.NewNopLogger(), nil, nil, nil)
Expand Down Expand Up @@ -1157,6 +1179,11 @@ func TestPullMode(t *testing.T) {
return completeJobCalls == 2
}, 5*time.Second, 100*time.Millisecond, "expected to complete two jobs")

require.EqualValues(t,
[]schedulerpb.JobKey{{Id: "test-job-p0-0", Epoch: 220}, {Id: "test-job-p1-0", Epoch: 233}},
scheduler.completeJobCalls,
)

bucketDir := path.Join(cfg.BlocksStorage.Bucket.Filesystem.Directory, "1")
db, err := tsdb.Open(bucketDir, promslog.NewNopLogger(), nil, nil, nil)
require.NoError(t, err)
Expand Down Expand Up @@ -1184,7 +1211,7 @@ type mockSchedulerClient struct {
}
runCalls int
getJobCalls int
completeJobCalls int
completeJobCalls []schedulerpb.JobKey
}

func (m *mockSchedulerClient) Run(_ context.Context) {
Expand All @@ -1211,10 +1238,11 @@ func (m *mockSchedulerClient) GetJob(ctx context.Context) (schedulerpb.JobKey, s
return schedulerpb.JobKey{}, schedulerpb.JobSpec{}, ctx.Err()
}

func (m *mockSchedulerClient) CompleteJob(schedulerpb.JobKey) error {
func (m *mockSchedulerClient) CompleteJob(key schedulerpb.JobKey) error {
m.mu.Lock()
defer m.mu.Unlock()
m.completeJobCalls++

m.completeJobCalls = append(m.completeJobCalls, key)

// Do nothing.
return nil
Expand All @@ -1232,5 +1260,5 @@ func (m *mockSchedulerClient) addJob(key schedulerpb.JobKey, spec schedulerpb.Jo
func (m *mockSchedulerClient) counts() (int, int, int) {
m.mu.Lock()
defer m.mu.Unlock()
return m.runCalls, m.getJobCalls, m.completeJobCalls
return m.runCalls, m.getJobCalls, len(m.completeJobCalls)
}

0 comments on commit 4e8dd7a

Please sign in to comment.