Skip to content

Commit

Permalink
[AMORO-3189] Support loading mixed catalog without AMS URL on Spark (#3190)
Browse files Browse the repository at this point in the history

* Support loading mixed catalogs on Spark without an AMS URL

* Add docs and unit tests

* Fix checkstyle errors

* Fix a mixed format spark catalog unit test error

* Fix the jar-includes issue in the mixed-format Spark 3.2 runtime module

* Fix a doc error
  • Loading branch information
zhoujinsong committed Sep 19, 2024
1 parent 165168a commit 8b9050e
Show file tree
Hide file tree
Showing 12 changed files with 234 additions and 211 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,27 +35,48 @@

public class CommonUnifiedCatalog implements UnifiedCatalog {

private final String catalogName;
private final String metaStoreType;
private final Supplier<CatalogMeta> metaSupplier;
// Client side catalog properties
private final Map<String, String> clientProperties;
// Catalog properties after merging the server and client
private Map<String, String> catalogProperties;
private CatalogMeta meta;
private Map<TableFormat, FormatCatalog> formatCatalogs = Maps.newHashMap();
private final Map<String, String> properties = Maps.newHashMap();

private TableMetaStore tableMetaStore;
private Map<TableFormat, FormatCatalog> formatCatalogs = Maps.newHashMap();

public CommonUnifiedCatalog(
Supplier<CatalogMeta> catalogMetaSupplier, Map<String, String> properties) {
CatalogMeta catalogMeta = catalogMetaSupplier.get();
CatalogUtil.mergeCatalogProperties(catalogMeta, properties);
this.meta = catalogMeta;
this.catalogName = catalogMeta.getCatalogName();
this.metaStoreType = catalogMeta.getCatalogType();
this.tableMetaStore = CatalogUtil.buildMetaStore(catalogMeta);
this.properties.putAll(properties);
this.clientProperties = properties;
this.catalogProperties = catalogMeta.getCatalogProperties();
this.metaSupplier = catalogMetaSupplier;
initializeFormatCatalogs();
}

public CommonUnifiedCatalog(
String catalogName,
String metaStoreType,
Map<String, String> properties,
TableMetaStore tableMetaStore) {
this.catalogName = catalogName;
this.metaStoreType = metaStoreType;
this.tableMetaStore = tableMetaStore;
this.clientProperties = properties;
this.catalogProperties = properties;
this.metaSupplier = null;
initializeFormatCatalogs();
}

@Override
public String metastoreType() {
return meta.getCatalogType();
return metaStoreType;
}

@Override
Expand Down Expand Up @@ -126,7 +147,7 @@ public AmoroTable<?> loadTable(String database, String table) {

@Override
public String name() {
return this.meta.getCatalogName();
return catalogName;
}

@Override
Expand Down Expand Up @@ -158,7 +179,7 @@ public List<TableIDWithFormat> listTables(String database) {
tableName -> {
TableFormat format = tableNameToFormat.get(tableName);
return TableIDWithFormat.of(
TableIdentifier.of(this.meta.getCatalogName(), database, tableName), format);
TableIdentifier.of(catalogName, database, tableName), format);
})
.collect(Collectors.toList());
}
Expand All @@ -175,33 +196,34 @@ public boolean dropTable(String database, String table, boolean purge) {

@Override
public synchronized void refresh() {
CatalogMeta newMeta = metaSupplier.get();
CatalogUtil.mergeCatalogProperties(meta, properties);
if (newMeta.equals(this.meta)) {
return;
if (metaSupplier != null) {
CatalogMeta newMeta = metaSupplier.get();
CatalogUtil.mergeCatalogProperties(meta, clientProperties);
if (newMeta.equals(this.meta)) {
return;
}
this.catalogProperties = newMeta.getCatalogProperties();
this.tableMetaStore = CatalogUtil.buildMetaStore(newMeta);
this.meta = newMeta;
this.initializeFormatCatalogs();
}
this.tableMetaStore = CatalogUtil.buildMetaStore(newMeta);
this.meta = newMeta;
this.initializeFormatCatalogs();
}

@Override
public Map<String, String> properties() {
return this.meta.getCatalogProperties();
return catalogProperties;
}

protected void initializeFormatCatalogs() {
ServiceLoader<FormatCatalogFactory> loader = ServiceLoader.load(FormatCatalogFactory.class);
Set<TableFormat> formats = CatalogUtil.tableFormats(this.meta);
TableMetaStore store = CatalogUtil.buildMetaStore(this.meta);
Set<TableFormat> formats = CatalogUtil.tableFormats(metaStoreType, catalogProperties);
Map<TableFormat, FormatCatalog> formatCatalogs = Maps.newConcurrentMap();
for (FormatCatalogFactory factory : loader) {
if (formats.contains(factory.format())) {
Map<String, String> catalogProperties =
factory.convertCatalogProperties(
name(), meta.getCatalogType(), meta.getCatalogProperties());
Map<String, String> formatCatalogProperties =
factory.convertCatalogProperties(name(), metaStoreType, this.catalogProperties);
FormatCatalog catalog =
factory.create(name(), meta.getCatalogType(), catalogProperties, store);
factory.create(name(), metaStoreType, formatCatalogProperties, tableMetaStore);
formatCatalogs.put(factory.format(), catalog);
}
}
Expand Down
124 changes: 60 additions & 64 deletions amoro-common/src/main/java/org/apache/amoro/table/TableMetaStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.amoro.table;

import org.apache.amoro.properties.CatalogMetaProperties;
import org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting;
import org.apache.amoro.shade.guava32.com.google.common.base.Charsets;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
import org.apache.amoro.shade.guava32.com.google.common.base.Strings;
Expand All @@ -39,7 +38,6 @@
import sun.security.krb5.KrbException;

import java.io.ByteArrayInputStream;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.Serializable;
Expand All @@ -48,6 +46,7 @@
import java.lang.reflect.Field;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.PrivilegedAction;
Expand All @@ -70,7 +69,7 @@ public class TableMetaStore implements Serializable {
new ConcurrentHashMap<>();

public static final TableMetaStore EMPTY =
TableMetaStore.builder().withConfiguration(new Configuration()).buildForTest();
TableMetaStore.builder().withConfiguration(new Configuration()).build();

public static final String HADOOP_CONF_DIR = "conf.hadoop.dir";
public static final String HIVE_SITE = "hive-site";
Expand Down Expand Up @@ -126,14 +125,12 @@ public class TableMetaStore implements Serializable {
private final byte[] krbConf;
private final String krbPrincipal;
private final boolean disableAuth;
private final String accessKey;
private final String secretKey;

private transient RuntimeContext runtimeContext;
private transient String authInformation;

private String accessKey;

private String secretKey;

public static Builder builder() {
return new Builder();
}
Expand Down Expand Up @@ -170,6 +167,21 @@ private TableMetaStore(
this.secretKey = secretKey;
}

private TableMetaStore(Configuration configuration) {
this.disableAuth = true;
this.metaStoreSite = null;
this.hdfsSite = null;
this.coreSite = null;
this.authMethod = null;
this.hadoopUsername = null;
this.krbKeyTab = null;
this.krbConf = null;
this.krbPrincipal = null;
this.accessKey = null;
this.secretKey = null;
getRuntimeContext().setConfiguration(configuration);
}

public byte[] getMetaStoreSite() {
return metaStoreSite;
}
Expand Down Expand Up @@ -218,7 +230,7 @@ public synchronized Configuration getConfiguration() {
return getRuntimeContext().getConfiguration();
}

public synchronized UserGroupInformation getUGI() {
private synchronized UserGroupInformation getUGI() {
return getRuntimeContext().getUGI();
}

Expand Down Expand Up @@ -567,7 +579,6 @@ public static class Builder {
private byte[] krbKeyTab;
private byte[] krbConf;
private String krbPrincipal;

private String accessKey;
private String secretKey;
private boolean disableAuth = true;
Expand Down Expand Up @@ -710,7 +721,7 @@ public Builder withConfiguration(Configuration configuration) {

private byte[] readBytesFromFile(String filePath) {
try {
return IOUtils.toByteArray(new FileInputStream(filePath));
return IOUtils.toByteArray(Files.newInputStream(Paths.get(filePath)));
} catch (IOException e) {
throw new UncheckedIOException("Read config failed:" + filePath, e);
}
Expand Down Expand Up @@ -751,61 +762,46 @@ private void readProperties() {
}

public TableMetaStore build() {
readProperties();
if (!disableAuth & !AUTH_METHOD_AK_SK.equals(authMethod)) {
Preconditions.checkNotNull(hdfsSite);
Preconditions.checkNotNull(coreSite);
}
if (AUTH_METHOD_SIMPLE.equals(authMethod)) {
Preconditions.checkNotNull(hadoopUsername);
} else if (AUTH_METHOD_KERBEROS.equals(authMethod)) {
Preconditions.checkNotNull(krbConf);
Preconditions.checkNotNull(krbKeyTab);
Preconditions.checkNotNull(krbPrincipal);
} else if (AUTH_METHOD_AK_SK.equals(authMethod)) {
Preconditions.checkNotNull(accessKey);
Preconditions.checkNotNull(secretKey);
} else if (authMethod != null) {
throw new IllegalArgumentException("Unsupported auth method:" + authMethod);
}
if (disableAuth && configuration != null) {
LOG.info("Build table meta store with local configuration:" + configuration);
return new TableMetaStore(configuration);
} else {
readProperties();
if (!AUTH_METHOD_AK_SK.equals(authMethod)) {
Preconditions.checkNotNull(hdfsSite);
Preconditions.checkNotNull(coreSite);
}
if (AUTH_METHOD_SIMPLE.equals(authMethod)) {
Preconditions.checkNotNull(hadoopUsername);
} else if (AUTH_METHOD_KERBEROS.equals(authMethod)) {
Preconditions.checkNotNull(krbConf);
Preconditions.checkNotNull(krbKeyTab);
Preconditions.checkNotNull(krbPrincipal);
} else if (AUTH_METHOD_AK_SK.equals(authMethod)) {
Preconditions.checkNotNull(accessKey);
Preconditions.checkNotNull(secretKey);
} else if (authMethod != null) {
throw new IllegalArgumentException("Unsupported auth method:" + authMethod);
}

LOG.info(
"Construct TableMetaStore with authMethod:{}, hadoopUsername:{}, krbPrincipal:{}",
authMethod,
hadoopUsername,
krbPrincipal);
return new TableMetaStore(
metaStoreSite,
hdfsSite,
coreSite,
authMethod,
hadoopUsername,
krbKeyTab,
krbConf,
krbPrincipal,
accessKey,
secretKey,
disableAuth);
}

@VisibleForTesting
public TableMetaStore buildForTest() {
readProperties();
TableMetaStore tableMetaStore =
new TableMetaStore(
metaStoreSite,
hdfsSite,
coreSite,
authMethod,
hadoopUsername,
krbKeyTab,
krbConf,
krbPrincipal,
accessKey,
secretKey,
disableAuth);
tableMetaStore.getRuntimeContext().setConfiguration(configuration);
return tableMetaStore;
LOG.info(
"Build table meta store with configurations: authMethod:{}, hadoopUsername:{}, krbPrincipal:{}",
authMethod,
hadoopUsername,
krbPrincipal);
return new TableMetaStore(
metaStoreSite,
hdfsSite,
coreSite,
authMethod,
hadoopUsername,
krbKeyTab,
krbConf,
krbPrincipal,
accessKey,
secretKey,
disableAuth);
}
}
}
}
Loading

0 comments on commit 8b9050e

Please sign in to comment.