Skip to content

Commit

Permalink
fix(metric): better agg state cache miss ratio metric (#19012)
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Chien <stdrc@outlook.com>
  • Loading branch information
stdrc authored Oct 21, 2024
1 parent 2a71574 commit 120747f
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 23 deletions.
9 changes: 4 additions & 5 deletions src/stream/src/executor/aggregation/agg_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ impl<S: StateStore, Strtg: Strategy> AggGroup<S, Strtg> {
row_count_index: usize,
extreme_cache_size: usize,
input_schema: &Schema,
) -> StreamExecutorResult<(Self, AggStateCacheStats)> {
) -> StreamExecutorResult<Self> {
let encoded_states = intermediate_state_table
.get_row(group_key.as_ref().map(GroupKey::table_pk))
.await?;
Expand Down Expand Up @@ -239,12 +239,11 @@ impl<S: StateStore, Strtg: Strategy> AggGroup<S, Strtg> {
};

if encoded_states.is_some() {
let (_, outputs, stats) = this.get_outputs(storages, agg_funcs).await?;
let (_, outputs, _stats) = this.get_outputs(storages, agg_funcs).await?;
this.prev_outputs = Some(outputs);
Ok((this, stats))
} else {
Ok((this, AggStateCacheStats::default()))
}

Ok(this)
}

/// Create a group from encoded states for EOWC. The previous output is set to `None`.
Expand Down
7 changes: 3 additions & 4 deletions src/stream/src/executor/hash_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ impl<K: HashKey, S: StateStore> HashAggExecutor<K, S> {
Some(async {
// Create `AggGroup` for the current group if not exists. This will
// restore agg states from the intermediate state table.
let (agg_group, stats) = AggGroup::create(
let agg_group = AggGroup::create(
this.version,
Some(GroupKey::new(
key.deserialize(group_key_types)?,
Expand All @@ -302,7 +302,7 @@ impl<K: HashKey, S: StateStore> HashAggExecutor<K, S> {
&this.input_schema,
)
.await?;
Ok::<_, StreamExecutorError>((key.clone(), Box::new(agg_group), stats))
Ok::<_, StreamExecutorError>((key.clone(), Box::new(agg_group)))
})
}
}
Expand All @@ -315,10 +315,9 @@ impl<K: HashKey, S: StateStore> HashAggExecutor<K, S> {
vars.stats.chunk_lookup_miss_count += 1;
let mut buffered = stream::iter(futs).buffer_unordered(10).fuse();
while let Some(result) = buffered.next().await {
let (key, agg_group, stats) = result?;
let (key, agg_group) = result?;
let none = vars.dirty_groups.insert(key, agg_group);
debug_assert!(none.is_none());
vars.stats.merge_state_cache_stats(stats);
}
}
Ok(())
Expand Down
27 changes: 13 additions & 14 deletions src/stream/src/executor/simple_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,21 +255,20 @@ impl<S: StateStore> SimpleAggExecutor<S> {
yield Message::Barrier(barrier);

// This will fetch previous agg states from the intermediate state table.
let (agg_group, _stats) = AggGroup::create(
this.version,
None,
&this.agg_calls,
&this.agg_funcs,
&this.storages,
&this.intermediate_state_table,
&this.input_pk_indices,
this.row_count_index,
this.extreme_cache_size,
&this.input_schema,
)
.await?;
let mut vars = ExecutionVars {
agg_group,
agg_group: AggGroup::create(
this.version,
None,
&this.agg_calls,
&this.agg_funcs,
&this.storages,
&this.intermediate_state_table,
&this.input_pk_indices,
this.row_count_index,
this.extreme_cache_size,
&this.input_schema,
)
.await?,
distinct_dedup,
state_changed: false,
};
Expand Down

0 comments on commit 120747f

Please sign in to comment.