From 1f1faa3ed893a07a9f99e586ccd9fd3820534703 Mon Sep 17 00:00:00 2001 From: yrizhkov Date: Thu, 2 Jan 2025 10:54:30 +0200 Subject: [PATCH] FMWK-346 Use reactive MRT operations --- pom.xml | 2 +- .../AerospikeReactiveTransaction.java | 27 +++++++++++-------- .../AerospikeReactiveTransactionManager.java | 25 ++++++++--------- 3 files changed, 30 insertions(+), 24 deletions(-) diff --git a/pom.xml b/pom.xml index 4577ac91..92615759 100644 --- a/pom.xml +++ b/pom.xml @@ -31,7 +31,7 @@ 3.3.0 1.6 9.0.2 - 8.1.2 + 9.0.2 3.7.0 3.1.9 2.13.0 diff --git a/src/main/java/org/springframework/data/aerospike/transaction/reactive/AerospikeReactiveTransaction.java b/src/main/java/org/springframework/data/aerospike/transaction/reactive/AerospikeReactiveTransaction.java index 31ce1f4f..6eb16046 100644 --- a/src/main/java/org/springframework/data/aerospike/transaction/reactive/AerospikeReactiveTransaction.java +++ b/src/main/java/org/springframework/data/aerospike/transaction/reactive/AerospikeReactiveTransaction.java @@ -1,9 +1,11 @@ package org.springframework.data.aerospike.transaction.reactive; -import org.springframework.data.aerospike.transaction.sync.AerospikeTransactionResourceHolder; +import com.aerospike.client.AbortStatus; +import com.aerospike.client.CommitStatus; import org.springframework.lang.Nullable; import org.springframework.transaction.support.SmartTransactionObject; import org.springframework.util.Assert; +import reactor.core.publisher.Mono; /** * A {@link SmartTransactionObject} implementation that has reactive transaction resource holder @@ -39,26 +41,29 @@ void setResourceHolder(@Nullable AerospikeReactiveTransactionResourceHolder reso this.resourceHolder = resourceHolder; } - private void failIfNoTransaction() { - if (!hasResourceHolder()) { - throw new IllegalStateException("Error: expecting transaction to exist"); - } + private Mono getResourceHolder() { + return Mono.fromCallable(() -> { + if (!hasResourceHolder()) { + throw new IllegalStateException("Error: expecting transaction to exist"); + } + return resourceHolder; + }); } /** * Commit the transaction */ - public void commitTransaction() { - failIfNoTransaction(); - resourceHolder.getClient().getAerospikeClient().commit(resourceHolder.getTransaction()); + public Mono commitTransaction() { + return getResourceHolder() + .flatMap(h -> h.getClient().commit(h.getTransaction())); } /** * Rollback (abort) the transaction */ - public void abortTransaction() { - failIfNoTransaction(); - resourceHolder.getClient().getAerospikeClient().abort(resourceHolder.getTransaction()); + public Mono abortTransaction() { + return getResourceHolder() + .flatMap(h -> h.getClient().abort(h.getTransaction())); } @Override diff --git a/src/main/java/org/springframework/data/aerospike/transaction/reactive/AerospikeReactiveTransactionManager.java b/src/main/java/org/springframework/data/aerospike/transaction/reactive/AerospikeReactiveTransactionManager.java index c8db4f52..a9e05f2d 100644 --- a/src/main/java/org/springframework/data/aerospike/transaction/reactive/AerospikeReactiveTransactionManager.java +++ b/src/main/java/org/springframework/data/aerospike/transaction/reactive/AerospikeReactiveTransactionManager.java @@ -25,7 +25,6 @@ public class AerospikeReactiveTransactionManager extends AbstractReactiveTransac /** * Create a new instance of {@link AerospikeReactiveTransactionManager} */ - public AerospikeReactiveTransactionManager(IAerospikeReactorClient client) { this.client = client; } @@ -73,7 +72,12 @@ protected Mono doBegin(TransactionSynchronizationManager synchronizationMa rHolder.setSynchronizedWithTransaction(true); synchronizationManager.bindResource(client, rHolder); }) - .onErrorMap(e -> new TransactionSystemException("Could not bind transaction resource", e)) + .onErrorMap(e -> { + if (e instanceof TransactionSystemException) { + return e; + } + return new TransactionSystemException("Could not bind transaction resource", e); + }) .then(); }); } @@ -89,10 +93,8 @@ private Mono createResourceHolder(IA @Override protected Mono doCommit(TransactionSynchronizationManager synchronizationManager, GenericReactiveTransaction status) { - return Mono.fromRunnable(() -> { - AerospikeReactiveTransaction transaction = getTransaction(status); - transaction.commitTransaction(); - }) + return Mono.fromSupplier(() -> getTransaction(status)) + .flatMap(AerospikeReactiveTransaction::commitTransaction) .onErrorMap(e -> new TransactionSystemException("Could not commit transaction", e)) .then(); } @@ -100,10 +102,8 @@ protected Mono doCommit(TransactionSynchronizationManager synchronizationM @Override protected Mono doRollback(TransactionSynchronizationManager synchronizationManager, GenericReactiveTransaction status) { - return Mono.fromRunnable(() -> { - AerospikeReactiveTransaction transaction = getTransaction(status); - transaction.abortTransaction(); - }) + return Mono.fromSupplier(() -> getTransaction(status)) + .flatMap(AerospikeReactiveTransaction::abortTransaction) .onErrorMap(e -> new TransactionSystemException("Could not abort transaction", e)) .then(); } @@ -135,7 +135,8 @@ protected Mono doSetRollbackOnly(TransactionSynchronizationManager synchro AerospikeReactiveTransaction transaction = toAerospikeTransaction(status); transaction.getRequiredResourceHolder().setRollbackOnly(); }) - .onErrorMap(e -> new TransactionSystemException("Could not resume transaction", e)) + .onErrorMap(e -> + new TransactionSystemException("Could not set rollback only for a transaction", e)) .then(); } @@ -149,7 +150,7 @@ protected Mono doCleanupAfterCompletion(TransactionSynchronizationManager synchronizationManager.unbindResource(client); aerospikeTransaction.getRequiredResourceHolder().clear(); }) - .onErrorMap(e -> new TransactionSystemException("Could not resume transaction", e)) + .onErrorMap(e -> new TransactionSystemException("Could not clean up transaction", e)) .then(); } }