Skip to content

Commit

Permalink
Make TableFormat extensible | compile errors
Browse files Browse the repository at this point in the history
  • Loading branch information
baiyangtx committed Sep 9, 2024
1 parent 7db2d95 commit ecb5690
Show file tree
Hide file tree
Showing 8 changed files with 60 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,7 @@ public void getTableList(Context ctx) {
} else if (format.equals(TableFormat.ICEBERG)) {
return TableMeta.TableType.HUDI.toString();
} else {
throw new IllegalStateException("Unknown format");
return format.toString();
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,18 +174,14 @@ protected void createTable() {
TableMetadata tableMetadata = tableMetadata();
tableService().createTable(catalogMeta.getCatalogName(), tableMetadata);
} else {
switch (catalogTestHelper.tableFormat()) {
case ICEBERG:
createIcebergTable();
break;
case MIXED_ICEBERG:
createMixedIcebergTable();
break;
case MIXED_HIVE:
createMixedHiveTable();
break;
default:
throw new IllegalStateException("un-support format");
if (catalogTestHelper.tableFormat().equals(TableFormat.ICEBERG)) {
createIcebergTable();
} else if (catalogTestHelper.tableFormat().equals(TableFormat.MIXED_ICEBERG)) {
createMixedIcebergTable();
} else if (catalogTestHelper.tableFormat().equals(TableFormat.MIXED_HIVE)) {
createMixedHiveTable();
} else {
throw new IllegalStateException("un-support format");
}
tableService().exploreExternalCatalog();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.amoro.catalog;

import org.apache.amoro.TableFormat;
import org.apache.amoro.TableTestHelper;
import org.apache.amoro.table.MixedTable;
import org.apache.amoro.table.TableBuilder;
Expand All @@ -44,14 +45,11 @@ public void setupTable() {
this.tableMetaStore = MixedFormatCatalogUtil.buildMetaStore(getCatalogMeta());

getUnifiedCatalog().createDatabase(TableTestHelper.TEST_DB_NAME);
switch (getTestFormat()) {
case MIXED_HIVE:
case MIXED_ICEBERG:
createMixedFormatTable();
break;
case ICEBERG:
createIcebergFormatTable();
break;
TableFormat format = getTestFormat();
if (TableFormat.MIXED_HIVE.equals(format) || TableFormat.MIXED_ICEBERG.equals(format)) {
createMixedFormatTable();
} else if (TableFormat.ICEBERG.equals(format)) {
createIcebergFormatTable();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig
throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
Configuration configuration = new Configuration();
table.getOptions().forEach(configuration::setString);
TableFormat format = configuration.get(TABLE_FORMAT);
TableFormat format = TableFormat.valueOf(configuration.get(TABLE_FORMAT));
TableIdentifier tableIdentifier =
TableIdentifier.of(
unifiedCatalog.name(), tablePath.getDatabaseName(), tablePath.getObjectName());
Expand Down Expand Up @@ -464,25 +464,19 @@ private AmoroTable<?> loadAmoroTable(ObjectPath tablePath) {
private AbstractCatalog createOriginalCatalog(
TableIdentifier tableIdentifier, TableFormat tableFormat) {
CatalogFactory catalogFactory;

switch (tableFormat) {
case MIXED_ICEBERG:
case MIXED_HIVE:
catalogFactory = new MixedCatalogFactory();
break;
case ICEBERG:
catalogFactory = new IcebergFlinkCatalogFactory(hadoopConf);
break;
case PAIMON:
catalogFactory =
new PaimonFlinkCatalogFactory(
unifiedCatalog.properties(), unifiedCatalog.metastoreType());
break;
default:
throw new UnsupportedOperationException(
String.format(
"Unsupported table format: [%s] in the unified catalog, table identifier is [%s], the supported table formats are [%s].",
tableFormat, tableIdentifier, FlinkUnifiedCatalogFactory.SUPPORTED_FORMATS));
if (tableFormat.in(TableFormat.MIXED_HIVE, TableFormat.MIXED_ICEBERG)) {
catalogFactory = new MixedCatalogFactory();
} else if (tableFormat.equals(TableFormat.ICEBERG)) {
catalogFactory = new IcebergFlinkCatalogFactory(hadoopConf);
} else if (tableFormat.equals(TableFormat.PAIMON)) {
catalogFactory =
new PaimonFlinkCatalogFactory(
unifiedCatalog.properties(), unifiedCatalog.metastoreType());
} else {
throw new UnsupportedOperationException(
String.format(
"Unsupported table format: [%s] in the unified catalog, table identifier is [%s], the supported table formats are [%s].",
tableFormat, tableIdentifier, FlinkUnifiedCatalogFactory.SUPPORTED_FORMATS));
}

AbstractCatalog originalCatalog;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public DynamicTableSink createDynamicTableSink(Context context) {
ObjectIdentifier identifier = context.getObjectIdentifier();
FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
Configuration options = (Configuration) helper.getOptions();
TableFormat tableFormat = options.get(MixedFormatValidator.TABLE_FORMAT);
TableFormat tableFormat = TableFormat.valueOf(options.get(MixedFormatValidator.TABLE_FORMAT));

return getOriginalCatalog(tableFormat)
.flatMap(AbstractCatalog::getFactory)
Expand All @@ -76,7 +76,7 @@ public DynamicTableSource createDynamicTableSource(Context context) {
ObjectIdentifier identifier = context.getObjectIdentifier();
FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
Configuration options = (Configuration) helper.getOptions();
TableFormat tableFormat = options.get(MixedFormatValidator.TABLE_FORMAT);
TableFormat tableFormat = TableFormat.valueOf(options.get(MixedFormatValidator.TABLE_FORMAT));

return getOriginalCatalog(tableFormat)
.flatMap(AbstractCatalog::getFactory)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,18 +283,18 @@ public class MixedFormatValidator extends ConnectorDescriptorValidator {
+ " of the key. Default is -1, means it is automatically determined: every shard will be at least 512KB and"
+ " number of shard bits will not exceed 6.");

public static final ConfigOption<TableFormat> TABLE_FORMAT =
public static final ConfigOption<String> TABLE_FORMAT =
ConfigOptions.key("table.format")
.enumType(TableFormat.class)
.defaultValue(TableFormat.MIXED_ICEBERG)
.stringType()
.defaultValue(TableFormat.MIXED_ICEBERG.name())
.withDescription(
String.format(
"The format of the table, valid values are %s, %s, %s or %s, and Flink choose '%s' as default format.",
TableFormat.ICEBERG,
TableFormat.MIXED_ICEBERG,
TableFormat.MIXED_HIVE,
TableFormat.PAIMON,
TableFormat.MIXED_ICEBERG));
TableFormat.ICEBERG.name(),
TableFormat.MIXED_ICEBERG.name(),
TableFormat.MIXED_HIVE.name(),
TableFormat.PAIMON.name(),
TableFormat.MIXED_ICEBERG.name()));

public static final ConfigOption<Integer> SCAN_PARALLELISM =
ConfigOptions.key("source.parallelism")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,30 +93,28 @@ private String selectMixedCatalogByFormat(ExtensionContext context, ExtensionReg
Preconditions.condition(
format == TableFormat.MIXED_ICEBERG || format == TableFormat.MIXED_HIVE,
"must be a mixed-format");
switch (format) {
case MIXED_ICEBERG:
return SparkTestContext.SparkCatalogNames.MIXED_ICEBERG;
case MIXED_HIVE:
return SparkTestContext.SparkCatalogNames.MIXED_HIVE;
default:
throw new IllegalArgumentException("must be a mixed-format");
if (TableFormat.MIXED_HIVE.equals(format)) {
return SparkTestContext.SparkCatalogNames.MIXED_HIVE;
} else if (TableFormat.MIXED_ICEBERG.equals(format)) {
return SparkTestContext.SparkCatalogNames.MIXED_ICEBERG;
} else {
throw new IllegalArgumentException("must be a mixed-format");
}
}

private String selectUnifiedCatalogByFormat(
ExtensionContext context, ExtensionRegistry registry) {
TableFormat format = formatFromMethodArgs(context, registry);
switch (format) {
case MIXED_ICEBERG:
return SparkTestContext.SparkCatalogNames.UNIFIED_MIXED_ICEBERG;
case MIXED_HIVE:
return SparkTestContext.SparkCatalogNames.UNIFIED_MIXED_HIVE;
case ICEBERG:
return SparkTestContext.SparkCatalogNames.UNIFIED_ICEBERG;
case PAIMON:
return SparkTestContext.SparkCatalogNames.UNIFIED_PAIMON;
default:
throw new IllegalArgumentException("unknown format");
if (TableFormat.MIXED_HIVE.equals(format)) {
return SparkTestContext.SparkCatalogNames.UNIFIED_MIXED_HIVE;
} else if (TableFormat.MIXED_ICEBERG.equals(format)) {
return SparkTestContext.SparkCatalogNames.UNIFIED_MIXED_ICEBERG;
} else if (TableFormat.ICEBERG.equals(format)) {
return SparkTestContext.SparkCatalogNames.UNIFIED_ICEBERG;
} else if (TableFormat.PAIMON.equals(format)) {
return SparkTestContext.SparkCatalogNames.UNIFIED_PAIMON;
} else {
throw new IllegalArgumentException("must be a mixed-format");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.apache.amoro.table.TableProperties.WRITE_DISTRIBUTION_MODE_DEFAULT;
import static org.apache.iceberg.spark.Spark3Util.toTransforms;

import org.apache.amoro.TableFormat;
import org.apache.amoro.shade.guava32.com.google.common.base.Joiner;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
import org.apache.amoro.spark.table.MixedSparkTable;
Expand Down Expand Up @@ -186,12 +187,10 @@ public static Object convertConstant(Type type, Object value) {
}

public static String mixedTableProvider(MixedTable table) {
switch (table.format()) {
case MIXED_ICEBERG:
case MIXED_HIVE:
return table.format().name().toLowerCase(Locale.ROOT);
default:
throw new IllegalArgumentException("Not a mixed-format table:" + table.format());
if (table.format().in(TableFormat.MIXED_HIVE, TableFormat.MIXED_ICEBERG)) {
return table.format().name().toLowerCase(Locale.ROOT);
} else {
throw new IllegalArgumentException("Not a mixed-format table:" + table.format());
}
}
}

0 comments on commit ecb5690

Please sign in to comment.