Skip to content

Commit 6ff6430

Browse files
Merge branch 'main' into trinity/batch-delete
2 parents b426d80 + ec951aa commit 6ff6430

File tree

47 files changed

+256
-116
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

47 files changed

+256
-116
lines changed

quickwit/Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

quickwit/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,7 @@ serde_json_borrow = "0.5"
215215
serde_qs = { version = "0.12", features = ["warp"] }
216216
serde_with = "3.9.0"
217217
serde_yaml = "0.9"
218+
serial_test = { version = "3.1.1", features = ["file_locks"] }
218219
siphasher = "0.3"
219220
smallvec = "1"
220221
sqlx = { version = "0.7", features = [

quickwit/quickwit-actors/src/actor.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,10 @@ pub enum ActorExitStatus {
3636
/// The actor successfully exited.
3737
///
3838
/// It happens either because:
39-
/// - all of the existing mailboxes were dropped and the actor message queue was exhausted.
40-
/// No new message could ever arrive to the actor. (This exit is triggered by the framework.)
41-
/// or
42-
/// - the actor `process_message` method returned `Err(ExitStatusCode::Success)`.
43-
/// (This exit is triggered by the actor implementer.)
39+
/// - all of the existing mailboxes were dropped and the actor message queue was exhausted. No
40+
/// new message could ever arrive to the actor. (This exit is triggered by the framework.) or
41+
/// - the actor `process_message` method returned `Err(ExitStatusCode::Success)`. (This exit is
42+
/// triggered by the actor implementer.)
4443
///
4544
/// (This is equivalent to exit status code 0.)
4645
/// Note that this is not really an error.

quickwit/quickwit-actors/src/scheduler.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -323,10 +323,9 @@ impl Scheduler {
323323
/// Updates the simulated time shift, if appropriate.
324324
///
325325
/// We advance time if:
326-
/// - someone is actually requesting for a simulated fast forward in time.
327-
/// (if Universe::simulate_time_shift(..) has been called).
328-
/// - no message is queued for processing, no initialize or no finalize
329-
/// is being processed.
326+
/// - someone is actually requesting for a simulated fast forward in time. (if
327+
/// Universe::simulate_time_shift(..) has been called).
328+
/// - no message is queued for processing, no initialize or no finalize is being processed.
330329
fn advance_time_if_necessary(&mut self) {
331330
let Some(scheduler_client) = self.scheduler_client() else {
332331
return;

quickwit/quickwit-cli/tests/helpers.rs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,6 @@ pub struct TestResourceFiles {
114114
pub index_config_without_uri: Uri,
115115
pub index_config_with_retention: Uri,
116116
pub log_docs: Uri,
117-
pub wikipedia_docs: Uri,
118117
}
119118

120119
/// A struct to hold few info about the test environment.
@@ -130,7 +129,6 @@ pub struct TestEnv {
130129
/// The metastore URI.
131130
pub metastore_uri: Uri,
132131
pub metastore_resolver: MetastoreResolver,
133-
pub metastore: MetastoreServiceClient,
134132

135133
pub cluster_endpoint: Url,
136134

@@ -219,7 +217,6 @@ pub async fn create_test_env(
219217
let storage_resolver = StorageResolver::unconfigured();
220218
let storage = storage_resolver.resolve(&metastore_uri).await?;
221219
let metastore_resolver = MetastoreResolver::unconfigured();
222-
let metastore = metastore_resolver.resolve(&metastore_uri).await?;
223220
let index_uri = metastore_uri.join(&index_id).unwrap();
224221
let index_config_path = resources_dir_path.join("index_config.yaml");
225222
fs::write(
@@ -258,7 +255,7 @@ pub async fn create_test_env(
258255
let log_docs_path = resources_dir_path.join("logs.json");
259256
fs::write(&log_docs_path, LOGS_JSON_DOCS)?;
260257
let wikipedia_docs_path = resources_dir_path.join("wikis.json");
261-
fs::write(&wikipedia_docs_path, WIKI_JSON_DOCS)?;
258+
fs::write(wikipedia_docs_path, WIKI_JSON_DOCS)?;
262259

263260
let cluster_endpoint = Url::parse(&format!("http://localhost:{rest_listen_port}"))
264261
.context("failed to parse cluster endpoint")?;
@@ -269,7 +266,6 @@ pub async fn create_test_env(
269266
index_config_without_uri: uri_from_path(&index_config_without_uri_path),
270267
index_config_with_retention: uri_from_path(&index_config_with_retention_path),
271268
log_docs: uri_from_path(&log_docs_path),
272-
wikipedia_docs: uri_from_path(&wikipedia_docs_path),
273269
};
274270

275271
Ok(TestEnv {
@@ -279,7 +275,6 @@ pub async fn create_test_env(
279275
resource_files,
280276
metastore_uri,
281277
metastore_resolver,
282-
metastore,
283278
cluster_endpoint,
284279
index_id,
285280
index_uri,

quickwit/quickwit-cluster/src/cluster.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -416,6 +416,7 @@ impl Cluster {
416416
/// Tasks are grouped by (index_id, source_id), each group is stored in a key as follows:
417417
/// - key: `{INDEXING_TASK_PREFIX}{index_id}{INDEXING_TASK_SEPARATOR}{source_id}`
418418
/// - value: Number of indexing tasks in the group.
419+
///
419420
/// Keys present in chitchat state but not in the given `indexing_tasks` are marked for
420421
/// deletion.
421422
pub async fn update_self_node_indexing_tasks(&self, indexing_tasks: &[IndexingTask]) {

quickwit/quickwit-codegen/example/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,3 +36,6 @@ quickwit-actors = { workspace = true, features = ["testsuite"] }
3636

3737
[build-dependencies]
3838
quickwit-codegen = { workspace = true }
39+
40+
[features]
41+
testsuite = []

quickwit/quickwit-common/build.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
// Copyright (C) 2024 Quickwit, Inc.
2+
//
3+
// Quickwit is offered under the AGPL v3.0 and as commercial software.
4+
// For commercial licensing, contact us at hello@quickwit.io.
5+
//
6+
// AGPL:
7+
// This program is free software: you can redistribute it and/or modify
8+
// it under the terms of the GNU Affero General Public License as
9+
// published by the Free Software Foundation, either version 3 of the
10+
// License, or (at your option) any later version.
11+
//
12+
// This program is distributed in the hope that it will be useful,
13+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
14+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15+
// GNU Affero General Public License for more details.
16+
//
17+
// You should have received a copy of the GNU Affero General Public License
18+
// along with this program. If not, see <http://www.gnu.org/licenses/>.
19+
20+
fn main() {
21+
println!("cargo::rustc-check-cfg=cfg(tokio_unstable)");
22+
}

quickwit/quickwit-common/src/pubsub.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ type EventSubscriptions<E> = HashMap<usize, EventSubscription<E>>;
5454
/// The event broker makes it possible to
5555
/// - emit specific local events
5656
/// - subscribe to these local events
57+
///
5758
/// The event broker is not distributed in itself. Only events emitted
5859
/// locally will be received by the subscribers.
5960
///

quickwit/quickwit-common/src/thread_pool.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -69,12 +69,11 @@ impl ThreadPool {
6969
///
7070
/// Here are two important differences however:
7171
///
72-
/// 1) The task runs on a rayon thread pool managed by Quickwit.
73-
/// This pool is specifically used only to run CPU-intensive work
74-
/// and is configured to contain `num_cpus` cores.
72+
/// 1) The task runs on a rayon thread pool managed by Quickwit. This pool is specifically used
73+
/// only to run CPU-intensive work and is configured to contain `num_cpus` cores.
7574
///
76-
/// 2) Before the task is effectively scheduled, we check that
77-
/// the spawner is still interested in its result.
75+
/// 2) Before the task is effectively scheduled, we check that the spawner is still interested
76+
/// in its result.
7877
///
7978
/// It is therefore required to `await` the result of this
8079
/// function to get any work done.

quickwit/quickwit-config/src/source_config/serialize.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,8 @@ impl SourceConfigForSerialization {
6868
/// Checks the validity of the `SourceConfig` as a "deserializable source".
6969
///
7070
/// Two remarks:
71-
/// - This does not check connectivity, it just validate configuration,
72-
/// without performing any IO. See `check_connectivity(..)`.
71+
/// - This does not check connectivity, it just validate configuration, without performing any
72+
/// IO. See `check_connectivity(..)`.
7373
/// - This is used each time the `SourceConfig` is deserialized (at creation but also during
7474
/// communications with the metastore). When ingesting from stdin, we programmatically create
7575
/// an invalid `SourceConfig` and only use it locally.

quickwit/quickwit-config/src/storage_config.rs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
// along with this program. If not, see <http://www.gnu.org/licenses/>.
1919

2020
use std::ops::Deref;
21+
use std::sync::OnceLock;
2122
use std::{env, fmt};
2223

2324
use anyhow::ensure;
@@ -370,11 +371,14 @@ impl S3StorageConfig {
370371
}
371372

372373
pub fn force_path_style_access(&self) -> Option<bool> {
373-
let force_path_style_access = get_bool_from_env(
374-
"QW_S3_FORCE_PATH_STYLE_ACCESS",
375-
self.force_path_style_access,
376-
);
377-
Some(force_path_style_access)
374+
static FORCE_PATH_STYLE: OnceLock<Option<bool>> = OnceLock::new();
375+
*FORCE_PATH_STYLE.get_or_init(|| {
376+
let force_path_style_access = get_bool_from_env(
377+
"QW_S3_FORCE_PATH_STYLE_ACCESS",
378+
self.force_path_style_access,
379+
);
380+
Some(force_path_style_access)
381+
})
378382
}
379383
}
380384

quickwit/quickwit-config/src/templating.rs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ use tracing::debug;
3030
// `ENV_VAR` or `ENV_VAR:DEFAULT`
3131
// Ignores whitespaces in curly braces
3232
static TEMPLATE_ENV_VAR_CAPTURE: Lazy<Regex> = Lazy::new(|| {
33-
Regex::new(r"\$\{\s*([A-Za-z0-9_]+)\s*(?::\-\s*([\S]+)\s*)?}")
33+
Regex::new(r"\$\{\s*([A-Za-z0-9_]+)\s*(?::\-\s*([^\s\}]+)\s*)?}")
3434
.expect("regular expression should compile")
3535
});
3636

@@ -158,6 +158,23 @@ mod test {
158158
assert_eq!(rendered, "metastore_uri: s3://test-bucket/metastore");
159159
}
160160

161+
#[test]
162+
fn test_template_render_with_multiple_vars_per_line() {
163+
let config_content =
164+
b"metastore_uri: s3://${RENDER_MULTIPLE_BUCKET}/${RENDER_MULTIPLE_PREFIX:-index}#polling_interval=${RENDER_MULTIPLE_INTERVAL}s";
165+
env::set_var("RENDER_MULTIPLE_BUCKET", "test-bucket");
166+
env::set_var("RENDER_MULTIPLE_PREFIX", "metastore");
167+
env::set_var("RENDER_MULTIPLE_INTERVAL", "30");
168+
let rendered = render_config(config_content).unwrap();
169+
std::env::remove_var("RENDER_MULTIPLE_BUCKET");
170+
std::env::remove_var("RENDER_MULTIPLE_PREFIX");
171+
std::env::remove_var("RENDER_MULTIPLE_INTERVAL");
172+
assert_eq!(
173+
rendered,
174+
"metastore_uri: s3://test-bucket/metastore#polling_interval=30s"
175+
);
176+
}
177+
161178
#[test]
162179
fn test_template_render_ignores_commented_lines() {
163180
{

quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -633,14 +633,13 @@ fn inflate_node_capacities_if_necessary(problem: &mut SchedulingProblem) {
633633
/// to transform scheduling into a math problem.
634634
///
635635
/// This function implementation therefore goes
636-
/// - 1) transform our problem into a scheduling problem. Something closer to a well-defined
637-
/// optimization problem. In particular this step removes:
638-
/// - the notion of shard ids, and only considers a number of shards being allocated.
639-
/// - node_ids and shard ids. These are replaced by integers.
640-
/// - 2) convert the current situation of the cluster into something a previous scheduling
641-
/// solution.
642-
/// - 3) compute the new scheduling solution.
643-
/// - 4) convert the new scheduling solution back to the real world by reallocating the shard ids.
636+
/// 1) transform our problem into a scheduling problem. Something closer to a well-defined
637+
/// optimization problem. In particular this step removes:
638+
/// - the notion of shard ids, and only considers a number of shards being allocated.
639+
/// - node_ids and shard ids. These are replaced by integers.
640+
/// 2) convert the current situation of the cluster into something a previous scheduling solution.
641+
/// 3) compute the new scheduling solution.
642+
/// 4) convert the new scheduling solution back to the real world by reallocating the shard ids.
644643
///
645644
/// TODO cut into pipelines.
646645
/// Panics if any sources has no shards.

quickwit/quickwit-directories/src/debug_proxy_directory.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,8 +111,8 @@ impl ReadOperationBuilder {
111111
/// recording all of its read operations.
112112
///
113113
/// It has two purpose
114-
/// - It is used when building our hotcache, to identify the file sections that
115-
/// should be in the hotcache.
114+
/// - It is used when building our hotcache, to identify the file sections that should be in the
115+
/// hotcache.
116116
/// - It is used in the search-api to provide debugging/performance information.
117117
#[derive(Debug)]
118118
pub struct DebugProxyDirectory<D: Directory> {

quickwit/quickwit-doc-mapper/src/default_doc_mapper/mapping_tree.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1735,7 +1735,7 @@ mod tests {
17351735
#[test]
17361736
fn test_parse_i64_too_large() {
17371737
let leaf = LeafType::I64(QuickwitNumericOptions::default());
1738-
let err = leaf.value_from_json(json!(u64::max_value())).err().unwrap();
1738+
let err = leaf.value_from_json(json!(u64::MAX)).err().unwrap();
17391739
assert_eq!(
17401740
err,
17411741
"expected i64, got inconvertible JSON number `18446744073709551615`"

quickwit/quickwit-indexing/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ testsuite = [
101101
"quickwit-storage/testsuite"
102102
]
103103
vrl = ["dep:vrl", "quickwit-config/vrl"]
104+
ci-test = []
104105

105106
[dev-dependencies]
106107
bytes = { workspace = true }

quickwit/quickwit-indexing/src/actors/cooperative_indexing.rs

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -36,18 +36,17 @@ static ORIGIN_OF_TIME: Lazy<Instant> = Lazy::new(Instant::now);
3636
/// Cooperative indexing is a mechanism to deal with a large amount of pipelines.
3737
///
3838
/// Instead of having all pipelines index concurrently, cooperative indexing:
39-
/// - have them take turn, making sure that at most only N pipelines are indexing
40-
/// at the same time. This has the benefit is reducing RAM using (by having a limited number
41-
/// of `IndexWriter` at the same time), reducing context switching.
42-
/// - keeps the different pipelines work uniformously spread in time. If the system is not
43-
/// at capacity, we prefer to have the indexing pipeline as desynchronized as possible
44-
/// to make sure they don't all use the same resources (disk/cpu/network) at the
45-
/// same time.
39+
/// - have them take turn, making sure that at most only N pipelines are indexing at the same time.
40+
/// This has the benefit is reducing RAM using (by having a limited number of `IndexWriter` at the
41+
/// same time), reducing context switching.
42+
/// - keeps the different pipelines work uniformously spread in time. If the system is not at
43+
/// capacity, we prefer to have the indexing pipeline as desynchronized as possible to make sure
44+
/// they don't all use the same resources (disk/cpu/network) at the same time.
4645
///
4746
/// It works by:
4847
/// - a semaphore is used to restrict the number of pipelines indexing at the same time.
49-
/// - in the indexer when `on_drain` is called, the indexer will cut a split and
50-
/// "go to sleep" for a given amount of time.
48+
/// - in the indexer when `on_drain` is called, the indexer will cut a split and "go to sleep" for a
49+
/// given amount of time.
5150
///
5251
/// The key logic is in the computation of that sleep time.
5352
///

quickwit/quickwit-indexing/src/actors/merge_scheduler_service.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -114,9 +114,9 @@ impl Ord for ScheduledMerge {
114114
/// This actor is not supervised and should stay as simple as possible.
115115
/// In particular,
116116
/// - the `ScheduleMerge` handler should reply in microseconds.
117-
/// - the task should never be dropped before reaching its `split_downloader_mailbox` destination
118-
/// as it would break the consistency of `MergePlanner` with the metastore (ie: several splits will
119-
/// never be merged).
117+
/// - the task should never be dropped before reaching its `split_downloader_mailbox` destination as
118+
/// it would break the consistency of `MergePlanner` with the metastore (ie: several splits will
119+
/// never be merged).
120120
pub struct MergeSchedulerService {
121121
merge_semaphore: Arc<Semaphore>,
122122
merge_concurrency: usize,

quickwit/quickwit-indexing/src/merge_policy/stable_log_merge_policy.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ use crate::merge_policy::{splits_short_debug, MergeOperation, MergePolicy};
4040
///
4141
/// The policy first builds the merge operations
4242
///
43-
/// 1. Build merge operations
43+
/// ### Build merge operations
4444
/// We start by sorting the splits by reverse date so that the most recent splits are
4545
/// coming first.
4646
/// We iterate through the splits and assign them to increasing levels.
@@ -157,8 +157,8 @@ enum MergeCandidateSize {
157157
/// We should not add an extra split in this candidate.
158158
/// This can happen for any of the two following reasons:
159159
/// - the number of splits involved already reached `merge_factor_max`.
160-
/// - the overall number of docs that will end up in the merged segment already
161-
/// exceeds `max_merge_docs`.
160+
/// - the overall number of docs that will end up in the merged segment already exceeds
161+
/// `max_merge_docs`.
162162
OneMoreSplitWouldBeTooBig,
163163
}
164164

quickwit/quickwit-indexing/src/metrics.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ pub struct IndexerMetrics {
3333
pub pending_merge_operations: IntGauge,
3434
pub pending_merge_bytes: IntGauge,
3535
// We use a lazy counter, as most users do not use Kafka.
36+
#[cfg_attr(not(feature = "kafka"), allow(dead_code))]
3637
pub kafka_rebalance_total: Lazy<IntCounter>,
3738
}
3839

quickwit/quickwit-indexing/src/source/kafka_source.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ macro_rules! return_if_err {
125125
/// The rebalance protocol at a very high level:
126126
/// - A consumer joins or leaves a consumer group.
127127
/// - Consumers receive a revoke partitions notification, which gives them the opportunity to commit
128-
/// the work in progress.
128+
/// the work in progress.
129129
/// - Broker waits for ALL the consumers to ack the revoke notification (synchronization barrier).
130130
/// - Consumers receive new partition assignmennts.
131131
///

quickwit/quickwit-indexing/src/source/kinesis/api.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ pub(crate) async fn list_shards(
119119
}
120120
}
121121

122-
#[cfg(test)]
122+
#[cfg(all(test, feature = "kinesis-localstack-tests"))]
123123
pub(crate) mod tests {
124124
use std::collections::BTreeSet;
125125
use std::time::Duration;

quickwit/quickwit-indexing/src/source/kinesis/helpers.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ pub async fn get_kinesis_client(region_or_endpoint: RegionOrEndpoint) -> anyhow:
5050
Ok(Client::from_conf(kinesis_config.build()))
5151
}
5252

53-
#[cfg(test)]
53+
#[cfg(all(test, feature = "kinesis-localstack-tests"))]
5454
pub(crate) mod tests {
5555
use std::collections::HashMap;
5656
use std::time::Duration;

quickwit/quickwit-indexing/src/source/mod.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -627,7 +627,10 @@ mod tests {
627627
}
628628
}
629629

630-
#[cfg(any(feature = "kafka", feature = "sqs"))]
630+
#[cfg(all(
631+
test,
632+
any(feature = "kafka-broker-tests", feature = "sqs-localstack-tests")
633+
))]
631634
pub fn with_metastore(mut self, metastore: MetastoreServiceClient) -> Self {
632635
self.metastore_opt = Some(metastore);
633636
self

0 commit comments

Comments
 (0)