
Conversation

@belimawr
Contributor

@belimawr belimawr commented Nov 21, 2025

Proposed commit message

Filestream could miss ingesting the last few lines of a file when the
following happened:
- The Harvester reaches EOF and blocks on its backoff wait.
- The inactive check, which runs on its own goroutine, marks the file as
  inactive and cancels the reader/harvester context.
- The file watcher, which also runs on its own goroutine, detects a change
  in the file size and tries to start a new harvester. The file watcher
  updates its internal state for the file to the file's current size.
- The harvester fails to start because there is one already running
  (the one blocked on backoff wait).
- The backoff expires and the Harvester resumes running and exits
  right away.
- The file watcher now has a recorded size for the file that differs from
  what was actually ingested, so it does not try to start a new
  harvester until the file changes again.

This makes Filebeat miss the last few lines added to the file.

This commit fixes the problem by making the harvester notify the file
watcher when it stops, reporting the amount of data it has read. During
the scan, the file watcher replaces its internal state with the offset
reported by the harvester, allowing it to start a new harvester if
necessary.
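
For illustration only, here is a minimal sketch of the notification idea using
hypothetical names (harvesterClosed, fileWatcher), not the actual types or code
in this PR: the harvester reports the offset it actually ingested when it
stops, and on the next scan the file watcher reconciles that offset before
comparing it with the size on disk.

package main

import "fmt"

// harvesterClosed is a hypothetical notification sent by a harvester when it
// stops, carrying the offset it actually ingested.
type harvesterClosed struct {
	path   string
	offset int64
}

// fileWatcher keeps the last known size per file; all names here are
// illustrative, not the real filestream types.
type fileWatcher struct {
	sizes  map[string]int64
	closed chan harvesterClosed
}

// scan first drains pending "harvester closed" notifications, replacing the
// watcher's recorded size with the offset that was actually read, then
// compares against the size on disk and starts a new harvester if needed.
func (w *fileWatcher) scan(sizeOnDisk func(string) int64) {
	for drained := false; !drained; {
		select {
		case n := <-w.closed:
			w.sizes[n.path] = n.offset
		default:
			drained = true
		}
	}
	for path, known := range w.sizes {
		if size := sizeOnDisk(path); size > known {
			fmt.Printf("start new harvester for %s (%d > %d)\n", path, size, known)
			w.sizes[path] = size
		}
	}
}

func main() {
	w := &fileWatcher{
		// The watcher already recorded the new size (2048) when it failed to
		// start a harvester while the old one was blocked on backoff.
		sizes:  map[string]int64{"/tmp/flog.log": 2048},
		closed: make(chan harvesterClosed, 1),
	}
	// The old harvester stops and reports it only ingested 1024 bytes.
	w.closed <- harvesterClosed{path: "/tmp/flog.log", offset: 1024}
	// On the next scan the watcher sees 2048 > 1024 and starts a new harvester.
	w.scan(func(string) int64 { return 2048 })
}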

Checklist

  • My code follows the style guidelines of this project
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation
  • I have made corresponding change to the default configuration files
  • I have added tests that prove my fix is effective or that my feature works
  • I have added an entry in ./changelog/fragments using the changelog tool.

Disruptive User Impact

Author's Checklist

How to test this PR locally

Run the tests

cd filebeat/input/filestream
go test -race -v -count=1 -tags=integration -run="TestFileWatcherNotifications|TestDataAddedAfterCloseInactive"
# Test the fixed race condition
go test -race -v -count=1 -tags=integration -run=TestRotatingCloseInactiveLargerWriteRate

Manual test

Testing this fix manually is possible, but requires you to monitor the
logs and add data to the file being ingested at a specific time.

At a very high level, the steps are:

  1. Start Filebeat
  2. Wait for it to read the whole file
  3. While the harvester is blocked:
    1. Add more data to the file
    2. Wait for the file scanner to run and fail to start a new
      harvester
  4. Wait for the harvester to close
  5. Wait for the file scanner to run again
  6. Wait for a new harvester to start
  7. Wait for the harvester to reach EOF
  8. Ensure the whole file has been ingested

If you run this test without the fix from this PR, after step 4
Filestream will not try to start any more harvesters for the file,
effectively missing the last few lines.


The best way to manually test this PR is to have two terminals open,
one running Filebeat and another ready to append data to the file
Filebeat is ingesting.

  1. Create a file with at least 1kb of data and write down its size

    flog -n 20 > /tmp/flog.log
    wc -c /tmp/flog.log
  2. Start Filebeat with the following config:

    filebeat.yml

    filebeat.inputs:
      - type: filestream
        paths:
          - /tmp/flog.log
        prospector.scanner.check_interval: 4s
        close.on_state_change.inactive: 2s
        close.on_state_change.check_interval: 2s
        backoff:
          init: 15s
          max: 15s
    
    output.file:
      path: ${path.home}
      filename: output
    
    logging:
      level: debug
      to_stderr: true
      selectors:
        - input.file_watcher
        - input.filestream
        - input.scanner

    To make the logs easier to read, you can send the logs to stdout
    and pipe them through jq:

    go run . --path.home=$PWD 2>&1 | jq '{"ts": ."@timestamp", "lvl": ."log.level", "logger": ."log.logger", "m": .message}' -c
    
  3. Wait for the log entry: '/tmp/flog.log' is inactive

  4. Add data to the file flog -n 2 >> /tmp/flog.log

  5. Wait for the log entry: File /tmp/flog.log has been updated

  6. Wait for the log entry: Harvester already running

  7. Wait for the log entry: File is inactive. Closing. Path='/tmp/flog.log'

  8. Wait for the log entry: Stopped harvester for file

  9. Wait for the log entry: Updating previous state because harvester was closed. '/tmp/flog.log': xxx, where xxx is the original file size.

  10. Wait for the log entry: File /tmp/flog.log has been updated

  11. Wait for the log entry: Starting harvester for file

  12. Wait for the log entry: End of file reached: /tmp/flog.log; Backoff now.

  13. Ensure all events have been read: wc -l output*.ndjson.
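
As a final sanity check (a rough sketch, assuming flog writes one record per
line and the file output writes one JSON event per line), the two line counts
should match:

    wc -l /tmp/flog.log          # lines written to the source file (20 + 2)
    cat output*.ndjson | wc -l   # events written by Filebeat's file output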

Related issues

Use cases

Screenshots

Logs

Benchmarks

Go Benchmark

This is likely not very relevant to the final form of this PR, but I ran some benchmarks comparing the different strategies to prevent the race condition when accessing the offset and lastTimeRead in the harvester. Below are the results and the code.

% go test -bench=BenchmarkOffset .   
goos: linux
goarch: amd64
pkg: github.com/elastic/beats/v7/filebeat/input/filestream
cpu: 11th Gen Intel(R) Core(TM) i9-11950H @ 2.60GHz
BenchmarkOffsetAndLastTimeRead/atomic-16                33326760                39.15 ns/op
BenchmarkOffsetAndLastTimeRead/mutex-16                 32199968                35.71 ns/op
BenchmarkOffsetAndLastTimeRead/race-16                  41037943                26.28 ns/op
PASS
ok      github.com/elastic/beats/v7/filebeat/input/filestream   13.290s

% go test -bench=BenchmarkOffset .
goos: linux
goarch: amd64
pkg: github.com/elastic/beats/v7/filebeat/input/filestream
cpu: 11th Gen Intel(R) Core(TM) i9-11950H @ 2.60GHz
BenchmarkOffsetAndLastTimeRead/atomic-16                34732719                37.35 ns/op
BenchmarkOffsetAndLastTimeRead/mutex-16                 33407095                35.86 ns/op
BenchmarkOffsetAndLastTimeRead/race-16                  45181587                25.54 ns/op
PASS
ok      github.com/elastic/beats/v7/filebeat/input/filestream   13.408s
filebeat/input/filestream/filestream_test.go

package filestream

import (
	"sync"
	"sync/atomic"
	"testing"
	"time"
)

func BenchmarkOffsetAndLastTimeRead(b *testing.B) {
	a := benchAtomic{}
	b.Run("atomic", func(b *testing.B) {
		for b.Loop() {
			a.Inc()
		}
	})
	m := benchMutex{}
	b.Run("mutex", func(b *testing.B) {
		for b.Loop() {
			m.Inc()
		}
	})
	r := benchRace{}
	b.Run("race", func(b *testing.B) {
		for b.Loop() {
			r.Inc()
		}
	})
}

type benchAtomic struct {
	offset       atomic.Int64
	lastTimeRead atomic.Int64
}

func (b *benchAtomic) Inc() {
	b.offset.Add(42)
	b.lastTimeRead.Store(time.Now().UnixNano())
}

type benchMutex struct {
	mutex        sync.Mutex
	offset       int64
	lastTimeRead time.Time
}

func (b *benchMutex) Inc() {
	b.mutex.Lock()
	b.offset += 42
	b.lastTimeRead = time.Now()
	b.mutex.Unlock()
}

type benchRace struct {
	offset       int64
	lastTimeRead time.Time
}

func (b *benchRace) Inc() {
	b.offset += 42
	b.lastTimeRead = time.Now()
}

Benchbuilder

Latest release: v9.2.1

| Suite Name      | Benchmark Name                   | Version | Duration        | EPS          | Bytes/Event |
|-----------------|----------------------------------|---------|-----------------|--------------|-------------|
| logs-datastream | filestream-json-1024b-balanced   | 9.2.1   | 2m43.075351941s | 12264.000000 | 175.3162    |
| logs-datastream | filestream-json-1024b-throughput | 9.2.1   | 48.46343038s    | 41269.000000 | 183.1183    |
| logs-datastream | filestream-json-1024b-scale      | 9.2.1   | 2m47.897040994s | 11912.000000 | 176.5148    |
| logs-datastream | filestream-json-1024b-latency    | 9.2.1   | 4m51.107096736s | 6870.000000  | 178.5985    |

PR version

| Suite Name      | Benchmark Name                   | Version | Duration        | EPS          | Bytes/Event |
|-----------------|----------------------------------|---------|-----------------|--------------|-------------|
| logs-datastream | filestream-json-1024b-balanced   | 9.3.0   | 2m41.103916351s | 12414.000000 | 175.3734    |
| logs-datastream | filestream-json-1024b-throughput | 9.3.0   | 47.520195331s   | 42088.000000 | 182.5625    |
| logs-datastream | filestream-json-1024b-scale      | 9.3.0   | 2m44.102216849s | 12188.000000 | 175.8389    |
| logs-datastream | filestream-json-1024b-latency    | 9.3.0   | 4m56.598482898s | 6743.000000  | 179.3721    |

This is a manual backport of pull request #47247 done by @belimawr

@botelastic botelastic bot added the needs_team Indicates that the issue/PR needs a Team:* label label Nov 21, 2025
@github-actions
Contributor

🤖 GitHub comments

Just comment with:

  • run docs-build : Re-trigger the docs validation. (use unformatted text in the comment!)

@belimawr belimawr changed the title Backport/9.1/pr 47247 [8.19](backport #47247) [Filebeat/Filestream] Fix missing last few lines of a file Nov 21, 2025
@belimawr belimawr changed the base branch from main to 9.1 November 21, 2025 14:17
@mergify
Contributor

mergify bot commented Nov 21, 2025

This pull request is now in conflicts. Could you fix it? 🙏
To fixup this pull request, you can check it out locally. See documentation: https://help.github.com/articles/checking-out-pull-requests-locally/

git fetch upstream
git checkout -b backport/9.1/pr-47247 upstream/backport/9.1/pr-47247
git merge upstream/main
git push upstream backport/9.1/pr-47247

@mergify
Contributor

mergify bot commented Nov 21, 2025

This pull request does not have a backport label.
If this is a bug or security fix, could you label this PR @belimawr? 🙏.
For such, you'll need to label your PR with:

  • The upcoming major version of the Elastic Stack
  • The upcoming minor version of the Elastic Stack (if you're not pushing a breaking change)

To fixup this pull request, you need to add the backport labels for the needed
branches, such as:

  • backport-8./d is the label to automatically backport to the 8./d branch. /d is the digit
  • backport-active-all is the label that automatically backports to all active branches.
  • backport-active-8 is the label that automatically backports to all active minor branches for the 8 major.
  • backport-active-9 is the label that automatically backports to all active minor branches for the 9 major.

@belimawr belimawr added the Team:Elastic-Agent-Data-Plane Label for the Agent Data Plane team label Nov 21, 2025
@botelastic botelastic bot removed the needs_team Indicates that the issue/PR needs a Team:* label label Nov 21, 2025
@belimawr belimawr added needs_team Indicates that the issue/PR needs a Team:* label backport-8.19 Automated backport to the 8.19 branch labels Nov 21, 2025
@botelastic botelastic bot removed the needs_team Indicates that the issue/PR needs a Team:* label label Nov 21, 2025
@github-actions
Contributor

🔍 Preview links for changed docs

More links …

In total, 1549 files changed.

@belimawr belimawr changed the title [8.19](backport #47247) [Filebeat/Filestream] Fix missing last few lines of a file [9.1](backport #47247) [Filebeat/Filestream] Fix missing last few lines of a file Nov 24, 2025
@belimawr belimawr marked this pull request as ready for review November 24, 2025 16:13
@belimawr belimawr requested review from a team as code owners November 24, 2025 16:13
@belimawr belimawr requested review from andrzej-stencel and rdner and removed request for a team November 24, 2025 16:13
@elasticmachine
Contributor

Pinging @elastic/elastic-agent-data-plane (Team:Elastic-Agent-Data-Plane)
