From 667e06ca27f480309029628fb59a45ef55ba9196 Mon Sep 17 00:00:00 2001 From: Andrey G Date: Mon, 6 Jan 2025 10:33:55 +0200 Subject: [PATCH] FMWK-634 Apply expression filter as secondary index filter fallback (#817) --- .../data/aerospike/query/QueryEngine.java | 31 +++++++++++++++++++ .../aerospike/query/ReactorQueryEngine.java | 28 ++++++++++++++++- .../aerospike/query/StatementBuilder.java | 10 +++--- ...tiveAerospikeTemplateFindByQueryTests.java | 22 ++++++++----- .../AerospikeTemplateFindByQueryTests.java | 12 +++++++ .../data/aerospike/logging/LoggingTests.java | 2 +- 6 files changed, 91 insertions(+), 14 deletions(-) diff --git a/src/main/java/org/springframework/data/aerospike/query/QueryEngine.java b/src/main/java/org/springframework/data/aerospike/query/QueryEngine.java index bcb5b8939..6b77fec4d 100644 --- a/src/main/java/org/springframework/data/aerospike/query/QueryEngine.java +++ b/src/main/java/org/springframework/data/aerospike/query/QueryEngine.java @@ -16,6 +16,7 @@ */ package org.springframework.data.aerospike.query; +import com.aerospike.client.AerospikeException; import com.aerospike.client.IAerospikeClient; import com.aerospike.client.Key; import com.aerospike.client.Record; @@ -25,6 +26,7 @@ import com.aerospike.client.query.Statement; import lombok.Getter; import lombok.Setter; +import lombok.extern.slf4j.Slf4j; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.data.aerospike.config.AerospikeDataSettings; @@ -32,6 +34,14 @@ import org.springframework.data.aerospike.repository.query.Query; import org.springframework.lang.Nullable; +import java.util.List; + +import static com.aerospike.client.ResultCode.INDEX_GENERIC; +import static com.aerospike.client.ResultCode.INDEX_MAXCOUNT; +import static com.aerospike.client.ResultCode.INDEX_NAME_MAXLEN; +import static com.aerospike.client.ResultCode.INDEX_NOTFOUND; +import static com.aerospike.client.ResultCode.INDEX_NOTREADABLE; +import static com.aerospike.client.ResultCode.INDEX_OOM; import static org.springframework.data.aerospike.query.QualifierUtils.queryCriteriaIsNotNull; /** @@ -40,6 +50,7 @@ * @author peter * @author Anastasiia Smirnova */ +@Slf4j public class QueryEngine { private static final Logger logger = LoggerFactory.getLogger(QueryEngine.class); @@ -47,6 +58,8 @@ public class QueryEngine { "Query without a filter will initiate a scan. Since scans are potentially dangerous operations, they are " + "disabled by default in spring-data-aerospike. " + "If you still need to use them, enable them via `scans-enabled` property."; + public static final List SEC_INDEX_ERROR_RESULT_CODES = List.of( + INDEX_NOTFOUND, INDEX_OOM, INDEX_NOTREADABLE, INDEX_GENERIC, INDEX_NAME_MAXLEN, INDEX_MAXCOUNT); private final IAerospikeClient client; @Getter private final StatementBuilder statementBuilder; @@ -111,6 +124,24 @@ public KeyRecordIterator select(String namespace, String set, String[] binNames, } RecordSet rs = client.query(localQueryPolicy, statement); + try { + return new KeyRecordIterator(namespace, rs); + } catch (AerospikeException e) { + if (statement.getFilter() != null && SEC_INDEX_ERROR_RESULT_CODES.contains(e.getResultCode())) { + log.warn("Got secondary index related exception (resultCode: {}), retrying with filter expression only", + e.getResultCode()); + return retryWithFilterExpression(namespace, qualifier, statement); + } + throw e; + } + } + + private KeyRecordIterator retryWithFilterExpression(String namespace, Qualifier qualifier, Statement statement) { + // retry without sIndex filter + qualifier.setHasSecIndexFilter(false); + QueryPolicy localQueryPolicyFallback = getQueryPolicy(qualifier, true); + statement.setFilter(null); + RecordSet rs = client.query(localQueryPolicyFallback, statement); return new KeyRecordIterator(namespace, rs); } diff --git a/src/main/java/org/springframework/data/aerospike/query/ReactorQueryEngine.java b/src/main/java/org/springframework/data/aerospike/query/ReactorQueryEngine.java index 94fb529bd..c796bbcd2 100644 --- a/src/main/java/org/springframework/data/aerospike/query/ReactorQueryEngine.java +++ b/src/main/java/org/springframework/data/aerospike/query/ReactorQueryEngine.java @@ -16,6 +16,7 @@ */ package org.springframework.data.aerospike.query; +import com.aerospike.client.AerospikeException; import com.aerospike.client.Key; import com.aerospike.client.policy.Policy; import com.aerospike.client.policy.QueryPolicy; @@ -24,6 +25,8 @@ import com.aerospike.client.reactor.IAerospikeReactorClient; import lombok.Getter; import lombok.Setter; +import lombok.extern.slf4j.Slf4j; +import org.reactivestreams.Publisher; import org.springframework.data.aerospike.config.AerospikeDataSettings; import org.springframework.data.aerospike.query.qualifier.Qualifier; import org.springframework.data.aerospike.repository.query.Query; @@ -32,6 +35,7 @@ import reactor.core.publisher.Mono; import static org.springframework.data.aerospike.query.QualifierUtils.queryCriteriaIsNotNull; +import static org.springframework.data.aerospike.query.QueryEngine.SEC_INDEX_ERROR_RESULT_CODES; /** * This class provides a multi-filter reactive query engine that augments the query capability in Aerospike. @@ -39,6 +43,7 @@ * @author Sergii Karpenko * @author Anastasiia Smirnova */ +@Slf4j public class ReactorQueryEngine { private final IAerospikeReactorClient client; @@ -103,7 +108,28 @@ public Flux select(String namespace, String set, String[] binNames, @ return Flux.error(new IllegalStateException(QueryEngine.SCANS_DISABLED_MESSAGE)); } - return client.query(localQueryPolicy, statement); + return client.query(localQueryPolicy, statement) + .onErrorResume(throwable -> { + if (throwable instanceof AerospikeException ae + && statement.getFilter() != null + && SEC_INDEX_ERROR_RESULT_CODES.contains(ae.getResultCode())) + { + log.warn( + "Got secondary index related exception (resultCode: {}), retrying with filter expression only", + ae.getResultCode()); + return retryWithFilterExpression(qualifier, statement); + } + // for other exceptions + return Mono.error(throwable); + }); + } + + private Publisher retryWithFilterExpression(Qualifier qualifier, Statement statement) { + // retry without sIndex filter + qualifier.setHasSecIndexFilter(false); + QueryPolicy localQueryPolicyFallback = getQueryPolicy(qualifier, true); + statement.setFilter(null); + return client.query(localQueryPolicyFallback, statement); } /** diff --git a/src/main/java/org/springframework/data/aerospike/query/StatementBuilder.java b/src/main/java/org/springframework/data/aerospike/query/StatementBuilder.java index e22c9eccb..b221f9d45 100644 --- a/src/main/java/org/springframework/data/aerospike/query/StatementBuilder.java +++ b/src/main/java/org/springframework/data/aerospike/query/StatementBuilder.java @@ -131,19 +131,19 @@ private void setFilterFromSingleQualifier(Statement stmt, Qualifier qualifier) { } private boolean isIndexedBin(Statement stmt, Qualifier qualifier) { - boolean hasIndex = false, hasField = false; + boolean hasIndexesForField = false, hasField = false; if (StringUtils.hasLength(qualifier.getBinName())) { hasField = true; - hasIndex = indexesCache.hasIndexFor( + hasIndexesForField = indexesCache.hasIndexFor( new IndexedField(stmt.getNamespace(), stmt.getSetName(), qualifier.getBinName()) ); } if (log.isDebugEnabled() && hasField) { - log.debug("Qualifier #{}, bin {}.{}.{} has secondary index: {}", - qualifier.hashCode(), stmt.getNamespace(), stmt.getSetName(), qualifier.getBinName(), hasIndex); + log.debug("Qualifier #{}, bin {}.{}.{} has secondary index(es): {}", qualifier.hashCode(), + stmt.getNamespace(), stmt.getSetName(), qualifier.getBinName(), hasIndexesForField); } - return hasIndex; + return hasIndexesForField; } private int getMinBinValuesRatioForQualifier(Statement stmt, Qualifier qualifier) { diff --git a/src/test/java/org/springframework/data/aerospike/core/reactive/ReactiveAerospikeTemplateFindByQueryTests.java b/src/test/java/org/springframework/data/aerospike/core/reactive/ReactiveAerospikeTemplateFindByQueryTests.java index 0070c83bd..11edc1285 100644 --- a/src/test/java/org/springframework/data/aerospike/core/reactive/ReactiveAerospikeTemplateFindByQueryTests.java +++ b/src/test/java/org/springframework/data/aerospike/core/reactive/ReactiveAerospikeTemplateFindByQueryTests.java @@ -1,6 +1,6 @@ package org.springframework.data.aerospike.core.reactive; -import org.junit.jupiter.api.BeforeAll; +import com.aerospike.client.query.IndexType; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; @@ -31,12 +31,6 @@ @TestInstance(TestInstance.Lifecycle.PER_CLASS) public class ReactiveAerospikeTemplateFindByQueryTests extends BaseReactiveIntegrationTests { - @BeforeAll - public void beforeAllSetUp() { - additionalAerospikeTestOperations.deleteAllAndVerify(Person.class); - additionalAerospikeTestOperations.deleteAllAndVerify(Person.class, OVERRIDE_SET_NAME); - } - @Override @BeforeEach public void setUp() { @@ -45,6 +39,20 @@ public void setUp() { super.setUp(); } + @Test + public void findWithFilterEqual_String_fallbackToFilterExp() { + // incompatible secondary index (should be STRING) causes "index not found" exception + reactiveTemplate.createIndex(Person.class, "person_first_name_index_numeric", "firstName", + IndexType.NUMERIC).block(); + Query query = QueryUtils.createQueryForMethodWithArgs(serverVersionSupport, "findByFirstName", "Dave"); + reactiveTemplate.insert(new Person(nextId(), "Dave", "Matthews")).block(); + // after getting index exception there is a fallback to filter exp only + List result = reactiveTemplate.find(query, Person.class).collectList().block(); + assertThat(Objects.requireNonNull(result).stream().map(Person::getFirstName).collect(Collectors.toList())) + .containsExactly("Dave"); + reactiveTemplate.deleteIndex(Person.class, "person_first_name_index_numeric").block(); + } + @Test public void findAll_OrderByFirstName() { List persons = new ArrayList<>(); diff --git a/src/test/java/org/springframework/data/aerospike/core/sync/AerospikeTemplateFindByQueryTests.java b/src/test/java/org/springframework/data/aerospike/core/sync/AerospikeTemplateFindByQueryTests.java index 3d134a3b0..2ae75ee96 100644 --- a/src/test/java/org/springframework/data/aerospike/core/sync/AerospikeTemplateFindByQueryTests.java +++ b/src/test/java/org/springframework/data/aerospike/core/sync/AerospikeTemplateFindByQueryTests.java @@ -103,6 +103,7 @@ public void afterAll() { template.deleteAll(allPersons); template.deleteAll(allPersons, OVERRIDE_SET_NAME); additionalAerospikeTestOperations.dropIndex(Person.class, "person_first_name_index"); + additionalAerospikeTestOperations.dropIndex(Person.class, "person_byte_array_index"); } @Test @@ -112,6 +113,17 @@ public void findWithFilterEqual_String() { assertThat(result).containsOnly(dave); } + @Test + public void findWithFilterEqual_String_fallbackToFilterExp() { + additionalAerospikeTestOperations.createIndex(Person.class, "person_first_name_index_numeric", "firstName", + IndexType.NUMERIC); // incompatible secondary index (should be STRING) causes "index not found" exception + Query query = QueryUtils.createQueryForMethodWithArgs(serverVersionSupport, "findByFirstName", "Dave"); + // after getting index exception there is a fallback to filter exp only + Stream result = template.find(query, Person.class); + assertThat(result).containsOnly(dave); + additionalAerospikeTestOperations.dropIndex(Person.class, "person_first_name_index_numeric"); + } + @Test public void findWithFilterEqual_ByteArray() { if (serverVersionSupport.isServerVersionGtOrEq7()) { diff --git a/src/test/java/org/springframework/data/aerospike/logging/LoggingTests.java b/src/test/java/org/springframework/data/aerospike/logging/LoggingTests.java index 5b97d333c..492fae6a3 100644 --- a/src/test/java/org/springframework/data/aerospike/logging/LoggingTests.java +++ b/src/test/java/org/springframework/data/aerospike/logging/LoggingTests.java @@ -53,7 +53,7 @@ void binIsIndexed() { // 3 events: Created query, Bin has secondary index, Secondary index filter is not set assertThat(memoryAppender.countEventsForLogger(LOGGER_NAME)).isEqualTo(3); - String msg = "bin TEST.testSet.testField has secondary index: false"; + String msg = "bin TEST.testSet.testField has secondary index(es): false"; assertThat(memoryAppender.search(msg, Level.DEBUG).size()).isEqualTo(1); assertThat(memoryAppender.contains(msg, Level.INFO)).isFalse(); }