Support common jdbc catalog lock for filesystem catalog
sunxiaojian committed Mar 28, 2024
1 parent 3237e1a commit d2d4abc
Showing 15 changed files with 302 additions and 19 deletions.
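In short, the commit lets a plain FileSystemCatalog reuse the JDBC distributed lock: the lock implementation is selected with the catalog's LOCK_TYPE option, a matching CatalogLockContextFactory is discovered through the factory SPI, and everything the lock needs (JDBC URI, credentials, pool settings) travels under a new "lock." option prefix. A minimal sketch of the intended configuration, mirroring the option keys exercised by the new FileSystemCatalogTest further down; the warehouse path and SQLite URL are illustrative, a SQLite JDBC driver is assumed on the classpath, and CatalogFactory/CatalogContext are assumed to be the usual public entry points rather than anything this commit adds:

```java
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.jdbc.JdbcCatalog;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;

import java.util.HashMap;
import java.util.Map;

public class FileSystemCatalogJdbcLockExample {

    public static void main(String[] args) throws Exception {
        Map<String, String> props = new HashMap<>();
        // A filesystem catalog: just a warehouse path, no metastore.
        props.put(CatalogOptions.WAREHOUSE.key(), "/tmp/paimon-warehouse");
        // Enable locking and pick the jdbc lock implementation.
        props.put(CatalogOptions.LOCK_ENABLED.key(), "true");
        props.put(CatalogOptions.LOCK_TYPE.key(), "jdbc");
        // Lock-specific options carry the "lock." prefix (AbstractCatalog.LOCK_PROP_PREFIX)
        // and are stripped by extractLockConfiguration before the lock context is built.
        props.put("lock." + CatalogOptions.URI.key(), "jdbc:sqlite:/tmp/paimon-lock.db");
        props.put("lock." + JdbcCatalog.PROPERTY_PREFIX + "username", "user");
        props.put("lock." + JdbcCatalog.PROPERTY_PREFIX + "password", "password");

        try (Catalog catalog =
                CatalogFactory.createCatalog(CatalogContext.create(Options.fromMap(props)))) {
            // The lock context is built lazily from the extracted "lock." options.
            System.out.println("lock context present: " + catalog.lockContext().isPresent());
        }
    }
}
```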
@@ -62,6 +62,7 @@ public abstract class AbstractCatalog implements Catalog {
public static final String DB_SUFFIX = ".db";
protected static final String TABLE_DEFAULT_OPTION_PREFIX = "table-default.";
protected static final String DB_LOCATION_PROP = "location";
protected static final String LOCK_PROP_PREFIX = "lock.";

protected final FileIO fileIO;
protected final Map<String, String> tableDefaultOptions;
@@ -105,6 +106,30 @@ public Optional<CatalogLockFactory> defaultLockFactory() {
return Optional.empty();
}

@Override
public Optional<CatalogLockContextFactory> lockContextFactory() {
String lock = catalogOptions.get(LOCK_TYPE);
if (lock == null) {
return Optional.empty();
}
return Optional.of(
FactoryUtil.discoverFactory(
AbstractCatalog.class.getClassLoader(),
CatalogLockContextFactory.class,
lock));
}

public Options extractLockConfiguration(Map<String, String> properties) {
Map<String, String> result = new HashMap<>();
properties.forEach(
(key, value) -> {
if (key.startsWith(LOCK_PROP_PREFIX)) {
result.put(key.substring(LOCK_PROP_PREFIX.length()), value);
}
});
return Options.fromMap(result);
}

@Override
public Optional<CatalogLockContext> lockContext() {
return Optional.of(CatalogLockContext.fromOptions(catalogOptions));
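The new "lock." prefix is handled by extractLockConfiguration above: any catalog option that starts with the prefix is stripped of it and handed to the lock context unchanged, while everything else is ignored. A tiny self-contained sketch of that mapping; the keys and values are illustrative:

```java
import java.util.HashMap;
import java.util.Map;

public class LockOptionExtraction {

    // Mirrors AbstractCatalog.LOCK_PROP_PREFIX, which is protected rather than public.
    private static final String LOCK_PROP_PREFIX = "lock.";

    public static void main(String[] args) {
        Map<String, String> catalogOptions = new HashMap<>();
        catalogOptions.put("warehouse", "/tmp/warehouse");          // not lock related -> dropped
        catalogOptions.put("lock.uri", "jdbc:sqlite:/tmp/lock.db"); // -> uri
        catalogOptions.put("lock.jdbc.username", "user");           // -> jdbc.username

        // Same prefix-stripping loop as extractLockConfiguration above.
        Map<String, String> lockOptions = new HashMap<>();
        catalogOptions.forEach(
                (key, value) -> {
                    if (key.startsWith(LOCK_PROP_PREFIX)) {
                        lockOptions.put(key.substring(LOCK_PROP_PREFIX.length()), value);
                    }
                });

        // lockOptions now holds uri=... and jdbc.username=user; "warehouse" is gone.
        System.out.println(lockOptions);
    }
}
```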
@@ -51,6 +51,11 @@ public interface Catalog extends AutoCloseable {
*/
Optional<CatalogLockFactory> lockFactory();

/** Get the lock context factory used by the lock factory to create a lock. */
default Optional<CatalogLockContextFactory> lockContextFactory() {
return Optional.empty();
}

/** Get lock context for lock factory to create a lock. */
default Optional<CatalogLockContext> lockContext() {
return Optional.empty();
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.catalog;

import org.apache.paimon.factories.Factory;
import org.apache.paimon.options.Options;

import java.io.Serializable;

/** Factory to create the lock context used by a catalog lock factory. */
public interface CatalogLockContextFactory extends Factory, Serializable {
CatalogLockContext createLockContext(Options lockOptions, boolean closeConnectionsUsed);
}
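This factory is looked up through the same Factory SPI as the catalog lock factory itself, keyed by the catalog's LOCK_TYPE option (see AbstractCatalog.lockContextFactory above). A hypothetical third-party implementation only needs an identifier and a way to turn the extracted lock options into a context; the class name and the "my-lock" identifier below are illustrative, not part of this commit, and registration would go into the same META-INF services file this commit extends further down:

```java
package com.example.paimon; // hypothetical package

import org.apache.paimon.catalog.CatalogLockContext;
import org.apache.paimon.catalog.CatalogLockContextFactory;
import org.apache.paimon.options.Options;

/** Hypothetical lock context factory, selected with a lock type of "my-lock". */
public class MyLockContextFactory implements CatalogLockContextFactory {

    @Override
    public String identifier() {
        // Must match the catalog's LOCK_TYPE value so FactoryUtil.discoverFactory finds it.
        return "my-lock";
    }

    @Override
    public CatalogLockContext createLockContext(Options lockOptions, boolean closeConnectionsUsed) {
        // lockOptions already has the "lock." prefix stripped by the catalog;
        // closeConnectionsUsed tells the context whether the lock owns (and must close)
        // any connections it opens.
        return CatalogLockContext.fromOptions(lockOptions);
    }
}
```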
@@ -35,6 +35,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;

import static org.apache.paimon.catalog.FileSystemCatalogOptions.CASE_SENSITIVE;
@@ -198,4 +199,13 @@ public String warehouse() {
public boolean caseSensitive() {
return catalogOptions.get(CASE_SENSITIVE);
}

@Override
public Optional<CatalogLockContext> lockContext() {
return lockContextFactory()
.map(
factory ->
factory.createLockContext(
extractLockConfiguration(catalogOptions.toMap()), true));
}
}
@@ -33,6 +33,7 @@
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.StringUtils;

import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
@@ -57,6 +58,7 @@
import static org.apache.paimon.jdbc.JdbcUtils.execute;
import static org.apache.paimon.jdbc.JdbcUtils.insertProperties;
import static org.apache.paimon.jdbc.JdbcUtils.updateTable;
import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE;

/* This file is based on source code from the Iceberg Project (http://iceberg.apache.org/), licensed by the Apache
* Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
@@ -350,7 +352,13 @@ public Optional<CatalogLockFactory> defaultLockFactory() {

@Override
public Optional<CatalogLockContext> lockContext() {
return Optional.of(new JdbcCatalogLockContext(connections, catalogKey, options));
String lock = catalogOptions.get(LOCK_TYPE);
if (StringUtils.isBlank(lock) || JdbcCatalogLockFactory.IDENTIFIER.equalsIgnoreCase(lock)) {
return Optional.of(new JdbcCatalogLockContext(connections, options, false));
} else {
return lockContextFactory()
.map(factory -> factory.createLockContext(catalogOptions, true));
}
}

private Lock lock(Identifier identifier) {
@@ -36,6 +36,7 @@ public class JdbcCatalogLock implements CatalogLock {
private final long checkMaxSleep;
private final long acquireTimeout;
private final String catalogKey;
private final boolean closeConnectionsUsed;

public JdbcCatalogLock(
JdbcClientPool connections,
@@ -46,6 +47,20 @@ public JdbcCatalogLock(
this.checkMaxSleep = checkMaxSleep;
this.acquireTimeout = acquireTimeout;
this.catalogKey = catalogKey;
this.closeConnectionsUsed = false;
}

public JdbcCatalogLock(
JdbcClientPool connections,
String catalogKey,
long checkMaxSleep,
long acquireTimeout,
boolean closeConnectionsUsed) {
this.connections = connections;
this.checkMaxSleep = checkMaxSleep;
this.acquireTimeout = acquireTimeout;
this.catalogKey = catalogKey;
this.closeConnectionsUsed = closeConnectionsUsed;
}

@Override
@@ -83,7 +98,9 @@ private void lock(String lockUniqueName) throws SQLException, InterruptedException {

@Override
public void close() throws IOException {
// Do nothing
if (closeConnectionsUsed) {
connections.close();
}
}

public static long checkMaxSleep(Map<String, String> conf) {
@@ -19,31 +19,59 @@
package org.apache.paimon.jdbc;

import org.apache.paimon.catalog.CatalogLockContext;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;

import java.sql.SQLException;

/** Jdbc lock context. */
public class JdbcCatalogLockContext implements CatalogLockContext {
public class JdbcCatalogLockContext implements CatalogLockContext<JdbcClientPool> {

private final JdbcClientPool connections;
private JdbcClientPool connections;
private final boolean closeConnectionsUsed;
private final String catalogKey;
private final Options options;

public JdbcCatalogLockContext(JdbcClientPool connections, String catalogKey, Options options) {
this.connections = connections;
this.catalogKey = catalogKey;
public JdbcCatalogLockContext(Options options, boolean closeConnectionsUsed) {
this.options = options;
this.catalogKey = options.get(JdbcCatalogOptions.CATALOG_KEY);
this.closeConnectionsUsed = closeConnectionsUsed;
}

JdbcCatalogLockContext(
JdbcClientPool connections, Options options, boolean closeConnectionsUsed) {
this(options, closeConnectionsUsed);
this.connections = connections;
}

@Override
public Options options() {
return options;
}

public JdbcClientPool connections() {
return connections;
}

public String catalogKey() {
return catalogKey;
}

public boolean isCloseConnectionsUsed() {
return closeConnectionsUsed;
}

public JdbcClientPool clientPool() {
if (this.connections == null) {
this.connections =
new JdbcClientPool(
options.get(CatalogOptions.CLIENT_POOL_SIZE),
options.get(CatalogOptions.URI.key()),
options.toMap());
try {
JdbcUtils.createDistributedLockTable(connections, options);
} catch (SQLException e) {
throw new RuntimeException("Cannot initialize JDBC distributed lock.", e);
} catch (InterruptedException e) {
throw new RuntimeException("Interrupted in call to initialize", e);
}
}
return connections;
}
}
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.jdbc;

import org.apache.paimon.catalog.CatalogLockContext;
import org.apache.paimon.catalog.CatalogLockContextFactory;
import org.apache.paimon.options.Options;

/** Factory for jdbc catalog lock context. */
public class JdbcCatalogLockContextFactory implements CatalogLockContextFactory {

@Override
public String identifier() {
return JdbcCatalogLockFactory.IDENTIFIER;
}

@Override
public CatalogLockContext createLockContext(Options lockOptions, boolean closeConnectionsUsed) {
return new JdbcCatalogLockContext(lockOptions, closeConnectionsUsed);
}
}
@@ -44,9 +44,10 @@ public CatalogLock createLock(CatalogLockContext context) {
JdbcCatalogLockContext lockContext = (JdbcCatalogLockContext) context;
Map<String, String> optionsMap = lockContext.options().toMap();
return new JdbcCatalogLock(
lockContext.connections(),
lockContext.clientPool(),
lockContext.catalogKey(),
checkMaxSleep(optionsMap),
acquireTimeout(optionsMap));
acquireTimeout(optionsMap),
lockContext.isCloseConnectionsUsed());
}
}
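Putting the JDBC pieces together: for a filesystem catalog the context factory builds a JdbcCatalogLockContext from the extracted lock options with closeConnectionsUsed = true, the context lazily opens its own JdbcClientPool (creating the distributed-lock table on first use), and JdbcCatalogLockFactory turns that context into a lock that closes the pool on close(). A rough sketch of that flow outside any catalog, assuming a SQLite JDBC driver on the classpath and the package locations implied by this diff's imports; the connection URL is illustrative:

```java
import org.apache.paimon.catalog.CatalogLock;
import org.apache.paimon.catalog.CatalogLockContext;
import org.apache.paimon.jdbc.JdbcCatalog;
import org.apache.paimon.jdbc.JdbcCatalogLockContextFactory;
import org.apache.paimon.jdbc.JdbcCatalogLockFactory;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;

import java.util.HashMap;
import java.util.Map;

public class JdbcLockFlowExample {

    public static void main(String[] args) throws Exception {
        // Options as they look after extractLockConfiguration, i.e. with "lock." stripped.
        Map<String, String> lockOptions = new HashMap<>();
        lockOptions.put(CatalogOptions.URI.key(), "jdbc:sqlite:/tmp/paimon-lock.db");
        lockOptions.put(JdbcCatalog.PROPERTY_PREFIX + "username", "user");
        lockOptions.put(JdbcCatalog.PROPERTY_PREFIX + "password", "password");

        // closeConnectionsUsed = true: the context creates its own client pool lazily,
        // so the resulting lock is responsible for closing it.
        CatalogLockContext context =
                new JdbcCatalogLockContextFactory()
                        .createLockContext(Options.fromMap(lockOptions), true);

        // The factory reads checkMaxSleep/acquireTimeout from the options and hands the
        // lazily created client pool to the lock.
        try (CatalogLock lock = new JdbcCatalogLockFactory().createLock(context)) {
            // use the lock, e.g. to serialize a schema commit against concurrent writers
        }
    }
}
```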
@@ -16,3 +16,4 @@
org.apache.paimon.catalog.FileSystemCatalogFactory
org.apache.paimon.jdbc.JdbcCatalogFactory
org.apache.paimon.jdbc.JdbcCatalogLockFactory
org.apache.paimon.jdbc.JdbcCatalogLockContextFactory
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.catalog;

import org.apache.paimon.fs.Path;
import org.apache.paimon.jdbc.JdbcCatalog;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;

import org.apache.paimon.shade.guava30.com.google.common.collect.Maps;

import org.junit.jupiter.api.BeforeEach;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import static org.assertj.core.api.Assertions.assertThat;

/** Tests for {@link FileSystemCatalog}. */
public class FileSystemCatalogTest extends CatalogTestBase {

@BeforeEach
public void setUp() throws Exception {
super.setUp();
catalog = initCatalog(Maps.newHashMap());
}

private FileSystemCatalog initCatalog(Map<String, String> props) {
Map<String, String> properties = Maps.newHashMap();
properties.put(
AbstractCatalog.LOCK_PROP_PREFIX + CatalogOptions.URI.key(),
"jdbc:sqlite:file::memory:?ic" + UUID.randomUUID().toString().replace("-", ""));

properties.put(
AbstractCatalog.LOCK_PROP_PREFIX + JdbcCatalog.PROPERTY_PREFIX + "username",
"user");
properties.put(
AbstractCatalog.LOCK_PROP_PREFIX + JdbcCatalog.PROPERTY_PREFIX + "password",
"password");
properties.put(CatalogOptions.WAREHOUSE.key(), warehouse);
properties.put(CatalogOptions.LOCK_ENABLED.key(), "true");
properties.put(CatalogOptions.LOCK_TYPE.key(), "jdbc");
properties.putAll(props);
FileSystemCatalog catalog =
new FileSystemCatalog(fileIO, new Path(warehouse), Options.fromMap(properties));
return catalog;
}

@Override
public void testListDatabasesWhenNoDatabases() {
List<String> databases = catalog.listDatabases();
assertThat(databases).isEqualTo(new ArrayList<>());
}
}
@@ -689,14 +689,17 @@ public static Catalog createHiveCatalog(CatalogContext context) {
}

public static HiveConf createHiveConf(CatalogContext context) {
String uri = context.options().get(CatalogOptions.URI);
String hiveConfDir = context.options().get(HIVE_CONF_DIR);
String hadoopConfDir = context.options().get(HADOOP_CONF_DIR);
HiveConf hiveConf =
HiveCatalog.createHiveConf(hiveConfDir, hadoopConfDir, context.hadoopConf());
return createHiveConf(context.options(), context.hadoopConf());
}

public static HiveConf createHiveConf(Options options, Configuration hadoopConf) {
String uri = options.get(CatalogOptions.URI);
String hiveConfDir = options.get(HIVE_CONF_DIR);
String hadoopConfDir = options.get(HADOOP_CONF_DIR);
HiveConf hiveConf = HiveCatalog.createHiveConf(hiveConfDir, hadoopConfDir, hadoopConf);

// always using user-set parameters overwrite hive-site.xml parameters
context.options().toMap().forEach(hiveConf::set);
options.toMap().forEach(hiveConf::set);
if (uri != null) {
hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, uri);
}