read source code
sunxiaojian committed Oct 2, 2024
1 parent c6f2067 commit 5796c12
Showing 12 changed files with 55 additions and 5 deletions.
@@ -2553,7 +2553,11 @@ public static List<ConfigOption<?>> getOptions() {

/** Specifies the sort engine for table with primary key. */
public enum SortEngine implements DescribedEnum {
// Sort with a min-heap.
MIN_HEAP("min-heap", "Use min-heap for multiway sorting."),

// Use a loser tree for multiway sorting. Compared with heapsort, the loser tree performs fewer comparisons and is more efficient.
// https://developer.volcengine.com/articles/7236667079003209785
LOSER_TREE(
"loser-tree",
"Use loser-tree for multiway sorting. Compared with heapsort, loser-tree has fewer comparisons and is more efficient.");
@@ -79,6 +79,7 @@ public FileRecordReader(
@Nullable PartitionInfo partitionInfo)
throws IOException {
try {
// ORC, Parquet or Avro file reader.
this.reader = readerFactory.createReader(context);
} catch (Exception e) {
FileUtils.checkExists(context.fileIO(), context.filePath());
@@ -102,10 +102,13 @@ public RecordReader<KeyValue> createRecordReader(DataFileMeta file) throws IOExc

public RecordReader<KeyValue> createRecordReader(
long schemaId, String fileName, long fileSize, int level) throws IOException {

// asyncThreshold is the threshold for reading files asynchronously; the default is 10 MB.
if (fileSize >= asyncThreshold && fileName.endsWith(".orc")) {
return new AsyncRecordReader<>(
() -> createRecordReader(schemaId, fileName, level, false, 2, fileSize));
}
// Returns a KeyValueDataFileRecordReader wrapping a FileRecordReader (ORC, Parquet or Avro).
return createRecordReader(schemaId, fileName, level, true, null, fileSize);
}

@@ -135,6 +138,7 @@ private RecordReader<KeyValue> createRecordReader(
Path filePath = pathFactory.toPath(fileName);

RecordReader<InternalRow> fileRecordReader =
// ORC, Parquet or Avro file reader.
new FileRecordReader(
bulkFormatMapping.getReaderFactory(),
orcPoolSize == null
@@ -107,11 +107,12 @@ public <T> RecordReader<T> mergeSort(
@Nullable FieldsComparator userDefinedSeqComparator,
MergeFunctionWrapper<T> mergeFunction)
throws IOException {
// Merge sort with spilling to disk.
if (ioManager != null && lazyReaders.size() > spillThreshold) {
return spillMergeSort(
lazyReaders, keyComparator, userDefinedSeqComparator, mergeFunction);
}

// Pure in-memory merge sort.
return mergeSortNoSpill(
lazyReaders, keyComparator, userDefinedSeqComparator, mergeFunction);
}
@@ -51,8 +51,10 @@ public static <T> RecordReader<T> readerForMergeTree(
throws IOException {
List<ReaderSupplier<T>> readers = new ArrayList<>();
for (List<SortedRun> section : sections) {
// Each section corresponds to one SortMergeReader.
readers.add(
() ->
// Returns a sort merge reader (min-heap reader or loser-tree reader).
readerForSection(
section,
readerFactory,
@@ -83,18 +85,26 @@ public long estimateSize() {

@Override
public RecordReader<KeyValue> get() throws IOException {
/**
 * 1. Iterate over the files of the SortedRun and create a file reader for each.
 * 2. Return a ConcatRecordReader that simply reads all these files as one whole.
 */
return readerForRun(run, readerFactory);
}
});
}
// Merge the files of every SortedRun in this section, producing a merge sort; there are spill and no-spill variants (analyzed later).
return mergeSorter.mergeSort(
readers, userKeyComparator, userDefinedSeqComparator, mergeFunctionWrapper);
}

// Create one reader for every file in the SortedRun.
private static RecordReader<KeyValue> readerForRun(
SortedRun run, FileReaderFactory<KeyValue> readerFactory) throws IOException {
List<ReaderSupplier<KeyValue>> readers = new ArrayList<>();
// Create a file reader for each file.
for (DataFileMeta file : run.files()) {
// Create the reader through KeyValueFileReaderFactory.
readers.add(() -> readerFactory.createRecordReader(file));
}
return ConcatRecordReader.create(readers);
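
Putting these pieces together, the reader hierarchy built for one merge-tree read is roughly: a file reader per data file, a ConcatRecordReader per SortedRun (its files do not overlap), a SortMergeReader per section (its runs do overlap), and a final ConcatRecordReader over all sections (sections do not overlap). A toy sketch of the same shape, modeling a sorted run as an already-sorted List<Integer> and reusing the merge helper from the sketch above (illustrative names only, not Paimon's API):

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class ReaderHierarchySketch {

    /**
     * sections: a list of sections; each section is a list of sorted runs; each run
     * is an already-sorted list of keys. Runs inside a section may overlap in key
     * range, so they are merge-sorted; the sections themselves do not overlap, so
     * their outputs are simply concatenated.
     */
    static List<Integer> readerForMergeTree(List<List<List<Integer>>> sections) {
        List<Integer> result = new ArrayList<>();
        for (List<List<Integer>> section : sections) {
            List<Iterator<Integer>> runReaders = new ArrayList<>();
            for (List<Integer> run : section) {
                runReaders.add(run.iterator()); // one "reader" per sorted run
            }
            // merge-sort the overlapping runs of this section
            result.addAll(KWayMergeSketch.merge(runReaders));
            // concatenation across sections preserves global key order
        }
        return result;
    }
}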
@@ -35,6 +35,8 @@
* input list is already sorted by key and sequence number, and the key intervals do not overlap
* each other.
*/

// A reader that concatenates multiple readers.
public class ConcatRecordReader<T> implements RecordReader<T> {

private final Queue<ReaderSupplier<T>> queue;
@@ -45,6 +47,7 @@ protected ConcatRecordReader(List<? extends ReaderSupplier<T>> readerFactories)
readerFactories.forEach(
supplier ->
Preconditions.checkNotNull(supplier, "Reader factory must not be null."));
// Put the readers into a queue; they will be visited one after another.
this.queue = new LinkedList<>(readerFactories);
}

@@ -63,6 +66,7 @@ public static <R> RecordReader<R> create(ReaderSupplier<R> reader1, ReaderSuppli
public RecordIterator<T> readBatch() throws IOException {
while (true) {
if (current != null) {
// Go through the SortMergeReaders one by one.
RecordIterator<T> iterator = current.readBatch();
if (iterator != null) {
return iterator;
@@ -70,12 +70,17 @@ public IntervalPartition(List<DataFileMeta> inputFiles, Comparator<InternalRow>
public List<List<SortedRun>> partition() {
List<List<SortedRun>> result = new ArrayList<>();
List<DataFileMeta> section = new ArrayList<>();

// The right bound (max key) of the current section.
BinaryRow bound = null;

// The files were picked by the compaction strategy;
// now partition them into sections.
for (DataFileMeta meta : files) {
// Check whether the key ranges overlap:
// if the current file's minKey is greater than the previous right bound (maxKey), conclude the section.
if (!section.isEmpty() && keyComparator.compare(meta.minKey(), bound) > 0) {
// larger than current right bound, conclude current section and create a new one
result.add(partition(section));
section.clear();
bound = null;
@@ -112,11 +117,14 @@ private List<SortedRun> partition(List<DataFileMeta> metas) {
// any file list whose max key < meta.minKey() is sufficient,
// for convenience we pick the smallest
List<DataFileMeta> top = queue.poll();

if (keyComparator.compare(meta.minKey(), top.get(top.size() - 1).maxKey()) > 0) {
// minKey is beyond this run's maxKey, i.e. no overlap with this run
// append current file to an existing partition
top.add(meta);
} else {
// create a new partition
// The file cannot be appended to any existing run without overlapping, so start a new run.
List<DataFileMeta> newRun = new ArrayList<>();
newRun.add(meta);
queue.add(newRun);
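
The packing step above is essentially interval scheduling: files sorted by minKey are appended to the sorted run whose current maxKey is smallest, or start a new run if they would overlap every run. A self-contained sketch with key ranges reduced to int intervals (hypothetical names, not Paimon's implementation):

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

class IntervalPartitionSketch {

    /** A data file reduced to its key interval [minKey, maxKey]. */
    record FileRange(int minKey, int maxKey) {}

    /** Packs the files of one section into sorted runs; files inside a run must not overlap. */
    static List<List<FileRange>> packIntoRuns(List<FileRange> files) {
        List<FileRange> sorted = new ArrayList<>(files);
        sorted.sort(Comparator.comparingInt(FileRange::minKey));
        // Runs ordered by the maxKey of their last file, smallest first.
        PriorityQueue<List<FileRange>> runs =
                new PriorityQueue<>(Comparator.comparingInt(r -> r.get(r.size() - 1).maxKey()));
        for (FileRange file : sorted) {
            List<FileRange> smallest = runs.peek();
            if (smallest != null && file.minKey() > smallest.get(smallest.size() - 1).maxKey()) {
                // No overlap with this run: take it out, append the file, put it back.
                runs.poll();
                smallest.add(file);
                runs.offer(smallest);
            } else {
                // Would overlap every existing run: start a new run.
                List<FileRange> newRun = new ArrayList<>();
                newRun.add(file);
                runs.offer(newRun);
            }
        }
        return new ArrayList<>(runs);
    }
}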
@@ -115,7 +115,8 @@ public List<DataFileMeta> allFiles() {
@Override
public void triggerCompaction(boolean fullCompaction) {
Optional<CompactUnit> optionalUnit;
// Every file at L0 is its own sorted run; each level below L0 has exactly one sorted run. A sorted run
// consists of one or more files (files are rolled out once they reach a certain size), so the sorted run
// of a level can be seen as one file globally sorted by primary key.
List<LevelSortedRun> runs = levels.levelSortedRuns();
if (fullCompaction) {
Preconditions.checkState(
@@ -80,12 +80,14 @@ protected CompactResult rewriteCompaction(
RecordReader<KeyValue> reader = null;
Exception collectedExceptions = null;
try {
// ConcatRecordReader over the sections; each section carries a List<SortedRun> that must be merged.
reader =
readerForMergeTree(
sections, new ReducerMergeFunctionWrapper(mfFactory.create()));
if (dropDelete) {
reader = new DropDeleteReader(reader);
}
// Iterate over the merged records and write them into new files.
writer.write(new RecordReaderIterator<>(reader));
} catch (Exception e) {
collectedExceptions = e;
@@ -66,6 +66,8 @@ public MergeTreeCompactTask(
this.rewriter = rewriter;
this.outputLevel = unit.outputLevel();
this.compactDfSupplier = compactDfSupplier;

// Partition the files whose key ranges overlap, then process each partition.
this.partitioned = new IntervalPartition(unit.files(), keyComparator).partition();
this.dropDelete = dropDelete;
this.maxLevel = maxLevel;
@@ -85,26 +87,34 @@ protected CompactResult doCompact() throws Exception {

// Check the adjacent, ordered sections and compact them; the compactions are executed sequentially.
for (List<SortedRun> section : partitioned) {
// If the current section contains more than one sorted run, add it straight to the candidate list.
if (section.size() > 1) {
candidate.add(section);
} else {

SortedRun run = section.get(0);
// No overlapping:
// We can just upgrade the large file and just change the level instead of
// rewriting it
// But for small files, we will try to compact it

// Small files are compacted; large files are simply upgraded to the next level.
for (DataFileMeta file : run.files()) {
if (file.fileSize() < minFileSize) {
// Smaller files are rewritten along with the previous files
// Small files must be compacted first.
candidate.add(singletonList(SortedRun.fromSingle(file)));
} else {
// Large file appear, rewrite previous and upgrade it
// Rewrite the accumulated small files first.
rewrite(candidate, result);
// Then upgrade the large file to the next level.
upgrade(file, result);
}
}
}
}
// Rewrite the remaining candidates.
rewrite(candidate, result);
result.setDeletionFile(compactDfSupplier.get());
return result;
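
A toy walk-through of the small-file/large-file rule above, with made-up file sizes and an assumed 128 MB threshold (the actual threshold comes from the table's configuration):

import java.util.List;

class CompactDecisionSketch {
    public static void main(String[] args) {
        long minFileSize = 128L << 20; // assumed threshold: 128 MB
        // {file id, size in bytes} for one non-overlapping sorted run
        List<long[]> run = List.of(
                new long[] {1, 4L << 20},   // 4 MB   -> queued as a rewrite candidate
                new long[] {2, 200L << 20}, // 200 MB -> flush candidates, then upgrade this file
                new long[] {3, 1L << 20});  // 1 MB   -> queued, handled by the final rewrite()
        for (long[] file : run) {
            String action = file[1] < minFileSize
                    ? "add to rewrite candidates"
                    : "rewrite pending candidates, then upgrade to output level";
            System.out.printf("file %d (%d MB): %s%n", file[0], file[1] >> 20, action);
        }
    }
}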
@@ -119,16 +129,19 @@ protected String logMetric(
}

private void upgrade(DataFileMeta file, CompactResult toUpdate) throws Exception {
// If the file's level already equals the output level, do nothing.
if (file.level() == outputLevel) {
return;
}

if (outputLevel != maxLevel || file.deleteRowCount().map(d -> d == 0).orElse(false)) {
// If outputLevel is not the max level, or the file has no deleted rows, upgrade it directly.
CompactResult upgradeResult = rewriter.upgrade(outputLevel, file);
toUpdate.merge(upgradeResult);
upgradeFilesNum++;
} else {
// files with delete records should not be upgraded directly to max level
List<List<SortedRun>> candidate = new ArrayList<>();
candidate.add(new ArrayList<>());
candidate.get(0).add(SortedRun.fromSingle(file));
@@ -38,6 +38,7 @@
*/
public interface SortMergeReader<T> extends RecordReader<T> {

// Create a sort merge reader (min-heap or loser-tree based).
static <T> SortMergeReader<T> createSortMergeReader(
List<RecordReader<KeyValue>> readers,
Comparator<InternalRow> userKeyComparator,
@@ -87,7 +87,7 @@ public RecordIterator<T> readBatch() throws IOException {
// empty iterator, clean up and try next batch
iterator.releaseBatch();
} else {
// found next kv, offer it into the min-heap
minHeap.offer(new Element(kv, iterator, reader));
break;
}
@@ -197,6 +197,7 @@ private Element(
}

// IMPORTANT: Must not call this for elements still in priority queue!
private boolean update() throws IOException {
KeyValue nextKv = iterator.next();
if (nextKv == null) {
