diff --git a/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnection.java b/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnection.java index c2bedf146..6a0abd852 100644 --- a/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnection.java +++ b/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnection.java @@ -33,6 +33,7 @@ import io.r2dbc.spi.Connection; import io.r2dbc.spi.IsolationLevel; import io.r2dbc.spi.Lifecycle; +import io.r2dbc.spi.R2dbcNonTransientResourceException; import io.r2dbc.spi.TransactionDefinition; import io.r2dbc.spi.ValidationDepth; import org.jetbrains.annotations.Nullable; @@ -74,6 +75,10 @@ public final class MySqlConnection implements Connection, Lifecycle, ConnectionS private static final ServerVersion MYSQL_8 = ServerVersion.create(8, 0, 0); + private static final ServerVersion MYSQL_5_7_4 = ServerVersion.create(5, 7, 4); + + private static final ServerVersion MARIA_10_2_0 = ServerVersion.create(10, 2, 0, true); + private static final BiConsumer> PING = (message, sink) -> { if (message instanceof ErrorMessage) { ErrorMessage msg = (ErrorMessage) message; @@ -396,9 +401,28 @@ public Mono setLockWaitTimeout(Duration timeout) { @Override public Mono setStatementTimeout(Duration timeout) { requireNonNull(timeout, "timeout must not be null"); + final boolean isMariaDb = context.getCapability().isMariaDb(); + final ServerVersion serverVersion = context.getServerVersion(); + final long timeoutMs = timeout.toMillis(); + final String sql = isMariaDb ? "SET max_statement_time=" + timeoutMs / 1000.0 + : "SET SESSION MAX_EXECUTION_TIME=" + timeoutMs; + + // mariadb: https://mariadb.com/kb/en/aborting-statements/ + // mysql: https://dev.mysql.com/blog-archive/server-side-select-statement-timeouts/ + // ref: https://github.com/mariadb-corporation/mariadb-connector-r2dbc + if (isMariaDb && serverVersion.isGreaterThanOrEqualTo(MARIA_10_2_0) + || !isMariaDb && serverVersion.isGreaterThanOrEqualTo(MYSQL_5_7_4)) { + return QueryFlow.executeVoid(client, sql); + } - // TODO: implement me - return Mono.empty(); + return Mono.error( + new R2dbcNonTransientResourceException( + "Statement timeout is not supported by server version " + serverVersion, + "HY000", + -1, + sql + ) + ); } boolean isSessionAutoCommit() { diff --git a/src/main/java/io/asyncer/r2dbc/mysql/message/server/ErrorMessage.java b/src/main/java/io/asyncer/r2dbc/mysql/message/server/ErrorMessage.java index 9dee369ee..a7ef07fb8 100644 --- a/src/main/java/io/asyncer/r2dbc/mysql/message/server/ErrorMessage.java +++ b/src/main/java/io/asyncer/r2dbc/mysql/message/server/ErrorMessage.java @@ -74,6 +74,8 @@ public R2dbcException toException() { } public R2dbcException toException(@Nullable String sql) { + // mysql: https://dev.mysql.com/doc/mysql-errors/8.0/en/server-error-reference.html + // mariadb: https://mariadb.com/kb/en/mariadb-error-code-reference/ // Should keep looking more error codes switch (code) { case 1044: // Database access denied @@ -93,6 +95,8 @@ public R2dbcException toException(@Nullable String sql) { return new R2dbcTransientResourceException(message, sqlState, code); case 1205: // Wait lock timeout case 1907: // Statement executing timeout + case 3024: // Query execution was interrupted, maximum statement execution time exceeded + case 1969: // Query execution was interrupted return new R2dbcTimeoutException(message, sqlState, code); case 1613: // Transaction rollback because of took too long return new R2dbcRollbackException(message, sqlState, code); diff --git a/src/test/java/io/asyncer/r2dbc/mysql/IntegrationTestSupport.java b/src/test/java/io/asyncer/r2dbc/mysql/IntegrationTestSupport.java index 11e754e99..3f81a383d 100644 --- a/src/test/java/io/asyncer/r2dbc/mysql/IntegrationTestSupport.java +++ b/src/test/java/io/asyncer/r2dbc/mysql/IntegrationTestSupport.java @@ -17,6 +17,7 @@ package io.asyncer.r2dbc.mysql; import io.r2dbc.spi.R2dbcBadGrammarException; +import io.r2dbc.spi.R2dbcTimeoutException; import io.r2dbc.spi.Result; import org.jetbrains.annotations.Nullable; import org.reactivestreams.Publisher; @@ -50,6 +51,10 @@ void badGrammar(Function> runner) { process(runner).verifyError(R2dbcBadGrammarException.class); } + void timeout(Function> runner) { + process(runner).verifyError(R2dbcTimeoutException.class); + } + void illegalArgument(Function> runner) { process(runner).expectError(IllegalArgumentException.class).verify(Duration.ofSeconds(3)); } @@ -136,4 +141,21 @@ boolean envIsLessThanMySql57OrMariaDb102() { return ver.isLessThan(ServerVersion.create(5, 7, 0)); } + + boolean envIsLessThanMySql574OrMariaDb102() { + String version = System.getProperty("test.mysql.version"); + + if (version == null || version.isEmpty()) { + return true; + } + + ServerVersion ver = ServerVersion.parse(version); + String type = System.getProperty("test.db.type"); + + if ("mariadb".equalsIgnoreCase(type)) { + return ver.isLessThan(ServerVersion.create(10, 2, 0)); + } + + return ver.isLessThan(ServerVersion.create(5, 7, 4)); + } } diff --git a/src/test/java/io/asyncer/r2dbc/mysql/QueryIntegrationTestSupport.java b/src/test/java/io/asyncer/r2dbc/mysql/QueryIntegrationTestSupport.java index d03811faf..9a0051457 100644 --- a/src/test/java/io/asyncer/r2dbc/mysql/QueryIntegrationTestSupport.java +++ b/src/test/java/io/asyncer/r2dbc/mysql/QueryIntegrationTestSupport.java @@ -612,6 +612,17 @@ void testUnionQueryWithJsonColumnDecodedAsString() { ); } + @Test + @DisabledIf("envIsLessThanMySql574OrMariaDb102") + void setStatementTimeoutTest() { + final String sql = "SELECT 1 WHERE SLEEP(1) > 1"; + timeout(connection -> connection.setStatementTimeout(Duration.ofMillis(500)) + .then(Mono.from(connection.createStatement(sql).execute())) + .flatMapMany(result -> Mono.from(result.map((row, metadata) -> row.get(0, String.class)))) + .doOnNext(it -> System.out.println(it)) + ); + } + private static JsonNode parseJson(String json) { ObjectMapper mapper = new ObjectMapper(); try {