Skip to content

Commit 7c543ee

Browse files
committed
Rename cleanup interval config
1 parent e8ec45a commit 7c543ee

File tree

5 files changed

+11
-9
lines changed

5 files changed

+11
-9
lines changed

docs/configuration/source-config.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ Required fields for the SQS `notifications` parameter items:
5858
- `raw_uri`: a message containing just the file object URI (e.g. `s3://mybucket/mykey`)
5959
- `deduplication_window_duration_secs`: maximum duration, in seconds, for which ingested file checkpoints are kept (default 3600)
6060
- `deduplication_window_max_messages`: maximum number of ingested file checkpoints kept (default 100k)
61+
- `deduplication_cleanup_interval_secs`: interval, in seconds, at which outdated file checkpoints are cleaned up (default 60)
6162

6263
*Adding a file source with SQS notifications to an index with the [CLI](../reference/cli.md#source)*
6364

@@ -84,7 +85,7 @@ EOF
8485
- the notification message could not be parsed (e.g. it is not a valid S3 notification)
8586
- the file was not found
8687
- the file is corrupted (e.g. unexpected compression)
87-
- AWS S3 notifications and AWS SQS provide "at least once" delivery guaranties. To avoid duplicates, the file source includes a mechanism that prevents the same file from being ingested twice. It works by storing checkpoints in the metastore that track the indexing progress for each file. You can decrease `deduplication_window_duration_sec` and/or `deduplication_window_max_messages` to reduce the load on the metastore.
88+
- AWS S3 notifications and AWS SQS provide "at least once" delivery guarantees. To avoid duplicates, the file source includes a mechanism that prevents the same file from being ingested twice. It works by storing checkpoints in the metastore that track the indexing progress for each file. You can decrease `deduplication_window_*` or increase `deduplication_cleanup_interval_secs` to reduce the load on the metastore.
8889

8990
:::
9091

quickwit/quickwit-config/src/source_config/mod.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -268,8 +268,8 @@ pub struct FileSourceSqs {
268268
pub deduplication_window_duration_secs: u32,
269269
#[serde(default = "default_deduplication_window_max_messages")]
270270
pub deduplication_window_max_messages: u32,
271-
#[serde(default = "default_checkpoint_cleanup_interval_secs")]
272-
pub checkpoint_cleanup_interval_secs: u32,
271+
#[serde(default = "default_deduplication_cleanup_interval_secs")]
272+
pub deduplication_cleanup_interval_secs: u32,
273273
}
274274

275275
fn default_deduplication_window_duration_secs() -> u32 {
@@ -280,7 +280,7 @@ fn default_deduplication_window_max_messages() -> u32 {
280280
100_000
281281
}
282282

283-
fn default_checkpoint_cleanup_interval_secs() -> u32 {
283+
fn default_deduplication_cleanup_interval_secs() -> u32 {
284284
60
285285
}
286286

@@ -912,7 +912,8 @@ mod tests {
912912
deduplication_window_duration_secs: default_deduplication_window_duration_secs(
913913
),
914914
deduplication_window_max_messages: default_deduplication_window_max_messages(),
915-
checkpoint_cleanup_interval_secs: default_checkpoint_cleanup_interval_secs()
915+
deduplication_cleanup_interval_secs:
916+
default_deduplication_cleanup_interval_secs()
916917
})),
917918
);
918919
let file_params_reserialized = serde_json::to_value(&file_params_deserialized).unwrap();
@@ -924,7 +925,7 @@ mod tests {
924925
"message_type": "s3_notification",
925926
"deduplication_window_duration_secs": default_deduplication_window_duration_secs(),
926927
"deduplication_window_max_messages": default_deduplication_window_max_messages(),
927-
"checkpoint_cleanup_interval_secs": default_checkpoint_cleanup_interval_secs(),
928+
"deduplication_cleanup_interval_secs": default_deduplication_cleanup_interval_secs(),
928929
}]})
929930
);
930931
}

quickwit/quickwit-indexing/src/source/file_source.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -437,7 +437,7 @@ mod localstack_tests {
437437
message_type: FileSourceMessageType::RawUri,
438438
deduplication_window_duration_secs: 100,
439439
deduplication_window_max_messages: 100,
440-
checkpoint_cleanup_interval_secs: 60,
440+
deduplication_cleanup_interval_secs: 60,
441441
}));
442442
let source_config = SourceConfig::for_test(
443443
"test-file-source-sqs-notifications",

quickwit/quickwit-indexing/src/source/queue_sources/coordinator.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ impl QueueCoordinator {
143143
message_type,
144144
Some(config.deduplication_window_duration_secs),
145145
Some(config.deduplication_window_max_messages),
146-
Duration::from_secs(config.checkpoint_cleanup_interval_secs as u64),
146+
Duration::from_secs(config.deduplication_cleanup_interval_secs as u64),
147147
))
148148
}
149149

quickwit/quickwit-integration-tests/src/tests/sqs_tests.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ async fn test_sqs_garbage_collect() {
191191
queue_url: {}
192192
message_type: raw_uri
193193
deduplication_window_max_messages: 5
194-
checkpoint_cleanup_interval_secs: 3
194+
deduplication_cleanup_interval_secs: 3
195195
input_format: plain_text
196196
"#,
197197
source_id, queue_url

0 commit comments

Comments
 (0)