Skip to content

Commit

Permalink
Merge pull request #157 from oracle/update-1.3.0
Browse files Browse the repository at this point in the history
Updates for Release 1.3.0
  • Loading branch information
jeandelavarene authored Nov 12, 2024
2 parents bc7e1a6 + ad2d023 commit 7182da7
Show file tree
Hide file tree
Showing 16 changed files with 514 additions and 235 deletions.
199 changes: 87 additions & 112 deletions README.md

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

<groupId>com.oracle.database.r2dbc</groupId>
<artifactId>oracle-r2dbc</artifactId>
<version>1.2.0</version>
<version>1.3.0</version>
<name>oracle-r2dbc</name>
<description>
Oracle R2DBC Driver implementing version 1.0.0 of the R2DBC SPI for Oracle Database.
Expand Down Expand Up @@ -65,9 +65,9 @@

<properties>
<java.version>11</java.version>
<ojdbc.version>23.4.0.24.05</ojdbc.version>
<ojdbc.version>23.6.0.24.10</ojdbc.version>
<r2dbc.version>1.0.0.RELEASE</r2dbc.version>
<reactor.version>3.5.11</reactor.version>
<reactor.version>3.6.11</reactor.version>
<reactive-streams.version>1.0.3</reactive-streams.version>
<junit.version>5.9.1</junit.version>
<spring-jdbc.version>5.3.19</spring-jdbc.version>
Expand Down
14 changes: 8 additions & 6 deletions src/main/java/oracle/r2dbc/impl/OracleBatchImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@
*/
final class OracleBatchImpl implements Batch {

/** The OracleConnectionImpl that created this Batch */
private final OracleConnectionImpl r2dbcConnection;

/** Adapts Oracle JDBC Driver APIs into Reactive Streams APIs */
private final ReactiveJdbcAdapter adapter;

Expand Down Expand Up @@ -83,12 +86,11 @@ final class OracleBatchImpl implements Batch {
* @param jdbcConnection JDBC connection to an Oracle Database. Not null.
* @param adapter Adapts JDBC calls into reactive streams. Not null.
*/
OracleBatchImpl(
Duration timeout, Connection jdbcConnection, ReactiveJdbcAdapter adapter) {
OracleBatchImpl(Duration timeout, OracleConnectionImpl r2dbcConnection) {
this.timeout = timeout;
this.jdbcConnection =
requireNonNull(jdbcConnection, "jdbcConnection is null");
this.adapter = requireNonNull(adapter, "adapter is null");
this.r2dbcConnection = r2dbcConnection;
this.jdbcConnection = r2dbcConnection.jdbcConnection();
this.adapter = r2dbcConnection.adapter();
}

/**
Expand All @@ -103,7 +105,7 @@ public Batch add(String sql) {
requireOpenConnection(jdbcConnection);
requireNonNull(sql, "sql is null");
statements.add(
new OracleStatementImpl(sql, timeout, jdbcConnection, adapter));
new OracleStatementImpl(sql, timeout, r2dbcConnection));
return this;
}

Expand Down
82 changes: 77 additions & 5 deletions src/main/java/oracle/r2dbc/impl/OracleConnectionImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,17 @@
import java.sql.SQLException;
import java.sql.Savepoint;
import java.time.Duration;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import static io.r2dbc.spi.IsolationLevel.READ_COMMITTED;
import static io.r2dbc.spi.IsolationLevel.SERIALIZABLE;
import static io.r2dbc.spi.TransactionDefinition.ISOLATION_LEVEL;
import static io.r2dbc.spi.TransactionDefinition.LOCK_WAIT_TIMEOUT;
import static io.r2dbc.spi.TransactionDefinition.NAME;
import static io.r2dbc.spi.TransactionDefinition.READ_ONLY;
import static oracle.r2dbc.impl.OracleR2dbcExceptions.requireNonNull;
import static oracle.r2dbc.impl.OracleR2dbcExceptions.fromJdbc;
import static oracle.r2dbc.impl.OracleR2dbcExceptions.requireNonNull;
import static oracle.r2dbc.impl.OracleR2dbcExceptions.requireOpenConnection;
import static oracle.r2dbc.impl.OracleR2dbcExceptions.runJdbc;
import static oracle.r2dbc.impl.OracleR2dbcExceptions.toR2dbcException;
Expand Down Expand Up @@ -126,6 +128,12 @@ final class OracleConnectionImpl implements Connection, Lifecycle {
*/
private TransactionDefinition currentTransaction = null;

/**
* A set of tasks that must complete before the {@link #jdbcConnection} is
* closed. The tasks are executed by subscribing to a Publisher.
*/
private final Set<Publisher<?>> closeTasks = ConcurrentHashMap.newKeySet();

/**
* Constructs a new connection that uses the specified {@code adapter} to
* perform database operations with the specified {@code jdbcConnection}.
Expand Down Expand Up @@ -369,7 +377,52 @@ else if (isReadOnly == null && name == null) {
*/
@Override
public Publisher<Void> close() {
return adapter.publishClose(jdbcConnection);

Publisher<Void> closeTasksPublisher = Mono.defer(() -> {
Publisher<?>[] closeTasksArray = closeTasks.toArray(Publisher<?>[]::new);
closeTasks.clear();

return Flux.concatDelayError(closeTasksArray).then();
});

return Flux.concatDelayError(
closeTasksPublisher,
adapter.publishClose(jdbcConnection));
}

/**
* <p>
* Adds a publisher that must be subscribed to and must terminate before
* closing the JDBC connection. This method can be used to ensure that certain
* tasks are completed before the {@link #jdbcConnection()} is closed and
* becomes unusable.
* </p><p>
* The publisher returned by this method emits the same result as the
* publisher passed into this method. However, when the returned publisher
* terminates, it will also remove any reference to the publisher that was
* passed into this method. <i>If the returned publisher is never subscribed
* to, then the reference will not be cleared until the connection is
* closed!</i> So, this method should only be used in cases where the user is
* responsible for subscribing to the returned publisher, in the same way they
* would be responsible for calling close() on an AutoCloseable. If the user
* is not responsible for subscribing, then there is no reasonable way for
* them to reduce the number object references that this method will create;
* They would either need to close this connection, or subscribe to a
* publisher when there is no obligation to do so.
* </p>
*
* @param publisher Publisher that must be subscribed to before closing the
* JDBC connection. Not null.
*
* @return A publisher that emits the same result as the publisher passed into
* this method, and clears any reference to it when terminated. Not null.
*/
<T> Publisher<T> addCloseTask(Publisher<T> publisher) {
closeTasks.add(publisher);

return Publishers.concatTerminal(
publisher,
Mono.fromRunnable(() -> closeTasks.remove(publisher)));
}

/**
Expand Down Expand Up @@ -417,7 +470,7 @@ public Publisher<Void> commitTransaction() {
@Override
public Batch createBatch() {
requireOpenConnection(jdbcConnection);
return new OracleBatchImpl(statementTimeout, jdbcConnection, adapter);
return new OracleBatchImpl(statementTimeout, this);
}

/**
Expand All @@ -441,8 +494,7 @@ public Batch createBatch() {
public Statement createStatement(String sql) {
requireNonNull(sql, "sql is null");
requireOpenConnection(jdbcConnection);
return new OracleStatementImpl(
sql, statementTimeout, jdbcConnection, adapter);
return new OracleStatementImpl(sql, statementTimeout, this);
}

/**
Expand Down Expand Up @@ -826,4 +878,24 @@ public Publisher<Void> preRelease() {
});
}

/**
* Returns the JDBC connection that this R2DBC connection executes database
* calls with.
*
* @return The JDBC connection which backs this R2DBC connection. Not null.
*/
java.sql.Connection jdbcConnection() {
return jdbcConnection;
}

/**
* Returns the adapter that adapts the asynchronous API of the
* {@link #jdbcConnection()} that backs this R2DBC connection.
*
* @return The JDBC connection that backs this R2DBC connection. Not null.
*/
ReactiveJdbcAdapter adapter() {
return adapter;
}

}
94 changes: 50 additions & 44 deletions src/main/java/oracle/r2dbc/impl/OracleReadableImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import oracle.sql.INTERVALYM;
import oracle.sql.TIMESTAMPLTZ;
import oracle.sql.TIMESTAMPTZ;
import org.reactivestreams.Publisher;

import java.math.BigDecimal;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -93,6 +94,9 @@
class OracleReadableImpl implements io.r2dbc.spi.Readable {


/** The R2DBC connection that created this readable */
private final OracleConnectionImpl r2dbcConnection;

/** The JDBC connection that created this readable */
private final java.sql.Connection jdbcConnection;

Expand All @@ -117,21 +121,21 @@ class OracleReadableImpl implements io.r2dbc.spi.Readable {
* {@code jdbcReadable} and obtains metadata of the values from
* {@code resultMetadata}.
* </p>
* @param jdbcConnection JDBC connection that created the
* @param r2dbcConnection R2DBC connection that created the
* {@code jdbcReadable}. Not null.
* @param jdbcReadable Readable values from a JDBC Driver. Not null.
* @param readablesMetadata Metadata of each value. Not null.
* @param adapter Adapts JDBC calls into reactive streams. Not null.
*/
private OracleReadableImpl(
java.sql.Connection jdbcConnection, DependentCounter dependentCounter,
JdbcReadable jdbcReadable, ReadablesMetadata<?> readablesMetadata,
ReactiveJdbcAdapter adapter) {
this.jdbcConnection = jdbcConnection;
OracleConnectionImpl r2dbcConnection, DependentCounter dependentCounter,
JdbcReadable jdbcReadable, ReadablesMetadata<?> readablesMetadata) {
this.r2dbcConnection = r2dbcConnection;
this.jdbcConnection = r2dbcConnection.jdbcConnection();
this.dependentCounter = dependentCounter;
this.jdbcReadable = jdbcReadable;
this.readablesMetadata = readablesMetadata;
this.adapter = adapter;
this.adapter = r2dbcConnection.adapter();
}

/**
Expand All @@ -151,11 +155,10 @@ private OracleReadableImpl(
* {@code metadata}. Not null.
*/
static Row createRow(
java.sql.Connection jdbcConnection, DependentCounter dependentCounter,
JdbcReadable jdbcReadable, RowMetadataImpl metadata,
ReactiveJdbcAdapter adapter) {
OracleConnectionImpl r2dbcConnection, DependentCounter dependentCounter,
JdbcReadable jdbcReadable, RowMetadataImpl metadata) {
return new RowImpl(
jdbcConnection, dependentCounter, jdbcReadable, metadata, adapter);
r2dbcConnection, dependentCounter, jdbcReadable, metadata);
}

/**
Expand All @@ -164,7 +167,7 @@ static Row createRow(
* the provided {@code jdbcReadable} and {@code rowMetadata}. The metadata
* object is used to determine the default type mapping of column values.
* </p>
* @param jdbcConnection JDBC connection that created the
* @param r2dbcConnection R2DBC connection that created the
* {@code jdbcReadable}. Not null.
* @param dependentCounter Counter that is increased for each dependent
* {@code Result} created by the returned {@code OutParameters}
Expand All @@ -175,11 +178,10 @@ static Row createRow(
* {@code metadata}. Not null.
*/
static OutParameters createOutParameters(
java.sql.Connection jdbcConnection, DependentCounter dependentCounter,
JdbcReadable jdbcReadable, OutParametersMetadataImpl metadata,
ReactiveJdbcAdapter adapter) {
OracleConnectionImpl r2dbcConnection, DependentCounter dependentCounter,
JdbcReadable jdbcReadable, OutParametersMetadataImpl metadata) {
return new OutParametersImpl(
jdbcConnection, dependentCounter, jdbcReadable, metadata, adapter);
r2dbcConnection, dependentCounter, jdbcReadable, metadata);
}

/**
Expand Down Expand Up @@ -335,11 +337,16 @@ private ByteBuffer getByteBuffer(int index) {
*/
private Blob getBlob(int index) {
java.sql.Blob jdbcBlob = jdbcReadable.getObject(index, java.sql.Blob.class);
return jdbcBlob == null
? null
: OracleLargeObjects.createBlob(
adapter.publishBlobRead(jdbcBlob),
adapter.publishBlobFree(jdbcBlob));

if (jdbcBlob == null)
return null;

Publisher<Void> freePublisher =
r2dbcConnection.addCloseTask(adapter.publishBlobFree(jdbcBlob));

return OracleLargeObjects.createBlob(
adapter.publishBlobRead(jdbcBlob),
freePublisher);
}

/**
Expand Down Expand Up @@ -367,11 +374,15 @@ private Clob getClob(int index) {
jdbcClob = jdbcReadable.getObject(index, java.sql.Clob.class);
}

return jdbcClob == null
? null
: OracleLargeObjects.createClob(
adapter.publishClobRead(jdbcClob),
adapter.publishClobFree(jdbcClob));
if (jdbcClob == null)
return null;

Publisher<Void> freePublisher =
r2dbcConnection.addCloseTask(adapter.publishClobFree(jdbcClob));

return OracleLargeObjects.createClob(
adapter.publishClobRead(jdbcClob),
freePublisher);
}

/**
Expand Down Expand Up @@ -685,11 +696,10 @@ private OracleR2dbcObjectImpl getOracleR2dbcObject(int index) {
return null;

return new OracleR2dbcObjectImpl(
jdbcConnection,
r2dbcConnection,
dependentCounter,
new StructJdbcReadable(oracleStruct),
ReadablesMetadata.createAttributeMetadata(oracleStruct),
adapter);
ReadablesMetadata.createAttributeMetadata(oracleStruct));
}

/**
Expand Down Expand Up @@ -956,7 +966,7 @@ private Result getResult(int index) {

dependentCounter.increment();
return OracleResultImpl.createQueryResult(
dependentCounter, resultSet, adapter);
r2dbcConnection, dependentCounter, resultSet);
}

/**
Expand Down Expand Up @@ -994,17 +1004,16 @@ private static final class RowImpl
* {@code jdbcReadable}, and uses the specified {@code rowMetadata} to
* determine the default type mapping of column values.
* </p>
* @param jdbcConnection JDBC connection that created the
* @param r2dbcConnection R2DBC connection that created the
* {@code jdbcReadable}. Not null.
* @param jdbcReadable Row data from the Oracle JDBC Driver. Not null.
* @param metadata Meta-data for the specified row. Not null.
* @param adapter Adapts JDBC calls into reactive streams. Not null.
*/
private RowImpl(
java.sql.Connection jdbcConnection, DependentCounter dependentCounter,
JdbcReadable jdbcReadable, RowMetadataImpl metadata,
ReactiveJdbcAdapter adapter) {
super(jdbcConnection, dependentCounter, jdbcReadable, metadata, adapter);
OracleConnectionImpl r2dbcConnection, DependentCounter dependentCounter,
JdbcReadable jdbcReadable, RowMetadataImpl metadata) {
super(r2dbcConnection, dependentCounter, jdbcReadable, metadata);
this.metadata = metadata;
}

Expand Down Expand Up @@ -1044,10 +1053,9 @@ private static final class OutParametersImpl
* @param adapter Adapts JDBC calls into reactive streams. Not null.
*/
private OutParametersImpl(
java.sql.Connection jdbcConnection, DependentCounter dependentCounter,
JdbcReadable jdbcReadable, OutParametersMetadataImpl metadata,
ReactiveJdbcAdapter adapter) {
super(jdbcConnection, dependentCounter, jdbcReadable, metadata, adapter);
OracleConnectionImpl r2dbcConnection, DependentCounter dependentCounter,
JdbcReadable jdbcReadable, OutParametersMetadataImpl metadata) {
super(r2dbcConnection, dependentCounter, jdbcReadable, metadata);
this.metadata = metadata;
}

Expand All @@ -1068,20 +1076,18 @@ private final class OracleR2dbcObjectImpl
* {@code jdbcReadable} and obtains metadata of the values from
* {@code outParametersMetaData}.
* </p>
* @param jdbcConnection JDBC connection that created the
* @param r2dbcConnection R2DBC connection that created the
* {@code jdbcReadable}. Not null.
* @param structJdbcReadable Readable values from a JDBC Driver. Not null.
* @param metadata Metadata of each value. Not null.
* @param adapter Adapts JDBC calls into reactive streams. Not null.
*/
private OracleR2dbcObjectImpl(
java.sql.Connection jdbcConnection,
OracleConnectionImpl r2dbcConnection,
DependentCounter dependentCounter,
StructJdbcReadable structJdbcReadable,
OracleR2dbcObjectMetadataImpl metadata,
ReactiveJdbcAdapter adapter) {
super(
jdbcConnection, dependentCounter, structJdbcReadable, metadata, adapter);
OracleR2dbcObjectMetadataImpl metadata) {
super(r2dbcConnection, dependentCounter, structJdbcReadable, metadata);
this.metadata = metadata;
}

Expand Down
Loading

0 comments on commit 7182da7

Please sign in to comment.