diff --git a/docs/configuration/source-config.md b/docs/configuration/source-config.md index 83bdace6f96..d00157ead0b 100644 --- a/docs/configuration/source-config.md +++ b/docs/configuration/source-config.md @@ -56,6 +56,8 @@ Required fields for the SQS `notifications` parameter items: - `message_type`: format of the message payload, either - `s3_notification`: an [S3 event notification](https://docs.aws.amazon.com/AmazonS3/latest/userguide/EventNotifications.html) - `raw_uri`: a message containing just the file object URI (e.g. `s3://mybucket/mykey`) + - `deduplication_window_duration_sec`: maximum duration for which ingested files checkpoints are kept (default 3600) + - `deduplication_window_max_messages`: maximum number of ingested file checkpoints kept (default 100k) *Adding a file source with SQS notifications to an index with the [CLI](../reference/cli.md#source)* @@ -82,6 +84,7 @@ EOF - the notification message could not be parsed (e.g it is not a valid S3 notification) - the file was not found - the file is corrupted (e.g unexpected compression) +- AWS S3 notifications and AWS SQS provide "at least once" delivery guaranties. To avoid duplicates, the file source includes a mechanism that prevents the same file from being ingested twice. It works by storing checkpoints in the metastore that track the indexing progress for each file. You can decrease `deduplication_window_duration_sec` and/or `deduplication_window_max_messages` to reduce the load on the metastore. ::: diff --git a/quickwit/quickwit-config/src/source_config/mod.rs b/quickwit/quickwit-config/src/source_config/mod.rs index b9fcaa15018..8fc7583735f 100644 --- a/quickwit/quickwit-config/src/source_config/mod.rs +++ b/quickwit/quickwit-config/src/source_config/mod.rs @@ -264,6 +264,18 @@ pub enum FileSourceMessageType { pub struct FileSourceSqs { pub queue_url: String, pub message_type: FileSourceMessageType, + #[serde(default = "default_deduplication_window_duration_sec")] + pub deduplication_window_duration_sec: u32, + #[serde(default = "default_deduplication_window_max_messages")] + pub deduplication_window_max_messages: u32, +} + +fn default_deduplication_window_duration_sec() -> u32 { + 3600 +} + +fn default_deduplication_window_max_messages() -> u32 { + 100_000 } #[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, utoipa::ToSchema)] @@ -891,12 +903,20 @@ mod tests { queue_url: "https://sqs.us-east-1.amazonaws.com/123456789012/queue-name" .to_string(), message_type: FileSourceMessageType::S3Notification, + deduplication_window_duration_sec: default_deduplication_window_duration_sec(), + deduplication_window_max_messages: default_deduplication_window_max_messages(), })), ); let file_params_reserialized = serde_json::to_value(&file_params_deserialized).unwrap(); assert_eq!( file_params_reserialized, - json!({"notifications": [{"type": "sqs", "queue_url": "https://sqs.us-east-1.amazonaws.com/123456789012/queue-name", "message_type": "s3_notification"}]}) + json!({"notifications": [{ + "type": "sqs", + "queue_url": "https://sqs.us-east-1.amazonaws.com/123456789012/queue-name", + "message_type": "s3_notification", + "deduplication_window_duration_sec": default_deduplication_window_duration_sec(), + "deduplication_window_max_messages": default_deduplication_window_max_messages() + }]}) ); } { diff --git a/quickwit/quickwit-indexing/src/source/file_source.rs b/quickwit/quickwit-indexing/src/source/file_source.rs index e3be553ae1b..a339f7cd677 100644 --- a/quickwit/quickwit-indexing/src/source/file_source.rs +++ b/quickwit/quickwit-indexing/src/source/file_source.rs @@ -435,6 +435,8 @@ mod localstack_tests { FileSourceParams::Notifications(FileSourceNotification::Sqs(FileSourceSqs { queue_url, message_type: FileSourceMessageType::RawUri, + deduplication_window_duration_sec: 100, + deduplication_window_max_messages: 100, })); let source_config = SourceConfig::for_test( "test-file-source-sqs-notifications", diff --git a/quickwit/quickwit-indexing/src/source/queue_sources/coordinator.rs b/quickwit/quickwit-indexing/src/source/queue_sources/coordinator.rs index 0d52ce8352c..459edfbb3f4 100644 --- a/quickwit/quickwit-indexing/src/source/queue_sources/coordinator.rs +++ b/quickwit/quickwit-indexing/src/source/queue_sources/coordinator.rs @@ -96,6 +96,8 @@ impl QueueCoordinator { source_runtime: SourceRuntime, queue: Arc, message_type: MessageType, + shard_max_age: Option, + shard_max_count: Option, ) -> Self { Self { shared_state: QueueSharedState { @@ -106,8 +108,8 @@ impl QueueCoordinator { 2 * source_runtime.indexing_setting.commit_timeout_secs as u64, ), last_initiated_pruning: Instant::now(), - max_age: None, - max_count: None, + max_age: shard_max_age, + max_count: shard_max_count, pruning_interval: Duration::from_secs(60), }, local_state: QueueLocalState::default(), @@ -141,6 +143,8 @@ impl QueueCoordinator { source_runtime, Arc::new(queue), message_type, + Some(config.deduplication_window_duration_sec), + Some(config.deduplication_window_max_messages), )) }