Skip to content

Commit

Permalink
Merge pull request opensearch-project#801 from peternied/reliable-cat…
Browse files Browse the repository at this point in the history
…-indices

Fix flaky behavior when comparing _cat/indices
  • Loading branch information
peternied authored Jul 9, 2024
2 parents 70340cb + 1b1d9ff commit 049cf90
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 19 deletions.
26 changes: 7 additions & 19 deletions DocumentsFromSnapshotMigration/src/test/java/com/rfs/FullTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.rfs.common.SnapshotShardUnpacker;
import com.rfs.common.SourceRepo;
import com.rfs.framework.SearchClusterContainer;
import com.rfs.http.SearchClusterRequests;
import com.rfs.framework.PreloadedSearchClusterContainer;
import com.rfs.transformers.TransformFunctions;
import com.rfs.transformers.Transformer;
Expand Down Expand Up @@ -183,28 +184,15 @@ public void testDocumentMigration(SearchClusterContainer.Version baseSourceImage
private void checkClusterMigrationOnFinished(SearchClusterContainer esSourceContainer,
OpensearchContainer<?> osTargetContainer) {
var targetClient = new RestClient(new ConnectionDetails(osTargetContainer.getHttpHostAddress(), null, null));
var sourceMap = getIndexToCountMap(new RestClient(new ConnectionDetails(esSourceContainer.getUrl(),
null, null)));
var sourceClient = new RestClient(new ConnectionDetails(esSourceContainer.getUrl(), null, null));

var requests = new SearchClusterRequests();
var sourceMap = requests.getMapOfIndexAndDocCount(sourceClient);
var refreshResponse = targetClient.get("_refresh");
Assertions.assertEquals(200, refreshResponse.code);
var targetMap = getIndexToCountMap(targetClient);
MatcherAssert.assertThat(targetMap, Matchers.equalTo(sourceMap));
}
var targetMap = requests.getMapOfIndexAndDocCount(targetClient);

private Map<String,Integer> getIndexToCountMap(RestClient client) {;
var lines = Optional.ofNullable(client.get("_cat/indices"))
.flatMap(r->Optional.ofNullable(r.body))
.map(b->b.split("\n"))
.orElse(new String[0]);
return Arrays.stream(lines)
.map(line -> {
var matcher = CAT_INDICES_INDEX_COUNT_PATTERN.matcher(line);
return !matcher.find() ? null :
new AbstractMap.SimpleEntry<>(matcher.group(1), matcher.group(2));
})
.filter(Objects::nonNull)
.filter(kvp->!kvp.getKey().startsWith("."))
.collect(Collectors.toMap(AbstractMap.SimpleEntry::getKey, kvp -> Integer.parseInt(kvp.getValue())));
MatcherAssert.assertThat(targetMap, Matchers.equalTo(sourceMap));
}

@SneakyThrows
Expand Down
2 changes: 2 additions & 0 deletions RFS/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ dependencies {
testFixturesImplementation group: 'com.github.docker-java', name: 'docker-java-core'
testFixturesImplementation group: 'com.github.docker-java', name: 'docker-java-transport-httpclient5'
testFixturesImplementation group: 'org.testcontainers', name: 'testcontainers'

testFixturesImplementation group: 'org.hamcrest', name: 'hamcrest'
}

application {
Expand Down
59 changes: 59 additions & 0 deletions RFS/src/testFixtures/java/com/rfs/http/SearchClusterRequests.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package com.rfs.http;


import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.rfs.common.RestClient;

import lombok.SneakyThrows;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;

public class SearchClusterRequests {

private final ObjectMapper mapper = new ObjectMapper();

@SneakyThrows
public Map<String,Integer> getMapOfIndexAndDocCount(final RestClient client) {
var catIndicesResponse = client.get("_cat/indices?format=json");
assertThat(catIndicesResponse.code, equalTo(200));

var catBodyJson = mapper.readTree(catIndicesResponse.body);
var allIndices = new ArrayList<String>();
catBodyJson.forEach(index -> allIndices.add(index.get("index").asText()));

var interestingIndices = filterToInterestingIndices(allIndices);

/**
* Why not trust the doc.count from `_cat/indices?
* Turns out that count can include deleted/updated documents too depending on the search cluster implementation
* by querying count directly on each index it ensures the number of documents no matter if this bug exists or not
*
* See https://github.com/elastic/elasticsearch/issues/25868#issuecomment-317990140
*/
var mapOfIndexAndDocCount = interestingIndices.stream()
.collect(Collectors.toMap(i -> i, i -> {
try {
var response = client.get(i + "/_count");
var countFromResponse = mapper.readTree(response.body).get("count").asInt();
return countFromResponse;
} catch (Exception e) {
throw new RuntimeException(e);
}
}));

return mapOfIndexAndDocCount;
}

public List<String> filterToInterestingIndices(final List<String> indices) {
return indices.stream()
.filter(index -> !index.startsWith("."))
.filter(index -> !index.startsWith("reindexed-logs"))
.collect(Collectors.toList());
}
}

0 comments on commit 049cf90

Please sign in to comment.