Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AMORO-3317]: Move optimizing planer and scan to iceberg/mixed-format module #3314

Merged
merged 15 commits into from
Nov 13, 2024
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,4 @@ public class AmoroServiceConstants {
public static final long INVALID_TIME = 0;

public static final long QUOTA_LOOK_BACK_TIME = 60 * 60 * 1000;

public static final long INVALID_SNAPSHOT_ID = -1L;
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,18 @@
import org.apache.amoro.api.CommitMetaProducer;
import org.apache.amoro.data.DataFileType;
import org.apache.amoro.data.FileNameRules;
import org.apache.amoro.optimizing.MetricsSummary;
import org.apache.amoro.optimizing.OptimizingType;
import org.apache.amoro.process.ProcessStatus;
import org.apache.amoro.process.ProcessTaskStatus;
import org.apache.amoro.server.dashboard.component.reverser.IcebergTableMetaExtract;
import org.apache.amoro.server.dashboard.model.TableBasicInfo;
import org.apache.amoro.server.dashboard.model.TableStatistics;
import org.apache.amoro.server.dashboard.utils.AmsUtil;
import org.apache.amoro.server.dashboard.utils.TableStatCollector;
import org.apache.amoro.server.optimizing.MetricsSummary;
import org.apache.amoro.server.optimizing.OptimizingProcessMeta;
import org.apache.amoro.server.optimizing.OptimizingStatus;
import org.apache.amoro.server.optimizing.OptimizingTaskMeta;
import org.apache.amoro.server.optimizing.OptimizingType;
import org.apache.amoro.server.optimizing.TaskRuntime;
import org.apache.amoro.server.persistence.PersistentBase;
import org.apache.amoro.server.persistence.mapper.OptimizingMapper;
Expand Down Expand Up @@ -558,7 +559,7 @@ public Pair<List<OptimizingProcessInfo>, Integer> getOptimizingProcessesInfo(
public Map<String, String> getTableOptimizingTypes(AmoroTable<?> amoroTable) {
Map<String, String> types = Maps.newHashMap();
for (OptimizingType type : OptimizingType.values()) {
types.put(type.name(), type.getStatus().displayValue());
types.put(type.name(), OptimizingStatus.ofOptimizingType(type).displayValue());
}
return types;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.amoro.hive.utils.HiveTableUtil;
import org.apache.amoro.hive.utils.UpgradeHiveTableUtil;
import org.apache.amoro.mixed.CatalogLoader;
import org.apache.amoro.optimizing.plan.AbstractOptimizingEvaluator;
import org.apache.amoro.process.ProcessStatus;
import org.apache.amoro.properties.CatalogMetaProperties;
import org.apache.amoro.properties.HiveTableProperties;
Expand All @@ -49,7 +50,6 @@
import org.apache.amoro.server.dashboard.utils.AmsUtil;
import org.apache.amoro.server.dashboard.utils.CommonUtil;
import org.apache.amoro.server.optimizing.OptimizingStatus;
import org.apache.amoro.server.optimizing.plan.OptimizingEvaluator;
import org.apache.amoro.server.table.TableRuntime;
import org.apache.amoro.server.table.TableService;
import org.apache.amoro.shade.guava32.com.google.common.base.Function;
Expand Down Expand Up @@ -152,7 +152,8 @@ public void getTableDetail(Context ctx) {
TableRuntime tableRuntime = tableService.getRuntime(serverTableIdentifier.get().getId());
if (tableRuntime != null) {
tableSummary.setOptimizingStatus(tableRuntime.getOptimizingStatus().name());
OptimizingEvaluator.PendingInput tableRuntimeSummary = tableRuntime.getTableSummary();
AbstractOptimizingEvaluator.PendingInput tableRuntimeSummary =
tableRuntime.getTableSummary();
if (tableRuntimeSummary != null) {
tableSummary.setHealthScore(tableRuntimeSummary.getHealthScore());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@

package org.apache.amoro.server.dashboard.utils;

import org.apache.amoro.optimizing.MetricsSummary;
import org.apache.amoro.optimizing.plan.AbstractOptimizingEvaluator;
import org.apache.amoro.server.dashboard.model.TableOptimizingInfo;
import org.apache.amoro.server.optimizing.MetricsSummary;
import org.apache.amoro.server.optimizing.OptimizingProcess;
import org.apache.amoro.server.optimizing.OptimizingStatus;
import org.apache.amoro.server.optimizing.plan.OptimizingEvaluator;
import org.apache.amoro.server.table.TableRuntime;
import org.apache.amoro.table.descriptor.FilesStatistics;
import org.apache.iceberg.ContentFile;
Expand Down Expand Up @@ -61,7 +61,7 @@ public static TableOptimizingInfo buildTableOptimizeInfo(TableRuntime optimizing
}

private static FilesStatistics collectPendingFileInfo(
OptimizingEvaluator.PendingInput pendingInput) {
AbstractOptimizingEvaluator.PendingInput pendingInput) {
if (pendingInput == null) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,12 @@
import org.apache.amoro.data.PrimaryKeyedFile;
import org.apache.amoro.exception.OptimizingCommitException;
import org.apache.amoro.hive.utils.TableTypeUtil;
import org.apache.amoro.iceberg.Constants;
import org.apache.amoro.op.OverwriteBaseFiles;
import org.apache.amoro.op.SnapshotSummary;
import org.apache.amoro.optimizing.RewriteFilesInput;
import org.apache.amoro.optimizing.RewriteFilesOutput;
import org.apache.amoro.server.AmoroServiceConstants;
import org.apache.amoro.optimizing.RewriteStageTask;
import org.apache.amoro.table.MixedTable;
import org.apache.amoro.utils.ContentFiles;
import org.apache.amoro.utils.MixedTableUtil;
Expand Down Expand Up @@ -75,8 +76,7 @@ public KeyedTableCommit(
super(fromSnapshotId, table, tasks);
this.table = table;
this.tasks = tasks;
this.fromSnapshotId =
fromSnapshotId == null ? AmoroServiceConstants.INVALID_SNAPSHOT_ID : fromSnapshotId;
this.fromSnapshotId = fromSnapshotId == null ? Constants.INVALID_SNAPSHOT_ID : fromSnapshotId;
this.fromSequenceOfPartitions = fromSequenceOfPartitions;
this.toSequenceOfPartitions = toSequenceOfPartitions;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.amoro.server.optimizing;

import org.apache.amoro.optimizing.MetricsSummary;
import org.apache.amoro.optimizing.OptimizingType;
import org.apache.amoro.process.ProcessStatus;

public interface OptimizingProcess {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.amoro.server.optimizing;

import org.apache.amoro.optimizing.MetricsSummary;
import org.apache.amoro.optimizing.OptimizingType;
import org.apache.amoro.process.ProcessStatus;

import java.util.Map;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,23 @@
import org.apache.amoro.ServerTableIdentifier;
import org.apache.amoro.api.OptimizingTaskId;
import org.apache.amoro.exception.OptimizingClosedException;
import org.apache.amoro.optimizing.MetricsSummary;
import org.apache.amoro.optimizing.OptimizingType;
import org.apache.amoro.optimizing.RewriteFilesInput;
import org.apache.amoro.optimizing.RewriteStageTask;
import org.apache.amoro.optimizing.plan.AbstractOptimizingPlanner;
import org.apache.amoro.process.ProcessStatus;
import org.apache.amoro.resource.ResourceGroup;
import org.apache.amoro.server.AmoroServiceConstants;
import org.apache.amoro.server.manager.MetricManager;
import org.apache.amoro.server.optimizing.plan.OptimizingPlanner;
import org.apache.amoro.server.persistence.PersistentBase;
import org.apache.amoro.server.persistence.TaskFilesPersistence;
import org.apache.amoro.server.persistence.mapper.OptimizingMapper;
import org.apache.amoro.server.resource.OptimizerInstance;
import org.apache.amoro.server.resource.QuotaProvider;
import org.apache.amoro.server.table.TableManager;
import org.apache.amoro.server.table.TableRuntime;
import org.apache.amoro.server.utils.IcebergTableUtil;
import org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
Expand Down Expand Up @@ -263,8 +267,8 @@ private TableOptimizingProcess planInternal(TableRuntime tableRuntime) {
tableRuntime.beginPlanning();
try {
AmoroTable<?> table = tableManager.loadTable(tableRuntime.getTableIdentifier());
OptimizingPlanner planner =
OptimizingPlanner.createOptimizingPlanner(
AbstractOptimizingPlanner planner =
IcebergTableUtil.createOptimizingPlanner(
tableRuntime.refresh(table),
(MixedTable) table.originalTable(),
getAvailableCore(),
Expand Down Expand Up @@ -371,7 +375,7 @@ public TaskRuntime poll() {
}
}

public TableOptimizingProcess(OptimizingPlanner planner, TableRuntime tableRuntime) {
public TableOptimizingProcess(AbstractOptimizingPlanner planner, TableRuntime tableRuntime) {
processId = planner.getProcessId();
this.tableRuntime = tableRuntime;
optimizingType = planner.getOptimizingType();
Expand Down Expand Up @@ -585,7 +589,13 @@ public void commit() {

@Override
public MetricsSummary getSummary() {
return new MetricsSummary(taskMap.values());
List<MetricsSummary> taskSummaries =
taskMap.values().stream()
.map(TaskRuntime::getTaskDescriptor)
.map(RewriteStageTask::getSummary)
.collect(Collectors.toList());

return new MetricsSummary(taskSummaries);
}

private UnKeyedTableCommit buildCommit() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.amoro.server.optimizing;

import org.apache.amoro.optimizing.OptimizingType;

public enum OptimizingStatus {
FULL_OPTIMIZING("full", true, 100),
MAJOR_OPTIMIZING("major", true, 200),
Expand Down Expand Up @@ -67,4 +69,17 @@ public static OptimizingStatus ofDisplayValue(String displayValue) {
}
return null;
}

public static OptimizingStatus ofOptimizingType(OptimizingType optimizingType) {
switch (optimizingType) {
case FULL:
return FULL_OPTIMIZING;
case MAJOR:
return MAJOR_OPTIMIZING;
case MINOR:
return MINOR_OPTIMIZING;
default:
throw new IllegalStateException("unknown optimizing-type: " + optimizingType);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.amoro.server.optimizing;

import org.apache.amoro.optimizing.MetricsSummary;

import java.util.Map;

/** A simplified meta of task, not include input/output files. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.amoro.exception.IllegalTaskStateException;
import org.apache.amoro.exception.OptimizingClosedException;
import org.apache.amoro.exception.TaskRuntimeException;
import org.apache.amoro.process.StagedTaskDescriptor;
import org.apache.amoro.server.AmoroServiceConstants;
import org.apache.amoro.server.persistence.StatedPersistentBase;
import org.apache.amoro.server.persistence.mapper.OptimizingMapper;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,12 @@
import org.apache.amoro.hive.utils.HivePartitionUtil;
import org.apache.amoro.hive.utils.HiveTableUtil;
import org.apache.amoro.hive.utils.TableTypeUtil;
import org.apache.amoro.iceberg.Constants;
import org.apache.amoro.op.SnapshotSummary;
import org.apache.amoro.optimizing.OptimizingInputProperties;
import org.apache.amoro.optimizing.RewriteFilesOutput;
import org.apache.amoro.optimizing.RewriteStageTask;
import org.apache.amoro.properties.HiveTableProperties;
import org.apache.amoro.server.AmoroServiceConstants;
import org.apache.amoro.server.utils.IcebergTableUtil;
import org.apache.amoro.table.MixedTable;
import org.apache.amoro.table.UnkeyedTable;
Expand Down Expand Up @@ -262,7 +263,7 @@ private void rewriteFiles(
}

RewriteFiles rewriteFiles = transaction.newRewrite();
if (targetSnapshotId != AmoroServiceConstants.INVALID_SNAPSHOT_ID) {
if (targetSnapshotId != Constants.INVALID_SNAPSHOT_ID) {
long sequenceNumber = table.asUnkeyedTable().snapshot(targetSnapshotId).sequenceNumber();
rewriteFiles.validateFromSnapshot(targetSnapshotId).dataSequenceNumber(sequenceNumber);
}
Expand Down Expand Up @@ -349,11 +350,11 @@ private DataFile moveTargetFiles(DataFile targetFile, String hiveLocation) {
private static Set<String> getCommittedDataFilesFromSnapshotId(
UnkeyedTable table, Long snapshotId) {
long currentSnapshotId = IcebergTableUtil.getSnapshotId(table, true);
if (currentSnapshotId == AmoroServiceConstants.INVALID_SNAPSHOT_ID) {
if (currentSnapshotId == Constants.INVALID_SNAPSHOT_ID) {
return Collections.emptySet();
}

if (snapshotId == AmoroServiceConstants.INVALID_SNAPSHOT_ID) {
if (snapshotId == Constants.INVALID_SNAPSHOT_ID) {
snapshotId = null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@
import org.apache.amoro.api.CommitMetaProducer;
import org.apache.amoro.config.DataExpirationConfig;
import org.apache.amoro.config.TableConfiguration;
import org.apache.amoro.iceberg.Constants;
import org.apache.amoro.io.AuthenticatedFileIO;
import org.apache.amoro.io.PathInfo;
import org.apache.amoro.io.SupportsFileSystemOperations;
import org.apache.amoro.server.AmoroServiceConstants;
import org.apache.amoro.server.table.TableConfigurations;
import org.apache.amoro.server.table.TableOrphanFilesCleaningMetrics;
import org.apache.amoro.server.table.TableRuntime;
Expand Down Expand Up @@ -659,7 +659,7 @@ CloseableIterable<FileEntry> fileScan(
return CloseableIterable.empty();
}
long snapshotId = snapshot.snapshotId();
if (snapshotId == AmoroServiceConstants.INVALID_SNAPSHOT_ID) {
if (snapshotId == Constants.INVALID_SNAPSHOT_ID) {
tasks = tableScan.planFiles();
} else {
tasks = tableScan.useSnapshot(snapshotId).planFiles();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@

import org.apache.amoro.TableFormat;
import org.apache.amoro.config.TableConfiguration;
import org.apache.amoro.optimizing.OptimizingType;
import org.apache.amoro.optimizing.plan.AbstractOptimizingEvaluator;
import org.apache.amoro.process.ProcessStatus;
import org.apache.amoro.server.optimizing.OptimizingStatus;
import org.apache.amoro.server.optimizing.OptimizingType;
import org.apache.amoro.server.optimizing.plan.OptimizingEvaluator;

import java.util.Map;

Expand All @@ -45,8 +45,8 @@ public class TableRuntimeMeta {
private long currentStatusStartTime;
private String optimizerGroup;
private TableConfiguration tableConfig;
private OptimizingEvaluator.PendingInput pendingInput;
private OptimizingEvaluator.PendingInput tableSummary;
private AbstractOptimizingEvaluator.PendingInput pendingInput;
private AbstractOptimizingEvaluator.PendingInput tableSummary;
private long optimizingProcessId = 0;
private ProcessStatus processStatus;
private OptimizingType optimizingType;
Expand Down Expand Up @@ -189,11 +189,11 @@ public void setLastOptimizedSnapshotId(long lastOptimizedSnapshotId) {
this.lastOptimizedSnapshotId = lastOptimizedSnapshotId;
}

public OptimizingEvaluator.PendingInput getTableSummary() {
public AbstractOptimizingEvaluator.PendingInput getTableSummary() {
return tableSummary;
}

public void setTableSummary(OptimizingEvaluator.PendingInput tableSummary) {
public void setTableSummary(AbstractOptimizingEvaluator.PendingInput tableSummary) {
this.tableSummary = tableSummary;
}

Expand Down Expand Up @@ -285,11 +285,11 @@ public void setSummary(String summary) {
this.summary = summary;
}

public OptimizingEvaluator.PendingInput getPendingInput() {
public AbstractOptimizingEvaluator.PendingInput getPendingInput() {
return pendingInput;
}

public void setPendingInput(OptimizingEvaluator.PendingInput pendingInput) {
public void setPendingInput(AbstractOptimizingEvaluator.PendingInput pendingInput) {
this.pendingInput = pendingInput;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import org.apache.amoro.optimizing.RewriteFilesInput;
import org.apache.amoro.optimizing.RewriteFilesOutput;
import org.apache.amoro.server.optimizing.RewriteStageTask;
import org.apache.amoro.optimizing.RewriteStageTask;
import org.apache.amoro.server.optimizing.TaskRuntime;
import org.apache.amoro.server.persistence.mapper.OptimizingMapper;
import org.apache.amoro.server.utils.CompressUtil;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@

package org.apache.amoro.server.persistence.converter;

import org.apache.amoro.server.optimizing.RewriteStageTask;
import org.apache.amoro.server.optimizing.StagedTaskDescriptor;
import org.apache.amoro.optimizing.RewriteStageTask;
import org.apache.amoro.process.StagedTaskDescriptor;
import org.apache.ibatis.type.JdbcType;
import org.apache.ibatis.type.TypeHandler;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@
package org.apache.amoro.server.persistence.mapper;

import org.apache.amoro.ServerTableIdentifier;
import org.apache.amoro.optimizing.MetricsSummary;
import org.apache.amoro.optimizing.OptimizingType;
import org.apache.amoro.optimizing.RewriteFilesInput;
import org.apache.amoro.optimizing.RewriteStageTask;
import org.apache.amoro.process.ProcessStatus;
import org.apache.amoro.server.optimizing.MetricsSummary;
import org.apache.amoro.process.StagedTaskDescriptor;
import org.apache.amoro.server.optimizing.OptimizingProcessMeta;
import org.apache.amoro.server.optimizing.OptimizingTaskMeta;
import org.apache.amoro.server.optimizing.OptimizingType;
import org.apache.amoro.server.optimizing.RewriteStageTask;
import org.apache.amoro.server.optimizing.StagedTaskDescriptor;
import org.apache.amoro.server.optimizing.TaskRuntime;
import org.apache.amoro.server.persistence.converter.JsonObjectConverter;
import org.apache.amoro.server.persistence.converter.Long2TsConverter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@
import org.apache.amoro.metrics.Metric;
import org.apache.amoro.metrics.MetricDefine;
import org.apache.amoro.metrics.MetricKey;
import org.apache.amoro.optimizing.OptimizingType;
import org.apache.amoro.server.AmoroServiceConstants;
import org.apache.amoro.server.metrics.MetricRegistry;
import org.apache.amoro.server.optimizing.OptimizingStatus;
import org.apache.amoro.server.optimizing.OptimizingType;
import org.apache.amoro.server.optimizing.maintainer.IcebergTableMaintainer;
import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableMap;
import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
Expand Down
Loading
Loading