Skip to content

Commit

Permalink
FMWK-634 Apply expression filter as secondary index filter fallback (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
agrgr authored Jan 6, 2025
1 parent 992c194 commit 667e06c
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.springframework.data.aerospike.query;

import com.aerospike.client.AerospikeException;
import com.aerospike.client.IAerospikeClient;
import com.aerospike.client.Key;
import com.aerospike.client.Record;
Expand All @@ -25,13 +26,22 @@
import com.aerospike.client.query.Statement;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.aerospike.config.AerospikeDataSettings;
import org.springframework.data.aerospike.query.qualifier.Qualifier;
import org.springframework.data.aerospike.repository.query.Query;
import org.springframework.lang.Nullable;

import java.util.List;

import static com.aerospike.client.ResultCode.INDEX_GENERIC;
import static com.aerospike.client.ResultCode.INDEX_MAXCOUNT;
import static com.aerospike.client.ResultCode.INDEX_NAME_MAXLEN;
import static com.aerospike.client.ResultCode.INDEX_NOTFOUND;
import static com.aerospike.client.ResultCode.INDEX_NOTREADABLE;
import static com.aerospike.client.ResultCode.INDEX_OOM;
import static org.springframework.data.aerospike.query.QualifierUtils.queryCriteriaIsNotNull;

/**
Expand All @@ -40,13 +50,16 @@
* @author peter
* @author Anastasiia Smirnova
*/
@Slf4j
public class QueryEngine {

private static final Logger logger = LoggerFactory.getLogger(QueryEngine.class);
public static final String SCANS_DISABLED_MESSAGE =
"Query without a filter will initiate a scan. Since scans are potentially dangerous operations, they are " +
"disabled by default in spring-data-aerospike. " +
"If you still need to use them, enable them via `scans-enabled` property.";
public static final List<Integer> SEC_INDEX_ERROR_RESULT_CODES = List.of(
INDEX_NOTFOUND, INDEX_OOM, INDEX_NOTREADABLE, INDEX_GENERIC, INDEX_NAME_MAXLEN, INDEX_MAXCOUNT);
private final IAerospikeClient client;
@Getter
private final StatementBuilder statementBuilder;
Expand Down Expand Up @@ -111,6 +124,24 @@ public KeyRecordIterator select(String namespace, String set, String[] binNames,
}

RecordSet rs = client.query(localQueryPolicy, statement);
try {
return new KeyRecordIterator(namespace, rs);
} catch (AerospikeException e) {
if (statement.getFilter() != null && SEC_INDEX_ERROR_RESULT_CODES.contains(e.getResultCode())) {
log.warn("Got secondary index related exception (resultCode: {}), retrying with filter expression only",
e.getResultCode());
return retryWithFilterExpression(namespace, qualifier, statement);
}
throw e;
}
}

private KeyRecordIterator retryWithFilterExpression(String namespace, Qualifier qualifier, Statement statement) {
// retry without sIndex filter
qualifier.setHasSecIndexFilter(false);
QueryPolicy localQueryPolicyFallback = getQueryPolicy(qualifier, true);
statement.setFilter(null);
RecordSet rs = client.query(localQueryPolicyFallback, statement);
return new KeyRecordIterator(namespace, rs);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.springframework.data.aerospike.query;

import com.aerospike.client.AerospikeException;
import com.aerospike.client.Key;
import com.aerospike.client.policy.Policy;
import com.aerospike.client.policy.QueryPolicy;
Expand All @@ -24,6 +25,8 @@
import com.aerospike.client.reactor.IAerospikeReactorClient;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Publisher;
import org.springframework.data.aerospike.config.AerospikeDataSettings;
import org.springframework.data.aerospike.query.qualifier.Qualifier;
import org.springframework.data.aerospike.repository.query.Query;
Expand All @@ -32,13 +35,15 @@
import reactor.core.publisher.Mono;

import static org.springframework.data.aerospike.query.QualifierUtils.queryCriteriaIsNotNull;
import static org.springframework.data.aerospike.query.QueryEngine.SEC_INDEX_ERROR_RESULT_CODES;

/**
* This class provides a multi-filter reactive query engine that augments the query capability in Aerospike.
*
* @author Sergii Karpenko
* @author Anastasiia Smirnova
*/
@Slf4j
public class ReactorQueryEngine {

private final IAerospikeReactorClient client;
Expand Down Expand Up @@ -103,7 +108,28 @@ public Flux<KeyRecord> select(String namespace, String set, String[] binNames, @
return Flux.error(new IllegalStateException(QueryEngine.SCANS_DISABLED_MESSAGE));
}

return client.query(localQueryPolicy, statement);
return client.query(localQueryPolicy, statement)
.onErrorResume(throwable -> {
if (throwable instanceof AerospikeException ae
&& statement.getFilter() != null
&& SEC_INDEX_ERROR_RESULT_CODES.contains(ae.getResultCode()))
{
log.warn(
"Got secondary index related exception (resultCode: {}), retrying with filter expression only",
ae.getResultCode());
return retryWithFilterExpression(qualifier, statement);
}
// for other exceptions
return Mono.error(throwable);
});
}

private Publisher<? extends KeyRecord> retryWithFilterExpression(Qualifier qualifier, Statement statement) {
// retry without sIndex filter
qualifier.setHasSecIndexFilter(false);
QueryPolicy localQueryPolicyFallback = getQueryPolicy(qualifier, true);
statement.setFilter(null);
return client.query(localQueryPolicyFallback, statement);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,19 +131,19 @@ private void setFilterFromSingleQualifier(Statement stmt, Qualifier qualifier) {
}

private boolean isIndexedBin(Statement stmt, Qualifier qualifier) {
boolean hasIndex = false, hasField = false;
boolean hasIndexesForField = false, hasField = false;
if (StringUtils.hasLength(qualifier.getBinName())) {
hasField = true;
hasIndex = indexesCache.hasIndexFor(
hasIndexesForField = indexesCache.hasIndexFor(
new IndexedField(stmt.getNamespace(), stmt.getSetName(), qualifier.getBinName())
);
}

if (log.isDebugEnabled() && hasField) {
log.debug("Qualifier #{}, bin {}.{}.{} has secondary index: {}",
qualifier.hashCode(), stmt.getNamespace(), stmt.getSetName(), qualifier.getBinName(), hasIndex);
log.debug("Qualifier #{}, bin {}.{}.{} has secondary index(es): {}", qualifier.hashCode(),
stmt.getNamespace(), stmt.getSetName(), qualifier.getBinName(), hasIndexesForField);
}
return hasIndex;
return hasIndexesForField;
}

private int getMinBinValuesRatioForQualifier(Statement stmt, Qualifier qualifier) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package org.springframework.data.aerospike.core.reactive;

import org.junit.jupiter.api.BeforeAll;
import com.aerospike.client.query.IndexType;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
Expand Down Expand Up @@ -31,12 +31,6 @@
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class ReactiveAerospikeTemplateFindByQueryTests extends BaseReactiveIntegrationTests {

@BeforeAll
public void beforeAllSetUp() {
additionalAerospikeTestOperations.deleteAllAndVerify(Person.class);
additionalAerospikeTestOperations.deleteAllAndVerify(Person.class, OVERRIDE_SET_NAME);
}

@Override
@BeforeEach
public void setUp() {
Expand All @@ -45,6 +39,20 @@ public void setUp() {
super.setUp();
}

@Test
public void findWithFilterEqual_String_fallbackToFilterExp() {
// incompatible secondary index (should be STRING) causes "index not found" exception
reactiveTemplate.createIndex(Person.class, "person_first_name_index_numeric", "firstName",
IndexType.NUMERIC).block();
Query query = QueryUtils.createQueryForMethodWithArgs(serverVersionSupport, "findByFirstName", "Dave");
reactiveTemplate.insert(new Person(nextId(), "Dave", "Matthews")).block();
// after getting index exception there is a fallback to filter exp only
List<Person> result = reactiveTemplate.find(query, Person.class).collectList().block();
assertThat(Objects.requireNonNull(result).stream().map(Person::getFirstName).collect(Collectors.toList()))
.containsExactly("Dave");
reactiveTemplate.deleteIndex(Person.class, "person_first_name_index_numeric").block();
}

@Test
public void findAll_OrderByFirstName() {
List<Person> persons = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ public void afterAll() {
template.deleteAll(allPersons);
template.deleteAll(allPersons, OVERRIDE_SET_NAME);
additionalAerospikeTestOperations.dropIndex(Person.class, "person_first_name_index");
additionalAerospikeTestOperations.dropIndex(Person.class, "person_byte_array_index");
}

@Test
Expand All @@ -112,6 +113,17 @@ public void findWithFilterEqual_String() {
assertThat(result).containsOnly(dave);
}

@Test
public void findWithFilterEqual_String_fallbackToFilterExp() {
additionalAerospikeTestOperations.createIndex(Person.class, "person_first_name_index_numeric", "firstName",
IndexType.NUMERIC); // incompatible secondary index (should be STRING) causes "index not found" exception
Query query = QueryUtils.createQueryForMethodWithArgs(serverVersionSupport, "findByFirstName", "Dave");
// after getting index exception there is a fallback to filter exp only
Stream<Person> result = template.find(query, Person.class);
assertThat(result).containsOnly(dave);
additionalAerospikeTestOperations.dropIndex(Person.class, "person_first_name_index_numeric");
}

@Test
public void findWithFilterEqual_ByteArray() {
if (serverVersionSupport.isServerVersionGtOrEq7()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ void binIsIndexed() {

// 3 events: Created query, Bin has secondary index, Secondary index filter is not set
assertThat(memoryAppender.countEventsForLogger(LOGGER_NAME)).isEqualTo(3);
String msg = "bin TEST.testSet.testField has secondary index: false";
String msg = "bin TEST.testSet.testField has secondary index(es): false";
assertThat(memoryAppender.search(msg, Level.DEBUG).size()).isEqualTo(1);
assertThat(memoryAppender.contains(msg, Level.INFO)).isFalse();
}
Expand Down

0 comments on commit 667e06c

Please sign in to comment.