Merge branch 'main' of github.com:samoii/quickwit into aws-s3-sse

samoii committed Oct 3, 2024
2 parents 6ef96ad + 060dfd8 commit 6529325
Showing 31 changed files with 1,657 additions and 357 deletions.
2 changes: 1 addition & 1 deletion docs/configuration/node-config.md
@@ -199,7 +199,7 @@ This section contains the configuration options for a Searcher.
| `max_num_concurrent_split_searches` | Maximum number of concurrent split search requests running on a Searcher. | `100` |
| `max_num_concurrent_split_streams` | Maximum number of concurrent split stream requests running on a Searcher. | `100` |
| `split_cache` | Searcher split cache configuration options defined in the section below. Cache disabled if unspecified. | |
+ | `request_timeout_secs` | The time before a search request is cancelled. This should match the timeout of the stack calling into Quickwit, if one is set. | `30` |
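
The new option sits alongside the other keys of the `searcher` section in the node config. A minimal sketch, assuming the standard `quickwit.yaml` layout (the other value shown is just the default from the table above):

```yaml
# quickwit.yaml -- illustrative snippet, not the full node config
searcher:
  max_num_concurrent_split_searches: 100
  request_timeout_secs: 30   # new: cancel search requests after 30s
```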

### Searcher split cache configuration

14 changes: 8 additions & 6 deletions docs/get-started/tutorials/trace-analytics-with-grafana.md
@@ -12,7 +12,9 @@ You only need a few minutes to get Grafana working with Quickwit and build meaningful dashboards.

## Create a Docker Compose recipe

- Let's add a [Quickwit instance](../installation.md) with the OTLP service enabled.
+ First, create a `docker-compose.yml` file. This file defines the services needed to run Quickwit with OpenTelemetry and Grafana with the Quickwit Datasource plugin.
+
+ Below is the complete Docker Compose configuration:

```yaml
version: '3.0'
@@ -25,23 +27,21 @@ services:
ports:
- 7280:7280
command: ["run"]
```

Then we create a [Grafana](https://grafana.com/docs/grafana/latest/setup-grafana/installation/docker/#run-grafana-via-docker-compose) service with the [Quickwit Datasource](https://github.com/quickwit-oss/quickwit-datasource) plugin.
```yaml
grafana:
image: grafana/grafana-oss
container_name: grafana
ports:
- "${MAP_HOST_GRAFANA:-127.0.0.1}:3000:3000"
environment:
- GF_INSTALL_PLUGINS: https://github.com/quickwit-oss/quickwit-datasource/releases/download/v0.3.1/quickwit-quickwit-datasource-0.3.1.zip;quickwit-quickwit-datasource
+ GF_INSTALL_PLUGINS: https://github.com/quickwit-oss/quickwit-datasource/releases/download/v0.4.6/quickwit-quickwit-datasource-0.4.6.zip;quickwit-quickwit-datasource
GF_AUTH_DISABLE_LOGIN_FORM: "true"
GF_AUTH_ANONYMOUS_ENABLED: "true"
GF_AUTH_ANONYMOUS_ORG_ROLE: Admin
```
The default Grafana port is 3000. If this port is already taken, you can modify the port mapping, for example, changing `3000:3000` to `3100:3000` or any other available port.
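
Relatedly, the `MAP_HOST_GRAFANA` variable in the port mapping above controls which host interface Grafana binds to (it defaults to `127.0.0.1`). A hypothetical invocation exposing Grafana on all interfaces:

```bash
# Bind Grafana to 0.0.0.0 instead of localhost only (illustrative)
MAP_HOST_GRAFANA=0.0.0.0 docker compose up
```
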
Save and run the recipe:
```bash
docker compose up
```

@@ -99,3 +99,5 @@ Quickwit sends its own traces to itself, so you should already have data to display.
Here's what your first dashboard can look like:

![Quickwit Panel in Grafana Dashboard](../../assets/images/screenshot-grafana-tutorial-dashboard.png)


2 changes: 1 addition & 1 deletion docs/reference/metrics.md
@@ -3,7 +3,7 @@ title: Metrics
sidebar_position: 70
---

- Quickwit exposes some key metrics via [Prometheus](https://prometheus.io/). You can use any front-end that supports Prometheus to examine the behavior of Quickwit visually.
+ Quickwit exposes key metrics in the [Prometheus](https://prometheus.io/) format on the `/metrics` endpoint. You can use any front-end that supports Prometheus to examine the behavior of Quickwit visually.
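
A minimal Prometheus scrape config targeting that endpoint could look like the following; the job name and target address are illustrative (7280 is the default Quickwit REST port):

```yaml
scrape_configs:
  - job_name: quickwit              # illustrative name
    metrics_path: /metrics
    static_configs:
      - targets: ["localhost:7280"] # default Quickwit REST address
```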

## Cache Metrics

1 change: 0 additions & 1 deletion quickwit/Cargo.lock

Some generated files are not rendered by default.

22 changes: 14 additions & 8 deletions quickwit/quickwit-cluster/src/cluster.rs
@@ -521,7 +521,8 @@ fn indexing_task_to_chitchat_kv(indexing_task: &IndexingTask) -> (String, String) {
let index_uid = indexing_task.index_uid();
let key = format!("{INDEXING_TASK_PREFIX}{}", indexing_task.pipeline_uid());
let shard_ids_str = shard_ids.iter().sorted().join(",");
- let value = format!("{index_uid}:{source_id}:{shard_ids_str}");
+ let fingerprint = indexing_task.params_fingerprint;
+ let value = format!("{index_uid}:{source_id}:{fingerprint}:{shard_ids_str}");
(key, value)
}

@@ -536,16 +537,20 @@ fn parse_shard_ids_str(shard_ids_str: &str) -> Vec<ShardId> {
fn chitchat_kv_to_indexing_task(key: &str, value: &str) -> Option<IndexingTask> {
let pipeline_uid_str = key.strip_prefix(INDEXING_TASK_PREFIX)?;
let pipeline_uid = PipelineUid::from_str(pipeline_uid_str).ok()?;
- let (source_uid, shards_str) = value.rsplit_once(':')?;
- let (index_uid, source_id) = source_uid.rsplit_once(':')?;
+ let mut field_iterator = value.rsplitn(4, ':');
+ let shards_str = field_iterator.next()?;
+ let fingerprint_str = field_iterator.next()?;
+ let source_id = field_iterator.next()?;
+ let index_uid = field_iterator.next()?;
+ let params_fingerprint: u64 = fingerprint_str.parse().ok()?;
let index_uid = index_uid.parse().ok()?;
let shard_ids = parse_shard_ids_str(shards_str);
Some(IndexingTask {
index_uid: Some(index_uid),
source_id: source_id.to_string(),
pipeline_uid: Some(pipeline_uid),
shard_ids,
- params_fingerprint: 0,
+ params_fingerprint,
})
}
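
The switch from two `rsplit_once` calls to `rsplitn(4, ':')` matters because an index UID itself contains a colon (`index_id:ulid`): splitting from the right and keeping everything left over as the final field preserves it, while making room for the new fingerprint field. A standalone sketch of the round trip, using plain strings in place of the real `IndexUid`/`ShardId` types:

```rust
// Standalone sketch of the chitchat value encoding; plain strings stand in
// for Quickwit's IndexUid/ShardId types.
fn encode(index_uid: &str, source_id: &str, fingerprint: u64, shard_ids: &[&str]) -> String {
    format!("{index_uid}:{source_id}:{fingerprint}:{}", shard_ids.join(","))
}

fn decode(value: &str) -> Option<(String, String, u64, Vec<String>)> {
    // rsplitn yields fields right to left; the fourth item is "everything
    // else", which keeps the colon inside the index UID intact.
    let mut fields = value.rsplitn(4, ':');
    let shards_str = fields.next()?;
    let fingerprint: u64 = fields.next()?.parse().ok()?;
    let source_id = fields.next()?;
    let index_uid = fields.next()?;
    let shard_ids = shards_str.split(',').map(str::to_string).collect();
    Some((index_uid.to_string(), source_id.to_string(), fingerprint, shard_ids))
}

fn main() {
    let value = encode("my_index:00000000000000000000000000", "my_source", 42, &["1", "3"]);
    let (index_uid, source_id, fingerprint, shard_ids) = decode(&value).unwrap();
    assert_eq!(index_uid, "my_index:00000000000000000000000000");
    assert_eq!(source_id, "my_source");
    assert_eq!(fingerprint, 42);
    assert_eq!(shard_ids, vec!["1", "3"]);
}
```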

@@ -1143,11 +1148,11 @@ mod tests {
let mut chitchat_guard = chitchat_handle.lock().await;
chitchat_guard.self_node_state().set(
format!("{INDEXING_TASK_PREFIX}01BX5ZZKBKACTAV9WEVGEMMVS0"),
"my_index:00000000000000000000000000:my_source:1,3".to_string(),
"my_index:00000000000000000000000000:my_source:41:1,3".to_string(),
);
chitchat_guard.self_node_state().set(
format!("{INDEXING_TASK_PREFIX}01BX5ZZKBKACTAV9WEVGEMMVS1"),
"my_index-00000000000000000000000000-my_source:3,5".to_string(),
"my_index-00000000000000000000000000-my_source:53:3,5".to_string(),
);
}
node.wait_for_ready_members(|members| members.len() == 1, Duration::from_secs(5))
@@ -1358,14 +1363,15 @@
#[test]
fn test_parse_chitchat_kv() {
assert!(
chitchat_kv_to_indexing_task("invalidulid", "my_index:uid:my_source:1,3").is_none()
chitchat_kv_to_indexing_task("invalidulid", "my_index:uid:my_source:42:1,3").is_none()
);
let task = super::chitchat_kv_to_indexing_task(
"indexer.task:01BX5ZZKBKACTAV9WEVGEMMVS0",
"my_index:00000000000000000000000000:my_source:00000000000000000001,\
"my_index:00000000000000000000000000:my_source:42:00000000000000000001,\
00000000000000000003",
)
.unwrap();
+ assert_eq!(task.params_fingerprint, 42);
assert_eq!(
task.pipeline_uid(),
PipelineUid::from_str("01BX5ZZKBKACTAV9WEVGEMMVS0").unwrap()
20 changes: 10 additions & 10 deletions quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs
@@ -676,10 +676,8 @@ impl IngestController {
model: &mut ControlPlaneModel,
progress: &Progress,
) -> MetastoreResult<()> {
- const NUM_PERMITS: u64 = 1;
-
if !model
.acquire_scaling_permits(&source_uid, ScalingMode::Up, NUM_PERMITS)
.acquire_scaling_permits(&source_uid, ScalingMode::Up)
.unwrap_or(false)
{
return Ok(());
@@ -698,7 +696,7 @@
if successful_source_uids.is_empty() {
// We did not manage to create the shard.
// We can release our permit.
model.release_scaling_permits(&source_uid, ScalingMode::Up, NUM_PERMITS);
model.release_scaling_permits(&source_uid, ScalingMode::Up);
warn!(
index_uid=%source_uid.index_uid,
source_id=%source_uid.source_id,
@@ -722,7 +720,7 @@
source_id=%source_uid.source_id,
"scaling up number of shards to {new_num_open_shards} failed: {metastore_error:?}"
);
- model.release_scaling_permits(&source_uid, ScalingMode::Up, NUM_PERMITS);
+ model.release_scaling_permits(&source_uid, ScalingMode::Up);
Err(metastore_error)
}
}
@@ -860,10 +858,12 @@ impl IngestController {
model: &mut ControlPlaneModel,
progress: &Progress,
) -> MetastoreResult<()> {
- const NUM_PERMITS: u64 = 1;
+ if shard_stats.num_open_shards == 0 {
+ return Ok(());
+ }

if !model
- .acquire_scaling_permits(&source_uid, ScalingMode::Down, NUM_PERMITS)
+ .acquire_scaling_permits(&source_uid, ScalingMode::Down)
.unwrap_or(false)
{
return Ok(());
@@ -876,12 +876,12 @@
"scaling down number of shards to {new_num_open_shards}"
);
let Some((leader_id, shard_id)) = find_scale_down_candidate(&source_uid, model) else {
- model.release_scaling_permits(&source_uid, ScalingMode::Down, NUM_PERMITS);
+ model.release_scaling_permits(&source_uid, ScalingMode::Down);
return Ok(());
};
info!("scaling down shard {shard_id} from {leader_id}");
let Some(ingester) = self.ingester_pool.get(&leader_id) else {
- model.release_scaling_permits(&source_uid, ScalingMode::Down, NUM_PERMITS);
+ model.release_scaling_permits(&source_uid, ScalingMode::Down);
return Ok(());
};
let shard_pkeys = vec![ShardPKey {
@@ -896,7 +896,7 @@
.await
{
warn!("failed to scale down number of shards: {error}");
- model.release_scaling_permits(&source_uid, ScalingMode::Down, NUM_PERMITS);
+ model.release_scaling_permits(&source_uid, ScalingMode::Down);
return Ok(());
}
model.close_shards(&source_uid, &[shard_id]);
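
Dropping the `num_permits` argument means every acquire/release now moves exactly one permit, so the controller's discipline reduces to "take one token, hand it back on any early exit". A minimal sketch of that pattern, with a hypothetical token-bucket `RateLimiter` standing in for Quickwit's real one:

```rust
// Minimal sketch of the one-permit discipline used above; this RateLimiter
// is a hypothetical stand-in, not Quickwit's implementation.
struct RateLimiter {
    available_permits: u64,
}

impl RateLimiter {
    fn acquire(&mut self) -> bool {
        if self.available_permits > 0 {
            self.available_permits -= 1;
            true
        } else {
            false
        }
    }

    fn release(&mut self) {
        self.available_permits += 1;
    }
}

// Attempt a scale-up: skip silently when rate limited, and return the permit
// if the attempt fails so it does not count against the budget.
fn try_scale_up(limiter: &mut RateLimiter, create_shard: impl Fn() -> bool) {
    if !limiter.acquire() {
        return; // rate limited: try again on a later pass
    }
    if !create_shard() {
        limiter.release(); // a failed attempt should not consume the budget
    }
}

fn main() {
    let mut limiter = RateLimiter { available_permits: 1 };
    try_scale_up(&mut limiter, || false); // failure: permit is returned
    assert_eq!(limiter.available_permits, 1);
    try_scale_up(&mut limiter, || true); // success: permit stays consumed
    assert_eq!(limiter.available_permits, 0);
}
```
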
12 changes: 3 additions & 9 deletions quickwit/quickwit-control-plane/src/model/mod.rs
@@ -378,25 +378,19 @@ impl ControlPlaneModel {
&mut self,
source_uid: &SourceUid,
scaling_mode: ScalingMode,
- num_permits: u64,
) -> Option<bool> {
self.shard_table
- .acquire_scaling_permits(source_uid, scaling_mode, num_permits)
+ .acquire_scaling_permits(source_uid, scaling_mode)
}

pub fn drain_scaling_permits(&mut self, source_uid: &SourceUid, scaling_mode: ScalingMode) {
self.shard_table
.drain_scaling_permits(source_uid, scaling_mode)
}

- pub fn release_scaling_permits(
- &mut self,
- source_uid: &SourceUid,
- scaling_mode: ScalingMode,
- num_permits: u64,
- ) {
+ pub fn release_scaling_permits(&mut self, source_uid: &SourceUid, scaling_mode: ScalingMode) {
self.shard_table
- .release_scaling_permits(source_uid, scaling_mode, num_permits)
+ .release_scaling_permits(source_uid, scaling_mode)
}
}

28 changes: 11 additions & 17 deletions quickwit/quickwit-control-plane/src/model/shard_table.rs
@@ -544,14 +544,13 @@ impl ShardTable {
&mut self,
source_uid: &SourceUid,
scaling_mode: ScalingMode,
- num_permits: u64,
) -> Option<bool> {
let table_entry = self.table_entries.get_mut(source_uid)?;
let scaling_rate_limiter = match scaling_mode {
ScalingMode::Up => &mut table_entry.scaling_up_rate_limiter,
ScalingMode::Down => &mut table_entry.scaling_down_rate_limiter,
};
- Some(scaling_rate_limiter.acquire(num_permits))
+ Some(scaling_rate_limiter.acquire(1))
}

pub fn drain_scaling_permits(&mut self, source_uid: &SourceUid, scaling_mode: ScalingMode) {
@@ -564,18 +563,13 @@
}
}

- pub fn release_scaling_permits(
- &mut self,
- source_uid: &SourceUid,
- scaling_mode: ScalingMode,
- num_permits: u64,
- ) {
+ pub fn release_scaling_permits(&mut self, source_uid: &SourceUid, scaling_mode: ScalingMode) {
if let Some(table_entry) = self.table_entries.get_mut(source_uid) {
let scaling_rate_limiter = match scaling_mode {
ScalingMode::Up => &mut table_entry.scaling_up_rate_limiter,
ScalingMode::Down => &mut table_entry.scaling_down_rate_limiter,
};
- scaling_rate_limiter.release(num_permits);
+ scaling_rate_limiter.release(1);
}
}
}
@@ -1058,7 +1052,7 @@ mod tests {
source_id: source_id.clone(),
};
assert!(shard_table
.acquire_scaling_permits(&source_uid, ScalingMode::Up, 1)
.acquire_scaling_permits(&source_uid, ScalingMode::Up)
.is_none());

shard_table.add_source(&index_uid, &source_id);
@@ -1071,7 +1065,7 @@
.available_permits();

assert!(shard_table
.acquire_scaling_permits(&source_uid, ScalingMode::Up, 1)
.acquire_scaling_permits(&source_uid, ScalingMode::Up)
.unwrap());

let new_available_permits = shard_table
@@ -1096,7 +1090,7 @@
source_id: source_id.clone(),
};
assert!(shard_table
.acquire_scaling_permits(&source_uid, ScalingMode::Down, 1)
.acquire_scaling_permits(&source_uid, ScalingMode::Down)
.is_none());

shard_table.add_source(&index_uid, &source_id);
@@ -1109,7 +1103,7 @@
.available_permits();

assert!(shard_table
.acquire_scaling_permits(&source_uid, ScalingMode::Down, 1)
.acquire_scaling_permits(&source_uid, ScalingMode::Down)
.unwrap());

let new_available_permits = shard_table
@@ -1143,10 +1137,10 @@
.available_permits();

assert!(shard_table
.acquire_scaling_permits(&source_uid, ScalingMode::Up, 1)
.acquire_scaling_permits(&source_uid, ScalingMode::Up)
.unwrap());

shard_table.release_scaling_permits(&source_uid, ScalingMode::Up, 1);
shard_table.release_scaling_permits(&source_uid, ScalingMode::Up);

let new_available_permits = shard_table
.table_entries
@@ -1179,10 +1173,10 @@
.available_permits();

assert!(shard_table
.acquire_scaling_permits(&source_uid, ScalingMode::Down, 1)
.acquire_scaling_permits(&source_uid, ScalingMode::Down)
.unwrap());

shard_table.release_scaling_permits(&source_uid, ScalingMode::Down, 1);
shard_table.release_scaling_permits(&source_uid, ScalingMode::Down);

let new_available_permits = shard_table
.table_entries
1 change: 0 additions & 1 deletion quickwit/quickwit-datetime/Cargo.toml
@@ -13,7 +13,6 @@ license.workspace = true
[dependencies]
anyhow = { workspace = true }
itertools = { workspace = true }
ouroboros = "0.18.0"
serde = { workspace = true }
serde_json = { workspace = true }
tantivy = { workspace = true }