Skip to content

Commit

Permalink
Make TableFormat extensible
Browse files Browse the repository at this point in the history
  • Loading branch information
baiyangtx committed Sep 5, 2024
1 parent 04fef53 commit 72528e5
Show file tree
Hide file tree
Showing 17 changed files with 106 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public AmoroTable<?> loadTable(String database, String tableName) {
return null;
}
Preconditions.checkArgument(
TableFormat.MIXED_ICEBERG == tableMetadata.getFormat(),
TableFormat.MIXED_ICEBERG.equals(tableMetadata.getFormat()),
"Table: %s.%s.%s is not a mixed-iceberg table",
name(),
database,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -492,18 +492,14 @@ public void getTableList(Context ctx) {
ServerCatalog serverCatalog = tableService.getServerCatalog(catalog);
Function<TableFormat, String> formatToType =
format -> {
switch (format) {
case MIXED_HIVE:
case MIXED_ICEBERG:
return TableMeta.TableType.ARCTIC.toString();
case PAIMON:
return TableMeta.TableType.PAIMON.toString();
case ICEBERG:
return TableMeta.TableType.ICEBERG.toString();
case HUDI:
return TableMeta.TableType.HUDI.toString();
default:
throw new IllegalStateException("Unknown format");
if (format.equals(TableFormat.MIXED_HIVE) || format.equals(TableFormat.MIXED_ICEBERG)) {
return TableMeta.TableType.ARCTIC.toString();
} else if (format.equals(TableFormat.PAIMON)) {
return TableMeta.TableType.PAIMON.toString();
} else if (format.equals(TableFormat.ICEBERG)) {
return TableMeta.TableType.HUDI.toString();
} else {
throw new IllegalStateException("Unknown format");
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ default void cleanDanglingDeleteFiles(TableRuntime tableRuntime) {

static TableMaintainer ofTable(AmoroTable<?> amoroTable) {
TableFormat format = amoroTable.format();
if (format == TableFormat.MIXED_HIVE || format == TableFormat.MIXED_ICEBERG) {
if (TableFormat.MIXED_HIVE.equals(format) || TableFormat.MIXED_ICEBERG.equals(format)) {
return new MixedTableMaintainer((MixedTable) amoroTable.originalTable());
} else if (format == TableFormat.ICEBERG) {
} else if (TableFormat.ICEBERG.equals(format)) {
return new IcebergTableMaintainer((Table) amoroTable.originalTable());
} else {
throw new RuntimeException("Unsupported table type" + amoroTable.originalTable().getClass());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public TableRuntime getTableRuntime() {
protected void initEvaluator() {
long startTime = System.currentTimeMillis();
TableFileScanHelper tableFileScanHelper;
if (TableFormat.ICEBERG == mixedTable.format()) {
if (TableFormat.ICEBERG.equals(mixedTable.format())) {
tableFileScanHelper =
new IcebergTableFileScanHelper(mixedTable.asUnkeyedTable(), currentSnapshot.snapshotId());
} else {
Expand Down Expand Up @@ -142,7 +142,7 @@ private Map<String, String> partitionProperties(Pair<Integer, StructLike> partit
}

protected PartitionEvaluator buildEvaluator(Pair<Integer, StructLike> partition) {
if (TableFormat.ICEBERG == mixedTable.format()) {
if (TableFormat.ICEBERG.equals(mixedTable.format())) {
return new CommonPartitionEvaluator(tableRuntime, partition, System.currentTimeMillis());
} else {
Map<String, String> partitionProperties = partitionProperties(partition);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ public PartitionPlannerFactory(
}

public PartitionEvaluator buildPartitionPlanner(Pair<Integer, StructLike> partition) {
if (TableFormat.ICEBERG == mixedTable.format()) {
if (TableFormat.ICEBERG.equals(mixedTable.format())) {
return new IcebergPartitionPlan(tableRuntime, mixedTable, partition, planTime);
} else {
if (TableTypeUtil.isHive(mixedTable)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -639,7 +639,7 @@ private boolean triggerTableAdded(
AmoroTable<?> table =
catalog.loadTable(
serverTableIdentifier.getDatabase(), serverTableIdentifier.getTableName());
if (TableFormat.ICEBERG == table.format()) {
if (TableFormat.ICEBERG.equals(table.format())) {
if (TablePropertyUtil.isMixedTableStore(table.properties())) {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public class InternalTableUtil {
*/
public static boolean isLegacyMixedIceberg(
org.apache.amoro.server.table.TableMetadata internalTableMetadata) {
return TableFormat.MIXED_ICEBERG == internalTableMetadata.getFormat()
return TableFormat.MIXED_ICEBERG.equals(internalTableMetadata.getFormat())
&& !Boolean.parseBoolean(
internalTableMetadata.getProperties().get(MIXED_ICEBERG_BASED_REST));
}
Expand Down
73 changes: 66 additions & 7 deletions amoro-common/src/main/java/org/apache/amoro/TableFormat.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,83 @@

package org.apache.amoro;

import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;

import java.util.Map;

/**
* Table formats Amoro supported
*
* @since 0.4.0
*/
public enum TableFormat {
ICEBERG,
MIXED_ICEBERG,
MIXED_HIVE,
PAIMON,
HUDI;
public final class TableFormat {
private static final Map<String, TableFormat> registeredFormats = Maps.newConcurrentMap();

/** Open-source table formats */
public static final TableFormat ICEBERG = register("ICEBERG");

public static final TableFormat MIXED_ICEBERG = register("MIXED_ICEBERG");
public static final TableFormat MIXED_HIVE = register("MIXED_ICEBERG");
public static final TableFormat PAIMON = register("PAIMON");
public static final TableFormat HUDI = register("HUDI");

/**
* Get all registered formats
*
* @return registered formats
*/
public static TableFormat[] values() {
return registeredFormats.values().toArray(new TableFormat[0]);
}

/**
* Register a new TableFormat
*
* @param name table format name
* @return TableFormat.
*/
public static TableFormat register(String name) {
return registeredFormats.computeIfAbsent(name, s -> new TableFormat(name));
}

private final String name;

private TableFormat(String name) {
Preconditions.checkNotNull(name, "TableFormat name should not be null");
this.name = name;
}

public String name() {
return name;
}

public boolean in(TableFormat... tableFormats) {
for (TableFormat tableFormat : tableFormats) {
if (this == tableFormat) {
if (this.equals(tableFormat)) {
return true;
}
}
return false;
}

@Override
public String toString() {
return this.name;
}

@Override
public boolean equals(Object other) {
if (this == other) {
return true;
} else if (other == null || getClass() != other.getClass()) {
return false;
}
return this.name.equals(((TableFormat) other).name);
}

@Override
public int hashCode() {
return this.name.hashCode();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ private static String catalogImpl(String metastoreType, Map<String, String> cata
tableFormats.size() == 1, "Catalog support only one table format now.");
TableFormat tableFormat = tableFormats.iterator().next();
Preconditions.checkArgument(
TableFormat.MIXED_HIVE == tableFormat || TableFormat.MIXED_ICEBERG == tableFormat,
TableFormat.MIXED_HIVE.equals(tableFormat) || TableFormat.MIXED_ICEBERG.equals(tableFormat),
"MixedCatalogLoader only support mixed-format, format: %s",
tableFormat.name());

Expand All @@ -98,20 +98,20 @@ private static String catalogImpl(String metastoreType, Map<String, String> cata
case CatalogMetaProperties.CATALOG_TYPE_GLUE:
case CatalogMetaProperties.CATALOG_TYPE_CUSTOM:
Preconditions.checkArgument(
TableFormat.MIXED_ICEBERG == tableFormat,
TableFormat.MIXED_ICEBERG.equals(tableFormat),
"%s catalog support mixed-iceberg table only.",
metastoreType);
catalogImpl = MIXED_ICEBERG_CATALOG_IMP;
break;
case CatalogMetaProperties.CATALOG_TYPE_HIVE:
if (TableFormat.MIXED_HIVE == tableFormat) {
if (TableFormat.MIXED_HIVE.equals(tableFormat)) {
catalogImpl = HIVE_CATALOG_IMPL;
} else {
catalogImpl = MIXED_ICEBERG_CATALOG_IMP;
}
break;
case CatalogMetaProperties.CATALOG_TYPE_AMS:
if (TableFormat.MIXED_ICEBERG == tableFormat) {
if (TableFormat.MIXED_ICEBERG.equals(tableFormat)) {
catalogImpl = INTERNAL_CATALOG_IMPL;
} else {
throw new IllegalArgumentException("Internal Catalog mixed-iceberg table only");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public static UnkeyedTable baseStore(MixedTable mixedTable) {
/** Return the table root location of the mixed-format table. */
public static String tableRootLocation(MixedTable mixedTable) {
String tableRootLocation;
if (TableFormat.ICEBERG != mixedTable.format() && mixedTable.isUnkeyedTable()) {
if (!TableFormat.ICEBERG.equals(mixedTable.format()) && mixedTable.isUnkeyedTable()) {
tableRootLocation = TableFileUtil.getFileDir(mixedTable.location());
} else {
tableRootLocation = mixedTable.location();
Expand Down Expand Up @@ -179,7 +179,7 @@ private static StructLikeMap<Long> readLegacyPartitionProperties(
* Mix format table will return directly after checking}.
*/
public static PartitionSpec getMixedTablePartitionSpecById(MixedTable mixedTable, int specId) {
if (mixedTable.format() == TableFormat.ICEBERG) {
if (TableFormat.ICEBERG.equals(mixedTable.format())) {
return mixedTable.asUnkeyedTable().specs().get(specId);
} else {
PartitionSpec spec = mixedTable.spec();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ private void setupCatalogs() {
}
HiveConf hiveConf = hms.getHiveConf();
for (TableFormat format : TableFormat.values()) {
if (format == TableFormat.HUDI) {
if (TableFormat.HUDI.equals(format)) {
continue;
}
// create catalog for all formats in AMS with hive metastore.
Expand All @@ -140,7 +140,7 @@ private void setupCatalogs() {
Joiner.on(',')
.join(
Arrays.stream(TableFormat.values())
.filter(f -> TableFormat.HUDI != f)
.filter(f -> !TableFormat.HUDI.equals(f))
.collect(Collectors.toList()));
allFormats.putToCatalogProperties(CatalogMetaProperties.TABLE_FORMATS, formats);
allFormats.setCatalogName(AMS_ALL_FORMAT_CATALOG_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public void testTableFormats(TableFormat format, boolean sessionCatalog) {
+ " PARTITIONED BY (pt) ";
sql(sqlText);
int expect = 0;
if (TableFormat.PAIMON != format || !spark().version().startsWith("3.1")) {
if (!TableFormat.PAIMON.equals(format) || !spark().version().startsWith("3.1")) {
// write is not supported in spark3-1
sqlText =
"INSERT INTO "
Expand Down Expand Up @@ -109,7 +109,7 @@ public void testTableFormats(TableFormat format, boolean sessionCatalog) {
}

private String pkDDL(TableFormat format) {
if (TableFormat.MIXED_HIVE == format || TableFormat.MIXED_ICEBERG == format) {
if (TableFormat.MIXED_HIVE.equals(format) || TableFormat.MIXED_ICEBERG.equals(format)) {
return ", primary key(id)";
}
return "";
Expand Down Expand Up @@ -147,14 +147,10 @@ private void testVisitSubTable(TableFormat format, boolean sessionCatalog) {
}

List<String> subTableNames = Lists.newArrayList();
switch (format) {
case ICEBERG:
subTableNames = icebergInspectTableNames();
break;
case MIXED_ICEBERG:
case MIXED_HIVE:
subTableNames = mixedFormatSubTableNames();
break;
if (TableFormat.ICEBERG.equals(format)) {
subTableNames = icebergInspectTableNames();
} else if (format.in(TableFormat.MIXED_HIVE, TableFormat.MIXED_ICEBERG)) {
subTableNames = mixedFormatSubTableNames();
}

for (String inspectTableName : subTableNames) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.apache.amoro.table.TableProperties.WRITE_DISTRIBUTION_MODE_DEFAULT;
import static org.apache.iceberg.spark.Spark3Util.toTransforms;

import org.apache.amoro.TableFormat;
import org.apache.amoro.shade.guava32.com.google.common.base.Joiner;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
import org.apache.amoro.spark.table.MixedSparkTable;
Expand Down Expand Up @@ -186,12 +187,9 @@ public static Object convertConstant(Type type, Object value) {
}

public static String mixedTableProvider(MixedTable table) {
switch (table.format()) {
case MIXED_ICEBERG:
case MIXED_HIVE:
return table.format().name().toLowerCase(Locale.ROOT);
default:
throw new IllegalArgumentException("Not a mixed-format table:" + table.format());
if (table.format().in(TableFormat.MIXED_ICEBERG, TableFormat.MIXED_HIVE)) {
return table.format().name().toLowerCase(Locale.ROOT);
}
throw new IllegalArgumentException("Not a mixed-format table:" + table.format());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ public void testSchemaAndData(
TableFiles files = TestTableUtil.files(table);
Asserts.assertAllFilesInBaseStore(files);

if (TableFormat.MIXED_HIVE == format) {
if (TableFormat.MIXED_HIVE.equals(format)) {
Table hiveTable = loadHiveTable();
Asserts.assertHiveColumns(expectSchema, ptSpec, hiveTable.getSd().getCols());
Asserts.assertHivePartition(ptSpec, hiveTable.getPartitionKeys());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ public void testPartitionSpec(TableFormat format, String partitionDDL, Partition

MixedTable actualTable = loadTable();
Asserts.assertPartition(expectSpec, actualTable.spec());
if (TableFormat.MIXED_HIVE == format) {
if (TableFormat.MIXED_HIVE.equals(format)) {
Table hiveTable = loadHiveTable();
Asserts.assertHivePartition(expectSpec, hiveTable.getPartitionKeys());
}
Expand Down Expand Up @@ -303,7 +303,7 @@ public void testSchemaAndProperties(

Asserts.assertType(expectSchema.asStruct(), tbl.schema().asStruct());
Asserts.assertHashMapContainExpect(expectProperties, tbl.properties());
if (TableFormat.MIXED_HIVE == format) {
if (TableFormat.MIXED_HIVE.equals(format)) {
Table hiveTable = loadHiveTable();
Asserts.assertHiveColumns(
expectSchema, PartitionSpec.unpartitioned(), hiveTable.getSd().getCols());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ public void testSchemaAndData(
TableFiles files = TestTableUtil.files(table);
Asserts.assertAllFilesInBaseStore(files);

if (TableFormat.MIXED_HIVE == format) {
if (TableFormat.MIXED_HIVE.equals(format)) {
Table hiveTable = loadHiveTable();
Asserts.assertHiveColumns(expectSchema, ptSpec, hiveTable.getSd().getCols());
Asserts.assertHivePartition(ptSpec, hiveTable.getPartitionKeys());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ public void testPartitionSpec(TableFormat format, String partitionDDL, Partition

MixedTable actualTable = loadTable();
Asserts.assertPartition(expectSpec, actualTable.spec());
if (TableFormat.MIXED_HIVE == format) {
if (TableFormat.MIXED_HIVE.equals(format)) {
Table hiveTable = loadHiveTable();
Asserts.assertHivePartition(expectSpec, hiveTable.getPartitionKeys());
}
Expand Down Expand Up @@ -303,7 +303,7 @@ public void testSchemaAndProperties(

Asserts.assertType(expectSchema.asStruct(), tbl.schema().asStruct());
Asserts.assertHashMapContainExpect(expectProperties, tbl.properties());
if (TableFormat.MIXED_HIVE == format) {
if (TableFormat.MIXED_HIVE.equals(format)) {
Table hiveTable = loadHiveTable();
Asserts.assertHiveColumns(
expectSchema, PartitionSpec.unpartitioned(), hiveTable.getSd().getCols());
Expand Down

0 comments on commit 72528e5

Please sign in to comment.