diff --git a/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java index b17c0ea47c..8ecfddbf8d 100644 --- a/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java +++ b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java @@ -7,6 +7,8 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.JsonNodeType; import io.micrometer.core.instrument.Measurement; @@ -497,12 +499,19 @@ public void testInstantiateSinkCustomIndex_WithIsmPolicy( assertThat(settingsIsmNode.get("rollover_alias").textValue(), equalTo(indexAlias)); final String expectedIndexPolicyName = indexAlias + "-policy"; + final String expectedPolicyIndexPattern = indexAlias + "*"; if (isOSBundle()) { // Check managed index await().atMost(1, TimeUnit.SECONDS).untilAsserted(() -> { assertThat(getIndexPolicyId(index), equalTo(expectedIndexPolicyName)); } ); + // Check policy index patterns are matching to indexAlias* + // Refer https://github.com/opensearch-project/data-prepper/pull/5118 + await().atMost(1, TimeUnit.SECONDS).untilAsserted(() -> { + assertThat(getPolicyIndexPatterns(getIndexPolicyId(index)), equalTo(expectedPolicyIndexPattern)); + } + ); } // roll over initial index @@ -1745,7 +1754,18 @@ private String getIndexPolicyId(final String index) throws IOException { responseBody).map().get(index)).get("index.opendistro.index_state_management.policy_id"); return policyId; } + + private String getPolicyIndexPatterns(final String policyId) throws IOException { + // TODO: replace with new _opensearch API + final Request request = new Request(HttpMethod.GET, "/_opendistro/_ism/policies/" + policyId); + final Response response = client.performRequest(request); + final String responseBody = EntityUtils.toString(response.getEntity()); + final ObjectMapper mapper = new ObjectMapper(); + final JsonNode jsonNode = mapper.readTree(responseBody); + final JsonNode indexPatterns = (ObjectNode) jsonNode.get("policy").get("ism_template").get("index_patterns"); + return indexPatterns.get(0).toString(); + } @SuppressWarnings("unchecked") private void wipeAllOpenSearchIndices() throws IOException {