diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index bf67eadfff4c..122a205061fe 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -2553,7 +2553,11 @@ public static List<ConfigOption<?>> getOptions() {
 
     /** Specifies the sort engine for table with primary key. */
     public enum SortEngine implements DescribedEnum {
+        // Multiway sorting based on a min-heap.
         MIN_HEAP("min-heap", "Use min-heap for multiway sorting."),
+
+        // Multiway sorting based on a loser tree. Compared with heapsort, a loser tree
+        // needs fewer comparisons and is therefore more efficient.
+        // https://developer.volcengine.com/articles/7236667079003209785
         LOSER_TREE(
                 "loser-tree",
                 "Use loser-tree for multiway sorting. Compared with heapsort, loser-tree has fewer comparisons and is more efficient.");
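Both engines solve the same problem: merging k sorted inputs into one sorted stream. As a point of reference, here is a minimal, self-contained sketch of the min-heap variant built on java.util.PriorityQueue; it illustrates the idea only and is not Paimon's implementation (the class and method names are mine). The loser tree is cheaper because refilling it after extracting the minimum costs exactly one comparison per tree level, about log2(k), whereas a binary-heap poll plus offer can cost roughly twice that.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

public final class MinHeapMerge {

    // One heap entry per input: the input's current head element plus the iterator itself.
    private static final class Entry<T> {
        T head;
        final Iterator<T> source;

        Entry(T head, Iterator<T> source) {
            this.head = head;
            this.source = source;
        }
    }

    /** Merges k sorted iterators into one sorted list, as the min-heap engine does. */
    public static <T> List<T> merge(List<Iterator<T>> inputs, Comparator<T> cmp) {
        PriorityQueue<Entry<T>> heap =
                new PriorityQueue<>((a, b) -> cmp.compare(a.head, b.head));
        for (Iterator<T> it : inputs) {
            if (it.hasNext()) {
                heap.offer(new Entry<>(it.next(), it));
            }
        }
        List<T> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            Entry<T> top = heap.poll(); // the smallest head among all inputs
            out.add(top.head);
            if (top.source.hasNext()) {
                // poll + offer costs up to ~2*log2(k) comparisons; a loser tree
                // refills with exactly ceil(log2(k)) comparisons, hence "fewer".
                top.head = top.source.next();
                heap.offer(top);
            }
        }
        return out;
    }
}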
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/FileRecordReader.java b/paimon-core/src/main/java/org/apache/paimon/io/FileRecordReader.java
index 1e12025ba533..df112edacc94 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/FileRecordReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/FileRecordReader.java
@@ -79,6 +79,7 @@ public FileRecordReader(
             @Nullable PartitionInfo partitionInfo)
             throws IOException {
         try {
+            // ORC, Parquet or Avro file reader.
             this.reader = readerFactory.createReader(context);
         } catch (Exception e) {
             FileUtils.checkExists(context.fileIO(), context.filePath());
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
index fdbb727e5674..4a4367b26646 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
@@ -102,10 +102,13 @@ public RecordReader<KeyValue> createRecordReader(DataFileMeta file) throws IOException {
 
     public RecordReader<KeyValue> createRecordReader(
             long schemaId, String fileName, long fileSize, int level) throws IOException {
+
+        // asyncThreshold is the file-size threshold for asynchronous reads; the default is 10 MB.
        if (fileSize >= asyncThreshold && fileName.endsWith(".orc")) {
             return new AsyncRecordReader<>(
                     () -> createRecordReader(schemaId, fileName, level, false, 2, fileSize));
         }
+        // Returns KeyValueDataFileRecordReader(FileRecordReader(ORC / Parquet / Avro)).
         return createRecordReader(schemaId, fileName, level, true, null, fileSize);
     }
 
@@ -135,6 +138,7 @@ private RecordReader<KeyValue> createRecordReader(
         Path filePath = pathFactory.toPath(fileName);
 
         RecordReader<InternalRow> fileRecordReader =
+                // ORC, Parquet or Avro file reader.
                 new FileRecordReader(
                         bulkFormatMapping.getReaderFactory(),
                         orcPoolSize == null
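To make the asyncThreshold branch above concrete: the idea behind an async reader is to let a background thread pull batches from the underlying file reader into a bounded queue, so that decoding a large ORC file overlaps with consumption. A hedged sketch of that pattern, using a plain Iterator; PrefetchingIterator is a hypothetical name of mine and not Paimon's AsyncRecordReader:

import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Wraps an iterator and prefetches its (non-null) elements on a background thread. */
public final class PrefetchingIterator<T> implements Iterator<T> {

    private static final Object EOF = new Object(); // end-of-input sentinel

    private final BlockingQueue<Object> queue;
    private Object next;

    public PrefetchingIterator(Iterator<T> source, int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
        Thread worker = new Thread(() -> {
            try {
                while (source.hasNext()) {
                    queue.put(source.next()); // blocks when the consumer is slow
                }
                queue.put(EOF);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "prefetch-worker");
        worker.setDaemon(true);
        worker.start();
    }

    @Override
    public boolean hasNext() {
        if (next == null) {
            try {
                next = queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return next != EOF;
    }

    @Override
    @SuppressWarnings("unchecked")
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        T result = (T) next;
        next = null;
        return result;
    }
}

Under this model the size threshold makes sense: the extra thread and queue only pay off when the file is large enough for prefetching to hide meaningful decoding time.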
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeSorter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeSorter.java
index ae6db76b90ba..5d5800780bc4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeSorter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeSorter.java
@@ -107,11 +107,12 @@ public <T> RecordReader<T> mergeSort(
             @Nullable FieldsComparator userDefinedSeqComparator,
             MergeFunctionWrapper<T> mergeFunction)
             throws IOException {
+        // Merge sort that may spill to disk.
         if (ioManager != null && lazyReaders.size() > spillThreshold) {
             return spillMergeSort(
                     lazyReaders, keyComparator, userDefinedSeqComparator, mergeFunction);
         }
-
+        // Pure in-memory merge sort.
         return mergeSortNoSpill(
                 lazyReaders, keyComparator, userDefinedSeqComparator, mergeFunction);
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeReaders.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeReaders.java
index 88a9c2593f1e..ee5333bb213c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeReaders.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeReaders.java
@@ -51,8 +51,10 @@ public static <T> RecordReader<T> readerForMergeTree(
             throws IOException {
         List<ReaderSupplier<T>> readers = new ArrayList<>();
         for (List<SortedRun> section : sections) {
+            // Each section corresponds to one SortMergeReader.
             readers.add(
                     () ->
+                            // Returns a sort reader (min-heap reader or loser-tree reader).
                             readerForSection(
                                     section,
                                     readerFactory,
@@ -83,18 +85,26 @@ public long estimateSize() {
 
                     @Override
                     public RecordReader<KeyValue> get() throws IOException {
+                        /**
+                         * 1. Iterate over the files of the SortedRun and create a file reader
+                         * for each of them. 2. Return a ConcatRecordReader, which simply serves
+                         * all those file readers as one whole.
+                         */
                         return readerForRun(run, readerFactory);
                     }
                 });
         }
+        // Merge the files of every SortedRun in the section into one merge-sorted reader;
+        // there is a spill and a no-spill variant (analyzed later).
         return mergeSorter.mergeSort(
                 readers, userKeyComparator, userDefinedSeqComparator, mergeFunctionWrapper);
     }
 
+    // Create one reader for each file of a SortedRun.
     private static RecordReader<KeyValue> readerForRun(
             SortedRun run, FileReaderFactory<KeyValue> readerFactory) throws IOException {
         List<ReaderSupplier<KeyValue>> readers = new ArrayList<>();
+        // For every file, register a lazy file reader,
        for (DataFileMeta file : run.files()) {
+            // created through the KeyValueFileReaderFactory.
             readers.add(() -> readerFactory.createRecordReader(file));
         }
         return ConcatRecordReader.create(readers);
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ConcatRecordReader.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ConcatRecordReader.java
index c9f5bebfca8e..ac369575389c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ConcatRecordReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ConcatRecordReader.java
@@ -35,6 +35,8 @@
  * input list is already sorted by key and sequence number, and the key intervals do not overlap
  * each other.
 */
+
+// A concatenating reader.
 public class ConcatRecordReader<T> implements RecordReader<T> {
 
     private final Queue<ReaderSupplier<T>> queue;
 
@@ -45,6 +47,7 @@ protected ConcatRecordReader(List<ReaderSupplier<T>> readerFactories)
         readerFactories.forEach(
                 supplier ->
                         Preconditions.checkNotNull(supplier, "Reader factory must not be null."));
+        // Queue the reader suppliers; they will be visited one after another.
         this.queue = new LinkedList<>(readerFactories);
     }
 
@@ -63,6 +66,7 @@ public static RecordReader create(ReaderSupplier reader1, ReaderSuppli
     public RecordIterator<T> readBatch() throws IOException {
         while (true) {
             if (current != null) {
+                // Read from the current reader (e.g. a SortMergeReader) until it is exhausted.
                 RecordIterator<T> iterator = current.readBatch();
                 if (iterator != null) {
                     return iterator;
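The essential property of ConcatRecordReader is lazy opening: the queue holds suppliers, so a file reader is only created once every earlier reader is exhausted. The same pattern reduced to plain iterators, as an illustrative sketch; ConcatIterator is a made-up name, and java.util.function.Supplier stands in for Paimon's ReaderSupplier (which may throw IOException):

import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.function.Supplier;

/** Chains several lazily-created iterators; each source is opened only when reached. */
public final class ConcatIterator<T> implements Iterator<T> {

    private final Queue<Supplier<Iterator<T>>> queue;
    private Iterator<T> current;

    public ConcatIterator(List<Supplier<Iterator<T>>> suppliers) {
        this.queue = new LinkedList<>(suppliers);
    }

    @Override
    public boolean hasNext() {
        while (true) {
            if (current != null && current.hasNext()) {
                return true;
            }
            Supplier<Iterator<T>> nextSupplier = queue.poll();
            if (nextSupplier == null) {
                return false; // all sources exhausted
            }
            current = nextSupplier.get(); // the underlying file is opened only at this point
        }
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return current.next();
    }
}

Because the files of a sorted run are ordered by key and their key intervals do not overlap, plain concatenation already yields global key order; no comparisons are needed, unlike the sort-merge path between runs.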
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/IntervalPartition.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/IntervalPartition.java
index 9c5e91af9091..a37f7181517d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/IntervalPartition.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/IntervalPartition.java
@@ -70,12 +70,17 @@ public IntervalPartition(List<DataFileMeta> inputFiles, Comparator<InternalRow> keyComparator) {
 
     public List<List<SortedRun>> partition() {
         List<List<SortedRun>> result = new ArrayList<>();
         List<DataFileMeta> section = new ArrayList<>();
+
+        // Right boundary of the current section.
         BinaryRow bound = null;
+
+        // The files were picked by the compaction strategy;
+        // now partition them into sections.
         for (DataFileMeta meta : files) {
+            // If the min key of the current file is greater than the max key of all previous
+            // files, the current section is complete.
             if (!section.isEmpty() && keyComparator.compare(meta.minKey(), bound) > 0) {
                 // larger than current right bound, conclude current section and create a new one
                 result.add(partition(section));
                 section.clear();
                 bound = null;
@@ -112,11 +117,14 @@ private List<SortedRun> partition(List<DataFileMeta> metas) {
             // any file list whose max key < meta.minKey() is sufficient,
             // for convenience we pick the smallest
             List<DataFileMeta> top = queue.poll();
+
             if (keyComparator.compare(meta.minKey(), top.get(top.size() - 1).maxKey()) > 0) {
+                // No overlap with this run, so the file can simply extend it.
                 // append current file to an existing partition
                 top.add(meta);
             } else {
                 // create a new partition
+                // The key ranges overlap, so start a new sorted run.
                 List<DataFileMeta> newRun = new ArrayList<>();
                 newRun.add(meta);
                 queue.add(newRun);
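A worked example makes both partition() methods concrete. Take five files with made-up integer key ranges [1,3], [2,5], [4,6], [8,11], [9,12], sorted by min key. The first three chain into one section and the last two into another, because 8 > 6. Within the first section, [1,3] starts run R1; [2,5] overlaps R1's tail, so it starts run R2; [4,6] begins after R1's tail (4 > 3) and extends R1, giving runs {[1,3],[4,6]} and {[2,5]}. The following self-contained sketch runs the same algorithm over int ranges; it is my own simplification, not Paimon code:

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

public final class IntervalPartitionSketch {

    /** Int-range stand-in for DataFileMeta; only minKey/maxKey matter here. */
    public record FileRange(int min, int max) {}

    /** Splits files into sections of overlapping ranges, then packs each section into runs. */
    public static List<List<List<FileRange>>> partition(List<FileRange> input) {
        List<FileRange> files = new ArrayList<>(input);
        files.sort(Comparator.comparingInt(FileRange::min)); // Paimon sorts by (minKey, maxKey)

        List<List<List<FileRange>>> sections = new ArrayList<>();
        List<FileRange> section = new ArrayList<>();
        int bound = Integer.MIN_VALUE;
        for (FileRange f : files) {
            if (!section.isEmpty() && f.min() > bound) {
                // Gap found: the current section is complete.
                sections.add(toSortedRuns(section));
                section.clear();
            }
            section.add(f);
            bound = Math.max(bound, f.max());
        }
        if (!section.isEmpty()) {
            sections.add(toSortedRuns(section));
        }
        return sections;
    }

    /** Packs one section into non-overlapping runs, mirroring the private partition() method. */
    private static List<List<FileRange>> toSortedRuns(List<FileRange> section) {
        // Candidate runs, ordered by the max key of their last file.
        PriorityQueue<List<FileRange>> queue =
                new PriorityQueue<>(Comparator.comparingInt(r -> r.get(r.size() - 1).max()));
        for (FileRange f : section) {
            List<FileRange> top = queue.peek();
            if (top != null && f.min() > top.get(top.size() - 1).max()) {
                // No overlap with the run whose tail ends earliest: extend that run.
                queue.poll();
                top.add(f);
                queue.add(top); // re-insert so the run is re-ordered by its new max key
            } else {
                // Overlaps the tail of every existing run: start a new run.
                List<FileRange> newRun = new ArrayList<>();
                newRun.add(f);
                queue.add(newRun);
            }
        }
        return new ArrayList<>(queue);
    }
}

On the ranges above this yields two sections, the first packed into runs {[1,3],[4,6]} and {[2,5]}, the second into {[8,11]} and {[9,12]}; each run is then read with a ConcatRecordReader, and the runs of a section are sort-merged.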
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
index cb34fd8cbbe9..efc1c806ee2f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
@@ -115,7 +115,8 @@ public List<DataFileMeta> allFiles() {
     @Override
     public void triggerCompaction(boolean fullCompaction) {
         Optional<CompactUnit> optionalUnit;
+        // Every L0 file corresponds to one sorted run, and below L0 each level holds a single
+        // sorted run. A sorted run maps to one or more files, since files roll over once they
+        // reach a certain size, so the sorted runs of one level can be viewed as a single file
+        // globally sorted by primary key.
         List<LevelSortedRun> runs = levels.levelSortedRuns();
         if (fullCompaction) {
             Preconditions.checkState(
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java
index df299fd84c52..03c18df36b53 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java
@@ -80,12 +80,14 @@ protected CompactResult rewriteCompaction(
         RecordReader<KeyValue> reader = null;
         Exception collectedExceptions = null;
         try {
+            // ConcatRecordReader (merges the sections; every section has a List<SortedRun>
+            // that needs to be merged).
             reader =
                     readerForMergeTree(
                             sections, new ReducerMergeFunctionWrapper(mfFactory.create()));
             if (dropDelete) {
                 reader = new DropDeleteReader(reader);
             }
+            // Iterate over the merged records and write them out to new files.
             writer.write(new RecordReaderIterator<>(reader));
         } catch (Exception e) {
             collectedExceptions = e;
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactTask.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactTask.java
index bd74412c629a..5a9cad9b3d3b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactTask.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactTask.java
@@ -66,6 +66,8 @@ public MergeTreeCompactTask(
         this.rewriter = rewriter;
         this.outputLevel = unit.outputLevel();
         this.compactDfSupplier = compactDfSupplier;
+
+        // Partition the input files so that files with overlapping key ranges are processed together.
         this.partitioned = new IntervalPartition(unit.files(), keyComparator).partition();
         this.dropDelete = dropDelete;
         this.maxLevel = maxLevel;
@@ -85,26 +87,34 @@ protected CompactResult doCompact() throws Exception {
 
         // Check adjacent, in-order files and compact them; compactions execute sequentially.
         for (List<SortedRun> section : partitioned) {
+            // If the section holds more than one sorted run, it goes straight onto the candidate list.
             if (section.size() > 1) {
                 candidate.add(section);
             } else {
+
                 SortedRun run = section.get(0);
                 // No overlapping:
                 // We can just upgrade the large file and just change the level instead of
                 // rewriting it
                 // But for small files, we will try to compact it
+
+                // Small files get compacted; a large file is simply upgraded to the next level.
                 for (DataFileMeta file : run.files()) {
                     if (file.fileSize() < minFileSize) {
                         // Smaller files are rewritten along with the previous files
+                        // Small files must be compacted first.
                         candidate.add(singletonList(SortedRun.fromSingle(file)));
                     } else {
                         // Large file appear, rewrite previous and upgrade it
+                        // First rewrite the accumulated small files,
                         rewrite(candidate, result);
+                        // then upgrade the large file to the output level.
                         upgrade(file, result);
                     }
                 }
             }
         }
+        // Rewrite whatever is still on the candidate list.
         rewrite(candidate, result);
         result.setDeletionFile(compactDfSupplier.get());
         return result;
@@ -119,16 +129,19 @@ protected String logMetric(
     }
 
     private void upgrade(DataFileMeta file, CompactResult toUpdate) throws Exception {
+        // If the file is already on the output level, there is nothing to do.
         if (file.level() == outputLevel) {
             return;
         }
 
         if (outputLevel != maxLevel || file.deleteRowCount().map(d -> d == 0).orElse(false)) {
+            // If the output level is not the max level, or the file contains no delete rows,
+            // upgrade it directly.
             CompactResult upgradeResult = rewriter.upgrade(outputLevel, file);
             toUpdate.merge(upgradeResult);
             upgradeFilesNum++;
         } else {
             // files with delete records should not be upgraded directly to max level
+            // They must be rewritten so the delete records are applied before reaching the max level.
             List<List<SortedRun>> candidate = new ArrayList<>();
             candidate.add(new ArrayList<>());
             candidate.get(0).add(SortedRun.fromSingle(file));
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/SortMergeReader.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/SortMergeReader.java
index 598ca2aa692c..b3ba1d4d6012 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/SortMergeReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/SortMergeReader.java
@@ -38,6 +38,7 @@
  */
 public interface SortMergeReader<T> extends RecordReader<T> {
 
+    // Create the sort merge reader (min-heap or loser-tree based, per the sort engine).
     static <T> SortMergeReader<T> createSortMergeReader(
             List<RecordReader<KeyValue>> readers,
             Comparator<InternalRow> userKeyComparator,
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/SortMergeReaderWithMinHeap.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/SortMergeReaderWithMinHeap.java
index a78ef334f071..69607c71c1f6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/SortMergeReaderWithMinHeap.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/SortMergeReaderWithMinHeap.java
@@ -87,7 +87,7 @@ public RecordIterator<T> readBatch() throws IOException {
                     // empty iterator, clean up and try next batch
                     iterator.releaseBatch();
                 } else {
-                    // found next kv
+                    // found next kv (offer the element into the min-heap)
                     minHeap.offer(new Element(kv, iterator, reader));
                     break;
                 }
@@ -197,6 +197,7 @@ private Element(
         }
 
         // IMPORTANT: Must not call this for elements still in priority queue!
+        // (update() changes the element's sort key, which would corrupt the heap order.)
         private boolean update() throws IOException {
             KeyValue nextKv = iterator.next();
             if (nextKv == null) {
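The warning on update() deserves a demonstration: java.util.PriorityQueue orders elements only when they are offered, and never re-sifts, so mutating the sort key of an element that is still queued silently corrupts the heap. A small self-contained example of the wrong and the right discipline (names are mine, with Integer keys standing in for KeyValue):

import java.util.PriorityQueue;

public final class HeapUpdateDiscipline {

    // A mutable heap element, in the spirit of SortMergeReaderWithMinHeap.Element.
    private static final class Element {
        int key;

        Element(int key) {
            this.key = key;
        }
    }

    public static void main(String[] args) {
        PriorityQueue<Element> heap =
                new PriorityQueue<>((a, b) -> Integer.compare(a.key, b.key));
        Element a = new Element(1);
        Element b = new Element(2);
        heap.offer(a);
        heap.offer(b);

        // WRONG: mutating the key while the element is still queued. The heap is
        // never re-sifted, so later polls may come out in the wrong order.
        // a.key = 100;

        // RIGHT: poll first, advance (update the key), then offer again.
        Element top = heap.poll(); // remove the current minimum
        top.key = 100;             // safe: the element is outside the queue now
        heap.offer(top);           // re-inserted at its correct position

        System.out.println(heap.poll().key); // prints 2
        System.out.println(heap.poll().key); // prints 100
    }
}

This is why the reader first polls the equal-key elements out of the heap, merges them, and only then calls update() and re-offers the survivors.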