diff --git a/DocumentsFromSnapshotMigration/src/test/java/com/rfs/FullTest.java b/DocumentsFromSnapshotMigration/src/test/java/com/rfs/FullTest.java index 7719bbbcc..ee7f43c8a 100644 --- a/DocumentsFromSnapshotMigration/src/test/java/com/rfs/FullTest.java +++ b/DocumentsFromSnapshotMigration/src/test/java/com/rfs/FullTest.java @@ -19,6 +19,7 @@ import com.rfs.common.SnapshotShardUnpacker; import com.rfs.common.SourceRepo; import com.rfs.framework.SearchClusterContainer; +import com.rfs.http.SearchClusterRequests; import com.rfs.framework.PreloadedSearchClusterContainer; import com.rfs.transformers.TransformFunctions; import com.rfs.transformers.Transformer; @@ -183,28 +184,15 @@ public void testDocumentMigration(SearchClusterContainer.Version baseSourceImage private void checkClusterMigrationOnFinished(SearchClusterContainer esSourceContainer, OpensearchContainer osTargetContainer) { var targetClient = new RestClient(new ConnectionDetails(osTargetContainer.getHttpHostAddress(), null, null)); - var sourceMap = getIndexToCountMap(new RestClient(new ConnectionDetails(esSourceContainer.getUrl(), - null, null))); + var sourceClient = new RestClient(new ConnectionDetails(esSourceContainer.getUrl(), null, null)); + + var requests = new SearchClusterRequests(); + var sourceMap = requests.getMapOfIndexAndDocCount(sourceClient); var refreshResponse = targetClient.get("_refresh"); Assertions.assertEquals(200, refreshResponse.code); - var targetMap = getIndexToCountMap(targetClient); - MatcherAssert.assertThat(targetMap, Matchers.equalTo(sourceMap)); - } + var targetMap = requests.getMapOfIndexAndDocCount(targetClient); - private Map getIndexToCountMap(RestClient client) {; - var lines = Optional.ofNullable(client.get("_cat/indices")) - .flatMap(r->Optional.ofNullable(r.body)) - .map(b->b.split("\n")) - .orElse(new String[0]); - return Arrays.stream(lines) - .map(line -> { - var matcher = CAT_INDICES_INDEX_COUNT_PATTERN.matcher(line); - return !matcher.find() ? null : - new AbstractMap.SimpleEntry<>(matcher.group(1), matcher.group(2)); - }) - .filter(Objects::nonNull) - .filter(kvp->!kvp.getKey().startsWith(".")) - .collect(Collectors.toMap(AbstractMap.SimpleEntry::getKey, kvp -> Integer.parseInt(kvp.getValue()))); + MatcherAssert.assertThat(targetMap, Matchers.equalTo(sourceMap)); } @SneakyThrows diff --git a/RFS/build.gradle b/RFS/build.gradle index 91b94f72c..1d3fde551 100644 --- a/RFS/build.gradle +++ b/RFS/build.gradle @@ -66,6 +66,8 @@ dependencies { testFixturesImplementation group: 'com.github.docker-java', name: 'docker-java-core' testFixturesImplementation group: 'com.github.docker-java', name: 'docker-java-transport-httpclient5' testFixturesImplementation group: 'org.testcontainers', name: 'testcontainers' + + testFixturesImplementation group: 'org.hamcrest', name: 'hamcrest' } application { diff --git a/RFS/src/testFixtures/java/com/rfs/http/SearchClusterRequests.java b/RFS/src/testFixtures/java/com/rfs/http/SearchClusterRequests.java new file mode 100644 index 000000000..a5e2f3a73 --- /dev/null +++ b/RFS/src/testFixtures/java/com/rfs/http/SearchClusterRequests.java @@ -0,0 +1,59 @@ +package com.rfs.http; + + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.rfs.common.RestClient; + +import lombok.SneakyThrows; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; + +public class SearchClusterRequests { + + private final ObjectMapper mapper = new ObjectMapper(); + + @SneakyThrows + public Map getMapOfIndexAndDocCount(final RestClient client) { + var catIndicesResponse = client.get("_cat/indices?format=json"); + assertThat(catIndicesResponse.code, equalTo(200)); + + var catBodyJson = mapper.readTree(catIndicesResponse.body); + var allIndices = new ArrayList(); + catBodyJson.forEach(index -> allIndices.add(index.get("index").asText())); + + var interestingIndices = filterToInterestingIndices(allIndices); + + /** + * Why not trust the doc.count from `_cat/indices? + * Turns out that count can include deleted/updated documents too depending on the search cluster implementation + * by querying count directly on each index it ensures the number of documents no matter if this bug exists or not + * + * See https://github.com/elastic/elasticsearch/issues/25868#issuecomment-317990140 + */ + var mapOfIndexAndDocCount = interestingIndices.stream() + .collect(Collectors.toMap(i -> i, i -> { + try { + var response = client.get(i + "/_count"); + var countFromResponse = mapper.readTree(response.body).get("count").asInt(); + return countFromResponse; + } catch (Exception e) { + throw new RuntimeException(e); + } + })); + + return mapOfIndexAndDocCount; + } + + public List filterToInterestingIndices(final List indices) { + return indices.stream() + .filter(index -> !index.startsWith(".")) + .filter(index -> !index.startsWith("reindexed-logs")) + .collect(Collectors.toList()); + } +}