Commit aaa00b4: fixed
sunxiaojian committed Oct 6, 2024
1 parent 5796c12
Showing 14 changed files with 54 additions and 15 deletions.
@@ -31,6 +31,7 @@
@Public
public interface MemorySegmentPool extends MemorySegmentSource {

// The default page size is 32 KB
int DEFAULT_PAGE_SIZE = 32 * 1024;

/**
@@ -61,6 +61,7 @@ public Path newChangelogPath() {
return newPath(changelogFilePrefix);
}

// Generate a new file path
private Path newPath(String prefix) {
String name = prefix + uuid + "-" + pathCount.getAndIncrement() + "." + formatIdentifier;
return new Path(parent, name);
@@ -166,6 +166,7 @@ public void write(KeyValue kv) throws Exception {
long sequenceNumber = newSequenceNumber();
boolean success = writeBuffer.put(sequenceNumber, kv.valueKind(), kv.key(), kv.value());
if (!success) {
// If the put fails, first flush the in-memory data to disk and trigger a compaction
flushWriteBuffer(false, false);
success = writeBuffer.put(sequenceNumber, kv.valueKind(), kv.key(), kv.value());
if (!success) {
@@ -214,7 +215,7 @@ private void flushWriteBuffer(boolean waitForLatestCompaction, boolean forcedFul
waitForLatestCompaction = true;
}

// Decide whether a changelog writer is needed (with the INPUT changelog producer, records are written directly into the changelog directory, so a new rolling changelog file must be created)
final RollingFileWriter<KeyValue, DataFileMeta> changelogWriter =
(changelogProducer == ChangelogProducer.INPUT && !isInsertOnly)
? writerFactory.createRollingChangelogFileWriter(0)
@@ -225,20 +226,23 @@ private void flushWriteBuffer(boolean waitForLatestCompaction, boolean forcedFul
writerFactory.createRollingMergeTreeFileWriter(0, FileSource.APPEND);

try {
// Flush the data in the buffer to L0
// Flush the in-memory data to level 0; the data in memory is already sorted and is written to the corresponding files on disk
writeBuffer.forEach(
keyComparator,
mergeFunction,
// Write directly into the changelog file
changelogWriter == null ? null : changelogWriter::write,
dataWriter::write);
} finally {
// Clear the memory buffer so it can be reused
writeBuffer.clear();
if (changelogWriter != null) {
changelogWriter.close();
}
dataWriter.close();
}

// Return the metadata (DataFileMeta) of the written files
List<DataFileMeta> dataMetas = dataWriter.result();
if (changelogWriter != null) {
newFilesChangelog.addAll(changelogWriter.result());
@@ -253,7 +257,7 @@ private void flushWriteBuffer(boolean waitForLatestCompaction, boolean forcedFul
newFilesChangelog.addAll(changelogMetas);
}

// Add the files to level 0
// Add the resulting files to level 0
for (DataFileMeta dataMeta : dataMetas) {
newFiles.add(dataMeta);
compactManager.addNewFile(dataMeta);
@@ -262,6 +266,7 @@ private void flushWriteBuffer(boolean waitForLatestCompaction, boolean forcedFul
// waitForLatestCompaction=true: wait for the previous compaction to finish before starting the next one;
// waitForLatestCompaction=false: no need to wait, trigger the compaction directly
trySyncLatestCompaction(waitForLatestCompaction);
// Trigger compaction of the files; internally this is either a full compaction or a partial compaction, and a partial compaction can be picked in three ways
compactManager.triggerCompaction(forcedFullCompaction);
}

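The write path above follows a simple pattern: try to put the record into the in-memory sort buffer; if the buffer is full, flush it as a new level-0 file (and, with the INPUT changelog producer, also as a changelog file) and retry the put. A minimal standalone sketch of that pattern, using made-up types rather than Paimon's real write buffer and rolling writers:

// Hypothetical illustration of the put -> flush -> retry pattern in write()/flushWriteBuffer().
// The buffer and "files" here are plain Java collections, not Paimon classes.
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

class WriteBufferSketch {
    // In-memory buffer with a fixed capacity (Paimon's buffer is backed by memory segments).
    private final TreeMap<Integer, String> buffer = new TreeMap<>();
    private final int capacity;
    private final List<List<String>> level0Files = new ArrayList<>();

    WriteBufferSketch(int capacity) {
        this.capacity = capacity;
    }

    boolean tryPut(int key, String value) {
        if (buffer.size() >= capacity) {
            return false; // buffer full, caller must flush first
        }
        buffer.put(key, value);
        return true;
    }

    void write(int key, String value) {
        if (!tryPut(key, value)) {
            flush();                   // spill the sorted buffer as one level-0 "file"
            if (!tryPut(key, value)) { // retry after the flush
                throw new IllegalStateException("record larger than the whole buffer");
            }
        }
    }

    void flush() {
        // The buffer is already sorted by key, so it can be written out sequentially.
        List<String> file = new ArrayList<>();
        buffer.forEach((k, v) -> file.add(k + "=" + v));
        level0Files.add(file);
        buffer.clear(); // free the memory for subsequent writes
    }

    public static void main(String[] args) {
        WriteBufferSketch w = new WriteBufferSketch(2);
        w.write(3, "c");
        w.write(1, "a");
        w.write(2, "b"); // triggers a flush of {1, 3}, then buffers 2
        w.flush();
        System.out.println(w.level0Files); // [[1=a, 3=c], [2=b]]
    }
}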
@@ -87,6 +87,7 @@ protected boolean rewriteLookupChangelog(int outputLevel, List<List<SortedRun>>
return false;
}

// Lookup only cares about level 0, because level 0 data is flushed directly from memory and consists entirely of newly added records
for (List<SortedRun> runs : sections) {
for (SortedRun run : runs) {
for (DataFileMeta file : run.files()) {
@@ -105,6 +106,7 @@ public CompactResult rewrite(
if (rewriteChangelog(outputLevel, dropDelete, sections)) {
return rewriteOrProduceChangelog(outputLevel, sections, dropDelete, true);
} else {
// Otherwise fall back to the normal rewrite compaction logic
return rewriteCompaction(outputLevel, dropDelete, sections);
}
}
@@ -80,7 +80,7 @@ public List<List<SortedRun>> partition() {
// If the current file's minKey is greater than the max key of the previous files, conclude the current section
if (!section.isEmpty() && keyComparator.compare(meta.minKey(), bound) > 0) {
// larger than current right bound, conclude current section and create a new one
// Greater than the current right bound: conclude the current section and create a new split
// The files accumulated so far overlap one another, so partition them into sorted runs and start a new section
result.add(partition(section));
section.clear();
bound = null;
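To make the partitioning above concrete: files are processed in order of minKey while a running right bound (the largest maxKey seen so far) is maintained; whenever the next file's minKey exceeds that bound, the accumulated section is closed and a new one begins. Inside a section the real code additionally distributes the overlapping files into sorted runs, which is omitted in this toy illustration (plain int ranges, not Paimon code):

import java.util.ArrayList;
import java.util.List;

// Illustration only: group files (given as [minKey, maxKey] ranges, sorted by minKey)
// into sections whose overall key ranges do not overlap, mirroring the loop above.
class IntervalPartitionSketch {
    public static void main(String[] args) {
        int[][] files = {{1, 5}, {3, 8}, {9, 12}, {11, 15}, {20, 25}};
        List<List<int[]>> sections = new ArrayList<>();
        List<int[]> section = new ArrayList<>();
        Integer bound = null;

        for (int[] file : files) {
            if (!section.isEmpty() && file[0] > bound) {
                // minKey is larger than the current right bound: the new file does not
                // overlap the accumulated section, so conclude it and start a new one.
                sections.add(section);
                section = new ArrayList<>();
                bound = null;
            }
            section.add(file);
            bound = bound == null ? file[1] : Math.max(bound, file[1]);
        }
        if (!section.isEmpty()) {
            sections.add(section);
        }
        // Expected grouping: [1,5][3,8] | [9,12][11,15] | [20,25]
        sections.forEach(s -> {
            StringBuilder sb = new StringBuilder();
            s.forEach(f -> sb.append("[").append(f[0]).append(",").append(f[1]).append("] "));
            System.out.println(sb.toString().trim());
        });
    }
}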
@@ -108,7 +108,9 @@ public void add(KeyValue kv) {
public ChangelogResult getResult() {
// 1. Compute the latest high level record and containLevel0 of candidates
LinkedList<KeyValue> candidates = mergeFunction.candidates();
// Iterate the candidates in reverse order
Iterator<KeyValue> descending = candidates.descendingIterator();
// The key-value record from the highest level, if any
KeyValue highLevel = null;
boolean containLevel0 = false;
while (descending.hasNext()) {
@@ -124,6 +126,7 @@ public ChangelogResult getResult() {
}

// 2. Lookup if latest high level record is absent
// Look up the record if the latest high-level record is absent
if (highLevel == null) {
InternalRow lookupKey = candidates.get(0).key();
T lookupResult = lookup.apply(lookupKey);
@@ -117,6 +117,7 @@ public void triggerCompaction(boolean fullCompaction) {
Optional<CompactUnit> optionalUnit;
// At L0 every file corresponds to one sorted run; each level below L0 has a single sorted run. A sorted run corresponds to one or more files, and a file is rolled
// out once it grows to a certain size, so the sorted run of a level can be seen as one file globally sorted by primary key.
// Collect all sorted runs
List<LevelSortedRun> runs = levels.levelSortedRuns();
if (fullCompaction) {
Preconditions.checkState(
@@ -128,7 +129,7 @@ public void triggerCompaction(boolean fullCompaction) {
"Trigger forced full compaction. Picking from the following runs\n{}",
runs);
}
// Compact everything into the max level
// A full compaction compacts all files into the max level
optionalUnit = CompactStrategy.pickFullCompaction(levels.numberOfLevels(), runs);
} else {
if (taskFuture != null) {
@@ -137,6 +138,7 @@ public void triggerCompaction(boolean fullCompaction) {
if (LOG.isDebugEnabled()) {
LOG.debug("Trigger normal compaction. Picking from the following runs\n{}", runs);
}
// Start picking the files to compact
optionalUnit =
strategy.pick(levels.numberOfLevels(), runs)
.filter(unit -> unit.files().size() > 0)
@@ -175,6 +177,7 @@ public void triggerCompaction(boolean fullCompaction) {
file.fileSize()))
.collect(Collectors.joining(", ")));
}
// Submit the compaction task
submitCompaction(unit, dropDelete);
});
}
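Reading the comment above as a count: the strategy sees one sorted run per level-0 file plus one per non-empty level below it, and that total is what later gets compared against the configured sorted-run compaction trigger. A toy count with invented numbers (not Paimon code):

// Toy illustration: counting sorted runs the way the comment describes.
class SortedRunCountSketch {
    public static void main(String[] args) {
        int level0Files = 4;           // each flushed file at L0 is its own sorted run
        boolean[] nonEmptyHigherLevels = {true, true, false, true}; // levels 1..4
        int runs = level0Files;
        for (boolean nonEmpty : nonEmptyHigherLevels) {
            if (nonEmpty) {
                runs++;                // each non-empty level below L0 contributes one run
            }
        }
        System.out.println(runs);      // 7, compared with the sorted-run compaction trigger
    }
}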
@@ -86,6 +86,7 @@ protected CompactResult doCompact() throws Exception {
// orderliness

// Check adjacent, ordered files and compact them, executing the compactions in order
// Files of different sorted-run sections do not overlap each other
for (List<SortedRun> section : partitioned) {
// If the current section contains more than one sorted run, add it directly to the candidate list
if (section.size() > 1) {
@@ -98,17 +99,17 @@ protected CompactResult doCompact() throws Exception {
// rewriting it
// But for small files, we will try to compact it

// Small files are compacted; large files are simply upgraded to the next level
for (DataFileMeta file : run.files()) {
if (file.fileSize() < minFileSize) {
// Smaller files are rewritten along with the previous files
// Small files need to be compacted first
candidate.add(singletonList(SortedRun.fromSingle(file)));
} else {
// Large file appear, rewrite previous and upgrade it
// First rewrite the accumulated small files
// A large file appears: rewrite the previously accumulated files, then upgrade it
rewrite(candidate, result);
// Upgrade the large file to the next level
// Then upgrade the current large file to the output (highest) level
upgrade(file, result);
}
}
@@ -135,7 +136,7 @@ private void upgrade(DataFileMeta file, CompactResult toUpdate) throws Exception
}

if (outputLevel != maxLevel || file.deleteRowCount().map(d -> d == 0).orElse(false)) {
// If outputLevel is not the max level, or the file's deleted row count is 0, then proceed
// If outputLevel is not the max level, or the file has no deleted rows, upgrade the file directly to the next level
CompactResult upgradeResult = rewriter.upgrade(outputLevel, file);
toUpdate.merge(upgradeResult);
upgradeFilesNum++;
@@ -149,6 +150,7 @@ private void upgrade(DataFileMeta file, CompactResult toUpdate) throws Exception
}
}

// Rewrite the accumulated small files
private void rewrite(List<List<SortedRun>> candidate, CompactResult toUpdate) throws Exception {
if (candidate.isEmpty()) {
return;
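Stepping back over this file's hunks: for a section that contains a single sorted run, small files (below the minimum file size) are accumulated into candidate and rewritten together, while a large file first flushes whatever has accumulated via rewrite(...) and is then passed to upgrade(...) so that it only changes level. A simplified, standalone sketch of that accumulation, where rewrite and upgrade are just recorded as strings (invented sizes, not Paimon code):

import java.util.ArrayList;
import java.util.List;

// Simplified sketch of the small-file / large-file handling in doCompact().
class CompactPlanSketch {
    public static void main(String[] args) {
        long minFileSize = 100;
        long[] fileSizes = {30, 40, 500, 20, 10, 800};

        List<Long> candidate = new ArrayList<>();
        List<String> plan = new ArrayList<>();

        for (long size : fileSizes) {
            if (size < minFileSize) {
                candidate.add(size);                  // small file: rewrite it together with neighbours
            } else {
                if (!candidate.isEmpty()) {
                    plan.add("rewrite " + candidate); // flush accumulated small files first
                    candidate.clear();
                }
                plan.add("upgrade " + size);          // large file: just move it to the output level
            }
        }
        if (!candidate.isEmpty()) {
            plan.add("rewrite " + candidate);
        }
        System.out.println(plan);
        // [rewrite [30, 40], upgrade 500, rewrite [20, 10], upgrade 800]
    }
}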
@@ -39,6 +39,7 @@ public class SortMergeReaderWithMinHeap<T> implements SortMergeReader<T> {
private final Comparator<InternalRow> userKeyComparator;
private final MergeFunctionWrapper<T> mergeFunctionWrapper;

// Min-heap used to merge the sorted inputs
private final PriorityQueue<Element> minHeap;
private final List<Element> polled;

@@ -166,6 +167,8 @@ private boolean nextImpl() throws IOException {

// fetch all elements with the same key
// note that the same iterator should not produce the same keys, so this code is correct

// Fetch all records with the same key
while (!minHeap.isEmpty()) {
Element element = minHeap.peek();
if (userKeyComparator.compare(key, element.kv.key()) != 0) {
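The reader above is a k-way merge over sorted runs: every underlying reader contributes its current element to a min-heap ordered by key, the smallest key is popped, the reader it came from is advanced and re-inserted, and records with equal keys from different runs therefore come out adjacent (the real reader additionally feeds each such group through the merge function). A generic, self-contained sketch of the same idea, with plain integers instead of Paimon's KeyValue and record readers:

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

// Generic k-way merge with a min-heap, illustrating the structure of the sort-merge reader.
// Inputs are individually sorted; equal keys from different inputs come out grouped together.
class KWayMergeSketch {
    static List<int[]> merge(List<List<Integer>> sortedInputs) {
        // heap entries: {key, inputIndex}; ordered by key, ties broken by input index
        PriorityQueue<int[]> heap = new PriorityQueue<>(
                (a, b) -> a[0] != b[0] ? Integer.compare(a[0], b[0]) : Integer.compare(a[1], b[1]));
        List<Iterator<Integer>> iters = new ArrayList<>();
        for (int i = 0; i < sortedInputs.size(); i++) {
            Iterator<Integer> it = sortedInputs.get(i).iterator();
            iters.add(it);
            if (it.hasNext()) {
                heap.add(new int[] {it.next(), i});
            }
        }
        List<int[]> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            int[] top = heap.poll();
            out.add(top);
            Iterator<Integer> it = iters.get(top[1]);
            if (it.hasNext()) {
                heap.add(new int[] {it.next(), top[1]}); // refill from the same input
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<int[]> merged = merge(List.of(List.of(1, 4, 7), List.of(1, 2, 7), List.of(3)));
        merged.forEach(e -> System.out.print("(" + e[0] + ",run" + e[1] + ") "));
        // (1,run0) (1,run1) (2,run1) (3,run2) (4,run0) (7,run0) (7,run1)
    }
}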
@@ -82,6 +82,8 @@ public Optional<CompactUnit> pick(int numLevels, List<LevelSortedRun> runs) {
}

// 1 checking for reducing size amplification
// size amplification ratio = (size(R1) + size(R2) + ... + size(Rn-1)) / size(Rn) (space amplification ratio)
// The basis of this check is that data is regularly compacted into the last level, which usually holds more than 80% of the total data
CompactUnit unit = pickForSizeAmp(maxLevel, runs);
if (unit != null) {
if (LOG.isDebugEnabled()) {
Expand All @@ -92,6 +94,8 @@ public Optional<CompactUnit> pick(int numLevels, List<LevelSortedRun> runs) {
}

// 2 checking for size ratio
// size_ratio_trigger = (100 + options.compaction_options_universal.size_ratio) / 100
// (compaction triggered by the individual size ratio)
unit = pickForSizeRatio(maxLevel, runs);
if (unit != null) {
if (LOG.isDebugEnabled()) {
Expand All @@ -101,6 +105,7 @@ public Optional<CompactUnit> pick(int numLevels, List<LevelSortedRun> runs) {
}

// 3 checking for file num
// Merge once the number of sorted runs reaches a certain count
if (runs.size() > numRunCompactionTrigger) {
// compacting for file num
int candidateCount = runs.size() - numRunCompactionTrigger + 1;
@@ -113,23 +118,26 @@ public Optional<CompactUnit> pick(int numLevels, List<LevelSortedRun> runs) {
return Optional.empty();
}

// Triggered when the space amplification ratio reaches a certain threshold
@VisibleForTesting
CompactUnit pickForSizeAmp(int maxLevel, List<LevelSortedRun> runs) {
// Precondition: the number of sorted runs must be at least the compaction trigger count
if (runs.size() < numRunCompactionTrigger) {
return null;
}

// Total number of candidates
// Compute size(R1) + size(R2) + ... + size(Rn-1)
long candidateSize =
runs.subList(0, runs.size() - 1).stream()
.map(LevelSortedRun::run)
.mapToLong(SortedRun::totalSize)
.sum();

// Compute size(Rn)
long earliestRunSize = runs.get(runs.size() - 1).run().totalSize();

// size amplification = percentage of additional size
// TODO: explain this equation
// size amplification ratio = (size(R1) + size(R2) + ... size(Rn-1)) / size(Rn)
if (candidateSize * 100 > maxSizeAmp * earliestRunSize) {
updateLastOptimizedCompaction();
return CompactUnit.fromLevelRuns(maxLevel, runs);
@@ -140,6 +148,7 @@ CompactUnit pickForSizeAmp(int maxLevel, List<LevelSortedRun> runs) {

@VisibleForTesting
CompactUnit pickForSizeRatio(int maxLevel, List<LevelSortedRun> runs) {
// Precondition: the number of sorted runs must be at least the configured trigger count
if (runs.size() < numRunCompactionTrigger) {
return null;
}
@@ -154,13 +163,15 @@ private CompactUnit pickForSizeRatio(

public CompactUnit pickForSizeRatio(
int maxLevel, List<LevelSortedRun> runs, int candidateCount, boolean forcePick) {
// The candidate count starts from 1 by default
long candidateSize = candidateSize(runs, candidateCount);
for (int i = candidateCount; i < runs.size(); i++) {
LevelSortedRun next = runs.get(i);
// size_ratio_trigger = (100 + options.compaction_options_universal.size_ratio) / 100
// (compaction triggered by the individual size ratio)
if (candidateSize * (100.0 + sizeRatio) / 100.0 < next.run().totalSize()) {
break;
}

candidateSize += next.run().totalSize();
candidateCount++;
}
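A worked example of the two conditions above, with invented sizes. For the size-amplification check, candidateSize is the sum of all runs except the last (max-level) run and the condition is candidateSize * 100 > maxSizeAmp * earliestRunSize: with runs of 10, 20, 30 and a last run of 1000 and maxSizeAmp = 200, 60 * 100 = 6000 is not greater than 200 * 1000 = 200000, so no full compaction is picked; if the last run were only 25, then 6000 > 5000 and all runs would be compacted to the max level. For the size-ratio check, the candidate keeps absorbing the next run while candidateSize * (100 + sizeRatio) / 100 is at least that run's size. The sketch below only restates both checks outside of Paimon, with made-up numbers:

// Worked illustration of the two pick conditions in UniversalCompaction (arbitrary sizes).
class UniversalCompactionSketch {
    // size amplification: (size(R1)+...+size(Rn-1)) / size(Rn), compared in percent against maxSizeAmp
    static boolean sizeAmpTriggered(long[] runSizes, long maxSizeAmp) {
        long candidateSize = 0;
        for (int i = 0; i < runSizes.length - 1; i++) {
            candidateSize += runSizes[i];
        }
        long earliestRunSize = runSizes[runSizes.length - 1];
        return candidateSize * 100 > maxSizeAmp * earliestRunSize;
    }

    // size ratio: absorb the next run while candidateSize * (100 + sizeRatio) / 100 >= next run size
    static int pickCandidateCountBySizeRatio(long[] runSizes, int sizeRatio) {
        int candidateCount = 1;
        long candidateSize = runSizes[0];
        for (int i = candidateCount; i < runSizes.length; i++) {
            if (candidateSize * (100.0 + sizeRatio) / 100.0 < runSizes[i]) {
                break;
            }
            candidateSize += runSizes[i];
            candidateCount++;
        }
        return candidateCount;
    }

    public static void main(String[] args) {
        long[] runs = {10, 20, 30, 1000};
        System.out.println(sizeAmpTriggered(runs, 200));      // false: 6000 <= 200000
        long[] smallTail = {10, 20, 30, 25};
        System.out.println(sizeAmpTriggered(smallTail, 200)); // true: 6000 > 5000
        long[] growing = {10, 11, 12, 500};
        // 10 * 1.01 = 10.1 < 11, so the loop stops immediately: only 1 candidate
        System.out.println(pickCandidateCountBySizeRatio(growing, 1)); // 1
        long[] similar = {10, 10, 10, 500};
        // 10.1 >= 10 and 20.2 >= 10 absorb, 30.3 < 500 stops: 3 candidates
        System.out.println(pickCandidateCountBySizeRatio(similar, 1)); // 3
    }
}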
@@ -400,6 +400,7 @@ private long writerNumber() {
return writers.values().stream().mapToLong(e -> e.values().size()).sum();
}

// Create the container for the data writer
@VisibleForTesting
public WriterContainer<T> createWriterContainer(
BinaryRow partition, int bucket, boolean ignorePreviousFiles) {
@@ -305,6 +305,7 @@ private MergeTreeCompactRewriter createRewriter(
LookupMergeTreeCompactRewriter.MergeFunctionWrapperFactory<?> wrapperFactory;
FileReaderFactory<KeyValue> lookupReaderFactory = readerFactory;
if (lookupStrategy.isFirstRow) {
// The first-row merge engine does not support deletion vectors, because only the first record is kept
if (options.deletionVectorsEnabled()) {
throw new UnsupportedOperationException(
"First row merge engine does not need deletion vectors because there is no deletion of old data in this merge engine.");
@@ -319,6 +320,7 @@ private MergeTreeCompactRewriter createRewriter(
} else {
processor =
lookupStrategy.deletionVector
// Positioned key-value processor, similar to Iceberg's position delete files
? new PositionedKeyValueProcessor(
valueType,
lookupStrategy.produceChangelog
@@ -102,7 +102,7 @@ private StoreSinkWrite.Provider createWriteProvider(
if (coreOptions.writeOnly()) {
waitCompaction = false;
} else {
// No need to wait for a full compaction
// May need to wait for a full compaction
waitCompaction = coreOptions.prepareCommitWaitCompaction();
int deltaCommits = -1;
if (options.contains(FULL_COMPACTION_DELTA_COMMITS)) {
Expand All @@ -116,7 +116,7 @@ private StoreSinkWrite.Provider createWriteProvider(
/ checkpointConfig.getCheckpointInterval());
}

// If the changelog producer is set to full-compaction, do a full compaction
// If the changelog producer is set to full-compaction, use the full compaction approach
// deltaCommits: compaction is performed once the number of incremental commits reaches this threshold
if (changelogProducer == ChangelogProducer.FULL_COMPACTION || deltaCommits >= 0) {
int finalDeltaCommits = Math.max(deltaCommits, 1);
@@ -141,6 +141,7 @@ private StoreSinkWrite.Provider createWriteProvider(
if (coreOptions.needLookup() && !coreOptions.prepareCommitWaitCompaction()) {
return (table, commitUser, state, ioManager, memoryPool, metricGroup) -> {
assertNoSinkMaterializer.run();
// A delete also needs to know which file contains the row being deleted, so a lookup is required
return new AsyncLookupSinkWrite(
table,
commitUser,
@@ -153,7 +154,7 @@ private StoreSinkWrite.Provider createWriteProvider(
metricGroup);
};
}
// Normal writer
// Ordinary writer with asynchronous compaction
return (table, commitUser, state, ioManager, memoryPool, metricGroup) -> {
assertNoSinkMaterializer.run();
return new StoreSinkWriteImpl(
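To illustrate the deltaCommits logic above with invented numbers: when FULL_COMPACTION_DELTA_COMMITS is not set, the code derives a commit count by dividing a configured full-compaction interval by the Flink checkpoint interval (checkpointConfig.getCheckpointInterval()), and with the FULL_COMPACTION changelog producer the result is clamped to at least 1. The sketch below assumes a 10-minute interval and 2-minute checkpoints; the interval option and the exact rounding are placeholders for this example, not the real code:

// Rough illustration of mapping a time-based full-compaction interval to a delta-commit count.
class DeltaCommitsSketch {
    public static void main(String[] args) {
        long fullCompactionIntervalMs = 10 * 60 * 1000; // hypothetical "compact fully every 10 minutes"
        long checkpointIntervalMs = 2 * 60 * 1000;      // Flink checkpoint (= commit) every 2 minutes

        int deltaCommits = (int) (fullCompactionIntervalMs / checkpointIntervalMs);
        // With the FULL_COMPACTION changelog producer the value is forced to be at least 1.
        int finalDeltaCommits = Math.max(deltaCommits, 1);

        System.out.println(finalDeltaCommits); // 5: request a full compaction roughly every 5 commits
    }
}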
@@ -134,6 +134,7 @@ public FlinkSinkBuilder parallelism(int parallelism) {
}

/** Clustering the input data if possible. */
// Cluster the input data if possible.
public FlinkSinkBuilder clusteringIfPossible(
String clusteringColumns,
String clusteringStrategy,
@@ -227,12 +228,15 @@ public DataStreamSink<?> build() {
BucketMode bucketMode = table.bucketMode();
switch (bucketMode) {
case HASH_FIXED:
// Fixed number of buckets
return buildForFixedBucket(input);
case HASH_DYNAMIC:
// Dynamic buckets: existing keys go back to the bucket they originally landed in, while new keys go to newly (dynamically) created buckets
return buildDynamicBucketSink(input, false);
case CROSS_PARTITION:
return buildDynamicBucketSink(input, true);
case BUCKET_UNAWARE:
// Only applies to append-only tables
return buildUnawareBucketSink(input);
default:
throw new UnsupportedOperationException("Unsupported bucket mode: " + bucketMode);