Garbage collect shards in SQS Filesource #5339
Merged
Changes from all commits (14 commits)
- 68b5ac4 Garbage collect shards in SQS Filesource (rdettai)
- bd08c0e Run shard pruning in a background task (rdettai)
- ee32b1c Expose deduplication window to users (rdettai)
- d9a45bd Add integration test (rdettai)
- b9a78cb Rename cleanup interval config (rdettai)
- eb938d6 Address smaller review comments (rdettai)
- a51dc8c Change strong_count to Weak (rdettai)
- 8a657be High level design (rdettai)
- 94f54f3 Remove unpure iterators (rdettai)
- 40fdca6 Rewrite time operation to rule out underflow (rdettai)
- da53b90 Remove inappropriate unwrap and fix typo (rdettai)
- 7958bdd Refactor unnecessary deadline_for_last_extension parameter (rdettai)
- ba49fa7 Add more details to design document (rdettai)
- a600c3d Clarify what checkpoint_messages does (rdettai)
quickwit/quickwit-indexing/src/source/queue_sources/design.md (59 additions, 0 deletions)
# Queue source design

## Exactly once
Besides the usual failures that can happen during indexing, most queues are also subject to duplicate deliveries. To ensure that all object files are indexed exactly once, we track their indexing progress using the shard table:
- each file object is tracked as a shard, with the file URI as the shard ID
- progress made on the indexing of a given shard is committed to the shard table in a common transaction with the split publishing
- after some time (called the deduplication window), shards are garbage collected to keep the size of the shard table small (see the sketch below)
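As a minimal illustration (not the actual Quickwit code; the function below is hypothetical), the deduplication window translates into a garbage collection cutoff with a simple, underflow-safe time computation:

```rust
// Hypothetical sketch: turning the deduplication window into a GC cutoff.
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// Unix timestamp (in seconds) before which a shard is old enough to be pruned.
/// `saturating_sub` rules out an underflow if the window exceeds the clock value.
fn gc_cutoff_secs(deduplication_window: Duration) -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock is set before the unix epoch")
        .saturating_sub(deduplication_window)
        .as_secs()
}

fn main() {
    // With a 2 hour deduplication window, only shards whose last update is
    // older than this cutoff are candidates for garbage collection.
    println!(
        "prune shards last updated before {}",
        gc_cutoff_secs(Duration::from_secs(2 * 3600))
    );
}
```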
## Visibility extension task

To keep messages invisible to other pipelines while they are being processed, each received message spawns a visibility extension task. This task is responsible for extending the visibility timeout each time the visibility deadline approaches (a loop sketched below). When the last batch of a message is read and sent to the indexing pipeline:
- a final visibility extension is requested to give the indexing time to complete (typically twice the commit timeout)
- the visibility extension task is stopped
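A minimal sketch of such a loop, assuming a hypothetical `QueueClient` with an `extend_visibility` method (the real implementation also handles the final extension and error cases):

```rust
// Hypothetical sketch of a visibility extension loop; `QueueClient` and
// `extend_visibility` are illustrative, not the actual Quickwit types.
use std::time::Duration;

struct QueueClient;

impl QueueClient {
    async fn extend_visibility(&self, receipt_handle: &str, timeout: Duration) {
        // A real implementation would call e.g. SQS ChangeMessageVisibility here.
        println!("extending visibility of {receipt_handle} by {timeout:?}");
    }
}

/// Re-extends the visibility timeout before each deadline, until aborted
/// (e.g. once the last batch has been read and handed to the indexing pipeline).
fn spawn_visibility_task(
    queue: QueueClient,
    receipt_handle: String,
    visibility_timeout: Duration,
) -> tokio::task::JoinHandle<()> {
    tokio::spawn(async move {
        loop {
            // Wake up well before the deadline to leave time for the API call.
            tokio::time::sleep(visibility_timeout / 2).await;
            queue
                .extend_visibility(&receipt_handle, visibility_timeout)
                .await;
        }
    })
}

#[tokio::main]
async fn main() {
    let task = spawn_visibility_task(
        QueueClient,
        "receipt-123".to_string(),
        Duration::from_secs(6),
    );
    // Simulate the message being processed for ~10 seconds, then stop extending.
    tokio::time::sleep(Duration::from_secs(10)).await;
    task.abort();
}
```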
## Cleanup of old shards

Garbage collection is owned by the queue-based sources: each pipeline with a queue source spawns a garbage collection task. To avoid putting an increasing load on the metastore as the number of pipelines scales, garbage collection calls are debounced by the control plane.
## Onboarding new queues

This module is meant to be generic enough to:
- use other queue implementations, such as GCP Pub/Sub
- source the data from something other than object storage, e.g. directly from the message payload
Note that because every single message is tracked by the metastore, this design will not behave well with high message rates. For instance, it is not meant to be efficient for a data stream where every message contains a single event. As a rule of thumb, to protect the metastore, it is discouraged to process more than 50 messages per second with this design. This means that high throughput can only be achieved with larger content per message, e.g. larger files when using the file source with queue notifications (at 50 messages per second, 10 MB files already amount to roughly 500 MB/s of ingested data).
## Implementation

The `QueueCoordinator` is a concrete implementation of the machinery necessary to consume data from a queue, from the message reception to its acknowledgment after indexing. The `QueueCoordinator` interacts with 3 main components.
### The `Queue`

The `Queue` is an abstract interface that can represent any queue implementation (AWS SQS, Google Pub/Sub...). It is sufficient that the queue guarantees at-least-once delivery of its messages. The abstraction reduces the actual queue's API surface to 3 main functions (see the trait sketch below):
- receive messages that are ready to be processed
- extend their visibility timeout, i.e. delay the time at which a message becomes visible again to other consumers
- acknowledge messages, i.e. delete them definitively from the queue after successful indexing
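A hypothetical sketch of such an interface, assuming the `async-trait` and `anyhow` crates; the trait and method names are illustrative and do not match the actual definitions in the codebase:

```rust
// Illustrative sketch of the queue abstraction (not the actual Quickwit trait).
use std::time::Duration;

/// A received message, reduced to what this sketch needs.
pub struct RawMessage {
    /// Opaque handle used to extend visibility or acknowledge the message.
    pub receipt_handle: String,
    /// The payload, e.g. an S3 notification pointing to an object file.
    pub payload: Vec<u8>,
}

/// Minimal abstraction over an at-least-once delivery queue (SQS, Pub/Sub...).
#[async_trait::async_trait]
pub trait Queue: Send + Sync {
    /// Receives a batch of messages that are ready to be processed.
    async fn receive(&self, max_messages: usize) -> anyhow::Result<Vec<RawMessage>>;

    /// Delays the time at which the messages become visible again to other consumers.
    async fn extend_visibility(
        &self,
        receipt_handles: &[String],
        timeout: Duration,
    ) -> anyhow::Result<()>;

    /// Deletes the messages definitively from the queue after successful indexing.
    async fn acknowledge(&self, receipt_handles: &[String]) -> anyhow::Result<()>;
}
```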
### The `QueueLocalState`

The local state is an in-memory data structure that keeps track of the knowledge that the current source has of recently received messages. It manages the transitions of messages between 4 states (sketched as an enum below):
- ready for read
- read in progress
- awaiting commit
- completed
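A hypothetical sketch of these states as a Rust enum (illustrative only; the actual `QueueLocalState` holds more per-message data):

```rust
// Illustrative sketch of the message lifecycle tracked by the local state;
// not the actual Quickwit types.
use std::collections::BTreeMap;

/// The four states a message goes through in the local state.
#[allow(dead_code)] // not every state is exercised in this small example
enum MessageState {
    /// Received from the queue, waiting for a reader to pick it up.
    ReadyForRead,
    /// Batches are currently being read and sent to the indexing pipeline.
    ReadInProgress,
    /// All batches were sent; waiting for the split publication to commit the progress.
    AwaitingCommit,
    /// Fully indexed and acknowledged.
    Completed,
}

/// A minimal local state: message content id (e.g. file URI) -> current state.
struct LocalState {
    messages: BTreeMap<String, MessageState>,
}

impl LocalState {
    /// Marks a message as picked up by a reader, if it was ready.
    fn start_read(&mut self, content_id: &str) {
        if let Some(state) = self.messages.get_mut(content_id) {
            if matches!(state, MessageState::ReadyForRead) {
                *state = MessageState::ReadInProgress;
            }
        }
    }
}

fn main() {
    let mut local_state = LocalState {
        messages: BTreeMap::from([(
            "s3://bucket/file.json".to_string(),
            MessageState::ReadyForRead,
        )]),
    };
    local_state.start_read("s3://bucket/file.json");
    assert!(matches!(
        local_state.messages["s3://bucket/file.json"],
        MessageState::ReadInProgress
    ));
}
```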
### The `QueueSharedState`

The shared state is a client of the Shard API, a metastore API that was mainly designed to serve ingest V2. The Shard API improves on the previous checkpoint API, which stored the checkpoint as a blob in one of the fields of the index model. The flow is the following:

The queue source opens a shard, using an ID that uniquely identifies the content of the message as the shard ID. For the file source, the shard ID is the file URI. Each source has a unique publish token that is provided in the `OpenShards` metastore request. The `OpenShards` response returns the token of the caller that called the API first. Either:
- The returned token matches the current pipeline's token. This means that we have ownership of this message content and can proceed with its indexing.
- The returned token does not match the current pipeline's token. This means that another pipeline has ownership. In that case, we look at the content of the shard (the decision flow is sketched after this list):
  - if it is already completely processed (EOF), we acknowledge the message and drop it
  - if its last update timestamp is old (e.g. older than twice the commit timeout), we assume the processing of the content to be stale (e.g. the owning pipeline failed). We perform an `AcquireShards` call to update the shard's token in the metastore with the local one. This indicates to subsequent attempts to process the shard that this pipeline now has ownership. Note though that this is subject to a race condition: two pipelines might acquire the shard concurrently. In that case, both pipelines will assume that they own the shard, and one of them will fail at commit time.
  - if its last update timestamp is recent, we assume that the processing of the content is still in progress in another pipeline. We just drop the message (without any acknowledgment) and let it be re-processed once its visibility timeout expires.
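A hypothetical sketch of that decision, with simplified shard metadata (the types and field names below are illustrative, not the actual metastore types):

```rust
// Illustrative sketch of the ownership decision after an `OpenShards` call;
// not the actual Quickwit implementation.
use std::time::{Duration, SystemTime};

/// Simplified view of the shard returned by the metastore.
struct ShardInfo {
    publish_token: String,
    is_eof: bool,
    last_update: SystemTime,
}

/// What the source should do with the received message.
#[derive(Debug, PartialEq)]
enum Decision {
    /// We own the shard: proceed with indexing.
    Process,
    /// Shard fully indexed by someone else: acknowledge and drop the message.
    AckAndDrop,
    /// Owner looks dead: call `AcquireShards` to take ownership, then process.
    Acquire,
    /// Another pipeline is actively processing: drop without acknowledging.
    Drop,
}

fn decide(shard: &ShardInfo, local_token: &str, commit_timeout: Duration) -> Decision {
    if shard.publish_token == local_token {
        return Decision::Process;
    }
    if shard.is_eof {
        return Decision::AckAndDrop;
    }
    let staleness = SystemTime::now()
        .duration_since(shard.last_update)
        .unwrap_or_default();
    if staleness > commit_timeout * 2 {
        // The owning pipeline probably failed; take over (subject to the race
        // described above: the loser will fail at commit time).
        Decision::Acquire
    } else {
        Decision::Drop
    }
}

fn main() {
    let shard = ShardInfo {
        publish_token: "other-pipeline-token".to_string(),
        is_eof: false,
        last_update: SystemTime::now() - Duration::from_secs(3600),
    };
    // With a 60s commit timeout, a shard untouched for 1 hour is considered stale.
    assert_eq!(
        decide(&shard, "my-token", Duration::from_secs(60)),
        Decision::Acquire
    );
}
```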
The `QueueSharedState` also owns the background task that periodically initiates a `PruneShards` call to garbage collect old shards.
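A minimal sketch of such a background task, assuming a hypothetical `prune_shards` metastore call standing in for `PruneShards`:

```rust
// Illustrative sketch of the periodic shard pruning task; names are
// hypothetical and do not match the actual Quickwit code.
use std::time::Duration;

struct MetastoreClient;

impl MetastoreClient {
    /// Stand-in for the `PruneShards` metastore call: delete shards whose
    /// last update is older than `max_age`.
    async fn prune_shards(&self, source_id: &str, max_age: Duration) {
        println!("pruning shards of {source_id} older than {max_age:?}");
    }
}

/// Spawns a task that prunes old shards at a fixed interval, until aborted.
fn spawn_prune_task(
    metastore: MetastoreClient,
    source_id: String,
    deduplication_window: Duration,
    cleanup_interval: Duration,
) -> tokio::task::JoinHandle<()> {
    tokio::spawn(async move {
        let mut interval = tokio::time::interval(cleanup_interval);
        loop {
            interval.tick().await;
            metastore.prune_shards(&source_id, deduplication_window).await;
        }
    })
}

#[tokio::main]
async fn main() {
    let task = spawn_prune_task(
        MetastoreClient,
        "my-sqs-source".to_string(),
        Duration::from_secs(2 * 3600), // deduplication window
        Duration::from_secs(2),        // short interval so the demo prints a few times
    );
    tokio::time::sleep(Duration::from_secs(7)).await;
    task.abort();
}
```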