Skip to content

Commit 93dd62c

Browse files
committed
Allow repartitioning on files with ranges
Fixes #18940 When any partitioned file had ranges selected (even if the range contained the whole file), repartitioning was being skipped. There was no good reason to disallow this. I have now allowed this and change the implementation to respect file ranges. Added a couple of unit tests too.
1 parent 769f367 commit 93dd62c

File tree

1 file changed

+68
-36
lines changed

1 file changed

+68
-36
lines changed

datafusion/datasource/src/file_groups.rs

Lines changed: 68 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -189,15 +189,6 @@ impl FileGroupPartitioner {
189189
return None;
190190
}
191191

192-
// Perform redistribution only in case all files should be read from beginning to end
193-
let has_ranges = file_groups
194-
.iter()
195-
.flat_map(FileGroup::iter)
196-
.any(|f| f.range.is_some());
197-
if has_ranges {
198-
return None;
199-
}
200-
201192
// special case when order must be preserved
202193
if self.preserve_order_within_groups {
203194
self.repartition_preserving_order(file_groups)
@@ -218,14 +209,21 @@ impl FileGroupPartitioner {
218209

219210
let total_size = flattened_files
220211
.iter()
221-
.map(|f| f.object_meta.size as i64)
212+
.map(|f| {
213+
if let Some(range) = &f.range {
214+
range.end - range.start
215+
} else {
216+
f.object_meta.size as i64
217+
}
218+
})
222219
.sum::<i64>();
223220
if total_size < (repartition_file_min_size as i64) || total_size == 0 {
224221
return None;
225222
}
226223

227224
let target_partition_size =
228225
(total_size as u64).div_ceil(target_partitions as u64);
226+
println!("total_size={total_size}, target_partitions={target_partitions}, target_partition_size={target_partition_size}");
229227

230228
let current_partition_index: usize = 0;
231229
let current_partition_size: u64 = 0;
@@ -235,27 +233,35 @@ impl FileGroupPartitioner {
235233
.into_iter()
236234
.scan(
237235
(current_partition_index, current_partition_size),
238-
|state, source_file| {
236+
|(current_partition_index, current_partition_size), source_file| {
239237
let mut produced_files = vec![];
240-
let mut range_start = 0;
241-
while range_start < source_file.object_meta.size {
238+
let (mut range_start, file_end) =
239+
if let Some(range) = &source_file.range {
240+
(range.start as u64, range.end as u64)
241+
} else {
242+
(0, source_file.object_meta.size)
243+
};
244+
while range_start < file_end {
242245
let range_end = min(
243-
range_start + (target_partition_size - state.1),
244-
source_file.object_meta.size,
246+
range_start
247+
+ (target_partition_size - *current_partition_size),
248+
file_end,
245249
);
246250

247251
let mut produced_file = source_file.clone();
248252
produced_file.range = Some(FileRange {
249253
start: range_start as i64,
250254
end: range_end as i64,
251255
});
252-
produced_files.push((state.0, produced_file));
256+
produced_files.push((*current_partition_index, produced_file));
253257

254-
if state.1 + (range_end - range_start) >= target_partition_size {
255-
state.0 += 1;
256-
state.1 = 0;
258+
if *current_partition_size + (range_end - range_start)
259+
>= target_partition_size
260+
{
261+
*current_partition_index += 1;
262+
*current_partition_size = 0;
257263
} else {
258-
state.1 += range_end - range_start;
264+
*current_partition_size += range_end - range_start;
259265
}
260266
range_start = range_end;
261267
}
@@ -645,6 +651,48 @@ mod test {
645651
assert_partitioned_files(expected, actual);
646652
}
647653

654+
#[test]
655+
fn repartition_single_file_with_range() {
656+
// Single file, single partition into multiple partitions
657+
let single_partition =
658+
vec![FileGroup::new(vec![pfile("a", 123).with_range(0, 123)])];
659+
660+
let actual = FileGroupPartitioner::new()
661+
.with_target_partitions(4)
662+
.with_repartition_file_min_size(10)
663+
.repartition_file_groups(&single_partition);
664+
665+
let expected = Some(vec![
666+
FileGroup::new(vec![pfile("a", 123).with_range(0, 31)]),
667+
FileGroup::new(vec![pfile("a", 123).with_range(31, 62)]),
668+
FileGroup::new(vec![pfile("a", 123).with_range(62, 93)]),
669+
FileGroup::new(vec![pfile("a", 123).with_range(93, 123)]),
670+
]);
671+
assert_partitioned_files(expected, actual);
672+
}
673+
674+
#[test]
675+
fn repartition_single_file_duplicated_with_range() {
676+
// Single file, two partitions into multiple partitions
677+
let single_partition = vec![FileGroup::new(vec![
678+
pfile("a", 100).with_range(0, 50),
679+
pfile("a", 100).with_range(50, 100),
680+
])];
681+
682+
let actual = FileGroupPartitioner::new()
683+
.with_target_partitions(4)
684+
.with_repartition_file_min_size(10)
685+
.repartition_file_groups(&single_partition);
686+
687+
let expected = Some(vec![
688+
FileGroup::new(vec![pfile("a", 100).with_range(0, 25)]),
689+
FileGroup::new(vec![pfile("a", 100).with_range(25, 50)]),
690+
FileGroup::new(vec![pfile("a", 100).with_range(50, 75)]),
691+
FileGroup::new(vec![pfile("a", 100).with_range(75, 100)]),
692+
]);
693+
assert_partitioned_files(expected, actual);
694+
}
695+
648696
#[test]
649697
fn repartition_too_much_partitions() {
650698
// Single file, single partition into 96 partitions
@@ -717,22 +765,6 @@ mod test {
717765
assert_partitioned_files(expected, actual);
718766
}
719767

720-
#[test]
721-
fn repartition_no_action_ranges() {
722-
// No action due to Some(range) in second file
723-
let source_partitions = vec![
724-
FileGroup::new(vec![pfile("a", 123)]),
725-
FileGroup::new(vec![pfile("b", 144).with_range(1, 50)]),
726-
];
727-
728-
let actual = FileGroupPartitioner::new()
729-
.with_target_partitions(65)
730-
.with_repartition_file_min_size(10)
731-
.repartition_file_groups(&source_partitions);
732-
733-
assert_partitioned_files(None, actual)
734-
}
735-
736768
#[test]
737769
fn repartition_no_action_min_size() {
738770
// No action due to target_partition_size

0 commit comments

Comments
 (0)