Skip to content

Commit

Permalink
[#3562] Migrate to Quarkus JDBC implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
harism committed Oct 14, 2023
1 parent 6757e40 commit 9085ff4
Show file tree
Hide file tree
Showing 19 changed files with 661 additions and 124 deletions.
6 changes: 5 additions & 1 deletion services/base-jdbc/pom.xml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright (c) 2020, 2021 Contributors to the Eclipse Foundation
Copyright (c) 2020, 2023 Contributors to the Eclipse Foundation
See the NOTICE file(s) distributed with this work for additional
information regarding copyright ownership.
Expand Down Expand Up @@ -86,6 +86,10 @@
<scope>provided</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-agroal</artifactId>
</dependency>

<!-- testing -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
/*******************************************************************************
* Copyright (c) 2023 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*******************************************************************************/

package org.eclipse.hono.service.base.jdbc.client;

import java.io.IOException;
import java.sql.Connection;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import io.agroal.api.AgroalDataSource;
import io.vertx.codegen.annotations.Nullable;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonArray;
import io.vertx.ext.sql.ResultSet;

/**
* Quarkus Agroal DataSource based JDBC client.
*/
public class JdbcClient implements JdbcOperations {

private final Vertx vertx;
private final AgroalDataSource dataSource;
private final ExecutorService pool;
private final Queue<JdbcConnection> connections;
private boolean isClosed;

/**
* JdbcClient constructor.
* @param vertx Vert.x instance to use
* @param dataSource Quarkus Argoal data source
*/
public JdbcClient(final Vertx vertx, final AgroalDataSource dataSource) {
this.vertx = vertx;
this.dataSource = dataSource;
this.pool = Executors.newFixedThreadPool(1);
this.connections = new ConcurrentLinkedQueue<>();
this.isClosed = false;
}

@Override
public void close() {
this.isClosed = true;
for (JdbcConnection connection : this.connections) {
connection.close();
}
this.connections.clear();
this.dataSource.close();
this.pool.shutdown();
}

@Override
public void close(final Promise<Void> handler) {
this.vertx.executeBlocking(promise -> {
try {
close();
promise.complete();
} catch (Exception ex) {
promise.fail(ex);
}
}, handler);
};

/**
* Open connection asynchronously.
* @param handler Asynchronous listener
*/
public void getConnection(final Handler<AsyncResult<JdbcConnection>> handler) {
getConnectionFuture().onComplete(handler);
}

@Override
public void queryWithParams(final String sql, final JsonArray jsonArray, final Handler<AsyncResult<ResultSet>> handler) {
getConnectionFuture().compose(connection -> {
final Promise<ResultSet> promise = Promise.promise();
connection.queryWithParams(sql, jsonArray, promise);
return promise.future().onComplete(result -> {
final Promise<Void> close = Promise.promise();
connection.close(close);
close.future().onComplete(ignore -> {
this.connections.remove(connection);
});
});
}).onComplete(handler);
}

@Override
public void querySingle(final String sql, final Handler<AsyncResult<@Nullable JsonArray>> handler) {
getConnectionFuture().compose(connection -> {
final Promise<@Nullable JsonArray> promise = Promise.promise();
connection.querySingle(sql, promise);
return promise.future().onComplete(result -> {
final Promise<Void> close = Promise.promise();
connection.close(close);
close.future().onComplete(ignore -> {
this.connections.remove(connection);
});
});
}).onComplete(handler);
}

@Override
public void querySingleWithParams(final String sql, final JsonArray arguments, final Handler<AsyncResult<@Nullable JsonArray>> handler) {
getConnectionFuture().compose(connection -> {
final Promise<@Nullable JsonArray> promise = Promise.promise();
connection.querySingleWithParams(sql, arguments, promise);
return promise.future().onComplete(result -> {
final Promise<Void> close = Promise.promise();
connection.close(close);
close.future().onComplete(ignore -> {
this.connections.remove(connection);
});
});
}).onComplete(handler);
}

@Override
public void updateWithParams(final String sql, final JsonArray jsonArray, final Handler<AsyncResult<Integer>> handler) {
getConnectionFuture().compose(connection -> {
final Promise<Integer> promise = Promise.promise();
connection.updateWithParams(sql, jsonArray, promise);
return promise.future().onComplete(result -> {
final Promise<Void> close = Promise.promise();
connection.close(close);
close.future().onComplete(ignore -> {
this.connections.remove(connection);
});
});
}).onComplete(handler);
}

@Override
public void call(final String sql, final Handler<AsyncResult<Void>> handler) {
getConnectionFuture().compose(connection -> {
final Promise<Void> promise = Promise.promise();
connection.call(sql, promise);
return promise.future().onComplete(result -> {
final Promise<Void> close = Promise.promise();
connection.close(close);
close.future().onComplete(ignore -> {
this.connections.remove(connection);
});
});
}).onComplete(handler);
}

@Override
public void callWithParams(final String sql, final JsonArray jsonArray, final Handler<AsyncResult<Void>> handler) {
getConnectionFuture().compose(connection -> {
final Promise<Void> promise = Promise.promise();
connection.callWithParams(sql, jsonArray, promise);
return promise.future().onComplete(result -> {
final Promise<Void> close = Promise.promise();
connection.close(close);
close.future().onComplete(ignore -> {
this.connections.remove(connection);
});
});
}).onComplete(handler);
}

private Future<JdbcConnection> getConnectionFuture() {
if (this.isClosed) {
return Future.failedFuture(new IOException("Client is already closed"));
}
final Promise<JdbcConnection> promise = Promise.promise();
this.pool.execute(() -> {
try {
final Connection connection = this.dataSource.getConnection();
final JdbcConnection jdbcConnection = new JdbcConnection(this.vertx.getOrCreateContext(), connection);
connections.add(jdbcConnection);
promise.complete(jdbcConnection);
} catch (Exception ex) {
promise.fail(ex);
}
});
return promise.future();
}

}
Loading

0 comments on commit 9085ff4

Please sign in to comment.