diff --git a/services/base-jdbc/pom.xml b/services/base-jdbc/pom.xml index 7f5b001982..faee15f781 100644 --- a/services/base-jdbc/pom.xml +++ b/services/base-jdbc/pom.xml @@ -1,6 +1,6 @@ diff --git a/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/client/JdbcClient.java b/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/client/JdbcClient.java new file mode 100644 index 0000000000..f3a02d4e40 --- /dev/null +++ b/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/client/JdbcClient.java @@ -0,0 +1,196 @@ +/******************************************************************************* + * Copyright (c) 2023 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + *******************************************************************************/ + +package org.eclipse.hono.service.base.jdbc.client; + +import java.io.IOException; +import java.sql.Connection; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import io.agroal.api.AgroalDataSource; +import io.vertx.codegen.annotations.Nullable; +import io.vertx.core.AsyncResult; +import io.vertx.core.Future; +import io.vertx.core.Handler; +import io.vertx.core.Promise; +import io.vertx.core.Vertx; +import io.vertx.core.json.JsonArray; +import io.vertx.ext.sql.ResultSet; + +/** + * Quarkus Agroal DataSource based JDBC client. + */ +public class JdbcClient implements JdbcOperations { + + private final Vertx vertx; + private final AgroalDataSource dataSource; + private final ExecutorService pool; + private final Queue connections; + private boolean isClosed; + + /** + * JdbcClient constructor. + * @param vertx Vert.x instance to use + * @param dataSource Quarkus Argoal data source + */ + public JdbcClient(final Vertx vertx, final AgroalDataSource dataSource) { + this.vertx = vertx; + this.dataSource = dataSource; + this.pool = Executors.newFixedThreadPool(1); + this.connections = new ConcurrentLinkedQueue<>(); + this.isClosed = false; + } + + @Override + public void close() { + this.isClosed = true; + for (JdbcConnection connection : this.connections) { + connection.close(); + } + this.connections.clear(); + this.dataSource.close(); + this.pool.shutdown(); + } + + @Override + public void close(final Promise handler) { + this.vertx.executeBlocking(promise -> { + try { + close(); + promise.complete(); + } catch (Exception ex) { + promise.fail(ex); + } + }, handler); + }; + + /** + * Open connection asynchronously. + * @param handler Asynchronous listener + */ + public void getConnection(final Handler> handler) { + getConnectionFuture().onComplete(handler); + } + + @Override + public void queryWithParams(final String sql, final JsonArray jsonArray, final Handler> handler) { + getConnectionFuture().compose(connection -> { + final Promise promise = Promise.promise(); + connection.queryWithParams(sql, jsonArray, promise); + return promise.future().onComplete(result -> { + final Promise close = Promise.promise(); + connection.close(close); + close.future().onComplete(ignore -> { + this.connections.remove(connection); + }); + }); + }).onComplete(handler); + } + + @Override + public void querySingle(final String sql, final Handler> handler) { + getConnectionFuture().compose(connection -> { + final Promise<@Nullable JsonArray> promise = Promise.promise(); + connection.querySingle(sql, promise); + return promise.future().onComplete(result -> { + final Promise close = Promise.promise(); + connection.close(close); + close.future().onComplete(ignore -> { + this.connections.remove(connection); + }); + }); + }).onComplete(handler); + } + + @Override + public void querySingleWithParams(final String sql, final JsonArray arguments, final Handler> handler) { + getConnectionFuture().compose(connection -> { + final Promise<@Nullable JsonArray> promise = Promise.promise(); + connection.querySingleWithParams(sql, arguments, promise); + return promise.future().onComplete(result -> { + final Promise close = Promise.promise(); + connection.close(close); + close.future().onComplete(ignore -> { + this.connections.remove(connection); + }); + }); + }).onComplete(handler); + } + + @Override + public void updateWithParams(final String sql, final JsonArray jsonArray, final Handler> handler) { + getConnectionFuture().compose(connection -> { + final Promise promise = Promise.promise(); + connection.updateWithParams(sql, jsonArray, promise); + return promise.future().onComplete(result -> { + final Promise close = Promise.promise(); + connection.close(close); + close.future().onComplete(ignore -> { + this.connections.remove(connection); + }); + }); + }).onComplete(handler); + } + + @Override + public void call(final String sql, final Handler> handler) { + getConnectionFuture().compose(connection -> { + final Promise promise = Promise.promise(); + connection.call(sql, promise); + return promise.future().onComplete(result -> { + final Promise close = Promise.promise(); + connection.close(close); + close.future().onComplete(ignore -> { + this.connections.remove(connection); + }); + }); + }).onComplete(handler); + } + + @Override + public void callWithParams(final String sql, final JsonArray jsonArray, final Handler> handler) { + getConnectionFuture().compose(connection -> { + final Promise promise = Promise.promise(); + connection.callWithParams(sql, jsonArray, promise); + return promise.future().onComplete(result -> { + final Promise close = Promise.promise(); + connection.close(close); + close.future().onComplete(ignore -> { + this.connections.remove(connection); + }); + }); + }).onComplete(handler); + } + + private Future getConnectionFuture() { + if (this.isClosed) { + return Future.failedFuture(new IOException("Client is already closed")); + } + final Promise promise = Promise.promise(); + this.pool.execute(() -> { + try { + final Connection connection = this.dataSource.getConnection(); + final JdbcConnection jdbcConnection = new JdbcConnection(this.vertx.getOrCreateContext(), connection); + connections.add(jdbcConnection); + promise.complete(jdbcConnection); + } catch (Exception ex) { + promise.fail(ex); + } + }); + return promise.future(); + } + +} diff --git a/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/client/JdbcConnection.java b/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/client/JdbcConnection.java new file mode 100644 index 0000000000..85c8d165e6 --- /dev/null +++ b/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/client/JdbcConnection.java @@ -0,0 +1,222 @@ +/******************************************************************************* + * Copyright (c) 2023 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + *******************************************************************************/ + +package org.eclipse.hono.service.base.jdbc.client; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; + +import io.vertx.codegen.annotations.Nullable; +import io.vertx.core.AsyncResult; +import io.vertx.core.Context; +import io.vertx.core.Future; +import io.vertx.core.Handler; +import io.vertx.core.Promise; +import io.vertx.core.json.JsonArray; +import io.vertx.ext.sql.ResultSet; + +/** + * Quarkus based SQL connection. + */ +public class JdbcConnection implements JdbcOperations { + + private final Context ctx; + private final Connection connection; + + /** + * SQL connection constructor. + * @param ctx Context instance to use + * @param connection JDBC connection + */ + protected JdbcConnection(final Context ctx, final Connection connection) { + this.ctx = ctx; + this.connection = connection; + } + + @Override + public void close() { + try { + this.connection.close(); + } catch (SQLException ex) { + } + } + + @Override + public void close(final Promise handler) { + schedule(() -> { + close(); + return (Void) null; + }).onComplete(handler); + } + + /** + * Asynchronous auto commit setter. + * @param autoCommit True for auto commit + * @param handler Asynchronous operation listener + */ + public void setAutoCommit(final boolean autoCommit, final Handler> handler) { + schedule(() -> { + this.connection.setAutoCommit(autoCommit); + return (Void) null; + }).onComplete(handler); + } + + /** + * Commit asynchronously. + * @param handler Asynchronous commit handler + */ + public void commit(final Handler> handler) { + schedule(() -> { + this.connection.commit(); + return (Void) null; + }).onComplete(handler); + } + + /** + * Test. + * @param handler test + */ + public void rollback(final Handler> handler) { + schedule(() -> { + this.connection.rollback(); + return (Void) null; + }).onComplete(handler); + } + + @Override + public void queryWithParams(final String sql, final JsonArray jsonArray, final Handler> handler) { + schedule(() -> { + final PreparedStatement stmt = this.connection.prepareStatement(sql); + for (int index = 0; index < jsonArray.size(); index++) { + stmt.setObject(index + 1, jsonArray.getValue(index)); + } + return readResultSet(stmt.executeQuery()); + }).onComplete(handler); + } + + @Override + public void querySingle(final String sql, final Handler> handler) { + schedule(() -> { + final PreparedStatement stmt = this.connection.prepareStatement(sql); + final java.sql.ResultSet rs = stmt.executeQuery(); + JsonArray result = null; + if (rs.next()) { + result = readRow(rs); + } + return result; + }).onComplete(handler); + } + + @Override + public void querySingleWithParams(final String sql, final JsonArray jsonArray, final Handler> handler) { + schedule(() -> { + final PreparedStatement stmt = this.connection.prepareStatement(sql); + for (int index = 0; index < jsonArray.size(); index++) { + stmt.setObject(index + 1, jsonArray.getValue(index)); + } + final java.sql.ResultSet rs = stmt.executeQuery(); + JsonArray result = null; + if (rs.next()) { + result = readRow(rs); + } + return result; + }).onComplete(handler); + } + + @Override + public void updateWithParams(final String sql, final JsonArray jsonArray, final Handler> handler) { + schedule(() -> { + final PreparedStatement stmt = this.connection.prepareStatement(sql); + for (int index = 0; index < jsonArray.size(); index++) { + stmt.setObject(index + 1, jsonArray.getValue(index)); + } + return stmt.executeUpdate(); + }).onComplete(handler); + } + + @Override + public void call(final String sql, final Handler> handler) { + schedule(() -> { + final PreparedStatement stmt = this.connection.prepareStatement(sql); + stmt.execute(); + return (Void) null; + }).onComplete(handler); + } + + @Override + public void callWithParams(final String sql, final JsonArray jsonArray, final Handler> handler) { + schedule(() -> { + final PreparedStatement stmt = this.connection.prepareStatement(sql); + for (int index = 0; index < jsonArray.size(); index++) { + stmt.setObject(index + 1, jsonArray.getValue(index)); + } + stmt.execute(); + return (Void) null; + }).onComplete(handler); + } + + private Future schedule(final Callable func) { + final Promise output = Promise.promise(); + this.ctx.executeBlocking(promise -> { + try { + if (this.connection.isClosed()) { + promise.fail("Connection is already closed"); + } else { + final T result = func.call(); + promise.complete(result); + } + } catch (Exception ex) { + promise.fail(ex); + } + }, output); + return output.future(); + } + + private JsonArray readRow(final java.sql.ResultSet rs) throws SQLException { + final int colCount = rs.getMetaData().getColumnCount(); + final JsonArray result = new JsonArray(); + for (int idx = 1; idx <= colCount; idx++) { + Object resultObj = rs.getObject(idx); + if (resultObj instanceof Timestamp ts) { + resultObj = ts.toLocalDateTime().toString(); + } + result.add(resultObj); + } + return result; + } + + private ResultSet readResultSet(final java.sql.ResultSet rs) throws SQLException { + final int colCount = rs.getMetaData().getColumnCount(); + final List colNames = new ArrayList<>(); + final List results = new ArrayList<>(); + + for (int idx = 1; idx <= colCount; ++idx) { + colNames.add(rs.getMetaData().getColumnName(idx)); + } + + while (rs.next()) { + results.add(readRow(rs)); + } + + final ResultSet rsOut = new ResultSet(); + rsOut.setColumnNames(colNames); + rsOut.setResults(results); + return rsOut; + } + +} diff --git a/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/client/JdbcOperations.java b/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/client/JdbcOperations.java new file mode 100644 index 0000000000..21750e6d0e --- /dev/null +++ b/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/client/JdbcOperations.java @@ -0,0 +1,75 @@ +/******************************************************************************* + * Copyright (c) 2023 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + *******************************************************************************/ + +package org.eclipse.hono.service.base.jdbc.client; + +import java.io.Closeable; + +import io.vertx.codegen.annotations.Nullable; +import io.vertx.core.AsyncResult; +import io.vertx.core.Handler; +import io.vertx.core.json.JsonArray; +import io.vertx.ext.sql.ResultSet; + +/** + * SQL operations for JDBC. + */ +public interface JdbcOperations extends Closeable, io.vertx.core.Closeable { + + /** + * Test. + * @param sql test + * @param jsonArray test + * @param handler test + */ + void queryWithParams(String sql, JsonArray jsonArray, Handler> handler); + + /** + * Test. + * @param sql test + * @param handler test + */ + void querySingle(String sql, Handler> handler); + + /** + * Test. + * @param sql test + * @param arguments test + * @param handler test + */ + void querySingleWithParams(String sql, JsonArray arguments, Handler> handler); + + /** + * Test. + * @param sql test + * @param jsonArray test + * @param handler test + */ + void updateWithParams(String sql, JsonArray jsonArray, Handler> handler); + + /** + * Test. + * @param sql test + * @param handler test + */ + void call(String sql, Handler> handler); + + /** + * Test. + * @param sql test + * @param jsonArray test + * @param handler test + */ + void callWithParams(String sql, JsonArray jsonArray, Handler> handler); + +} diff --git a/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/config/JdbcProperties.java b/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/config/JdbcProperties.java index 4f0b067fa5..1e6a14e38f 100644 --- a/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/config/JdbcProperties.java +++ b/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/config/JdbcProperties.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c) 2020, 2022 Contributors to the Eclipse Foundation + * Copyright (c) 2020, 2023 Contributors to the Eclipse Foundation * * See the NOTICE file(s) distributed with this work for additional * information regarding copyright ownership. @@ -13,17 +13,27 @@ package org.eclipse.hono.service.base.jdbc.config; +import java.sql.SQLException; +import java.time.Duration; import java.util.Objects; +import java.util.Optional; +import org.eclipse.hono.service.base.jdbc.client.JdbcClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonInclude.Include; +import io.agroal.api.AgroalDataSource; +import io.agroal.api.configuration.AgroalDataSourceConfiguration; +import io.agroal.api.configuration.supplier.AgroalConnectionFactoryConfigurationSupplier; +import io.agroal.api.configuration.supplier.AgroalConnectionPoolConfigurationSupplier; +import io.agroal.api.configuration.supplier.AgroalDataSourceConfigurationSupplier; +import io.agroal.api.security.NamePrincipal; +import io.agroal.api.security.SimplePassword; import io.vertx.core.Vertx; import io.vertx.core.json.JsonObject; -import io.vertx.ext.jdbc.JDBCClient; /** * Configuration properties for a JDBC service. @@ -108,36 +118,45 @@ public void setTableName(final String tableName) { /** * Creates a JDBC client for configuration properties. * - * @param vertx The vertx instance to use. + * @param vertx Vert.x instance * @param dataSourceProperties The properties. * @return The client. + * @throws RuntimeException Throws exception if data source create fails */ - public static JDBCClient dataSource(final Vertx vertx, final JdbcProperties dataSourceProperties) { - - final JsonObject config = new JsonObject() + public static JdbcClient dataSource(final Vertx vertx, final JdbcProperties dataSourceProperties) { + final String config = new JsonObject() .put("url", dataSourceProperties.getUrl()) - .put("user", dataSourceProperties.getUsername()); - - // password is added later, after logging - - if (dataSourceProperties.getDriverClass() != null) { - config.put("driver_class", dataSourceProperties.getDriverClass()); - } - if (dataSourceProperties.getMaximumPoolSize() != null) { - config.put("max_pool_size", dataSourceProperties.getMaximumPoolSize()); - } + .put("user", dataSourceProperties.getUsername()) + .put("driver_class", dataSourceProperties.getDriverClass()) + .put("max_pool_size", dataSourceProperties.getMaximumPoolSize()) + .toString(); log.info("Creating new SQL client: {} - table: {}", config, dataSourceProperties.getTableName()); - // put password after logging - - config - .put("password", dataSourceProperties.getPassword()); - - // create new client - - return JDBCClient.create(vertx, config); - + final var factoryConfigSupplier = new AgroalConnectionFactoryConfigurationSupplier() + .jdbcUrl(dataSourceProperties.getUrl()) + .connectionProviderClassName(dataSourceProperties.getDriverClass()); + Optional.ofNullable(dataSourceProperties.getUsername()) + .ifPresent(name -> factoryConfigSupplier.principal(new NamePrincipal(name))); + Optional.ofNullable(dataSourceProperties.getPassword()) + .ifPresent(pass -> factoryConfigSupplier.credential(new SimplePassword(pass))); + + // maximumPoolSize must be set + final var poolConfigSupplier = new AgroalConnectionPoolConfigurationSupplier() + .maxSize(Optional.ofNullable(dataSourceProperties.getMaximumPoolSize()).orElse(4)) + .connectionFactoryConfiguration(factoryConfigSupplier) + .acquisitionTimeout(Duration.ofSeconds(30)); + + final var dataSourceConfig = new AgroalDataSourceConfigurationSupplier() + .dataSourceImplementation(AgroalDataSourceConfiguration.DataSourceImplementation.AGROAL) + .connectionPoolConfiguration(poolConfigSupplier.get()) + .get(); + + try { + return new JdbcClient(vertx, AgroalDataSource.from(dataSourceConfig)); + } catch (SQLException ex) { + throw new RuntimeException(ex); + } } } diff --git a/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/store/AbstractStore.java b/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/store/AbstractStore.java index 7207672941..74c2aa6b72 100644 --- a/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/store/AbstractStore.java +++ b/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/store/AbstractStore.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c) 2020, 2022 Contributors to the Eclipse Foundation + * Copyright (c) 2020, 2023 Contributors to the Eclipse Foundation * * See the NOTICE file(s) distributed with this work for additional * information regarding copyright ownership. @@ -20,6 +20,7 @@ import java.util.function.Function; import org.eclipse.hono.service.HealthCheckProvider; +import org.eclipse.hono.service.base.jdbc.client.JdbcClient; import org.eclipse.hono.service.base.jdbc.store.Statement.ExpandedStatement; import org.eclipse.hono.tracing.TracingHelper; @@ -31,9 +32,7 @@ import io.vertx.core.Future; import io.vertx.ext.healthchecks.HealthCheckHandler; import io.vertx.ext.healthchecks.Status; -import io.vertx.ext.jdbc.JDBCClient; import io.vertx.ext.sql.ResultSet; -import io.vertx.ext.sql.UpdateResult; /** * An abstract JDBC based data store. @@ -45,7 +44,7 @@ public abstract class AbstractStore implements HealthCheckProvider, AutoCloseabl */ public static final String DEFAULT_CHECK_SQL = "SELECT 1"; - private final JDBCClient client; + private final JdbcClient client; private final Tracer tracer; private final ExpandedStatement checkSql; @@ -58,7 +57,7 @@ public abstract class AbstractStore implements HealthCheckProvider, AutoCloseabl * @param checkSql An optional SQL statement, which will be used to check if the connection to the * database is OK. It this value is empty, the default statement {@value #DEFAULT_CHECK_SQL} will be used. */ - public AbstractStore(final JDBCClient client, final Tracer tracer, final Optional checkSql) { + public AbstractStore(final JdbcClient client, final Tracer tracer, final Optional checkSql) { this.client = Objects.requireNonNull(client); this.tracer = Objects.requireNonNull(tracer); this.checkSql = checkSql.orElseGet(() -> Statement.statement(DEFAULT_CHECK_SQL)).expand(); @@ -77,7 +76,7 @@ public void registerLivenessChecks(final HealthCheckHandler livenessHandler) { public void registerReadinessChecks(final HealthCheckHandler readinessHandler) { readinessHandler.register("sql", Duration.ofSeconds(10).toMillis(), p -> { this.checkSql - .query(this.client) + .call(this.client) .onComplete(ar -> { if (ar.succeeded()) { p.tryComplete(Status.OK()); @@ -113,8 +112,8 @@ public void registerReadinessChecks(final HealthCheckHandler readinessHandler) { * broken optimistic lock, and a failed future will be returned. Otherwise it is considered * an "object not found" condition. */ - protected Future checkOptimisticLock( - final Future result, + protected Future checkOptimisticLock( + final Future result, final Span span, final Optional resourceVersion, final Function> reader) { @@ -130,11 +129,11 @@ protected Future checkOptimisticLock( span.log(ImmutableMap.builder() .put("event", "check update result") - .put("update_count", r.getUpdated()) + .put("update_count", r) .build()); // if we updated something ... - if (r.getUpdated() != 0) { + if (r != 0) { // ... then the optimistic lock held return Future.succeededFuture(r); } diff --git a/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/store/SQL.java b/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/store/SQL.java index 6efc2d71cc..b3666c91fc 100644 --- a/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/store/SQL.java +++ b/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/store/SQL.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c) 2020, 2022 Contributors to the Eclipse Foundation + * Copyright (c) 2020, 2023 Contributors to the Eclipse Foundation * * See the NOTICE file(s) distributed with this work for additional * information regarding copyright ownership. @@ -23,6 +23,8 @@ import java.util.function.BiFunction; import java.util.function.Consumer; +import org.eclipse.hono.service.base.jdbc.client.JdbcClient; +import org.eclipse.hono.service.base.jdbc.client.JdbcConnection; import org.eclipse.hono.tracing.TracingHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,8 +40,6 @@ import io.opentracing.tag.Tags; import io.vertx.core.Future; import io.vertx.core.Promise; -import io.vertx.ext.sql.SQLClient; -import io.vertx.ext.sql.SQLConnection; /** * SQL helper methods. @@ -93,7 +93,7 @@ public static Future translateException(final Throwable e) { * @param state The auto-commit state. * @return A future for tracking the outcome. */ - public static Future setAutoCommit(final Tracer tracer, final SpanContext context, final SQLConnection connection, final boolean state) { + public static Future setAutoCommit(final Tracer tracer, final SpanContext context, final JdbcConnection connection, final boolean state) { final Span span = startSqlSpan(tracer, context, "set autocommit", builder -> { builder.withTag("db.autocommit", state); }); @@ -110,7 +110,7 @@ public static Future setAutoCommit(final Tracer tracer, final Spa * @param connection The database connection to work on. * @return A future for tracking the outcome. */ - public static Future commit(final Tracer tracer, final SpanContext context, final SQLConnection connection) { + public static Future commit(final Tracer tracer, final SpanContext context, final JdbcConnection connection) { final Span span = startSqlSpan(tracer, context, "commit", null); final Promise promise = Promise.promise(); connection.commit(promise); @@ -125,7 +125,7 @@ public static Future commit(final Tracer tracer, final SpanContex * @param connection The database connection to work on. * @return A future for tracking the outcome. */ - public static Future rollback(final Tracer tracer, final SpanContext context, final SQLConnection connection) { + public static Future rollback(final Tracer tracer, final SpanContext context, final JdbcConnection connection) { final Span span = startSqlSpan(tracer, context, "rollback", null); final Promise promise = Promise.promise(); connection.rollback(promise); @@ -286,12 +286,12 @@ public static boolean hasCauseOf(final Throwable e, final * @param The type of the result. * @return A future, tracking the outcome of the operation. */ - public static Future runTransactionally(final SQLClient client, final Tracer tracer, final SpanContext context, final BiFunction> function) { + public static Future runTransactionally(final JdbcClient client, final Tracer tracer, final SpanContext context, final BiFunction> function) { final Span span = startSqlSpan(tracer, context, "run transactionally", builder -> { }); - final Promise promise = Promise.promise(); + final Promise promise = Promise.promise(); client.getConnection(promise); return promise.future() diff --git a/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/store/Statement.java b/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/store/Statement.java index c312d33c49..5c8e842959 100644 --- a/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/store/Statement.java +++ b/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/store/Statement.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c) 2020 Contributors to the Eclipse Foundation + * Copyright (c) 2020, 2023 Contributors to the Eclipse Foundation * * See the NOTICE file(s) distributed with this work for additional * information regarding copyright ownership. @@ -28,6 +28,8 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; +import org.eclipse.hono.service.base.jdbc.client.JdbcOperations; + import com.google.common.base.MoreObjects; import com.google.errorprone.annotations.FormatMethod; import com.google.errorprone.annotations.FormatString; @@ -42,8 +44,6 @@ import io.vertx.core.Promise; import io.vertx.core.json.JsonArray; import io.vertx.ext.sql.ResultSet; -import io.vertx.ext.sql.SQLOperations; -import io.vertx.ext.sql.UpdateResult; /** * An SQL statement, which can map named parameters to positional parameters. @@ -285,7 +285,7 @@ public Span startSqlSpan() { * @param connection The connection to work on. * @return A future tracking the query result. */ - public Future query(final SQLOperations connection) { + public Future query(final JdbcOperations connection) { final Span sqlSpan = startSqlSpan(); return SQL.finishSpan(run(connection::queryWithParams), sqlSpan, (r, log) -> { log.put("rows", r.getNumRows()); @@ -297,10 +297,22 @@ public Future query(final SQLOperations connection) { * @param connection The connection to work on. * @return A future tracking the update result. */ - public Future update(final SQLOperations connection) { + public Future update(final JdbcOperations connection) { final Span sqlSpan = startSqlSpan(); return SQL.finishSpan(run(connection::updateWithParams), sqlSpan, (r, log) -> { - log.put("rows", r.getUpdated()); + log.put("rows", r); + }); + } + + /** + * Execute this statement as a call. + * @param connection The connection to work on. + * @return A future tracking the call result. + */ + public Future call(final JdbcOperations connection) { + final Span sqlSpan = startSqlSpan(); + return SQL.finishSpan(run(connection::callWithParams), sqlSpan, (r, log) -> { + log.put("rows", r); }); } diff --git a/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/store/device/AbstractDeviceStore.java b/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/store/device/AbstractDeviceStore.java index 91363f447b..794dbb04e0 100644 --- a/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/store/device/AbstractDeviceStore.java +++ b/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/store/device/AbstractDeviceStore.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c) 2020, 2022 Contributors to the Eclipse Foundation + * Copyright (c) 2020, 2023 Contributors to the Eclipse Foundation * * See the NOTICE file(s) distributed with this work for additional * information regarding copyright ownership. @@ -16,6 +16,8 @@ import java.util.Optional; import org.eclipse.hono.deviceregistry.service.device.DeviceKey; +import org.eclipse.hono.service.base.jdbc.client.JdbcClient; +import org.eclipse.hono.service.base.jdbc.client.JdbcOperations; import org.eclipse.hono.service.base.jdbc.store.AbstractStore; import org.eclipse.hono.service.base.jdbc.store.Statement; import org.eclipse.hono.service.base.jdbc.store.StatementConfiguration; @@ -26,9 +28,7 @@ import io.opentracing.SpanContext; import io.opentracing.Tracer; import io.vertx.core.Future; -import io.vertx.ext.jdbc.JDBCClient; import io.vertx.ext.sql.ResultSet; -import io.vertx.ext.sql.SQLOperations; /** * An abstract base for implementing a device registration store. @@ -37,7 +37,7 @@ public abstract class AbstractDeviceStore extends AbstractStore { private static final Logger log = LoggerFactory.getLogger(AbstractDeviceStore.class); - protected final JDBCClient client; + protected final JdbcClient client; protected final Tracer tracer; private final Statement readRegistrationStatement; @@ -49,7 +49,7 @@ public abstract class AbstractDeviceStore extends AbstractStore { * @param tracer The tracer to use. * @param cfg The SQL statement configuration. */ - public AbstractDeviceStore(final JDBCClient client, final Tracer tracer, final StatementConfiguration cfg) { + public AbstractDeviceStore(final JdbcClient client, final Tracer tracer, final StatementConfiguration cfg) { super(client, tracer, cfg.getStatement("checkConnection")); this.client = client; @@ -75,7 +75,7 @@ public AbstractDeviceStore(final JDBCClient client, final Tracer tracer, final S * * @return The future, tracking the outcome of the operation. */ - protected Future readDevice(final SQLOperations operations, final DeviceKey key, final Span span) { + protected Future readDevice(final JdbcOperations operations, final DeviceKey key, final Span span) { return read(operations, key, this.readRegistrationStatement, span.context()); } @@ -93,7 +93,7 @@ protected Future readDevice(final SQLOperations operations, final Dev * * @return The future, tracking the outcome of the operation. */ - protected Future read(final SQLOperations operations, final DeviceKey key, final Statement statement, final SpanContext spanContext) { + protected Future read(final JdbcOperations operations, final DeviceKey key, final Statement statement, final SpanContext spanContext) { return read(operations, key, Optional.empty(), statement, spanContext); } @@ -112,7 +112,7 @@ protected Future read(final SQLOperations operations, final DeviceKey * * @return The future, tracking the outcome of the operation. */ - protected Future read(final SQLOperations operations, final DeviceKey key, final Optional resourceVersion, final Statement statement, final SpanContext spanContext) { + protected Future read(final JdbcOperations operations, final DeviceKey key, final Optional resourceVersion, final Statement statement, final SpanContext spanContext) { final var expanded = statement.expand(params -> { params.put("tenant_id", key.getTenantId()); @@ -124,7 +124,7 @@ protected Future read(final SQLOperations operations, final DeviceKey return expanded .trace(this.tracer, spanContext) - .query(this.client); + .query(operations); } diff --git a/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/store/device/TableAdapterStore.java b/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/store/device/TableAdapterStore.java index 54ddff0b67..1aec67c374 100644 --- a/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/store/device/TableAdapterStore.java +++ b/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/store/device/TableAdapterStore.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c) 2020, 2022 Contributors to the Eclipse Foundation + * Copyright (c) 2020, 2023 Contributors to the Eclipse Foundation * * See the NOTICE file(s) distributed with this work for additional * information regarding copyright ownership. @@ -23,6 +23,8 @@ import org.eclipse.hono.deviceregistry.service.credentials.CredentialKey; import org.eclipse.hono.deviceregistry.service.device.DeviceKey; +import org.eclipse.hono.service.base.jdbc.client.JdbcClient; +import org.eclipse.hono.service.base.jdbc.client.JdbcOperations; import org.eclipse.hono.service.base.jdbc.store.SQL; import org.eclipse.hono.service.base.jdbc.store.Statement; import org.eclipse.hono.service.base.jdbc.store.StatementConfiguration; @@ -37,7 +39,6 @@ import io.opentracing.Tracer; import io.vertx.core.Future; import io.vertx.core.json.Json; -import io.vertx.ext.jdbc.JDBCClient; import io.vertx.ext.sql.ResultSet; /** @@ -59,7 +60,7 @@ public class TableAdapterStore extends AbstractDeviceStore { * @param cfg The SQL statement configuration. * @param dialect Database type, from the JDBC URL scheme */ - public TableAdapterStore(final JDBCClient client, final Tracer tracer, final StatementConfiguration cfg, final String dialect) { + public TableAdapterStore(final JdbcClient client, final Tracer tracer, final StatementConfiguration cfg, final String dialect) { super(client, tracer, cfg); this.dialect = dialect; cfg.dump(log); @@ -81,13 +82,13 @@ public TableAdapterStore(final JDBCClient client, final Tracer tracer, final Sta /** - * Read a device using {@link #readDevice(io.vertx.ext.sql.SQLOperations, DeviceKey, Span)} and the + * Read a device using {@link #readDevice(JdbcOperations, DeviceKey, Span)} and the * current SQL client. * * @param key The key of the device to read. * @param span The span to contribute to. * - * @return The result from {@link #readDevice(io.vertx.ext.sql.SQLOperations, DeviceKey, Span)}. + * @return The result from {@link #readDevice(JdbcOperations, DeviceKey, Span)}. */ protected Future readDevice(final DeviceKey key, final Span span) { return readDevice(this.client, key, span); @@ -97,7 +98,7 @@ protected Future readDevice(final DeviceKey key, final Span span) { * Reads the device data. *

* This reads the device data using - * {@link #readDevice(io.vertx.ext.sql.SQLOperations, DeviceKey, Span)} and + * {@link #readDevice(JdbcOperations, DeviceKey, Span)} and * transforms the plain result into a {@link DeviceReadResult}. *

* If now rows where found, the result will be empty. If more than one row is found, diff --git a/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/store/device/TableManagementStore.java b/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/store/device/TableManagementStore.java index e1a27ded42..67734c0ab7 100644 --- a/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/store/device/TableManagementStore.java +++ b/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/store/device/TableManagementStore.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c) 2020, 2022 Contributors to the Eclipse Foundation + * Copyright (c) 2020, 2023 Contributors to the Eclipse Foundation * * See the NOTICE file(s) distributed with this work for additional * information regarding copyright ownership. @@ -31,6 +31,9 @@ import org.eclipse.hono.deviceregistry.service.device.DeviceKey; import org.eclipse.hono.deviceregistry.util.DeviceRegistryUtils; import org.eclipse.hono.deviceregistry.util.Versioned; +import org.eclipse.hono.service.base.jdbc.client.JdbcClient; +import org.eclipse.hono.service.base.jdbc.client.JdbcConnection; +import org.eclipse.hono.service.base.jdbc.client.JdbcOperations; import org.eclipse.hono.service.base.jdbc.store.EntityNotFoundException; import org.eclipse.hono.service.base.jdbc.store.OptimisticLockingException; import org.eclipse.hono.service.base.jdbc.store.SQL; @@ -57,10 +60,7 @@ import io.vertx.core.Promise; import io.vertx.core.json.Json; import io.vertx.core.json.JsonObject; -import io.vertx.ext.jdbc.JDBCClient; import io.vertx.ext.sql.ResultSet; -import io.vertx.ext.sql.SQLConnection; -import io.vertx.ext.sql.UpdateResult; /** * A data store for devices and credentials, based on a table data model. @@ -110,7 +110,7 @@ public class TableManagementStore extends AbstractDeviceStore { * @param tracer The tracer to use. * @param cfg The SQL statement configuration. */ - public TableManagementStore(final JDBCClient client, final Tracer tracer, final StatementConfiguration cfg) { + public TableManagementStore(final JdbcClient client, final Tracer tracer, final StatementConfiguration cfg) { super(client, tracer, cfg); cfg.dump(log); @@ -229,9 +229,9 @@ public TableManagementStore(final JDBCClient client, final Tracer tracer, final PAGE_OFFSET); } - private static Future checkUpdateOutcome(final UpdateResult updateResult) { + private static Future checkUpdateOutcome(final Integer updateResult) { - if (updateResult.getUpdated() < 0) { + if (updateResult < 0) { // conflict log.debug("Optimistic lock broke"); return Future.failedFuture(new OptimisticLockingException()); @@ -282,7 +282,7 @@ private static Future extractVersionForUpdate(final ResultSet device, fi * @param span The span to contribute to. * @return A future tracking the outcome of the operation. */ - protected Future readDeviceForUpdate(final SQLConnection connection, final DeviceKey key, final SpanContext span) { + protected Future readDeviceForUpdate(final JdbcConnection connection, final DeviceKey key, final SpanContext span) { return read(connection, key, Optional.empty(), this.readForUpdateStatement, span); } @@ -337,14 +337,14 @@ public Future> createDevice( log.debug("createDevice - statement: {}", expanded); - return getDeviceCount(key.getTenantId(), span.context(), this.countDevicesOfTenantStatement, null, null) + return getDeviceCount(connection, key.getTenantId(), span.context(), this.countDevicesOfTenantStatement, null, null) .compose(currentDeviceCount -> tenant.checkDeviceLimitReached( key.getTenantId(), currentDeviceCount, globalDevicesPerTenantLimit)) .compose(ok -> expanded .trace(this.tracer, context) - .update(this.client) + .update(connection) .recover(SQL::translateException)) .compose(x -> createGroups(connection, key, new HashSet<>(device.getMemberOf()), context)); @@ -357,7 +357,7 @@ public Future> createDevice( } private Future createGroups( - final SQLConnection connection, + final JdbcConnection connection, final DeviceKey key, final Set memberOf, final SpanContext context) { @@ -383,7 +383,7 @@ private Future createGroups( } - private Future deleteGroups(final SQLConnection connection, + private Future deleteGroups(final JdbcConnection connection, final DeviceKey key, final SpanContext context) { @@ -426,7 +426,7 @@ private Future deleteGroups(final SQLConnection connection, * @param span The span to contribute to. * @return A future, tracking the outcome of the operation. */ - protected Future updateJsonField( + protected Future updateJsonField( final DeviceKey key, final Statement statement, final String jsonValue, @@ -538,7 +538,7 @@ public Future> updateDevice( * Reads the device data. *

* This reads the device data using - * {@link #readDevice(io.vertx.ext.sql.SQLOperations, DeviceKey, Span)} and + * {@link #readDevice(JdbcOperations, DeviceKey, Span)} and * transforms the plain result into a {@link DeviceReadResult}. *

* If now rows where found, the result will be empty. If more than one row is found, @@ -590,7 +590,7 @@ public Future> readDevice(final DeviceKey key, final * @param spanContext The span to contribute to. * @return A future, tracking the outcome of the operation. */ - public Future deleteDevice( + public Future deleteDevice( final DeviceKey key, final Optional resourceVersion, final SpanContext spanContext) { @@ -636,7 +636,7 @@ public Future deleteDevice( * @param spanContext The span to contribute to. * @return A future tracking the outcome of the operation. */ - public Future dropTenant(final String tenantId, final SpanContext spanContext) { + public Future dropTenant(final String tenantId, final SpanContext spanContext) { final Span span = TracingHelper.buildChildSpan(this.tracer, spanContext, "drop tenant", getClass().getSimpleName()) .withTag(TracingHelper.TAG_TENANT_ID, tenantId) @@ -658,6 +658,7 @@ public Future dropTenant(final String tenantId, final SpanContext /** * Gets the number of devices that are registered for a tenant. * + * @param connection The connection to use for query. * @param tenantId The tenant to count devices for. * @param spanContext The span to contribute to. * @param countStatement The count statement to use. @@ -666,7 +667,7 @@ public Future dropTenant(final String tenantId, final SpanContext * @return A future tracking the outcome of the operation. * @throws NullPointerException if tenant is {@code null}. */ - public Future getDeviceCount(final String tenantId, final SpanContext spanContext, final Statement countStatement, final String field, final String value) { + public Future getDeviceCount(final JdbcOperations connection, final String tenantId, final SpanContext spanContext, final Statement countStatement, final String field, final String value) { Objects.requireNonNull(tenantId); @@ -684,7 +685,7 @@ public Future getDeviceCount(final String tenantId, final SpanContext s return expanded .trace(this.tracer, span.context()) - .query(this.client) + .query(connection) .map(r -> { final var entries = r.getRows(true); switch (entries.size()) { @@ -819,7 +820,7 @@ public Future> setCredentials( private Future getCredentialsDto( final DeviceKey key, - final SQLConnection connection, + final JdbcOperations connection, final Span span) { return readCredentialsStatement @@ -874,7 +875,7 @@ public Future> getCredentials(final DeviceKey ke map.put(DEVICE_ID, key.getDeviceId()); }); - final Promise promise = Promise.promise(); + final Promise promise = Promise.promise(); this.client.getConnection(promise); return promise.future() @@ -958,7 +959,7 @@ public Future> findDevices(final String tenantId, fin .withTag(TracingHelper.TAG_TENANT_ID, tenantId) .start(); - final Future deviceCountFuture = getDeviceCount(tenantId, span.context(), countStatement, field, value); + final Future deviceCountFuture = getDeviceCount(this.client, tenantId, span.context(), countStatement, field, value); return deviceCountFuture .compose(count -> expanded.trace(this.tracer, span.context()).query(this.client)) diff --git a/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/store/tenant/AbstractTenantStore.java b/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/store/tenant/AbstractTenantStore.java index 5ff99ccc34..73481029ca 100644 --- a/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/store/tenant/AbstractTenantStore.java +++ b/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/store/tenant/AbstractTenantStore.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c) 2020, 2022 Contributors to the Eclipse Foundation + * Copyright (c) 2020, 2023 Contributors to the Eclipse Foundation * * See the NOTICE file(s) distributed with this work for additional * information regarding copyright ownership. @@ -18,6 +18,8 @@ import java.util.Optional; import java.util.stream.Collectors; +import org.eclipse.hono.service.base.jdbc.client.JdbcClient; +import org.eclipse.hono.service.base.jdbc.client.JdbcOperations; import org.eclipse.hono.service.base.jdbc.store.AbstractStore; import org.eclipse.hono.service.base.jdbc.store.SQL; import org.eclipse.hono.service.base.jdbc.store.Statement; @@ -35,9 +37,7 @@ import io.vertx.core.Future; import io.vertx.core.json.Json; import io.vertx.core.json.JsonObject; -import io.vertx.ext.jdbc.JDBCClient; import io.vertx.ext.sql.ResultSet; -import io.vertx.ext.sql.SQLOperations; /** * A data store for tenant information. @@ -49,7 +49,7 @@ public abstract class AbstractTenantStore extends AbstractStore { private static final Logger log = LoggerFactory.getLogger(AbstractTenantStore.class); - protected final JDBCClient client; + protected final JdbcClient client; protected final Tracer tracer; private final Statement readStatement; @@ -62,7 +62,7 @@ public abstract class AbstractTenantStore extends AbstractStore { * @param tracer The tracer to use. * @param cfg The statement configuration to use. */ - public AbstractTenantStore(final JDBCClient client, final Tracer tracer, final StatementConfiguration cfg) { + public AbstractTenantStore(final JdbcClient client, final Tracer tracer, final StatementConfiguration cfg) { super(client, tracer, cfg.getStatement("checkConnection")); cfg.dump(log); @@ -123,7 +123,7 @@ public Future> readTenant(final String id, final Span * @param spanContext The span to contribute to. * @return A future, tracking the outcome of the operation. */ - protected Future> readTenantBy(final SQLOperations operations, final ExpandedStatement expanded, final SpanContext spanContext) { + protected Future> readTenantBy(final JdbcOperations operations, final ExpandedStatement expanded, final SpanContext spanContext) { final Span span = TracingHelper.buildChildSpan(this.tracer, spanContext, "read tenant by", getClass().getSimpleName()) .start(); @@ -182,7 +182,7 @@ protected Future> readTenantBy(final SQLOperations op * @param spanContext The span to contribute to. * @return A future, tracking the outcome of the operation. */ - protected Future readTenantEntryById(final SQLOperations operations, final String id, final SpanContext spanContext) { + protected Future readTenantEntryById(final JdbcOperations operations, final String id, final SpanContext spanContext) { final Span span = TracingHelper.buildChildSpan(this.tracer, spanContext, "read tenant entry", getClass().getSimpleName()) .withTag(TracingHelper.TAG_TENANT_ID, id) @@ -209,7 +209,7 @@ protected Future readTenantEntryById(final SQLOperations operations, * @param spanContext The span to contribute to. * @return A future, tracking the outcome of the operation. */ - protected Future readTenantTrustAnchors(final SQLOperations operations, final String id, final SpanContext spanContext) { + protected Future readTenantTrustAnchors(final JdbcOperations operations, final String id, final SpanContext spanContext) { final Span span = TracingHelper.buildChildSpan(this.tracer, spanContext, "populate trust anchors", getClass().getSimpleName()) .withTag(TracingHelper.TAG_TENANT_ID, id) @@ -237,7 +237,7 @@ protected Future readTenantTrustAnchors(final SQLOperations operation * @return The future, tracking the outcome of the operation. */ protected Future fillTrustAnchors( - final SQLOperations operations, + final JdbcOperations operations, final TenantReadResult tenant, final SpanContext spanContext ) { diff --git a/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/store/tenant/AdapterStore.java b/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/store/tenant/AdapterStore.java index c86f58b427..cc4bde80f8 100644 --- a/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/store/tenant/AdapterStore.java +++ b/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/store/tenant/AdapterStore.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c) 2020, 2022 Contributors to the Eclipse Foundation + * Copyright (c) 2020, 2023 Contributors to the Eclipse Foundation * * See the NOTICE file(s) distributed with this work for additional * information regarding copyright ownership. @@ -17,6 +17,7 @@ import java.util.Optional; import org.eclipse.hono.deviceregistry.util.DeviceRegistryUtils; +import org.eclipse.hono.service.base.jdbc.client.JdbcClient; import org.eclipse.hono.service.base.jdbc.store.SQL; import org.eclipse.hono.service.base.jdbc.store.Statement; import org.eclipse.hono.service.base.jdbc.store.StatementConfiguration; @@ -27,8 +28,6 @@ import io.opentracing.Tracer; import io.vertx.core.Future; import io.vertx.core.json.JsonObject; -import io.vertx.ext.jdbc.JDBCClient; - /** * A data store for tenant information. @@ -44,7 +43,7 @@ public class AdapterStore extends AbstractTenantStore { * @param tracer The tracer to use. * @param cfg The statement configuration to use. */ - public AdapterStore(final JDBCClient client, final Tracer tracer, final StatementConfiguration cfg) { + public AdapterStore(final JdbcClient client, final Tracer tracer, final StatementConfiguration cfg) { super(client, tracer, cfg); this.readByTrustAnchorStatement = cfg diff --git a/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/store/tenant/ManagementStore.java b/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/store/tenant/ManagementStore.java index 5d2c4d8f6b..28853a97ad 100644 --- a/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/store/tenant/ManagementStore.java +++ b/services/base-jdbc/src/main/java/org/eclipse/hono/service/base/jdbc/store/tenant/ManagementStore.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c) 2020, 2022 Contributors to the Eclipse Foundation + * Copyright (c) 2020, 2023 Contributors to the Eclipse Foundation * * See the NOTICE file(s) distributed with this work for additional * information regarding copyright ownership. @@ -22,6 +22,9 @@ import org.eclipse.hono.client.ClientErrorException; import org.eclipse.hono.deviceregistry.util.Versioned; +import org.eclipse.hono.service.base.jdbc.client.JdbcClient; +import org.eclipse.hono.service.base.jdbc.client.JdbcConnection; +import org.eclipse.hono.service.base.jdbc.client.JdbcOperations; import org.eclipse.hono.service.base.jdbc.store.EntityNotFoundException; import org.eclipse.hono.service.base.jdbc.store.SQL; import org.eclipse.hono.service.base.jdbc.store.Statement; @@ -42,10 +45,6 @@ import io.vertx.core.Promise; import io.vertx.core.json.Json; import io.vertx.core.json.JsonObject; -import io.vertx.ext.jdbc.JDBCClient; -import io.vertx.ext.sql.SQLConnection; -import io.vertx.ext.sql.SQLOperations; -import io.vertx.ext.sql.UpdateResult; /** * A data store for tenant management information. @@ -76,7 +75,7 @@ public class ManagementStore extends AbstractTenantStore { * @param tracer The tracer to use. * @param cfg The statement configuration to use. */ - public ManagementStore(final JDBCClient client, final Tracer tracer, final StatementConfiguration cfg) { + public ManagementStore(final JdbcClient client, final Tracer tracer, final StatementConfiguration cfg) { super(client, tracer, cfg); @@ -221,7 +220,7 @@ public Future> create(final String tenantId, final Tenant tenant log.debug("create - statement: {}", expanded); return expanded .trace(this.tracer, span.context()) - .update(this.client) + .update(connection) .recover(SQL::translateException) // insert all trust anchors @@ -235,7 +234,7 @@ public Future> create(final String tenantId, final Tenant tenant } - private Future deleteAllTrustAnchors(final SQLConnection connection, final String tenantId, final Span span) { + private Future deleteAllTrustAnchors(final JdbcConnection connection, final String tenantId, final Span span) { return this.deleteAllTrustAnchorsStatement @@ -249,7 +248,7 @@ private Future deleteAllTrustAnchors(final SQLConnection connection, final } - private Future insertAllTrustAnchors(final SQLConnection connection, final String tenantId, final Tenant tenant, final Span span) { + private Future insertAllTrustAnchors(final JdbcConnection connection, final String tenantId, final Tenant tenant, final Span span) { if (tenant.getTrustedCertificateAuthorities() == null || tenant.getTrustedCertificateAuthorities().isEmpty()) { return Future.succeededFuture(); @@ -311,7 +310,7 @@ public Future> read(final String id, final SpanContex * @param spanContext The span to contribute to. * @return The future, tracking the outcome of the operation. */ - public Future delete(final String tenantId, final Optional resourceVersion, final SpanContext spanContext) { + public Future delete(final String tenantId, final Optional resourceVersion, final SpanContext spanContext) { final Span span = TracingHelper.buildChildSpan(this.tracer, spanContext, "delete tenant", getClass().getSimpleName()) .withTag(TracingHelper.TAG_TENANT_ID, tenantId) @@ -385,7 +384,7 @@ public Future> update(final String tenantId, final Tenant tenant // check if the entity was found .flatMap(r -> { - if (r.getUpdated() <= 0) { + if (r <= 0) { return Future.failedFuture(new EntityNotFoundException()); } else { return Future.succeededFuture(); @@ -424,8 +423,8 @@ public Future> update(final String tenantId, final Tenant tenant * @param span The span to contribute to. * @return The future, tracking the outcome of the operation. */ - protected Future updateJsonField( - final SQLOperations operations, + protected Future updateJsonField( + final JdbcOperations operations, final String tenantId, final Statement statement, final String jsonValue, @@ -445,13 +444,13 @@ protected Future updateJsonField( // execute update final var result = expanded .trace(this.tracer, span.context()) - .update(this.client); + .update(operations); // process result, check optimistic lock return checkOptimisticLock( result, span, resourceVersion, - checkSpan -> readTenantEntryById(this.client, tenantId, checkSpan.context())); + checkSpan -> readTenantEntryById(operations, tenantId, checkSpan.context())); } /** diff --git a/services/device-registry-jdbc/pom.xml b/services/device-registry-jdbc/pom.xml index 990c2d1c62..b101dac617 100644 --- a/services/device-registry-jdbc/pom.xml +++ b/services/device-registry-jdbc/pom.xml @@ -1,6 +1,6 @@ diff --git a/services/device-registry-jdbc/src/main/java/org/eclipse/hono/deviceregistry/jdbc/impl/ClasspathSchemaCreator.java b/services/device-registry-jdbc/src/main/java/org/eclipse/hono/deviceregistry/jdbc/impl/ClasspathSchemaCreator.java index 4182e4d25b..a52cd46750 100644 --- a/services/device-registry-jdbc/src/main/java/org/eclipse/hono/deviceregistry/jdbc/impl/ClasspathSchemaCreator.java +++ b/services/device-registry-jdbc/src/main/java/org/eclipse/hono/deviceregistry/jdbc/impl/ClasspathSchemaCreator.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021 Contributors to the Eclipse Foundation + * Copyright (c) 2021, 2023 Contributors to the Eclipse Foundation * * See the NOTICE file(s) distributed with this work for additional * information regarding copyright ownership. @@ -17,6 +17,7 @@ import java.util.Optional; import org.eclipse.hono.deviceregistry.jdbc.config.SchemaCreator; +import org.eclipse.hono.service.base.jdbc.client.JdbcClient; import org.eclipse.hono.service.base.jdbc.config.JdbcProperties; import org.eclipse.hono.service.base.jdbc.store.SQL; import org.eclipse.hono.service.base.jdbc.store.Statement; @@ -31,7 +32,6 @@ import io.vertx.core.Promise; import io.vertx.core.Vertx; import io.vertx.core.buffer.Buffer; -import io.vertx.ext.jdbc.JDBCClient; /** * Create the expected database schema if it does not exist from SQL script bundled with the classpath. @@ -102,7 +102,7 @@ private Future loadAndRunScript(final JdbcProperties jdbcProperties, final private Future runScript(final JdbcProperties jdbcProperties, final String script, final SpanContext ctx) { - final JDBCClient jdbcClient = JdbcProperties.dataSource(vertx, jdbcProperties); + final JdbcClient jdbcClient = JdbcProperties.dataSource(vertx, jdbcProperties); final Promise clientCloseTracker = Promise.promise(); SQL.runTransactionally(jdbcClient, tracer, ctx, @@ -112,7 +112,7 @@ private Future runScript(final JdbcProperties jdbcProperties, final String .map(stmt -> { log.debug("creating database schema in [{}] using script: {}", jdbcProperties.getUrl(), stmt); return stmt - .query(jdbcClient) + .call(connection) .recover(SQL::translateException); }) .orElseGet(() -> { diff --git a/services/device-registry-jdbc/src/main/java/org/eclipse/hono/deviceregistry/jdbc/impl/DeviceManagementServiceImpl.java b/services/device-registry-jdbc/src/main/java/org/eclipse/hono/deviceregistry/jdbc/impl/DeviceManagementServiceImpl.java index aa04d2b163..5b0a4c6aa5 100644 --- a/services/device-registry-jdbc/src/main/java/org/eclipse/hono/deviceregistry/jdbc/impl/DeviceManagementServiceImpl.java +++ b/services/device-registry-jdbc/src/main/java/org/eclipse/hono/deviceregistry/jdbc/impl/DeviceManagementServiceImpl.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c) 2020, 2022 Contributors to the Eclipse Foundation + * Copyright (c) 2020, 2023 Contributors to the Eclipse Foundation * * See the NOTICE file(s) distributed with this work for additional * information regarding copyright ownership. @@ -122,7 +122,7 @@ protected Future> processDeleteDevice(final DeviceKey key, final Op return this.store .deleteDevice(key, resourceVersion, span.context()) .map(r -> { - if (r.getUpdated() <= 0) { + if (r <= 0) { throw new ClientErrorException( key.getTenantId(), HttpURLConnection.HTTP_NOT_FOUND, diff --git a/services/device-registry-jdbc/src/main/java/org/eclipse/hono/deviceregistry/jdbc/impl/TenantManagementServiceImpl.java b/services/device-registry-jdbc/src/main/java/org/eclipse/hono/deviceregistry/jdbc/impl/TenantManagementServiceImpl.java index 0a22d8ec49..6694847715 100644 --- a/services/device-registry-jdbc/src/main/java/org/eclipse/hono/deviceregistry/jdbc/impl/TenantManagementServiceImpl.java +++ b/services/device-registry-jdbc/src/main/java/org/eclipse/hono/deviceregistry/jdbc/impl/TenantManagementServiceImpl.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c) 2020, 2022 Contributors to the Eclipse Foundation + * Copyright (c) 2020, 2023 Contributors to the Eclipse Foundation * * See the NOTICE file(s) distributed with this work for additional * information regarding copyright ownership. @@ -118,7 +118,7 @@ protected Future> processDeleteTenant(final String tenantId, final .delete(tenantId, resourceVersion, span.context()) .map(r -> { - if (r.getUpdated() <= 0) { + if (r <= 0) { throw new ClientErrorException( tenantId, HttpURLConnection.HTTP_NOT_FOUND, diff --git a/services/device-registry-jdbc/src/main/resources/application.properties b/services/device-registry-jdbc/src/main/resources/application.properties index 35eb4801a1..18e3f11774 100644 --- a/services/device-registry-jdbc/src/main/resources/application.properties +++ b/services/device-registry-jdbc/src/main/resources/application.properties @@ -2,4 +2,6 @@ ${quarkus.application.properties} quarkus.jackson.accept-case-insensitive-enums=true # fail deserialization of JSON objects sent by clients if they contain unexpected content quarkus.jackson.fail-on-unknown-properties=true - +# enable h2 and postgres extensions +quarkus.datasource.h2.db-kind=h2 +quarkus.datasource.pg.db-kind=pg