Skip to content

Commit

Permalink
Merge branch 'master' into hotfix/bump-dependency-0905
Browse files Browse the repository at this point in the history
  • Loading branch information
czy006 committed Sep 10, 2024
2 parents ff0ef02 + b52f64d commit 4cbcd65
Show file tree
Hide file tree
Showing 26 changed files with 279 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.amoro.server.dashboard.utils.AmsUtil;
import org.apache.amoro.server.dashboard.utils.CommonUtil;
import org.apache.amoro.server.optimizing.OptimizingStatus;
import org.apache.amoro.server.optimizing.plan.OptimizingEvaluator;
import org.apache.amoro.server.table.TableRuntime;
import org.apache.amoro.server.table.TableService;
import org.apache.amoro.shade.guava32.com.google.common.base.Function;
Expand Down Expand Up @@ -149,6 +150,10 @@ public void getTableDetail(Context ctx) {
if (serverTableIdentifier.isPresent()) {
TableRuntime tableRuntime = tableService.getRuntime(serverTableIdentifier.get());
tableSummary.setOptimizingStatus(tableRuntime.getOptimizingStatus().name());
OptimizingEvaluator.PendingInput tableRuntimeSummary = tableRuntime.getTableSummary();
if (tableRuntimeSummary != null) {
tableSummary.setHealthScore(tableRuntimeSummary.getHealthScore());
}
} else {
tableSummary.setOptimizingStatus(OptimizingStatus.IDLE.name());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,9 @@ public void expireSnapshots(TableRuntime tableRuntime) {
if (!expireSnapshotEnabled(tableRuntime)) {
return;
}
expireSnapshots(mustOlderThan(tableRuntime, System.currentTimeMillis()));
expireSnapshots(
mustOlderThan(tableRuntime, System.currentTimeMillis()),
tableRuntime.getTableConfiguration().getSnapshotMinCount());
}

protected boolean expireSnapshotEnabled(TableRuntime tableRuntime) {
Expand All @@ -176,18 +178,22 @@ protected boolean expireSnapshotEnabled(TableRuntime tableRuntime) {
}

@VisibleForTesting
void expireSnapshots(long mustOlderThan) {
expireSnapshots(mustOlderThan, expireSnapshotNeedToExcludeFiles());
void expireSnapshots(long mustOlderThan, int minCount) {
expireSnapshots(mustOlderThan, minCount, expireSnapshotNeedToExcludeFiles());
}

private void expireSnapshots(long olderThan, Set<String> exclude) {
LOG.debug("start expire snapshots older than {}, the exclude is {}", olderThan, exclude);
private void expireSnapshots(long olderThan, int minCount, Set<String> exclude) {
LOG.debug(
"start expire snapshots older than {} and retain last {} snapshots, the exclude is {}",
olderThan,
minCount,
exclude);
final AtomicInteger toDeleteFiles = new AtomicInteger(0);
Set<String> parentDirectories = new HashSet<>();
Set<String> expiredFiles = new HashSet<>();
table
.expireSnapshots()
.retainLast(1)
.retainLast(Math.max(minCount, 1))
.expireOlderThan(olderThan)
.deleteWith(
file -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,11 @@ public void expireSnapshots(TableRuntime tableRuntime) {
}

@VisibleForTesting
protected void expireSnapshots(long mustOlderThan) {
protected void expireSnapshots(long mustOlderThan, int minCount) {
if (changeMaintainer != null) {
changeMaintainer.expireSnapshots(mustOlderThan);
changeMaintainer.expireSnapshots(mustOlderThan, minCount);
}
baseMaintainer.expireSnapshots(mustOlderThan);
baseMaintainer.expireSnapshots(mustOlderThan, minCount);
}

@Override
Expand Down Expand Up @@ -291,9 +291,9 @@ public ChangeTableMaintainer(UnkeyedTable unkeyedTable) {

@Override
@VisibleForTesting
void expireSnapshots(long mustOlderThan) {
void expireSnapshots(long mustOlderThan, int minCount) {
expireFiles(mustOlderThan);
super.expireSnapshots(mustOlderThan);
super.expireSnapshots(mustOlderThan, minCount);
}

@Override
Expand All @@ -303,7 +303,9 @@ public void expireSnapshots(TableRuntime tableRuntime) {
}
long now = System.currentTimeMillis();
expireFiles(now - snapshotsKeepTime(tableRuntime));
expireSnapshots(mustOlderThan(tableRuntime, now));
expireSnapshots(
mustOlderThan(tableRuntime, now),
tableRuntime.getTableConfiguration().getSnapshotMinCount());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,11 @@ protected interface TaskSplitter {
List<SplitTask> splitTasks(int targetTaskCount);
}

@Override
public int getHealthScore() {
return evaluator.getHealthScore();
}

@Override
public int getFragmentFileCount() {
return evaluator().getFragmentFileCount();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,59 @@ public boolean anyDeleteExist() {
return equalityDeleteFileCount > 0 || posDeleteFileCount > 0;
}

@Override
public int getHealthScore() {
long dataFilesSize = getFragmentFileSize() + getSegmentFileSize();
long dataFiles = getFragmentFileCount() + getSegmentFileCount();
long dataRecords = getFragmentFileRecords() + getSegmentFileRecords();

double averageDataFileSize = getNormalizedRatio(dataFilesSize, dataFiles);
double eqDeleteRatio = getNormalizedRatio(equalityDeleteFileRecords, dataRecords);
double posDeleteRatio = getNormalizedRatio(posDeleteFileRecords, dataRecords);

double tablePenaltyFactor = getTablePenaltyFactor(dataFiles, dataFilesSize);
return (int)
Math.ceil(
100
- tablePenaltyFactor
* (40 * getSmallFilePenaltyFactor(averageDataFileSize)
+ 40 * getEqDeletePenaltyFactor(eqDeleteRatio)
+ 20 * getPosDeletePenaltyFactor(posDeleteRatio)));
}

private double getEqDeletePenaltyFactor(double eqDeleteRatio) {
double eqDeleteRatioThreshold = config.getMajorDuplicateRatio();
return getNormalizedRatio(eqDeleteRatio, eqDeleteRatioThreshold);
}

private double getPosDeletePenaltyFactor(double posDeleteRatio) {
double posDeleteRatioThreshold = config.getMajorDuplicateRatio() * 2;
return getNormalizedRatio(posDeleteRatio, posDeleteRatioThreshold);
}

private double getSmallFilePenaltyFactor(double averageDataFileSize) {
return 1 - getNormalizedRatio(averageDataFileSize, minTargetSize);
}

private double getTablePenaltyFactor(long dataFiles, long dataFilesSize) {
// if the number of table files is less than or equal to 1,
// there is no penalty, i.e., the table is considered to be perfectly healthy
if (dataFiles <= 1) {
return 0;
}
// The small table has very little impact on performance,
// so there is only a small penalty
return getNormalizedRatio(dataFiles, config.getMinorLeastFileCount())
* getNormalizedRatio(dataFilesSize, config.getTargetSize());
}

private double getNormalizedRatio(double numerator, double denominator) {
if (denominator <= 0) {
return 0;
}
return Math.min(numerator, denominator) / denominator;
}

@Override
public int getFragmentFileCount() {
return fragmentFileCount;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,10 +200,12 @@ public static class PendingInput {
private long equalityDeleteBytes = 0L;
private long equalityDeleteFileRecords = 0L;
private long positionalDeleteFileRecords = 0L;
private int healthScore = -1; // -1 means not calculated

public PendingInput() {}

public PendingInput(Collection<PartitionEvaluator> evaluators) {
double totalHealthScore = 0;
for (PartitionEvaluator evaluator : evaluators) {
partitions
.computeIfAbsent(evaluator.getPartition().first(), ignore -> Sets.newHashSet())
Expand All @@ -217,7 +219,9 @@ public PendingInput(Collection<PartitionEvaluator> evaluators) {
equalityDeleteBytes += evaluator.getEqualityDeleteFileSize();
equalityDeleteFileRecords += evaluator.getEqualityDeleteFileRecords();
equalityDeleteFileCount += evaluator.getEqualityDeleteFileCount();
totalHealthScore += evaluator.getHealthScore();
}
healthScore = (int) Math.ceil(totalHealthScore / evaluators.size());
}

public Map<Integer, Set<StructLike>> getPartitions() {
Expand Down Expand Up @@ -260,6 +264,10 @@ public long getPositionalDeleteFileRecords() {
return positionalDeleteFileRecords;
}

public int getHealthScore() {
return healthScore;
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
Expand All @@ -273,6 +281,7 @@ public String toString() {
.add("equalityDeleteBytes", equalityDeleteBytes)
.add("equalityDeleteFileRecords", equalityDeleteFileRecords)
.add("positionalDeleteFileRecords", positionalDeleteFileRecords)
.add("healthScore", healthScore)
.toString();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ interface Weight extends Comparable<Weight> {}
*/
OptimizingType getOptimizingType();

/** Get health score of this partition. */
int getHealthScore();

/** Get the count of fragment files involved in optimizing. */
int getFragmentFileCount();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.temporal.ChronoUnit;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
Expand All @@ -61,10 +62,19 @@ public static TableConfiguration parseTableConfig(Map<String, String> properties
TableProperties.ENABLE_TABLE_EXPIRE,
TableProperties.ENABLE_TABLE_EXPIRE_DEFAULT))
.setSnapshotTTLMinutes(
CompatiblePropertyUtil.propertyAsLong(
ConfigHelpers.TimeUtils.parseDuration(
CompatiblePropertyUtil.propertyAsString(
properties,
TableProperties.SNAPSHOT_KEEP_DURATION,
TableProperties.SNAPSHOT_KEEP_DURATION_DEFAULT),
ChronoUnit.MINUTES)
.getSeconds()
/ 60)
.setSnapshotMinCount(
CompatiblePropertyUtil.propertyAsInt(
properties,
TableProperties.BASE_SNAPSHOT_KEEP_MINUTES,
TableProperties.BASE_SNAPSHOT_KEEP_MINUTES_DEFAULT))
TableProperties.SNAPSHOT_MIN_COUNT,
TableProperties.SNAPSHOT_MIN_COUNT_DEFAULT))
.setChangeDataTTLMinutes(
CompatiblePropertyUtil.propertyAsLong(
properties,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,10 @@ public OptimizingEvaluator.PendingInput getPendingInput() {
return pendingInput;
}

public OptimizingEvaluator.PendingInput getTableSummary() {
return tableSummary;
}

private boolean updateConfigInternal(Map<String, String> properties) {
TableConfiguration newTableConfig = TableConfigurations.parseTableConfig(properties);
if (tableConfiguration.equals(newTableConfig)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,20 @@ public class TableSummaryMetrics {
.withTags("catalog", "database", "table")
.build();

// table summary snapshots number metrics
// table summary snapshots number metric
public static final MetricDefine TABLE_SUMMARY_SNAPSHOTS =
defineGauge("table_summary_snapshots")
.withDescription("Number of snapshots in the table")
.withTags("catalog", "database", "table")
.build();

// table summary health score metric
public static final MetricDefine TABLE_SUMMARY_HEALTH_SCORE =
defineGauge("table_summary_health_score")
.withDescription("Health score of the table")
.withTags("catalog", "database", "table")
.build();

private final ServerTableIdentifier identifier;
private final List<MetricKey> registeredMetricKeys = Lists.newArrayList();
private MetricRegistry globalRegistry;
Expand All @@ -136,6 +143,7 @@ public class TableSummaryMetrics {
private long dataFilesRecords = 0L;
private long equalityDeleteFilesRecords = 0L;
private long snapshots = 0L;
private long healthScore = 0L;

public TableSummaryMetrics(ServerTableIdentifier identifier) {
this.identifier = identifier;
Expand Down Expand Up @@ -191,9 +199,12 @@ public void register(MetricRegistry registry) {
TABLE_SUMMARY_EQUALITY_DELETE_FILES_RECORDS,
(Gauge<Long>) () -> equalityDeleteFilesRecords);

// register snapshots number metrics
// register snapshots number metric
registerMetric(registry, TABLE_SUMMARY_SNAPSHOTS, (Gauge<Long>) () -> snapshots);

// register health score metric
registerMetric(registry, TABLE_SUMMARY_HEALTH_SCORE, (Gauge<Long>) () -> healthScore);

globalRegistry = registry;
}
}
Expand Down Expand Up @@ -231,6 +242,8 @@ public void refresh(OptimizingEvaluator.PendingInput tableSummary) {
positionDeleteFilesRecords = tableSummary.getPositionalDeleteFileRecords();
dataFilesRecords = tableSummary.getDataFileRecords();
equalityDeleteFilesRecords = tableSummary.getEqualityDeleteFileRecords();

healthScore = tableSummary.getHealthScore();
}

public void refreshSnapshots(MixedTable table) {
Expand Down
Loading

0 comments on commit 4cbcd65

Please sign in to comment.