Skip to content

Commit

Permalink
lint
Browse files Browse the repository at this point in the history
  • Loading branch information
VihasMakwana committed Jan 28, 2025
1 parent 3d7dbe6 commit 3b2c397
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 12 deletions.
13 changes: 6 additions & 7 deletions pkg/stanza/fileconsumer/internal/tracker/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"

"go.opentelemetry.io/collector/component"
Expand All @@ -21,6 +20,11 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
)

const (
archiveIndexKey = "knownFilesArchiveIndex"
archivePollsToArchiveKey = "knonwFilesPollsToArchive"
)

// Interface for tracking files that are being consumed.
type Tracker interface {
Add(reader *reader.Reader)
Expand Down Expand Up @@ -56,11 +60,6 @@ type fileTracker struct {
archiveIndex int
}

var errInvalidValue = errors.New("invalid value")

var archiveIndexKey = "knownFilesArchiveIndex"
var archivePollsToArchiveKey = "knonwFilesPollsToArchive"

func NewFileTracker(ctx context.Context, set component.TelemetrySettings, maxBatchFiles int, pollsToArchive int, persister operator.Persister) Tracker {
knownFiles := make([]*fileset.Fileset[*reader.Metadata], 3)
for i := 0; i < len(knownFiles); i++ {
Expand Down Expand Up @@ -196,7 +195,7 @@ func (t *fileTracker) restoreArchiveIndex(ctx context.Context) {
}

func (t *fileTracker) rewriteArchive(ctx context.Context, previousPollsToArchive int) {
// Function to swap data between two indices
// helper to rewrite data from oldIndex to newIndex
rewrite := func(newIdx, oldIdex int) error {
oldVal, err := t.persister.Get(ctx, archiveKey(oldIdex))
if err != nil {
Expand Down
8 changes: 3 additions & 5 deletions pkg/stanza/fileconsumer/internal/tracker/tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ func testArchiveRestoration(t *testing.T, pollsToArchive int, newPollsToArchive
// 1.25 means archive is 125% filled (i.e it was rolled over once)
pctFilled := []float32{0.25, 0.5, 0.75, 1, 1.25, 1.50, 1.75, 2.00}
for _, pct := range pctFilled {

persister := testutil.NewUnscopedMockPersister()
tracker := NewFileTracker(context.Background(), componenttest.NewNopTelemetrySettings(), 0, pollsToArchive, persister).(*fileTracker)
iterations := int(pct * float32(pollsToArchive))
Expand All @@ -119,7 +118,7 @@ func testArchiveRestoration(t *testing.T, pollsToArchive int, newPollsToArchive
val, err := persister.Get(context.Background(), archiveKey(i))
require.NoError(t, err)
if val != nil {
count += 1
count++
}
}
require.Equal(t, min(iterations, pollsToArchive), count)
Expand All @@ -138,8 +137,8 @@ func testArchiveRestoration(t *testing.T, pollsToArchive int, newPollsToArchive
foundIteration := int(element.Offset)
require.Equal(t, mostRecentIteration, foundIteration)
}
mostRecentIteration -= 1
startIdx -= 1
mostRecentIteration--
startIdx--
}

// make sure we've removed all extra keys
Expand All @@ -150,7 +149,6 @@ func testArchiveRestoration(t *testing.T, pollsToArchive int, newPollsToArchive
}
}
}

}

func populatedPersisterData(persister operator.Persister, fps []*fingerprint.Fingerprint) []bool {
Expand Down

0 comments on commit 3b2c397

Please sign in to comment.