Skip to content

Commit

Permalink
[#3562] Migrate to Quarkus JDBC implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
harism committed Oct 9, 2023
1 parent a3bd800 commit 4276c6f
Show file tree
Hide file tree
Showing 24 changed files with 594 additions and 141 deletions.
4 changes: 4 additions & 0 deletions services/base-jdbc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@
<scope>provided</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-agroal</artifactId>
</dependency>

<!-- testing -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*******************************************************************************
* Copyright (c) 2023 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*******************************************************************************/

package org.eclipse.hono.service.base.jdbc.client;

import java.util.concurrent.Callable;

import io.vertx.core.AsyncResult;

/**
* JDBC properties for the device store.
* @param <T> Return class type
*/
public class JdbcAsyncResult<T> implements AsyncResult<T> {
private T result;
private Throwable cause;
private boolean succeeded;

@Override
public T result() {
return result;
}

@Override
public Throwable cause() {
return cause;
}

@Override
public boolean succeeded() {
return succeeded;
}

@Override
public boolean failed() {
return !succeeded;
}

/**
* Static method to create AsyncResult instance.
* @param <S> Return class type
* @param func Execution function
* @return AsyncResult instance
*/
public static <S> AsyncResult<S> run(final Callable<S> func) {
final JdbcAsyncResult<S> result = new JdbcAsyncResult<S>();
try {
result.result = func.call();
result.succeeded = true;
} catch (Throwable t) {
result.succeeded = false;
result.cause = t;
}
return result;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*******************************************************************************
* Copyright (c) 2023 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*******************************************************************************/

package org.eclipse.hono.service.base.jdbc.client;

import io.agroal.api.AgroalDataSource;
import io.vertx.codegen.annotations.Nullable;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.json.JsonArray;
import io.vertx.ext.sql.ResultSet;

/**
* Quarkus AgroalDataSource based JDBC client.
*/
public class JdbcClient implements JdbcOperations {
private final AgroalDataSource dataSource;

/**
* JdbcClient constructor.
* @param dataSource Quarkus Argoal data source
*/
public JdbcClient(final AgroalDataSource dataSource) {
this.dataSource = dataSource;
}

/**
* Open connection asynchronously.
* @param handler Asynchronous listener
*/
public void getConnection(final Handler<AsyncResult<JdbcConnection>> handler) {
handler.handle(JdbcAsyncResult.run(() -> {
return new JdbcConnection(dataSource.getConnection());
}));
}

/**
* Close connection asynchronously.
* @param handler Asynchronous listener
*/
public void close(final Handler<AsyncResult<Void>> handler) {
handler.handle(JdbcAsyncResult.run(() -> {
dataSource.close();
return null;
}));
};

/**
* Close connection synchronously.
*/
public void close() {
dataSource.close();
}

@Override
public void queryWithParams(final String sql, final JsonArray jsonArray, final Handler<AsyncResult<ResultSet>> handler) {
final Promise<JdbcConnection> promise = Promise.promise();
getConnection(promise);
promise.future()
.compose(connection -> {
connection.queryWithParams(sql, jsonArray, handler);
connection.close();
return Future.succeededFuture();
});
}

@Override
public void querySingle(final String sql, final Handler<AsyncResult<@Nullable JsonArray>> handler) {
final Promise<JdbcConnection> promise = Promise.promise();
getConnection(promise);
promise.future()
.compose(connection -> {
connection.querySingle(sql, handler);
connection.close();
return Future.succeededFuture();
});
}

@Override
public void querySingleWithParams(final String sql, final JsonArray arguments, final Handler<AsyncResult<@Nullable JsonArray>> handler) {
final Promise<JdbcConnection> promise = Promise.promise();
getConnection(promise);
promise.future()
.compose(connection -> {
connection.querySingleWithParams(sql, arguments, handler);
connection.close();
return Future.succeededFuture();
});
}

@Override
public void updateWithParams(final String sql, final JsonArray jsonArray, final Handler<AsyncResult<Integer>> handler) {
final Promise<JdbcConnection> promise = Promise.promise();
getConnection(promise);
promise.future()
.compose(connection -> {
connection.updateWithParams(sql, jsonArray, handler);
connection.close();
return Future.succeededFuture();
});
}

@Override
public void call(final String sql, final Handler<AsyncResult<Void>> handler) {
final Promise<JdbcConnection> promise = Promise.promise();
getConnection(promise);
promise.future()
.compose(connection -> {
connection.call(sql, handler);
connection.close();
return Future.succeededFuture();
});
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
/*******************************************************************************
* Copyright (c) 2023 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*******************************************************************************/

package org.eclipse.hono.service.base.jdbc.client;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;

import io.vertx.codegen.annotations.Nullable;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.json.JsonArray;
import io.vertx.ext.sql.ResultSet;

/**
* Quarkus based SQL connection.
*/
public class JdbcConnection implements JdbcOperations {

private final Connection connection;

/**
* SQL connection constructor.
* @param connection SQL connection
*/
public JdbcConnection(final Connection connection) {
this.connection = connection;
}

/**
* Asynchronous auto commit setter.
* @param autoCommit True for auto commit
* @param handler Asynchronous operation listener
*/
public void setAutoCommit(final boolean autoCommit, final Handler<AsyncResult<Void>> handler) {
handler.handle(JdbcAsyncResult.run(() -> {
connection.setAutoCommit(autoCommit);
return null;
}));
}

/**
* Close the underlying connection.
* @throws RuntimeException Thrown if closing connection fails
*/
public void close() {
try {
connection.close();
} catch (SQLException ex) {
throw new RuntimeException(ex);
}
}

/**
* Commit asynchronously.
* @param handler Asynchronous commit handler
*/
public void commit(final Handler<AsyncResult<Void>> handler) {
handler.handle(JdbcAsyncResult.run(() -> {
connection.commit();
return null;
}));
}

/**
* Test.
* @param handler test
*/
public void rollback(final Handler<AsyncResult<Void>> handler) {
handler.handle(JdbcAsyncResult.run(() -> {
connection.rollback();
return null;
}));
}

@Override
public void queryWithParams(final String sql, final JsonArray jsonArray, final Handler<AsyncResult<ResultSet>> handler) {
handler.handle(JdbcAsyncResult.run(() -> {
final PreparedStatement stmt = this.connection.prepareStatement(sql);
for (int index = 0; index < jsonArray.size(); index++) {
stmt.setObject(index + 1, jsonArray.getValue(index));
}
final ResultSet rs = readResultSet(stmt.executeQuery());
stmt.close();
return rs;
}));
}

@Override
public void querySingle(final String sql, final Handler<AsyncResult<@Nullable JsonArray>> handler) {
handler.handle(JdbcAsyncResult.run(() -> {
final PreparedStatement stmt = this.connection.prepareStatement(sql);
final java.sql.ResultSet rs = stmt.executeQuery();
JsonArray result = null;
if (rs.next()) {
result = readRow(rs);
}
stmt.close();
return result;
}));
}

@Override
public void querySingleWithParams(final String sql, final JsonArray jsonArray, final Handler<AsyncResult<@Nullable JsonArray>> handler) {
handler.handle(JdbcAsyncResult.run(() -> {
final PreparedStatement stmt = this.connection.prepareStatement(sql);
for (int index = 0; index < jsonArray.size(); index++) {
stmt.setObject(index + 1, jsonArray.getValue(index));
}
final java.sql.ResultSet rs = stmt.executeQuery();
JsonArray result = null;
if (rs.next()) {
result = readRow(rs);
}
stmt.close();
return result;
}));
}

@Override
public void updateWithParams(final String sql, final JsonArray jsonArray, final Handler<AsyncResult<Integer>> handler) {
handler.handle(JdbcAsyncResult.run(() -> {
final PreparedStatement stmt = this.connection.prepareStatement(sql);
for (int index = 0; index < jsonArray.size(); index++) {
stmt.setObject(index + 1, jsonArray.getValue(index));
}
final int updated = stmt.executeUpdate();
stmt.close();
return updated;
}));
}

@Override
public void call(final String sql, final Handler<AsyncResult<Void>> handler) {
handler.handle(JdbcAsyncResult.run(() -> {
final PreparedStatement stmt = this.connection.prepareStatement(sql);
stmt.execute();
stmt.close();
return null;
}));
}

private JsonArray readRow(final java.sql.ResultSet rs) throws SQLException {
final int colCount = rs.getMetaData().getColumnCount();
final JsonArray result = new JsonArray();
for (int idx = 1; idx <= colCount; idx++) {
Object resultObj = rs.getObject(idx);
if (resultObj instanceof Timestamp ts) {
resultObj = ts.toLocalDateTime().toString();
}
result.add(resultObj);
}
return result;
}

private ResultSet readResultSet(final java.sql.ResultSet rs) throws SQLException {
final int colCount = rs.getMetaData().getColumnCount();
final List<String> colNames = new ArrayList<>();
final List<JsonArray> results = new ArrayList<>();

for (int idx = 1; idx <= colCount; ++idx) {
colNames.add(rs.getMetaData().getColumnName(idx));
}

while (rs.next()) {
results.add(readRow(rs));
}

final ResultSet rsOut = new ResultSet();
rsOut.setColumnNames(colNames);
rsOut.setResults(results);
return rsOut;
}

}
Loading

0 comments on commit 4276c6f

Please sign in to comment.