diff --git a/src/stream/src/executor/aggregation/agg_group.rs b/src/stream/src/executor/aggregation/agg_group.rs index 116df36b4090..ba59b60ce68d 100644 --- a/src/stream/src/executor/aggregation/agg_group.rs +++ b/src/stream/src/executor/aggregation/agg_group.rs @@ -207,7 +207,7 @@ impl AggGroup { row_count_index: usize, extreme_cache_size: usize, input_schema: &Schema, - ) -> StreamExecutorResult<(Self, AggStateCacheStats)> { + ) -> StreamExecutorResult { let encoded_states = intermediate_state_table .get_row(group_key.as_ref().map(GroupKey::table_pk)) .await?; @@ -239,12 +239,11 @@ impl AggGroup { }; if encoded_states.is_some() { - let (_, outputs, stats) = this.get_outputs(storages, agg_funcs).await?; + let (_, outputs, _stats) = this.get_outputs(storages, agg_funcs).await?; this.prev_outputs = Some(outputs); - Ok((this, stats)) - } else { - Ok((this, AggStateCacheStats::default())) } + + Ok(this) } /// Create a group from encoded states for EOWC. The previous output is set to `None`. diff --git a/src/stream/src/executor/hash_agg.rs b/src/stream/src/executor/hash_agg.rs index eedba01799c6..25003aff7f70 100644 --- a/src/stream/src/executor/hash_agg.rs +++ b/src/stream/src/executor/hash_agg.rs @@ -286,7 +286,7 @@ impl HashAggExecutor { Some(async { // Create `AggGroup` for the current group if not exists. This will // restore agg states from the intermediate state table. - let (agg_group, stats) = AggGroup::create( + let agg_group = AggGroup::create( this.version, Some(GroupKey::new( key.deserialize(group_key_types)?, @@ -302,7 +302,7 @@ impl HashAggExecutor { &this.input_schema, ) .await?; - Ok::<_, StreamExecutorError>((key.clone(), Box::new(agg_group), stats)) + Ok::<_, StreamExecutorError>((key.clone(), Box::new(agg_group))) }) } } @@ -315,10 +315,9 @@ impl HashAggExecutor { vars.stats.chunk_lookup_miss_count += 1; let mut buffered = stream::iter(futs).buffer_unordered(10).fuse(); while let Some(result) = buffered.next().await { - let (key, agg_group, stats) = result?; + let (key, agg_group) = result?; let none = vars.dirty_groups.insert(key, agg_group); debug_assert!(none.is_none()); - vars.stats.merge_state_cache_stats(stats); } } Ok(()) diff --git a/src/stream/src/executor/simple_agg.rs b/src/stream/src/executor/simple_agg.rs index 058ab1142f7a..6f8526661589 100644 --- a/src/stream/src/executor/simple_agg.rs +++ b/src/stream/src/executor/simple_agg.rs @@ -255,21 +255,20 @@ impl SimpleAggExecutor { yield Message::Barrier(barrier); // This will fetch previous agg states from the intermediate state table. - let (agg_group, _stats) = AggGroup::create( - this.version, - None, - &this.agg_calls, - &this.agg_funcs, - &this.storages, - &this.intermediate_state_table, - &this.input_pk_indices, - this.row_count_index, - this.extreme_cache_size, - &this.input_schema, - ) - .await?; let mut vars = ExecutionVars { - agg_group, + agg_group: AggGroup::create( + this.version, + None, + &this.agg_calls, + &this.agg_funcs, + &this.storages, + &this.intermediate_state_table, + &this.input_pk_indices, + this.row_count_index, + this.extreme_cache_size, + &this.input_schema, + ) + .await?, distinct_dedup, state_changed: false, };