Rename cleanup interval config
rdettai committed Sep 5, 2024
1 parent b4d4ece commit d1885af
Showing 5 changed files with 11 additions and 9 deletions.
3 changes: 2 additions & 1 deletion docs/configuration/source-config.md
@@ -58,6 +58,7 @@ Required fields for the SQS `notifications` parameter items:
- `raw_uri`: a message containing just the file object URI (e.g. `s3://mybucket/mykey`)
- `deduplication_window_duration_secs`: maximum duration for which ingested file checkpoints are kept (default 3600)
- `deduplication_window_max_messages`: maximum number of ingested file checkpoints kept (default 100k)
+- `deduplication_cleanup_interval_secs`: frequency at which outdated file checkpoints are cleaned up

*Adding a file source with SQS notifications to an index with the [CLI](../reference/cli.md#source)*

@@ -84,7 +85,7 @@ EOF
- the notification message could not be parsed (e.g. it is not a valid S3 notification)
- the file was not found
- the file is corrupted (e.g. unexpected compression)
-- AWS S3 notifications and AWS SQS provide "at least once" delivery guarantees. To avoid duplicates, the file source includes a mechanism that prevents the same file from being ingested twice. It works by storing checkpoints in the metastore that track the indexing progress for each file. You can decrease `deduplication_window_duration_sec` and/or `deduplication_window_max_messages` to reduce the load on the metastore.
+- AWS S3 notifications and AWS SQS provide "at least once" delivery guarantees. To avoid duplicates, the file source includes a mechanism that prevents the same file from being ingested twice. It works by storing checkpoints in the metastore that track the indexing progress for each file. You can decrease `deduplication_window_*` or increase `deduplication_cleanup_interval_secs` to reduce the load on the metastore.

:::

11 changes: 6 additions & 5 deletions quickwit/quickwit-config/src/source_config/mod.rs
@@ -268,8 +268,8 @@ pub struct FileSourceSqs {
pub deduplication_window_duration_secs: u32,
#[serde(default = "default_deduplication_window_max_messages")]
pub deduplication_window_max_messages: u32,
-#[serde(default = "default_checkpoint_cleanup_interval_secs")]
-pub checkpoint_cleanup_interval_secs: u32,
+#[serde(default = "default_deduplication_cleanup_interval_secs")]
+pub deduplication_cleanup_interval_secs: u32,
}

fn default_deduplication_window_duration_secs() -> u32 {
@@ -280,7 +280,7 @@ fn default_deduplication_window_max_messages() -> u32 {
100_000
}

-fn default_checkpoint_cleanup_interval_secs() -> u32 {
+fn default_deduplication_cleanup_interval_secs() -> u32 {
60
}

@@ -912,7 +912,8 @@ mod tests {
deduplication_window_duration_secs: default_deduplication_window_duration_secs(
),
deduplication_window_max_messages: default_deduplication_window_max_messages(),
-checkpoint_cleanup_interval_secs: default_checkpoint_cleanup_interval_secs()
+deduplication_cleanup_interval_secs:
+default_deduplication_cleanup_interval_secs()
})),
);
let file_params_reserialized = serde_json::to_value(&file_params_deserialized).unwrap();
@@ -924,7 +925,7 @@ mod tests {
"message_type": "s3_notification",
"deduplication_window_duration_secs": default_deduplication_window_duration_secs(),
"deduplication_window_max_messages": default_deduplication_window_max_messages(),
-"checkpoint_cleanup_interval_secs": default_checkpoint_cleanup_interval_secs(),
+"deduplication_cleanup_interval_secs": default_deduplication_cleanup_interval_secs(),
}]})
);
}
2 changes: 1 addition & 1 deletion quickwit/quickwit-indexing/src/source/file_source.rs
@@ -437,7 +437,7 @@ mod localstack_tests {
message_type: FileSourceMessageType::RawUri,
deduplication_window_duration_secs: 100,
deduplication_window_max_messages: 100,
-checkpoint_cleanup_interval_secs: 60,
+deduplication_cleanup_interval_secs: 60,
}));
let source_config = SourceConfig::for_test(
"test-file-source-sqs-notifications",
@@ -143,7 +143,7 @@ impl QueueCoordinator {
message_type,
Some(config.deduplication_window_duration_secs),
Some(config.deduplication_window_max_messages),
-Duration::from_secs(config.checkpoint_cleanup_interval_secs as u64),
+Duration::from_secs(config.deduplication_cleanup_interval_secs as u64),
))
}

2 changes: 1 addition & 1 deletion quickwit/quickwit-integration-tests/src/tests/sqs_tests.rs
@@ -191,7 +191,7 @@ async fn test_sqs_garbage_collect() {
queue_url: {}
message_type: raw_uri
deduplication_window_max_messages: 5
-checkpoint_cleanup_interval_secs: 3
+deduplication_cleanup_interval_secs: 3
input_format: plain_text
"#,
source_id, queue_url
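The documentation hunk in this commit describes a deduplication window bounded by a duration and a message count, with outdated checkpoints removed at a fixed interval. As a rough illustration of how those three settings might interact, here is a minimal in-memory sketch. It is not Quickwit's implementation: `DedupConfig`, `DedupWindow`, and `should_ingest` are hypothetical names, the eviction policy (oldest first) is an assumption, and the real file source stores its checkpoints in the metastore rather than in a `HashMap`. Only the three field names and the default values come from the diff.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Hypothetical stand-in for the three deduplication settings in the diff.
pub struct DedupConfig {
    pub deduplication_window_duration_secs: u32,
    pub deduplication_window_max_messages: u32,
    pub deduplication_cleanup_interval_secs: u32,
}

impl Default for DedupConfig {
    fn default() -> Self {
        // Defaults as stated in the diff: 3600 s, 100k messages, 60 s.
        DedupConfig {
            deduplication_window_duration_secs: 3600,
            deduplication_window_max_messages: 100_000,
            deduplication_cleanup_interval_secs: 60,
        }
    }
}

/// Hypothetical in-memory checkpoint store (assumption: oldest entries are evicted first).
pub struct DedupWindow {
    config: DedupConfig,
    /// File URI -> instant at which the file was first seen.
    seen: HashMap<String, Instant>,
    last_cleanup: Instant,
}

impl DedupWindow {
    pub fn new(config: DedupConfig, now: Instant) -> Self {
        DedupWindow { config, seen: HashMap::new(), last_cleanup: now }
    }

    /// Returns `true` if the file has no checkpoint yet and should be ingested.
    pub fn should_ingest(&mut self, uri: &str, now: Instant) -> bool {
        self.maybe_cleanup(now);
        if self.seen.contains_key(uri) {
            return false;
        }
        self.seen.insert(uri.to_string(), now);
        true
    }

    pub fn len(&self) -> usize {
        self.seen.len()
    }

    /// Runs at most once per cleanup interval, so a longer interval means fewer passes.
    fn maybe_cleanup(&mut self, now: Instant) {
        let interval =
            Duration::from_secs(self.config.deduplication_cleanup_interval_secs as u64);
        if now.saturating_duration_since(self.last_cleanup) < interval {
            return;
        }
        self.last_cleanup = now;

        // Drop checkpoints that fell out of the duration window.
        let window = Duration::from_secs(self.config.deduplication_window_duration_secs as u64);
        self.seen
            .retain(|_, first_seen| now.saturating_duration_since(*first_seen) <= window);

        // Enforce the message cap by evicting the oldest remaining checkpoints.
        let max = self.config.deduplication_window_max_messages as usize;
        if self.seen.len() > max {
            let excess = self.seen.len() - max;
            let mut by_age: Vec<(String, Instant)> =
                self.seen.iter().map(|(uri, t)| (uri.clone(), *t)).collect();
            by_age.sort_by_key(|(_, t)| *t);
            for (uri, _) in by_age.into_iter().take(excess) {
                self.seen.remove(&uri);
            }
        }
    }
}
```

In this sketch, a file whose checkpoint has been evicted is accepted again on its next notification, which is why shrinking the window or running cleanup more often trades duplicate protection for a smaller checkpoint set.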