diff --git a/pkg/stanza/fileconsumer/file.go b/pkg/stanza/fileconsumer/file.go index c5675a90a063d..e8119e36a7a08 100644 --- a/pkg/stanza/fileconsumer/file.go +++ b/pkg/stanza/fileconsumer/file.go @@ -50,7 +50,7 @@ func (m *Manager) Start(persister operator.Persister) error { } // instantiate the tracker - m.instantiateTracker(persister) + m.instantiateTracker(ctx, persister) if persister != nil { m.persister = persister @@ -271,12 +271,12 @@ func (m *Manager) newReader(ctx context.Context, file *os.File, fp *fingerprint. return r, nil } -func (m *Manager) instantiateTracker(persister operator.Persister) { +func (m *Manager) instantiateTracker(ctx context.Context, persister operator.Persister) { var t tracker.Tracker if m.noTracking { t = tracker.NewNoStateTracker(m.set, m.maxBatchFiles) } else { - t = tracker.NewFileTracker(m.set, m.maxBatchFiles, m.pollsToArchive, persister) + t = tracker.NewFileTracker(ctx, m.set, m.maxBatchFiles, m.pollsToArchive, persister) } m.tracker = t } diff --git a/pkg/stanza/fileconsumer/internal/checkpoint/checkpoint.go b/pkg/stanza/fileconsumer/internal/checkpoint/checkpoint.go index 8a5a60b7d734b..4727500b61f03 100644 --- a/pkg/stanza/fileconsumer/internal/checkpoint/checkpoint.go +++ b/pkg/stanza/fileconsumer/internal/checkpoint/checkpoint.go @@ -10,6 +10,8 @@ import ( "errors" "fmt" + "go.opentelemetry.io/collector/extension/xextension/storage" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" ) @@ -21,7 +23,7 @@ func Save(ctx context.Context, persister operator.Persister, rmds []*reader.Meta return SaveKey(ctx, persister, rmds, knownFilesKey) } -func SaveKey(ctx context.Context, persister operator.Persister, rmds []*reader.Metadata, key string) error { +func SaveKey(ctx context.Context, persister operator.Persister, rmds []*reader.Metadata, key string, ops ...*storage.Operation) error { var buf 
bytes.Buffer enc := json.NewEncoder(&buf) @@ -37,8 +39,8 @@ func SaveKey(ctx context.Context, persister operator.Persister, rmds []*reader.M errs = append(errs, fmt.Errorf("encode metadata: %w", err)) } } - - if err := persister.Set(ctx, key, buf.Bytes()); err != nil { + ops = append(ops, storage.SetOperation(key, buf.Bytes())) + if err := persister.Batch(ctx, ops...); err != nil { errs = append(errs, fmt.Errorf("persist known files: %w", err)) } diff --git a/pkg/stanza/fileconsumer/internal/tracker/tracker.go b/pkg/stanza/fileconsumer/internal/tracker/tracker.go index c784d1c3485a1..c7e6e897f0e3a 100644 --- a/pkg/stanza/fileconsumer/internal/tracker/tracker.go +++ b/pkg/stanza/fileconsumer/internal/tracker/tracker.go @@ -4,10 +4,13 @@ package tracker // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/tracker" import ( + "bytes" "context" + "encoding/json" "fmt" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/extension/xextension/storage" "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/checkpoint" @@ -17,6 +20,11 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" ) +const ( + archiveIndexKey = "knownFilesArchiveIndex" + archivePollsToArchiveKey = "knownFilesPollsToArchive" +) + // Interface for tracking files that are being consumed. 
type Tracker interface { Add(reader *reader.Reader) @@ -52,13 +60,14 @@ type fileTracker struct { archiveIndex int } -func NewFileTracker(set component.TelemetrySettings, maxBatchFiles int, pollsToArchive int, persister operator.Persister) Tracker { +func NewFileTracker(ctx context.Context, set component.TelemetrySettings, maxBatchFiles int, pollsToArchive int, persister operator.Persister) Tracker { knownFiles := make([]*fileset.Fileset[*reader.Metadata], 3) for i := 0; i < len(knownFiles); i++ { knownFiles[i] = fileset.New[*reader.Metadata](maxBatchFiles) } set.Logger = set.Logger.With(zap.String("tracker", "fileTracker")) - return &fileTracker{ + + t := &fileTracker{ set: set, maxBatchFiles: maxBatchFiles, currentPollFiles: fileset.New[*reader.Reader](maxBatchFiles), @@ -68,6 +77,11 @@ func NewFileTracker(set component.TelemetrySettings, maxBatchFiles int, pollsToA persister: persister, archiveIndex: 0, } + if t.archiveEnabled() { + t.restoreArchiveIndex(ctx) + } + + return t } func (t *fileTracker) Add(reader *reader.Reader) { @@ -131,7 +145,9 @@ func (t *fileTracker) EndPoll() { // t.knownFiles[0] -> t.knownFiles[1] -> t.knownFiles[2] // Instead of throwing it away, archive it. 
- t.archive(t.knownFiles[2]) + if t.archiveEnabled() { + t.archive(t.knownFiles[2]) + } copy(t.knownFiles[1:], t.knownFiles) t.knownFiles[0] = fileset.New[*reader.Metadata](t.maxBatchFiles) } @@ -144,6 +160,110 @@ func (t *fileTracker) TotalReaders() int { return total } +func (t *fileTracker) restoreArchiveIndex(ctx context.Context) { + // remove extra "keys" once archive restoration is done + defer t.removeExtraKeys(ctx) + defer func() { + // store current pollsToArchive + if err := t.persister.Set(ctx, archivePollsToArchiveKey, encodeIndex(t.pollsToArchive)); err != nil { + t.set.Logger.Error("Error storing polls_to_archive", zap.Error(err)) + } + }() + + previousPollsToArchive, err := t.getPreviousPollsToArchive(ctx) + if err != nil { + // if there's an error reading previousPollsToArchive, default to current value + previousPollsToArchive = t.pollsToArchive + } + + t.archiveIndex, err = t.getArchiveIndex(ctx) + if err != nil { + t.set.Logger.Error("error while reading the archiveIndexKey. Starting from 0", zap.Error(err)) + return + } + + if previousPollsToArchive < t.pollsToArchive { + // if archive size has increased, we just increment the index until we encounter a nil value + for t.archiveIndex < t.pollsToArchive && t.isSet(ctx, t.archiveIndex) { + t.archiveIndex++ + } + } else if previousPollsToArchive > t.pollsToArchive { + // we will only attempt to rewrite archive if the archive size has shrunk + t.set.Logger.Warn("polls_to_archive has changed. Will attempt to rewrite archive") + t.rewriteArchive(ctx, previousPollsToArchive) + } +} + +func (t *fileTracker) rewriteArchive(ctx context.Context, previousPollsToArchive int) { + // helper to rewrite data from oldIndex to newIndex + rewrite := func(newIdx, oldIdx int) error { + oldVal, err := t.persister.Get(ctx, archiveKey(oldIdx)) + if err != nil { + return err + } + return t.persister.Set(ctx, archiveKey(newIdx), oldVal) + } + // Calculate the least recent index, w.r.t. 
new archive size + + leastRecentIndex := mod(t.archiveIndex-t.pollsToArchive, previousPollsToArchive) + + // Refer archive.md for the detailed design + if mod(t.archiveIndex-1, previousPollsToArchive) > t.pollsToArchive { + for i := 0; i < t.pollsToArchive; i++ { + if err := rewrite(i, leastRecentIndex); err != nil { + t.set.Logger.Error("error while swapping archive", zap.Error(err)) + } + leastRecentIndex = (leastRecentIndex + 1) % previousPollsToArchive + } + t.archiveIndex = 0 + } else { + if !t.isSet(ctx, t.archiveIndex) { + // If the current index points at an unset key, no need to do anything + return + } + for i := 0; i < t.pollsToArchive-t.archiveIndex; i++ { + if err := rewrite(t.archiveIndex+i, leastRecentIndex); err != nil { + t.set.Logger.Warn("error while swapping archive", zap.Error(err)) + } + leastRecentIndex = (leastRecentIndex + 1) % previousPollsToArchive + } + } +} + +func (t *fileTracker) removeExtraKeys(ctx context.Context) { + for i := t.pollsToArchive; t.isSet(ctx, i); i++ { + if err := t.persister.Delete(ctx, archiveKey(i)); err != nil { + t.set.Logger.Error("error while cleaning extra keys", zap.Error(err)) + } + } +} + +func (t *fileTracker) getPreviousPollsToArchive(ctx context.Context) (int, error) { + byteIndex, err := t.persister.Get(ctx, archivePollsToArchiveKey) + if err != nil { + t.set.Logger.Error("error while reading the archiveIndexKey", zap.Error(err)) + return 0, err + } + previousPollsToArchive, err := decodeIndex(byteIndex) + if err != nil { + t.set.Logger.Error("error while decoding previousPollsToArchive", zap.Error(err)) + return 0, err + } + return previousPollsToArchive, nil +} + +func (t *fileTracker) getArchiveIndex(ctx context.Context) (int, error) { + byteIndex, err := t.persister.Get(ctx, archiveIndexKey) + if err != nil { + return 0, err + } + archiveIndex, err := decodeIndex(byteIndex) + if err != nil { + return 0, err + } + return archiveIndex, nil +} + func (t *fileTracker) archive(metadata 
*fileset.Fileset[*reader.Metadata]) { // We make use of a ring buffer, where each set of files is stored under a specific index. // Instead of discarding knownFiles[2], write it to the next index and eventually roll over. @@ -162,19 +282,17 @@ func (t *fileTracker) archive(metadata *fileset.Fileset[*reader.Metadata]) { // start // index - if t.pollsToArchive <= 0 || t.persister == nil { - return - } - if err := t.writeArchive(t.archiveIndex, metadata); err != nil { + index := t.archiveIndex + t.archiveIndex = (t.archiveIndex + 1) % t.pollsToArchive // increment the index + indexOp := storage.SetOperation(archiveIndexKey, encodeIndex(t.archiveIndex)) // batch the updated index with metadata + if err := t.writeArchive(index, metadata, indexOp); err != nil { t.set.Logger.Error("error faced while saving to the archive", zap.Error(err)) } - t.archiveIndex = (t.archiveIndex + 1) % t.pollsToArchive // increment the index } // readArchive loads data from the archive for a given index and returns a fileset.Filset. func (t *fileTracker) readArchive(index int) (*fileset.Fileset[*reader.Metadata], error) { - key := fmt.Sprintf("knownFiles%d", index) - metadata, err := checkpoint.LoadKey(context.Background(), t.persister, key) + metadata, err := checkpoint.LoadKey(context.Background(), t.persister, archiveKey(index)) if err != nil { return nil, err } @@ -184,9 +302,17 @@ func (t *fileTracker) readArchive(index int) (*fileset.Fileset[*reader.Metadata] } // writeArchive saves data to the archive for a given index and returns an error, if encountered. 
-func (t *fileTracker) writeArchive(index int, rmds *fileset.Fileset[*reader.Metadata]) error { - key := fmt.Sprintf("knownFiles%d", index) - return checkpoint.SaveKey(context.Background(), t.persister, rmds.Get(), key) +func (t *fileTracker) writeArchive(index int, rmds *fileset.Fileset[*reader.Metadata], ops ...*storage.Operation) error { + return checkpoint.SaveKey(context.Background(), t.persister, rmds.Get(), archiveKey(index), ops...) +} + +func (t *fileTracker) archiveEnabled() bool { + return t.pollsToArchive > 0 && t.persister != nil +} + +func (t *fileTracker) isSet(ctx context.Context, index int) bool { + val, err := t.persister.Get(ctx, archiveKey(index)) + return val != nil && err == nil } // FindFiles goes through archive, one fileset at a time and tries to match all fingerprints against that loaded set. @@ -295,3 +421,31 @@ func (t *noStateTracker) EndPoll() {} func (t *noStateTracker) TotalReaders() int { return 0 } func (t *noStateTracker) FindFiles([]*fingerprint.Fingerprint) []*reader.Metadata { return nil } + +func encodeIndex(val int) []byte { + var buf bytes.Buffer + enc := json.NewEncoder(&buf) + + // Encode the index + if err := enc.Encode(val); err != nil { + return nil + } + return buf.Bytes() +} + +func decodeIndex(buf []byte) (int, error) { + var index int + + // Decode the index + dec := json.NewDecoder(bytes.NewReader(buf)) + err := dec.Decode(&index) + return max(index, 0), err +} + +func archiveKey(i int) string { + return fmt.Sprintf("knownFiles%d", i) +} + +func mod(x, y int) int { + return (x + y) % y +} diff --git a/pkg/stanza/fileconsumer/internal/tracker/tracker_test.go b/pkg/stanza/fileconsumer/internal/tracker/tracker_test.go index f16e2d6470320..96329055e5888 100644 --- a/pkg/stanza/fileconsumer/internal/tracker/tracker_test.go +++ b/pkg/stanza/fileconsumer/internal/tracker/tracker_test.go @@ -5,6 +5,7 @@ package tracker // import "github.com/open-telemetry/opentelemetry-collector-con import ( "context" + "fmt" 
"math/rand/v2" "testing" @@ -13,6 +14,7 @@ import ( "go.opentelemetry.io/collector/component/componenttest" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/checkpoint" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fileset" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" @@ -27,7 +29,7 @@ func TestFindFilesOrder(t *testing.T) { persister := testutil.NewUnscopedMockPersister() fpInStorage := populatedPersisterData(persister, fps) - tracker := NewFileTracker(componenttest.NewNopTelemetrySettings(), 0, 100, persister) + tracker := NewFileTracker(context.Background(), componenttest.NewNopTelemetrySettings(), 0, 100, persister) matchables := tracker.FindFiles(fps) require.Equal(t, len(fps), len(matchables), "return slice should be of same length as input slice") @@ -44,6 +46,111 @@ func TestFindFilesOrder(t *testing.T) { } } +func TestIndexInBounds(t *testing.T) { + persister := testutil.NewUnscopedMockPersister() + pollsToArchive := 100 + tracker := NewFileTracker(context.Background(), componenttest.NewNopTelemetrySettings(), 0, pollsToArchive, persister).(*fileTracker) + + // no index exists. archiveIndex should be 0 + require.Equal(t, 0, tracker.archiveIndex) + + // run archiving. Each time, index should be in bound. 
+ for i := 0; i < 1099; i++ { + require.Equalf(t, i%pollsToArchive, tracker.archiveIndex, "Index should %d, but was %d", i%pollsToArchive, tracker.archiveIndex) + tracker.archive(&fileset.Fileset[*reader.Metadata]{}) + require.Truef(t, tracker.archiveIndex >= 0 && tracker.archiveIndex < pollsToArchive, "Index should be between 0 and %d, but was %d", pollsToArchive, tracker.archiveIndex) + } + oldIndex := tracker.archiveIndex + + // re-create archive + tracker = NewFileTracker(context.Background(), componenttest.NewNopTelemetrySettings(), 0, pollsToArchive, persister).(*fileTracker) + + // index should exist and new archiveIndex should be equal to oldIndex + require.Equalf(t, oldIndex, tracker.archiveIndex, "New index should %d, but was %d", oldIndex, tracker.archiveIndex) + + // re-create archive, with reduced pollsToArchive + pollsToArchive = 70 + tracker = NewFileTracker(context.Background(), componenttest.NewNopTelemetrySettings(), 0, pollsToArchive, persister).(*fileTracker) + + // index should exist but it is out of bounds. 
So it should reset to 0 + require.Equalf(t, 0, tracker.archiveIndex, "Index should be reset to 0 but was %d", tracker.archiveIndex) +} + +func TestArchiveRestoration(t *testing.T) { + pollsToArchiveGrid := []int{10, 20, 50, 100, 200} + for _, pollsToArchive := range pollsToArchiveGrid { + for _, newPollsToArchive := range pollsToArchiveGrid { + t.Run(fmt.Sprintf("%d-%d", pollsToArchive, newPollsToArchive), func(t *testing.T) { + testArchiveRestoration(t, pollsToArchive, newPollsToArchive) + }) + } + } +} + +func testArchiveRestoration(t *testing.T, pollsToArchive int, newPollsToArchive int) { + // test for various scenarios + // 0.25 means archive is 25% filled + // 1.25 means archive is 125% filled (i.e. it was rolled over once) + pctFilled := []float32{0.25, 0.5, 0.75, 1, 1.25, 1.50, 1.75, 2.00} + for _, pct := range pctFilled { + persister := testutil.NewUnscopedMockPersister() + tracker := NewFileTracker(context.Background(), componenttest.NewNopTelemetrySettings(), 0, pollsToArchive, persister).(*fileTracker) + iterations := int(pct * float32(pollsToArchive)) + for i := 0; i < iterations; i++ { + fileset := &fileset.Fileset[*reader.Metadata]{} + fileset.Add(&reader.Metadata{ + // for the sake of this test case. 
+ // bigger the offset, more recent the element + Offset: int64(i), + }) + tracker.archive(fileset) + } + // make sure all keys are present in persister + for i := 0; i < iterations; i++ { + archiveIndex := i % pollsToArchive + val, err := persister.Get(context.Background(), archiveKey(archiveIndex)) + require.NoError(t, err) + require.NotNil(t, val) + } + // also, make sure we have not written "extra" stuff (for partially filled archive) + count := 0 + for i := 0; i < pollsToArchive; i++ { + val, err := persister.Get(context.Background(), archiveKey(i)) + require.NoError(t, err) + if val != nil { + count++ + } + } + require.Equal(t, min(iterations, pollsToArchive), count) + tracker = NewFileTracker(context.Background(), componenttest.NewNopTelemetrySettings(), 0, newPollsToArchive, persister).(*fileTracker) + if pollsToArchive > newPollsToArchive { + // if archive has shrunk, new archive should contain most recent elements + // start from most recent element + startIdx := mod(tracker.archiveIndex-1, newPollsToArchive) + mostRecentIteration := iterations - 1 + for i := 0; i < newPollsToArchive; i++ { + val, err := tracker.readArchive(startIdx) + require.NoError(t, err) + if val.Len() > 0 { + element, err := val.Pop() + require.NoError(t, err) + foundIteration := int(element.Offset) + require.Equal(t, mostRecentIteration, foundIteration) + } + mostRecentIteration-- + startIdx-- + } + + // make sure we've removed all extra keys + for i := newPollsToArchive; i < pollsToArchive; i++ { + val, err := persister.Get(context.Background(), archiveKey(i)) + require.NoError(t, err) + require.Nil(t, val) + } + } + } +} + func populatedPersisterData(persister operator.Persister, fps []*fingerprint.Fingerprint) []bool { md := make([]*reader.Metadata, 0) diff --git a/pkg/stanza/fileconsumer/util_test.go b/pkg/stanza/fileconsumer/util_test.go index 69bb92ca26cd3..c1d97cd71a555 100644 --- a/pkg/stanza/fileconsumer/util_test.go +++ 
b/pkg/stanza/fileconsumer/util_test.go @@ -4,6 +4,7 @@ package fileconsumer import ( + "context" "testing" "github.com/stretchr/testify/require" @@ -22,7 +23,7 @@ func testManager(t *testing.T, cfg *Config, opts ...Option) (*Manager, *emittest func testManagerWithSink(t *testing.T, cfg *Config, sink *emittest.Sink, opts ...Option) *Manager { set := componenttest.NewNopTelemetrySettings() input, err := cfg.Build(set, sink.Callback, opts...) - input.tracker = tracker.NewFileTracker(set, cfg.MaxBatches, cfg.PollsToArchive, testutil.NewUnscopedMockPersister()) + input.tracker = tracker.NewFileTracker(context.Background(), set, cfg.MaxBatches, cfg.PollsToArchive, testutil.NewUnscopedMockPersister()) require.NoError(t, err) t.Cleanup(func() { input.tracker.ClosePreviousFiles() }) return input diff --git a/pkg/stanza/operator/persister.go b/pkg/stanza/operator/persister.go index 8d0784799eafc..d9521df2df7d0 100644 --- a/pkg/stanza/operator/persister.go +++ b/pkg/stanza/operator/persister.go @@ -6,6 +6,8 @@ package operator // import "github.com/open-telemetry/opentelemetry-collector-co import ( "context" "fmt" + + "go.opentelemetry.io/collector/extension/xextension/storage" ) // Persister is an interface used to persist data @@ -13,6 +15,7 @@ type Persister interface { Get(context.Context, string) ([]byte, error) Set(context.Context, string, []byte) error Delete(context.Context, string) error + Batch(ctx context.Context, ops ...*storage.Operation) error } type scopedPersister struct { @@ -38,3 +41,10 @@ func (p scopedPersister) Set(ctx context.Context, key string, value []byte) erro func (p scopedPersister) Delete(ctx context.Context, key string) error { return p.Persister.Delete(ctx, fmt.Sprintf("%s.%s", p.scope, key)) } + +func (p scopedPersister) Batch(ctx context.Context, ops ...*storage.Operation) error { + for _, op := range ops { + op.Key = fmt.Sprintf("%s.%s", p.scope, op.Key) + } + return p.Persister.Batch(ctx, ops...) 
+} diff --git a/pkg/stanza/testutil/util.go b/pkg/stanza/testutil/util.go index b7ee946d8f9eb..48a110bbf55d6 100644 --- a/pkg/stanza/testutil/util.go +++ b/pkg/stanza/testutil/util.go @@ -8,6 +8,8 @@ import ( "strings" "sync" + "go.opentelemetry.io/collector/extension/xextension/storage" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" ) @@ -46,6 +48,24 @@ func (p *mockPersister) Delete(_ context.Context, k string) error { return nil } +func (p *mockPersister) Batch(_ context.Context, ops ...*storage.Operation) error { + var err error + for _, op := range ops { + switch op.Type { + case storage.Get: + op.Value, err = p.Get(context.Background(), op.Key) + case storage.Set: + err = p.Set(context.Background(), op.Key, op.Value) + case storage.Delete: + err = p.Delete(context.Background(), op.Key) + } + if err != nil { + return err + } + } + return nil +} + // NewUnscopedMockPersister will return a new persister for testing func NewUnscopedMockPersister() operator.Persister { data := make(map[string][]byte)