diff --git a/amoro-common/src/main/java/org/apache/amoro/CommonUnifiedCatalog.java b/amoro-common/src/main/java/org/apache/amoro/CommonUnifiedCatalog.java index 84e8559724..79b6e55be8 100644 --- a/amoro-common/src/main/java/org/apache/amoro/CommonUnifiedCatalog.java +++ b/amoro-common/src/main/java/org/apache/amoro/CommonUnifiedCatalog.java @@ -35,27 +35,48 @@ public class CommonUnifiedCatalog implements UnifiedCatalog { + private final String catalogName; + private final String metaStoreType; private final Supplier metaSupplier; + // Client side catalog properties + private final Map clientProperties; + // Catalog properties after merging the server and client + private Map catalogProperties; private CatalogMeta meta; - private Map formatCatalogs = Maps.newHashMap(); - private final Map properties = Maps.newHashMap(); - private TableMetaStore tableMetaStore; + private Map formatCatalogs = Maps.newHashMap(); public CommonUnifiedCatalog( Supplier catalogMetaSupplier, Map properties) { CatalogMeta catalogMeta = catalogMetaSupplier.get(); CatalogUtil.mergeCatalogProperties(catalogMeta, properties); this.meta = catalogMeta; + this.catalogName = catalogMeta.getCatalogName(); + this.metaStoreType = catalogMeta.getCatalogType(); this.tableMetaStore = CatalogUtil.buildMetaStore(catalogMeta); - this.properties.putAll(properties); + this.clientProperties = properties; + this.catalogProperties = catalogMeta.getCatalogProperties(); this.metaSupplier = catalogMetaSupplier; initializeFormatCatalogs(); } + public CommonUnifiedCatalog( + String catalogName, + String metaStoreType, + Map properties, + TableMetaStore tableMetaStore) { + this.catalogName = catalogName; + this.metaStoreType = metaStoreType; + this.tableMetaStore = tableMetaStore; + this.clientProperties = properties; + this.catalogProperties = properties; + this.metaSupplier = null; + initializeFormatCatalogs(); + } + @Override public String metastoreType() { - return meta.getCatalogType(); + return metaStoreType; } @Override @@ -126,7 +147,7 @@ public AmoroTable loadTable(String database, String table) { @Override public String name() { - return this.meta.getCatalogName(); + return catalogName; } @Override @@ -158,7 +179,7 @@ public List listTables(String database) { tableName -> { TableFormat format = tableNameToFormat.get(tableName); return TableIDWithFormat.of( - TableIdentifier.of(this.meta.getCatalogName(), database, tableName), format); + TableIdentifier.of(catalogName, database, tableName), format); }) .collect(Collectors.toList()); } @@ -175,33 +196,34 @@ public boolean dropTable(String database, String table, boolean purge) { @Override public synchronized void refresh() { - CatalogMeta newMeta = metaSupplier.get(); - CatalogUtil.mergeCatalogProperties(meta, properties); - if (newMeta.equals(this.meta)) { - return; + if (metaSupplier != null) { + CatalogMeta newMeta = metaSupplier.get(); + CatalogUtil.mergeCatalogProperties(meta, clientProperties); + if (newMeta.equals(this.meta)) { + return; + } + this.catalogProperties = newMeta.getCatalogProperties(); + this.tableMetaStore = CatalogUtil.buildMetaStore(newMeta); + this.meta = newMeta; + this.initializeFormatCatalogs(); } - this.tableMetaStore = CatalogUtil.buildMetaStore(newMeta); - this.meta = newMeta; - this.initializeFormatCatalogs(); } @Override public Map properties() { - return this.meta.getCatalogProperties(); + return catalogProperties; } protected void initializeFormatCatalogs() { ServiceLoader loader = ServiceLoader.load(FormatCatalogFactory.class); - Set formats = CatalogUtil.tableFormats(this.meta); - TableMetaStore store = CatalogUtil.buildMetaStore(this.meta); + Set formats = CatalogUtil.tableFormats(metaStoreType, catalogProperties); Map formatCatalogs = Maps.newConcurrentMap(); for (FormatCatalogFactory factory : loader) { if (formats.contains(factory.format())) { - Map catalogProperties = - factory.convertCatalogProperties( - name(), meta.getCatalogType(), meta.getCatalogProperties()); + Map formatCatalogProperties = + factory.convertCatalogProperties(name(), metaStoreType, this.catalogProperties); FormatCatalog catalog = - factory.create(name(), meta.getCatalogType(), catalogProperties, store); + factory.create(name(), metaStoreType, formatCatalogProperties, tableMetaStore); formatCatalogs.put(factory.format(), catalog); } } diff --git a/amoro-common/src/main/java/org/apache/amoro/table/TableMetaStore.java b/amoro-common/src/main/java/org/apache/amoro/table/TableMetaStore.java index 9983b159c3..db6827bca5 100644 --- a/amoro-common/src/main/java/org/apache/amoro/table/TableMetaStore.java +++ b/amoro-common/src/main/java/org/apache/amoro/table/TableMetaStore.java @@ -19,7 +19,6 @@ package org.apache.amoro.table; import org.apache.amoro.properties.CatalogMetaProperties; -import org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting; import org.apache.amoro.shade.guava32.com.google.common.base.Charsets; import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions; import org.apache.amoro.shade.guava32.com.google.common.base.Strings; @@ -39,7 +38,6 @@ import sun.security.krb5.KrbException; import java.io.ByteArrayInputStream; -import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.io.Serializable; @@ -48,6 +46,7 @@ import java.lang.reflect.Field; import java.net.MalformedURLException; import java.net.URL; +import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.security.PrivilegedAction; @@ -70,7 +69,7 @@ public class TableMetaStore implements Serializable { new ConcurrentHashMap<>(); public static final TableMetaStore EMPTY = - TableMetaStore.builder().withConfiguration(new Configuration()).buildForTest(); + TableMetaStore.builder().withConfiguration(new Configuration()).build(); public static final String HADOOP_CONF_DIR = "conf.hadoop.dir"; public static final String HIVE_SITE = "hive-site"; @@ -126,14 +125,12 @@ public class TableMetaStore implements Serializable { private final byte[] krbConf; private final String krbPrincipal; private final boolean disableAuth; + private final String accessKey; + private final String secretKey; private transient RuntimeContext runtimeContext; private transient String authInformation; - private String accessKey; - - private String secretKey; - public static Builder builder() { return new Builder(); } @@ -170,6 +167,21 @@ private TableMetaStore( this.secretKey = secretKey; } + private TableMetaStore(Configuration configuration) { + this.disableAuth = true; + this.metaStoreSite = null; + this.hdfsSite = null; + this.coreSite = null; + this.authMethod = null; + this.hadoopUsername = null; + this.krbKeyTab = null; + this.krbConf = null; + this.krbPrincipal = null; + this.accessKey = null; + this.secretKey = null; + getRuntimeContext().setConfiguration(configuration); + } + public byte[] getMetaStoreSite() { return metaStoreSite; } @@ -218,7 +230,7 @@ public synchronized Configuration getConfiguration() { return getRuntimeContext().getConfiguration(); } - public synchronized UserGroupInformation getUGI() { + private synchronized UserGroupInformation getUGI() { return getRuntimeContext().getUGI(); } @@ -567,7 +579,6 @@ public static class Builder { private byte[] krbKeyTab; private byte[] krbConf; private String krbPrincipal; - private String accessKey; private String secretKey; private boolean disableAuth = true; @@ -710,7 +721,7 @@ public Builder withConfiguration(Configuration configuration) { private byte[] readBytesFromFile(String filePath) { try { - return IOUtils.toByteArray(new FileInputStream(filePath)); + return IOUtils.toByteArray(Files.newInputStream(Paths.get(filePath))); } catch (IOException e) { throw new UncheckedIOException("Read config failed:" + filePath, e); } @@ -751,61 +762,46 @@ private void readProperties() { } public TableMetaStore build() { - readProperties(); - if (!disableAuth & !AUTH_METHOD_AK_SK.equals(authMethod)) { - Preconditions.checkNotNull(hdfsSite); - Preconditions.checkNotNull(coreSite); - } - if (AUTH_METHOD_SIMPLE.equals(authMethod)) { - Preconditions.checkNotNull(hadoopUsername); - } else if (AUTH_METHOD_KERBEROS.equals(authMethod)) { - Preconditions.checkNotNull(krbConf); - Preconditions.checkNotNull(krbKeyTab); - Preconditions.checkNotNull(krbPrincipal); - } else if (AUTH_METHOD_AK_SK.equals(authMethod)) { - Preconditions.checkNotNull(accessKey); - Preconditions.checkNotNull(secretKey); - } else if (authMethod != null) { - throw new IllegalArgumentException("Unsupported auth method:" + authMethod); - } + if (disableAuth && configuration != null) { + LOG.info("Build table meta store with local configuration:" + configuration); + return new TableMetaStore(configuration); + } else { + readProperties(); + if (!AUTH_METHOD_AK_SK.equals(authMethod)) { + Preconditions.checkNotNull(hdfsSite); + Preconditions.checkNotNull(coreSite); + } + if (AUTH_METHOD_SIMPLE.equals(authMethod)) { + Preconditions.checkNotNull(hadoopUsername); + } else if (AUTH_METHOD_KERBEROS.equals(authMethod)) { + Preconditions.checkNotNull(krbConf); + Preconditions.checkNotNull(krbKeyTab); + Preconditions.checkNotNull(krbPrincipal); + } else if (AUTH_METHOD_AK_SK.equals(authMethod)) { + Preconditions.checkNotNull(accessKey); + Preconditions.checkNotNull(secretKey); + } else if (authMethod != null) { + throw new IllegalArgumentException("Unsupported auth method:" + authMethod); + } - LOG.info( - "Construct TableMetaStore with authMethod:{}, hadoopUsername:{}, krbPrincipal:{}", - authMethod, - hadoopUsername, - krbPrincipal); - return new TableMetaStore( - metaStoreSite, - hdfsSite, - coreSite, - authMethod, - hadoopUsername, - krbKeyTab, - krbConf, - krbPrincipal, - accessKey, - secretKey, - disableAuth); - } - - @VisibleForTesting - public TableMetaStore buildForTest() { - readProperties(); - TableMetaStore tableMetaStore = - new TableMetaStore( - metaStoreSite, - hdfsSite, - coreSite, - authMethod, - hadoopUsername, - krbKeyTab, - krbConf, - krbPrincipal, - accessKey, - secretKey, - disableAuth); - tableMetaStore.getRuntimeContext().setConfiguration(configuration); - return tableMetaStore; + LOG.info( + "Build table meta store with configurations: authMethod:{}, hadoopUsername:{}, krbPrincipal:{}", + authMethod, + hadoopUsername, + krbPrincipal); + return new TableMetaStore( + metaStoreSite, + hdfsSite, + coreSite, + authMethod, + hadoopUsername, + krbKeyTab, + krbConf, + krbPrincipal, + accessKey, + secretKey, + disableAuth); + } } } } diff --git a/amoro-format-iceberg/src/main/java/org/apache/amoro/mixed/CatalogLoader.java b/amoro-format-iceberg/src/main/java/org/apache/amoro/mixed/CatalogLoader.java index 2b534937a1..3842e8e6af 100644 --- a/amoro-format-iceberg/src/main/java/org/apache/amoro/mixed/CatalogLoader.java +++ b/amoro-format-iceberg/src/main/java/org/apache/amoro/mixed/CatalogLoader.java @@ -18,28 +18,24 @@ package org.apache.amoro.mixed; +import static org.apache.iceberg.CatalogUtil.ICEBERG_CATALOG_TYPE; + import org.apache.amoro.AmsClient; import org.apache.amoro.Constants; import org.apache.amoro.PooledAmsClient; import org.apache.amoro.TableFormat; -import org.apache.amoro.api.AmoroTableMetastore; import org.apache.amoro.api.CatalogMeta; import org.apache.amoro.api.NoSuchObjectException; -import org.apache.amoro.client.AmsClientPools; import org.apache.amoro.client.AmsThriftUrl; import org.apache.amoro.properties.CatalogMetaProperties; -import org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting; -import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions; import org.apache.amoro.shade.guava32.com.google.common.collect.Maps; import org.apache.amoro.shade.thrift.org.apache.thrift.TException; import org.apache.amoro.table.TableMetaStore; import org.apache.amoro.utils.MixedFormatCatalogUtil; import org.apache.iceberg.common.DynConstructors; -import java.util.List; import java.util.Map; import java.util.Set; -import java.util.stream.Collectors; /** Catalogs, create mixed-format catalog from metastore thrift url. */ public class CatalogLoader { @@ -82,40 +78,24 @@ public static MixedFormatCatalog load(String catalogUrl) { * @return class name for catalog */ private static String catalogImpl(String metastoreType, Map catalogProperties) { - Set tableFormats = - MixedFormatCatalogUtil.tableFormats(metastoreType, catalogProperties); - Preconditions.checkArgument( - tableFormats.size() == 1, "Catalog support only one table format now."); - TableFormat tableFormat = tableFormats.iterator().next(); - Preconditions.checkArgument( - TableFormat.MIXED_HIVE == tableFormat || TableFormat.MIXED_ICEBERG == tableFormat, - "MixedCatalogLoader only support mixed-format, format: %s", - tableFormat.name()); - String catalogImpl; switch (metastoreType) { case CatalogMetaProperties.CATALOG_TYPE_HADOOP: case CatalogMetaProperties.CATALOG_TYPE_GLUE: case CatalogMetaProperties.CATALOG_TYPE_CUSTOM: - Preconditions.checkArgument( - TableFormat.MIXED_ICEBERG == tableFormat, - "%s catalog support mixed-iceberg table only.", - metastoreType); catalogImpl = MIXED_ICEBERG_CATALOG_IMP; break; case CatalogMetaProperties.CATALOG_TYPE_HIVE: - if (TableFormat.MIXED_HIVE == tableFormat) { + Set tableFormats = + MixedFormatCatalogUtil.tableFormats(metastoreType, catalogProperties); + if (tableFormats.contains(TableFormat.MIXED_HIVE)) { catalogImpl = HIVE_CATALOG_IMPL; } else { catalogImpl = MIXED_ICEBERG_CATALOG_IMP; } break; case CatalogMetaProperties.CATALOG_TYPE_AMS: - if (TableFormat.MIXED_ICEBERG == tableFormat) { - catalogImpl = INTERNAL_CATALOG_IMPL; - } else { - throw new IllegalArgumentException("Internal Catalog mixed-iceberg table only"); - } + catalogImpl = INTERNAL_CATALOG_IMPL; break; default: throw new IllegalStateException("unsupported metastore type:" + metastoreType); @@ -142,21 +122,6 @@ public static CatalogMeta loadMeta(String catalogUrl) { } } - /** - * Show catalog list in metastore. - * - * @param metastoreUrl url of ams - * @return catalog name list - */ - public static List catalogs(String metastoreUrl) { - try { - return ((AmoroTableMetastore.Iface) AmsClientPools.getClientPool(metastoreUrl).iface()) - .getCatalogs().stream().map(CatalogMeta::getCatalogName).collect(Collectors.toList()); - } catch (TException e) { - throw new IllegalStateException("failed when load catalogs", e); - } - } - /** * Entrypoint for loading catalog * @@ -200,25 +165,10 @@ public static MixedFormatCatalog createCatalog( Map properties, TableMetaStore metaStore) { String catalogImpl = catalogImpl(metastoreType, properties); - properties = - MixedFormatCatalogUtil.withIcebergCatalogInitializeProperties( - catalogName, metastoreType, properties); - MixedFormatCatalog catalog = buildCatalog(catalogImpl); - catalog.initialize(catalogName, properties, metaStore); - return catalog; - } - - @VisibleForTesting - public static MixedFormatCatalog createCatalog( - String catalogName, - String catalogImpl, - String metastoreType, - Map properties, - TableMetaStore metaStore) { - properties = - MixedFormatCatalogUtil.withIcebergCatalogInitializeProperties( - catalogName, metastoreType, properties); MixedFormatCatalog catalog = buildCatalog(catalogImpl); + if (!properties.containsKey(ICEBERG_CATALOG_TYPE)) { + properties.put(ICEBERG_CATALOG_TYPE, metastoreType); + } catalog.initialize(catalogName, properties, metaStore); return catalog; } diff --git a/amoro-format-iceberg/src/test/java/org/apache/amoro/TestUnifiedCatalog.java b/amoro-format-iceberg/src/test/java/org/apache/amoro/TestUnifiedCatalog.java index 5b0bca78c2..489eb8b479 100644 --- a/amoro-format-iceberg/src/test/java/org/apache/amoro/TestUnifiedCatalog.java +++ b/amoro-format-iceberg/src/test/java/org/apache/amoro/TestUnifiedCatalog.java @@ -20,7 +20,9 @@ import org.apache.amoro.api.CatalogMeta; import org.apache.amoro.catalog.CatalogTestHelper; +import org.apache.amoro.shade.guava32.com.google.common.collect.Lists; import org.apache.amoro.shade.guava32.com.google.common.collect.Maps; +import org.apache.amoro.table.TableMetaStore; import org.junit.Assert; import org.junit.Before; import org.junit.ClassRule; @@ -60,11 +62,32 @@ public void setupCatalogMeta() { @Test public void testCatalogLoader() { - UnifiedCatalog catalog = + UnifiedCatalog unifiedCatalog = UnifiedCatalogLoader.loadUnifiedCatalog( testAms.getServerUrl(), meta.getCatalogName(), Maps.newHashMap()); + validateUnifiedCatalog(unifiedCatalog); + } + + @Test + public void testCreateUnifiedCatalog() { + UnifiedCatalog unifiedCatalog = + new CommonUnifiedCatalog( + meta.getCatalogName(), + meta.getCatalogType(), + meta.getCatalogProperties(), + TableMetaStore.EMPTY); + validateUnifiedCatalog(unifiedCatalog); + } + + private void validateUnifiedCatalog(UnifiedCatalog unifiedCatalog) { + Assert.assertNotNull(unifiedCatalog); + Assert.assertEquals(CommonUnifiedCatalog.class.getName(), unifiedCatalog.getClass().getName()); - Assert.assertNotNull(catalog); - Assert.assertEquals(CommonUnifiedCatalog.class.getName(), catalog.getClass().getName()); + unifiedCatalog.createDatabase(TableTestHelper.TEST_DB_NAME); + Assert.assertEquals( + Lists.newArrayList(TableTestHelper.TEST_DB_NAME), unifiedCatalog.listDatabases()); + Assert.assertEquals(0, unifiedCatalog.listTables(TableTestHelper.TEST_DB_NAME).size()); + unifiedCatalog.dropDatabase(TableTestHelper.TEST_DB_NAME); + unifiedCatalog.refresh(); } } diff --git a/amoro-format-iceberg/src/test/java/org/apache/amoro/catalog/TestCatalogLoader.java b/amoro-format-iceberg/src/test/java/org/apache/amoro/catalog/TestCatalogLoader.java index 5ebb8cda3a..b471492336 100644 --- a/amoro-format-iceberg/src/test/java/org/apache/amoro/catalog/TestCatalogLoader.java +++ b/amoro-format-iceberg/src/test/java/org/apache/amoro/catalog/TestCatalogLoader.java @@ -62,24 +62,6 @@ public void testLoadNotExistedCatalog() { () -> CatalogLoader.load(getCatalogUrl(TEST_CATALOG_NAME))); } - @Test - public void testLoadCatalogWithErrorFormat() { - Map properties = Maps.newHashMap(); - CatalogMeta catalogMeta = - CatalogTestHelpers.buildCatalogMeta( - TEST_CATALOG_NAME, - CatalogMetaProperties.CATALOG_TYPE_HADOOP, - properties, - TableFormat.MIXED_ICEBERG); - TEST_AMS.getAmsHandler().createCatalog(catalogMeta); - // lack warehouse - Assert.assertThrows( - "failed when load catalog test", - IllegalStateException.class, - () -> CatalogLoader.load(getCatalogUrl(TEST_CATALOG_NAME))); - TEST_AMS.getAmsHandler().dropCatalog(TEST_CATALOG_NAME); - } - private String getCatalogUrl(String catalogName) { return TEST_AMS.getServerUrl() + "/" + catalogName; } diff --git a/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/main/java/org/apache/amoro/spark/SparkUnifiedCatalogBase.java b/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/main/java/org/apache/amoro/spark/SparkUnifiedCatalogBase.java index e4662d9e92..aed47f5b8a 100644 --- a/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/main/java/org/apache/amoro/spark/SparkUnifiedCatalogBase.java +++ b/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/main/java/org/apache/amoro/spark/SparkUnifiedCatalogBase.java @@ -18,7 +18,10 @@ package org.apache.amoro.spark; +import static org.apache.iceberg.CatalogUtil.ICEBERG_CATALOG_TYPE; + import org.apache.amoro.AmoroTable; +import org.apache.amoro.CommonUnifiedCatalog; import org.apache.amoro.Constants; import org.apache.amoro.FormatCatalogFactory; import org.apache.amoro.TableFormat; @@ -29,7 +32,11 @@ import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions; import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableMap; import org.apache.amoro.shade.guava32.com.google.common.collect.Maps; +import org.apache.amoro.table.TableMetaStore; import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.spark.SparkUtil; +import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException; import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException; import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; @@ -60,6 +67,8 @@ public class SparkUnifiedCatalogBase implements TableCatalog, SupportsNamespaces, ProcedureCatalog { private static final Logger LOG = LoggerFactory.getLogger(SparkUnifiedCatalogBase.class); + + public static final String CATALOG_REGISTER_NAME = "register-name"; private static final Map defaultTableCatalogImplMap = ImmutableMap.of( TableFormat.ICEBERG, "org.apache.iceberg.spark.SparkCatalog", @@ -74,28 +83,40 @@ public class SparkUnifiedCatalogBase implements TableCatalog, SupportsNamespaces @Override public void initialize(String name, CaseInsensitiveStringMap options) { + this.name = name; Map properties = Maps.newHashMap(options); String uri = options.get(SparkUnifiedCatalogProperties.URI); properties.remove(SparkUnifiedCatalogProperties.URI); - Preconditions.checkNotNull(uri, "lack required option: %s", SparkUnifiedCatalogProperties.URI); - - AmsThriftUrl catalogUri = AmsThriftUrl.parse(uri, Constants.THRIFT_TABLE_SERVICE_NAME); - String registerCatalogName = catalogUri.catalogName(); - - if (StringUtils.isBlank(registerCatalogName)) { - registerCatalogName = name; - if (CatalogManager.SESSION_CATALOG_NAME().equalsIgnoreCase(registerCatalogName)) { - LOG.warn( - "Catalog name is not exists in catalog uri, using spark catalog as register catalog name, but " - + "current name " - + registerCatalogName - + " is spark session catalog name."); + if (StringUtils.isNotEmpty(uri)) { + AmsThriftUrl catalogUri = AmsThriftUrl.parse(uri, Constants.THRIFT_TABLE_SERVICE_NAME); + String registerCatalogName = catalogUri.catalogName(); + + if (StringUtils.isBlank(registerCatalogName)) { + registerCatalogName = name; + if (CatalogManager.SESSION_CATALOG_NAME().equalsIgnoreCase(registerCatalogName)) { + LOG.warn( + "Catalog name is not exists in catalog uri, using spark catalog as register catalog name, but " + + "current name " + + registerCatalogName + + " is spark session catalog name."); + } } + + this.unifiedCatalog = + UnifiedCatalogLoader.loadUnifiedCatalog( + catalogUri.serverUrl(), registerCatalogName, properties); + } else { + String metastoreType = properties.get(ICEBERG_CATALOG_TYPE); + Preconditions.checkArgument( + StringUtils.isNotEmpty(metastoreType), + "Lack required property: type when initializing unified spark catalog"); + Configuration localConfiguration = + SparkUtil.hadoopConfCatalogOverrides(SparkSession.active(), name); + TableMetaStore tableMetaStore = + TableMetaStore.builder().withConfiguration(localConfiguration).build(); + this.unifiedCatalog = + new CommonUnifiedCatalog(name, metastoreType, properties, tableMetaStore); } - this.name = name; - this.unifiedCatalog = - UnifiedCatalogLoader.loadUnifiedCatalog( - catalogUri.serverUrl(), registerCatalogName, properties); ServiceLoader sparkTableFormats = ServiceLoader.load(SparkTableFormat.class); for (SparkTableFormat format : sparkTableFormats) { tableFormats.put(format.format(), format); @@ -317,7 +338,7 @@ private TableCatalog initializeTableCatalog(TableFormat format) { if (tableCatalog instanceof SupportAuthentication) { ((SupportAuthentication) tableCatalog) .setAuthenticationContext(unifiedCatalog.authenticationContext()); - tableCatalogInitializeProperties.put("register-name", unifiedCatalog.name()); + tableCatalogInitializeProperties.put(CATALOG_REGISTER_NAME, unifiedCatalog.name()); } tableCatalog.initialize(name, new CaseInsensitiveStringMap(tableCatalogInitializeProperties)); return tableCatalog; diff --git a/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/main/java/org/apache/amoro/spark/mixed/MixedSparkCatalogBase.java b/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/main/java/org/apache/amoro/spark/mixed/MixedSparkCatalogBase.java index 8276a6a75d..1ae7e78e20 100644 --- a/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/main/java/org/apache/amoro/spark/mixed/MixedSparkCatalogBase.java +++ b/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/main/java/org/apache/amoro/spark/mixed/MixedSparkCatalogBase.java @@ -18,20 +18,23 @@ package org.apache.amoro.spark.mixed; +import static org.apache.amoro.spark.SparkUnifiedCatalogBase.CATALOG_REGISTER_NAME; import static org.apache.amoro.spark.mixed.SparkSQLProperties.REFRESH_CATALOG_BEFORE_USAGE; import static org.apache.amoro.spark.mixed.SparkSQLProperties.REFRESH_CATALOG_BEFORE_USAGE_DEFAULT; import static org.apache.iceberg.CatalogUtil.ICEBERG_CATALOG_TYPE; import org.apache.amoro.mixed.CatalogLoader; import org.apache.amoro.mixed.MixedFormatCatalog; -import org.apache.amoro.properties.CatalogMetaProperties; import org.apache.amoro.shade.guava32.com.google.common.base.Joiner; import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions; import org.apache.amoro.shade.guava32.com.google.common.collect.Lists; +import org.apache.amoro.shade.guava32.com.google.common.collect.Maps; import org.apache.amoro.spark.SupportAuthentication; import org.apache.amoro.table.TableIdentifier; import org.apache.amoro.table.TableMetaStore; import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.spark.SparkUtil; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException; import org.apache.spark.sql.connector.catalog.Identifier; @@ -65,30 +68,34 @@ public void setAuthenticationContext(TableMetaStore tableMetaStore) { @Override public final void initialize(String name, CaseInsensitiveStringMap options) { this.catalogName = name; - String catalogUrl = options.get(CatalogMetaProperties.AMS_URI); - if (StringUtils.isNotBlank(catalogUrl)) { - // initialize for unified catalog. - String metastoreType = options.get(ICEBERG_CATALOG_TYPE); - String registerName = options.get("register-name"); - Preconditions.checkArgument( - StringUtils.isNotEmpty(metastoreType), - "Lack required property: type when initialized by unified catalog."); - Preconditions.checkNotNull( - tableMetaStore, - "Authentication context must be set when initialized by unified catalog."); - Preconditions.checkArgument( - StringUtils.isNotEmpty(registerName), - "Lack required property: register-name when initialized by unified catalog"); - catalog = CatalogLoader.createCatalog(registerName, metastoreType, options, tableMetaStore); - } else { - catalogUrl = options.get("url"); + Map properties = Maps.newHashMap(options); + if (tableMetaStore == null) { + String catalogUrl = options.get("url"); if (StringUtils.isBlank(catalogUrl)) { catalogUrl = options.get("uri"); } + if (catalogUrl != null) { + catalog = CatalogLoader.load(catalogUrl, properties); + } else { + Configuration localConfiguration = + SparkUtil.hadoopConfCatalogOverrides(SparkSession.active(), name); + tableMetaStore = TableMetaStore.builder().withConfiguration(localConfiguration).build(); + } + } + if (catalog == null) { + String metastoreType = options.get(ICEBERG_CATALOG_TYPE); + String registerName = options.get(CATALOG_REGISTER_NAME); Preconditions.checkArgument( - StringUtils.isNotBlank(catalogUrl), "lack required properties: url"); - - catalog = CatalogLoader.load(catalogUrl, options); + StringUtils.isNotEmpty(metastoreType), + "Lack required property: type when initializing mixed spark catalog"); + Preconditions.checkNotNull( + tableMetaStore, "Lack authentication context when initializing mixed spark catalog"); + catalog = + CatalogLoader.createCatalog( + registerName == null ? catalogName : registerName, + metastoreType, + properties, + tableMetaStore); } this.options = options; } diff --git a/amoro-format-mixed/amoro-mixed-spark/v3.2/amoro-mixed-spark-3.2/src/main/java/org/apache/amoro/spark/SparkUnifiedCatalog.java b/amoro-format-mixed/amoro-mixed-spark/v3.2/amoro-mixed-spark-3.2/src/main/java/org/apache/amoro/spark/SparkUnifiedCatalog.java index 6a9e840bd6..4dd464d554 100644 --- a/amoro-format-mixed/amoro-mixed-spark/v3.2/amoro-mixed-spark-3.2/src/main/java/org/apache/amoro/spark/SparkUnifiedCatalog.java +++ b/amoro-format-mixed/amoro-mixed-spark/v3.2/amoro-mixed-spark-3.2/src/main/java/org/apache/amoro/spark/SparkUnifiedCatalog.java @@ -28,7 +28,6 @@ import org.apache.spark.sql.connector.catalog.functions.UnboundFunction; import org.apache.spark.sql.connector.iceberg.catalog.ProcedureCatalog; -/** @Auth: hzwangtao6 @Time: 2024/5/24 14:27 @Description: */ public class SparkUnifiedCatalog extends SparkUnifiedCatalogBase implements TableCatalog, SupportsNamespaces, ProcedureCatalog, FunctionCatalog { /** diff --git a/amoro-format-mixed/amoro-mixed-spark/v3.2/amoro-mixed-spark-runtime-3.2/pom.xml b/amoro-format-mixed/amoro-mixed-spark/v3.2/amoro-mixed-spark-runtime-3.2/pom.xml index 642010ae93..acce293f81 100644 --- a/amoro-format-mixed/amoro-mixed-spark/v3.2/amoro-mixed-spark-runtime-3.2/pom.xml +++ b/amoro-format-mixed/amoro-mixed-spark/v3.2/amoro-mixed-spark-runtime-3.2/pom.xml @@ -60,7 +60,9 @@ org.apache.amoro:amoro-format-mixed-spark-3-common org.apache.amoro:amoro-format-mixed-spark-3.2 - org.apache.amoro:amoro-core + org.apache.amoro:amoro-common + org.apache.amoro:amoro-format-iceberg + org.apache.amoro:amoro-mixed-hive org.apache.amoro:amoro-shade-guava-32 org.apache.amoro:amoro-shade-jackson-2 org.apache.amoro:amoro-shade-thrift @@ -110,6 +112,12 @@ org/apache/iceberg/spark/extensions/** + + org.apache.iceberg:iceberg-spark-3.2_2.12 + + META-INF/services/org.apache.spark.sql.sources.DataSourceRegister + + diff --git a/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/main/java/org/apache/amoro/spark/SparkUnifiedCatalog.java b/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/main/java/org/apache/amoro/spark/SparkUnifiedCatalog.java index 44c8133000..09fbfeb5a0 100644 --- a/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/main/java/org/apache/amoro/spark/SparkUnifiedCatalog.java +++ b/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-3.3/src/main/java/org/apache/amoro/spark/SparkUnifiedCatalog.java @@ -31,7 +31,6 @@ import org.apache.spark.sql.connector.catalog.functions.UnboundFunction; import org.apache.spark.sql.connector.iceberg.catalog.ProcedureCatalog; -/** @Auth: hzwangtao6 @Time: 2024/5/24 14:27 @Description: */ public class SparkUnifiedCatalog extends SparkUnifiedCatalogBase implements TableCatalog, SupportsNamespaces, ProcedureCatalog, FunctionCatalog { diff --git a/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-runtime-3.3/pom.xml b/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-runtime-3.3/pom.xml index 33ebd34400..3af0cf6de2 100644 --- a/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-runtime-3.3/pom.xml +++ b/amoro-format-mixed/amoro-mixed-spark/v3.3/amoro-mixed-spark-runtime-3.3/pom.xml @@ -60,8 +60,9 @@ org.apache.amoro:amoro-format-mixed-spark-3-common org.apache.amoro:amoro-format-mixed-spark-3.3 - org.apache.amoro:amoro-core - org.apache.amoro:amoro-format-mixed-hive + org.apache.amoro:amoro-common + org.apache.amoro:amoro-format-iceberg + org.apache.amoro:amoro-mixed-hive org.apache.amoro:amoro-shade-guava-32 org.apache.amoro:amoro-shade-jackson-2 org.apache.amoro:amoro-shade-thrift @@ -115,6 +116,12 @@ org/apache/iceberg/spark/extensions/** + + org.apache.iceberg:iceberg-spark-3.3_2.12 + + META-INF/services/org.apache.spark.sql.sources.DataSourceRegister + + diff --git a/docs/engines/spark/spark-conf.md b/docs/engines/spark/spark-conf.md index fd5d978543..6afdb65bc2 100644 --- a/docs/engines/spark/spark-conf.md +++ b/docs/engines/spark/spark-conf.md @@ -31,13 +31,22 @@ menu: ### Using Mixed-Format in a standalone catalog Starting from version 3.x, Spark supports configuring an independent Catalog. -If you want to use a Mixed-Format table in a standalone Catalog, you can configure it as follows: +If you want to use a Mixed-Format table in a standalone Catalog, you can create a mixed_catalog and load catalog +metadata from AMS with following properties: ```properties spark.sql.catalog.mixed_catalog=org.apache.amoro.spark.MixedFormatSparkCatalog spark.sql.catalog.mixed_catalog.url=thrift://${AMS_HOST}:${AMS_PORT}/${AMS_CATALOG_NAME_HIVE} ``` +Or create a mixed_catalog with local configurations with following properties: +```properties +spark.sql.catalog.mixed_catalog=org.apache.amoro.spark.MixedFormatSparkCatalog +# Configure mixed catalog type as you needed +spark.sql.catalog.mixed_catalog.type=hadoop +spark.sql.catalog.mixed_catalog.warehouse=/warehouse/hadoop_mixed_catalog +``` + Then, execute the following SQL in the Spark SQL Client to switch to the corresponding catalog. ```sql @@ -47,7 +56,7 @@ use mixed_catalog; Of course, you can also access Mixed-Format tables by directly using the triplet `mixed_catalog.{db_name}.{table_name}`. -You can also set Spark's default Catalog to your configured Catalog using the following properties. +You can also set Spark's default catalog to your configured catalog using the following properties. In this way, you don't need to use the `use {catalog}` command to switch the default catalog. ```properties