
fix: preserve processed_entries during stale buffer cleanup #13094

Open
grapestore wants to merge 2 commits into apache:master from grapestore:fix/batch-processor-manager-stale-cleanup

@grapestore

Description

remove_stale_objects in batch-processor-manager.lua deletes stale buffers (those whose entry_buffer and batch_to_process are both empty) every 1800 seconds. When a buffer is deleted, its processed_entries count is lost with it.

However, total_pushed_entries lives on the manager instance and keeps accumulating for the entire lifetime of the worker. The pending entry count is calculated as total_pushed_entries - total_processed_entries(), where total_processed_entries() sums processed_entries only over the buffers that are still alive.

This mismatch causes pending to grow without bound across repeated stale cleanups until it exceeds max_pending_entries. Once that happens, both add_entry and add_entry_to_new_processor reject every new entry. No new processor can be created, so processed_entries stays at 0 permanently, and all log entries are dropped until the worker restarts.
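The accounting mismatch can be seen in isolation with a small Python model. This is a hypothetical sketch, not the Lua code in batch-processor-manager.lua: the Buffer and Manager classes, the pending() helper and the "route-1" key are illustrative names; only total_pushed_entries, processed_entries and total_processed_entries come from the description. Deleting a fully flushed buffer makes its already-processed entries reappear as "pending".

```python
# Hypothetical Python model of the accounting described in this PR.
# Names mirror the PR description; this is NOT the real APISIX Lua code.


class Buffer:
    """Stands in for one batch processor buffer (hypothetical)."""

    def __init__(self):
        self.processed_entries = 0


class Manager:
    def __init__(self):
        self.total_pushed_entries = 0  # only ever grows for the worker's lifetime
        self.buffers = {}              # key -> Buffer

    def total_processed_entries(self):
        # Sums only buffers that are still alive (the behavior being fixed).
        return sum(b.processed_entries for b in self.buffers.values())

    def pending(self):
        return self.total_pushed_entries - self.total_processed_entries()


m = Manager()
m.buffers["route-1"] = Buffer()
m.total_pushed_entries += 3                 # three entries pushed
m.buffers["route-1"].processed_entries = 3  # and all three flushed
print(m.pending())                          # prints 0: nothing is in flight
del m.buffers["route-1"]                    # stale cleanup deletes the buffer
print(m.pending())                          # prints 3: flushed entries now look pending
```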

Reproduction (max_pending=10, 3 entries/cycle)

[cycle 1]  push 3 entries → flush complete
           pushed=3, processed=3
           → stale cleanup: buffer deleted → processed=3 lost

[cycle 2]  push 3 entries → new processor created
           pending = 3 - 0 = 3 < 10 → OK
           pushed=6, processed=3 (new buffer)
           → stale cleanup: buffer deleted → processed=3 lost

[cycle 3]  pending = 6 - 0 = 6 < 10 → OK
           pushed=9 → stale cleanup: buffer deleted

[cycle 4]  pending = 9 - 0 = 9 < 10 → OK
           pushed=12 → stale cleanup: buffer deleted

[cycle 5]  pending = 12 - 0 = 12 > 10
           → ❌ permanent drop, no recovery until worker restart
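The trace above can be replayed with a short simulation. It is a simplified sketch under assumptions of its own: like the trace, it checks max_pending_entries once per cycle before that cycle's entries are pushed (the real code may check per entry), it assumes each accepted batch is fully flushed before stale cleanup runs, and it treats a strict `>` comparison as the rejection condition. The simulate() function and the constants are hypothetical names.

```python
# Simplified replay of the reproduction trace (max_pending=10, 3 entries/cycle).
# Assumptions, not APISIX code: one admission check per cycle, full flush
# before stale cleanup, cleanup deletes every (now empty) buffer.

MAX_PENDING = 10
ENTRIES_PER_CYCLE = 3


def simulate(cycles):
    total_pushed = 0
    live_processed = []  # processed_entries of each live buffer
    log = []
    for cycle in range(1, cycles + 1):
        pending = total_pushed - sum(live_processed)
        if pending > MAX_PENDING:
            # No new processor can be created, so nothing is processed again.
            log.append((cycle, pending, "rejected"))
            continue
        total_pushed += ENTRIES_PER_CYCLE
        live_processed.append(ENTRIES_PER_CYCLE)  # new buffer flushes its batch
        log.append((cycle, pending, "accepted"))
        live_processed.clear()  # stale cleanup drops buffers and their counts
    return log


for cycle, pending, outcome in simulate(6):
    print(f"cycle {cycle}: pending={pending} -> {outcome}")
```

Running six cycles instead of five shows that the state does not recover on its own: pending stays at 12 because no buffer is ever created again to report processed entries.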

Fix

Add a total_stale_processed_entries field to the manager instance. Before a stale buffer is deleted, add its processed_entries to this field, and use the field as the starting value of the sum in total_processed_entries().

Changes in 3 places:

  1. _M.new — initialize total_stale_processed_entries = 0
  2. remove_stale_objects — preserve processed_entries before buffer deletion
  3. total_processed_entries — start summation from total_stale_processed_entries
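The three changes can be mirrored in the same kind of Python model. This is a sketch of the idea, not the actual Lua patch: the Buffer and Manager classes, the pending() helper, the "route-1" key and the simplified staleness test (both entry_buffer and batch_to_process empty) are assumptions for illustration, and the periodic 1800-second interval is left out.

```python
# Hypothetical Python model of the fix; not the actual Lua patch.


class Buffer:
    def __init__(self):
        self.entry_buffer = []
        self.batch_to_process = []
        self.processed_entries = 0


class Manager:
    def __init__(self):
        self.buffers = {}
        self.total_pushed_entries = 0
        self.total_stale_processed_entries = 0  # change 1: initialize to 0

    def remove_stale_objects(self):
        for key, buf in list(self.buffers.items()):
            if not buf.entry_buffer and not buf.batch_to_process:
                # change 2: preserve the count before dropping the buffer
                self.total_stale_processed_entries += buf.processed_entries
                del self.buffers[key]

    def total_processed_entries(self):
        # change 3: start from the preserved count, then add live buffers
        total = self.total_stale_processed_entries
        for buf in self.buffers.values():
            total += buf.processed_entries
        return total

    def pending(self):
        return self.total_pushed_entries - self.total_processed_entries()


m = Manager()
for _ in range(100):
    buf = m.buffers.setdefault("route-1", Buffer())
    m.total_pushed_entries += 3
    buf.processed_entries += 3  # assume the whole batch was flushed
    m.remove_stale_objects()
print(m.pending(), m.total_stale_processed_entries)  # prints: 0 300
```

After 100 push/flush/cleanup cycles, pending is still 0, whereas the unpatched sum over live buffers alone would report all 300 flushed entries as pending.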

@dosubot (bot) added the size:XS (This PR changes 0-9 lines, ignoring generated files.) and bug (Something isn't working) labels on Mar 18, 2026
@Baoyuantop (Contributor) left a comment


Please add a test case:

  1. Configure max_pending_entries and send several requests to generate processed_entries in the buffer.

  2. Trigger a stale cleanup with set_check_stale_interval(1) followed by ngx.sleep so that the buffer is removed.

  3. Send requests again and verify that they are not incorrectly blocked by max_pending_entries (i.e., that processed_entries are correctly preserved).

@dosubot (bot) added the size:M label (This PR changes 30-99 lines, ignoring generated files.) and removed the size:XS label (This PR changes 0-9 lines, ignoring generated files.) on Mar 24, 2026
@Baoyuantop (Contributor)

Hi @grapestore, please fix the failing CI.

@Baoyuantop (Contributor)

Hi @grapestore, the code LGTM. Once the failing tests are fixed, we can proceed with the merge.


Labels

bug (Something isn't working), size:M (This PR changes 30-99 lines, ignoring generated files.)
