From a1a12196e98dac3cd12c6ca60c46563f19c69ab1 Mon Sep 17 00:00:00 2001 From: ZhouJinsong Date: Fri, 6 Sep 2024 10:24:28 +0800 Subject: [PATCH] [AMORO-3176] Support multiple format in internal catalog (#3177) Support multiple format in internal catalog --- .../amoro/server/RestCatalogService.java | 5 +- .../amoro/server/catalog/CatalogBuilder.java | 13 +- .../server/catalog/InternalCatalogImpl.java | 260 ++++++++++++++++++ .../catalog/InternalIcebergCatalogImpl.java | 169 ------------ .../catalog/InternalMixedCatalogImpl.java | 164 ----------- .../server/catalog/MixedHiveCatalogImpl.java | 137 --------- .../server/terminal/TerminalManager.java | 14 +- .../mixed/MixedIcebergCatalogFactory.java | 9 +- .../amoro/mixed/BasicMixedIcebergCatalog.java | 11 +- .../amoro/hive/catalog/MixedHiveCatalog.java | 2 +- .../spark/mixed/MixedSparkCatalogBase.java | 6 +- 11 files changed, 292 insertions(+), 498 deletions(-) create mode 100644 amoro-ams/src/main/java/org/apache/amoro/server/catalog/InternalCatalogImpl.java delete mode 100644 amoro-ams/src/main/java/org/apache/amoro/server/catalog/InternalIcebergCatalogImpl.java delete mode 100644 amoro-ams/src/main/java/org/apache/amoro/server/catalog/InternalMixedCatalogImpl.java delete mode 100644 amoro-ams/src/main/java/org/apache/amoro/server/catalog/MixedHiveCatalogImpl.java diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/RestCatalogService.java b/amoro-ams/src/main/java/org/apache/amoro/server/RestCatalogService.java index 2a08824137..5c37d6ca8a 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/RestCatalogService.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/RestCatalogService.java @@ -437,9 +437,8 @@ private InternalCatalog getCatalog(String catalog) { internalCatalog instanceof InternalCatalog, "The catalog is not an iceberg rest catalog"); Set tableFormats = CatalogUtil.tableFormats(internalCatalog.getMetadata()); Preconditions.checkArgument( - tableFormats.size() == 1 - && (tableFormats.contains(TableFormat.ICEBERG) - || tableFormats.contains(TableFormat.MIXED_ICEBERG)), + tableFormats.contains(TableFormat.ICEBERG) + || tableFormats.contains(TableFormat.MIXED_ICEBERG), "The catalog is not an iceberg rest catalog"); return (InternalCatalog) internalCatalog; } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/catalog/CatalogBuilder.java b/amoro-ams/src/main/java/org/apache/amoro/server/catalog/CatalogBuilder.java index a2e1b27b5b..40d659ecfa 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/catalog/CatalogBuilder.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/catalog/CatalogBuilder.java @@ -74,11 +74,10 @@ public static ServerCatalog buildServerCatalog( formatSupportedMatrix.containsKey(type), "unsupported catalog type: %s", type); Set supportedFormats = formatSupportedMatrix.get(type); - TableFormat tableFormat = tableFormats.iterator().next(); Preconditions.checkState( - supportedFormats.contains(tableFormat), + supportedFormats.containsAll(tableFormats), "Table format %s is not supported for metastore type: %s", - tableFormat, + tableFormats, type); switch (type) { @@ -91,13 +90,7 @@ public static ServerCatalog buildServerCatalog( catalogMeta.getCatalogProperties().put(CatalogMetaProperties.AMS_URI, amsUri); return new ExternalCatalog(catalogMeta); case CATALOG_TYPE_AMS: - if (tableFormat.equals(TableFormat.MIXED_ICEBERG)) { - return new InternalMixedCatalogImpl(catalogMeta, serverConfiguration); - } else if (tableFormat.equals(TableFormat.ICEBERG)) { - return new InternalIcebergCatalogImpl(catalogMeta, serverConfiguration); - } else { - throw new IllegalStateException("AMS catalog support iceberg/mixed-iceberg table only."); - } + return new InternalCatalogImpl(catalogMeta, serverConfiguration); default: throw new IllegalStateException("unsupported catalog type:" + type); } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/catalog/InternalCatalogImpl.java b/amoro-ams/src/main/java/org/apache/amoro/server/catalog/InternalCatalogImpl.java new file mode 100644 index 0000000000..b812c61da2 --- /dev/null +++ b/amoro-ams/src/main/java/org/apache/amoro/server/catalog/InternalCatalogImpl.java @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.catalog; + +import static org.apache.amoro.server.table.internal.InternalTableConstants.CHANGE_STORE_TABLE_NAME_SUFFIX; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.RemovalListener; +import org.apache.amoro.AmoroTable; +import org.apache.amoro.TableFormat; +import org.apache.amoro.api.CatalogMeta; +import org.apache.amoro.config.Configurations; +import org.apache.amoro.formats.iceberg.IcebergTable; +import org.apache.amoro.io.AuthenticatedFileIO; +import org.apache.amoro.mixed.InternalMixedIcebergCatalog; +import org.apache.amoro.server.AmoroManagementConf; +import org.apache.amoro.server.RestCatalogService; +import org.apache.amoro.server.exception.ObjectNotExistsException; +import org.apache.amoro.server.table.TableMetadata; +import org.apache.amoro.server.table.internal.InternalIcebergCreator; +import org.apache.amoro.server.table.internal.InternalIcebergHandler; +import org.apache.amoro.server.table.internal.InternalMixedIcebergCreator; +import org.apache.amoro.server.table.internal.InternalMixedIcebergHandler; +import org.apache.amoro.server.table.internal.InternalTableCreator; +import org.apache.amoro.server.table.internal.InternalTableHandler; +import org.apache.amoro.server.utils.InternalTableUtil; +import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions; +import org.apache.amoro.table.BasicKeyedTable; +import org.apache.amoro.table.BasicUnkeyedTable; +import org.apache.amoro.table.MixedTable; +import org.apache.amoro.table.PrimaryKeySpec; +import org.apache.amoro.utils.CatalogUtil; +import org.apache.iceberg.BaseTable; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.TableOperations; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.rest.RESTCatalog; +import org.apache.iceberg.rest.requests.CreateTableRequest; + +public class InternalCatalogImpl extends InternalCatalog { + + private static final String URI = "uri"; + + final int httpPort; + final String exposedHost; + + final Cache, FileIO> fileIOCloser; + + protected InternalCatalogImpl(CatalogMeta metadata, Configurations serverConfiguration) { + super(metadata); + this.httpPort = serverConfiguration.getInteger(AmoroManagementConf.HTTP_SERVER_PORT); + this.exposedHost = serverConfiguration.getString(AmoroManagementConf.SERVER_EXPOSE_HOST); + this.fileIOCloser = newFileIOCloser(); + } + + @Override + public CatalogMeta getMetadata() { + CatalogMeta meta = super.getMetadata(); + if (!meta.getCatalogProperties().containsKey(URI)) { + meta.putToCatalogProperties(URI, defaultRestURI()); + } + meta.putToCatalogProperties(CatalogProperties.CATALOG_IMPL, RESTCatalog.class.getName()); + return meta.deepCopy(); + } + + @Override + public void updateMetadata(CatalogMeta metadata) { + String defaultUrl = defaultRestURI(); + String uri = metadata.getCatalogProperties().getOrDefault(URI, defaultUrl); + if (defaultUrl.equals(uri)) { + metadata.getCatalogProperties().remove(URI); + } + super.updateMetadata(metadata); + } + + @Override + public AmoroTable loadTable(String database, String tableName) { + Preconditions.checkArgument( + !isChangeStoreName(tableName), "table name is invalid for load table"); + + InternalTableHandler handler; + try { + handler = newTableHandler(database, tableName); + } catch (ObjectNotExistsException e) { + return null; + } + if (TableFormat.ICEBERG.equals(handler.tableMetadata().getFormat())) { + return loadIcebergTable(database, tableName, handler); + } else if (TableFormat.MIXED_ICEBERG.equals(handler.tableMetadata().getFormat())) { + return loadMixedIcebergTable(database, tableName, handler); + } else { + throw new IllegalArgumentException( + "Unsupported table format:" + handler.tableMetadata().getFormat()); + } + } + + private AmoroTable loadIcebergTable( + String database, String tableName, InternalTableHandler handler) { + TableMetadata tableMetadata = handler.tableMetadata(); + TableOperations ops = handler.newTableOperator(); + + BaseTable table = + new BaseTable( + ops, + TableIdentifier.of( + tableMetadata.getTableIdentifier().getDatabase(), + tableMetadata.getTableIdentifier().getTableName()) + .toString()); + org.apache.amoro.table.TableIdentifier tableIdentifier = + org.apache.amoro.table.TableIdentifier.of(name(), database, tableName); + AmoroTable amoroTable = + IcebergTable.newIcebergTable( + tableIdentifier, + table, + CatalogUtil.buildMetaStore(getMetadata()), + getMetadata().getCatalogProperties()); + fileIOCloser.put(amoroTable, ops.io()); + return amoroTable; + } + + private AmoroTable loadMixedIcebergTable( + String database, String tableName, InternalTableHandler handler) { + TableMetadata tableMetadata = handler.tableMetadata(); + org.apache.amoro.table.TableIdentifier tableIdentifier = + org.apache.amoro.table.TableIdentifier.of(name(), database, tableName); + AuthenticatedFileIO fileIO = InternalTableUtil.newIcebergFileIo(getMetadata()); + MixedTable mixedIcebergTable; + + BaseTable baseTable = loadTableStore(tableMetadata, false); + if (InternalTableUtil.isKeyedMixedTable(tableMetadata)) { + BaseTable changeTable = loadTableStore(tableMetadata, true); + + PrimaryKeySpec.Builder keySpecBuilder = PrimaryKeySpec.builderFor(baseTable.schema()); + tableMetadata.buildTableMeta().getKeySpec().getFields().forEach(keySpecBuilder::addColumn); + PrimaryKeySpec keySpec = keySpecBuilder.build(); + + mixedIcebergTable = + new BasicKeyedTable( + tableMetadata.getTableLocation(), + keySpec, + new BasicKeyedTable.BaseInternalTable( + tableIdentifier, baseTable, fileIO, getMetadata().getCatalogProperties()), + new BasicKeyedTable.ChangeInternalTable( + tableIdentifier, changeTable, fileIO, getMetadata().getCatalogProperties())); + } else { + mixedIcebergTable = + new BasicUnkeyedTable( + tableIdentifier, baseTable, fileIO, getMetadata().getCatalogProperties()); + } + AmoroTable amoroTable = + new org.apache.amoro.formats.mixed.MixedTable(mixedIcebergTable, TableFormat.MIXED_ICEBERG); + fileIOCloser.put(amoroTable, fileIO); + return amoroTable; + } + + private BaseTable loadTableStore(TableMetadata tableMetadata, boolean isChangeStore) { + TableOperations ops = newTableStoreHandler(tableMetadata, isChangeStore).newTableOperator(); + return new BaseTable( + ops, + TableIdentifier.of( + tableMetadata.getTableIdentifier().getDatabase(), + tableMetadata.getTableIdentifier().getTableName()) + .toString()); + } + + private String defaultRestURI() { + return "http://" + exposedHost + ":" + httpPort + RestCatalogService.ICEBERG_REST_API_PREFIX; + } + + @Override + public InternalTableCreator newTableCreator( + String database, String tableName, TableFormat format, CreateTableRequest creatorArguments) { + if (tableExists(database, tableName)) { + throw new AlreadyExistsException( + "Table " + name() + "." + database + "." + tableName + " already exists."); + } + if (TableFormat.ICEBERG.equals(format)) { + return new InternalIcebergCreator(getMetadata(), database, tableName, creatorArguments); + } else if (TableFormat.MIXED_ICEBERG.equals(format)) { + return new InternalMixedIcebergCreator(getMetadata(), database, tableName, creatorArguments); + } else { + throw new IllegalArgumentException("Unsupported table format:" + format); + } + } + + @Override + @SuppressWarnings("unchecked") + public InternalTableHandler newTableHandler(String database, String tableName) { + String realTableName = realTableName(tableName); + TableMetadata metadata = loadTableMetadata(database, realTableName); + if (TableFormat.ICEBERG.equals(metadata.getFormat())) { + return new InternalIcebergHandler(getMetadata(), metadata); + } else if (TableFormat.MIXED_ICEBERG.equals(metadata.getFormat())) { + boolean isChangeStore = isChangeStoreName(tableName); + return newTableStoreHandler(metadata, isChangeStore); + } else { + throw new IllegalArgumentException("Unsupported table format:" + metadata.getFormat()); + } + } + + private String realTableName(String tableStoreName) { + if (isChangeStoreName(tableStoreName)) { + return tableStoreName.substring( + 0, tableStoreName.length() - CHANGE_STORE_TABLE_NAME_SUFFIX.length()); + } + return tableStoreName; + } + + private boolean isChangeStoreName(String tableName) { + String separator = InternalMixedIcebergCatalog.CHANGE_STORE_SEPARATOR; + if (!tableName.contains(separator)) { + return false; + } + Preconditions.checkArgument( + tableName.indexOf(separator) == tableName.lastIndexOf(separator) + && tableName.endsWith(CHANGE_STORE_TABLE_NAME_SUFFIX), + "illegal table name: %s, %s is not allowed in table name.", + tableName, + separator); + + return true; + } + + private InternalTableHandler newTableStoreHandler( + TableMetadata metadata, boolean isChangeStore) { + return new InternalMixedIcebergHandler(getMetadata(), metadata, isChangeStore); + } + + private Cache, FileIO> newFileIOCloser() { + return Caffeine.newBuilder() + .weakKeys() + .removalListener( + (RemovalListener, FileIO>) + (tbl, fileIO, cause) -> { + if (null != fileIO) { + fileIO.close(); + } + }) + .build(); + } +} diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/catalog/InternalIcebergCatalogImpl.java b/amoro-ams/src/main/java/org/apache/amoro/server/catalog/InternalIcebergCatalogImpl.java deleted file mode 100644 index bfc4a70b83..0000000000 --- a/amoro-ams/src/main/java/org/apache/amoro/server/catalog/InternalIcebergCatalogImpl.java +++ /dev/null @@ -1,169 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.amoro.server.catalog; - -import com.github.benmanes.caffeine.cache.Cache; -import com.github.benmanes.caffeine.cache.Caffeine; -import com.github.benmanes.caffeine.cache.RemovalListener; -import org.apache.amoro.AmoroTable; -import org.apache.amoro.TableFormat; -import org.apache.amoro.api.CatalogMeta; -import org.apache.amoro.config.Configurations; -import org.apache.amoro.formats.iceberg.IcebergTable; -import org.apache.amoro.io.AuthenticatedFileIO; -import org.apache.amoro.server.AmoroManagementConf; -import org.apache.amoro.server.RestCatalogService; -import org.apache.amoro.server.exception.ObjectNotExistsException; -import org.apache.amoro.server.table.TableMetadata; -import org.apache.amoro.server.table.internal.InternalIcebergCreator; -import org.apache.amoro.server.table.internal.InternalIcebergHandler; -import org.apache.amoro.server.table.internal.InternalTableCreator; -import org.apache.amoro.server.table.internal.InternalTableHandler; -import org.apache.amoro.server.utils.InternalTableUtil; -import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions; -import org.apache.amoro.utils.CatalogUtil; -import org.apache.iceberg.BaseTable; -import org.apache.iceberg.CatalogProperties; -import org.apache.iceberg.TableOperations; -import org.apache.iceberg.catalog.TableIdentifier; -import org.apache.iceberg.exceptions.AlreadyExistsException; -import org.apache.iceberg.io.FileIO; -import org.apache.iceberg.rest.RESTCatalog; -import org.apache.iceberg.rest.requests.CreateTableRequest; - -public class InternalIcebergCatalogImpl extends InternalCatalog { - - private static final String URI = "uri"; - - final int httpPort; - final String exposedHost; - - final Cache, FileIO> fileIOCloser; - - protected InternalIcebergCatalogImpl(CatalogMeta metadata, Configurations serverConfiguration) { - super(metadata); - this.httpPort = serverConfiguration.getInteger(AmoroManagementConf.HTTP_SERVER_PORT); - this.exposedHost = serverConfiguration.getString(AmoroManagementConf.SERVER_EXPOSE_HOST); - this.fileIOCloser = newFileIOCloser(); - } - - @Override - public CatalogMeta getMetadata() { - CatalogMeta meta = super.getMetadata(); - if (!meta.getCatalogProperties().containsKey(URI)) { - meta.putToCatalogProperties(URI, defaultRestURI()); - } - meta.putToCatalogProperties(CatalogProperties.CATALOG_IMPL, RESTCatalog.class.getName()); - return meta.deepCopy(); - } - - @Override - public void updateMetadata(CatalogMeta metadata) { - String defaultUrl = defaultRestURI(); - String uri = metadata.getCatalogProperties().getOrDefault(URI, defaultUrl); - if (defaultUrl.equals(uri)) { - metadata.getCatalogProperties().remove(URI); - } - super.updateMetadata(metadata); - } - - @Override - public AmoroTable loadTable(String database, String tableName) { - InternalTableHandler handler; - try { - handler = newTableHandler(database, tableName); - } catch (ObjectNotExistsException e) { - return null; - } - TableMetadata tableMetadata = handler.tableMetadata(); - TableOperations ops = handler.newTableOperator(); - - BaseTable table = - new BaseTable( - ops, - TableIdentifier.of( - tableMetadata.getTableIdentifier().getDatabase(), - tableMetadata.getTableIdentifier().getTableName()) - .toString()); - org.apache.amoro.table.TableIdentifier tableIdentifier = - org.apache.amoro.table.TableIdentifier.of(name(), database, tableName); - AmoroTable amoroTable = - IcebergTable.newIcebergTable( - tableIdentifier, - table, - CatalogUtil.buildMetaStore(getMetadata()), - getMetadata().getCatalogProperties()); - fileIOCloser.put(amoroTable, ops.io()); - return amoroTable; - } - - protected AuthenticatedFileIO fileIO(CatalogMeta catalogMeta) { - return InternalTableUtil.newIcebergFileIo(catalogMeta); - } - - private String defaultRestURI() { - return "http://" + exposedHost + ":" + httpPort + RestCatalogService.ICEBERG_REST_API_PREFIX; - } - - @Override - public InternalTableCreator newTableCreator( - String database, String tableName, TableFormat format, CreateTableRequest creatorArguments) { - - Preconditions.checkArgument( - format == format(), "the catalog only support to create %s table", format().name()); - if (tableExists(database, tableName)) { - throw new AlreadyExistsException( - "Table " + name() + "." + database + "." + tableName + " already " + "exists."); - } - return newTableCreator(database, tableName, creatorArguments); - } - - protected TableFormat format() { - return TableFormat.ICEBERG; - } - - protected InternalTableCreator newTableCreator( - String database, String tableName, CreateTableRequest request) { - return new InternalIcebergCreator(getMetadata(), database, tableName, request); - } - - @Override - public InternalTableHandler newTableHandler(String database, String tableName) { - TableMetadata metadata = loadTableMetadata(database, tableName); - Preconditions.checkState( - metadata.getFormat() == format(), - "the catalog only support to handle %s table", - format().name()); - //noinspection unchecked - return (InternalTableHandler) new InternalIcebergHandler(getMetadata(), metadata); - } - - private Cache, FileIO> newFileIOCloser() { - return Caffeine.newBuilder() - .weakKeys() - .removalListener( - (RemovalListener, FileIO>) - (tbl, fileIO, cause) -> { - if (null != fileIO) { - fileIO.close(); - } - }) - .build(); - } -} diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/catalog/InternalMixedCatalogImpl.java b/amoro-ams/src/main/java/org/apache/amoro/server/catalog/InternalMixedCatalogImpl.java deleted file mode 100644 index 45d2cfb339..0000000000 --- a/amoro-ams/src/main/java/org/apache/amoro/server/catalog/InternalMixedCatalogImpl.java +++ /dev/null @@ -1,164 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.amoro.server.catalog; - -import static org.apache.amoro.server.table.internal.InternalTableConstants.CHANGE_STORE_TABLE_NAME_SUFFIX; - -import org.apache.amoro.AmoroTable; -import org.apache.amoro.TableFormat; -import org.apache.amoro.api.CatalogMeta; -import org.apache.amoro.config.Configurations; -import org.apache.amoro.io.AuthenticatedFileIO; -import org.apache.amoro.mixed.InternalMixedIcebergCatalog; -import org.apache.amoro.server.persistence.mapper.TableMetaMapper; -import org.apache.amoro.server.table.TableMetadata; -import org.apache.amoro.server.table.internal.InternalMixedIcebergCreator; -import org.apache.amoro.server.table.internal.InternalMixedIcebergHandler; -import org.apache.amoro.server.table.internal.InternalTableCreator; -import org.apache.amoro.server.table.internal.InternalTableHandler; -import org.apache.amoro.server.utils.InternalTableUtil; -import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions; -import org.apache.amoro.table.BasicKeyedTable; -import org.apache.amoro.table.BasicUnkeyedTable; -import org.apache.amoro.table.MixedTable; -import org.apache.amoro.table.PrimaryKeySpec; -import org.apache.iceberg.BaseTable; -import org.apache.iceberg.TableOperations; -import org.apache.iceberg.catalog.TableIdentifier; -import org.apache.iceberg.rest.requests.CreateTableRequest; - -public class InternalMixedCatalogImpl extends InternalIcebergCatalogImpl { - - protected InternalMixedCatalogImpl(CatalogMeta metadata, Configurations serverConfiguration) { - super(metadata, serverConfiguration); - } - - @Override - protected InternalTableCreator newTableCreator( - String database, String tableName, CreateTableRequest request) { - return new InternalMixedIcebergCreator(getMetadata(), database, tableName, request); - } - - @Override - public InternalTableHandler newTableHandler(String database, String tableStoreName) { - String tableName = tableName(tableStoreName); - boolean isChangeStore = isChangeStoreName(tableStoreName); - TableMetadata metadata = loadTableMetadata(database, tableName); - Preconditions.checkState( - metadata.getFormat() == format(), - "the catalog only support to handle %s table", - format().name()); - //noinspection unchecked - return (InternalTableHandler) newTableStoreHandler(metadata, isChangeStore); - } - - private InternalTableHandler newTableStoreHandler( - TableMetadata metadata, boolean isChangeStore) { - return new InternalMixedIcebergHandler(getMetadata(), metadata, isChangeStore); - } - - private String tableName(String tableStoreName) { - if (isChangeStoreName(tableStoreName)) { - return tableStoreName.substring( - 0, tableStoreName.length() - CHANGE_STORE_TABLE_NAME_SUFFIX.length()); - } - return tableStoreName; - } - - private boolean isChangeStoreName(String tableName) { - String separator = InternalMixedIcebergCatalog.CHANGE_STORE_SEPARATOR; - if (!tableName.contains(separator)) { - return false; - } - Preconditions.checkArgument( - tableName.indexOf(separator) == tableName.lastIndexOf(separator) - && tableName.endsWith(CHANGE_STORE_TABLE_NAME_SUFFIX), - "illegal table name: %s, %s is not allowed in table name.", - tableName, - separator); - - return true; - } - - @Override - public AmoroTable loadTable(String database, String tableName) { - Preconditions.checkArgument( - !isChangeStoreName(tableName), "table name is invalid for load table"); - TableMetadata tableMetadata = - getAs( - TableMetaMapper.class, - mapper -> - mapper.selectTableMetaByName(getMetadata().getCatalogName(), database, tableName)); - if (tableMetadata == null) { - return null; - } - Preconditions.checkArgument( - TableFormat.MIXED_ICEBERG == tableMetadata.getFormat(), - "Table: %s.%s.%s is not a mixed-iceberg table", - name(), - database, - tableName); - - org.apache.amoro.table.TableIdentifier tableIdentifier = - org.apache.amoro.table.TableIdentifier.of(name(), database, tableName); - AuthenticatedFileIO fileIO = InternalTableUtil.newIcebergFileIo(getMetadata()); - MixedTable mixedIcebergTable; - - BaseTable baseTable = loadTableStore(tableMetadata, false); - if (InternalTableUtil.isKeyedMixedTable(tableMetadata)) { - BaseTable changeTable = loadTableStore(tableMetadata, true); - - PrimaryKeySpec.Builder keySpecBuilder = PrimaryKeySpec.builderFor(baseTable.schema()); - tableMetadata.buildTableMeta().getKeySpec().getFields().forEach(keySpecBuilder::addColumn); - PrimaryKeySpec keySpec = keySpecBuilder.build(); - - mixedIcebergTable = - new BasicKeyedTable( - tableMetadata.getTableLocation(), - keySpec, - new BasicKeyedTable.BaseInternalTable( - tableIdentifier, baseTable, fileIO, getMetadata().getCatalogProperties()), - new BasicKeyedTable.ChangeInternalTable( - tableIdentifier, changeTable, fileIO, getMetadata().getCatalogProperties())); - } else { - - mixedIcebergTable = - new BasicUnkeyedTable( - tableIdentifier, baseTable, fileIO, getMetadata().getCatalogProperties()); - } - AmoroTable amoroTable = - new org.apache.amoro.formats.mixed.MixedTable(mixedIcebergTable, TableFormat.MIXED_ICEBERG); - fileIOCloser.put(amoroTable, fileIO); - return amoroTable; - } - - protected TableFormat format() { - return TableFormat.MIXED_ICEBERG; - } - - protected BaseTable loadTableStore(TableMetadata tableMetadata, boolean isChangeStore) { - TableOperations ops = newTableStoreHandler(tableMetadata, isChangeStore).newTableOperator(); - return new BaseTable( - ops, - TableIdentifier.of( - tableMetadata.getTableIdentifier().getDatabase(), - tableMetadata.getTableIdentifier().getTableName()) - .toString()); - } -} diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/catalog/MixedHiveCatalogImpl.java b/amoro-ams/src/main/java/org/apache/amoro/server/catalog/MixedHiveCatalogImpl.java deleted file mode 100644 index 0d1cc9b1cb..0000000000 --- a/amoro-ams/src/main/java/org/apache/amoro/server/catalog/MixedHiveCatalogImpl.java +++ /dev/null @@ -1,137 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.amoro.server.catalog; - -import org.apache.amoro.AmoroTable; -import org.apache.amoro.TableFormat; -import org.apache.amoro.api.CatalogMeta; -import org.apache.amoro.formats.mixed.MixedTable; -import org.apache.amoro.hive.CachedHiveClientPool; -import org.apache.amoro.hive.HMSClient; -import org.apache.amoro.hive.catalog.MixedHiveTables; -import org.apache.amoro.server.persistence.mapper.TableMetaMapper; -import org.apache.amoro.server.table.TableMetadata; -import org.apache.amoro.server.table.internal.InternalTableCreator; -import org.apache.amoro.server.table.internal.InternalTableHandler; -import org.apache.amoro.utils.CatalogUtil; -import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; -import org.apache.iceberg.rest.requests.CreateTableRequest; -import org.apache.thrift.TException; - -import java.util.List; - -public class MixedHiveCatalogImpl extends InternalCatalog { - protected MixedHiveTables tables; - private volatile CachedHiveClientPool hiveClientPool; - - protected MixedHiveCatalogImpl(CatalogMeta catalogMeta) { - super(catalogMeta); - this.tables = - new MixedHiveTables( - catalogMeta.getCatalogProperties(), CatalogUtil.buildMetaStore(catalogMeta)); - hiveClientPool = ((MixedHiveTables) tables()).getHiveClientPool(); - } - - @Override - public void updateMetadata(CatalogMeta metadata) { - super.updateMetadata(metadata); - hiveClientPool = ((MixedHiveTables) tables()).getHiveClientPool(); - this.tables = - new MixedHiveTables(metadata.getCatalogProperties(), CatalogUtil.buildMetaStore(metadata)); - } - - @Override - public AmoroTable loadTable(String database, String tableName) { - TableMetadata tableMetadata = - getAs( - TableMetaMapper.class, - mapper -> - mapper.selectTableMetaByName(getMetadata().getCatalogName(), database, tableName)); - if (tableMetadata == null) { - return null; - } - return new MixedTable( - tables.loadTableByMeta(tableMetadata.buildTableMeta()), TableFormat.MIXED_HIVE); - } - - @Override - public void createDatabase(String databaseName) { - // do not handle database operations - } - - @Override - public void dropDatabase(String databaseName) { - // do not handle database operations - } - - @Override - public InternalTableCreator newTableCreator( - String database, String tableName, TableFormat format, CreateTableRequest creatorArguments) { - throw new UnsupportedOperationException(); - } - - @Override - public InternalTableHandler newTableHandler(String database, String tableName) { - throw new UnsupportedOperationException(); - } - - @Override - protected void decreaseDatabaseTableCount(String databaseName) { - // do not handle database operations - } - - @Override - protected void increaseDatabaseTableCount(String databaseName) { - // do not handle database operations - } - - @Override - public boolean databaseExists(String database) { - try { - return hiveClientPool.run( - client -> { - try { - client.getDatabase(database); - return true; - } catch (NoSuchObjectException exception) { - return false; - } - }); - } catch (TException | InterruptedException e) { - throw new RuntimeException("Failed to get databases", e); - } - } - - @Override - public List listDatabases() { - try { - return hiveClientPool.run(HMSClient::getAllDatabases); - } catch (TException | InterruptedException e) { - throw new RuntimeException("Failed to list databases", e); - } - } - - public CachedHiveClientPool getHiveClient() { - return hiveClientPool; - } - - private MixedHiveTables tables() { - return tables; - } -} diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/terminal/TerminalManager.java b/amoro-ams/src/main/java/org/apache/amoro/server/terminal/TerminalManager.java index 7ed15669cb..64dba7a82e 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/terminal/TerminalManager.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/terminal/TerminalManager.java @@ -252,10 +252,11 @@ public void dispose() { private String catalogConnectorType(CatalogMeta catalogMeta) { String catalogType = catalogMeta.getCatalogType(); Set tableFormatSet = CatalogUtil.tableFormats(catalogMeta); - if (catalogType.equalsIgnoreCase(CatalogType.AMS.name())) { - if (tableFormatSet.contains(TableFormat.MIXED_ICEBERG)) { - return "arctic"; + if (tableFormatSet.size() > 1) { + return "unified"; + } else if (tableFormatSet.contains(TableFormat.MIXED_ICEBERG)) { + return "mixed_iceberg"; } else if (tableFormatSet.contains(TableFormat.ICEBERG)) { return "iceberg"; } @@ -263,9 +264,10 @@ private String catalogConnectorType(CatalogMeta catalogMeta) { || catalogType.equalsIgnoreCase(CatalogType.HADOOP.name())) { if (tableFormatSet.size() > 1) { return "unified"; - } else if (tableFormatSet.contains(TableFormat.MIXED_HIVE) - || tableFormatSet.contains(TableFormat.MIXED_ICEBERG)) { - return "arctic"; + } else if (tableFormatSet.contains(TableFormat.MIXED_ICEBERG)) { + return "mixed_iceberg"; + } else if (tableFormatSet.contains(TableFormat.MIXED_HIVE)) { + return "mixed_hive"; } else if (tableFormatSet.contains(TableFormat.ICEBERG)) { return "iceberg"; } else if (tableFormatSet.contains(TableFormat.PAIMON)) { diff --git a/amoro-format-iceberg/src/main/java/org/apache/amoro/formats/mixed/MixedIcebergCatalogFactory.java b/amoro-format-iceberg/src/main/java/org/apache/amoro/formats/mixed/MixedIcebergCatalogFactory.java index b8b0d4ffb4..90fea3e7e7 100644 --- a/amoro-format-iceberg/src/main/java/org/apache/amoro/formats/mixed/MixedIcebergCatalogFactory.java +++ b/amoro-format-iceberg/src/main/java/org/apache/amoro/formats/mixed/MixedIcebergCatalogFactory.java @@ -18,14 +18,16 @@ package org.apache.amoro.formats.mixed; +import static org.apache.iceberg.CatalogUtil.ICEBERG_CATALOG_TYPE; + import org.apache.amoro.FormatCatalog; import org.apache.amoro.FormatCatalogFactory; import org.apache.amoro.TableFormat; import org.apache.amoro.mixed.CatalogLoader; import org.apache.amoro.mixed.MixedFormatCatalog; import org.apache.amoro.properties.CatalogMetaProperties; +import org.apache.amoro.shade.guava32.com.google.common.collect.Maps; import org.apache.amoro.table.TableMetaStore; -import org.apache.amoro.utils.MixedFormatCatalogUtil; import java.util.Map; @@ -49,9 +51,8 @@ public TableFormat format() { @Override public Map convertCatalogProperties( String catalogName, String metastoreType, Map unifiedCatalogProperties) { - Map properties = - MixedFormatCatalogUtil.withIcebergCatalogInitializeProperties( - catalogName, metastoreType, unifiedCatalogProperties); + Map properties = Maps.newHashMap(unifiedCatalogProperties); + properties.put(ICEBERG_CATALOG_TYPE, metastoreType); properties.put(CatalogMetaProperties.TABLE_FORMATS, format().name()); return properties; } diff --git a/amoro-format-iceberg/src/main/java/org/apache/amoro/mixed/BasicMixedIcebergCatalog.java b/amoro-format-iceberg/src/main/java/org/apache/amoro/mixed/BasicMixedIcebergCatalog.java index 033eb2e163..3b90493a69 100644 --- a/amoro-format-iceberg/src/main/java/org/apache/amoro/mixed/BasicMixedIcebergCatalog.java +++ b/amoro-format-iceberg/src/main/java/org/apache/amoro/mixed/BasicMixedIcebergCatalog.java @@ -18,6 +18,8 @@ package org.apache.amoro.mixed; +import static org.apache.iceberg.CatalogUtil.ICEBERG_CATALOG_TYPE; + import org.apache.amoro.AmsClient; import org.apache.amoro.PooledAmsClient; import org.apache.amoro.io.AuthenticatedFileIO; @@ -76,11 +78,16 @@ public void initialize(String name, Map properties, TableMetaSto String databaseFilter = properties.get(CatalogMetaProperties.KEY_DATABASE_FILTER); databaseFilterPattern = Pattern.compile(databaseFilter); } + String metastoreType = properties.get(ICEBERG_CATALOG_TYPE); + Map icebergCatalogProperties = + MixedFormatCatalogUtil.withIcebergCatalogInitializeProperties( + name, metastoreType, properties); org.apache.iceberg.catalog.Catalog catalog = - buildIcebergCatalog(name, properties, metaStore.getConfiguration()); + buildIcebergCatalog(name, icebergCatalogProperties, metaStore.getConfiguration()); this.name = name; this.tableMetaStore = metaStore; - this.icebergCatalog = MixedFormatCatalogUtil.buildCacheCatalog(catalog, properties); + this.icebergCatalog = + MixedFormatCatalogUtil.buildCacheCatalog(catalog, icebergCatalogProperties); if (catalog instanceof SupportsNamespaces) { this.asNamespaceCatalog = (SupportsNamespaces) catalog; } diff --git a/amoro-format-mixed/amoro-mixed-hive/src/main/java/org/apache/amoro/hive/catalog/MixedHiveCatalog.java b/amoro-format-mixed/amoro-mixed-hive/src/main/java/org/apache/amoro/hive/catalog/MixedHiveCatalog.java index 5b04e54b64..6e43e0fb81 100644 --- a/amoro-format-mixed/amoro-mixed-hive/src/main/java/org/apache/amoro/hive/catalog/MixedHiveCatalog.java +++ b/amoro-format-mixed/amoro-mixed-hive/src/main/java/org/apache/amoro/hive/catalog/MixedHiveCatalog.java @@ -112,7 +112,7 @@ public void initialize(String name, Map properties, TableMetaSto this.catalogProperties = properties; this.tableMetaStore = metaStore; this.tables = newMixedHiveTables(properties, metaStore); - this.hiveClientPool = ((MixedHiveTables) tables).getHiveClientPool(); + this.hiveClientPool = tables.getHiveClientPool(); } protected MixedHiveTables getTables() { diff --git a/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/main/java/org/apache/amoro/spark/mixed/MixedSparkCatalogBase.java b/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/main/java/org/apache/amoro/spark/mixed/MixedSparkCatalogBase.java index 952375870e..8276a6a75d 100644 --- a/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/main/java/org/apache/amoro/spark/mixed/MixedSparkCatalogBase.java +++ b/amoro-format-mixed/amoro-mixed-spark/amoro-mixed-spark-3-common/src/main/java/org/apache/amoro/spark/mixed/MixedSparkCatalogBase.java @@ -20,9 +20,11 @@ import static org.apache.amoro.spark.mixed.SparkSQLProperties.REFRESH_CATALOG_BEFORE_USAGE; import static org.apache.amoro.spark.mixed.SparkSQLProperties.REFRESH_CATALOG_BEFORE_USAGE_DEFAULT; +import static org.apache.iceberg.CatalogUtil.ICEBERG_CATALOG_TYPE; import org.apache.amoro.mixed.CatalogLoader; import org.apache.amoro.mixed.MixedFormatCatalog; +import org.apache.amoro.properties.CatalogMetaProperties; import org.apache.amoro.shade.guava32.com.google.common.base.Joiner; import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions; import org.apache.amoro.shade.guava32.com.google.common.collect.Lists; @@ -63,10 +65,10 @@ public void setAuthenticationContext(TableMetaStore tableMetaStore) { @Override public final void initialize(String name, CaseInsensitiveStringMap options) { this.catalogName = name; - String catalogUrl = options.get("ams.uri"); + String catalogUrl = options.get(CatalogMetaProperties.AMS_URI); if (StringUtils.isNotBlank(catalogUrl)) { // initialize for unified catalog. - String metastoreType = options.get("type"); + String metastoreType = options.get(ICEBERG_CATALOG_TYPE); String registerName = options.get("register-name"); Preconditions.checkArgument( StringUtils.isNotEmpty(metastoreType),