Skip to content

Commit

Permalink
fixed
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian committed Oct 10, 2024
1 parent 112b4dc commit aaf66b2
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 11 deletions.
14 changes: 7 additions & 7 deletions paimon-core/src/main/java/org/apache/paimon/branch/Branch.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,19 +60,19 @@ public class Branch {

@JsonProperty(FIELD_BRANCH_TIME_RETAINED)
@Nullable
private final Duration tagTimeRetained;
private final Duration branchTimeRetained;

public Branch(
@JsonProperty(FIELD_BRANCH_NAME) String branchName,
@JsonProperty(FIELD_FROM_SNAPSHOT_ID) Long fromSnapshotId,
@JsonProperty(FIELD_FROM_TAG_NAME) String fromTagName,
@JsonProperty(FIELD_BRANCH_CREATE_TIME) LocalDateTime branchCreateTime,
@JsonProperty(FIELD_BRANCH_TIME_RETAINED) Duration tagTimeRetained) {
@JsonProperty(FIELD_BRANCH_TIME_RETAINED) Duration branchTimeRetained) {
this.branchName = branchName;
this.fromSnapshotId = fromSnapshotId;
this.fromTagName = fromTagName;
this.branchCreateTime = branchCreateTime;
this.tagTimeRetained = tagTimeRetained;
this.branchTimeRetained = branchTimeRetained;
}

@JsonGetter(FIELD_BRANCH_NAME)
Expand Down Expand Up @@ -101,8 +101,8 @@ public LocalDateTime getBranchCreateTime() {

@JsonGetter(FIELD_BRANCH_TIME_RETAINED)
@Nullable
public Duration getTagTimeRetained() {
return tagTimeRetained;
public Duration getBranchTimeRetained() {
return branchTimeRetained;
}

public static Branch fromTagAndBranchTtl(
Expand Down Expand Up @@ -150,12 +150,12 @@ public boolean equals(Object o) {
&& Objects.equals(fromSnapshotId, branch.fromSnapshotId)
&& Objects.equals(fromTagName, branch.fromTagName)
&& Objects.equals(branchCreateTime, branch.branchCreateTime)
&& Objects.equals(tagTimeRetained, branch.tagTimeRetained);
&& Objects.equals(branchTimeRetained, branch.branchTimeRetained);
}

@Override
public int hashCode() {
return Objects.hash(
branchName, fromSnapshotId, fromTagName, branchCreateTime, tagTimeRetained);
branchName, fromSnapshotId, fromTagName, branchCreateTime, branchTimeRetained);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.paimon.table.system;

import org.apache.paimon.Snapshot;
import org.apache.paimon.branch.Branch;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
Expand Down Expand Up @@ -87,7 +88,9 @@ public class BranchesTable implements ReadonlyTable {
new DataField(
1, "created_from_tag", SerializationUtils.newStringType(true)),
new DataField(2, "created_from_snapshot", new BigIntType(true)),
new DataField(3, "create_time", new TimestampType(false, 3))));
new DataField(3, "create_time", new TimestampType(false, 3)),
new DataField(
4, "time_retained", SerializationUtils.newStringType(true))));

private final FileIO fileIO;
private final Path location;
Expand Down Expand Up @@ -229,6 +232,8 @@ private List<InternalRow> branches(FileStoreTable table) throws IOException {
BranchManager branchManager = table.branchManager();
SchemaManager schemaManager = new SchemaManager(fileIO, table.location());

Map<String, Branch> branchesAsMap = branchManager.branchObjectsAsMap();

List<Pair<Path, Long>> paths =
listVersionedDirectories(fileIO, branchManager.branchDirectory(), BRANCH_PREFIX)
.map(status -> Pair.of(status.getPath(), status.getModificationTime()))
Expand Down Expand Up @@ -267,14 +272,29 @@ private List<InternalRow> branches(FileStoreTable table) throws IOException {
}
}
}
Branch currentBranch = null;
if (branchesAsMap.containsKey(branchName)) {
currentBranch = branchesAsMap.get(branchName);
}

result.add(
GenericRow.of(
BinaryString.fromString(branchName),
BinaryString.fromString(basedTag),
basedSnapshotId,
Timestamp.fromLocalDateTime(
DateTimeUtils.toLocalDateTime(creationTime))));
(currentBranch == null
|| currentBranch.getBranchCreateTime() == null)
? Timestamp.fromLocalDateTime(
DateTimeUtils.toLocalDateTime(creationTime))
: Timestamp.fromLocalDateTime(
currentBranch.getBranchCreateTime()),
(currentBranch == null
|| currentBranch.getBranchTimeRetained() == null)
? null
: Optional.ofNullable(currentBranch.getBranchTimeRetained())
.map(Object::toString)
.map(BinaryString::fromString)
.orElse(null)));
}

return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand Down Expand Up @@ -258,6 +260,15 @@ public List<String> branches() {
}
}

public Map<String, Branch> branchObjectsAsMap() {
List<Branch> branches = branchObjects();
if (branches.isEmpty()) {
return Collections.emptyMap();
}
return branchObjects().stream()
.collect(Collectors.toMap(branch -> branch.getBranchName(), branch -> branch));
}

/** Get all {@link Tag}s. */
public List<Branch> branchObjects() {
try {
Expand Down Expand Up @@ -301,7 +312,7 @@ public List<Branch> expireBranches() {
List<Branch> branches = branchObjects();
for (Branch branch : branches) {
LocalDateTime createTime = branch.getBranchCreateTime();
Duration timeRetained = branch.getTagTimeRetained();
Duration timeRetained = branch.getBranchTimeRetained();
if (createTime == null || timeRetained == null) {
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.paimon.table.TableTestBase;
import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.utils.TimeUtils;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -90,4 +91,19 @@ void testBranches() throws Exception {
.collect(Collectors.toList()))
.containsExactlyInAnyOrder("my_branch1", "my_branch2", "my_branch3");
}

@Test
void testBranchesWithMaxAge() throws Exception {
String retainTime = "10000ms";
table.createBranch("my_branch1", "2023-07-17", TimeUtils.parseDuration(retainTime));
table.createBranch("my_branch2", "2023-07-18", TimeUtils.parseDuration(retainTime));
table.createBranch("my_branch3", "2023-07-18", TimeUtils.parseDuration(retainTime));
List<InternalRow> branches = read(branchesTable);
assertThat(branches.size()).isEqualTo(3);
assertThat(
branches.stream()
.map(v -> v.getString(0).toString())
.collect(Collectors.toList()))
.containsExactlyInAnyOrder("my_branch1", "my_branch2", "my_branch3");
}
}

0 comments on commit aaf66b2

Please sign in to comment.