Skip to content

Commit

Permalink
fixed
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian committed Oct 11, 2024
1 parent fd78096 commit 5d32623
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.paimon.table.system;

import org.apache.paimon.Snapshot;
import org.apache.paimon.branch.Branch;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
Expand All @@ -29,8 +28,6 @@
import org.apache.paimon.fs.Path;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.ReadonlyTable;
Expand All @@ -46,9 +43,7 @@
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.TimestampType;
import org.apache.paimon.utils.BranchManager;
import org.apache.paimon.utils.DateTimeUtils;
import org.apache.paimon.utils.IteratorRecordReader;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.ProjectedRow;
import org.apache.paimon.utils.SerializationUtils;

Expand All @@ -64,14 +59,8 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.SortedMap;
import java.util.stream.Collectors;

import static org.apache.paimon.catalog.Catalog.SYSTEM_TABLE_SPLITTER;
import static org.apache.paimon.utils.BranchManager.BRANCH_PREFIX;
import static org.apache.paimon.utils.BranchManager.branchPath;
import static org.apache.paimon.utils.FileUtils.listVersionedDirectories;
import static org.apache.paimon.utils.Preconditions.checkArgument;

/** A {@link Table} for showing branches of table. */
public class BranchesTable implements ReadonlyTable {
Expand Down Expand Up @@ -230,73 +219,25 @@ public RecordReader<InternalRow> createReader(Split split) {

private List<InternalRow> branches(FileStoreTable table) throws IOException {
BranchManager branchManager = table.branchManager();
SchemaManager schemaManager = new SchemaManager(fileIO, table.location());

Map<String, Branch> branchesAsMap = branchManager.branchObjectsAsMap();

List<Pair<Path, Long>> paths =
listVersionedDirectories(fileIO, branchManager.branchDirectory(), BRANCH_PREFIX)
.map(status -> Pair.of(status.getPath(), status.getModificationTime()))
.collect(Collectors.toList());
List<Branch> branches = branchManager.branchObjects();
List<InternalRow> result = new ArrayList<>();

for (Pair<Path, Long> path : paths) {
String branchName = path.getLeft().getName().substring(BRANCH_PREFIX.length());
String basedTag = null;
Long basedSnapshotId = null;
long creationTime = path.getRight();

Optional<TableSchema> tableSchema =
schemaManager.copyWithBranch(branchName).latest();
if (tableSchema.isPresent()) {
FileStoreTable branchTable =
FileStoreTableFactory.create(
fileIO, new Path(branchPath(table.location(), branchName)));
SortedMap<Snapshot, List<String>> snapshotTags =
branchTable.tagManager().tags();
Long earliestSnapshotId = branchTable.snapshotManager().earliestSnapshotId();
if (snapshotTags.isEmpty()) {
// create based on snapshotId
basedSnapshotId = earliestSnapshotId;
} else {
Snapshot snapshot = snapshotTags.firstKey();
if (Objects.equals(earliestSnapshotId, snapshot.id())) {
// create based on tag
List<String> tags = snapshotTags.get(snapshot);
checkArgument(tags.size() == 1);
basedTag = tags.get(0);
basedSnapshotId = snapshot.id();
} else {
// create based on snapshotId
basedSnapshotId = earliestSnapshotId;
}
}
}
Branch currentBranch = null;
if (branchesAsMap.containsKey(branchName)) {
currentBranch = branchesAsMap.get(branchName);
}

for (Branch branch : branches) {
result.add(
GenericRow.of(
BinaryString.fromString(branchName),
BinaryString.fromString(basedTag),
basedSnapshotId,
(currentBranch == null
|| currentBranch.getBranchCreateTime() == null)
? Timestamp.fromLocalDateTime(
DateTimeUtils.toLocalDateTime(creationTime))
: Timestamp.fromLocalDateTime(
currentBranch.getBranchCreateTime()),
(currentBranch == null
|| currentBranch.getBranchTimeRetained() == null)
BinaryString.fromString(branch.getBranchName()),
BinaryString.fromString(branch.getFromTagName()),
branch.getFromSnapshotId(),
branch.getBranchCreateTime() != null
? Timestamp.fromLocalDateTime(branch.getBranchCreateTime())
: null,
(branch.getBranchTimeRetained() == null)
? null
: Optional.ofNullable(currentBranch.getBranchTimeRetained())
: Optional.ofNullable(branch.getBranchTimeRetained())
.map(Object::toString)
.map(BinaryString::fromString)
.orElse(null)));
}

return result;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.apache.paimon.fs.Path;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.tag.Tag;

import org.slf4j.Logger;
Expand All @@ -34,9 +36,10 @@
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.SortedMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand Down Expand Up @@ -260,28 +263,23 @@ public List<String> branches() {
}
}

public Map<String, Branch> branchObjectsAsMap() {
List<Branch> branches = branchObjects();
if (branches.isEmpty()) {
return Collections.emptyMap();
}
return branchObjects().stream()
.collect(Collectors.toMap(branch -> branch.getBranchName(), branch -> branch));
}

/** Get all {@link Tag}s. */
public List<Branch> branchObjects() {
try {
List<Path> paths =
List<Pair<Path, Long>> paths =
listVersionedDirectories(fileIO, branchDirectory(), BRANCH_PREFIX)
.map(status -> status.getPath())
.map(status -> Pair.of(status.getPath(), status.getModificationTime()))
.collect(Collectors.toList());
List<Branch> branches = new ArrayList<>();
for (Path path : paths) {
String branchName = path.getName().substring(BRANCH_PREFIX.length());

for (Pair<Path, Long> path : paths) {
String branchName = path.getLeft().getName().substring(BRANCH_PREFIX.length());
Branch branch = Branch.safelyFromPath(fileIO, branchMetadataPath(branchName));
if (branch != null) {
branches.add(branch);
} else {
// Compatible with older versions
branches.add(extractBranchInfo(path, branchName));
}
}
return branches;
Expand All @@ -290,6 +288,43 @@ public List<Branch> branchObjects() {
}
}

private Branch extractBranchInfo(Pair<Path, Long> path, String branchName) {
String fromTagName = null;
Long fromSnapshotId = null;
long creationTime = path.getRight();

Optional<TableSchema> tableSchema = schemaManager.copyWithBranch(branchName).latest();
if (tableSchema.isPresent()) {
FileStoreTable branchTable =
FileStoreTableFactory.create(
fileIO, new Path(branchPath(tablePath, branchName)));
SortedMap<Snapshot, List<String>> snapshotTags = branchTable.tagManager().tags();
Long earliestSnapshotId = branchTable.snapshotManager().earliestSnapshotId();
if (snapshotTags.isEmpty()) {
// create based on snapshotId
fromSnapshotId = earliestSnapshotId;
} else {
Snapshot snapshot = snapshotTags.firstKey();
if (Objects.equals(earliestSnapshotId, snapshot.id())) {
// create based on tag
List<String> tags = snapshotTags.get(snapshot);
checkArgument(tags.size() == 1);
fromTagName = tags.get(0);
fromSnapshotId = snapshot.id();
} else {
// create based on snapshotId
fromSnapshotId = earliestSnapshotId;
}
}
}
return new Branch(
branchName,
fromSnapshotId,
fromTagName,
DateTimeUtils.toLocalDateTime(creationTime),
null);
}

private void validateBranch(String branchName) {
checkArgument(
!isMainBranch(branchName),
Expand Down

0 comments on commit 5d32623

Please sign in to comment.