From aaa00b42b7f965820aec92424bd02504694dce7f Mon Sep 17 00:00:00 2001
From: sunxiaojian
Date: Sun, 6 Oct 2024 20:58:23 +0800
Subject: [PATCH] fixed

---
 .../apache/paimon/memory/MemorySegmentPool.java    |  1 +
 .../apache/paimon/io/DataFilePathFactory.java      |  1 +
 .../paimon/mergetree/MergeTreeWriter.java          | 11 ++++++++---
 .../compact/ChangelogMergeTreeRewriter.java        |  2 ++
 .../mergetree/compact/IntervalPartition.java       |  2 +-
 .../LookupChangelogMergeFunctionWrapper.java       |  3 +++
 .../compact/MergeTreeCompactManager.java           |  5 ++++-
 .../mergetree/compact/MergeTreeCompactTask.java    | 10 ++++++----
 .../compact/SortMergeReaderWithMinHeap.java        |  3 +++
 .../mergetree/compact/UniversalCompaction.java     | 17 ++++++++++++++---
 .../operation/AbstractFileStoreWrite.java          |  1 +
 .../operation/KeyValueFileStoreWrite.java          |  2 ++
 .../org/apache/paimon/flink/sink/FlinkSink.java    |  7 ++++---
 .../paimon/flink/sink/FlinkSinkBuilder.java        |  4 ++++
 14 files changed, 54 insertions(+), 15 deletions(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/memory/MemorySegmentPool.java b/paimon-common/src/main/java/org/apache/paimon/memory/MemorySegmentPool.java
index 18538b2fae0d..f3be55e24a22 100644
--- a/paimon-common/src/main/java/org/apache/paimon/memory/MemorySegmentPool.java
+++ b/paimon-common/src/main/java/org/apache/paimon/memory/MemorySegmentPool.java
@@ -31,6 +31,7 @@
 @Public
 public interface MemorySegmentPool extends MemorySegmentSource {
 
+    // The default page size is 32 KB.
     int DEFAULT_PAGE_SIZE = 32 * 1024;
 
     /**
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java
index 742888b36420..1903a36d4fbe 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java
@@ -61,6 +61,7 @@ public Path newChangelogPath() {
         return newPath(changelogFilePrefix);
     }
 
+    // Generate a new file path.
     private Path newPath(String prefix) {
         String name = prefix + uuid + "-" + pathCount.getAndIncrement() + "." + formatIdentifier;
         return new Path(parent, name);
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
index ee3f6ecf67d6..fc0d2c9587ae 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
@@ -166,6 +166,7 @@ public void write(KeyValue kv) throws Exception {
         long sequenceNumber = newSequenceNumber();
         boolean success = writeBuffer.put(sequenceNumber, kv.valueKind(), kv.key(), kv.value());
         if (!success) {
+            // If the put fails, first flush the in-memory data to disk and run one compaction.
             flushWriteBuffer(false, false);
             success = writeBuffer.put(sequenceNumber, kv.valueKind(), kv.key(), kv.value());
             if (!success) {
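The MergeTreeWriter hunk above documents a put-flush-retry pattern: a record first goes into the sorted in-memory write buffer, and only when the buffer refuses it does the writer flush to a level-0 file and retry the put. A minimal, self-contained sketch of that control flow (the fixed-capacity string buffer is a hypothetical stand-in for Paimon's writeBuffer, not its real API):

import java.util.ArrayList;
import java.util.List;

public class PutFlushRetrySketch {

    private final List<String> buffer = new ArrayList<>();
    private final int capacity = 2; // a tiny buffer so the flush path is exercised

    public void write(String record) {
        if (!put(record)) {
            flush();            // spill the in-memory data to a "level-0 file" first
            if (!put(record)) { // retry once against the emptied buffer
                throw new IllegalStateException("record larger than the whole buffer");
            }
        }
    }

    private boolean put(String record) {
        if (buffer.size() >= capacity) {
            return false;       // buffer full: the caller must flush
        }
        return buffer.add(record);
    }

    private void flush() {
        System.out.println("flushing level-0 file: " + buffer);
        buffer.clear();         // clear the buffer for reuse, like flushWriteBuffer() does
    }

    public static void main(String[] args) {
        PutFlushRetrySketch writer = new PutFlushRetrySketch();
        writer.write("a");
        writer.write("b");
        writer.write("c"); // triggers a flush of [a, b] before c is accepted
    }
}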
@@ -214,7 +215,7 @@ private void flushWriteBuffer(boolean waitForLatestCompaction, boolean forcedFul
             waitForLatestCompaction = true;
         }
 
-        // Decide whether or not a changelog writer is needed (for the INPUT producer, records go straight to the changelog directory, so a new rolling changelog file must be created).
+        // Decide whether a changelog writer is needed (for the INPUT producer, records go straight to the changelog directory, so a new rolling changelog file must be created).
         final RollingFileWriter<KeyValue, DataFileMeta> changelogWriter =
                 (changelogProducer == ChangelogProducer.INPUT && !isInsertOnly)
                         ? writerFactory.createRollingChangelogFileWriter(0)
@@ -225,13 +226,15 @@ private void flushWriteBuffer(boolean waitForLatestCompaction, boolean forcedFul
                         writerFactory.createRollingMergeTreeFileWriter(0, FileSource.APPEND);
 
         try {
-            // Flush the buffered data into level 0.
+            // Flush the in-memory data into level 0; the buffer is already sorted, so it is written directly into the corresponding files on disk.
             writeBuffer.forEach(
                     keyComparator,
                     mergeFunction,
+                    // Write straight into the changelog file.
                     changelogWriter == null ? null : changelogWriter::write,
                     dataWriter::write);
         } finally {
+            // Clear the buffer for later reuse.
             writeBuffer.clear();
             if (changelogWriter != null) {
                 changelogWriter.close();
@@ -239,6 +242,7 @@ private void flushWriteBuffer(boolean waitForLatestCompaction, boolean forcedFul
             dataWriter.close();
         }
 
+        // Return the DataFileMeta descriptions of the written files.
         List<DataFileMeta> dataMetas = dataWriter.result();
         if (changelogWriter != null) {
             newFilesChangelog.addAll(changelogWriter.result());
@@ -253,7 +257,7 @@ private void flushWriteBuffer(boolean waitForLatestCompaction, boolean forcedFul
             newFilesChangelog.addAll(changelogMetas);
         }
 
-        // Add the files to level 0.
+        // Add the returned result files to level 0.
         for (DataFileMeta dataMeta : dataMetas) {
             newFiles.add(dataMeta);
             compactManager.addNewFile(dataMeta);
@@ -262,6 +266,7 @@ private void flushWriteBuffer(boolean waitForLatestCompaction, boolean forcedFul
         // If waitForLatestCompaction=true, wait for the previous compaction to finish before starting the next one;
         // if waitForLatestCompaction=false, do not wait and trigger the compaction directly.
         trySyncLatestCompaction(waitForLatestCompaction);
+        // Trigger compaction of the files; internally this is either a full or a partial compaction, and a partial compaction can be picked in three ways.
         compactManager.triggerCompaction(forcedFullCompaction);
     }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
index 56c61d87435a..06d4c08909af 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
@@ -87,6 +87,7 @@ protected boolean rewriteLookupChangelog(int outputLevel, List<List<SortedRun>>
             return false;
         }
 
+        // Lookup only cares about level 0, because level-0 files are freshly flushed and contain only new data.
         for (List<SortedRun> runs : sections) {
             for (SortedRun run : runs) {
                 for (DataFileMeta file : run.files()) {
@@ -105,6 +106,7 @@ public CompactResult rewrite(
         if (rewriteChangelog(outputLevel, dropDelete, sections)) {
             return rewriteOrProduceChangelog(outputLevel, sections, dropDelete, true);
         } else {
+            // Fall through to the normal rewrite compaction.
             return rewriteCompaction(outputLevel, dropDelete, sections);
         }
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/IntervalPartition.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/IntervalPartition.java
index a37f7181517d..309206a28394 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/IntervalPartition.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/IntervalPartition.java
@@ -80,7 +80,7 @@ public List<List<SortedRun>> partition() {
             // If the current file's minKey is greater than the previous files' maxKey, conclude the section.
             if (!section.isEmpty() && keyComparator.compare(meta.minKey(), bound) > 0) {
                 // larger than current right bound, conclude current section and create a new one
-                // Larger than the current right bound: conclude the current section and create a new split.
+                // The current file does not overlap the previous group, so partition the section into sorted runs and start a new one.
                 result.add(partition(section));
                 section.clear();
                 bound = null;
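The IntervalPartition comments above describe how key-sorted files are grouped into sections: a file whose minKey exceeds the running maxKey bound starts a new section, because its key range no longer overlaps the previous group. A small sketch of that sectioning rule, using plain integer [min, max] ranges as stand-ins for Paimon's binary keys:

import java.util.ArrayList;
import java.util.List;

public class IntervalSectionSketch {

    // A [min, max] key range standing in for a data file's minKey/maxKey.
    record Range(int min, int max) {}

    // Files must be pre-sorted by min. A new section starts when a file's min
    // exceeds the running max bound of the current section, i.e. no overlap.
    static List<List<Range>> sections(List<Range> filesSortedByMin) {
        List<List<Range>> result = new ArrayList<>();
        List<Range> section = new ArrayList<>();
        int bound = Integer.MIN_VALUE;
        for (Range file : filesSortedByMin) {
            if (!section.isEmpty() && file.min() > bound) {
                result.add(section);              // conclude the current section
                section = new ArrayList<>();
            }
            section.add(file);
            bound = Math.max(bound, file.max());  // extend the right bound
        }
        if (!section.isEmpty()) {
            result.add(section);
        }
        return result;
    }

    public static void main(String[] args) {
        // [1,3] and [2,5] overlap and share a section; [7,8] starts a new one.
        System.out.println(sections(List.of(new Range(1, 3), new Range(2, 5), new Range(7, 8))));
    }
}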
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
index f182c7f4ade3..19aa9c14066a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
@@ -108,7 +108,9 @@ public void add(KeyValue kv) {
     public ChangelogResult getResult() {
         // 1. Compute the latest high level record and containLevel0 of candidates
         LinkedList<KeyValue> candidates = mergeFunction.candidates();
+        // Iterate over the candidates in descending order.
         Iterator<KeyValue> descending = candidates.descendingIterator();
+        // The key-value from the highest level.
         KeyValue highLevel = null;
         boolean containLevel0 = false;
         while (descending.hasNext()) {
@@ -124,6 +126,7 @@ public ChangelogResult getResult() {
         }
 
         // 2. Lookup if latest high level record is absent
+        // Check whether the record from the highest level is missing.
         if (highLevel == null) {
             InternalRow lookupKey = candidates.get(0).key();
             T lookupResult = lookup.apply(lookupKey);
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
index efc1c806ee2f..66c25f1cb509 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
@@ -117,6 +117,7 @@ public void triggerCompaction(boolean fullCompaction) {
         Optional<CompactUnit> optionalUnit;
         // Every L0 file is its own sorted run, and each level below L0 is a single sorted run. A sorted run maps to one or more files; a file rolls
         // out once it reaches a certain size. The sorted runs of one level can therefore be viewed as a single file globally sorted by primary key.
+        // Compute all sorted runs.
         List<LevelSortedRun> runs = levels.levelSortedRuns();
         if (fullCompaction) {
             Preconditions.checkState(
@@ -128,7 +129,7 @@ public void triggerCompaction(boolean fullCompaction) {
                         "Trigger forced full compaction. Picking from the following runs\n{}",
                         runs);
             }
-            // Compact everything into the last level, maxLevel.
+            // A full compaction compacts all files down to maxLevel.
             optionalUnit = CompactStrategy.pickFullCompaction(levels.numberOfLevels(), runs);
         } else {
             if (taskFuture != null) {
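The lookup changelog hunks above scan the merge candidates from the highest level downwards and fall back to a point lookup only when no record above level 0 is present. A compact sketch of that decision, under the assumption that an absent high-level record yields an INSERT changelog and a present one yields an UPDATE_AFTER; the Kv record and the lookup function are hypothetical stand-ins:

import java.util.List;
import java.util.function.Function;

public class LookupChangelogSketch {

    // (level, value) stands in for Paimon's KeyValue; level 0 means freshly written.
    record Kv(int level, String value) {}

    // Returns "+I" (INSERT) when the key has never reached a higher level,
    // "+U" (UPDATE_AFTER) when an older high-level record exists.
    static String changelogKind(List<Kv> candidates, Function<String, Kv> lookup, String key) {
        Kv highLevel = null;
        for (int i = candidates.size() - 1; i >= 0; i--) { // descending iteration
            Kv kv = candidates.get(i);
            if (kv.level() > 0) {
                highLevel = kv;                            // found a high-level candidate
                break;
            }
        }
        if (highLevel == null) {
            highLevel = lookup.apply(key);                 // look up only when absent
        }
        return highLevel == null ? "+I" : "+U";
    }

    public static void main(String[] args) {
        List<Kv> onlyLevel0 = List.of(new Kv(0, "a"), new Kv(0, "b"));
        System.out.println(changelogKind(onlyLevel0, k -> null, "pk"));             // +I
        System.out.println(changelogKind(onlyLevel0, k -> new Kv(3, "old"), "pk")); // +U
    }
}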
@@ -137,6 +138,7 @@ public void triggerCompaction(boolean fullCompaction) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Trigger normal compaction. Picking from the following runs\n{}", runs);
             }
+            // Pick the unit to compact.
             optionalUnit =
                     strategy.pick(levels.numberOfLevels(), runs)
                             .filter(unit -> unit.files().size() > 0)
@@ -175,6 +177,7 @@ public void triggerCompaction(boolean fullCompaction) {
                                                             file.fileSize()))
                                             .collect(Collectors.joining(", ")));
                         }
+                        // Submit the compaction task.
                         submitCompaction(unit, dropDelete);
                     });
         }
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactTask.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactTask.java
index 5a9cad9b3d3b..56e2980c2509 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactTask.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactTask.java
@@ -86,6 +86,7 @@ protected CompactResult doCompact() throws Exception {
         // orderliness
 
         // Check adjacent, ordered files and compact them in sequence.
+        // Files from different sections do not overlap in key range.
         for (List<SortedRun> section : partitioned) {
             // If the current section contains more than one sorted run, add it directly to the candidate list.
             if (section.size() > 1) {
@@ -98,7 +99,7 @@ protected CompactResult doCompact() throws Exception {
                 // rewriting it
                 // But for small files, we will try to compact it
 
-                // Compact small files; large files are simply upgraded straight up the next level.
+                // Compact small files; large files are simply upgraded straight to the next level.
                 for (DataFileMeta file : run.files()) {
                     if (file.fileSize() < minFileSize) {
                         // Smaller files are rewritten along with the previous files
@@ -106,9 +107,9 @@ protected CompactResult doCompact() throws Exception {
                         candidate.add(singletonList(SortedRun.fromSingle(file)));
                     } else {
                         // Large file appear, rewrite previous and upgrade it
-                        // First rewrite the small files.
+                        // A large file appeared: rewrite the previous files and upgrade it.
                         rewrite(candidate, result);
-                        // Upgrade the large file to the next level.
+                        // Then upgrade the current large file to the highest level.
                         upgrade(file, result);
                     }
                 }
@@ -135,7 +136,7 @@ private void upgrade(DataFileMeta file, CompactResult toUpdate) throws Exception
         }
 
         if (outputLevel != maxLevel || file.deleteRowCount().map(d -> d == 0).orElse(false)) {
-            // If outputLevel is not the maximum level, or the file has zero deleted rows, then proceed.
+            // If outputLevel is not the maximum level, or the file has zero deleted rows, upgrade the file directly to the next level.
             CompactResult upgradeResult = rewriter.upgrade(outputLevel, file);
             toUpdate.merge(upgradeResult);
             upgradeFilesNum++;
@@ -149,6 +150,7 @@ private void upgrade(DataFileMeta file, CompactResult toUpdate) throws Exception
         }
     }
 
+    // Rewrite the small files.
     private void rewrite(List<List<SortedRun>> candidate, CompactResult toUpdate) throws Exception {
         if (candidate.isEmpty()) {
             return;
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/SortMergeReaderWithMinHeap.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/SortMergeReaderWithMinHeap.java
index 69607c71c1f6..539a4665a3d3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/SortMergeReaderWithMinHeap.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/SortMergeReaderWithMinHeap.java
@@ -39,6 +39,7 @@ public class SortMergeReaderWithMinHeap<T> implements SortMergeReader<T> {
 
     private final Comparator<InternalRow> userKeyComparator;
     private final MergeFunctionWrapper<T> mergeFunctionWrapper;
+    // A min-heap used for the merge sort.
     private final PriorityQueue<Element> minHeap;
     private final List<Element> polled;
 
@@ -166,6 +167,8 @@ private boolean nextImpl() throws IOException {
 
         // fetch all elements with the same key
         // note that the same iterator should not produce the same keys, so this code is correct
+
+        // Collect all entries that share the same key.
         while (!minHeap.isEmpty()) {
             Element element = minHeap.peek();
             if (userKeyComparator.compare(key, element.kv.key()) != 0) {
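The SortMergeReaderWithMinHeap hunks above annotate a k-way merge: a min-heap orders the heads of all sorted runs, and every entry sharing the smallest key is drained together so the merge function can combine them. A simplified sketch (all entries are pushed up front here, whereas the real reader keeps one head element per run and refills after polling):

import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

public class SortMergeSketch {

    record Entry(int key, String run) {}

    public static void main(String[] args) {
        PriorityQueue<Entry> minHeap = new PriorityQueue<>(Comparator.comparingInt(Entry::key));
        // Two sorted runs contribute their entries.
        minHeap.addAll(List.of(new Entry(1, "run1"), new Entry(2, "run1")));
        minHeap.addAll(List.of(new Entry(1, "run2"), new Entry(3, "run2")));

        while (!minHeap.isEmpty()) {
            int key = minHeap.peek().key();
            StringBuilder merged = new StringBuilder();
            // fetch all elements with the same key, mirroring the annotated loop
            while (!minHeap.isEmpty() && minHeap.peek().key() == key) {
                merged.append(minHeap.poll().run()).append(' ');
            }
            System.out.println("key " + key + " merged from: " + merged.toString().trim());
        }
    }
}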
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/UniversalCompaction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/UniversalCompaction.java
index 6754f5cce63f..ca7f983086de 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/UniversalCompaction.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/UniversalCompaction.java
@@ -82,6 +82,8 @@ public Optional<CompactUnit> pick(int numLevels, List<LevelSortedRun> runs) {
         }
 
         // 1 checking for reducing size amplification
+        // size amplification ratio = (size(R1) + size(R2) + ... + size(Rn-1)) / size(Rn)
+        // The premise of this algorithm is that data is usually compacted into the last level, which holds roughly 80% or more of the total data.
         CompactUnit unit = pickForSizeAmp(maxLevel, runs);
         if (unit != null) {
             if (LOG.isDebugEnabled()) {
@@ -92,6 +94,8 @@ public Optional<CompactUnit> pick(int numLevels, List<LevelSortedRun> runs) {
         }
 
         // 2 checking for size ratio
+        // size_ratio_trigger = (100 + options.compaction_options_universal.size_ratio) / 100
+        // (a merge triggered by the individual size ratio)
         unit = pickForSizeRatio(maxLevel, runs);
         if (unit != null) {
             if (LOG.isDebugEnabled()) {
@@ -101,6 +105,7 @@ public Optional<CompactUnit> pick(int numLevels, List<LevelSortedRun> runs) {
         }
 
         // 3 checking for file num
+        // Merge once the number of sorted runs reaches the trigger.
         if (runs.size() > numRunCompactionTrigger) {
             // compacting for file num
             int candidateCount = runs.size() - numRunCompactionTrigger + 1;
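The size-amplification comment added above reduces to a few lines of integer arithmetic: compaction is triggered when the bytes held outside the oldest run exceed maxSizeAmp percent of that run. A sketch of the trigger, assuming run sizes are ordered from newest R1 to oldest Rn as in the comments:

import java.util.List;

public class SizeAmpSketch {

    // size amplification = (size(R1) + ... + size(Rn-1)) / size(Rn), compared against
    // maxSizeAmp percent using integer arithmetic, as in UniversalCompaction.
    static boolean sizeAmpTriggered(List<Long> runSizes, int maxSizeAmp) {
        long candidateSize = 0;
        for (int i = 0; i < runSizes.size() - 1; i++) {
            candidateSize += runSizes.get(i);                     // size(R1) + ... + size(Rn-1)
        }
        long earliestRunSize = runSizes.get(runSizes.size() - 1); // size(Rn), the oldest run
        return candidateSize * 100 > (long) maxSizeAmp * earliestRunSize;
    }

    public static void main(String[] args) {
        // 40 + 30 = 70 extra bytes on top of an oldest run of 100 bytes: 70% amplification.
        List<Long> runs = List.of(40L, 30L, 100L);
        System.out.println(sizeAmpTriggered(runs, 25));  // true: 70% exceeds the 25% limit
        System.out.println(sizeAmpTriggered(runs, 200)); // false: 70% is within 200%
    }
}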
@@ -113,23 +118,26 @@ public Optional<CompactUnit> pick(int numLevels, List<LevelSortedRun> runs) {
         return Optional.empty();
     }
 
+    // Triggered when the size amplification ratio reaches the threshold.
     @VisibleForTesting
     CompactUnit pickForSizeAmp(int maxLevel, List<LevelSortedRun> runs) {
+        // Precondition: the number of sorted runs reaches the compaction trigger.
         if (runs.size() < numRunCompactionTrigger) {
             return null;
         }
 
-        // The total number of candidates.
+        // Compute size(R1) + size(R2) + ... + size(Rn-1).
         long candidateSize =
                 runs.subList(0, runs.size() - 1).stream()
                         .map(LevelSortedRun::run)
                         .mapToLong(SortedRun::totalSize)
                         .sum();
 
+        // Compute size(Rn).
         long earliestRunSize = runs.get(runs.size() - 1).run().totalSize();
 
         // size amplification = percentage of additional size
-        // todo explain this equation
+        // size amplification ratio = (size(R1) + size(R2) + ... + size(Rn-1)) / size(Rn)
         if (candidateSize * 100 > maxSizeAmp * earliestRunSize) {
             updateLastOptimizedCompaction();
             return CompactUnit.fromLevelRuns(maxLevel, runs);
@@ -140,6 +148,7 @@ CompactUnit pickForSizeAmp(int maxLevel, List<LevelSortedRun> runs) {
 
     @VisibleForTesting
     CompactUnit pickForSizeRatio(int maxLevel, List<LevelSortedRun> runs) {
+        // Precondition: the number of sorted runs reaches the configured trigger.
         if (runs.size() < numRunCompactionTrigger) {
             return null;
         }
@@ -154,13 +163,15 @@ private CompactUnit pickForSizeRatio(
 
     public CompactUnit pickForSizeRatio(
             int maxLevel, List<LevelSortedRun> runs, int candidateCount, boolean forcePick) {
+        // The candidate count starts from 1 by default.
         long candidateSize = candidateSize(runs, candidateCount);
         for (int i = candidateCount; i < runs.size(); i++) {
             LevelSortedRun next = runs.get(i);
+            // size_ratio_trigger = (100 + options.compaction_options_universal.size_ratio) / 100
+            // (a merge triggered by the individual size ratio)
             if (candidateSize * (100.0 + sizeRatio) / 100.0 < next.run().totalSize()) {
                 break;
             }
-
             candidateSize += next.run().totalSize();
             candidateCount++;
         }
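The pickForSizeRatio hunks above accumulate candidate runs until the next run is too large relative to the candidates collected so far. A sketch of that accumulation loop with a worked example; the method name pickCandidateCount is a hypothetical simplification of the real pick logic:

import java.util.List;

public class SizeRatioSketch {

    // Keep absorbing the next run while it is not "too large" relative to the
    // accumulated candidates: stop once
    //   candidateSize * (100 + sizeRatio) / 100 < size(next run).
    static int pickCandidateCount(List<Long> runSizes, int sizeRatio) {
        int candidateCount = 1;                 // the candidate set starts with one run
        long candidateSize = runSizes.get(0);
        for (int i = candidateCount; i < runSizes.size(); i++) {
            long next = runSizes.get(i);
            if (candidateSize * (100.0 + sizeRatio) / 100.0 < next) {
                break;                          // the next run is too large to merge cheaply
            }
            candidateSize += next;
            candidateCount++;
        }
        return candidateCount;
    }

    public static void main(String[] args) {
        // With a 1% size ratio: the two 10-byte runs merge (10 * 1.01 >= 10),
        // but the 25-byte run is too large next to the accumulated 20 bytes (20 * 1.01 < 25).
        System.out.println(pickCandidateCount(List.of(10L, 10L, 25L), 1)); // prints 2
    }
}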
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
index dab20d642cb9..dd01a49c68df 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
@@ -400,6 +400,7 @@ private long writerNumber() {
         return writers.values().stream().mapToLong(e -> e.values().size()).sum();
     }
 
+    // Create the container that holds the data writer.
     @VisibleForTesting
     public WriterContainer<T> createWriterContainer(
             BinaryRow partition, int bucket, boolean ignorePreviousFiles) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index 7adaa1246aac..a7c7997b891c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -305,6 +305,7 @@ private MergeTreeCompactRewriter createRewriter(
         LookupMergeTreeCompactRewriter.MergeFunctionWrapperFactory<T> wrapperFactory;
         FileReaderFactory<KeyValue> lookupReaderFactory = readerFactory;
         if (lookupStrategy.isFirstRow) {
+            // The first-row merge engine does not support deletion vectors, because only the first record is kept.
             if (options.deletionVectorsEnabled()) {
                 throw new UnsupportedOperationException(
                         "First row merge engine does not need deletion vectors because there is no deletion of old data in this merge engine.");
@@ -319,6 +320,7 @@ private MergeTreeCompactRewriter createRewriter(
         } else {
             processor =
                     lookupStrategy.deletionVector
+                            // A positioned key-value processor, similar to Iceberg's position delete files.
                             ? new PositionedKeyValueProcessor(
                                     valueType,
                                     lookupStrategy.produceChangelog
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index bbf01b33d8f5..c0f06eddb862 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -102,7 +102,7 @@ private StoreSinkWrite.Provider createWriteProvider(
         if (coreOptions.writeOnly()) {
             waitCompaction = false;
         } else {
-            // No need to wait for the full compaction.
+            // The full compaction needs to be waited for.
             waitCompaction = coreOptions.prepareCommitWaitCompaction();
             int deltaCommits = -1;
             if (options.contains(FULL_COMPACTION_DELTA_COMMITS)) {
@@ -116,7 +116,7 @@ private StoreSinkWrite.Provider createWriteProvider(
                                         / checkpointConfig.getCheckpointInterval());
             }
 
-            // If the changelog is set to full-compaction, do a full compaction.
+            // If the changelog producer is set to full-compaction, use the full compaction mode.
             // deltaCommits: compact once the number of incremental commits reaches the threshold.
             if (changelogProducer == ChangelogProducer.FULL_COMPACTION || deltaCommits >= 0) {
                 int finalDeltaCommits = Math.max(deltaCommits, 1);
@@ -141,6 +141,7 @@ private StoreSinkWrite.Provider createWriteProvider(
         if (coreOptions.needLookup() && !coreOptions.prepareCommitWaitCompaction()) {
             return (table, commitUser, state, ioManager, memoryPool, metricGroup) -> {
                 assertNoSinkMaterializer.run();
+                // A delete also has to find out which file contains the row being deleted, so a lookup is required.
                 return new AsyncLookupSinkWrite(
                         table,
                         commitUser,
@@ -153,7 +154,7 @@ private StoreSinkWrite.Provider createWriteProvider(
                         metricGroup);
             };
         }
-        // The normal writer.
+        // The plain writer, with asynchronous compaction.
         return (table, commitUser, state, ioManager, memoryPool, metricGroup) -> {
             assertNoSinkMaterializer.run();
             return new StoreSinkWriteImpl(
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
index 64c8c396399d..2d79874e4abb 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
@@ -134,6 +134,7 @@ public FlinkSinkBuilder parallelism(int parallelism) {
     }
 
     /** Clustering the input data if possible. */
+    // Cluster the input data if possible.
     public FlinkSinkBuilder clusteringIfPossible(
             String clusteringColumns,
             String clusteringStrategy,
@@ -227,12 +228,15 @@ public DataStreamSink<?> build() {
         BucketMode bucketMode = table.bucketMode();
         switch (bucketMode) {
             case HASH_FIXED:
+                // Fixed buckets.
                 return buildForFixedBucket(input);
             case HASH_DYNAMIC:
+                // Dynamic buckets: existing keys go to the bucket they came from, new keys go to new (dynamically created) buckets.
                 return buildDynamicBucketSink(input, false);
             case CROSS_PARTITION:
                 return buildDynamicBucketSink(input, true);
             case BUCKET_UNAWARE:
+                // Only suitable for append-only tables.
                 return buildUnawareBucketSink(input);
             default:
                 throw new UnsupportedOperationException("Unsupported bucket mode: " + bucketMode);
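The deltaCommits logic annotated in FlinkSink converts a wall-clock full-compaction interval into "force a full compaction every N checkpoints", with a floor of one checkpoint. A sketch of that conversion; the ceiling-style rounding is an assumption, since the hunk only shows the division by the checkpoint interval:

public class DeltaCommitsSketch {

    // Convert a full-compaction interval into "every N checkpoints".
    // The ceiling rounding here is an assumption; the hunk only shows the division.
    static int deltaCommits(long fullCompactionIntervalMs, long checkpointIntervalMs) {
        return (int) ((fullCompactionIntervalMs + checkpointIntervalMs - 1) / checkpointIntervalMs);
    }

    public static void main(String[] args) {
        // A 90s interval with 60s checkpoints forces a full compaction every 2 checkpoints.
        System.out.println(deltaCommits(90_000, 60_000));              // 2
        // FlinkSink floors the result at 1, so compaction happens at least every checkpoint.
        System.out.println(Math.max(deltaCommits(10_000, 60_000), 1)); // 1
    }
}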