Skip to content

Commit

Permalink
add Tests and fix error type
Browse files Browse the repository at this point in the history
  • Loading branch information
jchrys committed Jan 18, 2024
1 parent e839c69 commit 9e8e571
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 6 deletions.
14 changes: 8 additions & 6 deletions src/main/java/io/asyncer/r2dbc/mysql/MySqlConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,9 @@ public final class MySqlConnection implements Connection, Lifecycle, ConnectionS

private static final ServerVersion MYSQL_8 = ServerVersion.create(8, 0, 0);

private static final ServerVersion MYSQL_STATEMENT_TIMEOUT = ServerVersion.create(5, 7, 4);
private static final ServerVersion MYSQL_5_7_4 = ServerVersion.create(5, 7, 4);

private static final ServerVersion MARIA_STATEMENT_TIMEOUT = ServerVersion.create(10, 2, 0);
private static final ServerVersion MARIA_10_2_0 = ServerVersion.create(10, 2, 0);

private static final BiConsumer<ServerMessage, SynchronousSink<Boolean>> PING = (message, sink) -> {
if (message instanceof ErrorMessage) {
Expand Down Expand Up @@ -403,13 +403,15 @@ public Mono<Void> setStatementTimeout(Duration timeout) {
requireNonNull(timeout, "timeout must not be null");
final boolean isMariaDb = context.getCapability().isMariaDb();
final ServerVersion serverVersion = context.getServerVersion();
final String sql = isMariaDb ? "SET max_statement_time=" + timeout.getSeconds()
: "SET SESSION MAX_EXECUTION_TIME=" + timeout.toMillis();
final long timeoutMs = timeout.toMillis();
final String sql = isMariaDb ? "SET max_statement_time=" + timeoutMs / 1000.0
: "SET SESSION MAX_EXECUTION_TIME=" + timeoutMs;

// mariadb: https://mariadb.com/kb/en/aborting-statements/
// mysql: https://dev.mysql.com/blog-archive/server-side-select-statement-timeouts/
if (isMariaDb && serverVersion.isGreaterThanOrEqualTo(MARIA_STATEMENT_TIMEOUT)
|| !isMariaDb && serverVersion.isGreaterThanOrEqualTo(MYSQL_STATEMENT_TIMEOUT)) {
// ref: https://github.com/mariadb-corporation/mariadb-connector-r2dbc
if (isMariaDb && serverVersion.isGreaterThanOrEqualTo(MARIA_10_2_0)
|| !isMariaDb && serverVersion.isGreaterThanOrEqualTo(MYSQL_5_7_4)) {
return QueryFlow.executeVoid(client, sql);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ public R2dbcException toException(@Nullable String sql) {
return new R2dbcTransientResourceException(message, sqlState, code);
case 1205: // Wait lock timeout
case 1907: // Statement executing timeout
case 3024: // query timeout
return new R2dbcTimeoutException(message, sqlState, code);
case 1613: // Transaction rollback because of took too long
return new R2dbcRollbackException(message, sqlState, code);
Expand Down
22 changes: 22 additions & 0 deletions src/test/java/io/asyncer/r2dbc/mysql/IntegrationTestSupport.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.asyncer.r2dbc.mysql;

import io.r2dbc.spi.R2dbcBadGrammarException;
import io.r2dbc.spi.R2dbcTimeoutException;
import io.r2dbc.spi.Result;
import org.jetbrains.annotations.Nullable;
import org.reactivestreams.Publisher;
Expand Down Expand Up @@ -50,6 +51,10 @@ void badGrammar(Function<? super MySqlConnection, Publisher<?>> runner) {
process(runner).verifyError(R2dbcBadGrammarException.class);
}

void timeout(Function<? super MySqlConnection, Publisher<?>> runner) {
process(runner).verifyError(R2dbcTimeoutException.class);
}

void illegalArgument(Function<? super MySqlConnection, Publisher<?>> runner) {
process(runner).expectError(IllegalArgumentException.class).verify(Duration.ofSeconds(3));
}
Expand Down Expand Up @@ -136,4 +141,21 @@ boolean envIsLessThanMySql57OrMariaDb102() {

return ver.isLessThan(ServerVersion.create(5, 7, 0));
}

boolean envIsLessThanMySql574OrMariaDb102() {
String version = System.getProperty("test.mysql.version");

if (version == null || version.isEmpty()) {
return true;
}

ServerVersion ver = ServerVersion.parse(version);
String type = System.getProperty("test.db.type");

if ("mariadb".equalsIgnoreCase(type)) {
return ver.isLessThan(ServerVersion.create(10, 2, 0));
}

return ver.isLessThan(ServerVersion.create(5, 7, 4));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -612,6 +612,17 @@ void testUnionQueryWithJsonColumnDecodedAsString() {
);
}

@Test
@DisabledIf("envIsLessThanMySql574OrMariaDb102")
void setStatementTimeoutTest() {
final String sql = "SELECT 1 WHERE SLEEP(1) > 1";
timeout(connection -> connection.setStatementTimeout(Duration.ofMillis(500))
.then(Mono.from(connection.createStatement(sql).execute()))
.flatMapMany(result -> Mono.from(result.map((row, metadata) -> row.get(0, String.class))))
.doOnNext(it -> System.out.println(it))
);
}

private static JsonNode parseJson(String json) {
ObjectMapper mapper = new ObjectMapper();
try {
Expand Down

0 comments on commit 9e8e571

Please sign in to comment.