Skip to content

Commit

Permalink
Implemented Connection#setStatementTimeout
Browse files Browse the repository at this point in the history
Motivation:
Previously, the `Connection#setStatementTimeout` method did not perform any operation (NO-OP).

Modification:
Successfully implemented the functionality for `Connection#setStatementTimeout`.

Result:
The `Connection#setStatementTimeout` method is now fully operational and functional.

Co-authored-by: jchrys <45776091+jchrys@users.noreply.github.com>
  • Loading branch information
Shamon Hashmi and jchrys committed Jan 18, 2024
1 parent 8752f66 commit 77d7600
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 2 deletions.
28 changes: 26 additions & 2 deletions src/main/java/io/asyncer/r2dbc/mysql/MySqlConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.r2dbc.spi.Connection;
import io.r2dbc.spi.IsolationLevel;
import io.r2dbc.spi.Lifecycle;
import io.r2dbc.spi.R2dbcNonTransientResourceException;
import io.r2dbc.spi.TransactionDefinition;
import io.r2dbc.spi.ValidationDepth;
import org.jetbrains.annotations.Nullable;
Expand Down Expand Up @@ -74,6 +75,10 @@ public final class MySqlConnection implements Connection, Lifecycle, ConnectionS

private static final ServerVersion MYSQL_8 = ServerVersion.create(8, 0, 0);

private static final ServerVersion MYSQL_5_7_4 = ServerVersion.create(5, 7, 4);

private static final ServerVersion MARIA_10_2_0 = ServerVersion.create(10, 2, 0, true);

private static final BiConsumer<ServerMessage, SynchronousSink<Boolean>> PING = (message, sink) -> {
if (message instanceof ErrorMessage) {
ErrorMessage msg = (ErrorMessage) message;
Expand Down Expand Up @@ -396,9 +401,28 @@ public Mono<Void> setLockWaitTimeout(Duration timeout) {
@Override
public Mono<Void> setStatementTimeout(Duration timeout) {
requireNonNull(timeout, "timeout must not be null");
final boolean isMariaDb = context.getCapability().isMariaDb();
final ServerVersion serverVersion = context.getServerVersion();
final long timeoutMs = timeout.toMillis();
final String sql = isMariaDb ? "SET max_statement_time=" + timeoutMs / 1000.0
: "SET SESSION MAX_EXECUTION_TIME=" + timeoutMs;

// mariadb: https://mariadb.com/kb/en/aborting-statements/
// mysql: https://dev.mysql.com/blog-archive/server-side-select-statement-timeouts/
// ref: https://github.com/mariadb-corporation/mariadb-connector-r2dbc
if (isMariaDb && serverVersion.isGreaterThanOrEqualTo(MARIA_10_2_0)
|| !isMariaDb && serverVersion.isGreaterThanOrEqualTo(MYSQL_5_7_4)) {
return QueryFlow.executeVoid(client, sql);
}

// TODO: implement me
return Mono.empty();
return Mono.error(
new R2dbcNonTransientResourceException(
"Statement timeout is not supported by server version " + serverVersion,
"HY000",
-1,
sql
)
);
}

boolean isSessionAutoCommit() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ public R2dbcException toException() {
}

public R2dbcException toException(@Nullable String sql) {
// mysql: https://dev.mysql.com/doc/mysql-errors/8.0/en/server-error-reference.html
// mariadb: https://mariadb.com/kb/en/mariadb-error-code-reference/
// Should keep looking more error codes
switch (code) {
case 1044: // Database access denied
Expand All @@ -93,6 +95,8 @@ public R2dbcException toException(@Nullable String sql) {
return new R2dbcTransientResourceException(message, sqlState, code);
case 1205: // Wait lock timeout
case 1907: // Statement executing timeout
case 3024: // Query execution was interrupted, maximum statement execution time exceeded
case 1969: // Query execution was interrupted
return new R2dbcTimeoutException(message, sqlState, code);
case 1613: // Transaction rollback because of took too long
return new R2dbcRollbackException(message, sqlState, code);
Expand Down
22 changes: 22 additions & 0 deletions src/test/java/io/asyncer/r2dbc/mysql/IntegrationTestSupport.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.asyncer.r2dbc.mysql;

import io.r2dbc.spi.R2dbcBadGrammarException;
import io.r2dbc.spi.R2dbcTimeoutException;
import io.r2dbc.spi.Result;
import org.jetbrains.annotations.Nullable;
import org.reactivestreams.Publisher;
Expand Down Expand Up @@ -50,6 +51,10 @@ void badGrammar(Function<? super MySqlConnection, Publisher<?>> runner) {
process(runner).verifyError(R2dbcBadGrammarException.class);
}

void timeout(Function<? super MySqlConnection, Publisher<?>> runner) {
process(runner).verifyError(R2dbcTimeoutException.class);
}

void illegalArgument(Function<? super MySqlConnection, Publisher<?>> runner) {
process(runner).expectError(IllegalArgumentException.class).verify(Duration.ofSeconds(3));
}
Expand Down Expand Up @@ -136,4 +141,21 @@ boolean envIsLessThanMySql57OrMariaDb102() {

return ver.isLessThan(ServerVersion.create(5, 7, 0));
}

boolean envIsLessThanMySql574OrMariaDb102() {
String version = System.getProperty("test.mysql.version");

if (version == null || version.isEmpty()) {
return true;
}

ServerVersion ver = ServerVersion.parse(version);
String type = System.getProperty("test.db.type");

if ("mariadb".equalsIgnoreCase(type)) {
return ver.isLessThan(ServerVersion.create(10, 2, 0));
}

return ver.isLessThan(ServerVersion.create(5, 7, 4));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -612,6 +612,17 @@ void testUnionQueryWithJsonColumnDecodedAsString() {
);
}

@Test
@DisabledIf("envIsLessThanMySql574OrMariaDb102")
void setStatementTimeoutTest() {
final String sql = "SELECT 1 WHERE SLEEP(1) > 1";
timeout(connection -> connection.setStatementTimeout(Duration.ofMillis(500))
.then(Mono.from(connection.createStatement(sql).execute()))
.flatMapMany(result -> Mono.from(result.map((row, metadata) -> row.get(0, String.class))))
.doOnNext(it -> System.out.println(it))
);
}

private static JsonNode parseJson(String json) {
ObjectMapper mapper = new ObjectMapper();
try {
Expand Down

0 comments on commit 77d7600

Please sign in to comment.