Skip to content

Commit

Permalink
Expose deduplication window to users
Browse files Browse the repository at this point in the history
  • Loading branch information
rdettai committed Sep 3, 2024
1 parent d79888b commit b680d1e
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 3 deletions.
3 changes: 3 additions & 0 deletions docs/configuration/source-config.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ Required fields for the SQS `notifications` parameter items:
- `message_type`: format of the message payload, either
- `s3_notification`: an [S3 event notification](https://docs.aws.amazon.com/AmazonS3/latest/userguide/EventNotifications.html)
- `raw_uri`: a message containing just the file object URI (e.g. `s3://mybucket/mykey`)
- `deduplication_window_duration_sec`: maximum duration for which ingested files checkpoints are kept (default 3600)
- `deduplication_window_max_messages`: maximum number of ingested file checkpoints kept (default 100k)

*Adding a file source with SQS notifications to an index with the [CLI](../reference/cli.md#source)*

Expand All @@ -82,6 +84,7 @@ EOF
- the notification message could not be parsed (e.g it is not a valid S3 notification)
- the file was not found
- the file is corrupted (e.g unexpected compression)
- AWS S3 notifications and AWS SQS provide "at least once" delivery guaranties. To avoid duplicates, the file source includes a mechanism that prevents the same file from being ingested twice. It works by storing checkpoints in the metastore that track the indexing progress for each file. You can decrease `deduplication_window_duration_sec` and/or `deduplication_window_max_messages` to reduce the load on the metastore.

:::

Expand Down
22 changes: 21 additions & 1 deletion quickwit/quickwit-config/src/source_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,18 @@ pub enum FileSourceMessageType {
pub struct FileSourceSqs {
pub queue_url: String,
pub message_type: FileSourceMessageType,
#[serde(default = "default_deduplication_window_duration_sec")]
pub deduplication_window_duration_sec: u32,
#[serde(default = "default_deduplication_window_max_messages")]
pub deduplication_window_max_messages: u32,
}

fn default_deduplication_window_duration_sec() -> u32 {
3600
}

fn default_deduplication_window_max_messages() -> u32 {
100_000
}

#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, utoipa::ToSchema)]
Expand Down Expand Up @@ -891,12 +903,20 @@ mod tests {
queue_url: "https://sqs.us-east-1.amazonaws.com/123456789012/queue-name"
.to_string(),
message_type: FileSourceMessageType::S3Notification,
deduplication_window_duration_sec: default_deduplication_window_duration_sec(),
deduplication_window_max_messages: default_deduplication_window_max_messages(),
})),
);
let file_params_reserialized = serde_json::to_value(&file_params_deserialized).unwrap();
assert_eq!(
file_params_reserialized,
json!({"notifications": [{"type": "sqs", "queue_url": "https://sqs.us-east-1.amazonaws.com/123456789012/queue-name", "message_type": "s3_notification"}]})
json!({"notifications": [{
"type": "sqs",
"queue_url": "https://sqs.us-east-1.amazonaws.com/123456789012/queue-name",
"message_type": "s3_notification",
"deduplication_window_duration_sec": default_deduplication_window_duration_sec(),
"deduplication_window_max_messages": default_deduplication_window_max_messages()
}]})
);
}
{
Expand Down
2 changes: 2 additions & 0 deletions quickwit/quickwit-indexing/src/source/file_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,8 @@ mod localstack_tests {
FileSourceParams::Notifications(FileSourceNotification::Sqs(FileSourceSqs {
queue_url,
message_type: FileSourceMessageType::RawUri,
deduplication_window_duration_sec: 100,
deduplication_window_max_messages: 100,
}));
let source_config = SourceConfig::for_test(
"test-file-source-sqs-notifications",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ impl QueueCoordinator {
source_runtime: SourceRuntime,
queue: Arc<dyn Queue>,
message_type: MessageType,
shard_max_age: Option<u32>,
shard_max_count: Option<u32>,
) -> Self {
Self {
shared_state: QueueSharedState {
Expand All @@ -106,8 +108,8 @@ impl QueueCoordinator {
2 * source_runtime.indexing_setting.commit_timeout_secs as u64,
),
last_initiated_pruning: Instant::now(),
max_age: None,
max_count: None,
max_age: shard_max_age,
max_count: shard_max_count,
pruning_interval: Duration::from_secs(60),
},
local_state: QueueLocalState::default(),
Expand Down Expand Up @@ -141,6 +143,8 @@ impl QueueCoordinator {
source_runtime,
Arc::new(queue),
message_type,
Some(config.deduplication_window_duration_sec),
Some(config.deduplication_window_max_messages),
))
}

Expand Down

0 comments on commit b680d1e

Please sign in to comment.