Skip to content

Commit

Permalink
fix(query): backport hash table scatter will always send agg meta (#1…
Browse files Browse the repository at this point in the history
…7267)

* fix(query): hash table scatter will always send agg meta (#17245)

* fix(query): hash table scatter will always send agg meta

* fix(query): hash table scatter will always send agg meta

* fix(query): z

* fix(query): z

* update

* update

* update

* z

* z

* z

* z

* z

* z

* z

* fix(query): increase state_rows in copy agg state rows (#17252)

* fix(query): increase state_rows in copy agg state rows

* fix(query): increase state_rows in copy agg state rows

* fix(query): increase state_rows in copy agg state rows

* Revert "fix(query): increase state_rows in copy agg state rows (#17252)"

This reverts commit 7af80b1.
  • Loading branch information
sundy-li authored Jan 14, 2025
1 parent 9987c34 commit 4c4896d
Show file tree
Hide file tree
Showing 11 changed files with 115 additions and 78 deletions.
11 changes: 5 additions & 6 deletions src/query/expression/src/aggregate/payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -390,15 +390,14 @@ impl Payload {
true
}

pub fn empty_block(&self) -> DataBlock {
let columns = self
.aggrs
.iter()
.map(|f| ColumnBuilder::with_capacity(&f.return_type().unwrap(), 0).build())
pub fn empty_block(&self, fake_rows: Option<usize>) -> DataBlock {
let fake_rows = fake_rows.unwrap_or(0);
let columns = (0..self.aggrs.len())
.map(|_| ColumnBuilder::repeat_default(&DataType::Binary, fake_rows).build())
.chain(
self.group_types
.iter()
.map(|t| ColumnBuilder::with_capacity(t, 0).build()),
.map(|t| ColumnBuilder::repeat_default(t, fake_rows).build()),
)
.collect_vec();
DataBlock::new_from_columns(columns)
Expand Down
4 changes: 2 additions & 2 deletions src/query/expression/src/aggregate/payload_flush.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ impl Payload {
}

if blocks.is_empty() {
return Ok(self.empty_block());
return Ok(self.empty_block(None));
}
DataBlock::concat(&blocks)
}
Expand Down Expand Up @@ -173,7 +173,7 @@ impl Payload {
}

if blocks.is_empty() {
return Ok(self.empty_block());
return Ok(self.empty_block(None));
}

DataBlock::concat(&blocks)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,26 +173,20 @@ impl FlightScatter for HashTableHashScatter {
AggregateMeta::Partitioned { .. } => unreachable!(),
AggregateMeta::AggregateSpilling(payload) => {
for p in scatter_partitioned_payload(payload, self.buckets)? {
blocks.push(match p.len() == 0 {
true => DataBlock::empty(),
false => DataBlock::empty_with_meta(
AggregateMeta::create_agg_spilling(p),
),
});
blocks.push(DataBlock::empty_with_meta(
AggregateMeta::create_agg_spilling(p),
));
}
}
AggregateMeta::AggregatePayload(p) => {
for payload in scatter_payload(p.payload, self.buckets)? {
blocks.push(match payload.len() == 0 {
true => DataBlock::empty(),
false => {
DataBlock::empty_with_meta(AggregateMeta::create_agg_payload(
p.bucket,
payload,
p.max_partition_count,
))
}
});
blocks.push(DataBlock::empty_with_meta(
AggregateMeta::create_agg_payload(
p.bucket,
payload,
p.max_partition_count,
),
));
}
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ impl NewTransformPartitionBucket {
#[allow(unused_assignments)]
fn add_bucket(&mut self, mut data_block: DataBlock) -> Result<(isize, usize)> {
let (mut bucket, mut partition_count) = (0, 0);
let mut is_empty_block = false;
if let Some(block_meta) = data_block.get_meta() {
if let Some(block_meta) = AggregateMeta::downcast_ref_from(block_meta) {
(bucket, partition_count) = match block_meta {
Expand Down Expand Up @@ -250,7 +251,11 @@ impl NewTransformPartitionBucket {
if let Some(AggregateMeta::Spilled(buckets_payload)) =
AggregateMeta::downcast_from(meta)
{
let partition_count = buckets_payload[0].max_partition_count;
let partition_count = if !buckets_payload.is_empty() {
buckets_payload[0].max_partition_count
} else {
MAX_PARTITION_COUNT
};
self.max_partition_count =
self.max_partition_count.max(partition_count);

Expand All @@ -274,12 +279,14 @@ impl NewTransformPartitionBucket {
unreachable!()
}
AggregateMeta::Serialized(payload) => {
is_empty_block = payload.data_block.is_empty();
self.max_partition_count =
self.max_partition_count.max(payload.max_partition_count);

(payload.bucket, payload.max_partition_count)
}
AggregateMeta::AggregatePayload(payload) => {
is_empty_block = payload.payload.len() == 0;
self.max_partition_count =
self.max_partition_count.max(payload.max_partition_count);

Expand All @@ -298,23 +305,25 @@ impl NewTransformPartitionBucket {
));
}

if self.all_inputs_init {
if partition_count != self.max_partition_count {
return Err(ErrorCode::Internal(
if !is_empty_block {
if self.all_inputs_init {
if partition_count != self.max_partition_count {
return Err(ErrorCode::Internal(
"Internal, the partition count does not equal the max partition count on TransformPartitionBucket.
",
));
}
match self.buckets_blocks.entry(bucket) {
Entry::Vacant(v) => {
v.insert(vec![data_block]);
}
Entry::Occupied(mut v) => {
v.get_mut().push(data_block);
}
};
} else {
self.unpartitioned_blocks.push(data_block);
match self.buckets_blocks.entry(bucket) {
Entry::Vacant(v) => {
v.insert(vec![data_block]);
}
Entry::Occupied(mut v) => {
v.get_mut().push(data_block);
}
};
} else {
self.unpartitioned_blocks.push(data_block);
}
}

Ok((bucket, partition_count))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,28 +31,23 @@ pub struct AggregateSerdeMeta {
pub columns_layout: Vec<usize>,
// use for new agg hashtable
pub max_partition_count: usize,
pub is_empty: bool,
}

impl AggregateSerdeMeta {
pub fn create(bucket: isize) -> BlockMetaInfoPtr {
Box::new(AggregateSerdeMeta {
typ: BUCKET_TYPE,
bucket,
location: None,
data_range: None,
columns_layout: vec![],
max_partition_count: 0,
})
}

pub fn create_agg_payload(bucket: isize, max_partition_count: usize) -> BlockMetaInfoPtr {
pub fn create_agg_payload(
bucket: isize,
max_partition_count: usize,
is_empty: bool,
) -> BlockMetaInfoPtr {
Box::new(AggregateSerdeMeta {
typ: BUCKET_TYPE,
bucket,
location: None,
data_range: None,
columns_layout: vec![],
max_partition_count,
is_empty,
})
}

Expand All @@ -61,6 +56,7 @@ impl AggregateSerdeMeta {
location: String,
data_range: Range<u64>,
columns_layout: Vec<usize>,
is_empty: bool,
) -> BlockMetaInfoPtr {
Box::new(AggregateSerdeMeta {
typ: SPILLED_TYPE,
Expand All @@ -69,6 +65,7 @@ impl AggregateSerdeMeta {
location: Some(location),
data_range: Some(data_range),
max_partition_count: 0,
is_empty,
})
}

Expand All @@ -86,6 +83,7 @@ impl AggregateSerdeMeta {
location: Some(location),
data_range: Some(data_range),
max_partition_count,
is_empty: false,
})
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ pub struct SerializeAggregateStream {
pub payload: Pin<Box<SerializePayload>>,
flush_state: PayloadFlushState,
end_iter: bool,
nums: usize,
}

unsafe impl Send for SerializeAggregateStream {}
Expand All @@ -198,6 +199,7 @@ impl SerializeAggregateStream {
flush_state: PayloadFlushState::default(),
_params: params.clone(),
end_iter: false,
nums: 0,
}
}
}
Expand Down Expand Up @@ -225,10 +227,32 @@ impl SerializeAggregateStream {
}

match block {
Some(block) => Ok(Some(block.add_meta(Some(
AggregateSerdeMeta::create_agg_payload(p.bucket, p.max_partition_count),
))?)),
None => Ok(None),
Some(block) => {
self.nums += 1;
Ok(Some(block.add_meta(Some(
AggregateSerdeMeta::create_agg_payload(
p.bucket,
p.max_partition_count,
false,
),
))?))
}
None => {
// always return at least one block
if self.nums == 0 {
self.nums += 1;
let block = p.payload.empty_block(Some(1));
Ok(Some(block.add_meta(Some(
AggregateSerdeMeta::create_agg_payload(
p.bucket,
p.max_partition_count,
true,
),
))?))
} else {
Ok(None)
}
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,18 +89,26 @@ impl TransformDeserializer {
}
Some(meta) => {
return match meta.typ == BUCKET_TYPE {
true => Ok(DataBlock::empty_with_meta(
AggregateMeta::create_serialized(
meta.bucket,
deserialize_block(
dict,
fragment_data,
&self.schema,
self.arrow_schema.clone(),
)?,
meta.max_partition_count,
),
)),
true => {
let mut block = deserialize_block(
dict,
fragment_data,
&self.schema,
self.arrow_schema.clone(),
)?;

if meta.is_empty {
block = block.slice(0..0);
}

Ok(DataBlock::empty_with_meta(
AggregateMeta::create_serialized(
meta.bucket,
block,
meta.max_partition_count,
),
))
}
false => {
let data_schema = Arc::new(exchange_defines::spilled_schema());
let arrow_schema = Arc::new(exchange_defines::spilled_arrow_schema());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,31 +149,28 @@ impl BlockMetaTransform<ExchangeShuffleMeta> for TransformExchangeAggregateSeria
}

Some(AggregateMeta::AggregatePayload(p)) => {
let (bucket, max_partition_count) = (p.bucket, p.max_partition_count);

if index == self.local_pos {
serialized_blocks.push(FlightSerialized::DataBlock(
block.add_meta(Some(Box::new(AggregateMeta::AggregatePayload(p))))?,
));
continue;
}

let bucket = compute_block_number(p.bucket, p.max_partition_count)?;
let block_number = compute_block_number(bucket, max_partition_count)?;
let stream = SerializeAggregateStream::create(
&self.params,
SerializePayload::AggregatePayload(p),
);
let mut stream_blocks = stream.into_iter().collect::<Result<Vec<_>>>()?;

if stream_blocks.is_empty() {
serialized_blocks.push(FlightSerialized::DataBlock(DataBlock::empty()));
} else {
let mut c = DataBlock::concat(&stream_blocks)?;
if let Some(meta) = stream_blocks[0].take_meta() {
c.replace_meta(meta);
}

let c = serialize_block(bucket, c, &self.options)?;
serialized_blocks.push(FlightSerialized::DataBlock(c));
debug_assert!(!stream_blocks.is_empty());
let mut c = DataBlock::concat(&stream_blocks)?;
if let Some(meta) = stream_blocks[0].take_meta() {
c.replace_meta(meta);
}
let c = serialize_block(block_number, c, &self.options)?;
serialized_blocks.push(FlightSerialized::DataBlock(c));
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ impl TransformFinalAggregate {
AggregateMeta::Serialized(payload) => match agg_hashtable.as_mut() {
Some(ht) => {
debug_assert!(bucket == payload.bucket);

let payload = payload.convert_to_partitioned_payload(
self.params.group_data_types.clone(),
self.params.aggregate_functions.clone(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,10 @@ impl IEJoinState {
fn intersection(&self, left_block: &DataBlock, right_block: &DataBlock) -> bool {
let left_len = left_block.num_rows();
let right_len = right_block.num_rows();
if left_len == 0 || right_len == 0 {
return false;
}

let left_l1_column = left_block.columns()[0]
.value
.convert_to_full_column(&self.l1_data_type, left_len);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ impl BlockingTransform for TransformSRF {
}

let input = self.input.take().unwrap();
if input.is_empty() {
return Ok(None);
}

let mut result_size = 0;
let mut used = 0;
Expand Down

0 comments on commit 4c4896d

Please sign in to comment.