diff --git a/.github/workflows/coverage.yml b/.github/workflows/coverage.yml
index 0034ac81956..00e495bf99f 100644
--- a/.github/workflows/coverage.yml
+++ b/.github/workflows/coverage.yml
@@ -37,7 +37,7 @@ jobs:
- "4571:4571"
- "8080:8080"
env:
- SERVICES: kinesis,s3
+ SERVICES: kinesis,s3,sqs
options: >-
--health-cmd "curl -k https://localhost:4566"
--health-interval 10s
diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index a5e6b2bdeeb..e6af8704d66 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -29,7 +29,7 @@ When you submit a pull request to the project, the CI system runs several verifi
You will be notified by email from the CI system if any issues are discovered, but if you want to run these checks locally before submitting PR or in order to verify changes you can use the following commands in the root directory:
1. To verify that all tests are passing, run `make test-all`.
2. To fix code style and format as well as catch common mistakes run `make fix`. Alternatively, run `make -k test-all docker-compose-down` to tear down the Docker services after running all the tests.
-3. To build docs run `make build-docs`.
+3. To build docs run `make build-rustdoc`.
# Development
@@ -58,7 +58,7 @@ Run `make test-all` to run all tests.
* `make fmt` - runs formatter, this command requires the nightly toolchain to be installed by running `rustup toolchain install nightly`.
* `make fix` - runs formatter and clippy checks.
* `make typos` - runs the spellcheck tool over the codebase. (Install by running `cargo install typos-cli`)
-* `make docs` - builds docs.
+* `make doc` - builds docs.
* `make docker-compose-up` - starts Docker services.
* `make docker-compose-down` - stops Docker services.
* `make docker-compose-logs` - shows Docker logs.
diff --git a/distribution/lambda/README.md b/distribution/lambda/README.md
index 48db36d878c..4ed5143831b 100644
--- a/distribution/lambda/README.md
+++ b/distribution/lambda/README.md
@@ -95,6 +95,12 @@ simplify the setup and avoid unstable deployments.
[1]: https://rust-lang-nursery.github.io/rust-cookbook/development_tools/debugging/config_log.html
+> [!TIP]
+> The Indexer Lambda's logging is quite verbose. To reduce the associated
+> CloudWatch costs, you can disable some lower-level logs by setting the
+> `RUST_LOG` environment variable to `info,quickwit_actors=warn`, or disable
+> INFO logs altogether by setting `RUST_LOG=warn`.
+
Indexer only:
| Variable | Description | Default |
|---|---|---|
@@ -151,7 +157,13 @@ You can query and visualize the Quickwit Searcher Lambda from Grafana by using t
#### Configure Grafana data source
-You need to provide the following information.
+If you don't have a Grafana instance running yet, you can start one with the Quickwit plugin installed using Docker:
+
+```bash
+docker run -e GF_INSTALL_PLUGINS="quickwit-quickwit-datasource" -p 3000:3000 grafana/grafana
+```
+
+In the `Connections > Data sources` page, add a new Quickwit data source and configure the following settings:
|Variable|Description|Example|
|--|--|--|
@@ -159,4 +171,4 @@ You need to provide the following information.
|Custom HTTP Headers| If you configure API Gateway to require an API key, set `x-api-key` HTTP Header | Header: `x-api-key`
Value: API key value|
|Index ID| Same as `QW_LAMBDA_INDEX_ID` | hdfs-logs |
-After entering these values, click "Save & test" and you can now query your Quickwit Lambda from Grafana!
+After entering these values, click "Save & test". You can now query your Quickwit Lambda from Grafana!
diff --git a/distribution/lambda/cdk/cli.py b/distribution/lambda/cdk/cli.py
index c18fd14f289..ecb3ffdb155 100644
--- a/distribution/lambda/cdk/cli.py
+++ b/distribution/lambda/cdk/cli.py
@@ -320,14 +320,16 @@ def _clean_s3_bucket(bucket_name: str, prefix: str = ""):
print(f"Cleaning up bucket {bucket_name}/{prefix}...")
s3 = session.resource("s3")
bucket = s3.Bucket(bucket_name)
- bucket.objects.filter(Prefix=prefix).delete()
+ try:
+ bucket.objects.filter(Prefix=prefix).delete()
+ except s3.meta.client.exceptions.NoSuchBucket:
+ print(f"Bucket {bucket_name} not found, skipping cleanup")
def empty_hdfs_bucket():
bucket_name = _get_cloudformation_output_value(
app.HDFS_STACK_NAME, hdfs_stack.INDEX_STORE_BUCKET_NAME_EXPORT_NAME
)
-
_clean_s3_bucket(bucket_name)
diff --git a/distribution/lambda/cdk/stacks/examples/mock_data_stack.py b/distribution/lambda/cdk/stacks/examples/mock_data_stack.py
index 027b8afb98c..8a4a5c9290b 100644
--- a/distribution/lambda/cdk/stacks/examples/mock_data_stack.py
+++ b/distribution/lambda/cdk/stacks/examples/mock_data_stack.py
@@ -165,7 +165,12 @@ def __init__(
index_id=index_id,
index_config_bucket=index_config.s3_bucket_name,
index_config_key=index_config.s3_object_key,
- indexer_environment=lambda_env,
+ indexer_environment={
+ # the actor system is very verbose when the source is shutting
+ # down (each Lambda invocation)
+ "RUST_LOG": "info,quickwit_actors=warn",
+ **lambda_env,
+ },
searcher_environment=lambda_env,
indexer_package_location=indexer_package_location,
searcher_package_location=searcher_package_location,
diff --git a/docker-compose.yml b/docker-compose.yml
index 58b9d8b99f7..0667d9ac434 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -27,7 +27,7 @@ networks:
services:
localstack:
- image: localstack/localstack:${LOCALSTACK_VERSION:-2.3.2}
+ image: localstack/localstack:${LOCALSTACK_VERSION:-3.5.0}
container_name: localstack
ports:
- "${MAP_HOST_LOCALSTACK:-127.0.0.1}:4566:4566"
@@ -37,7 +37,7 @@ services:
- all
- localstack
environment:
- SERVICES: kinesis,s3
+ SERVICES: kinesis,s3,sqs
PERSISTENCE: 1
volumes:
- .localstack:/etc/localstack/init/ready.d
diff --git a/docs/assets/sqs-file-source.tf b/docs/assets/sqs-file-source.tf
new file mode 100644
index 00000000000..ffd348c1193
--- /dev/null
+++ b/docs/assets/sqs-file-source.tf
@@ -0,0 +1,134 @@
+terraform {
+ required_version = "1.7.5"
+ required_providers {
+ aws = {
+ source = "hashicorp/aws"
+ version = "~> 5.39.1"
+ }
+ }
+}
+
+provider "aws" {
+ region = "us-east-1"
+ default_tags {
+ tags = {
+ provisioner = "terraform"
+ author = "Quickwit"
+ }
+ }
+}
+
+locals {
+ sqs_notification_queue_name = "qw-tuto-s3-event-notifications"
+ source_bucket_name = "qw-tuto-source-bucket"
+}
+
+resource "aws_s3_bucket" "file_source" {
+ bucket_prefix = local.source_bucket_name
+ force_destroy = true
+}
+
+data "aws_iam_policy_document" "sqs_notification" {
+ statement {
+ effect = "Allow"
+
+ principals {
+ type = "*"
+ identifiers = ["*"]
+ }
+
+ actions = ["sqs:SendMessage"]
+ resources = ["arn:aws:sqs:*:*:${local.sqs_notification_queue_name}"]
+
+ condition {
+ test = "ArnEquals"
+ variable = "aws:SourceArn"
+ values = [aws_s3_bucket.file_source.arn]
+ }
+ }
+}
+
+
+resource "aws_sqs_queue" "s3_events" {
+ name = local.sqs_notification_queue_name
+ policy = data.aws_iam_policy_document.sqs_notification.json
+
+ redrive_policy = jsonencode({
+ deadLetterTargetArn = aws_sqs_queue.s3_events_deadletter.arn
+ maxReceiveCount = 5
+ })
+}
+
+resource "aws_sqs_queue" "s3_events_deadletter" {
+ name = "${locals.sqs_notification_queue_name}-deadletter"
+}
+
+resource "aws_sqs_queue_redrive_allow_policy" "s3_events_deadletter" {
+ queue_url = aws_sqs_queue.s3_events_deadletter.id
+
+ redrive_allow_policy = jsonencode({
+ redrivePermission = "byQueue",
+ sourceQueueArns = [aws_sqs_queue.s3_events.arn]
+ })
+}
+
+resource "aws_s3_bucket_notification" "bucket_notification" {
+ bucket = aws_s3_bucket.file_source.id
+
+ queue {
+ queue_arn = aws_sqs_queue.s3_events.arn
+ events = ["s3:ObjectCreated:*"]
+ }
+}
+
+data "aws_iam_policy_document" "quickwit_node" {
+ statement {
+ effect = "Allow"
+ actions = [
+ "sqs:ReceiveMessage",
+ "sqs:DeleteMessage",
+ "sqs:ChangeMessageVisibility",
+ "sqs:GetQueueAttributes",
+ ]
+ resources = [aws_sqs_queue.s3_events.arn]
+ }
+ statement {
+ effect = "Allow"
+ actions = ["s3:GetObject"]
+ resources = ["${aws_s3_bucket.file_source.arn}/*"]
+ }
+}
+
+resource "aws_iam_user" "quickwit_node" {
+ name = "quickwit-filesource-tutorial"
+ path = "/system/"
+}
+
+resource "aws_iam_user_policy" "quickwit_node" {
+ name = "quickwit-filesource-tutorial"
+ user = aws_iam_user.quickwit_node.name
+ policy = data.aws_iam_policy_document.quickwit_node.json
+}
+
+resource "aws_iam_access_key" "quickwit_node" {
+ user = aws_iam_user.quickwit_node.name
+}
+
+output "source_bucket_name" {
+ value = aws_s3_bucket.file_source.bucket
+}
+
+output "notification_queue_url" {
+ value = aws_sqs_queue.s3_events.id
+}
+
+output "quickwit_node_access_key_id" {
+ value = aws_iam_access_key.quickwit_node.id
+ sensitive = true
+}
+
+output "quickwit_node_secret_access_key" {
+ value = aws_iam_access_key.quickwit_node.secret
+ sensitive = true
+}
diff --git a/docs/configuration/source-config.md b/docs/configuration/source-config.md
index 479c97e2365..83bdace6f96 100644
--- a/docs/configuration/source-config.md
+++ b/docs/configuration/source-config.md
@@ -29,15 +29,62 @@ The source type designates the kind of source being configured. As of version 0.
The source parameters indicate how to connect to a data store and are specific to the source type.
-### File source (CLI only)
+### File source
-A file source reads data from a local file. The file must consist of JSON objects separated by a newline (NDJSON).
-As of version 0.5, a file source can only be ingested with the [CLI command](/docs/reference/cli.md#tool-local-ingest). Compressed files (bz2, gzip, ...) and remote files (Amazon S3, HTTP, ...) are not supported.
+A file source reads data from files containing JSON objects separated by newlines (NDJSON). Gzip compression is supported provided that the file name ends with the `.gz` suffix.
+
+#### Ingest a single file (CLI only)
+
+To ingest a specific file, run the indexing directly in an ad hoc CLI process with:
+
+```bash
+./quickwit tool local-ingest --index <index> --input-path <input-path>
+```
+
+Both local and object store files are supported, provided that the environment is configured with the appropriate permissions. A tutorial is available [here](/docs/ingest-data/ingest-local-file.md).
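+
+For example, assuming an existing index called `my-index` and AWS credentials already configured in the environment (the index, file, and bucket names here are illustrative):
+
+```bash
+# ingest a local NDJSON file
+./quickwit tool local-ingest --index my-index --input-path wiki-articles.json
+# or ingest the same data directly from an object store URI
+./quickwit tool local-ingest --index my-index --input-path s3://my-bucket/wiki-articles.json
+```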
+
+#### Notification-based file ingestion (beta)
+
+Quickwit can automatically ingest all new files that are uploaded to an S3 bucket. This requires creating and configuring an [SQS notification queue](https://docs.aws.amazon.com/AmazonS3/latest/userguide/ways-to-add-notification-config-to-bucket.html). A complete example can be found [in this tutorial](/docs/ingest-data/sqs-files.md).
+
+
+The `notifications` parameter takes an array of notification settings. Currently, only one notifier can be configured per source, and only the SQS notification `type` is supported.
+
+Required fields for the SQS `notifications` parameter items:
+- `type`: `sqs`
+- `queue_url`: the complete URL of the SQS queue (e.g. `https://sqs.us-east-1.amazonaws.com/123456789012/queue-name`)
+- `message_type`: format of the message payload, either
+ - `s3_notification`: an [S3 event notification](https://docs.aws.amazon.com/AmazonS3/latest/userguide/EventNotifications.html)
+ - `raw_uri`: a message containing just the file object URI (e.g. `s3://mybucket/mykey`)
+
+*Adding a file source with SQS notifications to an index with the [CLI](../reference/cli.md#source)*
```bash
-./quickwit tool local-ingest --input-path
+cat << EOF > source-config.yaml
+version: 0.8
+source_id: my-sqs-file-source
+source_type: file
+num_pipelines: 2
+params:
+ notifications:
+ - type: sqs
+ queue_url: https://sqs.us-east-1.amazonaws.com/123456789012/queue-name
+ message_type: s3_notification
+EOF
+./quickwit source create --index my-index --source-config source-config.yaml
```
+:::note
+
+- Quickwit does not automatically delete the source files after a successful ingestion. You can use [S3 object expiration](https://docs.aws.amazon.com/AmazonS3/latest/userguide/lifecycle-expire-general-considerations.html) to configure how long they should be retained in the bucket (see the Terraform sketch after this note).
+- Configure the notification to only forward events of type `s3:ObjectCreated:*`. Other events are acknowledged by the source without further processing and a warning is logged.
+- We strongly recommend using a [dead letter queue](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-dead-letter-queues.html) to receive all messages that couldn't be processed by the file source. A `maxReceiveCount` of 5 is a good default value. Here are some common situations where the notification message ends up in the dead letter queue:
+  - the notification message could not be parsed (e.g. it is not a valid S3 notification)
+  - the file was not found
+  - the file is corrupted (e.g. unexpected compression)
+
+:::
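+
+For reference, here is a minimal Terraform sketch of such a lifecycle rule, assuming the source bucket resource from the tutorial; the resource names and the 7-day retention period are illustrative, not prescriptive:
+
+```
+resource "aws_s3_bucket_lifecycle_configuration" "file_source_expiration" {
+  bucket = aws_s3_bucket.file_source.id
+
+  rule {
+    id     = "expire-ingested-files"
+    status = "Enabled"
+    # apply to all objects in the bucket
+    filter {}
+    # delete source files 7 days after they were uploaded
+    expiration {
+      days = 7
+    }
+  }
+}
+```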
+
### Ingest API source
An ingest API source reads data from the [Ingest API](/docs/reference/rest-api.md#ingest-data-into-an-index). This source is automatically created at the index creation and cannot be deleted nor disabled.
diff --git a/docs/deployment/kubernetes/gke.md b/docs/deployment/kubernetes/gke.md
index d2b43cc5cc3..7c821779aa9 100644
--- a/docs/deployment/kubernetes/gke.md
+++ b/docs/deployment/kubernetes/gke.md
@@ -65,6 +65,10 @@ image:
pullPolicy: Always
tag: edge
+serviceAccount:
+ create: false
+ name: quickwit-sa
+
config:
default_index_root_uri: gs://{BUCKET}/qw-indexes
metastore_uri: gs://{BUCKET}/qw-indexes
diff --git a/docs/get-started/tutorials/tutorial-hdfs-logs.md b/docs/get-started/tutorials/tutorial-hdfs-logs.md
index dd544e1c652..23a941081ee 100644
--- a/docs/get-started/tutorials/tutorial-hdfs-logs.md
+++ b/docs/get-started/tutorials/tutorial-hdfs-logs.md
@@ -80,7 +80,7 @@ curl -o hdfs_logs_index_config.yaml https://raw.githubusercontent.com/quickwit-o
The index config defines five fields: `timestamp`, `tenant_id`, `severity_text`, `body`, and one JSON field
for the nested values `resource.service`, we could use an object field here and maintain a fixed schema, but for convenience we're going to use a JSON field.
It also sets the `default_search_fields`, the `tag_fields`, and the `timestamp_field`.
-The `timestamp_field` and `tag_fields` are used by Quickwit for [splits pruning](../../overview/architecture) at query time to boost search speed.
+The `timestamp_field` and `tag_fields` are used by Quickwit for [splits pruning](../../overview/concepts/querying.md#time-sharding) at query time to boost search speed.
Check out the [index config docs](../../configuration/index-config) for more details.
```yaml title="hdfs-logs-index.yaml"
diff --git a/docs/ingest-data/ingest-local-file.md b/docs/ingest-data/ingest-local-file.md
index 2a5b1bced03..6eb37e7c3eb 100644
--- a/docs/ingest-data/ingest-local-file.md
+++ b/docs/ingest-data/ingest-local-file.md
@@ -72,6 +72,12 @@ Clearing local cache directory...
✔ Documents successfully indexed.
```
+:::tip
+
+Object store URIs like `s3://mybucket/mykey.json` are also supported as `--input-path`, provided that your environment is configured with the appropriate permissions.
+
+:::
+
## Tear down resources (optional)
That's it! You can now tear down the resources you created. You can do so by running the following command:
diff --git a/docs/ingest-data/sqs-files.md b/docs/ingest-data/sqs-files.md
new file mode 100644
index 00000000000..ebca49629d7
--- /dev/null
+++ b/docs/ingest-data/sqs-files.md
@@ -0,0 +1,248 @@
+---
+title: S3 with SQS notifications
+description: A short tutorial describing how to set up Quickwit to ingest data from S3 files using an SQS notifier
+tags: [s3, sqs, integration]
+icon_url: /img/tutorials/file-ndjson.svg
+sidebar_position: 5
+---
+
+In this tutorial, we describe how to set up Quickwit to ingest data from S3
+with bucket notification events flowing through SQS. We will first create the
+AWS resources (S3 bucket, SQS queue, notifications) using Terraform. We will
+then configure the Quickwit index and file source. Finally, we will send some
+data to the source bucket and verify that it gets indexed.
+
+## AWS resources
+
+The complete Terraform script can be downloaded [here](../assets/sqs-file-source.tf).
+
+First, create the bucket that will receive the source data files (NDJSON format):
+
+```
+resource "aws_s3_bucket" "file_source" {
+ bucket_prefix = "qw-tuto-source-bucket"
+}
+```
+
+Then set up the SQS queue that will carry the notifications when files are added
+to the bucket. The queue is configured with a policy that allows the source
+bucket to write the S3 notification messages to it. Also create a dead letter
+queue (DLQ) to receive the messages that couldn't be processed by the file
+source (e.g. corrupted files). Messages are moved to the DLQ after 5 indexing
+attempts.
+
+```
+locals {
+ sqs_notification_queue_name = "qw-tuto-s3-event-notifications"
+}
+
+data "aws_iam_policy_document" "sqs_notification" {
+ statement {
+ effect = "Allow"
+
+ principals {
+ type = "*"
+ identifiers = ["*"]
+ }
+
+ actions = ["sqs:SendMessage"]
+ resources = ["arn:aws:sqs:*:*:${local.sqs_notification_queue_name}"]
+
+ condition {
+ test = "ArnEquals"
+ variable = "aws:SourceArn"
+ values = [aws_s3_bucket.file_source.arn]
+ }
+ }
+}
+
+resource "aws_sqs_queue" "s3_events_deadletter" {
+ name = "${locals.sqs_notification_queue_name}-deadletter"
+}
+
+resource "aws_sqs_queue" "s3_events" {
+ name = local.sqs_notification_queue_name
+ policy = data.aws_iam_policy_document.sqs_notification.json
+
+ redrive_policy = jsonencode({
+ deadLetterTargetArn = aws_sqs_queue.s3_events_deadletter.arn
+ maxReceiveCount = 5
+ })
+}
+
+resource "aws_sqs_queue_redrive_allow_policy" "s3_events_deadletter" {
+ queue_url = aws_sqs_queue.s3_events_deadletter.id
+
+ redrive_allow_policy = jsonencode({
+ redrivePermission = "byQueue",
+ sourceQueueArns = [aws_sqs_queue.s3_events.arn]
+ })
+}
+```
+
+Configure the bucket notification that writes messages to SQS each time a new
+file is created in the source bucket:
+
+```
+resource "aws_s3_bucket_notification" "bucket_notification" {
+ bucket = aws_s3_bucket.file_source.id
+
+ queue {
+ queue_arn = aws_sqs_queue.s3_events.arn
+ events = ["s3:ObjectCreated:*"]
+ }
+}
+```
+
+:::note
+
+Only events of type `s3:ObjectCreated:*` are supported. Other types (e.g.
+`ObjectRemoved`) are acknowledged and a warning is logged.
+
+:::
+
+The source needs to have access to both the notification queue and the source
+bucket. The following policy document contains the minimum permissions required
+by the source:
+
+```
+data "aws_iam_policy_document" "quickwit_node" {
+ statement {
+ effect = "Allow"
+ actions = [
+ "sqs:ReceiveMessage",
+ "sqs:DeleteMessage",
+ "sqs:ChangeMessageVisibility",
+ "sqs:GetQueueAttributes",
+ ]
+ resources = [aws_sqs_queue.s3_events.arn]
+ }
+ statement {
+ effect = "Allow"
+ actions = ["s3:GetObject"]
+ resources = ["${aws_s3_bucket.file_source.arn}/*"]
+ }
+}
+```
+
+Create the IAM user and credentials that will be used to
+associate this policy with your local Quickwit instance:
+
+```
+resource "aws_iam_user" "quickwit_node" {
+ name = "quickwit-filesource-tutorial"
+ path = "/system/"
+}
+
+resource "aws_iam_user_policy" "quickwit_node" {
+ name = "quickwit-filesource-tutorial"
+ user = aws_iam_user.quickwit_node.name
+ policy = data.aws_iam_policy_document.quickwit_node.json
+}
+
+resource "aws_iam_access_key" "quickwit_node" {
+ user = aws_iam_user.quickwit_node.name
+}
+```
+
+
+:::warning
+
+We don't recommend using IAM user credentials for running Quickwit nodes in
+production. This is just a simplified setup for the sake of the tutorial. When
+running on EC2/ECS, attach the policy document to an IAM role instead.
+
+:::
+
+Download the [complete Terraform script](../assets/sqs-file-source.tf) and
+deploy it using `terraform init` and `terraform apply`. After a successful
+execution, the outputs required to configure Quickwit will be listed. You can
+display the values of the sensitive outputs (access key ID and secret key) with:
+
+
+```bash
+terraform output quickwit_node_access_key_id
+terraform output quickwit_node_secret_access_key
+```
+
+## Run Quickwit
+
+[Install Quickwit locally](/docs/get-started/installation), then in your install
+directory, run Quickwit with the necessary access rights by replacing the
+`<quickwit_node_access_key_id>` and `<quickwit_node_secret_access_key>` with the
+matching Terraform output values:
+
+```bash
+AWS_ACCESS_KEY_ID=<quickwit_node_access_key_id> \
+AWS_SECRET_ACCESS_KEY=<quickwit_node_secret_access_key> \
+AWS_REGION=us-east-1 \
+./quickwit run
+```
+
+## Configure the index and the source
+
+In another terminal, in the Quickwit install directory, create an index:
+
+```bash
+cat << EOF > tutorial-sqs-file-index.yaml
+version: 0.7
+index_id: tutorial-sqs-file
+doc_mapping:
+ mode: dynamic
+indexing_settings:
+ commit_timeout_secs: 30
+EOF
+
+./quickwit index create --index-config tutorial-sqs-file-index.yaml
+```
+
+Replacing `<notification_queue_url>` with the corresponding Terraform output
+value, create a file source for that index:
+
+```bash
+cat << EOF > tutorial-sqs-file-source.yaml
+version: 0.8
+source_id: sqs-filesource
+source_type: file
+num_pipelines: 2
+params:
+ notifications:
+ - type: sqs
+ queue_url: <notification_queue_url>
+ message_type: s3_notification
+EOF
+
+./quickwit source create --index tutorial-sqs-file --source-config tutorial-sqs-file-source.yaml
+```
+
+:::tip
+
+The `num_pipelines` configuration controls how many consumers will poll from the queue in parallel. Choose the number according to the indexer compute resources you want to dedicate to this source. As a rule of thumb, configure 1 pipeline for every 2 cores: for instance, an indexer with 4 cores dedicated to this source would run 2 pipelines.
+
+:::
+
+## Ingest data
+
+We can now ingest data into Quickwit by uploading files to S3. If you have the
+AWS CLI installed, run the following command, replacing `<source_bucket_name>`
+with the associated Terraform output:
+
+```bash
+curl https://quickwit-datasets-public.s3.amazonaws.com/hdfs-logs-multitenants-10000.json | \
+ aws s3 cp - s3://<source_bucket_name>/hdfs-logs-multitenants-10000.json
+```
+
+If you prefer not to use the AWS CLI, you can also download the file and upload
+it manually to the source bucket using the AWS console.
+
+Wait approximately 1 minute and the data should appear in the index:
+
+```bash
+./quickwit index describe --index tutorial-sqs-file
+```
+
+## Tear down the resources
+
+The AWS resources instantiated in this tutorial don't incur any fixed costs, but
+we still recommend deleting them when you are done. In the directory with the
+Terraform script, run `terraform destroy`.
diff --git a/docs/overview/concepts/indexing.md b/docs/overview/concepts/indexing.md
index 4feba085b41..6dab81f392a 100644
--- a/docs/overview/concepts/indexing.md
+++ b/docs/overview/concepts/indexing.md
@@ -30,16 +30,7 @@ The disk space allocated to the split store is controlled by the config paramete
## Data sources
-A data source designates the location and set of parameters that allow to connect to and ingest data from an external data store, which can be a file, a stream, or a database. Often, Quickwit simply refers to data sources as "sources". The indexing engine supports file-based and stream-based sources. Quickwit can insert data into an index from one or multiple sources, defined in the index config.
-
-
-### File sources
-
-File sources are sources that read data from a file stored on the local file system.
-
-### Streaming sources
-
-Streaming sources are sources that read data from a streaming service such as Apache Kafka. As of version 0.2, Quickwit only supports Apache Kafka. Future versions of Quickwit will support additional streaming services such as Amazon Kinesis.
+A data source designates the location and set of parameters needed to connect to and ingest data from an external data store, which can be a file, a stream, or a database. Often, Quickwit simply refers to data sources as "sources". The indexing engine supports local ad hoc file ingestion using [the CLI](/docs/reference/cli#tool-local-ingest) and streaming sources (e.g. the Kafka source). Quickwit can insert data into an index from one or multiple sources. More details can be found [in the source configuration page](https://quickwit.io/docs/configuration/source-config).
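+
+For instance, a streaming source is attached to an index through a source config file. A minimal sketch of a Kafka source config might look like the following (the source ID, topic, and broker address are illustrative):
+
+```yaml
+version: 0.8
+source_id: my-kafka-source
+source_type: kafka
+params:
+  topic: my-topic
+  client_params:
+    bootstrap.servers: localhost:9092
+```
+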
## Checkpoint
diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock
index 0db9bbed0de..a8d34e1406a 100644
--- a/quickwit/Cargo.lock
+++ b/quickwit/Cargo.lock
@@ -458,6 +458,28 @@ dependencies = [
"url",
]
+[[package]]
+name = "aws-sdk-sqs"
+version = "1.36.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3587fbaf540d65337c2356ebf3f78fba160025b3d69634175f1ea3a7895738e9"
+dependencies = [
+ "aws-credential-types",
+ "aws-runtime",
+ "aws-smithy-async",
+ "aws-smithy-http",
+ "aws-smithy-json",
+ "aws-smithy-runtime",
+ "aws-smithy-runtime-api",
+ "aws-smithy-types",
+ "aws-types",
+ "bytes",
+ "http 0.2.12",
+ "once_cell",
+ "regex-lite",
+ "tracing",
+]
+
[[package]]
name = "aws-sdk-sso"
version = "1.36.0"
@@ -1131,9 +1153,9 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"
[[package]]
name = "bytes"
-version = "1.6.0"
+version = "1.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "514de17de45fdb8dc022b1a7975556c53c86f9f0aa5f534b98977b171857c2c9"
+checksum = "8318a53db07bb3f8dca91a600466bdb3f2eaadeedfdbcf02e1accbad9271ba50"
dependencies = [
"serde",
]
@@ -2026,12 +2048,6 @@ version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10"
-[[package]]
-name = "dotenv"
-version = "0.15.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "77c90badedccf4105eca100756a0b1289e191f6fcbdadd3cee1d2f614f97da8f"
-
[[package]]
name = "dotenvy"
version = "0.15.7"
@@ -3517,9 +3533,9 @@ dependencies = [
[[package]]
name = "lambda_runtime"
-version = "0.11.3"
+version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9be8f0e7a5db270feb93a7a3593c22a4c5fb8e8f260f5f490e0c3a5ffeb009db"
+checksum = "ed49669d6430292aead991e19bf13153135a884f916e68f32997c951af637ebe"
dependencies = [
"async-stream",
"base64 0.22.1",
@@ -4717,7 +4733,7 @@ checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
[[package]]
name = "ownedbytes"
version = "0.7.0"
-source = "git+https://github.com/quickwit-oss/tantivy/?rev=13e9885#13e9885dfda8cebf4bfef72f53bf811da8549445"
+source = "git+https://github.com/quickwit-oss/tantivy/?rev=c71ec80#c71ec8086d6563c4bb7e573182a26b280a3ac519"
dependencies = [
"stable_deref_trait",
]
@@ -5627,6 +5643,7 @@ dependencies = [
"aws-config",
"aws-sdk-kinesis",
"aws-sdk-s3",
+ "aws-sdk-sqs",
"aws-smithy-async",
"aws-smithy-runtime",
"aws-types",
@@ -5952,6 +5969,7 @@ dependencies = [
"async-compression",
"async-trait",
"aws-sdk-kinesis",
+ "aws-sdk-sqs",
"bytes",
"bytesize",
"criterion",
@@ -5988,6 +6006,7 @@ dependencies = [
"quickwit-storage",
"rand 0.8.5",
"rdkafka",
+ "regex",
"reqwest",
"serde",
"serde_json",
@@ -6045,6 +6064,7 @@ name = "quickwit-integration-tests"
version = "0.8.0"
dependencies = [
"anyhow",
+ "aws-sdk-sqs",
"futures-util",
"hyper 0.14.29",
"itertools 0.13.0",
@@ -6052,6 +6072,7 @@ dependencies = [
"quickwit-cli",
"quickwit-common",
"quickwit-config",
+ "quickwit-indexing",
"quickwit-metastore",
"quickwit-proto",
"quickwit-rest-client",
@@ -6063,6 +6084,7 @@ dependencies = [
"tokio",
"tonic",
"tracing",
+ "tracing-subscriber",
]
[[package]]
@@ -6140,7 +6162,7 @@ dependencies = [
"flate2",
"http 0.2.12",
"lambda_http",
- "lambda_runtime 0.11.3",
+ "lambda_runtime 0.13.0",
"mime_guess",
"once_cell",
"opentelemetry",
@@ -6189,7 +6211,7 @@ dependencies = [
"async-trait",
"bytes",
"bytesize",
- "dotenv",
+ "dotenvy",
"futures",
"http 0.2.12",
"itertools 0.13.0",
@@ -8137,7 +8159,7 @@ dependencies = [
[[package]]
name = "tantivy"
version = "0.23.0"
-source = "git+https://github.com/quickwit-oss/tantivy/?rev=13e9885#13e9885dfda8cebf4bfef72f53bf811da8549445"
+source = "git+https://github.com/quickwit-oss/tantivy/?rev=c71ec80#c71ec8086d6563c4bb7e573182a26b280a3ac519"
dependencies = [
"aho-corasick",
"arc-swap",
@@ -8190,7 +8212,7 @@ dependencies = [
[[package]]
name = "tantivy-bitpacker"
version = "0.6.0"
-source = "git+https://github.com/quickwit-oss/tantivy/?rev=13e9885#13e9885dfda8cebf4bfef72f53bf811da8549445"
+source = "git+https://github.com/quickwit-oss/tantivy/?rev=c71ec80#c71ec8086d6563c4bb7e573182a26b280a3ac519"
dependencies = [
"bitpacking",
]
@@ -8198,7 +8220,7 @@ dependencies = [
[[package]]
name = "tantivy-columnar"
version = "0.3.0"
-source = "git+https://github.com/quickwit-oss/tantivy/?rev=13e9885#13e9885dfda8cebf4bfef72f53bf811da8549445"
+source = "git+https://github.com/quickwit-oss/tantivy/?rev=c71ec80#c71ec8086d6563c4bb7e573182a26b280a3ac519"
dependencies = [
"downcast-rs",
"fastdivide",
@@ -8213,7 +8235,7 @@ dependencies = [
[[package]]
name = "tantivy-common"
version = "0.7.0"
-source = "git+https://github.com/quickwit-oss/tantivy/?rev=13e9885#13e9885dfda8cebf4bfef72f53bf811da8549445"
+source = "git+https://github.com/quickwit-oss/tantivy/?rev=c71ec80#c71ec8086d6563c4bb7e573182a26b280a3ac519"
dependencies = [
"async-trait",
"byteorder",
@@ -8236,7 +8258,7 @@ dependencies = [
[[package]]
name = "tantivy-query-grammar"
version = "0.22.0"
-source = "git+https://github.com/quickwit-oss/tantivy/?rev=13e9885#13e9885dfda8cebf4bfef72f53bf811da8549445"
+source = "git+https://github.com/quickwit-oss/tantivy/?rev=c71ec80#c71ec8086d6563c4bb7e573182a26b280a3ac519"
dependencies = [
"nom",
]
@@ -8244,7 +8266,7 @@ dependencies = [
[[package]]
name = "tantivy-sstable"
version = "0.3.0"
-source = "git+https://github.com/quickwit-oss/tantivy/?rev=13e9885#13e9885dfda8cebf4bfef72f53bf811da8549445"
+source = "git+https://github.com/quickwit-oss/tantivy/?rev=c71ec80#c71ec8086d6563c4bb7e573182a26b280a3ac519"
dependencies = [
"tantivy-bitpacker",
"tantivy-common",
@@ -8255,7 +8277,7 @@ dependencies = [
[[package]]
name = "tantivy-stacker"
version = "0.3.0"
-source = "git+https://github.com/quickwit-oss/tantivy/?rev=13e9885#13e9885dfda8cebf4bfef72f53bf811da8549445"
+source = "git+https://github.com/quickwit-oss/tantivy/?rev=c71ec80#c71ec8086d6563c4bb7e573182a26b280a3ac519"
dependencies = [
"murmurhash32",
"rand_distr",
@@ -8265,7 +8287,7 @@ dependencies = [
[[package]]
name = "tantivy-tokenizer-api"
version = "0.3.0"
-source = "git+https://github.com/quickwit-oss/tantivy/?rev=13e9885#13e9885dfda8cebf4bfef72f53bf811da8549445"
+source = "git+https://github.com/quickwit-oss/tantivy/?rev=c71ec80#c71ec8086d6563c4bb7e573182a26b280a3ac519"
dependencies = [
"serde",
]
@@ -8471,9 +8493,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
[[package]]
name = "tokio"
-version = "1.39.1"
+version = "1.39.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d040ac2b29ab03b09d4129c2f5bbd012a3ac2f79d38ff506a4bf8dd34b0eac8a"
+checksum = "daa4fb1bc778bd6f04cbfc4bb2d06a7396a8f299dc33ea1900cedaa316f467b1"
dependencies = [
"backtrace",
"bytes",
diff --git a/quickwit/Cargo.toml b/quickwit/Cargo.toml
index 025f8e193ac..51b3df6d541 100644
--- a/quickwit/Cargo.toml
+++ b/quickwit/Cargo.toml
@@ -101,7 +101,7 @@ console-subscriber = "0.1.8"
criterion = { version = "0.5", features = ["async_tokio"] }
cron = "0.12.0"
dialoguer = "0.10.3"
-dotenv = "0.15"
+dotenvy = "0.15"
dyn-clone = "1.0.10"
enum-iterator = "1.5"
env_logger = "0.10"
@@ -279,6 +279,7 @@ aws-config = "1.5.4"
aws-credential-types = { version = "1.2", features = ["hardcoded-credentials"] }
aws-sdk-kinesis = "1.36"
aws-sdk-s3 = "1.42"
+aws-sdk-sqs = "1.36"
aws-smithy-async = "1.2"
aws-smithy-runtime = "1.6.2"
aws-smithy-types = { version = "1.2", features = ["byte-stream-poll-next"] }
@@ -324,7 +325,7 @@ quickwit-serve = { path = "quickwit-serve" }
quickwit-storage = { path = "quickwit-storage" }
quickwit-telemetry = { path = "quickwit-telemetry" }
-tantivy = { git = "https://github.com/quickwit-oss/tantivy/", rev = "13e9885", default-features = false, features = [
+tantivy = { git = "https://github.com/quickwit-oss/tantivy/", rev = "c71ec80", default-features = false, features = [
"lz4-compression",
"mmap",
"quickwit",
diff --git a/quickwit/deny.toml b/quickwit/deny.toml
index 0c6c498786b..139785b1c9a 100644
--- a/quickwit/deny.toml
+++ b/quickwit/deny.toml
@@ -9,6 +9,7 @@
# The values provided in this template are the default values that will be used
# when any section or field is not specified in your own configuration
+[graph]
# If 1 or more target triples (and optionally, target_features) are specified,
# only the specified targets will be checked when running `cargo deny check`.
# This means, if a particular package is only ever used as a target specific
@@ -31,42 +32,22 @@ targets = [
# More documentation for the advisories section can be found here:
# https://embarkstudios.github.io/cargo-deny/checks/advisories/cfg.html
[advisories]
+version = 2
# The path where the advisory database is cloned/fetched into
db-path = "~/.cargo/advisory-db"
# The url(s) of the advisory databases to use
db-urls = ["https://github.com/rustsec/advisory-db"]
-# The lint level for security vulnerabilities
-vulnerability = "deny"
-# The lint level for unmaintained crates
-unmaintained = "warn"
-# The lint level for crates that have been yanked from their source registry
-yanked = "warn"
-# The lint level for crates with security notices. Note that as of
-# 2019-12-17 there are no security notice advisories in
-# https://github.com/rustsec/advisory-db
-notice = "warn"
# A list of advisory IDs to ignore. Note that ignored advisories will still
# output a note when they are encountered.
ignore = [
- # TODO Remove me after rsa gets patched and released.
- "RUSTSEC-2023-0071"
+ "RUSTSEC-2021-0153", # `encoding` is unmaintained, it's used in lindera
]
-# Threshold for security vulnerabilities, any vulnerability with a CVSS score
-# lower than the range specified will be ignored. Note that ignored advisories
-# will still output a note when they are encountered.
-# * None - CVSS Score 0.0
-# * Low - CVSS Score 0.1 - 3.9
-# * Medium - CVSS Score 4.0 - 6.9
-# * High - CVSS Score 7.0 - 8.9
-# * Critical - CVSS Score 9.0 - 10.0
-#severity-threshold =
# This section is considered when running `cargo deny check licenses`
# More documentation for the licenses section can be found here:
# https://embarkstudios.github.io/cargo-deny/checks/licenses/cfg.html
[licenses]
-# The lint level for crates which do not have a detectable license
-unlicensed = "deny"
+version = 2
# List of explicitly allowed licenses
# See https://spdx.org/licenses/ for list of possible licenses
# [possible values: any SPDX 3.11 short identifier (+ optional exception)].
@@ -86,26 +67,6 @@ allow = [
"Zlib",
"zlib-acknowledgement",
]
-# List of explicitly disallowed licenses
-# See https://spdx.org/licenses/ for list of possible licenses
-# [possible values: any SPDX 3.11 short identifier (+ optional exception)].
-deny = [
- #"Nokia",
-]
-# Lint level for licenses considered copyleft
-copyleft = "warn"
-# Blanket approval or denial for OSI-approved or FSF Free/Libre licenses
-# * both - The license will be approved if it is both OSI-approved *AND* FSF
-# * either - The license will be approved if it is either OSI-approved *OR* FSF
-# * osi-only - The license will be approved if is OSI-approved *AND NOT* FSF
-# * fsf-only - The license will be approved if is FSF *AND NOT* OSI-approved
-# * neither - This predicate is ignored and the default lint level is used
-allow-osi-fsf-free = "neither"
-# Lint level used when no other predicates are matched
-# 1. License isn't in the allow or deny lists
-# 2. License isn't copyleft
-# 3. License isn't OSI/FSF, or allow-osi-fsf-free = "neither"
-default = "deny"
# The confidence threshold for detecting a license from license text.
# The higher the value, the more closely the license text must be to the
# canonical license text of a valid SPDX license file.
@@ -114,20 +75,30 @@ confidence-threshold = 0.8
# Allow 1 or more licenses on a per-crate basis, so that particular licenses
# aren't accepted for every possible crate as with the normal allow list
exceptions = [
+ { name = "quickwit-actors", allow = ["AGPL-3.0"], version = "*" },
{ name = "quickwit-aws", allow = ["AGPL-3.0"], version = "*" },
{ name = "quickwit-cli", allow = ["AGPL-3.0"], version = "*" },
{ name = "quickwit-cluster", allow = ["AGPL-3.0"], version = "*" },
+ { name = "quickwit-codegen", allow = ["AGPL-3.0"], version = "*" },
+ { name = "quickwit-codegen-example", allow = ["AGPL-3.0"], version = "*" },
{ name = "quickwit-common", allow = ["AGPL-3.0"], version = "*" },
{ name = "quickwit-config", allow = ["AGPL-3.0"], version = "*" },
- { name = "quickwit-index-management", allow = ["AGPL-3.0"], version = "*" },
+ { name = "quickwit-control-plane", allow = ["AGPL-3.0"], version = "*" },
+ { name = "quickwit-datetime", allow = ["AGPL-3.0"], version = "*" },
{ name = "quickwit-directories", allow = ["AGPL-3.0"], version = "*" },
{ name = "quickwit-doc-mapper", allow = ["AGPL-3.0"], version = "*" },
{ name = "quickwit-indexing", allow = ["AGPL-3.0"], version = "*" },
- { name = "quickwit-ingest-api", allow = ["AGPL-3.0"], version = "*" },
+ { name = "quickwit-index-management", allow = ["AGPL-3.0"], version = "*" },
+ { name = "quickwit-ingest", allow = ["AGPL-3.0"], version = "*" },
+ { name = "quickwit-integration-tests", allow = ["AGPL-3.0"], version = "*" },
{ name = "quickwit-jaeger", allow = ["AGPL-3.0"], version = "*" },
+ { name = "quickwit-janitor", allow = ["AGPL-3.0"], version = "*" },
+ { name = "quickwit-lambda", allow = ["AGPL-3.0"], version = "*" },
+ { name = "quickwit-macros", allow = ["AGPL-3.0"], version = "*" },
{ name = "quickwit-metastore", allow = ["AGPL-3.0"], version = "*" },
- { name = "quickwit-metastore-utils", allow = ["AGPL-3.0"], version = "*" },
+ { name = "quickwit-opentelemetry", allow = ["AGPL-3.0"], version = "*" },
{ name = "quickwit-proto", allow = ["AGPL-3.0"], version = "*" },
+ { name = "quickwit-query", allow = ["AGPL-3.0"], version = "*" },
{ name = "quickwit-rest-client", allow = ["AGPL-3.0"], version = "*" },
{ name = "quickwit-search", allow = ["AGPL-3.0"], version = "*" },
{ name = "quickwit-serve", allow = ["AGPL-3.0"], version = "*" },
@@ -224,8 +195,8 @@ allow-git = []
[sources.allow-org]
# 1 or more github.com organizations to allow git sources for
-github = [""]
+github = ["quickwit-oss"]
# 1 or more gitlab.com organizations to allow git sources for
-gitlab = [""]
+gitlab = []
# 1 or more bitbucket.org organizations to allow git sources for
-bitbucket = [""]
+bitbucket = []
diff --git a/quickwit/quickwit-actors/src/actor_context.rs b/quickwit/quickwit-actors/src/actor_context.rs
index af7a8a3f7c9..0dbe1194bba 100644
--- a/quickwit/quickwit-actors/src/actor_context.rs
+++ b/quickwit/quickwit-actors/src/actor_context.rs
@@ -339,8 +339,11 @@ impl ActorContext {
self.self_mailbox.try_send_message(msg)
}
- /// Schedules a message that will be sent to the high-priority
- /// queue of the actor Mailbox once `after_duration` has elapsed.
+ /// Schedules a message that will be sent to the high-priority queue of the
+ /// actor Mailbox once `after_duration` has elapsed.
+ ///
+ /// Note that this holds a reference to the actor mailbox until the message
+ /// is actually sent.
pub fn schedule_self_msg(&self, after_duration: Duration, message: M)
where
A: DeferableReplyHandler,
diff --git a/quickwit/quickwit-aws/Cargo.toml b/quickwit/quickwit-aws/Cargo.toml
index f67d36155b4..19bf0ceb6e0 100644
--- a/quickwit/quickwit-aws/Cargo.toml
+++ b/quickwit/quickwit-aws/Cargo.toml
@@ -14,6 +14,7 @@ license.workspace = true
aws-config = { workspace = true }
aws-sdk-kinesis = { workspace = true, optional = true }
aws-sdk-s3 = { workspace = true }
+aws-sdk-sqs = { workspace = true, optional = true }
aws-smithy-async = { workspace = true }
aws-smithy-runtime = { workspace = true }
aws-types = { workspace = true }
@@ -27,3 +28,4 @@ quickwit-common = { workspace = true }
[features]
kinesis = ["aws-sdk-kinesis"]
+sqs = ["aws-sdk-sqs"]
diff --git a/quickwit/quickwit-aws/src/error.rs b/quickwit/quickwit-aws/src/error.rs
index e7c6dfdd077..ba2e620a27e 100644
--- a/quickwit/quickwit-aws/src/error.rs
+++ b/quickwit/quickwit-aws/src/error.rs
@@ -19,13 +19,6 @@
#![allow(clippy::match_like_matches_macro)]
-#[cfg(feature = "kinesis")]
-use aws_sdk_kinesis::operation::{
- create_stream::CreateStreamError, delete_stream::DeleteStreamError,
- describe_stream::DescribeStreamError, get_records::GetRecordsError,
- get_shard_iterator::GetShardIteratorError, list_shards::ListShardsError,
- list_streams::ListStreamsError, merge_shards::MergeShardsError, split_shard::SplitShardError,
-};
use aws_sdk_s3::error::SdkError;
use aws_sdk_s3::operation::abort_multipart_upload::AbortMultipartUploadError;
use aws_sdk_s3::operation::complete_multipart_upload::CompleteMultipartUploadError;
@@ -109,89 +102,124 @@ impl AwsRetryable for HeadObjectError {
}
#[cfg(feature = "kinesis")]
-impl AwsRetryable for GetRecordsError {
- fn is_retryable(&self) -> bool {
- match self {
- GetRecordsError::KmsThrottlingException(_) => true,
- GetRecordsError::ProvisionedThroughputExceededException(_) => true,
- _ => false,
+mod kinesis {
+ use aws_sdk_kinesis::operation::create_stream::CreateStreamError;
+ use aws_sdk_kinesis::operation::delete_stream::DeleteStreamError;
+ use aws_sdk_kinesis::operation::describe_stream::DescribeStreamError;
+ use aws_sdk_kinesis::operation::get_records::GetRecordsError;
+ use aws_sdk_kinesis::operation::get_shard_iterator::GetShardIteratorError;
+ use aws_sdk_kinesis::operation::list_shards::ListShardsError;
+ use aws_sdk_kinesis::operation::list_streams::ListStreamsError;
+ use aws_sdk_kinesis::operation::merge_shards::MergeShardsError;
+ use aws_sdk_kinesis::operation::split_shard::SplitShardError;
+
+ use super::*;
+
+ impl AwsRetryable for GetRecordsError {
+ fn is_retryable(&self) -> bool {
+ match self {
+ GetRecordsError::KmsThrottlingException(_) => true,
+ GetRecordsError::ProvisionedThroughputExceededException(_) => true,
+ _ => false,
+ }
}
}
-}
-#[cfg(feature = "kinesis")]
-impl AwsRetryable for GetShardIteratorError {
- fn is_retryable(&self) -> bool {
- matches!(
- self,
- GetShardIteratorError::ProvisionedThroughputExceededException(_)
- )
+ impl AwsRetryable for GetShardIteratorError {
+ fn is_retryable(&self) -> bool {
+ matches!(
+ self,
+ GetShardIteratorError::ProvisionedThroughputExceededException(_)
+ )
+ }
}
-}
-#[cfg(feature = "kinesis")]
-impl AwsRetryable for ListShardsError {
- fn is_retryable(&self) -> bool {
- matches!(
- self,
- ListShardsError::ResourceInUseException(_) | ListShardsError::LimitExceededException(_)
- )
+ impl AwsRetryable for ListShardsError {
+ fn is_retryable(&self) -> bool {
+ matches!(
+ self,
+ ListShardsError::ResourceInUseException(_)
+ | ListShardsError::LimitExceededException(_)
+ )
+ }
}
-}
-#[cfg(feature = "kinesis")]
-impl AwsRetryable for CreateStreamError {
- fn is_retryable(&self) -> bool {
- matches!(
- self,
- CreateStreamError::ResourceInUseException(_)
- | CreateStreamError::LimitExceededException(_)
- )
+ impl AwsRetryable for CreateStreamError {
+ fn is_retryable(&self) -> bool {
+ matches!(
+ self,
+ CreateStreamError::ResourceInUseException(_)
+ | CreateStreamError::LimitExceededException(_)
+ )
+ }
}
-}
-#[cfg(feature = "kinesis")]
-impl AwsRetryable for DeleteStreamError {
- fn is_retryable(&self) -> bool {
- matches!(
- self,
- DeleteStreamError::ResourceInUseException(_)
- | DeleteStreamError::LimitExceededException(_)
- )
+ impl AwsRetryable for DeleteStreamError {
+ fn is_retryable(&self) -> bool {
+ matches!(
+ self,
+ DeleteStreamError::ResourceInUseException(_)
+ | DeleteStreamError::LimitExceededException(_)
+ )
+ }
}
-}
-#[cfg(feature = "kinesis")]
-impl AwsRetryable for DescribeStreamError {
- fn is_retryable(&self) -> bool {
- matches!(self, DescribeStreamError::LimitExceededException(_))
+ impl AwsRetryable for DescribeStreamError {
+ fn is_retryable(&self) -> bool {
+ matches!(self, DescribeStreamError::LimitExceededException(_))
+ }
}
-}
-#[cfg(feature = "kinesis")]
-impl AwsRetryable for ListStreamsError {
- fn is_retryable(&self) -> bool {
- matches!(self, ListStreamsError::LimitExceededException(_))
+ impl AwsRetryable for ListStreamsError {
+ fn is_retryable(&self) -> bool {
+ matches!(self, ListStreamsError::LimitExceededException(_))
+ }
}
-}
-#[cfg(feature = "kinesis")]
-impl AwsRetryable for MergeShardsError {
- fn is_retryable(&self) -> bool {
- matches!(
- self,
- MergeShardsError::ResourceInUseException(_)
- | MergeShardsError::LimitExceededException(_)
- )
+ impl AwsRetryable for MergeShardsError {
+ fn is_retryable(&self) -> bool {
+ matches!(
+ self,
+ MergeShardsError::ResourceInUseException(_)
+ | MergeShardsError::LimitExceededException(_)
+ )
+ }
+ }
+
+ impl AwsRetryable for SplitShardError {
+ fn is_retryable(&self) -> bool {
+ matches!(
+ self,
+ SplitShardError::ResourceInUseException(_)
+ | SplitShardError::LimitExceededException(_)
+ )
+ }
}
}
-#[cfg(feature = "kinesis")]
-impl AwsRetryable for SplitShardError {
- fn is_retryable(&self) -> bool {
- matches!(
- self,
- SplitShardError::ResourceInUseException(_) | SplitShardError::LimitExceededException(_)
- )
+#[cfg(feature = "sqs")]
+mod sqs {
+ use aws_sdk_sqs::operation::change_message_visibility::ChangeMessageVisibilityError;
+ use aws_sdk_sqs::operation::delete_message_batch::DeleteMessageBatchError;
+ use aws_sdk_sqs::operation::receive_message::ReceiveMessageError;
+
+ use super::*;
+
+ impl AwsRetryable for ReceiveMessageError {
+ fn is_retryable(&self) -> bool {
+ false
+ }
+ }
+
+ impl AwsRetryable for DeleteMessageBatchError {
+ fn is_retryable(&self) -> bool {
+ false
+ }
+ }
+
+ impl AwsRetryable for ChangeMessageVisibilityError {
+ fn is_retryable(&self) -> bool {
+ false
+ }
}
}
diff --git a/quickwit/quickwit-cli/Cargo.toml b/quickwit/quickwit-cli/Cargo.toml
index 36cfc371aa7..d403eef2922 100644
--- a/quickwit/quickwit-cli/Cargo.toml
+++ b/quickwit/quickwit-cli/Cargo.toml
@@ -92,6 +92,7 @@ release-feature-set = [
"quickwit-indexing/kafka",
"quickwit-indexing/kinesis",
"quickwit-indexing/pulsar",
+ "quickwit-indexing/sqs",
"quickwit-indexing/vrl",
"quickwit-storage/azure",
"quickwit-storage/gcs",
@@ -104,6 +105,7 @@ release-feature-vendored-set = [
"pprof",
"quickwit-indexing/kinesis",
"quickwit-indexing/pulsar",
+ "quickwit-indexing/sqs",
"quickwit-indexing/vrl",
"quickwit-indexing/vendored-kafka",
"quickwit-storage/azure",
@@ -116,6 +118,7 @@ release-macos-feature-vendored-set = [
"openssl-support",
"quickwit-indexing/kinesis",
"quickwit-indexing/pulsar",
+ "quickwit-indexing/sqs",
"quickwit-indexing/vrl",
"quickwit-indexing/vendored-kafka-macos",
"quickwit-storage/azure",
diff --git a/quickwit/quickwit-cli/src/source.rs b/quickwit/quickwit-cli/src/source.rs
index 1a1948fdd99..ee90689ca94 100644
--- a/quickwit/quickwit-cli/src/source.rs
+++ b/quickwit/quickwit-cli/src/source.rs
@@ -744,7 +744,7 @@ mod tests {
source_id: "foo-source".to_string(),
num_pipelines: NonZeroUsize::new(1).unwrap(),
enabled: true,
- source_params: SourceParams::file("path/to/file"),
+ source_params: SourceParams::file_from_str("path/to/file").unwrap(),
transform_config: None,
input_format: SourceInputFormat::Json,
}];
@@ -753,9 +753,10 @@ mod tests {
source_type: "file".to_string(),
enabled: "true".to_string(),
}];
+ let expected_uri = Uri::from_str("path/to/file").unwrap();
let expected_params = vec![ParamsRow {
key: "filepath".to_string(),
- value: JsonValue::String("path/to/file".to_string()),
+ value: JsonValue::String(expected_uri.to_string()),
}];
let expected_checkpoint = vec![
CheckpointRow {
@@ -820,12 +821,12 @@ mod tests {
let expected_sources = [
SourceRow {
source_id: "bar-source".to_string(),
- source_type: "file".to_string(),
+ source_type: "stdin".to_string(),
enabled: "true".to_string(),
},
SourceRow {
source_id: "foo-source".to_string(),
- source_type: "file".to_string(),
+ source_type: "stdin".to_string(),
enabled: "true".to_string(),
},
];
diff --git a/quickwit/quickwit-cli/src/tool.rs b/quickwit/quickwit-cli/src/tool.rs
index b936cbc4897..c7ab1911205 100644
--- a/quickwit/quickwit-cli/src/tool.rs
+++ b/quickwit/quickwit-cli/src/tool.rs
@@ -173,7 +173,7 @@ pub fn build_tool_command() -> Command {
pub struct LocalIngestDocsArgs {
pub config_uri: Uri,
pub index_id: IndexId,
- pub input_path_opt: Option<PathBuf>,
+ pub input_path_opt: Option<Uri>,
pub input_format: SourceInputFormat,
pub overwrite: bool,
pub vrl_script: Option<String>,
@@ -251,9 +251,7 @@ impl ToolCliCommand {
.remove_one::("index")
.expect("`index` should be a required arg.");
let input_path_opt = if let Some(input_path) = matches.remove_one::<String>("input-path") {
- Uri::from_str(&input_path)?
- .filepath()
- .map(|path| path.to_path_buf())
+ Some(Uri::from_str(&input_path)?)
} else {
None
};
@@ -410,8 +408,8 @@ pub async fn local_ingest_docs_cli(args: LocalIngestDocsArgs) -> anyhow::Result<
get_resolvers(&config.storage_configs, &config.metastore_configs);
let mut metastore = metastore_resolver.resolve(&config.metastore_uri).await?;
- let source_params = if let Some(filepath) = args.input_path_opt.as_ref() {
- SourceParams::file(filepath)
+ let source_params = if let Some(uri) = args.input_path_opt.as_ref() {
+ SourceParams::file_from_uri(uri.clone())
} else {
SourceParams::stdin()
};
diff --git a/quickwit/quickwit-cli/tests/cli.rs b/quickwit/quickwit-cli/tests/cli.rs
index 205fd778a85..524098537b6 100644
--- a/quickwit/quickwit-cli/tests/cli.rs
+++ b/quickwit/quickwit-cli/tests/cli.rs
@@ -26,7 +26,7 @@ use std::path::Path;
use anyhow::Result;
use clap::error::ErrorKind;
-use helpers::{TestEnv, TestStorageType};
+use helpers::{uri_from_path, TestEnv, TestStorageType};
use quickwit_cli::checklist::ChecklistError;
use quickwit_cli::cli::build_cli;
use quickwit_cli::index::{
@@ -38,6 +38,7 @@ use quickwit_cli::tool::{
};
use quickwit_common::fs::get_cache_directory_path;
use quickwit_common::rand::append_random_suffix;
+use quickwit_common::uri::Uri;
use quickwit_config::{RetentionPolicy, SourceInputFormat, CLI_SOURCE_ID};
use quickwit_metastore::{
ListSplitsRequestExt, MetastoreResolver, MetastoreServiceExt, MetastoreServiceStreamSplitsExt,
@@ -62,11 +63,11 @@ async fn create_logs_index(test_env: &TestEnv) -> anyhow::Result<()> {
create_index_cli(args).await
}
-async fn local_ingest_docs(input_path: &Path, test_env: &TestEnv) -> anyhow::Result<()> {
+async fn local_ingest_docs(uri: Uri, test_env: &TestEnv) -> anyhow::Result<()> {
let args = LocalIngestDocsArgs {
config_uri: test_env.resource_files.config.clone(),
index_id: test_env.index_id.clone(),
- input_path_opt: Some(input_path.to_path_buf()),
+ input_path_opt: Some(uri),
input_format: SourceInputFormat::Json,
overwrite: false,
clear_cache: true,
@@ -75,6 +76,10 @@ async fn local_ingest_docs(input_path: &Path, test_env: &TestEnv) -> anyhow::Res
local_ingest_docs_cli(args).await
}
+async fn local_ingest_log_docs(test_env: &TestEnv) -> anyhow::Result<()> {
+ local_ingest_docs(test_env.resource_files.log_docs.clone(), test_env).await
+}
+
#[test]
fn test_cmd_help() {
let cmd = build_cli();
@@ -253,14 +258,17 @@ async fn test_ingest_docs_cli() {
// Ensure cache directory is empty.
let cache_directory_path = get_cache_directory_path(&test_env.data_dir_path);
-
assert!(cache_directory_path.read_dir().unwrap().next().is_none());
+ let does_not_exist_uri = uri_from_path(&test_env.data_dir_path)
+ .join("file-does-not-exist.json")
+ .unwrap();
+
// Ingest a non-existing file should fail.
let args = LocalIngestDocsArgs {
config_uri: test_env.resource_files.config,
index_id: test_env.index_id,
- input_path_opt: Some(test_env.data_dir_path.join("file-does-not-exist.json")),
+ input_path_opt: Some(does_not_exist_uri),
input_format: SourceInputFormat::Json,
overwrite: false,
clear_cache: true,
@@ -333,9 +341,7 @@ async fn test_cmd_search_aggregation() {
test_env.start_server().await.unwrap();
create_logs_index(&test_env).await.unwrap();
- local_ingest_docs(test_env.resource_files.log_docs.as_path(), &test_env)
- .await
- .unwrap();
+ local_ingest_log_docs(&test_env).await.unwrap();
let aggregation: Value = json!(
{
@@ -433,9 +439,7 @@ async fn test_cmd_search_with_snippets() -> Result<()> {
test_env.start_server().await.unwrap();
create_logs_index(&test_env).await.unwrap();
- local_ingest_docs(test_env.resource_files.log_docs.as_path(), &test_env)
- .await
- .unwrap();
+ local_ingest_log_docs(&test_env).await.unwrap();
// search with snippets
let args = SearchIndexArgs {
@@ -488,9 +492,7 @@ async fn test_search_index_cli() {
sort_by_score: false,
};
- local_ingest_docs(test_env.resource_files.log_docs.as_path(), &test_env)
- .await
- .unwrap();
+ local_ingest_log_docs(&test_env).await.unwrap();
let args = create_search_args("level:info");
@@ -601,9 +603,7 @@ async fn test_delete_index_cli_dry_run() {
.unwrap();
assert!(metastore.index_exists(&index_id).await.unwrap());
- local_ingest_docs(test_env.resource_files.log_docs.as_path(), &test_env)
- .await
- .unwrap();
+ local_ingest_log_docs(&test_env).await.unwrap();
// On non-empty index
let args = create_delete_args(true);
@@ -627,9 +627,7 @@ async fn test_delete_index_cli() {
test_env.start_server().await.unwrap();
create_logs_index(&test_env).await.unwrap();
- local_ingest_docs(test_env.resource_files.log_docs.as_path(), &test_env)
- .await
- .unwrap();
+ local_ingest_log_docs(&test_env).await.unwrap();
let args = DeleteIndexArgs {
client_args: test_env.default_client_args(),
@@ -653,9 +651,7 @@ async fn test_garbage_collect_cli_no_grace() {
test_env.start_server().await.unwrap();
create_logs_index(&test_env).await.unwrap();
let index_uid = test_env.index_metadata().await.unwrap().index_uid;
- local_ingest_docs(test_env.resource_files.log_docs.as_path(), &test_env)
- .await
- .unwrap();
+ local_ingest_log_docs(&test_env).await.unwrap();
let metastore = MetastoreResolver::unconfigured()
.resolve(&test_env.metastore_uri)
@@ -763,9 +759,7 @@ async fn test_garbage_collect_index_cli() {
test_env.start_server().await.unwrap();
create_logs_index(&test_env).await.unwrap();
let index_uid = test_env.index_metadata().await.unwrap().index_uid;
- local_ingest_docs(test_env.resource_files.log_docs.as_path(), &test_env)
- .await
- .unwrap();
+ local_ingest_log_docs(&test_env).await.unwrap();
let refresh_metastore = |metastore| async {
// In this test we rely on the file backed metastore and
@@ -915,9 +909,7 @@ async fn test_all_local_index() {
.unwrap();
assert!(metadata_file_exists);
- local_ingest_docs(test_env.resource_files.log_docs.as_path(), &test_env)
- .await
- .unwrap();
+ local_ingest_log_docs(&test_env).await.unwrap();
let query_response = reqwest::get(format!(
"http://127.0.0.1:{}/api/v1/{}/search?query=level:info",
@@ -971,16 +963,21 @@ async fn test_all_with_s3_localstack_cli() {
test_env.start_server().await.unwrap();
create_logs_index(&test_env).await.unwrap();
- let s3_path = upload_test_file(
+ let s3_uri = upload_test_file(
test_env.storage_resolver.clone(),
- test_env.resource_files.log_docs.clone(),
+ test_env
+ .resource_files
+ .log_docs
+ .filepath()
+ .unwrap()
+ .to_path_buf(),
"quickwit-integration-tests",
"sources/",
&append_random_suffix("test-all--cli-s3-localstack"),
)
.await;
- local_ingest_docs(&s3_path, &test_env).await.unwrap();
+ local_ingest_docs(s3_uri, &test_env).await.unwrap();
// Cli search
let args = SearchIndexArgs {
diff --git a/quickwit/quickwit-cli/tests/helpers.rs b/quickwit/quickwit-cli/tests/helpers.rs
index 0a52f6b9792..67839f7368b 100644
--- a/quickwit/quickwit-cli/tests/helpers.rs
+++ b/quickwit/quickwit-cli/tests/helpers.rs
@@ -17,9 +17,8 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
-use std::borrow::Borrow;
use std::fs;
-use std::path::PathBuf;
+use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::sync::Arc;
@@ -114,8 +113,8 @@ pub struct TestResourceFiles {
pub index_config: Uri,
pub index_config_without_uri: Uri,
pub index_config_with_retention: Uri,
- pub log_docs: PathBuf,
- pub wikipedia_docs: PathBuf,
+ pub log_docs: Uri,
+ pub wikipedia_docs: Uri,
}
/// A struct to hold few info about the test environment.
@@ -192,8 +191,8 @@ pub enum TestStorageType {
LocalFileSystem,
}
-fn uri_from_path(path: PathBuf) -> Uri {
- Uri::from_str(&format!("file://{}", path.display())).unwrap()
+pub fn uri_from_path(path: &Path) -> Uri {
+ Uri::from_str(path.to_str().unwrap()).unwrap()
}
/// Creates all necessary artifacts in a test environment.
@@ -265,12 +264,12 @@ pub async fn create_test_env(
.context("failed to parse cluster endpoint")?;
let resource_files = TestResourceFiles {
- config: uri_from_path(node_config_path),
- index_config: uri_from_path(index_config_path),
- index_config_without_uri: uri_from_path(index_config_without_uri_path),
- index_config_with_retention: uri_from_path(index_config_with_retention_path),
- log_docs: log_docs_path,
- wikipedia_docs: wikipedia_docs_path,
+ config: uri_from_path(&node_config_path),
+ index_config: uri_from_path(&index_config_path),
+ index_config_without_uri: uri_from_path(&index_config_without_uri_path),
+ index_config_with_retention: uri_from_path(&index_config_with_retention_path),
+ log_docs: uri_from_path(&log_docs_path),
+ wikipedia_docs: uri_from_path(&wikipedia_docs_path),
};
Ok(TestEnv {
@@ -297,15 +296,14 @@ pub async fn upload_test_file(
bucket: &str,
prefix: &str,
filename: &str,
-) -> PathBuf {
+) -> Uri {
let test_data = tokio::fs::read(local_src_path).await.unwrap();
- let mut src_location: PathBuf = [r"s3://", bucket, prefix].iter().collect();
- let storage_uri = Uri::from_str(src_location.to_string_lossy().borrow()).unwrap();
+ let src_location = format!("s3://{}/{}", bucket, prefix);
+ let storage_uri = Uri::from_str(&src_location).unwrap();
let storage = storage_resolver.resolve(&storage_uri).await.unwrap();
storage
.put(&PathBuf::from(filename), Box::new(test_data))
.await
.unwrap();
- src_location.push(filename);
- src_location
+ storage_uri.join(filename).unwrap()
}
diff --git a/quickwit/quickwit-common/src/fs.rs b/quickwit/quickwit-common/src/fs.rs
index adcb432e1b1..1aaa43d8286 100644
--- a/quickwit/quickwit-common/src/fs.rs
+++ b/quickwit/quickwit-common/src/fs.rs
@@ -34,7 +34,7 @@ pub async fn empty_dir<P: AsRef<Path>>(path: P) -> anyhow::Result<()> {
Ok(())
}
-/// Helper function to get the cache path.
+/// Helper function to get the indexer split cache path.
pub fn get_cache_directory_path(data_dir_path: &Path) -> PathBuf {
data_dir_path.join("indexer-split-cache").join("splits")
}
diff --git a/quickwit/quickwit-config/src/lib.rs b/quickwit/quickwit-config/src/lib.rs
index e41c46dce5b..5e256793fcd 100644
--- a/quickwit/quickwit-config/src/lib.rs
+++ b/quickwit/quickwit-config/src/lib.rs
@@ -55,11 +55,13 @@ pub use quickwit_doc_mapper::DocMapping;
use serde::de::DeserializeOwned;
use serde::Serialize;
use serde_json::Value as JsonValue;
+use source_config::FileSourceParamsForSerde;
pub use source_config::{
- load_source_config_from_user_config, FileSourceParams, KafkaSourceParams, KinesisSourceParams,
- PubSubSourceParams, PulsarSourceAuth, PulsarSourceParams, RegionOrEndpoint, SourceConfig,
- SourceInputFormat, SourceParams, TransformConfig, VecSourceParams, VoidSourceParams,
- CLI_SOURCE_ID, INGEST_API_SOURCE_ID, INGEST_V2_SOURCE_ID,
+ load_source_config_from_user_config, FileSourceMessageType, FileSourceNotification,
+ FileSourceParams, FileSourceSqs, KafkaSourceParams, KinesisSourceParams, PubSubSourceParams,
+ PulsarSourceAuth, PulsarSourceParams, RegionOrEndpoint, SourceConfig, SourceInputFormat,
+ SourceParams, TransformConfig, VecSourceParams, VoidSourceParams, CLI_SOURCE_ID,
+ INGEST_API_SOURCE_ID, INGEST_V2_SOURCE_ID,
};
use tracing::warn;
@@ -112,7 +114,10 @@ pub fn disable_ingest_v1() -> bool {
IndexTemplateV0_8,
SourceInputFormat,
SourceParams,
- FileSourceParams,
+ FileSourceMessageType,
+ FileSourceNotification,
+ FileSourceParamsForSerde,
+ FileSourceSqs,
PubSubSourceParams,
KafkaSourceParams,
KinesisSourceParams,
diff --git a/quickwit/quickwit-config/src/source_config/mod.rs b/quickwit/quickwit-config/src/source_config/mod.rs
index bc1c0cf3168..b9fcaa15018 100644
--- a/quickwit/quickwit-config/src/source_config/mod.rs
+++ b/quickwit/quickwit-config/src/source_config/mod.rs
@@ -19,8 +19,8 @@
pub(crate) mod serialize;
+use std::borrow::Cow;
use std::num::NonZeroUsize;
-use std::path::{Path, PathBuf};
use std::str::FromStr;
use bytes::Bytes;
@@ -82,6 +82,7 @@ impl SourceConfig {
SourceParams::Kinesis(_) => SourceType::Kinesis,
SourceParams::PubSub(_) => SourceType::PubSub,
SourceParams::Pulsar(_) => SourceType::Pulsar,
+ SourceParams::Stdin => SourceType::Stdin,
SourceParams::Vec(_) => SourceType::Vec,
SourceParams::Void(_) => SourceType::Void,
}
@@ -98,6 +99,7 @@ impl SourceConfig {
SourceParams::Kafka(params) => serde_json::to_value(params),
SourceParams::Kinesis(params) => serde_json::to_value(params),
SourceParams::Pulsar(params) => serde_json::to_value(params),
+ SourceParams::Stdin => serde_json::to_value(()),
SourceParams::Vec(params) => serde_json::to_value(params),
SourceParams::Void(params) => serde_json::to_value(params),
}
@@ -214,6 +216,7 @@ impl FromStr for SourceInputFormat {
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, utoipa::ToSchema)]
#[serde(tag = "source_type", content = "params", rename_all = "snake_case")]
pub enum SourceParams {
+ #[schema(value_type = FileSourceParamsForSerde)]
File(FileSourceParams),
Ingest,
#[serde(rename = "ingest-api")]
@@ -225,17 +228,22 @@ pub enum SourceParams {
#[serde(rename = "pubsub")]
PubSub(PubSubSourceParams),
Pulsar(PulsarSourceParams),
+ Stdin,
Vec(VecSourceParams),
Void(VoidSourceParams),
}
impl SourceParams {
- pub fn file<P: AsRef<Path>>(filepath: P) -> Self {
- Self::File(FileSourceParams::file(filepath))
+ pub fn file_from_uri(uri: Uri) -> Self {
+ Self::File(FileSourceParams::Filepath(uri))
+ }
+
+ pub fn file_from_str<P: AsRef<str>>(filepath: P) -> anyhow::Result<Self> {
+ Uri::from_str(filepath.as_ref()).map(Self::file_from_uri)
}
pub fn stdin() -> Self {
- Self::File(FileSourceParams::stdin())
+ Self::Stdin
}
pub fn void() -> Self {
@@ -243,41 +251,92 @@ impl SourceParams {
}
}
+#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize, utoipa::ToSchema)]
+#[serde(rename_all = "snake_case")]
+pub enum FileSourceMessageType {
+ /// See <https://docs.aws.amazon.com/AmazonS3/latest/userguide/EventNotifications.html>
+ S3Notification,
+ /// A string with the URI of the file (e.g `s3://bucket/key`)
+ RawUri,
+}
+
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, utoipa::ToSchema)]
+pub struct FileSourceSqs {
+ pub queue_url: String,
+ pub message_type: FileSourceMessageType,
+}
+
+#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, utoipa::ToSchema)]
+#[serde(tag = "type", rename_all = "snake_case")]
+pub enum FileSourceNotification {
+ Sqs(FileSourceSqs),
+}
+
+#[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize, utoipa::ToSchema)]
#[serde(deny_unknown_fields)]
-pub struct FileSourceParams {
- /// Path of the file to read. Assume stdin if None.
- #[schema(value_type = String)]
- #[serde(skip_serializing_if = "Option::is_none")]
- #[serde(default)]
- #[serde(deserialize_with = "absolute_filepath_from_str")]
- pub filepath: Option<PathBuf>, //< If None read from stdin.
+pub(super) struct FileSourceParamsForSerde {
+ #[serde(default, skip_serializing_if = "Vec::is_empty")]
+ notifications: Vec<FileSourceNotification>,
+ #[serde(default, skip_serializing_if = "Option::is_none")]
+ filepath: Option<Uri>,
}
-/// Deserializing as an URI first to validate the input.
-///
-/// TODO: we might want to replace `PathBuf` with `Uri` directly in
-/// `FileSourceParams`
-fn absolute_filepath_from_str<'de, D>(deserializer: D) -> Result