Skip to content

Commit

Permalink
[CALCITE-6728] Introduce new methods to lookup tables and schemas ins…
Browse files Browse the repository at this point in the history
…ide schemas
  • Loading branch information
kramerul committed Dec 19, 2024
1 parent 68d7a78 commit c204e6f
Show file tree
Hide file tree
Showing 8 changed files with 73 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;
import javax.sql.DataSource;
Expand Down Expand Up @@ -68,8 +69,8 @@ public class JdbcCatalogSchema extends JdbcBaseSchema implements Wrapper {

/** default schema name, lazily initialized. */
@SuppressWarnings({"method.invocation.invalid", "Convert2MethodRef"})
private final Supplier<String> defaultSchemaName =
Suppliers.memoize(() -> computeDefaultSchemaName());
private final Supplier<Optional<String>> defaultSchemaName =
Suppliers.memoize(() -> Optional.ofNullable(computeDefaultSchemaName()));

/** Creates a JdbcCatalogSchema. */
public JdbcCatalogSchema(DataSource dataSource, SqlDialect dialect,
Expand Down Expand Up @@ -150,7 +151,7 @@ public static JdbcCatalogSchema create(
return subSchemas;
}

private String computeDefaultSchemaName() {
private @Nullable String computeDefaultSchemaName() {
try (Connection connection = dataSource.getConnection()) {
return connection.getSchema();
} catch (SQLException e) {
Expand All @@ -160,7 +161,7 @@ private String computeDefaultSchemaName() {

/** Returns the name of the default sub-schema. */
public @Nullable String getDefaultSubSchemaName() {
return defaultSchemaName.get();
return defaultSchemaName.get().orElse(null);
}

/** Returns the data source. */
Expand Down
33 changes: 15 additions & 18 deletions core/src/main/java/org/apache/calcite/adapter/jdbc/JdbcSchema.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.Wrapper;
import org.apache.calcite.schema.lookup.IgnoreCaseLookup;
import org.apache.calcite.schema.lookup.LazyReference;
import org.apache.calcite.schema.lookup.LikePattern;
import org.apache.calcite.schema.lookup.Lookup;
import org.apache.calcite.sql.SqlDialect;
Expand All @@ -47,7 +48,6 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Ordering;

import org.checkerframework.checker.initialization.qual.UnknownInitialization;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -88,7 +88,7 @@ public class JdbcSchema extends JdbcBaseSchema implements Schema, Wrapper {
final @Nullable String schema;
public final SqlDialect dialect;
final JdbcConvention convention;
private final Lookup<Table> tables;
private final LazyReference<Lookup<Table>> tables = new LazyReference<>();
private final Lookup<JdbcSchema> subSchemas = Lookup.empty();

@Experimental
Expand All @@ -113,21 +113,6 @@ public JdbcSchema(DataSource dataSource, SqlDialect dialect,
this.convention = convention;
this.catalog = catalog;
this.schema = schema;
@UnknownInitialization
JdbcSchema self = this;
this.tables = new IgnoreCaseLookup<Table>() {
@Override public @Nullable Table get(String name) {
try (Stream<MetaImpl.MetaTable> s = self.getMetaTableStream(name)) {
return s.findFirst().map(it -> jdbcTableMapper(it)).orElse(null);
}
}

@Override public Set<String> getNames(LikePattern pattern) {
try (Stream<MetaImpl.MetaTable> s = self.getMetaTableStream(pattern.pattern)) {
return s.map(it -> it.tableName).collect(Collectors.toSet());
}
}
};
}

public static JdbcSchema create(
Expand Down Expand Up @@ -229,7 +214,19 @@ public static DataSource dataSource(String url, @Nullable String driverClassName
}

@Override public Lookup<Table> tables() {
return tables;
return tables.getOrCompute(() -> new IgnoreCaseLookup<Table>() {
@Override public @Nullable Table get(String name) {
try (Stream<MetaImpl.MetaTable> s = getMetaTableStream(name)) {
return s.findFirst().map(it -> jdbcTableMapper(it)).orElse(null);
}
}

@Override public Set<String> getNames(LikePattern pattern) {
try (Stream<MetaImpl.MetaTable> s = getMetaTableStream(pattern.pattern)) {
return s.map(it -> it.tableName).collect(Collectors.toSet());
}
}
});
}

@Override public Lookup<? extends Schema> subSchemas() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import com.google.common.collect.ImmutableSortedMap;
import com.google.common.collect.ImmutableSortedSet;

import org.checkerframework.checker.initialization.qual.UnderInitialization;
import org.checkerframework.checker.nullness.qual.Nullable;

import java.util.Collection;
Expand Down Expand Up @@ -101,8 +100,7 @@ private SimpleCalciteSchema(@Nullable CalciteSchema parent,
return null;
}

@Override protected CalciteSchema createSubSchema(@UnderInitialization SimpleCalciteSchema this,
Schema schema, String name) {
@Override protected CalciteSchema createSubSchema(Schema schema, String name) {
return new SimpleCalciteSchema(this, schema, name);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.calcite.schema.TemporalTable;
import org.apache.calcite.schema.TranslatableTable;
import org.apache.calcite.schema.Wrapper;
import org.apache.calcite.schema.lookup.LazyReference;
import org.apache.calcite.schema.lookup.LikePattern;
import org.apache.calcite.schema.lookup.Lookup;
import org.apache.calcite.sql.SqlAccessType;
Expand All @@ -60,7 +61,6 @@

import com.google.common.collect.ImmutableList;

import org.checkerframework.checker.initialization.qual.UnknownInitialization;
import org.checkerframework.checker.nullness.qual.Nullable;

import java.util.AbstractList;
Expand Down Expand Up @@ -432,16 +432,13 @@ private static class MySchemaPlus implements SchemaPlus {
private final @Nullable SchemaPlus parent;
private final String name;
private final Schema schema;
private final Lookup<? extends SchemaPlus> subSchemas;
private final LazyReference<Lookup<? extends SchemaPlus>> subSchemas = new LazyReference<>();


MySchemaPlus(@Nullable SchemaPlus parent, String name, Schema schema) {
this.parent = parent;
this.name = name;
this.schema = schema;
@UnknownInitialization
MySchemaPlus self = this;
this.subSchemas = schema.subSchemas().map((s, key) -> new MySchemaPlus(self, key, s));
}

public static MySchemaPlus create(Path path) {
Expand All @@ -463,8 +460,8 @@ public static MySchemaPlus create(Path path) {
return name;
}

@Deprecated @Override public @Nullable SchemaPlus getSubSchema(String name) {
return subSchemas.get(name);
@Override public @Nullable SchemaPlus getSubSchema(String name) {
return subSchemas().get(name);
}

@Override public SchemaPlus add(String name, Schema schema) {
Expand Down Expand Up @@ -517,7 +514,8 @@ public static MySchemaPlus create(Path path) {
}

@Override public Lookup<? extends SchemaPlus> subSchemas() {
return subSchemas;
return subSchemas.getOrCompute(
() -> schema.subSchemas().map((s, key) -> new MySchemaPlus(this, key, s)));
}

@Override public @Nullable Table getTable(String name) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@
import org.apache.calcite.schema.Schemas;
import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.lookup.CompatibilityLookup;
import org.apache.calcite.schema.lookup.LazyReference;
import org.apache.calcite.schema.lookup.Lookup;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.Multimap;

import org.checkerframework.checker.initialization.qual.UnknownInitialization;
import org.checkerframework.checker.nullness.qual.Nullable;

import java.util.Collection;
Expand Down Expand Up @@ -60,23 +60,17 @@
*/
public class AbstractSchema implements Schema {

private Lookup<Table> tables = new CompatibilityLookup<>(this::getTable, this::getTableNames);
private Lookup<Schema> subSchemas =
new CompatibilityLookup<>(this::getSubSchema, this::getSubSchemaNames);

public AbstractSchema() {
@UnknownInitialization
AbstractSchema self = this;
tables = new CompatibilityLookup<>(self::getTable, self::getTableNames);
subSchemas = new CompatibilityLookup<>(self::getSubSchema, self::getSubSchemaNames);
}
private LazyReference<Lookup<Table>> tables = new LazyReference<>();
private LazyReference<Lookup<Schema>> subSchemas = new LazyReference<>();

@Override public Lookup<Table> tables() {
return tables;
return tables.getOrCompute(
() -> new CompatibilityLookup<>(this::getTable, this::getTableNames));
}

@Override public Lookup<? extends Schema> subSchemas() {
return subSchemas;
return subSchemas.getOrCompute(
() -> new CompatibilityLookup<>(this::getSubSchema, this::getSubSchemaNames));
}

@Override public boolean isMutable() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.checkerframework.checker.nullness.qual.Nullable;

import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

/**
* This class can be used to make a snapshot of a lookups.
Expand All @@ -30,12 +31,11 @@
public class CachedLookup<T> implements Lookup<T> {

private final Lookup<T> delegate;
private volatile Lookup<T> cachedDelegate;
private AtomicReference<Lookup<T>> cachedDelegate = new AtomicReference<>();
private boolean enabled = true;

public CachedLookup(Lookup<T> delegate) {
this.delegate = delegate;
this.cachedDelegate = null;
}

@Override public @Nullable T get(final String name) {
Expand All @@ -54,26 +54,28 @@ private Lookup<T> delegate() {
if (!enabled) {
return delegate;
}
if (cachedDelegate == null) {
synchronized (this) {
if (cachedDelegate == null) {
NameMap<T> map = new NameMap<>();
for (String name : delegate.getNames(LikePattern.any())) {
T entry = delegate.get(name);
if (entry != null) {
map.put(name, delegate.get(name));
}
}
cachedDelegate = new NameMapLookup<>(map);
while (true) {
Lookup<T> cached = cachedDelegate.get();
if (cached != null) {
return cached;
}
NameMap<T> map = new NameMap<>();
for (String name : delegate.getNames(LikePattern.any())) {
T entry = delegate.get(name);
if (entry != null) {
map.put(name, delegate.get(name));
}
}
cached = new NameMapLookup<>(map);
if (cachedDelegate.compareAndSet(null, cached)) {
return cached;
}
}
return cachedDelegate;
}

public void enable(boolean enabled) {
if (!enabled) {
cachedDelegate = null;
cachedDelegate.set(null);
}
this.enabled = enabled;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,7 @@
*/
public abstract class IgnoreCaseLookup<T> implements Lookup<T> {

private volatile NameMap<String> nameMap = null;

public IgnoreCaseLookup() {
}
private LazyReference<NameMap<String>> nameMap = new LazyReference<>();

/**
* Returns a named entity with a given name, or null if not found.
Expand All @@ -50,31 +47,30 @@ public IgnoreCaseLookup() {
* @return Entity, or null
*/
@Override @Nullable public Named<T> getIgnoreCase(String name) {
Map.Entry<String, String> entry = getNameMap(false).range(name, false).firstEntry();
if (entry == null) {
entry = getNameMap(true).range(name, false).firstEntry();
if (entry == null) {
int retryCounter = 0;
while (true) {
Map.Entry<String, String> entry = nameMap.getOrCompute(this::loadNames)
.range(name, false)
.firstEntry();
if (entry != null) {
T result = get(entry.getValue());
return result == null ? null : new Named<>(entry.getKey(), result);
}
retryCounter++;
if (retryCounter > 1) {
return null;
}
nameMap.reset();
}
T result = get(entry.getValue());
return result == null ? null : new Named<>(entry.getKey(), result);
}

@Override public abstract Set<String> getNames(LikePattern pattern);

private NameMap<String> getNameMap(boolean forceReload) {
if (nameMap == null || forceReload) {
synchronized (this) {
if (nameMap == null || forceReload) {
NameMap<String> tmp = new NameMap<>();
for (String name : getNames(LikePattern.any())) {
tmp.put(name, name);
}
nameMap = tmp;
}
}
private NameMap<String> loadNames() {
NameMap<String> result = new NameMap<>();
for (String name : getNames(LikePattern.any())) {
result.put(name, name);
}
return nameMap;
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,8 @@ public T getOrCompute(Supplier<T> supplier) {
}
}
}

public void reset() {
value.set(null);
}
}

0 comments on commit c204e6f

Please sign in to comment.